Skip to content

Commit d9506f7

Browse files
matt-aitken and claude committed
Route sessions replication via ClickhouseFactory, gate replication services on factory readiness, and tighten review-comment fixes
- SessionsReplicationService now takes ClickhouseFactory; #flushBatch buckets per org and routes each bucket to the org's ClickHouse (mirrors runs replication).
- sessionsReplicationInstance gates service.start() on clickhouseFactory.isReady() and owns SIGTERM/SIGINT shutdown — collapsed entry.server.tsx's 35-line block to a single bootstrapping reference. Prevents misrouting org-scoped writes to the default cluster while the registry is still loading.
- Factory: added sessions_replication ClientType + default singleton (parallels runs replication), converted the events client to a singleton (was building two pools to the same URL for v1/v2 repos), added missing startTimeMaxAgeMs to the clickhouse_v2 case.
- Converted non-hot-path callers (runEngineHandlers, services, route handlers) from sync resolveEventRepositoryForStore to async getEventRepositoryForStore so they await registry readiness too. Sync now reserved for OTEL exporter and the replication services, which gate startup on isReady.
- resolveEventRepositoryForStore is now non-exported with a JSDoc note explaining when sync-bypass-with-isReady-gating is justified.
- Minor review fixes: tryCatch from @trigger.dev/core/utils subpath, safeParse + 400 response in admin.data-stores add/update branches, otlpExporter await moved past the content-type check so the unsupported branch short-circuits.
- TestReplicationClickhouseFactory routes both replication and sessions_replication to the test client; sessions replication tests updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f235b8e commit d9506f7

18 files changed

Lines changed: 258 additions & 137 deletions

apps/webapp/app/entry.server.tsx

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,12 @@ import {
2424
registerRunEngineEventBusHandlers,
2525
setupBatchQueueCallbacks,
2626
} from "./v3/runEngineHandlers.server";
27+
// Touch the sessions replication singleton at entry so it boots deterministically
28+
// on webapp startup. The singleton's initializer wires start (gated on
29+
// `clickhouseFactory.isReady()`) and SIGTERM/SIGINT shutdown — mirrors
30+
// runsReplicationInstance.
2731
import { sessionsReplicationInstance } from "./services/sessionsReplicationInstance.server";
28-
import { signalsEmitter } from "./services/signals.server";
29-
30-
// Start the sessions replication service (subscribes to the logical replication
31-
// slot, runs leader election, flushes to ClickHouse). Done at entry level so it
32-
// runs deterministically on webapp boot rather than lazily via a singleton
33-
// reference elsewhere in the module graph.
34-
if (sessionsReplicationInstance && env.SESSION_REPLICATION_ENABLED === "1") {
35-
// Capture a non-nullable reference so the shutdown closure below
36-
// doesn't need to re-null-check (TS narrowing doesn't follow through
37-
// an inner function scope).
38-
const replicator = sessionsReplicationInstance;
39-
replicator
40-
.start()
41-
.then(() => {
42-
console.log("🗃️ Sessions replication service started");
43-
})
44-
.catch((error) => {
45-
console.error("🗃️ Sessions replication service failed to start", {
46-
error,
47-
});
48-
});
49-
50-
// Wrap the async shutdown in a sync handler that catches rejections —
51-
// SIGTERM/SIGINT fire during process teardown, and an unhandled
52-
// promise rejection from `_replicationClient.stop()` there would
53-
// bubble up past the process exit. Matches the pattern in
54-
// dynamicFlushScheduler.server.ts.
55-
const shutdownSessionsReplication = () => {
56-
replicator.shutdown().catch((error) => {
57-
console.error("🗃️ Sessions replication service shutdown error", {
58-
error,
59-
});
60-
});
61-
};
62-
signalsEmitter.on("SIGTERM", shutdownSessionsReplication);
63-
signalsEmitter.on("SIGINT", shutdownSessionsReplication);
64-
}
32+
void sessionsReplicationInstance;
6533

6634
const ABORT_DELAY = 30000;
6735

apps/webapp/app/routes/admin.data-stores.tsx

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import { prisma } from "~/db.server";
2828
import { requireUser } from "~/services/session.server";
2929
import { ClickhouseConnectionSchema } from "~/services/clickhouse/clickhouseSecretSchemas.server";
3030
import { organizationDataStoresRegistry } from "~/services/dataStores/organizationDataStoresRegistryInstance.server";
31-
import { tryCatch } from "@trigger.dev/core";
31+
import { tryCatch } from "@trigger.dev/core/utils";
3232

3333
// ---------------------------------------------------------------------------
3434
// Loader
@@ -93,14 +93,20 @@ export async function action({ request }: ActionFunctionArgs) {
9393
.map((s) => s.trim())
9494
.filter(Boolean);
9595

96-
const config = ClickhouseConnectionSchema.parse({ url: connectionUrl });
96+
const parsedConfig = ClickhouseConnectionSchema.safeParse({ url: connectionUrl });
97+
if (!parsedConfig.success) {
98+
return typedjson(
99+
{ error: parsedConfig.error.issues.map((i) => i.message).join(", ") },
100+
{ status: 400 }
101+
);
102+
}
97103

98104
const [error, _] = await tryCatch(
99105
organizationDataStoresRegistry.addDataStore({
100106
key,
101107
kind: "CLICKHOUSE",
102108
organizationIds,
103-
config,
109+
config: parsedConfig.data,
104110
})
105111
);
106112

@@ -117,9 +123,17 @@ export async function action({ request }: ActionFunctionArgs) {
117123
.map((s) => s.trim())
118124
.filter(Boolean);
119125

120-
const config = connectionUrl
121-
? ClickhouseConnectionSchema.parse({ url: connectionUrl })
122-
: undefined;
126+
let config: ReturnType<typeof ClickhouseConnectionSchema.parse> | undefined;
127+
if (connectionUrl) {
128+
const parsedConfig = ClickhouseConnectionSchema.safeParse({ url: connectionUrl });
129+
if (!parsedConfig.success) {
130+
return typedjson(
131+
{ error: parsedConfig.error.issues.map((i) => i.message).join(", ") },
132+
{ status: 400 }
133+
);
134+
}
135+
config = parsedConfig.data;
136+
}
123137

124138
const [error, _] = await tryCatch(
125139
organizationDataStoresRegistry.updateDataStore({

apps/webapp/app/routes/api.v1.runs.$runId.events.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
createLoaderApiRoute,
77
} from "~/services/routeBuilders/apiBuilder.server";
88
import { ApiRetrieveRunPresenter } from "~/presenters/v3/ApiRetrieveRunPresenter.server";
9-
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
9+
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1010

1111
const ParamsSchema = z.object({
1212
runId: z.string(), // This is the run friendly ID
@@ -38,7 +38,7 @@ export const loader = createLoaderApiRoute(
3838
},
3939
},
4040
async ({ resource: run, authentication }) => {
41-
const eventRepository = resolveEventRepositoryForStore(
41+
const eventRepository = await getEventRepositoryForStore(
4242
run.taskEventStore,
4343
authentication.environment.organization.id
4444
);

apps/webapp/app/routes/api.v1.runs.$runId.spans.$spanId.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {
77
anyResource,
88
createLoaderApiRoute,
99
} from "~/services/routeBuilders/apiBuilder.server";
10-
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
10+
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1111
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1212

1313
const ParamsSchema = z.object({
@@ -45,7 +45,7 @@ export const loader = createLoaderApiRoute(
4545
},
4646
},
4747
async ({ params, resource: run, authentication }) => {
48-
const eventRepository = resolveEventRepositoryForStore(
48+
const eventRepository = await getEventRepositoryForStore(
4949
run.taskEventStore,
5050
authentication.environment.organization.id
5151
);

apps/webapp/app/routes/api.v1.runs.$runId.trace.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
anyResource,
77
createLoaderApiRoute,
88
} from "~/services/routeBuilders/apiBuilder.server";
9-
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
9+
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1010
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1111

1212
const ParamsSchema = z.object({
@@ -43,7 +43,7 @@ export const loader = createLoaderApiRoute(
4343
},
4444
},
4545
async ({ resource: run, authentication }) => {
46-
const eventRepository = resolveEventRepositoryForStore(
46+
const eventRepository = await getEventRepositoryForStore(
4747
run.taskEventStore,
4848
authentication.environment.organization.id
4949
);

apps/webapp/app/routes/otel.v1.metrics.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,10 @@ import { otlpExporter } from "~/v3/otlpExporter.server";
77

88
export async function action({ request }: ActionFunctionArgs) {
99
try {
10-
const exporter = await otlpExporter;
1110
const contentType = request.headers.get("content-type")?.toLowerCase() ?? "";
1211

1312
if (contentType.startsWith("application/json")) {
13+
const exporter = await otlpExporter;
1414
const body = await request.json();
1515

1616
const exportResponse = await exporter.exportMetrics(
@@ -19,6 +19,7 @@ export async function action({ request }: ActionFunctionArgs) {
1919

2020
return json(exportResponse, { status: 200 });
2121
} else if (contentType.startsWith("application/x-protobuf")) {
22+
const exporter = await otlpExporter;
2223
const buffer = await request.arrayBuffer();
2324

2425
const exportRequest = ExportMetricsServiceRequest.decode(new Uint8Array(buffer));

apps/webapp/app/routes/resources.runs.$runParam.logs.download.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { Readable } from "stream";
88
import { formatDurationMilliseconds } from "@trigger.dev/core/v3/utils/durations";
99
import { getTaskEventStoreTableForRun } from "~/v3/taskEventStore.server";
1010
import { TaskEventKind } from "@trigger.dev/database";
11-
import { resolveEventRepositoryForStore } from "~/v3/eventRepository/index.server";
11+
import { getEventRepositoryForStore } from "~/v3/eventRepository/index.server";
1212

1313
export async function loader({ params, request }: LoaderFunctionArgs) {
1414
const user = await requireUser(request);
@@ -33,7 +33,10 @@ export async function loader({ params, request }: LoaderFunctionArgs) {
3333
return new Response("Not found", { status: 404 });
3434
}
3535

36-
const eventRepository = resolveEventRepositoryForStore(run.taskEventStore, run.organizationId);
36+
const eventRepository = await getEventRepositoryForStore(
37+
run.taskEventStore,
38+
run.organizationId
39+
);
3740

3841
const runEvents = await eventRepository.getRunEvents(
3942
getTaskEventStoreTableForRun(run),

apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts

Lines changed: 81 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,63 @@ function initializeRunsReplicationClickhouseClient(): ClickHouse {
153153
});
154154
}
155155

156+
/** Session replication to ClickHouse (`SESSION_REPLICATION_CLICKHOUSE_URL`); not exported. */
157+
const defaultSessionsReplicationClickhouseClient = singleton(
158+
"sessionsReplicationClickhouseClient",
159+
initializeSessionsReplicationClickhouseClient
160+
);
161+
162+
function initializeSessionsReplicationClickhouseClient(): ClickHouse {
163+
if (!env.SESSION_REPLICATION_CLICKHOUSE_URL) {
164+
// Sessions replication worker gates on this URL; factory may still resolve "sessions_replication" for tests.
165+
return defaultClickhouseClient;
166+
}
167+
168+
const url = new URL(env.SESSION_REPLICATION_CLICKHOUSE_URL);
169+
url.searchParams.delete("secure");
170+
171+
return new ClickHouse({
172+
url: url.toString(),
173+
name: "sessions-replication",
174+
keepAlive: {
175+
enabled: env.SESSION_REPLICATION_KEEP_ALIVE_ENABLED === "1",
176+
idleSocketTtl: env.SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
177+
},
178+
logLevel: env.SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL,
179+
compression: { request: true },
180+
maxOpenConnections: env.SESSION_REPLICATION_MAX_OPEN_CONNECTIONS,
181+
});
182+
}
183+
184+
/** Task events (`EVENTS_CLICKHOUSE_URL`); not exported — accessed via factory. */
185+
const defaultEventsClickhouseClient = singleton(
186+
"eventsClickhouseClient",
187+
initializeEventsClickhouseClient
188+
);
189+
190+
function initializeEventsClickhouseClient(): ClickHouse {
191+
if (!env.EVENTS_CLICKHOUSE_URL) {
192+
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
193+
}
194+
195+
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
196+
url.searchParams.delete("secure");
197+
198+
return new ClickHouse({
199+
url: url.toString(),
200+
name: "task-events",
201+
keepAlive: {
202+
enabled: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
203+
idleSocketTtl: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
204+
},
205+
logLevel: env.EVENTS_CLICKHOUSE_LOG_LEVEL,
206+
compression: {
207+
request: env.EVENTS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
208+
},
209+
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
210+
});
211+
}
212+
156213
// ---------------------------------------------------------------------------
157214
// Helpers
158215
// ---------------------------------------------------------------------------
@@ -162,7 +219,14 @@ function hashHostname(url: string): string {
162219
return createHash("sha256").update(parsed.hostname).digest("hex");
163220
}
164221

165-
export type ClientType = "standard" | "events" | "replication" | "logs" | "query" | "admin";
222+
export type ClientType =
223+
| "standard"
224+
| "events"
225+
| "replication"
226+
| "sessions_replication"
227+
| "logs"
228+
| "query"
229+
| "admin";
166230

167231
function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHouse {
168232
const parsed = new URL(url);
@@ -196,6 +260,18 @@ function buildOrgClickhouseClient(url: string, clientType: ClientType): ClickHou
196260
compression: { request: true },
197261
maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS,
198262
});
263+
case "sessions_replication":
264+
return new ClickHouse({
265+
url: parsed.toString(),
266+
name,
267+
keepAlive: {
268+
enabled: env.SESSION_REPLICATION_KEEP_ALIVE_ENABLED === "1",
269+
idleSocketTtl: env.SESSION_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
270+
},
271+
logLevel: env.SESSION_REPLICATION_CLICKHOUSE_LOG_LEVEL,
272+
compression: { request: true },
273+
maxOpenConnections: env.SESSION_REPLICATION_MAX_OPEN_CONNECTIONS,
274+
});
199275
case "logs":
200276
return new ClickHouse({
201277
url: parsed.toString(),
@@ -266,6 +342,8 @@ export class ClickhouseFactory {
266342
return defaultClickhouseClient;
267343
case "replication":
268344
return defaultRunsReplicationClickhouseClient;
345+
case "sessions_replication":
346+
return defaultSessionsReplicationClickhouseClient;
269347
case "logs":
270348
return defaultLogsClickhouseClient;
271349
case "query":
@@ -350,26 +428,7 @@ export function getDefaultLogsClickhouseClient(): ClickHouse {
350428
// ---------------------------------------------------------------------------
351429

352430
function getEventsClickhouseClient(): ClickHouse {
353-
if (!env.EVENTS_CLICKHOUSE_URL) {
354-
throw new Error("EVENTS_CLICKHOUSE_URL is not set");
355-
}
356-
357-
const url = new URL(env.EVENTS_CLICKHOUSE_URL);
358-
url.searchParams.delete("secure");
359-
360-
return new ClickHouse({
361-
url: url.toString(),
362-
name: "task-events",
363-
keepAlive: {
364-
enabled: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_ENABLED === "1",
365-
idleSocketTtl: env.EVENTS_CLICKHOUSE_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
366-
},
367-
logLevel: env.EVENTS_CLICKHOUSE_LOG_LEVEL,
368-
compression: {
369-
request: env.EVENTS_CLICKHOUSE_COMPRESSION_REQUEST === "1",
370-
},
371-
maxOpenConnections: env.EVENTS_CLICKHOUSE_MAX_OPEN_CONNECTIONS,
372-
});
431+
return defaultEventsClickhouseClient;
373432
}
374433

375434
function buildEventRepository(store: string, clickhouse: ClickHouse): ClickhouseEventRepository {
@@ -411,6 +470,7 @@ function buildEventRepository(store: string, clickhouse: ClickHouse): Clickhouse
411470
waitForAsyncInsert: env.EVENTS_CLICKHOUSE_WAIT_FOR_ASYNC_INSERT === "1",
412471
asyncInsertMaxDataSize: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_MAX_DATA_SIZE,
413472
asyncInsertBusyTimeoutMs: env.EVENTS_CLICKHOUSE_ASYNC_INSERT_BUSY_TIMEOUT_MS,
473+
startTimeMaxAgeMs: env.EVENTS_CLICKHOUSE_START_TIME_MAX_AGE_MS,
414474
llmMetricsBatchSize: env.LLM_METRICS_BATCH_SIZE,
415475
llmMetricsFlushInterval: env.LLM_METRICS_FLUSH_INTERVAL_MS,
416476
llmMetricsMaxBatchSize: env.LLM_METRICS_MAX_BATCH_SIZE,

0 commit comments

Comments
 (0)