Skip to content

Commit 417fb39

Browse files
committed
perf(run-store,webapp): scope idempotency dedup to one table for non-v2 orgs, add cross-table tests
The idempotency-key dedup is a non-id predicate, so RunStore read BOTH run tables in parallel on every idempotency-keyed trigger, including orgs not cut over to v2 (whose runs only live in TaskRun, so the task_run_v2 query is always empty; while native realtime is off that is every org). Add an optional `tables: "legacy" | "both"` scope to findRun and pass "legacy" from the idempotency concern when the org is not on v2, keeping the trigger hot path single-table. Backfills cross-table tests the audit flagged as missing: findRun legacy-scope skips task_run_v2, and clearIdempotencyKey fans out across both tables (byPredicate hits v2; a mixed byFriendlyIds array clears both).
1 parent 5f14bf3 commit 417fb39

4 files changed

Lines changed: 182 additions & 20 deletions

File tree

apps/webapp/app/runEngine/concerns/idempotencyKeys.server.ts

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,20 @@ export class IdempotencyKeyConcern {
302302
return { isCached: false, idempotencyKey, idempotencyKeyExpiresAt };
303303
}
304304

305+
// Resolve the org's v2-cutover state ONCE: it gates both the cross-table
306+
// idempotency lookup below and the pre-gate claim further down. While the
307+
// org is not on v2 (the default, and every org while native realtime is
308+
// off) the run can only be in the legacy table, so scope the dedup read to
309+
// "legacy" and skip the empty task_run_v2 query on the trigger hot path.
310+
const orgFeatureFlags =
311+
(request.environment.organization?.featureFlags as
312+
| Record<string, unknown>
313+
| null
314+
| undefined) ?? null;
315+
const orgUsesV2 = shouldUseV2RunTable(orgFeatureFlags, {
316+
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
317+
});
318+
305319
const existingRun = idempotencyKey
306320
? await runStore.findRun(
307321
{
@@ -313,6 +327,7 @@ export class IdempotencyKeyConcern {
313327
include: {
314328
associatedWaitpoint: true,
315329
},
330+
tables: orgUsesV2 ? "both" : "legacy",
316331
},
317332
this.prisma
318333
)
@@ -408,11 +423,6 @@ export class IdempotencyKeyConcern {
408423
// TaskRun, ksuid -> task_run_v2); the per-table idempotency unique
409424
// constraints can't see each other, so neither INSERT raises P2002 and two
410425
// runs share one key. The Redis claim is the only backstop in that window.
411-
const orgFeatureFlags =
412-
(request.environment.organization?.featureFlags as
413-
| Record<string, unknown>
414-
| null
415-
| undefined) ?? null;
416426
// v2-cutover orgs: an idempotency-keyed trigger can straddle a `runTableV2`
417427
// flag flip into different physical tables (cuid -> TaskRun, ksuid ->
418428
// task_run_v2), and the per-table idempotency-key unique constraints can't
@@ -430,9 +440,7 @@ export class IdempotencyKeyConcern {
430440
// a pathological client. shouldUseV2RunTable is checked first so a v2 org
431441
// skips the mollifier-flag resolve entirely.
432442
const claimEligible =
433-
shouldUseV2RunTable(orgFeatureFlags, {
434-
nativeRealtimeEnabled: env.REALTIME_BACKEND_NATIVE_ENABLED === "1",
435-
}) ||
443+
orgUsesV2 ||
436444
(!request.body.options?.resumeParentOnCompletion &&
437445
!request.body.options?.debounce &&
438446
!request.options?.oneTimeUseToken &&

internal-packages/run-store/src/PostgresRunStore.test.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2048,6 +2048,127 @@ describe("PostgresRunStore — table routing by id format", () => {
20482048
}
20492049
);
20502050

2051+
postgresTest(
2052+
"findRun tables:'legacy' skips the task_run_v2 query (idempotency hot-path scope)",
2053+
async ({ prisma }) => {
2054+
const { organization, project, environment } = await seedEnvironment(prisma);
2055+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
2056+
2057+
// A v2 (ksuid) run carrying an idempotency key — it lives only in
2058+
// task_run_v2.
2059+
const ksuid = RunId.generateKsuid();
2060+
await seedRoutedRun(prisma, {
2061+
id: ksuid.id,
2062+
friendlyId: ksuid.friendlyId,
2063+
organizationId: organization.id,
2064+
projectId: project.id,
2065+
runtimeEnvironmentId: environment.id,
2066+
idempotencyKey: "idem-scope",
2067+
taskIdentifier: "my-task",
2068+
});
2069+
2070+
const where = {
2071+
runtimeEnvironmentId: environment.id,
2072+
idempotencyKey: "idem-scope",
2073+
taskIdentifier: "my-task",
2074+
};
2075+
2076+
// Default (both tables) and an explicit "both" find the v2 run.
2077+
expect((await store.findRun(where))?.id).toBe(ksuid.id);
2078+
expect(
2079+
(await store.findRun(where, { select: { id: true }, tables: "both" }))?.id
2080+
).toBe(ksuid.id);
2081+
2082+
// "legacy" scope skips task_run_v2 entirely, so the v2 run is NOT found.
2083+
// This is the hot-path optimisation for an org not cut over to v2: its
2084+
// runs only live in TaskRun, so the second (v2) query is always empty and
2085+
// can be skipped. (If a caller mis-scopes a genuinely-v2 org to legacy it
2086+
// would miss the run — hence it is gated on shouldUseV2RunTable upstream.)
2087+
expect(await store.findRun(where, { select: { id: true }, tables: "legacy" })).toBeNull();
2088+
}
2089+
);
2090+
2091+
postgresTest(
2092+
"clearIdempotencyKey fans out across both tables (byPredicate hits v2; byFriendlyIds partitions a mixed array)",
2093+
async ({ prisma }) => {
2094+
const { organization, project, environment } = await seedEnvironment(prisma);
2095+
const store = new PostgresRunStore({ prisma, readOnlyPrisma: prisma });
2096+
2097+
// byPredicate carries no id, so it must reach task_run_v2 to clear a v2 run.
2098+
const v2Pred = RunId.generateKsuid();
2099+
await seedRoutedRun(prisma, {
2100+
id: v2Pred.id,
2101+
friendlyId: v2Pred.friendlyId,
2102+
organizationId: organization.id,
2103+
projectId: project.id,
2104+
runtimeEnvironmentId: environment.id,
2105+
idempotencyKey: "kp-v2",
2106+
taskIdentifier: "my-task",
2107+
});
2108+
2109+
const predResult = await store.clearIdempotencyKey({
2110+
byPredicate: {
2111+
idempotencyKey: "kp-v2",
2112+
taskIdentifier: "my-task",
2113+
runtimeEnvironmentId: environment.id,
2114+
},
2115+
});
2116+
expect(predResult.count).toBe(1);
2117+
expect(
2118+
(
2119+
await prisma.taskRunV2.findFirst({
2120+
where: { id: v2Pred.id },
2121+
select: { idempotencyKey: true },
2122+
})
2123+
)?.idempotencyKey
2124+
).toBeNull();
2125+
2126+
// byFriendlyIds with a MIXED (ksuid + cuid) array must clear rows in BOTH
2127+
// physical tables — the partition + sum is the cross-table behaviour.
2128+
const v2F = RunId.generateKsuid();
2129+
const legacyF = RunId.generate();
2130+
await seedRoutedRun(prisma, {
2131+
id: v2F.id,
2132+
friendlyId: v2F.friendlyId,
2133+
organizationId: organization.id,
2134+
projectId: project.id,
2135+
runtimeEnvironmentId: environment.id,
2136+
idempotencyKey: "kf-v2",
2137+
taskIdentifier: "my-task",
2138+
});
2139+
await seedRoutedRun(prisma, {
2140+
id: legacyF.id,
2141+
friendlyId: legacyF.friendlyId,
2142+
organizationId: organization.id,
2143+
projectId: project.id,
2144+
runtimeEnvironmentId: environment.id,
2145+
idempotencyKey: "kf-legacy",
2146+
taskIdentifier: "my-task",
2147+
});
2148+
2149+
const friendlyResult = await store.clearIdempotencyKey({
2150+
byFriendlyIds: [v2F.friendlyId, legacyF.friendlyId],
2151+
});
2152+
expect(friendlyResult.count).toBe(2);
2153+
expect(
2154+
(
2155+
await prisma.taskRunV2.findFirst({
2156+
where: { id: v2F.id },
2157+
select: { idempotencyKey: true },
2158+
})
2159+
)?.idempotencyKey
2160+
).toBeNull();
2161+
expect(
2162+
(
2163+
await prisma.taskRun.findFirst({
2164+
where: { id: legacyF.id },
2165+
select: { idempotencyKey: true },
2166+
})
2167+
)?.idempotencyKey
2168+
).toBeNull();
2169+
}
2170+
);
2171+
20512172
postgresTest(
20522173
"expireRunsBatch with a mixed array updates both tables and returns the combined count",
20532174
async ({ prisma }) => {

internal-packages/run-store/src/PostgresRunStore.ts

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import type {
1313
CreateFailedRunInput,
1414
CreateRunInput,
1515
ExpireSnapshotInput,
16+
FindRunTableScope,
1617
LockRunData,
1718
ReadClient,
1819
RescheduleSnapshotInput,
@@ -114,8 +115,16 @@ export class PostgresRunStore implements RunStore {
114115
async #findFirstAcrossTables(
115116
prisma: ReadClient,
116117
where: Prisma.TaskRunWhereInput,
117-
args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude }
118+
args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude },
119+
tables: FindRunTableScope = "both"
118120
): Promise<unknown> {
121+
// Legacy-only scope: the caller knows the run can't be in task_run_v2 (e.g.
122+
// idempotency dedup for an org not cut over to v2), so skip the second,
123+
// empty v2 query and keep this a single-table read on the hot path.
124+
if (tables === "legacy") {
125+
return prisma.taskRun.findFirst({ where, ...args });
126+
}
127+
119128
const v2Model = prisma.taskRunV2 as unknown as typeof prisma.taskRun;
120129

121130
const [legacyRun, v2Run] = await Promise.all([
@@ -755,12 +764,12 @@ export class PostgresRunStore implements RunStore {
755764

756765
findRun<S extends Prisma.TaskRunSelect>(
757766
where: Prisma.TaskRunWhereInput,
758-
args: { select: S },
767+
args: { select: S; tables?: FindRunTableScope },
759768
client?: ReadClient
760769
): Promise<Prisma.TaskRunGetPayload<{ select: S }> | null>;
761770
findRun<I extends Prisma.TaskRunInclude>(
762771
where: Prisma.TaskRunWhereInput,
763-
args: { include: I },
772+
args: { include: I; tables?: FindRunTableScope },
764773
client?: ReadClient
765774
): Promise<Prisma.TaskRunGetPayload<{ include: I }> | null>;
766775
findRun(
@@ -769,10 +778,12 @@ export class PostgresRunStore implements RunStore {
769778
): Promise<TaskRun | null>;
770779
async findRun(
771780
where: Prisma.TaskRunWhereInput,
772-
argsOrClient?: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude } | ReadClient,
781+
argsOrClient?:
782+
| { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude; tables?: FindRunTableScope }
783+
| ReadClient,
773784
client?: ReadClient
774785
): Promise<unknown> {
775-
const { args, prisma } = this.#resolveReadArgs(argsOrClient, client);
786+
const { args, prisma, tables } = this.#resolveReadArgs(argsOrClient, client);
776787

777788
const routingKey = this.#routingKeyOf(where);
778789
if (routingKey !== undefined) {
@@ -781,8 +792,8 @@ export class PostgresRunStore implements RunStore {
781792
}
782793

783794
// Non-id predicate (e.g. idempotency-key dedup): the match can be in
784-
// either table, so read both.
785-
return this.#findFirstAcrossTables(prisma, where, args);
795+
// either table, so read both (unless the caller scopes to legacy-only).
796+
return this.#findFirstAcrossTables(prisma, where, args, tables);
786797
}
787798

788799
findRunOrThrow<S extends Prisma.TaskRunSelect>(
@@ -1298,30 +1309,40 @@ export class PostgresRunStore implements RunStore {
12981309
*/
12991310
#resolveReadArgs(
13001311
argsOrClient:
1301-
| { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude }
1312+
| { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude; tables?: FindRunTableScope }
13021313
| ReadClient
13031314
| undefined,
13041315
client: ReadClient | undefined
13051316
): {
13061317
args: { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude };
13071318
prisma: ReadClient;
1319+
tables: FindRunTableScope;
13081320
} {
13091321
const isProjection =
13101322
typeof argsOrClient === "object" &&
13111323
argsOrClient !== null &&
1312-
("select" in argsOrClient || "include" in argsOrClient);
1324+
("select" in argsOrClient || "include" in argsOrClient || "tables" in argsOrClient);
13131325

13141326
if (isProjection) {
1327+
// Split the table-scope hint out of the args that get spread into Prisma
1328+
// (which would reject an unknown `tables` field).
1329+
const { tables, ...prismaArgs } = argsOrClient as {
1330+
select?: Prisma.TaskRunSelect;
1331+
include?: Prisma.TaskRunInclude;
1332+
tables?: FindRunTableScope;
1333+
};
13151334
return {
1316-
args: argsOrClient as { select?: Prisma.TaskRunSelect; include?: Prisma.TaskRunInclude },
1335+
args: prismaArgs,
13171336
prisma: client ?? this.readOnlyPrisma,
1337+
tables: tables ?? "both",
13181338
};
13191339
}
13201340

13211341
// No projection: the second positional arg, when present, is the client.
13221342
return {
13231343
args: {},
13241344
prisma: (argsOrClient as ReadClient | undefined) ?? this.readOnlyPrisma,
1345+
tables: "both",
13251346
};
13261347
}
13271348
}

internal-packages/run-store/src/types.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,18 @@ export type ClearIdempotencyKeyInput =
232232

233233
export type TaskRunWithWaitpoint = TaskRun & { associatedWaitpoint: Waitpoint | null };
234234

235+
/**
236+
* Which physical run tables a non-id `findRun` predicate should read.
237+
*
238+
* Defaults to `"both"` (the safe cross-table behaviour). A caller that KNOWS
239+
* the run can only be in the legacy table — e.g. the idempotency-key dedup for
240+
* an org that is not cut over to `task_run_v2` — can pass `"legacy"` to skip the
241+
* second (empty) `task_run_v2` query and keep the trigger hot path single-table.
242+
* Only meaningful for non-id predicates; id/friendlyId reads already route to
243+
* exactly one table by id format.
244+
*/
245+
export type FindRunTableScope = "both" | "legacy";
246+
235247
export interface RunStore {
236248
// Create
237249
createRun(params: CreateRunInput, tx?: PrismaClientOrTransaction): Promise<TaskRunWithWaitpoint>;
@@ -332,12 +344,12 @@ export interface RunStore {
332344
// Read
333345
findRun<S extends Prisma.TaskRunSelect>(
334346
where: Prisma.TaskRunWhereInput,
335-
args: { select: S },
347+
args: { select: S; tables?: FindRunTableScope },
336348
client?: ReadClient
337349
): Promise<Prisma.TaskRunGetPayload<{ select: S }> | null>;
338350
findRun<I extends Prisma.TaskRunInclude>(
339351
where: Prisma.TaskRunWhereInput,
340-
args: { include: I },
352+
args: { include: I; tables?: FindRunTableScope },
341353
client?: ReadClient
342354
): Promise<Prisma.TaskRunGetPayload<{ include: I }> | null>;
343355
findRun(where: Prisma.TaskRunWhereInput, client?: ReadClient): Promise<TaskRun | null>;

0 commit comments

Comments
 (0)