Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .server-changes/batch-r2-upload-retry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
area: webapp
type: fix
---

Fix transient R2/object store upload failures during batchTrigger() item streaming.

- Added p-retry (3 attempts, 500ms–2s exponential backoff) around `uploadPacketToObjectStore` in `BatchPayloadProcessor.process()` so transient network errors self-heal server-side rather than aborting the entire batch stream.
- Removed `x-should-retry: false` from the 500 response on the batch items route so the SDK's existing 5xx retry path can recover if server-side retries are exhausted. Item deduplication by index makes full-stream retries safe.
5 changes: 1 addition & 4 deletions apps/webapp/app/routes/api.v3.batches.$batchId.items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,7 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: error.message }, { status: 400 });
}

return json(
{ error: error.message },
{ status: 500, headers: { "x-should-retry": "false" } }
);
return json({ error: error.message }, { status: 500 });
}

return json({ error: "Something went wrong" }, { status: 500 });
Expand Down
45 changes: 30 additions & 15 deletions apps/webapp/app/runEngine/concerns/batchPayloads.server.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { type IOPacket, packetRequiresOffloading, tryCatch } from "@trigger.dev/core/v3";
import pRetry from "p-retry";
import { env } from "~/env.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
Expand Down Expand Up @@ -103,32 +104,46 @@ export class BatchPayloadProcessor {
};
}

// Upload to object store
// Upload to object store, retrying on transient network errors
const { data: packetData, dataType: packetDataType } = packet;
const filename = `batch_${batchId}/item_${itemIndex}/payload.json`;

const [uploadError, uploadedFilename] = await tryCatch(
uploadPacketToObjectStore(
filename,
packet.data,
packet.dataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
pRetry(
() =>
uploadPacketToObjectStore(
filename,
packetData,
packetDataType,
environment,
env.OBJECT_STORE_DEFAULT_PROTOCOL
),
{
retries: 3,
minTimeout: 500,
maxTimeout: 2000,
factor: 2,
onFailedAttempt: (error) => {
logger.warn("Batch item payload upload to object store failed, retrying", {
batchId,
itemIndex,
attempt: error.attemptNumber,
retriesLeft: error.retriesLeft,
error: error.message,
});
},
}
)
);

if (uploadError) {
logger.error("Failed to upload batch item payload to object store", {
logger.error("Failed to upload batch item payload to object store after retries", {
batchId,
itemIndex,
error: uploadError instanceof Error ? uploadError.message : String(uploadError),
error: uploadError.message,
});

// Throw to fail this item - SDK can retry
throw new Error(
`Failed to upload large payload to object store: ${
uploadError instanceof Error ? uploadError.message : String(uploadError)
}`
);
throw new Error(`Failed to upload large payload to object store: ${uploadError.message}`);
}

logger.debug("Batch item payload offloaded to object store", {
Expand Down
115 changes: 115 additions & 0 deletions apps/webapp/test/engine/batchPayloads.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";

// --- Module mocks (must come before imports) ---

vi.mock("~/v3/objectStore.server", () => ({
hasObjectStoreClient: vi.fn().mockReturnValue(true),
uploadPacketToObjectStore: vi.fn(),
}));

// Threshold of 10 bytes so any non-trivial payload triggers offloading
vi.mock("~/env.server", () => ({
env: {
BATCH_PAYLOAD_OFFLOAD_THRESHOLD: 10,
TASK_PAYLOAD_OFFLOAD_THRESHOLD: 10,
OBJECT_STORE_DEFAULT_PROTOCOL: undefined,
},
}));

// Execute the span callback synchronously without real OTel
vi.mock("~/v3/tracer.server", () => ({
startActiveSpan: vi.fn(async (_name: string, fn: (span: any) => any) =>
fn({ setAttribute: vi.fn() })
),
}));

import { BatchPayloadProcessor } from "../../app/runEngine/concerns/batchPayloads.server";
import * as objectStore from "~/v3/objectStore.server";

vi.setConfig({ testTimeout: 30_000 });

// Minimal AuthenticatedEnvironment shape required by BatchPayloadProcessor
const mockEnvironment = {
id: "env-test",
slug: "production",
project: { externalRef: "proj-ext-ref" },
} as any;

describe("BatchPayloadProcessor", () => {
let mockUpload: ReturnType<typeof vi.mocked<typeof objectStore.uploadPacketToObjectStore>>;

beforeEach(() => {
mockUpload = vi.mocked(objectStore.uploadPacketToObjectStore);
mockUpload.mockReset();
});

it("offloads a large payload successfully on first attempt", async () => {
mockUpload.mockResolvedValueOnce("batch_abc/item_0/payload.json");

const processor = new BatchPayloadProcessor();
const result = await processor.process(
'{"message":"hello world"}',
"application/json",
"batch-internal-abc",
0,
mockEnvironment
);

expect(result.wasOffloaded).toBe(true);
expect(result.payloadType).toBe("application/store");
expect(result.payload).toBe("batch_abc/item_0/payload.json");
expect(mockUpload).toHaveBeenCalledTimes(1);
});

it("retries on transient fetch failure and succeeds on third attempt", async () => {
mockUpload
.mockRejectedValueOnce(new Error("fetch failed"))
.mockRejectedValueOnce(new Error("fetch failed"))
.mockResolvedValueOnce("batch_abc/item_0/payload.json");

const processor = new BatchPayloadProcessor();
const result = await processor.process(
'{"message":"hello world"}',
"application/json",
"batch-internal-abc",
0,
mockEnvironment
);

expect(result.wasOffloaded).toBe(true);
expect(mockUpload).toHaveBeenCalledTimes(3);
});

it("throws after exhausting all retry attempts", async () => {
mockUpload.mockRejectedValue(new Error("fetch failed"));

const processor = new BatchPayloadProcessor();

await expect(
processor.process(
'{"message":"hello world"}',
"application/json",
"batch-internal-abc",
0,
mockEnvironment
)
).rejects.toThrow("Failed to upload large payload to object store: fetch failed");

// 1 initial attempt + 3 retries = 4 total calls
expect(mockUpload).toHaveBeenCalledTimes(4);
});

it("does not offload when there is no payload data", async () => {
const processor = new BatchPayloadProcessor();
const result = await processor.process(
undefined,
"application/json",
"batch-internal-abc",
0,
mockEnvironment
);

expect(result.wasOffloaded).toBe(false);
expect(mockUpload).not.toHaveBeenCalled();
});
});
Loading