Skip to content

Commit 14ed89d

Browse files
committed
add StepMeta type structure for skippable deps tracking
1 parent 2b3cca7 commit 14ed89d

9 files changed

Lines changed: 999 additions & 241 deletions

File tree

pkgs/core/supabase/tests/condition_evaluation/skipped_deps_excluded_from_input.test.sql

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -76,32 +76,63 @@ select pgflow.complete_task(
7676
'{"data": "from_b"}'::jsonb
7777
);
7878

79-
-- Test 3: step_c remaining_deps should be 0 (both deps resolved - a skipped, b completed)
79+
-- Test 3: Verify step_c remaining_deps is 0 (ready to start)
8080
select is(
8181
(select remaining_deps from pgflow.step_states
8282
where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
8383
0,
8484
'step_c remaining_deps should be 0 (a skipped + b completed)'
8585
);
8686

87-
-- Test 4: step_c should now be started
87+
-- Now read and start step_c - this replicates what read_and_start does
88+
-- and allows us to inspect the returned input value
89+
--
90+
-- We need to do this in steps:
91+
-- 1. Read the message from the queue
92+
-- 2. Start the task with start_tasks
93+
-- 3. Inspect the input returned by start_tasks
94+
95+
-- Read the message and store msg_id
96+
with read_msg as (
97+
select * from pgmq.read_with_poll('skip_diamond', 1, 1, 1, 50)
98+
limit 1
99+
),
100+
msg_ids as (
101+
select array_agg(msg_id) as ids from read_msg
102+
),
103+
-- Start the task and get the input
104+
start_result as (
105+
select st.input, st.step_slug, st.run_id
106+
from pgflow.start_tasks(
107+
'skip_diamond',
108+
(select ids from msg_ids),
109+
pgflow_tests.ensure_worker('skip_diamond')
110+
) st
111+
)
112+
-- Store the input for later testing
113+
select input, step_slug, run_id into temporary step_c_inputs
114+
from start_result
115+
where step_slug = 'step_c';
116+
117+
-- Test 4: Verify step_c was started
88118
select is(
89119
(select status from pgflow.step_states
90120
where run_id = (select run_id from run_ids) and step_slug = 'step_c'),
91121
'started',
92-
'step_c should be started after step_b completes'
122+
'step_c should be started after read_and_start'
93123
);
94124

95-
-- Test 5: step_b output should be in step_states
125+
-- Test 5: Verify the input does NOT contain step_a key
126+
-- The handler input should only have step_b, NOT step_a
96127
select is(
97-
(select output from pgflow.step_states
98-
where run_id = (select run_id from run_ids) and step_slug = 'step_b'),
99-
'{"data": "from_b"}'::jsonb,
100-
'step_b output should be stored'
128+
(select input from step_c_inputs),
129+
'{"step_b": {"data": "from_b"}}'::jsonb,
130+
'step_c input should only contain step_b, not step_a (skipped deps are excluded)'
101131
);
102132

103133
-- Clean up
104134
drop table if exists run_ids;
135+
drop table if exists step_c_inputs;
105136

106137
select finish();
107138
rollback;

pkgs/dsl/__tests__/runtime/condition-options.test.ts

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,69 +3,69 @@ import { Flow } from '../../src/dsl.js';
33
import { compileFlow } from '../../src/compile-flow.js';
44

55
describe('Condition Options', () => {
6-
describe('DSL accepts condition and whenUnmet', () => {
7-
it('should accept condition option on a step', () => {
6+
describe('DSL accepts if and else', () => {
7+
it('should accept if option on a step', () => {
88
const flow = new Flow({ slug: 'test_flow' })
99
.step(
10-
{ slug: 'conditional_step', condition: { enabled: true } },
10+
{ slug: 'conditional_step', if: { enabled: true } },
1111
() => 'result'
1212
);
1313

1414
const step = flow.getStepDefinition('conditional_step');
15-
expect(step.options.condition).toEqual({ enabled: true });
15+
expect(step.options.if).toEqual({ enabled: true });
1616
});
1717

18-
it('should accept whenUnmet option on a step', () => {
18+
it('should accept else option on a step', () => {
1919
const flow = new Flow({ slug: 'test_flow' })
2020
.step(
21-
{ slug: 'conditional_step', whenUnmet: 'skip' },
21+
{ slug: 'conditional_step', else: 'skip' },
2222
() => 'result'
2323
);
2424

2525
const step = flow.getStepDefinition('conditional_step');
26-
expect(step.options.whenUnmet).toBe('skip');
26+
expect(step.options.else).toBe('skip');
2727
});
2828

29-
it('should accept both condition and whenUnmet together', () => {
29+
it('should accept both if and else together', () => {
3030
const flow = new Flow({ slug: 'test_flow' })
3131
.step(
3232
{
3333
slug: 'conditional_step',
34-
condition: { status: 'active' },
35-
whenUnmet: 'skip-cascade',
34+
if: { status: 'active' },
35+
else: 'skip-cascade',
3636
},
3737
() => 'result'
3838
);
3939

4040
const step = flow.getStepDefinition('conditional_step');
41-
expect(step.options.condition).toEqual({ status: 'active' });
42-
expect(step.options.whenUnmet).toBe('skip-cascade');
41+
expect(step.options.if).toEqual({ status: 'active' });
42+
expect(step.options.else).toBe('skip-cascade');
4343
});
4444

45-
it('should accept condition on dependent steps', () => {
45+
it('should accept if on dependent steps', () => {
4646
const flow = new Flow({ slug: 'test_flow' })
4747
.step({ slug: 'first' }, () => ({ success: true }))
4848
.step(
4949
{
5050
slug: 'conditional_step',
5151
dependsOn: ['first'],
52-
condition: { first: { success: true } },
53-
whenUnmet: 'skip',
52+
if: { first: { success: true } },
53+
else: 'skip',
5454
},
5555
() => 'result'
5656
);
5757

5858
const step = flow.getStepDefinition('conditional_step');
59-
expect(step.options.condition).toEqual({ first: { success: true } });
60-
expect(step.options.whenUnmet).toBe('skip');
59+
expect(step.options.if).toEqual({ first: { success: true } });
60+
expect(step.options.else).toBe('skip');
6161
});
6262
});
6363

6464
describe('compileFlow includes condition parameters', () => {
6565
it('should compile condition_pattern for root step', () => {
6666
const flow = new Flow({ slug: 'test_flow' })
6767
.step(
68-
{ slug: 'step1', condition: { enabled: true } },
68+
{ slug: 'step1', if: { enabled: true } },
6969
() => 'result'
7070
);
7171

@@ -78,7 +78,7 @@ describe('Condition Options', () => {
7878
it('should compile when_unmet for step', () => {
7979
const flow = new Flow({ slug: 'test_flow' })
8080
.step(
81-
{ slug: 'step1', whenUnmet: 'fail' },
81+
{ slug: 'step1', else: 'fail' },
8282
() => 'result'
8383
);
8484

@@ -93,8 +93,8 @@ describe('Condition Options', () => {
9393
.step(
9494
{
9595
slug: 'step1',
96-
condition: { active: true, type: 'premium' },
97-
whenUnmet: 'skip-cascade',
96+
if: { active: true, type: 'premium' },
97+
else: 'skip-cascade',
9898
},
9999
() => 'result'
100100
);
@@ -113,8 +113,8 @@ describe('Condition Options', () => {
113113
slug: 'step1',
114114
maxAttempts: 3,
115115
timeout: 60,
116-
condition: { enabled: true },
117-
whenUnmet: 'skip',
116+
if: { enabled: true },
117+
else: 'skip',
118118
},
119119
() => 'result'
120120
);
@@ -135,8 +135,8 @@ describe('Condition Options', () => {
135135
{
136136
slug: 'second',
137137
dependsOn: ['first'],
138-
condition: { first: { success: true } },
139-
whenUnmet: 'skip',
138+
if: { first: { success: true } },
139+
else: 'skip',
140140
},
141141
() => 'result'
142142
);
@@ -150,26 +150,26 @@ describe('Condition Options', () => {
150150
});
151151
});
152152

153-
describe('whenUnmet validation', () => {
154-
it('should only accept valid whenUnmet values', () => {
153+
describe('else validation', () => {
154+
it('should only accept valid else values', () => {
155155
// Valid values should work
156156
expect(() =>
157157
new Flow({ slug: 'test' }).step(
158-
{ slug: 's1', whenUnmet: 'fail' },
158+
{ slug: 's1', else: 'fail' },
159159
() => 1
160160
)
161161
).not.toThrow();
162162

163163
expect(() =>
164164
new Flow({ slug: 'test' }).step(
165-
{ slug: 's1', whenUnmet: 'skip' },
165+
{ slug: 's1', else: 'skip' },
166166
() => 1
167167
)
168168
).not.toThrow();
169169

170170
expect(() =>
171171
new Flow({ slug: 'test' }).step(
172-
{ slug: 's1', whenUnmet: 'skip-cascade' },
172+
{ slug: 's1', else: 'skip-cascade' },
173173
() => 1
174174
)
175175
).not.toThrow();

0 commit comments

Comments
 (0)