@@ -74,19 +74,15 @@ class OTLPExporter {
7474
7575 async exportMetrics ( request : ExportMetricsServiceRequest ) : Promise < ExportMetricsServiceResponse > {
7676 return await startSpan ( this . _tracer , "exportMetrics" , async ( span ) => {
77- const metricsWithStores = this . #filterResourceMetrics( request . resourceMetrics ) . map (
77+ const rows = this . #filterResourceMetrics( request . resourceMetrics ) . flatMap (
7878 ( resourceMetrics ) =>
79- convertResourceMetricsToRowsWithStore (
80- resourceMetrics ,
81- this . _spanAttributeValueLengthLimit
82- )
79+ convertMetricsToClickhouseRows ( resourceMetrics , this . _spanAttributeValueLengthLimit )
8380 ) ;
8481
85- const rowCount = metricsWithStores . reduce ( ( acc , m ) => acc + m . rows . length , 0 ) ;
86- span . setAttribute ( "metric_row_count" , rowCount ) ;
82+ span . setAttribute ( "metric_row_count" , rows . length ) ;
8783
88- if ( rowCount > 0 ) {
89- await this . #exportMetricRows( metricsWithStores ) ;
84+ if ( rows . length > 0 ) {
85+ await this . #exportMetricRows( rows ) ;
9086 }
9187
9288 return ExportMetricsServiceResponse . create ( ) ;
@@ -155,34 +151,31 @@ class OTLPExporter {
155151 return eventCount ;
156152 }
157153
158- async #exportMetricRows(
159- metricsWithStores : { rows : MetricsV1Input [ ] ; taskEventStore : string } [ ]
160- ) : Promise < void > {
154+ async #exportMetricRows( rows : MetricsV1Input [ ] ) : Promise < void > {
161155 const routeCache = new Map < string , { key : string ; repository : IEventRepository } > ( ) ;
162156 const groups = new Map < string , { repository : IEventRepository ; rows : MetricsV1Input [ ] } > ( ) ;
163- for ( const { rows, taskEventStore } of metricsWithStores ) {
164- for ( const row of rows ) {
165- const routeKey = `${ row . organization_id } \0${ taskEventStore } ` ;
166- let resolved = routeCache . get ( routeKey ) ;
167- if ( ! resolved ) {
168- resolved = this . _clickhouseFactory . getEventRepositoryForOrganizationSync (
169- taskEventStore ,
170- row . organization_id
171- ) ;
172- routeCache . set ( routeKey , resolved ) ;
173- }
174157
175- let group = groups . get ( resolved . key ) ;
176- if ( ! group ) {
177- group = { repository : resolved . repository , rows : [ ] } ;
178- groups . set ( resolved . key , group ) ;
179- }
180- group . rows . push ( row ) ;
158+ for ( const row of rows ) {
159+ const routeKey = row . organization_id ;
160+ let resolved = routeCache . get ( routeKey ) ;
161+ if ( ! resolved ) {
162+ resolved = this . _clickhouseFactory . getEventRepositoryForOrganizationSync (
163+ "clickhouse_v2" ,
164+ row . organization_id
165+ ) ;
166+ routeCache . set ( routeKey , resolved ) ;
181167 }
168+
169+ let group = groups . get ( resolved . key ) ;
170+ if ( ! group ) {
171+ group = { repository : resolved . repository , rows : [ ] } ;
172+ groups . set ( resolved . key , group ) ;
173+ }
174+ group . rows . push ( row ) ;
182175 }
183176
184- for ( const [ , { repository, rows } ] of groups ) {
185- repository . insertManyMetrics ( rows ) ;
177+ for ( const [ , { repository, rows : groupedRows } ] of groups ) {
178+ repository . insertManyMetrics ( groupedRows ) ;
186179 }
187180 }
188181
@@ -601,21 +594,6 @@ function convertMetricsToClickhouseRows(
601594 return rows ;
602595}
603596
604- function convertResourceMetricsToRowsWithStore (
605- resourceMetrics : ResourceMetrics ,
606- spanAttributeValueLengthLimit : number
607- ) : { rows : MetricsV1Input [ ] ; taskEventStore : string } {
608- const resourceAttributes = resourceMetrics . resource ?. attributes ?? [ ] ;
609- const taskEventStore =
610- extractStringAttribute ( resourceAttributes , [ SemanticInternalAttributes . TASK_EVENT_STORE ] ) ??
611- env . EVENT_REPOSITORY_DEFAULT_STORE ;
612-
613- return {
614- rows : convertMetricsToClickhouseRows ( resourceMetrics , spanAttributeValueLengthLimit ) ,
615- taskEventStore,
616- } ;
617- }
618-
619597// Prefixes injected by TaskContextMetricExporter — these are extracted into
620598// the nested `trigger` key and should not appear as top-level user attributes.
621599const INTERNAL_METRIC_ATTRIBUTE_PREFIXES = [ "ctx." , "worker." ] ;
0 commit comments