Initial commit: LeadFlow lead generation platform

Full-stack Next.js 16 app with three scraping pipelines:
- AirScale CSV → Anymailfinder Bulk Decision Maker search
- LinkedIn Sales Navigator → Vayne → Anymailfinder email enrichment
- Apify Google SERP → domain extraction → Anymailfinder bulk enrichment

Includes Docker multi-stage build + docker-compose for Coolify deployment.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
Timo Uttenweiler
2026-03-17 11:21:11 +01:00
parent 5b84001c1e
commit facf8c9f69
59 changed files with 5800 additions and 233 deletions

View File

@@ -0,0 +1,16 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
export async function DELETE(
_req: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
try {
const { id } = await params;
await prisma.job.delete({ where: { id } });
return NextResponse.json({ ok: true });
} catch (err) {
console.error("DELETE /api/jobs/[id] error:", err);
return NextResponse.json({ error: "Failed to delete" }, { status: 500 });
}
}

View File

@@ -0,0 +1,47 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
export async function GET(
_req: NextRequest,
{ params }: { params: Promise<{ id: string }> }
) {
try {
const { id } = await params;
const job = await prisma.job.findUnique({
where: { id },
include: {
results: {
orderBy: { createdAt: "desc" },
take: 200,
},
},
});
if (!job) return NextResponse.json({ error: "Job not found" }, { status: 404 });
return NextResponse.json({
id: job.id,
type: job.type,
status: job.status,
config: JSON.parse(job.config),
totalLeads: job.totalLeads,
emailsFound: job.emailsFound,
error: job.error,
createdAt: job.createdAt,
updatedAt: job.updatedAt,
results: job.results.map(r => ({
id: r.id,
companyName: r.companyName,
domain: r.domain,
contactName: r.contactName,
contactTitle: r.contactTitle,
email: r.email,
confidence: r.confidence,
linkedinUrl: r.linkedinUrl,
createdAt: r.createdAt,
})),
});
} catch (err) {
console.error("GET /api/jobs/[id]/status error:", err);
return NextResponse.json({ error: "Failed" }, { status: 500 });
}
}

View File

@@ -0,0 +1,106 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
import { decrypt } from "@/lib/utils/encryption";
import { cleanDomain } from "@/lib/utils/domains";
import { bulkSearchDomains, type DecisionMakerCategory } from "@/lib/services/anymailfinder";
export async function POST(req: NextRequest) {
try {
const body = await req.json() as {
companies: Array<{ name: string; domain: string }>;
categories: DecisionMakerCategory[];
};
const { companies, categories } = body;
if (!companies?.length) {
return NextResponse.json({ error: "No companies provided" }, { status: 400 });
}
const cred = await prisma.apiCredential.findUnique({ where: { service: "anymailfinder" } });
if (!cred?.value) {
return NextResponse.json({ error: "Anymailfinder API key not configured" }, { status: 400 });
}
const apiKey = decrypt(cred.value);
// Build domain → company map
const domainMap = new Map<string, string>();
for (const c of companies) {
const d = cleanDomain(c.domain);
if (d) domainMap.set(d, c.name);
}
const domains = Array.from(domainMap.keys());
const job = await prisma.job.create({
data: {
type: "airscale",
status: "running",
config: JSON.stringify({ categories, totalDomains: domains.length }),
totalLeads: domains.length,
},
});
// Run enrichment in background
runEnrichment(job.id, domains, domainMap, categories, apiKey).catch(console.error);
return NextResponse.json({ jobId: job.id });
} catch (err) {
console.error("POST /api/jobs/airscale-enrich error:", err);
return NextResponse.json({ error: "Failed to start job" }, { status: 500 });
}
}
async function runEnrichment(
jobId: string,
domains: string[],
domainMap: Map<string, string>,
categories: DecisionMakerCategory[],
apiKey: string
) {
try {
// Use bulk API: submit all domains, poll for completion, then store results.
const results = await bulkSearchDomains(
domains,
categories,
apiKey,
async (processed, total) => {
// Update progress while bulk job is running
await prisma.job.update({
where: { id: jobId },
data: { totalLeads: total },
});
}
);
// Store all results
let emailsFound = 0;
for (const result of results) {
const hasEmail = !!result.valid_email;
if (hasEmail) emailsFound++;
await prisma.leadResult.create({
data: {
jobId,
companyName: domainMap.get(result.domain || "") || null,
domain: result.domain || null,
contactName: result.person_full_name || null,
contactTitle: result.person_job_title || null,
email: result.email || null,
confidence: result.valid_email ? 1.0 : result.email_status === "risky" ? 0.5 : 0,
linkedinUrl: result.person_linkedin_url || null,
source: JSON.stringify({ email_status: result.email_status, category: result.decision_maker_category }),
},
});
}
await prisma.job.update({
where: { id: jobId },
data: { status: "complete", emailsFound, totalLeads: results.length },
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.job.update({
where: { id: jobId },
data: { status: "failed", error: message },
});
}
}

View File

@@ -0,0 +1,167 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
import { decrypt } from "@/lib/utils/encryption";
import {
submitBulkPersonSearch,
getBulkSearchStatus,
downloadBulkResults,
searchDecisionMakerByDomain,
type DecisionMakerCategory,
} from "@/lib/services/anymailfinder";
export async function POST(req: NextRequest) {
try {
const body = await req.json() as {
jobId: string;
resultIds: string[];
categories: DecisionMakerCategory[];
};
const { jobId, resultIds, categories } = body;
const cred = await prisma.apiCredential.findUnique({ where: { service: "anymailfinder" } });
if (!cred?.value) {
return NextResponse.json({ error: "Anymailfinder API key not configured" }, { status: 400 });
}
const apiKey = decrypt(cred.value);
const results = await prisma.leadResult.findMany({
where: { id: { in: resultIds }, jobId, domain: { not: null } },
});
const enrichJob = await prisma.job.create({
data: {
type: "linkedin-enrich",
status: "running",
config: JSON.stringify({ parentJobId: jobId, categories }),
totalLeads: results.length,
},
});
runLinkedInEnrich(enrichJob.id, jobId, results, categories, apiKey).catch(console.error);
return NextResponse.json({ jobId: enrichJob.id });
} catch (err) {
console.error("POST /api/jobs/linkedin-enrich error:", err);
return NextResponse.json({ error: "Failed to start enrichment" }, { status: 500 });
}
}
async function runLinkedInEnrich(
enrichJobId: string,
parentJobId: string,
results: Array<{
id: string; domain: string | null; contactName: string | null;
companyName: string | null; contactTitle: string | null; linkedinUrl: string | null;
}>,
categories: DecisionMakerCategory[],
apiKey: string
) {
let emailsFound = 0;
try {
// Separate results into those with names (person search) and those without (decision maker search)
const withNames: typeof results = [];
const withoutNames: typeof results = [];
for (const r of results) {
if (r.contactName && r.domain) {
withNames.push(r);
} else if (r.domain) {
withoutNames.push(r);
}
}
// Map to look up results by domain
const resultByDomain = new Map(results.map(r => [r.domain!, r]));
// 1. Bulk person name search for leads with names
if (withNames.length > 0) {
const leads = withNames.map(r => {
const nameParts = (r.contactName || "").trim().split(/\s+/);
return {
domain: r.domain!,
firstName: nameParts[0] || "",
lastName: nameParts.slice(1).join(" ") || "",
};
});
try {
const searchId = await submitBulkPersonSearch(leads, apiKey, `linkedin-enrich-${enrichJobId}`);
// Poll for completion
let status;
do {
await sleep(5000);
status = await getBulkSearchStatus(searchId, apiKey);
} while (status.status !== "completed" && status.status !== "failed");
if (status.status === "completed") {
const rows = await downloadBulkResults(searchId, apiKey);
for (const row of rows) {
const domain = row["domain"] || row["Domain"] || "";
const result = resultByDomain.get(domain);
if (!result) continue;
const email = row["email"] || row["Email"] || null;
const emailStatus = (row["email_status"] || row["Email Status"] || "not_found").toLowerCase();
const isValid = emailStatus === "valid";
if (isValid) emailsFound++;
await prisma.leadResult.update({
where: { id: result.id },
data: {
email: email || null,
confidence: isValid ? 1.0 : emailStatus === "risky" ? 0.5 : 0,
contactName: row["person_full_name"] || row["Full Name"] || result.contactName || null,
contactTitle: row["person_job_title"] || row["Job Title"] || result.contactTitle || null,
},
});
}
}
} catch (err) {
console.error("Bulk person search error:", err);
// Fall through — will attempt decision-maker search below
}
}
// 2. Decision-maker search for leads without names
for (const r of withoutNames) {
if (!r.domain) continue;
try {
const found = await searchDecisionMakerByDomain(r.domain, categories, apiKey);
const isValid = !!found.valid_email;
if (isValid) emailsFound++;
await prisma.leadResult.update({
where: { id: r.id },
data: {
email: found.email || null,
confidence: isValid ? 1.0 : found.email_status === "risky" ? 0.5 : 0,
contactName: found.person_full_name || r.contactName || null,
contactTitle: found.person_job_title || r.contactTitle || null,
},
});
await prisma.job.update({ where: { id: enrichJobId }, data: { emailsFound } });
} catch (err) {
console.error(`Decision-maker search error for domain ${r.domain}:`, err);
}
}
await prisma.job.update({
where: { id: enrichJobId },
data: { status: "complete", emailsFound },
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.job.update({
where: { id: enrichJobId },
data: { status: "failed", error: message },
});
}
}
function sleep(ms: number) {
return new Promise(r => setTimeout(r, ms));
}

41
app/api/jobs/route.ts Normal file
View File

@@ -0,0 +1,41 @@
import { NextResponse } from "next/server";
import { prisma } from "@/lib/db";
export async function GET() {
try {
const jobs = await prisma.job.findMany({
orderBy: { createdAt: "desc" },
take: 100,
});
const totalLeads = jobs.reduce((s, j) => s + j.totalLeads, 0);
const totalEmails = jobs.reduce((s, j) => s + j.emailsFound, 0);
const completedJobs = jobs.filter(j => j.status === "complete" && j.totalLeads > 0);
const avgHitRate = completedJobs.length > 0
? Math.round(
completedJobs.reduce((s, j) => s + (j.emailsFound / j.totalLeads) * 100, 0) / completedJobs.length
)
: 0;
return NextResponse.json({
jobs: jobs.map(j => ({
id: j.id,
type: j.type,
status: j.status,
totalLeads: j.totalLeads,
emailsFound: j.emailsFound,
createdAt: j.createdAt,
error: j.error,
})),
stats: {
totalJobs: jobs.length,
totalLeads,
totalEmails,
avgHitRate,
},
});
} catch (err) {
console.error("GET /api/jobs error:", err);
return NextResponse.json({ jobs: [], stats: {} }, { status: 500 });
}
}

View File

@@ -0,0 +1,155 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
import { decrypt } from "@/lib/utils/encryption";
import { isSocialOrDirectory } from "@/lib/utils/domains";
import { runGoogleSerpScraper, pollRunStatus, fetchDatasetItems } from "@/lib/services/apify";
import { bulkSearchDomains, type DecisionMakerCategory } from "@/lib/services/anymailfinder";
export async function POST(req: NextRequest) {
try {
const body = await req.json() as {
query: string;
maxPages: number;
countryCode: string;
languageCode: string;
filterSocial: boolean;
categories: DecisionMakerCategory[];
selectedDomains?: string[];
};
const apifyCred = await prisma.apiCredential.findUnique({ where: { service: "apify" } });
const anymailCred = await prisma.apiCredential.findUnique({ where: { service: "anymailfinder" } });
if (!apifyCred?.value) return NextResponse.json({ error: "Apify API token not configured" }, { status: 400 });
if (!anymailCred?.value) return NextResponse.json({ error: "Anymailfinder API key not configured" }, { status: 400 });
const apifyToken = decrypt(apifyCred.value);
const anymailKey = decrypt(anymailCred.value);
const job = await prisma.job.create({
data: {
type: "serp",
status: "running",
config: JSON.stringify(body),
totalLeads: 0,
},
});
runSerpEnrich(job.id, body, apifyToken, anymailKey).catch(console.error);
return NextResponse.json({ jobId: job.id });
} catch (err) {
console.error("POST /api/jobs/serp-enrich error:", err);
return NextResponse.json({ error: "Failed to start job" }, { status: 500 });
}
}
async function runSerpEnrich(
jobId: string,
params: {
query: string; maxPages: number; countryCode: string; languageCode: string;
filterSocial: boolean; categories: DecisionMakerCategory[]; selectedDomains?: string[];
},
apifyToken: string,
anymailKey: string
) {
try {
// 1. Run Apify SERP scraper
const runId = await runGoogleSerpScraper(
params.query, params.maxPages, params.countryCode, params.languageCode, apifyToken
);
// 2. Poll until complete
let runStatus = "";
let datasetId = "";
while (runStatus !== "SUCCEEDED" && runStatus !== "FAILED" && runStatus !== "ABORTED") {
await sleep(3000);
const result = await pollRunStatus(runId, apifyToken);
runStatus = result.status;
datasetId = result.defaultDatasetId;
}
if (runStatus !== "SUCCEEDED") throw new Error(`Apify run ${runStatus}`);
// 3. Fetch results
let serpResults = await fetchDatasetItems(datasetId, apifyToken);
// 4. Filter social/directories
if (params.filterSocial) {
serpResults = serpResults.filter(r => !isSocialOrDirectory(r.domain));
}
// 5. Deduplicate domains
const seenDomains = new Set<string>();
const uniqueResults = serpResults.filter(r => {
if (!r.domain || seenDomains.has(r.domain)) return false;
seenDomains.add(r.domain);
return true;
});
// 6. Apply selectedDomains filter if provided
const filteredResults = params.selectedDomains?.length
? uniqueResults.filter(r => params.selectedDomains!.includes(r.domain))
: uniqueResults;
const domains = filteredResults.map(r => r.domain);
const serpMap = new Map(filteredResults.map(r => [r.domain, r]));
await prisma.job.update({
where: { id: jobId },
data: { totalLeads: domains.length },
});
// 7. Enrich with Anymailfinder Bulk API
const enrichResults = await bulkSearchDomains(
domains,
params.categories,
anymailKey,
async (_completed, total) => {
await prisma.job.update({ where: { id: jobId }, data: { totalLeads: total } });
}
);
// 8. Store results
let emailsFound = 0;
for (const result of enrichResults) {
const serpData = serpMap.get(result.domain || "");
const hasEmail = !!result.valid_email;
if (hasEmail) emailsFound++;
await prisma.leadResult.create({
data: {
jobId,
companyName: serpData?.title || null,
domain: result.domain || null,
contactName: result.person_full_name || null,
contactTitle: result.person_job_title || null,
email: result.email || null,
confidence: result.valid_email ? 1.0 : result.email_status === "risky" ? 0.5 : 0,
linkedinUrl: result.person_linkedin_url || null,
source: JSON.stringify({
url: serpData?.url,
description: serpData?.description,
position: serpData?.position,
email_status: result.email_status,
}),
},
});
}
await prisma.job.update({
where: { id: jobId },
data: { status: "complete", emailsFound, totalLeads: enrichResults.length },
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.job.update({
where: { id: jobId },
data: { status: "failed", error: message },
});
}
}
function sleep(ms: number) {
return new Promise(r => setTimeout(r, ms));
}

View File

@@ -0,0 +1,116 @@
import { NextRequest, NextResponse } from "next/server";
import { prisma } from "@/lib/db";
import { decrypt } from "@/lib/utils/encryption";
import { createOrder, getOrderStatus, triggerExport, downloadOrderCSV } from "@/lib/services/vayne";
export async function POST(req: NextRequest) {
try {
const body = await req.json() as { salesNavUrl: string; maxResults: number };
const { salesNavUrl, maxResults } = body;
if (!salesNavUrl?.includes("linkedin.com/sales")) {
return NextResponse.json({ error: "Invalid Sales Navigator URL" }, { status: 400 });
}
const cred = await prisma.apiCredential.findUnique({ where: { service: "vayne" } });
if (!cred?.value) {
return NextResponse.json({ error: "Vayne API token not configured" }, { status: 400 });
}
const apiToken = decrypt(cred.value);
const job = await prisma.job.create({
data: {
type: "linkedin",
status: "running",
config: JSON.stringify({ salesNavUrl, maxResults }),
totalLeads: 0,
},
});
runVayneScrape(job.id, salesNavUrl, maxResults, apiToken).catch(console.error);
return NextResponse.json({ jobId: job.id });
} catch (err) {
console.error("POST /api/jobs/vayne-scrape error:", err);
return NextResponse.json({ error: "Failed to start scrape" }, { status: 500 });
}
}
async function runVayneScrape(
jobId: string,
salesNavUrl: string,
maxResults: number,
apiToken: string
) {
try {
// 1. Create Vayne order
const order = await createOrder(salesNavUrl, maxResults, apiToken, `LeadFlow-${jobId.slice(0, 8)}`);
const orderId = order.id;
await prisma.job.update({
where: { id: jobId },
data: { config: JSON.stringify({ salesNavUrl, maxResults, vayneOrderId: orderId }) },
});
// 2. Poll until finished
let status = order.scraping_status;
let scraped = 0;
while (status !== "finished" && status !== "failed") {
await sleep(5000);
const updated = await getOrderStatus(orderId, apiToken);
status = updated.scraping_status;
scraped = updated.scraped || 0;
await prisma.job.update({ where: { id: jobId }, data: { totalLeads: scraped } });
}
if (status === "failed") {
throw new Error("Vayne scraping failed");
}
// 3. Trigger export
let exportOrder = await triggerExport(orderId, apiToken);
// 4. Poll for export completion
let exportStatus = exportOrder.exports?.[0]?.status;
while (exportStatus !== "completed") {
await sleep(3000);
exportOrder = await getOrderStatus(orderId, apiToken);
exportStatus = exportOrder.exports?.[0]?.status;
if (exportStatus === undefined) break; // fallback
}
const fileUrl = exportOrder.exports?.[0]?.file_url;
if (!fileUrl) throw new Error("No export file URL returned by Vayne");
// 5. Download and parse CSV
const profiles = await downloadOrderCSV(fileUrl);
// 6. Store results
await prisma.leadResult.createMany({
data: profiles.map(p => ({
jobId,
companyName: p.company || null,
domain: p.companyDomain || null,
contactName: p.fullName || null,
contactTitle: p.title || null,
linkedinUrl: p.linkedinUrl || null,
source: JSON.stringify({ location: p.location }),
})),
});
await prisma.job.update({
where: { id: jobId },
data: { status: "complete", totalLeads: profiles.length },
});
} catch (err) {
const message = err instanceof Error ? err.message : String(err);
await prisma.job.update({
where: { id: jobId },
data: { status: "failed", error: message },
});
}
}
function sleep(ms: number) {
return new Promise(r => setTimeout(r, ms));
}