Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
-- Test: ifNot empty object pattern - step runs when dependency is absent/skipped
--
-- Verifies that ifNot: { dep: {} } (empty object pattern) correctly detects
-- when a dependency is absent (skipped) and causes the fallback step to run.
--
-- PostgreSQL containment semantics:
-- - When dep is skipped, deps_output is {} (empty object)
-- - {} @> { dep: {} } = FALSE (empty object doesn't contain dep key)
-- - NOT(FALSE) = TRUE, so ifNot condition is met -> step runs
begin;
select plan(2);

select pgflow_tests.reset_db();

-- Create flow: skippable_dep -> fallback (with ifNot: { skippable_dep: {} })
select pgflow.create_flow('empty_pattern_test');

-- Step A: Skippable based on input pattern
select pgflow.add_step(
flow_slug => 'empty_pattern_test',
step_slug => 'skippable_dep',
required_input_pattern => '{"run_dep": true}'::jsonb,
when_unmet => 'skip'
);

-- Step B: Fallback - runs when A is absent (empty object pattern)
select pgflow.add_step(
flow_slug => 'empty_pattern_test',
step_slug => 'fallback',
deps_slugs => ARRAY['skippable_dep'],
forbidden_input_pattern => '{"skippable_dep": {}}'::jsonb,
when_unmet => 'skip'
);

-- Start flow with input that causes dep to skip (run_dep: false)
with flow as (
select * from pgflow.start_flow('empty_pattern_test', '{"run_dep": false}'::jsonb)
)
select run_id into temporary run_ids from flow;

-- Test 1: skippable_dep should be skipped
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'skippable_dep'),
'skipped',
'Dependency with unmet condition should be skipped'
);

-- Test 2: fallback should be started (empty object pattern matched -> ifNot passed)
select is(
(select status from pgflow.step_states
where run_id = (select run_id from run_ids) and step_slug = 'fallback'),
'started',
'Step with ifNot: {dep: {}} should start when dep is absent'
);

drop table if exists run_ids;
select finish();
rollback;
46 changes: 28 additions & 18 deletions pkgs/dsl/__tests__/types/skippable-deps.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,24 @@ describe('skippable deps type safety', () => {
}>();
});

it('step with whenUnmet: skip-cascade makes output optional for dependents', () => {
it('step with whenUnmet: skip-cascade keeps output required (cascade skips dependents)', () => {
// skip-cascade means if the step is skipped, its dependents are ALSO skipped
// So if the dependent handler runs at all, the parent must have succeeded
// Therefore the dependency should be required, not optional
const flow = new Flow<{ value: number }>({ slug: 'test' })
.step(
{ slug: 'conditional', if: { value: 42 }, whenUnmet: 'skip-cascade' },
(input) => ({ result: input.value * 2 })
)
.step({ slug: 'dependent', dependsOn: ['conditional'] }, (deps) => {
expectTypeOf(deps.conditional).toEqualTypeOf<
{ result: number } | undefined
>();
// With skip-cascade, if we're running, the dependency succeeded
expectTypeOf(deps.conditional).toEqualTypeOf<{ result: number }>();
return { done: true };
});

type DepInput = StepInput<typeof flow, 'dependent'>;
expectTypeOf<DepInput>().toEqualTypeOf<{
conditional?: { result: number };
conditional: { result: number };
}>();
});

Expand Down Expand Up @@ -101,21 +103,22 @@ describe('skippable deps type safety', () => {
}>();
});

it('step with retriesExhausted: skip-cascade makes output optional for dependents', () => {
it('step with retriesExhausted: skip-cascade keeps output required (cascade skips dependents)', () => {
// skip-cascade means if the step is skipped, its dependents are ALSO skipped
// So if the dependent handler runs at all, the parent must have succeeded
const flow = new Flow<{ value: number }>({ slug: 'test' })
.step({ slug: 'risky', retriesExhausted: 'skip-cascade' }, (input) => ({
result: input.value * 2,
}))
.step({ slug: 'dependent', dependsOn: ['risky'] }, (deps) => {
expectTypeOf(deps.risky).toEqualTypeOf<
{ result: number } | undefined
>();
// With skip-cascade, if we're running, the dependency succeeded
expectTypeOf(deps.risky).toEqualTypeOf<{ result: number }>();
return { done: true };
});

type DepInput = StepInput<typeof flow, 'dependent'>;
expectTypeOf<DepInput>().toEqualTypeOf<{
risky?: { result: number };
risky: { result: number };
}>();
});

Expand Down Expand Up @@ -251,16 +254,17 @@ describe('skippable deps type safety', () => {
}>();
});

it('cascade does NOT propagate: A(skip-cascade) -> B: B output NOT automatically optional', () => {
it('cascade does NOT propagate: A(skip-cascade) -> B: B sees A as required', () => {
// skip-cascade means A and its dependents get skipped at RUNTIME
// but B itself is not marked as skippable in its definition
// so if B does run, its output is required for its own dependents
// If A is skipped, B is also skipped (cascade), so B never runs with undefined A
// Therefore B should see A as required, not optional
const flow = new Flow<{ value: number }>({ slug: 'test' })
.step({ slug: 'a', retriesExhausted: 'skip-cascade' }, () => ({
aVal: 1,
}))
.step({ slug: 'b', dependsOn: ['a'] }, (deps) => {
expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number } | undefined>();
// With skip-cascade, if B runs, A must have succeeded
expectTypeOf(deps.a).toEqualTypeOf<{ aVal: number }>();
return { bVal: 2 };
})
.step({ slug: 'c', dependsOn: ['b'] }, (deps) => {
Expand All @@ -269,6 +273,9 @@ describe('skippable deps type safety', () => {
return { cVal: 3 };
});

type BInput = StepInput<typeof flow, 'b'>;
expectTypeOf<BInput>().toEqualTypeOf<{ a: { aVal: number } }>();

type CInput = StepInput<typeof flow, 'c'>;
expectTypeOf<CInput>().toEqualTypeOf<{ b: { bVal: number } }>();
});
Expand Down Expand Up @@ -475,26 +482,29 @@ describe('skippable deps compile-time errors', () => {
});
});

it('should reject direct property access with whenUnmet: skip-cascade', () => {
it('should ALLOW direct property access with whenUnmet: skip-cascade (cascade skips dependents)', () => {
// With skip-cascade, if the dependent runs, the parent must have succeeded
// So direct property access should be allowed
new Flow<{ value: number }>({ slug: 'test' })
.step(
{ slug: 'cascading', if: { value: 42 }, whenUnmet: 'skip-cascade' },
() => ({ count: 10 })
)
.step({ slug: 'next', dependsOn: ['cascading'] }, (deps) => {
// @ts-expect-error - deps.cascading is optional due to whenUnmet: skip-cascade
// No error - deps.cascading is required with skip-cascade
const num: number = deps.cascading.count;
return { num };
});
});

it('should reject direct property access with retriesExhausted: skip-cascade', () => {
it('should ALLOW direct property access with retriesExhausted: skip-cascade (cascade skips dependents)', () => {
// With skip-cascade, if the dependent runs, the parent must have succeeded
new Flow<{ value: number }>({ slug: 'test' })
.step({ slug: 'risky', retriesExhausted: 'skip-cascade' }, () => ({
status: 'ok',
}))
.step({ slug: 'next', dependsOn: ['risky'] }, (deps) => {
// @ts-expect-error - deps.risky is optional due to retriesExhausted: skip-cascade
// No error - deps.risky is required with skip-cascade
const s: string = deps.risky.status;
return { s };
});
Expand Down
65 changes: 42 additions & 23 deletions pkgs/dsl/src/dsl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,14 @@ export type AnyInput = Json;
export type AnyOutput = Json;

// Step Types
// Skippable mode: 'skip' makes deps optional, 'skip-cascade' keeps deps required
// (because cascade skips dependents at runtime, so if handler runs, dep succeeded)
export type SkippableMode = 'skip' | 'skip-cascade' | false;

// Step metadata structure - enriched type that tracks output and skippability
export interface StepMeta<
TOutput = AnyOutput,
TSkippable extends boolean = boolean
TSkippable extends SkippableMode = SkippableMode
> {
output: TOutput;
skippable: TSkippable;
Expand Down Expand Up @@ -285,23 +289,34 @@ export type StepOutput<
: never;

/**
* Checks if a step is skippable (has whenUnmet: 'skip' | 'skip-cascade' or retriesExhausted: 'skip' | 'skip-cascade')
* Gets the skippable mode for a step ('skip' | 'skip-cascade' | false)
* @template TFlow - The Flow type
* @template TStepSlug - The step slug to check
*/
export type IsStepSkippable<
export type GetSkippableMode<
TFlow extends AnyFlow,
TStepSlug extends string
> = TStepSlug extends keyof ExtractFlowStepsRaw<TFlow>
? ExtractFlowStepsRaw<TFlow>[TStepSlug]['skippable']
: false;

/**
* Checks if a step makes its dependents' deps optional (only 'skip' mode, not 'skip-cascade')
* With 'skip-cascade', dependents are also skipped at runtime, so if handler runs, dep succeeded.
*/
export type IsStepSkippable<
TFlow extends AnyFlow,
TStepSlug extends string
> = GetSkippableMode<TFlow, TStepSlug> extends 'skip' ? true : false;

// Helper types for StepInput with optional skippable deps
// Only 'skip' mode makes deps optional (dependents run with undefined value)
// 'skip-cascade' keeps deps required (dependents also skipped, so value guaranteed if running)
type RequiredDeps<TFlow extends AnyFlow, TStepSlug extends string> = {
[K in Extract<
keyof ExtractFlowSteps<TFlow>,
StepDepsOf<TFlow, TStepSlug>
> as IsStepSkippable<TFlow, K & string> extends true
> as GetSkippableMode<TFlow, K & string> extends 'skip'
? never
: K]: ExtractFlowSteps<TFlow>[K];
};
Expand All @@ -310,7 +325,7 @@ type OptionalDeps<TFlow extends AnyFlow, TStepSlug extends string> = {
[K in Extract<
keyof ExtractFlowSteps<TFlow>,
StepDepsOf<TFlow, TStepSlug>
> as IsStepSkippable<TFlow, K & string> extends true
> as GetSkippableMode<TFlow, K & string> extends 'skip'
? K
: never]?: ExtractFlowSteps<TFlow>[K];
};
Expand All @@ -319,8 +334,10 @@ type OptionalDeps<TFlow extends AnyFlow, TStepSlug extends string> = {
* Asymmetric step input type:
* - Root steps (no dependencies): receive flow input directly
* - Dependent steps: receive only their dependencies (flow input available via context)
* - Skippable deps (whenUnmet/retriesExhausted: 'skip' | 'skip-cascade') are optional
* - Required deps are required
* - Skippable deps (whenUnmet/retriesExhausted: 'skip') are optional
* - Cascade deps (whenUnmet/retriesExhausted: 'skip-cascade') are required
* (because if handler runs, the dependency must have succeeded)
* - All other deps are required
*
* This enables functional composition where subflows can receive typed inputs
* without the 'run' wrapper that previously blocked type matching.
Expand Down Expand Up @@ -446,21 +463,23 @@ export type RetriesExhaustedMode = 'fail' | 'skip' | 'skip-cascade';

/**
* Helper type for dependent step handlers - creates deps object with correct optionality.
* Skippable deps (steps with whenUnmet/retriesExhausted: 'skip' | 'skip-cascade') are optional.
* Required deps are required.
* Only steps with 'skip' mode (not 'skip-cascade') make deps optional.
* With 'skip-cascade', dependents are also skipped at runtime, so if handler runs, dep succeeded.
*/
type DepsWithOptionalSkippable<
TSteps extends AnySteps,
TDeps extends string
> = {
// Required deps: either not skippable or skip-cascade (cascade skips dependents, so value guaranteed)
[K in TDeps as K extends keyof TSteps
? TSteps[K]['skippable'] extends true
? TSteps[K]['skippable'] extends 'skip'
? never
: K
: K]: K extends keyof TSteps ? TSteps[K]['output'] : never;
} & {
// Optional deps: only 'skip' mode (dependents run with undefined value)
[K in TDeps as K extends keyof TSteps
? TSteps[K]['skippable'] extends true
? TSteps[K]['skippable'] extends 'skip'
? K
: never
: never]?: K extends keyof TSteps ? TSteps[K]['output'] : never;
Expand Down Expand Up @@ -726,9 +745,9 @@ export class Flow<
[K in Slug]: StepMeta<
Awaited<TOutput>,
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down Expand Up @@ -775,9 +794,9 @@ export class Flow<
[K in Slug]: StepMeta<
Awaited<TOutput>,
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down Expand Up @@ -891,9 +910,9 @@ export class Flow<
[K in Slug]: StepMeta<
Awaited<TOutput>,
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down Expand Up @@ -939,9 +958,9 @@ export class Flow<
[K in Slug]: StepMeta<
Awaited<TOutput>,
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down Expand Up @@ -999,9 +1018,9 @@ export class Flow<
[K in Slug]: StepMeta<
AwaitedReturn<THandler>[],
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down Expand Up @@ -1048,9 +1067,9 @@ export class Flow<
[K in Slug]: StepMeta<
AwaitedReturn<THandler>[],
TWhenUnmet extends 'skip' | 'skip-cascade'
? true
? TWhenUnmet
: TRetries extends 'skip' | 'skip-cascade'
? true
? TRetries
: false
>;
},
Expand Down