-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy path0100_function_fail_task.sql
More file actions
390 lines (365 loc) · 14.2 KB
/
0100_function_fail_task.sql
File metadata and controls
390 lines (365 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
-- pgflow.fail_task
--
-- Records a handler failure for a single task of a run step and applies the
-- resulting retry / fail / skip transitions to the task, its step state and
-- its run.
--
-- Parameters:
--   run_id        - run that owns the task
--   step_slug     - step the task belongs to
--   task_index    - index of the task within the step
--   error_message - failure text stored on the task (and on the step state
--                   once the task has exhausted its attempts)
--
-- Returns:
--   The pgflow.step_tasks row(s) matching (run_id, step_slug, task_index) as
--   they look after processing. Zero rows when no such task exists.
--
-- Flow:
--   1. Run already 'failed'      -> mark a still-'started' task as failed,
--                                   archive its message, return.
--   2. Step state not 'started'  -> late callback: archive the message only,
--                                   leave step/run state untouched, return.
--   3. Otherwise                 -> requeue the task while attempts remain,
--                                   else fail it; on exhaustion the step
--                                   becomes 'failed' (when_exhausted = 'fail')
--                                   or 'skipped' (any other mode), and the run
--                                   is failed when the step was failed.
--   4. Side effects afterwards   -> realtime broadcasts, pgmq archive / delay,
--                                   dependent-step bookkeeping for skips.
create or replace function pgflow.fail_task(
  run_id uuid,
  step_slug text,
  task_index int,
  error_message text
)
returns setof pgflow.step_tasks
language plpgsql
volatile
set search_path to ''
as $$
DECLARE
  v_run_failed boolean;          -- run ended up in status 'failed' after this call
  v_step_failed boolean;         -- step state ended up in status 'failed'
  v_step_skipped boolean;        -- step state ended up in status 'skipped'
  v_when_exhausted text;         -- steps.when_exhausted for this step
  v_task_exhausted boolean;      -- task reached 'failed' (no retries left)
  v_flow_slug_for_deps text;     -- flow slug used for the dependency update
  v_prev_step_status text;       -- step status observed before any update
  v_flow_slug text;              -- flow slug used as the pgmq queue / in broadcasts
begin
  ---------------------------------------------------------------------------
  -- 1. Run is already failed: no retries allowed
  ---------------------------------------------------------------------------
  IF EXISTS (SELECT 1 FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id AND pgflow.runs.status = 'failed') THEN
    -- Only a task that is still 'started' is flipped to 'failed'
    UPDATE pgflow.step_tasks
    SET status = 'failed',
        failed_at = now(),
        error_message = fail_task.error_message
    WHERE pgflow.step_tasks.run_id = fail_task.run_id
      AND pgflow.step_tasks.step_slug = fail_task.step_slug
      AND pgflow.step_tasks.task_index = fail_task.task_index
      AND pgflow.step_tasks.status = 'started';

    -- Archive the task's message (the queue is named after the flow slug)
    PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
    FROM pgflow.step_tasks st
    JOIN pgflow.runs r ON st.run_id = r.run_id
    WHERE st.run_id = fail_task.run_id
      AND st.step_slug = fail_task.step_slug
      AND st.task_index = fail_task.task_index
      AND st.message_id IS NOT NULL
    GROUP BY r.flow_slug
    HAVING COUNT(st.message_id) > 0;

    RETURN QUERY SELECT * FROM pgflow.step_tasks
    WHERE pgflow.step_tasks.run_id = fail_task.run_id
      AND pgflow.step_tasks.step_slug = fail_task.step_slug
      AND pgflow.step_tasks.task_index = fail_task.task_index;
    RETURN;
  END IF;

  ---------------------------------------------------------------------------
  -- 2. Late callback guard: if step is not 'started', don't mutate step/run
  --    state. The status is captured BEFORE the CTE chain below so that the
  --    run update can tell a first transition to 'skipped' from a repeat.
  -- NOTE(review): this read takes no row lock, so two concurrent calls for
  -- sibling tasks could both observe 'started' — confirm whether that can
  -- lead to remaining_steps being decremented twice.
  ---------------------------------------------------------------------------
  SELECT ss.status INTO v_prev_step_status
  FROM pgflow.step_states ss
  WHERE ss.run_id = fail_task.run_id
    AND ss.step_slug = fail_task.step_slug;

  IF v_prev_step_status IS NOT NULL AND v_prev_step_status != 'started' THEN
    -- Archive the task message if present
    SELECT r.flow_slug INTO v_flow_slug
    FROM pgflow.runs r
    WHERE r.run_id = fail_task.run_id;

    PERFORM pgmq.archive(v_flow_slug, ARRAY_AGG(st.message_id))
    FROM pgflow.step_tasks st
    WHERE st.run_id = fail_task.run_id
      AND st.step_slug = fail_task.step_slug
      AND st.task_index = fail_task.task_index
      AND st.message_id IS NOT NULL
    HAVING COUNT(st.message_id) > 0;

    RETURN QUERY SELECT * FROM pgflow.step_tasks
    WHERE pgflow.step_tasks.run_id = fail_task.run_id
      AND pgflow.step_tasks.step_slug = fail_task.step_slug
      AND pgflow.step_tasks.task_index = fail_task.task_index;
    RETURN;
  END IF;

  ---------------------------------------------------------------------------
  -- 3. Task / step / run transitions in a single statement
  -- NOTE(review): run_lock and step_lock are never referenced further down.
  -- PostgreSQL does not execute unreferenced SELECT CTEs, so these FOR UPDATE
  -- clauses most likely acquire no locks. Confirm, and if locking is required
  -- move it into explicit statements with a lock order that matches the
  -- sibling functions.
  ---------------------------------------------------------------------------
  WITH run_lock AS (
    SELECT * FROM pgflow.runs
    WHERE pgflow.runs.run_id = fail_task.run_id
    FOR UPDATE
  ),
  step_lock AS (
    SELECT * FROM pgflow.step_states
    WHERE pgflow.step_states.run_id = fail_task.run_id
      AND pgflow.step_states.step_slug = fail_task.step_slug
    FOR UPDATE
  ),
  flow_info AS (
    SELECT r.flow_slug
    FROM pgflow.runs r
    WHERE r.run_id = fail_task.run_id
  ),
  -- Effective retry settings: step-level options override flow-level ones
  config AS (
    SELECT
      COALESCE(s.opt_max_attempts, f.opt_max_attempts) AS opt_max_attempts,
      COALESCE(s.opt_base_delay, f.opt_base_delay) AS opt_base_delay,
      s.when_exhausted
    FROM pgflow.steps s
    JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
    JOIN flow_info fi ON fi.flow_slug = s.flow_slug
    WHERE s.flow_slug = fi.flow_slug AND s.step_slug = fail_task.step_slug
  ),
  -- Requeue the task while attempts remain, otherwise mark it failed.
  -- Only a task currently in 'started' is touched.
  fail_or_retry_task as (
    UPDATE pgflow.step_tasks as task
    SET
      status = CASE
        WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN 'queued'
        ELSE 'failed'
      END,
      failed_at = CASE
        WHEN task.attempts_count >= (SELECT opt_max_attempts FROM config) THEN now()
        ELSE NULL
      END,
      started_at = CASE
        WHEN task.attempts_count < (SELECT opt_max_attempts FROM config) THEN NULL
        ELSE task.started_at
      END,
      error_message = fail_task.error_message
    WHERE task.run_id = fail_task.run_id
      AND task.step_slug = fail_task.step_slug
      AND task.task_index = fail_task.task_index
      AND task.status = 'started'
    RETURNING *
  ),
  -- Determine if task exhausted retries and get when_exhausted mode
  task_status AS (
    SELECT
      (select status from fail_or_retry_task) AS new_task_status,
      (select when_exhausted from config) AS when_exhausted_mode,
      -- Task is exhausted when it's failed (no more retries)
      ((select status from fail_or_retry_task) = 'failed') AS is_exhausted
  ),
  -- Joined to fail_or_retry_task, so the step state is only touched when a
  -- task row was actually updated above.
  maybe_fail_step AS (
    UPDATE pgflow.step_states
    SET
      -- Status logic:
      -- - If task not exhausted (retrying): keep current status
      -- - If exhausted AND when_exhausted='fail': set to 'failed'
      -- - If exhausted AND when_exhausted IN ('skip', 'skip-cascade'): set to 'skipped'
      status = CASE
        WHEN NOT (select is_exhausted from task_status) THEN pgflow.step_states.status
        WHEN (select when_exhausted_mode from task_status) = 'fail' THEN 'failed'
        ELSE 'skipped' -- skip or skip-cascade
      END,
      failed_at = CASE
        WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) = 'fail' THEN now()
        ELSE NULL
      END,
      error_message = CASE
        WHEN (select is_exhausted from task_status) THEN fail_task.error_message
        ELSE NULL
      END,
      skip_reason = CASE
        WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN 'handler_failed'
        ELSE pgflow.step_states.skip_reason
      END,
      skipped_at = CASE
        WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN now()
        ELSE pgflow.step_states.skipped_at
      END,
      -- Clear remaining_tasks when skipping (required by remaining_tasks_state_consistency constraint)
      remaining_tasks = CASE
        WHEN (select is_exhausted from task_status) AND (select when_exhausted_mode from task_status) IN ('skip', 'skip-cascade') THEN NULL
        ELSE pgflow.step_states.remaining_tasks
      END
    FROM fail_or_retry_task
    WHERE pgflow.step_states.run_id = fail_task.run_id
      AND pgflow.step_states.step_slug = fail_task.step_slug
    RETURNING pgflow.step_states.*
  ),
  run_update AS (
    -- Update run status: only fail when when_exhausted='fail' and step was failed
    UPDATE pgflow.runs
    SET status = CASE
          WHEN (select status from maybe_fail_step) = 'failed' THEN 'failed'
          ELSE status
        END,
        failed_at = CASE
          WHEN (select status from maybe_fail_step) = 'failed' THEN now()
          ELSE NULL
        END,
        -- Decrement remaining_steps only on FIRST transition to skipped
        -- (not when step was already skipped and a second task fails)
        -- Uses PL/pgSQL variable captured before CTE chain
        remaining_steps = CASE
          WHEN (select status from maybe_fail_step) = 'skipped'
            AND v_prev_step_status != 'skipped'
          THEN pgflow.runs.remaining_steps - 1
          ELSE pgflow.runs.remaining_steps
        END
    WHERE pgflow.runs.run_id = fail_task.run_id
    RETURNING pgflow.runs.status
  )
  -- Collapse the outcome into plain variables. The when_exhausted mode is
  -- taken from the same statement so no second lookup of pgflow.steps is needed.
  SELECT
    COALESCE((SELECT status = 'failed' FROM run_update), false),
    COALESCE((SELECT status = 'failed' FROM maybe_fail_step), false),
    COALESCE((SELECT status = 'skipped' FROM maybe_fail_step), false),
    COALESCE((SELECT is_exhausted FROM task_status), false),
    (SELECT when_exhausted_mode FROM task_status)
  INTO v_run_failed, v_step_failed, v_step_skipped, v_task_exhausted, v_when_exhausted;

  ---------------------------------------------------------------------------
  -- 4a. Send broadcast event for step failure if the step was failed
  ---------------------------------------------------------------------------
  IF v_task_exhausted AND v_step_failed THEN
    PERFORM realtime.send(
      jsonb_build_object(
        'event_type', 'step:failed',
        'run_id', fail_task.run_id,
        'step_slug', fail_task.step_slug,
        'status', 'failed',
        'error_message', fail_task.error_message,
        'failed_at', now()
      ),
      concat('step:', fail_task.step_slug, ':failed'),
      concat('pgflow:run:', fail_task.run_id),
      false
    );
  END IF;

  ---------------------------------------------------------------------------
  -- 4b. Handle step skipping (when_exhausted = 'skip' or 'skip-cascade')
  ---------------------------------------------------------------------------
  IF v_task_exhausted AND v_step_skipped THEN
    -- Archive all queued/started sibling task messages for this step
    PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
    FROM pgflow.step_tasks st
    JOIN pgflow.runs r ON st.run_id = r.run_id
    WHERE st.run_id = fail_task.run_id
      AND st.step_slug = fail_task.step_slug
      AND st.status IN ('queued', 'started')
      AND st.message_id IS NOT NULL
    GROUP BY r.flow_slug
    HAVING COUNT(st.message_id) > 0;

    -- Send broadcast event for step skipped
    PERFORM realtime.send(
      jsonb_build_object(
        'event_type', 'step:skipped',
        'run_id', fail_task.run_id,
        'step_slug', fail_task.step_slug,
        'status', 'skipped',
        'skip_reason', 'handler_failed',
        'error_message', fail_task.error_message,
        'skipped_at', now()
      ),
      concat('step:', fail_task.step_slug, ':skipped'),
      concat('pgflow:run:', fail_task.run_id),
      false
    );

    -- For skip-cascade: cascade skip to all downstream dependents
    IF v_when_exhausted = 'skip-cascade' THEN
      PERFORM pgflow._cascade_force_skip_steps(fail_task.run_id, fail_task.step_slug, 'handler_failed');
    ELSE
      -- For plain 'skip': decrement remaining_deps on dependent steps
      -- (This mirrors the pattern in cascade_resolve_conditions.sql for when_unmet='skip')
      SELECT flow_slug INTO v_flow_slug_for_deps
      FROM pgflow.runs
      WHERE pgflow.runs.run_id = fail_task.run_id;

      UPDATE pgflow.step_states AS child_state
      SET remaining_deps = child_state.remaining_deps - 1,
          -- If child is a map step and this skipped step is its only dependency,
          -- set initial_tasks = 0 (skipped dep = empty array)
          initial_tasks = CASE
            WHEN child_step.step_type = 'map' AND child_step.deps_count = 1 THEN 0
            ELSE child_state.initial_tasks
          END
      FROM pgflow.deps AS dep
      JOIN pgflow.steps AS child_step ON child_step.flow_slug = dep.flow_slug AND child_step.step_slug = dep.step_slug
      WHERE child_state.run_id = fail_task.run_id
        AND dep.flow_slug = v_flow_slug_for_deps
        AND dep.dep_slug = fail_task.step_slug
        AND child_state.step_slug = dep.step_slug;

      -- Evaluate conditions on newly-ready dependent steps
      -- This must happen before cascade_complete_taskless_steps so that
      -- skipped steps can set initial_tasks=0 for their map dependents
      IF NOT pgflow.cascade_resolve_conditions(fail_task.run_id) THEN
        -- Run was failed due to a condition with when_unmet='fail'
        -- Archive the failed task's message before returning
        PERFORM pgflow._archive_task_message(fail_task.run_id, fail_task.step_slug, fail_task.task_index);
        -- Return the task row (API contract)
        RETURN QUERY SELECT * FROM pgflow.step_tasks
        WHERE pgflow.step_tasks.run_id = fail_task.run_id
          AND pgflow.step_tasks.step_slug = fail_task.step_slug
          AND pgflow.step_tasks.task_index = fail_task.task_index;
        RETURN;
      END IF;

      -- Auto-complete taskless steps (e.g., map steps with initial_tasks=0 from skipped dep)
      PERFORM pgflow.cascade_complete_taskless_steps(fail_task.run_id);

      -- Start steps that became ready after condition resolution and taskless completion
      PERFORM pgflow.start_ready_steps(fail_task.run_id);
    END IF;

    -- Try to complete the run (remaining_steps may now be 0)
    PERFORM pgflow.maybe_complete_run(fail_task.run_id);
  END IF;

  ---------------------------------------------------------------------------
  -- 4c. Run failure: broadcast the event, then archive every active message
  ---------------------------------------------------------------------------
  IF v_run_failed THEN
    -- Reuses the function-level v_flow_slug rather than a shadowing local
    SELECT flow_slug INTO v_flow_slug FROM pgflow.runs WHERE pgflow.runs.run_id = fail_task.run_id;

    PERFORM realtime.send(
      jsonb_build_object(
        'event_type', 'run:failed',
        'run_id', fail_task.run_id,
        'flow_slug', v_flow_slug,
        'status', 'failed',
        'error_message', fail_task.error_message,
        'failed_at', now()
      ),
      'run:failed',
      concat('pgflow:run:', fail_task.run_id),
      false
    );

    -- Archive all active messages (both queued and started) when run fails
    PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
    FROM pgflow.step_tasks st
    JOIN pgflow.runs r ON st.run_id = r.run_id
    WHERE st.run_id = fail_task.run_id
      AND st.status IN ('queued', 'started')
      AND st.message_id IS NOT NULL
    GROUP BY r.flow_slug
    HAVING COUNT(st.message_id) > 0;
  END IF;

  ---------------------------------------------------------------------------
  -- 4d. For queued tasks: delay the message for retry. The delay comes from
  --     pgflow.calculate_retry_delay(base_delay, attempts_count), intended as
  --     exponential backoff. PERFORM cannot start with WITH, hence the
  --     parenthesised scalar subquery.
  ---------------------------------------------------------------------------
  PERFORM (
    WITH retry_config AS (
      SELECT
        COALESCE(s.opt_base_delay, f.opt_base_delay) AS base_delay
      FROM pgflow.steps s
      JOIN pgflow.flows f ON f.flow_slug = s.flow_slug
      JOIN pgflow.runs r ON r.flow_slug = f.flow_slug
      WHERE r.run_id = fail_task.run_id
        AND s.step_slug = fail_task.step_slug
    ),
    queued_tasks AS (
      SELECT
        r.flow_slug,
        st.message_id,
        pgflow.calculate_retry_delay((SELECT base_delay FROM retry_config), st.attempts_count) AS calculated_delay
      FROM pgflow.step_tasks st
      JOIN pgflow.runs r ON st.run_id = r.run_id
      WHERE st.run_id = fail_task.run_id
        AND st.step_slug = fail_task.step_slug
        AND st.task_index = fail_task.task_index
        AND st.status = 'queued'
    )
    SELECT pgmq.set_vt(qt.flow_slug, qt.message_id, qt.calculated_delay)
    FROM queued_tasks qt
  );

  ---------------------------------------------------------------------------
  -- 4e. For failed tasks: archive the message
  ---------------------------------------------------------------------------
  PERFORM pgmq.archive(r.flow_slug, ARRAY_AGG(st.message_id))
  FROM pgflow.step_tasks st
  JOIN pgflow.runs r ON st.run_id = r.run_id
  WHERE st.run_id = fail_task.run_id
    AND st.step_slug = fail_task.step_slug
    AND st.task_index = fail_task.task_index
    AND st.status = 'failed'
    AND st.message_id IS NOT NULL
  GROUP BY r.flow_slug
  HAVING COUNT(st.message_id) > 0;

  -- Return the task row as it now stands
  return query select *
  from pgflow.step_tasks st
  where st.run_id = fail_task.run_id
    and st.step_slug = fail_task.step_slug
    and st.task_index = fail_task.task_index;
end;
$$;