15 changes: 13 additions & 2 deletions CHANGES.md
@@ -285,8 +285,18 @@ To be released.
`FederationOptions.taskQueueResolution` option is set to `"strict"`.
`Federation.startQueue()` now accepts `queue: "task"` to run
a task-only worker.

[[#206], [#797], [#803] by ChanHaeng Lee]
- Tasks can request at-most-once enqueue with a `deduplicationKey`
(new `TaskEnqueueOptions.deduplicationKey`). A queue declaring the new
`MessageQueue.nativeDeduplication` capability owns the check and
receives the key through the new
`MessageQueueEnqueueOptions.deduplicationKey`; otherwise Fedify
performs a best-effort key–value guard through the optional
`KvStore.cas` primitive, under a new `taskDeduplication` key prefix.
The marker TTL and the no-`cas` fallback are tunable with the new
`FederationOptions.taskDeduplicationTtl` and
`FederationOptions.taskDeduplicationFallback` options.

[[#206], [#797], [#798], [#803] by ChanHaeng Lee]

[Standard Schema]: https://standardschema.dev/
[#206]: https://github.com/fedify-dev/fedify/issues/206
@@ -320,6 +330,7 @@ To be released.
[#782]: https://github.com/fedify-dev/fedify/issues/782
[#787]: https://github.com/fedify-dev/fedify/pull/787
[#797]: https://github.com/fedify-dev/fedify/issues/797
[#798]: https://github.com/fedify-dev/fedify/issues/798
[#800]: https://github.com/fedify-dev/fedify/pull/800
[#803]: https://github.com/fedify-dev/fedify/pull/803

98 changes: 94 additions & 4 deletions docs/manual/tasks.md
@@ -1,3 +1,5 @@
<!-- deno-fmt-ignore-file -->

Background tasks
================

@@ -139,6 +141,10 @@ Both methods accept options:
: Tasks with the same ordering key are processed sequentially (one at
a time), like the same option on the message queue layer.

`deduplicationKey`
: Requests at-most-once enqueue for tasks that share the key; see
[Deduplication](#deduplication) below.

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
delay: { minutes: 30 },
@@ -257,12 +263,96 @@ delivered it.
> queue and set `taskQueueResolution: "strict"`.


Deduplication
-------------

A task often needs *at-most-once-per-key* enqueue: a digest mailer must not
send twice when a request is retried, and a cleanup job should coalesce
duplicate triggers. Passing a `deduplicationKey` requests this—a second
enqueue with the same key is dropped while the first is still within the
deduplication window:

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
deduplicationKey: `digest:${payload.userId}`, // [!code highlight]
});
~~~~

How the key is resolved depends on the queue and the key–value store:

1. **Native backend.** When the task's queue declares
`~MessageQueue.nativeDeduplication`, Fedify forwards the key in the
message queue's `~MessageQueueEnqueueOptions.deduplicationKey` and the
backend owns the check. Fedify does not touch the key–value store.

2. **Key–value fallback.** Otherwise, if the configured `~KvStore` exposes
the optional compare-and-swap (`~KvStore.cas`) primitive, Fedify records
the key under a dedicated `taskDeduplication` prefix with a TTL and skips
the enqueue while a marker is present. The TTL defaults to one hour and is
configurable with `~FederationOptions.taskDeduplicationTtl`:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationTtl: { minutes: 10 }, // [!code highlight]
});
~~~~

3. **No conditional write.** When neither applies—no native deduplication and
a key–value store without `~KvStore.cas`—the behavior is governed by
`~FederationOptions.taskDeduplicationFallback`. `"open"` (the default)
lets the enqueue proceed without deduplication after a debug-level log;
`"closed"` throws a `TypeError` before enqueuing:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationFallback: "closed", // [!code highlight]
});
~~~~

Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL
key–value stores implement `~KvStore.cas`; PostgreSQL, Redis, and
Cloudflare Workers KV do not yet, so those deployments take the
`taskDeduplicationFallback` branch until per-adapter follow-ups add it.
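
The three branches above can be read as one small decision function. The following is a minimal sketch, not Fedify's implementation: `QueueLike`, `KvLike`, `Resolution`, and `resolveDeduplication` are hypothetical names invented for illustration, and only the property names `nativeDeduplication` and `cas` and the `"open"`/`"closed"` values come from the text above.

```typescript
// Hypothetical, simplified shapes; these are NOT Fedify's real interfaces.
interface QueueLike {
  nativeDeduplication?: boolean;
}

interface KvLike {
  // Only the presence of a compare-and-swap function matters here.
  cas?: unknown;
}

type Resolution = "native" | "kv" | "none";

// Decides which of the three documented branches handles a deduplication key.
function resolveDeduplication(
  queue: QueueLike,
  kv: KvLike,
  fallback: "open" | "closed" = "open",
): Resolution {
  // 1. Native backend: the queue owns the check; the store is not touched.
  if (queue.nativeDeduplication === true) return "native";
  // 2. Key–value fallback: requires the optional compare-and-swap primitive.
  if (typeof kv.cas === "function") return "kv";
  // 3. No conditional write: "closed" rejects, "open" proceeds undeduplicated.
  if (fallback === "closed") {
    throw new TypeError(
      "deduplicationKey requires native deduplication or KvStore.cas",
    );
  }
  return "none";
}
```

In this sketch the native check comes first, matching the statement that Fedify does not touch the key–value store when the queue declares native deduplication.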

For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the
whole batch: the batch enqueues as a unit or is skipped as a unit, never
partially. Per-item deduplication means calling `~Context.enqueueTask()` in
a loop, each with its own key. Deduplicating a multi-item batch requires the
queue to implement `~MessageQueue.enqueueMany()` so the batch enqueues
atomically—whether the check is native or the key–value fallback. Fanning the
key out across separate `~MessageQueue.enqueue()` calls cannot enqueue a whole
batch as one unit: a native per-message key cannot cover it, and a key–value
marker could not be rolled back cleanly if only some of the fanned-out enqueues
failed. So when deduplication is actually applied—a native queue, or a
key–value store with `~KvStore.cas`—Fedify rejects a multi-item batch with a
`deduplicationKey` on a queue without `~MessageQueue.enqueueMany()` instead of
risking duplicates. Under the `"open"` fallback (no native deduplication and no
`cas`), no marker is taken, so the batch simply fans out without deduplication.

This applies through `~ParallelMessageQueue` as well: wrapping a queue that
lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a
deduplicated multi-item batch on such a wrapper is likewise rejected rather than
collapsed onto one message.
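
The batch rule described above can be sketched as a pure function over a multi-item batch. This is an illustrative reading of the prose, not Fedify's code: `MultiItemBatch`, `BatchPlan`, and `planMultiItemBatch` are hypothetical names, and single-item batches are deliberately left out of the model.

```typescript
// Hypothetical model of a batch with two or more payloads.
interface MultiItemBatch {
  // Whether the caller passed a deduplicationKey.
  hasDeduplicationKey: boolean;
  // Whether deduplication would actually be applied: a native queue,
  // or a key–value store that exposes cas.
  deduplicationApplied: boolean;
  // Whether the queue implements a bulk enqueue operation.
  hasEnqueueMany: boolean;
}

type BatchPlan = "bulk" | "fan-out" | "reject";

function planMultiItemBatch(batch: MultiItemBatch): BatchPlan {
  // A bulk enqueue lets the batch enqueue as one unit.
  if (batch.hasEnqueueMany) return "bulk";
  // Without bulk enqueue, an applied deduplication key cannot cover the
  // whole batch, so the batch is rejected rather than risking duplicates.
  if (batch.hasDeduplicationKey && batch.deduplicationApplied) {
    return "reject";
  }
  // No key, or the "open" fallback where no marker is taken:
  // fall back to separate single enqueues.
  return "fan-out";
}
```

Per-item deduplication stays outside this function: it corresponds to separate single enqueues, each with its own key.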

> [!WARNING]
> The key–value fallback is **best-effort, not transactional**. The marker
> write and the enqueue are separate operations. Fedify rolls the marker back
> when an enqueue fails, so a transient failure does not suppress the retry, but
> a crash before that rollback, the `"open"` fallback under concurrency, a
> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window
> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL
> expiry, not active deletion on handler success. Deployments that need strict
> guarantees should use a queue with `nativeDeduplication: true`, where the
> backend owns an atomic check.

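The marker-then-enqueue sequence and its rollback can be illustrated with a toy store. This is a sketch under stated assumptions, not Fedify's implementation: `MarkerStore` and `enqueueOnce` are hypothetical, the store is in-memory and synchronous for brevity (the real primitives are asynchronous), and the clock is passed in as `now` so the TTL behavior is easy to follow.

```typescript
// Hypothetical in-memory marker store; stands in for a KvStore with cas.
class MarkerStore {
  // Maps a deduplication key to its expiry time in milliseconds.
  private readonly entries = new Map<string, number>();

  // Writes the marker only if no live marker exists; returns true on success.
  claim(key: string, ttlMs: number, now: number): boolean {
    const expiry = this.entries.get(key);
    if (expiry !== undefined && expiry > now) return false;
    this.entries.set(key, now + ttlMs);
    return true;
  }

  // Removes the marker so a later enqueue with the same key can proceed.
  release(key: string): void {
    this.entries.delete(key);
  }
}

// Returns true if the task was enqueued, false if it was skipped as a duplicate.
function enqueueOnce(
  store: MarkerStore,
  key: string,
  ttlMs: number,
  now: number,
  enqueue: () => void,
): boolean {
  // Step 1: take the marker. This is a separate operation from the enqueue.
  if (!store.claim(key, ttlMs, now)) return false;
  try {
    // Step 2: enqueue. A crash between the two steps would leave the marker
    // behind until its TTL expires, which is why this is only best-effort.
    enqueue();
  } catch (error) {
    // Roll the marker back so a transient failure does not suppress a retry.
    store.release(key);
    throw error;
  }
  return true;
}
```

Because the marker is never deleted on handler success in this sketch, a second call with the same key is skipped until the TTL elapses, mirroring the cleanup-by-expiry behavior noted in the warning.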

Limitations
-----------

The current API intentionally ships without deduplication, task-specific
OpenTelemetry spans and metrics, cron-style periodic scheduling, result
backends, and per-task priority. Some of these are planned as follow-ups;
see the [tracking issue].
The current API intentionally ships without task-specific OpenTelemetry spans
and metrics, cron-style periodic scheduling, result backends, and per-task
priority. Some of these are planned as follow-ups; see the [tracking issue].

[tracking issue]: https://github.com/fedify-dev/fedify/issues/206
12 changes: 8 additions & 4 deletions packages/fedify/src/federation/context.ts
@@ -459,17 +459,21 @@ export interface Context<TContextData> {

/**
* Enqueues multiple payloads for a custom background task at once.
* Uses the queue's bulk enqueue operation when available, falling back
* to parallel single enqueues.
* Uses the queue's bulk enqueue operation when available. Without
* deduplication, it may fall back to parallel single enqueues when the
* queue does not implement bulk enqueue.
* @template TData The type of the task payload, inferred from the task's
* schema.
* @param task The handle returned by {@link TaskRegistry.defineTask}.
* @param payloads The task payloads. Each is validated against the
* task's schema before being enqueued.
* @param options Options for enqueuing the tasks.
* @throws {TypeError} If the task is not defined on this federation,
* if no message queue is configured for tasks, or if
* a payload fails schema validation.
* if no message queue is configured for tasks, if
* a payload fails schema validation, or if a
* deduplicated multi-item batch cannot be enqueued
* atomically because the queue does not implement
* bulk enqueue.
* @since 2.x.x
*/
enqueueTaskMany<TData>(
26 changes: 26 additions & 0 deletions packages/fedify/src/federation/federation.ts
@@ -1103,6 +1103,32 @@ export interface FederationOptions<TContextData> {
*/
taskQueueResolution?: "fallback" | "strict";

/**
* The time-to-live for a {@link TaskEnqueueOptions.deduplicationKey} marker
* stored in the key–value deduplication fallback. A second enqueue with the
* same key within this window is skipped; once it expires, the key may
* enqueue again. Ignored when the task's queue declares
* {@link MessageQueue.nativeDeduplication} (the backend owns the window).
* @default `{ hours: 1 }`
* @since 2.x.x
*/
taskDeduplicationTtl?: Temporal.DurationLike;

/**
* The behavior when a {@link TaskEnqueueOptions.deduplicationKey} is supplied
* but the task's queue does not declare
* {@link MessageQueue.nativeDeduplication} *and* the configured
* {@link KvStore} exposes no `cas` (compare-and-swap) primitive:
*
* - `"open"` (the default): proceeds without deduplication after logging at
* debug level.
* - `"closed"`: rejects with a `TypeError` before enqueuing.
*
* @default `"open"`
* @since 2.x.x
*/
taskDeduplicationFallback?: "open" | "closed";

/**
* Activity transformers that are applied to outgoing activities. It is
* useful for adjusting outgoing activities to satisfy some ActivityPub
17 changes: 10 additions & 7 deletions packages/fedify/src/federation/middleware.test.ts
@@ -10565,20 +10565,23 @@ const withTimeout = <T>(
return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
};

const taskFederationOptions = {
// A factory, not a shared constant: each task test gets its own
// MemoryKvStore so deduplication markers never leak across tests and the
// suite stays order-independent as more cases are added.
const mockOptions = () => ({
kv: new MemoryKvStore(),
documentLoaderFactory: () => mockDocumentLoader,
contextLoaderFactory: () => mockDocumentLoader,
manuallyStartQueue: true,
};
});

test("ContextImpl.enqueueTask()", async (t) => {
await t.step(
"builds the task message envelope and round-trips a vocab payload",
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
...taskFederationOptions,
...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("greet", {
@@ -10619,7 +10622,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
...taskFederationOptions,
...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk", {
@@ -10652,7 +10655,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue({ supportsEnqueueMany: true });
const federation = createFederation<void>({
...taskFederationOptions,
...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-single", {
@@ -10674,7 +10677,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new RendezvousQueue(2);
const federation = createFederation<void>({
...taskFederationOptions,
...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-fallback", {
@@ -10716,7 +10719,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => {
async () => {
const queue = new MockQueue();
const federation = createFederation<void>({
...taskFederationOptions,
...mockOptions(),
queue: { task: queue },
});
const task = federation.defineTask("bulk-typed", {