import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
import { getApiKey } from "@/lib/utils/apiKey";
import { isSocialOrDirectory } from "@/lib/utils/domains";
import { runGoogleSerpScraper, pollRunStatus, fetchDatasetItems } from "@/lib/services/apify";
import { bulkSearchDomains, type DecisionMakerCategory } from "@/lib/services/anymailfinder";
import { sinkLeadsToVault } from "@/lib/services/leadVault";

/** Payload accepted by the endpoint and forwarded to the background pipeline. */
type SerpEnrichParams = {
  query: string;
  maxPages: number;
  countryCode: string;
  languageCode: string;
  filterSocial: boolean;
  categories: DecisionMakerCategory[];
  selectedDomains?: string[];
};

/** Delay between two consecutive Apify run-status polls. */
const POLL_INTERVAL_MS = 3000;

/**
 * Run statuses after which polling stops.
 * "TIMED-OUT" is included so a run that hits its timeout does not keep the
 * polling loop (and the job's "running" state) alive forever.
 * NOTE(review): assumes pollRunStatus passes Apify's status strings through
 * unchanged — confirm against the apify service wrapper.
 */
const TERMINAL_RUN_STATUSES: ReadonlySet<string> = new Set([
  "SUCCEEDED",
  "FAILED",
  "ABORTED",
  "TIMED-OUT",
]);

/**
 * Starts a SERP scrape + enrichment job.
 *
 * Validates the payload and the configured API keys, creates a `job` row with
 * status "running", kicks off the pipeline without awaiting it, and responds
 * immediately with `{ jobId }`.
 *
 * Responses:
 * - 200 `{ jobId }` once the job row exists
 * - 400 when `query` is missing/blank or an API key is not configured
 * - 500 on any unexpected failure (including an unparsable body)
 */
export async function POST(req: NextRequest) {
  try {
    const body = (await req.json()) as SerpEnrichParams;

    // The cast above is unchecked; reject payloads without a usable query
    // instead of creating a job row that can only fail later.
    if (typeof body?.query !== "string" || body.query.trim() === "") {
      return NextResponse.json({ error: "Query is required" }, { status: 400 });
    }

    const [apifyToken, anymailKey] = await Promise.all([
      getApiKey("apify"),
      getApiKey("anymailfinder"),
    ]);
    if (!apifyToken) {
      return NextResponse.json({ error: "Apify API token not configured" }, { status: 400 });
    }
    if (!anymailKey) {
      return NextResponse.json({ error: "Anymailfinder API key not configured" }, { status: 400 });
    }

    const job = await prisma.job.create({
      data: {
        type: "serp",
        status: "running",
        config: JSON.stringify(body),
        totalLeads: 0,
      },
    });

    // Fire-and-forget: the pipeline records its own outcome on the job row.
    // NOTE(review): this relies on the process staying alive after the
    // response is sent — confirm for the deployment target.
    void runSerpEnrich(job.id, body, apifyToken, anymailKey).catch(console.error);

    return NextResponse.json({ jobId: job.id });
  } catch (err) {
    console.error("POST /api/jobs/serp-enrich error:", err);
    return NextResponse.json({ error: "Failed to start job" }, { status: 500 });
  }
}

/**
 * Background pipeline for one job: scrape, filter, persist, enrich.
 *
 * Any error outside the enrichment step marks the job "failed" with the error
 * message. An error inside the enrichment step is logged and the job is still
 * marked "complete" with the SERP-only results.
 *
 * @param jobId      id of the `job` row to update as the pipeline progresses
 * @param params     request payload as received by POST
 * @param apifyToken token passed to the Apify service calls
 * @param anymailKey key passed to the Anymailfinder bulk search
 */
async function runSerpEnrich(
  jobId: string,
  params: SerpEnrichParams,
  apifyToken: string,
  anymailKey: string
) {
  try {
    // 1. Run Apify SERP scraper
    const runId = await runGoogleSerpScraper(
      params.query,
      params.maxPages,
      params.countryCode,
      params.languageCode,
      apifyToken
    );

    // 2. Poll until the run reaches a terminal status
    let runStatus = "";
    let datasetId = "";
    while (!TERMINAL_RUN_STATUSES.has(runStatus)) {
      await sleep(POLL_INTERVAL_MS);
      const result = await pollRunStatus(runId, apifyToken);
      runStatus = result.status;
      datasetId = result.defaultDatasetId;
    }
    if (runStatus !== "SUCCEEDED") throw new Error(`Apify run ${runStatus}`);

    // 3. Fetch results
    let serpResults = await fetchDatasetItems(datasetId, apifyToken);

    // 4. Filter social/directories
    if (params.filterSocial) {
      serpResults = serpResults.filter(r => !isSocialOrDirectory(r.domain));
    }

    // 5. Deduplicate domains (first occurrence wins; results without a domain are dropped)
    const seenDomains = new Set<string>();
    const uniqueResults = serpResults.filter(r => {
      if (!r.domain || seenDomains.has(r.domain)) return false;
      seenDomains.add(r.domain);
      return true;
    });

    // 6. Apply selectedDomains filter if provided (Set lookup instead of a nested array scan)
    const selectedDomains = params.selectedDomains?.length
      ? new Set(params.selectedDomains)
      : null;
    const filteredResults = selectedDomains
      ? uniqueResults.filter(r => selectedDomains.has(r.domain))
      : uniqueResults;

    const domains = filteredResults.map(r => r.domain);
    const serpMap = new Map(filteredResults.map(r => [r.domain, r]));

    await prisma.job.update({
      where: { id: jobId },
      data: { totalLeads: domains.length },
    });

    // 7. Store raw SERP results first (so we have leads even if enrichment fails)
    for (const r of filteredResults) {
      await prisma.leadResult.create({
        data: {
          jobId,
          companyName: r.title || null,
          domain: r.domain || null,
          source: JSON.stringify({ url: r.url, description: r.description, position: r.position }),
        },
      });
    }

    // 8. Sink raw results to vault immediately (no contact info yet)
    await sinkLeadsToVault(
      filteredResults.map(r => ({
        domain: r.domain || null,
        companyName: r.title || null,
        serpTitle: r.title || null,
        serpSnippet: r.description || null,
        serpRank: r.position ?? null,
        serpUrl: r.url || null,
      })),
      "serp",
      params.query,
      jobId,
    );

    // 9. Enrich with Anymailfinder (best-effort — failure still completes the job)
    try {
      const enrichResults = await bulkSearchDomains(
        domains,
        params.categories,
        anymailKey,
        async (_completed, total) => {
          await prisma.job.update({ where: { id: jobId }, data: { totalLeads: total } });
        }
      );

      // Build domain → leadResult id map for updating
      const storedResults = await prisma.leadResult.findMany({
        where: { jobId },
        select: { id: true, domain: true },
      });
      const domainToResultId = new Map<string, (typeof storedResults)[number]["id"]>();
      for (const r of storedResults) {
        if (r.domain) domainToResultId.set(r.domain, r.id);
      }

      let emailsFound = 0;
      for (const result of enrichResults) {
        // NOTE(review): the counter keys off `valid_email` while the stored
        // value is `email` — confirm this asymmetry is intended.
        const hasEmail = !!result.valid_email;
        if (hasEmail) emailsFound++;

        const resultId = domainToResultId.get(result.domain || "");
        if (resultId) {
          await prisma.leadResult.update({
            where: { id: resultId },
            data: {
              contactName: result.person_full_name || null,
              contactTitle: result.person_job_title || null,
              email: result.email || null,
              linkedinUrl: result.person_linkedin_url || null,
            },
          });
        }
      }

      await prisma.job.update({
        where: { id: jobId },
        data: { status: "complete", emailsFound, totalLeads: filteredResults.length },
      });

      // Update vault entries with contact info
      await sinkLeadsToVault(
        enrichResults
          .filter(r => r.email)
          .map(r => {
            const serpData = serpMap.get(r.domain || "");
            return {
              domain: r.domain || null,
              companyName: serpData?.title || null,
              contactName: r.person_full_name || null,
              contactTitle: r.person_job_title || null,
              email: r.email || null,
              linkedinUrl: r.person_linkedin_url || null,
              serpTitle: serpData?.title || null,
              serpSnippet: serpData?.description || null,
              serpRank: serpData?.position ?? null,
              serpUrl: serpData?.url || null,
            };
          }),
        "serp",
        params.query,
        jobId,
      );
    } catch (enrichErr) {
      // Anymailfinder failed — complete with SERP-only results (no emails)
      console.warn(`[serp-enrich] Anymailfinder failed for job ${jobId}:`, enrichErr);
      await prisma.job.update({
        where: { id: jobId },
        data: { status: "complete", totalLeads: filteredResults.length },
      });
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    await prisma.job.update({
      where: { id: jobId },
      data: { status: "failed", error: message },
    });
  }
}

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise<void>(resolve => setTimeout(resolve, ms));
}