Skip to content

Commit 9f71b2d

Browse files
committed
fix(batch-queue): Batch items that hit the environment queue size limit now fast-fail
1 parent bf736a7 commit 9f71b2d

File tree

8 files changed

+360
-19
lines changed

8 files changed

+360
-19
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Batch items that hit the environment queue size limit now fast-fail without
7+
retries and without creating pre-failed TaskRuns.

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import type {
4141
TriggerTaskRequest,
4242
TriggerTaskValidator,
4343
} from "../types";
44-
import { ServiceValidationError } from "~/v3/services/common.server";
44+
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
4545

4646
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
4747
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
@@ -271,8 +271,9 @@ export class RunEngineTriggerTaskService {
271271
);
272272

273273
if (!queueSizeGuard.ok) {
274-
throw new ServiceValidationError(
274+
throw new QueueSizeLimitExceededError(
275275
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
276+
queueSizeGuard.maximumSize ?? 0,
276277
undefined,
277278
"warn"
278279
);
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import { generateJWT as internal_generateJWT } from "@trigger.dev/core/v3";
2+
import { extractJwtSigningSecretKey } from "./jwtAuth.server";
3+
4+
type Environment = Parameters<typeof extractJwtSigningSecretKey>[0];
5+
6+
export type MintRunTokenOptions = {
7+
/** Include the input-stream write scope (needed for steering messages from the playground). */
8+
includeInputStreamWrite?: boolean;
9+
/** Token expiration. Defaults to "1h". */
10+
expirationTime?: string;
11+
};
12+
13+
/**
14+
* Mint a run-scoped public access token (JWT) for browser subscription to a
15+
* run's realtime streams.
16+
*
17+
* Used by:
18+
* - The playground action to give a freshly triggered chat session a token.
19+
* - The run details page to let the agent view subscribe to the chat stream
20+
* of an existing run (read-only).
21+
*/
22+
export async function mintRunToken(
23+
environment: Environment,
24+
runFriendlyId: string,
25+
options: MintRunTokenOptions = {}
26+
): Promise<string> {
27+
const scopes = [`read:runs:${runFriendlyId}`];
28+
if (options.includeInputStreamWrite) {
29+
scopes.push(`write:inputStreams:${runFriendlyId}`);
30+
}
31+
32+
return internal_generateJWT({
33+
secretKey: extractJwtSigningSecretKey(environment),
34+
payload: {
35+
sub: environment.id,
36+
pub: true,
37+
scopes,
38+
},
39+
expirationTime: options.expirationTime ?? "1h",
40+
});
41+
}

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { logger } from "~/services/logger.server";
1313
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
1414
import { reportInvocationUsage } from "~/services/platform.v3.server";
1515
import { MetadataTooLargeError } from "~/utils/packets";
16+
import { QueueSizeLimitExceededError } from "~/v3/services/common.server";
1617
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
1718
import { tracer } from "~/v3/tracer.server";
1819
import { createExceptionPropertiesFromError } from "./eventRepository/common.server";
@@ -637,6 +638,15 @@ export function registerRunEngineEventBusHandlers() {
637638
});
638639
}
639640

641+
/**
642+
* errorCode returned by the batch process-item callback when the trigger was
643+
* rejected because the environment's queue is at its maximum size. The
644+
* BatchQueue (via `skipRetries`) short-circuits retries for this code, and the
645+
* batch completion callback collapses per-item errors into a single aggregate
646+
`BatchTaskRunError` row instead of writing one per item — but only when every failure in the batch carries this code; mixed failures still get one row each.
647+
*/
648+
const QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE = "QUEUE_SIZE_LIMIT_EXCEEDED";
649+
640650
/**
641651
* Set up the BatchQueue processing callbacks.
642652
* These handle creating runs from batch items and completing batches.
@@ -808,6 +818,37 @@ export function setupBatchQueueCallbacks() {
808818
} catch (error) {
809819
const errorMessage = error instanceof Error ? error.message : String(error);
810820

821+
// Queue-size-limit rejections are a customer-overload scenario (the
822+
// env's queue is at its configured max). Retrying is pointless — the
823+
// same item will fail again — and creating pre-failed TaskRuns for
824+
// every item of every retried batch is exactly what chews through
825+
// DB capacity when a noisy tenant fills their queue. Signal the
826+
// BatchQueue to skip retries and skip pre-failed run creation, and
827+
// let the completion callback collapse the per-item errors into a
828+
// single summary row.
829+
if (error instanceof QueueSizeLimitExceededError) {
830+
logger.warn("[BatchQueue] Batch item rejected: queue size limit reached", {
831+
batchId,
832+
friendlyId,
833+
itemIndex,
834+
task: item.task,
835+
environmentId: meta.environmentId,
836+
maximumSize: error.maximumSize,
837+
});
838+
839+
span.setAttribute("batch.result.error", errorMessage);
840+
span.setAttribute("batch.result.errorCode", QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE);
841+
span.setAttribute("batch.result.skipRetries", true);
842+
span.end();
843+
844+
return {
845+
success: false as const,
846+
error: errorMessage,
847+
errorCode: QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE,
848+
skipRetries: true,
849+
};
850+
}
851+
811852
logger.error("[BatchQueue] Failed to trigger batch item", {
812853
batchId,
813854
friendlyId,
@@ -889,20 +930,51 @@ export function setupBatchQueueCallbacks() {
889930
},
890931
});
891932

892-
// Create error records if there were failures
933+
// Create error records if there were failures.
934+
//
935+
// Fast-path for queue-size-limit overload: when every failure is the
936+
// same QUEUE_SIZE_LIMIT_EXCEEDED error, collapse them into a single
937+
// aggregate row instead of writing one per item. This keeps the DB
938+
// write volume bounded to O(batches) instead of O(items) when a noisy
939+
// tenant fills their queue and all of their batches start bouncing.
893940
if (failures.length > 0) {
894-
await tx.batchTaskRunError.createMany({
895-
data: failures.map((failure) => ({
896-
batchTaskRunId: batchId,
897-
index: failure.index,
898-
taskIdentifier: failure.taskIdentifier,
899-
payload: failure.payload,
900-
options: failure.options as Prisma.InputJsonValue | undefined,
901-
error: failure.error,
902-
errorCode: failure.errorCode,
903-
})),
904-
skipDuplicates: true,
905-
});
941+
const allQueueSizeLimit = failures.every(
942+
(f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE
943+
);
944+
945+
if (allQueueSizeLimit) {
946+
const sample = failures[0]!;
947+
await tx.batchTaskRunError.createMany({
948+
data: [
949+
{
950+
batchTaskRunId: batchId,
951+
// Use the first item's index as a stable anchor for the
952+
// (batchTaskRunId, index) unique constraint so callback
953+
// retries remain idempotent.
954+
index: sample.index,
955+
taskIdentifier: sample.taskIdentifier,
956+
payload: sample.payload,
957+
options: sample.options as Prisma.InputJsonValue | undefined,
958+
error: `${sample.error} (${failures.length} items in this batch failed with the same error)`,
959+
errorCode: sample.errorCode,
960+
},
961+
],
962+
skipDuplicates: true,
963+
});
964+
} else {
965+
await tx.batchTaskRunError.createMany({
966+
data: failures.map((failure) => ({
967+
batchTaskRunId: batchId,
968+
index: failure.index,
969+
taskIdentifier: failure.taskIdentifier,
970+
payload: failure.payload,
971+
options: failure.options as Prisma.InputJsonValue | undefined,
972+
error: failure.error,
973+
errorCode: failure.errorCode,
974+
})),
975+
skipDuplicates: true,
976+
});
977+
}
906978
}
907979
});
908980

apps/webapp/app/v3/services/common.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,22 @@ export class ServiceValidationError extends Error {
1010
this.name = "ServiceValidationError";
1111
}
1212
}
13+
14+
/**
15+
* Thrown when a trigger is rejected because the environment's queue is at its
16+
* maximum size. This is identified separately from other validation errors so
17+
* the batch queue worker can short-circuit retries and skip pre-failed run
18+
* creation for this specific overload scenario — see the batch process item
19+
* callback in `runEngineHandlers.server.ts`.
20+
*/
21+
export class QueueSizeLimitExceededError extends ServiceValidationError {
22+
constructor(
23+
message: string,
24+
public maximumSize: number,
25+
status?: number,
26+
logLevel?: ServiceValidationErrorLevel
27+
) {
28+
super(message, status, logLevel);
29+
this.name = "QueueSizeLimitExceededError";
30+
}
31+
}

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,16 @@ export class BatchQueue {
865865
span?.setAttribute("batch.errorCode", result.errorCode);
866866
}
867867

868-
// If retries are available, use FairQueue retry scheduling
869-
if (!isFinalAttempt) {
868+
const skipRetries = result.skipRetries === true;
869+
if (skipRetries) {
870+
span?.setAttribute("batch.skipRetries", true);
871+
}
872+
873+
// If retries are available AND the callback didn't opt out, use
874+
// FairQueue retry scheduling. `skipRetries` short-circuits this
875+
// regardless of attempt number so the batch can finalize quickly
876+
// when the error is known to be non-recoverable on retry.
877+
if (!isFinalAttempt && !skipRetries) {
870878
span?.setAttribute("batch.retry", true);
871879
span?.setAttribute("batch.attempt", attempt);
872880

@@ -890,7 +898,7 @@ export class BatchQueue {
890898
return;
891899
}
892900

893-
// Final attempt exhausted - record permanent failure
901+
// Final attempt exhausted (or retries skipped) - record permanent failure
894902
const payloadStr = await this.#startSpan(
895903
"BatchQueue.serializePayload",
896904
async (innerSpan) => {

0 commit comments

Comments
 (0)