Skip to content

Commit 6023294

Browse files
Add idempotency key support for task deduplication
Two new fields on SpawnOptions: - `only_once: bool` — auto-derives key from hash(task_name, params) - `idempotency_key: Option<String>` — explicit key (takes precedence) When a key is set, spawn_task checks for an existing non-terminal task with the same key. If found, returns the existing task instead of creating a duplicate. This enables first-served semantics where multiple clients can safely try to spawn the same logical task. DB changes: - New nullable `idempotency_key` column on task tables - Partial unique index on idempotency_key WHERE NOT NULL - Migration adds column/index to all existing queues - ensure_queue_tables updated for new queues Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 06dece0 commit 6023294

4 files changed

Lines changed: 371 additions & 4 deletions

File tree

sql/schema.sql

Lines changed: 53 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ begin
9393
max_attempts integer,
9494
cancellation jsonb,
9595
parent_task_id uuid,
96+
idempotency_key text,
9697
enqueue_at timestamptz not null default durable.current_time(),
9798
first_started_at timestamptz,
9899
state text not null check (state in (''pending'', ''running'', ''sleeping'', ''completed'', ''failed'', ''cancelled'')),
@@ -104,10 +105,17 @@ begin
104105
't_' || p_queue_name
105106
);
106107

108+
-- Idempotency might be added after the table was created; handle both cases
109+
execute format(
110+
'alter table durable.%I add column if not exists idempotency_key text',
111+
't_' || p_queue_name
112+
);
113+
107114
execute format('comment on column durable.%I.params is %L', 't_' || p_queue_name, 'User-defined. Task input parameters. Schema depends on Task::Params type.');
108115
execute format('comment on column durable.%I.headers is %L', 't_' || p_queue_name, 'User-defined. Optional key-value metadata as {"key": <any JSON value>}.');
109116
execute format('comment on column durable.%I.retry_strategy is %L', 't_' || p_queue_name, '{"kind": "none"} | {"kind": "fixed", "base_seconds": <u64>} | {"kind": "exponential", "base_seconds": <u64>, "factor": <f64>, "max_seconds": <u64>}');
110117
execute format('comment on column durable.%I.cancellation is %L', 't_' || p_queue_name, '{"max_delay": <seconds>, "max_duration": <seconds>} - both optional. max_delay: cancel if not started within N seconds of enqueue. max_duration: cancel if not completed within N seconds of first start.');
118+
execute format('comment on column durable.%I.idempotency_key is %L', 't_' || p_queue_name, 'Optional dedup key. When set, only one non-terminal task with this key can exist. Set via SpawnOptions.only_once or SpawnOptions.idempotency_key.');
111119
execute format('comment on column durable.%I.completed_payload is %L', 't_' || p_queue_name, 'User-defined. Task return value. Schema depends on Task::Output type.');
112120

113121
execute format(
@@ -215,6 +223,13 @@ begin
215223
't_' || p_queue_name
216224
);
217225

226+
-- Idempotency key unique index (partial: only non-null keys)
227+
execute format(
228+
'create unique index if not exists %I on durable.%I (idempotency_key) where idempotency_key is not null',
229+
('t_' || p_queue_name) || '_ik',
230+
't_' || p_queue_name
231+
);
232+
218233
-- Speed up claim timeout scans.
219234
execute format(
220235
'create index if not exists %I on durable.%I (claim_expires_at)
@@ -353,8 +368,10 @@ declare
353368
v_max_attempts integer;
354369
v_cancellation jsonb;
355370
v_parent_task_id uuid;
371+
v_idempotency_key text;
356372
v_now timestamptz := durable.current_time();
357373
v_params jsonb := coalesce(p_params, 'null'::jsonb);
374+
v_existing_task_id uuid;
358375
begin
359376
if p_task_name is null or length(trim(p_task_name)) = 0 then
360377
raise exception 'task_name must be provided';
@@ -370,16 +387,48 @@ begin
370387
end if;
371388
end if;
372389
v_cancellation := p_options->'cancellation';
373-
-- Extract parent_task_id for subtask tracking
374390
v_parent_task_id := (p_options->>'parent_task_id')::uuid;
391+
392+
-- Resolve idempotency key: explicit key takes precedence over only_once
393+
v_idempotency_key := p_options->>'idempotency_key';
394+
if v_idempotency_key is null and (p_options->>'only_once')::boolean = true then
395+
v_idempotency_key := md5(p_task_name || '::' || v_params::text);
396+
end if;
397+
end if;
398+
399+
-- Idempotency check: return existing non-terminal task if key matches
400+
if v_idempotency_key is not null then
401+
execute format(
402+
'select t.task_id from durable.%I t
403+
where t.idempotency_key = $1
404+
and t.state not in (''completed'', ''failed'', ''cancelled'')
405+
limit 1',
406+
't_' || p_queue_name
407+
)
408+
into v_existing_task_id
409+
using v_idempotency_key;
410+
411+
if v_existing_task_id is not null then
412+
return query
413+
execute format(
414+
'select t.task_id, r.run_id, r.attempt
415+
from durable.%I t
416+
join durable.%I r on r.task_id = t.task_id and r.run_id = t.last_attempt_run
417+
where t.task_id = $1',
418+
't_' || p_queue_name,
419+
'r_' || p_queue_name
420+
)
421+
using v_existing_task_id;
422+
return;
423+
end if;
375424
end if;
376425

377426
execute format(
378-
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
379-
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, null, ''pending'', $10, $11, null, null)',
427+
'insert into durable.%I (task_id, task_name, params, headers, retry_strategy, max_attempts, cancellation, parent_task_id, idempotency_key, enqueue_at, first_started_at, state, attempts, last_attempt_run, completed_payload, cancelled_at)
428+
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, null, ''pending'', $11, $12, null, null)',
380429
't_' || p_queue_name
381430
)
382-
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_now, v_attempt, v_run_id;
431+
using v_task_id, p_task_name, v_params, v_headers, v_retry_strategy, v_max_attempts, v_cancellation, v_parent_task_id, v_idempotency_key, v_now, v_attempt, v_run_id;
383432

384433
execute format(
385434
'insert into durable.%I (run_id, task_id, attempt, state, available_at, wake_event, event_payload, result, failure_reason)

src/client.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ struct SpawnOptionsDb<'a> {
2727
cancellation: Option<CancellationPolicyDb>,
2828
#[serde(skip_serializing_if = "Option::is_none")]
2929
parent_task_id: Option<&'a Uuid>,
30+
#[serde(skip_serializing_if = "std::ops::Not::not")]
31+
only_once: bool,
32+
#[serde(skip_serializing_if = "Option::is_none")]
33+
idempotency_key: Option<&'a str>,
3034
}
3135

3236
/// Internal struct for serializing cancellation policy (only non-None fields).
@@ -597,6 +601,8 @@ where
597601
.as_ref()
598602
.and_then(CancellationPolicyDb::from_policy),
599603
parent_task_id: options.parent_task_id.as_ref(),
604+
only_once: options.only_once,
605+
idempotency_key: options.idempotency_key.as_deref(),
600606
};
601607
serde_json::to_value(db_options)
602608
}

0 commit comments

Comments
 (0)