Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
76902cf
docs: changelog entry for Workflow Attributes MVP plan
VaguelySerious May 22, 2026
6caf143
feat(attributes): MVP implementation across SDK + world adapters
VaguelySerious May 22, 2026
c912dd7
fix(web-shared): add attribute-panel renderer for run attributes
VaguelySerious May 22, 2026
bff53a6
fix(core): split setAttributes for workflow VM bundle
VaguelySerious May 22, 2026
e258aa4
ci: retrigger
VaguelySerious May 22, 2026
0babcb5
fix(core): restrict setAttributes to step body in V5 MVP
VaguelySerious May 22, 2026
e64d634
Fix: Comment in set-attributes-shared.ts falsely claims workflow-body…
vercel[bot] May 23, 2026
4a8d5ba
doc
VaguelySerious May 25, 2026
13d698b
chore(world-vercel): point URL override at attributes-mvp preview
VaguelySerious May 25, 2026
83f4ad3
feat(attributes): support setAttributes from workflow body via builti…
VaguelySerious May 25, 2026
5083bb1
chore(core): correct stale comment in set-attributes-shared
VaguelySerious May 25, 2026
41ed14a
fix(workflow): hide step bridge specifier from deferred-entries disco…
VaguelySerious May 25, 2026
5972c4f
comments
VaguelySerious May 25, 2026
e25730f
comments
VaguelySerious May 25, 2026
860140b
revert: restore static specifier for __builtin_set_attributes step br…
VaguelySerious May 25, 2026
fb35e10
refactor(attributes): collapse setAttributes to workflow-body-only si…
VaguelySerious May 25, 2026
7cc364a
fix(next): replace base64 sourcemap regex with string scan
VaguelySerious May 27, 2026
85ecadc
attributes: address review feedback — rename, world fixes, warn-once,…
VaguelySerious May 27, 2026
2d5099b
fix(world-local): preserve discriminated-union narrowing in writeRunU…
VaguelySerious May 27, 2026
284e90d
attributes: revert preview URL, add fire-and-forget/Promise.all/throw…
pranaygp May 27, 2026
6ab1a11
fix(core): drain queues pending step messages, not just creates step_…
pranaygp May 27, 2026
6a5ef86
attributes: skip fire-and-forget last-call-before-return test as .tod…
pranaygp May 27, 2026
9be88c3
docs(attributes): mark new usage-pattern snippets as skip-typecheck
pranaygp May 27, 2026
f66576b
attributes: add { allowReservedAttributes } opt-in for framework callers
pranaygp May 28, 2026
4edc59c
Apply suggestion from @VaguelySerious
VaguelySerious May 28, 2026
45db5ae
fix(workflow): drop failed internal attribute writes
VaguelySerious May 28, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .changeset/attributes-mvp-plan.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
'@workflow/core': patch
'@workflow/world': patch
'@workflow/world-local': patch
'@workflow/world-postgres': patch
'@workflow/world-vercel': patch
'workflow': patch
---

Add `experimental_setAttributes()` workflow-level helper for attaching string key/value metadata to a workflow run, surfaced as `run.attributes`
3 changes: 3 additions & 0 deletions docs/content/docs/v4/cookbook/common-patterns/timeouts.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ export async function waitForApproval(requestId: string) {
throw new Error("Approval request expired after 7 days");
}

// You may see warnings like `Workflow run completed with 1 uncommitted operations` in your
// logs when the workflow completes. This is expected behavior.

return result.approved;
}
```
Expand Down
365 changes: 365 additions & 0 deletions docs/content/docs/v5/changelog/attributes-mvp.mdx

Large diffs are not rendered by default.

113 changes: 113 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3430,4 +3430,117 @@ describe('e2e', () => {
expect(returnValue.reason).toBe('Test complete');
}
);

// ==========================================================================
// experimental_setAttributes (experimental MVP)
// ==========================================================================

describe('experimental_setAttributes', () => {
test(
'experimentalSetAttributesWorkflow: workflow-body calls dispatch through the step bridge and merge correctly',
{ timeout: 30_000 },
async () => {
const run = await start(
await e2e('experimentalSetAttributesWorkflow'),
[7]
);
const output = await run.returnValue;
expect(output).toBe(21);

const world = await getWorld();
const persisted = await world.runs.get(run.runId);

// First call sets {phase: 'init', source: 'workflow-body'}; second
// overwrites phase; third unsets source via undefined → null.
expect(persisted?.attributes).toEqual({ phase: 'done' });
expect(persisted?.attributes ?? {}).not.toHaveProperty('source');

// Dispatch is via a real step — verify at least one
// `step_created`/`step_completed` pair for the `__builtin_set_attributes`
// step exists on the run's event log.
const { data: events } = await world.events.list({ runId: run.runId });
const attrStepEvents = events.filter(
(e) =>
(e.eventType === 'step_created' ||
e.eventType === 'step_completed') &&
typeof (e.eventData as { stepName?: string } | undefined)
?.stepName === 'string' &&
(e.eventData as { stepName: string }).stepName.includes(
'__builtin_set_attributes'
)
);
expect(attrStepEvents.length).toBeGreaterThanOrEqual(2);
}
);

// TODO(attributes): un-skip once the platform supports executing
// step bodies queued by `drainPendingQueueItems`. Today the step
// worker calls `executeStep` → `world.events.create('step_started')`,
// which the server rejects with `RunExpiredError` (HTTP 410) once
// the run has transitioned to a terminal state. Drain commits the
// `step_created` event and enqueues the message, but by the time
// the queue worker picks it up `run_completed` has landed and the
// worker skips the step ("Workflow run X has already completed,
// skipping step Y" in step-executor.ts).
//
// The fire-and-forget pattern itself works for `void` calls placed
// before any later `await` on a runtime primitive (the suspension
// queues the step before the run terminates) — see the awaited
// workflow-body test above for that coverage. What's broken is
// specifically "last void immediately before return". Either the
// platform needs to keep accepting `step_started` for steps the
// workflow itself queued at drain time, or attribute writes need
// a non-step dispatch path (planned for the full V1 attributes
// feature where attr_set is a first-class event type).
test.todo(
'fire-and-forget: void experimental_setAttributes lands without awaiting'
);

test(
'Promise.all of disjoint-key writes: every key lands',
{ timeout: 30_000 },
async () => {
const run = await start(
await e2e('experimentalSetAttributesParallelWorkflow'),
[]
);
const output = await run.returnValue;
expect(output).toBe('done');

const world = await getWorld();
const persisted = await world.runs.get(run.runId);

// Disjoint-key writes never collide, so all three keys must
// land regardless of dispatch ordering at the world.
expect(persisted?.attributes).toEqual({ a: '1', b: '2', c: '3' });
}
);

test(
'workflow throws after awaited setAttributes: attribute still persists on the failed run',
{ timeout: 30_000 },
async () => {
const run = await start(
await e2e('experimentalSetAttributesThrowsAfterWorkflow'),
[]
);
// The workflow throws — `returnValue` rejects.
await expect(run.returnValue).rejects.toThrow(/intentional failure/);

const world = await getWorld();
const persisted = await world.runs.get(run.runId);

expect(persisted?.status).toBe('failed');
// The attribute was awaited and therefore landed before the
// throw. The `run_failed` lifecycle write must preserve the
// attribute snapshot — the per-run file lock guarantees the
// lifecycle handler reads the fresh value off disk inside its
// critical section before writing the failed state back.
expect(persisted?.attributes).toEqual({
phase: 'about-to-fail',
reason: 'intentional',
});
}
);
});
});
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ export {
type WebhookOptions,
} from './create-hook.js';
export { defineHook, type TypedHook } from './define-hook.js';
export { experimental_setAttributes } from './set-attributes.js';
export { sleep } from './sleep.js';
export {
getStepMetadata,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/runtime/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ export async function loadWorkflowRunEvents(
// Preserve the last non-null cursor across pages. A World may
// legitimately return `{ data: [], cursor: null, hasMore: false }`
// on a trailing empty page — for example when the previous page's
// underlying DynamoDB query hit `Limit` exactly and returned a
// underlying DB query hit the limit exactly and returned a
// `LastEvaluatedKey` "just in case". Overwriting with that null
// would lose the position past the last real event we loaded and
// force the runtime into the "no cursor after initial load" full-
Expand Down
19 changes: 19 additions & 0 deletions packages/core/src/set-attributes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { FatalError } from '@workflow/errors';
import { describe, expect, it } from 'vitest';
import { experimental_setAttributes } from './set-attributes.js';

describe('experimental_setAttributes (host-side stub)', () => {
// The host-side `experimental_setAttributes` is the fallback resolved when callers
// are NOT in the workflow VM. The real implementation lives in
// `workflow/set-attributes.ts` and is selected via the `workflow`
// package-exports condition. Reaching this file from a step body or
// plain host code is unsupported and must surface a clear error.
it('throws FatalError telling the user experimental_setAttributes is workflow-body only', async () => {
await expect(experimental_setAttributes({ phase: 'init' })).rejects.toBeInstanceOf(
FatalError
);
await expect(experimental_setAttributes({ phase: 'init' })).rejects.toThrow(
/workflow.*function/i
);
});
});
25 changes: 25 additions & 0 deletions packages/core/src/set-attributes.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { FatalError } from '@workflow/errors';
import type { ExperimentalSetAttributesOptions } from './workflow/set-attributes.js';

export type { ExperimentalSetAttributesOptions };

/**
* Host-side stub for `experimental_setAttributes`. The real
* implementation lives in `./workflow/set-attributes.ts` and is
* selected by the `workflow` package-exports condition when the
* workflow VM bundle is resolved.
*
* Reaching this stub means the function was called outside a workflow
* body — most likely from a `'use step'` function or plain host code.
* That isn't supported in the MVP: attribute mutations must be
* event-sourced through the workflow runtime so they survive replay.
*/
export async function experimental_setAttributes(
_attrs: Record<string, string | undefined>,
_options?: ExperimentalSetAttributesOptions
): Promise<void> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the workflow-body-only restriction load-bearing, or scope?

Step-body support looks like ~10 lines: the host stub would do exactly what __builtin_set_attributes already does — read the runId from Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE'), read the world from Symbol.for('@workflow/world//cache'), validate, dispatch directly. Both symbols are already populated when a step body runs (the bridge step proves it).

const ctx = globalThis[Symbol.for('WORKFLOW_STEP_CONTEXT_STORAGE')]?.getStore?.();
const runId = ctx?.workflowMetadata?.workflowRunId;
if (!runId) throw new FatalError('must be called from workflow or step body');
const world = globalThis[Symbol.for('@workflow/world//cache')];
await world?.runs?.experimentalSetAttributes?.(runId, normalizedChanges);

Plain host code outside any run genuinely can't be supported with this signature (no runId to infer) — that part isn't arbitrary, you'd need a separate experimental_setRunAttributes(runId, {...}). But step body is just a scope decision.

Trade-offs of allowing step body:

What you'd gain:

  • Users writing a step that already has the data don't have to bubble values back to the workflow body to set an attribute.
  • The natural ergonomic model — setAttributes works "anywhere inside a run."

What you'd lose:

  • The "single dispatch path" simplicity the PR description cites.
  • Event-log visibility of the dispatch: workflow-body calls produce a step_created/step_completed pair via the __builtin_set_attributes bridge; step-body calls would write directly with no extra events. Replay determinism is unaffected (step bodies aren't re-executed during replay anyway).

Forward-compat with #1933 is fine either way — the planned attr_set event's writer: { type: 'step', stepId, attempt } discriminator already accounts for step writers.

Not asking you to add it in this PR — but if the restriction is just scope-cut rather than something the architecture is leaning on, please mention that in the changelog so it doesn't read as a hard architectural constraint that has to stay until V1.

throw new FatalError(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FatalError here feels too heavy-handed. FatalError in @workflow/errors marks the whole run as failed — a large blast radius for what is fundamentally observability metadata. "I moved a tag call into a step → my prod workflow run dies" violates the principle that telemetry should never crash the thing it instruments.

The PR is also internally inconsistent on this exact question. The world-adapter-doesn't-support case in __builtin_set_attributes (packages/workflow/src/internal/builtins.ts around lines 962–975) takes the opposite stance — it console.warns once and silently no-ops. Both situations are "the write won't land"; only the developer's mistake differs. The sharper failure mode for the wrong-call-site case is hard to defend when the unsupported-world case is allowed to drop silently.

A few alternatives worth considering:

  • Warn-once + drop (matches the unsupported-world path)
  • Non-Fatal Error (catchable, surfaces in logs, doesn't terminate the run)

Note also that empty objects are already a silent no-op in the sibling workflow-side implementation (if (changes.length === 0) return; in packages/core/src/workflow/set-attributes.ts). So the threshold for "throw vs drop" isn't really about correctness — it's about whether the author meant to do something. That's a hard line to draw with a Fatal hand grenade.

At minimum, the docstring should justify why this is Fatal rather than a plain Error or warn-and-drop.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FatalError is correct here for now. we can make tagging also work inside steps easily, but expanding this contract slowly and intentionally since the uderlying implementation might change. for now, trying to set a tag inside a step should fail the step loudly so you don't accidentally think it's ok

"experimental_setAttributes() must be called from a 'use workflow' function. " +
'Calling it from a step body or plain host code is not supported.'
);
}
17 changes: 13 additions & 4 deletions packages/core/src/workflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import {
WorkflowRuntimeError,
} from '@workflow/errors';
import { withResolvers } from '@workflow/utils';
import { getPortLazy } from './runtime/get-port-lazy.js';
import { parseWorkflowName } from '@workflow/utils/parse-name';
import type { Event, WorkflowRun } from '@workflow/world';
import * as nanoid from 'nanoid';
Expand All @@ -16,9 +15,10 @@ import { EventConsumerResult, EventsConsumer } from './events-consumer.js';
import type { QueueItem } from './global.js';
import { ENOTSUP, WorkflowSuspension } from './global.js';
import { runtimeLogger } from './logger.js';
import type { WorkflowOrchestratorContext } from './private.js';
import { getPortLazy } from './runtime/get-port-lazy.js';
import { handleSuspension } from './runtime/suspension-handler.js';
import { getWorld } from './runtime/world.js';
import type { WorkflowOrchestratorContext } from './private.js';
import {
dehydrateWorkflowReturnValue,
hydrateWorkflowArguments,
Expand All @@ -36,12 +36,12 @@ import * as Attribute from './telemetry/semantic-conventions.js';
import { trace } from './telemetry.js';
import { getWorkflowRunStreamId } from './util.js';
import { createContext } from './vm/index.js';
import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js';
import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js';
import {
createAbortSignalStatics,
createCreateAbortController,
} from './workflow/abort-controller.js';
import type { WorkflowMetadata } from './workflow/get-workflow-metadata.js';
import { WORKFLOW_CONTEXT_SYMBOL } from './workflow/get-workflow-metadata.js';
import { createCreateHook } from './workflow/hook.js';
import { createSleep } from './workflow/sleep.js';

Expand All @@ -60,6 +60,15 @@ import { createSleep } from './workflow/sleep.js';
* the abort hook is created but never resumed and the cancellation never
* reaches the running step.
*
* NOTE: drain only commits the `*_created` events; it does NOT enqueue step
* bodies for execution. The platform's step worker rejects `step_started`
* for runs that have already transitioned to terminal (`RunExpiredError`),
* so a step queued here would be skipped anyway. Fire-and-forget step calls
* with side effects therefore work only when followed by some later `await`
* on a runtime primitive that triggers a real suspension (the normal
* runtime loop in `runtime.ts` queues the step there). A `void` placed
* immediately before `return` is not reliably executed.
*
* Drain failures are swallowed: the workflow's own outcome (the user's return
* value or thrown error) is the source of truth; secondary cleanup that fails
* shouldn't change the run's terminal state.
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/workflow/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export {
type RetryableErrorOptions,
} from '@workflow/errors';
export type { Hook, HookOptions } from '../create-hook.js';
export { experimental_setAttributes } from './set-attributes.js';
export { sleep } from '../sleep.js';
export { createHook, createWebhook } from './create-hook.js';
export { defineHook } from './define-hook.js';
Expand Down
109 changes: 109 additions & 0 deletions packages/core/src/workflow/set-attributes.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import { FatalError } from '@workflow/errors';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { WORKFLOW_USE_STEP } from '../symbols.js';
import { experimental_setAttributes } from './set-attributes.js';

describe('workflow.experimental_setAttributes', () => {
const dispatchCalls: Array<{
stepName: string;
changes: Array<{ key: string; value: string | null }>;
options: { allowReservedAttributes?: boolean } | undefined;
}> = [];

beforeEach(() => {
dispatchCalls.length = 0;
(globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP] = vi.fn(
(stepName: string) =>
async (
changes: Array<{ key: string; value: string | null }>,
options?: { allowReservedAttributes?: boolean }
) => {
dispatchCalls.push({ stepName, changes, options });
}
);
});

afterEach(() => {
delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP];
});

it('dispatches normalized changes through __builtin_set_attributes', async () => {
await experimental_setAttributes({ phase: 'init', orderId: 'ord_1' });
expect(dispatchCalls).toEqual([
{
stepName: '__builtin_set_attributes',
changes: [
{ key: 'phase', value: 'init' },
{ key: 'orderId', value: 'ord_1' },
],
options: {},
},
]);
});

it('translates undefined values into null (unset semantics)', async () => {
await experimental_setAttributes({ phase: 'done', stale: undefined });
expect(dispatchCalls).toEqual([
{
stepName: '__builtin_set_attributes',
changes: [
{ key: 'phase', value: 'done' },
{ key: 'stale', value: null },
],
options: {},
},
]);
});

it('is a no-op for an empty record (no dispatch)', async () => {
await experimental_setAttributes({});
expect(dispatchCalls).toHaveLength(0);
});

it('throws FatalError when the workflow runtime has not initialized useStep', async () => {
delete (globalThis as Record<symbol, unknown>)[WORKFLOW_USE_STEP];
await expect(
experimental_setAttributes({ phase: 'init' })
).rejects.toBeInstanceOf(FatalError);
});

it('throws FatalError for reserved-prefix keys before any dispatch', async () => {
await expect(
experimental_setAttributes({ $sys: 'x' })
).rejects.toBeInstanceOf(FatalError);
expect(dispatchCalls).toHaveLength(0);
});

it('dispatches reserved-prefix keys when allowReservedAttributes opt-in is set, and forwards the flag to the step', async () => {
await experimental_setAttributes(
{ '$framework.kind': 'agent' },
{ allowReservedAttributes: true }
);
expect(dispatchCalls).toEqual([
{
stepName: '__builtin_set_attributes',
changes: [{ key: '$framework.kind', value: 'agent' }],
options: { allowReservedAttributes: true },
},
]);
});

it('still rejects reserved-prefix keys when allowReservedAttributes is explicitly false', async () => {
await expect(
experimental_setAttributes(
{ '$framework.kind': 'agent' },
{ allowReservedAttributes: false }
)
).rejects.toBeInstanceOf(FatalError);
expect(dispatchCalls).toHaveLength(0);
});

it('throws FatalError when called with a non-object', async () => {
await expect(
experimental_setAttributes(null as unknown as Record<string, string>)
).rejects.toBeInstanceOf(FatalError);
await expect(
experimental_setAttributes([] as unknown as Record<string, string>)
).rejects.toBeInstanceOf(FatalError);
});
});
Loading
Loading