diff --git a/apps/api/src/config.ts b/apps/api/src/config.ts index 7f0665263..4c334c1fe 100644 --- a/apps/api/src/config.ts +++ b/apps/api/src/config.ts @@ -48,8 +48,12 @@ const configSchema = z.object({ LLAMAPARSE_API_KEY: z.string().optional(), STRIPE_SECRET_KEY: z.string().optional(), AUTUMN_SECRET_KEY: z.string().optional(), + AUTUMN_CHECK_ENABLED: z.string().optional(), + AUTUMN_CHECK_EXPERIMENT_PERCENT: z.coerce.number().default(100), AUTUMN_EXPERIMENT: z.string().optional(), AUTUMN_EXPERIMENT_PERCENT: z.coerce.number().default(100), + AUTUMN_REQUEST_TRACK_EXPERIMENT: z.string().optional(), + AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT: z.coerce.number().default(100), RESEND_API_KEY: z.string().optional(), PREVIEW_TOKEN: z.string().optional(), SEARCH_PREVIEW_TOKEN: z.string().optional(), diff --git a/apps/api/src/controllers/auth.ts b/apps/api/src/controllers/auth.ts index c595872eb..7905d1ffa 100644 --- a/apps/api/src/controllers/auth.ts +++ b/apps/api/src/controllers/auth.ts @@ -12,6 +12,7 @@ import { redlock } from "../services/redlock"; import { supabase_rr_service, supabase_service } from "../services/supabase"; import { AuthResponse, RateLimiterMode } from "../types"; import { AuthCreditUsageChunk, AuthCreditUsageChunkFromTeam } from "./v1/types"; +import { isAutumnCheckEnabled } from "../services/autumn/autumn.service"; function normalizedApiIsUuid(potentialUuid: string): boolean { // Check if the string is a valid UUID @@ -387,9 +388,45 @@ export async function authenticateUser( success: true, chunk: null, team_id: "bypass", + org_id: null, })(req, res, mode); } +/** + * Backfills org_id for stale cached auth chunks so Autumn check gating can run. + */ +async function ensureChunkOrgId( + apiKey: string, + chunk: AuthCreditUsageChunk | null, +): Promise { + if ( + !chunk || + chunk.org_id || + config.USE_DB_AUTHENTICATION !== true || + !isAutumnCheckEnabled() + ) { + return chunk; + } + + const { data, error } = await supabase_rr_service + .from("teams") + .select("org_id") + .eq("id", chunk.team_id) + .single(); + + if (error || !data?.org_id) { + logger.warn("Failed to backfill org_id for auth chunk", { + teamId: chunk.team_id, + error, + }); + return chunk; + } + + chunk.org_id = data.org_id; + await setCachedACUC(apiKey, !!chunk.is_extract, chunk); + return chunk; +} + async function supaAuthenticateUser( req, res, @@ -449,6 +486,7 @@ async function supaAuthenticateUser( } chunk = await getACUC(normalizedApi, false, true, RateLimiterMode.Scrape); + chunk = await ensureChunkOrgId(normalizedApi, chunk); if (chunk === null) { return { @@ -513,6 +551,7 @@ async function supaAuthenticateUser( return { success: true, team_id: `preview_${iptoken}`, + org_id: null, chunk: null, }; // check the origin of the request and make sure its from firecrawl.dev @@ -551,6 +590,7 @@ async function supaAuthenticateUser( return { success: true, team_id: teamId ?? undefined, + org_id: chunk?.org_id ?? null, chunk, }; } diff --git a/apps/api/src/controllers/v1/crawl-status-ws.ts b/apps/api/src/controllers/v1/crawl-status-ws.ts index 634e8ce1c..ffe8439cf 100644 --- a/apps/api/src/controllers/v1/crawl-status-ws.ts +++ b/apps/api/src/controllers/v1/crawl-status-ws.ts @@ -182,9 +182,9 @@ export async function crawlStatusWSController( }); } - const { team_id } = auth; + const { team_id, org_id } = auth; - req.auth = { team_id }; + req.auth = { team_id, org_id }; await crawlStatusWS(ws, req); } catch (err) { diff --git a/apps/api/src/controllers/v1/types.ts b/apps/api/src/controllers/v1/types.ts index 6e230c498..2c01cdc32 100644 --- a/apps/api/src/controllers/v1/types.ts +++ b/apps/api/src/controllers/v1/types.ts @@ -1219,6 +1219,7 @@ export type CrawlErrorsResponse = type AuthObject = { team_id: string; + org_id?: string | null; }; type Account = { @@ -1229,6 +1230,7 @@ export type AuthCreditUsageChunk = { api_key: string; api_key_id: number; team_id: string; + org_id?: string | null; sub_id: string | null; sub_current_period_start: string | null; sub_current_period_end: string | null; diff --git a/apps/api/src/controllers/v2/crawl-status-ws.ts b/apps/api/src/controllers/v2/crawl-status-ws.ts index 52211ae96..f5c4113d6 100644 --- a/apps/api/src/controllers/v2/crawl-status-ws.ts +++ b/apps/api/src/controllers/v2/crawl-status-ws.ts @@ -211,9 +211,9 @@ export async function crawlStatusWSController( }); } - const { team_id } = auth; + const { team_id, org_id } = auth; - req.auth = { team_id }; + req.auth = { team_id, org_id }; await crawlStatusWS(ws, req); } catch (err) { diff --git a/apps/api/src/controllers/v2/search.ts b/apps/api/src/controllers/v2/search.ts index 85f4e77e6..3cbef22e0 100644 --- a/apps/api/src/controllers/v2/search.ts +++ b/apps/api/src/controllers/v2/search.ts @@ -90,6 +90,18 @@ export async function searchController( zeroDataRetention = isZDROrAnon ?? false; applyZdrScope(isZDROrAnon ?? false); + // Verify the team has searchZDR enabled before allowing enterprise ZDR/anon + if (isZDROrAnon) { + const searchMode = getSearchZDR(req.acuc?.flags); + if (searchMode !== "allowed" && searchMode !== "forced") { + return res.status(403).json({ + success: false, + error: + "Zero Data Retention (ZDR) search is not enabled for your team. Contact support@firecrawl.com to enable this feature.", + }); + } + } + if (!agentRequestId) { await logRequest({ id: jobId, diff --git a/apps/api/src/controllers/v2/types.ts b/apps/api/src/controllers/v2/types.ts index 5fd359bf1..03ee4626b 100644 --- a/apps/api/src/controllers/v2/types.ts +++ b/apps/api/src/controllers/v2/types.ts @@ -1276,6 +1276,7 @@ export type CrawlErrorsResponse = type AuthObject = { team_id: string; + org_id?: string | null; }; type Account = { diff --git a/apps/api/src/controllers/v2/x402-search.ts b/apps/api/src/controllers/v2/x402-search.ts index 03d43c200..bcaaf2eb6 100644 --- a/apps/api/src/controllers/v2/x402-search.ts +++ b/apps/api/src/controllers/v2/x402-search.ts @@ -243,6 +243,20 @@ export async function x402SearchController( origin: req.body.origin, }); + // Verify the team has searchZDR enabled before allowing enterprise ZDR/anon + const isZDR = req.body.enterprise?.includes("zdr"); + const isAnon = req.body.enterprise?.includes("anon"); + if (isZDR || isAnon) { + const searchMode = getSearchZDR(req.acuc?.flags); + if (searchMode !== "allowed" && searchMode !== "forced") { + return res.status(403).json({ + success: false, + error: + "Zero Data Retention (ZDR) search is not enabled for your team. Contact support@firecrawl.com to enable this feature.", + }); + } + } + await logRequest({ id: jobId, kind: "search", diff --git a/apps/api/src/routes/shared.ts b/apps/api/src/routes/shared.ts index 884c1b951..7fd580aa3 100644 --- a/apps/api/src/routes/shared.ts +++ b/apps/api/src/routes/shared.ts @@ -24,6 +24,11 @@ import { validate as isUuid } from "uuid"; import { config } from "../config"; import { supabase_service } from "../services/supabase"; +import { + autumnService, + isAutumnCheckEnabled, +} from "../services/autumn/autumn.service"; + export function checkCreditsMiddleware( _minimum?: number, ): (req: RequestWithAuth, res: Response, next: NextFunction) => void { @@ -114,12 +119,42 @@ export function checkCreditsMiddleware( } } - const { success, remainingCredits, chunk } = await checkTeamCredits( - req.acuc ?? null, - req.auth.team_id, - minimum ?? 1, - ); - //todo(autumn) add .check call here + const requestedCredits = minimum ?? 1; + const useAutumnCheck = + !!req.auth.org_id && + isAutumnCheckEnabled(req.auth.org_id) && + !req.acuc?.is_extract; + + const autumnProperties = { + source: "checkCreditsMiddleware", + path: req.path, + }; + const [legacyCheck, autumnAllowed] = await Promise.all([ + checkTeamCredits(req.acuc ?? null, req.auth.team_id, requestedCredits), + useAutumnCheck + ? autumnService.checkCredits({ + teamId: req.auth.team_id, + value: requestedCredits, + properties: autumnProperties, + }) + : null, + ]); + let { success, remainingCredits, chunk } = legacyCheck; + + if (autumnAllowed !== null) { + if (autumnAllowed !== legacyCheck.success) { + logger.warn("Autumn check result diverged from legacy credit gate", { + teamId: req.auth.team_id, + path: req.path, + requestedCredits, + autumnAllowed, + legacyAllowed: legacyCheck.success, + }); + } + success = autumnAllowed; + remainingCredits = legacyCheck.remainingCredits; + } + if (chunk) { req.acuc = chunk; } @@ -204,9 +239,9 @@ export function authMiddleware( } } - const { team_id, chunk } = auth; + const { team_id, org_id, chunk } = auth; - req.auth = { team_id }; + req.auth = { team_id, org_id }; req.acuc = chunk ?? undefined; if (chunk) { req.account = { diff --git a/apps/api/src/services/autumn/__tests__/autumn.service.test.ts b/apps/api/src/services/autumn/__tests__/autumn.service.test.ts index e48302b04..f1096a740 100644 --- a/apps/api/src/services/autumn/__tests__/autumn.service.test.ts +++ b/apps/api/src/services/autumn/__tests__/autumn.service.test.ts @@ -70,8 +70,11 @@ jest.mock("../../supabase", () => ({ jest.mock("../../../config", () => ({ config: { + AUTUMN_CHECK_ENABLED: undefined, AUTUMN_EXPERIMENT: "true", AUTUMN_EXPERIMENT_PERCENT: 100, + AUTUMN_REQUEST_TRACK_EXPERIMENT: undefined, + AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT: 100, }, })); @@ -80,7 +83,9 @@ import { AutumnService, BoundedMap, BoundedSet, + isAutumnCheckEnabled, isAutumnEnabled, + isAutumnRequestTrackEnabled, orgBucket, } from "../autumn.service"; import { config } from "../../../config"; @@ -97,6 +102,23 @@ function makeEntity(usage: number) { return { balances: { CREDITS: { usage } } }; } +function setAutumnConfig(overrides: { + AUTUMN_CHECK_ENABLED?: string; + AUTUMN_EXPERIMENT?: string; + AUTUMN_EXPERIMENT_PERCENT?: number; + AUTUMN_REQUEST_TRACK_EXPERIMENT?: string; + AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT?: number; +} = {}) { + config.AUTUMN_CHECK_ENABLED = overrides.AUTUMN_CHECK_ENABLED; + config.AUTUMN_EXPERIMENT = overrides.AUTUMN_EXPERIMENT ?? "true"; + config.AUTUMN_EXPERIMENT_PERCENT = + overrides.AUTUMN_EXPERIMENT_PERCENT ?? 100; + config.AUTUMN_REQUEST_TRACK_EXPERIMENT = + overrides.AUTUMN_REQUEST_TRACK_EXPERIMENT; + config.AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT = + overrides.AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT ?? 100; +} + // --------------------------------------------------------------------------- // Test setup // --------------------------------------------------------------------------- @@ -105,6 +127,7 @@ beforeEach(() => { jest.clearAllMocks(); autumnClientRef = mockAutumnClient; supabaseStubData = { data: { org_id: "org-1" }, error: null }; + setAutumnConfig({ AUTUMN_CHECK_ENABLED: undefined }); mockCheck.mockResolvedValue({ allowed: true, customerId: "org-1", @@ -263,11 +286,11 @@ describe("ensureTrackingContext warm-cache short-circuit", () => { mockEntityGet.mockResolvedValue(makeEntity(0)); // Warm the caches. - await svc.reserveCredits({ teamId: "team-1", value: 5 }); + await svc.trackCredits({ teamId: "team-1", value: 5 }); const callsAfterWarm = mockEntityGet.mock.calls.length; // Subsequent call — should not touch provisioning. - await svc.reserveCredits({ teamId: "team-1", value: 5 }); + await svc.trackCredits({ teamId: "team-1", value: 5 }); // No additional getEntity calls for provisioning. expect(mockEntityGet.mock.calls.length).toBe(callsAfterWarm); @@ -340,21 +363,72 @@ describe("lockCredits", () => { }); // --------------------------------------------------------------------------- -// reserveCredits +// checkCredits // --------------------------------------------------------------------------- -describe("reserveCredits", () => { +describe("checkCredits", () => { + it("returns null when autumnClient is null", async () => { + autumnClientRef = null; + config.AUTUMN_CHECK_ENABLED = "true"; + const svc = makeService(); + const result = await svc.checkCredits({ teamId: "team-1", value: 10 }); + expect(result).toBeNull(); + expect(mockCheck).not.toHaveBeenCalled(); + }); + + it("returns allowed on happy path without a lock", async () => { + config.AUTUMN_CHECK_ENABLED = "true"; + const svc = makeService(); + const result = await svc.checkCredits({ + teamId: "team-1", + value: 42, + properties: { source: "checkCreditsMiddleware" }, + }); + + expect(result).toBe(true); + expect(mockCheck).toHaveBeenCalledWith( + expect.objectContaining({ + customerId: "org-1", + entityId: "team-1", + featureId: "CREDITS", + requiredBalance: 42, + properties: { source: "checkCreditsMiddleware" }, + }), + ); + expect(mockCheck).toHaveBeenCalledWith( + expect.not.objectContaining({ lock: expect.anything() }), + ); + }); + + it("returns false when Autumn denies the check", async () => { + config.AUTUMN_CHECK_ENABLED = "true"; + mockCheck.mockResolvedValue({ + allowed: false, + customerId: "org-1", + balance: null, + }); + const svc = makeService(); + const result = await svc.checkCredits({ teamId: "team-1", value: 10 }); + expect(result).toBe(false); + }); +}); + +// --------------------------------------------------------------------------- +// trackCredits +// --------------------------------------------------------------------------- + +describe("trackCredits", () => { it("returns false when autumnClient is null", async () => { autumnClientRef = null; const svc = makeService(); - const result = await svc.reserveCredits({ teamId: "team-1", value: 10 }); + const result = await svc.trackCredits({ teamId: "team-1", value: 10 }); expect(result).toBe(false); expect(mockTrack).not.toHaveBeenCalled(); }); it("returns false for preview teams", async () => { const svc = makeService(); - const result = await svc.reserveCredits({ + const result = await svc.trackCredits({ teamId: "preview_abc", value: 10, }); @@ -365,7 +439,7 @@ describe("reserveCredits", () => { it("calls track with correct feature and value on happy path", async () => { const svc = makeService(); - const result = await svc.reserveCredits({ + const result = await svc.trackCredits({ teamId: "team-1", value: 42, properties: { source: "test", endpoint: "extract" }, @@ -380,6 +454,15 @@ describe("reserveCredits", () => { expect(usageCall).toBeDefined(); expect((usageCall as any[])[0].properties?.endpoint).toBe("extract"); }); + + it("returns false when the Autumn track request fails", async () => { + mockTrack.mockRejectedValueOnce(new Error("track failed")); + const svc = makeService(); + + expect(await svc.trackCredits({ teamId: "team-1", value: 42 })).toBe( + false, + ); + }); }); // --------------------------------------------------------------------------- @@ -477,9 +560,7 @@ describe("orgBucket", () => { describe("isAutumnEnabled", () => { afterEach(() => { - // Restore defaults for other tests. - (config as any).AUTUMN_EXPERIMENT = "true"; - (config as any).AUTUMN_EXPERIMENT_PERCENT = 100; + setAutumnConfig({ AUTUMN_CHECK_ENABLED: undefined }); }); it("returns true when experiment is enabled and percent is 100", () => { @@ -487,37 +568,73 @@ describe("isAutumnEnabled", () => { }); it("returns true without orgId even when percent < 100 (fast bail-out only)", () => { - (config as any).AUTUMN_EXPERIMENT_PERCENT = 0; + config.AUTUMN_EXPERIMENT_PERCENT = 0; // Without orgId the percent gate is skipped — only the on/off flag matters. expect(isAutumnEnabled()).toBe(true); }); it("returns false when AUTUMN_EXPERIMENT is not 'true'", () => { - (config as any).AUTUMN_EXPERIMENT = undefined; + config.AUTUMN_EXPERIMENT = undefined; expect(isAutumnEnabled()).toBe(false); }); it("returns false for an orgId whose bucket >= percent", () => { // orgBucket("a1b2c3d4-...") = 16, so percent=10 should exclude it. - (config as any).AUTUMN_EXPERIMENT_PERCENT = 10; + config.AUTUMN_EXPERIMENT_PERCENT = 10; expect(isAutumnEnabled("a1b2c3d4-0000-0000-0000-000000000000")).toBe(false); }); it("returns true for an orgId whose bucket < percent", () => { // orgBucket("a1b2c3d4-...") = 16, so percent=50 should include it. - (config as any).AUTUMN_EXPERIMENT_PERCENT = 50; + config.AUTUMN_EXPERIMENT_PERCENT = 50; expect(isAutumnEnabled("a1b2c3d4-0000-0000-0000-000000000000")).toBe(true); }); }); +describe("isAutumnCheckEnabled", () => { + afterEach(() => { + setAutumnConfig({ AUTUMN_CHECK_ENABLED: undefined }); + }); + + it("returns false when AUTUMN_CHECK_ENABLED is not 'true'", () => { + config.AUTUMN_CHECK_ENABLED = undefined; + expect(isAutumnCheckEnabled()).toBe(false); + }); + + it("returns false when Autumn experiment is disabled", () => { + config.AUTUMN_CHECK_ENABLED = "true"; + config.AUTUMN_EXPERIMENT = undefined; + expect(isAutumnCheckEnabled()).toBe(false); + }); + + it("returns true only when both check flag and experiment are enabled", () => { + config.AUTUMN_CHECK_ENABLED = "true"; + expect(isAutumnCheckEnabled()).toBe(true); + }); +}); + +describe("isAutumnRequestTrackEnabled", () => { + afterEach(() => { + setAutumnConfig({ AUTUMN_REQUEST_TRACK_EXPERIMENT: undefined }); + }); + + it("returns false when request tracking flag is not 'true'", () => { + expect(isAutumnRequestTrackEnabled()).toBe(false); + }); + + it("returns true only when both request tracking and Autumn experiment are enabled", () => { + config.AUTUMN_REQUEST_TRACK_EXPERIMENT = "true"; + expect(isAutumnRequestTrackEnabled()).toBe(true); + }); +}); + describe("experiment gate on lockCredits", () => { afterEach(() => { - (config as any).AUTUMN_EXPERIMENT = "true"; - (config as any).AUTUMN_EXPERIMENT_PERCENT = 100; + setAutumnConfig(); }); it("lockCredits returns null when experiment is disabled", async () => { - (config as any).AUTUMN_EXPERIMENT = undefined; + config.AUTUMN_EXPERIMENT = undefined; const svc = makeService(); const result = await svc.lockCredits({ teamId: "team-1", value: 10 }); expect(result).toBeNull(); @@ -530,7 +647,7 @@ describe("experiment gate on lockCredits", () => { data: { org_id: "a1b2c3d4-0000-0000-0000-000000000000" }, error: null, }; - (config as any).AUTUMN_EXPERIMENT_PERCENT = 10; + config.AUTUMN_EXPERIMENT_PERCENT = 10; const svc = makeService(); const result = await svc.lockCredits({ teamId: "team-1", value: 10 }); expect(result).toBeNull(); @@ -543,7 +660,7 @@ describe("experiment gate on lockCredits", () => { data: { org_id: "a1b2c3d4-0000-0000-0000-000000000000" }, error: null, }; - (config as any).AUTUMN_EXPERIMENT_PERCENT = 50; + config.AUTUMN_EXPERIMENT_PERCENT = 50; const svc = makeService(); const result = await svc.lockCredits({ teamId: "team-1", @@ -555,22 +672,22 @@ describe("experiment gate on lockCredits", () => { }); it("refundCredits still works when experiment is disabled (guard is autumnReserved)", async () => { - (config as any).AUTUMN_EXPERIMENT = undefined; + config.AUTUMN_EXPERIMENT = undefined; const svc = makeService(); // Warm the caches so refund can resolve the tracking context. - (config as any).AUTUMN_EXPERIMENT = "true"; - await svc.reserveCredits({ teamId: "team-1", value: 10 }); + config.AUTUMN_EXPERIMENT = "true"; + await svc.trackCredits({ teamId: "team-1", value: 10 }); jest.clearAllMocks(); // Disable experiment — refund must still succeed to avoid orphaned credits. - (config as any).AUTUMN_EXPERIMENT = undefined; + config.AUTUMN_EXPERIMENT = undefined; mockTrack.mockResolvedValue(undefined); await svc.refundCredits({ teamId: "team-1", value: 10 }); expect(mockTrack).toHaveBeenCalled(); }); it("ensureTeamProvisioned still works when experiment is disabled (handled by firecrawl-web)", async () => { - (config as any).AUTUMN_EXPERIMENT = undefined; + config.AUTUMN_EXPERIMENT = undefined; const svc = makeService(); await svc.ensureTeamProvisioned({ teamId: "team-1", orgId: "org-1" }); // Provisioning should proceed — firecrawl-web edge functions do this diff --git a/apps/api/src/services/autumn/autumn.service.ts b/apps/api/src/services/autumn/autumn.service.ts index 25227fa5f..c735b9f04 100644 --- a/apps/api/src/services/autumn/autumn.service.ts +++ b/apps/api/src/services/autumn/autumn.service.ts @@ -40,7 +40,7 @@ export function orgBucket(orgId: string): number { * same answer. * * Only checked at the top-level billing entry points (`lockCredits` and the - * legacy direct-track `reserveCredits`). + * direct-track `trackCredits`). * NOT checked by `finalizeCreditsLock`, `refundCredits`, or * `ensureTeamProvisioned`. */ @@ -50,6 +50,23 @@ export function isAutumnEnabled(orgId?: string): boolean { return orgBucket(orgId) < config.AUTUMN_EXPERIMENT_PERCENT; } +export function isAutumnCheckEnabled(orgId?: string): boolean { + if (config.AUTUMN_CHECK_ENABLED !== "true") return false; + if (config.AUTUMN_EXPERIMENT !== "true") return false; + const percent = config.AUTUMN_CHECK_EXPERIMENT_PERCENT ?? 100; + if (!orgId || percent >= 100) return true; + return orgBucket(orgId) < percent; +} + +export function isAutumnRequestTrackEnabled(orgId?: string): boolean { + if (config.AUTUMN_REQUEST_TRACK_EXPERIMENT !== "true") return false; + if (!isAutumnEnabled(orgId)) return false; + if (!orgId || config.AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT >= 100) { + return true; + } + return orgBucket(orgId) < config.AUTUMN_REQUEST_TRACK_EXPERIMENT_PERCENT; +} + const AUTUMN_DEFAULT_PLAN_ID = "free"; /** * Size-bounded Map with FIFO eviction. When the map is at capacity the oldest @@ -204,8 +221,8 @@ export class AutumnService { featureId, value, properties, - }: TrackParams): Promise { - if (!autumnClient) return; + }: TrackParams): Promise { + if (!autumnClient) return false; try { await autumnClient.track({ @@ -221,6 +238,7 @@ export class AutumnService { featureId, value, }); + return true; } catch (error) { logger.warn("Autumn track failed", { customerId, @@ -229,6 +247,7 @@ export class AutumnService { value, error, }); + return false; } } @@ -326,6 +345,54 @@ export class AutumnService { return orgId; } + /** + * Checks whether a team has enough Autumn balance to cover a request. + * Returns null when Autumn gating is unavailable and callers should fall back. + */ + async checkCredits({ + teamId, + value, + properties, + }: TrackCreditsParams): Promise { + if ( + !isAutumnCheckEnabled() || + !autumnClient || + this.isPreviewTeam(teamId) + ) { + return null; + } + + try { + const orgId = await this.resolveOrgId(teamId); + if (!isAutumnCheckEnabled(orgId)) return null; + + const customerId = await this.ensureTrackingContext(teamId); + const { allowed } = await autumnClient.check({ + customerId, + entityId: teamId, + featureId: CREDITS_FEATURE_ID, + requiredBalance: value, + properties, + }); + + logger.debug("Autumn checkCredits completed", { + customerId, + entityId: teamId, + featureId: CREDITS_FEATURE_ID, + value, + allowed, + }); + return allowed; + } catch (error) { + logger.warn("Autumn checkCredits failed", { + teamId, + value, + error, + }); + return null; + } + } + /** * Reserves a team's credits in Autumn without letting Autumn gate usage. * Returns the lock ID on success, or null if no lock was acquired. @@ -424,39 +491,42 @@ export class AutumnService { } /** - * Records a credit usage event directly in Autumn for flows that do not - * acquire a request-time lock. Returns true on success. + * Records a credit usage event directly in Autumn. Returns true on success. * * The experiment gate is evaluated here — once per request — using a stable * bucket derived from the org UUID so the same org always gets the same * answer for a given AUTUMN_EXPERIMENT_PERCENT value. */ - async reserveCredits({ + async trackCredits({ teamId, value, properties, + requestScoped = false, }: TrackCreditsParams): Promise { - if (!isAutumnEnabled()) return false; // fast bail-out: experiment off + const isEnabled = requestScoped + ? isAutumnRequestTrackEnabled + : isAutumnEnabled; + if (!isEnabled()) return false; if (!autumnClient) return false; if (this.isPreviewTeam(teamId)) return false; try { const orgId = await this.resolveOrgId(teamId); - if (!isAutumnEnabled(orgId)) return false; // stable percent gate + if (!isEnabled(orgId)) return false; const customerId = await this.ensureTrackingContext(teamId); - await this.track({ + return await this.track({ customerId, entityId: teamId, featureId: CREDITS_FEATURE_ID, value, properties, }); - return true; } catch (error) { - logger.warn("Autumn reserveCredits failed", { + logger.warn("Autumn trackCredits failed", { teamId, value, + requestScoped, error, }); return false; @@ -464,7 +534,7 @@ export class AutumnService { } /** - * Reverses a prior reserveCredits call by tracking a negative usage event. + * Reverses a prior trackCredits call by tracking a negative usage event. */ async refundCredits({ teamId, diff --git a/apps/api/src/services/autumn/types.ts b/apps/api/src/services/autumn/types.ts index ec05e5c5c..d863aa5b8 100644 --- a/apps/api/src/services/autumn/types.ts +++ b/apps/api/src/services/autumn/types.ts @@ -56,6 +56,7 @@ export type TrackCreditsParams = { teamId: string; value: number; properties?: Record; + requestScoped?: boolean; }; export type CreateEntityResult = diff --git a/apps/api/src/services/billing/__tests__/batch_billing.test.ts b/apps/api/src/services/billing/__tests__/batch_billing.test.ts index 0b6e617e8..ce4fbb35b 100644 --- a/apps/api/src/services/billing/__tests__/batch_billing.test.ts +++ b/apps/api/src/services/billing/__tests__/batch_billing.test.ts @@ -20,12 +20,12 @@ jest.mock("../../../lib/withAuth", () => ({ withAuth, })); -const reserveCredits = jest.fn<(args: any) => Promise>(); -const finalizeCreditsLock = jest.fn<(args: any) => Promise>(); +const trackCredits = jest.fn<(args: any) => Promise>(); +const refundCredits = jest.fn<(args: any) => Promise>(); jest.mock("../../autumn/autumn.service", () => ({ autumnService: { - reserveCredits, - finalizeCreditsLock, + trackCredits, + refundCredits, }, })); @@ -117,134 +117,128 @@ function makeOp(overrides: Record = {}) { is_extract: false, timestamp: "2026-03-13T00:00:00.000Z", api_key_id: 123, - autumnLockId: null, - autumnProperties: { - source: "billTeam", - endpoint: "extract", - apiKeyId: 123, - }, ...overrides, }); } -function deferred() { - let resolve!: (value: T | PromiseLike) => void; - let reject!: (reason?: unknown) => void; - const promise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - return { promise, resolve, reject }; -} - beforeEach(() => { jest.clearAllMocks(); queue = []; billedTeams.clear(); locks.clear(); - reserveCredits.mockResolvedValue(true); - finalizeCreditsLock.mockResolvedValue(undefined); rpc.mockResolvedValue({ data: [], error: null }); + trackCredits.mockResolvedValue(true); + refundCredits.mockResolvedValue(undefined); }); describe("processBillingBatch", () => { - it("awaits lock confirmation before tracking unlocked credits", async () => { - const finalize = deferred(); - finalizeCreditsLock.mockReturnValueOnce(finalize.promise); - queue = [ - makeOp({ credits: 7, autumnLockId: "lock-1" }), - makeOp({ credits: 3, autumnLockId: null }), - ]; - - const run = processBillingBatch(); - await new Promise(resolve => setImmediate(resolve)); - - expect(finalizeCreditsLock).toHaveBeenCalledWith({ - lockId: "lock-1", - action: "confirm", - properties: expect.objectContaining({ - source: "billTeam", - apiKeyId: 123, - subscriptionId: "sub-1", - finalizeSource: "processBillingBatch", - }), - }); - expect(reserveCredits).not.toHaveBeenCalled(); + it("tracks queued Autumn usage when the request path did not", async () => { + queue = [makeOp()]; - finalize.resolve(); - await run; + await processBillingBatch(); - expect(reserveCredits).toHaveBeenCalledWith({ + expect(rpc).toHaveBeenCalled(); + expect(trackCredits).toHaveBeenCalledWith({ teamId: "team-1", - value: 3, - properties: expect.objectContaining({ + value: 10, + properties: { source: "processBillingBatch", + endpoint: "extract", apiKeyId: 123, subscriptionId: "sub-1", - }), + }, }); + expect(captureException).not.toHaveBeenCalled(); }); - it("releases Autumn locks when billing returns success false", async () => { - queue = [makeOp({ autumnLockId: "lock-1" })]; - rpc.mockResolvedValueOnce({ data: null, error: new Error("db failed") }); + it("skips Autumn tracking when the request path already tracked the op", async () => { + queue = [makeOp({ autumnTrackInRequest: true })]; await processBillingBatch(); - expect(finalizeCreditsLock).toHaveBeenCalledWith({ - lockId: "lock-1", - action: "release", - properties: expect.objectContaining({ - source: "billTeam", - finalizeSource: "processBillingBatch_failure", - }), - }); - expect(reserveCredits).not.toHaveBeenCalled(); + expect(rpc).toHaveBeenCalled(); + expect(trackCredits).not.toHaveBeenCalled(); }); - it("releases Autumn locks when billing throws", async () => { - queue = [makeOp({ autumnLockId: "lock-1" })]; - rpc.mockRejectedValueOnce(new Error("rpc exploded")); + it("continues when billing returns success false", async () => { + queue = [makeOp({ autumnTrackInRequest: true })]; + rpc.mockResolvedValueOnce({ data: null, error: new Error("db failed") }); await processBillingBatch(); - expect(finalizeCreditsLock).toHaveBeenCalledWith({ - lockId: "lock-1", - action: "release", - properties: expect.objectContaining({ - source: "billTeam", - finalizeSource: "processBillingBatch_exception", - }), + expect(refundCredits).toHaveBeenCalledWith({ + teamId: "team-1", + value: 10, + properties: { + source: "processBillingBatch", + endpoint: "extract", + apiKeyId: 123, + subscriptionId: "sub-1", + }, }); expect(captureException).toHaveBeenCalled(); }); - it("treats undefined autumnLockId as unlocked for legacy queued ops", async () => { - queue = [makeOp({ autumnLockId: undefined })]; + it("captures exceptions when billing throws", async () => { + queue = [makeOp({ autumnTrackInRequest: true })]; + rpc.mockRejectedValueOnce(new Error("rpc exploded")); await processBillingBatch(); - expect(finalizeCreditsLock).not.toHaveBeenCalled(); - expect(reserveCredits).toHaveBeenCalledWith({ + expect(refundCredits).toHaveBeenCalledWith({ teamId: "team-1", value: 10, - properties: expect.objectContaining({ + properties: { source: "processBillingBatch", - }), + endpoint: "extract", + apiKeyId: 123, + subscriptionId: "sub-1", + }, }); + expect(captureException).toHaveBeenCalled(); }); - it("survives unexpected synchronous finalizeAutumnLocks failures", async () => { - queue = [makeOp({ autumnLockId: "lock-1" })]; - finalizeCreditsLock.mockImplementationOnce(() => { - throw new Error("sync finalize failure"); - }); + it("continues processing later groups when Autumn refund fails", async () => { + queue = [ + makeOp({ + team_id: "team-1", + subscription_id: "sub-1", + autumnTrackInRequest: true, + }), + makeOp({ + team_id: "team-2", + subscription_id: "sub-2", + autumnTrackInRequest: false, + }), + ]; + rpc + .mockResolvedValueOnce({ data: null, error: new Error("db failed") }) + .mockResolvedValueOnce({ data: [], error: null }); + refundCredits.mockRejectedValueOnce(new Error("refund failed")); await processBillingBatch(); - expect(logger.warn).toHaveBeenCalledWith( - "Autumn finalizeAutumnLocks failed unexpectedly", - expect.objectContaining({ team_id: "team-1", action: "confirm" }), - ); + expect(refundCredits).toHaveBeenCalledWith({ + teamId: "team-1", + value: 10, + properties: { + source: "processBillingBatch", + endpoint: "extract", + apiKeyId: 123, + subscriptionId: "sub-1", + }, + }); + expect(rpc).toHaveBeenCalledTimes(2); + expect(trackCredits).toHaveBeenCalledWith({ + teamId: "team-2", + value: 10, + properties: { + source: "processBillingBatch", + endpoint: "extract", + apiKeyId: 123, + subscriptionId: "sub-2", + }, + }); + expect(captureException).toHaveBeenCalled(); }); }); diff --git a/apps/api/src/services/billing/__tests__/credit_billing.test.ts b/apps/api/src/services/billing/__tests__/credit_billing.test.ts new file mode 100644 index 000000000..beb3d3606 --- /dev/null +++ b/apps/api/src/services/billing/__tests__/credit_billing.test.ts @@ -0,0 +1,121 @@ +import { jest } from "@jest/globals"; + +const withAuth = jest.fn((fn: any) => fn); +jest.mock("../../../lib/withAuth", () => ({ + withAuth, +})); + +const queueBillingOperation = jest.fn<(args: any[]) => Promise>(); +jest.mock("../batch_billing", () => ({ + queueBillingOperation: (...args: any[]) => queueBillingOperation(args), +})); + +const trackCredits = jest.fn<(args: any) => Promise>(); +const refundCredits = jest.fn<(args: any) => Promise>(); +jest.mock("../../autumn/autumn.service", () => ({ + autumnService: { + trackCredits, + refundCredits, + }, +})); + +jest.mock("../../notification/email_notification", () => ({ + sendNotification: jest.fn(), +})); +jest.mock("../../supabase", () => ({ + supabase_rr_service: {}, + supabase_service: {}, +})); +jest.mock("../auto_charge", () => ({ + autoCharge: jest.fn(), +})); +jest.mock("../../redis", () => ({ + getValue: jest.fn(), + setValue: jest.fn(), +})); +jest.mock("../../../lib/logger", () => ({ + logger: { + info: jest.fn(), + warn: jest.fn(), + error: jest.fn(), + }, +})); + +import { billTeam } from "../credit_billing"; + +beforeEach(() => { + jest.clearAllMocks(); + queueBillingOperation.mockResolvedValue({ success: true }); + trackCredits.mockResolvedValue(true); + refundCredits.mockResolvedValue(undefined); +}); + +describe("billTeam", () => { + it("marks billing as already tracked when request tracking succeeds", async () => { + await billTeam("team-1", "sub-1", 3, 123, { + endpoint: "search", + jobId: "job-1", + }); + + expect(queueBillingOperation).toHaveBeenCalledWith([ + "team-1", + "sub-1", + 3, + 123, + { endpoint: "search", jobId: "job-1" }, + false, + true, + ]); + expect(trackCredits).toHaveBeenCalledWith({ + teamId: "team-1", + value: 3, + properties: { + source: "billTeam", + endpoint: "search", + jobId: "job-1", + apiKeyId: 123, + }, + requestScoped: true, + }); + }); + + it("refunds Autumn when queueing fails after request tracking", async () => { + queueBillingOperation.mockResolvedValueOnce({ success: false }); + + await billTeam("team-1", "sub-1", 3, 123, { + endpoint: "search", + jobId: "job-1", + }); + + expect(refundCredits).toHaveBeenCalledWith({ + teamId: "team-1", + value: 3, + properties: { + source: "billTeam", + endpoint: "search", + jobId: "job-1", + apiKeyId: 123, + }, + }); + }); + + it("leaves batch tracking enabled when request tracking is off", async () => { + trackCredits.mockResolvedValueOnce(false); + + await billTeam("team-1", "sub-1", 3, 123, { + endpoint: "search", + jobId: "job-1", + }); + + expect(queueBillingOperation).toHaveBeenCalledWith([ + "team-1", + "sub-1", + 3, + 123, + { endpoint: "search", jobId: "job-1" }, + false, + false, + ]); + expect(refundCredits).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/api/src/services/billing/batch_billing.ts b/apps/api/src/services/billing/batch_billing.ts index b7f734a0f..00005c2bd 100644 --- a/apps/api/src/services/billing/batch_billing.ts +++ b/apps/api/src/services/billing/batch_billing.ts @@ -30,9 +30,7 @@ interface BillingOperation { is_extract: boolean; timestamp: string; api_key_id: number | null; - /** Autumn lock ID acquired at request time, if any. */ - autumnLockId: string | null; - autumnProperties?: Record; + autumnTrackInRequest: boolean; } // Grouped billing operations for batch processing @@ -65,52 +63,37 @@ async function releaseLock() { logger.info("🔓 Released billing batch processing lock"); } -async function finalizeAutumnLocks( - group: GroupedBillingOperation, - action: "confirm" | "release", - source: string, -) { - const lockedOperations = group.operations.filter( - ( - op, - ): op is BillingOperation & { - autumnLockId: string; - } => typeof op.autumnLockId === "string", - ); +async function refundRequestTrackedCredits(group: GroupedBillingOperation) { + const requestTrackedCredits = group.operations + .filter(op => op.autumnTrackInRequest) + .reduce((sum, op) => sum + op.credits, 0); - if (lockedOperations.length === 0) return; + if (requestTrackedCredits <= 0) return; try { - const results = await Promise.allSettled( - lockedOperations.map(op => - autumnService.finalizeCreditsLock({ - lockId: op.autumnLockId, - action, - properties: { - ...op.autumnProperties, - subscriptionId: op.subscription_id, - finalizeSource: source, - }, - }), - ), - ); - - const rejectedCount = results.filter( - result => result.status === "rejected", - ).length; - if (rejectedCount > 0) { - logger.warn("Autumn finalizeCreditsLock rejected unexpectedly", { - team_id: group.team_id, - action, - rejectedCount, - }); - } + await autumnService.refundCredits({ + teamId: group.team_id, + value: requestTrackedCredits, + properties: { + source: "processBillingBatch", + ...toAutumnBillingProperties(group.billing), + apiKeyId: group.api_key_id, + subscriptionId: group.subscription_id, + }, + }); } catch (error) { - logger.warn("Autumn finalizeAutumnLocks failed unexpectedly", { - team_id: group.team_id, - action, - operation_count: lockedOperations.length, + logger.warn("Failed to refund Autumn request-tracked credits", { error, + team_id: group.team_id, + credits: requestTrackedCredits, + billing: group.billing, + }); + Sentry.captureException(error, { + data: { + operation: "batch_billing_refund", + team_id: group.team_id, + credits: requestTrackedCredits, + }, }); } } @@ -118,10 +101,6 @@ async function finalizeAutumnLocks( /** * Dequeues pending billing operations from Redis, groups them by team, and * commits each group to Supabase via the `bill_team_6` RPC. - * - * For groups where credits were locked in Autumn (`autumnLockId != null`), - * the lock is confirmed on success and released on failure. For unlocked - * groups (legacy / BullMQ path), Autumn is updated post-commit. */ export async function processBillingBatch() { const redis = getRedisConnection(); @@ -197,8 +176,8 @@ export async function processBillingBatch() { continue; } - const unreservedCredits = group.operations - .filter(op => op.autumnLockId == null) + const batchTrackedCredits = group.operations + .filter(op => !op.autumnTrackInRequest) .reduce((sum, op) => sum + op.credits, 0); try { @@ -216,19 +195,15 @@ export async function processBillingBatch() { ); if (!billingResult.success) { + await refundRequestTrackedCredits(group); logger.warn( - `⚠️ Billing returned success: false for team ${group.team_id}, skipping Autumn tracking`, + `⚠️ Billing returned success: false for team ${group.team_id}`, { billingResult, team_id: group.team_id, credits: group.total_credits, }, ); - await finalizeAutumnLocks( - group, - "release", - "processBillingBatch_failure", - ); continue; } @@ -236,14 +211,10 @@ export async function processBillingBatch() { `✅ Successfully billed team ${group.team_id} for ${group.total_credits} credits`, ); - await finalizeAutumnLocks(group, "confirm", "processBillingBatch"); - - // Track only unlocked credits post-commit; locked credits are - // confirmed above and should not be re-tracked. - if (unreservedCredits > 0) { - await autumnService.reserveCredits({ + if (batchTrackedCredits > 0) { + await autumnService.trackCredits({ teamId: group.team_id, - value: unreservedCredits, + value: batchTrackedCredits, properties: { source: "processBillingBatch", ...toAutumnBillingProperties(group.billing), @@ -252,7 +223,9 @@ export async function processBillingBatch() { }, }); } + } catch (error) { + await refundRequestTrackedCredits(group); logger.error(`❌ Failed to bill team ${group.team_id}`, { error, group, @@ -264,11 +237,6 @@ export async function processBillingBatch() { credits: group.total_credits, }, }); - await finalizeAutumnLocks( - group, - "release", - "processBillingBatch_exception", - ); } } @@ -305,9 +273,7 @@ export function startBillingBatchProcessing() { /** * Enqueues a billing operation for async batch processing. * - * Pass `autumnLockId` when credits were locked in Autumn via - * `autumnService.lockCredits()` — the batch processor will release on - * `bill_team_6` failure and confirm on success. + * Internal billing operations are batched and committed to Supabase. */ export async function queueBillingOperation( team_id: string, @@ -316,8 +282,7 @@ export async function queueBillingOperation( api_key_id: number | null, billing: BillingMetadata, is_extract: boolean = false, - autumnLockId: string | null = null, - autumnProperties?: Record, + autumnTrackInRequest: boolean = false, ) { // Skip queuing for preview teams if (team_id === "preview" || team_id.startsWith("preview_")) { @@ -342,8 +307,7 @@ export async function queueBillingOperation( is_extract, timestamp: new Date().toISOString(), api_key_id, - autumnLockId, - autumnProperties, + autumnTrackInRequest, }; // Add operation to Redis list diff --git a/apps/api/src/services/billing/credit_billing.ts b/apps/api/src/services/billing/credit_billing.ts index 52d63bea4..f951620cc 100644 --- a/apps/api/src/services/billing/credit_billing.ts +++ b/apps/api/src/services/billing/credit_billing.ts @@ -37,23 +37,32 @@ export async function billTeam( ...toAutumnBillingProperties(billing), apiKeyId: api_key_id, }; - // Acquire an Autumn lock opportunistically, but never gate usage on it. - // billTeam is fire-and-forget at call sites, so this does not block responses. - const autumnLockId = await autumnService.lockCredits({ + const trackedInRequest = await autumnService.trackCredits({ teamId: team_id, value: credits, properties: autumnProperties, + requestScoped: true, }); - return queueBillingOperation( + + const result = await queueBillingOperation( team_id, subscription_id, credits, api_key_id, billing, false, - autumnLockId, - autumnProperties, + trackedInRequest, ); + + if (!result.success && trackedInRequest) { + await autumnService.refundCredits({ + teamId: team_id, + value: credits, + properties: autumnProperties, + }); + } + + return result; }, { success: true, message: "No DB, bypassed." }, )(team_id, subscription_id, credits, api_key_id, billing, logger); @@ -78,6 +87,33 @@ export async function checkTeamCredits( })(chunk, team_id, credits); } +function evaluateTeamCredits( + chunk: AuthCreditUsageChunk, + credits: number, + isAutoRechargeEnabled: boolean, +) { + const allowOverages = + chunk.price_should_be_graceful && isAutoRechargeEnabled; + const remainingCredits = allowOverages + ? chunk.remaining_credits + chunk.price_credits + : chunk.remaining_credits; + const creditsWillBeUsed = chunk.adjusted_credits_used + credits; + const totalPriceCredits = allowOverages + ? (chunk.total_credits_sum ?? 100000000) + chunk.price_credits + : (chunk.total_credits_sum ?? 100000000); + const creditUsagePercentage = + chunk.adjusted_credits_used / (chunk.total_credits_sum ?? 100000000); + + return { + allowOverages, + remainingCredits, + creditsWillBeUsed, + totalPriceCredits, + creditUsagePercentage, + success: creditsWillBeUsed <= totalPriceCredits, + }; +} + // if team has enough credits for the operation, return true, else return false async function supaCheckTeamCredits( chunk: AuthCreditUsageChunk | null, @@ -127,23 +163,14 @@ async function supaCheckTeamCredits( } } - // Graceful billing only applies if the plan supports it AND auto-recharge is enabled - const allowOverages = chunk.price_should_be_graceful && isAutoRechargeEnabled; - - const remainingCredits = allowOverages - ? chunk.remaining_credits + chunk.price_credits - : chunk.remaining_credits; - - const creditsWillBeUsed = chunk.adjusted_credits_used + credits; - - // In case chunk.price_credits is undefined, set it to a large number to avoid mistakes - const totalPriceCredits = allowOverages - ? (chunk.total_credits_sum ?? 100000000) + chunk.price_credits - : (chunk.total_credits_sum ?? 100000000); - - // Removal of + credits - const creditUsagePercentage = - chunk.adjusted_credits_used / (chunk.total_credits_sum ?? 100000000); + const { + success, + allowOverages, + remainingCredits, + creditsWillBeUsed, + totalPriceCredits, + creditUsagePercentage, + } = evaluateTeamCredits(chunk, credits, isAutoRechargeEnabled); if ( isAutoRechargeEnabled && @@ -199,7 +226,7 @@ async function supaCheckTeamCredits( } // Compare the adjusted total credits used with the credits allowed by the plan (and graceful) - if (creditsWillBeUsed > totalPriceCredits) { + if (!success) { logger.warn("Credit check failed - insufficient credits", { team_id, teamId: team_id, @@ -236,7 +263,7 @@ async function supaCheckTeamCredits( return { success: true, message: "Sufficient credits available", - remainingCredits: chunk.remaining_credits, + remainingCredits, chunk, }; } diff --git a/apps/api/src/services/indexing/index-worker.ts b/apps/api/src/services/indexing/index-worker.ts index f1fa0dc05..52dcd4e2c 100644 --- a/apps/api/src/services/indexing/index-worker.ts +++ b/apps/api/src/services/indexing/index-worker.ts @@ -81,7 +81,16 @@ const processBillingJobInternal = async (token: string, job: Job) => { await processBillingBatch(); } else if (job.name === "bill_team") { // This is an individual billing operation that should be queued for batch processing - const { team_id, subscription_id, credits, billing, endpoint, is_extract, api_key_id } = + const { + team_id, + subscription_id, + credits, + billing, + endpoint, + is_extract, + api_key_id, + autumnTrackInRequest, + } = job.data; logger.info(`Adding team ${team_id} billing operation to batch queue`, { @@ -101,6 +110,7 @@ const processBillingJobInternal = async (token: string, job: Job) => { isExtract: is_extract, }), is_extract, + autumnTrackInRequest, ); } else { logger.warn(`Unknown billing job type: ${job.name}`); diff --git a/apps/api/src/services/worker/scrape-worker.ts b/apps/api/src/services/worker/scrape-worker.ts index 94cc00dd6..e9dbcf241 100644 --- a/apps/api/src/services/worker/scrape-worker.ts +++ b/apps/api/src/services/worker/scrape-worker.ts @@ -30,7 +30,11 @@ import { StoredCrawl, } from "../../lib/crawl-redis"; import { redisEvictConnection } from "../redis"; -import { resolveBillingMetadata } from "../billing/types"; +import { + resolveBillingMetadata, + toAutumnBillingProperties, +} from "../billing/types"; +import { autumnService } from "../autumn/autumn.service"; import { _addScrapeJobToBullMQ, addScrapeJob, @@ -99,6 +103,17 @@ async function billScrapeJob( unsupportedFeatures?: Set, ) { let creditsToBeBilled: number | null = null; + const billing = resolveBillingMetadata({ + billing: job.data.billing, + crawlId: job.data.crawl_id, + crawlerOptions: job.data.crawlerOptions, + }); + const autumnProperties = { + source: "billScrapeJob", + ...toAutumnBillingProperties(billing), + apiKeyId: job.data.apiKeyId, + }; + let trackedInRequest = false; if (job.data.is_scrape !== true && !job.data.internalOptions?.bypassBilling) { creditsToBeBilled = await calculateCreditsToBeBilled( @@ -116,10 +131,11 @@ async function billScrapeJob( config.USE_DB_AUTHENTICATION ) { try { - const billing = resolveBillingMetadata({ - billing: job.data.billing, - crawlId: job.data.crawl_id, - crawlerOptions: job.data.crawlerOptions, + trackedInRequest = await autumnService.trackCredits({ + teamId: job.data.team_id, + value: creditsToBeBilled, + properties: autumnProperties, + requestScoped: true, }); const billingJobId = uuidv7(); logger.debug( @@ -144,18 +160,27 @@ async function billScrapeJob( timestamp: new Date().toISOString(), originating_job_id: job.id, api_key_id: job.data.apiKeyId, + autumnTrackInRequest: trackedInRequest, }, { jobId: billingJobId, priority: 10, }, ); + return creditsToBeBilled; } catch (error) { logger.error( `Failed to add billing job to queue for team ${job.data.team_id} for ${creditsToBeBilled} credits`, { error }, ); + if (trackedInRequest && creditsToBeBilled !== null) { + await autumnService.refundCredits({ + teamId: job.data.team_id, + value: creditsToBeBilled, + properties: autumnProperties, + }); + } captureExceptionWithZdrCheck(error, { extra: { zeroDataRetention: job.data.zeroDataRetention ?? false }, }); diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts index b933da50f..e9437594c 100644 --- a/apps/api/src/types.ts +++ b/apps/api/src/types.ts @@ -151,6 +151,7 @@ export type AuthResponse = | { success: true; team_id: string; + org_id?: string | null; api_key?: string; chunk: AuthCreditUsageChunk | null; }