Skip to content

Commit 330acfe

Browse files
committed
Run Replication using the factory
1 parent 44df180 commit 330acfe

12 files changed

+253
-125
lines changed

apps/webapp/app/routes/admin.api.v1.runs-replication.create.ts

Lines changed: 26 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
44
import { z } from "zod";
5-
import { ClickHouse } from "@internal/clickhouse";
65
import { env } from "~/env.server";
6+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
77
import { RunsReplicationService } from "~/services/runsReplicationService.server";
88
import {
99
getRunsReplicationGlobal,
@@ -62,6 +62,8 @@ export async function action({ request }: ActionFunctionArgs) {
6262

6363
const params = CreateRunReplicationServiceParams.parse(await request.json());
6464

65+
await clickhouseFactory.isReady();
66+
6567
const service = createRunReplicationService(params);
6668

6769
setRunsReplicationGlobal(service);
@@ -77,24 +79,23 @@ export async function action({ request }: ActionFunctionArgs) {
7779
}
7880

7981
function createRunReplicationService(params: CreateRunReplicationServiceParams) {
80-
const clickhouse = new ClickHouse({
81-
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
82-
name: params.name,
83-
keepAlive: {
84-
enabled: params.keepAliveEnabled,
85-
idleSocketTtl: params.keepAliveIdleSocketTtl,
86-
},
87-
logLevel: "debug",
88-
compression: {
89-
request: true,
90-
},
91-
maxOpenConnections: params.maxOpenConnections,
92-
});
82+
const {
83+
name,
84+
maxFlushConcurrency,
85+
flushIntervalMs,
86+
flushBatchSize,
87+
leaderLockTimeoutMs,
88+
leaderLockExtendIntervalMs,
89+
leaderLockAcquireAdditionalTimeMs,
90+
leaderLockRetryIntervalMs,
91+
ackIntervalSeconds,
92+
waitForAsyncInsert,
93+
} = params;
9394

9495
const service = new RunsReplicationService({
95-
clickhouse: clickhouse,
96+
clickhouseFactory,
9697
pgConnectionUrl: env.DATABASE_URL,
97-
serviceName: params.name,
98+
serviceName: name,
9899
slotName: env.RUN_REPLICATION_SLOT_NAME,
99100
publicationName: env.RUN_REPLICATION_PUBLICATION_NAME,
100101
redisOptions: {
@@ -106,16 +107,16 @@ function createRunReplicationService(params: CreateRunReplicationServiceParams)
106107
enableAutoPipelining: true,
107108
...(env.RUN_REPLICATION_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
108109
},
109-
maxFlushConcurrency: params.maxFlushConcurrency,
110-
flushIntervalMs: params.flushIntervalMs,
111-
flushBatchSize: params.flushBatchSize,
112-
leaderLockTimeoutMs: params.leaderLockTimeoutMs,
113-
leaderLockExtendIntervalMs: params.leaderLockExtendIntervalMs,
114-
leaderLockAcquireAdditionalTimeMs: params.leaderLockAcquireAdditionalTimeMs,
115-
leaderLockRetryIntervalMs: params.leaderLockRetryIntervalMs,
116-
ackIntervalSeconds: params.ackIntervalSeconds,
110+
maxFlushConcurrency,
111+
flushIntervalMs,
112+
flushBatchSize,
113+
leaderLockTimeoutMs,
114+
leaderLockExtendIntervalMs,
115+
leaderLockAcquireAdditionalTimeMs,
116+
leaderLockRetryIntervalMs,
117+
ackIntervalSeconds,
117118
logLevel: "debug",
118-
waitForAsyncInsert: params.waitForAsyncInsert,
119+
waitForAsyncInsert,
119120
});
120121

121122
return service;

apps/webapp/app/routes/admin.api.v1.runs-replication.start.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { prisma } from "~/db.server";
33
import { authenticateApiRequestWithPersonalAccessToken } from "~/services/personalAccessToken.server";
4+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
45
import { getRunsReplicationGlobal } from "~/services/runsReplicationGlobal.server";
56
import { runsReplicationInstance } from "~/services/runsReplicationInstance.server";
67

@@ -29,6 +30,8 @@ export async function action({ request }: ActionFunctionArgs) {
2930
try {
3031
const globalService = getRunsReplicationGlobal();
3132

33+
await clickhouseFactory.isReady();
34+
3235
if (globalService) {
3336
await globalService.start();
3437
} else {

apps/webapp/app/services/clickhouse/clickhouseFactory.server.ts

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,34 @@ function initializeQueryClickhouseClient() {
122122
});
123123
}
124124

125+
/** TaskRun replication to ClickHouse (`RUN_REPLICATION_CLICKHOUSE_URL`); not exported. */
126+
const defaultRunsReplicationClickhouseClient = singleton(
127+
"runsReplicationClickhouseClient",
128+
initializeRunsReplicationClickhouseClient
129+
);
130+
131+
function initializeRunsReplicationClickhouseClient(): ClickHouse {
132+
if (!env.RUN_REPLICATION_CLICKHOUSE_URL) {
133+
// Runs replication worker gates on this URL; factory may still resolve "replication" for tests.
134+
return defaultClickhouseClient;
135+
}
136+
137+
const url = new URL(env.RUN_REPLICATION_CLICKHOUSE_URL);
138+
url.searchParams.delete("secure");
139+
140+
return new ClickHouse({
141+
url: url.toString(),
142+
name: "runs-replication",
143+
keepAlive: {
144+
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
145+
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
146+
},
147+
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
148+
compression: { request: true },
149+
maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS,
150+
});
151+
}
152+
125153
// ---------------------------------------------------------------------------
126154
// Helpers
127155
// ---------------------------------------------------------------------------
@@ -187,8 +215,9 @@ export class ClickhouseFactory {
187215
switch (clientType) {
188216
case "standard":
189217
case "events":
190-
case "replication":
191218
return defaultClickhouseClient;
219+
case "replication":
220+
return defaultRunsReplicationClickhouseClient;
192221
case "logs":
193222
return defaultLogsClickhouseClient;
194223
case "query":

apps/webapp/app/services/runsReplicationInstance.server.ts

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
import { ClickHouse } from "@internal/clickhouse";
21
import invariant from "tiny-invariant";
32
import { env } from "~/env.server";
3+
import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactory.server";
44
import { singleton } from "~/utils/singleton";
55
import { meter, provider } from "~/v3/tracer.server";
66
import { RunsReplicationService } from "./runsReplicationService.server";
@@ -22,22 +22,8 @@ function initializeRunsReplicationInstance() {
2222

2323
console.log("🗃️ Runs replication service enabled");
2424

25-
const clickhouse = new ClickHouse({
26-
url: env.RUN_REPLICATION_CLICKHOUSE_URL,
27-
name: "runs-replication",
28-
keepAlive: {
29-
enabled: env.RUN_REPLICATION_KEEP_ALIVE_ENABLED === "1",
30-
idleSocketTtl: env.RUN_REPLICATION_KEEP_ALIVE_IDLE_SOCKET_TTL_MS,
31-
},
32-
logLevel: env.RUN_REPLICATION_CLICKHOUSE_LOG_LEVEL,
33-
compression: {
34-
request: true,
35-
},
36-
maxOpenConnections: env.RUN_REPLICATION_MAX_OPEN_CONNECTIONS,
37-
});
38-
3925
const service = new RunsReplicationService({
40-
clickhouse: clickhouse,
26+
clickhouseFactory,
4127
pgConnectionUrl: DATABASE_URL,
4228
serviceName: "runs-replication",
4329
slotName: env.RUN_REPLICATION_SLOT_NAME,
@@ -72,8 +58,9 @@ function initializeRunsReplicationInstance() {
7258
});
7359

7460
if (env.RUN_REPLICATION_ENABLED === "1") {
75-
service
76-
.start()
61+
clickhouseFactory
62+
.isReady()
63+
.then(() => service.start())
7764
.then(() => {
7865
console.log("🗃️ Runs replication service started");
7966
})

0 commit comments

Comments
 (0)