Skip to content

Commit 4354fcb

Browse files
committed
implement ifNot negative condition pattern
1 parent bb09b62 commit 4354fcb

28 files changed

Lines changed: 1460 additions & 365 deletions

pkgs/core/schemas/0050_tables_definitions.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ create table pgflow.steps (
2424
opt_base_delay int,
2525
opt_timeout int,
2626
opt_start_delay int,
27-
condition_pattern jsonb, -- JSON pattern for @> containment check
27+
condition_pattern jsonb, -- JSON pattern for @> containment check (if)
28+
condition_not_pattern jsonb, -- JSON pattern for NOT @> containment check (ifNot)
2829
when_unmet text not null default 'skip', -- What to do when condition not met (skip is natural default)
2930
when_failed text not null default 'fail', -- What to do when handler fails after retries
3031
created_at timestamptz not null default now(),

pkgs/core/schemas/0100_function_add_step.sql

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ create or replace function pgflow.add_step(
88
start_delay int default null,
99
step_type text default 'single',
1010
condition_pattern jsonb default null,
11+
condition_not_pattern jsonb default null,
1112
when_unmet text default 'skip',
1213
when_failed text default 'fail'
1314
)
@@ -40,7 +41,7 @@ BEGIN
4041
INSERT INTO pgflow.steps (
4142
flow_slug, step_slug, step_type, step_index, deps_count,
4243
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
43-
condition_pattern, when_unmet, when_failed
44+
condition_pattern, condition_not_pattern, when_unmet, when_failed
4445
)
4546
VALUES (
4647
add_step.flow_slug,
@@ -53,6 +54,7 @@ BEGIN
5354
add_step.timeout,
5455
add_step.start_delay,
5556
add_step.condition_pattern,
57+
add_step.condition_not_pattern,
5658
add_step.when_unmet,
5759
add_step.when_failed
5860
)

pkgs/core/schemas/0100_function_cascade_resolve_conditions.sql

Lines changed: 34 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,15 @@ BEGIN
4646
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
4747
-- ==========================================
4848
-- Find first step (by topological order) with unmet condition and 'fail' mode.
49+
-- Condition is unmet when:
50+
-- (condition_pattern is set AND input does NOT contain it) OR
51+
-- (condition_not_pattern is set AND input DOES contain it)
4952
WITH steps_with_conditions AS (
5053
SELECT
5154
step_state.flow_slug,
5255
step_state.step_slug,
5356
step.condition_pattern,
57+
step.condition_not_pattern,
5458
step.when_unmet,
5559
step.deps_count,
5660
step.step_index
@@ -61,7 +65,7 @@ BEGIN
6165
WHERE step_state.run_id = cascade_resolve_conditions.run_id
6266
AND step_state.status = 'created'
6367
AND step_state.remaining_deps = 0
64-
AND step.condition_pattern IS NOT NULL
68+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
6569
),
6670
step_deps_output AS (
6771
SELECT
@@ -79,26 +83,31 @@ BEGIN
7983
condition_evaluations AS (
8084
SELECT
8185
swc.*,
82-
CASE
83-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
84-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
85-
END AS condition_met
86+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
87+
(swc.condition_pattern IS NULL OR
88+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
89+
AND
90+
(swc.condition_not_pattern IS NULL OR
91+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
92+
AS condition_met
8693
FROM steps_with_conditions swc
8794
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
8895
)
89-
SELECT flow_slug, step_slug, condition_pattern
96+
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
9097
INTO v_first_fail
9198
FROM condition_evaluations
9299
WHERE NOT condition_met AND when_unmet = 'fail'
93100
ORDER BY step_index
94101
LIMIT 1;
95102

96103
-- Handle fail mode: fail step and run, return false
97-
IF v_first_fail IS NOT NULL THEN
104+
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
105+
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
106+
IF FOUND THEN
98107
UPDATE pgflow.step_states
99108
SET status = 'failed',
100109
failed_at = now(),
101-
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
110+
error_message = 'Condition not met'
102111
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
103112
AND pgflow.step_states.step_slug = v_first_fail.step_slug;
104113

@@ -114,12 +123,13 @@ BEGIN
114123
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
115124
-- ==========================================
116125
-- Skip steps with unmet conditions and whenUnmet='skip'.
117-
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
126+
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
118127
WITH steps_with_conditions AS (
119128
SELECT
120129
step_state.flow_slug,
121130
step_state.step_slug,
122131
step.condition_pattern,
132+
step.condition_not_pattern,
123133
step.when_unmet,
124134
step.deps_count,
125135
step.step_index
@@ -130,7 +140,7 @@ BEGIN
130140
WHERE step_state.run_id = cascade_resolve_conditions.run_id
131141
AND step_state.status = 'created'
132142
AND step_state.remaining_deps = 0
133-
AND step.condition_pattern IS NOT NULL
143+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
134144
),
135145
step_deps_output AS (
136146
SELECT
@@ -148,10 +158,13 @@ BEGIN
148158
condition_evaluations AS (
149159
SELECT
150160
swc.*,
151-
CASE
152-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
153-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
154-
END AS condition_met
161+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
162+
(swc.condition_pattern IS NULL OR
163+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
164+
AND
165+
(swc.condition_not_pattern IS NULL OR
166+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
167+
AS condition_met
155168
FROM steps_with_conditions swc
156169
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
157170
),
@@ -231,13 +244,15 @@ BEGIN
231244
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
232245
AND ready_step.status = 'created'
233246
AND ready_step.remaining_deps = 0
234-
AND step.condition_pattern IS NOT NULL
247+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
235248
AND step.when_unmet = 'skip-cascade'
249+
-- Condition is NOT met when: (if fails) OR (ifNot fails)
236250
AND NOT (
237-
CASE
238-
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
239-
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
240-
END
251+
(step.condition_pattern IS NULL OR
252+
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
253+
AND
254+
(step.condition_not_pattern IS NULL OR
255+
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
241256
)
242257
ORDER BY step.step_index;
243258

pkgs/core/src/database-types.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ export type Database = {
278278
}
279279
steps: {
280280
Row: {
281+
condition_not_pattern: Json | null
281282
condition_pattern: Json | null
282283
created_at: string
283284
deps_count: number
@@ -293,6 +294,7 @@ export type Database = {
293294
when_unmet: string
294295
}
295296
Insert: {
297+
condition_not_pattern?: Json | null
296298
condition_pattern?: Json | null
297299
created_at?: string
298300
deps_count?: number
@@ -308,6 +310,7 @@ export type Database = {
308310
when_unmet?: string
309311
}
310312
Update: {
313+
condition_not_pattern?: Json | null
311314
condition_pattern?: Json | null
312315
created_at?: string
313316
deps_count?: number
@@ -410,6 +413,7 @@ export type Database = {
410413
add_step: {
411414
Args: {
412415
base_delay?: number
416+
condition_not_pattern?: Json
413417
condition_pattern?: Json
414418
deps_slugs?: string[]
415419
flow_slug: string
@@ -422,6 +426,7 @@ export type Database = {
422426
when_unmet?: string
423427
}
424428
Returns: {
429+
condition_not_pattern: Json | null
425430
condition_pattern: Json | null
426431
created_at: string
427432
deps_count: number

pkgs/core/supabase/migrations/20260105214940_pgflow_step_conditions.sql renamed to pkgs/core/supabase/migrations/20260108131350_pgflow_step_conditions.sql

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ END) <= 1), ADD CONSTRAINT "skip_reason_matches_status" CHECK (((status = 'skipp
1515
-- Create index "idx_step_states_skipped" to table: "step_states"
1616
CREATE INDEX "idx_step_states_skipped" ON "pgflow"."step_states" ("run_id", "step_slug") WHERE (status = 'skipped'::text);
1717
-- Modify "steps" table
18-
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
18+
ALTER TABLE "pgflow"."steps" ADD CONSTRAINT "when_failed_is_valid" CHECK (when_failed = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD CONSTRAINT "when_unmet_is_valid" CHECK (when_unmet = ANY (ARRAY['fail'::text, 'skip'::text, 'skip-cascade'::text])), ADD COLUMN "condition_pattern" jsonb NULL, ADD COLUMN "condition_not_pattern" jsonb NULL, ADD COLUMN "when_unmet" text NOT NULL DEFAULT 'skip', ADD COLUMN "when_failed" text NOT NULL DEFAULT 'fail';
1919
-- Create "_cascade_force_skip_steps" function
2020
CREATE FUNCTION "pgflow"."_cascade_force_skip_steps" ("run_id" uuid, "step_slug" text, "skip_reason" text) RETURNS integer LANGUAGE plpgsql AS $$
2121
DECLARE
@@ -151,11 +151,15 @@ BEGIN
151151
-- PHASE 1a: CHECK FOR FAIL CONDITIONS
152152
-- ==========================================
153153
-- Find first step (by topological order) with unmet condition and 'fail' mode.
154+
-- Condition is unmet when:
155+
-- (condition_pattern is set AND input does NOT contain it) OR
156+
-- (condition_not_pattern is set AND input DOES contain it)
154157
WITH steps_with_conditions AS (
155158
SELECT
156159
step_state.flow_slug,
157160
step_state.step_slug,
158161
step.condition_pattern,
162+
step.condition_not_pattern,
159163
step.when_unmet,
160164
step.deps_count,
161165
step.step_index
@@ -166,7 +170,7 @@ BEGIN
166170
WHERE step_state.run_id = cascade_resolve_conditions.run_id
167171
AND step_state.status = 'created'
168172
AND step_state.remaining_deps = 0
169-
AND step.condition_pattern IS NOT NULL
173+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
170174
),
171175
step_deps_output AS (
172176
SELECT
@@ -184,26 +188,31 @@ BEGIN
184188
condition_evaluations AS (
185189
SELECT
186190
swc.*,
187-
CASE
188-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
189-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
190-
END AS condition_met
191+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
192+
(swc.condition_pattern IS NULL OR
193+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
194+
AND
195+
(swc.condition_not_pattern IS NULL OR
196+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
197+
AS condition_met
191198
FROM steps_with_conditions swc
192199
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
193200
)
194-
SELECT flow_slug, step_slug, condition_pattern
201+
SELECT flow_slug, step_slug, condition_pattern, condition_not_pattern
195202
INTO v_first_fail
196203
FROM condition_evaluations
197204
WHERE NOT condition_met AND when_unmet = 'fail'
198205
ORDER BY step_index
199206
LIMIT 1;
200207

201208
-- Handle fail mode: fail step and run, return false
202-
IF v_first_fail IS NOT NULL THEN
209+
-- Note: Cannot use "v_first_fail IS NOT NULL" because records with NULL fields
210+
-- evaluate to NULL in IS NOT NULL checks. Use FOUND instead.
211+
IF FOUND THEN
203212
UPDATE pgflow.step_states
204213
SET status = 'failed',
205214
failed_at = now(),
206-
error_message = 'Condition not met: ' || v_first_fail.condition_pattern::text
215+
error_message = 'Condition not met'
207216
WHERE pgflow.step_states.run_id = cascade_resolve_conditions.run_id
208217
AND pgflow.step_states.step_slug = v_first_fail.step_slug;
209218

@@ -219,12 +228,13 @@ BEGIN
219228
-- PHASE 1b: HANDLE SKIP CONDITIONS (with propagation)
220229
-- ==========================================
221230
-- Skip steps with unmet conditions and whenUnmet='skip'.
222-
-- NEW: Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
231+
-- Also decrement remaining_deps on dependents and set initial_tasks=0 for map dependents.
223232
WITH steps_with_conditions AS (
224233
SELECT
225234
step_state.flow_slug,
226235
step_state.step_slug,
227236
step.condition_pattern,
237+
step.condition_not_pattern,
228238
step.when_unmet,
229239
step.deps_count,
230240
step.step_index
@@ -235,7 +245,7 @@ BEGIN
235245
WHERE step_state.run_id = cascade_resolve_conditions.run_id
236246
AND step_state.status = 'created'
237247
AND step_state.remaining_deps = 0
238-
AND step.condition_pattern IS NOT NULL
248+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
239249
),
240250
step_deps_output AS (
241251
SELECT
@@ -253,10 +263,13 @@ BEGIN
253263
condition_evaluations AS (
254264
SELECT
255265
swc.*,
256-
CASE
257-
WHEN swc.deps_count = 0 THEN v_run_input @> swc.condition_pattern
258-
ELSE COALESCE(sdo.deps_output, '{}'::jsonb) @> swc.condition_pattern
259-
END AS condition_met
266+
-- condition_met = (if IS NULL OR input @> if) AND (ifNot IS NULL OR NOT(input @> ifNot))
267+
(swc.condition_pattern IS NULL OR
268+
CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_pattern)
269+
AND
270+
(swc.condition_not_pattern IS NULL OR
271+
NOT (CASE WHEN swc.deps_count = 0 THEN v_run_input ELSE COALESCE(sdo.deps_output, '{}'::jsonb) END @> swc.condition_not_pattern))
272+
AS condition_met
260273
FROM steps_with_conditions swc
261274
LEFT JOIN step_deps_output sdo ON sdo.step_slug = swc.step_slug
262275
),
@@ -336,13 +349,15 @@ BEGIN
336349
WHERE ready_step.run_id = cascade_resolve_conditions.run_id
337350
AND ready_step.status = 'created'
338351
AND ready_step.remaining_deps = 0
339-
AND step.condition_pattern IS NOT NULL
352+
AND (step.condition_pattern IS NOT NULL OR step.condition_not_pattern IS NOT NULL)
340353
AND step.when_unmet = 'skip-cascade'
354+
-- Condition is NOT met when: (if fails) OR (ifNot fails)
341355
AND NOT (
342-
CASE
343-
WHEN step.deps_count = 0 THEN v_run_input @> step.condition_pattern
344-
ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) @> step.condition_pattern
345-
END
356+
(step.condition_pattern IS NULL OR
357+
CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_pattern)
358+
AND
359+
(step.condition_not_pattern IS NULL OR
360+
NOT (CASE WHEN step.deps_count = 0 THEN v_run_input ELSE COALESCE(agg_deps.deps_output, '{}'::jsonb) END @> step.condition_not_pattern))
346361
)
347362
ORDER BY step.step_index;
348363

@@ -1440,7 +1455,7 @@ with tasks as (
14401455
dep_out.step_slug = st.step_slug
14411456
$$;
14421457
-- Create "add_step" function
1443-
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
1458+
CREATE FUNCTION "pgflow"."add_step" ("flow_slug" text, "step_slug" text, "deps_slugs" text[] DEFAULT '{}', "max_attempts" integer DEFAULT NULL::integer, "base_delay" integer DEFAULT NULL::integer, "timeout" integer DEFAULT NULL::integer, "start_delay" integer DEFAULT NULL::integer, "step_type" text DEFAULT 'single', "condition_pattern" jsonb DEFAULT NULL::jsonb, "condition_not_pattern" jsonb DEFAULT NULL::jsonb, "when_unmet" text DEFAULT 'skip', "when_failed" text DEFAULT 'fail') RETURNS "pgflow"."steps" LANGUAGE plpgsql SET "search_path" = '' AS $$
14441459
DECLARE
14451460
result_step pgflow.steps;
14461461
next_idx int;
@@ -1465,7 +1480,7 @@ BEGIN
14651480
INSERT INTO pgflow.steps (
14661481
flow_slug, step_slug, step_type, step_index, deps_count,
14671482
opt_max_attempts, opt_base_delay, opt_timeout, opt_start_delay,
1468-
condition_pattern, when_unmet, when_failed
1483+
condition_pattern, condition_not_pattern, when_unmet, when_failed
14691484
)
14701485
VALUES (
14711486
add_step.flow_slug,
@@ -1478,6 +1493,7 @@ BEGIN
14781493
add_step.timeout,
14791494
add_step.start_delay,
14801495
add_step.condition_pattern,
1496+
add_step.condition_not_pattern,
14811497
add_step.when_unmet,
14821498
add_step.when_failed
14831499
)

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
1+
h1:UUZln51my4XRIQECtp1HayMW7tGjk5w8qLQhW0x7gEY=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,4 +16,4 @@ h1:YiBO80ZA6oQ84E10ZabIvo3OS/XglHkEmBn1Rp5Iay4=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
1717
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
19-
20260105214940_pgflow_step_conditions.sql h1:DIta8qrr+qRvA9aFCdWefk72qp27mcPvGGlAJswmitw=
19+
20260108131350_pgflow_step_conditions.sql h1:7YMszmTlExOtx9PyYLB7hIc3RiMmtB4ZOc2EOQVfuPs=

0 commit comments

Comments
 (0)