28 changes: 26 additions & 2 deletions CHANGES.md
@@ -285,8 +285,30 @@ To be released.
`FederationOptions.taskQueueResolution` option is set to `"strict"`.
`Federation.startQueue()` now accepts `queue: "task"` to run
a task-only worker.

[[#206], [#797], [#803] by ChanHaeng Lee]
- Tasks can request at-most-once enqueue with a `deduplicationKey`
(new `TaskEnqueueOptions.deduplicationKey`). A queue declaring the new
`MessageQueue.nativeDeduplication` capability owns the check and
receives the key through the new
`MessageQueueEnqueueOptions.deduplicationKey`; otherwise Fedify
performs a best-effort key–value guard through the optional
`KvStore.cas` primitive, under a new `taskDeduplication` key prefix.
The marker TTL and the no-`cas` fallback are tunable with the new
`FederationOptions.taskDeduplicationTtl` and
`FederationOptions.taskDeduplicationFallback` options.
- Tasks now emit task-specific OpenTelemetry spans and metrics.
Each dequeued task runs in a `fedify.task` span that inherits the
enqueue site's trace context and carries `fedify.task.name`,
`fedify.task.attempt`, and, on a terminal failure,
`fedify.task.failure_reason`. The existing `fedify.queue.task.*`
metrics now report task runs under `fedify.queue.role` `task` with the
task name and, on a failure, a bounded `fedify.task.failure_reason`
(one of `deserialization`, `validation`, `unknown_task`, or
`handler`). `QueueTaskRole` gained `"task"`,
`QueueTaskCommonAttributes` gained `taskName`, and the new
`QueueTaskFailureReason` type and an optional trailing `failureReason`
parameter on `recordQueueTaskOutcome()` carry the failure reason.

[[#206], [#797], [#798], [#799], [#803] by ChanHaeng Lee]

[Standard Schema]: https://standardschema.dev/
[#206]: https://github.com/fedify-dev/fedify/issues/206
@@ -320,6 +342,8 @@ To be released.
[#782]: https://github.com/fedify-dev/fedify/issues/782
[#787]: https://github.com/fedify-dev/fedify/pull/787
[#797]: https://github.com/fedify-dev/fedify/issues/797
[#798]: https://github.com/fedify-dev/fedify/issues/798
[#799]: https://github.com/fedify-dev/fedify/issues/799
[#800]: https://github.com/fedify-dev/fedify/pull/800
[#803]: https://github.com/fedify-dev/fedify/pull/803

201 changes: 109 additions & 92 deletions docs/manual/opentelemetry.md

Large diffs are not rendered by default.

164 changes: 160 additions & 4 deletions docs/manual/tasks.md
@@ -139,6 +139,10 @@ Both methods accept options:
: Tasks with the same ordering key are processed sequentially (one at
a time), like the same option on the message queue layer.

`deduplicationKey`
: Requests at-most-once enqueue for tasks that share the key; see
[Deduplication](#deduplication) below.

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
delay: { minutes: 30 },
@@ -257,12 +261,164 @@ delivered it.
> queue and set `taskQueueResolution: "strict"`.


Deduplication
-------------

A task often needs *at-most-once-per-key* enqueue: a digest mailer must not
send twice when a request is retried, and a cleanup job should coalesce
duplicate triggers. Passing a `deduplicationKey` requests this—a second
enqueue with the same key is dropped while the first is still within the
deduplication window:

~~~~ typescript
await ctx.enqueueTask(sendDigest, payload, {
deduplicationKey: `digest:${payload.userId}`, // [!code highlight]
});
~~~~

How the key is resolved depends on the queue and the key–value store:

1. **Native backend.** When the task's queue declares
`~MessageQueue.nativeDeduplication`, Fedify forwards the key in the
message queue's `~MessageQueueEnqueueOptions.deduplicationKey` and the
backend owns the check. Fedify does not touch the key–value store.

2. **Key–value fallback.** Otherwise, if the configured `~KvStore` exposes
the optional compare-and-swap (`~KvStore.cas`) primitive, Fedify records
the key under a dedicated `taskDeduplication` prefix with a TTL and skips
the enqueue while a marker is present. The TTL defaults to one hour and is
configurable with `~FederationOptions.taskDeduplicationTtl`:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationTtl: { minutes: 10 }, // [!code highlight]
});
~~~~

3. **No conditional write.** When neither applies—no native deduplication and
a key–value store without `~KvStore.cas`—the behavior is governed by
`~FederationOptions.taskDeduplicationFallback`. `"open"` (the default)
lets the enqueue proceed without deduplication after a debug-level log;
`"closed"` throws a `TypeError` before enqueuing:

~~~~ typescript
const federation = createFederation<void>({
// ...
taskDeduplicationFallback: "closed", // [!code highlight]
});
~~~~

Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL
key–value stores implement `~KvStore.cas`; PostgreSQL and Redis do not yet, so
those deployments take the `taskDeduplicationFallback` branch until per-adapter
follow-ups add it.
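
The three branches above can be read as a single decision. The following is
a minimal sketch of that decision, not Fedify's internal code: the function
`resolveDeduplicationStrategy`, the `DeduplicationEnvironment` shape, and the
error message are hypothetical names introduced only for illustration.

```typescript
// Hypothetical model of the documented branches; none of these names are
// part of Fedify's API.
type DeduplicationFallback = "open" | "closed";
type DeduplicationStrategy = "native" | "kv" | "none";

interface DeduplicationEnvironment {
  // True when the task's queue declares `nativeDeduplication`.
  queueHasNativeDeduplication: boolean;
  // True when the configured key-value store exposes `cas`.
  kvHasCas: boolean;
  // Value of the `taskDeduplicationFallback` option; "open" is the default.
  fallback?: DeduplicationFallback;
}

function resolveDeduplicationStrategy(
  env: DeduplicationEnvironment,
): DeduplicationStrategy {
  // 1. Native backend: the queue owns the check.
  if (env.queueHasNativeDeduplication) return "native";
  // 2. Key-value fallback: a marker is claimed through compare-and-swap.
  if (env.kvHasCas) return "kv";
  // 3. No conditional write: the fallback option decides.
  if ((env.fallback ?? "open") === "closed") {
    throw new TypeError(
      "deduplicationKey requires native deduplication or KvStore.cas",
    );
  }
  return "none"; // "open": the enqueue proceeds without deduplication.
}
```

Under this model a PostgreSQL or Redis key–value store paired with a queue
that has no native deduplication lands on the third branch, which is why the
choice of `taskDeduplicationFallback` matters for those deployments.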

For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the
whole batch: the batch enqueues as a unit or is skipped as a unit, never
partially. For per-item deduplication, call `~Context.enqueueTask()` in a
loop, each call with its own key.

Deduplicating a multi-item batch requires the queue to implement
`~MessageQueue.enqueueMany()` so the batch enqueues atomically, whether the
check is native or the key–value fallback. Fanning the key out across separate
`~MessageQueue.enqueue()` calls cannot enqueue a whole batch as one unit:
a native per-message key cannot cover it, and a key–value marker could not be
rolled back cleanly if only some of the fanned-out enqueues failed. So when
deduplication is actually applied (a native queue, or a key–value store with
`~KvStore.cas`), Fedify rejects a multi-item batch with a `deduplicationKey` on
a queue without `~MessageQueue.enqueueMany()` instead of risking duplicates.
Under the `"open"` fallback (no native deduplication and no `cas`), no marker
is taken, so the batch simply fans out without deduplication.

This applies through `~ParallelMessageQueue` as well: wrapping a queue that
lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a
deduplicated multi-item batch on such a wrapper is likewise rejected rather than
collapsed onto one message.
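
The batch rules can be condensed into a small decision table. The sketch below
is illustrative only: `decideDeduplicatedBatch`, `DeduplicatedBatch`, and the
three decision labels are hypothetical, and treating a one-item batch as
trivially atomic is an assumption drawn from the "multi-item" wording above.

```typescript
// Hypothetical names; this models the documented rules, not Fedify's code.
type BatchDecision =
  | "enqueueAsUnit"
  | "fanOutWithoutDeduplication"
  | "reject";

interface DeduplicatedBatch {
  // Number of tasks passed together with one `deduplicationKey`.
  itemCount: number;
  // True when deduplication is actually applied: a native queue, or a
  // key-value store with `cas`.
  deduplicationApplied: boolean;
  // True when the queue (or the queue a wrapper delegates to) implements
  // `enqueueMany()`.
  queueHasEnqueueMany: boolean;
}

function decideDeduplicatedBatch(batch: DeduplicatedBatch): BatchDecision {
  // Assumption: a single message is one enqueue, so it is atomic by itself.
  if (batch.itemCount <= 1 || batch.queueHasEnqueueMany) {
    return "enqueueAsUnit";
  }
  // Multi-item batch on a queue that can only fan out.
  return batch.deduplicationApplied ? "reject" : "fanOutWithoutDeduplication";
}
```

The `"closed"` fallback is left out on purpose: in that configuration the
enqueue already fails with a `TypeError` before any batch decision is reached.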

> [!WARNING]
> The key–value fallback is **best-effort, not transactional**. The marker
> write and the enqueue are separate operations. Fedify rolls the marker back
> when an enqueue fails, so a transient failure does not suppress the retry, but
> a crash before that rollback, the `"open"` fallback under concurrency, a
> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window
> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL
> expiry, not active deletion on handler success. Deployments that need strict
> guarantees should use a queue with `nativeDeduplication: true`, where the
> backend owns an atomic check.
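
To make the "separate operations" point concrete, here is a minimal sketch of
a marker-then-enqueue guard with rollback. It is not Fedify's implementation:
`MarkerStore` and `enqueueOnce` are hypothetical, the store is synchronous and
in-memory for brevity (real key–value stores are asynchronous), and the clock
is passed in as `now` so the TTL window is easy to follow.

```typescript
// Hypothetical in-memory stand-in for a key-value store with compare-and-swap.
class MarkerStore {
  private readonly entries = new Map<
    string,
    { value: unknown; expiresAt: number }
  >();

  // Writes `next` only when the live value equals `expected`. A missing or
  // expired entry counts as `undefined`.
  cas(
    key: string,
    expected: unknown,
    next: unknown,
    ttlMs: number,
    now: number,
  ): boolean {
    const entry = this.entries.get(key);
    const current = entry != null && entry.expiresAt > now
      ? entry.value
      : undefined;
    if (current !== expected) return false;
    this.entries.set(key, { value: next, expiresAt: now + ttlMs });
    return true;
  }

  delete(key: string): void {
    this.entries.delete(key);
  }
}

// Returns true when the task was enqueued, false when it was skipped as a
// duplicate. Rethrows whatever `enqueue` throws after rolling the marker back.
function enqueueOnce(
  store: MarkerStore,
  key: string,
  ttlMs: number,
  now: number,
  enqueue: () => void,
): boolean {
  // Step 1: claim the key. Failing here means a marker is still live.
  if (!store.cas(key, undefined, true, ttlMs, now)) return false;
  try {
    // Step 2: a separate operation, so the pair is best-effort, not atomic.
    enqueue();
    return true;
  } catch (error) {
    // Roll back so a transient failure does not suppress the retry.
    store.delete(key);
    throw error;
  }
}
```

A process that dies between the two steps never reaches the `catch` block, so
in this model the marker stays until its TTL expires and later enqueues with
the same key are skipped in the meantime.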


Observability
-------------

*Task-specific telemetry is available since Fedify 2.3.0.*

Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span.
It is a *consumer* span and, because tasks are not part of ActivityPub, it is
namespaced under `fedify.` rather than `activitypub.`. The span inherits the
trace context captured at the enqueue site, so a task's processing chains to
the request or job that enqueued it, and every retry attempt chains to the same
parent. The span carries:

- `fedify.task.name` — the registered task name.
- `fedify.task.attempt` — the zero-based attempt number; a retry re-enqueue
increments it.
- `fedify.task.failure_reason` — set only on a terminal failure, one of the
four bounded values below.

On a terminal failure the span's status is also set to `ERROR`, so trace-based
error views surface dropped and given-up tasks together with their
`fedify.task.failure_reason`. A worker shutdown is the one exception: an
`aborted` attempt leaves the status unset, since an interruption is not a task
failure.

Tasks also reuse the `fedify.queue.task.*` metric family (`enqueued`,
`started`, `completed`, `failed`, `duration`, `in_flight`) that the inbox,
outbox, and fanout workers already report. On the per-run measurements
(`enqueued`, `started`, `completed`, `failed`, `duration`),
`fedify.queue.role` is `task` and `fedify.task.name` names the task. The
process-local `in_flight` UpDownCounter omits `fedify.task.name` so that its
increments and decrements pair up cleanly. `fedify.queue.backend` reflects the
queue actually used after routing, so a task that falls back to the
`outboxQueue` (see [Routing](#queue-routing-and-isolation)) is labeled with the
outbox queue's backend, not a task queue's. A failed outcome also carries
`fedify.task.failure_reason` on `fedify.queue.task.failed` and
`fedify.queue.task.duration`.

The `fedify.task.failure_reason` attribute takes one of four bounded values,
mapping to the worker's dispatch decision points:

| Value | Meaning |
| ----------------- | -------------------------------------------------- |
| `deserialization` | The wire payload could not be deserialized. |
| `validation` | The deserialized payload failed schema validation. |
| `unknown_task` | The task name has no registered handler. |
| `handler` | The registered handler threw. |

The first three are *drops*: the payload cannot succeed by retrying, so the
worker acknowledges the message and does not re-enqueue it. Telemetry still
records a drop as a failed outcome with the matching reason and the queue is
left drained, so a drop is observable without being retried. A `handler`
failure follows the configured retry policy (see
[Retries](#retry-and-error-handling)). A worker shutdown is never counted as a
failure, and an interrupted attempt carries no `fedify.task.failure_reason`.
When the abort propagates (on a `nativeRetrial` queue) the attempt is recorded
as an `aborted` outcome; otherwise it is folded into a retry like any handler
error.
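
The split between drops and retryable failures can be sketched as a total
function over the four reasons. `QueueTaskFailureReason` mirrors the type this
change adds to *metrics.ts*; `FailureDisposition` and `dispositionFor` are
hypothetical names used only for this illustration.

```typescript
// Mirrors the bounded set documented in the table above.
type QueueTaskFailureReason =
  | "deserialization"
  | "validation"
  | "unknown_task"
  | "handler";

// Hypothetical label for what the worker does next.
type FailureDisposition = "drop" | "applyRetryPolicy";

function dispositionFor(reason: QueueTaskFailureReason): FailureDisposition {
  switch (reason) {
    // Retrying cannot fix the payload or the missing handler, so the
    // message is acknowledged and not re-enqueued.
    case "deserialization":
    case "validation":
    case "unknown_task":
      return "drop";
    // A thrown handler goes through the configured retry policy, which may
    // itself eventually give up.
    case "handler":
      return "applyRetryPolicy";
  }
}
```

Because the switch is exhaustive over the union, adding a fifth reason to the
type would make the compiler flag this function until the new case is handled.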

The bounded value set keeps metric cardinality finite. A metric's task name is
always a registered, known-at-startup value and is never derived from message
content. An `unknown_task` drop carries a wire-supplied name, so that name is
kept off the metrics; it still appears on the span, which does not aggregate
into time series. See the [OpenTelemetry](./opentelemetry.md) manual for the
full span, attribute, and metric reference.
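
One way to picture the cardinality guard is a lookup against the set of
registered names before the attribute is attached. This is a sketch under
assumptions, not the code in this change: `buildTaskMetricAttributes` and its
`registered` parameter are hypothetical, and only the attribute keys come from
the text above.

```typescript
// Hypothetical helper: attaches `fedify.task.name` to metric attributes only
// when the name is one the application registered at startup.
function buildTaskMetricAttributes(
  registered: ReadonlySet<string>,
  wireName: string,
): Record<string, string> {
  const attributes: Record<string, string> = { "fedify.queue.role": "task" };
  if (registered.has(wireName)) {
    attributes["fedify.task.name"] = wireName;
  }
  // An unregistered (wire-supplied) name is deliberately left off, so an
  // attacker-controlled string cannot create new time series.
  return attributes;
}
```

With such a guard the number of distinct `fedify.task.name` values on a metric
is bounded by the number of registered tasks, whatever arrives on the wire.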

[OpenTelemetry]: https://opentelemetry.io/


Limitations
-----------

The current API intentionally ships without deduplication, task-specific
OpenTelemetry spans and metrics, cron-style periodic scheduling, result
backends, and per-task priority. Some of these are planned as follow-ups;
see the [tracking issue].
The current API intentionally ships without cron-style periodic scheduling,
result backends, and per-task priority. Some of these are planned as
follow-ups; see the [tracking issue].

[tracking issue]: https://github.com/fedify-dev/fedify/issues/206
26 changes: 26 additions & 0 deletions packages/fedify/src/federation/federation.ts
@@ -1103,6 +1103,32 @@ export interface FederationOptions<TContextData> {
*/
taskQueueResolution?: "fallback" | "strict";

/**
* The time-to-live for a {@link TaskEnqueueOptions.deduplicationKey} marker
* stored in the key–value deduplication fallback. A second enqueue with the
* same key within this window is skipped; once it expires, the key may
* enqueue again. Ignored when the task's queue declares
* {@link MessageQueue.nativeDeduplication} (the backend owns the window).
* @default `{ hours: 1 }`
* @since 2.x.x
*/
taskDeduplicationTtl?: Temporal.DurationLike;

/**
* The behavior when a {@link TaskEnqueueOptions.deduplicationKey} is supplied
* but the task's queue does not declare
* {@link MessageQueue.nativeDeduplication} *and* the configured
* {@link KvStore} exposes no `cas` (compare-and-swap) primitive:
*
* - `"open"` (the default): proceeds without deduplication after logging at
* debug level.
* - `"closed"`: rejects with a `TypeError` before enqueuing.
*
* @default `"open"`
* @since 2.x.x
*/
taskDeduplicationFallback?: "open" | "closed";

/**
* Activity transformers that are applied to outgoing activities. It is
* useful for adjusting outgoing activities to satisfy some ActivityPub
33 changes: 32 additions & 1 deletion packages/fedify/src/federation/metrics.ts
@@ -16,7 +16,7 @@ import type { MessageQueue } from "./mq.ts";
* The role of a queued task, derived from the queued message's `type` field.
* @since 2.3.0
*/
export type QueueTaskRole = "fanout" | "outbox" | "inbox";
export type QueueTaskRole = "fanout" | "outbox" | "inbox" | "task";

/**
* The terminal result of a queued task processing attempt.
@@ -91,6 +91,13 @@ export interface QueueTaskCommonAttributes {
role: QueueTaskRole;
queue?: MessageQueue;
activityType?: string;

/**
* The registered name of a custom background task, emitted as the
* `fedify.task.name` attribute. Set only for the `"task"` role.
* @since 2.3.0
*/
taskName?: string;
}

/**
Expand Down Expand Up @@ -209,6 +216,23 @@ export type HttpSignatureMetricFailureReason =
| "invalidSignature"
| "keyFetchError";

/**
* The reason a custom background task terminated unsuccessfully, emitted as the
* `fedify.task.failure_reason` attribute. A small bounded set mapping to the
* worker's dispatch decision points; open to later refinement.
*
* - `deserialization`: the wire payload could not be deserialized.
* - `validation`: the deserialized payload failed schema validation.
* - `unknown_task`: the task name has no registered handler.
* - `handler`: the registered handler threw.
* @since 2.3.0
*/
export type QueueTaskFailureReason =
| "deserialization"
| "validation"
| "unknown_task"
| "handler";

/**
* Bounded values recorded as `ld_signatures.type` on the signature
* verification duration histogram. Fedify only signs and verifies
@@ -1009,9 +1033,13 @@ class FederationMetrics {
common: QueueTaskCommonAttributes,
result: QueueTaskResult,
durationMs: number,
failureReason?: QueueTaskFailureReason,
): void {
const attributes = buildQueueTaskAttributes(common);
attributes["fedify.queue.task.result"] = result;
if (failureReason != null && result === "failed") {
attributes["fedify.task.failure_reason"] = failureReason;
}
if (result === "completed") {
this.queueTaskCompleted.add(1, attributes);
} else if (result === "failed") {
@@ -1197,6 +1225,9 @@ function buildQueueTaskAttributes(
if (common.activityType != null) {
attributes["activitypub.activity.type"] = common.activityType;
}
if (common.taskName != null) {
attributes["fedify.task.name"] = common.taskName;
}
return attributes;
}
