Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions .changeset/workflows-instance-methods.md

This file was deleted.

5 changes: 0 additions & 5 deletions .changeset/workflows-vitest-pool-waitforstatus.md

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { env, introspectWorkflow, SELF } from "cloudflare:test";
import { describe, it } from "vitest";
import { it } from "vitest";

const STATUS_COMPLETE = "complete";
const STEP_NAME = "AI content scan";
Expand Down Expand Up @@ -70,96 +70,3 @@ it("workflow batch should be able to reach the end and be successful", async ({
await introspector.dispose();
}
});

describe("workflow instance lifecycle methods", () => {
it("should terminate a workflow instance", async ({ expect }) => {
// CONFIG:
await using introspector = await introspectWorkflow(env.MODERATOR);
await introspector.modifyAll(async (m) => {
await m.disableSleeps();
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
});

const res = await SELF.fetch("https://mock-worker.local/moderate");
const data = (await res.json()) as { id: string; details: unknown };

const instances = introspector.get();
expect(instances.length).toBe(1);
const instance = instances[0];

expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
violationScore: 50,
});

const handle = await env.MODERATOR.get(data.id);
await handle.terminate();

// ASSERTIONS:
await expect(instance.waitForStatus("terminated")).resolves.not.toThrow();

// DISPOSE: ensured by `await using`
});

it("should restart a workflow instance", async ({ expect }) => {
// CONFIG:
await using introspector = await introspectWorkflow(env.MODERATOR);
await introspector.modifyAll(async (m) => {
await m.disableSleeps();
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
await m.mockEvent({
type: "moderation-decision",
payload: { moderatorAction: "approve" },
});
});

const res = await SELF.fetch("https://mock-worker.local/moderate");
const data = (await res.json()) as { id: string; details: unknown };

const instances = introspector.get();
expect(instances.length).toBe(1);
const instance = instances[0];

expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
violationScore: 50,
});

const handle = await env.MODERATOR.get(data.id);
await handle.restart();

// Mocks survive instace restart, so the restarted workflow re-runs
// with the same config
await expect(
instance.waitForStatus(STATUS_COMPLETE)
).resolves.not.toThrow();

// DISPOSE: ensured by `await using`
});

it("should pause a workflow instance", async ({ expect }) => {
// CONFIG:
await using introspector = await introspectWorkflow(env.MODERATOR);
await introspector.modifyAll(async (m) => {
await m.disableSleeps();
await m.mockStepResult({ name: STEP_NAME }, { violationScore: 50 });
});

const res = await SELF.fetch("https://mock-worker.local/moderate");
const data = (await res.json()) as { id: string; details: unknown };

const instances = introspector.get();
expect(instances.length).toBe(1);
const instance = instances[0];

expect(await instance.waitForStepResult({ name: STEP_NAME })).toEqual({
violationScore: 50,
});

const handle = await env.MODERATOR.get(data.id);
await handle.pause();

// ASSERTIONS:
await expect(instance.waitForStatus("paused")).resolves.not.toThrow();

// DISPOSE: ensured by `await using`
});
});
53 changes: 2 additions & 51 deletions fixtures/workflow-multiple/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,32 +57,9 @@ export class Demo2 extends WorkflowEntrypoint<{}, Params> {
}
}

export class Demo3 extends WorkflowEntrypoint<{}, Params> {
async run(event: WorkflowEvent<Params>, step: WorkflowStep) {
const result = await step.do("First step", async function () {
return {
output: "First step result",
};
});

await step.waitForEvent("wait for signal", {
type: "continue",
});

const result2 = await step.do("Second step", async function () {
return {
output: "workflow3",
};
});

return "i'm workflow3";
}
}

type Env = {
WORKFLOW: Workflow;
WORKFLOW2: Workflow;
WORKFLOW3: Workflow;
};

export default class extends WorkerEntrypoint<Env> {
Expand All @@ -94,15 +71,8 @@ export default class extends WorkerEntrypoint<Env> {
if (url.pathname === "/favicon.ico") {
return new Response(null, { status: 404 });
}

let workflowToUse: Workflow;
if (workflowName === "3") {
workflowToUse = this.env.WORKFLOW3;
} else if (workflowName === "2") {
workflowToUse = this.env.WORKFLOW2;
} else {
workflowToUse = this.env.WORKFLOW;
}
let workflowToUse =
workflowName == "2" ? this.env.WORKFLOW2 : this.env.WORKFLOW;

let handle: WorkflowInstance;
if (url.pathname === "/create") {
Expand All @@ -111,25 +81,6 @@ export default class extends WorkerEntrypoint<Env> {
} else {
handle = await workflowToUse.create({ id });
}
} else if (url.pathname === "/pause") {
handle = await workflowToUse.get(id);
await handle.pause();
} else if (url.pathname === "/resume") {
handle = await workflowToUse.get(id);
await handle.resume();
} else if (url.pathname === "/restart") {
handle = await workflowToUse.get(id);
await handle.restart();
} else if (url.pathname === "/terminate") {
handle = await workflowToUse.get(id);
await handle.terminate();
} else if (url.pathname === "/sendEvent") {
handle = await workflowToUse.get(id);
await handle.sendEvent({
type: "continue",
payload: await req.json(),
});
return Response.json({ ok: true });
} else {
handle = await workflowToUse.get(id);
}
Expand Down
153 changes: 2 additions & 151 deletions fixtures/workflow-multiple/tests/index.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { randomUUID } from "crypto";
import { rm } from "fs/promises";
import { resolve } from "path";
import { fetch } from "undici";
import { afterAll, beforeAll, describe, it, test, vi } from "vitest";
import { afterAll, beforeAll, describe, it, vi } from "vitest";
import { runWranglerDev } from "../../shared/src/run-wrangler-long-lived";

describe("Workflows", () => {
Expand All @@ -27,13 +26,11 @@ describe("Workflows", () => {
await stop?.();
});

async function fetchJson(url: string, body?: unknown, method?: string) {
async function fetchJson(url: string) {
const response = await fetch(url, {
headers: {
"MF-Disable-Pretty-Error": "1",
},
method: method ?? "GET",
body: body !== undefined ? JSON.stringify(body) : undefined,
});
const text = await response.text();

Expand Down Expand Up @@ -95,150 +92,4 @@ describe("Workflows", () => {
),
]);
});

describe("instance lifecycle methods (workflow3)", () => {
test("pause and resume a workflow", async ({ expect }) => {
const id = randomUUID();

await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as {
status: {
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
};
};
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
output: "First step result",
});
},
{ timeout: 5000 }
);

// Pause the instance
await fetchJson(`http://${ip}:${port}/pause?workflowName=3&id=${id}`);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as { status: { status: string } };
expect(result.status.status).toBe("paused");
},
{ timeout: 5000 }
);

// Resume the instance
await fetchJson(`http://${ip}:${port}/resume?workflowName=3&id=${id}`);

await fetchJson(
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
{ done: true },
"POST"
);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as { status: { status: string; output: string } };
expect(result.status.status).toBe("complete");
expect(result.status.output).toBe("i'm workflow3");
},
{ timeout: 5000 }
);
});

test("terminate a running workflow", async ({ expect }) => {
const id = randomUUID();

await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as {
status: {
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
};
};
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
output: "First step result",
});
},
{ timeout: 5000 }
);

// Terminate
await fetchJson(`http://${ip}:${port}/terminate?workflowName=3&id=${id}`);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as { status: { status: string } };
expect(result.status.status).toBe("terminated");
},
{ timeout: 5000 }
);
});

test("restart a running workflow", async ({ expect }) => {
const id = randomUUID();

await fetchJson(`http://${ip}:${port}/create?workflowName=3&id=${id}`);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as {
status: {
__LOCAL_DEV_STEP_OUTPUTS: { output: string }[];
};
};
expect(result.status.__LOCAL_DEV_STEP_OUTPUTS).toContainEqual({
output: "First step result",
});
},
{ timeout: 5000 }
);

// Restart the instance
await fetchJson(`http://${ip}:${port}/restart?workflowName=3&id=${id}`);

// After restart, wait for it to be running again
await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as { status: { status: string } };
expect(result.status.status).toBe("running");
},
{ timeout: 5000 }
);

// Send event to complete the restarted workflow
await fetchJson(
`http://${ip}:${port}/sendEvent?workflowName=3&id=${id}`,
{ done: true },
"POST"
);

await vi.waitFor(
async () => {
const result = (await fetchJson(
`http://${ip}:${port}/status?workflowName=3&id=${id}`
)) as { status: { status: string; output: string } };
expect(result.status.status).toBe("complete");
expect(result.status.output).toBe("i'm workflow3");
},
{ timeout: 5000 }
);
});
});
});
5 changes: 0 additions & 5 deletions fixtures/workflow-multiple/wrangler.jsonc
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,5 @@
"name": "my-workflow-2",
"class_name": "Demo2",
},
{
"binding": "WORKFLOW3",
"name": "my-workflow-3",
"class_name": "Demo3",
},
],
}
Loading
Loading