diff --git a/CHANGES.md b/CHANGES.md index ecc65bd58..5d7a8f17b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -285,8 +285,30 @@ To be released. `FederationOptions.taskQueueResolution` option is set to `"strict"`. `Federation.startQueue()` now accepts `queue: "task"` to run a task-only worker. - - [[#206], [#797], [#803] by ChanHaeng Lee] + - Tasks can request at-most-once enqueue with a `deduplicationKey` + (new `TaskEnqueueOptions.deduplicationKey`). A queue declaring the new + `MessageQueue.nativeDeduplication` capability owns the check and + receives the key through the new + `MessageQueueEnqueueOptions.deduplicationKey`; otherwise Fedify + performs a best-effort key–value guard through the optional + `KvStore.cas` primitive, under a new `taskDeduplication` key prefix. + The marker TTL and the no-`cas` fallback are tunable with the new + `FederationOptions.taskDeduplicationTtl` and + `FederationOptions.taskDeduplicationFallback` options. + - Tasks are instrumented with task-specific OpenTelemetry telemetry. + Each dequeued task runs in a `fedify.task` span that inherits the + enqueue site's trace context and carries `fedify.task.name`, + `fedify.task.attempt`, and, on a terminal failure, + `fedify.task.failure_reason`. The existing `fedify.queue.task.*` + metrics now report task runs under `fedify.queue.role` `task` with the + task name and, on a failure, a bounded `fedify.task.failure_reason` + (one of `deserialization`, `validation`, `unknown_task`, or + `handler`). `QueueTaskRole` gained `"task"`, + `QueueTaskCommonAttributes` gained `taskName`, and the new + `QueueTaskFailureReason` type and an optional trailing `failureReason` + parameter on `recordQueueTaskOutcome()` carry the failure reason. + + [[#206], [#797], [#798], [#799], [#803] by ChanHaeng Lee] [Standard Schema]: https://standardschema.dev/ [#206]: https://github.com/fedify-dev/fedify/issues/206 @@ -320,6 +342,8 @@ To be released. [#782]: https://github.com/fedify-dev/fedify/issues/782 [#787]: https://github.com/fedify-dev/fedify/pull/787 [#797]: https://github.com/fedify-dev/fedify/issues/797 +[#798]: https://github.com/fedify-dev/fedify/issues/798 +[#799]: https://github.com/fedify-dev/fedify/issues/799 [#800]: https://github.com/fedify-dev/fedify/pull/800 [#803]: https://github.com/fedify-dev/fedify/pull/803 diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index b0d7476bf..7a12a5ea2 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -226,6 +226,7 @@ spans: | `activitypub.fetch_document` | Client | Fetches a remote JSON-LD document. | | `activitypub.send_activity` | Client | Sends the ActivityPub activity. | | `activitypub.verify_key_ownership` | Internal | Verifies actor ownership of a key. | +| `fedify.task` | Consumer | Dequeues a custom background task to process. | | `http_signatures.sign` | Internal | Signs the HTTP request. | | `http_signatures.verify` | Internal | Verifies the HTTP request signature. | | `ld_signatures.sign` | Internal | Makes the Linked Data signature. | @@ -364,7 +365,7 @@ Fedify records the following OpenTelemetry metrics: | `webfinger.handle.duration` | Histogram | `ms` | Measures inbound WebFinger request handling duration. | | `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | | `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | -| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, fanout, and custom background tasks Fedify enqueued. | | `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | | `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | | `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | @@ -799,11 +800,20 @@ Fedify records the following OpenTelemetry metrics: `fedify.queue.task.enqueued`, `fedify.queue.task.started`, `fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` -: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present. +: `fedify.queue.role` (`inbox`, `outbox`, `fanout`, or `task`) is always + present. `fedify.queue.backend` is the queue implementation's constructor name (for example `RedisMessageQueue`) when available; it is omitted for queues whose constructor is the plain `Object` (for example, - `MessageQueue` instances built from an object literal). + `MessageQueue` instances built from an object literal). For a custom + background task (`role=task`) the backend reflects the queue actually used + after routing, including the `outboxQueue` fallback, and `fedify.task.name` + carries the registered task name—it is omitted for an `unknown_task` drop, + whose name is wire-derived, so task-metric cardinality stays bounded to the + registered names. A failed task outcome + (`fedify.queue.task.result=failed`) additionally carries + `fedify.task.failure_reason`, one of `deserialization`, `validation`, + `unknown_task`, or `handler`. `fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial` flag when set on the queue. `activitypub.activity.type` is recorded whenever Fedify knows the activity type for the queued message; for inbox @@ -814,22 +824,26 @@ Fedify records the following OpenTelemetry metrics: from initial enqueues. `fedify.queue.task.completed`, `fedify.queue.task.failed`, and `fedify.queue.task.duration` carry `fedify.queue.task.result`, which is `completed` when processing returned - without throwing, `failed` when the worker re-threw a non-abort error, and - `aborted` when the worker re-threw an `AbortError` (for example, because a - graceful-shutdown `AbortSignal` interrupted processing). When the queue - backend does not declare `nativeRetrial`, Fedify catches inbox listener and - outbox delivery errors itself; if its retry policy still allows another - attempt, it schedules a retry by re-enqueuing the message and returns from - the worker without re-throwing, so the worker boundary records - `result=completed`. When the retry policy gives up, the worker also - returns normally (`result=completed`) without scheduling a retry. - Outbox-side activity failures remain observable through the - `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` - span event, and any retry attempt (inbox or outbox) appears as a - `fedify.queue.task.enqueued` measurement with a non-zero - `fedify.queue.task.attempt`. Inbox listener errors that the retry policy - abandons are visible through error logs and the inbox span's error status, - but not through a dedicated metric. + without throwing, `failed` when processing did not succeed (for inbox and + outbox, the worker re-threw a non-abort error; for a custom task, either + the handler threw or the payload was dropped—`deserialization`, + `validation`, or `unknown_task`—in which case the message is still acked + but the outcome is recorded as `failed` with a + `fedify.task.failure_reason`), and `aborted` when the worker re-threw an + `AbortError` (for example, because a graceful-shutdown `AbortSignal` + interrupted processing). When the queue backend does not declare + `nativeRetrial`, Fedify catches inbox listener and outbox delivery errors + itself; if its retry policy still allows another attempt, it schedules a + retry by re-enqueuing the message and returns from the worker without + re-throwing, so the worker boundary records `result=completed`. When the + retry policy gives up, the worker also returns normally + (`result=completed`) without scheduling a retry. Outbox-side activity + failures remain observable through the `activitypub.delivery.*` metrics and + the `activitypub.delivery.failed` span event, and any retry attempt (inbox + or outbox) appears as a `fedify.queue.task.enqueued` measurement with a + non-zero `fedify.queue.task.attempt`. Inbox listener errors that the retry + policy abandons are visible through error logs and the inbox span's error + status, but not through a dedicated metric. `fedify.queue.task.in_flight` : `fedify.queue.role` and `fedify.queue.backend` (when available), plus @@ -930,79 +944,82 @@ for ActivityPub as of November 2024. However, Fedify provides a set of semantic [attributes] for ActivityPub. The following table shows the semantic attributes for ActivityPub: -| Attribute | Type | Description | Example | -| -------------------------------------------- | -------- | ---------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | -| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | -| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | -| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | -| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | -| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | -| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | -| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | -| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | -| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | -| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | -| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | -| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | -| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | -| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | -| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | -| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | -| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | -| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | -| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | -| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | -| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | -| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | -| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | -| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | -| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | -| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | -| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | -| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | -| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | -| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | -| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | -| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | -| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | -| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | -| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | -| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | -| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | -| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | -| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `shared` for queue depth rows where one queue backs multiple roles. | `"outbox"` | -| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | -| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | -| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | -| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | -| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | -| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | -| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | -| `http.response.status_code` | int | The HTTP response status code. | `200` | -| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | -| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | -| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | -| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | -| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | -| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | -| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | -| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | -| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | -| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | -| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | -| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | -| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | -| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | -| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | -| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | +| Attribute | Type | Description | Example | +| -------------------------------------------- | -------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | +| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | +| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | +| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | +| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | +| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.circuit_breaker.previous_state` | string | Previous queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"closed"` | +| `activitypub.circuit_breaker.state` | string | Current queued outbox circuit breaker state: `closed`, `open`, or `half_open`. | `"open"` | +| `activitypub.processing.result` | string | Lifecycle outcome of an inbox or outbox activity: `queued`, `processed`, `retried`, `rejected`, or `abandoned`. | `"retried"` | +| `activitypub.actor.discovery.result` | string | Terminal outcome of `getActorHandle()`: `resolved`, `not_found`, or `error`. | `"resolved"` | +| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | +| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | +| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | +| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | +| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | +| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | +| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | +| `activitypub.collection.kind` | string | The bounded collection kind: `inbox`, `outbox`, `following`, `followers`, `liked`, `featured`, `featured_tags`, or `custom`. | `"followers"` | +| `activitypub.collection.page` | boolean | Whether the collection request targets a cursor page rather than the collection object. | `false` | +| `activitypub.collection.result` | string | Terminal collection request outcome: `served`, `not_found`, `not_acceptable`, `unauthorized`, or `error`. | `"served"` | +| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | +| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | +| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | +| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | +| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | +| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | +| `activitypub.remote.host` | string | The host of the remote ActivityPub server, including any non-default port. | `"example.com:8443"` | +| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | +| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | +| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | +| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | +| `fedify.endpoint` | string | The bounded endpoint category that classified an inbound HTTP request handled by `Federation.fetch()`. | `"actor"` | +| `fedify.federation.instance_id` | string | Opaque per-Federation instance identifier used to distinguish queue depth series on a shared `MeterProvider`. | `"fedify-1"` | +| `fedify.route.template` | string | The matched URI Template, with parameter names (not values). | `"/users/{identifier}"` | +| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | +| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | +| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | +| `fedify.collection.dispatcher` | string | The collection dispatcher family: `built_in` or `custom`. | `"built_in"` | +| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | +| `fedify.collection.items` | number | The number of materialized items in the collection response or page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role: `inbox`, `outbox`, `fanout`, or `task`; `shared` additionally appears on queue depth rows where one queue backs multiple roles. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.depth.state` | string | Queue depth count kind: `queued`, `ready`, or `delayed`. | `"queued"` | +| `fedify.queue.roles` | string | Comma-separated queue roles when one queue instance backs multiple roles. | `"fanout,inbox,outbox"` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | +| `fedify.task.name` | string | The name of a custom background task: always on the `fedify.task` span; on the task's `fedify.queue.task.*` run metrics only for a registered task (omitted for an `unknown_task` drop, keeping cardinality bounded). | `"sendDigest"` | +| `fedify.task.attempt` | int | The zero-based attempt number of a custom background task, on the `fedify.task` span. | `0` | +| `fedify.task.failure_reason` | string | Why a custom background task failed: `deserialization`, `validation`, `unknown_task`, or `handler`. Set only on a terminal failure. | `"validation"` | +| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | +| `http.response.status_code` | int | The HTTP response status code. | `200` | +| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | +| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | +| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | +| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | +| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | +| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | +| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | +| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | +| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | +| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | +| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | +| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | +| `webfinger.handle.result` | string | Terminal outcome of an incoming WebFinger request: `resolved`, `invalid`, `not_found`, `tombstoned`, or `error`. | `"resolved"` | +| `webfinger.lookup.result` | string | Terminal outcome of an outgoing WebFinger lookup: `found`, `not_found`, `invalid`, `network_error`, or `error`. | `"found"` | +| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | +| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. Metric attribute is bucketed to `acct`, `http`, `https`, `mailto`, or `other`. | `"acct"` | [attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/specs/semconv/ diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index 55adef8dc..a7fede0bd 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -139,6 +139,10 @@ Both methods accept options: : Tasks with the same ordering key are processed sequentially (one at a time), like the same option on the message queue layer. +`deduplicationKey` +: Requests at-most-once enqueue for tasks that share the key; see + [Deduplication](#deduplication) below. + ~~~~ typescript await ctx.enqueueTask(sendDigest, payload, { delay: { minutes: 30 }, @@ -257,12 +261,164 @@ delivered it. > queue and set `taskQueueResolution: "strict"`. +Deduplication +------------- + +A task often needs *at-most-once-per-key* enqueue: a digest mailer must not +send twice when a request is retried, and a cleanup job should coalesce +duplicate triggers. Passing a `deduplicationKey` requests this—a second +enqueue with the same key is dropped while the first is still within the +deduplication window: + +~~~~ typescript +await ctx.enqueueTask(sendDigest, payload, { + deduplicationKey: `digest:${payload.userId}`, // [!code highlight] +}); +~~~~ + +How the key is resolved depends on the queue and the key–value store: + +1. **Native backend.** When the task's queue declares + `~MessageQueue.nativeDeduplication`, Fedify forwards the key in the + message queue's `~MessageQueueEnqueueOptions.deduplicationKey` and the + backend owns the check. Fedify does not touch the key–value store. + +2. **Key–value fallback.** Otherwise, if the configured `~KvStore` exposes + the optional compare-and-swap (`~KvStore.cas`) primitive, Fedify records + the key under a dedicated `taskDeduplication` prefix with a TTL and skips + the enqueue while a marker is present. The TTL defaults to one hour and is + configurable with `~FederationOptions.taskDeduplicationTtl`: + + ~~~~ typescript + const federation = createFederation({ + // ... + taskDeduplicationTtl: { minutes: 10 }, // [!code highlight] + }); + ~~~~ + +3. **No conditional write.** When neither applies—no native deduplication and + a key–value store without `~KvStore.cas`—the behavior is governed by + `~FederationOptions.taskDeduplicationFallback`. `"open"` (the default) + lets the enqueue proceed without deduplication after a debug-level log; + `"closed"` throws a `TypeError` before enqueuing: + + ~~~~ typescript + const federation = createFederation({ + // ... + taskDeduplicationFallback: "closed", // [!code highlight] + }); + ~~~~ + +Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL +key–value stores implement `~KvStore.cas`; PostgreSQL and Redis do not yet, so +those deployments take the `taskDeduplicationFallback` branch until per-adapter +follow-ups add it. + +For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the +whole batch: the batch enqueues as a unit or is skipped as a unit, never +partially. Per-item deduplication means calling `~Context.enqueueTask()` in +a loop, each with its own key. Deduplicating a multi-item batch requires the +queue to implement `~MessageQueue.enqueueMany()` so the batch enqueues +atomically—whether the check is native or the key–value fallback. Fanning the +key out across separate `~MessageQueue.enqueue()` calls cannot enqueue a whole +batch as one unit: a native per-message key cannot cover it, and a key–value +marker could not be rolled back cleanly if only some of the fanned-out enqueues +failed. So when deduplication is actually applied—a native queue, or a +key–value store with `~KvStore.cas`—Fedify rejects a multi-item batch with a +`deduplicationKey` on a queue without `~MessageQueue.enqueueMany()` instead of +risking duplicates. Under the `"open"` fallback (no native deduplication and no +`cas`), no marker is taken, so the batch simply fans out without deduplication. + +This applies through `~ParallelMessageQueue` as well: wrapping a queue that +lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a +deduplicated multi-item batch on such a wrapper is likewise rejected rather than +collapsed onto one message. + +> [!WARNING] +> The key–value fallback is **best-effort, not transactional**. The marker +> write and the enqueue are separate operations. Fedify rolls the marker back +> when an enqueue fails, so a transient failure does not suppress the retry, but +> a crash before that rollback, the `"open"` fallback under concurrency, a +> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window +> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL +> expiry, not active deletion on handler success. Deployments needing strict +> guarantees use a queue with `nativeDeduplication: true`, where the backend +> owns an atomic check. + + +Observability +------------- + +*Task-specific telemetry is available since Fedify 2.3.0.* + +Each task the worker dequeues runs inside a `fedify.task` [OpenTelemetry] span +(a *consumer* span, since tasks are not part of ActivityPub it is namespaced +under `fedify.` rather than `activitypub.`). The span inherits the trace +context captured at the enqueue site, so a task's processing chains to the +request or job that enqueued it—and every retry attempt chains to the same +parent. The span carries: + + - `fedify.task.name` — the registered task name. + - `fedify.task.attempt` — the zero-based attempt number; a retry re-enqueue + increments it. + - `fedify.task.failure_reason` — set only on a terminal failure, one of the + four bounded values below. + +On a terminal failure the span's status is also set to `ERROR`, so trace-based +error views surface dropped and given-up tasks together with their +`fedify.task.failure_reason`. A worker shutdown is the one exception: an +`aborted` attempt leaves the status unset, since an interruption is not a task +failure. + +Tasks also reuse the `fedify.queue.task.*` metric family (`enqueued`, +`started`, `completed`, `failed`, `duration`, `in_flight`) that the inbox, +outbox, and fanout workers already report. On a task run measurement +(`enqueued`, `started`, `completed`, `failed`, `duration`), +`fedify.queue.role` is `task` and `fedify.task.name` names the task; the +process-local `in_flight` UpDownCounter omits `fedify.task.name` so its +increments and decrements pair up cleanly. +`fedify.queue.backend` reflects the queue actually used after routing—so a task +that falls back to the `outboxQueue` (see +[Routing](#queue-routing-and-isolation)) is labeled with the outbox queue's +backend, not a task queue's. A failed outcome +also carries `fedify.task.failure_reason` on `fedify.queue.task.failed` and +`fedify.queue.task.duration`. + +The `fedify.task.failure_reason` attribute takes one of four bounded values, +mapping to the worker's dispatch decision points: + +| Value | Meaning | +| ----------------- | -------------------------------------------------- | +| `deserialization` | The wire payload could not be deserialized. | +| `validation` | The deserialized payload failed schema validation. | +| `unknown_task` | The task name has no registered handler. | +| `handler` | The registered handler threw. | + +The first three are *drops*: the payload cannot succeed by retrying, so the +worker acknowledges the message and does not re-enqueue it. Telemetry still +records these as a failed outcome with the matching reason, while the queue is +left drained—so a drop is observable without being retried. A `handler` +failure follows the configured retry policy (see +[Retries](#retry-and-error-handling)). A worker shutdown is never counted as a +failure: an interrupted attempt carries no `fedify.task.failure_reason`, +recorded as an `aborted` outcome when the abort propagates (on a `nativeRetrial` +queue) and otherwise folded into a retry like any handler error. + +The bounded value set keeps metric cardinality finite: a metric's task name is +a registered, known-at-startup value, never derived from message content—an +`unknown_task` drop carries a wire-supplied name, so that name is kept off the +metrics (it still appears on the span, which does not aggregate into time +series). See the [OpenTelemetry](./opentelemetry.md) manual for the full span, +attribute, and metric reference. + +[OpenTelemetry]: https://opentelemetry.io/ + + Limitations ----------- -The current API intentionally ships without deduplication, task-specific -OpenTelemetry spans and metrics, cron-style periodic scheduling, result -backends, and per-task priority. Some of these are planned as follow-ups; -see the [tracking issue]. +The current API intentionally ships without cron-style periodic scheduling, +result backends, and per-task priority. Some of these are planned as +follow-ups; see the [tracking issue]. [tracking issue]: https://github.com/fedify-dev/fedify/issues/206 diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index baa00edf2..5b4177f80 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -1103,6 +1103,32 @@ export interface FederationOptions { */ taskQueueResolution?: "fallback" | "strict"; + /** + * The time-to-live for a {@link TaskEnqueueOptions.deduplicationKey} marker + * stored in the key–value deduplication fallback. A second enqueue with the + * same key within this window is skipped; once it expires, the key may + * enqueue again. Ignored when the task's queue declares + * {@link MessageQueue.nativeDeduplication} (the backend owns the window). + * @default `{ hours: 1 }` + * @since 2.x.x + */ + taskDeduplicationTtl?: Temporal.DurationLike; + + /** + * The behavior when a {@link TaskEnqueueOptions.deduplicationKey} is supplied + * but the task's queue does not declare + * {@link MessageQueue.nativeDeduplication} *and* the configured + * {@link KvStore} exposes no `cas` (compare-and-swap) primitive: + * + * - `"open"` (the default): proceeds without deduplication after logging at + * debug level. + * - `"closed"`: rejects with a `TypeError` before enqueuing. + * + * @default `"open"` + * @since 2.x.x + */ + taskDeduplicationFallback?: "open" | "closed"; + /** * Activity transformers that are applied to outgoing activities. It is * useful for adjusting outgoing activities to satisfy some ActivityPub diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 2c53ca86d..358fe18e8 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -16,7 +16,7 @@ import type { MessageQueue } from "./mq.ts"; * The role of a queued task, derived from the queued message's `type` field. * @since 2.3.0 */ -export type QueueTaskRole = "fanout" | "outbox" | "inbox"; +export type QueueTaskRole = "fanout" | "outbox" | "inbox" | "task"; /** * The terminal result of a queued task processing attempt. @@ -91,6 +91,13 @@ export interface QueueTaskCommonAttributes { role: QueueTaskRole; queue?: MessageQueue; activityType?: string; + + /** + * The registered name of a custom background task, emitted as the + * `fedify.task.name` attribute. Set only for the `"task"` role. + * @since 2.3.0 + */ + taskName?: string; } /** @@ -209,6 +216,23 @@ export type HttpSignatureMetricFailureReason = | "invalidSignature" | "keyFetchError"; +/** + * The reason a custom background task terminated unsuccessfully, emitted as the + * `fedify.task.failure_reason` attribute. A small bounded set mapping to the + * worker's dispatch decision points; open to later refinement. + * + * - `deserialization`: the wire payload could not be deserialized. + * - `validation`: the deserialized payload failed schema validation. + * - `unknown_task`: the task name has no registered handler. + * - `handler`: the registered handler threw. + * @since 2.3.0 + */ +export type QueueTaskFailureReason = + | "deserialization" + | "validation" + | "unknown_task" + | "handler"; + /** * Bounded values recorded as `ld_signatures.type` on the signature * verification duration histogram. Fedify only signs and verifies @@ -1009,9 +1033,13 @@ class FederationMetrics { common: QueueTaskCommonAttributes, result: QueueTaskResult, durationMs: number, + failureReason?: QueueTaskFailureReason, ): void { const attributes = buildQueueTaskAttributes(common); attributes["fedify.queue.task.result"] = result; + if (failureReason != null && result === "failed") { + attributes["fedify.task.failure_reason"] = failureReason; + } if (result === "completed") { this.queueTaskCompleted.add(1, attributes); } else if (result === "failed") { @@ -1197,6 +1225,9 @@ function buildQueueTaskAttributes( if (common.activityType != null) { attributes["activitypub.activity.type"] = common.activityType; } + if (common.taskName != null) { + attributes["fedify.task.name"] = common.taskName; + } return attributes; } diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 0148ef006..ee2b432aa 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -141,6 +141,7 @@ import { isAbortError, type QueueDepthGaugeEntry, type QueueTaskCommonAttributes, + type QueueTaskFailureReason, type QueueTaskResult, recordCircuitBreakerStateChange, recordCollectionRequest, @@ -168,6 +169,7 @@ import { type SenderKeyPair, } from "./send.ts"; import { + enqueueTasks, TaskCodec, type TaskDefinition, type TaskEnqueueOptions, @@ -512,6 +514,15 @@ export interface FederationKvPrefixes { * @since 2.3.0 */ readonly circuitBreaker: KvKey; + + /** + * The key prefix used for storing custom background task deduplication + * markers. Kept separate from {@link activityIdempotence} so the two key + * spaces never collide. + * @default `["_fedify", "taskDeduplication"]` + * @since 2.x.x + */ + readonly taskDeduplication: KvKey; } /** @@ -577,6 +588,8 @@ export class FederationImpl inboxRetryPolicy: RetryPolicy; taskRetryPolicy: RetryPolicy; taskQueueResolution: "fallback" | "strict"; + taskDeduplicationTtl: Temporal.Duration; + taskDeduplicationFallback: "open" | "closed"; circuitBreaker?: CircuitBreaker; activityTransformers: readonly ActivityTransformer[]; _tracerProvider: TracerProvider | undefined; @@ -638,6 +651,7 @@ export class FederationImpl httpMessageSignaturesSpec: ["_fedify", "httpMessageSignaturesSpec"], acceptSignatureNonce: ["_fedify", "acceptSignatureNonce"], circuitBreaker: ["_fedify", "circuit"], + taskDeduplication: ["_fedify", "taskDeduplication"], } satisfies FederationKvPrefixes), ...(options.kvPrefixes ?? {}), }; @@ -871,6 +885,11 @@ export class FederationImpl this.taskRetryPolicy = options.taskRetryPolicy ?? createExponentialBackoffPolicy(); this.taskQueueResolution = options.taskQueueResolution ?? "fallback"; + this.taskDeduplicationTtl = Temporal.Duration.from( + options.taskDeduplicationTtl ?? { hours: 1 }, + ); + this.taskDeduplicationFallback = options.taskDeduplicationFallback ?? + "open"; this.activityTransformers = options.activityTransformers ?? getDefaultActivityTransformers(); this._tracerProvider = options.tracerProvider; @@ -1219,7 +1238,73 @@ export class FederationImpl }, ); } else if (message.type === "task") { - await this.#listenTaskMessage(contextData, message); + const registered = this.taskDefinitions.get(message.taskName) != null; + const common: QueueTaskCommonAttributes = { + role: "task", + queue: this.resolveTaskQueue(message.taskName), + taskName: registered ? message.taskName : undefined, + }; + await tracer.startActiveSpan( + "fedify.task", + { + kind: SpanKind.CONSUMER, + attributes: { + "fedify.task.name": message.taskName, + "fedify.task.attempt": message.attempt, + }, + }, + extractedContext, + async (span) => { + const spanCtx = span.spanContext(); + return await withContext( + { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, + async () => { + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + const recordOutcome = ( + outcome: QueueTaskResult, + failureReason: QueueTaskFailureReason | undefined, + error?: unknown, + ): void => { + if (failureReason != null) { + span.setAttribute( + "fedify.task.failure_reason", + failureReason, + ); + span.setStatus({ + code: SpanStatusCode.ERROR, + ...(error == null ? {} : { message: String(error) }), + }); + } + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + outcome === "failed" ? failureReason : undefined, + ); + }; + try { + const failureReason = await this.#listenTaskMessage( + contextData, + message, + ); + recordOutcome( + failureReason == null ? "completed" : "failed", + failureReason, + ); + } catch (e) { + if (isAbortError(e)) recordOutcome("aborted", undefined); + else recordOutcome("failed", "handler", e); + throw e; + } finally { + meter.decrementQueueTaskInFlight(common); + span.end(); + } + }, + ); + }, + ); } }); } @@ -2118,7 +2203,7 @@ export class FederationImpl async #listenTaskMessage( contextData: TContextData, message: TaskMessage, - ): Promise { + ): Promise { const logger = getLogger(["fedify", "federation", "task"]); const def = this.taskDefinitions.get(message.taskName); if (def == null) { @@ -2128,30 +2213,39 @@ export class FederationImpl "dropping.", { taskName: message.taskName }, ); - return; + return "unknown_task"; } const context = this.#createContext(new URL(message.baseUrl), contextData); - let data: unknown; - try { - // decode() deserializes then re-validates at the dequeue boundary - // (drift protection): a durable queue can hand a new deploy a payload - // an old deploy enqueued. - data = await context.codec.decode(def.schema, message.data); - } catch (error) { - // A malformed or incompatible payload won't succeed by retrying. + const deserialized = await context.codec.deserialize(message.data) + .then((value) => ({ ok: true, value }) as const) + .catch((error) => ({ ok: false, error }) as const); + if (!deserialized.ok) { logger.error( - "Custom task {taskName} payload could not be decoded or validated; " + + "Custom task {taskName} payload could not be deserialized; " + "dropping:\n{error}", - { taskName: message.taskName, error }, + { taskName: message.taskName, error: deserialized.error }, ); - return; + return "deserialization"; + } + const data = await context.codec.validate(def.schema, deserialized.value) + .then((value) => ({ ok: true, value }) as const) + .catch((error) => ({ ok: false, error }) as const); + if (!data.ok) { + // An incompatible payload won't succeed by retrying. + logger.error( + "Custom task {taskName} payload failed schema validation; " + + "dropping:\n{error}", + { taskName: message.taskName, error: data.error }, + ); + return "validation"; } try { - await def.handler(context, data); + await def.handler(context, data.value); + return undefined; } catch (error) { if (def.onError != null) { try { - await def.onError(context, error, data); + await def.onError(context, error, data.value); } catch (onErrorError) { logger.error( "onError for custom task {taskName} threw:\n{error}", @@ -2196,6 +2290,10 @@ export class FederationImpl delay: clampNegativeDelay(delay), orderingKey: message.orderingKey, }); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { role: "task", queue, taskName: message.taskName }, + retryMessage.attempt, + ); } else { logger.error( "Custom task {taskName} failed after {attempt} attempts; giving " + @@ -2203,6 +2301,8 @@ export class FederationImpl { taskName: message.taskName, attempt: message.attempt, error }, ); } + // A swallowed abort is a graceful interruption, not a task failure. + return isAbortError(error) ? undefined : "handler"; } } @@ -3106,6 +3206,10 @@ export class ContextImpl implements Context { return this.#codec ??= new TaskCodec(this); } + get #enqueueTasks() { + return enqueueTasks(this); + } + clone(data: TContextData): Context { return new ContextImpl({ url: this.url, @@ -3658,78 +3762,6 @@ export class ContextImpl implements Context { await this.#enqueueTasks(task, payloads, options); } - async #enqueueTasks( - task: TaskDefinition, - items: readonly TData[], - options: TaskEnqueueOptions, - ): Promise { - // Fail fast on a handle from another federation instance; without this - // check the message would enqueue fine and be dropped by the worker. - // Compare the registered handle by identity, not just the name: another - // instance may define the same task name with a different schema, and - // its handle would otherwise encode under that foreign schema here - // while the worker decodes under the local one. - const def = this.federation.taskDefinitions.get(task.name); - if (def == null || def.handle !== task) { - throw new TypeError( - `Task ${ - JSON.stringify(task.name) - } is not defined on this federation; ` + - "pass a handle returned by its defineTask().", - ); - } - const queue = this.federation.resolveTaskQueue(task.name); - if (queue == null) { - throw new TypeError( - "No message queue is configured for tasks; pass `queue` to " + - "createFederation() or to defineTask().", - ); - } - if (items.length < 1) return; - const delay = options.delay == null - ? undefined - : Temporal.Duration.from(options.delay); - // Encode in parallel: `enqueueTaskMany` is the bulk path, and the enqueue - // below already parallelizes, so serial encoding would be the bottleneck. - // `map` preserves order, and a rejected encode (validation failure) rejects - // the whole batch before anything is enqueued, keeping fail-fast intact. - const messages: TaskMessage[] = await Promise.all( - items.map(this.#encodeTaskMessage(task, options)), - ); - if (!this.federation.manuallyStartQueue) { - this.federation._startQueueInternal(this.data); - } - const enqueueOptions = { delay, orderingKey: options.orderingKey }; - if (messages.length === 1) { - await queue.enqueue(messages[0], enqueueOptions); - } else if (queue.enqueueMany != null) { - await queue.enqueueMany(messages, enqueueOptions); - } else { - await Promise.all(messages.map((m) => queue.enqueue(m, enqueueOptions))); - } - } - - #encodeTaskMessage = ( - task: TaskDefinition, - options: TaskEnqueueOptions, - ) => - async (data: TData): Promise => { - const encoded = await this.codec.encode(task.schema, data); - const carrier: Record = {}; - propagation.inject(context.active(), carrier); - return { - type: "task", - id: crypto.randomUUID(), - baseUrl: this.origin, - taskName: task.name, - data: encoded, - started: Temporal.Now.instant().toString(), - attempt: 0, - orderingKey: options.orderingKey, - traceContext: carrier, - }; - }; - sendActivity( sender: | SenderKeyPair diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index e7c402908..3bb5c0d88 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -5,11 +5,13 @@ import { assertFalse, assertGreater, assertGreaterOrEqual, + assertRejects, } from "@std/assert"; import { delay } from "es-toolkit"; import { InProcessMessageQueue, type MessageQueue, + type MessageQueueEnqueueOptions, ParallelMessageQueue, } from "./mq.ts"; @@ -34,6 +36,10 @@ test("InProcessMessageQueue", async (t) => { assertFalse(mq.nativeRetrial); }); + await t.step("nativeDeduplication property", () => { + assertFalse(mq.nativeDeduplication); + }); + await t.step("getDepth() [empty]", async () => { assertEquals(await mq.getDepth(), { queued: 0, @@ -419,6 +425,107 @@ test("MessageQueue.nativeRetrial", async (t) => { }); }); +test("ParallelMessageQueue inherits nativeDeduplication", () => { + class NativeDeduplicationQueue implements MessageQueue { + readonly nativeDeduplication = true; + enqueue(): Promise { + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const workers = new ParallelMessageQueue(new NativeDeduplicationQueue(), 5); + assert(workers.nativeDeduplication); +}); + +test( + "ParallelMessageQueue forwards deduplicationKey to the wrapped queue", + async () => { + class RecordingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly singles: (MessageQueueEnqueueOptions | undefined)[] = []; + readonly batches: (MessageQueueEnqueueOptions | undefined)[] = []; + enqueue( + _message: unknown, + options?: MessageQueueEnqueueOptions, + ): Promise { + this.singles.push(options); + return Promise.resolve(); + } + enqueueMany( + _messages: readonly unknown[], + options?: MessageQueueEnqueueOptions, + ): Promise { + this.batches.push(options); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new RecordingQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await workers.enqueue({ x: 1 }, { deduplicationKey: "k1" }); + await workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k2" }); + assertEquals(inner.singles[0]?.deduplicationKey, "k1"); + assertEquals(inner.batches[0]?.deduplicationKey, "k2"); + }, +); + +test( + "ParallelMessageQueue rejects a deduplicated batch when the wrapped queue " + + "lacks enqueueMany", + async () => { + class NoBulkQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly enqueued: unknown[] = []; + enqueue(message: unknown): Promise { + this.enqueued.push(message); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new NoBulkQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await assertRejects( + () => + workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k" }), + TypeError, + "enqueueMany", + ); + // It threw before enqueuing anything. + assertEquals(inner.enqueued.length, 0); + }, +); + +test( + "ParallelMessageQueue still fans out a non-deduplicated batch when the " + + "wrapped queue lacks enqueueMany", + async () => { + class NoBulkQueue implements MessageQueue { + readonly enqueued: unknown[] = []; + enqueue(message: unknown): Promise { + this.enqueued.push(message); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new NoBulkQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await workers.enqueueMany([{ x: 1 }, { x: 2 }, { x: 3 }]); + assertEquals(inner.enqueued.length, 3); + }, +); + const queues: Record Promise> = { InProcessMessageQueue: () => Promise.resolve(new InProcessMessageQueue()), }; @@ -450,6 +557,10 @@ for (const mqName in queues) { assertEquals(workers.nativeRetrial, mq.nativeRetrial); }); + await t.step("nativeDeduplication property inheritance", () => { + assertEquals(workers.nativeDeduplication, mq.nativeDeduplication); + }); + await t.step("getDepth() delegation", async () => { if (mq.getDepth == null) { assertEquals(workers.getDepth, undefined); diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index 36ba9900c..c99cc63e7 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -25,6 +25,20 @@ export interface MessageQueueEnqueueOptions { * @since 2.0.0 */ readonly orderingKey?: string; + + /** + * An optional key requesting at-most-once enqueue semantics for messages + * that share it. A backend that declares + * {@link MessageQueue.nativeDeduplication} `true` owns the check: a message + * whose `deduplicationKey` was already seen within the backend's + * deduplication window is dropped instead of enqueued. Backends without + * native deduplication ignore this field; Fedify performs its own + * best-effort deduplication before reaching them on the paths that support + * it. + * + * @since 2.x.x + */ + readonly deduplicationKey?: string; } /** @@ -87,6 +101,18 @@ export interface MessageQueue { */ readonly nativeRetrial?: boolean; + /** + * Whether the message queue backend deduplicates messages that share a + * {@link MessageQueueEnqueueOptions.deduplicationKey} natively. When `true`, + * Fedify forwards the `deduplicationKey` and relies on the backend to drop + * duplicates; when `false` or omitted, Fedify applies its own best-effort + * key–value deduplication on the paths that request it. + * + * @default `false` + * @since 2.x.x + */ + readonly nativeDeduplication?: boolean; + /** * Enqueues a message in the queue. * @param message The message to enqueue. @@ -176,6 +202,12 @@ export class InProcessMessageQueue implements MessageQueue { */ readonly nativeRetrial = false; + /** + * In-process message queue does not deduplicate messages natively. + * @since 2.x.x + */ + readonly nativeDeduplication = false; + /** * Constructs a new {@link InProcessMessageQueue} with the given options. * @param options Additional options for the in-process message queue. @@ -365,6 +397,12 @@ export class ParallelMessageQueue implements MessageQueue { * @since 1.7.0 */ readonly nativeRetrial?: boolean; + + /** + * Inherits the native deduplication capability from the wrapped queue. + * @since 2.x.x + */ + readonly nativeDeduplication?: boolean; readonly getDepth?: () => Promise; /** @@ -398,6 +436,7 @@ export class ParallelMessageQueue implements MessageQueue { this.queue = queue; this.workers = workers; this.nativeRetrial = queue.nativeRetrial; + this.nativeDeduplication = queue.nativeDeduplication; if (queue.getDepth != null) { this.getDepth = () => queue.getDepth!(); } @@ -412,6 +451,15 @@ export class ParallelMessageQueue implements MessageQueue { options?: MessageQueueEnqueueOptions, ): Promise { if (this.queue.enqueueMany == null) { + if (options?.deduplicationKey != null) { + throw new TypeError( + "Cannot enqueue a batch with a deduplicationKey: the wrapped queue " + + "does not implement enqueueMany, so ParallelMessageQueue would " + + "have to fan out to individual enqueue() calls that cannot share " + + "one deduplicationKey atomically. Wrap a queue that implements " + + "enqueueMany instead.", + ); + } const results = await Promise.allSettled( messages.map((message) => this.queue.enqueue(message, options)), ); diff --git a/packages/fedify/src/federation/tasks/codec.ts b/packages/fedify/src/federation/tasks/codec.ts index f13d85581..ae075d4ea 100644 --- a/packages/fedify/src/federation/tasks/codec.ts +++ b/packages/fedify/src/federation/tasks/codec.ts @@ -10,8 +10,8 @@ export default class TaskCodec { serialize = (data: unknown): Promise => stringifyAsync(data, { Vocab: this.#stringifyVocab }); - deserialize = (raw: string): Promise => - this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); + deserialize = async (raw: string): Promise => + await this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); /** Validates `data` against `schema`, then serializes it. */ encode = async ( @@ -26,6 +26,19 @@ export default class TaskCodec { ): Promise> => TaskCodec.validate(schema, await this.deserialize(raw)); + /** + * Validates an already-deserialized `data` against `schema`. An instance + * wrapper over {@link TaskCodec.validate} so the dispatch site can split + * {@link decode} into its deserialize and validate phases—telling a + * deserialization failure apart from a validation failure—without importing + * the class. + */ + validate = ( + schema: S, + data: unknown, + ): Promise> => + TaskCodec.validate(schema, data); + static validate = async ( schema: S, data: unknown, diff --git a/packages/fedify/src/federation/tasks/enqueue.test.ts b/packages/fedify/src/federation/tasks/enqueue.test.ts new file mode 100644 index 000000000..ee7db8014 --- /dev/null +++ b/packages/fedify/src/federation/tasks/enqueue.test.ts @@ -0,0 +1,1372 @@ +import { test } from "@fedify/fixture"; +import { configure, type LogRecord, reset } from "@logtape/logtape"; +import { delay } from "es-toolkit"; +import { deepStrictEqual, ok, rejects, strictEqual } from "node:assert/strict"; +import { + baseOptions, + makeSchema, + MockQueue, + numberSchema, + stringSchema, +} from "../../testing/mod.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "../kv.ts"; +import { createFederation } from "../middleware.ts"; +import { + type MessageQueue, + type MessageQueueEnqueueOptions, + ParallelMessageQueue, +} from "../mq.ts"; +import type { TaskMessage } from "../queue.ts"; + +/** + * A {@link KvStore} that delegates to an in-memory store but deliberately + * omits `cas`, so that `kv.cas == null`. This drives the deduplication + * fallback branches that fire when no conditional-write primitive exists. + */ +class CaslessKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + get(key: KvKey): Promise { + return this.inner.get(key); + } + set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + // No `cas`: the fallback branch is reached precisely when `kv.cas == null`. +} + +async function collectKeys(kv: KvStore, prefix: KvKey): Promise { + const keys: KvKey[] = []; + for await (const { key } of kv.list(prefix)) keys.push(key); + return keys; +} + +const TASK_DEDUP_PREFIX: KvKey = ["_fedify", "taskDeduplication"]; +const ACTIVITY_IDEMPOTENCE_PREFIX: KvKey = ["_fedify", "activityIdempotence"]; + +const UUID_RE = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +test("enqueueTasks() validation and dispatch", async (t) => { + await t.step("rejects an invalid payload at enqueue", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("strictly-typed", { + schema: numberSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + // deno-lint-ignore no-explicit-any + () => ctx.enqueueTask(task, "not a number" as any), + { name: "TypeError", message: /Task data failed schema validation/ }, + ); + strictEqual(queue.enqueued.length, 0); + }); + + await t.step("stamps the message envelope", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("envelope", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload"); + strictEqual(queue.enqueued.length, 1); + const { message } = queue.enqueued[0]; + strictEqual(message.type, "task"); + strictEqual(message.taskName, "envelope"); + // encodeTaskMessage stamps the context's origin (no trailing slash). + strictEqual(message.baseUrl, "https://example.com"); + strictEqual(message.attempt, 0); + ok(UUID_RE.test(message.id)); + ok(typeof message.data === "string" && message.data.length > 0); + // `started` is a serialized Temporal.Instant. + ok(Temporal.Instant.from(message.started) instanceof Temporal.Instant); + // propagation.inject always populates a (possibly empty) carrier object. + ok( + typeof message.traceContext === "object" && message.traceContext != null, + ); + }); + + await t.step("passes delay and orderingKey through", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("delayed", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { + delay: { seconds: 30 }, + orderingKey: "user:alice", + }); + strictEqual(queue.enqueued.length, 1); + const { message, options } = queue.enqueued[0]; + strictEqual(message.taskName, "delayed"); + strictEqual(message.orderingKey, "user:alice"); + strictEqual(message.attempt, 0); + ok(options?.delay instanceof Temporal.Duration); + strictEqual(options.delay.total("second"), 30); + strictEqual(options.orderingKey, "user:alice"); + }); + + await t.step( + "starts the task worker on first enqueue without startQueue()", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + manuallyStartQueue: false, + queue: { task: queue }, + }); + const task = federation.defineTask("auto-start", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + // An app that only uses the custom task API never sends an activity, + // so enqueueTask() itself must start the worker like the other + // enqueue paths do; otherwise tasks pile up unprocessed forever. + await ctx.enqueueTask(task, "first"); + strictEqual(queue.listenCount, 1); + // The started flag keeps a second enqueue from re-listening. + await ctx.enqueueTask(task, "second"); + strictEqual(queue.listenCount, 1); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step("throws when the resolved task queue is null", async () => { + // No queue is configured at all, so resolveTaskQueue() returns null and + // the enqueue pipeline must fail fast before encoding any payload. + const federation = createFederation({ ...baseOptions }); + const task = federation.defineTask("queueless", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "data"), + { name: "TypeError", message: /No message queue is configured/ }, + ); + }); + + await t.step( + "rejects a handle from another federation at enqueue", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const other = createFederation({ + ...baseOptions, + queue: { task: new MockQueue() }, + }); + const foreignTask = other.defineTask("foreign", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(foreignTask, "data"), + { name: "TypeError", message: /is not defined on this federation/ }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + "rejects a same-named handle from another federation", + async () => { + // Name lookup alone cannot tell a foreign handle apart once both + // instances define the same task name: the local context would + // encode under the *schema carried by the foreign handle*, so a + // payload the local schema rejects would enqueue anyway, only to be + // dropped by the worker decoding under the local schema. Both + // instances share TContextData = void, so the phantom-brand check + // cannot reject this at compile time; the handle-identity guard is + // the only defense. + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("rename", { + schema: numberSchema, // the local "rename" takes a number… + handler: () => { + called++; + }, + }); + const other = createFederation({ + ...baseOptions, + queue: { task: new MockQueue() }, + }); + // …while the other instance's "rename" takes a string: + const foreignTask = other.defineTask("rename", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(foreignTask, "not a number"), + { name: "TypeError", message: /is not defined on this federation/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(called, 0); + }, + ); + + await t.step( + "enqueueTaskMany() uses enqueueMany when available", + async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"]); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }, + ); + + await t.step( + "enqueueTaskMany() falls back to parallel enqueues", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b"]); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + "enqueueTaskMany() with no payloads touches no queue", + async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-empty", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, []); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + }, + ); +}); + +test("task deduplication", async (t) => { + await t.step( + "forwards the key to a nativeDeduplication queue without writing KV", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "k"); + // The backend owns the check, so Fedify must not write any KV marker. + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 0); + }, + ); + + await t.step( + "skips a second enqueue with the same key within the TTL", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("kv-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.taskName, "kv-dedup"); + // A non-native queue never receives a key it would ignore. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "re-enqueues with the same key after the TTL expires", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 100 }, + }); + const task = federation.defineTask("kv-dedup-ttl", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Wait comfortably past the 100 ms TTL so the marker expires. + await delay(300); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + 'rejects with TypeError when fallback is "closed" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }), + { name: "TypeError" }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + 'proceeds when fallback is "open" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Best-effort fallback never forwards the key to a non-native queue. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "writes only under taskDeduplication, never activityIdempotence", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("prefix-isolation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 1); + strictEqual( + (await collectKeys(kv, ACTIVITY_IDEMPOTENCE_PREFIX)).length, + 0, + ); + }, + ); + + await t.step("applies one batch-level key to enqueueTaskMany", async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // First batch enqueues all three; the second is skipped entirely. + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }); +}); + +test( + "task deduplication validates every payload before reserving the key", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("dedup-validation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // A rejected payload must neither enqueue nor consume the key. + await rejects(() => + ctx.enqueueTask(task, 123 as unknown as string, { + deduplicationKey: "k", + }) + ); + strictEqual(queue.enqueued.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // The same key must remain usable by the first valid enqueue. + await ctx.enqueueTask(task, "valid", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + deepStrictEqual( + await collectKeys(kv, TASK_DEDUP_PREFIX), + [[...TASK_DEDUP_PREFIX, "k"]], + ); + + // Once the valid enqueue reserves it, the same key must deduplicate. + await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "native task batch deduplication is one enqueueMany operation per call", + async () => { + class NativeBatchDeduplicatingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly attempts: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + readonly accepted: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key == null) { + throw new TypeError( + "Native batch enqueue requires a deduplication key.", + ); + } + this.attempts.push({ messages, options }); + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + this.accepted.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const queue = new NativeBatchDeduplicatingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a1", "a2", "a3"], { + deduplicationKey: "batch-a", + }); + await ctx.enqueueTaskMany( + task, + ["duplicate1", "duplicate2", "duplicate3"], + { + deduplicationKey: "batch-a", + }, + ); + await ctx.enqueueTaskMany(task, ["b1", "b2", "b3"], { + deduplicationKey: "batch-b", + }); + + // Every API call reaches the backend exactly once, with one key governing + // all three messages. The backend accepts complete batches or none. + strictEqual(queue.attempts.length, 3); + deepStrictEqual( + queue.attempts.map(({ messages }) => messages.length), + [3, 3, 3], + ); + deepStrictEqual( + queue.attempts.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-a", "batch-b"], + ); + strictEqual(queue.accepted.length, 2); + deepStrictEqual( + queue.accepted.map(({ messages }) => messages.length), + [3, 3], + ); + deepStrictEqual( + queue.accepted.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-b"], + ); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "native task batch deduplication rejects without enqueueMany", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + + // Reject before any partial enqueue or fallback KV write. Silently + // dropping the key from items 2..n cannot satisfy these assertions. + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A one-item batch is representable by enqueue() and must remain valid. + await ctx.enqueueTaskMany(task, ["single"], { + deduplicationKey: "single", + }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "single"); + }, +); + +test( + "deduplication - native batch capability errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-capability-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTaskMany( + task, + [1, 2, 3] as unknown as readonly string[], + { deduplicationKey: "batch" }, + ); + } catch (error) { + caught = error; + } + + // The queue capability makes this request impossible regardless of the + // payload, so no user-supplied validator may run first. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("enqueueMany")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "closed deduplication fallback errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue(); + const kv = new CaslessKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTask( + task, + 1 as unknown as string, + { deduplicationKey: "k" }, + ); + } catch (error) { + caught = error; + } + + // Closed fallback is a configuration-level rejection. It must be + // deterministic and independent of user payload validation. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("conditional write")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +/** + * A {@link MessageQueue} that fails its first enqueue—single or batch—with a + * transient error, then records every later enqueue. One class covers both the + * `enqueue()` and `enqueueMany()` rollback paths; each test instantiates its own + * copy, so the one-shot `#failNext` flag never leaks between them. + */ +class FlakyQueue implements MessageQueue { + readonly nativeDeduplication = false; + #failNext = true; + readonly enqueued: TaskMessage[] = []; + readonly enqueuedMany: TaskMessage[][] = []; + + #failOnce(): boolean { + if (!this.#failNext) return false; + this.#failNext = false; + return true; + } + + enqueue(message: TaskMessage): Promise { + if (this.#failOnce()) { + return Promise.reject(new Error("transient backend failure")); + } + this.enqueued.push(message); + return Promise.resolve(); + } + + enqueueMany(messages: readonly TaskMessage[]): Promise { + if (this.#failOnce()) { + return Promise.reject(new Error("transient backend failure")); + } + this.enqueuedMany.push([...messages]); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } +} + +test( + "a failed enqueue rolls back its dedup marker so the retry is not dropped", + async () => { + const queue = new FlakyQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("flaky-enqueue", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "k"]; + + // First enqueue: the marker is claimed, then dispatch rejects. + await rejects( + () => ctx.enqueueTask(task, "first", { deduplicationKey: "k" }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(await kv.get(markerKey), undefined); + + // The retry (queue healthy again) must enqueue the task, not be dropped. + await ctx.enqueueTask(task, "first-retry", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + + // A successful retry must keep its marker so later duplicates are dropped. + ok(await kv.get(markerKey) != null); + await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a failed batch enqueue rolls back its dedup marker so the retry is not " + + "dropped", + async () => { + const queue = new FlakyQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("flaky-batch-enqueue", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "batch"]; + + await rejects( + () => + ctx.enqueueTaskMany(task, ["first", "second"], { + deduplicationKey: "batch", + }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueuedMany.length, 0); + // Asserted via get() for the same reason as the single-item rollback test + // above (MemoryKvStore.cas leaves a `value: undefined` entry). + strictEqual(await kv.get(markerKey), undefined); + + await ctx.enqueueTaskMany(task, ["first-retry", "second-retry"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].length, 2); + ok(await kv.get(markerKey) != null); + + await ctx.enqueueTaskMany(task, ["duplicate-first", "duplicate-second"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueuedMany.length, 1); + }, +); + +test( + "a stale rollback does not clear a marker another enqueue re-claimed", + async () => { + const kv = new MemoryKvStore(); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "k"]; + let signalFirstEntered!: () => void; + const firstEntered = new Promise((resolve) => { + signalFirstEntered = resolve; + }); + let releaseFirst!: () => void; + const firstReleased = new Promise((resolve) => { + releaseFirst = resolve; + }); + class BlockingThenFailingQueue implements MessageQueue { + readonly nativeDeduplication = false; + #calls = 0; + async enqueue(): Promise { + this.#calls++; + if (this.#calls === 1) { + signalFirstEntered(); + await firstReleased; + throw new Error("transient backend failure"); + } + } + listen(): Promise { + return Promise.resolve(); + } + } + const queue = new BlockingThenFailingQueue(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 1 }, + }); + const task = federation.defineTask("stale-rollback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + const first = ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await firstEntered; + await delay(20); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + const secondToken = await kv.get(markerKey); + ok(secondToken != null); + releaseFirst(); + await rejects(() => first, { message: /transient backend failure/ }); + strictEqual(await kv.get(markerKey), secondToken); + }, +); + +test( + "a multi-item batch dedup without enqueueMany is rejected on the cas path", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("cas-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + await ctx.enqueueTaskMany(task, ["single"], { deduplicationKey: "single" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a failed rollback is swallowed; the original enqueue error reaches the caller", + async () => { + class ClearFailingKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + clearAttempts = 0; + get(key: KvKey): Promise { + return this.inner.get(key); + } + set( + key: KvKey, + value: unknown, + options?: KvStoreSetOptions, + ): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + if (newValue === undefined) { + this.clearAttempts++; + return Promise.reject(new Error("kv cas clear failed")); + } + return this.inner.cas(key, expectedValue, newValue, options); + } + } + + const queue = new FlakyQueue(); + const kv = new ClearFailingKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("rollback-failure", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => ctx.enqueueTask(task, "first", { deduplicationKey: "k" }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(kv.clearAttempts, 1); + }, +); + +/** + * A native-deduplication backend that drops repeat-key single enqueues and + * does **not** implement `enqueueMany`. Wrapping it in + * {@link ParallelMessageQueue} used to fan a batch out to one `enqueue()` per + * message, all carrying the same `deduplicationKey`, so the backend collapsed + * the whole batch onto its first message. + */ +class NativeDedupNoBulkQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly enqueued: { + message: TaskMessage; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue( + message: TaskMessage, + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key != null) { + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + } + this.enqueued.push({ message, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } +} + +test( + "a deduplicated batch over a ParallelMessageQueue wrapping a native, " + + "no-enqueueMany backend is rejected, not collapsed", + async () => { + const backend = new NativeDedupNoBulkQueue(); + const queue = new ParallelMessageQueue(backend, 5); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("parallel-native-no-bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // The wrapper cannot enqueue the batch atomically under one key, so the + // multi-item batch must be rejected rather than silently collapsed to one. + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + strictEqual(backend.enqueued.length, 0); + // A native plan never touches KV, even when it rejects. + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A single-item batch needs no bulk path, so the key is still forwarded. + await ctx.enqueueTaskMany(task, ["solo"], { deduplicationKey: "solo" }); + strictEqual(backend.enqueued.length, 1); + strictEqual(backend.enqueued[0].options?.deduplicationKey, "solo"); + }, +); + +test( + "a deduplicated batch over a ParallelMessageQueue wrapping a native " + + "enqueueMany backend forwards the key atomically", + async () => { + class NativeBatchQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly batches: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key != null && this.#seen.has(key)) return Promise.resolve(); + if (key != null) this.#seen.add(key); + this.batches.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const backend = new NativeBatchQueue(); + const queue = new ParallelMessageQueue(backend, 5); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("parallel-native-bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // The duplicate batch is dropped by the backend's native check. + await ctx.enqueueTaskMany(task, ["x", "y", "z"], { + deduplicationKey: "batch", + }); + + strictEqual(backend.batches.length, 1); + strictEqual(backend.batches[0].messages.length, 3); + strictEqual(backend.batches[0].options?.deduplicationKey, "batch"); + // The native path never writes KV, even through the wrapper. + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + 'an "open" fallback fans out a multi-item batch without enqueueMany ' + + "instead of rejecting it", + async () => { + const queue = new MockQueue(); // no enqueueMany, not native + const kv = new CaslessKvStore(); // no cas + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-batch-fanout", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // With neither native dedup nor cas under "open", the batch proceeds by + // fanning out every item; it must not throw the enqueueMany requirement. + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueued.length, 3); + for (const { options } of queue.enqueued) { + strictEqual(options?.deduplicationKey, undefined); + } + // The open path records nothing in the key–value store. + deepStrictEqual(await collectKeys(kv.inner, TASK_DEDUP_PREFIX), []); + }, +); + +test( + 'an "open" fallback logs a debug record when it ignores the key', + async () => { + const records: LogRecord[] = []; + await reset(); + try { + await configure({ + sinks: { + buffer(record: LogRecord): void { + records.push(record); + }, + }, + filters: {}, + loggers: [ + { category: [], lowestLevel: "debug", sinks: ["buffer"] }, + ], + }); + + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-debug-log", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + + const matched = records.filter((record) => + record.level === "debug" && + record.properties.deduplicationKey === "k" && + record.properties.taskName === "open-debug-log" + ); + strictEqual(matched.length, 1); + } finally { + await reset(); + } + }, +); + +test( + "two concurrent enqueues sharing a key: exactly one wins the cas claim", + async () => { + let signalEntered!: () => void; + const entered = new Promise((resolve) => { + signalEntered = resolve; + }); + let release!: () => void; + const released = new Promise((resolve) => { + release = resolve; + }); + class BlockingQueue implements MessageQueue { + readonly nativeDeduplication = false; + readonly enqueued: TaskMessage[] = []; + #first = true; + async enqueue(message: TaskMessage): Promise { + if (this.#first) { + this.#first = false; + signalEntered(); + await released; + } + this.enqueued.push(message); + } + listen(): Promise { + return Promise.resolve(); + } + } + const queue = new BlockingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("concurrent-claim", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // The first enqueue claims the marker, then blocks inside the queue. + const first = ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await entered; + // With the first still in flight, the second must lose the cas claim and + // skip the queue entirely. + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + release(); + await first; + strictEqual(queue.enqueued.length, 1); + + // The winner kept its marker, so a later duplicate is still dropped. + await ctx.enqueueTask(task, "third", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a native enqueue forwards orderingKey and deduplicationKey together", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("native-both-keys", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { + orderingKey: "user:alice", + deduplicationKey: "dedup:alice", + }); + strictEqual(queue.enqueued.length, 1); + const { message, options } = queue.enqueued[0]; + strictEqual(message.orderingKey, "user:alice"); + strictEqual(options?.orderingKey, "user:alice"); + strictEqual(options?.deduplicationKey, "dedup:alice"); + }, +); diff --git a/packages/fedify/src/federation/tasks/enqueue.ts b/packages/fedify/src/federation/tasks/enqueue.ts new file mode 100644 index 000000000..d35889aed --- /dev/null +++ b/packages/fedify/src/federation/tasks/enqueue.ts @@ -0,0 +1,304 @@ +/** + * The enqueue pipeline for custom background tasks. `ContextImpl.enqueueTask` + * and `ContextImpl.enqueueTaskMany` delegate to {@link enqueueTasks} so the + * handle validation, deduplication planning, payload encoding, and queue + * dispatch live in one cohesive place instead of one oversized method. + * + * @module + */ +import { getLogger } from "@logtape/logtape"; +import { context, propagation } from "@opentelemetry/api"; +import type { KvKey } from "../kv.ts"; +import { getFederationMetrics } from "../metrics.ts"; +import type { FederationImpl } from "../middleware.ts"; +import type { MessageQueue } from "../mq.ts"; +import type { TaskMessage } from "../queue.ts"; +import type TaskCodec from "./codec.ts"; +import type { TaskDefinition, TaskEnqueueOptions } from "./task.ts"; + +/** + * The slice of an enqueueing {@link Context} that {@link enqueueTasks} needs: + * its federation plus the few values that are the context's own. `ContextImpl` + * assembles it from itself, so the enqueue pipeline stays out of that class. + * @template TContextData The context data to pass to the {@link Context}. + * @internal + */ +interface EnqueueTasksContext { + /** + * The federation that owns the task registry, queue resolution and start, + * the key-value store, and the deduplication configuration. The public + * {@link Federation} interface exposes none of these, so the concrete + * {@link FederationImpl} is required. + */ + readonly federation: FederationImpl; + + /** The codec, bound to this context's loaders, that encodes payloads. */ + readonly codec: TaskCodec; + + /** The context's origin, stamped onto each message as its `baseUrl`. */ + readonly origin: string; + + /** The context data handed to the queue worker when it auto-starts. */ + readonly data: TContextData; +} + +/** + * Validates the task handle, plans deduplication, encodes every payload, then + * dispatches the resulting messages to the queue. A single item flows through + * the same pipeline as a batch, so {@link Context.enqueueTask} and + * {@link Context.enqueueTaskMany} share one implementation. + * @template TContextData The context data to pass to the {@link Context}. + * @template TData The type of the task payload, inferred from the task's schema. + * @param ctx The enqueueing dependencies assembled by `ContextImpl`. + * @param task The handle returned by `defineTask()`. + * @param items The payloads to enqueue, in order. + * @param options The enqueue options governing delay, ordering, and dedup. + * @internal + */ +const enqueueTasks = ( + ctx: EnqueueTasksContext, +) => + async function ( + task: TaskDefinition, + items: readonly TData[], + options: TaskEnqueueOptions, + ): Promise { + const def = ctx.federation.taskDefinitions.get(task.name); + if (def == null || def.handle !== task) { + throw new TypeError( + `Task ${ + JSON.stringify(task.name) + } is not defined on this federation; ` + + "pass a handle returned by its defineTask().", + ); + } + const queue = ctx.federation.resolveTaskQueue(task.name); + if (queue == null) { + throw new TypeError( + "No message queue is configured for tasks; pass `queue` to " + + "createFederation() or to defineTask().", + ); + } + if (items.length < 1) return; + const plan = planDeduplication( + ctx, + queue, + task.name, + options, + items.length, + ); + const messages: TaskMessage[] = await Promise.all( + items.map(encodeTaskMessage(ctx.codec, ctx.origin, task, options)), + ); + const claim = await claimDeduplication(ctx, plan, task.name); + if (!claim.proceed) return; + if (!ctx.federation.manuallyStartQueue) { + ctx.federation._startQueueInternal(ctx.data); + } + try { + await dispatch(queue, messages, { + delay: getDurationIfDefined(options.delay), + orderingKey: options.orderingKey, + deduplicationKey: claim.forwardedDeduplicationKey, + }); + // Counted only after a genuine dispatch: a dedup skip returns before + // this, and a failed dispatch throws into the rollback below. + const meter = getFederationMetrics(ctx.federation.meterProvider); + for (const message of messages) { + meter.recordQueueTaskEnqueued( + { role: "task", queue, taskName: task.name }, + message.attempt, + ); + } + } catch (error) { + if (claim.rollback != null) { + try { + await claim.rollback(); + } catch (rollbackError) { + logger.warn( + "Failed to roll back the deduplication marker for task " + + "{taskName} after a failed enqueue; it will expire by TTL. " + + "{rollbackError}", + { taskName: task.name, rollbackError }, + ); + } + } + throw error; + } + }; + +export default enqueueTasks; + +const getDurationIfDefined = (item: Temporal.DurationLike | undefined) => + item == null ? undefined : Temporal.Duration.from(item); + +/** + * The deduplication strategy chosen for an enqueue, settled before any payload + * is encoded so the fail-fast errors surface first. + */ +type DedupPlan = + | { readonly kind: "none" } + | { readonly kind: "native"; readonly key: string } + | { readonly kind: "cas"; readonly key: string } + | { readonly kind: "open"; readonly key: string }; + +/** + * Decides how a `deduplicationKey` (if any) is honored: forwarded to a native + * queue, claimed via `cas`, or—when neither is available—dropped or rejected + * per the federation's `taskDeduplicationFallback`. Throws the fail-fast + * `TypeError`s so they precede the encode. + */ +function planDeduplication( + ctx: EnqueueTasksContext, + queue: MessageQueue, + taskName: string, + options: TaskEnqueueOptions, + itemCount: number, +): DedupPlan { + if (options.deduplicationKey == null) return { kind: "none" }; + const key = options.deduplicationKey; + const native = queue.nativeDeduplication === true; + const canCas = ctx.federation.kv.cas != null; + if (itemCount > 1 && queue.enqueueMany == null && (native || canCas)) { + throw new TypeError( + `Task ${ + JSON.stringify(taskName) + } was enqueued as a batch with a deduplicationKey, but its message ` + + "queue does not implement enqueueMany; a multi-item batch cannot be " + + "deduplicated atomically without it. Implement enqueueMany on the " + + "queue, or enqueue the tasks individually with enqueueTask().", + ); + } + if (native) return { kind: "native", key }; + if (canCas) return { kind: "cas", key }; + if (ctx.federation.taskDeduplicationFallback === "closed") { + // No conditional write, closed: fail fast before any side effect. + throw new TypeError( + "deduplicationKey was set but the message queue does not declare " + + "nativeDeduplication and the key-value store exposes no " + + 'conditional write (cas); set taskDeduplicationFallback to "open" ' + + "to proceed without deduplication, or use a backend that " + + "supports it.", + ); + } + return { kind: "open", key }; +} + +/** + * Executes the planned deduplication once the payloads are encoded. A native + * plan forwards its key to the queue; a `cas` plan claims the marker and stops + * the enqueue when it loses the race; an `open` plan logs and proceeds. + * @returns Whether to proceed, and the key (if any) to forward to the queue. + */ +async function claimDeduplication( + ctx: EnqueueTasksContext, + plan: DedupPlan, + taskName: string, +): Promise<{ + proceed: boolean; + forwardedDeduplicationKey?: string; + /** + * Undoes a reserved marker when dispatch fails. Present only for a `cas` + * plan that won its claim; a failed enqueue calls it so the retry is not + * deduplicated against a task that never reached the queue. + */ + rollback?: () => Promise; +}> { + switch (plan.kind) { + case "native": + return { proceed: true, forwardedDeduplicationKey: plan.key }; + case "cas": { + const cacheKey = [ + ...ctx.federation.kvPrefixes.taskDeduplication, + plan.key, + ] satisfies KvKey; + const token = crypto.randomUUID(); + const won = await ctx.federation.kv.cas!(cacheKey, undefined, token, { + ttl: ctx.federation.taskDeduplicationTtl, + }); + if (!won) return { proceed: false }; + return { + proceed: true, + // Conditional clear: cas succeeds only while the stored value is still + // our token, so we never delete a marker another enqueue now owns. + rollback: async () => { + await ctx.federation.kv.cas!(cacheKey, token, undefined); + }, + }; + } + case "open": + logger.debug( + "deduplicationKey {deduplicationKey} for task {taskName} ignored: " + + "the message queue declares no nativeDeduplication and the " + + "key-value store has no cas; proceeding (taskDeduplicationFallback " + + 'is "open").', + { deduplicationKey: plan.key, taskName }, + ); /* falls through */ + case "none": + return { proceed: true }; + default: { + const _exhaustive: never = plan; + throw new TypeError( + `Unknown deduplication plan: ${JSON.stringify(_exhaustive)}`, + ); + } + } +} + +/** + * Sends the encoded messages to the queue, picking the bulk path when the + * queue implements `enqueueMany` and otherwise fanning out parallel single + * enqueues. The fan-out drops `deduplicationKey`, which is only ever set for a + * native plan that the bulk paths already cover. + */ +async function dispatch( + queue: MessageQueue, + messages: readonly TaskMessage[], + options: { + delay?: Temporal.Duration; + orderingKey?: string; + deduplicationKey?: string; + }, +): Promise { + if (messages.length === 1) { + await queue.enqueue(messages[0], options); + } else if (queue.enqueueMany != null) { + await queue.enqueueMany(messages, options); + } else { + const fanoutOptions = { + delay: options.delay, + orderingKey: options.orderingKey, + }; + await Promise.all(messages.map((m) => queue.enqueue(m, fanoutOptions))); + } +} + +/** + * Builds the per-payload encoder: validates and serializes the payload, then + * stamps the message envelope with a fresh id, the context's origin, and the + * active trace context. Curried so the batch encode reuses one bound encoder. + */ +const encodeTaskMessage = ( + codec: TaskCodec, + origin: string, + task: TaskDefinition, + options: TaskEnqueueOptions, +) => +async (data: TData): Promise => { + const encoded = await codec.encode(task.schema, data); + const carrier: Record = {}; + propagation.inject(context.active(), carrier); + return { + type: "task", + id: crypto.randomUUID(), + baseUrl: origin, + taskName: task.name, + data: encoded, + started: Temporal.Now.instant().toString(), + attempt: 0, + orderingKey: options.orderingKey, + traceContext: carrier, + }; +}; + +const logger = getLogger(["fedify", "federation", "task"]); diff --git a/packages/fedify/src/federation/tasks/mod.ts b/packages/fedify/src/federation/tasks/mod.ts index e151fa576..62072ac45 100644 --- a/packages/fedify/src/federation/tasks/mod.ts +++ b/packages/fedify/src/federation/tasks/mod.ts @@ -7,6 +7,7 @@ * @module */ export { default as TaskCodec } from "./codec.ts"; +export { default as enqueueTasks } from "./enqueue.ts"; export type { TaskDefinition, TaskDefinitionInternal, diff --git a/packages/fedify/src/federation/tasks/task.ts b/packages/fedify/src/federation/tasks/task.ts index 0105e8e21..a095e713d 100644 --- a/packages/fedify/src/federation/tasks/task.ts +++ b/packages/fedify/src/federation/tasks/task.ts @@ -159,6 +159,26 @@ export interface TaskEnqueueOptions { * processed sequentially (one at a time). */ readonly orderingKey?: string; + + /** + * An optional key requesting at-most-once enqueue for tasks that share it. + * + * A queue with {@link MessageQueue.nativeDeduplication} `true` enforces it + * strictly; otherwise deduplication is best-effort via {@link KvStore.cas}, + * and {@link FederationOptions.taskDeduplicationFallback} decides whether a + * missing `cas` proceeds without deduplication or throws. + * + * For {@link Context.enqueueTaskMany}, one key governs the whole batch. When + * deduplication is actually applied—a native queue, or the key–value + * fallback through {@link KvStore.cas}—a multi-item batch with a + * `deduplicationKey` requires the queue to implement + * {@link MessageQueue.enqueueMany} so it enqueues atomically, or the call + * throws a `TypeError`. Under the `"open"` fallback with no `cas`, no marker + * is taken, so such a batch instead fans out without deduplication. + * + * @since 2.x.x + */ + readonly deduplicationKey?: string; } /** diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index 6ac05255a..265e2ba34 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -1,5 +1,12 @@ -import { mockDocumentLoader, test } from "@fedify/fixture"; +import { + createTestMeterProvider, + createTestTracerProvider, + mockDocumentLoader, + test, +} from "@fedify/fixture"; import { Note } from "@fedify/vocab"; +import { propagation, SpanStatusCode } from "@opentelemetry/api"; +import { W3CTraceContextPropagator } from "@opentelemetry/core"; import { delay } from "es-toolkit"; import { deepStrictEqual, @@ -8,31 +15,35 @@ import { strictEqual, throws, } from "node:assert/strict"; +import { + baseOptions, + makeSchema, + MockQueue, + numberSchema, + stringSchema, +} from "../../testing/mod.ts"; import { createFederationBuilder } from "../builder.ts"; import type { Context } from "../context.ts"; import type { Federatable, FederationOptions } from "../federation.ts"; -import { MemoryKvStore } from "../kv.ts"; import { createFederation, type FederationImpl } from "../middleware.ts"; import { InProcessMessageQueue } from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; import TaskCodec from "./codec.ts"; import type { TaskDefinition, TaskRegistry } from "./task.ts"; -import { - type Envelope, - envelopeSchema, - MockQueue, - numberSchema, - stringSchema, -} from "../../testing/mod.ts"; type Assert = T; -const baseOptions: Omit, "queue"> = { - kv: new MemoryKvStore(), - documentLoaderFactory: () => mockDocumentLoader, - contextLoaderFactory: () => mockDocumentLoader, - manuallyStartQueue: true, -}; +interface Envelope { + note: Note; + title: string; +} + +const envelopeSchema = makeSchema( + (data): data is Envelope => + typeof data === "object" && data != null && + (data as Envelope).note instanceof Note && + typeof (data as Envelope).title === "string", +); const makeTaskMessage = async ( taskName: string, @@ -211,224 +222,6 @@ test("Context.enqueueTask() end-to-end", async (t) => { strictEqual(data.title, "greeting"); strictEqual(handlerCtx.origin, "https://example.com"); }); - - await t.step("rejects an invalid payload at enqueue", async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("strictly-typed", { - schema: numberSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - // deno-lint-ignore no-explicit-any - () => ctx.enqueueTask(task, "not a number" as any), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); - strictEqual(queue.enqueued.length, 0); - }); - - await t.step( - "starts the task worker on first enqueue without startQueue()", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - manuallyStartQueue: false, - queue: { task: queue }, - }); - const task = federation.defineTask("auto-start", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - // An app that only uses the custom task API never sends an activity, - // so enqueueTask() itself must start the worker like the other - // enqueue paths do; otherwise tasks pile up unprocessed forever. - await ctx.enqueueTask(task, "first"); - strictEqual(queue.listenCount, 1); - // The started flag keeps a second enqueue from re-listening. - await ctx.enqueueTask(task, "second"); - strictEqual(queue.listenCount, 1); - strictEqual(queue.enqueued.length, 2); - }, - ); - - await t.step( - "rejects a handle from another federation at enqueue", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const other = createFederation({ - ...baseOptions, - queue: { task: new MockQueue() }, - }); - const foreignTask = other.defineTask("foreign", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - () => ctx.enqueueTask(foreignTask, "data"), - { name: "TypeError", message: /is not defined on this federation/ }, - ); - strictEqual(queue.enqueued.length, 0); - }, - ); - - await t.step( - "rejects a same-named handle from another federation", - async () => { - // Name lookup alone cannot tell a foreign handle apart once both - // instances define the same task name: the local context would - // encode under the *schema carried by the foreign handle*, so a - // payload the local schema rejects would enqueue anyway, only to be - // dropped by the worker decoding under the local schema. Both - // instances share TContextData = void, so the phantom-brand check - // cannot reject this at compile time; the handle-identity guard is - // the only defense. - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - let called = 0; - federation.defineTask("rename", { - schema: numberSchema, // the local "rename" takes a number… - handler: () => { - called++; - }, - }); - const other = createFederation({ - ...baseOptions, - queue: { task: new MockQueue() }, - }); - // …while the other instance's "rename" takes a string: - const foreignTask = other.defineTask("rename", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - () => ctx.enqueueTask(foreignTask, "not a number"), - { name: "TypeError", message: /is not defined on this federation/ }, - ); - strictEqual(queue.enqueued.length, 0); - strictEqual(called, 0); - }, - ); - - await t.step("passes delay and orderingKey through", async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("delayed", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTask(task, "payload", { - delay: { seconds: 30 }, - orderingKey: "user:alice", - }); - strictEqual(queue.enqueued.length, 1); - const { message, options } = queue.enqueued[0]; - strictEqual(message.taskName, "delayed"); - strictEqual(message.orderingKey, "user:alice"); - strictEqual(message.attempt, 0); - ok(options?.delay instanceof Temporal.Duration); - strictEqual(options.delay.total("second"), 30); - strictEqual(options.orderingKey, "user:alice"); - }); - - await t.step( - "enqueueTaskMany() uses enqueueMany when available", - async () => { - const queue = new MockQueue({ supportsEnqueueMany: true }); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, ["a", "b", "c"]); - strictEqual(queue.enqueued.length, 0); - strictEqual(queue.enqueuedMany.length, 1); - strictEqual(queue.enqueuedMany[0].messages.length, 3); - }, - ); - - await t.step( - "enqueueTaskMany() falls back to parallel enqueues", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk-fallback", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, ["a", "b"]); - strictEqual(queue.enqueued.length, 2); - }, - ); - - await t.step( - "enqueueTaskMany() with no payloads touches no queue", - async () => { - const queue = new MockQueue({ supportsEnqueueMany: true }); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk-empty", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, []); - strictEqual(queue.enqueued.length, 0); - strictEqual(queue.enqueuedMany.length, 0); - }, - ); }); test("task queue routing", async (t) => { @@ -849,3 +642,426 @@ test("processQueuedTask() task dispatch", async (t) => { strictEqual(queue.enqueued.length, 0); }); }); + +/** Wires test telemetry doubles into a fresh federation for the suite below. */ +const instrument = (options: FederationOptions) => { + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const federation = createFederation({ + ...options, + meterProvider, + tracerProvider, + }) as FederationImpl; + return { federation, recorder, exporter }; +}; + +test("task observability", async (t) => { + await t.step( + "opens a fedify.task span carrying name and attempt on success", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("sendDigest", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("sendDigest", "payload"), + ); + + const spans = exporter.getSpans("fedify.task"); + strictEqual(spans.length, 1); + strictEqual(spans[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual(spans[0].attributes["fedify.task.attempt"], 0); + // A completed task carries no failure reason on its span… + strictEqual(spans[0].attributes["fedify.task.failure_reason"], undefined); + strictEqual(spans[0].status.code, SpanStatusCode.UNSET); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + strictEqual(started.length, 1); + strictEqual(started[0].attributes["fedify.queue.role"], "task"); + strictEqual(started[0].attributes["fedify.task.name"], "sendDigest"); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual(completed[0].attributes["fedify.queue.role"], "task"); + strictEqual(completed[0].attributes["fedify.task.name"], "sendDigest"); + strictEqual( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + // …nor on its outcome metric. + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + }, + ); + + await t.step( + "inherits the parent trace context from the enqueue site", + async () => { + // The worker extracts the parent through the global propagator; a real + // W3C propagator is required because the default is a no-op. + const traceId = "0af7651916cd43dd8448eb211c80319c"; + const spanId = "b7ad6b7169203331"; + propagation.setGlobalPropagator(new W3CTraceContextPropagator()); + try { + const queue = new MockQueue(); + const { federation, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("traced", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("traced", "payload", { + traceContext: { + traceparent: `00-${traceId}-${spanId}-01`, + }, + }), + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.spanContext().traceId, traceId); + strictEqual(span.parentSpanContext?.spanId, spanId); + } finally { + propagation.disable(); + } + }, + ); + + await t.step( + "attributes a deserialization failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("decode-me", { + schema: stringSchema, + handler: () => { + called++; + }, + }); + const message = await makeTaskMessage("decode-me", "payload"); + await federation.processQueuedTask(undefined, { + ...message, + data: "garbage that is not devalue", + }); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual(failed[0].attributes["fedify.queue.role"], "task"); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "deserialization", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "deserialization", + ); + // A dropped payload is a failed outcome, so the span status is ERROR. + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a validation failure and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("strict-shape", { + schema: numberSchema, // expects a number… + handler: () => { + called++; + }, + }); + // …but a valid devalue payload carries a string. + await federation.processQueuedTask( + undefined, + await makeTaskMessage("strict-shape", "not a number"), + ); + + strictEqual(called, 0); + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "validation", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "validation", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes an unknown task and drops without retry", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("never-registered", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // dropped, not retried + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "unknown_task", + ); + // The backend label is still populated on an unknown-task drop. + strictEqual(failed[0].attributes["fedify.queue.backend"], "MockQueue"); + // Cardinality guard: an unregistered, wire-derived task name must NOT + // become a metric attribute—it would spawn unbounded time series… + strictEqual(failed[0].attributes["fedify.task.name"], undefined); + strictEqual( + recorder.getMeasurements("fedify.queue.task.started")[0] + .attributes["fedify.task.name"], + undefined, + ); + const span = exporter.getSpans("fedify.task")[0]; + // …but the span still carries the real name for tracing the drop. + strictEqual(span.attributes["fedify.task.name"], "never-registered"); + strictEqual( + span.attributes["fedify.task.failure_reason"], + "unknown_task", + ); + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "attributes a handler failure on a terminal give-up", + async () => { + const queue = new MockQueue(); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("explode", { + schema: stringSchema, + handler: () => { + throw new Error("boom"); + }, + retryPolicy: () => null, // give up immediately + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("explode", "payload"), + ); + + strictEqual(queue.enqueued.length, 0); // gave up, no retry + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + strictEqual(failed.length, 1); + strictEqual( + failed[0].attributes["fedify.task.failure_reason"], + "handler", + ); + const span = exporter.getSpans("fedify.task")[0]; + strictEqual( + span.attributes["fedify.task.failure_reason"], + "handler", + ); + // A terminal give-up is a failed outcome, so the span status is ERROR. + strictEqual(span.status.code, SpanStatusCode.ERROR); + }, + ); + + await t.step( + "reports the resolved outbox queue as the backend on fallback", + async () => { + // A distinctly named queue so the backend label is unambiguous. + class FallbackOutboxQueue extends MockQueue {} + const outboxQueue = new FallbackOutboxQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { outbox: outboxQueue }, // no dedicated task queue + }); + federation.defineTask("fallback", { + schema: stringSchema, + handler: () => {}, + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("fallback", "payload"), + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.queue.backend"], + "FallbackOutboxQueue", + ); + }, + ); + + await t.step( + "records an enqueue measurement with role task", + async () => { + const queue = new MockQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("enqueue-me", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload"); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "enqueue-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + }, + ); + + await t.step( + "records the retry re-enqueue with role task and a bumped attempt", + async () => { + const queue = new MockQueue(); + const { federation, recorder } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("retry-me", { + schema: stringSchema, + handler: () => { + throw new Error("boom"); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("retry-me", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.attempt, 1); + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + strictEqual(enqueued.length, 1); + strictEqual(enqueued[0].attributes["fedify.queue.role"], "task"); + strictEqual(enqueued[0].attributes["fedify.task.name"], "retry-me"); + strictEqual(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + }, + ); + + await t.step( + "records an abort as aborted, without a failure reason or error status", + async () => { + const queue = new MockQueue({ nativeRetrial: true }); + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + }); + const message = await makeTaskMessage("aborts", "payload"); + await rejects( + () => federation.processQueuedTask(undefined, message), + { name: "AbortError" }, + ); + + const span = exporter.getSpans("fedify.task")[0]; + ok(span != null); + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + const durations = recorder.getMeasurements("fedify.queue.task.duration"); + strictEqual(durations.length, 1); + strictEqual( + durations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + }, + ); + + await t.step( + "on a non-native queue an aborted handler is retried, not failed", + async () => { + const queue = new MockQueue(); // nativeRetrial: false + const { federation, recorder, exporter } = instrument({ + ...baseOptions, + queue: { task: queue }, + }); + federation.defineTask("aborts-soft", { + schema: stringSchema, + handler: () => { + throw globalThis.Object.assign(new Error("shutting down"), { + name: "AbortError", + }); + }, + retryPolicy: () => Temporal.Duration.from({ milliseconds: 1 }), + }); + await federation.processQueuedTask( + undefined, + await makeTaskMessage("aborts-soft", "payload"), + ); + + strictEqual(queue.enqueued.length, 1); // retried, behavior unchanged + strictEqual(queue.enqueued[0].message.attempt, 1); + // No `handler` failure leaks onto the span or any failure metric… + const span = exporter.getSpans("fedify.task")[0]; + strictEqual(span.attributes["fedify.task.failure_reason"], undefined); + strictEqual(span.status.code, SpanStatusCode.UNSET); + strictEqual( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + // …and the swallowed-into-retry attempt records `completed`, matching the + // inbox/outbox internal-retry convention. + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + strictEqual(completed.length, 1); + strictEqual( + completed[0].attributes["fedify.task.failure_reason"], + undefined, + ); + }, + ); +}); diff --git a/packages/fedify/src/testing/mod.ts b/packages/fedify/src/testing/mod.ts index 27cf5bdc9..393b217f1 100644 --- a/packages/fedify/src/testing/mod.ts +++ b/packages/fedify/src/testing/mod.ts @@ -4,6 +4,7 @@ export { createRequestContext, } from "./context.ts"; export { + baseOptions, type Envelope, envelopeSchema, makeSchema, diff --git a/packages/fedify/src/testing/tasks.ts b/packages/fedify/src/testing/tasks.ts index 2dc50c83d..7ccf727fc 100644 --- a/packages/fedify/src/testing/tasks.ts +++ b/packages/fedify/src/testing/tasks.ts @@ -1,5 +1,19 @@ +/** + * Test-only utilities shared by the task suites: the schema factory and stock + * schemas, the base federation options, and the recording {@link MockQueue}. + * + * These helpers live beside the suites that use them rather than in a shared + * package because {@link MockQueue} needs the package-internal + * {@link TaskMessage} type, and *deno.json*'s `publish.exclude` keeps this + * module out of the published sources. + * + * @module + */ +import { mockDocumentLoader } from "@fedify/fixture"; import { Note } from "@fedify/vocab"; import type { StandardSchemaV1 } from "@standard-schema/spec"; +import type { FederationOptions } from "../federation/federation.ts"; +import { MemoryKvStore } from "../federation/kv.ts"; import type { MessageQueue, MessageQueueEnqueueOptions, @@ -7,6 +21,14 @@ import type { } from "../federation/mq.ts"; import type { TaskMessage } from "../federation/queue.ts"; +/** Federation options (sans `queue`) shared by the task suites. */ +export const baseOptions: Omit, "queue"> = { + kv: new MemoryKvStore(), + documentLoaderFactory: () => mockDocumentLoader, + contextLoaderFactory: () => mockDocumentLoader, + manuallyStartQueue: true, +}; + /** * Builds a minimal [Standard Schema](https://standardschema.dev/) from a type * guard, for use as a task payload schema in tests. @@ -45,19 +67,30 @@ export const envelopeSchema = makeSchema( typeof (data as Envelope).title === "string", ); -/** Options for {@link MockQueue}. */ +/** + * Options for the {@link MockQueue} constructor. + */ export interface MockQueueOptions { + /** Sets {@link MessageQueue.nativeRetrial}. Defaults to `false`. */ readonly nativeRetrial?: boolean; + /** Sets {@link MessageQueue.nativeDeduplication}. Defaults to `false`. */ + readonly nativeDeduplication?: boolean; + /** + * When `true`, the queue exposes {@link MockQueue.enqueueMany} and records + * bulk enqueues; when omitted, the method is absent so callers exercise the + * per-message fan-out path. + */ readonly supportsEnqueueMany?: boolean; } /** - * A {@link MessageQueue} that records what it was asked to enqueue and resolves - * its `listen()` when the abort signal fires, so tests can inspect dispatch - * without a real backend. + * An in-memory {@link MessageQueue} that records task enqueues for assertions + * instead of delivering anything. Its {@link listen} resolves only when the + * abort signal fires. */ export class MockQueue implements MessageQueue { readonly nativeRetrial: boolean; + readonly nativeDeduplication: boolean; readonly enqueued: { message: TaskMessage; options?: MessageQueueEnqueueOptions; @@ -74,6 +107,7 @@ export class MockQueue implements MessageQueue { constructor(options: MockQueueOptions = {}) { this.nativeRetrial = options.nativeRetrial ?? false; + this.nativeDeduplication = options.nativeDeduplication ?? false; if (options.supportsEnqueueMany) { this.enqueueMany = (messages, opts) => { this.enqueuedMany.push({ messages, options: opts }); @@ -82,15 +116,16 @@ export class MockQueue implements MessageQueue { } } - // deno-lint-ignore no-explicit-any - enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise { + enqueue( + message: TaskMessage, + options?: MessageQueueEnqueueOptions, + ): Promise { this.enqueued.push({ message, options }); return Promise.resolve(); } listen( - // deno-lint-ignore no-explicit-any - _handler: (message: any) => Promise | void, + _handler: (message: TaskMessage) => Promise | void, options?: MessageQueueListenOptions, ): Promise { this.listenCount++; diff --git a/scripts/check_fixture_usage.ts b/scripts/check_fixture_usage.ts index 73ef518d9..ad4c921cc 100644 --- a/scripts/check_fixture_usage.ts +++ b/scripts/check_fixture_usage.ts @@ -10,6 +10,7 @@ * Reviewers must NOT treat a passing run as proof of safety; code * review and the published package contents remain the source of truth. */ +import { expandGlobSync } from "@std/fs/expand-glob"; import { walk } from "@std/fs/walk"; import { dirname, @@ -20,6 +21,15 @@ import { SEPARATOR, } from "@std/path"; +const projectRoot = resolve(dirname(fromFileUrl(import.meta.url)), ".."); +const packagesDir = resolve(projectRoot, "packages"); + +const expandGlobPattern = (pattern: string) => + Array.from( + expandGlobSync(pattern, { root: projectRoot, includeDirs: false }), + (file) => relative(projectRoot, file.path), + ); + /** * Files exempt from the "@fedify/fixture imports must live in *.test.ts" * rule. Every entry MUST be accompanied by an inline comment explaining @@ -27,23 +37,13 @@ import { * necessary or not. */ const ALLOWLIST: readonly string[] = [ - // cfworkers test harness re-exports `mockDocumentLoader`; bundled in via - // tsdown `noExternal` so consumers never resolve `@fedify/fixture` at - // runtime. - "packages/fedify/src/testing/context.ts", - // cfworkers test harness re-exports `testDefinitions`; bundled in via - // tsdown `noExternal` so consumers never resolve `@fedify/fixture` at - // runtime. - "packages/fedify/src/testing/mod.ts", - // Test utils for custom tasks - "packages/fedify/src/testing/tasks.ts", + // Utils for tests. + "packages/fedify/src/testing/*", // JSDoc `@example` block mentions `import { test } from "@fedify/fixture"` // as documentation; not a real runtime import. "packages/testing/src/mq-tester.ts", -].map((path) => join(...path.split("/") as [string, ...string[]])); - -const projectRoot = resolve(dirname(fromFileUrl(import.meta.url)), ".."); -const packagesDir = resolve(projectRoot, "packages"); +].map((path) => join(...path.split("/") as [string, ...string[]])) + .flatMap((path) => path.includes("*") ? expandGlobPattern(path) : path); /** * Statement-level pattern for any `import` or `export ... from`