Skip to content

Commit 849561a

Browse files
committed
Move metrics sending to the event repository
1 parent 6996dfb commit 849561a

File tree

5 files changed

+107
-36
lines changed

5 files changed

+107
-36
lines changed

apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,9 @@ function buildEventRepository(store: string, clickhouse: ClickHouse): Clickhouse
324324
llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS,
325325
llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE,
326326
llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY,
327+
otlpMetricsBatchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE,
328+
otlpMetricsFlushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS,
329+
otlpMetricsMaxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY,
327330
version: "v1",
328331
});
329332
}
@@ -344,6 +347,9 @@ function buildEventRepository(store: string, clickhouse: ClickHouse): Clickhouse
344347
llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS,
345348
llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE,
346349
llmMetricsMaxConcurrency: env.LLM_METRICS_MAX_CONCURRENCY,
350+
otlpMetricsBatchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE,
351+
otlpMetricsFlushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS,
352+
otlpMetricsMaxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY,
347353
version: "v2",
348354
});
349355
}

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type {
22
ClickHouse,
33
LlmMetricsV1Input,
4+
MetricsV1Input,
45
TaskEventDetailedSummaryV1Result,
56
TaskEventDetailsV1Result,
67
TaskEventSummaryV1Result,
@@ -91,6 +92,10 @@ export type ClickhouseEventRepositoryConfig = {
9192
llmMetricsFlushInterval?: number;
9293
llmMetricsMaxBatchSize?: number;
9394
llmMetricsMaxConcurrency?: number;
95+
/** OTLP / task metrics_v1 flush scheduler config */
96+
otlpMetricsBatchSize?: number;
97+
otlpMetricsFlushInterval?: number;
98+
otlpMetricsMaxConcurrency?: number;
9499
};
95100

96101
/**
@@ -102,6 +107,7 @@ export class ClickhouseEventRepository implements IEventRepository {
102107
private _config: ClickhouseEventRepositoryConfig;
103108
private readonly _flushScheduler: DynamicFlushScheduler<TaskEventV1Input | TaskEventV2Input>;
104109
private readonly _llmMetricsFlushScheduler: DynamicFlushScheduler<LlmMetricsV1Input>;
110+
private readonly _otlpMetricsFlushScheduler: DynamicFlushScheduler<MetricsV1Input>;
105111
private _tracer: Tracer;
106112
private _version: "v1" | "v2";
107113

@@ -137,6 +143,15 @@ export class ClickhouseEventRepository implements IEventRepository {
137143
memoryPressureThreshold: config.llmMetricsMaxBatchSize ?? 10000,
138144
loadSheddingEnabled: false,
139145
});
146+
147+
this._otlpMetricsFlushScheduler = new DynamicFlushScheduler({
148+
batchSize: config.otlpMetricsBatchSize ?? 10000,
149+
flushInterval: config.otlpMetricsFlushInterval ?? 1000,
150+
callback: this.#flushOtelMetricsBatch.bind(this),
151+
minConcurrency: 1,
152+
maxConcurrency: config.otlpMetricsMaxConcurrency ?? 3,
153+
loadSheddingEnabled: false,
154+
});
140155
}
141156

142157
get version() {
@@ -251,6 +266,27 @@ export class ClickhouseEventRepository implements IEventRepository {
251266
});
252267
}
253268

269+
async #flushOtelMetricsBatch(flushId: string, rows: MetricsV1Input[]) {
270+
await startSpan(this._tracer, "flushOtelMetricsBatch", async (span) => {
271+
span.setAttribute("flush_id", flushId);
272+
span.setAttribute("row_count", rows.length);
273+
274+
const [insertError] = await this._clickhouse.metrics.insert(rows, {
275+
params: {
276+
clickhouse_settings: this.#getClickhouseInsertSettings(),
277+
},
278+
});
279+
280+
if (insertError) {
281+
throw insertError;
282+
}
283+
284+
logger.info("ClickhouseEventRepository.flushOtelMetricsBatch Inserted OTLP metrics batch", {
285+
rows: rows.length,
286+
});
287+
});
288+
}
289+
254290
#createLlmMetricsInput(event: CreateEventInput): LlmMetricsV1Input {
255291
const llmMetrics = event._llmMetrics!;
256292

@@ -326,6 +362,11 @@ export class ClickhouseEventRepository implements IEventRepository {
326362
this.insertMany(events);
327363
}
328364

365+
insertManyMetrics(rows: MetricsV1Input[]): void {
366+
if (rows.length === 0) return;
367+
this._otlpMetricsFlushScheduler.addToBatch(rows);
368+
}
369+
329370
private createEventToTaskEventV1Input(event: CreateEventInput): TaskEventV1Input[] {
330371
return [
331372
{

apps/webapp/app/v3/eventRepository/eventRepository.server.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
unflattenAttributes,
1919
} from "@trigger.dev/core/v3";
2020
import { serializeTraceparent } from "@trigger.dev/core/v3/isomorphic";
21+
import type { MetricsV1Input } from "@internal/clickhouse";
2122
import { Prisma, TaskEvent, TaskEventKind } from "@trigger.dev/database";
2223
import { nanoid } from "nanoid";
2324
import { Gauge } from "prom-client";
@@ -159,6 +160,8 @@ export class EventRepository implements IEventRepository {
159160
await this.#flushBatchWithReturn(nanoid(), events.map(this.#createableEventToPrismaEvent));
160161
}
161162

163+
insertManyMetrics(_rows: MetricsV1Input[]): void {}
164+
162165
async completeSuccessfulRunEvent({ run, endTime }: { run: CompleteableTaskRun; endTime?: Date }) {
163166
const startTime = convertDateToNanoseconds(run.createdAt);
164167

apps/webapp/app/v3/eventRepository/eventRepository.types.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import type {
1414
TaskEventStatus,
1515
TaskRun,
1616
} from "@trigger.dev/database";
17+
import type { MetricsV1Input } from "@internal/clickhouse";
1718
import type { DetailedTraceEvent, TaskEventStoreTable } from "../taskEventStore.server";
1819
export type { ExceptionEventProperties };
1920

@@ -347,6 +348,7 @@ export interface IEventRepository {
347348
// Event insertion methods
348349
insertMany(events: CreateEventInput[]): void;
349350
insertManyImmediate(events: CreateEventInput[]): Promise<void>;
351+
insertManyMetrics(rows: MetricsV1Input[]): void;
350352

351353
// Run event completion methods
352354
completeSuccessfulRunEvent(params: { run: CompleteableTaskRun; endTime?: Date }): Promise<void>;

apps/webapp/app/v3/otlpExporter.server.ts

Lines changed: 55 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,8 @@ import {
1919
Status_StatusCode,
2020
} from "@trigger.dev/otlp-importer";
2121
import type { MetricsV1Input } from "@internal/clickhouse";
22-
import { ClickHouse } from "@internal/clickhouse";
2322
import { logger } from "~/services/logger.server";
24-
import {
25-
clickhouseFactory,
26-
ClickhouseFactory,
27-
getDefaultClickhouseClient,
28-
} from "~/services/clickhouse/clickhouseFactory.server";
29-
import { DynamicFlushScheduler } from "./dynamicFlushScheduler.server";
23+
import { clickhouseFactory, type ClickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
3024

3125
import { generateSpanId } from "./eventRepository/common.server";
3226
import type {
@@ -43,19 +37,13 @@ import { singleton } from "~/utils/singleton";
4337

4438
type OTLPExporterConfig = {
4539
clickhouseFactory: ClickhouseFactory;
46-
metrics: {
47-
batchSize: number;
48-
flushInterval: number;
49-
maxConcurrency: number;
50-
};
5140
verbose: boolean;
5241
spanAttributeValueLengthLimit: number;
5342
};
5443

5544
class OTLPExporter {
5645
private _tracer: Tracer;
5746
private readonly _clickhouseFactory: ClickhouseFactory;
58-
private readonly _defaultMetricsFlushScheduler: DynamicFlushScheduler<MetricsV1Input>;
5947
private readonly _verbose: boolean;
6048
private readonly _spanAttributeValueLengthLimit: number;
6149

@@ -64,16 +52,6 @@ class OTLPExporter {
6452
this._clickhouseFactory = config.clickhouseFactory;
6553
this._verbose = config.verbose;
6654
this._spanAttributeValueLengthLimit = config.spanAttributeValueLengthLimit;
67-
this._defaultMetricsFlushScheduler = new DynamicFlushScheduler<MetricsV1Input>({
68-
batchSize: config.metrics.batchSize,
69-
flushInterval: config.metrics.flushInterval,
70-
callback: async (_flushId, batch) => {
71-
await config.metrics.clickhouse.metrics.insert(batch);
72-
},
73-
minConcurrency: 1,
74-
maxConcurrency: config.metrics.maxConcurrency,
75-
loadSheddingEnabled: false,
76-
});
7755
}
7856

7957
async exportTraces(request: ExportTraceServiceRequest): Promise<ExportTraceServiceResponse> {
@@ -96,19 +74,19 @@ class OTLPExporter {
9674

9775
async exportMetrics(request: ExportMetricsServiceRequest): Promise<ExportMetricsServiceResponse> {
9876
return await startSpan(this._tracer, "exportMetrics", async (span) => {
99-
const rows = this.#filterResourceMetrics(request.resourceMetrics).flatMap(
100-
(resourceMetrics) => {
101-
return convertMetricsToClickhouseRows(
77+
const metricsWithStores = this.#filterResourceMetrics(request.resourceMetrics).map(
78+
(resourceMetrics) =>
79+
convertResourceMetricsToRowsWithStore(
10280
resourceMetrics,
10381
this._spanAttributeValueLengthLimit
104-
);
105-
}
82+
)
10683
);
10784

108-
span.setAttribute("metric_row_count", rows.length);
85+
const rowCount = metricsWithStores.reduce((acc, m) => acc + m.rows.length, 0);
86+
span.setAttribute("metric_row_count", rowCount);
10987

110-
if (rows.length > 0) {
111-
this._defaultMetricsFlushScheduler.addToBatch(rows);
88+
if (rowCount > 0) {
89+
await this.#exportMetricRows(metricsWithStores);
11290
}
11391

11492
return ExportMetricsServiceResponse.create();
@@ -177,6 +155,37 @@ class OTLPExporter {
177155
return eventCount;
178156
}
179157

158+
async #exportMetricRows(
159+
metricsWithStores: { rows: MetricsV1Input[]; taskEventStore: string }[]
160+
): Promise<void> {
161+
const routeCache = new Map<string, { key: string; repository: IEventRepository }>();
162+
const groups = new Map<string, { repository: IEventRepository; rows: MetricsV1Input[] }>();
163+
for (const { rows, taskEventStore } of metricsWithStores) {
164+
for (const row of rows) {
165+
const routeKey = `${row.organization_id}\0${taskEventStore}`;
166+
let resolved = routeCache.get(routeKey);
167+
if (!resolved) {
168+
resolved = this._clickhouseFactory.getEventRepositoryForOrganizationSync(
169+
taskEventStore,
170+
row.organization_id
171+
);
172+
routeCache.set(routeKey, resolved);
173+
}
174+
175+
let group = groups.get(resolved.key);
176+
if (!group) {
177+
group = { repository: resolved.repository, rows: [] };
178+
groups.set(resolved.key, group);
179+
}
180+
group.rows.push(row);
181+
}
182+
}
183+
184+
for (const [, { repository, rows }] of groups) {
185+
repository.insertManyMetrics(rows);
186+
}
187+
}
188+
180189
#logEventsVerbose(events: CreateEventInput[], prefix: string) {
181190
if (!this._verbose) return;
182191

@@ -592,6 +601,21 @@ function convertMetricsToClickhouseRows(
592601
return rows;
593602
}
594603

604+
function convertResourceMetricsToRowsWithStore(
605+
resourceMetrics: ResourceMetrics,
606+
spanAttributeValueLengthLimit: number
607+
): { rows: MetricsV1Input[]; taskEventStore: string } {
608+
const resourceAttributes = resourceMetrics.resource?.attributes ?? [];
609+
const taskEventStore =
610+
extractStringAttribute(resourceAttributes, [SemanticInternalAttributes.TASK_EVENT_STORE]) ??
611+
env.EVENT_REPOSITORY_DEFAULT_STORE;
612+
613+
return {
614+
rows: convertMetricsToClickhouseRows(resourceMetrics, spanAttributeValueLengthLimit),
615+
taskEventStore,
616+
};
617+
}
618+
595619
// Prefixes injected by TaskContextMetricExporter — these are extracted into
596620
// the nested `trigger` key and should not appear as top-level user attributes.
597621
const INTERNAL_METRIC_ATTRIBUTE_PREFIXES = ["ctx.", "worker."];
@@ -1203,11 +1227,6 @@ export const otlpExporter = singleton("otlpExporter", initializeOTLPExporter);
12031227
function initializeOTLPExporter() {
12041228
return new OTLPExporter({
12051229
clickhouseFactory,
1206-
metrics: {
1207-
batchSize: env.METRICS_CLICKHOUSE_BATCH_SIZE,
1208-
flushInterval: env.METRICS_CLICKHOUSE_FLUSH_INTERVAL_MS,
1209-
maxConcurrency: env.METRICS_CLICKHOUSE_MAX_CONCURRENCY,
1210-
},
12111230
verbose: process.env.OTLP_EXPORTER_VERBOSE === "1",
12121231
spanAttributeValueLengthLimit: process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT
12131232
? parseInt(process.env.SERVER_OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, 10)

0 commit comments

Comments
 (0)