Major updates: LinkedIn auto-posting, timezone fixes, and Docker improvements

Features:
- Add LinkedIn OAuth integration and auto-posting functionality
- Add scheduler service for automated post publishing
- Add metadata field to generated_posts for LinkedIn URLs
- Add privacy policy page for LinkedIn API compliance
- Add company management features and employee accounts
- Add license key system for company registrations

Fixes:
- Fix timezone issues (use UTC consistently across app)
- Fix datetime serialization errors in database operations
- Fix scheduling timezone conversion (local time to UTC)
- Fix import errors (get_database -> db)

Infrastructure:
- Update Docker setup to use port 8001 (avoid conflicts)
- Add SSL support with nginx-proxy and Let's Encrypt
- Add LinkedIn setup documentation
- Add migration scripts for schema updates

Services:
- Add linkedin_service.py for LinkedIn API integration
- Add scheduler_service.py for background job processing
- Add storage_service.py for Supabase Storage
- Add email_service.py improvements
- Add encryption utilities for token storage

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-11 11:30:20 +01:00
parent b50594dbfa
commit f14515e9cf
94 changed files with 21601 additions and 5111 deletions

View File

@@ -5,7 +5,7 @@ from uuid import UUID
from loguru import logger
from src.config import settings
from src.database import db, Customer, LinkedInProfile, LinkedInPost, Topic
from src.database import db, LinkedInProfile, LinkedInPost, Topic
from src.scraper import scraper
from src.agents import (
ProfileAnalyzerAgent,
@@ -31,65 +31,80 @@ class WorkflowOrchestrator:
self.critic = CriticAgent()
self.post_classifier = PostClassifierAgent()
self.post_type_analyzer = PostTypeAnalyzerAgent()
self._all_agents = [
self.profile_analyzer, self.topic_extractor, self.researcher,
self.writer, self.critic, self.post_classifier, self.post_type_analyzer
]
logger.info("WorkflowOrchestrator initialized")
def _set_tracking(self, operation: str, user_id: Optional[str] = None,
company_id: Optional[str] = None):
"""Set tracking context on all agents."""
uid = str(user_id) if user_id else None
comp_id = str(company_id) if company_id else None
for agent in self._all_agents:
agent.set_tracking_context(operation=operation, user_id=uid, company_id=comp_id)
async def _resolve_tracking_ids(self, user_id: UUID) -> dict:
"""Resolve company_id from a user_id for tracking."""
try:
profile = await db.get_profile(user_id)
if profile:
return {
"user_id": str(user_id),
"company_id": str(profile.company_id) if profile.company_id else None
}
except Exception as e:
logger.debug(f"Could not resolve tracking IDs for user {user_id}: {e}")
return {"user_id": str(user_id), "company_id": None}
async def run_initial_setup(
self,
user_id: UUID,
linkedin_url: str,
customer_name: str,
customer_data: Dict[str, Any],
profile_data: Dict[str, Any],
post_types_data: Optional[List[Dict[str, Any]]] = None
) -> Customer:
) -> None:
"""
Run initial setup for a new customer.
Run initial setup for a user.
This includes:
1. Creating customer record
1. Updating profile with linkedin_url and metadata
2. Creating post types (if provided)
3. Scraping LinkedIn posts (NO profile scraping)
4. Creating profile from customer_data
4. Creating profile from profile_data
5. Analyzing profile
6. Extracting topics from existing posts
7. Classifying posts by type (if post types exist)
8. Analyzing post types (if enough posts)
Args:
user_id: User UUID
linkedin_url: LinkedIn profile URL
customer_name: Customer name
customer_data: Complete customer data (company, persona, style_guide, etc.)
profile_data: Profile data (writing style notes, etc.)
post_types_data: Optional list of post type definitions
Returns:
Customer object
"""
logger.info(f"=== INITIAL SETUP for {customer_name} ===")
logger.info(f"=== INITIAL SETUP for user {user_id} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("initial_setup", **ids)
# Step 1: Check if customer already exists
existing_customer = await db.get_customer_by_linkedin(linkedin_url)
if existing_customer:
logger.warning(f"Customer already exists: {existing_customer.id}")
return existing_customer
# Step 2: Create customer
# Step 1: Update profile with linkedin_url
total_steps = 7 if post_types_data else 5
logger.info(f"Step 1/{total_steps}: Creating customer record")
customer = Customer(
name=customer_name,
linkedin_url=linkedin_url,
company_name=customer_data.get("company_name"),
email=customer_data.get("email"),
metadata=customer_data
)
customer = await db.create_customer(customer)
logger.info(f"Customer created: {customer.id}")
logger.info(f"Step 1/{total_steps}: Updating profile")
await db.update_profile(user_id, {
"linkedin_url": linkedin_url,
"writing_style_notes": profile_data.get("writing_style_notes"),
"metadata": profile_data
})
logger.info(f"Profile updated for user: {user_id}")
# Step 2.5: Create post types if provided
# Step 2: Create post types if provided
created_post_types = []
if post_types_data:
logger.info(f"Step 2/{total_steps}: Creating {len(post_types_data)} post types")
for pt_data in post_types_data:
post_type = PostType(
customer_id=customer.id,
user_id=user_id,
name=pt_data.get("name", "Unnamed"),
description=pt_data.get("description"),
identifying_hashtags=pt_data.get("identifying_hashtags", []),
@@ -102,19 +117,20 @@ class WorkflowOrchestrator:
created_post_types = await db.create_post_types_bulk(created_post_types)
logger.info(f"Created {len(created_post_types)} post types")
# Step 3: Create LinkedIn profile from customer data (NO scraping)
# Step 3: Create LinkedIn profile from profile data (NO scraping)
step_num = 3 if post_types_data else 2
logger.info(f"Step {step_num}/{total_steps}: Creating LinkedIn profile from provided data")
profile = await db.get_profile(user_id)
linkedin_profile = LinkedInProfile(
customer_id=customer.id,
user_id=user_id,
profile_data={
"persona": customer_data.get("persona"),
"form_of_address": customer_data.get("form_of_address"),
"style_guide": customer_data.get("style_guide"),
"persona": profile_data.get("persona"),
"form_of_address": profile_data.get("form_of_address"),
"style_guide": profile_data.get("style_guide"),
"linkedin_url": linkedin_url
},
name=customer_name,
headline=customer_data.get("persona", "")[:100] if customer_data.get("persona") else None
name=profile.display_name or "",
headline=profile_data.get("persona", "")[:100] if profile_data.get("persona") else None
)
await db.save_linkedin_profile(linkedin_profile)
logger.info("LinkedIn profile saved")
@@ -129,7 +145,7 @@ class WorkflowOrchestrator:
linkedin_posts = []
for post_data in parsed_posts:
post = LinkedInPost(
customer_id=customer.id,
user_id=user_id,
**post_data
)
linkedin_posts.append(post)
@@ -151,13 +167,13 @@ class WorkflowOrchestrator:
profile_analysis = await self.profile_analyzer.process(
profile=linkedin_profile,
posts=linkedin_posts,
customer_data=customer_data
customer_data=profile_data
)
# Save profile analysis
from src.database.models import ProfileAnalysis
analysis_record = ProfileAnalysis(
customer_id=customer.id,
user_id=user_id,
writing_style=profile_analysis.get("writing_style", {}),
tone_analysis=profile_analysis.get("tone_analysis", {}),
topic_patterns=profile_analysis.get("topic_patterns", {}),
@@ -177,7 +193,7 @@ class WorkflowOrchestrator:
try:
topics = await self.topic_extractor.process(
posts=linkedin_posts,
customer_id=customer.id # Pass UUID directly
user_id=user_id
)
if topics:
await db.save_topics(topics)
@@ -192,40 +208,41 @@ class WorkflowOrchestrator:
# Step 7: Classify posts
logger.info(f"Step {total_steps - 1}/{total_steps}: Classifying posts by type")
try:
await self.classify_posts(customer.id)
await self.classify_posts(user_id)
except Exception as e:
logger.error(f"Post classification failed: {e}", exc_info=True)
# Step 8: Analyze post types
logger.info(f"Step {total_steps}/{total_steps}: Analyzing post types")
try:
await self.analyze_post_types(customer.id)
await self.analyze_post_types(user_id)
except Exception as e:
logger.error(f"Post type analysis failed: {e}", exc_info=True)
logger.info(f"Step {total_steps}/{total_steps}: Initial setup complete!")
return customer
async def classify_posts(self, customer_id: UUID) -> int:
async def classify_posts(self, user_id: UUID) -> int:
"""
Classify unclassified posts for a customer.
Classify unclassified posts for a user.
Args:
customer_id: Customer UUID
user_id: User UUID
Returns:
Number of posts classified
"""
logger.info(f"=== CLASSIFYING POSTS for customer {customer_id} ===")
logger.info(f"=== CLASSIFYING POSTS for user {user_id} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("classify_posts", **ids)
# Get post types
post_types = await db.get_post_types(customer_id)
post_types = await db.get_post_types(user_id)
if not post_types:
logger.info("No post types defined, skipping classification")
return 0
# Get unclassified posts
posts = await db.get_unclassified_posts(customer_id)
posts = await db.get_unclassified_posts(user_id)
if not posts:
logger.info("No unclassified posts found")
return 0
@@ -243,20 +260,22 @@ class WorkflowOrchestrator:
return 0
async def analyze_post_types(self, customer_id: UUID) -> Dict[str, Any]:
async def analyze_post_types(self, user_id: UUID) -> Dict[str, Any]:
"""
Analyze all post types for a customer.
Analyze all post types for a user.
Args:
customer_id: Customer UUID
user_id: User UUID
Returns:
Dictionary with analysis results per post type
"""
logger.info(f"=== ANALYZING POST TYPES for customer {customer_id} ===")
logger.info(f"=== ANALYZING POST TYPES for user {user_id} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("analyze_post_types", **ids)
# Get post types
post_types = await db.get_post_types(customer_id)
post_types = await db.get_post_types(user_id)
if not post_types:
logger.info("No post types defined")
return {}
@@ -264,7 +283,7 @@ class WorkflowOrchestrator:
results = {}
for post_type in post_types:
# Get posts for this type
posts = await db.get_posts_by_type(customer_id, post_type.id)
posts = await db.get_posts_by_type(user_id, post_type.id)
if len(posts) < self.post_type_analyzer.MIN_POSTS_FOR_ANALYSIS:
logger.info(f"Post type '{post_type.name}' has only {len(posts)} posts, skipping analysis")
@@ -292,22 +311,24 @@ class WorkflowOrchestrator:
async def research_new_topics(
self,
customer_id: UUID,
user_id: UUID,
progress_callback: Optional[Callable[[str, int, int], None]] = None,
post_type_id: Optional[UUID] = None
) -> List[Dict[str, Any]]:
"""
Research new content topics for a customer.
Research new content topics for a user.
Args:
customer_id: Customer UUID
user_id: User UUID
progress_callback: Optional callback(message, current_step, total_steps)
post_type_id: Optional post type to target research for
Returns:
List of suggested topics
"""
logger.info(f"=== RESEARCHING NEW TOPICS for customer {customer_id} ===")
logger.info(f"=== RESEARCHING NEW TOPICS for user {user_id} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("research", **ids)
# Get post type context if specified
post_type = None
@@ -324,7 +345,7 @@ class WorkflowOrchestrator:
# Step 1: Get profile analysis
report_progress("Lade Profil-Analyse...", 1)
profile_analysis = await db.get_profile_analysis(customer_id)
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found. Run initial setup first.")
@@ -333,12 +354,12 @@ class WorkflowOrchestrator:
existing_topics = set()
# From topics table
existing_topics_records = await db.get_topics(customer_id)
existing_topics_records = await db.get_topics(user_id)
for t in existing_topics_records:
existing_topics.add(t.title)
# From previous research results
all_research = await db.get_all_research(customer_id)
all_research = await db.get_all_research(user_id)
for research in all_research:
if research.suggested_topics:
for topic in research.suggested_topics:
@@ -346,7 +367,7 @@ class WorkflowOrchestrator:
existing_topics.add(topic["title"])
# From generated posts
generated_posts = await db.get_generated_posts(customer_id)
generated_posts = await db.get_generated_posts(user_id)
for post in generated_posts:
if post.topic_title:
existing_topics.add(post.topic_title)
@@ -354,15 +375,15 @@ class WorkflowOrchestrator:
existing_topics = list(existing_topics)
logger.info(f"Found {len(existing_topics)} existing topics to avoid")
# Get customer data
customer = await db.get_customer(customer_id)
# Get profile data
profile = await db.get_profile(user_id)
# Get example posts to understand the person's actual content style
# If post_type_id is specified, only use posts of that type
if post_type_id:
linkedin_posts = await db.get_posts_by_type(customer_id, post_type_id)
linkedin_posts = await db.get_posts_by_type(user_id, post_type_id)
else:
linkedin_posts = await db.get_linkedin_posts(customer_id)
linkedin_posts = await db.get_linkedin_posts(user_id)
example_post_texts = [
post.post_text for post in linkedin_posts
@@ -376,7 +397,7 @@ class WorkflowOrchestrator:
research_results = await self.researcher.process(
profile_analysis=profile_analysis.full_analysis,
existing_topics=existing_topics,
customer_data=customer.metadata,
customer_data=profile.metadata,
example_posts=example_post_texts,
post_type=post_type,
post_type_analysis=post_type_analysis
@@ -386,8 +407,8 @@ class WorkflowOrchestrator:
report_progress("Speichere Ergebnisse...", 4)
from src.database.models import ResearchResult
research_record = ResearchResult(
customer_id=customer_id,
query=f"New topics for {customer.name}" + (f" ({post_type.name})" if post_type else ""),
user_id=user_id,
query=f"New topics for {profile.display_name}" + (f" ({post_type.name})" if post_type else ""),
results={"raw_response": research_results["raw_response"]},
suggested_topics=research_results["suggested_topics"],
target_post_type_id=post_type_id
@@ -397,19 +418,135 @@ class WorkflowOrchestrator:
return research_results["suggested_topics"]
async def generate_hooks(
self,
user_id: UUID,
topic: Dict[str, Any],
user_thoughts: str = "",
post_type_id: Optional[UUID] = None
) -> List[Dict[str, str]]:
"""
Generate 4 hook options for a topic.
Args:
user_id: User UUID
topic: Topic dictionary
user_thoughts: User's personal thoughts about the topic
post_type_id: Optional post type for context
Returns:
List of {"hook": "...", "style": "..."} dictionaries
"""
logger.info(f"=== GENERATING HOOKS for topic: {topic.get('title')} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("generate_hooks", **ids)
# Get profile analysis for style matching
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found. Run initial setup first.")
# Get post type context if specified
post_type = None
if post_type_id:
post_type = await db.get_post_type(post_type_id)
# Generate hooks via writer agent
hooks = await self.writer.generate_hooks(
topic=topic,
profile_analysis=profile_analysis.full_analysis,
user_thoughts=user_thoughts,
post_type=post_type
)
logger.info(f"Generated {len(hooks)} hooks")
return hooks
async def generate_improvement_suggestions(
self,
user_id: UUID,
post_content: str,
critic_feedback: Optional[Dict[str, Any]] = None
) -> List[Dict[str, str]]:
"""
Generate improvement suggestions for an existing post.
Args:
user_id: User UUID
post_content: The current post content
critic_feedback: Optional feedback from the critic
Returns:
List of {"label": "...", "action": "..."} dictionaries
"""
logger.info("=== GENERATING IMPROVEMENT SUGGESTIONS ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("improvement_suggestions", **ids)
# Get profile analysis for style matching
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found.")
suggestions = await self.writer.generate_improvement_suggestions(
post_content=post_content,
profile_analysis=profile_analysis.full_analysis,
critic_feedback=critic_feedback
)
logger.info(f"Generated {len(suggestions)} improvement suggestions")
return suggestions
async def apply_suggestion_to_post(
self,
user_id: UUID,
post_content: str,
suggestion: str
) -> str:
"""
Apply a suggestion to a post and return the improved version.
Args:
user_id: User UUID
post_content: The current post content
suggestion: The suggestion to apply
Returns:
The improved post content
"""
logger.info(f"=== APPLYING SUGGESTION TO POST ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("apply_suggestion", **ids)
# Get profile analysis for style matching
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found.")
improved_post = await self.writer.apply_suggestion(
post_content=post_content,
suggestion=suggestion,
profile_analysis=profile_analysis.full_analysis
)
logger.info("Successfully applied suggestion to post")
return improved_post
async def create_post(
self,
customer_id: UUID,
user_id: UUID,
topic: Dict[str, Any],
max_iterations: int = 3,
progress_callback: Optional[Callable[[str, int, int, Optional[int], Optional[List], Optional[List]], None]] = None,
post_type_id: Optional[UUID] = None
post_type_id: Optional[UUID] = None,
user_thoughts: str = "",
selected_hook: str = ""
) -> Dict[str, Any]:
"""
Create a LinkedIn post through writer-critic iteration.
Args:
customer_id: Customer UUID
user_id: User UUID
topic: Topic dictionary
max_iterations: Maximum number of writer-critic iterations
progress_callback: Optional callback(message, iteration, max_iterations, score, versions, feedback_list)
@@ -419,6 +556,8 @@ class WorkflowOrchestrator:
Dictionary with final post and metadata
"""
logger.info(f"=== CREATING POST for topic: {topic.get('title')} ===")
ids = await self._resolve_tracking_ids(user_id)
self._set_tracking("post_creation", **ids)
def report_progress(message: str, iteration: int, score: Optional[int] = None,
versions: Optional[List] = None, feedback_list: Optional[List] = None):
@@ -427,7 +566,7 @@ class WorkflowOrchestrator:
# Get profile analysis
report_progress("Lade Profil-Analyse...", 0, None, [], [])
profile_analysis = await db.get_profile_analysis(customer_id)
profile_analysis = await db.get_profile_analysis(user_id)
if not profile_analysis:
raise ValueError("Profile analysis not found. Run initial setup first.")
@@ -440,16 +579,16 @@ class WorkflowOrchestrator:
post_type_analysis = post_type.analysis
logger.info(f"Using post type '{post_type.name}' for writing")
# Load customer's real posts as style examples
# Load user's real posts as style examples
# If post_type_id is specified, only use posts of that type
if post_type_id:
linkedin_posts = await db.get_posts_by_type(customer_id, post_type_id)
linkedin_posts = await db.get_posts_by_type(user_id, post_type_id)
if len(linkedin_posts) < 3:
# Fall back to all posts if not enough type-specific posts
linkedin_posts = await db.get_linkedin_posts(customer_id)
linkedin_posts = await db.get_linkedin_posts(user_id)
logger.info("Not enough type-specific posts, using all posts")
else:
linkedin_posts = await db.get_linkedin_posts(customer_id)
linkedin_posts = await db.get_linkedin_posts(user_id)
example_post_texts = [
post.post_text for post in linkedin_posts
@@ -458,7 +597,7 @@ class WorkflowOrchestrator:
logger.info(f"Loaded {len(example_post_texts)} example posts for style reference")
# Extract lessons from past feedback (if enabled)
feedback_lessons = await self._extract_recurring_feedback(customer_id)
feedback_lessons = await self._extract_recurring_feedback(user_id)
# Initialize tracking
writer_versions = []
@@ -467,6 +606,15 @@ class WorkflowOrchestrator:
approved = False
iteration = 0
# Load company strategy if user belongs to a company
company_strategy = None
profile = await db.get_profile(user_id)
if profile and profile.company_id:
company = await db.get_company(profile.company_id)
if company and company.company_strategy:
company_strategy = company.company_strategy
logger.info(f"Loaded company strategy for post creation: {company.name}")
# Writer-Critic loop
while iteration < max_iterations and not approved:
iteration += 1
@@ -482,7 +630,10 @@ class WorkflowOrchestrator:
example_posts=example_post_texts,
learned_lessons=feedback_lessons, # Pass lessons from past feedback
post_type=post_type,
post_type_analysis=post_type_analysis
post_type_analysis=post_type_analysis,
user_thoughts=user_thoughts,
selected_hook=selected_hook,
company_strategy=company_strategy # NEW: Pass company strategy
)
else:
# Revision based on feedback - pass full critic result for structured changes
@@ -497,7 +648,10 @@ class WorkflowOrchestrator:
critic_result=last_feedback, # Pass full critic result with specific_changes
learned_lessons=feedback_lessons, # Also for revisions
post_type=post_type,
post_type_analysis=post_type_analysis
post_type_analysis=post_type_analysis,
user_thoughts=user_thoughts,
selected_hook=selected_hook,
company_strategy=company_strategy # NEW: Pass company strategy
)
writer_versions.append(current_post)
@@ -538,19 +692,14 @@ class WorkflowOrchestrator:
if iteration < max_iterations:
logger.info("Post needs revision, continuing...")
# Determine final status based on score
# All new posts start as draft - user moves them via Kanban board
final_score = critic_feedback_list[-1].get("overall_score", 0) if critic_feedback_list else 0
if approved and final_score >= 85:
status = "approved"
elif approved and final_score >= 80:
status = "approved" # Auto-approved
else:
status = "draft"
status = "draft"
# Save generated post
from src.database.models import GeneratedPost
generated_post = GeneratedPost(
customer_id=customer_id,
user_id=user_id,
topic_title=topic.get("title", "Unknown"),
post_content=current_post,
iterations=iteration,
@@ -573,12 +722,12 @@ class WorkflowOrchestrator:
"critic_feedback": critic_feedback_list
}
async def _extract_recurring_feedback(self, customer_id: UUID) -> Dict[str, Any]:
async def _extract_recurring_feedback(self, user_id: UUID) -> Dict[str, Any]:
"""
Extract recurring feedback patterns from past generated posts.
Args:
customer_id: Customer UUID
user_id: User UUID
Returns:
Dictionary with recurring improvements and lessons learned
@@ -587,7 +736,7 @@ class WorkflowOrchestrator:
return {"lessons": [], "patterns": {}}
# Get recent generated posts with their critic feedback
generated_posts = await db.get_generated_posts(customer_id)
generated_posts = await db.get_generated_posts(user_id)
if not generated_posts:
return {"lessons": [], "patterns": {}}
@@ -683,26 +832,26 @@ class WorkflowOrchestrator:
}
}
async def get_customer_status(self, customer_id: UUID) -> Dict[str, Any]:
async def get_user_status(self, user_id: UUID) -> Dict[str, Any]:
"""
Get status information for a customer.
Get status information for a user.
Args:
customer_id: Customer UUID
user_id: User UUID
Returns:
Status dictionary
"""
customer = await db.get_customer(customer_id)
if not customer:
raise ValueError("Customer not found")
profile = await db.get_profile(user_id)
if not profile:
raise ValueError("User not found")
profile = await db.get_linkedin_profile(customer_id)
posts = await db.get_linkedin_posts(customer_id)
analysis = await db.get_profile_analysis(customer_id)
generated_posts = await db.get_generated_posts(customer_id)
all_research = await db.get_all_research(customer_id)
post_types = await db.get_post_types(customer_id)
linkedin_profile = await db.get_linkedin_profile(user_id)
posts = await db.get_linkedin_posts(user_id)
analysis = await db.get_profile_analysis(user_id)
generated_posts = await db.get_generated_posts(user_id)
all_research = await db.get_all_research(user_id)
post_types = await db.get_post_types(user_id)
# Count total research entries
research_count = len(all_research)