Files
Onyva-Postling/src/web/user/routes.py

3751 lines
140 KiB
Python

"""User frontend routes (LinkedIn OAuth protected)."""
import asyncio
import json
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Request, Form, BackgroundTasks, HTTPException, UploadFile
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse, JSONResponse
from pydantic import BaseModel
from loguru import logger
from src.config import settings
from src.database import db
from src.orchestrator import orchestrator
from src.web.user.auth import (
get_user_session, get_user_session_async, set_user_session, clear_user_session,
get_supabase_login_url, handle_oauth_callback, UserSession,
handle_email_password_login, create_email_password_user, sign_out,
USER_SESSION_COOKIE
)
from src.web.user.password_auth import (
hash_password, verify_password, generate_invitation_token,
get_invitation_expiry, is_token_expired, validate_password_strength
)
from src.services.email_service import (
send_approval_request_email,
send_decision_notification_email,
validate_token,
mark_token_used
)
from src.services.background_jobs import (
job_manager, JobType, JobStatus,
run_post_scraping, run_profile_analysis, run_post_categorization, run_post_type_analysis,
run_full_analysis_pipeline
)
from src.services.storage_service import storage
# Router for user frontend
user_router = APIRouter(tags=["user"])
# Templates
templates = Jinja2Templates(directory=Path(__file__).parent.parent / "templates" / "user")
base_templates = Jinja2Templates(directory=Path(__file__).parent.parent / "templates")
# Store for progress updates
progress_store = {}
async def get_user_profile_picture(user_id: UUID) -> Optional[str]:
"""Get profile picture URL from user profile record (cached)."""
profile = await db.get_profile(user_id)
if profile and profile.profile_picture:
return profile.profile_picture
return None
def require_user_session(request: Request) -> Optional[UserSession]:
"""Check if user is authenticated, redirect to login if not."""
session = get_user_session(request)
if not session:
return None
return session
# ==================== PUBLIC ROUTES ====================
@user_router.get("/privacy-policy", response_class=HTMLResponse)
async def privacy_policy(request: Request):
"""Public privacy policy page."""
from datetime import date
return base_templates.TemplateResponse("privacy_policy.html", {
"request": request,
"current_date": date.today().strftime("%d.%m.%Y")
})
# ==================== AUTH ROUTES ====================
@user_router.get("/login", response_class=HTMLResponse)
async def login_page(request: Request, error: str = None):
"""User login page with LinkedIn OAuth button."""
# If already logged in, redirect to dashboard
session = get_user_session(request)
if session:
return RedirectResponse(url="/", status_code=302)
return templates.TemplateResponse("login.html", {
"request": request,
"error": error
})
@user_router.get("/auth/linkedin")
async def start_oauth(request: Request):
"""Start LinkedIn OAuth flow via Supabase."""
# Build callback URL
callback_url = settings.supabase_redirect_url
if not callback_url:
# Fallback to constructing from request
callback_url = str(request.url_for("oauth_callback"))
login_url = get_supabase_login_url(callback_url)
return RedirectResponse(url=login_url, status_code=302)
@user_router.get("/auth/callback")
async def oauth_callback(
request: Request,
access_token: str = None,
refresh_token: str = None,
error: str = None,
error_description: str = None
):
"""Handle OAuth callback from Supabase."""
if error:
logger.error(f"OAuth error: {error} - {error_description}")
return RedirectResponse(url=f"/login?error={error}", status_code=302)
# Supabase returns tokens in URL hash, not query params
# We need to handle this client-side and redirect back
# Check if we have the tokens
if not access_token:
# Render a page that extracts hash params and redirects
return templates.TemplateResponse("auth_callback.html", {
"request": request
})
# We have the tokens, try to authenticate
session, access_tok, refresh_tok = await handle_oauth_callback(access_token, refresh_token)
if not session:
return RedirectResponse(url="/not-authorized", status_code=302)
# Check onboarding status
if not session.is_onboarding_complete:
if session.account_type == "company":
redirect_url = "/onboarding/company"
else:
redirect_url = "/onboarding/profile"
else:
redirect_url = "/"
# Success - set session and redirect
response = RedirectResponse(url=redirect_url, status_code=302)
set_user_session(response, session, access_tok, refresh_tok)
return response
@user_router.get("/logout")
async def logout(request: Request):
"""Log out user from Supabase Auth."""
# Try to sign out from Supabase
access_token = request.cookies.get(USER_SESSION_COOKIE)
if access_token:
await sign_out(access_token)
response = RedirectResponse(url="/login", status_code=302)
clear_user_session(response)
return response
@user_router.get("/not-authorized", response_class=HTMLResponse)
async def not_authorized_page(request: Request):
"""Page shown when user's LinkedIn profile doesn't match any customer."""
return templates.TemplateResponse("not_authorized.html", {
"request": request
})
# ==================== REGISTRATION ROUTES ====================
@user_router.get("/register", response_class=HTMLResponse)
async def register_page(request: Request):
"""Registration page - redirect directly to company registration.
GHOSTWRITER FEATURE DISABLED: To re-enable, show register.html template instead.
"""
session = get_user_session(request)
if session and session.is_onboarding_complete:
return RedirectResponse(url="/", status_code=302)
# Redirect directly to company registration (ghostwriter disabled)
return RedirectResponse(url="/register/company", status_code=302)
# ==================== GHOSTWRITER REGISTRATION (DISABLED) ====================
# To re-enable ghostwriter registration:
# 1. Uncomment these routes
# 2. Change /register route to show register.html template
# 3. Update login.html to show ghostwriter registration link
# @user_router.get("/register/ghostwriter", response_class=HTMLResponse)
# async def register_ghostwriter_page(request: Request, error: str = None):
# """Ghostwriter registration page."""
# session = get_user_session(request)
# if session and session.is_onboarding_complete:
# return RedirectResponse(url="/", status_code=302)
# return templates.TemplateResponse("register_ghostwriter.html", {
# "request": request,
# "error": error
# })
# @user_router.post("/register/ghostwriter")
# async def register_ghostwriter(
# request: Request,
# email: str = Form(...),
# password: str = Form(...),
# password_confirm: str = Form(...)
# ):
# """Handle ghostwriter registration with email/password."""
# if password != password_confirm:
# return templates.TemplateResponse("register_ghostwriter.html", {
# "request": request,
# "error": "Passwörter stimmen nicht überein"
# })
#
# # Create user via Supabase Auth
# session, access_token, refresh_token, error = await create_email_password_user(
# email, password, "ghostwriter"
# )
#
# if not session:
# return templates.TemplateResponse("register_ghostwriter.html", {
# "request": request,
# "error": error or "E-Mail bereits registriert oder ungültig"
# })
#
# # Set session and redirect to onboarding
# response = RedirectResponse(url="/onboarding/profile", status_code=302)
# set_user_session(response, session, access_token, refresh_token)
# return response
@user_router.get("/register/company", response_class=HTMLResponse)
async def register_company_page(request: Request, error: str = None):
"""Company registration page."""
session = get_user_session(request)
if session and session.is_onboarding_complete:
return RedirectResponse(url="/", status_code=302)
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": error
})
@user_router.post("/register/company")
async def register_company(
request: Request,
license_key: str = Form(...),
company_name: str = Form(...),
email: str = Form(...),
password: str = Form(...),
password_confirm: str = Form(...)
):
"""Handle company registration with email/password and license key validation."""
try:
# 1. Password match validation
if password != password_confirm:
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": "Passwörter stimmen nicht überein",
"company_name": company_name,
"email": email
})
# 2. LICENSE KEY VALIDATION (BEFORE account creation)
license_key = license_key.strip().upper()
license = await db.get_license_key(license_key)
if not license:
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": "Ungültiger Lizenzschlüssel",
"company_name": company_name,
"email": email
})
if license.used:
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": "Dieser Lizenzschlüssel wurde bereits verwendet",
"company_name": company_name,
"email": email
})
# 3. Create user with company account type via Supabase Auth
session, access_token, refresh_token, error = await create_email_password_user(
email, password, "company"
)
if not session:
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": error or "E-Mail bereits registriert oder ungültig",
"company_name": company_name,
"email": email
})
# 4. Create the company record with license key reference
from src.database.models import Company
company = Company(
name=company_name,
owner_user_id=UUID(session.user_id),
license_key_id=license.id
)
created_company = await db.create_company(company)
# 5. Mark license key as used
await db.mark_license_key_used(license_key, created_company.id)
# 6. Update profile with company_id
await db.update_profile(UUID(session.user_id), {"company_id": str(created_company.id)})
# 7. Update session
session.company_id = str(created_company.id)
session.company_name = company_name
# 8. Set session and redirect to company onboarding
response = RedirectResponse(url="/onboarding/company", status_code=302)
set_user_session(response, session, access_token, refresh_token)
return response
except Exception as e:
logger.error(f"Error in company registration: {e}")
return templates.TemplateResponse("register_company.html", {
"request": request,
"error": "Ein Fehler ist aufgetreten. Bitte versuche es erneut.",
"company_name": company_name,
"email": email
})
@user_router.post("/auth/login")
async def email_password_login(
request: Request,
email: str = Form(...),
password: str = Form(...)
):
"""Handle email/password login via Supabase Auth."""
session, access_token, refresh_token = await handle_email_password_login(email, password)
if not session:
return templates.TemplateResponse("login.html", {
"request": request,
"error": "Ungültige E-Mail oder Passwort"
})
# Check onboarding status
if not session.is_onboarding_complete:
# Redirect to appropriate onboarding step
if session.account_type == "company":
redirect_url = "/onboarding/company"
else:
redirect_url = "/onboarding/profile"
response = RedirectResponse(url=redirect_url, status_code=302)
else:
response = RedirectResponse(url="/", status_code=302)
set_user_session(response, session, access_token, refresh_token)
return response
# ==================== INVITATION ROUTES ====================
@user_router.get("/invite/{token}", response_class=HTMLResponse)
async def invite_page(request: Request, token: str):
"""Display invitation acceptance page."""
invitation = await db.get_invitation_by_token(token)
if not invitation:
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"error": "Einladung nicht gefunden",
"expired": True
})
# Check if expired
if is_token_expired(invitation.expires_at):
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"expired": True
})
# Check if already accepted
inv_status = invitation.status.value if hasattr(invitation.status, 'value') else invitation.status
if inv_status != "pending":
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"error": "Diese Einladung wurde bereits verwendet",
"expired": True
})
# Get company info
company = await db.get_company(invitation.company_id)
inviter = await db.get_user(invitation.invited_by_user_id)
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"invitation": invitation,
"company_name": company.name if company else "Unbekannt",
"inviter_name": inviter.linkedin_name or inviter.email if inviter else "Unbekannt"
})
@user_router.post("/invite/{token}/accept")
async def accept_invitation(
request: Request,
token: str,
email: str = Form(...),
password: str = Form(...),
password_confirm: str = Form(...)
):
"""Accept an invitation and create employee account."""
invitation = await db.get_invitation_by_token(token)
if not invitation:
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"error": "Einladung nicht gefunden",
"expired": True
})
# Validate password
if password != password_confirm:
company = await db.get_company(invitation.company_id)
inviter = await db.get_user(invitation.invited_by_user_id)
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"invitation": invitation,
"company_name": company.name if company else "Unbekannt",
"inviter_name": inviter.linkedin_name or inviter.email if inviter else "Unbekannt",
"error": "Passwörter stimmen nicht überein"
})
is_valid, error_msg = validate_password_strength(password)
if not is_valid:
company = await db.get_company(invitation.company_id)
inviter = await db.get_user(invitation.invited_by_user_id)
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"invitation": invitation,
"company_name": company.name if company else "Unbekannt",
"inviter_name": inviter.linkedin_name or inviter.email if inviter else "Unbekannt",
"error": error_msg
})
# Create employee user via Supabase Auth
session, access_token, refresh_token, error = await create_email_password_user(
email=invitation.email,
password=password,
account_type="employee",
company_id=str(invitation.company_id)
)
if not session:
return templates.TemplateResponse("invite_accept.html", {
"request": request,
"error": error or "Konto konnte nicht erstellt werden. E-Mail bereits vergeben?",
"expired": True
})
# Mark invitation as accepted
from datetime import datetime
await db.update_invitation(invitation.id, {
"status": "accepted",
"accepted_at": datetime.utcnow(),
"accepted_by_user_id": session.user_id
})
# Get company name for session
company = await db.get_company(invitation.company_id)
session.company_name = company.name if company else None
# Redirect to onboarding
response = RedirectResponse(url="/onboarding/profile", status_code=302)
set_user_session(response, session, access_token, refresh_token)
return response
# ==================== ONBOARDING ROUTES ====================
def get_ghostwriter_steps(current: str) -> list:
"""Get onboarding steps for ghostwriter with current step highlighted."""
steps = [
{"name": "Profil", "status": "pending"},
{"name": "Posts", "status": "pending"},
{"name": "Typen", "status": "pending"},
{"name": "Fertig", "status": "pending"}
]
step_order = ["profile", "posts", "post_types", "complete"]
current_idx = step_order.index(current) if current in step_order else 0
for i, step in enumerate(steps):
if i < current_idx:
step["status"] = "done"
elif i == current_idx:
step["status"] = "active"
return steps
def get_company_steps(current: str) -> list:
"""Get onboarding steps for company with current step highlighted."""
steps = [
{"name": "Unternehmen", "status": "pending"},
{"name": "Strategie", "status": "pending"},
{"name": "Fertig", "status": "pending"}
]
step_order = ["company", "strategy", "complete"]
current_idx = step_order.index(current) if current in step_order else 0
for i, step in enumerate(steps):
if i < current_idx:
step["status"] = "done"
elif i == current_idx:
step["status"] = "active"
return steps
def get_employee_steps(current: str) -> list:
"""Get onboarding steps for employee with current step highlighted."""
steps = [
{"name": "Profil", "status": "pending"},
{"name": "Posts", "status": "pending"},
{"name": "Typen", "status": "pending"},
{"name": "Fertig", "status": "pending"}
]
step_order = ["profile", "posts", "post_types", "complete"]
current_idx = step_order.index(current) if current in step_order else 0
for i, step in enumerate(steps):
if i < current_idx:
step["status"] = "done"
elif i == current_idx:
step["status"] = "active"
return steps
@user_router.get("/onboarding/profile", response_class=HTMLResponse)
async def onboarding_profile_page(request: Request):
"""Profile setup page for ghostwriter/employee onboarding."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Check if already completed AND has user_id (to avoid redirect loop)
if session.is_onboarding_complete and session.user_id:
return RedirectResponse(url="/", status_code=302)
# Prefill data from session
prefill = {
"name": session.linkedin_name or "",
"linkedin_url": f"https://linkedin.com/in/{session.linkedin_vanity_name}" if session.linkedin_vanity_name else ""
}
# Use employee template for employees (simpler form)
if session.account_type == "employee":
return templates.TemplateResponse("onboarding/profile_employee.html", {
"request": request,
"session": session,
"steps": get_employee_steps("profile"),
"prefill": prefill
})
# Ghostwriter template (separate customer/ghostwriter fields)
return templates.TemplateResponse("onboarding/profile.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("profile"),
"prefill": prefill
})
@user_router.post("/onboarding/profile")
async def onboarding_profile_submit(
request: Request,
background_tasks: BackgroundTasks,
linkedin_url: str = Form(...),
# Employee form fields (simpler)
name: str = Form(None),
email: str = Form(None),
is_employee: str = Form(None),
# Ghostwriter form fields
customer_name: str = Form(None),
ghostwriter_name: str = Form(None),
creator_email: str = Form(""),
customer_email: str = Form(""),
writing_style_notes: str = Form(""),
company_name: str = Form("")
):
"""Handle profile setup form submission."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Determine if this is an employee submission
is_employee_flow = is_employee == "true" or session.account_type == "employee"
# For employees: name = customer_name = ghostwriter_name, email = both emails
if is_employee_flow:
actual_name = name or session.linkedin_name or "Mitarbeiter"
actual_customer_name = actual_name
actual_ghostwriter_name = actual_name
actual_email = email or session.email or ""
actual_creator_email = actual_email
actual_customer_email = actual_email
else:
actual_customer_name = customer_name or ""
actual_ghostwriter_name = ghostwriter_name or ""
actual_creator_email = creator_email
actual_customer_email = customer_email
try:
# Update profile with LinkedIn info and onboarding data
user_id = UUID(session.user_id)
profile_updates = {
"linkedin_url": linkedin_url,
"display_name": actual_ghostwriter_name,
"writing_style_notes": writing_style_notes or None,
"creator_email": actual_creator_email or None,
"customer_email": actual_customer_email or None,
"onboarding_status": "profile_setup"
}
await db.update_profile(user_id, profile_updates)
logger.info(f"Updated profile {user_id} with LinkedIn URL: {linkedin_url}")
# Update session
session.onboarding_status = "profile_setup"
session.linkedin_name = actual_ghostwriter_name
# Determine whether to scrape or skip
should_scrape = True
existing_posts = await db.get_linkedin_posts(user_id)
if existing_posts:
# This user already has posts (e.g. re-onboarding)
should_scrape = False
logger.info(f"Skipping scraping - {len(existing_posts)} posts already exist for user {user_id}")
if should_scrape:
job = job_manager.create_job(JobType.POST_SCRAPING, str(user_id))
background_tasks.add_task(run_post_scraping, user_id, linkedin_url, job.id)
logger.info(f"Started background scraping for user {user_id}")
response = RedirectResponse(url="/onboarding/posts", status_code=302)
set_user_session(response, session)
return response
except Exception as e:
logger.error(f"Error in profile onboarding: {e}")
# Use appropriate template based on account type
template_name = "onboarding/profile_employee.html" if is_employee_flow else "onboarding/profile.html"
steps = get_employee_steps("profile") if is_employee_flow else get_ghostwriter_steps("profile")
return templates.TemplateResponse(template_name, {
"request": request,
"session": session,
"steps": steps,
"error": str(e),
"prefill": {
"linkedin_url": linkedin_url,
"name": name or actual_customer_name,
"email": email or actual_creator_email,
"customer_name": actual_customer_name,
"ghostwriter_name": actual_ghostwriter_name,
"creator_email": actual_creator_email,
"customer_email": actual_customer_email,
"writing_style_notes": writing_style_notes,
"company_name": company_name
}
})
@user_router.get("/onboarding/posts", response_class=HTMLResponse)
async def onboarding_posts_page(request: Request):
"""Posts scraping/adding page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if not session.user_id:
return RedirectResponse(url="/onboarding/profile", status_code=302)
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
scraped_posts = await db.get_linkedin_posts(user_id)
# Manual posts are now stored as LinkedInPost with manual:// URL
example_posts = [p for p in scraped_posts if p.post_url and p.post_url.startswith("manual://")]
reference_profiles = await db.get_reference_profiles(user_id)
total_posts = len(scraped_posts)
# Check if scraping is in progress
active_jobs = job_manager.get_active_jobs(session.user_id)
scraping_in_progress = any(j.job_type == JobType.POST_SCRAPING for j in active_jobs)
return templates.TemplateResponse("onboarding/posts.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("posts"),
"profile": profile,
"scraped_posts_count": len(scraped_posts),
"example_posts": example_posts,
"reference_profiles": reference_profiles,
"total_posts": total_posts,
"scraping_in_progress": scraping_in_progress
})
@user_router.post("/onboarding/posts")
async def onboarding_posts_submit(request: Request):
"""Move to next onboarding step after posts."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Update user status
if session.user_id:
await db.update_user(UUID(session.user_id), {"onboarding_status": "posts_scraped"})
session.onboarding_status = "posts_scraped"
response = RedirectResponse(url="/onboarding/post-types", status_code=302)
set_user_session(response, session)
return response
@user_router.get("/api/posts-count")
async def api_posts_count(request: Request):
"""Get current post count for onboarding polling."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
user_id = UUID(session.user_id)
posts = await db.get_linkedin_posts(user_id)
# Check if scraping is active
active_jobs = job_manager.get_active_jobs(session.user_id)
scraping_active = any(j.job_type == JobType.POST_SCRAPING for j in active_jobs)
return JSONResponse({
"count": len(posts),
"scraping_active": scraping_active
})
@user_router.post("/api/onboarding/add-manual-post")
async def api_add_manual_post(request: Request):
"""Add a manual example post during onboarding."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
try:
data = await request.json()
post_text = data.get("post_text", "").strip()
if not post_text:
return JSONResponse({"error": "Post text required"}, status_code=400)
from uuid import uuid4
from datetime import datetime
from src.database.models import LinkedInPost
manual_post = LinkedInPost(
user_id=UUID(session.user_id),
post_text=post_text,
post_url=f"manual://{uuid4()}",
post_date=datetime.now(timezone.utc),
classification_method="manual"
)
saved_posts = await db.save_linkedin_posts([manual_post])
saved = saved_posts[0] if saved_posts else manual_post
return JSONResponse({"success": True, "id": str(saved.id)})
except Exception as e:
logger.error(f"Error adding manual post: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@user_router.post("/api/onboarding/rescrape")
async def api_rescrape(request: Request, background_tasks: BackgroundTasks):
"""Trigger re-scraping of LinkedIn posts."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
try:
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
if not profile or not profile.linkedin_url:
return JSONResponse({"error": "No LinkedIn URL found"}, status_code=400)
# Create job and start scraping
job = job_manager.create_job(JobType.POST_SCRAPING, session.user_id)
background_tasks.add_task(run_post_scraping, user_id, profile.linkedin_url, job.id)
return JSONResponse({"success": True, "job_id": job.id})
except Exception as e:
logger.error(f"Error starting rescrape: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@user_router.get("/onboarding/post-types", response_class=HTMLResponse)
async def onboarding_post_types_page(request: Request):
"""Post types selection page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
return templates.TemplateResponse("onboarding/post_types.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("post_types")
})
@user_router.post("/onboarding/post-types")
async def onboarding_post_types_submit(
request: Request,
post_types_json: str = Form("{}")
):
"""Save post types and move to categorization."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
import json
post_types_data = json.loads(post_types_json)
if post_types_data and session.user_id:
from src.database.models import PostType
user_id = UUID(session.user_id)
post_types = []
for pt_data in post_types_data:
post_type = PostType(
user_id=user_id,
name=pt_data.get("name", ""),
description=pt_data.get("description"),
identifying_keywords=pt_data.get("identifying_keywords", [])
)
post_types.append(post_type)
if post_types:
await db.create_post_types_bulk(post_types)
logger.info(f"Created {len(post_types)} post types for user")
# Update status to completed (skip categorization - done in background)
if session.user_id:
await db.update_user(UUID(session.user_id), {"onboarding_status": "completed"})
session.onboarding_status = "completed"
response = RedirectResponse(url="/onboarding/complete", status_code=302)
set_user_session(response, session)
return response
except Exception as e:
logger.error(f"Error saving post types: {e}")
return templates.TemplateResponse("onboarding/post_types.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("post_types"),
"error": str(e)
})
@user_router.get("/onboarding/categorize", response_class=HTMLResponse)
async def onboarding_categorize_page(request: Request):
"""Post categorization page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if not session.user_id:
return RedirectResponse(url="/onboarding/profile", status_code=302)
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
all_posts = await db.get_linkedin_posts(user_id)
post_types = await db.get_post_types(user_id)
# Count posts per type
type_counts = {}
for pt in post_types:
posts_for_type = await db.get_posts_by_type(user_id, pt.id)
type_counts[str(pt.id)] = len(posts_for_type)
# Get uncategorized posts
uncategorized = await db.get_unclassified_posts(user_id)
classified_count = len(all_posts) - len(uncategorized)
total_posts = len(all_posts)
progress = int((classified_count / total_posts * 100)) if total_posts > 0 else 0
post_types_with_counts = [
{"id": str(pt.id), "name": pt.name, "count": type_counts.get(str(pt.id), 0)}
for pt in post_types
]
return templates.TemplateResponse("onboarding/categorize.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("categorize"),
"profile": profile,
"post_types": post_types_with_counts,
"total_posts": total_posts,
"classified_count": classified_count,
"uncategorized_count": len(uncategorized),
"uncategorized_posts": uncategorized[:5],
"progress": progress,
"classification_complete": len(uncategorized) == 0 or classified_count >= 3
})
@user_router.post("/onboarding/categorize")
async def onboarding_categorize_submit(request: Request):
"""Complete categorization and move to final step."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Run profile analysis
if session.user_id:
try:
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
posts = await db.get_linkedin_posts(user_id)
# Create profile and run analysis
from src.database.models import LinkedInProfile, ProfileAnalysis
linkedin_profile = LinkedInProfile(
user_id=user_id,
profile_data={"linkedin_url": profile.linkedin_url},
name=profile.name
)
await db.save_linkedin_profile(linkedin_profile)
# Run analysis
profile_analysis = await orchestrator.profile_analyzer.process(
profile=linkedin_profile,
posts=posts,
customer_data=profile.metadata
)
analysis_record = ProfileAnalysis(
user_id=user_id,
writing_style=profile_analysis.get("writing_style", {}),
tone_analysis=profile_analysis.get("tone_analysis", {}),
topic_patterns=profile_analysis.get("topic_patterns", {}),
audience_insights=profile_analysis.get("audience_insights", {}),
full_analysis=profile_analysis
)
await db.save_profile_analysis(analysis_record)
logger.info("Profile analysis completed during onboarding")
except Exception as e:
logger.error(f"Profile analysis failed during onboarding: {e}")
response = RedirectResponse(url="/onboarding/complete", status_code=302)
set_user_session(response, session)
return response
@user_router.get("/onboarding/complete", response_class=HTMLResponse)
async def onboarding_complete_page(request: Request, background_tasks: BackgroundTasks):
"""Onboarding completion page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Mark onboarding as complete
if session.user_id:
await db.update_user(UUID(session.user_id), {"onboarding_status": "completed"})
session.onboarding_status = "completed"
customer = None
posts_count = 0
post_types_count = 0
profile_analysis = None
analysis_started = False
if session.user_id:
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
posts = await db.get_linkedin_posts(user_id)
posts_count = len(posts)
post_types = await db.get_post_types(user_id)
post_types_count = len(post_types)
profile_analysis = await db.get_profile_analysis(user_id)
# Start background analysis if not already done
if not profile_analysis:
# Check if there's no active analysis job
active_jobs = job_manager.get_active_jobs(session.user_id)
if not active_jobs:
# Start full analysis pipeline in background
background_tasks.add_task(run_full_analysis_pipeline, user_id)
analysis_started = True
logger.info(f"Started background analysis pipeline for user {user_id}")
response = templates.TemplateResponse("onboarding/complete.html", {
"request": request,
"session": session,
"steps": get_ghostwriter_steps("complete"),
"profile": profile,
"posts_count": posts_count,
"post_types_count": post_types_count,
"profile_analysis": profile_analysis,
"analysis_started": analysis_started
})
set_user_session(response, session)
return response
# ==================== COMPANY ONBOARDING ROUTES ====================
@user_router.get("/onboarding/company", response_class=HTMLResponse)
async def onboarding_company_page(request: Request):
"""Company data setup page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company":
return RedirectResponse(url="/onboarding/profile", status_code=302)
company = None
if session.company_id:
company = await db.get_company(UUID(session.company_id))
return templates.TemplateResponse("onboarding/company.html", {
"request": request,
"session": session,
"steps": get_company_steps("company"),
"company": company
})
@user_router.post("/onboarding/company")
async def onboarding_company_submit(
request: Request,
name: str = Form(...),
description: str = Form(""),
website: str = Form(""),
industry: str = Form("")
):
"""Save company data and proceed to strategy."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
if session.company_id:
await db.update_company(UUID(session.company_id), {
"name": name,
"description": description or None,
"website": website or None,
"industry": industry or None
})
session.company_name = name
response = RedirectResponse(url="/onboarding/strategy", status_code=302)
set_user_session(response, session)
return response
except Exception as e:
logger.error(f"Error saving company data: {e}")
return templates.TemplateResponse("onboarding/company.html", {
"request": request,
"session": session,
"steps": get_company_steps("company"),
"error": str(e)
})
@user_router.get("/onboarding/strategy", response_class=HTMLResponse)
async def onboarding_strategy_page(request: Request):
"""Company strategy setup page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
company = None
strategy = {}
if session.company_id:
company = await db.get_company(UUID(session.company_id))
if company:
strategy = company.company_strategy or {}
return templates.TemplateResponse("onboarding/strategy.html", {
"request": request,
"session": session,
"steps": get_company_steps("strategy"),
"company": company,
"strategy": strategy
})
@user_router.post("/onboarding/strategy")
async def onboarding_strategy_submit(request: Request):
"""Save company strategy and complete onboarding."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
form = await request.form()
# Build strategy object
strategy = {
"mission": form.get("mission", ""),
"vision": form.get("vision", ""),
"brand_voice": form.get("brand_voice", ""),
"tone_guidelines": form.get("tone_guidelines", ""),
"target_audience": form.get("target_audience", ""),
"content_pillars": [p for p in form.getlist("content_pillar") if p],
"dos": [d for d in form.getlist("do_item") if d],
"donts": [d for d in form.getlist("dont_item") if d]
}
try:
if session.company_id:
await db.update_company(UUID(session.company_id), {
"company_strategy": strategy,
"onboarding_completed": True
})
# Mark user onboarding as complete
if session.user_id:
await db.update_user(UUID(session.user_id), {"onboarding_status": "completed"})
session.onboarding_status = "completed"
response = RedirectResponse(url="/", status_code=302)
set_user_session(response, session)
return response
except Exception as e:
logger.error(f"Error saving company strategy: {e}")
company = await db.get_company(UUID(session.company_id)) if session.company_id else None
return templates.TemplateResponse("onboarding/strategy.html", {
"request": request,
"session": session,
"steps": get_company_steps("strategy"),
"company": company,
"strategy": strategy,
"error": str(e)
})
# ==================== PROTECTED PAGES ====================
@user_router.get("/", response_class=HTMLResponse)
async def dashboard(request: Request):
"""User dashboard - shows only their own stats."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Check if onboarding is complete
if not session.is_onboarding_complete:
# Redirect to appropriate onboarding step based on account type
if session.account_type == "company":
# Company accounts need company onboarding
if not session.company_id:
return RedirectResponse(url="/onboarding/company", status_code=302)
# Check if company onboarding is done
if session.company_id:
company = await db.get_company(UUID(session.company_id))
if company and not company.onboarding_completed:
return RedirectResponse(url="/onboarding/strategy", status_code=302)
return RedirectResponse(url="/onboarding/company", status_code=302)
else:
# Ghostwriter/Employee accounts need customer profile
onboarding_status = session.onboarding_status
if onboarding_status == "pending":
return RedirectResponse(url="/onboarding/profile", status_code=302)
elif onboarding_status == "profile_setup":
return RedirectResponse(url="/onboarding/posts", status_code=302)
elif onboarding_status == "posts_scraped":
return RedirectResponse(url="/onboarding/post-types", status_code=302)
elif onboarding_status == "categorizing":
return RedirectResponse(url="/onboarding/post-types", status_code=302)
else:
return RedirectResponse(url="/onboarding/profile", status_code=302)
# For ghostwriter/employee: Check if user_id exists
if session.account_type != "company" and not session.user_id:
# Reset onboarding status if it was marked complete but no customer
if session.user_id:
await db.update_profile(UUID(session.user_id), {"onboarding_status": "pending"})
session.onboarding_status = "pending"
return RedirectResponse(url="/onboarding/profile", status_code=302)
try:
# Company accounts have a different dashboard
if session.account_type == "company":
company = await db.get_company(UUID(session.company_id)) if session.company_id else None
employees = await db.get_company_employees(UUID(session.company_id)) if session.company_id else []
pending_invitations = await db.get_pending_invitations(UUID(session.company_id)) if session.company_id else []
quota = await db.get_company_daily_quota(UUID(session.company_id)) if session.company_id else None
license_key = await db.get_company_limits(UUID(session.company_id)) if session.company_id else None
return templates.TemplateResponse("company_dashboard.html", {
"request": request,
"page": "home",
"session": session,
"company": company,
"employees": employees,
"total_employees": len(employees),
"pending_invitations": pending_invitations,
"quota": quota,
"license_key": license_key
})
# Employee accounts have their own dashboard
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
posts = await db.get_generated_posts(user_id)
profile_picture = session.linkedin_picture or await get_user_profile_picture(user_id)
if session.account_type == "employee":
# Count post statuses
pending_posts = len([p for p in posts if p.status in ['draft', 'pending']])
approved_posts = len([p for p in posts if p.status in ['approved', 'ready']])
return templates.TemplateResponse("employee_dashboard.html", {
"request": request,
"page": "home",
"session": session,
"profile": profile,
"posts_count": len(posts),
"pending_posts": pending_posts,
"approved_posts": approved_posts,
"recent_posts": posts[:5] if posts else [],
"profile_picture": profile_picture
})
# Ghostwriter accounts
return templates.TemplateResponse("dashboard.html", {
"request": request,
"page": "home",
"session": session,
"profile": profile,
"total_posts": len(posts),
"profile_picture": profile_picture
})
except Exception as e:
logger.error(f"Error loading dashboard: {e}")
template = "employee_dashboard.html" if session.account_type == "employee" else "dashboard.html"
return templates.TemplateResponse(template, {
"request": request,
"page": "home",
"session": session,
"error": str(e)
})
@user_router.get("/posts", response_class=HTMLResponse)
async def posts_page(request: Request):
"""View user's own posts."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
posts = await db.get_generated_posts(user_id)
profile_picture = session.linkedin_picture or await get_user_profile_picture(user_id)
return templates.TemplateResponse("posts.html", {
"request": request,
"page": "posts",
"session": session,
"profile": profile,
"posts": posts,
"total_posts": len(posts),
"profile_picture": profile_picture
})
except Exception as e:
logger.error(f"Error loading posts: {e}")
return templates.TemplateResponse("posts.html", {
"request": request,
"page": "posts",
"session": session,
"posts": [],
"total_posts": 0,
"error": str(e)
})
@user_router.get("/post-types", response_class=HTMLResponse)
async def post_types_page(request: Request):
"""Post types management page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
user_id = UUID(session.user_id)
# Get post types
post_types = await db.get_post_types(user_id)
# Get all linkedin posts
all_posts = await db.get_linkedin_posts(user_id)
# Separate categorized and uncategorized
uncategorized_posts = [p for p in all_posts if not p.post_type_id]
categorized_posts = [p for p in all_posts if p.post_type_id]
# Group categorized by type
categorized_by_type = {}
post_type_map = {pt.id: pt.name for pt in post_types}
for post in categorized_posts:
type_name = post_type_map.get(post.post_type_id, "Unbekannt")
if type_name not in categorized_by_type:
categorized_by_type[type_name] = []
categorized_by_type[type_name].append(post)
return templates.TemplateResponse("post_types.html", {
"request": request,
"page": "post_types",
"session": session,
"post_types": post_types,
"uncategorized_posts": uncategorized_posts,
"categorized_posts": categorized_posts,
"categorized_by_type": categorized_by_type
})
except Exception as e:
logger.error(f"Error loading post types page: {e}")
return templates.TemplateResponse("post_types.html", {
"request": request,
"page": "post_types",
"session": session,
"post_types": [],
"uncategorized_posts": [],
"categorized_posts": [],
"categorized_by_type": {},
"error": str(e)
})
@user_router.post("/api/categorize-post")
async def api_categorize_post(request: Request):
"""Categorize a single post."""
session = require_user_session(request)
if not session:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
try:
data = await request.json()
post_id = UUID(data["post_id"])
post_type_id = UUID(data["post_type_id"])
await db.update_linkedin_post(post_id, {"post_type_id": str(post_type_id)})
return JSONResponse({"success": True})
except Exception as e:
logger.error(f"Error categorizing post: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@user_router.get("/api/job-updates")
async def job_updates_sse(request: Request):
"""Server-Sent Events endpoint for job updates."""
session = require_user_session(request)
tracking_id = getattr(session, 'user_id', None) or getattr(session, 'company_id', None)
if not session or not tracking_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
async def event_generator():
queue = asyncio.Queue()
async def on_job_update(job):
await queue.put(job)
# Register listener
job_manager.add_listener(tracking_id, on_job_update)
try:
# Send initial active jobs
active_jobs = job_manager.get_active_jobs(tracking_id)
for job in active_jobs:
data = {
"id": job.id,
"job_type": job.job_type.value,
"status": job.status.value,
"progress": job.progress,
"message": job.message,
"error": job.error
}
yield f"data: {json.dumps(data)}\n\n"
# Stream updates
while True:
try:
job = await asyncio.wait_for(queue.get(), timeout=30)
data = {
"id": job.id,
"job_type": job.job_type.value,
"status": job.status.value,
"progress": job.progress,
"message": job.message,
"error": job.error
}
yield f"data: {json.dumps(data)}\n\n"
except asyncio.TimeoutError:
# Send keepalive
yield ": keepalive\n\n"
finally:
job_manager.remove_listener(tracking_id, on_job_update)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no"
}
)
@user_router.post("/api/run-post-type-analysis")
async def api_run_post_type_analysis(request: Request, background_tasks: BackgroundTasks):
"""Start post type analysis in background."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
try:
user_id = UUID(session.user_id)
# Create job
job = job_manager.create_job(JobType.POST_TYPE_ANALYSIS, session.user_id)
# Run in background
background_tasks.add_task(run_post_type_analysis, user_id, job.id)
return JSONResponse({"success": True, "job_id": job.id})
except Exception as e:
logger.error(f"Error starting post type analysis: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@user_router.post("/api/run-full-analysis")
async def api_run_full_analysis(request: Request, background_tasks: BackgroundTasks):
"""Start full analysis pipeline in background (profile -> categorization -> post types)."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
try:
user_id = UUID(session.user_id)
# Run full pipeline in background
background_tasks.add_task(run_full_analysis_pipeline, user_id)
return JSONResponse({"success": True, "message": "Analysis pipeline started"})
except Exception as e:
logger.error(f"Error starting full analysis: {e}")
return JSONResponse({"error": str(e)}, status_code=500)
@user_router.get("/api/jobs")
async def api_get_jobs(request: Request):
"""Get all jobs for the current user."""
session = require_user_session(request)
if not session or not session.user_id:
return JSONResponse({"error": "Not authenticated"}, status_code=401)
jobs = job_manager.get_user_jobs(session.user_id)
return JSONResponse({
"jobs": [
{
"id": j.id,
"job_type": j.job_type.value,
"status": j.status.value,
"progress": j.progress,
"message": j.message,
"error": j.error,
"created_at": j.created_at.isoformat() if j.created_at else None
}
for j in jobs
]
})
@user_router.get("/posts/{post_id}", response_class=HTMLResponse)
async def post_detail_page(request: Request, post_id: str):
"""Detailed view of a single post."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
return RedirectResponse(url="/posts", status_code=302)
# Verify user owns this post
if str(post.user_id) != session.user_id:
return RedirectResponse(url="/posts", status_code=302)
profile = await db.get_profile(post.user_id)
linkedin_posts = await db.get_linkedin_posts(post.user_id)
reference_posts = [p.post_text for p in linkedin_posts if p.post_text and len(p.post_text) > 100][:10]
profile_picture_url = session.linkedin_picture
if not profile_picture_url:
for lp in linkedin_posts:
if lp.raw_data and isinstance(lp.raw_data, dict):
author = lp.raw_data.get("author", {})
if author and isinstance(author, dict):
profile_picture_url = author.get("profile_picture")
if profile_picture_url:
break
profile_analysis_record = await db.get_profile_analysis(post.user_id)
profile_analysis = profile_analysis_record.full_analysis if profile_analysis_record else None
post_type = None
post_type_analysis = None
if post.post_type_id:
post_type = await db.get_post_type(post.post_type_id)
if post_type and post_type.analysis:
post_type_analysis = post_type.analysis
final_feedback = None
if post.critic_feedback and len(post.critic_feedback) > 0:
final_feedback = post.critic_feedback[-1]
# Convert media_items to dicts for JSON serialization in template
media_items_dict = []
if post.media_items:
media_items_dict = [
item.model_dump(mode='json') if hasattr(item, 'model_dump') else (item.dict() if hasattr(item, 'dict') else item)
for item in post.media_items
]
return templates.TemplateResponse("post_detail.html", {
"request": request,
"page": "posts",
"session": session,
"post": post,
"profile": profile,
"reference_posts": reference_posts,
"profile_analysis": profile_analysis,
"post_type": post_type,
"post_type_analysis": post_type_analysis,
"final_feedback": final_feedback,
"profile_picture_url": profile_picture_url,
"media_items_dict": media_items_dict
})
except Exception as e:
logger.error(f"Error loading post detail: {e}")
return RedirectResponse(url="/posts", status_code=302)
@user_router.get("/research", response_class=HTMLResponse)
async def research_page(request: Request):
"""Research topics page - with limit check for companies."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Check research limit for companies/employees
limit_reached = False
limit_message = ""
if session.account_type in ("company", "employee") and session.company_id:
can_create, error_msg = await db.check_company_research_limit(UUID(session.company_id))
limit_reached = not can_create
limit_message = error_msg
return templates.TemplateResponse("research.html", {
"request": request,
"page": "research",
"session": session,
"user_id": session.user_id,
"limit_reached": limit_reached,
"limit_message": limit_message
})
@user_router.get("/create", response_class=HTMLResponse)
async def create_post_page(request: Request):
"""Create post page - with limit check for companies."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Check post limit for companies/employees
limit_reached = False
limit_message = ""
if session.account_type in ("company", "employee") and session.company_id:
can_create, error_msg = await db.check_company_post_limit(UUID(session.company_id))
limit_reached = not can_create
limit_message = error_msg
return templates.TemplateResponse("create_post.html", {
"request": request,
"page": "create",
"session": session,
"user_id": session.user_id,
"limit_reached": limit_reached,
"limit_message": limit_message
})
@user_router.get("/status", response_class=HTMLResponse)
async def status_page(request: Request):
"""User's status page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
user_id = UUID(session.user_id)
profile = await db.get_profile(user_id)
status = await orchestrator.get_user_status(user_id)
profile_picture = session.linkedin_picture or await get_user_profile_picture(user_id)
return templates.TemplateResponse("status.html", {
"request": request,
"page": "status",
"session": session,
"profile": profile,
"status": status,
"profile_picture": profile_picture
})
except Exception as e:
logger.error(f"Error loading status: {e}")
return templates.TemplateResponse("status.html", {
"request": request,
"page": "status",
"session": session,
"error": str(e)
})
# ==================== API ENDPOINTS ====================
@user_router.get("/api/post-types")
async def get_post_types(request: Request, user_id: str = None):
"""Get post types for the logged-in user or specified user (for company owners)."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
# Company owners can specify a user_id for their employees
target_user_id = user_id if user_id and session.account_type == "company" else session.user_id
if not target_user_id:
return {"post_types": []}
post_types = await db.get_post_types(UUID(target_user_id))
return {
"post_types": [
{
"id": str(pt.id),
"name": pt.name,
"description": pt.description,
"has_analysis": pt.analysis is not None,
"analyzed_post_count": pt.analyzed_post_count,
}
for pt in post_types
]
}
except Exception as e:
logger.error(f"Error loading post types: {e}")
return {"post_types": [], "error": str(e)}
@user_router.get("/api/topics")
async def get_topics(request: Request, post_type_id: str = None, user_id: str = None):
"""Get research topics for the logged-in user or specified user (for company owners)."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
# Company owners can specify a user_id for their employees
target_user_id = user_id if user_id and session.account_type == "company" else session.user_id
if not target_user_id:
return {"topics": [], "available_count": 0, "used_count": 0}
user_id = UUID(target_user_id)
if post_type_id:
all_research = await db.get_all_research(user_id, UUID(post_type_id))
else:
all_research = await db.get_all_research(user_id)
# Get used topics
generated_posts = await db.get_generated_posts(user_id)
used_topic_titles = set()
for post in generated_posts:
if post.topic_title:
used_topic_titles.add(post.topic_title.lower().strip())
all_topics = []
for research in all_research:
if research.suggested_topics:
for topic in research.suggested_topics:
topic_title = topic.get("title", "").lower().strip()
if topic_title in used_topic_titles:
continue
topic["research_id"] = str(research.id)
topic["target_post_type_id"] = str(research.target_post_type_id) if research.target_post_type_id else None
all_topics.append(topic)
return {"topics": all_topics, "used_count": len(used_topic_titles), "available_count": len(all_topics)}
except Exception as e:
logger.error(f"Error loading topics: {e}")
return {"topics": [], "error": str(e)}
@user_router.get("/api/tasks/{task_id}")
async def get_task_status(task_id: str):
"""Get task progress."""
return progress_store.get(task_id, {"status": "unknown", "message": "Task not found"})
@user_router.post("/api/research")
async def start_research(request: Request, background_tasks: BackgroundTasks, post_type_id: str = Form(None), user_id: str = Form(None)):
"""Start research for the logged-in user or specified user (for company owners)."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
# CHECK COMPANY RESEARCH LIMIT and capture company_id for quota tracking
quota_company_id = None
if session.account_type in ("company", "employee") and session.company_id:
quota_company_id = UUID(session.company_id)
can_create, error_msg = await db.check_company_research_limit(quota_company_id)
if not can_create:
raise HTTPException(status_code=429, detail=error_msg)
# Company owners can specify a user_id for their employees
target_user_id = user_id if user_id and session.account_type == "company" else session.user_id
if not target_user_id:
raise HTTPException(status_code=400, detail="No user ID available")
user_id = target_user_id
task_id = f"research_{user_id}_{asyncio.get_event_loop().time()}"
progress_store[task_id] = {"status": "starting", "message": "Starte Recherche...", "progress": 0}
async def run_research():
try:
def progress_callback(message: str, step: int, total: int):
progress_store[task_id] = {"status": "running", "message": message, "progress": int((step / total) * 100)}
topics = await orchestrator.research_new_topics(
UUID(user_id),
progress_callback=progress_callback,
post_type_id=UUID(post_type_id) if post_type_id else None
)
# INCREMENT COMPANY QUOTA after successful research
if quota_company_id:
try:
await db.increment_company_researches_quota(quota_company_id)
logger.info(f"Incremented research quota for company {quota_company_id}")
except Exception as quota_error:
logger.error(f"Failed to increment research quota: {quota_error}")
progress_store[task_id] = {"status": "completed", "message": f"{len(topics)} Topics gefunden!", "progress": 100, "topics": topics}
except Exception as e:
logger.exception(f"Research failed: {e}")
progress_store[task_id] = {"status": "error", "message": str(e), "progress": 0}
background_tasks.add_task(run_research)
return {"task_id": task_id}
@user_router.post("/api/transcribe")
async def transcribe_audio(request: Request):
"""Transcribe audio using OpenAI Whisper."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
import openai
import tempfile
import os
try:
client = openai.OpenAI(api_key=settings.openai_api_key)
# Get the uploaded file from the request
form = await request.form()
audio_file = form.get("audio")
if not audio_file:
raise HTTPException(status_code=400, detail="No audio file provided")
# Read the audio content
audio_content = await audio_file.read()
# Save to temporary file (Whisper needs a file)
with tempfile.NamedTemporaryFile(suffix=".webm", delete=False) as tmp_file:
tmp_file.write(audio_content)
tmp_path = tmp_file.name
try:
# Transcribe with Whisper
with open(tmp_path, "rb") as f:
transcript = client.audio.transcriptions.create(
model="whisper-1",
file=f,
language="de",
response_format="text"
)
return {"text": transcript}
finally:
# Clean up temp file
os.unlink(tmp_path)
except HTTPException:
raise
except Exception as e:
logger.exception(f"Transcription failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.post("/api/hooks")
async def generate_hooks(
request: Request,
topic_json: str = Form(...),
user_thoughts: str = Form(""),
post_type_id: str = Form(None),
user_id: str = Form(None)
):
"""Generate hook options for a topic."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
topic = json.loads(topic_json)
# Company owners can specify a user_id for their employees
target_user_id = user_id if user_id and session.account_type == "company" else session.user_id
if not target_user_id:
raise HTTPException(status_code=400, detail="No user ID available")
user_id = UUID(target_user_id)
hooks = await orchestrator.generate_hooks(
user_id=user_id,
topic=topic,
user_thoughts=user_thoughts,
post_type_id=UUID(post_type_id) if post_type_id else None
)
return {"hooks": hooks}
except Exception as e:
logger.exception(f"Hook generation failed: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.post("/api/posts")
async def create_post(
request: Request,
background_tasks: BackgroundTasks,
topic_json: str = Form(...),
post_type_id: str = Form(None),
user_thoughts: str = Form(""),
selected_hook: str = Form(""),
user_id: str = Form(None)
):
"""Create a new post for the logged-in user or specified user (for company owners)."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
# CHECK COMPANY POST LIMIT and capture company_id for quota tracking
quota_company_id = None
if session.account_type in ("company", "employee") and session.company_id:
quota_company_id = UUID(session.company_id)
can_create, error_msg = await db.check_company_post_limit(quota_company_id)
if not can_create:
raise HTTPException(status_code=429, detail=error_msg)
# Company owners can specify a user_id for their employees
target_user_id = user_id if user_id and session.account_type == "company" else session.user_id
if not target_user_id:
raise HTTPException(status_code=400, detail="No user ID available")
user_id = target_user_id
task_id = f"post_{user_id}_{asyncio.get_event_loop().time()}"
progress_store[task_id] = {"status": "starting", "message": "Starte Post-Erstellung...", "progress": 0}
topic = json.loads(topic_json)
async def run_create_post():
try:
def progress_callback(message: str, iteration: int, max_iterations: int, score: int = None, versions: list = None, feedback_list: list = None):
progress = int((iteration / max_iterations) * 100) if iteration > 0 else 5
score_text = f" (Score: {score}/100)" if score else ""
progress_store[task_id] = {
"status": "running", "message": f"{message}{score_text}", "progress": progress,
"iteration": iteration, "max_iterations": max_iterations,
"versions": versions or [], "feedback_list": feedback_list or []
}
result = await orchestrator.create_post(
user_id=UUID(user_id), topic=topic, max_iterations=3,
progress_callback=progress_callback,
post_type_id=UUID(post_type_id) if post_type_id else None,
user_thoughts=user_thoughts,
selected_hook=selected_hook
)
# INCREMENT COMPANY QUOTA after successful creation
if quota_company_id:
try:
await db.increment_company_posts_quota(quota_company_id)
logger.info(f"Incremented post quota for company {quota_company_id}")
except Exception as quota_error:
logger.error(f"Failed to increment post quota: {quota_error}")
progress_store[task_id] = {
"status": "completed", "message": "Post erstellt!", "progress": 100,
"result": {
"post_id": str(result["post_id"]), "final_post": result["final_post"],
"iterations": result["iterations"], "final_score": result["final_score"], "approved": result["approved"]
}
}
except Exception as e:
logger.exception(f"Post creation failed: {e}")
progress_store[task_id] = {"status": "error", "message": str(e), "progress": 0}
background_tasks.add_task(run_create_post)
return {"task_id": task_id}
@user_router.get("/api/posts/{post_id}/suggestions")
async def get_post_suggestions(request: Request, post_id: str):
"""Get AI improvement suggestions for a post based on critic feedback."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify user owns this post
if str(post.user_id) != session.user_id:
raise HTTPException(status_code=403, detail="Not authorized")
# Get the last critic feedback if available
critic_feedback = None
if post.critic_feedback and len(post.critic_feedback) > 0:
critic_feedback = post.critic_feedback[-1]
suggestions = await orchestrator.generate_improvement_suggestions(
user_id=UUID(session.user_id),
post_content=post.post_content,
critic_feedback=critic_feedback
)
return {"suggestions": suggestions}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to generate suggestions: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.post("/api/posts/{post_id}/revise")
async def revise_post(
request: Request,
post_id: str,
suggestion: str = Form(...)
):
"""Apply a suggestion to a post and save as new version."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify user owns this post
if str(post.user_id) != session.user_id:
raise HTTPException(status_code=403, detail="Not authorized")
# Apply the suggestion
improved_content = await orchestrator.apply_suggestion_to_post(
user_id=UUID(session.user_id),
post_content=post.post_content,
suggestion=suggestion
)
# Save as new version without score
writer_versions = post.writer_versions or []
writer_versions.append(improved_content)
# Update post with new content and version
updated_post = await db.update_generated_post(
UUID(post_id),
{
"post_content": improved_content,
"writer_versions": writer_versions
}
)
return {
"success": True,
"new_content": improved_content,
"version_count": len(writer_versions)
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to revise post: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.patch("/api/posts/{post_id}/status")
async def update_post_status(
request: Request,
post_id: str,
status: str = Form(...)
):
"""Update post status (for Kanban board). Sends email when moving to 'approved'."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
# Validate status
valid_statuses = ["draft", "approved", "ready", "scheduled", "published"]
if status not in valid_statuses:
raise HTTPException(status_code=400, detail=f"Invalid status. Must be one of: {valid_statuses}")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify user owns this post or is a company owner of the employee
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
# Check if this post belongs to an employee of this company
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
# Get profile for email settings
profile = await db.get_profile(post.user_id)
# Update status
await db.update_generated_post(UUID(post_id), {"status": status})
# Send email when moving to "approved" (Bearbeitet)
email_sent = False
if status == "approved" and profile and profile.customer_email:
# Build base URL from request
base_url = str(request.base_url).rstrip('/')
email_sent = send_approval_request_email(
to_email=profile.customer_email,
post_id=UUID(post_id),
post_title=post.topic_title or "Untitled Post",
post_content=post.post_content,
base_url=base_url,
image_url=post.image_url
)
return {"success": True, "status": status, "email_sent": email_sent}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to update post status: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.put("/api/posts/{post_id}")
async def update_post(
request: Request,
post_id: str,
content: str = Form(...)
):
"""Manually update post content and save as new version."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify user owns this post
if str(post.user_id) != session.user_id:
raise HTTPException(status_code=403, detail="Not authorized")
# Save as new version
writer_versions = post.writer_versions or []
writer_versions.append(content)
# Update post with new content and version
updated_post = await db.update_generated_post(
UUID(post_id),
{
"post_content": content,
"writer_versions": writer_versions
}
)
return {
"success": True,
"new_content": content,
"version_count": len(writer_versions)
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to update post: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== EMAIL ACTION ENDPOINTS ====================
@user_router.get("/api/email-action/{token}", response_class=HTMLResponse)
async def handle_email_action(request: Request, token: str):
"""Handle email action (approve/reject) from email link."""
token_data = validate_token(token)
if not token_data:
return templates.TemplateResponse("email_action_result.html", {
"request": request,
"success": False,
"message": "Dieser Link ist ungültig oder abgelaufen.",
"title": "Link ungültig"
})
post_id = UUID(token_data["post_id"])
action = token_data["action"]
try:
post = await db.get_generated_post(post_id)
if not post:
return templates.TemplateResponse("email_action_result.html", {
"request": request,
"success": False,
"message": "Der Post wurde nicht gefunden.",
"title": "Post nicht gefunden"
})
profile = await db.get_profile(post.user_id)
# Determine new status based on action
if action == "approve":
new_status = "ready"
decision = "approved"
message = "Der Post wurde freigegeben und kann nun im Kalender eingeplant werden."
title = "Post freigegeben"
else: # reject
new_status = "draft"
decision = "rejected"
message = "Der Post wurde zur Überarbeitung zurückgeschickt."
title = "Zurück zur Überarbeitung"
# Update post status
await db.update_generated_post(post_id, {"status": new_status})
# Mark token as used
mark_token_used(token)
# Send notification to creator
if profile and profile.creator_email:
base_url = str(request.base_url).rstrip('/')
send_decision_notification_email(
to_email=profile.creator_email,
post_title=post.topic_title or "Untitled Post",
decision=decision,
base_url=base_url,
post_id=post_id,
image_url=post.image_url
)
return templates.TemplateResponse("email_action_result.html", {
"request": request,
"success": True,
"message": message,
"title": title,
"action": action
})
except Exception as e:
logger.exception(f"Failed to process email action: {e}")
return templates.TemplateResponse("email_action_result.html", {
"request": request,
"success": False,
"message": "Ein Fehler ist aufgetreten. Bitte versuche es später erneut.",
"title": "Fehler"
})
# ==================== SETTINGS ENDPOINTS ====================
@user_router.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request):
"""User settings page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
try:
profile = await db.get_profile(UUID(session.user_id))
profile_picture = session.linkedin_picture or await get_user_profile_picture(UUID(session.user_id))
# Get LinkedIn account if linked
linkedin_account = await db.get_linkedin_account(UUID(session.user_id))
return templates.TemplateResponse("settings.html", {
"request": request,
"page": "settings",
"session": session,
"profile": profile,
"profile_picture": profile_picture,
"linkedin_account": linkedin_account
})
except Exception as e:
logger.error(f"Error loading settings: {e}")
return templates.TemplateResponse("settings.html", {
"request": request,
"page": "settings",
"session": session,
"error": str(e)
})
@user_router.post("/api/settings/emails")
async def update_email_settings(
request: Request,
creator_email: str = Form(""),
customer_email: str = Form("")
):
"""Update email settings for the customer."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
# Update profile with new email settings
await db.update_profile(
UUID(session.user_id),
{
"creator_email": creator_email if creator_email else None,
"customer_email": customer_email if customer_email else None
}
)
return {"success": True}
except Exception as e:
logger.exception(f"Failed to update email settings: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== LINKEDIN ACCOUNT LINKING ====================
@user_router.get("/settings/linkedin/connect")
async def linkedin_connect(request: Request):
"""Initiate LinkedIn OAuth for account linking."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Only employees can link LinkedIn accounts
if session.account_type not in ["employee", "ghostwriter"]:
return RedirectResponse(url="/settings", status_code=302)
# Generate CSRF state token
import secrets
state = secrets.token_urlsafe(32)
# Build LinkedIn OAuth URL
from urllib.parse import urlencode
params = {
"response_type": "code",
"client_id": settings.linkedin_client_id,
"redirect_uri": settings.linkedin_redirect_uri,
"state": state,
"scope": "openid profile email w_member_social"
}
oauth_url = f"https://www.linkedin.com/oauth/v2/authorization?{urlencode(params)}"
# Store state in cookie for verification
response = RedirectResponse(url=oauth_url, status_code=302)
response.set_cookie("linkedin_oauth_state", state, max_age=600, httponly=True, secure=True, samesite="lax")
return response
@user_router.get("/settings/linkedin/callback")
async def linkedin_callback(
request: Request,
code: str = "",
state: str = "",
error: str = ""
):
"""Handle LinkedIn OAuth callback."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login?error=auth_required", status_code=302)
# Check for OAuth errors
if error:
logger.error(f"LinkedIn OAuth error: {error}")
return RedirectResponse(url="/settings?error=linkedin_auth_failed", status_code=302)
# Verify CSRF state
stored_state = request.cookies.get("linkedin_oauth_state")
if not stored_state or stored_state != state:
logger.error("LinkedIn OAuth state mismatch")
return RedirectResponse(url="/settings?error=invalid_state", status_code=302)
try:
# Exchange code for access token
import httpx
from datetime import timedelta
from src.database.models import LinkedInAccount
from src.utils.encryption import encrypt_token
async with httpx.AsyncClient() as client:
token_response = await client.post(
"https://www.linkedin.com/oauth/v2/accessToken",
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": settings.linkedin_redirect_uri,
"client_id": settings.linkedin_client_id,
"client_secret": settings.linkedin_client_secret
},
headers={"Content-Type": "application/x-www-form-urlencoded"}
)
if token_response.status_code != 200:
logger.error(f"Token exchange failed: {token_response.status_code} - {token_response.text}")
return RedirectResponse(url="/settings?error=token_exchange_failed", status_code=302)
token_data = token_response.json()
access_token = token_data["access_token"]
expires_in = token_data.get("expires_in", 5184000) # Default 60 days
refresh_token = token_data.get("refresh_token") # May not be provided
scope = token_data.get("scope", "")
# Get LinkedIn user info
userinfo_response = await client.get(
"https://api.linkedin.com/v2/userinfo",
headers={"Authorization": f"Bearer {access_token}"}
)
if userinfo_response.status_code != 200:
logger.error(f"Userinfo fetch failed: {userinfo_response.status_code}")
return RedirectResponse(url="/settings?error=userinfo_failed", status_code=302)
userinfo = userinfo_response.json()
# Extract LinkedIn user data
linkedin_user_id = userinfo.get("sub")
linkedin_name = userinfo.get("name", "")
linkedin_picture = userinfo.get("picture")
# Get vanity name if available (from profile API - optional)
linkedin_vanity_name = None
try:
profile_response = await client.get(
"https://api.linkedin.com/v2/me",
headers={"Authorization": f"Bearer {access_token}"}
)
if profile_response.status_code == 200:
profile_data = profile_response.json()
linkedin_vanity_name = profile_data.get("vanityName")
except Exception as e:
logger.warning(f"Could not fetch vanity name: {e}")
# Encrypt tokens
encrypted_access = encrypt_token(access_token)
encrypted_refresh = encrypt_token(refresh_token) if refresh_token else None
# Check if account already exists
existing_account = await db.get_linkedin_account(UUID(session.user_id))
if existing_account:
# Update existing account
await db.update_linkedin_account(
existing_account.id,
{
"linkedin_user_id": linkedin_user_id,
"linkedin_vanity_name": linkedin_vanity_name,
"linkedin_name": linkedin_name,
"linkedin_picture": linkedin_picture,
"access_token": encrypted_access,
"refresh_token": encrypted_refresh,
"token_expires_at": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
"granted_scopes": scope.split() if scope else [],
"is_active": True,
"last_error": None,
"last_error_at": None
}
)
logger.info(f"Updated LinkedIn account for user {session.user_id}")
else:
# Create new account
new_account = LinkedInAccount(
user_id=UUID(session.user_id),
linkedin_user_id=linkedin_user_id,
linkedin_vanity_name=linkedin_vanity_name,
linkedin_name=linkedin_name,
linkedin_picture=linkedin_picture,
access_token=encrypted_access,
refresh_token=encrypted_refresh,
token_expires_at=datetime.now(timezone.utc) + timedelta(seconds=expires_in),
granted_scopes=scope.split() if scope else []
)
await db.create_linkedin_account(new_account)
logger.info(f"Created LinkedIn account for user {session.user_id}")
# Clear state cookie and redirect to settings
response = RedirectResponse(url="/settings?success=linkedin_connected", status_code=302)
response.delete_cookie("linkedin_oauth_state")
return response
except Exception as e:
logger.exception(f"LinkedIn OAuth callback error: {e}")
return RedirectResponse(url="/settings?error=connection_failed", status_code=302)
@user_router.post("/api/settings/linkedin/disconnect")
async def linkedin_disconnect(request: Request):
"""Remove LinkedIn account connection."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
# Get account
linkedin_account = await db.get_linkedin_account(UUID(session.user_id))
if not linkedin_account:
return {"success": False, "error": "No LinkedIn account found"}
# Optional: Revoke token with LinkedIn (not strictly necessary)
# LinkedIn doesn't have a reliable revocation endpoint, so we just delete from DB
# Delete from database
await db.delete_linkedin_account(linkedin_account.id)
logger.info(f"Disconnected LinkedIn account for user {session.user_id}")
return {"success": True}
except Exception as e:
logger.exception(f"Failed to disconnect LinkedIn: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== COMPANY MANAGEMENT ENDPOINTS ====================
@user_router.get("/company/strategy", response_class=HTMLResponse)
async def company_strategy_page(request: Request, success: bool = False):
"""Company strategy editing page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
company = await db.get_company(UUID(session.company_id))
strategy = company.company_strategy if company else {}
return templates.TemplateResponse("company_strategy.html", {
"request": request,
"page": "strategy",
"session": session,
"company": company,
"strategy": strategy,
"success": success
})
@user_router.post("/company/strategy")
async def company_strategy_submit(request: Request):
"""Save company strategy."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
form = await request.form()
strategy = {
"mission": form.get("mission", ""),
"vision": form.get("vision", ""),
"brand_voice": form.get("brand_voice", ""),
"tone_guidelines": form.get("tone_guidelines", ""),
"target_audience": form.get("target_audience", ""),
"content_pillars": [p for p in form.getlist("content_pillar") if p],
"dos": [d for d in form.getlist("do_item") if d],
"donts": [d for d in form.getlist("dont_item") if d]
}
try:
await db.update_company(UUID(session.company_id), {
"company_strategy": strategy
})
return RedirectResponse(url="/company/strategy?success=true", status_code=302)
except Exception as e:
logger.error(f"Error saving company strategy: {e}")
company = await db.get_company(UUID(session.company_id))
return templates.TemplateResponse("company_strategy.html", {
"request": request,
"page": "strategy",
"session": session,
"company": company,
"strategy": strategy,
"error": str(e)
})
@user_router.get("/company/accounts", response_class=HTMLResponse)
async def company_accounts_page(request: Request):
"""Company employee management page."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Only company owners can access
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
company_id = UUID(session.company_id)
employees = await db.get_company_employees(company_id)
pending_invitations = await db.get_pending_invitations(company_id)
profile_picture = session.linkedin_picture
return templates.TemplateResponse("company_accounts.html", {
"request": request,
"page": "accounts",
"session": session,
"employees": employees,
"pending_invitations": pending_invitations,
"profile_picture": profile_picture
})
# ==================== COMPANY MANAGE ROUTES ====================
@user_router.get("/company/manage", response_class=HTMLResponse)
async def company_manage_page(request: Request, employee_id: str = None):
"""Company content management page - manage employee posts and research."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Only company owners can access
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
company_id = UUID(session.company_id)
# Get all employees with completed onboarding
all_employees = await db.get_company_employees(company_id)
active_employees = [emp for emp in all_employees if emp.onboarding_status == "completed"]
# Build display info for employees
# Note: emp is a User object from get_company_employees which has linkedin_name and linkedin_picture
active_employees_info = []
for emp in active_employees:
active_employees_info.append({
"id": str(emp.id),
"email": emp.email,
"display_name": emp.linkedin_name or emp.display_name or emp.email,
"linkedin_picture": emp.linkedin_picture,
"onboarding_status": emp.onboarding_status
})
# Selected employee data
selected_employee = None
employee_posts = []
pending_posts = 0
approved_posts = 0
if employee_id:
# Find the selected employee
for emp in active_employees_info:
if emp["id"] == employee_id:
selected_employee = emp
break
if selected_employee:
# Get employee's user_id
emp_profile = await db.get_profile(UUID(employee_id))
if emp_profile:
employee_posts = await db.get_generated_posts(emp_profile.id)
pending_posts = len([p for p in employee_posts if p.status in ['draft', 'pending']])
approved_posts = len([p for p in employee_posts if p.status in ['approved', 'published']])
return templates.TemplateResponse("company_manage.html", {
"request": request,
"page": "manage",
"session": session,
"active_employees": active_employees_info,
"selected_employee": selected_employee,
"employee_posts": employee_posts,
"pending_posts": pending_posts,
"approved_posts": approved_posts
})
@user_router.get("/company/manage/posts", response_class=HTMLResponse)
async def company_manage_posts(request: Request, employee_id: str = None):
"""View all posts for a specific employee."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
if not employee_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Get employee info
emp_profile = await db.get_profile(UUID(employee_id))
if not emp_profile:
return RedirectResponse(url="/company/manage", status_code=302)
# Verify employee belongs to this company
emp_user = await db.get_user(UUID(employee_id))
if not emp_user or str(emp_user.company_id) != session.company_id:
return RedirectResponse(url="/company/manage", status_code=302)
profile = await db.get_profile(emp_profile.id)
posts = await db.get_generated_posts(emp_profile.id)
return templates.TemplateResponse("company_manage_posts.html", {
"request": request,
"page": "manage",
"session": session,
"employee_id": employee_id,
"employee_name": emp_user.linkedin_name or emp_profile.display_name or emp_user.email,
"profile": profile,
"posts": posts,
"total_posts": len(posts)
})
@user_router.get("/company/manage/post/{post_id}", response_class=HTMLResponse)
async def company_manage_post_detail(request: Request, post_id: str, employee_id: str = None):
"""View a specific post for an employee."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
if not employee_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Get employee info
emp_profile = await db.get_profile(UUID(employee_id))
if not emp_profile:
return RedirectResponse(url="/company/manage", status_code=302)
# Verify employee belongs to this company
emp_user = await db.get_user(UUID(employee_id))
if not emp_user or str(emp_user.company_id) != session.company_id:
return RedirectResponse(url="/company/manage", status_code=302)
post = await db.get_generated_post(UUID(post_id))
if not post or str(post.user_id) != str(emp_profile.id):
return RedirectResponse(url=f"/company/manage/posts?employee_id={employee_id}", status_code=302)
profile = await db.get_profile(emp_profile.id)
# Convert media_items to dicts for JSON serialization in template
media_items_dict = []
if post.media_items:
media_items_dict = [
item.model_dump(mode='json') if hasattr(item, 'model_dump') else (item.dict() if hasattr(item, 'dict') else item)
for item in post.media_items
]
return templates.TemplateResponse("company_manage_post_detail.html", {
"request": request,
"page": "manage",
"session": session,
"employee_id": employee_id,
"employee_name": emp_user.linkedin_name or emp_profile.display_name or emp_user.email,
"profile": profile,
"post": post,
"media_items_dict": media_items_dict
})
@user_router.get("/company/manage/research", response_class=HTMLResponse)
async def company_manage_research(request: Request, employee_id: str = None):
"""Research page for a specific employee."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
if not employee_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Get employee info
emp_profile = await db.get_profile(UUID(employee_id))
if not emp_profile:
return RedirectResponse(url="/company/manage", status_code=302)
# Verify employee belongs to this company
emp_user = await db.get_user(UUID(employee_id))
if not emp_user or str(emp_user.company_id) != session.company_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Check research limit
limit_reached = False
limit_message = ""
if session.company_id:
can_create, error_msg = await db.check_company_research_limit(UUID(session.company_id))
limit_reached = not can_create
limit_message = error_msg
return templates.TemplateResponse("company_manage_research.html", {
"request": request,
"page": "manage",
"session": session,
"employee_id": employee_id,
"employee_name": emp_user.linkedin_name or emp_profile.display_name or emp_user.email,
"user_id": str(emp_profile.id),
"limit_reached": limit_reached,
"limit_message": limit_message
})
@user_router.get("/company/manage/create", response_class=HTMLResponse)
async def company_manage_create(request: Request, employee_id: str = None):
"""Create post page for a specific employee."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
if not employee_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Get employee info
emp_profile = await db.get_profile(UUID(employee_id))
if not emp_profile:
return RedirectResponse(url="/company/manage", status_code=302)
# Verify employee belongs to this company
emp_user = await db.get_user(UUID(employee_id))
if not emp_user or str(emp_user.company_id) != session.company_id:
return RedirectResponse(url="/company/manage", status_code=302)
# Check post limit
limit_reached = False
limit_message = ""
if session.company_id:
can_create, error_msg = await db.check_company_post_limit(UUID(session.company_id))
limit_reached = not can_create
limit_message = error_msg
return templates.TemplateResponse("company_manage_create.html", {
"request": request,
"page": "manage",
"session": session,
"employee_id": employee_id,
"employee_name": emp_user.linkedin_name or emp_profile.display_name or emp_user.email,
"user_id": str(emp_profile.id),
"limit_reached": limit_reached,
"limit_message": limit_message
})
# ==================== EMPLOYEE ROUTES ====================
@user_router.get("/employee/strategy", response_class=HTMLResponse)
async def employee_strategy_page(request: Request):
"""Read-only company strategy view for employees."""
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
# Only employees can access this page
if session.account_type != "employee" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
company = await db.get_company(UUID(session.company_id))
strategy = company.company_strategy if company else {}
return templates.TemplateResponse("employee_strategy.html", {
"request": request,
"page": "strategy",
"session": session,
"company": company,
"strategy": strategy
})
@user_router.post("/api/company/invite")
async def send_company_invitation(request: Request):
"""Send invitation to a new employee."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
if session.account_type != "company" or not session.company_id:
raise HTTPException(status_code=403, detail="Not a company owner")
try:
# CHECK EMPLOYEE LIMIT
company_id = UUID(session.company_id)
can_add, error_msg = await db.check_company_employee_limit(company_id)
if not can_add:
return {"success": False, "error": error_msg}
data = await request.json()
email = data.get("email", "").lower().strip()
if not email:
return {"success": False, "error": "E-Mail erforderlich"}
# Check if user already exists in the system
existing_user = await db.get_user_by_email(email)
if existing_user:
# User exists - check their status
if existing_user.company_id and str(existing_user.company_id) == session.company_id:
return {"success": False, "error": "Benutzer ist bereits Mitarbeiter in deinem Unternehmen"}
elif existing_user.company_id:
return {"success": False, "error": "Diese E-Mail ist bereits bei einem anderen Unternehmen registriert"}
else:
return {"success": False, "error": "Diese E-Mail ist bereits als Ghostwriter-Account registriert"}
# Check if email already has pending invitation
existing = await db.get_invitations_by_email(email)
for inv in existing:
inv_status = inv.status.value if hasattr(inv.status, 'value') else inv.status
if inv_status == "pending" and str(inv.company_id) == session.company_id:
return {"success": False, "error": "Einladung bereits gesendet"}
# Create invitation
from src.database.models import Invitation
invitation = Invitation(
email=email,
token=generate_invitation_token(),
expires_at=get_invitation_expiry(),
company_id=UUID(session.company_id),
invited_by_user_id=UUID(session.user_id)
)
created_invitation = await db.create_invitation(invitation)
# Send invitation email
from src.services.email_service import send_invitation_email
base_url = str(request.base_url).rstrip('/')
email_sent = send_invitation_email(
to_email=email,
company_name=session.company_name or "Unternehmen",
inviter_name=session.linkedin_name or session.email,
token=created_invitation.token,
base_url=base_url
)
return {"success": True, "email_sent": email_sent}
except Exception as e:
logger.exception(f"Failed to send invitation: {e}")
return {"success": False, "error": str(e)}
@user_router.delete("/api/company/invitations/{invitation_id}")
async def cancel_invitation(request: Request, invitation_id: str):
"""Cancel a pending invitation."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
if session.account_type != "company" or not session.company_id:
raise HTTPException(status_code=403, detail="Not a company owner")
try:
invitation = await db.get_invitation(UUID(invitation_id))
if not invitation or str(invitation.company_id) != session.company_id:
return {"success": False, "error": "Einladung nicht gefunden"}
await db.update_invitation(UUID(invitation_id), {"status": "cancelled"})
return {"success": True}
except Exception as e:
logger.exception(f"Failed to cancel invitation: {e}")
return {"success": False, "error": str(e)}
@user_router.delete("/api/company/employees/{user_id}")
async def remove_employee(request: Request, user_id: str):
"""Remove an employee from the company - deletes user completely."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
if session.account_type != "company" or not session.company_id:
raise HTTPException(status_code=403, detail="Not a company owner")
try:
user = await db.get_user(UUID(user_id))
if not user or str(user.company_id) != session.company_id:
return {"success": False, "error": "Mitarbeiter nicht gefunden"}
# Store user info before deletion for email
employee_email = user.email
employee_name = user.display_name or user.linkedin_name or user.email
company_name = session.company_name or "Unternehmen"
# Completely delete user and all related data
await db.delete_user_completely(UUID(user_id))
# Send notification email
from src.services.email_service import send_employee_removal_email
email_sent = send_employee_removal_email(
to_email=employee_email,
employee_name=employee_name,
company_name=company_name
)
logger.info(f"Removed employee {user_id} from company {session.company_id}, email_sent={email_sent}")
return {"success": True, "email_sent": email_sent}
except Exception as e:
logger.exception(f"Failed to remove employee: {e}")
return {"success": False, "error": str(e)}
# ==================== ONBOARDING API ENDPOINTS ====================
@user_router.post("/api/onboarding/scrape-posts")
async def api_scrape_posts(request: Request):
"""Scrape posts for onboarding."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
if not session.user_id:
return {"success": False, "error": "User not found"}
try:
data = await request.json()
user_id = UUID(data.get("user_id", session.user_id))
profile = await db.get_profile(user_id)
if not profile:
return {"success": False, "error": "User profile not found"}
from src.scraper import scraper
from src.database.models import LinkedInPost
raw_posts = await scraper.scrape_posts(profile.linkedin_url, limit=50)
parsed_posts = scraper.parse_posts_data(raw_posts)
linkedin_posts = []
for post_data in parsed_posts:
post = LinkedInPost(user_id=user_id, **post_data)
linkedin_posts.append(post)
if linkedin_posts:
await db.save_linkedin_posts(linkedin_posts)
return {"success": True, "posts_count": len(linkedin_posts)}
except Exception as e:
logger.exception(f"Failed to scrape posts: {e}")
return {"success": False, "error": str(e)}
@user_router.post("/api/onboarding/add-manual-posts")
async def api_add_manual_posts(request: Request):
"""Add manual example posts."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
data = await request.json()
user_id = UUID(data.get("user_id", session.user_id))
posts_data = data.get("posts", [])
from uuid import uuid4
from datetime import datetime
from src.database.models import LinkedInPost
posts = []
for p in posts_data:
post = LinkedInPost(
user_id=user_id,
post_text=p.get("post_text", ""),
post_url=f"manual://{uuid4()}",
post_date=datetime.now(timezone.utc),
classification_method="manual"
)
posts.append(post)
if posts:
await db.save_linkedin_posts(posts)
return {"success": True, "count": len(posts)}
except Exception as e:
logger.exception(f"Failed to add manual posts: {e}")
return {"success": False, "error": str(e)}
@user_router.delete("/api/onboarding/remove-manual-post/{post_id}")
async def api_remove_manual_post(request: Request, post_id: str):
"""Remove a manual example post."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
await db.delete_linkedin_post(UUID(post_id))
return {"success": True}
except Exception as e:
logger.exception(f"Failed to remove manual post: {e}")
return {"success": False, "error": str(e)}
@user_router.post("/api/onboarding/add-reference")
async def api_add_reference_profile(request: Request):
"""Add a reference profile for scraping."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
data = await request.json()
user_id = UUID(data.get("user_id", session.user_id))
linkedin_url = data.get("linkedin_url", "")
from src.database.models import ReferenceProfile
profile = ReferenceProfile(
user_id=user_id,
linkedin_url=linkedin_url
)
await db.create_reference_profile(profile)
return {"success": True}
except Exception as e:
logger.exception(f"Failed to add reference profile: {e}")
return {"success": False, "error": str(e)}
@user_router.delete("/api/onboarding/remove-reference/{profile_id}")
async def api_remove_reference_profile(request: Request, profile_id: str):
"""Remove a reference profile."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
await db.delete_reference_profile(UUID(profile_id))
return {"success": True}
except Exception as e:
logger.exception(f"Failed to remove reference profile: {e}")
return {"success": False, "error": str(e)}
@user_router.post("/api/onboarding/classify-posts")
async def api_classify_posts(request: Request):
"""Run automatic post classification."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
data = await request.json()
user_id = UUID(data.get("user_id", session.user_id))
classified_count = await orchestrator.classify_posts(user_id)
return {"success": True, "classified_count": classified_count}
except Exception as e:
logger.exception(f"Failed to classify posts: {e}")
return {"success": False, "error": str(e)}
@user_router.post("/api/onboarding/categorize-post")
async def api_categorize_single_post(request: Request):
"""Manually categorize a single post."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
data = await request.json()
post_id = UUID(data.get("post_id"))
post_type_id = UUID(data.get("post_type_id"))
await db.update_post_classification(
post_id=post_id,
post_type_id=post_type_id,
classification_method="manual",
classification_confidence=1.0
)
return {"success": True}
except Exception as e:
logger.exception(f"Failed to categorize post: {e}")
return {"success": False, "error": str(e)}
# ==================== CALENDAR / SCHEDULING ENDPOINTS ====================
@user_router.get("/company/calendar", response_class=HTMLResponse)
async def company_calendar_page(request: Request, month: int = None, year: int = None, view: str = "month", week_start: str = None):
"""Company posting calendar page."""
from datetime import date, datetime, timedelta
import calendar
session = require_user_session(request)
if not session:
return RedirectResponse(url="/login", status_code=302)
if session.account_type != "company" or not session.company_id:
return RedirectResponse(url="/", status_code=302)
company_id = UUID(session.company_id)
# Determine current month/year
today = date.today()
current_month = month or today.month
current_year = year or today.year
# Calculate prev/next month
if current_month == 1:
prev_month, prev_year = 12, current_year - 1
else:
prev_month, prev_year = current_month - 1, current_year
if current_month == 12:
next_month, next_year = 1, current_year + 1
else:
next_month, next_year = current_month + 1, current_year
# Get month name in German
month_names = ["Januar", "Februar", "Marz", "April", "Mai", "Juni",
"Juli", "August", "September", "Oktober", "November", "Dezember"]
month_name = month_names[current_month - 1]
# Get all posts for the company (already enriched with employee info - single optimized query)
all_posts = await db.get_scheduled_posts_for_company(company_id)
# Build employee list from posts (no extra queries needed)
employee_map = {}
for post in all_posts:
user_id = str(post.get("user_id", ""))
if user_id and user_id not in employee_map:
employee_map[user_id] = {
"user_id": user_id,
"name": post.get("employee_name", "Unbekannt"),
"user_id": post.get("employee_user_id")
}
employee_customers = list(employee_map.values())
# Create calendar structure
cal = calendar.Calendar(firstweekday=0) # Monday first
month_days = cal.monthdayscalendar(current_year, current_month)
# Build posts by date
posts_by_date = {}
unscheduled_posts = []
for idx, post in enumerate(all_posts):
user_id = str(post.get("user_id", ""))
employee_index = list(employee_map.keys()).index(user_id) if user_id in employee_map else 0
# Parse created_at to datetime if it's a string
created_at = post.get("created_at")
if isinstance(created_at, str):
try:
created_at = datetime.fromisoformat(created_at.replace("Z", "+00:00"))
except (ValueError, TypeError):
created_at = None
post_data = {
"id": str(post.get("id")),
"topic_title": post.get("topic_title", "Ohne Titel"),
"post_content": post.get("post_content", ""),
"status": post.get("status", "draft"),
"employee_name": post.get("employee_name", "Unbekannt"),
"employee_index": employee_index,
"created_at": created_at
}
scheduled_at = post.get("scheduled_at")
if scheduled_at and post.get("status") in ("scheduled", "published"):
if isinstance(scheduled_at, str):
scheduled_dt = datetime.fromisoformat(scheduled_at.replace("Z", "+00:00"))
else:
scheduled_dt = scheduled_at
date_key = scheduled_dt.strftime("%Y-%m-%d")
post_data["time"] = scheduled_dt.strftime("%H:%M")
if date_key not in posts_by_date:
posts_by_date[date_key] = []
posts_by_date[date_key].append(post_data)
elif post.get("status") == "ready":
# Unscheduled but ready for scheduling
unscheduled_posts.append(post_data)
# Build calendar weeks
calendar_weeks = []
if view == "week":
# Week view: show a single week (Mon-Sun)
if week_start:
try:
ws_date = date.fromisoformat(week_start)
except ValueError:
ws_date = today - timedelta(days=today.weekday())
else:
ws_date = today - timedelta(days=today.weekday())
week_data = []
for i in range(7):
d = ws_date + timedelta(days=i)
date_str = d.isoformat()
week_data.append({
"day": d.day,
"date": date_str,
"other_month": d.month != current_month,
"is_today": d == today,
"posts": posts_by_date.get(date_str, []),
"weekday_name": ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"][i],
"full_date": f"{d.day}. {month_names[d.month - 1]}"
})
calendar_weeks.append(week_data)
prev_ws = ws_date - timedelta(weeks=1)
next_ws = ws_date + timedelta(weeks=1)
ws_end = ws_date + timedelta(days=6)
# Week label e.g. "3. - 9. Feb 2026"
if ws_date.month == ws_end.month:
week_label = f"{ws_date.day}. - {ws_end.day}. {month_names[ws_date.month - 1]} {ws_date.year}"
else:
week_label = f"{ws_date.day}. {month_names[ws_date.month - 1]} - {ws_end.day}. {month_names[ws_end.month - 1]} {ws_end.year}"
else:
# Month view
for week in month_days:
week_data = []
for day in week:
if day == 0:
week_data.append({
"day": "",
"date": "",
"other_month": True,
"is_today": False,
"posts": []
})
else:
date_str = f"{current_year}-{current_month:02d}-{day:02d}"
is_today = (day == today.day and current_month == today.month and current_year == today.year)
week_data.append({
"day": day,
"date": date_str,
"other_month": False,
"is_today": is_today,
"posts": posts_by_date.get(date_str, [])
})
calendar_weeks.append(week_data)
week_label = None
prev_ws = None
next_ws = None
return templates.TemplateResponse("company_calendar.html", {
"request": request,
"page": "calendar",
"session": session,
"month": current_month,
"year": current_year,
"month_name": month_name,
"prev_month": prev_month,
"prev_year": prev_year,
"next_month": next_month,
"next_year": next_year,
"current_month": today.month,
"current_year": today.year,
"employees": employee_customers,
"calendar_weeks": calendar_weeks,
"unscheduled_posts": unscheduled_posts,
"view": view,
"week_label": week_label,
"week_start": prev_ws.isoformat() if prev_ws else None,
"prev_week_start": prev_ws.isoformat() if prev_ws else None,
"next_week_start": next_ws.isoformat() if next_ws else None
})
@user_router.get("/api/posts/{post_id}")
async def get_post_api(request: Request, post_id: str):
"""Get a single post as JSON."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify access (owner or company owner)
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
return {
"id": str(post.id),
"topic_title": post.topic_title,
"post_content": post.post_content,
"status": post.status,
"scheduled_at": post.scheduled_at.isoformat() if post.scheduled_at else None,
"created_at": post.created_at.isoformat() if post.created_at else None
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to get post: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.post("/api/posts/{post_id}/schedule")
async def schedule_post(request: Request, post_id: str, scheduled_at: str = Form(...)):
"""Schedule a post for publishing."""
from datetime import datetime
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
# Only company owners can schedule for now
if session.account_type != "company" or not session.company_id:
raise HTTPException(status_code=403, detail="Only company owners can schedule posts")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify this post belongs to a company employee
profile = await db.get_profile(post.user_id)
if not profile or not profile.company_id or str(profile.company_id) != session.company_id:
raise HTTPException(status_code=403, detail="Not authorized")
# Only ready posts can be scheduled
if post.status not in ["ready", "scheduled"]:
raise HTTPException(status_code=400, detail="Only ready (approved by customer) posts can be scheduled")
# Parse scheduled_at
try:
scheduled_datetime = datetime.fromisoformat(scheduled_at.replace("Z", "+00:00"))
except ValueError:
raise HTTPException(status_code=400, detail="Invalid datetime format")
# Schedule the post
updated_post = await db.schedule_post(
post_id=UUID(post_id),
scheduled_at=scheduled_datetime,
scheduled_by_user_id=UUID(session.user_id)
)
return {
"success": True,
"scheduled_at": updated_post.scheduled_at.isoformat() if updated_post.scheduled_at else None
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to schedule post: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.post("/api/posts/{post_id}/unschedule")
async def unschedule_post(request: Request, post_id: str):
"""Remove scheduling from a post."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
# Only company owners can unschedule for now
if session.account_type != "company" or not session.company_id:
raise HTTPException(status_code=403, detail="Only company owners can unschedule posts")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Verify this post belongs to a company employee
profile = await db.get_profile(post.user_id)
if not profile or not profile.company_id or str(profile.company_id) != session.company_id:
raise HTTPException(status_code=403, detail="Not authorized")
# Unschedule the post
updated_post = await db.unschedule_post(UUID(post_id))
return {
"success": True,
"status": updated_post.status
}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to unschedule post: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== POST IMAGE UPLOAD ====================
@user_router.post("/api/posts/{post_id}/image")
async def upload_post_image(request: Request, post_id: str):
"""DEPRECATED: Upload or replace an image for a post.
Use /api/posts/{post_id}/media instead. This endpoint is kept for backward compatibility.
"""
# Delegate to new media upload endpoint
result = await upload_post_media(request, post_id)
# Return legacy format for compatibility
if result.get("success") and result.get("media_item"):
return {"success": True, "image_url": result["media_item"]["url"]}
return result
@user_router.delete("/api/posts/{post_id}/image")
async def delete_post_image(request: Request, post_id: str):
"""Remove the image from a post."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Check authorization: owner or company owner
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
# Delete from storage
if post.image_url:
try:
await storage.delete_image(post.image_url)
except Exception as e:
logger.warning(f"Failed to delete image from storage: {e}")
# Clear image_url on post
await db.update_generated_post(UUID(post_id), {"image_url": None})
return {"success": True}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to delete post image: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== POST MEDIA UPLOAD (MULTI-MEDIA) ====================
@user_router.post("/api/posts/{post_id}/media")
async def upload_post_media(request: Request, post_id: str):
"""Upload a media item (image or video). Max 3 items per post."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Check authorization: owner or company owner
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
# Get current media items (convert to dicts with JSON-serializable datetime)
media_items = [
item.model_dump(mode='json') if hasattr(item, 'model_dump') else (item.dict() if hasattr(item, 'dict') else item)
for item in (post.media_items or [])
]
# Validate: Max 3 items
if len(media_items) >= 3:
raise HTTPException(status_code=400, detail="Maximal 3 Medien erlaubt")
# Read file from form data
form = await request.form()
file = form.get("file") or form.get("image") # Support both 'file' and 'image' keys
if not file or not hasattr(file, "read"):
raise HTTPException(status_code=400, detail="No file provided")
file_content = await file.read()
content_type = file.content_type or "application/octet-stream"
# Validate media type consistency (no mixing)
is_video = content_type.startswith("video/")
has_videos = any(item.get("type") == "video" for item in media_items)
has_images = any(item.get("type") == "image" for item in media_items)
if (is_video and has_images) or (not is_video and has_videos):
raise HTTPException(status_code=400, detail="Kann nicht Bilder und Videos mischen")
# Only allow 1 video max
if is_video and len(media_items) > 0:
raise HTTPException(status_code=400, detail="Nur 1 Video pro Post erlaubt")
# Upload to storage
media_url = await storage.upload_media(
file_content=file_content,
content_type=content_type,
user_id=str(post.user_id),
)
# Add to array
new_item = {
"type": "video" if is_video else "image",
"url": media_url,
"order": len(media_items),
"content_type": content_type,
"uploaded_at": datetime.now(timezone.utc).isoformat(),
"metadata": None
}
media_items.append(new_item)
# Update DB
await db.update_generated_post(UUID(post_id), {"media_items": media_items})
# Also update image_url for backward compatibility (first image)
if new_item["type"] == "image" and len([i for i in media_items if i["type"] == "image"]) == 1:
await db.update_generated_post(UUID(post_id), {"image_url": media_url})
return {"success": True, "media_item": new_item, "media_items": media_items}
except HTTPException:
raise
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.exception(f"Failed to upload post media: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.delete("/api/posts/{post_id}/media/{media_index}")
async def delete_post_media(request: Request, post_id: str, media_index: int):
"""Delete a specific media item by index."""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Check authorization: owner or company owner
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
# Get current media items (convert to dicts with JSON-serializable datetime)
media_items = [
item.model_dump(mode='json') if hasattr(item, 'model_dump') else (item.dict() if hasattr(item, 'dict') else item)
for item in (post.media_items or [])
]
if media_index < 0 or media_index >= len(media_items):
raise HTTPException(status_code=404, detail="Media item nicht gefunden")
# Delete from storage
item_to_delete = media_items[media_index]
try:
await storage.delete_image(item_to_delete["url"])
except Exception as e:
logger.warning(f"Failed to delete media from storage: {e}")
# Remove and re-index
media_items.pop(media_index)
for i, item in enumerate(media_items):
item["order"] = i
# Update DB
await db.update_generated_post(UUID(post_id), {"media_items": media_items})
# Update image_url for backward compatibility
first_image = next((item for item in media_items if item["type"] == "image"), None)
await db.update_generated_post(UUID(post_id), {"image_url": first_image["url"] if first_image else None})
return {"success": True, "media_items": media_items}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to delete post media: {e}")
raise HTTPException(status_code=500, detail=str(e))
@user_router.put("/api/posts/{post_id}/media/reorder")
async def reorder_post_media(request: Request, post_id: str):
"""Reorder media items. Expects JSON: {"order": [2, 0, 1]}"""
session = require_user_session(request)
if not session:
raise HTTPException(status_code=401, detail="Not authenticated")
try:
post = await db.get_generated_post(UUID(post_id))
if not post:
raise HTTPException(status_code=404, detail="Post not found")
# Check authorization: owner or company owner
is_owner = session.user_id and str(post.user_id) == session.user_id
is_company_owner = False
if not is_owner and session.account_type == "company" and session.company_id:
profile = await db.get_profile(post.user_id)
if profile and profile.company_id and str(profile.company_id) == session.company_id:
is_company_owner = True
if not is_owner and not is_company_owner:
raise HTTPException(status_code=403, detail="Not authorized")
# Get current media items (convert to dicts with JSON-serializable datetime)
media_items = [
item.model_dump(mode='json') if hasattr(item, 'model_dump') else (item.dict() if hasattr(item, 'dict') else item)
for item in (post.media_items or [])
]
# Parse request body
body = await request.json()
new_order = body.get("order", [])
# Validate order array
if len(new_order) != len(media_items) or set(new_order) != set(range(len(media_items))):
raise HTTPException(status_code=400, detail="Invalid order indices")
# Reorder
reordered = [media_items[i] for i in new_order]
for i, item in enumerate(reordered):
item["order"] = i
# Update DB
await db.update_generated_post(UUID(post_id), {"media_items": reordered})
return {"success": True, "media_items": reordered}
except HTTPException:
raise
except Exception as e:
logger.exception(f"Failed to reorder post media: {e}")
raise HTTPException(status_code=500, detail=str(e))
# ==================== IMAGE PROXY FOR HTTPS ====================
@user_router.get("/proxy/image/{bucket}/{path:path}")
async def proxy_supabase_image(bucket: str, path: str):
"""
Proxy Supabase storage images via HTTPS to avoid mixed content warnings.
This allows HTTPS pages to load images from HTTP Supabase storage.
"""
import httpx
from fastapi.responses import Response
try:
# Build the Supabase storage URL
storage_url = settings.supabase_url.replace("https://", "http://").replace("http://", "http://")
image_url = f"{storage_url}/storage/v1/object/public/{bucket}/{path}"
# Fetch the image from Supabase
async with httpx.AsyncClient(timeout=30.0) as client:
response = await client.get(image_url)
if response.status_code != 200:
raise HTTPException(status_code=404, detail="Image not found")
# Return the image with proper headers
return Response(
content=response.content,
media_type=response.headers.get("content-type", "image/jpeg"),
headers={
"Cache-Control": "public, max-age=31536000",
"Access-Control-Allow-Origin": "*"
}
)
except httpx.TimeoutException:
raise HTTPException(status_code=504, detail="Image fetch timeout")
except Exception as e:
logger.error(f"Failed to proxy image {bucket}/{path}: {e}")
raise HTTPException(status_code=500, detail="Failed to load image")