diff --git a/CHANGES.md b/CHANGES.md index c12781771..c57a58599 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,10 +22,43 @@ To be released. and delayed counts, and `ParallelMessageQueue` delegates depth reporting to its wrapped queue when supported. [[#735], [#748]] + - Added OpenTelemetry metrics for ActivityPub delivery attempts, permanent + delivery failures, inbox listener processing duration, and HTTP Signature + verification failures. Applications can pass the new `meterProvider` + option to `createFederation()`, and `Context.meterProvider` exposes the + provider available to request, inbox, and outbox code. + [[#316], [#619], [#755]] + + - Added the `activitypub.delivery.failed` span event to queued outbox + delivery spans so retry and permanent-failure decisions include the + remote host, attempt number, and HTTP status code when available. + [[#316], [#619], [#755]] + + - *Breaking change*: Changed the `activitypub.activity.sent` span event to + record delivery metadata (`activitypub.inbox.url` and + `activitypub.activity.id`) instead of the full `activitypub.activity.json` + payload. `FedifySpanExporter` now stores outbound records from those + attributes, and `TraceActivityRecord.activityJson` is present only when the + span event includes full activity JSON. [[#316], [#619], [#755]] + +[#316]: https://github.com/fedify-dev/fedify/issues/316 +[#619]: https://github.com/fedify-dev/fedify/issues/619 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 +[#755]: https://github.com/fedify-dev/fedify/pull/755 + +### @fedify/fixture + + - Added `createTestMeterProvider()` and `TestMetricRecorder` helpers for + asserting OpenTelemetry metric measurements in runtime-agnostic tests. + [[#316], [#619], [#755]] + +### @fedify/testing + + - Added a `meterProvider` option to `createFederation()` so mock contexts can + expose a test OpenTelemetry meter provider. [[#316], [#619], [#755]] ### @fedify/amqp diff --git a/deno.json b/deno.json index f8495b6cb..55f99c3a7 100644 --- a/deno.json +++ b/deno.json @@ -47,6 +47,7 @@ "@opentelemetry/api": "npm:@opentelemetry/api@^1.9.0", "@opentelemetry/context-async-hooks": "npm:@opentelemetry/context-async-hooks@^2.5.0", "@opentelemetry/core": "npm:@opentelemetry/core@^2.5.0", + "@opentelemetry/sdk-metrics": "npm:@opentelemetry/sdk-metrics@2.5.0", "@opentelemetry/sdk-trace-base": "npm:@opentelemetry/sdk-trace-base@^2.5.0", "@opentelemetry/semantic-conventions": "npm:@opentelemetry/semantic-conventions@^1.39.0", "@optique/config": "jsr:@optique/config@^1.0.2", diff --git a/deno.lock b/deno.lock index d1ae8395f..6d41fb003 100644 --- a/deno.lock +++ b/deno.lock @@ -84,7 +84,8 @@ "npm:@nuxt/schema@4": "4.4.2", "npm:@opentelemetry/api@^1.9.0": "1.9.1", "npm:@opentelemetry/context-async-hooks@^2.5.0": "2.7.0_@opentelemetry+api@1.9.1", - "npm:@opentelemetry/core@^2.5.0": "2.7.0_@opentelemetry+api@1.9.1", + "npm:@opentelemetry/core@^2.5.0": "2.7.1_@opentelemetry+api@1.9.1", + "npm:@opentelemetry/sdk-metrics@2.5.0": "2.5.0_@opentelemetry+api@1.9.1", "npm:@opentelemetry/sdk-trace-base@^2.5.0": "2.7.0_@opentelemetry+api@1.9.1", "npm:@opentelemetry/semantic-conventions@^1.39.0": "1.40.0", "npm:@poppanator/http-constants@^1.1.1": "1.1.1", @@ -2451,6 +2452,13 @@ "@opentelemetry/api" ] }, + "@opentelemetry/core@2.5.0_@opentelemetry+api@1.9.1": { + "integrity": "sha512-ka4H8OM6+DlUhSAZpONu0cPBtPPTQKxbxVzC4CzVx5+K4JnroJVBtDzLAMx4/3CDTJXRvVFhpFjtl4SaiTNoyQ==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/semantic-conventions" + ] + }, "@opentelemetry/core@2.7.0_@opentelemetry+api@1.9.1": { "integrity": "sha512-DT12SXVwV2eoJrGf4nnsvZojxxeQo+LlNAsoYGRRObPWTeN6APiqZ2+nqDCQDvQX40eLi1AePONS0onoASp3yQ==", "dependencies": [ @@ -2458,20 +2466,43 @@ "@opentelemetry/semantic-conventions" ] }, + "@opentelemetry/core@2.7.1_@opentelemetry+api@1.9.1": { + "integrity": "sha512-QAqIj32AtK6+pEVNG7EOVxHdE06RP+FM5qpiEJ4RtDcFIqKUZHYhl7/7UY5efhwmwNAg7j8QbJVBLxMerc0+gw==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/semantic-conventions" + ] + }, + "@opentelemetry/resources@2.5.0_@opentelemetry+api@1.9.1": { + "integrity": "sha512-F8W52ApePshpoSrfsSk1H2yJn9aKjCrbpQF1M9Qii0GHzbfVeFUB+rc3X4aggyZD8x9Gu3Slua+s6krmq6Dt8g==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/core@2.5.0_@opentelemetry+api@1.9.1", + "@opentelemetry/semantic-conventions" + ] + }, "@opentelemetry/resources@2.7.0_@opentelemetry+api@1.9.1": { "integrity": "sha512-K+oi0hNMv94EpZbnW3eyu2X6SGVpD3O5DhG2NIp65Hc7lhAj9brRXTAVzh3wB82+q3ThakEf7Zd7RsFUqcTc7A==", "dependencies": [ "@opentelemetry/api", - "@opentelemetry/core", + "@opentelemetry/core@2.7.0_@opentelemetry+api@1.9.1", "@opentelemetry/semantic-conventions" ] }, + "@opentelemetry/sdk-metrics@2.5.0_@opentelemetry+api@1.9.1": { + "integrity": "sha512-BeJLtU+f5Gf905cJX9vXFQorAr6TAfK3SPvTFqP+scfIpDQEJfRaGJWta7sJgP+m4dNtBf9y3yvBKVAZZtJQVA==", + "dependencies": [ + "@opentelemetry/api", + "@opentelemetry/core@2.5.0_@opentelemetry+api@1.9.1", + "@opentelemetry/resources@2.5.0_@opentelemetry+api@1.9.1" + ] + }, "@opentelemetry/sdk-trace-base@2.7.0_@opentelemetry+api@1.9.1": { "integrity": "sha512-Yg9zEXJB50DLVLpsKPk7NmNqlPlS+OvqhJGh0A8oawIOTPOwlm4eXs9BMJV7L79lvEwI+dWtAj+YjTyddV336A==", "dependencies": [ "@opentelemetry/api", - "@opentelemetry/core", - "@opentelemetry/resources", + "@opentelemetry/core@2.7.0_@opentelemetry+api@1.9.1", + "@opentelemetry/resources@2.7.0_@opentelemetry+api@1.9.1", "@opentelemetry/semantic-conventions" ] }, @@ -9269,6 +9300,7 @@ "npm:@opentelemetry/api@^1.9.0", "npm:@opentelemetry/context-async-hooks@^2.5.0", "npm:@opentelemetry/core@^2.5.0", + "npm:@opentelemetry/sdk-metrics@2.5.0", "npm:@opentelemetry/sdk-trace-base@^2.5.0", "npm:@opentelemetry/semantic-conventions@^1.39.0", "npm:@solidjs/start@^1.3.0", diff --git a/docs/manual/federation.md b/docs/manual/federation.md index 5e8b536df..3de062cfe 100644 --- a/docs/manual/federation.md +++ b/docs/manual/federation.md @@ -444,6 +444,18 @@ For more information, see the [*OpenTelemetry* section](./opentelemetry.md). [`trace.getTracerProvider()`]: https://open-telemetry.github.io/opentelemetry-js/classes/_opentelemetry_api._opentelemetry_api.TraceAPI.html#gettracerprovider +### `meterProvider` + +*This API is available since Fedify 2.3.0.* + +The OpenTelemetry meter provider that the `Federation` object uses to record +Fedify metrics. If omitted, it is configured to use the default meter provider +(i.e., [`metrics.getMeterProvider()`]). + +For more information, see the [*OpenTelemetry* section](./opentelemetry.md). + +[`metrics.getMeterProvider()`]: https://open-telemetry.github.io/opentelemetry-js/classes/_opentelemetry_api._opentelemetry_api.MetricsAPI.html#getmeterprovider + Builder pattern for structuring ------------------------------- diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index 39e4e684a..ff81f27dc 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -2,7 +2,8 @@ description: >- OpenTelemetry is a set of APIs, libraries, agents, and instrumentation to provide observability to your applications. Fedify supports OpenTelemetry - for tracing. This document explains how to use OpenTelemetry with Fedify. + for tracing and metrics. This document explains how to use OpenTelemetry + with Fedify. --- OpenTelemetry @@ -12,8 +13,8 @@ OpenTelemetry [OpenTelemetry] is a standardized set of APIs, libraries, agents, and instrumentation to provide observability to your applications. Fedify supports -OpenTelemetry for tracing. This document explains how to use OpenTelemetry with -Fedify. +OpenTelemetry for tracing and metrics. This document explains how to use +OpenTelemetry with Fedify. [OpenTelemetry]: https://opentelemetry.io/ @@ -26,10 +27,11 @@ Setting up OpenTelemetry > support. See the [*Using Deno's built-in OpenTelemetry support* > section](#using-deno-s-built-in-opentelemetry-support) for more details. -To trace your Fedify application with OpenTelemetry, you need to set up the -OpenTelemetry SDK. First of all, you need to install the OpenTelemetry SDK and -the tracer exporter you want to use. For example, if you want to use the trace -exporter for OTLP (http/protobuf), you should install the following packages: +To trace your Fedify application and collect metrics with OpenTelemetry, you +need to set up the OpenTelemetry SDK. First of all, you need to install the +OpenTelemetry SDK and the exporter you want to use. For example, if you want +to use the trace exporter for OTLP (http/protobuf), you should install the +following packages: ::: code-group @@ -146,6 +148,34 @@ const federation = createFederation({ [*OpenTelemetry Support* section]: https://docs.sentry.io/platforms/javascript/guides/node/opentelemetry/ +Explicit [`MeterProvider`] configuration +---------------------------------------- + +*This API is available since Fedify 2.3.0.* + +The `createFederation()` function also accepts the +[`meterProvider`](./federation.md#meterprovider) option to explicitly configure +the [`MeterProvider`] for OpenTelemetry metrics. If it is omitted, Fedify uses +the global default [`MeterProvider`] provided by the OpenTelemetry SDK. + +~~~~ typescript twoslash +import type { KvStore } from "@fedify/fedify"; +// ---cut-before--- +import { createFederation } from "@fedify/fedify"; +import { metrics } from "@opentelemetry/api"; + +const federation = createFederation({ +// ---cut-start--- + kv: null as unknown as KvStore, +// ---cut-end--- + // Omitted for brevity; see the related section for details. + meterProvider: metrics.getMeterProvider(), +}); +~~~~ + +[`MeterProvider`]: https://open-telemetry.github.io/opentelemetry-js/interfaces/_opentelemetry_api._opentelemetry_api.MeterProvider.html + + Instrumented spans ------------------ @@ -203,7 +233,8 @@ The following span events are recorded: | Event name | Recorded on span | Description | | ------------------------------- | --------------------------- | -------------------------------------------------------------------------------- | | `activitypub.activity.received` | `activitypub.inbox` | Records full activity JSON and verification status when an activity is received. | -| `activitypub.activity.sent` | `activitypub.send_activity` | Records full activity JSON and delivery details when an activity is sent. | +| `activitypub.activity.sent` | `activitypub.send_activity` | Records delivery details when an activity is sent. | +| `activitypub.delivery.failed` | `activitypub.outbox` | Records queued outbox delivery failure details before retry or abandonment. | | `activitypub.object.fetched` | `activitypub.lookup_object` | Records full object JSON when successfully fetched. | [span events]: https://opentelemetry.io/docs/concepts/signals/traces/#span-events @@ -232,9 +263,25 @@ Each span event includes attributes with detailed information: **`activitypub.activity.sent` event attributes:** - - `activitypub.activity.json`: The complete activity JSON being sent - `activitypub.inbox.url`: The inbox URL where the activity was delivered - `activitypub.activity.id`: The activity ID + - `activitypub.activity.type` (optional): The qualified activity type URI + - `activitypub.actor.id` (optional): The sender actor ID + +The `activitypub.activity.sent` event records delivery metadata and lightweight +activity identifiers only. It does not include the full +`activitypub.activity.json` payload; if you need the full outbound activity for +auditing, store it in your application before delivery and correlate it with +`activitypub.activity.id`. + +**`activitypub.delivery.failed` event attributes:** + + - `activitypub.remote.host`: The remote inbox host + - `activitypub.delivery.attempt`: The zero-based queue delivery attempt + - `activitypub.delivery.permanent_failure`: Whether Fedify will abandon the + delivery instead of retrying + - `http.response.status_code` (optional): The HTTP response status code + returned by the remote inbox **`activitypub.object.fetched` event attributes:** @@ -242,6 +289,47 @@ Each span event includes attributes with detailed information: - `activitypub.object.json`: The complete object JSON +Instrumented metrics +-------------------- + +*This API is available since Fedify 2.3.0.* + +Fedify records the following OpenTelemetry metrics: + +| Metric name | Instrument | Unit | Description | +| -------------------------------------------- | ---------- | ----------- | ----------------------------------------------------------- | +| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | +| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | +| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | +| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | + +### Metric attributes + +`activitypub.delivery.sent` +: `activitypub.remote.host`, `activitypub.delivery.success`, and + `activitypub.activity.type` when Fedify knows the activity type. + +`activitypub.delivery.permanent_failure` +: `activitypub.remote.host` and `http.response.status_code`. + +`activitypub.delivery.duration` +: `activitypub.remote.host`, `activitypub.delivery.success`, and + `activitypub.activity.type` when Fedify knows the activity type. + +`activitypub.inbox.processing_duration` +: `activitypub.activity.type`. + +`activitypub.signature.verification_failure` +: `activitypub.verification.failure_reason`, plus + `activitypub.remote.host` when the failed signature includes a key ID. + +Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, +and query strings are deliberately excluded to keep metric cardinality bounded. +Activity types use the same qualified URI form as Fedify's trace attributes, +for example `https://www.w3.org/ns/activitystreams#Create`. + + Semantic [attributes] for ActivityPub ------------------------------------- @@ -250,56 +338,59 @@ for ActivityPub as of November 2024. However, Fedify provides a set of semantic [attributes] for ActivityPub. The following table shows the semantic attributes for ActivityPub: -| Attribute | Type | Description | Example | -| ------------------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------- | -| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | -| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | -| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | -| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | -| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | -| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | -| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | -| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | -| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | -| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | -| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | -| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | -| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | -| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | -| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | -| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | -| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | -| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | -| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | -| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | -| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | -| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | -| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | -| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | -| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | -| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | -| `fedify.collection.items` | number | The number of items in the collection page. It can be less than the total items. | `10` | -| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | -| `http.response.status_code` | int | The HTTP response status code. | `200` | -| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | -| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | -| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | -| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | -| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | -| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | -| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | -| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | -| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | -| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | -| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | -| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | -| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | -| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | -| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. | `"acct"` | +| Attribute | Type | Description | Example | +| ---------------------------------------- | -------- | ------------------------------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------- | +| `activitypub.activity.id` | string | The URI of the activity object. | `"https://example.com/activity/1"` | +| `activitypub.activity.type` | string[] | The qualified URI(s) of the activity type(s). | `["https://www.w3.org/ns/activitystreams#Create"]` | +| `activitypub.activity.to` | string[] | The URI(s) of the recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.cc` | string[] | The URI(s) of the carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.bto` | string[] | The URI(s) of the blind recipient collections/actors of the activity. | `["https://example.com/1/followers/2"]` | +| `activitypub.activity.bcc` | string[] | The URI(s) of the blind carbon-copied recipient collections/actors of the activity. | `["https://www.w3.org/ns/activitystreams#Public"]` | +| `activitypub.activity.retries` | int | The ordinal number of activity resending attempt (if and only if it's retried). | `3` | +| `activitypub.delivery.attempt` | int | The zero-based delivery attempt number for a queued outgoing activity. | `0` | +| `activitypub.delivery.permanent_failure` | boolean | Whether an outgoing delivery failure will be abandoned instead of retried. | `true` | +| `activitypub.actor.id` | string | The URI of the actor object. | `"https://example.com/actor/1"` | +| `activitypub.actor.key.cached` | boolean | Whether the actor's public keys are cached. | `true` | +| `activitypub.actor.type` | string[] | The qualified URI(s) of the actor type(s). | `["https://www.w3.org/ns/activitystreams#Person"]` | +| `activitypub.key.id` | string | The URI of the cryptographic key being verified. | `"https://example.com/actor/1#main-key"` | +| `activitypub.key_ownership.method` | string | The method used to verify key ownership (`owner_id` or `actor_fetch`). | `"actor_fetch"` | +| `activitypub.key_ownership.verified` | boolean | Whether the key ownership was successfully verified. | `true` | +| `activitypub.collection.id` | string | The URI of the collection object. | `"https://example.com/collection/1"` | +| `activitypub.collection.type` | string[] | The qualified URI(s) of the collection type(s). | `["https://www.w3.org/ns/activitystreams#OrderedCollection"]` | +| `activitypub.collection.total_items` | int | The total number of items in the collection. | `42` | +| `activitypub.object.id` | string | The URI of the object or the object enclosed by the activity. | `"https://example.com/object/1"` | +| `activitypub.object.type` | string[] | The qualified URI(s) of the object type(s). | `["https://www.w3.org/ns/activitystreams#Note"]` | +| `activitypub.object.in_reply_to` | string[] | The URI(s) of the original object to which the object reply. | `["https://example.com/object/1"]` | +| `activitypub.inboxes` | int | The number of inboxes the activity is sent to. | `12` | +| `activitypub.remote.host` | string | The hostname of the remote ActivityPub server. | `"example.com"` | +| `activitypub.shared_inbox` | boolean | Whether the activity is sent to the shared inbox. | `true` | +| `docloader.context_url` | string | The URL of the JSON-LD context document (if provided via Link header). | `"https://www.w3.org/ns/activitystreams"` | +| `docloader.document_url` | string | The final URL of the fetched document (after following redirects). | `"https://example.com/object/1"` | +| `fedify.actor.identifier` | string | The identifier of the actor. | `"1"` | +| `fedify.inbox.recipient` | string | The identifier of the inbox recipient. | `"1"` | +| `fedify.object.type` | string | The URI of the object type. | `"https://www.w3.org/ns/activitystreams#Note"` | +| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | +| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | +| `fedify.collection.items` | number | The number of items in the collection page. It can be less than the total items. | `10` | +| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | +| `http.response.status_code` | int | The HTTP response status code. | `200` | +| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `http_signatures.algorithm` | string | The algorithm of the HTTP request signature. | `"rsa-sha256"` | +| `http_signatures.key_id` | string | The public key ID of the HTTP request signature. | `"https://example.com/actor/1#main-key"` | +| `http_signatures.verified` | boolean | Whether the HTTP request signature was verified successfully. | `false` | +| `http_signatures.failure_reason` | string | Why HTTP signature verification failed (`noSignature`, `invalidSignature`, or `keyFetchError`). | `"keyFetchError"` | +| `http_signatures.key_fetch_status` | int | The HTTP status code from a failed signing-key fetch, when available. | `410` | +| `http_signatures.key_fetch_error` | string | The error type from a non-HTTP signing-key fetch failure, when available. | `"TypeError"` | +| `http_signatures.digest.{algorithm}` | string | The digest of the HTTP request body in hexadecimal. The `{algorithm}` is the digest algorithm (e.g., `sha`, `sha-256`). | `"d41d8cd98f00b204e9800998ecf8427e"` | +| `ld_signatures.key_id` | string | The public key ID of the Linked Data signature. | `"https://example.com/actor/1#main-key"` | +| `ld_signatures.signature` | string | The signature of the Linked Data in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `ld_signatures.type` | string | The algorithm of the Linked Data signature. | `"RsaSignature2017"` | +| `object_integrity_proofs.cryptosuite` | string | The cryptographic suite of the object integrity proof. | `"eddsa-jcs-2022"` | +| `object_integrity_proofs.key_id` | string | The public key ID of the object integrity proof. | `"https://example.com/actor/1#main-key"` | +| `object_integrity_proofs.signature` | string | The integrity proof of the object in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | +| `url.full` | string | The full URL being fetched by the document loader. | `"https://example.com/actor/1"` | +| `webfinger.resource` | string | The queried resource URI. | `"acct:fedify@hollo.social"` | +| `webfinger.resource.scheme` | string | The scheme of the queried resource URI. | `"acct"` | [attributes]: https://opentelemetry.io/docs/specs/otel/common/#attribute [OpenTelemetry Semantic Conventions]: https://opentelemetry.io/docs/specs/semconv/ @@ -324,13 +415,22 @@ ActivityPub activities for a debug dashboard: import type { SpanExporter, ReadableSpan } from "@opentelemetry/sdk-trace-base"; import { ExportResultCode } from "@opentelemetry/core"; -interface ActivityRecord { - direction: "inbound" | "outbound"; +interface InboundActivityRecord { + direction: "inbound"; activity: unknown; timestamp: Date; verified?: boolean; } +interface OutboundActivityRecord { + direction: "outbound"; + activityId?: string; + inboxUrl?: string; + timestamp: Date; +} + +type ActivityRecord = InboundActivityRecord | OutboundActivityRecord; + export class FedifyDebugExporter implements SpanExporter { private activities: ActivityRecord[] = []; @@ -359,11 +459,24 @@ export class FedifyDebugExporter implements SpanExporter { (e) => e.name === "activitypub.activity.sent" ); if (event && event.attributes) { + const activityId = event.attributes[ + "activitypub.activity.id" + ] as string | undefined; + const inboxUrl = event.attributes[ + "activitypub.inbox.url" + ] as string | undefined; + const activityType = event.attributes[ + "activitypub.activity.type" + ] as string | undefined; + const actorId = event.attributes[ + "activitypub.actor.id" + ] as string | undefined; this.activities.push({ direction: "outbound", - activity: JSON.parse( - event.attributes["activitypub.activity.json"] as string - ), + activityId, + activityType, + actorId, + inboxUrl, timestamp: new Date(span.startTime[0] * 1000), }); } diff --git a/docs/why.md b/docs/why.md index 62213af01..4e12d55f6 100644 --- a/docs/why.md +++ b/docs/why.md @@ -201,8 +201,8 @@ TypeScript-native design : Comprehensive type definitions with intelligent auto-completion Observability -: [Built-in OpenTelemetry support](./manual/opentelemetry.md) for tracing and - monitoring +: [Built-in OpenTelemetry support](./manual/opentelemetry.md) for tracing, + metrics, and monitoring [CLI toolchain](./cli.md) : Tools for testing and debugging federation, including: diff --git a/packages/debugger/src/views/trace-detail.tsx b/packages/debugger/src/views/trace-detail.tsx index 22f2899bc..2277709a3 100644 --- a/packages/debugger/src/views/trace-detail.tsx +++ b/packages/debugger/src/views/trace-detail.tsx @@ -168,10 +168,12 @@ export const TraceDetailPage: FC = ( -
- Activity JSON -
{formatJson(activity.activityJson)}
-
+ {activity.activityJson == null ? null : ( +
+ Activity JSON +
{formatJson(activity.activityJson)}
+
+ )} )) )} diff --git a/packages/fedify/package.json b/packages/fedify/package.json index 8efbc46ad..4aab14153 100644 --- a/packages/fedify/package.json +++ b/packages/fedify/package.json @@ -160,6 +160,7 @@ "devDependencies": { "@fedify/fixture": "workspace:*", "@fedify/vocab-tools": "workspace:^", + "@opentelemetry/sdk-metrics": "catalog:", "@std/assert": "jsr:^0.226.0", "@std/path": "catalog:", "@types/node": "^24.2.1", diff --git a/packages/fedify/src/federation/context.ts b/packages/fedify/src/federation/context.ts index 0a82afdcf..14fa6a79e 100644 --- a/packages/fedify/src/federation/context.ts +++ b/packages/fedify/src/federation/context.ts @@ -16,7 +16,7 @@ import type { LookupWebFingerOptions, ResourceDescriptor, } from "@fedify/webfinger"; -import type { TracerProvider } from "@opentelemetry/api"; +import type { MeterProvider, TracerProvider } from "@opentelemetry/api"; import type { GetNodeInfoOptions } from "../nodeinfo/client.ts"; import type { JsonValue, NodeInfo } from "../nodeinfo/types.ts"; import type { GetKeyOwnerOptions } from "../sig/owner.ts"; @@ -70,6 +70,12 @@ export interface Context { */ readonly tracerProvider: TracerProvider; + /** + * The OpenTelemetry meter provider. + * @since 2.3.0 + */ + readonly meterProvider?: MeterProvider; + /** * The document loader for loading remote JSON-LD documents. */ diff --git a/packages/fedify/src/federation/federation.ts b/packages/fedify/src/federation/federation.ts index fa91f2253..483d0b764 100644 --- a/packages/fedify/src/federation/federation.ts +++ b/packages/fedify/src/federation/federation.ts @@ -10,7 +10,7 @@ import type { DocumentLoaderFactory, GetUserAgentOptions, } from "@fedify/vocab-runtime"; -import type { TracerProvider } from "@opentelemetry/api"; +import type { MeterProvider, TracerProvider } from "@opentelemetry/api"; import type { ActivityTransformer } from "../compat/types.ts"; import type { HttpMessageSignaturesSpec } from "../sig/http.ts"; import type { @@ -1052,6 +1052,13 @@ export interface FederationOptions { * @since 1.3.0 */ tracerProvider?: TracerProvider; + + /** + * The OpenTelemetry meter provider for recording metrics. If not provided, + * the default global meter provider is used. + * @since 2.3.0 + */ + meterProvider?: MeterProvider; } /** diff --git a/packages/fedify/src/federation/handler.test.ts b/packages/fedify/src/federation/handler.test.ts index 9bf81cc48..95dc2b144 100644 --- a/packages/fedify/src/federation/handler.test.ts +++ b/packages/fedify/src/federation/handler.test.ts @@ -1,4 +1,5 @@ import { + createTestMeterProvider, createTestTracerProvider, mockDocumentLoader, test, @@ -12,7 +13,12 @@ import { Tombstone, } from "@fedify/vocab"; import { FetchError } from "@fedify/vocab-runtime"; -import { assert, assertEquals, assertInstanceOf } from "@std/assert"; +import { + assert, + assertEquals, + assertGreaterOrEqual, + assertInstanceOf, +} from "@std/assert"; import { parseAcceptSignature } from "../sig/accept.ts"; import { signRequest } from "../sig/http.ts"; import { @@ -2318,8 +2324,13 @@ test("handleCustomCollection()", async () => { test("handleInbox() records OpenTelemetry span events", async () => { const [tracerProvider, exporter] = createTestTracerProvider(); + const [meterProvider, recorder] = createTestMeterProvider(); const kv = new MemoryKvStore(); - const federation = createFederation({ kv, tracerProvider }); + const federation = createFederation({ + kv, + meterProvider, + tracerProvider, + }); const activity = new Create({ id: new URL("https://example.com/activity"), @@ -2390,6 +2401,7 @@ test("handleInbox() records OpenTelemetry span events", async () => { onNotFound: (_request) => new Response("Not found", { status: 404 }), signatureTimeWindow: false, skipSignatureVerification: true, + meterProvider, tracerProvider, }); @@ -2428,12 +2440,28 @@ test("handleInbox() records OpenTelemetry span events", async () => { ); assertEquals(recordedActivity.id, "https://example.com/activity"); assertEquals(recordedActivity.type, "Create"); + + const durations = recorder.getMeasurements( + "activitypub.inbox.processing_duration", + ); + assertEquals(durations.length, 1); + assertEquals(durations[0].type, "histogram"); + assertGreaterOrEqual(durations[0].value, 0); + assertEquals( + durations[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); }); test("handleInbox() records unverified HTTP signature details", async () => { const [tracerProvider, exporter] = createTestTracerProvider(); + const [meterProvider, recorder] = createTestMeterProvider(); const kv = new MemoryKvStore(); - const federation = createFederation({ kv, tracerProvider }); + const federation = createFederation({ + kv, + meterProvider, + tracerProvider, + }); const keyId = new URL("https://gone.example/users/someone#main-key"); const activity = new Create({ @@ -2508,6 +2536,7 @@ test("handleInbox() records unverified HTTP signature details", async () => { onNotFound: (_request) => new Response("Not found", { status: 404 }), signatureTimeWindow: false, skipSignatureVerification: false, + meterProvider, tracerProvider, }); @@ -2536,6 +2565,21 @@ test("handleInbox() records unverified HTTP signature details", async () => { "keyFetchError", ); assertEquals(event.attributes["http_signatures.key_fetch_status"], 410); + + const failures = recorder.getMeasurements( + "activitypub.signature.verification_failure", + ); + assertEquals(failures.length, 1); + assertEquals(failures[0].type, "counter"); + assertEquals(failures[0].value, 1); + assertEquals( + failures[0].attributes["activitypub.remote.host"], + "gone.example", + ); + assertEquals( + failures[0].attributes["activitypub.verification.failure_reason"], + "keyFetchError", + ); }); test("handleInbox() challenge policy enabled + unsigned request", async () => { @@ -3009,6 +3053,7 @@ test("handleInbox() nonce consumption on valid signed request", async () => { }); test("handleInbox() nonce replay prevention", async () => { + const [meterProvider, recorder] = createTestMeterProvider(); const activity = new Create({ id: new URL("https://example.com/activities/nonce-3"), actor: new URL("https://example.com/person2"), @@ -3031,7 +3076,10 @@ test("handleInbox() nonce replay prevention", async () => { rsaPublicKey3.id!, { spec: "rfc9421", rfc9421: { nonce } }, ); - const federation = createFederation({ kv: new MemoryKvStore() }); + const federation = createFederation({ + kv: new MemoryKvStore(), + meterProvider, + }); const context = createRequestContext({ federation, request: signedRequest, @@ -3063,6 +3111,7 @@ test("handleInbox() nonce replay prevention", async () => { onNotFound: () => new Response("Not found", { status: 404 }), signatureTimeWindow: { minutes: 5 }, skipSignatureVerification: false, + meterProvider, inboxChallengePolicy: { enabled: true, requestNonce: true, @@ -3088,6 +3137,19 @@ test("handleInbox() nonce replay prevention", async () => { "no-store", "Challenge response must have Cache-Control: no-store", ); + const failures = recorder.getMeasurements( + "activitypub.signature.verification_failure", + ); + assertEquals(failures.length, 1); + assertEquals(failures[0].value, 1); + assertEquals( + failures[0].attributes["activitypub.remote.host"], + "example.com", + ); + assertEquals( + failures[0].attributes["activitypub.verification.failure_reason"], + "invalidNonce", + ); }); test( @@ -3229,6 +3291,7 @@ test( test( "handleInbox() actor/key mismatch does not consume nonce", async () => { + const [meterProvider, recorder] = createTestMeterProvider(); // A request that has a valid RFC 9421 signature with a nonce, but the // signing key does not belong to the claimed actor. The nonce must NOT be // consumed so the legitimate sender can still use it. @@ -3260,7 +3323,10 @@ test( rsaPublicKey3.id!, { spec: "rfc9421", rfc9421: { nonce } }, ); - const federation = createFederation({ kv: new MemoryKvStore() }); + const federation = createFederation({ + kv: new MemoryKvStore(), + meterProvider, + }); const context = createRequestContext({ federation, request: maliciousRequest, @@ -3292,6 +3358,7 @@ test( onNotFound: () => new Response("Not found", { status: 404 }), signatureTimeWindow: { minutes: 5 }, skipSignatureVerification: false, + meterProvider, inboxChallengePolicy: { enabled: true, requestNonce: true, @@ -3313,6 +3380,19 @@ test( true, "Nonce must not be consumed when actor/key ownership check fails", ); + const failures = recorder.getMeasurements( + "activitypub.signature.verification_failure", + ); + assertEquals(failures.length, 1); + assertEquals(failures[0].value, 1); + assertEquals( + failures[0].attributes["activitypub.remote.host"], + "example.com", + ); + assertEquals( + failures[0].attributes["activitypub.verification.failure_reason"], + "actorKeyMismatch", + ); }, ); diff --git a/packages/fedify/src/federation/handler.ts b/packages/fedify/src/federation/handler.ts index e407c18f9..5f6dc0ed9 100644 --- a/packages/fedify/src/federation/handler.ts +++ b/packages/fedify/src/federation/handler.ts @@ -15,6 +15,7 @@ import { import type { DocumentLoader } from "@fedify/vocab-runtime"; import { getLogger } from "@logtape/logtape"; import type { + MeterProvider, Span, SpanOptions, Tracer, @@ -63,6 +64,7 @@ import type { import { routeActivity } from "./inbox.ts"; import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; +import { getFederationMetrics, getRemoteHost } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -769,6 +771,11 @@ export interface InboxHandlerParameters { idempotencyStrategy?: | IdempotencyStrategy | IdempotencyKeyCallback; + /** + * The meter provider for recording metrics. + * @since 2.3.0 + */ + meterProvider?: MeterProvider; tracerProvider?: TracerProvider; } @@ -988,6 +995,11 @@ async function handleInboxInternal( }); if (verification.verified === false) { const reason = verification.reason; + const remoteHost = "keyId" in reason && reason.keyId != null + ? getRemoteHost(reason.keyId) + : undefined; + getFederationMetrics(parameters.meterProvider) + .recordSignatureVerificationFailure(reason.type, remoteHost); logger.error( "Failed to verify the request's HTTP Signatures.", { @@ -1122,6 +1134,11 @@ async function handleInboxInternal( if ( httpSigKey != null && !await doesActorOwnKey(activity, httpSigKey, ctx) ) { + getFederationMetrics(parameters.meterProvider) + .recordSignatureVerificationFailure( + "actorKeyMismatch", + httpSigKey.id == null ? undefined : getRemoteHost(httpSigKey.id), + ); logger.error( "The signer ({keyId}) and the actor ({actorId}) do not match.", { @@ -1150,6 +1167,11 @@ async function handleInboxInternal( pendingNonceLabel, ); if (!nonceValid) { + getFederationMetrics(parameters.meterProvider) + .recordSignatureVerificationFailure( + "invalidNonce", + httpSigKey?.id == null ? undefined : getRemoteHost(httpSigKey.id), + ); logger.error( "Signature nonce verification failed (missing, expired, or replayed).", { recipient }, @@ -1173,6 +1195,7 @@ async function handleInboxInternal( kvPrefixes, queue, span, + meterProvider: parameters.meterProvider, tracerProvider, idempotencyStrategy: parameters.idempotencyStrategy, }); diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index 3b242191c..db66692a8 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -3,6 +3,7 @@ import type { Activity } from "@fedify/vocab"; import { getLogger } from "@logtape/logtape"; import { context, + type MeterProvider, propagation, type Span, SpanKind, @@ -21,6 +22,7 @@ import type { KvKey, KvStore } from "./kv.ts"; import type { MessageQueue } from "./mq.ts"; import type { InboxMessage } from "./queue.ts"; import type { ActivityListenerSet } from "./activity-listener.ts"; +import { getDurationMs, getFederationMetrics } from "./metrics.ts"; export interface RouteActivityParameters { context: Context; @@ -39,6 +41,11 @@ export interface RouteActivityParameters { kvPrefixes: { activityIdempotence: KvKey }; queue?: MessageQueue; span: Span; + /** + * The meter provider for recording metrics. + * @since 2.3.0 + */ + meterProvider?: MeterProvider; tracerProvider?: TracerProvider; idempotencyStrategy?: | IdempotencyStrategy @@ -66,6 +73,7 @@ export async function routeActivity( kvPrefixes, queue, span, + meterProvider, tracerProvider, idempotencyStrategy, }: RouteActivityParameters, @@ -198,15 +206,24 @@ export async function routeActivity( const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - await listener( - inboxContextFactory( - recipient, - json, - activity?.id?.href, - getTypeId(activity!).href, - ), - activity!, - ); + const activityType = getTypeId(activity!).href; + const started = performance.now(); + try { + await listener( + inboxContextFactory( + recipient, + json, + activity?.id?.href, + activityType, + ), + activity!, + ); + } finally { + getFederationMetrics(meterProvider).recordInboxProcessingDuration( + activityType, + getDurationMs(started), + ); + } } catch (error) { try { await inboxErrorHandler?.(ctx, error as Error); diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts new file mode 100644 index 000000000..8399b375b --- /dev/null +++ b/packages/fedify/src/federation/metrics.ts @@ -0,0 +1,131 @@ +import { + type Attributes, + type Counter, + type Histogram, + type MeterProvider, + metrics, +} from "@opentelemetry/api"; +import metadata from "../../deno.json" with { type: "json" }; + +class FederationMetrics { + readonly deliverySent: Counter; + readonly deliveryPermanentFailure: Counter; + readonly signatureVerificationFailure: Counter; + readonly deliveryDuration: Histogram; + readonly inboxProcessingDuration: Histogram; + + constructor(meterProvider: MeterProvider) { + const meter = meterProvider.getMeter(metadata.name, metadata.version); + this.deliverySent = meter.createCounter("activitypub.delivery.sent", { + description: "ActivityPub delivery attempts.", + unit: "{attempt}", + }); + this.deliveryPermanentFailure = meter.createCounter( + "activitypub.delivery.permanent_failure", + { + description: "ActivityPub deliveries abandoned as permanent failures.", + unit: "{failure}", + }, + ); + this.signatureVerificationFailure = meter.createCounter( + "activitypub.signature.verification_failure", + { + description: "ActivityPub signature verification failures.", + unit: "{failure}", + }, + ); + this.deliveryDuration = meter.createHistogram( + "activitypub.delivery.duration", + { + description: "Duration of ActivityPub delivery attempts.", + unit: "ms", + }, + ); + this.inboxProcessingDuration = meter.createHistogram( + "activitypub.inbox.processing_duration", + { + description: "Duration of ActivityPub inbox listener processing.", + unit: "ms", + }, + ); + } + + recordDelivery( + inbox: URL, + durationMs: number, + success: boolean, + activityType?: string, + ): void { + const deliveryAttributes: Attributes = { + "activitypub.remote.host": getRemoteHost(inbox), + "activitypub.delivery.success": success, + }; + if (activityType != null) { + deliveryAttributes["activitypub.activity.type"] = activityType; + } + this.deliverySent.add(1, deliveryAttributes); + this.deliveryDuration.record(durationMs, deliveryAttributes); + } + + recordPermanentFailure(inbox: URL, statusCode: number): void { + this.deliveryPermanentFailure.add(1, { + "activitypub.remote.host": getRemoteHost(inbox), + "http.response.status_code": statusCode, + }); + } + + recordSignatureVerificationFailure( + reason: string, + remoteHost?: string, + ): void { + const attributes: Attributes = { + "activitypub.verification.failure_reason": reason, + }; + if (remoteHost != null) { + attributes["activitypub.remote.host"] = remoteHost; + } + this.signatureVerificationFailure.add(1, attributes); + } + + recordInboxProcessingDuration( + activityType: string, + durationMs: number, + ): void { + this.inboxProcessingDuration.record(durationMs, { + "activitypub.activity.type": activityType, + }); + } +} + +const federationMetrics = new WeakMap(); + +/** + * Gets the cached Fedify metric instruments for a meter provider. + * @since 2.3.0 + */ +export function getFederationMetrics( + meterProvider: MeterProvider = metrics.getMeterProvider(), +): FederationMetrics { + let instruments = federationMetrics.get(meterProvider); + if (instruments == null) { + instruments = new FederationMetrics(meterProvider); + federationMetrics.set(meterProvider, instruments); + } + return instruments; +} + +/** + * Gets the bounded remote host attribute value for a URL. + * @since 2.3.0 + */ +export function getRemoteHost(url: URL): string { + return url.hostname; +} + +/** + * Gets an elapsed duration in milliseconds from a `performance.now()` value. + * @since 2.3.0 + */ +export function getDurationMs(start: number): number { + return Math.max(0, performance.now() - start); +} diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 9390a1bf8..681a52805 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -1,4 +1,5 @@ import { + createTestMeterProvider, createTestTracerProvider, mockDocumentLoader, test, @@ -3274,6 +3275,62 @@ test("FederationImpl.processQueuedTask()", async (t) => { await federation.processQueuedTask(undefined, inboxMessage); assertEquals(queuedMessages, [{ ...inboxMessage, attempt: 1 }]); }); + + await t.step("records queued inbox processing duration", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + let handled = false; + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + handled = true; + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/1", + actor: "https://remote.example/users/alice", + object: { + type: "Note", + content: "Hello world", + }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + assert(handled); + const durations = recorder.getMeasurements( + "activitypub.inbox.processing_duration", + ); + assertEquals(durations.length, 1); + assertEquals( + durations[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }); }); test("FederationImpl.processQueuedTask() permanent failure", async (t) => { @@ -3305,6 +3362,12 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { options: { permanentFailureStatusCodes?: readonly number[]; nativeRetrial?: boolean; + meterProvider?: ConstructorParameters>[0][ + "meterProvider" + ]; + tracerProvider?: ConstructorParameters>[0][ + "tracerProvider" + ]; } = {}, ): PermanentFailureSetup { const kv = new MemoryKvStore(); @@ -3325,6 +3388,12 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { ...(options.permanentFailureStatusCodes ? { permanentFailureStatusCodes: options.permanentFailureStatusCodes } : {}), + ...(options.meterProvider + ? { meterProvider: options.meterProvider } + : {}), + ...(options.tracerProvider + ? { tracerProvider: options.tracerProvider } + : {}), }); federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); return { federation, queuedMessages }; @@ -3359,7 +3428,12 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { } await t.step("410 Gone triggers permanent failure handler", async () => { - const { federation, queuedMessages } = setup(); + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const { federation, queuedMessages } = setup({ + meterProvider, + tracerProvider, + }); let handlerCalled = false; let handlerValues: Record = {}; federation.setOutboxPermanentFailureHandler((_ctx, values) => { @@ -3391,6 +3465,36 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { ]); // Should NOT be re-enqueued for retry assertEquals(queuedMessages, []); + + const failures = recorder.getMeasurements( + "activitypub.delivery.permanent_failure", + ); + assertEquals(failures.length, 1); + assertEquals(failures[0].value, 1); + assertEquals( + failures[0].attributes["activitypub.remote.host"], + "gone.example", + ); + assertEquals( + failures[0].attributes["http.response.status_code"], + 410, + ); + + const events = exporter.getEvents( + "activitypub.outbox", + "activitypub.delivery.failed", + ); + assertEquals(events.length, 1); + assertEquals( + events[0].attributes?.["activitypub.remote.host"], + "gone.example", + ); + assertEquals(events[0].attributes?.["activitypub.delivery.attempt"], 0); + assertEquals( + events[0].attributes?.["activitypub.delivery.permanent_failure"], + true, + ); + assertEquals(events[0].attributes?.["http.response.status_code"], 410); }); await t.step("404 Not Found triggers permanent failure handler", async () => { @@ -3553,6 +3657,58 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { }, ); + await t.step("malformed inbox does not break failure handling", async () => { + await withLogtapeLock(async () => { + const [tracerProvider, exporter] = createTestTracerProvider(); + const { federation, queuedMessages } = setup({ tracerProvider }); + const records: LogRecord[] = []; + await reset(); + try { + await configure({ + sinks: { + buffer(record: LogRecord): void { + records.push(record); + }, + }, + filters: {}, + loggers: [{ category: [], sinks: ["buffer"] }], + }); + + await federation.processQueuedTask( + undefined, + createOutboxMessage( + "not a url", + "https://example.com/activity/9", + ["https://gone.example/users/bob"], + ), + ); + + assertEquals(queuedMessages.length, 1); + assertEquals((queuedMessages[0] as OutboxMessage).attempt, 1); + const events = exporter.getEvents( + "activitypub.outbox", + "activitypub.delivery.failed", + ); + assertEquals(events.length, 1); + assertEquals( + events[0].attributes?.["activitypub.remote.host"], + undefined, + ); + assertEquals(events[0].attributes?.["activitypub.delivery.attempt"], 0); + assertEquals( + records.some((record) => + record.rawMessage === + "Invalid inbox URL in queued outbox message: {inbox}" && + record.properties.inbox === "not a url" + ), + true, + ); + } finally { + await reset(); + } + }); + }); + fetchMock.hardReset(); }); diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index 27b8a6dcc..3ccbcba1a 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -32,6 +32,8 @@ import { lookupWebFinger } from "@fedify/webfinger"; import { getLogger, withContext } from "@logtape/logtape"; import { context, + type MeterProvider, + metrics, propagation, type Span, SpanKind, @@ -101,6 +103,11 @@ import { import { routeActivity } from "./inbox.ts"; import { KvKeyCache } from "./keycache.ts"; import type { KvKey, KvStore } from "./kv.ts"; +import { + getDurationMs, + getFederationMetrics, + getRemoteHost, +} from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; import type { @@ -248,6 +255,7 @@ export class FederationImpl inboxRetryPolicy: RetryPolicy; activityTransformers: readonly ActivityTransformer[]; _tracerProvider: TracerProvider | undefined; + _meterProvider: MeterProvider | undefined; firstKnock?: HttpMessageSignaturesSpec; inboxChallengePolicy?: InboxChallengePolicy; @@ -395,6 +403,7 @@ export class FederationImpl this.activityTransformers = options.activityTransformers ?? getDefaultActivityTransformers(); this._tracerProvider = options.tracerProvider; + this._meterProvider = options.meterProvider; this.firstKnock = options.firstKnock; } @@ -402,6 +411,10 @@ export class FederationImpl return this._tracerProvider ?? trace.getTracerProvider(); } + get meterProvider(): MeterProvider { + return this._meterProvider ?? metrics.getMeterProvider(); + } + _initializeRouter(): void { this.router.add("/.well-known/webfinger", "webfinger"); this.router.add("/.well-known/nodeinfo", "nodeInfoJrd"); @@ -673,10 +686,37 @@ export class FederationImpl this.kvPrefixes.httpMessageSignaturesSpec, this.firstKnock, ), + meterProvider: this.meterProvider, tracerProvider: this.tracerProvider, }); } catch (error) { span.setStatus({ code: SpanStatusCode.ERROR, message: String(error) }); + const remoteHost = (() => { + if (error instanceof SendActivityError) { + return getRemoteHost(error.inbox); + } + try { + return getRemoteHost(new URL(message.inbox)); + } catch (_) { + logger.warn( + "Invalid inbox URL in queued outbox message: {inbox}", + logData, + ); + return undefined; + } + })(); + span.addEvent("activitypub.delivery.failed", { + ...(remoteHost == null + ? {} + : { "activitypub.remote.host": remoteHost }), + "activitypub.delivery.attempt": message.attempt, + "activitypub.delivery.permanent_failure": + error instanceof SendActivityError && + this.permanentFailureStatusCodes.includes(error.statusCode), + ...(error instanceof SendActivityError + ? { "http.response.status_code": error.statusCode } + : {}), + }); const loaderOptions = this.#getLoaderOptions(message.baseUrl); const activity = await Activity.fromJsonLd(message.activity, { contextLoader: this.contextLoaderFactory(loaderOptions), @@ -699,6 +739,10 @@ export class FederationImpl error instanceof SendActivityError && this.permanentFailureStatusCodes.includes(error.statusCode) ) { + getFederationMetrics(this.meterProvider).recordPermanentFailure( + error.inbox, + error.statusCode, + ); logger.warn( "Permanent delivery failure for activity {activityId} to " + "{inbox} ({status}); not retrying.", @@ -861,15 +905,25 @@ export class FederationImpl const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - await listener( - context.toInboxContext( - message.identifier, - message.activity, - activity.id?.href, - getTypeId(activity).href, - ), - activity, - ); + const activityType = getTypeId(activity).href; + const started = performance.now(); + try { + await listener( + context.toInboxContext( + message.identifier, + message.activity, + activity.id?.href, + activityType, + ), + activity, + ); + } finally { + getFederationMetrics(this.meterProvider) + .recordInboxProcessingDuration( + activityType, + getDurationMs(started), + ); + } } catch (error) { try { await this.inboxErrorHandler?.(context, error as Error); @@ -1194,6 +1248,7 @@ export class FederationImpl this.kvPrefixes.httpMessageSignaturesSpec, this.firstKnock, ), + meterProvider: this.meterProvider, tracerProvider: this.tracerProvider, }), ); @@ -1550,6 +1605,7 @@ export class FederationImpl signatureTimeWindow: this.signatureTimeWindow, skipSignatureVerification: this.skipSignatureVerification, inboxChallengePolicy: this.inboxChallengePolicy, + meterProvider: this.meterProvider, tracerProvider: this.tracerProvider, idempotencyStrategy: this.idempotencyStrategy, }); @@ -1785,6 +1841,10 @@ export class ContextImpl implements Context { return this.federation.tracerProvider; } + get meterProvider(): MeterProvider { + return this.federation.meterProvider; + } + getNodeInfoUri(): URL { const path = this.federation.router.build("nodeInfo", {}); if (path == null) { @@ -3062,6 +3122,7 @@ async function forwardActivityInternal( activityType: ctx.activityType, inbox: new URL(inbox), sharedInbox: inboxes[inbox].sharedInbox, + meterProvider: ctx.meterProvider, tracerProvider: ctx.tracerProvider, specDeterminer: new KvSpecDeterminer( ctx.federation.kv, diff --git a/packages/fedify/src/federation/send.test.ts b/packages/fedify/src/federation/send.test.ts index 55fc4d039..f6ae684d1 100644 --- a/packages/fedify/src/federation/send.test.ts +++ b/packages/fedify/src/federation/send.test.ts @@ -1,4 +1,5 @@ import { + createTestMeterProvider, createTestTracerProvider, mockDocumentLoader, test, @@ -16,10 +17,17 @@ import { assert, assertEquals, assertFalse, + assertGreaterOrEqual, assertInstanceOf, assertNotEquals, assertRejects, } from "@std/assert"; +import { + AggregationTemporality, + InMemoryMetricExporter, + MeterProvider, + PeriodicExportingMetricReader, +} from "@opentelemetry/sdk-metrics"; import fetchMock from "fetch-mock"; import { verifyRequest } from "../sig/http.ts"; import { doesActorOwnKey } from "../sig/owner.ts"; @@ -490,16 +498,190 @@ test("sendActivity() records OpenTelemetry span events", async (t) => { event.attributes["activitypub.activity.id"], "https://example.com/activity", ); - assert(typeof event.attributes["activitypub.activity.json"] === "string"); - - // Verify the JSON contains the activity - const recordedActivity = JSON.parse( - event.attributes["activitypub.activity.json"] as string, + assertEquals( + event.attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + assertEquals( + event.attributes["activitypub.actor.id"], + "https://example.com/person", ); - assertEquals(recordedActivity.id, "https://example.com/activity"); - assertEquals(recordedActivity.type, "Create"); + assertEquals(event.attributes["activitypub.activity.json"], undefined); exporter.clear(); fetchMock.hardReset(); }); }); + +test("sendActivity() records OpenTelemetry delivery metrics", async (t) => { + const [meterProvider, recorder] = createTestMeterProvider(); + fetchMock.spyGlobal(); + + await t.step("successful send", async () => { + fetchMock.post("https://metrics.example:8443/inbox/path?x=1", { + status: 202, + }); + + const activity = { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activity", + actor: "https://example.com/person", + }; + + await sendActivity({ + activity, + activityId: "https://example.com/activity", + activityType: "https://www.w3.org/ns/activitystreams#Create", + keys: [], + inbox: new URL("https://metrics.example:8443/inbox/path?x=1"), + meterProvider, + }); + + const sent = recorder.getMeasurements("activitypub.delivery.sent"); + assertEquals(sent.length, 1); + assertEquals(sent[0].type, "counter"); + assertEquals(sent[0].value, 1); + assertEquals( + sent[0].attributes["activitypub.remote.host"], + "metrics.example", + ); + assertEquals( + sent[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + assertEquals(sent[0].attributes["activitypub.delivery.success"], true); + + const durations = recorder.getMeasurements( + "activitypub.delivery.duration", + ); + assertEquals(durations.length, 1); + assertEquals(durations[0].type, "histogram"); + assertGreaterOrEqual(durations[0].value, 0); + assertEquals( + durations[0].attributes["activitypub.remote.host"], + "metrics.example", + ); + assertEquals( + durations[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + assertEquals( + durations[0].attributes["activitypub.delivery.success"], + true, + ); + + recorder.clear(); + fetchMock.hardReset(); + }); + + await t.step("failed HTTP response", async () => { + fetchMock.spyGlobal(); + fetchMock.post("https://metrics.example/inbox", { + status: 500, + body: "failed", + }); + + const activity = { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Follow", + id: "https://example.com/follow", + actor: "https://example.com/person", + }; + + await assertRejects( + () => + sendActivity({ + activity, + activityId: "https://example.com/follow", + activityType: "https://www.w3.org/ns/activitystreams#Follow", + keys: [], + inbox: new URL("https://metrics.example/inbox"), + meterProvider, + }), + SendActivityError, + ); + + const sent = recorder.getMeasurements("activitypub.delivery.sent"); + assertEquals(sent.length, 1); + assertEquals(sent[0].attributes["activitypub.delivery.success"], false); + assertEquals( + sent[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + assertEquals( + sent[0].attributes["activitypub.remote.host"], + "metrics.example", + ); + + const durations = recorder.getMeasurements( + "activitypub.delivery.duration", + ); + assertEquals(durations.length, 1); + assertGreaterOrEqual(durations[0].value, 0); + assertEquals( + durations[0].attributes["activitypub.remote.host"], + "metrics.example", + ); + assertEquals( + durations[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Follow", + ); + assertEquals( + durations[0].attributes["activitypub.delivery.success"], + false, + ); + + recorder.clear(); + fetchMock.hardReset(); + }); +}); + +test("sendActivity() exports delivery metrics through OpenTelemetry SDK", async () => { + const exporter = new InMemoryMetricExporter( + AggregationTemporality.CUMULATIVE, + ); + const reader = new PeriodicExportingMetricReader({ + exporter, + exportIntervalMillis: 60_000, + }); + const meterProvider = new MeterProvider({ readers: [reader] }); + fetchMock.spyGlobal(); + fetchMock.post("https://sdk-metrics.example/inbox", { status: 202 }); + + try { + await sendActivity({ + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activity", + actor: "https://example.com/person", + }, + activityId: "https://example.com/activity", + activityType: "https://www.w3.org/ns/activitystreams#Create", + keys: [], + inbox: new URL("https://sdk-metrics.example/inbox"), + meterProvider, + }); + + await meterProvider.forceFlush(); + const exportedMetrics = exporter.getMetrics() + .flatMap((resourceMetrics) => resourceMetrics.scopeMetrics) + .flatMap((scopeMetrics) => scopeMetrics.metrics); + const sent = exportedMetrics.find((metric) => + metric.descriptor.name === "activitypub.delivery.sent" + ); + assert(sent != null); + assertEquals(sent.dataPoints.length, 1); + assertEquals( + sent.dataPoints[0].attributes["activitypub.remote.host"], + "sdk-metrics.example", + ); + } finally { + try { + await meterProvider.shutdown(); + } finally { + fetchMock.hardReset(); + } + } +}); diff --git a/packages/fedify/src/federation/send.ts b/packages/fedify/src/federation/send.ts index 4389a7a54..263af1de1 100644 --- a/packages/fedify/src/federation/send.ts +++ b/packages/fedify/src/federation/send.ts @@ -1,6 +1,8 @@ import type { Recipient } from "@fedify/vocab"; import { getLogger } from "@logtape/logtape"; import { + type Attributes, + type MeterProvider, type Span, SpanKind, SpanStatusCode, @@ -12,6 +14,7 @@ import { doubleKnock, type HttpMessageSignaturesSpecDeterminer, } from "../sig/http.ts"; +import { getDurationMs, getFederationMetrics } from "./metrics.ts"; /** * Parameters for {@link extractInboxes}. @@ -140,6 +143,13 @@ export interface SendActivityParameters { */ readonly specDeterminer?: HttpMessageSignaturesSpecDeterminer; + /** + * The meter provider for recording metrics. + * If omitted, the global meter provider is used. + * @since 2.3.0 + */ + readonly meterProvider?: MeterProvider; + /** * The tracer provider for tracing the request. * If omitted, the global tracer provider is used. @@ -189,6 +199,29 @@ export function sendActivity( const MAX_ERROR_RESPONSE_BODY_BYTES = 1024; +function getActivityActorId(activity: unknown): string | undefined { + if (!isRecord(activity)) return undefined; + return getIdValue(activity.actor); +} + +function getIdValue(value: unknown): string | undefined { + if (typeof value === "string" && value !== "") return value; + if (value instanceof URL) return value.href; + if (Array.isArray(value)) { + for (const item of value) { + const id = getIdValue(item); + if (id != null) return id; + } + return undefined; + } + if (isRecord(value)) return getIdValue(value.id); + return undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value != null; +} + async function readLimitedResponseBody( response: Response, maxBytes: number, @@ -232,15 +265,20 @@ async function sendActivityInternal( { activity, activityId, + activityType, keys, inbox, headers, specDeterminer, + meterProvider, tracerProvider, }: SendActivityParameters, span: Span, ): Promise { const logger = getLogger(["fedify", "federation", "outbox"]); + const federationMetrics = getFederationMetrics(meterProvider); + const started = performance.now(); + let deliverySuccess = false; headers = new Headers(headers); headers.set("Content-Type", "application/activity+json"); const request = new Request(inbox, { @@ -284,44 +322,68 @@ async function sendActivityInternal( error, }, ); + federationMetrics.recordDelivery( + inbox, + getDurationMs(started), + false, + activityType, + ); throw error; } - if (!response.ok) { - let error: string; - try { - error = await readLimitedResponseBody( - response, - MAX_ERROR_RESPONSE_BODY_BYTES, + try { + if (!response.ok) { + let error: string; + try { + error = await readLimitedResponseBody( + response, + MAX_ERROR_RESPONSE_BODY_BYTES, + ); + } catch (_) { + error = ""; + } + logger.error( + "Failed to send activity {activityId} to {inbox} ({status} " + + "{statusText}):\n{error}", + { + activityId, + inbox: inbox.href, + status: response.status, + statusText: response.statusText, + error, + }, ); - } catch (_) { - error = ""; - } - logger.error( - "Failed to send activity {activityId} to {inbox} ({status} " + - "{statusText}):\n{error}", - { - activityId, - inbox: inbox.href, - status: response.status, - statusText: response.statusText, + throw new SendActivityError( + inbox, + response.status, + `Failed to send activity ${activityId} to ${inbox.href} ` + + `(${response.status} ${response.statusText}):\n${error}`, error, - }, - ); - throw new SendActivityError( + ); + } + + deliverySuccess = true; + + // Record the sent activity with delivery details + const eventAttributes: Attributes = { + "activitypub.inbox.url": inbox.href, + "activitypub.activity.id": activityId ?? "", + }; + if (activityType != null) { + eventAttributes["activitypub.activity.type"] = activityType; + } + const actorId = getActivityActorId(activity); + if (actorId != null) { + eventAttributes["activitypub.actor.id"] = actorId; + } + span.addEvent("activitypub.activity.sent", eventAttributes); + } finally { + federationMetrics.recordDelivery( inbox, - response.status, - `Failed to send activity ${activityId} to ${inbox.href} ` + - `(${response.status} ${response.statusText}):\n${error}`, - error, + getDurationMs(started), + deliverySuccess, + activityType, ); } - - // Record the sent activity with delivery details - span.addEvent("activitypub.activity.sent", { - "activitypub.activity.json": JSON.stringify(activity), - "activitypub.inbox.url": inbox.href, - "activitypub.activity.id": activityId ?? "", - }); } /** diff --git a/packages/fedify/src/otel/exporter.test.ts b/packages/fedify/src/otel/exporter.test.ts index 2b583d108..d5ebd7cdf 100644 --- a/packages/fedify/src/otel/exporter.test.ts +++ b/packages/fedify/src/otel/exporter.test.ts @@ -1,5 +1,10 @@ import { test } from "@fedify/fixture"; -import type { HrTime, SpanContext, SpanStatus } from "@opentelemetry/api"; +import type { + Attributes, + HrTime, + SpanContext, + SpanStatus, +} from "@opentelemetry/api"; import { SpanKind, SpanStatusCode, TraceFlags } from "@opentelemetry/api"; import type { ReadableSpan, TimedEvent } from "@opentelemetry/sdk-trace-base"; import { assertEquals } from "@std/assert"; @@ -91,18 +96,29 @@ function createActivityReceivedEvent(options: { } function createActivitySentEvent(options: { - activityJson: string; + activityJson?: string; inboxUrl: string; activityId?: string; + activityType?: string; + actorId?: string; }): TimedEvent { + const attributes: Attributes = { + "activitypub.inbox.url": options.inboxUrl, + "activitypub.activity.id": options.activityId ?? "", + }; + if (options.activityType != null) { + attributes["activitypub.activity.type"] = options.activityType; + } + if (options.actorId != null) { + attributes["activitypub.actor.id"] = options.actorId; + } + if (options.activityJson != null) { + attributes["activitypub.activity.json"] = options.activityJson; + } return { name: "activitypub.activity.sent", time: [1700000000, 500000000] as HrTime, - attributes: { - "activitypub.activity.json": options.activityJson, - "activitypub.inbox.url": options.inboxUrl, - "activitypub.activity.id": options.activityId ?? "", - }, + attributes, }; } @@ -171,14 +187,9 @@ test("FedifySpanExporter", async (t) => { const traceId = "trace789"; const spanId = "span012"; const inboxUrl = "https://example.com/users/alice/inbox"; - const activity = { - "@context": "https://www.w3.org/ns/activitystreams", - type: "Follow", - id: "https://myserver.com/activities/789", - actor: "https://myserver.com/users/bob", - object: "https://example.com/users/alice", - }; - const activityJson = JSON.stringify(activity); + const activityId = "https://myserver.com/activities/789"; + const activityType = "https://www.w3.org/ns/activitystreams#Accept"; + const actorId = "https://myserver.com/users/bob"; const span = createMockSpan({ traceId, @@ -186,9 +197,10 @@ test("FedifySpanExporter", async (t) => { name: "activitypub.send_activity", events: [ createActivitySentEvent({ - activityJson, inboxUrl, - activityId: activity.id, + activityId, + activityType, + actorId, }), ], }); @@ -205,8 +217,10 @@ test("FedifySpanExporter", async (t) => { assertEquals(activities[0].traceId, traceId); assertEquals(activities[0].spanId, spanId); assertEquals(activities[0].direction, "outbound"); - assertEquals(activities[0].activityType, activity.type); - assertEquals(activities[0].activityId, activity.id); + assertEquals(activities[0].activityType, activityType); + assertEquals(activities[0].activityId, activityId); + assertEquals(activities[0].actorId, actorId); + assertEquals(activities[0].activityJson, undefined); assertEquals(activities[0].inboxUrl, inboxUrl); }, ); diff --git a/packages/fedify/src/otel/exporter.ts b/packages/fedify/src/otel/exporter.ts index 685ff0940..af7f0edf5 100644 --- a/packages/fedify/src/otel/exporter.ts +++ b/packages/fedify/src/otel/exporter.ts @@ -91,9 +91,10 @@ export interface TraceActivityRecord { readonly actorId?: string; /** - * The full JSON representation of the activity. + * The full JSON representation of the activity, if the span event included + * it. */ - readonly activityJson: string; + readonly activityJson?: string; /** * Whether the activity was verified (for inbound activities). @@ -406,43 +407,51 @@ export class FedifySpanExporter implements SpanExporter { if (attrs == null) return null; const activityJson = attrs["activitypub.activity.json"]; - if (typeof activityJson !== "string") return null; let activityType = "Unknown"; let activityId: string | undefined; let actorId: string | undefined; - try { - const activity = JSON.parse(activityJson); - activityType = activity.type ?? "Unknown"; - activityId = activity.id; - // Extract actor ID from activity - if (typeof activity.actor === "string") { - actorId = activity.actor; - } else if ( - activity.actor != null && typeof activity.actor.id === "string" - ) { - actorId = activity.actor.id; + if (typeof activityJson === "string") { + try { + const activity = JSON.parse(activityJson); + activityType = activity.type ?? "Unknown"; + activityId = activity.id; + // Extract actor ID from activity + if (typeof activity.actor === "string") { + actorId = activity.actor; + } else if ( + activity.actor != null && typeof activity.actor.id === "string" + ) { + actorId = activity.actor.id; + } + } catch { + // Ignore JSON parse errors } - } catch { - // Ignore JSON parse errors } const inboxUrl = attrs["activitypub.inbox.url"]; const explicitActivityId = attrs["activitypub.activity.id"]; + const explicitActivityType = attrs["activitypub.activity.type"]; + const explicitActorId = attrs["activitypub.actor.id"]; return { traceId, spanId, parentSpanId, direction: "outbound", - activityType, + activityType: + typeof explicitActivityType === "string" && explicitActivityType !== "" + ? explicitActivityType + : activityType, activityId: activityId ?? (typeof explicitActivityId === "string" && explicitActivityId !== "" ? explicitActivityId : undefined), - actorId, - activityJson, + actorId: typeof explicitActorId === "string" && explicitActorId !== "" + ? explicitActorId + : actorId, + ...(typeof activityJson === "string" ? { activityJson } : {}), timestamp: new Date( event.time[0] * 1000 + event.time[1] / 1e6, ).toISOString(), diff --git a/packages/fedify/src/testing/context.ts b/packages/fedify/src/testing/context.ts index e84dee5fa..8787d11ce 100644 --- a/packages/fedify/src/testing/context.ts +++ b/packages/fedify/src/testing/context.ts @@ -4,7 +4,7 @@ import { traverseCollection as globalTraverseCollection, } from "@fedify/vocab"; import { lookupWebFinger as globalLookupWebFinger } from "@fedify/webfinger"; -import { trace } from "@opentelemetry/api"; +import { metrics, trace } from "@opentelemetry/api"; import type { Context, InboxContext, @@ -28,6 +28,7 @@ export function createContext( data, documentLoader, contextLoader, + meterProvider, tracerProvider, clone, getNodeInfoUri, @@ -63,6 +64,7 @@ export function createContext( hostname: url.hostname, documentLoader: documentLoader ?? mockDocumentLoader, contextLoader: contextLoader ?? mockDocumentLoader, + meterProvider: meterProvider ?? metrics.getMeterProvider(), tracerProvider: tracerProvider ?? trace.getTracerProvider(), clone: clone ?? ((data) => createContext({ ...values, data })), getNodeInfoUri: getNodeInfoUri ?? throwRouterError, diff --git a/packages/fixture/package.json b/packages/fixture/package.json index 774677bb1..aecaaf131 100644 --- a/packages/fixture/package.json +++ b/packages/fixture/package.json @@ -34,6 +34,7 @@ "dependencies": { "@fedify/vocab-runtime": "workspace:^", "@logtape/logtape": "catalog:", + "@opentelemetry/api": "catalog:", "@opentelemetry/core": "catalog:", "@opentelemetry/sdk-trace-base": "catalog:" }, diff --git a/packages/fixture/src/otel.ts b/packages/fixture/src/otel.ts index f6cb48a04..680730bff 100644 --- a/packages/fixture/src/otel.ts +++ b/packages/fixture/src/otel.ts @@ -4,6 +4,25 @@ import { SimpleSpanProcessor, } from "@opentelemetry/sdk-trace-base"; import { ExportResultCode } from "@opentelemetry/core"; +import type { + Attributes, + BatchObservableCallback, + Context, + Counter, + Gauge, + Histogram, + Meter, + MeterOptions, + MeterProvider, + MetricAttributes, + MetricOptions, + Observable, + ObservableCallback, + ObservableCounter, + ObservableGauge, + ObservableUpDownCounter, + UpDownCounter, +} from "@opentelemetry/api"; /** * A test spy for OpenTelemetry spans that captures all spans and events. @@ -76,3 +95,238 @@ export function createTestTracerProvider(): [ }); return [provider, exporter]; } + +/** + * A metric measurement captured by {@link TestMetricRecorder}. + * @since 2.3.0 + */ +export interface TestMetricMeasurement { + /** + * The metric instrument name. + * @since 2.3.0 + */ + readonly name: string; + + /** + * The instrument type that recorded the measurement. + * @since 2.3.0 + */ + readonly type: "counter" | "histogram" | "gauge" | "upDownCounter"; + + /** + * The recorded metric value. + * @since 2.3.0 + */ + readonly value: number; + + /** + * The attributes recorded with the measurement. + * @since 2.3.0 + */ + readonly attributes: Attributes; +} + +/** + * A test recorder for OpenTelemetry metric measurements. + * @since 2.3.0 + */ +export class TestMetricRecorder { + /** + * The captured metric measurements. + * @since 2.3.0 + */ + public measurements: TestMetricMeasurement[] = []; + + /** + * Records a metric measurement. + * @since 2.3.0 + */ + record(measurement: TestMetricMeasurement): void { + this.measurements.push(measurement); + } + + /** + * Gets all measurements with the given metric name. + * @since 2.3.0 + */ + getMeasurements(name: string): TestMetricMeasurement[] { + return this.measurements.filter((measurement) => measurement.name === name); + } + + /** + * Gets the first measurement with the given metric name. + * @since 2.3.0 + */ + getMeasurement(name: string): TestMetricMeasurement | undefined { + return this.measurements.find((measurement) => measurement.name === name); + } + + /** + * Clears all captured measurements. + * @since 2.3.0 + */ + clear(): void { + this.measurements = []; + } +} + +class TestCounter + implements Counter { + constructor( + private readonly name: string, + private readonly recorder: TestMetricRecorder, + private readonly type: TestMetricMeasurement["type"] = "counter", + ) { + } + + add(value: number, attributes?: AttributesTypes, _context?: Context): void { + this.recorder.record({ + name: this.name, + type: this.type, + value, + attributes: { ...(attributes ?? {}) }, + }); + } +} + +class TestHistogram< + AttributesTypes extends MetricAttributes = MetricAttributes, +> implements Histogram, Gauge { + constructor( + private readonly name: string, + private readonly recorder: TestMetricRecorder, + private readonly type: TestMetricMeasurement["type"] = "histogram", + ) { + } + + record( + value: number, + attributes?: AttributesTypes, + _context?: Context, + ): void { + this.recorder.record({ + name: this.name, + type: this.type, + value, + attributes: { ...(attributes ?? {}) }, + }); + } +} + +class TestObservable< + AttributesTypes extends MetricAttributes = MetricAttributes, +> implements Observable { + readonly callbacks = new Set>(); + + addCallback(callback: ObservableCallback): void { + this.callbacks.add(callback); + } + + removeCallback(callback: ObservableCallback): void { + this.callbacks.delete(callback); + } +} + +class TestMeter implements Meter { + constructor(private readonly recorder: TestMetricRecorder) { + } + + createCounter( + name: string, + _options?: MetricOptions, + ): Counter { + return new TestCounter(name, this.recorder); + } + + createUpDownCounter< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + name: string, + _options?: MetricOptions, + ): UpDownCounter { + return new TestCounter(name, this.recorder, "upDownCounter"); + } + + createHistogram( + name: string, + _options?: MetricOptions, + ): Histogram { + return new TestHistogram(name, this.recorder); + } + + createGauge( + name: string, + _options?: MetricOptions, + ): Gauge { + return new TestHistogram(name, this.recorder, "gauge"); + } + + createObservableCounter< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + _name: string, + _options?: MetricOptions, + ): ObservableCounter { + return new TestObservable(); + } + + createObservableUpDownCounter< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + _name: string, + _options?: MetricOptions, + ): ObservableUpDownCounter { + return new TestObservable(); + } + + createObservableGauge< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + _name: string, + _options?: MetricOptions, + ): ObservableGauge { + return new TestObservable(); + } + + addBatchObservableCallback< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + _callback: BatchObservableCallback, + _observables: Observable[], + ): void { + } + + removeBatchObservableCallback< + AttributesTypes extends MetricAttributes = MetricAttributes, + >( + _callback: BatchObservableCallback, + _observables: Observable[], + ): void { + } +} + +class TestMeterProvider implements MeterProvider { + constructor(private readonly recorder: TestMetricRecorder) { + } + + getMeter( + _name: string, + _version?: string, + _options?: MeterOptions, + ): Meter { + return new TestMeter(this.recorder); + } +} + +/** + * Creates a test meter provider with a test recorder. + * @returns A tuple of [meterProvider, testRecorder]. + * @since 2.3.0 + */ +export function createTestMeterProvider(): [ + MeterProvider, + TestMetricRecorder, +] { + const recorder = new TestMetricRecorder(); + return [new TestMeterProvider(recorder), recorder]; +} diff --git a/packages/testing/src/context.ts b/packages/testing/src/context.ts index 7721577fe..a4771cf14 100644 --- a/packages/testing/src/context.ts +++ b/packages/testing/src/context.ts @@ -37,6 +37,29 @@ const noopTracerProvider: any = { }), }; +const noopMeterProvider: any = { + getMeter: () => ({ + createCounter: () => ({ add: () => undefined }), + createGauge: () => ({ record: () => undefined }), + createHistogram: () => ({ record: () => undefined }), + createObservableCounter: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createObservableGauge: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createObservableUpDownCounter: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createUpDownCounter: () => ({ add: () => undefined }), + addBatchObservableCallback: () => undefined, + removeBatchObservableCallback: () => undefined, + }), +}; + // NOTE: Copied from @fedify/fedify/testing/context.ts // Not exported - used internally only. Public API is in mock.ts @@ -54,6 +77,7 @@ function createContext( data, documentLoader, contextLoader, + meterProvider, tracerProvider, clone, getNodeInfoUri, @@ -89,6 +113,7 @@ function createContext( hostname: url.hostname, documentLoader: documentLoader ?? mockDocumentLoader, contextLoader: contextLoader ?? mockDocumentLoader, + meterProvider: meterProvider ?? noopMeterProvider, tracerProvider: tracerProvider ?? noopTracerProvider, clone: clone ?? ((data) => createContext({ ...values, data })), getNodeInfoUri: getNodeInfoUri ?? throwRouterError, diff --git a/packages/testing/src/mock.ts b/packages/testing/src/mock.ts index c618cd0a2..271387c25 100644 --- a/packages/testing/src/mock.ts +++ b/packages/testing/src/mock.ts @@ -56,6 +56,29 @@ const noopTracerProvider: any = { }), }; +const noopMeterProvider: any = { + getMeter: () => ({ + createCounter: () => ({ add: () => undefined }), + createGauge: () => ({ record: () => undefined }), + createHistogram: () => ({ record: () => undefined }), + createObservableCounter: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createObservableGauge: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createObservableUpDownCounter: () => ({ + addCallback: () => undefined, + removeCallback: () => undefined, + }), + createUpDownCounter: () => ({ add: () => undefined }), + addBatchObservableCallback: () => undefined, + removeBatchObservableCallback: () => undefined, + }), +}; + /** * Helper function to expand URI templates used by the mock. * Supports the RFC 6570 operators accepted by Fedify's identifier paths. @@ -282,6 +305,7 @@ class MockFederation implements Federation { private options: { contextData?: TContextData; origin?: string; + meterProvider?: any; tracerProvider?: any; } = {}, ) { @@ -513,6 +537,8 @@ class MockFederation implements Federation { request, data: contextData, federation: mockFederation as any, + meterProvider: this.options.meterProvider, + tracerProvider: this.options.tracerProvider, }); } @@ -784,6 +810,11 @@ export function createFederation( options: { contextData?: TContextData; origin?: string; + /** + * The OpenTelemetry meter provider to expose from mock contexts. + * @since 2.3.0 + */ + meterProvider?: any; tracerProvider?: any; } = {}, ): TestFederation { @@ -842,6 +873,7 @@ class MockContext implements Context { readonly federation: Federation; readonly documentLoader: DocumentLoader; readonly contextLoader: DocumentLoader; + readonly meterProvider: any; readonly tracerProvider: any; readonly request: Request; readonly url: URL; @@ -861,6 +893,7 @@ class MockContext implements Context { federation: Federation; documentLoader?: DocumentLoader; contextLoader?: DocumentLoader; + meterProvider?: any; tracerProvider?: any; }, ) { @@ -880,6 +913,7 @@ class MockContext implements Context { documentUrl: url, })); this.contextLoader = options.contextLoader ?? this.documentLoader; + this.meterProvider = options.meterProvider ?? noopMeterProvider; this.tracerProvider = options.tracerProvider ?? noopTracerProvider; } @@ -928,6 +962,7 @@ class MockContext implements Context { federation: this.federation, documentLoader: this.documentLoader, contextLoader: this.contextLoader, + meterProvider: this.meterProvider, tracerProvider: this.tracerProvider, }); } diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2da678335..9b27433ed 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -45,6 +45,9 @@ catalogs: '@opentelemetry/exporter-trace-otlp-proto': specifier: ^0.211.0 version: 0.211.0 + '@opentelemetry/sdk-metrics': + specifier: 2.5.0 + version: 2.5.0 '@opentelemetry/sdk-node': specifier: ^0.211.0 version: 0.211.0 @@ -1201,6 +1204,9 @@ importers: '@fedify/vocab-tools': specifier: workspace:^ version: link:../vocab-tools + '@opentelemetry/sdk-metrics': + specifier: 'catalog:' + version: 2.5.0(@opentelemetry/api@1.9.0) '@std/assert': specifier: jsr:^0.226.0 version: '@jsr/std__assert@0.226.0' @@ -1237,6 +1243,9 @@ importers: '@logtape/logtape': specifier: 'catalog:' version: 2.0.5 + '@opentelemetry/api': + specifier: 'catalog:' + version: 1.9.0 '@opentelemetry/core': specifier: 'catalog:' version: 2.5.0(@opentelemetry/api@1.9.0) diff --git a/pnpm-workspace.yaml b/pnpm-workspace.yaml index 657564a6d..394778464 100644 --- a/pnpm-workspace.yaml +++ b/pnpm-workspace.yaml @@ -56,6 +56,7 @@ catalog: "@opentelemetry/context-async-hooks": ^2.5.0 "@opentelemetry/core": ^2.5.0 "@opentelemetry/sdk-node": ^0.211.0 + "@opentelemetry/sdk-metrics": 2.5.0 "@opentelemetry/sdk-trace-base": ^2.5.0 "@opentelemetry/semantic-conventions": ^1.39.0 "@optique/config": ^1.0.2