diff --git a/sql/schema.sql b/sql/schema.sql
index ac2f42e..c1d7743 100644
--- a/sql/schema.sql
+++ b/sql/schema.sql
@@ -93,6 +93,7 @@ begin
         max_attempts integer,
         cancellation jsonb,
         parent_task_id uuid,
+        idempotency_key text,
         enqueue_at timestamptz not null default durable.current_time(),
         first_started_at timestamptz,
         state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
@@ -104,10 +105,17 @@ begin
     't_' || p_queue_name
   );
 
+  -- Idempotency might be added after the table was created; handle both cases
+  execute format(
+    'alter table durable.%I add column if not exists idempotency_key text',
+    't_' || p_queue_name
+  );
+
   execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
   execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": }.');
   execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": } | {"kind": "exponential", "base_seconds": , "factor": , "max_seconds": }');
   execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": , "max_duration": } - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
+  execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. A key is permanently bound to the first task that uses it — subsequent spawns with the same key always return that task, regardless of state (running, completed, failed, or cancelled). The caller owns retry semantics. Set via SpawnOptions.idempotency_key.');
   execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');
 
   execute format(
@@ -215,6 +223,13 @@ begin
     't_' || p_queue_name
   );
 
+  -- Idempotency key unique index (partial: only non-null keys)
+  execute format(
+    'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
+    ('t_' || p_queue_name) || '_ik',
+    't_' || p_queue_name
+  );
+
   -- Speed up claim timeout scans.
   execute format(
     'create index if not exists %I on durable.%I (claim_expires_at)
@@ -353,8 +368,10 @@ declare
   v_max_attempts integer;
   v_cancellation jsonb;
   v_parent_task_id uuid;
+  v_idempotency_key text;
   v_now timestamptz := durable.current_time();
   v_params jsonb := coalesce(p_params, 'null'::jsonb);
+  v_inserted_rows bigint;
 begin
   if p_task_name is null or length(trim(p_task_name)) = 0 then
     raise exception 'task_name must be provided';
@@ -372,14 +389,43 @@ begin
     v_cancellation := p_options->'cancellation';
     -- Extract parent_task_id for subtask tracking
     v_parent_task_id := (p_options->>'parent_task_id')::uuid;
+    v_idempotency_key := p_options->>'idempotency_key';
   end if;
 
+  -- Idempotency: a key is permanently bound to the first task that uses it.
+  -- Subsequent spawns with the same key always return that task, regardless
+  -- of its state (running, completed, failed, or cancelled). The caller owns
+  -- retry semantics — if you want to retry after a failure, use a new key.
+  --
+  -- The insert itself arbitrates ownership of the key through the partial
+  -- unique index, so concurrent spawns cannot both pass a check and then
+  -- collide; the loser inserts nothing and returns the winner's task below.
+  -- Rows with a null key are outside the index and never conflict.
   execute format(
-    'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
-     values ($1, $2, $3, $4, $5, $6, $7, $8, $9, null, ''pending'', $10, $11, null, null)',
+    'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
+     values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)
+     on conflict (idempotency_key) where idempotency_key is not null do nothing',
     't_' || p_queue_name
   )
-  using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_now, v_attempt, v_run_id;
+  using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;
+
+  get diagnostics v_inserted_rows = row_count;
+
+  if v_inserted_rows = 0 then
+    -- The key already belongs to another task (possibly committed by a
+    -- concurrent spawn while this insert waited): return its identifiers.
+    return query
+      execute format(
+        'select t.task_id, r.run_id, r.attempt
+         from durable.%I t
+         join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
+         where t.idempotency_key = $1',
+        't_' || p_queue_name,
+        'r_' || p_queue_name
+      )
+      using v_idempotency_key;
+    return;
+  end if;
 
   execute format(
     'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
diff --git a/src/client.rs b/src/client.rs
index 4da6686..4011f33 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -27,6 +27,8 @@ struct SpawnOptionsDb<'a> {
     cancellation: Option,
     #[serde(skip_serializing_if = "Option::is_none")]
     parent_task_id: Option<&'a Uuid>,
+    #[serde(skip_serializing_if = "Option::is_none")]
+    idempotency_key: Option<&'a str>,
 }
 
 /// Internal struct for serializing cancellation policy (only non-None fields).
@@ -597,6 +599,7 @@ where
             .as_ref()
             .and_then(CancellationPolicyDb::from_policy),
         parent_task_id: options.parent_task_id.as_ref(),
+        idempotency_key: options.idempotency_key.as_deref(),
     };
     serde_json::to_value(db_options)
 }
diff --git a/src/postgres/migrations/20260409000000_add_idempotency_key.sql b/src/postgres/migrations/20260409000000_add_idempotency_key.sql
new file mode 100644
index 0000000..62309b8
--- /dev/null
+++ b/src/postgres/migrations/20260409000000_add_idempotency_key.sql
@@ -0,0 +1,299 @@
+-- Add idempotency key support for task deduplication.
+-- A key is permanently bound to the first task that uses it: subsequent spawns
+-- with the same key always return that task, regardless of state (running,
+-- completed, failed, or cancelled). The caller owns retry semantics — to retry
+-- after a failure, use a new key.
+
+-- 1. Add column and index to all existing queue task tables
+do $$
+declare
+  q record;
+begin
+  for q in select queue_name from durable.queues loop
+    execute format(
+      'alter table durable.%I add column if not exists idempotency_key text',
+      't_' || q.queue_name
+    );
+    execute format(
+      'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
+      ('t_' || q.queue_name) || '_ik',
+      't_' || q.queue_name
+    );
+  end loop;
+end;
+$$;
+
+-- 2. Update ensure_queue_tables so new queues also get the column + index
+create or replace function durable.ensure_queue_tables (p_queue_name text)
+  returns void
+  language plpgsql
+as $$
+begin
+  execute format(
+    'create table if not exists durable.%I (
+        task_id uuid primary key,
+        task_name text not null,
+        params jsonb not null,
+        headers jsonb,
+        retry_strategy jsonb,
+        max_attempts integer,
+        cancellation jsonb,
+        parent_task_id uuid,
+        idempotency_key text,
+        enqueue_at timestamptz not null default durable.current_time(),
+        first_started_at timestamptz,
+        state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
+        attempts integer not null default 0,
+        last_attempt_run uuid,
+        completed_payload jsonb,
+        cancelled_at timestamptz
+     ) with (fillfactor=70)',
+    't_' || p_queue_name
+  );
+
+  -- Idempotency might be added after the table was created; handle both cases
+  execute format(
+    'alter table durable.%I add column if not exists idempotency_key text',
+    't_' || p_queue_name
+  );
+
+  execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
+  execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": }.');
+  execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": } | {"kind": "exponential", "base_seconds": , "factor": , "max_seconds": }');
+  execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": , "max_duration": } - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
+  execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. A key is permanently bound to the first task that uses it — subsequent spawns with the same key always return that task, regardless of state (running, completed, failed, or cancelled). The caller owns retry semantics. Set via SpawnOptions.idempotency_key.');
+  execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');
+
+  execute format(
+    'create table if not exists durable.%I (
+        run_id uuid primary key,
+        task_id uuid not null,
+        attempt integer not null,
+        state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
+        claimed_by text,
+        claim_expires_at timestamptz,
+        available_at timestamptz not null,
+        wake_event text,
+        event_payload jsonb,
+        started_at timestamptz,
+        completed_at timestamptz,
+        failed_at timestamptz,
+        result jsonb,
+        failure_reason jsonb,
+        created_at timestamptz not null default durable.current_time()
+     ) with (fillfactor=70)',
+    'r_' || p_queue_name
+  );
+
+  execute format('comment on column durable.%I.wake_event is %L', 'r_' || p_queue_name, 'Event name this run is waiting for while sleeping. Set by await_event when suspending, cleared when the event fires or timeout expires.');
+  execute format('comment on column durable.%I.event_payload is %L', 'r_' || p_queue_name, 'Payload delivered by emit_event when waking this run. Consumed by await_event on the next claim to return the value to the caller.');
+  execute format('comment on column durable.%I.result is %L', 'r_' || p_queue_name, 'User-defined. Serialized task output. Schema depends on Task::Output type.');
+  execute format('comment on column durable.%I.failure_reason is %L', 'r_' || p_queue_name, '{"name": "", "message": "", "backtrace": ""}');
+
+  execute format(
+    'create table if not exists durable.%I (
+        task_id uuid not null,
+        checkpoint_name text not null,
+        state jsonb,
+        owner_run_id uuid,
+        updated_at timestamptz not null default durable.current_time(),
+        primary key (task_id, checkpoint_name)
+     ) with (fillfactor=70)',
+    'c_' || p_queue_name
+  );
+
+  execute format('comment on column durable.%I.state is %L', 'c_' || p_queue_name, 'User-defined. Checkpoint value from ctx.step(). Any JSON-serializable value.');
+
+  execute format(
+    'create table if not exists durable.%I (
+        event_name text primary key,
+        payload jsonb,
+        emitted_at timestamptz not null default durable.current_time()
+     )',
+    'e_' || p_queue_name
+  );
+
+  execute format('comment on column durable.%I.payload is %L', 'e_' || p_queue_name, 'User-defined. Event payload. Internal child events use: {"status": "completed"|"failed"|"cancelled", "result"?: , "error"?: }');
+
+  execute format(
+    'create table if not exists durable.%I (
+        task_id uuid not null,
+        run_id uuid not null,
+        step_name text not null,
+        event_name text not null,
+        timeout_at timestamptz,
+        created_at timestamptz not null default durable.current_time(),
+        primary key (run_id, step_name)
+     )',
+    'w_' || p_queue_name
+  );
+
+  execute format(
+    'create index if not exists %I on durable.%I (state, available_at)',
+    ('r_' || p_queue_name) || '_sai',
+    'r_' || p_queue_name
+  );
+
+  -- Partial index for claim candidate ORDER BY (available_at, run_id).
+  -- Matches the exact ordering used in the claim query for ready runs.
+  execute format(
+    'create index if not exists %I on durable.%I (available_at, run_id) include (task_id)
+     where state in (''pending'', ''sleeping'')',
+    ('r_' || p_queue_name) || '_ready',
+    'r_' || p_queue_name
+  );
+
+  execute format(
+    'create index if not exists %I on durable.%I (task_id)',
+    ('r_' || p_queue_name) || '_ti',
+    'r_' || p_queue_name
+  );
+
+  execute format(
+    'create index if not exists %I on durable.%I (event_name)',
+    ('w_' || p_queue_name) || '_eni',
+    'w_' || p_queue_name
+  );
+
+  -- Speed up cleanup_task_terminal wait deletion by task_id.
+  execute format(
+    'create index if not exists %I on durable.%I (task_id)',
+    ('w_' || p_queue_name) || '_ti',
+    'w_' || p_queue_name
+  );
+
+  -- Index for finding children of a parent task (for cascade cancellation)
+  execute format(
+    'create index if not exists %I on durable.%I (parent_task_id) where parent_task_id is not null',
+    ('t_' || p_queue_name) || '_pti',
+    't_' || p_queue_name
+  );
+
+  -- Idempotency key unique index (partial: only non-null keys)
+  execute format(
+    'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
+    ('t_' || p_queue_name) || '_ik',
+    't_' || p_queue_name
+  );
+
+  -- Speed up claim timeout scans.
+  execute format(
+    'create index if not exists %I on durable.%I (claim_expires_at)
+     where state = ''running'' and claim_expires_at is not null',
+    ('r_' || p_queue_name) || '_cei',
+    'r_' || p_queue_name
+  );
+
+  -- Speed up cancellation sweep: only index tasks that have cancellation policies.
+  execute format(
+    'create index if not exists %I on durable.%I (task_id)
+     where state in (''pending'', ''sleeping'', ''running'')
+     and cancellation is not null
+     and (cancellation ? ''max_delay'' or cancellation ? ''max_duration'')',
+    ('t_' || p_queue_name) || '_cxlpol',
+    't_' || p_queue_name
+  );
+
+  -- Composite index for active task state lookups.
+  -- Enables Index Only Scans for claim_task join, emit_event, and cancel propagation.
+  execute format(
+    'create index if not exists %I on durable.%I (state, task_id)
+     where state in (''pending'', ''sleeping'', ''running'', ''cancelled'')',
+    ('t_' || p_queue_name) || '_state_tid',
+    't_' || p_queue_name
+  );
+end;
+$$;
+
+-- 3. Update spawn_task to handle idempotency key
+create or replace function durable.spawn_task (
+  p_queue_name text,
+  p_task_name text,
+  p_params jsonb,
+  p_options jsonb default '{}'::jsonb
+)
+  returns table (
+    task_id uuid,
+    run_id uuid,
+    attempt integer
+  )
+  language plpgsql
+as $$
+declare
+  v_task_id uuid := durable.portable_uuidv7();
+  v_run_id uuid := durable.portable_uuidv7();
+  v_attempt integer := 1;
+  v_headers jsonb;
+  v_retry_strategy jsonb;
+  v_max_attempts integer;
+  v_cancellation jsonb;
+  v_parent_task_id uuid;
+  v_idempotency_key text;
+  v_now timestamptz := durable.current_time();
+  v_params jsonb := coalesce(p_params, 'null'::jsonb);
+  v_inserted_rows bigint;
+begin
+  if p_task_name is null or length(trim(p_task_name)) = 0 then
+    raise exception 'task_name must be provided';
+  end if;
+
+  if p_options is not null then
+    v_headers := p_options->'headers';
+    v_retry_strategy := p_options->'retry_strategy';
+    if p_options ? 'max_attempts' then
+      v_max_attempts := (p_options->>'max_attempts')::int;
+      if v_max_attempts is not null and v_max_attempts < 1 then
+        raise exception 'max_attempts must be >= 1';
+      end if;
+    end if;
+    v_cancellation := p_options->'cancellation';
+    -- Extract parent_task_id for subtask tracking
+    v_parent_task_id := (p_options->>'parent_task_id')::uuid;
+    v_idempotency_key := p_options->>'idempotency_key';
+  end if;
+
+  -- Idempotency: a key is permanently bound to the first task that uses it.
+  -- Subsequent spawns with the same key always return that task, regardless
+  -- of its state (running, completed, failed, or cancelled). The caller owns
+  -- retry semantics — if you want to retry after a failure, use a new key.
+  --
+  -- The insert itself arbitrates ownership of the key through the partial
+  -- unique index, so concurrent spawns cannot both pass a check and then
+  -- collide; the loser inserts nothing and returns the winner's task below.
+  -- Rows with a null key are outside the index and never conflict.
+  execute format(
+    'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
+     values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)
+     on conflict (idempotency_key) where idempotency_key is not null do nothing',
+    't_' || p_queue_name
+  )
+  using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;
+
+  get diagnostics v_inserted_rows = row_count;
+
+  if v_inserted_rows = 0 then
+    -- The key already belongs to another task (possibly committed by a
+    -- concurrent spawn while this insert waited): return its identifiers.
+    return query
+      execute format(
+        'select t.task_id, r.run_id, r.attempt
+         from durable.%I t
+         join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
+         where t.idempotency_key = $1',
+        't_' || p_queue_name,
+        'r_' || p_queue_name
+      )
+      using v_idempotency_key;
+    return;
+  end if;
+
+  execute format(
+    'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)
+     values ($1, $2, $3, ''pending'', $4, null, null, null, null)',
+    'r_' || p_queue_name
+  )
+  using v_run_id, v_task_id, v_attempt, v_now;
+
+  return query select v_task_id, v_run_id, v_attempt;
+end;
+$$;
diff --git a/src/types.rs b/src/types.rs
index 8024781..3e49f89 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -137,6 +137,26 @@ pub struct SpawnOptions {
 
     #[serde(skip_serializing_if = "Option::is_none")]
     pub(crate) parent_task_id: Option,
+
+    /// Explicit idempotency key for task deduplication.
+    ///
+    /// When set, `spawn_task` checks for an existing task with the same key on
+    /// this queue. If one exists, its identifiers are returned instead of
+    /// creating a duplicate. Semantics are first-come, first-served: multiple
+    /// clients can safely race to spawn the same logical task.
+    ///
+    /// **A key is permanently bound to the first task that uses it.** This
+    /// match is unconditional on task state — running, completed, failed, and
+    /// cancelled tasks all match. Once a key is used, it can never produce a
+    /// new task. The caller owns retry semantics: to retry a failed operation,
+    /// pick a new key (e.g. by including an attempt number).
+    ///
+    /// Callers are responsible for choosing a key that captures what "the same
+    /// task" means for their use case (e.g. a hash of the inputs they consider
+    /// identifying). Other `SpawnOptions` fields like `max_attempts` are
+    /// ignored on subsequent spawns that hit the idempotency check.
+    #[serde(skip_serializing_if = "Option::is_none")]
+    pub idempotency_key: Option,
 }
 
 /// Options for configuring a worker.