diff --git a/src/config.py b/src/config.py index aa0cc00..ae3d182 100644 --- a/src/config.py +++ b/src/config.py @@ -69,6 +69,13 @@ class Settings(BaseSettings): redis_url: str = "redis://redis:6379/0" scheduler_enabled: bool = False # True only on dedicated scheduler container + # Telegram Bot (experimental) + telegram_enabled: bool = False + telegram_bot_token: str = "" + telegram_bot_username: str = "" # e.g. "MyLinkedInBot" (without @) + telegram_webhook_secret: str = "" # Random string to validate incoming webhooks + telegram_webhook_url: str = "" # Base URL of the app, e.g. https://app.example.com + model_config = SettingsConfigDict( env_file=".env", env_file_encoding="utf-8", diff --git a/src/database/client.py b/src/database/client.py index 376d39a..fe4ba83 100644 --- a/src/database/client.py +++ b/src/database/client.py @@ -817,6 +817,63 @@ class DatabaseClient: await cache.invalidate_linkedin_account(user_id) logger.info(f"Deleted LinkedIn account: {account_id}") + # ==================== TELEGRAM ACCOUNTS ==================== + + async def get_telegram_account(self, user_id: UUID) -> Optional['TelegramAccount']: + """Get Telegram account for user.""" + from src.database.models import TelegramAccount + result = await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").select("*") + .eq("user_id", str(user_id)).eq("is_active", True).execute() + ) + if result.data: + return TelegramAccount(**result.data[0]) + return None + + async def get_telegram_account_by_chat_id(self, chat_id: str) -> Optional['TelegramAccount']: + """Get Telegram account by chat_id.""" + from src.database.models import TelegramAccount + result = await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").select("*") + .eq("telegram_chat_id", chat_id).eq("is_active", True).execute() + ) + if result.data: + return TelegramAccount(**result.data[0]) + return None + + async def save_telegram_account(self, account: 'TelegramAccount') -> 'TelegramAccount': + """Create or update a Telegram account connection.""" + from src.database.models import TelegramAccount + data = account.model_dump(exclude={'id', 'created_at', 'updated_at'}, exclude_none=True) + data['user_id'] = str(data['user_id']) + + existing = await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").select("id") + .eq("user_id", str(account.user_id)).execute() + ) + + if existing.data: + result = await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").update(data) + .eq("user_id", str(account.user_id)).execute() + ) + else: + result = await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").insert(data).execute() + ) + + logger.info(f"Saved Telegram account for user: {account.user_id}") + return TelegramAccount(**result.data[0]) + + async def delete_telegram_account(self, user_id: UUID) -> bool: + """Delete Telegram account connection for user.""" + await asyncio.to_thread( + lambda: self.client.table("telegram_accounts").delete() + .eq("user_id", str(user_id)).execute() + ) + logger.info(f"Deleted Telegram account for user: {user_id}") + return True + # ==================== USERS ==================== async def get_user(self, user_id: UUID) -> Optional[User]: diff --git a/src/database/models.py b/src/database/models.py index dbfb755..b3f58bb 100644 --- a/src/database/models.py +++ b/src/database/models.py @@ -103,6 +103,22 @@ class LinkedInAccount(DBModel): last_error_at: Optional[datetime] = None +class TelegramAccount(DBModel): + """Telegram account connection for bot access.""" + id: Optional[UUID] = None + user_id: UUID + created_at: Optional[datetime] = None + updated_at: Optional[datetime] = None + telegram_user_id: str + telegram_username: Optional[str] = None + telegram_first_name: Optional[str] = None + telegram_chat_id: str + is_active: bool = True + last_used_at: Optional[datetime] = None + last_error: Optional[str] = None + last_error_at: Optional[datetime] = None + + class User(DBModel): """User model - combines auth.users data with profile data. diff --git a/src/services/telegram_service.py b/src/services/telegram_service.py new file mode 100644 index 0000000..7c1be9a --- /dev/null +++ b/src/services/telegram_service.py @@ -0,0 +1,383 @@ +"""Telegram bot service for LinkedIn post creation via chat.""" +import asyncio +from datetime import datetime, timezone +from typing import Optional +from uuid import UUID + +import httpx +from loguru import logger + +from src.config import settings +from src.services.redis_client import get_redis + +# Conversation state TTL: 24 hours +CONV_TTL = 86400 +# Rate limit: max requests per hour per user +RATE_LIMIT_MAX = 10 +RATE_LIMIT_TTL = 3600 + + +class TelegramService: + """Handles Telegram bot interactions for LinkedIn post creation.""" + + def __init__(self): + self._base_url = f"https://api.telegram.org/bot{settings.telegram_bot_token}" + + # ==================== TELEGRAM API HELPERS ==================== + + async def send_message(self, chat_id: str, text: str, reply_markup: Optional[dict] = None) -> dict: + """Send a text message to a Telegram chat.""" + payload: dict = {"chat_id": chat_id, "text": text, "parse_mode": "HTML"} + if reply_markup: + payload["reply_markup"] = reply_markup + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(f"{self._base_url}/sendMessage", json=payload) + return resp.json() + + async def answer_callback_query(self, callback_query_id: str, text: str = "") -> None: + """Answer a callback query to dismiss the loading indicator.""" + async with httpx.AsyncClient(timeout=10.0) as client: + await client.post(f"{self._base_url}/answerCallbackQuery", json={ + "callback_query_id": callback_query_id, + "text": text + }) + + async def edit_message(self, chat_id: str, message_id: int, text: str, reply_markup: Optional[dict] = None) -> dict: + """Edit an existing message.""" + payload: dict = {"chat_id": chat_id, "message_id": message_id, "text": text, "parse_mode": "HTML"} + if reply_markup: + payload["reply_markup"] = reply_markup + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(f"{self._base_url}/editMessageText", json=payload) + return resp.json() + + async def register_webhook(self, url: str, secret: str) -> None: + """Register webhook URL with Telegram.""" + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(f"{self._base_url}/setWebhook", json={ + "url": url, + "secret_token": secret, + "allowed_updates": ["message", "callback_query"] + }) + data = resp.json() + if data.get("ok"): + logger.info(f"Telegram webhook registered: {url}") + else: + logger.error(f"Failed to register Telegram webhook: {data}") + + # ==================== CONVERSATION STATE ==================== + + def _conv_key(self, chat_id: str) -> str: + return f"telegram_conv:{chat_id}" + + async def _get_conv(self, chat_id: str) -> dict: + """Get conversation state from Redis.""" + try: + redis = await get_redis() + raw = await redis.get(self._conv_key(chat_id)) + if raw: + import json + return json.loads(raw) + except Exception as e: + logger.warning(f"Failed to get telegram conv state: {e}") + return {"state": "idle"} + + async def _set_conv(self, chat_id: str, data: dict) -> None: + """Save conversation state to Redis with 24h TTL.""" + try: + redis = await get_redis() + import json + await redis.setex(self._conv_key(chat_id), CONV_TTL, json.dumps(data)) + except Exception as e: + logger.warning(f"Failed to set telegram conv state: {e}") + + async def _clear_conv(self, chat_id: str) -> None: + """Delete conversation state from Redis.""" + try: + redis = await get_redis() + await redis.delete(self._conv_key(chat_id)) + except Exception as e: + logger.warning(f"Failed to clear telegram conv state: {e}") + + # ==================== RATE LIMITING ==================== + + async def _check_rate_limit(self, user_id: str) -> bool: + """Return True if request is allowed, False if rate limit exceeded.""" + try: + redis = await get_redis() + now = datetime.now(timezone.utc) + key = f"telegram_rate:{user_id}:{now.strftime('%Y%m%d%H')}" + count = await redis.incr(key) + if count == 1: + await redis.expire(key, RATE_LIMIT_TTL) + return count <= RATE_LIMIT_MAX + except Exception as e: + logger.warning(f"Rate limit check failed (allowing): {e}") + return True + + # ==================== KEYBOARD BUILDERS ==================== + + def _post_type_keyboard(self, post_types: list) -> dict: + """Build inline keyboard for post type selection.""" + buttons = [] + for pt in post_types: + buttons.append([{"text": pt.name, "callback_data": f"posttype:{pt.id}"}]) + return {"inline_keyboard": buttons} + + def _action_keyboard(self) -> dict: + """Build inline keyboard for post actions.""" + return {"inline_keyboard": [[ + {"text": "✏️ Überarbeiten", "callback_data": "revise"}, + {"text": "🔄 Neuer Post", "callback_data": "newpost"} + ]]} + + # ==================== MAIN DISPATCHER ==================== + + async def handle_update(self, update: dict, db) -> None: + """Route incoming Telegram update to the right handler.""" + try: + if "message" in update: + msg = update["message"] + chat_id = str(msg["chat"]["id"]) + text = msg.get("text", "") + await self._handle_message(chat_id, text, db) + + elif "callback_query" in update: + cq = update["callback_query"] + chat_id = str(cq["message"]["chat"]["id"]) + cb_id = cq["id"] + data = cq.get("data", "") + message_id = cq["message"]["message_id"] + await self.answer_callback_query(cb_id) + await self._handle_callback_query(chat_id, cb_id, data, message_id, db) + + except Exception as e: + logger.exception(f"Error handling Telegram update: {e}") + + # ==================== MESSAGE HANDLER ==================== + + async def _handle_message(self, chat_id: str, text: str, db) -> None: + """Handle incoming text messages.""" + # Look up linked account + tg_account = await db.get_telegram_account_by_chat_id(chat_id) + + # Handle /start {token} — account linking flow + if text.startswith("/start"): + parts = text.split(maxsplit=1) + token = parts[1].strip() if len(parts) > 1 else "" + await self._handle_start(chat_id, token, db) + return + + # All other messages require a linked account + if not tg_account: + await self.send_message( + chat_id, + "Dein Telegram-Konto ist noch nicht verknüpft.\n" + "Öffne die App unter Einstellungen → Telegram verbinden und folge den Anweisungen." + ) + return + + user_id = str(tg_account.user_id) + + # Rate limiting + if not await self._check_rate_limit(user_id): + await self.send_message(chat_id, "⚠️ Zu viele Anfragen. Bitte warte eine Stunde und versuche es erneut.") + return + + conv = await self._get_conv(chat_id) + state = conv.get("state", "idle") + + if state == "waiting_feedback": + await self._handle_feedback(chat_id, user_id, text, conv, db) + else: + # Treat any other text as a new topic + await self._handle_new_topic(chat_id, user_id, text, db) + + # ==================== START / ACCOUNT LINKING ==================== + + async def _handle_start(self, chat_id: str, token: str, db) -> None: + """Handle /start command — link account if token provided.""" + if not token: + tg_account = await db.get_telegram_account_by_chat_id(chat_id) + if tg_account: + await self.send_message(chat_id, "Du bist bereits verbunden! Schreib mir ein Thema für deinen nächsten LinkedIn-Post.") + else: + await self.send_message( + chat_id, + "Willkommen! 👋\n\nUm diesen Bot zu nutzen, verknüpfe zuerst dein Konto in der App unter " + "Einstellungen → Telegram verbinden." + ) + return + + # Look up one-time token in Redis + try: + redis = await get_redis() + link_key = f"telegram_link:{token}" + raw_user_id = await redis.get(link_key) + if not raw_user_id: + await self.send_message(chat_id, "❌ Dieser Link ist ungültig oder abgelaufen. Bitte erstelle einen neuen Link in der App.") + return + + user_id_str = raw_user_id.decode() if isinstance(raw_user_id, bytes) else raw_user_id + await redis.delete(link_key) + except Exception as e: + logger.error(f"Failed to look up telegram link token: {e}") + await self.send_message(chat_id, "❌ Ein Fehler ist aufgetreten. Bitte versuche es erneut.") + return + + # Save or update the telegram account + from src.database.models import TelegramAccount + account = TelegramAccount( + user_id=UUID(user_id_str), + telegram_chat_id=chat_id, + telegram_user_id=chat_id, # chat_id is sufficient as unique identifier + is_active=True + ) + await db.save_telegram_account(account) + await self._clear_conv(chat_id) + + await self.send_message( + chat_id, + "✅ Verbunden!\n\nSchreib mir ein Thema für deinen nächsten LinkedIn-Post und ich erstelle ihn für dich." + ) + + # ==================== TOPIC HANDLING ==================== + + async def _handle_new_topic(self, chat_id: str, user_id: str, topic_text: str, db) -> None: + """Handle a new post topic — show post type selection.""" + post_types = await db.get_post_types(UUID(user_id)) + active_types = [pt for pt in post_types if pt.is_active] + + if not active_types: + await self.send_message(chat_id, "⚠️ Keine Post-Typen gefunden. Bitte richte zuerst deinen Account in der App ein.") + return + + conv = { + "state": "waiting_post_type", + "user_id": user_id, + "topic": topic_text + } + await self._set_conv(chat_id, conv) + + await self.send_message( + chat_id, + f"📝 Thema: {topic_text}\n\nWähle einen Post-Typ:", + reply_markup=self._post_type_keyboard(active_types) + ) + + # ==================== FEEDBACK HANDLING ==================== + + async def _handle_feedback(self, chat_id: str, user_id: str, feedback: str, conv: dict, db) -> None: + """Handle user feedback to revise a post.""" + post_id = conv.get("post_id") + post_content = conv.get("post_content", "") + + if not post_content: + await self.send_message(chat_id, "❌ Kein Post gefunden. Bitte starte von vorne mit einem neuen Thema.") + await self._clear_conv(chat_id) + return + + await self.send_message(chat_id, "⏳ Überarbeite deinen Post...") + + try: + from src.orchestrator import orchestrator + improved = await orchestrator.apply_suggestion_to_post( + user_id=UUID(user_id), + post_content=post_content, + suggestion=feedback + ) + + # Update post in DB if we have a post_id + if post_id: + await db.update_generated_post(UUID(post_id), {"post_content": improved}) + + # Update conversation state + conv["post_content"] = improved + conv["state"] = "waiting_feedback" + await self._set_conv(chat_id, conv) + + await self.send_message( + chat_id, + f"✨ Überarbeiteter Post:\n\n{improved}", + reply_markup=self._action_keyboard() + ) + + except Exception as e: + logger.error(f"Failed to apply feedback for user {user_id}: {e}") + await self.send_message(chat_id, "❌ Fehler beim Überarbeiten. Bitte versuche es erneut.") + + # ==================== CALLBACK HANDLER ==================== + + async def _handle_callback_query(self, chat_id: str, cb_id: str, data: str, message_id: int, db) -> None: + """Handle inline keyboard button presses.""" + tg_account = await db.get_telegram_account_by_chat_id(chat_id) + if not tg_account: + return + + user_id = str(tg_account.user_id) + conv = await self._get_conv(chat_id) + + if data.startswith("posttype:"): + post_type_id_str = data.split(":", 1)[1] + await self._handle_post_type_selected(chat_id, user_id, post_type_id_str, conv, message_id, db) + + elif data == "revise": + conv["state"] = "waiting_feedback" + await self._set_conv(chat_id, conv) + await self.send_message(chat_id, "✏️ Alles klar, was soll ich verbessern?") + + elif data == "newpost": + await self._clear_conv(chat_id) + await self.send_message(chat_id, "🔄 Alles klar! Schreib mir dein neues Thema.") + + async def _handle_post_type_selected( + self, chat_id: str, user_id: str, post_type_id_str: str, conv: dict, message_id: int, db + ) -> None: + """Generate a post after the user selects a post type.""" + topic_text = conv.get("topic", "") + if not topic_text: + await self.send_message(chat_id, "❌ Kein Thema gefunden. Bitte starte von vorne.") + await self._clear_conv(chat_id) + return + + # Edit the post-type selection message to show progress + await self.edit_message(chat_id, message_id, "⏳ Erstelle deinen Post... Das kann einen Moment dauern.") + + try: + from src.orchestrator import orchestrator + result = await orchestrator.create_post( + user_id=UUID(user_id), + topic={"title": topic_text, "description": topic_text}, + post_type_id=UUID(post_type_id_str), + max_iterations=2 # Fewer iterations for faster Telegram response + ) + + post_content = result.get("final_post", "") + post_id = str(result.get("post_id", "")) + + # Update conversation state + new_conv = { + "state": "waiting_feedback", + "user_id": user_id, + "topic": topic_text, + "post_id": post_id, + "post_content": post_content + } + await self._set_conv(chat_id, new_conv) + + await self.send_message( + chat_id, + f"✨ Generierter Post:\n\n{post_content}", + reply_markup=self._action_keyboard() + ) + + except Exception as e: + logger.error(f"Failed to create post for user {user_id}: {e}") + await self.send_message(chat_id, "❌ Fehler beim Erstellen des Posts. Bitte versuche es erneut.") + await self._clear_conv(chat_id) + + +# Module-level singleton — only created when Telegram is enabled +telegram_service: Optional[TelegramService] = None +if settings.telegram_enabled: + telegram_service = TelegramService() diff --git a/src/web/app.py b/src/web/app.py index 7244ec7..df60fa7 100644 --- a/src/web/app.py +++ b/src/web/app.py @@ -22,6 +22,17 @@ async def lifespan(app: FastAPI): from src.services.redis_client import get_redis, close_redis await get_redis() + # Register Telegram webhook if enabled + if settings.telegram_enabled and settings.telegram_bot_token and settings.telegram_webhook_url: + try: + from src.services.telegram_service import telegram_service + if telegram_service: + webhook_url = f"{settings.telegram_webhook_url}/api/telegram/webhook" + await telegram_service.register_webhook(webhook_url, settings.telegram_webhook_secret) + logger.info("Telegram webhook registered") + except Exception as e: + logger.error(f"Failed to register Telegram webhook: {e}") + # Start scheduler only when this process is the dedicated scheduler container scheduler = None if settings.scheduler_enabled: diff --git a/src/web/templates/user/settings.html b/src/web/templates/user/settings.html index bbc44fb..f2a79da 100644 --- a/src/web/templates/user/settings.html +++ b/src/web/templates/user/settings.html @@ -169,6 +169,71 @@ {% endif %} + + {% if telegram_enabled %} +
+ {% if telegram_account.telegram_username %} + @{{ telegram_account.telegram_username }} + {% elif telegram_account.telegram_first_name %} + {{ telegram_account.telegram_first_name }} + {% else %} + Telegram verbunden + {% endif %} +
+ {% if telegram_account.created_at %} ++ Verbunden seit {{ telegram_account.created_at.strftime('%d.%m.%Y') }} +
+ {% endif %} ++ Verbinde Telegram, um LinkedIn-Posts direkt per Chat zu erstellen – ohne die Web-App öffnen zu müssen. +
+ + + {% endif %} +