diff --git a/CHANGES.md b/CHANGES.md index ecc65bd58..94170b388 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -285,8 +285,18 @@ To be released. `FederationOptions.taskQueueResolution` option is set to `"strict"`. `Federation.startQueue()` now accepts `queue: "task"` to run a task-only worker. - - [[#206], [#797], [#803] by ChanHaeng Lee] + - Tasks can request at-most-once enqueue with a `deduplicationKey` + (new `TaskEnqueueOptions.deduplicationKey`). A queue declaring the new + `MessageQueue.nativeDeduplication` capability owns the check and + receives the key through the new + `MessageQueueEnqueueOptions.deduplicationKey`; otherwise Fedify + performs a best-effort key–value guard through the optional + `KvStore.cas` primitive, under a new `taskDeduplication` key prefix. + The marker TTL and the no-`cas` fallback are tunable with the new + `FederationOptions.taskDeduplicationTtl` and + `FederationOptions.taskDeduplicationFallback` options. + + [[#206], [#797], [#798], [#803] by ChanHaeng Lee] [Standard Schema]: https://standardschema.dev/ [#206]: https://github.com/fedify-dev/fedify/issues/206 @@ -320,6 +330,7 @@ To be released. [#782]: https://github.com/fedify-dev/fedify/issues/782 [#787]: https://github.com/fedify-dev/fedify/pull/787 [#797]: https://github.com/fedify-dev/fedify/issues/797 +[#798]: https://github.com/fedify-dev/fedify/issues/798 [#800]: https://github.com/fedify-dev/fedify/pull/800 [#803]: https://github.com/fedify-dev/fedify/pull/803 diff --git a/docs/manual/tasks.md b/docs/manual/tasks.md index 55adef8dc..3f51cdfe7 100644 --- a/docs/manual/tasks.md +++ b/docs/manual/tasks.md @@ -1,3 +1,5 @@ + + Background tasks ================ @@ -139,6 +141,10 @@ Both methods accept options: : Tasks with the same ordering key are processed sequentially (one at a time), like the same option on the message queue layer. +`deduplicationKey` +: Requests at-most-once enqueue for tasks that share the key; see + [Deduplication](#deduplication) below. + ~~~~ typescript await ctx.enqueueTask(sendDigest, payload, { delay: { minutes: 30 }, @@ -257,12 +263,96 @@ delivered it. > queue and set `taskQueueResolution: "strict"`. +Deduplication +------------- + +A task often needs *at-most-once-per-key* enqueue: a digest mailer must not +send twice when a request is retried, and a cleanup job should coalesce +duplicate triggers. Passing a `deduplicationKey` requests this—a second +enqueue with the same key is dropped while the first is still within the +deduplication window: + +~~~~ typescript +await ctx.enqueueTask(sendDigest, payload, { + deduplicationKey: `digest:${payload.userId}`, // [!code highlight] +}); +~~~~ + +How the key is resolved depends on the queue and the key–value store: + +1. **Native backend.** When the task's queue declares + `~MessageQueue.nativeDeduplication`, Fedify forwards the key in the + message queue's `~MessageQueueEnqueueOptions.deduplicationKey` and the + backend owns the check. Fedify does not touch the key–value store. + +2. **Key–value fallback.** Otherwise, if the configured `~KvStore` exposes + the optional compare-and-swap (`~KvStore.cas`) primitive, Fedify records + the key under a dedicated `taskDeduplication` prefix with a TTL and skips + the enqueue while a marker is present. The TTL defaults to one hour and is + configurable with `~FederationOptions.taskDeduplicationTtl`: + + ~~~~ typescript + const federation = createFederation({ + // ... + taskDeduplicationTtl: { minutes: 10 }, // [!code highlight] + }); + ~~~~ + +3. **No conditional write.** When neither applies—no native deduplication and + a key–value store without `~KvStore.cas`—the behavior is governed by + `~FederationOptions.taskDeduplicationFallback`. `"open"` (the default) + lets the enqueue proceed without deduplication after a debug-level log; + `"closed"` throws a `TypeError` before enqueuing: + + ~~~~ typescript + const federation = createFederation({ + // ... + taskDeduplicationFallback: "closed", // [!code highlight] + }); + ~~~~ + +Among the first-party adapters, the in-memory, Deno KV, SQLite, and MySQL +key–value stores implement `~KvStore.cas`; PostgreSQL, Redis, and +Cloudflare Workers KV do not yet, so those deployments take the +`taskDeduplicationFallback` branch until per-adapter follow-ups add it. + +For `~Context.enqueueTaskMany()`, a single `deduplicationKey` applies to the +whole batch: the batch enqueues as a unit or is skipped as a unit, never +partially. Per-item deduplication means calling `~Context.enqueueTask()` in +a loop, each with its own key. Deduplicating a multi-item batch requires the +queue to implement `~MessageQueue.enqueueMany()` so the batch enqueues +atomically—whether the check is native or the key–value fallback. Fanning the +key out across separate `~MessageQueue.enqueue()` calls cannot enqueue a whole +batch as one unit: a native per-message key cannot cover it, and a key–value +marker could not be rolled back cleanly if only some of the fanned-out enqueues +failed. So when deduplication is actually applied—a native queue, or a +key–value store with `~KvStore.cas`—Fedify rejects a multi-item batch with a +`deduplicationKey` on a queue without `~MessageQueue.enqueueMany()` instead of +risking duplicates. Under the `"open"` fallback (no native deduplication and no +`cas`), no marker is taken, so the batch simply fans out without deduplication. + +This applies through `~ParallelMessageQueue` as well: wrapping a queue that +lacks `~MessageQueue.enqueueMany()` does not make batch enqueue atomic, so a +deduplicated multi-item batch on such a wrapper is likewise rejected rather than +collapsed onto one message. + +> [!WARNING] +> The key–value fallback is **best-effort, not transactional**. The marker +> write and the enqueue are separate operations. Fedify rolls the marker back +> when an enqueue fails, so a transient failure does not suppress the retry, but +> a crash before that rollback, the `"open"` fallback under concurrency, a +> non-atomic third-party `~KvStore.cas`, or reuse of a key within its TTL window +> can still admit a duplicate or suppress a task. Cleanup is otherwise by TTL +> expiry, not active deletion on handler success. Deployments needing strict +> guarantees use a queue with `nativeDeduplication: true`, where the backend +> owns an atomic check. + + Limitations ----------- -The current API intentionally ships without deduplication, task-specific -OpenTelemetry spans and metrics, cron-style periodic scheduling, result -backends, and per-task priority. Some of these are planned as follow-ups; -see the [tracking issue]. +The current API intentionally ships without task-specific OpenTelemetry spans +and metrics, cron-style periodic scheduling, result backends, and per-task +priority. Some of these are planned as follow-ups; see the [tracking issue]. [tracking issue]: https://github.com/fedify-dev/fedify/issues/206 diff --git a/packages/fedify/src/federation/context.ts b/packages/fedify/src/federation/context.ts index fda194530..c381e7d06 100644 --- a/packages/fedify/src/federation/context.ts +++ b/packages/fedify/src/federation/context.ts @@ -459,8 +459,9 @@ export interface Context { /** * Enqueues multiple payloads for a custom background task at once. - * Uses the queue's bulk enqueue operation when available, falling back - * to parallel single enqueues. + * Uses the queue's bulk enqueue operation when available. Without + * deduplication, it may fall back to parallel single enqueues when the + * queue does not implement bulk enqueue. * @template TData The type of the task payload, inferred from the task's * schema. * @param task The handle returned by {@link TaskRegistry.defineTask}. @@ -468,8 +469,11 @@ export interface Context { * task's schema before being enqueued. * @param options Options for enqueuing the tasks. * @throws {TypeError} If the task is not defined on this federation, - * if no message queue is configured for tasks, or if - * a payload fails schema validation. + * if no message queue is configured for tasks, if + * a payload fails schema validation, or if a + * deduplicated multi-item batch cannot be enqueued + * atomically because the queue does not implement + * bulk enqueue. * @since 2.x.x */ enqueueTaskMany( diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index baa00edf2..5b4177f80 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -1103,6 +1103,32 @@ export interface FederationOptions { */ taskQueueResolution?: "fallback" | "strict"; + /** + * The time-to-live for a {@link TaskEnqueueOptions.deduplicationKey} marker + * stored in the key–value deduplication fallback. A second enqueue with the + * same key within this window is skipped; once it expires, the key may + * enqueue again. Ignored when the task's queue declares + * {@link MessageQueue.nativeDeduplication} (the backend owns the window). + * @default `{ hours: 1 }` + * @since 2.x.x + */ + taskDeduplicationTtl?: Temporal.DurationLike; + + /** + * The behavior when a {@link TaskEnqueueOptions.deduplicationKey} is supplied + * but the task's queue does not declare + * {@link MessageQueue.nativeDeduplication} *and* the configured + * {@link KvStore} exposes no `cas` (compare-and-swap) primitive: + * + * - `"open"` (the default): proceeds without deduplication after logging at + * debug level. + * - `"closed"`: rejects with a `TypeError` before enqueuing. + * + * @default `"open"` + * @since 2.x.x + */ + taskDeduplicationFallback?: "open" | "closed"; + /** * Activity transformers that are applied to outgoing activities. It is * useful for adjusting outgoing activities to satisfy some ActivityPub diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 40b67f189..6ff54ca91 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -10565,12 +10565,15 @@ const withTimeout = ( return Promise.race([promise, timeout]).finally(() => clearTimeout(timer)); }; -const taskFederationOptions = { +// A factory, not a shared constant: each task test gets its own +// MemoryKvStore so deduplication markers never leak across tests and the +// suite stays order-independent as more cases are added. +const mockOptions = () => ({ kv: new MemoryKvStore(), documentLoaderFactory: () => mockDocumentLoader, contextLoaderFactory: () => mockDocumentLoader, manuallyStartQueue: true, -}; +}); test("ContextImpl.enqueueTask()", async (t) => { await t.step( @@ -10578,7 +10581,7 @@ test("ContextImpl.enqueueTask()", async (t) => { async () => { const queue = new MockQueue({ supportsEnqueueMany: true }); const federation = createFederation({ - ...taskFederationOptions, + ...mockOptions(), queue: { task: queue }, }); const task = federation.defineTask("greet", { @@ -10619,7 +10622,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => { async () => { const queue = new MockQueue({ supportsEnqueueMany: true }); const federation = createFederation({ - ...taskFederationOptions, + ...mockOptions(), queue: { task: queue }, }); const task = federation.defineTask("bulk", { @@ -10652,7 +10655,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => { async () => { const queue = new MockQueue({ supportsEnqueueMany: true }); const federation = createFederation({ - ...taskFederationOptions, + ...mockOptions(), queue: { task: queue }, }); const task = federation.defineTask("bulk-single", { @@ -10674,7 +10677,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => { async () => { const queue = new RendezvousQueue(2); const federation = createFederation({ - ...taskFederationOptions, + ...mockOptions(), queue: { task: queue }, }); const task = federation.defineTask("bulk-fallback", { @@ -10716,7 +10719,7 @@ test("ContextImpl.enqueueTaskMany()", async (t) => { async () => { const queue = new MockQueue(); const federation = createFederation({ - ...taskFederationOptions, + ...mockOptions(), queue: { task: queue }, }); const task = federation.defineTask("bulk-typed", { diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 0148ef006..fa5e5c68d 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -168,6 +168,7 @@ import { type SenderKeyPair, } from "./send.ts"; import { + enqueueTasks, TaskCodec, type TaskDefinition, type TaskEnqueueOptions, @@ -512,6 +513,15 @@ export interface FederationKvPrefixes { * @since 2.3.0 */ readonly circuitBreaker: KvKey; + + /** + * The key prefix used for storing custom background task deduplication + * markers. Kept separate from {@link activityIdempotence} so the two key + * spaces never collide. + * @default `["_fedify", "taskDeduplication"]` + * @since 2.x.x + */ + readonly taskDeduplication: KvKey; } /** @@ -557,11 +567,7 @@ export class FederationImpl outboxQueue?: MessageQueue; fanoutQueue?: MessageQueue; taskQueue?: MessageQueue; - inboxQueueStarted: boolean; - outboxQueueStarted: boolean; - fanoutQueueStarted: boolean; - taskQueueStarted: boolean; - startedTaskQueues: Set; + startedQueues: Set; manuallyStartQueue: boolean; origin?: FederationOrigin; documentLoaderFactory: DocumentLoaderFactory; @@ -577,6 +583,8 @@ export class FederationImpl inboxRetryPolicy: RetryPolicy; taskRetryPolicy: RetryPolicy; taskQueueResolution: "fallback" | "strict"; + taskDeduplicationTtl: Temporal.Duration; + taskDeduplicationFallback: "open" | "closed"; circuitBreaker?: CircuitBreaker; activityTransformers: readonly ActivityTransformer[]; _tracerProvider: TracerProvider | undefined; @@ -638,6 +646,7 @@ export class FederationImpl httpMessageSignaturesSpec: ["_fedify", "httpMessageSignaturesSpec"], acceptSignatureNonce: ["_fedify", "acceptSignatureNonce"], circuitBreaker: ["_fedify", "circuit"], + taskDeduplication: ["_fedify", "taskDeduplication"], } satisfies FederationKvPrefixes), ...(options.kvPrefixes ?? {}), }; @@ -685,11 +694,7 @@ export class FederationImpl ); } } - this.inboxQueueStarted = false; - this.outboxQueueStarted = false; - this.fanoutQueueStarted = false; - this.taskQueueStarted = false; - this.startedTaskQueues = new Set(); + this.startedQueues = new Set(); this.manuallyStartQueue = options.manuallyStartQueue ?? false; if (options.origin != null) { if (typeof options.origin === "string") { @@ -871,6 +876,11 @@ export class FederationImpl this.taskRetryPolicy = options.taskRetryPolicy ?? createExponentialBackoffPolicy(); this.taskQueueResolution = options.taskQueueResolution ?? "fallback"; + this.taskDeduplicationTtl = Temporal.Duration.from( + options.taskDeduplicationTtl ?? { hours: 1 }, + ); + this.taskDeduplicationFallback = options.taskDeduplicationFallback ?? + "open"; this.activityTransformers = options.activityTransformers ?? getDefaultActivityTransformers(); this._tracerProvider = options.tracerProvider; @@ -935,109 +945,37 @@ export class FederationImpl signal?: AbortSignal, queue?: keyof FederationQueueOptions, ): Promise { - // Tasks may route to a dedicated queue of their own (defineTask({ queue })) - // even when no federation-wide queue is configured, so a deployment with - // only per-task queues still has work to start. - const hasDedicatedTaskQueue = [...this.taskDefinitions.values()].some( - (def) => def.queue != null, - ); - if ( - this.inboxQueue == null && this.outboxQueue == null && - this.fanoutQueue == null && this.taskQueue == null && - !hasDedicatedTaskQueue - ) { - return; - } + // Tasks fall back to the outbox queue and may add per-task queues; the + // identity Set then starts each instance once even when roles share one. + type QueueNameMessage = [ + keyof FederationQueueOptions, + MessageQueue | undefined, + ]; + const taskQueue = this.taskQueue ?? + (this.taskQueueResolution === "fallback" ? this.outboxQueue : undefined); + const customQueues = this.taskDefinitions.values() + .map((def): QueueNameMessage => ["task", def.queue]); + const targets: QueueNameMessage[] = [ + ["inbox", this.inboxQueue], + ["outbox", this.outboxQueue], + ["fanout", this.fanoutQueue], + ["task", taskQueue], + ...customQueues, + ]; const logger = getLogger(["fedify", "federation", "queue"]); const promises: Promise[] = []; - if ( - this.inboxQueue != null && (queue == null || queue === "inbox") && - !this.inboxQueueStarted - ) { - logger.debug("Starting an inbox task worker."); - this.inboxQueueStarted = true; + for (const [role, target] of targets) { + if (target == null || !(queue == null || queue === role)) continue; + if (this.startedQueues.has(target)) continue; + this.startedQueues.add(target); + logger.debug("Starting a {role} queue worker.", { role }); promises.push( - this.inboxQueue.listen( + target.listen( (msg) => this.processQueuedTask(ctxData, msg), { signal }, ), ); } - if ( - this.outboxQueue != null && - this.outboxQueue !== this.inboxQueue && - (queue == null || queue === "outbox") && - !this.outboxQueueStarted - ) { - logger.debug("Starting an outbox task worker."); - this.outboxQueueStarted = true; - promises.push( - this.outboxQueue.listen( - (msg) => this.processQueuedTask(ctxData, msg), - { signal }, - ), - ); - } - if ( - this.fanoutQueue != null && - this.fanoutQueue !== this.inboxQueue && - this.fanoutQueue !== this.outboxQueue && - (queue == null || queue === "fanout") && - !this.fanoutQueueStarted - ) { - logger.debug("Starting a fanout task worker."); - this.fanoutQueueStarted = true; - promises.push( - this.fanoutQueue.listen( - (msg) => this.processQueuedTask(ctxData, msg), - { signal }, - ), - ); - } - if ( - this.taskQueue != null && - this.taskQueue !== this.inboxQueue && - this.taskQueue !== this.outboxQueue && - this.taskQueue !== this.fanoutQueue && - (queue == null || queue === "task") && - !this.taskQueueStarted - ) { - logger.debug("Starting a task worker."); - this.taskQueueStarted = true; - promises.push( - this.taskQueue.listen( - (msg) => this.processQueuedTask(ctxData, msg), - { signal }, - ), - ); - } - // Dedicated per-task queues belong to the "task" selector. Each distinct - // instance needs its own worker; dedupe against the standard queues and - // against task queues already started on an earlier call so no instance is - // listened on twice. - if (queue == null || queue === "task") { - const standardQueues = new Set( - [this.inboxQueue, this.outboxQueue, this.fanoutQueue, this.taskQueue] - .filter((q): q is MessageQueue => q != null), - ); - for (const def of this.taskDefinitions.values()) { - const taskQueue = def.queue; - if ( - taskQueue == null || standardQueues.has(taskQueue) || - this.startedTaskQueues.has(taskQueue) - ) { - continue; - } - logger.debug("Starting a worker for a dedicated per-task queue."); - this.startedTaskQueues.add(taskQueue); - promises.push( - taskQueue.listen( - (msg) => this.processQueuedTask(ctxData, msg), - { signal }, - ), - ); - } - } await Promise.all(promises); } @@ -3106,6 +3044,10 @@ export class ContextImpl implements Context { return this.#codec ??= new TaskCodec(this); } + get #enqueueTasks() { + return enqueueTasks(this); + } + clone(data: TContextData): Context { return new ContextImpl({ url: this.url, @@ -3658,78 +3600,6 @@ export class ContextImpl implements Context { await this.#enqueueTasks(task, payloads, options); } - async #enqueueTasks( - task: TaskDefinition, - items: readonly TData[], - options: TaskEnqueueOptions, - ): Promise { - // Fail fast on a handle from another federation instance; without this - // check the message would enqueue fine and be dropped by the worker. - // Compare the registered handle by identity, not just the name: another - // instance may define the same task name with a different schema, and - // its handle would otherwise encode under that foreign schema here - // while the worker decodes under the local one. - const def = this.federation.taskDefinitions.get(task.name); - if (def == null || def.handle !== task) { - throw new TypeError( - `Task ${ - JSON.stringify(task.name) - } is not defined on this federation; ` + - "pass a handle returned by its defineTask().", - ); - } - const queue = this.federation.resolveTaskQueue(task.name); - if (queue == null) { - throw new TypeError( - "No message queue is configured for tasks; pass `queue` to " + - "createFederation() or to defineTask().", - ); - } - if (items.length < 1) return; - const delay = options.delay == null - ? undefined - : Temporal.Duration.from(options.delay); - // Encode in parallel: `enqueueTaskMany` is the bulk path, and the enqueue - // below already parallelizes, so serial encoding would be the bottleneck. - // `map` preserves order, and a rejected encode (validation failure) rejects - // the whole batch before anything is enqueued, keeping fail-fast intact. - const messages: TaskMessage[] = await Promise.all( - items.map(this.#encodeTaskMessage(task, options)), - ); - if (!this.federation.manuallyStartQueue) { - this.federation._startQueueInternal(this.data); - } - const enqueueOptions = { delay, orderingKey: options.orderingKey }; - if (messages.length === 1) { - await queue.enqueue(messages[0], enqueueOptions); - } else if (queue.enqueueMany != null) { - await queue.enqueueMany(messages, enqueueOptions); - } else { - await Promise.all(messages.map((m) => queue.enqueue(m, enqueueOptions))); - } - } - - #encodeTaskMessage = ( - task: TaskDefinition, - options: TaskEnqueueOptions, - ) => - async (data: TData): Promise => { - const encoded = await this.codec.encode(task.schema, data); - const carrier: Record = {}; - propagation.inject(context.active(), carrier); - return { - type: "task", - id: crypto.randomUUID(), - baseUrl: this.origin, - taskName: task.name, - data: encoded, - started: Temporal.Now.instant().toString(), - attempt: 0, - orderingKey: options.orderingKey, - traceContext: carrier, - }; - }; - sendActivity( sender: | SenderKeyPair diff --git a/packages/fedify/src/federation/mq.test.ts b/packages/fedify/src/federation/mq.test.ts index e7c402908..7c6497616 100644 --- a/packages/fedify/src/federation/mq.test.ts +++ b/packages/fedify/src/federation/mq.test.ts @@ -5,11 +5,13 @@ import { assertFalse, assertGreater, assertGreaterOrEqual, + assertRejects, } from "@std/assert"; import { delay } from "es-toolkit"; import { InProcessMessageQueue, type MessageQueue, + type MessageQueueEnqueueOptions, ParallelMessageQueue, } from "./mq.ts"; @@ -34,6 +36,10 @@ test("InProcessMessageQueue", async (t) => { assertFalse(mq.nativeRetrial); }); + await t.step("nativeDeduplication property", () => { + assertFalse(mq.nativeDeduplication); + }); + await t.step("getDepth() [empty]", async () => { assertEquals(await mq.getDepth(), { queued: 0, @@ -419,6 +425,138 @@ test("MessageQueue.nativeRetrial", async (t) => { }); }); +test("ParallelMessageQueue inherits nativeDeduplication", () => { + class NativeDeduplicationQueue implements MessageQueue { + readonly nativeDeduplication = true; + enqueue(): Promise { + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const workers = new ParallelMessageQueue(new NativeDeduplicationQueue(), 5); + assert(workers.nativeDeduplication); +}); + +test( + "ParallelMessageQueue forwards deduplicationKey to the wrapped queue", + async () => { + class RecordingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly singles: (MessageQueueEnqueueOptions | undefined)[] = []; + readonly batches: (MessageQueueEnqueueOptions | undefined)[] = []; + enqueue( + _message: unknown, + options?: MessageQueueEnqueueOptions, + ): Promise { + this.singles.push(options); + return Promise.resolve(); + } + enqueueMany( + _messages: readonly unknown[], + options?: MessageQueueEnqueueOptions, + ): Promise { + this.batches.push(options); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new RecordingQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await workers.enqueue({ x: 1 }, { deduplicationKey: "k1" }); + await workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k2" }); + assertEquals(inner.singles[0]?.deduplicationKey, "k1"); + assertEquals(inner.batches[0]?.deduplicationKey, "k2"); + }, +); + +test( + "ParallelMessageQueue rejects a deduplicated batch when the wrapped queue " + + "lacks enqueueMany", + async () => { + class NoBulkQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly enqueued: unknown[] = []; + enqueue(message: unknown): Promise { + this.enqueued.push(message); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new NoBulkQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await assertRejects( + () => + workers.enqueueMany([{ x: 1 }, { x: 2 }], { deduplicationKey: "k" }), + TypeError, + "enqueueMany", + ); + // It threw before enqueuing anything. + assertEquals(inner.enqueued.length, 0); + }, +); + +test( + "ParallelMessageQueue falls back to enqueue() for a single deduplicated " + + "item when the wrapped queue lacks enqueueMany", + async () => { + class NoBulkQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly enqueued: unknown[] = []; + readonly options: (MessageQueueEnqueueOptions | undefined)[] = []; + enqueue( + message: unknown, + options?: MessageQueueEnqueueOptions, + ): Promise { + this.enqueued.push(message); + this.options.push(options); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new NoBulkQueue(); + const workers = new ParallelMessageQueue(inner, 5); + // The atomicity limitation only applies to multi-item fan-out, so a + // single-item batch forwards its deduplicationKey to enqueue() unchanged. + await workers.enqueueMany([{ x: 1 }], { deduplicationKey: "k" }); + assertEquals(inner.enqueued, [{ x: 1 }]); + assertEquals(inner.options[0]?.deduplicationKey, "k"); + }, +); + +test( + "ParallelMessageQueue still fans out a non-deduplicated batch when the " + + "wrapped queue lacks enqueueMany", + async () => { + class NoBulkQueue implements MessageQueue { + readonly enqueued: unknown[] = []; + enqueue(message: unknown): Promise { + this.enqueued.push(message); + return Promise.resolve(); + } + listen(): Promise { + return Promise.resolve(); + } + } + + const inner = new NoBulkQueue(); + const workers = new ParallelMessageQueue(inner, 5); + await workers.enqueueMany([{ x: 1 }, { x: 2 }, { x: 3 }]); + assertEquals(inner.enqueued.length, 3); + }, +); + const queues: Record Promise> = { InProcessMessageQueue: () => Promise.resolve(new InProcessMessageQueue()), }; @@ -450,6 +588,10 @@ for (const mqName in queues) { assertEquals(workers.nativeRetrial, mq.nativeRetrial); }); + await t.step("nativeDeduplication property inheritance", () => { + assertEquals(workers.nativeDeduplication, mq.nativeDeduplication); + }); + await t.step("getDepth() delegation", async () => { if (mq.getDepth == null) { assertEquals(workers.getDepth, undefined); diff --git a/packages/fedify/src/federation/mq.ts b/packages/fedify/src/federation/mq.ts index 36ba9900c..5fb33ea0c 100644 --- a/packages/fedify/src/federation/mq.ts +++ b/packages/fedify/src/federation/mq.ts @@ -25,6 +25,20 @@ export interface MessageQueueEnqueueOptions { * @since 2.0.0 */ readonly orderingKey?: string; + + /** + * An optional key requesting at-most-once enqueue semantics for messages + * that share it. A backend that declares + * {@link MessageQueue.nativeDeduplication} `true` owns the check: a message + * whose `deduplicationKey` was already seen within the backend's + * deduplication window is dropped instead of enqueued. Backends without + * native deduplication ignore this field; Fedify performs its own + * best-effort deduplication before reaching them on the paths that support + * it. + * + * @since 2.x.x + */ + readonly deduplicationKey?: string; } /** @@ -87,6 +101,18 @@ export interface MessageQueue { */ readonly nativeRetrial?: boolean; + /** + * Whether the message queue backend deduplicates messages that share a + * {@link MessageQueueEnqueueOptions.deduplicationKey} natively. When `true`, + * Fedify forwards the `deduplicationKey` and relies on the backend to drop + * duplicates; when `false` or omitted, Fedify applies its own best-effort + * key–value deduplication on the paths that request it. + * + * @default `false` + * @since 2.x.x + */ + readonly nativeDeduplication?: boolean; + /** * Enqueues a message in the queue. * @param message The message to enqueue. @@ -176,6 +202,12 @@ export class InProcessMessageQueue implements MessageQueue { */ readonly nativeRetrial = false; + /** + * In-process message queue does not deduplicate messages natively. + * @since 2.x.x + */ + readonly nativeDeduplication = false; + /** * Constructs a new {@link InProcessMessageQueue} with the given options. * @param options Additional options for the in-process message queue. @@ -365,6 +397,12 @@ export class ParallelMessageQueue implements MessageQueue { * @since 1.7.0 */ readonly nativeRetrial?: boolean; + + /** + * Inherits the native deduplication capability from the wrapped queue. + * @since 2.x.x + */ + readonly nativeDeduplication?: boolean; readonly getDepth?: () => Promise; /** @@ -398,6 +436,7 @@ export class ParallelMessageQueue implements MessageQueue { this.queue = queue; this.workers = workers; this.nativeRetrial = queue.nativeRetrial; + this.nativeDeduplication = queue.nativeDeduplication; if (queue.getDepth != null) { this.getDepth = () => queue.getDepth!(); } @@ -412,6 +451,19 @@ export class ParallelMessageQueue implements MessageQueue { options?: MessageQueueEnqueueOptions, ): Promise { if (this.queue.enqueueMany == null) { + if (messages.length === 1) { + await this.queue.enqueue(messages[0], options); + return; + } + if (options?.deduplicationKey != null) { + throw new TypeError( + "Cannot enqueue a batch with a deduplicationKey: the wrapped queue " + + "does not implement enqueueMany, so ParallelMessageQueue would " + + "have to fan out to individual enqueue() calls that cannot share " + + "one deduplicationKey atomically. Wrap a queue that implements " + + "enqueueMany instead.", + ); + } const results = await Promise.allSettled( messages.map((message) => this.queue.enqueue(message, options)), ); diff --git a/packages/fedify/src/federation/tasks/codec.ts b/packages/fedify/src/federation/tasks/codec.ts index f13d85581..e649ce061 100644 --- a/packages/fedify/src/federation/tasks/codec.ts +++ b/packages/fedify/src/federation/tasks/codec.ts @@ -4,12 +4,20 @@ import type { TracerProvider } from "@opentelemetry/api"; import type { StandardSchemaV1 } from "@standard-schema/spec"; import { parse, stringifyAsync } from "devalue"; +/** + * Serializes and deserializes task payloads for the queue, preserving + * `@fedify/vocab` objects across the wire by reducing them to JSON-LD and + * rebuilding them on the worker with the bound {@link TaskCodecLoaders}. + * @internal + */ export default class TaskCodec { constructor(readonly options: TaskCodecLoaders) {} + /** Serializes `data`, encoding any vocabulary object as its JSON-LD. */ serialize = (data: unknown): Promise => stringifyAsync(data, { Vocab: this.#stringifyVocab }); + /** Deserializes `raw`, rebuilding any encoded vocabulary object. */ deserialize = (raw: string): Promise => this.#revive(new Map())(parse(raw, { Vocab: VocabHolder.from })); @@ -26,6 +34,7 @@ export default class TaskCodec { ): Promise> => TaskCodec.validate(schema, await this.deserialize(raw)); + /** Validates `data` against `schema`, returning its parsed output. */ static validate = async ( schema: S, data: unknown, diff --git a/packages/fedify/src/federation/tasks/enqueue.test.ts b/packages/fedify/src/federation/tasks/enqueue.test.ts new file mode 100644 index 000000000..ee7db8014 --- /dev/null +++ b/packages/fedify/src/federation/tasks/enqueue.test.ts @@ -0,0 +1,1372 @@ +import { test } from "@fedify/fixture"; +import { configure, type LogRecord, reset } from "@logtape/logtape"; +import { delay } from "es-toolkit"; +import { deepStrictEqual, ok, rejects, strictEqual } from "node:assert/strict"; +import { + baseOptions, + makeSchema, + MockQueue, + numberSchema, + stringSchema, +} from "../../testing/mod.ts"; +import { + type KvKey, + type KvStore, + type KvStoreListEntry, + type KvStoreSetOptions, + MemoryKvStore, +} from "../kv.ts"; +import { createFederation } from "../middleware.ts"; +import { + type MessageQueue, + type MessageQueueEnqueueOptions, + ParallelMessageQueue, +} from "../mq.ts"; +import type { TaskMessage } from "../queue.ts"; + +/** + * A {@link KvStore} that delegates to an in-memory store but deliberately + * omits `cas`, so that `kv.cas == null`. This drives the deduplication + * fallback branches that fire when no conditional-write primitive exists. + */ +class CaslessKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + get(key: KvKey): Promise { + return this.inner.get(key); + } + set(key: KvKey, value: unknown, options?: KvStoreSetOptions): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + // No `cas`: the fallback branch is reached precisely when `kv.cas == null`. +} + +async function collectKeys(kv: KvStore, prefix: KvKey): Promise { + const keys: KvKey[] = []; + for await (const { key } of kv.list(prefix)) keys.push(key); + return keys; +} + +const TASK_DEDUP_PREFIX: KvKey = ["_fedify", "taskDeduplication"]; +const ACTIVITY_IDEMPOTENCE_PREFIX: KvKey = ["_fedify", "activityIdempotence"]; + +const UUID_RE = + /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i; + +test("enqueueTasks() validation and dispatch", async (t) => { + await t.step("rejects an invalid payload at enqueue", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("strictly-typed", { + schema: numberSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + // deno-lint-ignore no-explicit-any + () => ctx.enqueueTask(task, "not a number" as any), + { name: "TypeError", message: /Task data failed schema validation/ }, + ); + strictEqual(queue.enqueued.length, 0); + }); + + await t.step("stamps the message envelope", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("envelope", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload"); + strictEqual(queue.enqueued.length, 1); + const { message } = queue.enqueued[0]; + strictEqual(message.type, "task"); + strictEqual(message.taskName, "envelope"); + // encodeTaskMessage stamps the context's origin (no trailing slash). + strictEqual(message.baseUrl, "https://example.com"); + strictEqual(message.attempt, 0); + ok(UUID_RE.test(message.id)); + ok(typeof message.data === "string" && message.data.length > 0); + // `started` is a serialized Temporal.Instant. + ok(Temporal.Instant.from(message.started) instanceof Temporal.Instant); + // propagation.inject always populates a (possibly empty) carrier object. + ok( + typeof message.traceContext === "object" && message.traceContext != null, + ); + }); + + await t.step("passes delay and orderingKey through", async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("delayed", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { + delay: { seconds: 30 }, + orderingKey: "user:alice", + }); + strictEqual(queue.enqueued.length, 1); + const { message, options } = queue.enqueued[0]; + strictEqual(message.taskName, "delayed"); + strictEqual(message.orderingKey, "user:alice"); + strictEqual(message.attempt, 0); + ok(options?.delay instanceof Temporal.Duration); + strictEqual(options.delay.total("second"), 30); + strictEqual(options.orderingKey, "user:alice"); + }); + + await t.step( + "starts the task worker on first enqueue without startQueue()", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + manuallyStartQueue: false, + queue: { task: queue }, + }); + const task = federation.defineTask("auto-start", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + // An app that only uses the custom task API never sends an activity, + // so enqueueTask() itself must start the worker like the other + // enqueue paths do; otherwise tasks pile up unprocessed forever. + await ctx.enqueueTask(task, "first"); + strictEqual(queue.listenCount, 1); + // The started flag keeps a second enqueue from re-listening. + await ctx.enqueueTask(task, "second"); + strictEqual(queue.listenCount, 1); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step("throws when the resolved task queue is null", async () => { + // No queue is configured at all, so resolveTaskQueue() returns null and + // the enqueue pipeline must fail fast before encoding any payload. + const federation = createFederation({ ...baseOptions }); + const task = federation.defineTask("queueless", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "data"), + { name: "TypeError", message: /No message queue is configured/ }, + ); + }); + + await t.step( + "rejects a handle from another federation at enqueue", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const other = createFederation({ + ...baseOptions, + queue: { task: new MockQueue() }, + }); + const foreignTask = other.defineTask("foreign", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(foreignTask, "data"), + { name: "TypeError", message: /is not defined on this federation/ }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + "rejects a same-named handle from another federation", + async () => { + // Name lookup alone cannot tell a foreign handle apart once both + // instances define the same task name: the local context would + // encode under the *schema carried by the foreign handle*, so a + // payload the local schema rejects would enqueue anyway, only to be + // dropped by the worker decoding under the local schema. Both + // instances share TContextData = void, so the phantom-brand check + // cannot reject this at compile time; the handle-identity guard is + // the only defense. + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + let called = 0; + federation.defineTask("rename", { + schema: numberSchema, // the local "rename" takes a number… + handler: () => { + called++; + }, + }); + const other = createFederation({ + ...baseOptions, + queue: { task: new MockQueue() }, + }); + // …while the other instance's "rename" takes a string: + const foreignTask = other.defineTask("rename", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(foreignTask, "not a number"), + { name: "TypeError", message: /is not defined on this federation/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(called, 0); + }, + ); + + await t.step( + "enqueueTaskMany() uses enqueueMany when available", + async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"]); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }, + ); + + await t.step( + "enqueueTaskMany() falls back to parallel enqueues", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b"]); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + "enqueueTaskMany() with no payloads touches no queue", + async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + queue: { task: queue }, + }); + const task = federation.defineTask("bulk-empty", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, []); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + }, + ); +}); + +test("task deduplication", async (t) => { + await t.step( + "forwards the key to a nativeDeduplication queue without writing KV", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "k"); + // The backend owns the check, so Fedify must not write any KV marker. + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 0); + }, + ); + + await t.step( + "skips a second enqueue with the same key within the TTL", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("kv-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].message.taskName, "kv-dedup"); + // A non-native queue never receives a key it would ignore. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "re-enqueues with the same key after the TTL expires", + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 100 }, + }); + const task = federation.defineTask("kv-dedup-ttl", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Wait comfortably past the 100 ms TTL so the marker expires. + await delay(300); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 2); + }, + ); + + await t.step( + 'rejects with TypeError when fallback is "closed" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await rejects( + () => ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }), + { name: "TypeError" }, + ); + strictEqual(queue.enqueued.length, 0); + }, + ); + + await t.step( + 'proceeds when fallback is "open" and no cas exists', + async () => { + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-fallback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + // Best-effort fallback never forwards the key to a non-native queue. + strictEqual(queue.enqueued[0].options?.deduplicationKey, undefined); + }, + ); + + await t.step( + "writes only under taskDeduplication, never activityIdempotence", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("prefix-isolation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + strictEqual((await collectKeys(kv, TASK_DEDUP_PREFIX)).length, 1); + strictEqual( + (await collectKeys(kv, ACTIVITY_IDEMPOTENCE_PREFIX)).length, + 0, + ); + }, + ); + + await t.step("applies one batch-level key to enqueueTaskMany", async () => { + const queue = new MockQueue({ supportsEnqueueMany: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // First batch enqueues all three; the second is skipped entirely. + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].messages.length, 3); + }); +}); + +test( + "task deduplication validates every payload before reserving the key", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("dedup-validation", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // A rejected payload must neither enqueue nor consume the key. + await rejects(() => + ctx.enqueueTask(task, 123 as unknown as string, { + deduplicationKey: "k", + }) + ); + strictEqual(queue.enqueued.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // The same key must remain usable by the first valid enqueue. + await ctx.enqueueTask(task, "valid", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + deepStrictEqual( + await collectKeys(kv, TASK_DEDUP_PREFIX), + [[...TASK_DEDUP_PREFIX, "k"]], + ); + + // Once the valid enqueue reserves it, the same key must deduplicate. + await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "native task batch deduplication is one enqueueMany operation per call", + async () => { + class NativeBatchDeduplicatingQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly attempts: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + readonly accepted: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key == null) { + throw new TypeError( + "Native batch enqueue requires a deduplication key.", + ); + } + this.attempts.push({ messages, options }); + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + this.accepted.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const queue = new NativeBatchDeduplicatingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-dedup", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a1", "a2", "a3"], { + deduplicationKey: "batch-a", + }); + await ctx.enqueueTaskMany( + task, + ["duplicate1", "duplicate2", "duplicate3"], + { + deduplicationKey: "batch-a", + }, + ); + await ctx.enqueueTaskMany(task, ["b1", "b2", "b3"], { + deduplicationKey: "batch-b", + }); + + // Every API call reaches the backend exactly once, with one key governing + // all three messages. The backend accepts complete batches or none. + strictEqual(queue.attempts.length, 3); + deepStrictEqual( + queue.attempts.map(({ messages }) => messages.length), + [3, 3, 3], + ); + deepStrictEqual( + queue.attempts.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-a", "batch-b"], + ); + strictEqual(queue.accepted.length, 2); + deepStrictEqual( + queue.accepted.map(({ messages }) => messages.length), + [3, 3], + ); + deepStrictEqual( + queue.accepted.map(({ options }) => options?.deduplicationKey), + ["batch-a", "batch-b"], + ); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "native task batch deduplication rejects without enqueueMany", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + + // Reject before any partial enqueue or fallback KV write. Silently + // dropping the key from items 2..n cannot satisfy these assertions. + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A one-item batch is representable by enqueue() and must remain valid. + await ctx.enqueueTaskMany(task, ["single"], { + deduplicationKey: "single", + }); + strictEqual(queue.enqueued.length, 1); + strictEqual(queue.enqueued[0].options?.deduplicationKey, "single"); + }, +); + +test( + "deduplication - native batch capability errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue({ nativeDeduplication: true }); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("native-batch-capability-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTaskMany( + task, + [1, 2, 3] as unknown as readonly string[], + { deduplicationKey: "batch" }, + ); + } catch (error) { + caught = error; + } + + // The queue capability makes this request impossible regardless of the + // payload, so no user-supplied validator may run first. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("enqueueMany")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + "closed deduplication fallback errors precede payload validation", + async () => { + let validationCalls = 0; + const schema = makeSchema((data): data is string => { + validationCalls++; + return typeof data === "string"; + }); + const queue = new MockQueue(); + const kv = new CaslessKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "closed", + }); + const task = federation.defineTask("closed-fallback-order", { + schema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + let caught: unknown; + try { + await ctx.enqueueTask( + task, + 1 as unknown as string, + { deduplicationKey: "k" }, + ); + } catch (error) { + caught = error; + } + + // Closed fallback is a configuration-level rejection. It must be + // deterministic and independent of user payload validation. + strictEqual(validationCalls, 0); + ok(caught instanceof TypeError); + ok(caught.message.includes("conditional write")); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +/** + * A {@link MessageQueue} that fails its first enqueue—single or batch—with a + * transient error, then records every later enqueue. One class covers both the + * `enqueue()` and `enqueueMany()` rollback paths; each test instantiates its own + * copy, so the one-shot `#failNext` flag never leaks between them. + */ +class FlakyQueue implements MessageQueue { + readonly nativeDeduplication = false; + #failNext = true; + readonly enqueued: TaskMessage[] = []; + readonly enqueuedMany: TaskMessage[][] = []; + + #failOnce(): boolean { + if (!this.#failNext) return false; + this.#failNext = false; + return true; + } + + enqueue(message: TaskMessage): Promise { + if (this.#failOnce()) { + return Promise.reject(new Error("transient backend failure")); + } + this.enqueued.push(message); + return Promise.resolve(); + } + + enqueueMany(messages: readonly TaskMessage[]): Promise { + if (this.#failOnce()) { + return Promise.reject(new Error("transient backend failure")); + } + this.enqueuedMany.push([...messages]); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } +} + +test( + "a failed enqueue rolls back its dedup marker so the retry is not dropped", + async () => { + const queue = new FlakyQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("flaky-enqueue", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "k"]; + + // First enqueue: the marker is claimed, then dispatch rejects. + await rejects( + () => ctx.enqueueTask(task, "first", { deduplicationKey: "k" }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(await kv.get(markerKey), undefined); + + // The retry (queue healthy again) must enqueue the task, not be dropped. + await ctx.enqueueTask(task, "first-retry", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + + // A successful retry must keep its marker so later duplicates are dropped. + ok(await kv.get(markerKey) != null); + await ctx.enqueueTask(task, "duplicate", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a failed batch enqueue rolls back its dedup marker so the retry is not " + + "dropped", + async () => { + const queue = new FlakyQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("flaky-batch-enqueue", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "batch"]; + + await rejects( + () => + ctx.enqueueTaskMany(task, ["first", "second"], { + deduplicationKey: "batch", + }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueuedMany.length, 0); + // Asserted via get() for the same reason as the single-item rollback test + // above (MemoryKvStore.cas leaves a `value: undefined` entry). + strictEqual(await kv.get(markerKey), undefined); + + await ctx.enqueueTaskMany(task, ["first-retry", "second-retry"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueuedMany.length, 1); + strictEqual(queue.enqueuedMany[0].length, 2); + ok(await kv.get(markerKey) != null); + + await ctx.enqueueTaskMany(task, ["duplicate-first", "duplicate-second"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueuedMany.length, 1); + }, +); + +test( + "a stale rollback does not clear a marker another enqueue re-claimed", + async () => { + const kv = new MemoryKvStore(); + const markerKey: KvKey = [...TASK_DEDUP_PREFIX, "k"]; + let signalFirstEntered!: () => void; + const firstEntered = new Promise((resolve) => { + signalFirstEntered = resolve; + }); + let releaseFirst!: () => void; + const firstReleased = new Promise((resolve) => { + releaseFirst = resolve; + }); + class BlockingThenFailingQueue implements MessageQueue { + readonly nativeDeduplication = false; + #calls = 0; + async enqueue(): Promise { + this.#calls++; + if (this.#calls === 1) { + signalFirstEntered(); + await firstReleased; + throw new Error("transient backend failure"); + } + } + listen(): Promise { + return Promise.resolve(); + } + } + const queue = new BlockingThenFailingQueue(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationTtl: { milliseconds: 1 }, + }); + const task = federation.defineTask("stale-rollback", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + const first = ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await firstEntered; + await delay(20); + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + const secondToken = await kv.get(markerKey); + ok(secondToken != null); + releaseFirst(); + await rejects(() => first, { message: /transient backend failure/ }); + strictEqual(await kv.get(markerKey), secondToken); + }, +); + +test( + "a multi-item batch dedup without enqueueMany is rejected on the cas path", + async () => { + const queue = new MockQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("cas-batch-without-enqueue-many", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(queue.enqueuedMany.length, 0); + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + await ctx.enqueueTaskMany(task, ["single"], { deduplicationKey: "single" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a failed rollback is swallowed; the original enqueue error reaches the caller", + async () => { + class ClearFailingKvStore implements KvStore { + readonly inner = new MemoryKvStore(); + clearAttempts = 0; + get(key: KvKey): Promise { + return this.inner.get(key); + } + set( + key: KvKey, + value: unknown, + options?: KvStoreSetOptions, + ): Promise { + return this.inner.set(key, value, options); + } + delete(key: KvKey): Promise { + return this.inner.delete(key); + } + list(prefix?: KvKey): AsyncIterable { + return this.inner.list(prefix); + } + cas( + key: KvKey, + expectedValue: unknown, + newValue: unknown, + options?: KvStoreSetOptions, + ): Promise { + if (newValue === undefined) { + this.clearAttempts++; + return Promise.reject(new Error("kv cas clear failed")); + } + return this.inner.cas(key, expectedValue, newValue, options); + } + } + + const queue = new FlakyQueue(); + const kv = new ClearFailingKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("rollback-failure", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await rejects( + () => ctx.enqueueTask(task, "first", { deduplicationKey: "k" }), + { message: /transient backend failure/ }, + ); + strictEqual(queue.enqueued.length, 0); + strictEqual(kv.clearAttempts, 1); + }, +); + +/** + * A native-deduplication backend that drops repeat-key single enqueues and + * does **not** implement `enqueueMany`. Wrapping it in + * {@link ParallelMessageQueue} used to fan a batch out to one `enqueue()` per + * message, all carrying the same `deduplicationKey`, so the backend collapsed + * the whole batch onto its first message. + */ +class NativeDedupNoBulkQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly enqueued: { + message: TaskMessage; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue( + message: TaskMessage, + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key != null) { + if (this.#seen.has(key)) return Promise.resolve(); + this.#seen.add(key); + } + this.enqueued.push({ message, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } +} + +test( + "a deduplicated batch over a ParallelMessageQueue wrapping a native, " + + "no-enqueueMany backend is rejected, not collapsed", + async () => { + const backend = new NativeDedupNoBulkQueue(); + const queue = new ParallelMessageQueue(backend, 5); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("parallel-native-no-bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // The wrapper cannot enqueue the batch atomically under one key, so the + // multi-item batch must be rejected rather than silently collapsed to one. + await rejects( + () => + ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }), + { name: "TypeError", message: /enqueueMany/ }, + ); + strictEqual(backend.enqueued.length, 0); + // A native plan never touches KV, even when it rejects. + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + + // A single-item batch needs no bulk path, so the key is still forwarded. + await ctx.enqueueTaskMany(task, ["solo"], { deduplicationKey: "solo" }); + strictEqual(backend.enqueued.length, 1); + strictEqual(backend.enqueued[0].options?.deduplicationKey, "solo"); + }, +); + +test( + "a deduplicated batch over a ParallelMessageQueue wrapping a native " + + "enqueueMany backend forwards the key atomically", + async () => { + class NativeBatchQueue implements MessageQueue { + readonly nativeDeduplication = true; + readonly #seen = new Set(); + readonly batches: { + messages: readonly TaskMessage[]; + options?: MessageQueueEnqueueOptions; + }[] = []; + + enqueue(): Promise { + throw new Error("A multi-item native batch must use enqueueMany()."); + } + + enqueueMany( + messages: readonly TaskMessage[], + options?: MessageQueueEnqueueOptions, + ): Promise { + const key = options?.deduplicationKey; + if (key != null && this.#seen.has(key)) return Promise.resolve(); + if (key != null) this.#seen.add(key); + this.batches.push({ messages, options }); + return Promise.resolve(); + } + + listen(): Promise { + return Promise.resolve(); + } + } + + const backend = new NativeBatchQueue(); + const queue = new ParallelMessageQueue(backend, 5); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("parallel-native-bulk", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + // The duplicate batch is dropped by the backend's native check. + await ctx.enqueueTaskMany(task, ["x", "y", "z"], { + deduplicationKey: "batch", + }); + + strictEqual(backend.batches.length, 1); + strictEqual(backend.batches[0].messages.length, 3); + strictEqual(backend.batches[0].options?.deduplicationKey, "batch"); + // The native path never writes KV, even through the wrapper. + deepStrictEqual(await collectKeys(kv, TASK_DEDUP_PREFIX), []); + }, +); + +test( + 'an "open" fallback fans out a multi-item batch without enqueueMany ' + + "instead of rejecting it", + async () => { + const queue = new MockQueue(); // no enqueueMany, not native + const kv = new CaslessKvStore(); // no cas + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-batch-fanout", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // With neither native dedup nor cas under "open", the batch proceeds by + // fanning out every item; it must not throw the enqueueMany requirement. + await ctx.enqueueTaskMany(task, ["a", "b", "c"], { + deduplicationKey: "batch", + }); + strictEqual(queue.enqueued.length, 3); + for (const { options } of queue.enqueued) { + strictEqual(options?.deduplicationKey, undefined); + } + // The open path records nothing in the key–value store. + deepStrictEqual(await collectKeys(kv.inner, TASK_DEDUP_PREFIX), []); + }, +); + +test( + 'an "open" fallback logs a debug record when it ignores the key', + async () => { + const records: LogRecord[] = []; + await reset(); + try { + await configure({ + sinks: { + buffer(record: LogRecord): void { + records.push(record); + }, + }, + filters: {}, + loggers: [ + { category: [], lowestLevel: "debug", sinks: ["buffer"] }, + ], + }); + + const queue = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + kv: new CaslessKvStore(), + queue: { task: queue }, + taskDeduplicationFallback: "open", + }); + const task = federation.defineTask("open-debug-log", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { deduplicationKey: "k" }); + + const matched = records.filter((record) => + record.level === "debug" && + record.properties.deduplicationKey === "k" && + record.properties.taskName === "open-debug-log" + ); + strictEqual(matched.length, 1); + } finally { + await reset(); + } + }, +); + +test( + "two concurrent enqueues sharing a key: exactly one wins the cas claim", + async () => { + let signalEntered!: () => void; + const entered = new Promise((resolve) => { + signalEntered = resolve; + }); + let release!: () => void; + const released = new Promise((resolve) => { + release = resolve; + }); + class BlockingQueue implements MessageQueue { + readonly nativeDeduplication = false; + readonly enqueued: TaskMessage[] = []; + #first = true; + async enqueue(message: TaskMessage): Promise { + if (this.#first) { + this.#first = false; + signalEntered(); + await released; + } + this.enqueued.push(message); + } + listen(): Promise { + return Promise.resolve(); + } + } + const queue = new BlockingQueue(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + ...baseOptions, + kv, + queue: { task: queue }, + }); + const task = federation.defineTask("concurrent-claim", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + + // The first enqueue claims the marker, then blocks inside the queue. + const first = ctx.enqueueTask(task, "first", { deduplicationKey: "k" }); + await entered; + // With the first still in flight, the second must lose the cas claim and + // skip the queue entirely. + await ctx.enqueueTask(task, "second", { deduplicationKey: "k" }); + release(); + await first; + strictEqual(queue.enqueued.length, 1); + + // The winner kept its marker, so a later duplicate is still dropped. + await ctx.enqueueTask(task, "third", { deduplicationKey: "k" }); + strictEqual(queue.enqueued.length, 1); + }, +); + +test( + "a native enqueue forwards orderingKey and deduplicationKey together", + async () => { + const queue = new MockQueue({ nativeDeduplication: true }); + const federation = createFederation({ + ...baseOptions, + kv: new MemoryKvStore(), + queue: { task: queue }, + }); + const task = federation.defineTask("native-both-keys", { + schema: stringSchema, + handler: () => {}, + }); + const ctx = federation.createContext( + new URL("https://example.com/"), + undefined, + ); + await ctx.enqueueTask(task, "payload", { + orderingKey: "user:alice", + deduplicationKey: "dedup:alice", + }); + strictEqual(queue.enqueued.length, 1); + const { message, options } = queue.enqueued[0]; + strictEqual(message.orderingKey, "user:alice"); + strictEqual(options?.orderingKey, "user:alice"); + strictEqual(options?.deduplicationKey, "dedup:alice"); + }, +); diff --git a/packages/fedify/src/federation/tasks/enqueue.ts b/packages/fedify/src/federation/tasks/enqueue.ts new file mode 100644 index 000000000..6500301af --- /dev/null +++ b/packages/fedify/src/federation/tasks/enqueue.ts @@ -0,0 +1,294 @@ +/** + * The enqueue pipeline for custom background tasks. `ContextImpl.enqueueTask` + * and `ContextImpl.enqueueTaskMany` delegate to {@link enqueueTasks} so the + * handle validation, deduplication planning, payload encoding, and queue + * dispatch live in one cohesive place instead of one oversized method. + * + * @module + */ +import { getLogger } from "@logtape/logtape"; +import { context, propagation } from "@opentelemetry/api"; +import type { KvKey } from "../kv.ts"; +import type { FederationImpl } from "../middleware.ts"; +import type { MessageQueue } from "../mq.ts"; +import type { TaskMessage } from "../queue.ts"; +import type TaskCodec from "./codec.ts"; +import type { TaskDefinition, TaskEnqueueOptions } from "./task.ts"; + +/** + * The slice of an enqueueing {@link Context} that {@link enqueueTasks} needs: + * its federation plus the few values that are the context's own. `ContextImpl` + * assembles it from itself, so the enqueue pipeline stays out of that class. + * @template TContextData The context data to pass to the {@link Context}. + * @internal + */ +interface EnqueueTasksContext { + /** + * The federation that owns the task registry, queue resolution and start, + * the key-value store, and the deduplication configuration. The public + * {@link Federation} interface exposes none of these, so the concrete + * {@link FederationImpl} is required. + */ + readonly federation: FederationImpl; + + /** The codec, bound to this context's loaders, that encodes payloads. */ + readonly codec: TaskCodec; + + /** The context's origin, stamped onto each message as its `baseUrl`. */ + readonly origin: string; + + /** The context data handed to the queue worker when it auto-starts. */ + readonly data: TContextData; +} + +/** + * Validates the task handle, plans deduplication, encodes every payload, then + * dispatches the resulting messages to the queue. A single item flows through + * the same pipeline as a batch, so {@link Context.enqueueTask} and + * {@link Context.enqueueTaskMany} share one implementation. + * @template TContextData The context data to pass to the {@link Context}. + * @template TData The type of the task payload, inferred from the task's schema. + * @param ctx The enqueueing dependencies assembled by `ContextImpl`. + * @param task The handle returned by `defineTask()`. + * @param items The payloads to enqueue, in order. + * @param options The enqueue options governing delay, ordering, and dedup. + * @internal + */ +const enqueueTasks = ( + ctx: EnqueueTasksContext, +) => + async function ( + task: TaskDefinition, + items: readonly TData[], + options: TaskEnqueueOptions, + ): Promise { + const def = ctx.federation.taskDefinitions.get(task.name); + if (def == null || def.handle !== task) { + throw new TypeError( + `Task ${ + JSON.stringify(task.name) + } is not defined on this federation; ` + + "pass a handle returned by its defineTask().", + ); + } + const queue = ctx.federation.resolveTaskQueue(task.name); + if (queue == null) { + throw new TypeError( + "No message queue is configured for tasks; pass `queue` to " + + "createFederation() or to defineTask().", + ); + } + if (items.length < 1) return; + const plan = planDeduplication( + ctx, + queue, + task.name, + options, + items.length, + ); + const messages: TaskMessage[] = await Promise.all( + items.map(encodeTaskMessage(ctx.codec, ctx.origin, task, options)), + ); + const claim = await claimDeduplication(ctx, plan, task.name); + if (!claim.proceed) return; + if (!ctx.federation.manuallyStartQueue) { + ctx.federation._startQueueInternal(ctx.data); + } + try { + await dispatch(queue, messages, { + delay: getDurationIfDefined(options.delay), + orderingKey: options.orderingKey, + deduplicationKey: claim.forwardedDeduplicationKey, + }); + } catch (error) { + if (claim.rollback != null) { + try { + await claim.rollback(); + } catch (rollbackError) { + logger.warn( + "Failed to roll back the deduplication marker for task " + + "{taskName} after a failed enqueue; it will expire by TTL. " + + "{rollbackError}", + { taskName: task.name, rollbackError }, + ); + } + } + throw error; + } + }; + +export default enqueueTasks; + +const getDurationIfDefined = (item: Temporal.DurationLike | undefined) => + item == null ? undefined : Temporal.Duration.from(item); + +/** + * The deduplication strategy chosen for an enqueue, settled before any payload + * is encoded so the fail-fast errors surface first. + */ +type DedupPlan = + | { readonly kind: "none" } + | { readonly kind: "native"; readonly key: string } + | { readonly kind: "cas"; readonly key: string } + | { readonly kind: "open"; readonly key: string }; + +/** + * Decides how a `deduplicationKey` (if any) is honored: forwarded to a native + * queue, claimed via `cas`, or—when neither is available—dropped or rejected + * per the federation's `taskDeduplicationFallback`. Throws the fail-fast + * `TypeError`s so they precede the encode. + */ +function planDeduplication( + ctx: EnqueueTasksContext, + queue: MessageQueue, + taskName: string, + options: TaskEnqueueOptions, + itemCount: number, +): DedupPlan { + if (options.deduplicationKey == null) return { kind: "none" }; + const key = options.deduplicationKey; + const native = queue.nativeDeduplication === true; + const canCas = ctx.federation.kv.cas != null; + if (itemCount > 1 && queue.enqueueMany == null && (native || canCas)) { + throw new TypeError( + `Task ${ + JSON.stringify(taskName) + } was enqueued as a batch with a deduplicationKey, but its message ` + + "queue does not implement enqueueMany; a multi-item batch cannot be " + + "deduplicated atomically without it. Implement enqueueMany on the " + + "queue, or enqueue the tasks individually with enqueueTask().", + ); + } + if (native) return { kind: "native", key }; + if (canCas) return { kind: "cas", key }; + if (ctx.federation.taskDeduplicationFallback === "closed") { + // No conditional write, closed: fail fast before any side effect. + throw new TypeError( + "deduplicationKey was set but the message queue does not declare " + + "nativeDeduplication and the key-value store exposes no " + + 'conditional write (cas); set taskDeduplicationFallback to "open" ' + + "to proceed without deduplication, or use a backend that " + + "supports it.", + ); + } + return { kind: "open", key }; +} + +/** + * Executes the planned deduplication once the payloads are encoded. A native + * plan forwards its key to the queue; a `cas` plan claims the marker and stops + * the enqueue when it loses the race; an `open` plan logs and proceeds. + * @returns Whether to proceed, and the key (if any) to forward to the queue. + */ +async function claimDeduplication( + ctx: EnqueueTasksContext, + plan: DedupPlan, + taskName: string, +): Promise<{ + proceed: boolean; + forwardedDeduplicationKey?: string; + /** + * Undoes a reserved marker when dispatch fails. Present only for a `cas` + * plan that won its claim; a failed enqueue calls it so the retry is not + * deduplicated against a task that never reached the queue. + */ + rollback?: () => Promise; +}> { + switch (plan.kind) { + case "native": + return { proceed: true, forwardedDeduplicationKey: plan.key }; + case "cas": { + const cacheKey = [ + ...ctx.federation.kvPrefixes.taskDeduplication, + plan.key, + ] satisfies KvKey; + const token = crypto.randomUUID(); + const won = await ctx.federation.kv.cas!(cacheKey, undefined, token, { + ttl: ctx.federation.taskDeduplicationTtl, + }); + if (!won) return { proceed: false }; + return { + proceed: true, + // Conditional clear: cas succeeds only while the stored value is still + // our token, so we never delete a marker another enqueue now owns. + rollback: async () => { + await ctx.federation.kv.cas!(cacheKey, token, undefined); + }, + }; + } + case "open": + logger.debug( + "deduplicationKey {deduplicationKey} for task {taskName} ignored: " + + "the message queue declares no nativeDeduplication and the " + + "key-value store has no cas; proceeding (taskDeduplicationFallback " + + 'is "open").', + { deduplicationKey: plan.key, taskName }, + ); /* falls through */ + case "none": + return { proceed: true }; + default: { + const _exhaustive: never = plan; + throw new TypeError( + `Unknown deduplication plan: ${JSON.stringify(_exhaustive)}`, + ); + } + } +} + +/** + * Sends the encoded messages to the queue, picking the bulk path when the + * queue implements `enqueueMany` and otherwise fanning out parallel single + * enqueues. The fan-out drops `deduplicationKey`, which is only ever set for a + * native plan that the bulk paths already cover. + */ +async function dispatch( + queue: MessageQueue, + messages: readonly TaskMessage[], + options: { + delay?: Temporal.Duration; + orderingKey?: string; + deduplicationKey?: string; + }, +): Promise { + if (messages.length === 1) { + await queue.enqueue(messages[0], options); + } else if (queue.enqueueMany != null) { + await queue.enqueueMany(messages, options); + } else { + const fanoutOptions = { + delay: options.delay, + orderingKey: options.orderingKey, + }; + await Promise.all(messages.map((m) => queue.enqueue(m, fanoutOptions))); + } +} + +/** + * Builds the per-payload encoder: validates and serializes the payload, then + * stamps the message envelope with a fresh id, the context's origin, and the + * active trace context. Curried so the batch encode reuses one bound encoder. + */ +const encodeTaskMessage = ( + codec: TaskCodec, + origin: string, + task: TaskDefinition, + options: TaskEnqueueOptions, +) => +async (data: TData): Promise => { + const encoded = await codec.encode(task.schema, data); + const carrier: Record = {}; + propagation.inject(context.active(), carrier); + return { + type: "task", + id: crypto.randomUUID(), + baseUrl: origin, + taskName: task.name, + data: encoded, + started: Temporal.Now.instant().toString(), + attempt: 0, + orderingKey: options.orderingKey, + traceContext: carrier, + }; +}; + +const logger = getLogger(["fedify", "federation", "task"]); diff --git a/packages/fedify/src/federation/tasks/mod.ts b/packages/fedify/src/federation/tasks/mod.ts index e151fa576..62072ac45 100644 --- a/packages/fedify/src/federation/tasks/mod.ts +++ b/packages/fedify/src/federation/tasks/mod.ts @@ -7,6 +7,7 @@ * @module */ export { default as TaskCodec } from "./codec.ts"; +export { default as enqueueTasks } from "./enqueue.ts"; export type { TaskDefinition, TaskDefinitionInternal, diff --git a/packages/fedify/src/federation/tasks/task.ts b/packages/fedify/src/federation/tasks/task.ts index 0105e8e21..a095e713d 100644 --- a/packages/fedify/src/federation/tasks/task.ts +++ b/packages/fedify/src/federation/tasks/task.ts @@ -159,6 +159,26 @@ export interface TaskEnqueueOptions { * processed sequentially (one at a time). */ readonly orderingKey?: string; + + /** + * An optional key requesting at-most-once enqueue for tasks that share it. + * + * A queue with {@link MessageQueue.nativeDeduplication} `true` enforces it + * strictly; otherwise deduplication is best-effort via {@link KvStore.cas}, + * and {@link FederationOptions.taskDeduplicationFallback} decides whether a + * missing `cas` proceeds without deduplication or throws. + * + * For {@link Context.enqueueTaskMany}, one key governs the whole batch. When + * deduplication is actually applied—a native queue, or the key–value + * fallback through {@link KvStore.cas}—a multi-item batch with a + * `deduplicationKey` requires the queue to implement + * {@link MessageQueue.enqueueMany} so it enqueues atomically, or the call + * throws a `TypeError`. Under the `"open"` fallback with no `cas`, no marker + * is taken, so such a batch instead fans out without deduplication. + * + * @since 2.x.x + */ + readonly deduplicationKey?: string; } /** diff --git a/packages/fedify/src/federation/tasks/tasks.test.ts b/packages/fedify/src/federation/tasks/tasks.test.ts index 6ac05255a..c42c7bb94 100644 --- a/packages/fedify/src/federation/tasks/tasks.test.ts +++ b/packages/fedify/src/federation/tasks/tasks.test.ts @@ -8,31 +8,35 @@ import { strictEqual, throws, } from "node:assert/strict"; +import { + baseOptions, + makeSchema, + MockQueue, + numberSchema, + stringSchema, +} from "../../testing/mod.ts"; import { createFederationBuilder } from "../builder.ts"; import type { Context } from "../context.ts"; -import type { Federatable, FederationOptions } from "../federation.ts"; -import { MemoryKvStore } from "../kv.ts"; +import type { Federatable } from "../federation.ts"; import { createFederation, type FederationImpl } from "../middleware.ts"; import { InProcessMessageQueue } from "../mq.ts"; import type { TaskMessage } from "../queue.ts"; import TaskCodec from "./codec.ts"; import type { TaskDefinition, TaskRegistry } from "./task.ts"; -import { - type Envelope, - envelopeSchema, - MockQueue, - numberSchema, - stringSchema, -} from "../../testing/mod.ts"; type Assert = T; -const baseOptions: Omit, "queue"> = { - kv: new MemoryKvStore(), - documentLoaderFactory: () => mockDocumentLoader, - contextLoaderFactory: () => mockDocumentLoader, - manuallyStartQueue: true, -}; +interface Envelope { + note: Note; + title: string; +} + +const envelopeSchema = makeSchema( + (data): data is Envelope => + typeof data === "object" && data != null && + (data as Envelope).note instanceof Note && + typeof (data as Envelope).title === "string", +); const makeTaskMessage = async ( taskName: string, @@ -211,224 +215,6 @@ test("Context.enqueueTask() end-to-end", async (t) => { strictEqual(data.title, "greeting"); strictEqual(handlerCtx.origin, "https://example.com"); }); - - await t.step("rejects an invalid payload at enqueue", async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("strictly-typed", { - schema: numberSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - // deno-lint-ignore no-explicit-any - () => ctx.enqueueTask(task, "not a number" as any), - { name: "TypeError", message: /Task data failed schema validation/ }, - ); - strictEqual(queue.enqueued.length, 0); - }); - - await t.step( - "starts the task worker on first enqueue without startQueue()", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - manuallyStartQueue: false, - queue: { task: queue }, - }); - const task = federation.defineTask("auto-start", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - // An app that only uses the custom task API never sends an activity, - // so enqueueTask() itself must start the worker like the other - // enqueue paths do; otherwise tasks pile up unprocessed forever. - await ctx.enqueueTask(task, "first"); - strictEqual(queue.listenCount, 1); - // The started flag keeps a second enqueue from re-listening. - await ctx.enqueueTask(task, "second"); - strictEqual(queue.listenCount, 1); - strictEqual(queue.enqueued.length, 2); - }, - ); - - await t.step( - "rejects a handle from another federation at enqueue", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const other = createFederation({ - ...baseOptions, - queue: { task: new MockQueue() }, - }); - const foreignTask = other.defineTask("foreign", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - () => ctx.enqueueTask(foreignTask, "data"), - { name: "TypeError", message: /is not defined on this federation/ }, - ); - strictEqual(queue.enqueued.length, 0); - }, - ); - - await t.step( - "rejects a same-named handle from another federation", - async () => { - // Name lookup alone cannot tell a foreign handle apart once both - // instances define the same task name: the local context would - // encode under the *schema carried by the foreign handle*, so a - // payload the local schema rejects would enqueue anyway, only to be - // dropped by the worker decoding under the local schema. Both - // instances share TContextData = void, so the phantom-brand check - // cannot reject this at compile time; the handle-identity guard is - // the only defense. - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - let called = 0; - federation.defineTask("rename", { - schema: numberSchema, // the local "rename" takes a number… - handler: () => { - called++; - }, - }); - const other = createFederation({ - ...baseOptions, - queue: { task: new MockQueue() }, - }); - // …while the other instance's "rename" takes a string: - const foreignTask = other.defineTask("rename", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await rejects( - () => ctx.enqueueTask(foreignTask, "not a number"), - { name: "TypeError", message: /is not defined on this federation/ }, - ); - strictEqual(queue.enqueued.length, 0); - strictEqual(called, 0); - }, - ); - - await t.step("passes delay and orderingKey through", async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("delayed", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTask(task, "payload", { - delay: { seconds: 30 }, - orderingKey: "user:alice", - }); - strictEqual(queue.enqueued.length, 1); - const { message, options } = queue.enqueued[0]; - strictEqual(message.taskName, "delayed"); - strictEqual(message.orderingKey, "user:alice"); - strictEqual(message.attempt, 0); - ok(options?.delay instanceof Temporal.Duration); - strictEqual(options.delay.total("second"), 30); - strictEqual(options.orderingKey, "user:alice"); - }); - - await t.step( - "enqueueTaskMany() uses enqueueMany when available", - async () => { - const queue = new MockQueue({ supportsEnqueueMany: true }); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, ["a", "b", "c"]); - strictEqual(queue.enqueued.length, 0); - strictEqual(queue.enqueuedMany.length, 1); - strictEqual(queue.enqueuedMany[0].messages.length, 3); - }, - ); - - await t.step( - "enqueueTaskMany() falls back to parallel enqueues", - async () => { - const queue = new MockQueue(); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk-fallback", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, ["a", "b"]); - strictEqual(queue.enqueued.length, 2); - }, - ); - - await t.step( - "enqueueTaskMany() with no payloads touches no queue", - async () => { - const queue = new MockQueue({ supportsEnqueueMany: true }); - const federation = createFederation({ - ...baseOptions, - queue: { task: queue }, - }); - const task = federation.defineTask("bulk-empty", { - schema: stringSchema, - handler: () => {}, - }); - const ctx = federation.createContext( - new URL("https://example.com/"), - undefined, - ); - await ctx.enqueueTaskMany(task, []); - strictEqual(queue.enqueued.length, 0); - strictEqual(queue.enqueuedMany.length, 0); - }, - ); }); test("task queue routing", async (t) => { @@ -657,6 +443,75 @@ test("startQueue() task worker", async (t) => { strictEqual(received, "payload"); }, ); + + await t.step( + 'queue: "task" starts the outbox fallback when no task queue is set', + async () => { + const inbox = new MockQueue(); + const outbox = new MockQueue(); + const fanout = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { inbox, outbox, fanout }, + }); + federation.defineTask("fallback", { + schema: stringSchema, + handler: () => {}, + }); + const controller = new AbortController(); + const listening = federation.startQueue(undefined, { + signal: controller.signal, + queue: "task", + }); + // Tasks route to the outbox fallback, so the "task" selector must start + // the outbox worker—otherwise enqueued tasks have no listener. + strictEqual(outbox.listenCount, 1); + strictEqual(inbox.listenCount, 0); + strictEqual(fanout.listenCount, 0); + controller.abort(); + await listening; + }, + ); + + await t.step( + 'queue: "task" starts a task queue shared with the outbox once', + async () => { + const inbox = new MockQueue(); + const shared = new MockQueue(); + const fanout = new MockQueue(); + const federation = createFederation({ + ...baseOptions, + queue: { inbox, outbox: shared, fanout, task: shared }, + }); + const controller = new AbortController(); + const listening = federation.startQueue(undefined, { + signal: controller.signal, + queue: "task", + }); + strictEqual(shared.listenCount, 1); + strictEqual(inbox.listenCount, 0); + strictEqual(fanout.listenCount, 0); + controller.abort(); + await listening; + }, + ); +}); + +test("MockQueue.listen() resolves on a pre-aborted signal", async () => { + const queue = new MockQueue(); + const controller = new AbortController(); + controller.abort(); + let timer: ReturnType | undefined; + const settled = await Promise.race([ + queue.listen(() => {}, { signal: controller.signal }).then(() => + "resolved" + ), + new Promise((resolve) => { + timer = setTimeout(() => resolve("pending"), 50); + }), + ]); + clearTimeout(timer); + strictEqual(settled, "resolved"); }); test("processQueuedTask() task dispatch", async (t) => { diff --git a/packages/fedify/src/testing/mod.ts b/packages/fedify/src/testing/mod.ts index 27cf5bdc9..393b217f1 100644 --- a/packages/fedify/src/testing/mod.ts +++ b/packages/fedify/src/testing/mod.ts @@ -4,6 +4,7 @@ export { createRequestContext, } from "./context.ts"; export { + baseOptions, type Envelope, envelopeSchema, makeSchema, diff --git a/packages/fedify/src/testing/tasks.ts b/packages/fedify/src/testing/tasks.ts index 2dc50c83d..134281fbf 100644 --- a/packages/fedify/src/testing/tasks.ts +++ b/packages/fedify/src/testing/tasks.ts @@ -1,5 +1,19 @@ +/** + * Test-only utilities shared by the task suites: the schema factory and stock + * schemas, the base federation options, and the recording {@link MockQueue}. + * + * These helpers live beside the suites that use them rather than in a shared + * package because {@link MockQueue} needs the package-internal + * {@link TaskMessage} type, and *deno.json*'s `publish.exclude` keeps this + * module out of the published sources. + * + * @module + */ +import { mockDocumentLoader } from "@fedify/fixture"; import { Note } from "@fedify/vocab"; import type { StandardSchemaV1 } from "@standard-schema/spec"; +import type { FederationOptions } from "../federation/federation.ts"; +import { MemoryKvStore } from "../federation/kv.ts"; import type { MessageQueue, MessageQueueEnqueueOptions, @@ -7,6 +21,14 @@ import type { } from "../federation/mq.ts"; import type { TaskMessage } from "../federation/queue.ts"; +/** Federation options (sans `queue`) shared by the task suites. */ +export const baseOptions: Omit, "queue"> = { + kv: new MemoryKvStore(), + documentLoaderFactory: () => mockDocumentLoader, + contextLoaderFactory: () => mockDocumentLoader, + manuallyStartQueue: true, +}; + /** * Builds a minimal [Standard Schema](https://standardschema.dev/) from a type * guard, for use as a task payload schema in tests. @@ -45,19 +67,30 @@ export const envelopeSchema = makeSchema( typeof (data as Envelope).title === "string", ); -/** Options for {@link MockQueue}. */ +/** + * Options for the {@link MockQueue} constructor. + */ export interface MockQueueOptions { + /** Sets {@link MessageQueue.nativeRetrial}. Defaults to `false`. */ readonly nativeRetrial?: boolean; + /** Sets {@link MessageQueue.nativeDeduplication}. Defaults to `false`. */ + readonly nativeDeduplication?: boolean; + /** + * When `true`, the queue exposes {@link MockQueue.enqueueMany} and records + * bulk enqueues; when omitted, the method is absent so callers exercise the + * per-message fan-out path. + */ readonly supportsEnqueueMany?: boolean; } /** - * A {@link MessageQueue} that records what it was asked to enqueue and resolves - * its `listen()` when the abort signal fires, so tests can inspect dispatch - * without a real backend. + * An in-memory {@link MessageQueue} that records task enqueues for assertions + * instead of delivering anything. Its {@link listen} resolves only when the + * abort signal fires. */ export class MockQueue implements MessageQueue { readonly nativeRetrial: boolean; + readonly nativeDeduplication: boolean; readonly enqueued: { message: TaskMessage; options?: MessageQueueEnqueueOptions; @@ -74,6 +107,7 @@ export class MockQueue implements MessageQueue { constructor(options: MockQueueOptions = {}) { this.nativeRetrial = options.nativeRetrial ?? false; + this.nativeDeduplication = options.nativeDeduplication ?? false; if (options.supportsEnqueueMany) { this.enqueueMany = (messages, opts) => { this.enqueuedMany.push({ messages, options: opts }); @@ -82,20 +116,24 @@ export class MockQueue implements MessageQueue { } } - // deno-lint-ignore no-explicit-any - enqueue(message: any, options?: MessageQueueEnqueueOptions): Promise { + enqueue( + message: TaskMessage, + options?: MessageQueueEnqueueOptions, + ): Promise { this.enqueued.push({ message, options }); return Promise.resolve(); } listen( - // deno-lint-ignore no-explicit-any - _handler: (message: any) => Promise | void, + _handler: (message: TaskMessage) => Promise | void, options?: MessageQueueListenOptions, ): Promise { this.listenCount++; + if (options?.signal?.aborted) return Promise.resolve(); return new Promise((resolve) => { - options?.signal?.addEventListener("abort", () => resolve()); + options?.signal?.addEventListener("abort", () => resolve(), { + once: true, + }); }); } } diff --git a/scripts/check_fixture_usage.ts b/scripts/check_fixture_usage.ts index 73ef518d9..ad4c921cc 100644 --- a/scripts/check_fixture_usage.ts +++ b/scripts/check_fixture_usage.ts @@ -10,6 +10,7 @@ * Reviewers must NOT treat a passing run as proof of safety; code * review and the published package contents remain the source of truth. */ +import { expandGlobSync } from "@std/fs/expand-glob"; import { walk } from "@std/fs/walk"; import { dirname, @@ -20,6 +21,15 @@ import { SEPARATOR, } from "@std/path"; +const projectRoot = resolve(dirname(fromFileUrl(import.meta.url)), ".."); +const packagesDir = resolve(projectRoot, "packages"); + +const expandGlobPattern = (pattern: string) => + Array.from( + expandGlobSync(pattern, { root: projectRoot, includeDirs: false }), + (file) => relative(projectRoot, file.path), + ); + /** * Files exempt from the "@fedify/fixture imports must live in *.test.ts" * rule. Every entry MUST be accompanied by an inline comment explaining @@ -27,23 +37,13 @@ import { * necessary or not. */ const ALLOWLIST: readonly string[] = [ - // cfworkers test harness re-exports `mockDocumentLoader`; bundled in via - // tsdown `noExternal` so consumers never resolve `@fedify/fixture` at - // runtime. - "packages/fedify/src/testing/context.ts", - // cfworkers test harness re-exports `testDefinitions`; bundled in via - // tsdown `noExternal` so consumers never resolve `@fedify/fixture` at - // runtime. - "packages/fedify/src/testing/mod.ts", - // Test utils for custom tasks - "packages/fedify/src/testing/tasks.ts", + // Utils for tests. + "packages/fedify/src/testing/*", // JSDoc `@example` block mentions `import { test } from "@fedify/fixture"` // as documentation; not a real runtime import. "packages/testing/src/mq-tester.ts", -].map((path) => join(...path.split("/") as [string, ...string[]])); - -const projectRoot = resolve(dirname(fromFileUrl(import.meta.url)), ".."); -const packagesDir = resolve(projectRoot, "packages"); +].map((path) => join(...path.split("/") as [string, ...string[]])) + .flatMap((path) => path.includes("*") ? expandGlobPattern(path) : path); /** * Statement-level pattern for any `import` or `export ... from`