Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ begin
max_attempts integer,
cancellation jsonb,
parent_task_id uuid,
idempotency_key text,
enqueue_at timestamptz not null default durable.current_time(),
first_started_at timestamptz,
state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
Expand All @@ -104,10 +105,17 @@ begin
't_' || p_queue_name
);

-- Idempotency might be added after the table was created; handle both cases
execute format(
'alter table durable.%I add column if not exists idempotency_key text',
't_' || p_queue_name
);

execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": <any JSON value>}.');
execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": <u64>} | {"kind": "exponential", "base_seconds": <u64>, "factor": <f64>, "max_seconds": <u64>}');
execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": <seconds>, "max_duration": <seconds>} - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. A key is permanently bound to the first task that uses it — subsequent spawns with the same key always return that task, regardless of state (running, completed, failed, or cancelled). The caller owns retry semantics. Set via SpawnOptions.idempotency_key.');
execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');

execute format(
Expand Down Expand Up @@ -215,6 +223,13 @@ begin
't_' || p_queue_name
);

-- Idempotency key unique index (partial: only non-null keys)
execute format(
'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
('t_' || p_queue_name) || '_ik',
't_' || p_queue_name
);

-- Speed up claim timeout scans.
execute format(
'create index if not exists %I on durable.%I (claim_expires_at)
Expand Down Expand Up @@ -353,8 +368,10 @@ declare
v_max_attempts integer;
v_cancellation jsonb;
v_parent_task_id uuid;
v_idempotency_key text;
v_now timestamptz := durable.current_time();
v_params jsonb := coalesce(p_params, 'null'::jsonb);
v_existing_task_id uuid;
begin
if p_task_name is null or length(trim(p_task_name)) = 0 then
raise exception 'task_name must be provided';
Expand All @@ -372,14 +389,44 @@ begin
v_cancellation := p_options->'cancellation';
-- Extract parent_task_id for subtask tracking
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you re-add this comment?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in 447f2a6 — restored in both sql/schema.sql and the migration.

v_parent_task_id := (p_options->>'parent_task_id')::uuid;
v_idempotency_key := p_options->>'idempotency_key';
end if;

-- Idempotency check: a key is permanently bound to the first task that uses
-- it. Subsequent spawns with the same key always return that task, regardless
-- of its state (running, completed, failed, or cancelled). The caller owns
-- retry semantics — if you want to retry after a failure, use a new key.
if v_idempotency_key is not null then
execute format(
'select t.task_id from durable.%I t
where t.idempotency_key = $1
limit 1',
't_' || p_queue_name
)
into v_existing_task_id
using v_idempotency_key;

if v_existing_task_id is not null then
return query
execute format(
'select t.task_id, r.run_id, r.attempt
from durable.%I t
join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
where t.task_id = $1',
't_' || p_queue_name,
'r_' || p_queue_name
)
using v_existing_task_id;
return;
end if;
end if;

execute format(
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, null, ''pending'', $10, $11, null, null)',
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)',
't_' || p_queue_name
)
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_now, v_attempt, v_run_id;
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;

execute format(
'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
Expand Down
3 changes: 3 additions & 0 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ struct SpawnOptionsDb<'a> {
cancellation: Option<CancellationPolicyDb>,
#[serde(skip_serializing_if = "Option::is_none")]
parent_task_id: Option<&'a Uuid>,
#[serde(skip_serializing_if = "Option::is_none")]
idempotency_key: Option<&'a str>,
}

/// Internal struct for serializing cancellation policy (only non-None fields).
Expand Down Expand Up @@ -597,6 +599,7 @@ where
.as_ref()
.and_then(CancellationPolicyDb::from_policy),
parent_task_id: options.parent_task_id.as_ref(),
idempotency_key: options.idempotency_key.as_deref(),
};
serde_json::to_value(db_options)
}
Expand Down
Loading
Loading