From 22b51cd10db4cf37291a77e6e10356bdc039dc64 Mon Sep 17 00:00:00 2001 From: Edmund Hung Date: Fri, 13 Mar 2026 08:05:24 +0000 Subject: [PATCH] Revert "[Workflows] Implement Workflows instance methods" (#12872) --- .changeset/workflows-instance-methods.md | 17 - .../workflows-vitest-pool-waitforstatus.md | 5 - .../workflows/test/integration.test.ts | 95 +-- fixtures/workflow-multiple/src/index.ts | 53 +- .../workflow-multiple/tests/index.test.ts | 153 +---- fixtures/workflow-multiple/wrangler.jsonc | 5 - fixtures/workflow/src/index.ts | 12 - fixtures/workflow/tests/index.test.ts | 132 ---- .../workflows/wrapped-binding.worker.ts | 27 +- .../test/plugins/workflows/index.spec.ts | 219 +----- .../src/worker/workflows.ts | 9 + packages/workflows-shared/README.md | 6 +- packages/workflows-shared/src/binding.ts | 94 +-- packages/workflows-shared/src/context.ts | 73 +- packages/workflows-shared/src/engine.ts | 321 +-------- packages/workflows-shared/src/instance.ts | 13 +- packages/workflows-shared/src/lib/errors.ts | 52 -- .../src/lib/gracePeriodSemaphore.ts | 87 +-- .../src/lib/timePriorityQueue.ts | 24 - .../workflows-shared/src/lib/validators.ts | 41 -- packages/workflows-shared/src/modifier.ts | 60 +- .../workflows-shared/tests/binding.test.ts | 624 +---------------- .../workflows-shared/tests/context.test.ts | 78 +-- .../workflows-shared/tests/engine.test.ts | 643 +----------------- packages/workflows-shared/tests/env.d.ts | 2 +- packages/workflows-shared/tests/test-entry.ts | 51 -- packages/workflows-shared/tests/tsconfig.json | 11 +- packages/workflows-shared/tests/utils.ts | 5 +- .../workflows-shared/tests/validators.test.ts | 32 - packages/workflows-shared/tsconfig.json | 2 +- packages/workflows-shared/vitest.config.ts | 13 +- 31 files changed, 181 insertions(+), 2778 deletions(-) delete mode 100644 .changeset/workflows-instance-methods.md delete mode 100644 .changeset/workflows-vitest-pool-waitforstatus.md delete mode 100644 packages/workflows-shared/tests/test-entry.ts diff --git a/.changeset/workflows-instance-methods.md b/.changeset/workflows-instance-methods.md deleted file mode 100644 index 5caac400c135..000000000000 --- a/.changeset/workflows-instance-methods.md +++ /dev/null @@ -1,17 +0,0 @@ ---- -"@cloudflare/workflows-shared": minor -"miniflare": minor ---- - -Workflow instances now support pause, resume, restart, and terminate in local dev. - -```js -const instance = await env.MY_WORKFLOW.create({ - id: "my-instance", -}); - -await instance.pause(); // pauses after the current step completes -await instance.resume(); // resumes from where it left off -await instance.restart(); // restarts the workflow from the beginning -await instance.terminate(); // terminates the workflow immediately -``` diff --git a/.changeset/workflows-vitest-pool-waitforstatus.md b/.changeset/workflows-vitest-pool-waitforstatus.md deleted file mode 100644 index 94b6398018a5..000000000000 --- a/.changeset/workflows-vitest-pool-waitforstatus.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@cloudflare/vitest-pool-workers": patch ---- - -Workflows testing util `waitForStatus` now supports waiting for "terminated" and "paused" states. diff --git a/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts b/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts index 9e7bf7ec4de2..33e3bdcf9b1a 100644 --- a/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts +++ b/fixtures/vitest-pool-workers-examples/workflows/test/integration.test.ts @@ -1,5 +1,5 @@ import { env, introspectWorkflow, SELF } from "cloudflare:test"; -import { describe, it } from "vitest"; +import { it } from "vitest"; const STATUS_COMPLETE = "complete"; const STEP_NAME = "AI content scan"; @@ -70,96 +70,3 @@ it("workflow batch should be able to reach the end and be successful", async ({ await introspector.dispose(); } }); - -describe("workflow instance lifecycle methods", () => { - it("should terminate a workflow instance", async ({ expect }) => { - // CONFIG: - await using introspector = await introspectWorkflow(env.MODERATOR); - await introspector.modifyAll(async (m) => { - await m.disableSleeps(); - await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 }); - }); - - const res = await SELF.fetch("https://mock-worker.local/moderate"); - const data = (await res.json()) as { id: string; details: unknown }; - - const instances = introspector.get(); - expect(instances.length).toBe(1); - const instance = instances[0]; - - expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({ - violationScore: 50, - }); - - const handle = await env.MODERATOR.get(data.id); - await handle.terminate(); - - // ASSERTIONS: - await expect(instance.waitForStatus("terminated")).resolves.not.toThrow(); - - // DISPOSE: ensured by `await using` - }); - - it("should restart a workflow instance", async ({ expect }) => { - // CONFIG: - await using introspector = await introspectWorkflow(env.MODERATOR); - await introspector.modifyAll(async (m) => { - await m.disableSleeps(); - await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 }); - await m.mockEvent({ - type: "moderation-decision", - payload: { moderatorAction: "approve" }, - }); - }); - - const res = await SELF.fetch("https://mock-worker.local/moderate"); - const data = (await res.json()) as { id: string; details: unknown }; - - const instances = introspector.get(); - expect(instances.length).toBe(1); - const instance = instances[0]; - - expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({ - violationScore: 50, - }); - - const handle = await env.MODERATOR.get(data.id); - await handle.restart(); - - // Mocks survive instace restart, so the restarted workflow re-runs - // with the same config - await expect( - instance.waitForStatus(STATUS_COMPLETE) - ).resolves.not.toThrow(); - - // DISPOSE: ensured by `await using` - }); - - it("should pause a workflow instance", async ({ expect }) => { - // CONFIG: - await using introspector = await introspectWorkflow(env.MODERATOR); - await introspector.modifyAll(async (m) => { - await m.disableSleeps(); - await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 }); - }); - - const res = await SELF.fetch("https://mock-worker.local/moderate"); - const data = (await res.json()) as { id: string; details: unknown }; - - const instances = introspector.get(); - expect(instances.length).toBe(1); - const instance = instances[0]; - - expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({ - violationScore: 50, - }); - - const handle = await env.MODERATOR.get(data.id); - await handle.pause(); - - // ASSERTIONS: - await expect(instance.waitForStatus("paused")).resolves.not.toThrow(); - - // DISPOSE: ensured by `await using` - }); -}); diff --git a/fixtures/workflow-multiple/src/index.ts b/fixtures/workflow-multiple/src/index.ts index 083ed1214f3f..adaf369e8e93 100644 --- a/fixtures/workflow-multiple/src/index.ts +++ b/fixtures/workflow-multiple/src/index.ts @@ -57,32 +57,9 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> { } } -export class Demo3 extends WorkflowEntrypoint<{}, Params> { - async run(event: WorkflowEvent, step: WorkflowStep) { - const result = await step.do("First step", async function () { - return { - output: "First step result", - }; - }); - - await step.waitForEvent("wait for signal", { - type: "continue", - }); - - const result2 = await step.do("Second step", async function () { - return { - output: "workflow3", - }; - }); - - return "i'm workflow3"; - } -} - type Env = { WORKFLOW: Workflow; WORKFLOW2: Workflow; - WORKFLOW3: Workflow; }; export default class extends WorkerEntrypoint { @@ -94,15 +71,8 @@ export default class extends WorkerEntrypoint { if (url.pathname === "/favicon.ico") { return new Response(null, { status: 404 }); } - - let workflowToUse: Workflow; - if (workflowName === "3") { - workflowToUse = this.env.WORKFLOW3; - } else if (workflowName === "2") { - workflowToUse = this.env.WORKFLOW2; - } else { - workflowToUse = this.env.WORKFLOW; - } + let workflowToUse = + workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW; let handle: WorkflowInstance; if (url.pathname === "/create") { @@ -111,25 +81,6 @@ export default class extends WorkerEntrypoint { } else { handle = await workflowToUse.create({ id }); } - } else if (url.pathname === "/pause") { - handle = await workflowToUse.get(id); - await handle.pause(); - } else if (url.pathname === "/resume") { - handle = await workflowToUse.get(id); - await handle.resume(); - } else if (url.pathname === "/restart") { - handle = await workflowToUse.get(id); - await handle.restart(); - } else if (url.pathname === "/terminate") { - handle = await workflowToUse.get(id); - await handle.terminate(); - } else if (url.pathname === "/sendEvent") { - handle = await workflowToUse.get(id); - await handle.sendEvent({ - type: "continue", - payload: await req.json(), - }); - return Response.json({ ok: true }); } else { handle = await workflowToUse.get(id); } diff --git a/fixtures/workflow-multiple/tests/index.test.ts b/fixtures/workflow-multiple/tests/index.test.ts index 410d17dd6f57..4adc65897f83 100644 --- a/fixtures/workflow-multiple/tests/index.test.ts +++ b/fixtures/workflow-multiple/tests/index.test.ts @@ -1,8 +1,7 @@ -import { randomUUID } from "crypto"; import { rm } from "fs/promises"; import { resolve } from "path"; import { fetch } from "undici"; -import { afterAll, beforeAll, describe, it, test, vi } from "vitest"; +import { afterAll, beforeAll, describe, it, vi } from "vitest"; import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived"; describe("Workflows", () => { @@ -27,13 +26,11 @@ describe("Workflows", () => { await stop?.(); }); - async function fetchJson(url: string, body?: unknown, method?: string) { + async function fetchJson(url: string) { const response = await fetch(url, { headers: { "MF-Disable-Pretty-Error": "1", }, - method: method ?? "GET", - body: body !== undefined ? JSON.stringify(body) : undefined, }); const text = await response.text(); @@ -95,150 +92,4 @@ describe("Workflows", () => { ), ]); }); - - describe("instance lifecycle methods (workflow3)", () => { - test("pause and resume a workflow", async ({ expect }) => { - const id = randomUUID(); - - await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { - status: { - __LOCAL_DEV_STEP_OUTPUTS: { output: string }[]; - }; - }; - expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 5000 } - ); - - // Pause the instance - await fetchJson(`http://${ip}:${port}/pause?workflowName=3&id=${id}`); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { status: { status: string } }; - expect(result.status.status).toBe("paused"); - }, - { timeout: 5000 } - ); - - // Resume the instance - await fetchJson(`http://${ip}:${port}/resume?workflowName=3&id=${id}`); - - await fetchJson( - `http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`, - { done: true }, - "POST" - ); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { status: { status: string; output: string } }; - expect(result.status.status).toBe("complete"); - expect(result.status.output).toBe("i'm workflow3"); - }, - { timeout: 5000 } - ); - }); - - test("terminate a running workflow", async ({ expect }) => { - const id = randomUUID(); - - await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { - status: { - __LOCAL_DEV_STEP_OUTPUTS: { output: string }[]; - }; - }; - expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 5000 } - ); - - // Terminate - await fetchJson(`http://${ip}:${port}/terminate?workflowName=3&id=${id}`); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { status: { status: string } }; - expect(result.status.status).toBe("terminated"); - }, - { timeout: 5000 } - ); - }); - - test("restart a running workflow", async ({ expect }) => { - const id = randomUUID(); - - await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { - status: { - __LOCAL_DEV_STEP_OUTPUTS: { output: string }[]; - }; - }; - expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 5000 } - ); - - // Restart the instance - await fetchJson(`http://${ip}:${port}/restart?workflowName=3&id=${id}`); - - // After restart, wait for it to be running again - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { status: { status: string } }; - expect(result.status.status).toBe("running"); - }, - { timeout: 5000 } - ); - - // Send event to complete the restarted workflow - await fetchJson( - `http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`, - { done: true }, - "POST" - ); - - await vi.waitFor( - async () => { - const result = (await fetchJson( - `http://${ip}:${port}/status?workflowName=3&id=${id}` - )) as { status: { status: string; output: string } }; - expect(result.status.status).toBe("complete"); - expect(result.status.output).toBe("i'm workflow3"); - }, - { timeout: 5000 } - ); - }); - }); }); diff --git a/fixtures/workflow-multiple/wrangler.jsonc b/fixtures/workflow-multiple/wrangler.jsonc index 08061b67ebfa..30d930f6d5af 100644 --- a/fixtures/workflow-multiple/wrangler.jsonc +++ b/fixtures/workflow-multiple/wrangler.jsonc @@ -13,10 +13,5 @@ "name": "my-workflow-2", "class_name": "Demo2", }, - { - "binding": "WORKFLOW3", - "name": "my-workflow-3", - "class_name": "Demo3", - }, ], } diff --git a/fixtures/workflow/src/index.ts b/fixtures/workflow/src/index.ts index ccc64359290c..ebf5b0565cd8 100644 --- a/fixtures/workflow/src/index.ts +++ b/fixtures/workflow/src/index.ts @@ -142,18 +142,6 @@ export default class extends WorkerEntrypoint { type: "event-1", payload: await req.json(), }); - } else if (url.pathname === "/pause") { - handle = await this.env.WORKFLOW2.get(id); - await handle.pause(); - } else if (url.pathname === "/resume") { - handle = await this.env.WORKFLOW2.get(id); - await handle.resume(); - } else if (url.pathname === "/restart") { - handle = await this.env.WORKFLOW2.get(id); - await handle.restart(); - } else if (url.pathname === "/terminate") { - handle = await this.env.WORKFLOW2.get(id); - await handle.terminate(); } else if (url.pathname === "/get2") { handle = await this.env.WORKFLOW2.get(id); } else if (url.pathname === "/get3") { diff --git a/fixtures/workflow/tests/index.test.ts b/fixtures/workflow/tests/index.test.ts index 477c6796940f..c3d9f4633a72 100644 --- a/fixtures/workflow/tests/index.test.ts +++ b/fixtures/workflow/tests/index.test.ts @@ -265,138 +265,6 @@ describe("Workflows", () => { ); }); - describe("instance lifecycle methods", () => { - test("pause and resume a workflow", async ({ expect }) => { - const name = randomUUID(); - - await fetchJson(`http://${ip}:${port}/createDemo2?workflowName=${name}`); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 1500 } - ); - - // Pause the instance - await fetchJson(`http://${ip}:${port}/pause?workflowName=${name}`); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.status).toBe("paused"); - }, - { timeout: 1500 } - ); - - // Resume the instance - await fetchJson(`http://${ip}:${port}/resume?workflowName=${name}`); - - await fetchJson( - `http://${ip}:${port}/sendEvent?workflowName=${name}`, - { event: true }, - "POST" - ); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.status).toBe("complete"); - }, - { timeout: 5000 } - ); - }); - - test("terminate a running workflow", async ({ expect }) => { - const name = randomUUID(); - - await fetchJson(`http://${ip}:${port}/createDemo2?workflowName=${name}`); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 1500 } - ); - - // Terminate the instance - await fetchJson(`http://${ip}:${port}/terminate?workflowName=${name}`); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.status).toBe("terminated"); - }, - { timeout: 1500 } - ); - }); - - test("restart a running workflow", async ({ expect }) => { - const name = randomUUID(); - - await fetchJson(`http://${ip}:${port}/createDemo2?workflowName=${name}`); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({ - output: "First step result", - }); - }, - { timeout: 5000 } - ); - - // Restart the instance - await fetchJson(`http://${ip}:${port}/restart?workflowName=${name}`); - - // After restart, wait for it to be running again - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.status).toBe("running"); - }, - { timeout: 1500 } - ); - - // Send event to complete the restarted workflow - await fetchJson( - `http://${ip}:${port}/sendEvent?workflowName=${name}`, - { event: true }, - "POST" - ); - - await vi.waitFor( - async () => { - const result = await fetchJson( - `http://${ip}:${port}/get2?workflowName=${name}` - ); - expect(result.status).toBe("complete"); - }, - { timeout: 5000 } - ); - }); - }); - it("should create an instance after immediate redirect", async ({ expect, }) => { diff --git a/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts b/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts index 21cd991f5092..9a4614aa7677 100644 --- a/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts +++ b/packages/miniflare/src/workers/workflows/wrapped-binding.worker.ts @@ -69,33 +69,30 @@ class InstanceImpl implements WorkflowInstance { private binding: WorkflowBinding ) {} - private async getInstance(): Promise { - return (await this.binding.get(this.id)) as WorkflowInstance & Disposable; - } - public async pause(): Promise { - using instance = await this.getInstance(); - await instance.pause(); + // Look for instance in namespace + // Get engine stub + // Call a few functions on stub + throw new Error("Not implemented yet"); } public async resume(): Promise { - using instance = await this.getInstance(); - await instance.resume(); + throw new Error("Not implemented yet"); } public async terminate(): Promise { - using instance = await this.getInstance(); - await instance.terminate(); + throw new Error("Not implemented yet"); } public async restart(): Promise { - using instance = await this.getInstance(); - await instance.restart(); + throw new Error("Not implemented yet"); } public async status(): Promise { - using instance = await this.getInstance(); + const instance = (await this.binding.get(this.id)) as WorkflowInstance & + Disposable; using res = (await instance.status()) as InstanceStatus & Disposable; + instance[Symbol.dispose](); return structuredClone(res); } @@ -103,8 +100,10 @@ class InstanceImpl implements WorkflowInstance { payload: unknown; type: string; }): Promise { - using instance = await this.getInstance(); + const instance = (await this.binding.get(this.id)) as WorkflowInstance & + Disposable; await instance.sendEvent(args); + instance[Symbol.dispose](); } } diff --git a/packages/miniflare/test/plugins/workflows/index.spec.ts b/packages/miniflare/test/plugins/workflows/index.spec.ts index c0be12a95db5..8f6acb15229a 100644 --- a/packages/miniflare/test/plugins/workflows/index.spec.ts +++ b/packages/miniflare/test/plugins/workflows/index.spec.ts @@ -1,7 +1,7 @@ import * as fs from "node:fs/promises"; import { scheduler } from "node:timers/promises"; import { Miniflare, MiniflareOptions } from "miniflare"; -import { describe, test } from "vitest"; +import { test } from "vitest"; import { useDispose, useTmp } from "../../test-shared"; const WORKFLOW_SCRIPT = () => ` @@ -81,220 +81,3 @@ test("persists Workflow data on file-system between runs", async ({ '{"status":"complete","__LOCAL_DEV_STEP_OUTPUTS":["yes you are"],"output":"I\'m a output string"}' ); }); - -const LIFECYCLE_WORKFLOW_SCRIPT = () => ` -import { WorkflowEntrypoint } from "cloudflare:workers"; -export class LifecycleWorkflow extends WorkflowEntrypoint { - async run(event, step) { - const first = await step.do("first step", async () => "step-1-done"); - - await step.do("long step", async () => { - await scheduler.wait(500); - return "long-step-done"; - }); - - const second = await step.do("third step", async () => "step-3-done"); - - return "workflow-complete"; - } -} -export default { - async fetch(request, env, ctx) { - const url = new URL(request.url); - const id = url.searchParams.get("id") || "lifecycle-test"; - - if (url.pathname === "/create") { - const instance = await env.LIFECYCLE_WORKFLOW.create({ id }); - const status = await instance.status(); - return Response.json({ id: instance.id, status }); - } - - if (url.pathname === "/status") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - return Response.json(await instance.status()); - } - - if (url.pathname === "/pause") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - await instance.pause(); - return Response.json(await instance.status()); - } - - if (url.pathname === "/resume") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - await instance.resume(); - return Response.json(await instance.status()); - } - - if (url.pathname === "/restart") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - await instance.restart(); - return Response.json(await instance.status()); - } - - if (url.pathname === "/terminate") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - await instance.terminate(); - return Response.json(await instance.status()); - } - - if (url.pathname === "/sendEvent") { - const instance = await env.LIFECYCLE_WORKFLOW.get(id); - await instance.sendEvent({ type: "continue", payload: { sent: true } }); - return Response.json({ ok: true }); - } - - return new Response("Not found", { status: 404 }); - }, -};`; - -function lifecycleMiniflareOpts(tmp: string): MiniflareOptions { - return { - name: "lifecycle-worker", - compatibilityDate: "2026-03-09", - modules: true, - script: LIFECYCLE_WORKFLOW_SCRIPT(), - workflows: { - LIFECYCLE_WORKFLOW: { - className: "LifecycleWorkflow", - name: "LIFECYCLE_WORKFLOW", - }, - }, - workflowsPersist: tmp, - }; -} - -async function waitForStatus( - mf: Miniflare, - id: string, - expectedStatus: string, - timeoutMs = 5000 -): Promise> { - const begin = performance.now(); - let lastResult: Record = {}; - while (performance.now() - begin < timeoutMs) { - const res = await mf.dispatchFetch(`http://localhost/status?id=${id}`); - lastResult = (await res.json()) as Record; - if (lastResult.status === expectedStatus) { - return lastResult; - } - await scheduler.wait(100); - } - throw new Error( - `Timed out waiting for status "${expectedStatus}" after ${timeoutMs}ms. Last status: ${JSON.stringify(lastResult)}` - ); -} - -async function waitForStepOutput( - mf: Miniflare, - id: string, - expectedOutput: string, - timeoutMs = 5000 -): Promise { - const begin = performance.now(); - while (performance.now() - begin < timeoutMs) { - const res = await mf.dispatchFetch(`http://localhost/status?id=${id}`); - const data = (await res.json()) as { - __LOCAL_DEV_STEP_OUTPUTS?: string[]; - }; - if ( - data.__LOCAL_DEV_STEP_OUTPUTS && - data.__LOCAL_DEV_STEP_OUTPUTS.includes(expectedOutput) - ) { - return; - } - await scheduler.wait(100); - } - throw new Error( - `Timed out waiting for step output "${expectedOutput}" after ${timeoutMs}ms` - ); -} - -describe("workflow instance lifecycle methods", () => { - test("pause and resume a running workflow", async ({ expect }) => { - const tmp = await useTmp(); - const mf = new Miniflare(lifecycleMiniflareOpts(tmp)); - useDispose(mf); - - const createRes = await mf.dispatchFetch( - "http://localhost/create?id=pause-resume-test" - ); - const createData = (await createRes.json()) as Record; - expect(createData.id).toBe("pause-resume-test"); - - await waitForStepOutput(mf, "pause-resume-test", "step-1-done"); - - // Pause the instance — waits for the in-flight long step to finish, then pauses - const pauseRes = await mf.dispatchFetch( - "http://localhost/pause?id=pause-resume-test" - ); - const pauseData = (await pauseRes.json()) as Record; - expect(pauseData).toHaveProperty("status"); - - await waitForStatus(mf, "pause-resume-test", "paused"); - - // Resume the instance - const resumeRes = await mf.dispatchFetch( - "http://localhost/resume?id=pause-resume-test" - ); - const resumeData = (await resumeRes.json()) as Record; - expect(resumeData).toHaveProperty("status"); - - // After resume, the workflow should complete (third step runs, then returns) - const finalStatus = await waitForStatus( - mf, - "pause-resume-test", - "complete" - ); - expect(finalStatus.output).toBe("workflow-complete"); - }); - - test("terminate a running workflow", async ({ expect }) => { - const tmp = await useTmp(); - const mf = new Miniflare(lifecycleMiniflareOpts(tmp)); - useDispose(mf); - - const createRes = await mf.dispatchFetch( - "http://localhost/create?id=terminate-test" - ); - await createRes.text(); // consume the body - - await waitForStepOutput(mf, "terminate-test", "step-1-done"); - - // Terminate the instance - const terminateRes = await mf.dispatchFetch( - "http://localhost/terminate?id=terminate-test" - ); - const terminateData = (await terminateRes.json()) as Record< - string, - unknown - >; - expect(terminateData).toHaveProperty("status"); - - await waitForStatus(mf, "terminate-test", "terminated"); - }); - - test("restart a running workflow", async ({ expect }) => { - const tmp = await useTmp(); - const mf = new Miniflare(lifecycleMiniflareOpts(tmp)); - useDispose(mf); - - const createRes = await mf.dispatchFetch( - "http://localhost/create?id=restart-test" - ); - await createRes.text(); // consume the body - - await waitForStepOutput(mf, "restart-test", "step-1-done"); - - // Restart the instance - const restartRes = await mf.dispatchFetch( - "http://localhost/restart?id=restart-test" - ); - const restartData = (await restartRes.json()) as Record; - expect(restartData).toHaveProperty("status"); - - // After restart, the workflow restarts from scratch and runs to completion - const finalStatus = await waitForStatus(mf, "restart-test", "complete"); - expect(finalStatus.output).toBe("workflow-complete"); - }); -}); diff --git a/packages/vitest-pool-workers/src/worker/workflows.ts b/packages/vitest-pool-workers/src/worker/workflows.ts index 686b09c38323..220f7e25de64 100644 --- a/packages/vitest-pool-workers/src/worker/workflows.ts +++ b/packages/vitest-pool-workers/src/worker/workflows.ts @@ -86,6 +86,15 @@ class WorkflowInstanceIntrospectorHandle } async waitForStatus(status: InstanceStatus["status"]): Promise { + if ( + status === instanceStatusName(InstanceStatusNumber.Terminated) || + status === instanceStatusName(InstanceStatusNumber.Paused) + ) { + throw new Error( + `[WorkflowIntrospector] InstanceStatus '${status}' is not implemented yet and cannot be waited.` + ); + } + if (status === instanceStatusName(InstanceStatusNumber.Queued)) { // we currently don't have a queue mechanism, but it would happen before it // starts running, so waiting for it to be queued should always return diff --git a/packages/workflows-shared/README.md b/packages/workflows-shared/README.md index 4b91e805b0da..a70d1b59b613 100644 --- a/packages/workflows-shared/README.md +++ b/packages/workflows-shared/README.md @@ -1,3 +1,7 @@ # `@cloudflare/workflows-shared` -This package powers the local development experience for [Cloudflare Workflows](https://developers.cloudflare.com/workflows/) in workers-sdk and Wrangler. +This is a package that is used at Cloudflare to power some internal features of [Cloudflare Workflows](https://developers.cloudflare.com/workflows/), as well as their open-source equivalents here in workers-sdk and Wrangler. + +> [!NOTE] +> Since code in this package is used by the Workflows infrastructure, it is important that PRs are given careful review with regards to how they could cause a failure in production. +> Ideally, there should be comprehensive tests for changes being made to give extra confidence about the behavior. diff --git a/packages/workflows-shared/src/binding.ts b/packages/workflows-shared/src/binding.ts index 40fac32a715e..f02a672f9d8c 100644 --- a/packages/workflows-shared/src/binding.ts +++ b/packages/workflows-shared/src/binding.ts @@ -1,12 +1,6 @@ import { RpcTarget, WorkerEntrypoint } from "cloudflare:workers"; import { InstanceEvent, instanceStatusName } from "./instance"; -import { - isAbortError, - isUserTriggeredPause, - isUserTriggeredRestart, - isUserTriggeredTerminate, - WorkflowError, -} from "./lib/errors"; +import { WorkflowError } from "./lib/errors"; import { isValidWorkflowInstanceId } from "./lib/validators"; import type { DatabaseInstance, @@ -15,7 +9,6 @@ import type { Engine, EngineLogs, } from "./engine"; -import type { InstanceStatus as EngineInstanceStatus } from "./instance"; type Env = { ENGINE: DurableObjectNamespace; @@ -57,12 +50,6 @@ export class WorkflowBinding extends WorkerEntrypoint { if (val !== undefined) { val[Symbol.dispose](); } - }) - .catch((e) => { - // Suppress abort errors since they're expected - if (!isAbortError(e)) { - throw e; - } }); this.ctx.waitUntil(initPromise); @@ -75,11 +62,7 @@ export class WorkflowBinding extends WorkerEntrypoint { public async get(id: string): Promise { const stubId = this.env.ENGINE.idFromName(id); const stub = this.env.ENGINE.get(stubId); - - // Pass a getter function so WorkflowHandle can get a fresh stub after abort - const getStub = () => this.env.ENGINE.get(this.env.ENGINE.idFromName(id)); - - const handle = new WorkflowHandle(id, stub, getStub); + const handle = new WorkflowHandle(id, stub); try { await handle.status(); @@ -164,86 +147,41 @@ export class WorkflowBinding extends WorkerEntrypoint { } export class WorkflowHandle extends RpcTarget implements WorkflowInstance { - private stub: DurableObjectStub; - constructor( public id: string, - stub: DurableObjectStub, - private getStub: () => DurableObjectStub + private stub: DurableObjectStub ) { super(); - this.stub = stub; } public async pause(): Promise { - try { - await this.stub.changeInstanceStatus("pause"); - } catch (e) { - // pause causes instance abortion - if (!isUserTriggeredPause(e)) { - throw e; - } - } + // Look for instance in namespace + // Get engine stub + // Call a few functions on stub + throw new Error("Not implemented yet"); } public async resume(): Promise { - await this.stub.changeInstanceStatus("resume"); + throw new Error("Not implemented yet"); } public async terminate(): Promise { - try { - await this.stub.changeInstanceStatus("terminate"); - } catch (e) { - // terminate causes instance abortion - if (!isUserTriggeredTerminate(e)) { - throw e; - } - } + throw new Error("Not implemented yet"); } public async restart(): Promise { - try { - await this.stub.changeInstanceStatus("restart"); - } catch (e) { - // restart causes instance abortion - if (!isUserTriggeredRestart(e)) { - throw e; - } - } - - // trigger restart flow after abortion - this.stub = this.getStub(); - await this.stub.attemptRestart(); + throw new Error("Not implemented yet"); } public async status(): Promise< InstanceStatus & { __LOCAL_DEV_STEP_OUTPUTS: unknown[] } > { - // Both getStatus() and readLogs() must use the same fresh stub. - // After pause/restart/terminate aborts the DO, the stub goes stale - const fetchStatusAndLogs = async () => { - const status = await this.stub.getStatus(); - - // NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise - const logs = await (this.stub.readLogs() as unknown as Promise< - EngineLogs & Disposable - >); - - return { status, logs }; - }; + const status = await this.stub.getStatus(); - let result: { - status: EngineInstanceStatus; - logs: EngineLogs & Disposable; - }; - try { - result = await fetchStatusAndLogs(); - } catch { - this.stub = this.getStub(); - result = await fetchStatusAndLogs(); - } - // Dispose the RPC handle when the method scope exits - using logs = result.logs; + // NOTE(lduarte): for some reason, sync functions over RPC are typed as never instead of Promise + using logs = await (this.stub.readLogs() as unknown as Promise< + EngineLogs & Disposable + >); const filteredLogs = logs.logs.filter( (log) => @@ -266,7 +204,7 @@ export class WorkflowHandle extends RpcTarget implements WorkflowInstance { )?.metadata.error; return { - status: instanceStatusName(result.status), + status: instanceStatusName(status), __LOCAL_DEV_STEP_OUTPUTS: stepOutputs, output: workflowOutput, error: workflowError, diff --git a/packages/workflows-shared/src/context.ts b/packages/workflows-shared/src/context.ts index 7e27a875f27c..ab2a145d9119 100644 --- a/packages/workflows-shared/src/context.ts +++ b/packages/workflows-shared/src/context.ts @@ -3,18 +3,12 @@ import { ms } from "itty-time"; import { INSTANCE_METADATA, InstanceEvent, InstanceStatus } from "./instance"; import { computeHash } from "./lib/cache"; import { - ABORT_REASONS, WorkflowFatalError, WorkflowInternalError, WorkflowTimeoutError, } from "./lib/errors"; import { calcRetryDuration } from "./lib/retries"; -import { - isValidStepConfig, - isValidStepName, - MAX_STEP_NAME_LENGTH, -} from "./lib/validators"; -import { MODIFIER_KEYS } from "./modifier"; +import { isValidStepName, MAX_STEP_NAME_LENGTH } from "./lib/validators"; import type { Engine } from "./engine"; import type { InstanceMetadata } from "./instance"; import type { @@ -51,7 +45,6 @@ export type StepState = { export type WorkflowStepContext = { attempt: number; }; -const PAUSE_DATETIME = "PAUSE_DATETIME"; export class Context extends RpcTarget { #engine: Engine; @@ -66,27 +59,6 @@ export class Context extends RpcTarget { this.#state = state; } - async #checkForPendingPause(): Promise { - if (this.#engine.timeoutHandler.isRunningStep()) { - return; - } - - const status = await this.#engine.getStatus(); - if (status === InstanceStatus.WaitingForPause) { - await this.#state.storage.put(PAUSE_DATETIME, new Date()); - const metadata = - await this.#state.storage.get(INSTANCE_METADATA); - if (metadata) { - await this.#engine.setStatus( - metadata.accountId, - metadata.instance.id, - InstanceStatus.Paused - ); - } - await this.#engine.abort(ABORT_REASONS.USER_PAUSE); - } - } - #getCount(name: string): number { let val = this.#counters.get(name) ?? 0; // 1-indexed, as we're increasing the value before write @@ -144,14 +116,6 @@ export class Context extends RpcTarget { throw error; } - if (!isValidStepConfig(stepConfig)) { - const error = new WorkflowFatalError( - `Step config for "${name}" is in a invalid format. See https://developers.cloudflare.com/workflows/build/sleeping-and-retrying/` - ) as Error & UserErrorField; - error.isUserError = true; - throw error; - } - let config: ResolvedStepConfig = { ...defaultConfig, ...stepConfig, @@ -336,7 +300,7 @@ export class Context extends RpcTarget { await this.#state.storage.put(stepStateKey, stepState); const priorityQueueHash = `${cacheKey}-${stepState.attemptedCount}`; - const mockErrorKey = `${MODIFIER_KEYS.MOCK_STEP_ERROR}${valueKey}`; + const mockErrorKey = `mock-step-error-${valueKey}`; const persistentMockError = await this.#state.storage.get<{ name: string; message: string; @@ -355,10 +319,10 @@ export class Context extends RpcTarget { } const replaceResult = await this.#state.storage.get( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}` + `replace-result-${valueKey}` ); - const forceStepTimeoutKey = `${MODIFIER_KEYS.FORCE_STEP_TIMEOUT}${valueKey}`; + const forceStepTimeoutKey = `force-step-timeout-${valueKey}`; const persistentStepTimeout = await this.#state.storage.get(forceStepTimeoutKey); const transientStepTimeout = await this.#state.storage.get( @@ -370,6 +334,7 @@ export class Context extends RpcTarget { result = await timeoutPromise(); } else if (replaceResult) { result = replaceResult; + await this.#state.storage.delete(`replace-result-${valueKey}`); // if there is a timeout to be forced we dont want to race with closure } else { result = await Promise.race([ @@ -422,18 +387,11 @@ export class Context extends RpcTarget { InstanceStatus.Errored ); await this.#engine.timeoutHandler.release(this.#engine); - await this.#engine.abort(ABORT_REASONS.NOT_SERIALISABLE); - } else if ( - e instanceof Error && - e.message.includes("string or blob too big: SQLITE_TOOBIG") - ) { - throw new WorkflowInternalError( - `Step ${stepNameWithCounter} output is too large. Maximum allowed size is 1MiB.` - ); + await this.#engine.abort("Value is not serialisable"); } else { // TODO (WOR-77): Send this to Sentry throw new WorkflowInternalError( - `Storage failure for ${stepNameWithCounter} due to internal error.` + `Storage failure for ${valueKey}: ${e} ` ); } return; @@ -549,12 +507,7 @@ export class Context extends RpcTarget { return result; }; - const result = await doWrapper(closure); - - // Check if a pause was requested while this step was running - await this.#checkForPendingPause(); - - return result; + return doWrapper(closure); } async sleep(name: string, duration: WorkflowSleepDuration): Promise { @@ -574,12 +527,8 @@ export class Context extends RpcTarget { const sleepNameCountHash = await computeHash( name + this.#getCount("sleep-" + name) ); - const disableThisSleep = await this.#state.storage.get( - `${MODIFIER_KEYS.DISABLE_SLEEP}${sleepNameCountHash}` - ); - const disableAllSleeps = await this.#state.storage.get( - MODIFIER_KEYS.DISABLE_ALL_SLEEPS - ); + const disableThisSleep = await this.#state.storage.get(sleepNameCountHash); + const disableAllSleeps = await this.#state.storage.get("disableAllSleeps"); const disableSleep = disableAllSleeps || disableThisSleep; @@ -735,7 +684,7 @@ export class Context extends RpcTarget { (a) => a.hash === cacheKey && a.type === "timeout" ); const forceEventTimeout = await this.#state.storage.get( - `${MODIFIER_KEYS.FORCE_EVENT_TIMEOUT}${waitForEventKey}` + `force-event-timeout-${waitForEventKey}` ); if ( (timeoutEntryPQ === undefined && diff --git a/packages/workflows-shared/src/engine.ts b/packages/workflows-shared/src/engine.ts index 0d07b7be6d45..6413eee84fd8 100644 --- a/packages/workflows-shared/src/engine.ts +++ b/packages/workflows-shared/src/engine.ts @@ -9,24 +9,14 @@ import { toInstanceStatus, } from "./instance"; import { computeHash } from "./lib/cache"; -import { - ABORT_REASONS, - createWorkflowError, - isAbortError, - isUserTriggeredPause, - WorkflowFatalError, -} from "./lib/errors"; +import { WorkflowFatalError } from "./lib/errors"; import { ENGINE_TIMEOUT, GracePeriodSemaphore, startGracePeriod, } from "./lib/gracePeriodSemaphore"; import { TimePriorityQueue } from "./lib/timePriorityQueue"; -import { - isModifierKey, - MODIFIER_KEYS, - WorkflowInstanceModifier, -} from "./modifier"; +import { WorkflowInstanceModifier } from "./modifier"; import type { Event } from "./context"; import type { InstanceMetadata, RawInstanceLog } from "./instance"; import type { WorkflowEntrypoint, WorkflowEvent } from "cloudflare:workers"; @@ -87,8 +77,6 @@ const EVENT_MAP_PREFIX = "EVENT_MAP"; export const DEFAULT_STEP_LIMIT = 10_000; -const PAUSE_DATETIME = "PAUSE_DATETIME"; - export class Engine extends DurableObject { logs: Array = []; @@ -125,14 +113,13 @@ export class Engine extends DurableObject { CHECK (action IN (0, 1)), -- guararentee that action can only be 0 or 1 UNIQUE (action, entryType, hash) ); - CREATE TABLE IF NOT EXISTS states ( - id INTEGER PRIMARY KEY NOT NULL, - timestamp TIMESTAMP DEFAULT (DATETIME('now','subsec')), - groupKey TEXT, - target TEXT, - metadata TEXT, - event INTEGER NOT NULL - ) + CREATE TABLE IF NOT EXISTS states ( + id INTEGER PRIMARY KEY NOT NULL, + groupKey TEXT, + target TEXT, + metadata TEXT, + event INTEGER NOT NULL + ) `); } catch (e) { console.error(e); @@ -423,9 +410,8 @@ export class Engine extends DurableObject { } } - async abort(reason: string) { - await this.ctx.storage.sync(); - this.ctx.abort(reason); + async abort(_reason: string) { + // TODO: Maybe don't actually kill but instead check a flag and return early if true } // Called by the dispose function when introspecting the instance in tests @@ -438,16 +424,16 @@ export class Engine extends DurableObject { } async storeEventMap() { + // TODO: this can be more efficient, but oh well await this.ctx.blockConcurrencyWhile(async () => { - const entries: Record = {}; - for (const [type, events] of this.eventMap.entries()) { - for (let i = 0; i < events.length; i++) { - entries[`${EVENT_MAP_PREFIX}\n${type}\n${i}`] = events[i]; + for (const [key, value] of this.eventMap.entries()) { + for (const eventIdx in value) { + await this.ctx.storage.put( + `${EVENT_MAP_PREFIX}\n${key}\n${eventIdx}`, + value[eventIdx] + ); } } - if (Object.keys(entries).length > 0) { - await this.ctx.storage.put(entries); - } }); } @@ -473,6 +459,8 @@ export class Engine extends DurableObject { payload: unknown; type: string; }) { + // Always queue the event first + // TODO: Persist it across lifetimes // There are four possible cases here: // - There is a callback waiting, send it // - There is no callback waiting but engine is alive, store it @@ -481,9 +469,9 @@ export class Engine extends DurableObject { // - Engine is not awake and is Errored or Terminated, this should not get called let eventTypeQueue = this.eventMap.get(event.type) ?? []; eventTypeQueue.push(event as Event); - - this.eventMap.set(event.type, eventTypeQueue); await this.storeEventMap(); + // TODO: persist eventMap - it can be over 2MiB + this.eventMap.set(event.type, eventTypeQueue); // if the engine is running if (this.isRunning) { @@ -505,9 +493,7 @@ export class Engine extends DurableObject { } } } else { - const mockEvent = await this.ctx.storage.get( - `${MODIFIER_KEYS.MOCK_EVENT}${event.type}` - ); + const mockEvent = await this.ctx.storage.get(`mock-event-${event.type}`); if (mockEvent) { return; } @@ -533,249 +519,7 @@ export class Engine extends DurableObject { return new WorkflowInstanceModifier(this, this.ctx); } - async changeInstanceStatus( - newStatus: "resume" | "pause" | "terminate" | "restart" - ) { - const metadata = - await this.ctx.storage.get(INSTANCE_METADATA); - - if (metadata === undefined) { - throw createWorkflowError( - "Instance does not exist", - "instance.not_found" - ); - } - - switch (newStatus) { - case "pause": - await this.userTriggeredPause(); - break; - case "resume": { - const currentStatus = await this.getStatus(); - if (currentStatus === InstanceStatus.WaitingForPause) { - // Engine is still running — cancel the pending pause - this.timeoutHandler.cancelWaitingPromisesByType("pause"); - await this.setStatus( - metadata.accountId, - metadata.instance.id, - InstanceStatus.Running - ); - } else if (currentStatus === InstanceStatus.Paused) { - await this.attemptResume(); - } - break; - } - case "terminate": { - const currentStatus = await this.getStatus(); - if ( - [ - InstanceStatus.Terminated, - InstanceStatus.Complete, - InstanceStatus.Errored, - ].includes(currentStatus) - ) { - throw createWorkflowError( - "Cannot terminate instance since its on a finite state", - "instance.cannot_terminate" - ); - } - await this.userTriggeredTerminate(); - break; - } - case "restart": - await this.userTriggeredRestart(); - break; - } - } - - async userTriggeredTerminate() { - const metadata = - await this.ctx.storage.get(INSTANCE_METADATA); - - if (metadata === undefined) { - throw createWorkflowError( - "Instance does not exist", - "instance.not_found" - ); - } - - this.writeLog(InstanceEvent.WORKFLOW_TERMINATED, null, null, { - trigger: { - source: InstanceTrigger.API, - }, - }); - - await this.setStatus( - metadata.accountId, - metadata.instance.id, - InstanceStatus.Terminated - ); - - await this.abort(ABORT_REASONS.USER_TERMINATE); - } - - async userTriggeredPause() { - const status = await this.getStatus(); - - if ( - status === InstanceStatus.Paused || - status === InstanceStatus.WaitingForPause - ) { - return; - } - - if ( - status !== InstanceStatus.Running && - status !== InstanceStatus.Waiting - ) { - return; - } - - const metadata = - await this.ctx.storage.get(INSTANCE_METADATA); - - if (metadata === undefined) { - throw createWorkflowError( - "Instance does not exist", - "instance.not_found" - ); - } - - await this.setStatus( - metadata.accountId, - metadata.instance.id, - InstanceStatus.WaitingForPause - ); - - void this.timeoutHandler - .waitUntilNothingIsRunning("pause", async () => { - await this.ctx.storage.put(PAUSE_DATETIME, new Date()); - await this.setStatus( - metadata.accountId, - metadata.instance.id, - InstanceStatus.Paused - ); - await this.abort(ABORT_REASONS.USER_PAUSE); - }) - .catch((e) => { - // Expected: abort rejects the promise chain when it kills the DO - if (!isAbortError(e)) { - throw e; - } - }); - } - - async userTriggeredRestart() { - // cleanup is done in attemptRestart() on the fresh DO instance - - await this.abort(ABORT_REASONS.USER_RESTART); - } - - private getMockedEventMapKeys(allKeys: Map): Set { - const mockEventTypes = new Set(); - for (const key of allKeys.keys()) { - if (key.startsWith(MODIFIER_KEYS.MOCK_EVENT)) { - mockEventTypes.add(key.slice(MODIFIER_KEYS.MOCK_EVENT.length)); - } - } - - if (mockEventTypes.size === 0) { - return new Set(); - } - - const preserved = new Set(); - for (const key of allKeys.keys()) { - if (key.startsWith(`${EVENT_MAP_PREFIX}\n`)) { - // EVENT_MAP keys are formatted as "EVENT_MAP\n{type}\n{idx}" - const eventType = key.split("\n")[1]; - if (eventType !== undefined && mockEventTypes.has(eventType)) { - preserved.add(key); - } - } - } - - return preserved; - } - - async attemptRestart() { - this.ctx.storage.sql.exec("DELETE FROM states"); - this.ctx.storage.sql.exec("DELETE FROM priority_queue"); - - const allKeys = await this.ctx.storage.list(); - const preservedEventMapKeys = this.getMockedEventMapKeys(allKeys); - - // Remove all KV keys except: - // - INSTANCE_METADATA (needed to re-run the workflow) - // - Modifier/mock keys (so mocks survive restart) - // - EVENT_MAP entries for mocked event types - for (const key of allKeys.keys()) { - if ( - key === INSTANCE_METADATA || - isModifierKey(key) || - preservedEventMapKeys.has(key) - ) { - continue; - } - await this.ctx.storage.delete(key); - } - - const metadata = - await this.ctx.storage.get(INSTANCE_METADATA); - - if (metadata === undefined) { - throw createWorkflowError( - "Instance does not exist", - "instance.not_found" - ); - } - - const { accountId, workflow, version, instance, event } = metadata; - - this.writeLog(InstanceEvent.WORKFLOW_QUEUED, null, null, { - params: event.payload, - versionId: version.id, - trigger: { - source: InstanceTrigger.API, - }, - }); - this.writeLog(InstanceEvent.WORKFLOW_START, null, null, {}); - - void this.init(accountId, workflow, version, instance, event); - } - - async attemptResume() { - const metadata = - await this.ctx.storage.get(INSTANCE_METADATA); - - if (metadata === undefined) { - throw createWorkflowError( - "Instance does not exist", - "instance.not_found" - ); - } - - const status = - await this.ctx.storage.get(ENGINE_STATUS_KEY); - if (status !== InstanceStatus.Paused) { - return; - } - - // Offset priority queue timers by the pause duration so that - // sleeps/retries resume from where they left off - const pausedDate = await this.ctx.storage.get(PAUSE_DATETIME); - if (pausedDate !== undefined) { - const offset = Date.now() - new Date(pausedDate).valueOf(); - const pq = new TimePriorityQueue(this.ctx, metadata); - pq.offsetAll(offset); - } - await this.ctx.storage.delete(PAUSE_DATETIME); - - const { accountId, workflow, version, instance, event } = metadata; - - await this.ctx.storage.put(ENGINE_STATUS_KEY, InstanceStatus.Queued); - - void this.init(accountId, workflow, version, instance, event); - } + async userTriggeredTerminate() {} async init( accountId: number, @@ -816,20 +560,11 @@ export class Engine extends DurableObject { InstanceStatus.Errored, // TODO (WOR-85): Remove this once upgrade story is done InstanceStatus.Terminated, InstanceStatus.Complete, - InstanceStatus.Paused, ].includes(status) ) { return; } - // If the DO restarted (e.g. from alarm) while in WaitingForPause state, - // transition to Paused and return early — same as production behaviour. - if (status === InstanceStatus.WaitingForPause) { - await this.ctx.storage.put(PAUSE_DATETIME, new Date()); - await this.setStatus(accountId, instance.id, InstanceStatus.Paused); - return; - } - if ((await this.ctx.storage.get(INSTANCE_METADATA)) == undefined) { const instanceMetadata: InstanceMetadata = { accountId, @@ -841,6 +576,7 @@ export class Engine extends DurableObject { await this.ctx.storage.put(INSTANCE_METADATA, instanceMetadata); // TODO (WOR-78): We currently don't have a queue mechanism + // WORKFLOW_QUEUED should happen before engine is spun up this.writeLog(InstanceEvent.WORKFLOW_QUEUED, null, null, { params: event.payload, versionId: version.id, @@ -879,11 +615,6 @@ export class Engine extends DurableObject { }); this.isRunning = false; } catch (err) { - if (isUserTriggeredPause(err)) { - this.isRunning = false; - return; - } - let error; if (err instanceof Error) { if ( @@ -897,7 +628,7 @@ export class Engine extends DurableObject { }); await this.setStatus(accountId, instance.id, InstanceStatus.Errored); - await this.abort(ABORT_REASONS.NON_RETRYABLE_ERROR); + await this.abort(`A step threw a NonRetryableError`); this.isRunning = false; return; } diff --git a/packages/workflows-shared/src/instance.ts b/packages/workflows-shared/src/instance.ts index a0994fc49b16..0b6bcd1b79e8 100644 --- a/packages/workflows-shared/src/instance.ts +++ b/packages/workflows-shared/src/instance.ts @@ -34,8 +34,7 @@ export enum InstanceStatus { Errored = 3, // Stopped due to a user or system Error Terminated = 4, // Stopped explicitly by user Complete = 5, // Successful completion - WaitingForPause = 6, - Waiting = 7, + // TODO (WOR-71): Sleep } export function instanceStatusName(status: InstanceStatus) { @@ -52,10 +51,6 @@ export function instanceStatusName(status: InstanceStatus) { return "terminated"; case InstanceStatus.Complete: return "complete"; - case InstanceStatus.WaitingForPause: - return "waitingForPause"; - case InstanceStatus.Waiting: - return "waiting"; default: return "unknown"; } @@ -68,8 +63,6 @@ export const instanceStatusNames = [ "errored", "terminated", "complete", - "waitingForPause", - "waiting", "unknown", ] as const; @@ -87,10 +80,6 @@ export function toInstanceStatus(status: string): InstanceStatus { return InstanceStatus.Terminated; case "complete": return InstanceStatus.Complete; - case "waitingForPause": - return InstanceStatus.WaitingForPause; - case "waiting": - return InstanceStatus.Waiting; case "unknown": throw new Error("unknown cannot be parsed into a InstanceStatus"); default: diff --git a/packages/workflows-shared/src/lib/errors.ts b/packages/workflows-shared/src/lib/errors.ts index 1f529692c8c9..e8ee6b2a590f 100644 --- a/packages/workflows-shared/src/lib/errors.ts +++ b/packages/workflows-shared/src/lib/errors.ts @@ -20,55 +20,3 @@ export class WorkflowFatalError extends Error { export class WorkflowError extends Error { name = "WorkflowError"; } - -export function createWorkflowError( - message: string, - errorCode: string -): WorkflowError { - return new WorkflowError(`(${errorCode}) ${message}`); -} - -const ABORT_PREFIX = "Aborting engine:" as const; - -export const ABORT_REASONS = { - USER_PAUSE: `${ABORT_PREFIX} User called pause`, - USER_RESTART: `${ABORT_PREFIX} User called restart`, - USER_TERMINATE: `${ABORT_PREFIX} User called terminate`, - NON_RETRYABLE_ERROR: `${ABORT_PREFIX} A step threw a NonRetryableError`, - NOT_SERIALISABLE: `${ABORT_PREFIX} Value is not serialisable`, - GRACE_PERIOD_COMPLETE: `${ABORT_PREFIX} Grace period complete`, -} as const; - -const ABORT_REASON_SET: ReadonlySet = new Set( - Object.values(ABORT_REASONS) -); - -function getErrorMessage(e: unknown): string | undefined { - if (e instanceof Error) { - return e.message; - } - if (typeof e === "object" && e !== null) { - const msg = (e as { message?: string }).message; - if (typeof msg === "string") { - return msg; - } - } - return undefined; -} - -export function isAbortError(e: unknown): boolean { - const msg = getErrorMessage(e); - return msg !== undefined && ABORT_REASON_SET.has(msg); -} - -export function isUserTriggeredPause(e: unknown): boolean { - return getErrorMessage(e) === ABORT_REASONS.USER_PAUSE; -} - -export function isUserTriggeredRestart(e: unknown): boolean { - return getErrorMessage(e) === ABORT_REASONS.USER_RESTART; -} - -export function isUserTriggeredTerminate(e: unknown): boolean { - return getErrorMessage(e) === ABORT_REASONS.USER_TERMINATE; -} diff --git a/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts b/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts index 1480ca82c7f7..6c48c9d73ea5 100644 --- a/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts +++ b/packages/workflows-shared/src/lib/gracePeriodSemaphore.ts @@ -1,5 +1,4 @@ import { ms } from "itty-time"; -import { ABORT_REASONS } from "./errors"; import type { Engine } from "../engine"; import type { WorkflowSleepDuration } from "cloudflare:workers"; @@ -7,24 +6,12 @@ export const ENGINE_TIMEOUT = ms("5 minutes" satisfies WorkflowSleepDuration); let latestGracePeriodTimestamp: number | undefined = undefined; -export type WaitingPromiseType = "pause"; - export type GracePeriodCallback = (engine: Engine, timeoutMs: number) => void; export class GracePeriodSemaphore { #counter: number = 0; readonly callback: GracePeriodCallback; readonly timeoutMs: number; - #waitingPromises: { - rejectCallback: () => void; - resolveCallback: (value: unknown) => void; - type: WaitingPromiseType; - }[] = []; - #canInitiateSteps = true; - #waitingSteps: { - rejectCallback: () => void; - resolveCallback: (value: unknown) => void; - }[] = []; constructor(callback: GracePeriodCallback, timeoutMs: number) { this.callback = callback; @@ -33,14 +20,6 @@ export class GracePeriodSemaphore { // acquire takes engine to be the same as release async acquire(_engine: Engine) { - if (!this.#canInitiateSteps) { - await new Promise((resolve, reject) => { - this.#waitingSteps.push({ - resolveCallback: resolve, - rejectCallback: reject, - }); - }); - } // when the counter goes from 0 to 1 - we can safely reject the previous grace period if (this.#counter == 0) { latestGracePeriodTimestamp = undefined; @@ -54,71 +33,7 @@ export class GracePeriodSemaphore { // Trigger timeout promise, no need to await here, // this can be triggered slightly after it's not time sensitive this.callback(engine, this.timeoutMs); - // Resolve any promises waiting for all steps to finish (e.g. pause) - for (const promise of this.#waitingPromises) { - promise.resolveCallback(undefined); - } - this.#waitingPromises = []; - } - } - - async waitUntilNothingIsRunning( - type: WaitingPromiseType, - callback: () => Promise - ): Promise { - this.#canInitiateSteps = false; - if (this.#counter > 0) { - try { - await new Promise((resolve, reject) => { - this.#waitingPromises.push({ - resolveCallback: resolve, - rejectCallback: reject, - type, - }); - }); - } catch { - // If the promise gets rejected (e.g. resume cancels the pause), - // allow steps to run again and unblock any that were waiting - for (const promise of this.#waitingSteps) { - promise.resolveCallback(undefined); - } - this.#waitingSteps = []; - this.#canInitiateSteps = true; - return; - } - } - await callback(); - // Allow steps to run again and unblock any that were waiting - for (const promise of this.#waitingSteps) { - promise.resolveCallback(undefined); - } - this.#waitingSteps = []; - this.#canInitiateSteps = true; - } - - cancelWaitingPromisesByType(type: WaitingPromiseType) { - const sameTypePromises = this.#waitingPromises.filter( - (val) => val.type === type - ); - if (sameTypePromises.length === 0) { - return; - } - - for (const promise of sameTypePromises) { - promise.rejectCallback(); - } - - this.#waitingPromises = this.#waitingPromises.filter( - (val) => val.type !== type - ); - - this.#canInitiateSteps = true; - - // Unblock any steps that were waiting to acquire while the pause was pending - for (const promise of this.#waitingSteps) { - promise.resolveCallback(undefined); } - this.#waitingSteps = []; } isRunningStep() { @@ -166,7 +81,7 @@ export const startGracePeriod: GracePeriodCallback = async ( // Ensure next alarm is set before we abort await engine.priorityQueue?.handleNextAlarm(); - await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE); + await engine.abort("Grace period complete"); }; void gracePeriodHandler(); }; diff --git a/packages/workflows-shared/src/lib/timePriorityQueue.ts b/packages/workflows-shared/src/lib/timePriorityQueue.ts index 2c213ac54401..a04e998a88eb 100644 --- a/packages/workflows-shared/src/lib/timePriorityQueue.ts +++ b/packages/workflows-shared/src/lib/timePriorityQueue.ts @@ -109,30 +109,6 @@ export class TimePriorityQueue { }); } - offsetAll(offset: number) { - // Clear the entire PQ table and re-insert only the offset entries. - // We can't use the append-only add/remove pattern here because the - // UNIQUE (action, entryType, hash) constraint would conflict with - // the original action=1 rows still in the table. - this.#ctx.storage.transactionSync(() => { - const entries = this.#heap.toArray(); - - // Wipe the table — removes all historical add/remove rows - this.#ctx.storage.sql.exec("DELETE FROM priority_queue"); - - const newEntries = entries.map((value) => ({ - ...value, - targetTimestamp: value.targetTimestamp + offset, - })); - for (const entry of newEntries) { - this.addEntryDB(entry); - } - // re-init in-memory heap - this.#heap = new Heap(wakerPriorityEntryComparator); - this.#heap.init(newEntries); - }); - } - popTypeAll(entryType: WakerPriorityType) { this.#ctx.storage.transactionSync(() => { this.filter((e) => e.type !== entryType); diff --git a/packages/workflows-shared/src/lib/validators.ts b/packages/workflows-shared/src/lib/validators.ts index ea702dc318dd..8fe17611f5c6 100644 --- a/packages/workflows-shared/src/lib/validators.ts +++ b/packages/workflows-shared/src/lib/validators.ts @@ -1,6 +1,3 @@ -import { ms } from "itty-time"; -import { z } from "zod"; - export const MAX_WORKFLOW_NAME_LENGTH = 64; export const MAX_WORKFLOW_INSTANCE_ID_LENGTH = 100; @@ -46,41 +43,3 @@ export function isValidStepName(name: string): boolean { return !CONTROL_CHAR_REGEX.test(name); } - -const STEP_CONFIG_SCHEMA = z - .object({ - retries: z - .object({ - delay: z.number().gte(0).or(z.string()), - limit: z.number().gte(0), - backoff: z.enum(["constant", "linear", "exponential"]).optional(), - }) - .strict() - .optional(), - timeout: z.number().gte(0).or(z.string()).optional(), - }) - .strict(); - -export function isValidStepConfig(stepConfig: unknown): boolean { - const config = STEP_CONFIG_SCHEMA.safeParse(stepConfig); - - if (!config.success) { - return false; - } - - if ( - config.data.retries !== undefined && - Number.isNaN(ms(config.data.retries.delay)) - ) { - return false; - } - - if (config.data.timeout !== undefined) { - const timeout = config.data.timeout; - if (timeout == 0 || Number.isNaN(ms(config.data.timeout))) { - return false; - } - } - - return true; -} diff --git a/packages/workflows-shared/src/modifier.ts b/packages/workflows-shared/src/modifier.ts index 04f32ccef24f..743dd2aec438 100644 --- a/packages/workflows-shared/src/modifier.ts +++ b/packages/workflows-shared/src/modifier.ts @@ -13,22 +13,6 @@ type UserEvent = { payload: unknown; }; -// KV key prefixes/values used by the modifier/mock system -export const MODIFIER_KEYS = { - REPLACE_RESULT: "replace-result-", - MOCK_STEP_ERROR: "mock-step-error-", - MOCK_EVENT: "mock-event-", - FORCE_STEP_TIMEOUT: "force-step-timeout-", - FORCE_EVENT_TIMEOUT: "force-event-timeout-", - FAILURE_INDEX: "failure-index-", - DISABLE_SLEEP: "disable-sleep-", - DISABLE_ALL_SLEEPS: "disableAllSleeps", -} as const; - -export function isModifierKey(key: string): boolean { - return Object.values(MODIFIER_KEYS).some((v) => key.startsWith(v)); -} - export class WorkflowInstanceModifier extends RpcTarget { #engine: Engine; #state: DurableObjectState; @@ -65,7 +49,7 @@ export class WorkflowInstanceModifier extends RpcTarget { } #getAndIncrementCounter = async (valueKey: string, by: number) => { - const counterKey = `${MODIFIER_KEYS.FAILURE_INDEX}${valueKey}`; + const counterKey = `failure-index-${valueKey}`; const next = (await this.#state.storage.get(counterKey)) ?? 1; await this.#state.storage.put(counterKey, next + by); return next; @@ -78,12 +62,12 @@ export class WorkflowInstanceModifier extends RpcTarget { } const sleepNameCountHash = await computeHash(step.name + count); - return `${MODIFIER_KEYS.DISABLE_SLEEP}${sleepNameCountHash}`; + return sleepNameCountHash; } async disableSleeps(steps?: StepSelector[]): Promise { if (!steps) { - await this.#state.storage.put(MODIFIER_KEYS.DISABLE_ALL_SLEEPS, true); + await this.#state.storage.put("disableAllSleeps", true); } else { for (const step of steps) { const sleepDisableKey = await this.#getSleepStepDisableKey(step); @@ -101,20 +85,13 @@ export class WorkflowInstanceModifier extends RpcTarget { async mockStepResult(step: StepSelector, stepResult: unknown): Promise { const valueKey = await this.#getStepCacheKey(step); - if ( - await this.#state.storage.get( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}` - ) - ) { + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { throw new Error( `[WorkflowIntrospector] Trying to mock step '${step.name}' multiple times!` ); } - await this.#state.storage.put( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}`, - stepResult - ); + await this.#state.storage.put(`replace-result-${valueKey}`, stepResult); } // Same logic of `mockStepResult` but stores an error instead of a value. @@ -129,11 +106,7 @@ export class WorkflowInstanceModifier extends RpcTarget { message: error.message, }; - if ( - await this.#state.storage.get( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}` - ) - ) { + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { throw new Error( `[WorkflowIntrospector] Trying to mock error on step '${step.name}' after mocking its result!` ); @@ -143,13 +116,13 @@ export class WorkflowInstanceModifier extends RpcTarget { const start = await this.#getAndIncrementCounter(valueKey, times); const mockErrorsPuts = Array.from({ length: times }, (_, i) => { const attempt = start + i; - const mockErrorKey = `${MODIFIER_KEYS.MOCK_STEP_ERROR}${valueKey}-${attempt}`; + const mockErrorKey = `mock-step-error-${valueKey}-${attempt}`; return this.#state.storage.put(mockErrorKey, serializableError); }); await Promise.all(mockErrorsPuts); } else { - const mockErrorKey = `${MODIFIER_KEYS.MOCK_STEP_ERROR}${valueKey}`; + const mockErrorKey = `mock-step-error-${valueKey}`; await this.#state.storage.put(mockErrorKey, serializableError); } } @@ -157,11 +130,7 @@ export class WorkflowInstanceModifier extends RpcTarget { async forceStepTimeout(step: StepSelector, times?: number) { const valueKey = await this.#getStepCacheKey(step); - if ( - await this.#state.storage.get( - `${MODIFIER_KEYS.REPLACE_RESULT}${valueKey}` - ) - ) { + if (await this.#state.storage.get(`replace-result-${valueKey}`)) { throw new Error( `[WorkflowIntrospector] Trying to force timeout on step '${step.name}' after mocking its result!` ); @@ -171,13 +140,13 @@ export class WorkflowInstanceModifier extends RpcTarget { const start = await this.#getAndIncrementCounter(valueKey, times); const forceTimeouts = Array.from({ length: times }, (_, i) => { const attempt = start + i; - const forceStepTimeoutKey = `${MODIFIER_KEYS.FORCE_STEP_TIMEOUT}${valueKey}-${attempt}`; + const forceStepTimeoutKey = `force-step-timeout-${valueKey}-${attempt}`; return this.#state.storage.put(forceStepTimeoutKey, true); }); await Promise.all(forceTimeouts); } else { - const forceStepTimeoutKey = `${MODIFIER_KEYS.FORCE_STEP_TIMEOUT}${valueKey}`; + const forceStepTimeoutKey = `force-step-timeout-${valueKey}`; await this.#state.storage.put(forceStepTimeoutKey, true); } } @@ -189,17 +158,14 @@ export class WorkflowInstanceModifier extends RpcTarget { type: event.type, }; - await this.#state.storage.put( - `${MODIFIER_KEYS.MOCK_EVENT}${event.type}`, - true - ); + await this.#state.storage.put(`mock-event-${event.type}`, true); await this.#engine.receiveEvent(myEvent); } async forceEventTimeout(step: StepSelector): Promise { const waitForEventKey = await this.#getWaitForEventCacheKey(step); await this.#state.storage.put( - `${MODIFIER_KEYS.FORCE_EVENT_TIMEOUT}${waitForEventKey}`, + `force-event-timeout-${waitForEventKey}`, true ); } diff --git a/packages/workflows-shared/tests/binding.test.ts b/packages/workflows-shared/tests/binding.test.ts index 45e38a642076..d4042fad6424 100644 --- a/packages/workflows-shared/tests/binding.test.ts +++ b/packages/workflows-shared/tests/binding.test.ts @@ -1,141 +1,26 @@ import { createExecutionContext, runInDurableObject } from "cloudflare:test"; import { env } from "cloudflare:workers"; import { describe, it, vi } from "vitest"; -import { InstanceEvent, InstanceStatus } from "../src"; import { WorkflowBinding } from "../src/binding"; -import { setTestWorkflowCallback } from "./test-entry"; -import type { WorkflowHandle } from "../src/binding"; -import type { Engine, EngineLogs } from "../src/engine"; -import type { WorkflowEvent } from "cloudflare:workers"; +import { setWorkflowEntrypoint } from "./utils"; -function createBinding(): WorkflowBinding { - const ctx = createExecutionContext(); - return new WorkflowBinding(ctx, { - ENGINE: env.ENGINE, - BINDING_NAME: "TEST_WORKFLOW", - }); -} - -async function waitUntilLogEvent( - engineStub: DurableObjectStub, - event: InstanceEvent, - timeout = 1000 -): Promise { - await vi.waitUntil( - async () => { - const logs = (await engineStub.readLogs()) as EngineLogs; - const hasEvent = logs.logs.some((log) => log.event === event); - return hasEvent; - }, - { timeout } - ); -} - -describe("WorkflowBinding", () => { - describe("create()", () => { - it("should create an instance with provided id and params", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (event) => { - return (event as WorkflowEvent<{ key: string }>).payload; - }); - - const params = { key: "test-value" }; - const result = await binding.create({ id: "some-instance-id", params }); - - expect(result.id).toBe("some-instance-id"); - - await waitUntilLogEvent(engineStub, InstanceEvent.WORKFLOW_SUCCESS); - - const instance = await binding.get("some-instance-id"); - const status = await instance.status(); - expect(status.output).toEqual(params); - }); - - it("should auto-generate id when not provided", async ({ expect }) => { - const binding = createBinding(); - const result = await binding.create(); - - expect(result.id).toBeDefined(); - expect(result.id.length).toBeGreaterThan(0); - }); - - it("should throw WorkflowError for invalid instance id", async ({ - expect, - }) => { - const binding = createBinding(); - await expect(binding.create({ id: "#invalid!" })).rejects.toThrow( - "Workflow instance has invalid id" - ); - }); - }); - - describe("get()", () => { - it("should return a WorkflowHandle for an existing instance", async ({ - expect, - }) => { - const binding = createBinding(); - env.ENGINE.get(env.ENGINE.idFromName("some-instance-id")); - - setTestWorkflowCallback(async () => "done"); - await binding.create({ id: "some-instance-id" }); - - const instance = await binding.get("some-instance-id"); - - expect(instance).toMatchObject({ - id: "some-instance-id", - status: expect.any(Function), - pause: expect.any(Function), - resume: expect.any(Function), - terminate: expect.any(Function), - restart: expect.any(Function), - }); - }); - }); - - describe("createBatch()", () => { - it("should create multiple instances in a batch", async ({ expect }) => { - const binding = createBinding(); - const ids = ["batch-1", "batch-2", "batch-3"]; - setTestWorkflowCallback(async () => "done"); - - const results = await binding.createBatch(ids.map((id) => ({ id }))); - - expect(results).toHaveLength(3); - expect(results.map((r) => r.id)).toEqual(ids); - - for (const id of ids) { - const instance = await binding.get(id); - expect(instance.id).toBe(id); - } - }); - - it("should throw error for empty batch", async ({ expect }) => { - const binding = createBinding(); - - await expect(binding.createBatch([])).rejects.toThrow( - "WorkflowError: batchCreate should have at least 1 instance" - ); - }); - }); -}); - -// TODO: move describe("WorkflowBinding", () => { it("should not call dispose when sending an event to an instance", async ({ expect, }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); + const instanceId = "test-instance-with-event"; + const ctx = createExecutionContext(); + + const binding = new WorkflowBinding(ctx, { + ENGINE: env.ENGINE, + BINDING_NAME: "TEST_WORKFLOW", + }); + + // Set up a workflow that waits for an event + const engineId = env.ENGINE.idFromName(instanceId); + const engineStub = env.ENGINE.get(engineId); - setTestWorkflowCallback(async (_event, step) => { + await setWorkflowEntrypoint(engineStub, async (event, step) => { const receivedEvent = await step.waitForEvent("wait-for-test-event", { type: "test-event", timeout: "10 seconds", @@ -143,11 +28,11 @@ describe("WorkflowBinding", () => { return receivedEvent; }); - const { id } = await binding.create({ id: "some-instance-id" }); - expect(id).toBe("some-instance-id"); + const { id } = await binding.create({ id: instanceId }); + expect(id).toBe(instanceId); - const instance = await binding.get("some-instance-id"); - expect(instance.id).toBe("some-instance-id"); + const instance = await binding.get(instanceId); + expect(instance.id).toBe(instanceId); const disposeSpy = vi.fn(); @@ -166,10 +51,11 @@ describe("WorkflowBinding", () => { payload: { test: "data" }, })) as unknown as Disposable; - await vi.waitUntil( + // Wait a bit to ensure event processing + await vi.waitFor( async () => { const status = await instance.status(); - return status.status === "complete"; + expect(status.status).toBe("complete"); }, { timeout: 1000 } ); @@ -177,473 +63,3 @@ describe("WorkflowBinding", () => { expect(disposeSpy).not.toHaveBeenCalled(); }); }); - -describe("WorkflowHandle", () => { - describe("status()", () => { - it("should return running status for a workflow waiting for an event", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.waitForEvent("wait-for-event", { - type: "some-event", - timeout: "2 seconds", - }); - return "completed"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WAIT_START); - - const instance = await binding.get("some-instance-id"); - const status = await instance.status(); - - expect(status.status).toBe("running"); - expect(status.output).toBeNull(); - expect(status.error).toBeUndefined(); - }); - - it("should return complete status and output for a successful workflow", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - const expectedOutput = { result: "success", value: 42 }; - - setTestWorkflowCallback(async () => expectedOutput); - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WORKFLOW_SUCCESS); - - const instance = await binding.get("some-instance-id"); - const status = await instance.status(); - - expect(status.status).toBe("complete"); - expect(status.output).toEqual(expectedOutput); - expect(status.error).toBeUndefined(); - }); - - it("should return errored status and error for a failed workflow", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async () => { - throw new Error("Workflow failed intentionally"); - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WORKFLOW_FAILURE); - - const instance = await binding.get("some-instance-id"); - const status = await instance.status(); - - expect(status.status).toBe("errored"); - expect(status.error).toBeDefined(); - expect(status.error?.message).toBe("Workflow failed intentionally"); - expect(status.output).toBeNull(); - }); - - it("should return step outputs in __LOCAL_DEV_STEP_OUTPUTS", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - const step1Result = await step.do( - "step-1", - async () => "result-from-step-1" - ); - const step2Result = await step.do("step-2", async () => ({ - data: "result-from-step-2", - })); - const step3Result = await step.do("step-3", async () => 123); - return { step1Result, step2Result, step3Result }; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WORKFLOW_SUCCESS); - - const instance = (await binding.get( - "some-instance-id" - )) as WorkflowHandle; - const status = await instance.status(); - - expect(status.status).toBe("complete"); - expect(status.__LOCAL_DEV_STEP_OUTPUTS).toHaveLength(3); - expect(status.__LOCAL_DEV_STEP_OUTPUTS[0]).toBe("result-from-step-1"); - expect(status.__LOCAL_DEV_STEP_OUTPUTS[1]).toEqual({ - data: "result-from-step-2", - }); - expect(status.__LOCAL_DEV_STEP_OUTPUTS[2]).toBe(123); - }); - - it("should return terminated status for a terminated instance", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.waitForEvent("wait-for-event", { - type: "some-event", - timeout: "1 second", - }); - return "completed"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WAIT_START); - - const instance = await binding.get("some-instance-id"); - await instance.terminate(); - - const newInstance = await binding.get("some-instance-id"); - const status = await newInstance.status(); - - expect(status.status).toBe("terminated"); - }); - }); - - describe("sendEvent()", () => { - it("should deliver event payload to a waiting workflow", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - const receivedEvent = await step.waitForEvent("wait-for-event", { - type: "my-event-type", - timeout: "2 seconds", - }); - return receivedEvent; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WAIT_START); - - const instance = await binding.get("some-instance-id"); - const eventPayload = { message: "hello", count: 42 }; - - await instance.sendEvent({ - type: "my-event-type", - payload: eventPayload, - }); - - await vi.waitUntil( - async () => { - const status = await instance.status(); - return status.status === "complete"; - }, - { timeout: 1000 } - ); - - const status = await instance.status(); - expect(status.output).toMatchObject({ - payload: eventPayload, - type: "my-event-type", - }); - }); - - it("should handle multiple sequential events", async ({ expect }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - const event1 = await step.waitForEvent("wait-1", { - type: "event-type-1", - timeout: "10 seconds", - }); - const event2 = await step.waitForEvent("wait-2", { - type: "event-type-2", - timeout: "10 seconds", - }); - return { first: event1.payload, second: event2.payload }; - }); - - await binding.create({ id: "some-instance-id" }); - const instance = await binding.get("some-instance-id"); - - await waitUntilLogEvent(engineStub, InstanceEvent.WAIT_START); - await instance.sendEvent({ - type: "event-type-1", - payload: { value: "first" }, - }); - - // Wait for the second waitForEvent - await vi.waitUntil( - async () => { - const logs = (await engineStub.readLogs()) as EngineLogs; - const waitStarts = logs.logs.filter( - (log) => log.event === InstanceEvent.WAIT_START - ); - return waitStarts.length === 2; - }, - { timeout: 1000 } - ); - - await instance.sendEvent({ - type: "event-type-2", - payload: { value: "second" }, - }); - - await vi.waitUntil( - async () => { - const status = await instance.status(); - return status.status === "complete"; - }, - { timeout: 1000 } - ); - - const status = await instance.status(); - expect(status.output).toEqual({ - first: { value: "first" }, - second: { value: "second" }, - }); - }); - }); - - describe("terminate()", () => { - it("should terminate a running workflow instance", async ({ expect }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.waitForEvent("wait-for-event", { - type: "some-event", - timeout: "1 second", - }); - await step.do("should not be called", async () => { - return "should not be called"; - }); - return "should never complete"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WAIT_START); - - const instance = await binding.get("some-instance-id"); - await instance.terminate(); - - // Get a new stub since the engine was aborted - const newEngineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - const status = await runInDurableObject(newEngineStub, (engine) => { - return engine.getStatus(); - }); - expect(status).toBe(InstanceStatus.Terminated); - - const logs = (await newEngineStub.readLogs()) as EngineLogs; - const hasTerminatedEvent = logs.logs.some( - (log) => log.event === InstanceEvent.WORKFLOW_TERMINATED - ); - expect(hasTerminatedEvent).toBe(true); - - // assert that step.do never started - const hasStepStart = logs.logs.some( - (log) => log.event === InstanceEvent.STEP_START - ); - expect(hasStepStart).toBe(false); - }); - }); - - describe("restart()", () => { - it("should restart a workflow instance", async ({ expect }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.sleep("sleep", 250); - return "complete"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.WORKFLOW_SUCCESS); - - let instance = await binding.get("some-instance-id"); - let status = await instance.status(); - expect(status.status).toBe("complete"); - - // restart() aborts the old DO, gets a fresh stub, and calls attemptRestart() - // The service binding (USER_WORKFLOW) survives the abort, so no re-setup needed - await instance.restart(); - - const statusAfterRestart = await instance.status(); - expect(statusAfterRestart.status).toBe("running"); - - // Wait for the restarted workflow to complete via status polling - await vi.waitUntil( - async () => { - const s = await instance.status(); - return s.status === "complete"; - }, - { timeout: 1000 } - ); - - // Verify second run completed - instance = await binding.get("some-instance-id"); - status = await instance.status(); - expect(status.status).toBe("complete"); - }); - }); - - describe("pause()", () => { - it("should pause a running workflow", async ({ expect }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("long-step", async () => { - await scheduler.wait(500); - return "result-1"; - }); - // step-2 should never run because pause takes effect after long-step - await step.do("step-2", async () => "result-2"); - return "done"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.STEP_START); - - const instance = await binding.get("some-instance-id"); - - // Pause while long-step is in flight - await instance.pause(); - - await vi.waitUntil( - async () => { - const s = await instance.status(); - return s.status === "paused"; - }, - { timeout: 2000 } - ); - - const finalStatus = await instance.status(); - expect(finalStatus.status).toBe("paused"); - }); - }); - - describe("resume()", () => { - it("should resume a paused workflow and complete it", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("long-step", async () => { - await scheduler.wait(500); - return "result-1"; - }); - await step.do("step-2", async () => "result-2"); - return "all-done"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.STEP_START); - - const instance = await binding.get("some-instance-id"); - - // Pause while long-step is in flight - await instance.pause(); - - await vi.waitUntil( - async () => { - const s = await instance.status(); - return s.status === "paused"; - }, - { timeout: 2000 } - ); - - await instance.resume(); - - await vi.waitUntil( - async () => { - const s = await instance.status(); - return s.status === "complete"; - }, - { timeout: 3000 } - ); - - const finalStatus = await instance.status(); - expect(finalStatus.status).toBe("complete"); - expect(finalStatus.output).toBe("all-done"); - }); - - it("should cancel a pending pause when resume is called before step finishes", async ({ - expect, - }) => { - const binding = createBinding(); - const engineStub = env.ENGINE.get( - env.ENGINE.idFromName("some-instance-id") - ); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("long-step", async () => { - await scheduler.wait(1000); - return "long-result"; - }); - await step.do("step-after", async () => "final-result"); - return "completed"; - }); - - await binding.create({ id: "some-instance-id" }); - await waitUntilLogEvent(engineStub, InstanceEvent.STEP_START); - - const instance = await binding.get("some-instance-id"); - - // Pause while long-step is in flight — sets WaitingForPause - await instance.pause(); - - const statusAfterPause = await instance.status(); - expect(statusAfterPause.status).toBe("waitingForPause"); - - // resume before the step finishes — this should cancel the pending pause - await instance.resume(); - - // status should go back to Running - const statusAfterResume = await instance.status(); - expect(statusAfterResume.status).toBe("running"); - - await vi.waitUntil( - async () => { - const s = await instance.status(); - return s.status === "complete"; - }, - { timeout: 3000 } - ); - - const finalStatus = await instance.status(); - expect(finalStatus.status).toBe("complete"); - expect(finalStatus.output).toBe("completed"); - }); - }); -}); diff --git a/packages/workflows-shared/tests/context.test.ts b/packages/workflows-shared/tests/context.test.ts index b2273455d753..01a4c5b1f44f 100644 --- a/packages/workflows-shared/tests/context.test.ts +++ b/packages/workflows-shared/tests/context.test.ts @@ -1,7 +1,5 @@ -import { describe, it, vi } from "vitest"; -import { InstanceEvent } from "../src"; +import { describe, it } from "vitest"; import { runWorkflow } from "./utils"; -import type { EngineLogs } from "../src/engine"; describe("Context", () => { it("should provide attempt count 1 on first successful attempt", async ({ @@ -9,26 +7,15 @@ describe("Context", () => { }) => { let receivedAttempt: number | undefined; - const engineStub = await runWorkflow( - "MOCK-INSTANCE-ID", - async (_event, step) => { - const result = await step.do("a successful step", async (ctx) => { - receivedAttempt = ctx.attempt; - return "success"; - }); - return result; - } - ); - - await vi.waitUntil( - async () => { - const logs = (await engineStub.readLogs()) as EngineLogs; - return logs.logs.some( - (val) => val.event === InstanceEvent.WORKFLOW_SUCCESS - ); - }, - { timeout: 1000 } - ); + await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => { + // TODO: remove after types are updated + // @ts-expect-error WorkflowStep types + const result = await step.do("a successful step", async (ctx) => { + receivedAttempt = ctx.attempt; + return "success"; + }); + return result; + }); expect(receivedAttempt).toBe(1); }); @@ -36,35 +23,24 @@ describe("Context", () => { it("should provide attempt count to callback", async ({ expect }) => { const receivedAttempts: number[] = []; - const engineStub = await runWorkflow( - "MOCK-INSTANCE-ID", - async (_event, step) => { - const result = await step.do( - "retrying step", - { - retries: { - limit: 2, - delay: 0, - }, + await runWorkflow("MOCK-INSTANCE-ID", async (_event, step) => { + const result = await step.do( + "retrying step", + { + retries: { + limit: 2, + delay: 0, }, - async (ctx) => { - receivedAttempts.push(ctx.attempt); - throw new Error(`Throwing`); - } - ); - return result; - } - ); - - await vi.waitUntil( - async () => { - const logs = (await engineStub.readLogs()) as EngineLogs; - return logs.logs.some( - (val) => val.event === InstanceEvent.WORKFLOW_FAILURE - ); - }, - { timeout: 1000 } - ); + }, + // TODO: remove after types are updated + // @ts-expect-error WorkflowStep types + async (ctx) => { + receivedAttempts.push(ctx.attempt); + throw new Error(`Throwing`); + } + ); + return result; + }); // Should have received attempts 1, 2, and 3 expect(receivedAttempts).toEqual([1, 2, 3]); diff --git a/packages/workflows-shared/tests/engine.test.ts b/packages/workflows-shared/tests/engine.test.ts index 9bb868686b58..3b2bbd24475b 100644 --- a/packages/workflows-shared/tests/engine.test.ts +++ b/packages/workflows-shared/tests/engine.test.ts @@ -3,67 +3,34 @@ import { env } from "cloudflare:workers"; import { NonRetryableError } from "cloudflare:workflows"; import { describe, it, vi } from "vitest"; import { DEFAULT_STEP_LIMIT, InstanceEvent, InstanceStatus } from "../src"; -import { ABORT_REASONS, isAbortError } from "../src/lib/errors"; -import { setTestWorkflowCallback } from "./test-entry"; -import { runWorkflow, setWorkflowEntrypoint } from "./utils"; +import { runWorkflow, runWorkflowDefer, setWorkflowEntrypoint } from "./utils"; import type { DatabaseInstance, DatabaseVersion, DatabaseWorkflow, EngineLogs, } from "../src/engine"; -import type { WorkflowStep } from "cloudflare:workers"; describe("Engine", () => { it("should not retry after NonRetryableError is thrown", async ({ expect, }) => { - const instanceId = "NON-RETRYABLE-ERROR"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("should only have one retry", async () => { - throw new NonRetryableError("Should only retry once"); - }); - }); - - engineStub - .init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - {} as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId } - ) - .catch(() => { - // NonRetryableError aborts the DO - }); - - await vi.waitUntil( - async () => { - try { - const logs = (await env.ENGINE.get( - engineId - ).readLogs()) as EngineLogs; - return logs.logs.some( - (val) => val.event === InstanceEvent.WORKFLOW_FAILURE - ); - } catch (e) { - // DO may still be aborting — retry - if (isAbortError(e)) { - return false; - } - throw e; - } - }, - { timeout: 2000 } + const engineStub = await runWorkflow( + "MOCK-INSTANCE-ID", + async (event, step) => { + await step.do("should only have one retry", async () => { + throw new NonRetryableError("Should only retry once"); + }); + } ); - const logs = (await env.ENGINE.get(engineId).readLogs()) as EngineLogs; + const logs = (await engineStub.readLogs()) as EngineLogs; expect( - logs.logs.filter((val) => val.event === InstanceEvent.ATTEMPT_START) + logs.logs.some((val) => val.event == InstanceEvent.WORKFLOW_FAILURE) + ).toBe(true); + expect( + logs.logs.filter((val) => val.event == InstanceEvent.ATTEMPT_START) ).toHaveLength(1); }); @@ -72,7 +39,7 @@ describe("Engine", () => { }) => { const engineStub = await runWorkflow( "MOCK-INSTANCE-ID", - async (_event, step) => { + async (event, step) => { try { await step.do( "always errors out", @@ -91,16 +58,6 @@ describe("Engine", () => { } ); - await vi.waitUntil( - async () => { - const logs = (await engineStub.readLogs()) as EngineLogs; - return logs.logs.some( - (val) => val.event == InstanceEvent.WORKFLOW_SUCCESS - ); - }, - { timeout: 1000 } - ); - const logs = (await engineStub.readLogs()) as EngineLogs; expect( @@ -113,7 +70,7 @@ describe("Engine", () => { }); it("waitForEvent should receive events while active", async () => { - const engineStub = await runWorkflow( + const engineStub = await runWorkflowDefer( "MOCK-INSTANCE-ID", async (_, step) => { return await step.waitForEvent("i'm a event!", { @@ -143,7 +100,7 @@ describe("Engine", () => { }); it("waitForEvent should receive events even if not active", async () => { - const engineStub = await runWorkflow( + const engineStub = await runWorkflowDefer( "MOCK-INSTANCE-ID", async (_, step) => { return await step.waitForEvent("i'm a event!", { @@ -225,18 +182,11 @@ describe("Engine", () => { return "test"; }); - try { - await runInDurableObject(engineStub, async (engine) => { - await engine.init(accountId, workflow, version, instance, event); - await engine.setStatus(accountId, instanceId, InstanceStatus.Running); - await engine.abort(ABORT_REASONS.GRACE_PERIOD_COMPLETE); - }); - } catch (e) { - // Expected - abort throws to break the DO - if (!isAbortError(e)) { - throw e; - } - } + await runInDurableObject(engineStub, async (engine) => { + await engine.init(accountId, workflow, version, instance, event); + await engine.setStatus(accountId, instanceId, InstanceStatus.Running); + await engine.abort("kaboom"); + }); const engineId = env.ENGINE.idFromName(instanceId); const restartedStub = env.ENGINE.get(engineId); @@ -365,556 +315,5 @@ describe("Engine", () => { expect(stepLimit).toBe(DEFAULT_STEP_LIMIT); }); - it.for([ - InstanceStatus.Complete, - InstanceStatus.Errored, - InstanceStatus.Terminated, - ])( - "should throw when calling terminate on instance in finite state: %s", - async (finiteStatus, { expect }) => { - const engineStub = await runWorkflow( - `TERMINATE-${finiteStatus}-INSTANCE`, - async () => "done" - ); - - await vi.waitUntil( - async () => { - const status = await runInDurableObject(engineStub, (engine) => - engine.getStatus() - ); - return status === InstanceStatus.Complete; - }, - { timeout: 1000 } - ); - - // If not Complete, manually set the status - if (finiteStatus !== InstanceStatus.Complete) { - await runInDurableObject(engineStub, async (_engine, state) => { - await state.storage.put("ENGINE_STATUS", finiteStatus); - }); - } - - await expect( - runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("terminate"); - }) - ).rejects.toThrow( - "Cannot terminate instance since its on a finite state" - ); - } - ); - - it.for([ - InstanceStatus.Complete, - InstanceStatus.Errored, - InstanceStatus.Terminated, - InstanceStatus.Running, - InstanceStatus.Paused, - ])( - "should restart workflow from status: %s", - async (initialStatus, { expect }) => { - const instanceId = `RESTART-${initialStatus}-INSTANCE`; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - const workflowCallback = async ( - _event: unknown, - step: WorkflowStep - ): Promise => { - await step.do("test-step", async () => "step-result"); - return "done"; - }; - - setTestWorkflowCallback(workflowCallback); - - await runInDurableObject(engineStub, async (engine) => { - await engine.init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - {} as DatabaseInstance, - { - payload: {}, - timestamp: new Date(), - instanceId, - } - ); - }); - - await vi.waitUntil( - async () => { - const status = await runInDurableObject(engineStub, (engine) => - engine.getStatus() - ); - return status === InstanceStatus.Complete; - }, - { timeout: 1000 } - ); - - // Set the status to initalStatus - await runInDurableObject(engineStub, async (_engine, state) => { - await state.storage.put("ENGINE_STATUS", initialStatus); - }); - - try { - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("restart"); - }); - } catch (e) { - // Expected - abort throws to break the DO - if (!isAbortError(e)) { - throw e; - } - } - - const restartedStub = env.ENGINE.get(engineId); - - await runInDurableObject(restartedStub, async (engine) => { - await engine.attemptRestart(); - }); - - await vi.waitUntil( - async () => { - const status = await runInDurableObject(restartedStub, (engine) => - engine.getStatus() - ); - return status === InstanceStatus.Complete; - }, - { timeout: 1000 } - ); - - // Verify the workflow ran again by checking logs - const logs = (await restartedStub.readLogs()) as EngineLogs; - - expect( - logs.logs.some((log) => log.event === InstanceEvent.WORKFLOW_START) - ).toBe(true); - - expect( - logs.logs.some((log) => log.event === InstanceEvent.STEP_START) - ).toBe(true); - - expect( - logs.logs.some((log) => log.event === InstanceEvent.WORKFLOW_SUCCESS) - ).toBe(true); - } - ); - - it("should pause after in-flight step.do finishes", async ({ expect }) => { - const instanceId = "PAUSE-AFTER-DO"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("long-step", async () => { - await scheduler.wait(500); - return "first"; - }); - // step-2 should never run because pause takes effect after long-step - await step.do("step-2", async () => "second"); - return "done"; - }); - - await runInDurableObject(engineStub, async (engine) => { - // Fire-and-forget: catch to prevent unhandled rejections on - // abort or runtime teardown after the test completes - engine - .init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId } - ) - .catch(() => {}); - }); - - // Wait for long-step to start - await vi.waitUntil( - async () => { - return await runInDurableObject(engineStub, (engine) => { - const logs = engine.readLogs() as unknown as EngineLogs; - return logs.logs.some( - (log) => log.event === InstanceEvent.STEP_START - ); - }); - }, - { timeout: 1000 } - ); - - // Request pause while long-step is in flight. - // The pause fires this.ctx.abort() which breaks the DO's output gate — - // any runInDurableObject call (including the ones below) can throw the - // abort error while the DO is tearing down, so we must catch it. - try { - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("pause"); - }); - } catch (e) { - if (!isAbortError(e)) { - throw e; - } - } - - await vi.waitUntil( - async () => { - try { - return await runInDurableObject( - env.ENGINE.get(engineId), - async (engine) => - (await engine.getStatus()) === InstanceStatus.Paused - ); - } catch (e) { - if (isAbortError(e)) { - return false; - } - throw e; - } - }, - { timeout: 2000 } - ); - - const freshStub = env.ENGINE.get(engineId); - const finalStatus = await runInDurableObject(freshStub, (engine) => - engine.getStatus() - ); - expect(finalStatus).toBe(InstanceStatus.Paused); - - // Verify long-step completed but step-2 never ran - const logs = await runInDurableObject(freshStub, (engine) => { - return engine.readLogs() as unknown as EngineLogs; - }); - const stepSuccesses = logs.logs.filter( - (log) => log.event === InstanceEvent.STEP_SUCCESS - ); - expect(stepSuccesses).toHaveLength(1); - }); - - it("should pause after multiple concurrent in-flight step.dos finish", async ({ - expect, - }) => { - const instanceId = "PAUSE-AFTER-CONCURRENT-DOS"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async (_event, step) => { - const [resultA, resultB] = await Promise.all([ - step.do("slow-step-a", async () => { - await scheduler.wait(500); - return "a-done"; - }), - step.do("slow-step-b", async () => { - await scheduler.wait(500); - return "b-done"; - }), - ]); - - // This step should never run - await step.do("step-after-pause", async () => "should-not-run"); - return { resultA, resultB }; - }); - - await runInDurableObject(engineStub, async (engine) => { - engine - .init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId } - ) - .catch(() => {}); - }); - - await vi.waitUntil( - async () => { - return await runInDurableObject(engineStub, (engine) => { - const logs = engine.readLogs() as unknown as EngineLogs; - return ( - logs.logs.filter((log) => log.event === InstanceEvent.STEP_START) - .length >= 2 - ); - }); - }, - { timeout: 1000 } - ); - - // Request pause while both slow steps are in flight - try { - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("pause"); - }); - } catch (e) { - if (!isAbortError(e)) { - throw e; - } - } - - await vi.waitUntil( - async () => { - try { - return await runInDurableObject( - env.ENGINE.get(engineId), - async (engine) => - (await engine.getStatus()) === InstanceStatus.Paused - ); - } catch (e) { - if (isAbortError(e)) { - return false; - } - throw e; - } - }, - { timeout: 2000 } - ); - - const freshStub = env.ENGINE.get(engineId); - const finalStatus = await runInDurableObject(freshStub, (engine) => - engine.getStatus() - ); - expect(finalStatus).toBe(InstanceStatus.Paused); - - // Both concurrent steps should have completed, but step-after-pause should not - const logs = await runInDurableObject(freshStub, (engine) => { - return engine.readLogs() as unknown as EngineLogs; - }); - const stepSuccesses = logs.logs.filter( - (log) => log.event === InstanceEvent.STEP_SUCCESS - ); - expect(stepSuccesses).toHaveLength(2); - }); - - it("should unblock concurrent steps blocked when resume cancels pending pause", async ({ - expect, - }) => { - const instanceId = "RESUME-UNBLOCKS-WAITING-STEPS"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async (_event, step) => { - const [slowResult, fastResult] = await Promise.all([ - step.do("slow-step", async () => { - await scheduler.wait(1000); - return "slow-done"; - }), - step.do("fast-step", async () => { - return "fast-done"; - }), - ]); - - await step.do("step-after-resume", async () => "after-resume"); - return { slowResult, fastResult }; - }); - - await runInDurableObject(engineStub, async (engine) => { - engine - .init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId } - ) - .catch(() => {}); - }); - - await vi.waitUntil( - async () => { - return await runInDurableObject(engineStub, (engine) => { - const logs = engine.readLogs() as unknown as EngineLogs; - return logs.logs.some( - (log) => log.event === InstanceEvent.STEP_START - ); - }); - }, - { timeout: 1000 } - ); - - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("pause"); - }); - - await vi.waitUntil( - async () => - runInDurableObject( - env.ENGINE.get(engineId), - async (engine) => - (await engine.getStatus()) === InstanceStatus.WaitingForPause - ), - { timeout: 1000 } - ); - - // Resume before slow-step finishes - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("resume"); - }); - - // Verify status goes back to Running - const statusAfterResume = await runInDurableObject( - env.ENGINE.get(engineId), - (engine) => engine.getStatus() - ); - expect(statusAfterResume).toBe(InstanceStatus.Running); - - await vi.waitUntil( - async () => - runInDurableObject( - env.ENGINE.get(engineId), - async (engine) => - (await engine.getStatus()) === InstanceStatus.Complete - ), - { timeout: 5000 } - ); - - const freshStub = env.ENGINE.get(engineId); - const finalStatus = await runInDurableObject(freshStub, (engine) => - engine.getStatus() - ); - expect(finalStatus).toBe(InstanceStatus.Complete); - - // All three steps should have completed - const logs = await runInDurableObject(freshStub, (engine) => { - return engine.readLogs() as unknown as EngineLogs; - }); - const stepSuccesses = logs.logs.filter( - (log) => log.event === InstanceEvent.STEP_SUCCESS - ); - expect(stepSuccesses).toHaveLength(3); - }); - - it("should pause immediately during a step.sleep", async ({ expect }) => { - const instanceId = "PAUSE-DURING-SLEEP"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async (_event, step) => { - await step.do("first-step", async () => "first-done"); - - await step.sleep("long-sleep", "10 seconds"); - - await step.do("after-sleep", async () => "should-not-run"); - return "done"; - }); - - await runInDurableObject(engineStub, async (engine) => { - engine - .init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { payload: {}, timestamp: new Date(), instanceId } - ) - .catch(() => {}); - }); - - // Wait for the first step to complete (workflow is now in the sleep) - await vi.waitUntil( - async () => { - return await runInDurableObject(engineStub, (engine) => { - const logs = engine.readLogs() as unknown as EngineLogs; - return logs.logs.some( - (log) => log.event === InstanceEvent.STEP_SUCCESS - ); - }); - }, - { timeout: 1000 } - ); - - // Request pause while in step.sleep — should pause immediately - await runInDurableObject(engineStub, async (engine) => { - await engine.changeInstanceStatus("pause"); - }); - - await vi.waitUntil( - async () => - runInDurableObject( - env.ENGINE.get(engineId), - async (engine) => - (await engine.getStatus()) === InstanceStatus.Paused - ), - { timeout: 1000 } - ); - - const freshStub = env.ENGINE.get(engineId); - const finalStatus = await runInDurableObject(freshStub, (engine) => - engine.getStatus() - ); - expect(finalStatus).toBe(InstanceStatus.Paused); - - // Only the first step should have succeeded — sleep was interrupted - const logs = await runInDurableObject(freshStub, (engine) => { - return engine.readLogs() as unknown as EngineLogs; - }); - const stepSuccesses = logs.logs.filter( - (log) => log.event === InstanceEvent.STEP_SUCCESS - ); - expect(stepSuccesses).toHaveLength(1); - }); - - it("should transition WaitingForPause to Paused on init() entry", async ({ - expect, - }) => { - const instanceId = "WAITING-FOR-PAUSE-INIT"; - const engineId = env.ENGINE.idFromName(instanceId); - const engineStub = env.ENGINE.get(engineId); - - setTestWorkflowCallback(async () => "done"); - - await runInDurableObject(engineStub, async (engine) => { - await engine.init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { - payload: {}, - timestamp: new Date(), - instanceId, - } - ); - }); - - // Wait for workflow to complete first - await vi.waitUntil( - async () => { - const status = await runInDurableObject(engineStub, (engine) => - engine.getStatus() - ); - return status === InstanceStatus.Complete; - }, - { timeout: 1000 } - ); - - // Manually set status to WaitingForPause (simulating a DO restart scenario) - await runInDurableObject(engineStub, async (_engine, state) => { - await state.storage.put( - "ENGINE_STATUS", - InstanceStatus.WaitingForPause - ); - }); - - // Now call init() — it should detect WaitingForPause and transition to Paused - await runInDurableObject(engineStub, async (engine) => { - // Reset isRunning so init() doesn't short-circuit - engine.isRunning = false; - await engine.init( - 12346, - {} as DatabaseWorkflow, - {} as DatabaseVersion, - { id: instanceId } as DatabaseInstance, - { - payload: {}, - timestamp: new Date(), - instanceId, - } - ); - }); - - const status = await runInDurableObject(engineStub, (engine) => - engine.getStatus() - ); - expect(status).toBe(InstanceStatus.Paused); - }); }); }); diff --git a/packages/workflows-shared/tests/env.d.ts b/packages/workflows-shared/tests/env.d.ts index 8747a7f12a2b..38fe220a9c40 100644 --- a/packages/workflows-shared/tests/env.d.ts +++ b/packages/workflows-shared/tests/env.d.ts @@ -2,7 +2,7 @@ declare namespace Cloudflare { interface Env { - ENGINE: DurableObject; + ENGINE: DurableObjectNamespace; USER_WORKFLOW: import("cloudflare:workers").WorkflowEntrypoint; } } diff --git a/packages/workflows-shared/tests/test-entry.ts b/packages/workflows-shared/tests/test-entry.ts deleted file mode 100644 index 05ec9420c468..000000000000 --- a/packages/workflows-shared/tests/test-entry.ts +++ /dev/null @@ -1,51 +0,0 @@ -import { WorkerEntrypoint } from "cloudflare:workers"; -import type { WorkflowEvent, WorkflowStep } from "cloudflare:workers"; - -// Test entry point — re-exports everything from src/index.ts and adds -// a TestWorkflow class that can be bound as USER_WORKFLOW via serviceBindings. -// This allows the workflow entrypoint to survive DO aborts (unlike the old -// setWorkflowEntrypoint pattern which manually mutated instance.env). -// -// NOTE: We extend WorkerEntrypoint (not WorkflowEntrypoint) because workerd -// only recognises WorkerEntrypoint subclasses for service-binding RPC. -// WorkflowEntrypoint is a higher-level abstraction used by the Workflows -// platform; for our test harness the engine just needs a target with a -// callable run() method. - -export * from "../src/index"; - -type WorkflowCallback = ( - event: unknown, - step: WorkflowStep -) => Promise; - -let __testWorkflowCallback: WorkflowCallback | undefined; - -/** - * Set the workflow callback that TestWorkflow.run() will delegate to. - * Call this before creating or restarting a workflow instance in tests. - */ -export function setTestWorkflowCallback( - cb: WorkflowCallback | undefined -): void { - __testWorkflowCallback = cb; -} - -/** - * A WorkerEntrypoint subclass for tests that delegates run() to a - * module-level callback. Configured as the USER_WORKFLOW service binding - * in vitest.config.ts so it survives DO aborts (unlike manual env injection). - */ -export class TestWorkflow extends WorkerEntrypoint { - async run( - event: Readonly>, - step: WorkflowStep - ): Promise { - if (!__testWorkflowCallback) { - throw new Error( - "TestWorkflow callback not set — call setTestWorkflowCallback() before running the workflow" - ); - } - return await __testWorkflowCallback(event, step); - } -} diff --git a/packages/workflows-shared/tests/tsconfig.json b/packages/workflows-shared/tests/tsconfig.json index 597835e56ca3..ce1b88332495 100644 --- a/packages/workflows-shared/tests/tsconfig.json +++ b/packages/workflows-shared/tests/tsconfig.json @@ -1,8 +1,11 @@ { - "extends": "@cloudflare/workers-tsconfig/tsconfig.json", + "extends": "../tsconfig.json", "compilerOptions": { - "moduleResolution": "bundler", - "types": ["@cloudflare/workers-types", "@cloudflare/vitest-pool-workers"] + "types": [ + "@cloudflare/workers-types/experimental", + "@cloudflare/vitest-pool-workers" + ] }, - "include": ["./**/*.ts"] + "include": ["./**/*.ts"], + "exclude": [] } diff --git a/packages/workflows-shared/tests/utils.ts b/packages/workflows-shared/tests/utils.ts index 0d7312c0b919..4e58933d5b0c 100644 --- a/packages/workflows-shared/tests/utils.ts +++ b/packages/workflows-shared/tests/utils.ts @@ -3,7 +3,6 @@ import { env, runInDurableObject, } from "cloudflare:test"; -import { setTestWorkflowCallback } from "./test-entry"; import type { DatabaseInstance, DatabaseVersion, @@ -44,9 +43,9 @@ export async function runWorkflow( const engineId = env.ENGINE.idFromName(instanceId); const engineStub = env.ENGINE.get(engineId); - setTestWorkflowCallback(callback); + await setWorkflowEntrypoint(engineStub, callback); - void engineStub.init( + await engineStub.init( 12346, {} as DatabaseWorkflow, {} as DatabaseVersion, diff --git a/packages/workflows-shared/tests/validators.test.ts b/packages/workflows-shared/tests/validators.test.ts index a39f5aff7fc4..c13820699098 100644 --- a/packages/workflows-shared/tests/validators.test.ts +++ b/packages/workflows-shared/tests/validators.test.ts @@ -1,6 +1,5 @@ import { describe, it } from "vitest"; import { - isValidStepConfig, isValidStepName, isValidWorkflowInstanceId, isValidWorkflowName, @@ -74,34 +73,3 @@ describe("Workflow instance step name validation", () => { expect(isValidStepName(value as string)).toBe(true); }); }); - -describe("Workflow step config validation", () => { - it.for([ - { timeout: "5 years", retries: { limit: 1 } }, - { timeout: "5 years", retries: { delay: 50 } }, - { timeout: "5 years", retries: { backoff: "exponential" } }, - { - timeout: "5 years", - retries: { - delay: "10 minutes", - limit: 5, - "i-like-trains": "yes".repeat(100), - }, - }, - ])("should reject invalid step configs", (value, { expect }) => { - expect(isValidStepConfig(value)).toBe(false); - }); - - it.for([ - { - retries: { limit: 0, delay: 100000, backoff: "exponential" }, - timeout: "15 minutes", - }, - { - retries: { limit: 5, delay: 0, backoff: "constant" }, - timeout: "2 minutes", - }, - ])("should accept valid step configs", (value, { expect }) => { - expect(isValidStepConfig(value)).toBe(true); - }); -}); diff --git a/packages/workflows-shared/tsconfig.json b/packages/workflows-shared/tsconfig.json index 14c7f74e0ea1..4480d416cf58 100644 --- a/packages/workflows-shared/tsconfig.json +++ b/packages/workflows-shared/tsconfig.json @@ -5,7 +5,7 @@ "lib": ["es2022"], "module": "esnext", "moduleResolution": "bundler", - "types": ["@cloudflare/workers-types"], + "types": ["@cloudflare/workers-types/experimental"], "noEmit": true, "isolatedModules": true, "allowSyntheticDefaultImports": true, diff --git a/packages/workflows-shared/vitest.config.ts b/packages/workflows-shared/vitest.config.ts index 9bea2794d9cc..c124f987a1ba 100644 --- a/packages/workflows-shared/vitest.config.ts +++ b/packages/workflows-shared/vitest.config.ts @@ -1,29 +1,18 @@ import { defineWorkersProject } from "@cloudflare/vitest-pool-workers/config"; -// Use the well-known symbol for kCurrentWorker (defined in miniflare) -// so we can bind TestWorkflow as USER_WORKFLOW on the current worker. -const kCurrentWorker = Symbol.for("miniflare.kCurrentWorker"); - export default defineWorkersProject({ test: { poolOptions: { workers: { - main: "tests/test-entry.ts", + main: "src/index.ts", miniflare: { compatibilityDate: "2025-02-04", - compatibilityFlags: ["service_binding_extra_handlers"], durableObjects: { ENGINE: { className: "Engine", useSQLite: true, }, }, - serviceBindings: { - USER_WORKFLOW: { - name: kCurrentWorker as unknown as string, - entrypoint: "TestWorkflow", - }, - }, }, }, },