Skip to content

Commit f32dc72

Browse files
committed
fix(batch-queue): Batch items that hit the environment queue size limit now fast-fail
1 parent bf736a7 commit f32dc72

File tree

7 files changed

+319
-19
lines changed

7 files changed

+319
-19
lines changed
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
area: webapp
3+
type: fix
4+
---
5+
6+
Batch items that hit the environment queue size limit now fast-fail without
7+
retries and without creating pre-failed TaskRuns.

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import type {
4141
TriggerTaskRequest,
4242
TriggerTaskValidator,
4343
} from "../types";
44-
import { ServiceValidationError } from "~/v3/services/common.server";
44+
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";
4545

4646
class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
4747
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
@@ -271,8 +271,9 @@ export class RunEngineTriggerTaskService {
271271
);
272272

273273
if (!queueSizeGuard.ok) {
274-
throw new ServiceValidationError(
274+
throw new QueueSizeLimitExceededError(
275275
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
276+
queueSizeGuard.maximumSize ?? 0,
276277
undefined,
277278
"warn"
278279
);

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 85 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { logger } from "~/services/logger.server";
1313
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
1414
import { reportInvocationUsage } from "~/services/platform.v3.server";
1515
import { MetadataTooLargeError } from "~/utils/packets";
16+
import { QueueSizeLimitExceededError } from "~/v3/services/common.server";
1617
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
1718
import { tracer } from "~/v3/tracer.server";
1819
import { createExceptionPropertiesFromError } from "./eventRepository/common.server";
@@ -637,6 +638,15 @@ export function registerRunEngineEventBusHandlers() {
637638
});
638639
}
639640

641+
/**
642+
* errorCode returned by the batch process-item callback when the trigger was
643+
* rejected because the environment's queue is at its maximum size. The
644+
* BatchQueue (via `skipRetries`) short-circuits retries for this error code, and the
645+
* batch completion callback collapses per-item errors into a single aggregate
646+
* `BatchTaskRunError` row instead of writing one per item.
647+
*/
648+
const QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE = "QUEUE_SIZE_LIMIT_EXCEEDED";
649+
640650
/**
641651
* Set up the BatchQueue processing callbacks.
642652
* These handle creating runs from batch items and completing batches.
@@ -808,6 +818,37 @@ export function setupBatchQueueCallbacks() {
808818
} catch (error) {
809819
const errorMessage = error instanceof Error ? error.message : String(error);
810820

821+
// Queue-size-limit rejections are a customer-overload scenario (the
822+
// env's queue is at its configured max). Retrying is pointless — the
823+
// same item will fail again — and creating pre-failed TaskRuns for
824+
// every item of every retried batch is exactly what chews through
825+
// DB capacity when a noisy tenant fills their queue. Signal the
826+
// BatchQueue to skip retries and skip pre-failed run creation, and
827+
// let the completion callback collapse the per-item errors into a
828+
// single summary row.
829+
if (error instanceof QueueSizeLimitExceededError) {
830+
logger.warn("[BatchQueue] Batch item rejected: queue size limit reached", {
831+
batchId,
832+
friendlyId,
833+
itemIndex,
834+
task: item.task,
835+
environmentId: meta.environmentId,
836+
maximumSize: error.maximumSize,
837+
});
838+
839+
span.setAttribute("batch.result.error", errorMessage);
840+
span.setAttribute("batch.result.errorCode", QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE);
841+
span.setAttribute("batch.result.skipRetries", true);
842+
span.end();
843+
844+
return {
845+
success: false as const,
846+
error: errorMessage,
847+
errorCode: QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE,
848+
skipRetries: true,
849+
};
850+
}
851+
811852
logger.error("[BatchQueue] Failed to trigger batch item", {
812853
batchId,
813854
friendlyId,
@@ -889,20 +930,51 @@ export function setupBatchQueueCallbacks() {
889930
},
890931
});
891932

892-
// Create error records if there were failures
933+
// Create error records if there were failures.
934+
//
935+
// Fast-path for queue-size-limit overload: when every failure is the
936+
// same QUEUE_SIZE_LIMIT_EXCEEDED error, collapse them into a single
937+
// aggregate row instead of writing one per item. This keeps the DB
938+
// write volume bounded to O(batches) instead of O(items) when a noisy
939+
// tenant fills their queue and all of their batches start bouncing.
893940
if (failures.length > 0) {
894-
await tx.batchTaskRunError.createMany({
895-
data: failures.map((failure) => ({
896-
batchTaskRunId: batchId,
897-
index: failure.index,
898-
taskIdentifier: failure.taskIdentifier,
899-
payload: failure.payload,
900-
options: failure.options as Prisma.InputJsonValue | undefined,
901-
error: failure.error,
902-
errorCode: failure.errorCode,
903-
})),
904-
skipDuplicates: true,
905-
});
941+
const allQueueSizeLimit = failures.every(
942+
(f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE
943+
);
944+
945+
if (allQueueSizeLimit) {
946+
const sample = failures[0]!;
947+
await tx.batchTaskRunError.createMany({
948+
data: [
949+
{
950+
batchTaskRunId: batchId,
951+
// Use the first item's index as a stable anchor for the
952+
// (batchTaskRunId, index) unique constraint so callback
953+
// retries remain idempotent.
954+
index: sample.index,
955+
taskIdentifier: sample.taskIdentifier,
956+
payload: sample.payload,
957+
options: sample.options as Prisma.InputJsonValue | undefined,
958+
error: `${sample.error} (${failures.length} items in this batch failed with the same error)`,
959+
errorCode: sample.errorCode,
960+
},
961+
],
962+
skipDuplicates: true,
963+
});
964+
} else {
965+
await tx.batchTaskRunError.createMany({
966+
data: failures.map((failure) => ({
967+
batchTaskRunId: batchId,
968+
index: failure.index,
969+
taskIdentifier: failure.taskIdentifier,
970+
payload: failure.payload,
971+
options: failure.options as Prisma.InputJsonValue | undefined,
972+
error: failure.error,
973+
errorCode: failure.errorCode,
974+
})),
975+
skipDuplicates: true,
976+
});
977+
}
906978
}
907979
});
908980

apps/webapp/app/v3/services/common.server.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,22 @@ export class ServiceValidationError extends Error {
1010
this.name = "ServiceValidationError";
1111
}
1212
}
13+
14+
/**
15+
* Thrown when a trigger is rejected because the environment's queue is at its
16+
* maximum size. This is identified separately from other validation errors so
17+
* the batch queue worker can short-circuit retries and skip pre-failed run
18+
* creation for this specific overload scenario — see the batch process item
19+
* callback in `runEngineHandlers.server.ts`.
20+
*/
21+
export class QueueSizeLimitExceededError extends ServiceValidationError {
22+
constructor(
23+
message: string,
24+
public maximumSize: number,
25+
status?: number,
26+
logLevel?: ServiceValidationErrorLevel
27+
) {
28+
super(message, status, logLevel);
29+
this.name = "QueueSizeLimitExceededError";
30+
}
31+
}

internal-packages/run-engine/src/batch-queue/index.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -865,8 +865,16 @@ export class BatchQueue {
865865
span?.setAttribute("batch.errorCode", result.errorCode);
866866
}
867867

868-
// If retries are available, use FairQueue retry scheduling
869-
if (!isFinalAttempt) {
868+
const skipRetries = result.skipRetries === true;
869+
if (skipRetries) {
870+
span?.setAttribute("batch.skipRetries", true);
871+
}
872+
873+
// If retries are available AND the callback didn't opt out, use
874+
// FairQueue retry scheduling. `skipRetries` short-circuits this
875+
// regardless of attempt number so the batch can finalize quickly
876+
// when the error is known to be non-recoverable on retry.
877+
if (!isFinalAttempt && !skipRetries) {
870878
span?.setAttribute("batch.retry", true);
871879
span?.setAttribute("batch.attempt", attempt);
872880

@@ -890,7 +898,7 @@ export class BatchQueue {
890898
return;
891899
}
892900

893-
// Final attempt exhausted - record permanent failure
901+
// Final attempt exhausted (or retries skipped) - record permanent failure
894902
const payloadStr = await this.#startSpan(
895903
"BatchQueue.serializePayload",
896904
async (innerSpan) => {

internal-packages/run-engine/src/batch-queue/tests/index.test.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -776,4 +776,184 @@ describe("BatchQueue", () => {
776776
}
777777
);
778778
});
779+
780+
describe("skipRetries on failed items", () => {
781+
function createBatchQueueWithRetry(
782+
redisContainer: { getHost: () => string; getPort: () => number },
783+
maxAttempts: number
784+
) {
785+
return new BatchQueue({
786+
redis: {
787+
host: redisContainer.getHost(),
788+
port: redisContainer.getPort(),
789+
keyPrefix: "test:",
790+
},
791+
drr: { quantum: 5, maxDeficit: 50 },
792+
consumerCount: 1,
793+
consumerIntervalMs: 50,
794+
startConsumers: true,
795+
retry: {
796+
maxAttempts,
797+
// Keep the ladder tiny so a regression (items retrying N times)
798+
// still finishes inside the waitFor timeout and the test surfaces
799+
// the problem as a failed attempt-count assertion rather than a
800+
// timeout.
801+
minTimeoutInMs: 20,
802+
maxTimeoutInMs: 100,
803+
factor: 2,
804+
randomize: false,
805+
},
806+
});
807+
}
808+
809+
redisTest(
810+
"should not retry when callback returns skipRetries: true",
811+
async ({ redisContainer }) => {
812+
const queue = createBatchQueueWithRetry(redisContainer, 6);
813+
const itemAttempts = new Map<number, number>();
814+
let completionResult: CompleteBatchResult | null = null;
815+
816+
try {
817+
queue.onProcessItem(async ({ itemIndex }) => {
818+
itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1);
819+
return {
820+
success: false as const,
821+
error: "Queue at maximum size",
822+
errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED",
823+
skipRetries: true,
824+
};
825+
});
826+
827+
queue.onBatchComplete(async (result) => {
828+
completionResult = result;
829+
});
830+
831+
await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
832+
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));
833+
834+
await vi.waitFor(
835+
() => {
836+
expect(completionResult).not.toBeNull();
837+
},
838+
{ timeout: 5000 }
839+
);
840+
841+
// Every item should have been called exactly once — skipRetries
842+
// must bypass the 6-attempt retry ladder on the very first attempt.
843+
expect(itemAttempts.get(0)).toBe(1);
844+
expect(itemAttempts.get(1)).toBe(1);
845+
expect(itemAttempts.get(2)).toBe(1);
846+
847+
expect(completionResult!.successfulRunCount).toBe(0);
848+
expect(completionResult!.failedRunCount).toBe(3);
849+
expect(completionResult!.failures).toHaveLength(3);
850+
for (const failure of completionResult!.failures) {
851+
expect(failure.errorCode).toBe("QUEUE_SIZE_LIMIT_EXCEEDED");
852+
}
853+
} finally {
854+
await queue.close();
855+
}
856+
}
857+
);
858+
859+
redisTest(
860+
"should still retry up to maxAttempts when skipRetries is not set (regression guard)",
861+
async ({ redisContainer }) => {
862+
const maxAttempts = 3;
863+
const queue = createBatchQueueWithRetry(redisContainer, maxAttempts);
864+
const itemAttempts = new Map<number, number>();
865+
let completionResult: CompleteBatchResult | null = null;
866+
867+
try {
868+
queue.onProcessItem(async ({ itemIndex }) => {
869+
itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1);
870+
return {
871+
success: false as const,
872+
error: "Transient error",
873+
errorCode: "TRIGGER_ERROR",
874+
// Intentionally NOT setting skipRetries — the existing
875+
// exponential-backoff retry path should still be honored.
876+
};
877+
});
878+
879+
queue.onBatchComplete(async (result) => {
880+
completionResult = result;
881+
});
882+
883+
await queue.initializeBatch(createInitOptions("batch1", "env1", 2));
884+
await enqueueItems(queue, "batch1", "env1", createBatchItems(2));
885+
886+
await vi.waitFor(
887+
() => {
888+
expect(completionResult).not.toBeNull();
889+
},
890+
{ timeout: 5000 }
891+
);
892+
893+
expect(itemAttempts.get(0)).toBe(maxAttempts);
894+
expect(itemAttempts.get(1)).toBe(maxAttempts);
895+
896+
expect(completionResult!.failedRunCount).toBe(2);
897+
} finally {
898+
await queue.close();
899+
}
900+
}
901+
);
902+
903+
redisTest(
904+
"should honor skipRetries on a per-item basis within the same batch",
905+
async ({ redisContainer }) => {
906+
const maxAttempts = 4;
907+
const queue = createBatchQueueWithRetry(redisContainer, maxAttempts);
908+
const itemAttempts = new Map<number, number>();
909+
let completionResult: CompleteBatchResult | null = null;
910+
911+
try {
912+
queue.onProcessItem(async ({ itemIndex }) => {
913+
itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1);
914+
// Even items fast-fail (queue-size-limit style),
915+
// odd items retry the full ladder.
916+
if (itemIndex % 2 === 0) {
917+
return {
918+
success: false as const,
919+
error: "Queue at maximum size",
920+
errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED",
921+
skipRetries: true,
922+
};
923+
}
924+
return {
925+
success: false as const,
926+
error: "Transient error",
927+
errorCode: "TRIGGER_ERROR",
928+
};
929+
});
930+
931+
queue.onBatchComplete(async (result) => {
932+
completionResult = result;
933+
});
934+
935+
await queue.initializeBatch(createInitOptions("batch1", "env1", 4));
936+
await enqueueItems(queue, "batch1", "env1", createBatchItems(4));
937+
938+
await vi.waitFor(
939+
() => {
940+
expect(completionResult).not.toBeNull();
941+
},
942+
{ timeout: 5000 }
943+
);
944+
945+
// Even-indexed items should fast-fail (1 attempt each)
946+
expect(itemAttempts.get(0)).toBe(1);
947+
expect(itemAttempts.get(2)).toBe(1);
948+
// Odd-indexed items should exhaust the retry ladder
949+
expect(itemAttempts.get(1)).toBe(maxAttempts);
950+
expect(itemAttempts.get(3)).toBe(maxAttempts);
951+
952+
expect(completionResult!.failedRunCount).toBe(4);
953+
} finally {
954+
await queue.close();
955+
}
956+
}
957+
);
958+
});
779959
});

0 commit comments

Comments
 (0)