Files
Onyva-Postling/src/web/user/auth.py

521 lines
18 KiB
Python

"""User authentication with Supabase Auth.
Uses Supabase's built-in authentication for:
- Email/password signup and login
- LinkedIn OAuth
- Session management via JWT tokens
"""
import secrets
from typing import Optional
from uuid import UUID
from fastapi import Request, Response
from loguru import logger
from supabase import create_client
from src.config import settings
from src.database import db
# Session management - Supabase JWT tokens are stored in cookies.
# Name of the cookie that carries the Supabase access token.
USER_SESSION_COOKIE = "sb_session"
# Name of the cookie that carries the Supabase refresh token.
REFRESH_TOKEN_COOKIE = "sb_refresh_token"
# Secret mixed into the signature of the legacy session cookie.
# NOTE: when settings.session_secret is falsy, a random secret is generated at
# import time, so cookies signed by this process cannot be verified by another
# process or after a restart.
SESSION_SECRET = settings.session_secret or secrets.token_hex(32)
# Lazily created Supabase client for auth operations (see get_supabase()).
_supabase_client = None
def get_supabase():
    """Return the module-wide Supabase client, building it on first use.

    The client is created from ``settings.supabase_url`` and
    ``settings.supabase_key`` and cached in ``_supabase_client`` so later
    calls reuse the same instance.
    """
    global _supabase_client
    if _supabase_client is not None:
        return _supabase_client
    _supabase_client = create_client(settings.supabase_url, settings.supabase_key)
    return _supabase_client
class UserSession:
    """Snapshot of a signed-in user's identity and account state.

    Instances can be serialized into a signed cookie value with
    :meth:`to_cookie_value` and restored with :meth:`from_cookie_value`.

    Cookie format: ``<base64(JSON payload)>.<signature>`` where the signature
    is the first 16 hex characters of ``SHA-256(JSON payload + SESSION_SECRET)``.
    """

    def __init__(
        self,
        user_id: Optional[str] = None,
        linkedin_vanity_name: str = "",
        linkedin_name: Optional[str] = None,
        linkedin_picture: Optional[str] = None,
        email: Optional[str] = None,
        account_type: str = "ghostwriter",
        company_id: Optional[str] = None,
        onboarding_status: str = "completed",
        company_name: Optional[str] = None,
        display_name: Optional[str] = None
    ):
        """Store the session fields.

        ``display_name`` falls back to ``linkedin_name`` when it is not given
        (or is empty).
        """
        self.user_id = user_id
        self.linkedin_vanity_name = linkedin_vanity_name
        self.linkedin_name = linkedin_name
        self.linkedin_picture = linkedin_picture
        self.email = email
        self.account_type = account_type
        self.company_id = company_id
        self.onboarding_status = onboarding_status
        self.company_name = company_name
        self.display_name = display_name or linkedin_name

    @property
    def is_onboarding_complete(self) -> bool:
        """Check if user has completed onboarding."""
        return self.onboarding_status == "completed"

    @property
    def is_company_owner(self) -> bool:
        """Check if user is a company owner."""
        return self.account_type == "company"

    @property
    def is_employee(self) -> bool:
        """Check if user is an employee."""
        return self.account_type == "employee"

    @property
    def is_ghostwriter(self) -> bool:
        """Check if user is a ghostwriter."""
        return self.account_type == "ghostwriter"

    @staticmethod
    def _sign(json_data: str) -> str:
        """Return the 16-hex-character signature for a serialized payload."""
        import hashlib

        digest = hashlib.sha256(f"{json_data}{SESSION_SECRET}".encode()).hexdigest()
        # Truncation length is part of the existing cookie format; keep it so
        # cookies issued before this change still verify.
        return digest[:16]

    def to_cookie_value(self) -> str:
        """Serialize the session to a signed cookie value.

        Returns:
            ``"<base64 JSON>.<signature>"`` as described on the class.
        """
        import base64
        import json

        data = {
            "user_id": self.user_id,
            "linkedin_vanity_name": self.linkedin_vanity_name,
            "linkedin_name": self.linkedin_name,
            "linkedin_picture": self.linkedin_picture,
            "email": self.email,
            "account_type": self.account_type,
            "company_id": self.company_id,
            "onboarding_status": self.onboarding_status,
            "company_name": self.company_name,
            "display_name": self.display_name
        }
        json_data = json.dumps(data)
        signature = self._sign(json_data)
        encoded = base64.b64encode(json_data.encode()).decode()
        return f"{encoded}.{signature}"

    @classmethod
    def from_cookie_value(cls, cookie_value: str) -> Optional["UserSession"]:
        """Deserialize a session from a signed cookie value.

        Args:
            cookie_value: Value previously produced by :meth:`to_cookie_value`.

        Returns:
            The restored session, or ``None`` when the value is malformed, the
            signature does not match, or the payload cannot be parsed.
        """
        import base64
        import hmac
        import json

        try:
            parts = cookie_value.split(".")
            if len(parts) != 2:
                return None
            encoded, signature = parts
            json_data = base64.b64decode(encoded.encode()).decode()
            expected_sig = cls._sign(json_data)
            # Constant-time comparison so response timing does not reveal how
            # many leading signature characters matched. Comparing bytes also
            # avoids compare_digest's TypeError on non-ASCII str input.
            if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
                logger.warning("Invalid session signature")
                return None
            data = json.loads(json_data)
            return cls(
                user_id=data.get("user_id"),
                linkedin_vanity_name=data.get("linkedin_vanity_name", ""),
                linkedin_name=data.get("linkedin_name"),
                linkedin_picture=data.get("linkedin_picture"),
                email=data.get("email"),
                account_type=data.get("account_type", "ghostwriter"),
                company_id=data.get("company_id"),
                onboarding_status=data.get("onboarding_status", "completed"),
                company_name=data.get("company_name"),
                display_name=data.get("display_name")
            )
        except Exception as e:
            # Any decoding/parsing failure is treated as "no session".
            logger.error(f"Failed to parse session cookie: {e}")
            return None
def get_user_session(request: Request) -> Optional[UserSession]:
    """Resolve the current user's session from the request cookies.

    The signed ``linkedin_user_session`` cookie is tried first because it
    carries the full session payload, including profile info. If it is absent
    or cannot be restored, the Supabase access-token cookie is validated
    against Supabase Auth instead.

    Returns:
        The resolved session, or ``None`` when neither cookie yields one.
    """
    cookies = request.cookies

    # Legacy cookie: self-contained, signed session payload.
    legacy_value = cookies.get("linkedin_user_session")
    legacy_session = UserSession.from_cookie_value(legacy_value) if legacy_value else None
    if legacy_session:
        return legacy_session

    # Fallback: ask Supabase who owns the access token.
    token = cookies.get(USER_SESSION_COOKIE)
    if not token:
        return None
    try:
        result = get_supabase().auth.get_user(token)
        if result and result.user:
            return _create_session_from_supabase_user(result.user)
    except Exception as e:
        logger.debug(f"Could not validate Supabase session: {e}")
    return None
async def get_user_session_async(request: Request) -> Optional[UserSession]:
    """Resolve the session like ``get_user_session`` and refresh it from the DB.

    When the session has a ``user_id``, the user record is loaded via
    ``db.get_user`` and its onboarding status, account type and company id
    overwrite the values on the session. Lookup failures are logged and the
    session is returned as it was at the point of failure.
    """
    session = get_user_session(request)
    if not session or not session.user_id:
        return session

    try:
        user = await db.get_user(UUID(session.user_id))
        if user:
            # Values may be enums (with .value) or plain strings.
            status = user.onboarding_status
            session.onboarding_status = getattr(status, "value", status)
            kind = user.account_type
            session.account_type = getattr(kind, "value", kind)
            session.company_id = str(user.company_id) if user.company_id else None
    except Exception as e:
        logger.warning(f"Could not fetch profile data: {e}")
    return session
def _create_session_from_supabase_user(user) -> UserSession:
    """Build a UserSession from a Supabase user object.

    Identity fields come from ``user`` and its ``user_metadata``. The company
    fields are left empty and the onboarding status is set to ``"pending"``;
    the inline comments mark these as placeholders to be filled from the
    profile later.
    """
    meta = user.user_metadata or {}
    full_name = meta.get("name")
    return UserSession(
        user_id=str(user.id),
        linkedin_vanity_name=meta.get("vanityName", ""),
        linkedin_name=full_name,
        linkedin_picture=meta.get("picture"),
        email=user.email,
        account_type=meta.get("account_type", "ghostwriter"),
        company_id=None,  # placeholder until the profile is loaded
        onboarding_status="pending",  # placeholder until the profile is loaded
        company_name=None,
        display_name=meta.get("display_name") or full_name,
    )
def set_user_session(response: Response, session: UserSession, access_token: str = None, refresh_token: str = None) -> None:
    """Write the session cookies onto ``response``.

    Args:
        response: Response that receives the cookies.
        session: Session serialized into the ``linkedin_user_session`` cookie.
        access_token: Supabase access token; stored for one hour when given.
        refresh_token: Supabase refresh token; stored for seven days when given.

    All cookies are HttpOnly, Secure and SameSite=Lax.
    """
    one_hour = 60 * 60
    one_week = 60 * 60 * 24 * 7
    # Attributes shared by every cookie written here.
    hardening = {"httponly": True, "samesite": "lax", "secure": True}

    if access_token:
        response.set_cookie(
            key=USER_SESSION_COOKIE, value=access_token, max_age=one_hour, **hardening
        )
    if refresh_token:
        response.set_cookie(
            key=REFRESH_TOKEN_COOKIE, value=refresh_token, max_age=one_week, **hardening
        )
    # The legacy cookie is always written, for backwards compatibility.
    response.set_cookie(
        key="linkedin_user_session",
        value=session.to_cookie_value(),
        max_age=one_week,
        **hardening,
    )
def clear_user_session(response: Response) -> None:
    """Delete the access-token, refresh-token and legacy session cookies."""
    for cookie_name in (USER_SESSION_COOKIE, REFRESH_TOKEN_COOKIE, "linkedin_user_session"):
        response.delete_cookie(cookie_name)
async def handle_oauth_callback(
    access_token: str,
    refresh_token: Optional[str] = None,
    allow_registration: bool = True,
    account_type: str = "ghostwriter"
) -> tuple[Optional[UserSession], Optional[str], Optional[str]]:
    """Turn a Supabase OAuth access token into a UserSession.

    The user is looked up with the access token, the matching profile is
    loaded (and created with ``account_type`` and a pending onboarding status
    if it is missing), and the company name is resolved when the profile
    belongs to a company.

    Returns:
        ``(session, access_token, refresh_token)`` on success, or
        ``(None, None, None)`` when the user cannot be resolved or any step
        raises.
    """
    # NOTE(review): allow_registration is accepted but never consulted in this
    # function - confirm whether missing profiles should be rejected when False.
    from src.database.models import Profile, AccountType, OnboardingStatus

    failure = (None, None, None)
    try:
        import json

        # Resolve the Supabase user behind the access token.
        lookup = get_supabase().auth.get_user(access_token)
        if not lookup or not lookup.user:
            logger.error("Failed to get user from Supabase")
            return failure

        user = lookup.user
        metadata = user.user_metadata or {}

        logger.info("=== OAUTH CALLBACK ===")
        logger.info(f"user.id: {user.id}")
        logger.info(f"user.email: {user.email}")
        logger.info(f"user.user_metadata: {json.dumps(user.user_metadata, indent=2)}")

        vanity_name = metadata.get("vanityName")
        name = metadata.get("name")
        picture = metadata.get("picture")
        email = user.email
        logger.info(f"OAuth callback for user: {name} (vanityName={vanity_name}, email={email})")

        # The profile is expected to exist already (created by a DB trigger);
        # create it here when it does not.
        user_uuid = UUID(str(user.id))
        profile = await db.get_profile(user_uuid)
        if not profile:
            logger.info(f"Profile not found for user {user.id}, creating...")
            new_profile = Profile(
                account_type=AccountType(account_type),
                onboarding_status=OnboardingStatus.PENDING,
                display_name=name
            )
            profile = await db.create_profile(user_uuid, new_profile)

        # Company name, when the profile is attached to a company.
        company = await db.get_company(profile.company_id) if profile.company_id else None
        company_name = company.name if company else None

        # Profile fields may be enums (with .value) or plain strings.
        session = UserSession(
            user_id=str(user.id),
            linkedin_vanity_name=vanity_name or "",
            linkedin_name=name,
            linkedin_picture=picture,
            email=email,
            account_type=getattr(profile.account_type, "value", profile.account_type),
            company_id=str(profile.company_id) if profile.company_id else None,
            onboarding_status=getattr(profile.onboarding_status, "value", profile.onboarding_status),
            company_name=company_name,
            display_name=profile.display_name or name
        )
        return session, access_token, refresh_token
    except Exception as e:
        logger.exception(f"OAuth callback error: {e}")
        return failure
async def handle_email_password_login(email: str, password: str) -> tuple[Optional[UserSession], Optional[str], Optional[str]]:
    """Sign a user in with email and password via Supabase Auth.

    The email is lower-cased before it is sent to Supabase. After a successful
    sign-in the profile (and, if present, the company) is loaded to build the
    session.

    Returns:
        ``(session, access_token, refresh_token)`` on success, or
        ``(None, None, None)`` when sign-in fails, no profile exists, or any
        step raises.
    """
    failure = (None, None, None)
    try:
        credentials = {"email": email.lower(), "password": password}
        auth_response = get_supabase().auth.sign_in_with_password(credentials)
        if not auth_response or not auth_response.user:
            logger.warning(f"Failed login attempt for: {email}")
            return failure

        user = auth_response.user
        tokens = auth_response.session
        logger.info(f"Successful email/password login for: {email}")

        profile = await db.get_profile(UUID(str(user.id)))
        if not profile:
            logger.error(f"No profile found for user {user.id}")
            return failure

        # Company name, when the profile is attached to a company.
        company = await db.get_company(profile.company_id) if profile.company_id else None
        company_name = company.name if company else None

        # Profile fields may be enums (with .value) or plain strings.
        user_session = UserSession(
            user_id=str(user.id),
            linkedin_vanity_name="",
            linkedin_name=profile.display_name,
            linkedin_picture=None,
            email=user.email,
            account_type=getattr(profile.account_type, "value", profile.account_type),
            company_id=str(profile.company_id) if profile.company_id else None,
            onboarding_status=getattr(profile.onboarding_status, "value", profile.onboarding_status),
            company_name=company_name,
            display_name=profile.display_name
        )
        return user_session, tokens.access_token, tokens.refresh_token
    except Exception as e:
        logger.warning(f"Email/password login error: {e}")
        return failure
async def create_email_password_user(
    email: str,
    password: str,
    account_type: str = "ghostwriter",
    company_id: Optional[str] = None,
    display_name: Optional[str] = None
) -> tuple[Optional[UserSession], Optional[str], Optional[str], Optional[str]]:
    """Register a new email/password user via Supabase Auth.

    Steps: validate the password, sign up with Supabase (lower-cased email,
    ``account_type`` and ``display_name`` as user metadata), wait briefly for
    the profile trigger, then create the profile if it is still missing or
    update it with ``company_id`` / ``display_name`` when those were given.

    Returns:
        ``(session, access_token, refresh_token, None)`` on success; the
        tokens are ``None`` when Supabase returned no session. On failure,
        ``(None, None, None, error_message)``.
    """
    from src.database.models import Profile, AccountType, OnboardingStatus
    from src.web.user.password_auth import validate_password_strength

    try:
        password_ok, password_error = validate_password_strength(password)
        if not password_ok:
            logger.warning(f"Weak password for registration: {password_error}")
            return None, None, None, password_error

        client = get_supabase()
        signup_payload = {
            "email": email.lower(),
            "password": password,
            "options": {
                "data": {
                    "account_type": account_type,
                    "display_name": display_name
                }
            }
        }
        auth_response = client.auth.sign_up(signup_payload)
        user = auth_response.user if auth_response else None
        if not user:
            logger.warning(f"Failed to create user: {email}")
            return None, None, None, "Registrierung fehlgeschlagen"

        session = auth_response.session
        logger.info(f"Created new user via Supabase Auth: {user.id}")

        # Give the database trigger a moment to create the profile row.
        import asyncio
        await asyncio.sleep(0.5)

        user_uuid = UUID(str(user.id))
        profile = await db.get_profile(user_uuid)
        if profile:
            # Only push the fields the caller actually supplied.
            updates = {
                field: value
                for field, value in (("company_id", company_id), ("display_name", display_name))
                if value
            }
            if updates:
                profile = await db.update_profile(user_uuid, updates)
        else:
            logger.warning("Profile not created by trigger, creating manually")
            profile = Profile(
                account_type=AccountType(account_type),
                display_name=display_name,
                onboarding_status=OnboardingStatus.PENDING
            )
            if company_id:
                profile.company_id = UUID(company_id)
            profile = await db.create_profile(user_uuid, profile)

        # Company name, when the profile is attached to a company.
        company = await db.get_company(profile.company_id) if profile.company_id else None
        company_name = company.name if company else None

        user_session = UserSession(
            user_id=str(user.id),
            linkedin_vanity_name="",
            linkedin_name=display_name,
            linkedin_picture=None,
            email=user.email,
            account_type=account_type,
            company_id=company_id,
            onboarding_status="pending",
            company_name=company_name,
            display_name=display_name
        )

        if session:
            access_token, refresh_token = session.access_token, session.refresh_token
        else:
            access_token, refresh_token = None, None
        return user_session, access_token, refresh_token, None
    except Exception as e:
        lowered = str(e).lower()
        logger.exception(f"Error creating email/password user: {e}")
        # Map duplicate-account errors to a dedicated user-facing message.
        if any(marker in lowered for marker in ("already registered", "already exists")):
            return None, None, None, "Diese E-Mail-Adresse ist bereits registriert"
        return None, None, None, "Registrierung fehlgeschlagen"
async def sign_out(access_token: Optional[str] = None) -> bool:
    """Sign the owner of ``access_token`` out of Supabase Auth.

    Args:
        access_token: JWT of the session to revoke. When it is missing there
            is nothing to revoke and the call succeeds trivially.

    Returns:
        ``True`` when no token was given or the revoke call succeeded,
        ``False`` when the revoke call raised (the error is logged).
    """
    if not access_token:
        return True
    try:
        # Revoke the session identified by this JWT. The previous
        # `auth.sign_out()` call ignored the token and ended whatever session
        # the process-wide shared client happened to hold, which may belong
        # to a different user.
        get_supabase().auth.admin.sign_out(access_token)
        return True
    except Exception as e:
        logger.warning(f"Error signing out: {e}")
        return False
def get_supabase_login_url(redirect_to: str) -> str:
    """Build the Supabase authorize URL for the ``linkedin_oidc`` provider.

    Args:
        redirect_to: URL passed through as the ``redirect_to`` query parameter.

    Returns:
        ``<settings.supabase_url>/auth/v1/authorize`` with the URL-encoded
        ``provider`` and ``redirect_to`` query parameters appended.
    """
    from urllib.parse import urlencode

    query = urlencode({"provider": "linkedin_oidc", "redirect_to": redirect_to})
    return f"{settings.supabase_url}/auth/v1/authorize?{query}"
async def refresh_session(refresh_token: str) -> tuple[Optional[str], Optional[str]]:
    """Exchange a refresh token for a new token pair via Supabase Auth.

    Returns:
        ``(access_token, refresh_token)`` of the refreshed session, or
        ``(None, None)`` when Supabase returns no session or the call raises.
    """
    try:
        result = get_supabase().auth.refresh_session(refresh_token)
        new_session = result.session if result else None
        if not new_session:
            return None, None
        return new_session.access_token, new_session.refresh_token
    except Exception as e:
        logger.warning(f"Error refreshing session: {e}")
        return None, None