From 0790c97ebe6d799d7f6a58e73c3ede2005478970 Mon Sep 17 00:00:00 2001 From: Elliot Hesp Date: Wed, 25 Feb 2026 14:36:07 +0000 Subject: [PATCH] feat(js): add OpenTelemetry fetch provider --- js/core/src/config.ts | 13 ++ js/core/src/node.ts | 26 ++++ js/core/src/tracing.ts | 17 +++ .../src/tracing/fetch-telemetry-provider.ts | 127 ++++++++++++++++++ js/core/tests/action_test.ts | 3 +- js/core/tests/dynamic-action-provider_test.ts | 3 +- .../tests/fetch_telemetry_provider_test.ts | 103 ++++++++++++++ js/core/tests/flow_test.ts | 3 +- js/core/tests/registry_test.ts | 3 +- js/genkit/src/tracing.ts | 2 + 10 files changed, 296 insertions(+), 4 deletions(-) create mode 100644 js/core/src/tracing/fetch-telemetry-provider.ts create mode 100644 js/core/tests/fetch_telemetry_provider_test.ts diff --git a/js/core/src/config.ts b/js/core/src/config.ts index 43a68d875e..2ad92d5c1b 100644 --- a/js/core/src/config.ts +++ b/js/core/src/config.ts @@ -15,6 +15,7 @@ */ import { logger } from './logging'; +import type { TelemetryProvider } from './tracing.js'; const CONFIG_KEY = '__GENKIT_RUNTIME_CONFIG__'; @@ -34,6 +35,17 @@ export interface GenkitRuntimeConfig { * If true, features that require access to the file system or spawning processes (like the Reflection API) will be disabled. */ sandboxedRuntime?: boolean; + + /** + * Optional custom telemetry provider. + * When set, this provider is used instead of the default Node.js OpenTelemetry setup. + * Use this in worker-type environments (e.g. Cloudflare Workers) where Node SDK is not + * available; use a fetch-compatible provider such as {@link FetchTelemetryProvider}. + * + * Must be set before importing flows/actions (e.g. call setGenkitRuntimeConfig before + * any other genkit imports). + */ + telemetry?: TelemetryProvider; } function getConfig(): GenkitRuntimeConfig { @@ -64,6 +76,7 @@ export function getGenkitRuntimeConfig(): GenkitRuntimeConfig { return { jsonSchemaMode: config.jsonSchemaMode ?? 
'compile', sandboxedRuntime: config.sandboxedRuntime ?? false, + telemetry: config.telemetry, }; } diff --git a/js/core/src/node.ts b/js/core/src/node.ts index 647acd1da4..bf0f33a430 100644 --- a/js/core/src/node.ts +++ b/js/core/src/node.ts @@ -14,10 +14,36 @@ * limitations under the License. */ +import { getGenkitRuntimeConfig } from './config.js'; import { initNodeAsyncContext } from './node-async-context.js'; +import { + setTelemetryProvider, + setTelemetryProviderInitializer, +} from './tracing.js'; import { initNodeTelemetryProvider } from './tracing/node-telemetry-provider.js'; +export { initNodeTelemetryProvider }; + +/** + * Node-specific runtime setup (e.g. async_hooks for context propagation). + */ export function initNodeFeatures() { initNodeAsyncContext(); +} + +/** + * Ensures a telemetry provider is set: uses config.telemetry if set, + * otherwise the default Node provider. Called lazily when telemetry is first needed. + */ +function ensureTelemetryProvider() { + const config = getGenkitRuntimeConfig(); + if (config.telemetry) { + setTelemetryProvider(config.telemetry); + return; + } initNodeTelemetryProvider(); } + +// Register so that any code path that loads core/node gets the initializer. +// This ensures telemetry works even when reflection or other core code runs before the genkit package has finished loading. 
+setTelemetryProviderInitializer(ensureTelemetryProvider); diff --git a/js/core/src/tracing.ts b/js/core/src/tracing.ts index 48886f08e0..10ecf2f819 100644 --- a/js/core/src/tracing.ts +++ b/js/core/src/tracing.ts @@ -19,6 +19,7 @@ import { logger } from './logging.js'; import type { TelemetryConfig } from './telemetryTypes.js'; export * from './tracing/exporter.js'; +export * from './tracing/fetch-telemetry-provider.js'; export * from './tracing/instrumentation.js'; export * from './tracing/types.js'; @@ -26,6 +27,18 @@ const oTelInitializationKey = '__GENKIT_DISABLE_GENKIT_OTEL_INITIALIZATION'; const instrumentationKey = '__GENKIT_TELEMETRY_INSTRUMENTED'; const telemetryProviderKey = '__GENKIT_TELEMETRY_PROVIDER'; +let telemetryProviderInitializer: (() => void) | null = null; + +/** + * Registers a function to run when the telemetry provider is first needed. + * This allows config (e.g. custom telemetry) to be set after importing genkit + * and still take effect. Registered by core's node.ts when that module loads. + * @hidden + */ +export function setTelemetryProviderInitializer(fn: () => void) { + telemetryProviderInitializer = fn; +} + /** * @hidden */ @@ -81,6 +94,10 @@ function getTelemetryProvider(): TelemetryProvider { if (global[telemetryProviderKey]) { return global[telemetryProviderKey]; } + if (telemetryProviderInitializer) { + telemetryProviderInitializer(); + if (global[telemetryProviderKey]) return global[telemetryProviderKey]; + } throw new GenkitError({ status: 'FAILED_PRECONDITION', message: 'TelemetryProvider is not initialized.', diff --git a/js/core/src/tracing/fetch-telemetry-provider.ts b/js/core/src/tracing/fetch-telemetry-provider.ts new file mode 100644 index 0000000000..7eb91ac352 --- /dev/null +++ b/js/core/src/tracing/fetch-telemetry-provider.ts @@ -0,0 +1,127 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { context, trace } from '@opentelemetry/api'; +import { AsyncLocalStorageContextManager } from '@opentelemetry/context-async-hooks'; +import { + BasicTracerProvider, + BatchSpanProcessor, + SimpleSpanProcessor, + type SpanProcessor, +} from '@opentelemetry/sdk-trace-base'; +import type { TelemetryConfig } from '../telemetryTypes.js'; +import type { TelemetryProvider } from '../tracing.js'; +import { TraceServerExporter, setTelemetryServerUrl } from './exporter.js'; + +/** + * Options for the fetch-compatible telemetry provider. + * Uses only fetch and standard APIs (no Node.js SDK), suitable for workers. + */ +export interface FetchTelemetryProviderOptions { + /** + * URL of the Genkit telemetry server (e.g. from dev UI or your own collector). + * If not set, GENKIT_TELEMETRY_SERVER env var is used when enableTelemetry runs. + */ + serverUrl?: string; + /** + * If true, use SimpleSpanProcessor for more real-time export (e.g. in dev). + * Default: false (BatchSpanProcessor for production). + */ + realtime?: boolean; +} + +/** + * Telemetry provider that uses only fetch-compatible APIs. + * Use this in worker environments (Cloudflare Workers, Vercel Edge, etc.) where + * the Node.js OpenTelemetry SDK is not available. + * + * When the runtime has AsyncLocalStorage (e.g. Cloudflare Workers with `nodejs_compat`), + * OTel context propagation is enabled so parent-child span links are preserved across + * async boundaries and traces are fully hierarchical. 
+ * + * Set via runtime config before any other genkit imports: + * + * @example + * ```ts + * import { setGenkitRuntimeConfig } from 'genkit'; + * import { FetchTelemetryProvider } from 'genkit/tracing'; + * + * setGenkitRuntimeConfig({ + * jsonSchemaMode: 'interpret', + * sandboxedRuntime: true, + * telemetry: new FetchTelemetryProvider({ + * serverUrl: 'https://your-telemetry-server.example.com', + * }), + * }); + * + * // Then import flows/actions + * import { flow, run } from 'genkit'; + * ``` + */ +export class FetchTelemetryProvider implements TelemetryProvider { + private readonly options: FetchTelemetryProviderOptions; + private spanProcessors: SpanProcessor[] = []; + + constructor(options: FetchTelemetryProviderOptions = {}) { + this.options = options; + } + + async enableTelemetry( + telemetryConfig: TelemetryConfig | Promise<TelemetryConfig> + ): Promise<void> { + const config = + telemetryConfig instanceof Promise + ? await telemetryConfig + : telemetryConfig; + const serverUrl = + this.options.serverUrl ?? + (typeof process !== 'undefined' && process.env?.GENKIT_TELEMETRY_SERVER); + if (typeof serverUrl === 'string') { + setTelemetryServerUrl(serverUrl); + } + + const exporter = new TraceServerExporter(); + const processor: SpanProcessor = this.options.realtime + ? new SimpleSpanProcessor(exporter) + : new BatchSpanProcessor(exporter); + + this.spanProcessors = [processor]; + + if (config?.spanProcessors?.length) { + this.spanProcessors.push(...config.spanProcessors); + } + + // When AsyncLocalStorage is available (e.g. Node, Cloudflare Workers with nodejs_compat), + // enable OTel context propagation so parent-child span links work across await. + try { + const contextManager = new AsyncLocalStorageContextManager(); + contextManager.enable(); + context.setGlobalContextManager(contextManager); + } catch { + // No async_hooks / AsyncLocalStorage (e.g. some edge runtimes); spans still work, hierarchy may be flat. 
+ } + + const provider = new BasicTracerProvider(); + for (const p of this.spanProcessors) { + provider.addSpanProcessor(p); + } + trace.setGlobalTracerProvider(provider); + } + + async flushTracing(): Promise<void> { + await Promise.all(this.spanProcessors.map((p) => p.forceFlush())); + } +} diff --git a/js/core/tests/action_test.ts b/js/core/tests/action_test.ts index 841cdd72cd..75257bd72a 100644 --- a/js/core/tests/action_test.ts +++ b/js/core/tests/action_test.ts @@ -18,10 +18,11 @@ import * as assert from 'assert'; import { beforeEach, describe, it } from 'node:test'; import { z } from 'zod'; import { action, defineAction } from '../src/action.js'; -import { initNodeFeatures } from '../src/node.js'; +import { initNodeFeatures, initNodeTelemetryProvider } from '../src/node.js'; import { Registry } from '../src/registry.js'; initNodeFeatures(); +initNodeTelemetryProvider(); describe('action', () => { var registry: Registry; diff --git a/js/core/tests/dynamic-action-provider_test.ts b/js/core/tests/dynamic-action-provider_test.ts index 80ed0ce181..12ff07bc49 100644 --- a/js/core/tests/dynamic-action-provider_test.ts +++ b/js/core/tests/dynamic-action-provider_test.ts @@ -23,10 +23,11 @@ import { defineDynamicActionProvider, isDynamicActionProvider, } from '../src/dynamic-action-provider.js'; -import { initNodeFeatures } from '../src/node.js'; +import { initNodeFeatures, initNodeTelemetryProvider } from '../src/node.js'; import { Registry } from '../src/registry.js'; initNodeFeatures(); +initNodeTelemetryProvider(); describe('dynamic action provider', () => { let registry: Registry; diff --git a/js/core/tests/fetch_telemetry_provider_test.ts b/js/core/tests/fetch_telemetry_provider_test.ts new file mode 100644 index 0000000000..46f0cca858 --- /dev/null +++ b/js/core/tests/fetch_telemetry_provider_test.ts @@ -0,0 +1,103 @@ +/** + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in 
compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Run just this file: + * npm test -- --test-name-pattern FetchTelemetryProvider + * Or from repo js/core: + * node --import tsx --test tests/fetch_telemetry_provider_test.ts + * + * Manual test with dev UI: set GENKIT_TELEMETRY_SERVER to your telemetry server URL + * (e.g. from `genkit start`), use FetchTelemetryProvider in your app, run a flow, + * then open the dev UI and confirm the trace appears. + */ + +import { trace } from '@opentelemetry/api'; +import * as assert from 'node:assert'; +import { afterEach, beforeEach, describe, it } from 'node:test'; +import { FetchTelemetryProvider } from '../src/tracing/fetch-telemetry-provider.js'; +import { sleep } from './utils.js'; + +describe('FetchTelemetryProvider', () => { + const capturedRequests: { url: string; body: unknown }[] = []; + let originalFetch: typeof globalThis.fetch; + + beforeEach(() => { + capturedRequests.length = 0; + originalFetch = globalThis.fetch; + globalThis.fetch = async ( + url: string | URL | Request, + init?: RequestInit + ) => { + const req = url instanceof Request ? url : new Request(url, init); + const urlStr = req.url; + if (urlStr.includes('/api/traces')) { + const body = + req.method === 'POST' && req.body + ? 
JSON.parse(await req.text()) + : undefined; + capturedRequests.push({ url: urlStr, body }); + return new Response('OK', { status: 200 }); + } + return originalFetch(url, init); + }; + }); + + afterEach(() => { + globalThis.fetch = originalFetch; + }); + + it('enables telemetry and exports spans via fetch when serverUrl is set', async () => { + const serverUrl = 'http://telemetry.test'; + const provider = new FetchTelemetryProvider({ + serverUrl, + realtime: true, // SimpleSpanProcessor so export happens on span end + }); + + await provider.enableTelemetry({}); + + const tracer = trace.getTracer('test', '1.0'); + tracer.startActiveSpan('testSpan', {}, (span) => { + span.end(); + }); + + await provider.flushTracing(); + // Export is async (fire-and-forget from processor); poll for the request. + for (let i = 0; i < 50; i++) { + if (capturedRequests.length >= 1) break; + await sleep(20); + } + assert.ok( + capturedRequests.length >= 1, + `expected at least 1 request to /api/traces, got ${capturedRequests.length}` + ); + const last = capturedRequests[capturedRequests.length - 1]; + assert.ok( + last.url.startsWith(serverUrl), + `url should start with serverUrl: ${last.url}` + ); + assert.ok( + last.url.endsWith('/api/traces'), + `url should end with /api/traces: ${last.url}` + ); + assert.strictEqual(typeof (last.body as any)?.traceId, 'string'); + assert.ok(typeof (last.body as any)?.spans === 'object'); + }); + + it('enableTelemetry does not throw when serverUrl is omitted', async () => { + const provider = new FetchTelemetryProvider({}); + await assert.doesNotReject(provider.enableTelemetry({})); + await assert.doesNotReject(provider.flushTracing()); + }); +}); diff --git a/js/core/tests/flow_test.ts b/js/core/tests/flow_test.ts index d6af65f60a..4ec91b835c 100644 --- a/js/core/tests/flow_test.ts +++ b/js/core/tests/flow_test.ts @@ -19,12 +19,13 @@ import * as assert from 'assert'; import { beforeEach, describe, it } from 'node:test'; import { defineFlow, run } from 
'../src/flow.js'; import { defineAction, getContext, z } from '../src/index.js'; -import { initNodeFeatures } from '../src/node.js'; +import { initNodeFeatures, initNodeTelemetryProvider } from '../src/node.js'; import { Registry } from '../src/registry.js'; import { enableTelemetry } from '../src/tracing.js'; import { TestSpanExporter } from './utils.js'; initNodeFeatures(); +initNodeTelemetryProvider(); const spanExporter = new TestSpanExporter(); enableTelemetry({ diff --git a/js/core/tests/registry_test.ts b/js/core/tests/registry_test.ts index a0b902433f..2257bb1438 100644 --- a/js/core/tests/registry_test.ts +++ b/js/core/tests/registry_test.ts @@ -22,10 +22,11 @@ import { runInActionRuntimeContext, } from '../src/action.js'; import { defineDynamicActionProvider } from '../src/dynamic-action-provider.js'; -import { initNodeFeatures } from '../src/node.js'; +import { initNodeFeatures, initNodeTelemetryProvider } from '../src/node.js'; import { Registry } from '../src/registry.js'; initNodeFeatures(); +initNodeTelemetryProvider(); describe('registry class', () => { var registry: Registry; diff --git a/js/genkit/src/tracing.ts b/js/genkit/src/tracing.ts index a616383389..8c1c57aa2b 100644 --- a/js/genkit/src/tracing.ts +++ b/js/genkit/src/tracing.ts @@ -15,6 +15,7 @@ */ export { + FetchTelemetryProvider, SPAN_TYPE_ATTR, SpanContextSchema, SpanDataSchema, @@ -34,6 +35,7 @@ export { setCustomMetadataAttributes, setTelemetryServerUrl, toDisplayPath, + type FetchTelemetryProviderOptions, type PathMetadata, type SpanData, type SpanMetadata,