Skip to content

Commit 44df180

Browse files
committed
Back to main runReplicationService
1 parent 676fca2 commit 44df180

File tree

1 file changed

+28
-122
lines changed

1 file changed

+28
-122
lines changed

apps/webapp/app/services/runsReplicationService.server.ts

Lines changed: 28 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@ import {
2121
import { Logger, type LogLevel } from "@trigger.dev/core/logger";
2222
import { tryCatch } from "@trigger.dev/core/utils";
2323
import { parsePacketAsJson } from "@trigger.dev/core/v3/utils/ioSerialization";
24-
import { unsafeExtractIdempotencyKeyScope, unsafeExtractIdempotencyKeyUser } from "@trigger.dev/core/v3/serverOnly";
24+
import {
25+
unsafeExtractIdempotencyKeyScope,
26+
unsafeExtractIdempotencyKeyUser,
27+
} from "@trigger.dev/core/v3/serverOnly";
2528
import { RunAnnotations } from "@trigger.dev/core/v3";
2629
import { type TaskRun } from "@trigger.dev/database";
2730
import { nanoid } from "nanoid";
2831
import EventEmitter from "node:events";
2932
import pLimit from "p-limit";
3033
import { detectBadJsonStrings } from "~/utils/detectBadJsonStrings";
3134
import { calculateErrorFingerprint } from "~/utils/errorFingerprinting";
32-
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
3335

3436
interface TransactionEvent<T = any> {
3537
tag: "insert" | "update" | "delete";
@@ -618,73 +620,18 @@ export class RunsReplicationService {
618620
payloadInserts: payloadInserts.length,
619621
});
620622

621-
// Task runs are already sorted by org (lines 571-576), so we can stream through
622-
// and flush when org changes - no grouping overhead, no O(n²) lookups
623-
624-
// Build run_id -> org_id index for O(1) payload->org lookups
625-
const runIdToOrgId = new Map(
626-
taskRunInserts.map(tr => [getTaskRunField(tr, "run_id"), getTaskRunField(tr, "organization_id")])
623+
// Insert task runs and payloads with retry logic for connection errors
624+
const [taskRunError, taskRunResult] = await this.#insertWithRetry(
625+
(attempt) => this.#insertTaskRunInserts(taskRunInserts, attempt),
626+
"task run inserts",
627+
flushId
627628
);
628629

629-
// Group payloads by org using the index (O(n) instead of O(n²))
630-
const payloadsByOrg = new Map<string, PayloadInsertArray[]>();
631-
for (const payload of payloadInserts) {
632-
const runId = getPayloadField(payload, "run_id");
633-
const orgId = runIdToOrgId.get(runId);
634-
if (orgId) {
635-
const orgPayloads = payloadsByOrg.get(orgId);
636-
if (orgPayloads) {
637-
orgPayloads.push(payload);
638-
} else {
639-
payloadsByOrg.set(orgId, [payload]);
640-
}
641-
}
642-
}
643-
644-
// Stream through task runs, flushing when org changes
645-
const insertPromises: Promise<{ taskRunError: Error | null; payloadError: Error | null; orgId: string }>[] = [];
646-
let currentOrgId: string | null = null;
647-
let currentOrgTaskRuns: TaskRunInsertArray[] = [];
648-
649-
for (const taskRun of taskRunInserts) {
650-
const orgId = getTaskRunField(taskRun, "organization_id");
651-
652-
// Org changed? Flush previous org's batch
653-
if (currentOrgId !== null && currentOrgId !== orgId) {
654-
const orgPayloads = payloadsByOrg.get(currentOrgId) || [];
655-
insertPromises.push(
656-
this.#insertOrgBatch(currentOrgId, currentOrgTaskRuns, orgPayloads, flushId)
657-
);
658-
currentOrgTaskRuns = [];
659-
}
660-
661-
currentOrgId = orgId;
662-
currentOrgTaskRuns.push(taskRun);
663-
}
664-
665-
// Flush final org's batch
666-
if (currentOrgId !== null && currentOrgTaskRuns.length > 0) {
667-
const orgPayloads = payloadsByOrg.get(currentOrgId) || [];
668-
insertPromises.push(
669-
this.#insertOrgBatch(currentOrgId, currentOrgTaskRuns, orgPayloads, flushId)
670-
);
671-
}
672-
673-
// Wait for all org batches to complete (parallel execution)
674-
const results = await Promise.all(insertPromises);
675-
676-
// Aggregate errors from all organizations
677-
let taskRunError: Error | null = null;
678-
let payloadError: Error | null = null;
679-
680-
for (const result of results) {
681-
if (result.taskRunError) {
682-
taskRunError = result.taskRunError;
683-
}
684-
if (result.payloadError) {
685-
payloadError = result.payloadError;
686-
}
687-
}
630+
const [payloadError, payloadResult] = await this.#insertWithRetry(
631+
(attempt) => this.#insertPayloadInserts(payloadInserts, attempt),
632+
"payload inserts",
633+
flushId
634+
);
688635

689636
// Log any errors that occurred
690637
if (taskRunError) {
@@ -826,50 +773,19 @@ export class RunsReplicationService {
826773
};
827774
}
828775

829-
async #insertOrgBatch(
830-
organizationId: string,
831-
taskRunInserts: TaskRunInsertArray[],
832-
payloadInserts: PayloadInsertArray[],
833-
flushId: string
834-
): Promise<{ taskRunError: Error | null; payloadError: Error | null; orgId: string }> {
835-
const [taskRunError] = await this.#insertWithRetry(
836-
(attempt) => this.#insertTaskRunInserts(organizationId, taskRunInserts, attempt),
837-
"task run inserts",
838-
flushId
839-
);
840-
841-
const [payloadError] = await this.#insertWithRetry(
842-
(attempt) => this.#insertPayloadInserts(organizationId, payloadInserts, attempt),
843-
"payload inserts",
844-
flushId
845-
);
846-
847-
return { taskRunError, payloadError, orgId: organizationId };
848-
}
849-
850-
async #insertTaskRunInserts(
851-
organizationId: string,
852-
taskRunInserts: TaskRunInsertArray[],
853-
attempt: number
854-
) {
776+
async #insertTaskRunInserts(taskRunInserts: TaskRunInsertArray[], attempt: number) {
855777
return await startSpan(this._tracer, "insertTaskRunsInserts", async (span) => {
856-
// Get the appropriate ClickHouse client for this organization
857-
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "replication");
858-
859-
const [insertError, insertResult] = await clickhouse.taskRuns.insertCompactArrays(
860-
taskRunInserts,
861-
{
778+
const [insertError, insertResult] =
779+
await this.options.clickhouse.taskRuns.insertCompactArrays(taskRunInserts, {
862780
params: {
863781
clickhouse_settings: this.#getClickhouseInsertSettings(),
864782
},
865-
}
866-
);
783+
});
867784

868785
if (insertError) {
869786
this.logger.error("Error inserting task run inserts attempt", {
870787
error: insertError,
871788
attempt,
872-
organizationId,
873789
});
874790

875791
recordSpanError(span, insertError);
@@ -880,29 +796,19 @@ export class RunsReplicationService {
880796
});
881797
}
882798

883-
async #insertPayloadInserts(
884-
organizationId: string,
885-
payloadInserts: PayloadInsertArray[],
886-
attempt: number
887-
) {
799+
async #insertPayloadInserts(payloadInserts: PayloadInsertArray[], attempt: number) {
888800
return await startSpan(this._tracer, "insertPayloadInserts", async (span) => {
889-
// Get the appropriate ClickHouse client for this organization
890-
const clickhouse = await clickhouseFactory.getClickhouseForOrganization(organizationId, "replication");
891-
892-
const [insertError, insertResult] = await clickhouse.taskRuns.insertPayloadsCompactArrays(
893-
payloadInserts,
894-
{
801+
const [insertError, insertResult] =
802+
await this.options.clickhouse.taskRuns.insertPayloadsCompactArrays(payloadInserts, {
895803
params: {
896804
clickhouse_settings: this.#getClickhouseInsertSettings(),
897805
},
898-
}
899-
);
806+
});
900807

901808
if (insertError) {
902809
this.logger.error("Error inserting payload inserts attempt", {
903810
error: insertError,
904811
attempt,
905-
organizationId,
906812
});
907813

908814
recordSpanError(span, insertError);
@@ -957,12 +863,13 @@ export class RunsReplicationService {
957863
const errorData = { data: run.error };
958864

959865
// Calculate error fingerprint for failed runs
960-
const errorFingerprint = (
866+
const errorFingerprint =
961867
!this._disableErrorFingerprinting &&
962-
['SYSTEM_FAILURE', 'CRASHED', 'INTERRUPTED', 'COMPLETED_WITH_ERRORS', 'TIMED_OUT'].includes(run.status)
963-
)
964-
? calculateErrorFingerprint(run.error)
965-
: '';
868+
["SYSTEM_FAILURE", "CRASHED", "INTERRUPTED", "COMPLETED_WITH_ERRORS", "TIMED_OUT"].includes(
869+
run.status
870+
)
871+
? calculateErrorFingerprint(run.error)
872+
: "";
966873

967874
const annotations = this.#parseAnnotations(run.annotations);
968875

@@ -1075,7 +982,6 @@ export class RunsReplicationService {
1075982

1076983
return { data: parsedData };
1077984
}
1078-
1079985
}
1080986

1081987
export type ConcurrentFlushSchedulerConfig<T> = {

0 commit comments

Comments
 (0)