added post insight feature

This commit is contained in:
2026-02-25 15:07:53 +01:00
parent 7c9866c0a6
commit a3ea774b58
9 changed files with 992 additions and 1 deletions

View File

@@ -194,6 +194,81 @@ class DatabaseClient:
)
return [LinkedInPost(**item) for item in result.data]
# ==================== LINKEDIN POST INSIGHTS ====================
async def upsert_post_insights_posts(self, posts: List['LinkedInPostInsightPost']) -> List['LinkedInPostInsightPost']:
"""Upsert LinkedIn post insights posts."""
from datetime import datetime
from src.database.models import LinkedInPostInsightPost
data = []
for p in posts:
post_dict = p.model_dump(exclude={"id", "created_at", "updated_at", "first_seen_at"}, exclude_none=True)
post_dict["user_id"] = str(post_dict["user_id"])
if post_dict.get("linkedin_account_id"):
post_dict["linkedin_account_id"] = str(post_dict["linkedin_account_id"])
if "post_date" in post_dict and isinstance(post_dict["post_date"], datetime):
post_dict["post_date"] = post_dict["post_date"].isoformat()
data.append(post_dict)
if not data:
return []
result = await asyncio.to_thread(
lambda: self.client.table("linkedin_post_insights_posts").upsert(
data,
on_conflict="user_id,post_urn"
).execute()
)
return [LinkedInPostInsightPost(**item) for item in result.data]
async def upsert_post_insights_daily(self, snapshots: List['LinkedInPostInsightDaily']) -> List['LinkedInPostInsightDaily']:
"""Upsert daily snapshots for post insights."""
from src.database.models import LinkedInPostInsightDaily
data = []
for s in snapshots:
snap_dict = s.model_dump(exclude={"id", "created_at", "updated_at"}, exclude_none=True)
snap_dict["user_id"] = str(snap_dict["user_id"])
snap_dict["post_id"] = str(snap_dict["post_id"])
if "snapshot_date" in snap_dict and hasattr(snap_dict["snapshot_date"], "isoformat"):
snap_dict["snapshot_date"] = snap_dict["snapshot_date"].isoformat()
data.append(snap_dict)
if not data:
return []
result = await asyncio.to_thread(
lambda: self.client.table("linkedin_post_insights_daily").upsert(
data,
on_conflict="post_id,snapshot_date"
).execute()
)
return [LinkedInPostInsightDaily(**item) for item in result.data]
async def get_post_insights_posts(self, user_id: UUID) -> List['LinkedInPostInsightPost']:
"""Get all LinkedIn post insights posts for user."""
from src.database.models import LinkedInPostInsightPost
result = await asyncio.to_thread(
lambda: self.client.table("linkedin_post_insights_posts").select("*").eq(
"user_id", str(user_id)
).order("post_date", desc=True).execute()
)
return [LinkedInPostInsightPost(**item) for item in result.data]
async def get_post_insights_daily(self, user_id: UUID, since_date: Optional[str] = None) -> List['LinkedInPostInsightDaily']:
"""Get daily post insights snapshots for user."""
from src.database.models import LinkedInPostInsightDaily
def _query():
q = self.client.table("linkedin_post_insights_daily").select("*").eq("user_id", str(user_id))
if since_date:
q = q.gte("snapshot_date", since_date)
return q.order("snapshot_date", desc=False).execute()
result = await asyncio.to_thread(_query)
return [LinkedInPostInsightDaily(**item) for item in result.data]
async def update_post_classification(
self,
post_id: UUID,
@@ -768,6 +843,19 @@ class DatabaseClient:
return LinkedInAccount(**result.data[0])
return None
async def list_linkedin_accounts(self, active_only: bool = True) -> list['LinkedInAccount']:
"""List LinkedIn accounts (optionally only active ones)."""
from src.database.models import LinkedInAccount
def _query():
q = self.client.table("linkedin_accounts").select("*")
if active_only:
q = q.eq("is_active", True)
return q.execute()
result = await asyncio.to_thread(_query)
return [LinkedInAccount(**item) for item in result.data]
async def create_linkedin_account(self, account: 'LinkedInAccount') -> 'LinkedInAccount':
"""Create LinkedIn account connection."""
from src.database.models import LinkedInAccount

View File

@@ -289,6 +289,45 @@ class LinkedInPost(DBModel):
classification_confidence: Optional[float] = None
class LinkedInPostInsightPost(DBModel):
"""LinkedIn post record for insights (separate from linkedin_posts)."""
id: Optional[UUID] = None
user_id: UUID
linkedin_account_id: Optional[UUID] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
post_urn: str
post_url: Optional[str] = None
post_text: Optional[str] = None
post_date: Optional[datetime] = None
author_username: Optional[str] = None
total_reactions: int = 0
likes: int = 0
comments: int = 0
shares: int = 0
reactions_breakdown: Dict[str, Any] = Field(default_factory=dict)
raw_data: Optional[Dict[str, Any]] = None
first_seen_at: Optional[datetime] = None
class LinkedInPostInsightDaily(DBModel):
"""Daily snapshot of LinkedIn post insights."""
id: Optional[UUID] = None
user_id: UUID
post_id: UUID
snapshot_date: date
total_reactions: int = 0
likes: int = 0
comments: int = 0
shares: int = 0
reactions_breakdown: Dict[str, Any] = Field(default_factory=dict)
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
class Topic(DBModel):
"""Topic model."""
id: Optional[UUID] = None