# Onyva-Postling/src/orchestrator.py
"""Main orchestrator for the LinkedIn workflow."""
from collections import Counter
from typing import Dict, Any, List, Optional, Callable
from uuid import UUID
from loguru import logger
from src.config import settings
from src.database import db, LinkedInProfile, LinkedInPost, Topic
from src.scraper import scraper
from src.agents import (
    ProfileAnalyzerAgent,
    TopicExtractorAgent,
    ResearchAgent,
    WriterAgent,
    CriticAgent,
    PostClassifierAgent,
    PostTypeAnalyzerAgent,
)
from src.agents.grammar_checker import GrammarCheckAgent
from src.agents.style_validator import StyleValidator
from src.agents.readability_checker import ReadabilityChecker
from src.agents.quality_refiner import QualityRefinerAgent
from src.database.models import PostType
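
# Usage sketch (illustrative only, not part of the module): the orchestrator
# is exposed below as a module-level singleton, so an async entry point for a
# user who has already completed initial setup might look like this:
#
#     import asyncio
#     from src.orchestrator import orchestrator
#
#     async def main(user_id):
#         topics = await orchestrator.research_new_topics(user_id)
#         result = await orchestrator.create_post(user_id, topic=topics[0])
#         print(result["final_score"], result["final_post"])
#
#     asyncio.run(main(some_user_uuid))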


class WorkflowOrchestrator:
    """Orchestrates the entire LinkedIn post creation workflow."""

    def __init__(self):
        """Initialize orchestrator with all agents."""
        self.profile_analyzer = ProfileAnalyzerAgent()
        self.topic_extractor = TopicExtractorAgent()
        self.researcher = ResearchAgent()
        self.writer = WriterAgent()
        self.critic = CriticAgent()
        self.post_classifier = PostClassifierAgent()
        self.post_type_analyzer = PostTypeAnalyzerAgent()
        # New quality check agents
        self.grammar_checker = GrammarCheckAgent()
        self.style_validator = StyleValidator()
        self.readability_checker = ReadabilityChecker()
        self.quality_refiner = QualityRefinerAgent()
        self._all_agents = [
            self.profile_analyzer, self.topic_extractor, self.researcher,
            self.writer, self.critic, self.post_classifier, self.post_type_analyzer
        ]
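        # Note: the four quality-check agents above are not included in
        # _all_agents, so _set_tracking() does not propagate tracking
        # context to them.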
logger.info("WorkflowOrchestrator initialized with quality check & refiner agents")
def _set_tracking(self, operation: str, user_id: Optional[str] = None,
company_id: Optional[str] = None):
"""Set tracking context on all agents."""
uid = str(user_id) if user_id else None
comp_id = str(company_id) if company_id else None
for agent in self._all_agents:
agent.set_tracking_context(operation=operation, user_id=uid, company_id=comp_id)
async def _resolve_tracking_ids(self, user_id: UUID) -> dict:
"""Resolve company_id from a user_id for tracking."""
try:
profile = await db.get_profile(user_id)
if profile:
return {
"user_id": str(user_id),
"company_id": str(profile.company_id) if profile.company_id else None
}
except Exception as e:
logger.debug(f"Could not resolve tracking IDs for user {user_id}: {e}")
return {"user_id": str(user_id), "company_id": None}

    async def run_initial_setup(
        self,
        user_id: UUID,
        linkedin_url: str,
        profile_data: Dict[str, Any],
        post_types_data: Optional[List[Dict[str, Any]]] = None
    ) -> None:
        """
        Run initial setup for a user.

        This includes:
        1. Updating profile with linkedin_url and metadata
        2. Creating post types (if provided)
        3. Creating profile from profile_data (NO profile scraping)
        4. Scraping LinkedIn posts
        5. Analyzing profile
        6. Extracting topics from existing posts
        7. Classifying posts by type (if post types exist)
        8. Analyzing post types (if enough posts)

        Args:
            user_id: User UUID
            linkedin_url: LinkedIn profile URL
            profile_data: Profile data (writing style notes, etc.)
            post_types_data: Optional list of post type definitions
        """
        logger.info(f"=== INITIAL SETUP for user {user_id} ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("initial_setup", **ids)

        # Step 1: Update profile with linkedin_url
        total_steps = 8 if post_types_data else 5
        logger.info(f"Step 1/{total_steps}: Updating profile")
        await db.update_profile(user_id, {
            "linkedin_url": linkedin_url,
            "writing_style_notes": profile_data.get("writing_style_notes"),
            "metadata": profile_data
        })
        logger.info(f"Profile updated for user: {user_id}")

        # Step 2: Create post types if provided
        created_post_types = []
        if post_types_data:
            logger.info(f"Step 2/{total_steps}: Creating {len(post_types_data)} post types")
            for pt_data in post_types_data:
                post_type = PostType(
                    user_id=user_id,
                    name=pt_data.get("name", "Unnamed"),
                    description=pt_data.get("description"),
                    identifying_hashtags=pt_data.get("identifying_hashtags", []),
                    identifying_keywords=pt_data.get("identifying_keywords", []),
                    semantic_properties=pt_data.get("semantic_properties", {})
                )
                created_post_types.append(post_type)
            if created_post_types:
                created_post_types = await db.create_post_types_bulk(created_post_types)
                logger.info(f"Created {len(created_post_types)} post types")

        # Step 3: Create LinkedIn profile from profile data (NO scraping)
        step_num = 3 if post_types_data else 2
        logger.info(f"Step {step_num}/{total_steps}: Creating LinkedIn profile from provided data")
        profile = await db.get_profile(user_id)
        linkedin_profile = LinkedInProfile(
            user_id=user_id,
            profile_data={
                "persona": profile_data.get("persona"),
                "form_of_address": profile_data.get("form_of_address"),
                "style_guide": profile_data.get("style_guide"),
                "linkedin_url": linkedin_url
            },
            name=profile.display_name or "",
            headline=profile_data.get("persona", "")[:100] if profile_data.get("persona") else None
        )
        await db.save_linkedin_profile(linkedin_profile)
        logger.info("LinkedIn profile saved")

        # Step 4: Scrape ONLY posts using Apify
        step_num = 4 if post_types_data else 3
        logger.info(f"Step {step_num}/{total_steps}: Scraping LinkedIn posts")
        try:
            raw_posts = await scraper.scrape_posts(linkedin_url, limit=50)
            parsed_posts = scraper.parse_posts_data(raw_posts)
            linkedin_posts = []
            for post_data in parsed_posts:
                post = LinkedInPost(
                    user_id=user_id,
                    **post_data
                )
                linkedin_posts.append(post)
            if linkedin_posts:
                await db.save_linkedin_posts(linkedin_posts)
                logger.info(f"Saved {len(linkedin_posts)} posts")
            else:
                logger.warning("No posts scraped")
        except Exception as e:
            logger.error(f"Failed to scrape posts: {e}")
            linkedin_posts = []

        # Step 5: Analyze profile (with manual data + scraped posts)
        step_num = 5 if post_types_data else 4
        logger.info(f"Step {step_num}/{total_steps}: Analyzing profile with AI")
        try:
            profile_analysis = await self.profile_analyzer.process(
                profile=linkedin_profile,
                posts=linkedin_posts,
                customer_data=profile_data
            )
            # Save profile analysis
            from src.database.models import ProfileAnalysis
            analysis_record = ProfileAnalysis(
                user_id=user_id,
                writing_style=profile_analysis.get("writing_style", {}),
                tone_analysis=profile_analysis.get("tone_analysis", {}),
                topic_patterns=profile_analysis.get("topic_patterns", {}),
                audience_insights=profile_analysis.get("audience_insights", {}),
                full_analysis=profile_analysis
            )
            await db.save_profile_analysis(analysis_record)
            logger.info("Profile analysis saved")
        except Exception as e:
            logger.error(f"Profile analysis failed: {e}", exc_info=True)
            raise

        # Step 6: Extract topics from posts
        step_num = 6 if post_types_data else 5
        logger.info(f"Step {step_num}/{total_steps}: Extracting topics from posts")
        if linkedin_posts:
            try:
                topics = await self.topic_extractor.process(
                    posts=linkedin_posts,
                    user_id=user_id
                )
                if topics:
                    await db.save_topics(topics)
                    logger.info(f"Extracted and saved {len(topics)} topics")
            except Exception as e:
                logger.error(f"Topic extraction failed: {e}", exc_info=True)
        else:
            logger.info("No posts to extract topics from")

        # Steps 7 & 8: Classify and analyze post types (if post types exist)
        if created_post_types and linkedin_posts:
            # Step 7: Classify posts
            logger.info(f"Step {total_steps - 1}/{total_steps}: Classifying posts by type")
            try:
                await self.classify_posts(user_id)
            except Exception as e:
                logger.error(f"Post classification failed: {e}", exc_info=True)
            # Step 8: Analyze post types
            logger.info(f"Step {total_steps}/{total_steps}: Analyzing post types")
            try:
                await self.analyze_post_types(user_id)
            except Exception as e:
                logger.error(f"Post type analysis failed: {e}", exc_info=True)

        logger.info("Initial setup complete!")

    async def classify_posts(self, user_id: UUID) -> int:
        """
        Classify unclassified posts for a user.

        Args:
            user_id: User UUID

        Returns:
            Number of posts classified
        """
        logger.info(f"=== CLASSIFYING POSTS for user {user_id} ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("classify_posts", **ids)

        # Get post types
        post_types = await db.get_post_types(user_id)
        if not post_types:
            logger.info("No post types defined, skipping classification")
            return 0

        # Get unclassified posts
        posts = await db.get_unclassified_posts(user_id)
        if not posts:
            logger.info("No unclassified posts found")
            return 0

        logger.info(f"Classifying {len(posts)} posts into {len(post_types)} types")

        # Run classification
        classifications = await self.post_classifier.process(posts, post_types)
        if classifications:
            # Bulk update classifications
            await db.update_posts_classification_bulk(classifications)
            logger.info(f"Classified {len(classifications)} posts")
            return len(classifications)
        return 0

    async def analyze_post_types(self, user_id: UUID) -> Dict[str, Any]:
        """
        Analyze all post types for a user.

        Args:
            user_id: User UUID

        Returns:
            Dictionary with analysis results per post type
        """
        logger.info(f"=== ANALYZING POST TYPES for user {user_id} ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("analyze_post_types", **ids)

        # Get post types
        post_types = await db.get_post_types(user_id)
        if not post_types:
            logger.info("No post types defined")
            return {}

        results = {}
        for post_type in post_types:
            # Get posts for this type
            posts = await db.get_posts_by_type(user_id, post_type.id)
            if len(posts) < self.post_type_analyzer.MIN_POSTS_FOR_ANALYSIS:
                logger.info(f"Post type '{post_type.name}' has only {len(posts)} posts, skipping analysis")
                results[str(post_type.id)] = {
                    "skipped": True,
                    "reason": f"Not enough posts ({len(posts)} < {self.post_type_analyzer.MIN_POSTS_FOR_ANALYSIS})"
                }
                continue

            # Run analysis
            logger.info(f"Analyzing post type '{post_type.name}' with {len(posts)} posts")
            analysis = await self.post_type_analyzer.process(post_type, posts)

            # Save analysis to database
            if analysis.get("sufficient_data"):
                await db.update_post_type_analysis(
                    post_type_id=post_type.id,
                    analysis=analysis,
                    analyzed_post_count=len(posts)
                )
            results[str(post_type.id)] = analysis
        return results

    async def research_new_topics(
        self,
        user_id: UUID,
        progress_callback: Optional[Callable[[str, int, int], None]] = None,
        post_type_id: Optional[UUID] = None
    ) -> List[Dict[str, Any]]:
        """
        Research new content topics for a user.

        Args:
            user_id: User UUID
            progress_callback: Optional callback(message, current_step, total_steps)
            post_type_id: Optional post type to target research for

        Returns:
            List of suggested topics
        """
        logger.info(f"=== RESEARCHING NEW TOPICS for user {user_id} ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("research", **ids)

        # Get post type context if specified
        post_type = None
        post_type_analysis = None
        strategy_weight = 0.5  # Default strategy weight
        if post_type_id:
            post_type = await db.get_post_type(post_type_id)
            if post_type:
                post_type_analysis = post_type.analysis
                strategy_weight = post_type.strategy_weight
                logger.info(f"Targeting research for post type: {post_type.name} with strategy weight {strategy_weight:.1f}")

        def report_progress(message: str, step: int, total: int = 4):
            if progress_callback:
                progress_callback(message, step, total)

        # Step 1: Get profile analysis
        report_progress("Lade Profil-Analyse...", 1)
        profile_analysis = await db.get_profile_analysis(user_id)
        if not profile_analysis:
            raise ValueError("Profile analysis not found. Run initial setup first.")

        # Step 1.5: Load company strategy if user belongs to a company
        company_strategy = None
        profile = await db.get_profile(user_id)
        if profile and profile.company_id:
            company = await db.get_company(profile.company_id)
            if company and company.company_strategy:
                company_strategy = company.company_strategy
                logger.info(f"Loaded company strategy for research: {company.name}")

        # Step 2: Get ALL existing topics (from multiple sources to avoid repetition)
        report_progress("Lade existierende Topics...", 2)
        existing_topics = set()
        # From topics table
        existing_topics_records = await db.get_topics(user_id)
        for t in existing_topics_records:
            existing_topics.add(t.title)
        # From previous research results
        all_research = await db.get_all_research(user_id)
        for research in all_research:
            if research.suggested_topics:
                for topic in research.suggested_topics:
                    if topic.get("title"):
                        existing_topics.add(topic["title"])
        # From generated posts
        generated_posts = await db.get_generated_posts(user_id)
        for post in generated_posts:
            if post.topic_title:
                existing_topics.add(post.topic_title)
        existing_topics = list(existing_topics)
        logger.info(f"Found {len(existing_topics)} existing topics to avoid")

        # Get example posts to understand the person's actual content style.
        # If post_type_id is specified, only use posts of that type.
        if post_type_id:
            linkedin_posts = await db.get_posts_by_type(user_id, post_type_id)
        else:
            linkedin_posts = await db.get_linkedin_posts(user_id)
        example_post_texts = [
            post.post_text for post in linkedin_posts
            if post.post_text and len(post.post_text) > 100  # Only substantial posts
        ][:15]  # Limit to 15 best examples
        logger.info(f"Loaded {len(example_post_texts)} example posts for research context")

        # Step 3: Run research
        report_progress("AI recherchiert neue Topics...", 3)
        logger.info("Running research with AI")
        research_results = await self.researcher.process(
            profile_analysis=profile_analysis.full_analysis,
            existing_topics=existing_topics,
            customer_data=profile.metadata,
            example_posts=example_post_texts,
            post_type=post_type,
            post_type_analysis=post_type_analysis,
            company_strategy=company_strategy,
            strategy_weight=strategy_weight
        )

        # Step 4: Save research results
        report_progress("Speichere Ergebnisse...", 4)
        from src.database.models import ResearchResult
        research_record = ResearchResult(
            user_id=user_id,
            query=f"New topics for {profile.display_name}" + (f" ({post_type.name})" if post_type else ""),
            results={"raw_response": research_results["raw_response"]},
            suggested_topics=research_results["suggested_topics"],
            target_post_type_id=post_type_id
        )
        await db.save_research_result(research_record)
        logger.info(f"Research completed with {len(research_results['suggested_topics'])} suggestions")
        return research_results["suggested_topics"]

    async def generate_hooks(
        self,
        user_id: UUID,
        topic: Dict[str, Any],
        user_thoughts: str = "",
        post_type_id: Optional[UUID] = None
    ) -> List[Dict[str, str]]:
        """
        Generate 4 hook options for a topic.

        Args:
            user_id: User UUID
            topic: Topic dictionary
            user_thoughts: User's personal thoughts about the topic
            post_type_id: Optional post type for context

        Returns:
            List of {"hook": "...", "style": "..."} dictionaries
        """
        logger.info(f"=== GENERATING HOOKS for topic: {topic.get('title')} ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("generate_hooks", **ids)

        # Get profile analysis for style matching
        profile_analysis = await db.get_profile_analysis(user_id)
        if not profile_analysis:
            raise ValueError("Profile analysis not found. Run initial setup first.")

        # Get post type context if specified
        post_type = None
        if post_type_id:
            post_type = await db.get_post_type(post_type_id)

        # Generate hooks via writer agent
        hooks = await self.writer.generate_hooks(
            topic=topic,
            profile_analysis=profile_analysis.full_analysis,
            user_thoughts=user_thoughts,
            post_type=post_type
        )
        logger.info(f"Generated {len(hooks)} hooks")
        return hooks

    async def generate_improvement_suggestions(
        self,
        user_id: UUID,
        post_content: str,
        critic_feedback: Optional[Dict[str, Any]] = None
    ) -> List[Dict[str, str]]:
        """
        Generate improvement suggestions for an existing post.

        Args:
            user_id: User UUID
            post_content: The current post content
            critic_feedback: Optional feedback from the critic

        Returns:
            List of {"label": "...", "action": "..."} dictionaries
        """
        logger.info("=== GENERATING IMPROVEMENT SUGGESTIONS ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("improvement_suggestions", **ids)

        # Get profile analysis for style matching
        profile_analysis = await db.get_profile_analysis(user_id)
        if not profile_analysis:
            raise ValueError("Profile analysis not found.")

        suggestions = await self.writer.generate_improvement_suggestions(
            post_content=post_content,
            profile_analysis=profile_analysis.full_analysis,
            critic_feedback=critic_feedback
        )
        logger.info(f"Generated {len(suggestions)} improvement suggestions")
        return suggestions

    async def apply_suggestion_to_post(
        self,
        user_id: UUID,
        post_content: str,
        suggestion: str
    ) -> str:
        """
        Apply a suggestion to a post and return the improved version.

        Args:
            user_id: User UUID
            post_content: The current post content
            suggestion: The suggestion to apply

        Returns:
            The improved post content
        """
        logger.info("=== APPLYING SUGGESTION TO POST ===")
        ids = await self._resolve_tracking_ids(user_id)
        self._set_tracking("apply_suggestion", **ids)

        # Get profile analysis for style matching
        profile_analysis = await db.get_profile_analysis(user_id)
        if not profile_analysis:
            raise ValueError("Profile analysis not found.")

        improved_post = await self.writer.apply_suggestion(
            post_content=post_content,
            suggestion=suggestion,
            profile_analysis=profile_analysis.full_analysis
        )
        logger.info("Successfully applied suggestion to post")
        return improved_post

    async def create_post(
        self,
        user_id: UUID,
        topic: Dict[str, Any],
        max_iterations: int = 3,
        progress_callback: Optional[Callable[[str, int, int, Optional[int], Optional[List], Optional[List]], None]] = None,
        post_type_id: Optional[UUID] = None,
        user_thoughts: str = "",
        selected_hook: str = ""
    ) -> Dict[str, Any]:
        """
        Create a LinkedIn post through writer-critic iteration.

        Args:
            user_id: User UUID
            topic: Topic dictionary
            max_iterations: Maximum number of writer-critic iterations
            progress_callback: Optional callback(message, iteration, max_iterations, score, versions, feedback_list)
            post_type_id: Optional post type for type-specific writing
            user_thoughts: User's personal thoughts about the topic
            selected_hook: Optional hook text selected by the user

        Returns:
            Dictionary with final post and metadata
        """
logger.info(f"=== CREATING POST for topic: {topic.get('title')} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("post_creation", **ids)
def report_progress(message: str, iteration: int, score: Optional[int] = None,
versions: Optional[List] = None, feedback_list: Optional[List] = None):
if progress_callback:
progress_callback(message, iteration, max_iterations, score, versions, feedback_list)
# Get profile analysis
report_progress("Lade Profil-Analyse...", 0, None, [], [])
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found. Run initial setup first.")
# Get post type info if specified
post_type = None
post_type_analysis = None
strategy_weight = 0.5 # Default strategy weight
if post_type_id:
post_type = await db.get_post_type(post_type_id)
if post_type:
if post_type.analysis:
post_type_analysis = post_type.analysis
strategy_weight = post_type.strategy_weight # Extract strategy weight from post type
logger.info(f"Using post type '{post_type.name}' with strategy weight {strategy_weight:.1f}")
# Load user's real posts as style examples
# If post_type_id is specified, only use posts of that type
if post_type_id:
linkedin_posts = await db.get_posts_by_type(user_id, post_type_id)
if len(linkedin_posts) < 3:
# Fall back to all posts if not enough type-specific posts
linkedin_posts = await db.get_linkedin_posts(user_id)
logger.info("Not enough type-specific posts, using all posts")
else:
linkedin_posts = await db.get_linkedin_posts(user_id)
example_post_texts = [
post.post_text for post in linkedin_posts
if post.post_text and len(post.post_text) > 100 # Only use substantial posts
]
logger.info(f"Loaded {len(example_post_texts)} example posts for style reference")
# Extract lessons from past feedback (if enabled)
feedback_lessons = await self._extract_recurring_feedback(user_id)
# Initialize tracking
writer_versions = []
critic_feedback_list = []
current_post = None
approved = False
iteration = 0
# Load company strategy if user belongs to a company
company_strategy = None
profile = await db.get_profile(user_id)
if profile and profile.company_id:
company = await db.get_company(profile.company_id)
if company and company.company_strategy:
company_strategy = company.company_strategy
logger.info(f"Loaded company strategy for post creation: {company.name}")
# Writer-Critic loop
while iteration < max_iterations and not approved:
iteration += 1
logger.info(f"--- Iteration {iteration}/{max_iterations} ---")
# Writer creates/revises post
if iteration == 1:
# Initial post
report_progress("Writer erstellt ersten Entwurf...", iteration, None, writer_versions, critic_feedback_list)
current_post = await self.writer.process(
topic=topic,
profile_analysis=profile_analysis.full_analysis,
example_posts=example_post_texts,
learned_lessons=feedback_lessons, # Pass lessons from past feedback
post_type=post_type,
post_type_analysis=post_type_analysis,
user_thoughts=user_thoughts,
selected_hook=selected_hook,
company_strategy=company_strategy, # Pass company strategy
strategy_weight=strategy_weight # NEW: Pass strategy weight
)
else:
# Revision based on feedback - pass full critic result for structured changes
report_progress("Writer überarbeitet Post...", iteration, None, writer_versions, critic_feedback_list)
last_feedback = critic_feedback_list[-1]
current_post = await self.writer.process(
topic=topic,
profile_analysis=profile_analysis.full_analysis,
feedback=last_feedback.get("feedback", ""),
previous_version=writer_versions[-1],
example_posts=example_post_texts,
critic_result=last_feedback, # Pass full critic result with specific_changes
learned_lessons=feedback_lessons, # Also for revisions
post_type=post_type,
post_type_analysis=post_type_analysis,
user_thoughts=user_thoughts,
selected_hook=selected_hook,
company_strategy=company_strategy, # Pass company strategy
strategy_weight=strategy_weight # NEW: Pass strategy weight
)
writer_versions.append(current_post)
logger.info(f"Writer produced version {iteration}")
# Report progress with new version
report_progress("Critic bewertet Post...", iteration, None, writer_versions, critic_feedback_list)
# Critic reviews post with iteration awareness
critic_result = await self.critic.process(
post=current_post,
profile_analysis=profile_analysis.full_analysis,
topic=topic,
example_posts=example_post_texts,
iteration=iteration,
max_iterations=max_iterations
)
critic_feedback_list.append(critic_result)
approved = critic_result.get("approved", False)
score = critic_result.get("overall_score", 0)
# Auto-approve on last iteration if score is decent (>= 80)
if iteration == max_iterations and not approved and score >= 80:
approved = True
critic_result["approved"] = True
logger.info(f"Auto-approved on final iteration with score {score}")
logger.info(f"Critic score: {score}/100 | Approved: {approved}")
if approved:
report_progress("Post genehmigt!", iteration, score, writer_versions, critic_feedback_list)
logger.info("Post approved!")
break
else:
report_progress(f"Score: {score}/100 - Überarbeitung nötig", iteration, score, writer_versions, critic_feedback_list)
if iteration < max_iterations:
logger.info("Post needs revision, continuing...")

        # === POST-CRITIC QUALITY POLISH ===
        # Simple approach: analyze quality, then one final LLM polish
        if settings.quality_refiner_enabled:
            logger.info("=== Running Quality Analysis & Polish ===")
            # Run quality checks (analysis only, no score changes)
            quality_checks = await self._run_quality_checks(current_post, example_post_texts)

            # Log quality metrics (for transparency)
            grammar_errors = quality_checks['grammar_check'].get('error_count', 0)
            style_similarity = quality_checks['style_check'].get('avg_similarity', 1.0)
            readability_passed = quality_checks['readability_check'].get('passed', True)
            logger.info("Quality Analysis:")
            logger.info(f"  Grammar: {grammar_errors} errors")
            logger.info(f"  Style: {style_similarity:.1%} similarity")
            logger.info(f"  Readability: {'✅' if readability_passed else '⚠️'}")

            # Check if polish is needed
            needs_polish = (
                grammar_errors > 0 or
                style_similarity < 0.75 or
                not readability_passed
            )
            if needs_polish:
                logger.info("Quality issues detected, running final polish...")
                polished_post = await self.quality_refiner.final_polish(
                    post=current_post,
                    quality_checks=quality_checks,
                    profile_analysis=profile_analysis.full_analysis,
                    example_posts=example_post_texts
                )
                current_post = polished_post
                logger.info("✅ Post polished (formatting preserved)")
            else:
                logger.info("✅ No quality issues, skipping polish")

            # Store quality info in critic result (for reference)
            final_critic_result = critic_feedback_list[-1] if critic_feedback_list else {}
            final_critic_result["quality_checks"] = quality_checks
            final_critic_result["quality_polished"] = needs_polish
        # === END QUALITY POLISH ===

        # All new posts start as draft - the user moves them via the Kanban board.
        # IMPORTANT: keep the original critic score!
        final_score = critic_feedback_list[-1].get("overall_score", 0) if critic_feedback_list else 0
        status = "draft"

        # Save generated post
        from src.database.models import GeneratedPost
        generated_post = GeneratedPost(
            user_id=user_id,
            topic_title=topic.get("title", "Unknown"),
            post_content=current_post,
            iterations=iteration,
            writer_versions=writer_versions,
            critic_feedback=critic_feedback_list,
            status=status,
            post_type_id=post_type_id
        )
        saved_post = await db.save_generated_post(generated_post)
        logger.info(f"Post creation complete after {iteration} iterations")
        return {
            "post_id": saved_post.id,
            "final_post": current_post,
            "iterations": iteration,
            "approved": approved,
            "final_score": final_score,
            "writer_versions": writer_versions,
            "critic_feedback": critic_feedback_list
        }

    async def _run_quality_checks(
        self,
        post: str,
        example_posts: List[str]
    ) -> Dict[str, Any]:
        """
        Run all quality checks on a post.

        Args:
            post: Post text to check
            example_posts: Reference posts for style comparison

        Returns:
            Dictionary with all quality check results
        """
        # Grammar check
        grammar_result = await self.grammar_checker.process(
            text=post,
            auto_correct=False  # Don't auto-correct here, the refiner will do it
        )

        # Style similarity check
        style_result = await self.style_validator.process(
            generated_text=post,
            reference_texts=example_posts,
            threshold=0.75
        )
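        # Note: this 0.75 threshold matches the style_similarity < 0.75 trigger
        # that create_post() uses to decide whether a final polish is needed.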

        # Readability check
        readability_result = await self.readability_checker.process(
            text=post,
            target_grade=10.0,
            target_flesch=60.0,
            max_sentence_length=20
        )
        return {
            "grammar_check": grammar_result,
            "style_check": style_result,
            "readability_check": readability_result
        }

    async def _extract_recurring_feedback(self, user_id: UUID) -> Dict[str, Any]:
        """
        Extract recurring feedback patterns from past generated posts.

        Args:
            user_id: User UUID

        Returns:
            Dictionary with recurring improvements and lessons learned
        """
        if not settings.writer_learn_from_feedback:
            return {"lessons": [], "patterns": {}}

        # Get recent generated posts with their critic feedback
        generated_posts = await db.get_generated_posts(user_id)
        if not generated_posts:
            return {"lessons": [], "patterns": {}}

        # Limit to recent posts
        recent_posts = generated_posts[:settings.writer_feedback_history_count]

        # Collect all improvements from final feedback
        all_improvements = []
        all_scores = []
        low_score_issues = []  # Issues from posts that scored < 85
        for post in recent_posts:
            if not post.critic_feedback:
                continue
            # Get final feedback (last in list)
            final_feedback = post.critic_feedback[-1] if post.critic_feedback else None
            if not final_feedback:
                continue
            score = final_feedback.get("overall_score", 0)
            all_scores.append(score)
            # Collect improvements
            improvements = final_feedback.get("improvements", [])
            all_improvements.extend(improvements)
            # Track issues from lower-scoring posts
            if score < 85:
                low_score_issues.extend(improvements)

        if not all_improvements:
            return {"lessons": [], "patterns": {}}

        # Count frequency of improvements (normalized)
        def normalize_improvement(text: str) -> str:
            """Normalize improvement text for comparison."""
            text = text.lower().strip()
            # Remove common prefixes
            for prefix in ["der ", "die ", "das ", "mehr ", "weniger ", "zu "]:
                if text.startswith(prefix):
                    text = text[len(prefix):]
            return text[:50]  # Limit length for comparison
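
        # Illustrative example: "Der Hook ist zu schwach" normalizes to
        # "hook ist zu schwach", so leading articles and qualifiers don't
        # split the frequency counts.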
        improvement_counts = Counter(normalize_improvement(imp) for imp in all_improvements)
        low_score_counts = Counter(normalize_improvement(imp) for imp in low_score_issues)

        # Find recurring issues (mentioned 2+ times)
        recurring_issues = [
            imp for imp, count in improvement_counts.most_common(10)
            if count >= 2
        ]
        # Find critical issues (from low-scoring posts, mentioned 2+ times)
        critical_issues = [
            imp for imp, count in low_score_counts.most_common(5)
            if count >= 2
        ]

        # Build lessons learned
        lessons = []
        if critical_issues:
            lessons.append({
                "type": "critical",
                "message": "Diese Punkte führten zu niedrigen Scores - UNBEDINGT vermeiden:",
                "items": critical_issues[:3]
            })
        if recurring_issues:
            # Filter out critical issues
            non_critical = [r for r in recurring_issues if r not in critical_issues]
            if non_critical:
                lessons.append({
                    "type": "recurring",
                    "message": "Häufig genannte Verbesserungspunkte aus vergangenen Posts:",
                    "items": non_critical[:4]
                })

        # Calculate average score for context
        avg_score = sum(all_scores) / len(all_scores) if all_scores else 0
        logger.info(f"Extracted feedback from {len(recent_posts)} posts: {len(lessons)} lesson categories, avg score: {avg_score:.1f}")
        return {
            "lessons": lessons,
            "patterns": {
                "avg_score": avg_score,
                "posts_analyzed": len(recent_posts),
                "recurring_count": len(recurring_issues),
                "critical_count": len(critical_issues)
            }
        }

    async def get_user_status(self, user_id: UUID) -> Dict[str, Any]:
        """
        Get status information for a user.

        Args:
            user_id: User UUID

        Returns:
            Status dictionary
        """
        profile = await db.get_profile(user_id)
        if not profile:
            raise ValueError("User not found")

        linkedin_profile = await db.get_linkedin_profile(user_id)
        posts = await db.get_linkedin_posts(user_id)
        analysis = await db.get_profile_analysis(user_id)
        generated_posts = await db.get_generated_posts(user_id)
        all_research = await db.get_all_research(user_id)
        post_types = await db.get_post_types(user_id)

        # Count total research entries
        research_count = len(all_research)
        # Count classified posts
        classified_posts = [p for p in posts if p.post_type_id]
        # Count analyzed post types
        analyzed_types = [pt for pt in post_types if pt.analysis]

        # Check what's missing
        missing_items = []
        if not posts:
            missing_items.append("LinkedIn Posts (Scraping)")
        if not analysis:
            missing_items.append("Profil-Analyse")
        if research_count == 0:
            missing_items.append("Research Topics")

        # Ready for posts if we have scraped posts and a profile analysis
        ready_for_posts = len(posts) > 0 and analysis is not None

        return {
            "has_scraped_posts": len(posts) > 0,
            "scraped_posts_count": len(posts),
            "has_profile_analysis": analysis is not None,
            "research_count": research_count,
            "posts_count": len(generated_posts),
            "ready_for_posts": ready_for_posts,
            "missing_items": missing_items,
            "post_types_count": len(post_types),
            "classified_posts_count": len(classified_posts),
            "analyzed_types_count": len(analyzed_types)
        }


# Global orchestrator instance
orchestrator = WorkflowOrchestrator()