94 lines
2.6 KiB
Python
94 lines
2.6 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Maintenance script to extract and save topics for existing customers.

This script:
1. Loads all customers
2. For each customer, extracts topics from their existing LinkedIn posts
3. Saves the extracted topics to the topics table

Note: saving topics from research results to the topics table is NOT
performed by this script; only topics extracted from posts are saved.
|
|
"""
|
|
import asyncio
|
|
from loguru import logger
|
|
|
|
from src.database import db
|
|
from src.agents import TopicExtractorAgent
|
|
|
|
|
|
async def extract_and_save_topics_for_customer(customer_id):
    """Extract topics from one customer's LinkedIn posts and save them.

    Steps:
      1. Load the customer via ``db.get_customer``; log an error and return
         early if no customer is found.
      2. Load the customer's LinkedIn posts via ``db.get_linkedin_posts``.
      3. If there are posts, run ``TopicExtractorAgent().process`` on them and
         persist any returned topics with ``db.save_topics``.

    Failures during extraction/saving are logged with their traceback and
    swallowed, so one customer's failure does not abort the caller's loop.

    Args:
        customer_id: Identifier passed to the ``db`` lookups and to the
            topic extractor.

    Returns:
        None.
    """
    logger.info(f"Processing customer: {customer_id}")

    # Get customer
    customer = await db.get_customer(customer_id)
    if not customer:
        logger.error(f"Customer {customer_id} not found")
        return

    logger.info(f"Customer: {customer.name}")

    # Get LinkedIn posts
    posts = await db.get_linkedin_posts(customer_id)
    logger.info(f"Found {len(posts)} posts")

    if not posts:
        logger.warning("No posts found, skipping topic extraction")
    else:
        # Extract topics from posts
        logger.info("Extracting topics from posts...")
        topic_extractor = TopicExtractorAgent()

        try:
            topics = await topic_extractor.process(
                posts=posts,
                customer_id=customer_id,
            )

            if topics:
                # Save topics
                saved_topics = await db.save_topics(topics)
                logger.info(f"✓ Saved {len(saved_topics)} extracted topics")
            else:
                logger.warning("No topics extracted")
        except Exception as e:
            # loguru does not understand stdlib-style ``exc_info=True``: the
            # kwarg would be fed to ``str.format`` on this already-formatted
            # message (raising if the error text contains braces) and no
            # traceback would be recorded. ``logger.exception`` logs at ERROR
            # level with the traceback and no extra formatting pass.
            logger.exception(f"Failed to extract topics: {e}")

    logger.info(f"Finished processing customer: {customer.name}\n")
|
|
|
|
|
|
async def main():
    """Run topic extraction for every customer returned by ``db.list_customers``.

    Customers are awaited one after another. An exception raised while
    processing a customer is logged with its traceback, and the loop
    continues with the next customer.
    """
    logger.info("=== TOPIC EXTRACTION MAINTENANCE SCRIPT ===\n")

    # List all customers
    customers = await db.list_customers()
    if not customers:
        logger.warning("No customers found")
        return

    logger.info(f"Found {len(customers)} customers\n")

    # Process each customer; the try/except isolates failures per customer.
    for customer in customers:
        try:
            await extract_and_save_topics_for_customer(customer.id)
        except Exception as e:
            # loguru ignores stdlib-style ``exc_info=True`` (it would instead
            # trigger ``str.format`` on the pre-formatted message, which can
            # itself raise on braces). ``logger.exception`` records the
            # traceback at ERROR level.
            logger.exception(f"Error processing customer {customer.id}: {e}")

    logger.info("\n=== MAINTENANCE COMPLETE ===")
|
|
|
|
|
|
if __name__ == "__main__":
    # File sink for this run: the path embeds the date, the file is rotated
    # daily, files are kept for 7 days, and records below INFO are dropped.
    log_sink = "logs/maintenance_{time:YYYY-MM-DD}.log"
    logger.add(log_sink, level="INFO", rotation="1 day", retention="7 days")

    # Drive the async maintenance routine to completion.
    asyncio.run(main())
|