Skip to content

Commit 00f6cd6

Browse files
committed
fix: importer 2..
1 parent 227d629 commit 00f6cd6

1 file changed

Lines changed: 31 additions & 28 deletions

File tree

packages/db/src/services/import.service.ts

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -226,16 +226,27 @@ export async function createSessionsStartEndEvents(
226226
"name NOT IN ('session_start', 'session_end')",
227227
].join(' AND ');
228228

229-
const sessionBatchSubquery = `
230-
(SELECT DISTINCT session_id
231-
FROM ${TABLE_NAMES.events_imports}
232-
WHERE ${baseWhere}
233-
AND session_id > {lastSessionId:String}
234-
ORDER BY session_id
235-
LIMIT {limit:UInt32})
236-
`;
237-
238229
while (true) {
230+
const idsResult = await ch.query({
231+
query: `
232+
SELECT DISTINCT session_id
233+
FROM ${TABLE_NAMES.events_imports}
234+
WHERE ${baseWhere}
235+
AND session_id > {lastSessionId:String}
236+
ORDER BY session_id
237+
LIMIT {limit:UInt32}
238+
`,
239+
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
240+
format: 'JSONEachRow',
241+
});
242+
243+
const idRows = (await idsResult.json()) as Array<{ session_id: string }>;
244+
if (idRows.length === 0) {
245+
break;
246+
}
247+
248+
const maxSessionId = idRows.at(-1)!.session_id;
249+
239250
const sessionEventsQuery = `
240251
SELECT
241252
device_id,
@@ -252,13 +263,14 @@ export async function createSessionsStartEndEvents(
252263
max(created_at) AS last_timestamp
253264
FROM ${TABLE_NAMES.events_imports}
254265
WHERE ${baseWhere}
255-
AND session_id IN ${sessionBatchSubquery}
266+
AND session_id > {lastSessionId:String}
267+
AND session_id <= {maxSessionId:String}
256268
GROUP BY session_id, device_id, project_id
257269
`;
258270

259271
const sessionEventsResult = await ch.query({
260272
query: sessionEventsQuery,
261-
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
273+
query_params: { importId, lastSessionId, maxSessionId },
262274
format: 'JSONEachRow',
263275
});
264276

@@ -411,11 +423,8 @@ export async function createSessionsStartEndEvents(
411423
await insertImportBatch(sessionEvents, importId);
412424
}
413425

414-
if (sessionData.length === 0) {
415-
break;
416-
}
417-
lastSessionId = sessionData.at(-1)!.session_id;
418-
if (sessionData.length < SESSION_BATCH_SIZE) {
426+
lastSessionId = maxSessionId;
427+
if (idRows.length < SESSION_BATCH_SIZE) {
419428
break;
420429
}
421430
}
@@ -476,15 +485,6 @@ export async function backfillSessionsToProduction(
476485
const SESSION_BATCH_SIZE = 5000;
477486
let lastSessionId = '';
478487

479-
const baseWhere = 'import_id = {importId:String} AND session_id > {lastSessionId:String}';
480-
const sessionBatchSubquery = `
481-
(SELECT DISTINCT session_id
482-
FROM ${TABLE_NAMES.events_imports}
483-
WHERE ${baseWhere}
484-
ORDER BY session_id
485-
LIMIT {limit:UInt32})
486-
`;
487-
488488
while (true) {
489489
const idsResult = await ch.query({
490490
query: `
@@ -504,6 +504,8 @@ export async function backfillSessionsToProduction(
504504
break;
505505
}
506506

507+
const maxSessionId = idRows.at(-1)!.session_id;
508+
507509
const sessionsInsertQuery = `
508510
INSERT INTO ${TABLE_NAMES.sessions} (
509511
id, project_id, profile_id, device_id, created_at, ended_at,
@@ -560,21 +562,22 @@ export async function backfillSessionsToProduction(
560562
FROM ${TABLE_NAMES.events_imports} e
561563
WHERE
562564
e.import_id = {importId:String}
563-
AND e.session_id IN ${sessionBatchSubquery}
565+
AND e.session_id > {lastSessionId:String}
566+
AND e.session_id <= {maxSessionId:String}
564567
GROUP BY e.session_id
565568
`;
566569

567570
await ch.command({
568571
query: sessionsInsertQuery,
569-
query_params: { importId, lastSessionId, limit: SESSION_BATCH_SIZE },
572+
query_params: { importId, lastSessionId, maxSessionId },
570573
clickhouse_settings: {
571574
wait_end_of_query: 1,
572575
send_progress_in_http_headers: 1,
573576
http_headers_progress_interval_ms: '50000',
574577
},
575578
});
576579

577-
lastSessionId = idRows.at(-1)!.session_id;
580+
lastSessionId = maxSessionId;
578581
if (idRows.length < SESSION_BATCH_SIZE) {
579582
break;
580583
}

0 commit comments

Comments
 (0)