Skip to content

Commit 9083fa2

Browse files
committed
feat: add requeue tracking columns and implement stalled task requeue logic
- Introduced requeued_count and last_requeued_at columns to step_tasks table - Developed requeue_stalled_tasks function to requeue or fail stalled tasks based on max requeues - Created setup_requeue_stalled_tasks_cron function to schedule automatic requeue checks - Updated migration scripts to include new columns and functions - Added comprehensive tests for requeue behavior, max requeue limit, and cron setup
1 parent 367f62f commit 9083fa2

10 files changed

Lines changed: 619 additions & 11 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
'@pgflow/core': patch
3+
'@pgflow/edge-worker': patch
4+
---
5+
6+
Add automatic requeue for stalled tasks via cron job - tasks stuck beyond timeout+30s are requeued up to 3 times, then archived with status left as 'started' for easy identification (closes #586)

pkgs/core/schemas/0060_tables_runtime.sql

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@ create table pgflow.step_tasks (
8181
completed_at timestamptz,
8282
failed_at timestamptz,
8383
last_worker_id uuid references pgflow.workers (worker_id) on delete set null,
84+
-- Requeue tracking columns
85+
requeued_count int not null default 0,
86+
last_requeued_at timestamptz,
8487
constraint step_tasks_pkey primary key (run_id, step_slug, task_index),
8588
foreign key (run_id, step_slug)
8689
references pgflow.step_states (run_id, step_slug),
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
-- Requeue stalled tasks: tasks that have stayed in 'started' status longer than
-- their effective timeout plus a 30 second buffer. This recovers tasks that got
-- stuck when a worker crashed without completing or failing them.
--
-- Behavior:
--   * Stalled tasks with requeued_count < 3 are put back to 'queued', their
--     started_at / last_worker_id are cleared, requeued_count is incremented,
--     last_requeued_at is stamped, and their queue message is made visible
--     immediately.
--   * Stalled tasks with requeued_count >= 3 have their queue message archived;
--     the task row is intentionally left in 'started' so it can be identified
--     via the requeued_count column.
--
-- Returns: the number of tasks that were requeued (archived tasks are not counted).
create or replace function pgflow.requeue_stalled_tasks()
returns int
language plpgsql
security definer
set search_path = ''
as $$
declare
  result_count int := 0;
  max_requeues constant int := 3;
begin
  -- Find stalled tasks: started_at is older than (effective timeout + 30s buffer).
  -- The effective timeout is the step-level opt_timeout when set, otherwise the
  -- flow-level opt_timeout, so steps configured with a longer timeout than the
  -- flow default are not requeued while they are still legitimately running.
  -- NOTE(review): assumes pgflow.steps.opt_timeout is a nullable per-step
  -- override of pgflow.flows.opt_timeout - confirm against the steps table.
  -- Rows locked by another transaction are skipped instead of waited on.
  with stalled_tasks as (
    select
      st.run_id,
      st.step_slug,
      st.task_index,
      st.message_id,
      r.flow_slug,
      st.requeued_count
    from pgflow.step_tasks st
    join pgflow.runs r on r.run_id = st.run_id
    join pgflow.flows f on f.flow_slug = r.flow_slug
    left join pgflow.steps s
      on s.flow_slug = r.flow_slug
      and s.step_slug = st.step_slug
    where st.status = 'started'
      and st.started_at < now()
        - (coalesce(s.opt_timeout, f.opt_timeout) * interval '1 second')
        - interval '30 seconds'
    for update of st skip locked
  ),
  -- Tasks that still have requeue budget left
  to_requeue as (
    select run_id, step_slug, task_index, message_id, flow_slug
    from stalled_tasks
    where requeued_count < max_requeues
  ),
  -- Tasks that exhausted their requeue budget
  to_archive as (
    select message_id, flow_slug
    from stalled_tasks
    where requeued_count >= max_requeues
  ),
  -- Put requeueable tasks back into 'queued' and record the requeue
  requeued as (
    update pgflow.step_tasks st
    set
      status = 'queued',
      started_at = null,
      last_worker_id = null,
      requeued_count = st.requeued_count + 1,
      last_requeued_at = now()
    from to_requeue tr
    where st.run_id = tr.run_id
      and st.step_slug = tr.step_slug
      and st.task_index = tr.task_index
    returning tr.flow_slug as queue_name, tr.message_id
  ),
  -- Make requeued messages visible immediately (one batched call per queue)
  visibility_reset as (
    select pgflow.set_vt_batch(
      r.queue_name,
      array_agg(r.message_id),
      array_agg(0) -- all offsets are 0 (immediate visibility)
    )
    from requeued r
    where r.message_id is not null
    group by r.queue_name
  ),
  -- Archive messages of tasks that exceeded max requeues (one call per queue).
  -- The task row itself stays 'started' with requeued_count >= max_requeues.
  archived as (
    select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
    from to_archive ta
    where ta.message_id is not null
    group by ta.flow_slug
  ),
  -- Plain SELECT CTEs only run when referenced, so reference both through
  -- single-row aggregates to force the set_vt_batch / archive calls to execute.
  _vr as (select count(*) as n from visibility_reset),
  _ar as (select count(*) as n from archived)
  -- _vr and _ar each yield exactly one row, so the cross joins do not change
  -- the number of rows counted from requeued.
  select count(*) into result_count
  from requeued
  cross join _vr
  cross join _ar;

  return result_count;
end;
$$;
83+
84+
-- Cron setup function for automatic requeue monitoring
85+
-- Registers the pg_cron job that periodically runs pgflow.requeue_stalled_tasks().
--
-- Parameters:
--   cron_interval  schedule string handed to cron.schedule (default '15 seconds')
--
-- Returns: a human-readable confirmation containing the interval and the job id.
create or replace function pgflow.setup_requeue_stalled_tasks_cron(
  cron_interval text default '15 seconds'
)
returns text
language plpgsql
security definer
set search_path = pgflow, cron, pg_temp
as $$
declare
  scheduled_job_id bigint;
  confirmation text;
begin
  -- Best-effort removal of a previously registered job: any error raised by
  -- unschedule is deliberately ignored so the function can be called repeatedly.
  begin
    perform cron.unschedule('pgflow_requeue_stalled_tasks');
  exception
    when others then
      null;
  end;

  -- Register the job under a fixed name with the requested interval
  scheduled_job_id := cron.schedule(
    job_name => 'pgflow_requeue_stalled_tasks',
    schedule => setup_requeue_stalled_tasks_cron.cron_interval,
    command => 'select pgflow.requeue_stalled_tasks()'
  );

  confirmation := format(
    'Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
    setup_requeue_stalled_tasks_cron.cron_interval,
    scheduled_job_id
  );

  return confirmation;
end;
$$;

comment on function pgflow.setup_requeue_stalled_tasks_cron(text) is
'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-- Modify "step_tasks" table: add requeue tracking columns
-- (requeued_count starts at 0 for existing rows; last_requeued_at stays NULL
-- until a task is requeued)
ALTER TABLE "pgflow"."step_tasks"
  ADD COLUMN "requeued_count" integer NOT NULL DEFAULT 0,
  ADD COLUMN "last_requeued_at" timestamptz NULL;
3+
-- Create "requeue_stalled_tasks" function
-- Requeues tasks stuck in 'started' longer than their effective timeout + 30s,
-- up to 3 times; after that the queue message is archived and the task row is
-- left in 'started'. Returns the number of tasks requeued.
CREATE FUNCTION "pgflow"."requeue_stalled_tasks" () RETURNS integer LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = '' AS $$
declare
  result_count int := 0;
  max_requeues constant int := 3;
begin
  -- Find stalled tasks: started_at is older than (effective timeout + 30s buffer).
  -- The effective timeout is the step-level opt_timeout when set, otherwise the
  -- flow-level opt_timeout, so steps configured with a longer timeout than the
  -- flow default are not requeued while they are still legitimately running.
  -- NOTE(review): assumes pgflow.steps.opt_timeout is a nullable per-step
  -- override of pgflow.flows.opt_timeout - confirm against the steps table.
  -- Rows locked by another transaction are skipped instead of waited on.
  with stalled_tasks as (
    select
      st.run_id,
      st.step_slug,
      st.task_index,
      st.message_id,
      r.flow_slug,
      st.requeued_count
    from pgflow.step_tasks st
    join pgflow.runs r on r.run_id = st.run_id
    join pgflow.flows f on f.flow_slug = r.flow_slug
    left join pgflow.steps s
      on s.flow_slug = r.flow_slug
      and s.step_slug = st.step_slug
    where st.status = 'started'
      and st.started_at < now()
        - (coalesce(s.opt_timeout, f.opt_timeout) * interval '1 second')
        - interval '30 seconds'
    for update of st skip locked
  ),
  -- Tasks that still have requeue budget left
  to_requeue as (
    select run_id, step_slug, task_index, message_id, flow_slug
    from stalled_tasks
    where requeued_count < max_requeues
  ),
  -- Tasks that exhausted their requeue budget
  to_archive as (
    select message_id, flow_slug
    from stalled_tasks
    where requeued_count >= max_requeues
  ),
  -- Put requeueable tasks back into 'queued' and record the requeue
  requeued as (
    update pgflow.step_tasks st
    set
      status = 'queued',
      started_at = null,
      last_worker_id = null,
      requeued_count = st.requeued_count + 1,
      last_requeued_at = now()
    from to_requeue tr
    where st.run_id = tr.run_id
      and st.step_slug = tr.step_slug
      and st.task_index = tr.task_index
    returning tr.flow_slug as queue_name, tr.message_id
  ),
  -- Make requeued messages visible immediately (one batched call per queue)
  visibility_reset as (
    select pgflow.set_vt_batch(
      r.queue_name,
      array_agg(r.message_id),
      array_agg(0) -- all offsets are 0 (immediate visibility)
    )
    from requeued r
    where r.message_id is not null
    group by r.queue_name
  ),
  -- Archive messages of tasks that exceeded max requeues (one call per queue).
  -- The task row itself stays 'started' with requeued_count >= max_requeues.
  archived as (
    select pgmq.archive(ta.flow_slug, array_agg(ta.message_id))
    from to_archive ta
    where ta.message_id is not null
    group by ta.flow_slug
  ),
  -- Plain SELECT CTEs only run when referenced, so reference both through
  -- single-row aggregates to force the set_vt_batch / archive calls to execute.
  _vr as (select count(*) as n from visibility_reset),
  _ar as (select count(*) as n from archived)
  -- _vr and _ar each yield exactly one row, so the cross joins do not change
  -- the number of rows counted from requeued.
  select count(*) into result_count
  from requeued
  cross join _vr
  cross join _ar;

  return result_count;
end;
$$;
79+
-- Create "setup_requeue_stalled_tasks_cron" function
-- Registers the pg_cron job that periodically runs pgflow.requeue_stalled_tasks()
-- and returns a confirmation message with the interval and job id.
CREATE FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" ("cron_interval" text DEFAULT '15 seconds') RETURNS text LANGUAGE plpgsql SECURITY DEFINER SET "search_path" = pgflow, cron, pg_temp AS $$
declare
  scheduled_job_id bigint;
  confirmation text;
begin
  -- Best-effort removal of a previously registered job: any error raised by
  -- unschedule is deliberately ignored so the function can be called repeatedly.
  begin
    perform cron.unschedule('pgflow_requeue_stalled_tasks');
  exception
    when others then
      null;
  end;

  -- Register the job under a fixed name with the requested interval
  scheduled_job_id := cron.schedule(
    job_name => 'pgflow_requeue_stalled_tasks',
    schedule => setup_requeue_stalled_tasks_cron.cron_interval,
    command => 'select pgflow.requeue_stalled_tasks()'
  );

  confirmation := format(
    'Scheduled pgflow_requeue_stalled_tasks (every %s, job_id=%s)',
    setup_requeue_stalled_tasks_cron.cron_interval,
    scheduled_job_id
  );

  return confirmation;
end;
$$;
-- Set comment to function: "setup_requeue_stalled_tasks_cron"
COMMENT ON FUNCTION "pgflow"."setup_requeue_stalled_tasks_cron" IS 'Sets up cron job to automatically requeue stalled tasks.
Schedules pgflow_requeue_stalled_tasks at the specified cron_interval (default: 15 seconds).
Replaces existing job if it exists (idempotent).
Returns a confirmation message with job ID.';

-- Register the requeue job as part of this migration (runs every 15 seconds)
SELECT pgflow.setup_requeue_stalled_tasks_cron('15 seconds');

pkgs/core/supabase/migrations/atlas.sum

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
1+
h1:DGjqEwXpbrpLbFLXmBEHQabXBHvwznaMtUYaXEdYQ9k=
22
20250429164909_pgflow_initial.sql h1:I3n/tQIg5Q5nLg7RDoU3BzqHvFVjmumQxVNbXTPG15s=
33
20250517072017_pgflow_fix_poll_for_tasks_to_use_separate_statement_for_polling.sql h1:wTuXuwMxVniCr3ONCpodpVWJcHktoQZIbqMZ3sUHKMY=
44
20250609105135_pgflow_add_start_tasks_and_started_status.sql h1:ggGanW4Wyt8Kv6TWjnZ00/qVb3sm+/eFVDjGfT8qyPg=
@@ -16,3 +16,4 @@ h1:sIw3ylBXnDTOY5woU5hCoL+eT87Nb0XyctIIQl3Aq2g=
1616
20251212100113_pgflow_allow_data_loss_parameter.sql h1:Fg3RHj51STNHS4epQ2J4AFMj7NwG0XfyDTSA/9dcBIQ=
1717
20251225163110_pgflow_add_flow_input_column.sql h1:734uCbTgKmPhTK3TY56uNYZ31T8u59yll9ea7nwtEoc=
1818
20260103145141_pgflow_step_output_storage.sql h1:mgVHSFDLdtYy//SZ6C03j9Str1iS9xCM8Rz/wyFwn3o=
19+
20260112094903_pgflow_requeue_stalled_tasks.sql h1:xDkh3LSMke9gG7Gd37D1EpNmzmjFuPzeY1VNWmEOlz4=
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
-- Test: Basic requeue functionality for stalled tasks
2+
begin;
select plan(12);

select pgflow_tests.reset_db();

-- Fixture: a flow with a 5 second timeout containing a single step
select pgflow.create_flow('test_flow', null, null, 5);
select pgflow.add_step('test_flow', 'step_a');

-- Fixture: one run whose task has been read and started by a worker
select pgflow.start_flow('test_flow', '{"input": "test"}'::jsonb);
select pgflow_tests.ensure_worker('test_flow');
select pgflow_tests.read_and_start('test_flow', 30, 1);

-- ---------------------------------------------------------------------------
-- Phase 1: task is running but has not exceeded timeout + 30s buffer
-- ---------------------------------------------------------------------------

-- 1. Freshly started task reports 'started'
select is(
  (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  'started',
  'Task should be in started status initially'
);

-- 2. Nothing is stalled yet, so nothing is requeued
select is(
  pgflow.requeue_stalled_tasks(),
  0,
  'Should return 0 when no tasks are stalled yet'
);

-- 3. The no-op call left the task untouched
select is(
  (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  'started',
  'Task should remain started when within timeout window'
);

-- ---------------------------------------------------------------------------
-- Phase 2: simulate a stall and requeue
-- ---------------------------------------------------------------------------

-- Backdate started_at to 36s ago (5s timeout + 30s buffer + 1s).
-- queued_at is moved back as well so that started_at >= queued_at still holds.
update pgflow.step_tasks
set
  queued_at = now() - interval '40 seconds',
  started_at = now() - interval '36 seconds'
where step_slug = 'step_a';

-- 4. Exactly one task is reported as requeued
select is(
  pgflow.requeue_stalled_tasks(),
  1,
  'Should return 1 when one task is stalled'
);

-- 5. Status went back to 'queued'
select is(
  (select status from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  'queued',
  'Stalled task should be requeued to queued status'
);

-- 6. Requeue counter was bumped
select is(
  (select requeued_count from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  1,
  'requeued_count should be 1 after first requeue'
);

-- 7. Requeue timestamp was recorded
select ok(
  (select last_requeued_at is not null from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  'last_requeued_at should be set after requeue'
);

-- 8. started_at was reset
select is(
  (select started_at from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  null,
  'started_at should be cleared after requeue'
);

-- 9. attempts_count is left alone by the requeue
select is(
  (select attempts_count from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  1,
  'attempts_count should remain unchanged after requeue'
);

-- ---------------------------------------------------------------------------
-- Phase 3: state after the requeue
-- ---------------------------------------------------------------------------

-- 10. A second call finds nothing to do (task is no longer 'started')
select is(
  pgflow.requeue_stalled_tasks(),
  0,
  'Should return 0 when called again with no stalled tasks'
);

-- 11. The queue message can be read again
select ok(
  exists (select 1 from pgmq.read('test_flow', 0, 1)),
  'Message should be readable from the queue after requeue'
);

-- 12. Worker association was dropped
select is(
  (select last_worker_id from pgflow.step_tasks where step_slug = 'step_a' limit 1),
  null,
  'last_worker_id should be cleared after requeue'
);

select finish();
rollback;

0 commit comments

Comments
 (0)