@@ -38,6 +38,11 @@ import EventEmitter from "node:events";
3838import pLimit from "p-limit" ;
3939import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings" ;
4040import { calculateErrorFingerprint } from "~/utils/errorFingerprinting" ;
41+ import {
42+ isClickHouseJsonParseError ,
43+ parseRowNumberFromError ,
44+ sanitizeRows ,
45+ } from "~/v3/eventRepository/sanitizeRowsOnParseError.server" ;
4146
4247interface TransactionEvent < T = any > {
4348 tag : "insert" | "update" | "delete" ;
@@ -129,6 +134,15 @@ export class RunsReplicationService {
129134 private _disablePayloadInsert : boolean ;
130135 private _disableErrorFingerprinting : boolean ;
131136
137+ /**
138+ * Counts batches that hit a ClickHouse `Cannot parse JSON object` failure
139+ * that survived one sanitize-retry. These batches are dropped on the floor
140+ * (returning success-ish to the caller so the retry layer doesn't spin on
141+ * the same deterministic failure), and we track the drop count for
142+ * observability. Counter only — does not gate behaviour.
143+ */
144+ private _permanentlyDroppedBatches = 0 ;
145+
132146 // Metrics
133147 private _replicationLagHistogram : Histogram ;
134148 private _batchesFlushedCounter : Counter ;
@@ -283,6 +297,11 @@ export class RunsReplicationService {
283297 this . _insertMaxDelayMs = options . insertMaxDelayMs ?? 2000 ;
284298 }
285299
300+ /** Exposed for tests and metrics — total batches lost to unrecoverable parse errors. */
301+ get permanentlyDroppedBatches ( ) {
302+ return this . _permanentlyDroppedBatches ;
303+ }
304+
286305 public async shutdown ( ) {
287306 if ( this . _isShuttingDown ) return ;
288307
@@ -837,24 +856,29 @@ export class RunsReplicationService {
837856 return ;
838857 }
839858 return await startSpan ( this . _tracer , "insertTaskRunsInserts" , async ( span ) => {
840- const [ insertError , insertResult ] =
841- await clickhouse . taskRuns . insertCompactArrays ( taskRunInserts , {
842- params : {
843- clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
844- } ,
845- } ) ;
846-
847- if ( insertError ) {
848- this . logger . error ( "Error inserting task run inserts attempt" , {
849- error : insertError ,
850- attempt,
851- } ) ;
852-
853- recordSpanError ( span , insertError ) ;
854- throw insertError ;
855- }
856-
857- return insertResult ;
859+ const doInsert = async ( ) => {
860+ const [ insertError , insertResult ] = await clickhouse . taskRuns . insertCompactArrays (
861+ taskRunInserts ,
862+ { params : { clickhouse_settings : this . #getClickhouseInsertSettings( ) } }
863+ ) ;
864+ if ( insertError ) {
865+ this . logger . error ( "Error inserting task run inserts attempt" , {
866+ error : insertError ,
867+ attempt,
868+ } ) ;
869+ recordSpanError ( span , insertError ) ;
870+ throw insertError ;
871+ }
872+ return insertResult ;
873+ } ;
874+
875+ const outcome = await this . #insertWithJsonParseRecovery(
876+ taskRunInserts ,
877+ doInsert ,
878+ "task_runs_v2" ,
879+ attempt
880+ ) ;
881+ return outcome . kind === "dropped" ? undefined : outcome . insertResult ;
858882 } ) ;
859883 }
860884
@@ -867,25 +891,130 @@ export class RunsReplicationService {
867891 return ;
868892 }
869893 return await startSpan ( this . _tracer , "insertPayloadInserts" , async ( span ) => {
870- const [ insertError , insertResult ] =
871- await clickhouse . taskRuns . insertPayloadsCompactArrays ( payloadInserts , {
872- params : {
873- clickhouse_settings : this . #getClickhouseInsertSettings( ) ,
874- } ,
875- } ) ;
876-
877- if ( insertError ) {
878- this . logger . error ( "Error inserting payload inserts attempt" , {
879- error : insertError ,
880- attempt,
881- } ) ;
894+ const doInsert = async ( ) => {
895+ const [ insertError , insertResult ] = await clickhouse . taskRuns . insertPayloadsCompactArrays (
896+ payloadInserts ,
897+ { params : { clickhouse_settings : this . #getClickhouseInsertSettings( ) } }
898+ ) ;
899+ if ( insertError ) {
900+ this . logger . error ( "Error inserting payload inserts attempt" , {
901+ error : insertError ,
902+ attempt,
903+ } ) ;
904+ recordSpanError ( span , insertError ) ;
905+ throw insertError ;
906+ }
907+ return insertResult ;
908+ } ;
909+
910+ const outcome = await this . #insertWithJsonParseRecovery(
911+ payloadInserts ,
912+ doInsert ,
913+ "raw_task_runs_payload_v1" ,
914+ attempt
915+ ) ;
916+ return outcome . kind === "dropped" ? undefined : outcome . insertResult ;
917+ } ) ;
918+ }
882919
883- recordSpanError ( span , insertError ) ;
884- throw insertError ;
920+ /**
921+ * Wraps a ClickHouse insert with reactive UTF-16 sanitization for
922+ * `Cannot parse JSON object` rejections. Mirrors the pattern from
923+ * `ClickhouseEventRepository.#insertWithJsonParseRecovery` introduced
924+ * in #3659 — same root cause (lone UTF-16 surrogates in user-provided
925+ * JSON), same recovery shape:
926+ *
927+ * 1. Try the insert. Healthy batches pay zero scan cost.
928+ * 2. On parse error, walk the whole batch via `sanitizeRows` and
929+ * replace any lone-surrogate string with `"[invalid-utf16]"`.
930+ * 3. Retry once. If the sanitizer found nothing or the retry also
931+ * fails with the same error class, drop the batch loudly and
932+ * return — do NOT rethrow, otherwise the surrounding
933+ * `#insertWithRetry` layer would spin three more times on the
934+ * same deterministic failure.
935+ * 4. Non-parse errors propagate unchanged so the existing
936+ * transient-retry path still handles them.
937+ *
938+ * The whole-batch scan (rather than slicing on the `at row N` hint) is
939+ * deliberate: `at row N` semantics under `input_format_parallel_parsing`
940+ * aren't stable enough to safely skip rows. The cost is bounded because
941+ * `detectBadJsonStrings` exits in O(1) for clean strings.
942+ */
943+ async #insertWithJsonParseRecovery< T extends object > (
944+ rows : T [ ] ,
945+ doInsert : ( ) => Promise < unknown > ,
946+ contextLabel : string ,
947+ attempt : number
948+ ) : Promise <
949+ | { kind : "inserted" ; insertResult : unknown }
950+ | { kind : "sanitized" ; insertResult : unknown }
951+ | { kind : "dropped" }
952+ > {
953+ try {
954+ return { kind : "inserted" , insertResult : await doInsert ( ) } ;
955+ } catch ( firstError ) {
956+ if ( ! isClickHouseJsonParseError ( firstError ) ) throw firstError ;
957+
958+ const firstMessage =
959+ typeof firstError === "object" && firstError !== null && "message" in firstError
960+ ? String ( ( firstError as { message ?: unknown } ) . message ?? "" )
961+ : String ( firstError ) ;
962+
963+ const rowHint = parseRowNumberFromError ( firstMessage ) ;
964+ const { rowsTouched, fieldsSanitized } = sanitizeRows ( rows ) ;
965+
966+ if ( fieldsSanitized === 0 ) {
967+ this . _permanentlyDroppedBatches += 1 ;
968+ this . logger . error (
969+ "Dropped batch — ClickHouse JSON parse error but sanitizer found nothing to fix" ,
970+ {
971+ contextLabel,
972+ attempt,
973+ batchSize : rows . length ,
974+ clickhouseRowHint : rowHint ,
975+ permanentlyDroppedBatches : this . _permanentlyDroppedBatches ,
976+ sampleRow : JSON . stringify ( rows [ 0 ] ?? null ) . slice ( 0 , 1024 ) ,
977+ clickhouseError : firstMessage . split ( "\n" ) [ 0 ] ,
978+ }
979+ ) ;
980+ return { kind : "dropped" } ;
885981 }
886982
887- return insertResult ;
888- } ) ;
983+ this . logger . warn ( "Sanitizing batch after ClickHouse JSON parse error" , {
984+ contextLabel,
985+ attempt,
986+ batchSize : rows . length ,
987+ clickhouseRowHint : rowHint ,
988+ rowsTouched,
989+ fieldsSanitized,
990+ clickhouseError : firstMessage . split ( "\n" ) [ 0 ] ,
991+ } ) ;
992+
993+ try {
994+ return { kind : "sanitized" , insertResult : await doInsert ( ) } ;
995+ } catch ( retryError ) {
996+ if ( ! isClickHouseJsonParseError ( retryError ) ) throw retryError ;
997+
998+ this . _permanentlyDroppedBatches += 1 ;
999+ const retryMessage =
1000+ typeof retryError === "object" && retryError !== null && "message" in retryError
1001+ ? String ( ( retryError as { message ?: unknown } ) . message ?? "" )
1002+ : String ( retryError ) ;
1003+ this . logger . error (
1004+ "Dropped batch after sanitize-retry still hit ClickHouse JSON parse error" ,
1005+ {
1006+ contextLabel,
1007+ attempt,
1008+ batchSize : rows . length ,
1009+ permanentlyDroppedBatches : this . _permanentlyDroppedBatches ,
1010+ sampleRow : JSON . stringify ( rows [ 0 ] ?? null ) . slice ( 0 , 1024 ) ,
1011+ firstError : firstMessage . split ( "\n" ) [ 0 ] ,
1012+ retryError : retryMessage . split ( "\n" ) [ 0 ] ,
1013+ }
1014+ ) ;
1015+ return { kind : "dropped" } ;
1016+ }
1017+ }
8891018 }
8901019
8911020 async #prepareRunInserts(
0 commit comments