Skip to content

Commit 517b658

Browse files
committed
feat: add client support for skipped step events
- Add FlowStepStatus.Skipped to enum - Add SkipReason type (condition_unmet|handler_failed|dependency_skipped) - Add BroadcastStepSkippedEvent type - Add skipped event handling to FlowStep class - Add skipped_at and skip_reason getters - Update waitForStatus to accept Skipped - Treat Skipped as terminal state - Update eventAdapters for skipped events - Add unit tests, type tests, and E2E tests - All 206 client tests pass
1 parent d62437b commit 517b658

9 files changed

Lines changed: 898 additions & 106 deletions

File tree

Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
import { describe, it, expect } from 'vitest';
2+
import { withPgNoTransaction } from '../helpers/db.js';
3+
import { createTestSupabaseClient } from '../helpers/setup.js';
4+
import { createTestFlow } from '../helpers/fixtures.js';
5+
import { grantMinimalPgflowPermissions } from '../helpers/permissions.js';
6+
import { PgflowClient } from '../../src/lib/PgflowClient.js';
7+
import { FlowStepStatus } from '../../src/lib/types.js';
8+
import { PgflowSqlClient } from '@pgflow/core';
9+
import { readAndStart } from '../helpers/polling.js';
10+
import { cleanupFlow } from '../helpers/cleanup.js';
11+
import { createEventTracker } from '../helpers/test-utils.js';
12+
13+
/**
14+
* Tests for skipped step event handling in the client.
15+
*
16+
* Skipped steps can occur when:
17+
* - A step's condition evaluates to false (condition_unmet)
18+
* - A dependency was skipped, causing cascading skips (dependency_skipped)
19+
* - A handler fails during evaluation (handler_failed)
20+
*
21+
* These tests verify the client correctly:
22+
* - Receives and processes skipped broadcast events
23+
* - Updates step state with skipped_at and skip_reason
24+
* - Treats skipped as a terminal state
25+
* - Handles waitForStatus(Skipped) correctly
26+
*/
27+
describe('Skipped Step Handling', () => {
28+
it(
29+
'client handles skipped step state from database snapshot',
30+
withPgNoTransaction(async (sql) => {
31+
// This test verifies the client correctly handles skipped step state
32+
// when fetched from the database (e.g., on reconnect or late join)
33+
34+
const testFlow = createTestFlow('skipped_snapshot_flow');
35+
await cleanupFlow(sql, testFlow.slug);
36+
await grantMinimalPgflowPermissions(sql);
37+
38+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
39+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'will_skip_step')`;
40+
41+
const supabaseClient = createTestSupabaseClient();
42+
const pgflowClient = new PgflowClient(supabaseClient, {
43+
realtimeStabilizationDelayMs: 1000,
44+
});
45+
46+
// Start the flow
47+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
48+
const step = run.step('will_skip_step');
49+
50+
// Verify initial state is Started (root step)
51+
expect(step.status).toBe(FlowStepStatus.Started);
52+
53+
// Directly call pgflow.skip_step to simulate the step being skipped
54+
// This mimics what would happen when a condition evaluates to false
55+
await sql`SELECT pgflow.skip_step(
56+
${run.run_id}::uuid,
57+
${'will_skip_step'}::text,
58+
${'condition_unmet'}::text
59+
)`;
60+
61+
// Wait for the skipped event to be received
62+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
63+
64+
// Verify skipped state
65+
expect(step.status).toBe(FlowStepStatus.Skipped);
66+
expect(step.skipped_at).toBeInstanceOf(Date);
67+
expect(step.skip_reason).toBe('condition_unmet');
68+
69+
// Verify output is null for skipped steps (per design decision Q1)
70+
expect(step.output).toBeNull();
71+
72+
await supabaseClient.removeAllChannels();
73+
}),
74+
{ timeout: 15000 }
75+
);
76+
77+
it(
78+
'receives skipped broadcast event and updates step state',
79+
withPgNoTransaction(async (sql) => {
80+
// This test verifies the client receives and processes skipped events
81+
// broadcast via Supabase realtime
82+
83+
const testFlow = createTestFlow('skipped_broadcast_flow');
84+
await cleanupFlow(sql, testFlow.slug);
85+
await grantMinimalPgflowPermissions(sql);
86+
87+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
88+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'skipped_step')`;
89+
90+
const supabaseClient = createTestSupabaseClient();
91+
const pgflowClient = new PgflowClient(supabaseClient, {
92+
realtimeStabilizationDelayMs: 1000,
93+
});
94+
95+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
96+
const step = run.step('skipped_step');
97+
98+
// Set up event tracking BEFORE the skip happens
99+
const tracker = createEventTracker();
100+
step.on('*', tracker.callback);
101+
102+
// Skip the step
103+
await sql`SELECT pgflow.skip_step(
104+
${run.run_id}::uuid,
105+
${'skipped_step'}::text,
106+
${'handler_failed'}::text
107+
)`;
108+
109+
// Wait for the skipped status
110+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
111+
112+
// Verify we received the skipped event
113+
expect(tracker).toHaveReceivedEvent('step:skipped');
114+
expect(tracker).toHaveReceivedEvent('step:skipped', {
115+
run_id: run.run_id,
116+
step_slug: 'skipped_step',
117+
status: FlowStepStatus.Skipped,
118+
skip_reason: 'handler_failed',
119+
});
120+
121+
// Verify step state
122+
expect(step.status).toBe(FlowStepStatus.Skipped);
123+
expect(step.skip_reason).toBe('handler_failed');
124+
125+
await supabaseClient.removeAllChannels();
126+
}),
127+
{ timeout: 15000 }
128+
);
129+
130+
it(
131+
'skipped is a terminal state - no further status changes',
132+
withPgNoTransaction(async (sql) => {
133+
// Verify that once a step is skipped, it cannot transition to other states
134+
135+
const testFlow = createTestFlow('skipped_terminal_flow');
136+
await cleanupFlow(sql, testFlow.slug);
137+
await grantMinimalPgflowPermissions(sql);
138+
139+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
140+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'terminal_step')`;
141+
142+
const supabaseClient = createTestSupabaseClient();
143+
const pgflowClient = new PgflowClient(supabaseClient, {
144+
realtimeStabilizationDelayMs: 1000,
145+
});
146+
147+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
148+
const step = run.step('terminal_step');
149+
150+
// Skip the step
151+
await sql`SELECT pgflow.skip_step(
152+
${run.run_id}::uuid,
153+
${'terminal_step'}::text,
154+
${'dependency_skipped'}::text
155+
)`;
156+
157+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
158+
159+
// Store original skipped_at
160+
const originalSkippedAt = step.skipped_at;
161+
162+
// Set up tracker for any subsequent events
163+
const tracker = createEventTracker();
164+
step.on('*', tracker.callback);
165+
166+
// Try to complete the step (should be ignored by the database, no event broadcast)
167+
// The database should reject this, but even if an event comes through,
168+
// the client should ignore it due to terminal state protection
169+
const sqlClient = new PgflowSqlClient(sql);
170+
try {
171+
// This should fail at the database level
172+
const tasks = await readAndStart(sql, sqlClient, testFlow.slug, 0.1, 1);
173+
if (tasks.length > 0) {
174+
await sqlClient.completeTask(tasks[0], { result: 'should not work' });
175+
}
176+
} catch (e) {
177+
// Expected - skipped steps shouldn't have tasks
178+
}
179+
180+
// Give time for any potential events
181+
await new Promise((resolve) => setTimeout(resolve, 1000));
182+
183+
// Verify step is still skipped with same timestamp
184+
expect(step.status).toBe(FlowStepStatus.Skipped);
185+
expect(step.skipped_at).toEqual(originalSkippedAt);
186+
187+
// Verify no additional events were processed
188+
expect(tracker).toHaveReceivedTotalEvents(0);
189+
190+
await supabaseClient.removeAllChannels();
191+
}),
192+
{ timeout: 15000 }
193+
);
194+
195+
it(
196+
'waitForStatus(Skipped) resolves when step is skipped',
197+
withPgNoTransaction(async (sql) => {
198+
// Verify waitForStatus works correctly with Skipped status
199+
200+
const testFlow = createTestFlow('wait_skipped_flow');
201+
await cleanupFlow(sql, testFlow.slug);
202+
await grantMinimalPgflowPermissions(sql);
203+
204+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
205+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'wait_step')`;
206+
207+
const supabaseClient = createTestSupabaseClient();
208+
const pgflowClient = new PgflowClient(supabaseClient, {
209+
realtimeStabilizationDelayMs: 1000,
210+
});
211+
212+
const run = await pgflowClient.startFlow(testFlow.slug, { test: 'data' });
213+
const step = run.step('wait_step');
214+
215+
// Start waiting for skipped BEFORE the skip happens
216+
const waitPromise = step.waitForStatus(FlowStepStatus.Skipped, {
217+
timeoutMs: 10000,
218+
});
219+
220+
// Skip the step after a small delay
221+
setTimeout(async () => {
222+
await sql`SELECT pgflow.skip_step(
223+
${run.run_id}::uuid,
224+
${'wait_step'}::text,
225+
${'condition_unmet'}::text
226+
)`;
227+
}, 100);
228+
229+
// Wait should resolve with the step
230+
const result = await waitPromise;
231+
expect(result).toBe(step);
232+
expect(result.status).toBe(FlowStepStatus.Skipped);
233+
expect(result.skip_reason).toBe('condition_unmet');
234+
235+
await supabaseClient.removeAllChannels();
236+
}),
237+
{ timeout: 15000 }
238+
);
239+
240+
it(
241+
'handles all skip reasons correctly',
242+
withPgNoTransaction(async (sql) => {
243+
// Verify all three skip reasons are handled correctly
244+
245+
const skipReasons = [
246+
'condition_unmet',
247+
'handler_failed',
248+
'dependency_skipped',
249+
] as const;
250+
251+
for (const skipReason of skipReasons) {
252+
const testFlow = createTestFlow(`skip_reason_${skipReason}_flow`);
253+
await cleanupFlow(sql, testFlow.slug);
254+
await grantMinimalPgflowPermissions(sql);
255+
256+
await sql`SELECT pgflow.create_flow(${testFlow.slug})`;
257+
await sql`SELECT pgflow.add_step(${testFlow.slug}, 'reason_step')`;
258+
259+
const supabaseClient = createTestSupabaseClient();
260+
const pgflowClient = new PgflowClient(supabaseClient, {
261+
realtimeStabilizationDelayMs: 1000,
262+
});
263+
264+
const run = await pgflowClient.startFlow(testFlow.slug, {
265+
test: 'data',
266+
});
267+
const step = run.step('reason_step');
268+
269+
// Skip with specific reason
270+
await sql`SELECT pgflow.skip_step(
271+
${run.run_id}::uuid,
272+
${'reason_step'}::text,
273+
${skipReason}::text
274+
)`;
275+
276+
await step.waitForStatus(FlowStepStatus.Skipped, { timeoutMs: 10000 });
277+
278+
// Verify the skip reason was captured correctly
279+
expect(step.status).toBe(FlowStepStatus.Skipped);
280+
expect(step.skip_reason).toBe(skipReason);
281+
282+
await supabaseClient.removeAllChannels();
283+
}
284+
}),
285+
{ timeout: 45000 }
286+
);
287+
});

pkgs/client/__tests__/helpers/event-factories.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
BroadcastStepStartedEvent,
88
BroadcastStepCompletedEvent,
99
BroadcastStepFailedEvent,
10+
BroadcastStepSkippedEvent,
1011
} from '../../src/lib/types';
1112

1213
/**
@@ -98,3 +99,17 @@ export function createStepFailedEvent(
9899
...overrides,
99100
};
100101
}
102+
103+
export function createStepSkippedEvent(
104+
overrides: Partial<BroadcastStepSkippedEvent> = {}
105+
): BroadcastStepSkippedEvent {
106+
return {
107+
event_type: 'step:skipped',
108+
run_id: '123e4567-e89b-12d3-a456-426614174000',
109+
step_slug: 'test-step',
110+
status: FlowStepStatus.Skipped,
111+
skipped_at: new Date().toISOString(),
112+
skip_reason: 'condition_unmet',
113+
...overrides,
114+
};
115+
}

0 commit comments

Comments
 (0)