From 8046caa1ba3b312a505a9cadc72cfda59bce3cfa Mon Sep 17 00:00:00 2001 From: Matt Aitken Date: Tue, 7 Apr 2026 14:16:25 +0100 Subject: [PATCH] fix(batch): retry R2 upload on transient failure in BatchPayloadProcessor A single "fetch failed" from the object store was aborting the entire batch stream with no retry. Added p-retry (up to 3 retries after the initial attempt, 500ms-2s backoff) around uploadPacketToObjectStore so transient network errors self-heal server-side instead of propagating to the SDK. --- .server-changes/batch-r2-upload-retry.md | 9 ++ .../routes/api.v3.batches.$batchId.items.ts | 5 +- .../concerns/batchPayloads.server.ts | 45 ++++--- apps/webapp/test/engine/batchPayloads.test.ts | 115 ++++++++++++++++++ 4 files changed, 155 insertions(+), 19 deletions(-) create mode 100644 .server-changes/batch-r2-upload-retry.md create mode 100644 apps/webapp/test/engine/batchPayloads.test.ts diff --git a/.server-changes/batch-r2-upload-retry.md b/.server-changes/batch-r2-upload-retry.md new file mode 100644 index 00000000000..a2c6415635b --- /dev/null +++ b/.server-changes/batch-r2-upload-retry.md @@ -0,0 +1,9 @@ +--- +area: webapp +type: fix +--- + +Fix transient R2/object store upload failures during batchTrigger() item streaming. + +- Added p-retry (up to 3 retries after the initial attempt, 500ms–2s exponential backoff) around `uploadPacketToObjectStore` in `BatchPayloadProcessor.process()` so transient network errors self-heal server-side rather than aborting the entire batch stream. +- Removed `x-should-retry: false` from the 500 response on the batch items route so the SDK's existing 5xx retry path can recover if server-side retries are exhausted. Item deduplication by index makes full-stream retries safe. 
diff --git a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts index b3ed1c22422..2d732d1555a 100644 --- a/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts +++ b/apps/webapp/app/routes/api.v3.batches.$batchId.items.ts @@ -104,10 +104,7 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ error: error.message }, { status: 400 }); } - return json( - { error: error.message }, - { status: 500, headers: { "x-should-retry": "false" } } - ); + return json({ error: error.message }, { status: 500 }); } return json({ error: "Something went wrong" }, { status: 500 }); diff --git a/apps/webapp/app/runEngine/concerns/batchPayloads.server.ts b/apps/webapp/app/runEngine/concerns/batchPayloads.server.ts index eeb33fa4b41..ab464f03fa2 100644 --- a/apps/webapp/app/runEngine/concerns/batchPayloads.server.ts +++ b/apps/webapp/app/runEngine/concerns/batchPayloads.server.ts @@ -1,4 +1,5 @@ import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3"; +import pRetry from "p-retry"; import { env } from "~/env.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; @@ -103,32 +104,46 @@ export class BatchPayloadProcessor { }; } - // Upload to object store + // Upload to object store, retrying on transient network errors + const { data: packetData, dataType: packetDataType } = packet; const filename = `batch_${batchId}/item_${itemIndex}/payload.json`; const [uploadError, uploadedFilename] = await tryCatch( - uploadPacketToObjectStore( - filename, - packet.data, - packet.dataType, - environment, - env.OBJECT_STORE_DEFAULT_PROTOCOL + pRetry( + () => + uploadPacketToObjectStore( + filename, + packetData, + packetDataType, + environment, + env.OBJECT_STORE_DEFAULT_PROTOCOL + ), + { + retries: 3, + minTimeout: 500, + maxTimeout: 2000, + factor: 2, + onFailedAttempt: (error) => { + 
logger.warn("Batch item payload upload to object store failed, retrying", { + batchId, + itemIndex, + attempt: error.attemptNumber, + retriesLeft: error.retriesLeft, + error: error.message, + }); + }, + } ) ); if (uploadError) { - logger.error("Failed to upload batch item payload to object store", { + logger.error("Failed to upload batch item payload to object store after retries", { batchId, itemIndex, - error: uploadError instanceof Error ? uploadError.message : String(uploadError), + error: uploadError.message, }); - // Throw to fail this item - SDK can retry - throw new Error( - `Failed to upload large payload to object store: ${ - uploadError instanceof Error ? uploadError.message : String(uploadError) - }` - ); + throw new Error(`Failed to upload large payload to object store: ${uploadError.message}`); } logger.debug("Batch item payload offloaded to object store", { diff --git a/apps/webapp/test/engine/batchPayloads.test.ts b/apps/webapp/test/engine/batchPayloads.test.ts new file mode 100644 index 00000000000..69ef6224e74 --- /dev/null +++ b/apps/webapp/test/engine/batchPayloads.test.ts @@ -0,0 +1,115 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +// --- Module mocks (must come before imports) --- + +vi.mock("~/v3/objectStore.server", () => ({ + hasObjectStoreClient: vi.fn().mockReturnValue(true), + uploadPacketToObjectStore: vi.fn(), +})); + +// Threshold of 10 bytes so any non-trivial payload triggers offloading +vi.mock("~/env.server", () => ({ + env: { + BATCH_PAYLOAD_OFFLOAD_THRESHOLD: 10, + TASK_PAYLOAD_OFFLOAD_THRESHOLD: 10, + OBJECT_STORE_DEFAULT_PROTOCOL: undefined, + }, +})); + +// Execute the span callback synchronously without real OTel +vi.mock("~/v3/tracer.server", () => ({ + startActiveSpan: vi.fn(async (_name: string, fn: (span: any) => any) => + fn({ setAttribute: vi.fn() }) + ), +})); + +import { BatchPayloadProcessor } from "../../app/runEngine/concerns/batchPayloads.server"; +import * as objectStore from 
"~/v3/objectStore.server"; + +vi.setConfig({ testTimeout: 30_000 }); + +// Minimal AuthenticatedEnvironment shape required by BatchPayloadProcessor +const mockEnvironment = { + id: "env-test", + slug: "production", + project: { externalRef: "proj-ext-ref" }, +} as any; + +describe("BatchPayloadProcessor", () => { + let mockUpload: ReturnType>; + + beforeEach(() => { + mockUpload = vi.mocked(objectStore.uploadPacketToObjectStore); + mockUpload.mockReset(); + }); + + it("offloads a large payload successfully on first attempt", async () => { + mockUpload.mockResolvedValueOnce("batch_abc/item_0/payload.json"); + + const processor = new BatchPayloadProcessor(); + const result = await processor.process( + '{"message":"hello world"}', + "application/json", + "batch-internal-abc", + 0, + mockEnvironment + ); + + expect(result.wasOffloaded).toBe(true); + expect(result.payloadType).toBe("application/store"); + expect(result.payload).toBe("batch_abc/item_0/payload.json"); + expect(mockUpload).toHaveBeenCalledTimes(1); + }); + + it("retries on transient fetch failure and succeeds on third attempt", async () => { + mockUpload + .mockRejectedValueOnce(new Error("fetch failed")) + .mockRejectedValueOnce(new Error("fetch failed")) + .mockResolvedValueOnce("batch_abc/item_0/payload.json"); + + const processor = new BatchPayloadProcessor(); + const result = await processor.process( + '{"message":"hello world"}', + "application/json", + "batch-internal-abc", + 0, + mockEnvironment + ); + + expect(result.wasOffloaded).toBe(true); + expect(mockUpload).toHaveBeenCalledTimes(3); + }); + + it("throws after exhausting all retry attempts", async () => { + mockUpload.mockRejectedValue(new Error("fetch failed")); + + const processor = new BatchPayloadProcessor(); + + await expect( + processor.process( + '{"message":"hello world"}', + "application/json", + "batch-internal-abc", + 0, + mockEnvironment + ) + ).rejects.toThrow("Failed to upload large payload to object store: fetch failed"); + + 
// 1 initial attempt + 3 retries = 4 total calls + expect(mockUpload).toHaveBeenCalledTimes(4); + }); + + it("does not offload when there is no payload data", async () => { + const processor = new BatchPayloadProcessor(); + const result = await processor.process( + undefined, + "application/json", + "batch-internal-abc", + 0, + mockEnvironment + ); + + expect(result.wasOffloaded).toBe(false); + expect(mockUpload).not.toHaveBeenCalled(); + }); +});