Skip to content

Commit 2888404

Browse files
committed (author/date stripped in page capture)
fix(data-drains): align cursor predicate and ORDER BY on ms-truncated timestamps
Postgres timestamps are µs-precision but the cursor only round-trips at ms. Previously the predicate compared in ms buckets while ORDER BY sorted on raw µs — a row with later µs but a lexicographically earlier id than the cursor row could be permanently skipped. Use date_trunc('milliseconds', col) in both the predicate and ORDER BY across all 5 sources so the two agree. Also include organizationId in the data-drain runs query key so cache entries don't collide across orgs.
1 parent 3d8a404 commit 2888404

7 files changed

Lines changed: 34 additions & 30 deletions

File tree

apps/sim/ee/data-drains/hooks/data-drains.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,8 @@ export const dataDrainKeys = {
2525
detail: (drainId?: string) => [...dataDrainKeys.details(), drainId ?? ''] as const,
2626
runsAll: () => [...dataDrainKeys.all, 'runs'] as const,
2727
runs: (drainId?: string) => [...dataDrainKeys.runsAll(), drainId ?? ''] as const,
28-
runsList: (drainId?: string, limit?: number) =>
29-
[...dataDrainKeys.runs(drainId), limit ?? 10] as const,
28+
runsList: (organizationId?: string, drainId?: string, limit?: number) =>
29+
[...dataDrainKeys.runs(drainId), organizationId ?? '', limit ?? 10] as const,
3030
}
3131

3232
async function fetchDataDrains(organizationId: string, signal?: AbortSignal): Promise<DataDrain[]> {
@@ -62,7 +62,7 @@ export function useDataDrains(organizationId?: string) {
6262

6363
export function useDataDrainRuns(organizationId?: string, drainId?: string, limit = 10) {
6464
return useQuery<DataDrainRun[]>({
65-
queryKey: dataDrainKeys.runsList(drainId, limit),
65+
queryKey: dataDrainKeys.runsList(organizationId, drainId, limit),
6666
queryFn: ({ signal }) =>
6767
fetchDataDrainRuns(organizationId as string, drainId as string, limit, signal),
6868
enabled: Boolean(organizationId && drainId),

apps/sim/lib/data-drains/sources/audit-logs.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { db } from '@sim/db'
22
import { auditLog } from '@sim/db/schema'
3-
import { and, asc, inArray, isNull, or, sql } from 'drizzle-orm'
3+
import { and, inArray, isNull, or, sql } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
7+
timeCursorOrderBy,
78
timeCursorPredicate,
89
} from '@/lib/data-drains/sources/cursor'
910
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
@@ -38,7 +39,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<AuditLogRow[]> {
3839
.select()
3940
.from(auditLog)
4041
.where(and(scopeClause, cursorClause))
41-
.orderBy(asc(auditLog.createdAt), asc(auditLog.id))
42+
.orderBy(...timeCursorOrderBy(auditLog.createdAt, auditLog.id))
4243
.limit(input.chunkSize)
4344

4445
if (rows.length === 0) return

apps/sim/lib/data-drains/sources/copilot-chats.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { db } from '@sim/db'
22
import { copilotChats } from '@sim/db/schema'
3-
import { and, asc, inArray } from 'drizzle-orm'
3+
import { and, inArray } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
7+
timeCursorOrderBy,
78
timeCursorPredicate,
89
} from '@/lib/data-drains/sources/cursor'
910
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
@@ -31,7 +32,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<CopilotChatRow[]> {
3132
.select()
3233
.from(copilotChats)
3334
.where(and(inArray(copilotChats.workspaceId, workspaceIds), cursorClause))
34-
.orderBy(asc(copilotChats.createdAt), asc(copilotChats.id))
35+
.orderBy(...timeCursorOrderBy(copilotChats.createdAt, copilotChats.id))
3536
.limit(input.chunkSize)
3637

3738
if (rows.length === 0) return

apps/sim/lib/data-drains/sources/copilot-runs.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { db } from '@sim/db'
22
import { copilotRuns } from '@sim/db/schema'
3-
import { and, asc, inArray, isNotNull } from 'drizzle-orm'
3+
import { and, inArray, isNotNull } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
7+
timeCursorOrderBy,
78
timeCursorPredicate,
89
} from '@/lib/data-drains/sources/cursor'
910
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
@@ -33,7 +34,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<CopilotRunRow[]> {
3334
cursorClause
3435
)
3536
)
36-
.orderBy(asc(copilotRuns.completedAt), asc(copilotRuns.id))
37+
.orderBy(...timeCursorOrderBy(copilotRuns.completedAt, copilotRuns.id))
3738
.limit(input.chunkSize)
3839

3940
if (rows.length === 0) return

apps/sim/lib/data-drains/sources/cursor.ts

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { and, gt, gte, lt, or, type SQL } from 'drizzle-orm'
1+
import { type SQL, sql } from 'drizzle-orm'
22
import type { PgColumn } from 'drizzle-orm/pg-core'
33
import type { Cursor } from '@/lib/data-drains/types'
44

@@ -29,29 +29,28 @@ export function decodeTimeCursor(cursor: Cursor): TimeCursor | null {
2929

3030
/**
3131
* Builds a strict-greater-than predicate over a `(timestampCol, idCol)` pair.
32-
* Semantically: `(timestampCol, idCol) > (cursor.ts, cursor.id)`.
3332
*
3433
* Postgres `timestamp` columns store microsecond precision but JS `Date`
35-
* round-trips at millisecond precision, so a row with PG value
36-
* `00:00:00.123456` survives the JS → cursor → JS round-trip as
37-
* `00:00:00.123000`. A naive `eq(col, new Date(cursor.ts))` would never match
38-
* such rows, and `gt(col, new Date(cursor.ts))` would match the cursor row
39-
* itself — re-emitting the last row of every chunk forever.
40-
*
41-
* Fix: compare in millisecond buckets. Anything in a strictly later ms bucket
42-
* is included; anything inside the cursor's own ms bucket is included only
43-
* when `idCol > cursor.id`.
34+
* round-trips at millisecond precision, so the cursor only ever captures
35+
* millisecond-truncated timestamps. We compare in millisecond buckets via
36+
* `date_trunc('milliseconds', col)` so the predicate's notion of order matches
37+
* `timeCursorOrderBy` exactly. If ORDER BY used raw microseconds while the
38+
* predicate used millisecond buckets, a row sorted later by µs but with a
39+
* lexicographically earlier id than the cursor row would be skipped forever.
4440
*/
4541
export function timeCursorPredicate(
4642
timestampCol: PgColumn,
4743
idCol: PgColumn,
4844
cursor: TimeCursor | null
4945
): SQL | undefined {
5046
if (!cursor) return undefined
51-
const cursorTs = new Date(cursor.ts)
52-
const nextMs = new Date(cursorTs.getTime() + 1)
53-
return or(
54-
gte(timestampCol, nextMs),
55-
and(gte(timestampCol, cursorTs), lt(timestampCol, nextMs), gt(idCol, cursor.id))
56-
)
47+
return sql`(date_trunc('milliseconds', ${timestampCol}), ${idCol}) > (${new Date(cursor.ts)}, ${cursor.id})`
48+
}
49+
50+
/**
51+
* ORDER BY fragments paired with `timeCursorPredicate`. Both must agree on
52+
* millisecond bucketing so cursor advancement never skips rows.
53+
*/
54+
export function timeCursorOrderBy(timestampCol: PgColumn, idCol: PgColumn): [SQL, SQL] {
55+
return [sql`date_trunc('milliseconds', ${timestampCol}) asc`, sql`${idCol} asc`]
5756
}

apps/sim/lib/data-drains/sources/job-logs.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { db } from '@sim/db'
22
import { jobExecutionLogs } from '@sim/db/schema'
3-
import { and, asc, inArray, isNotNull } from 'drizzle-orm'
3+
import { and, inArray, isNotNull } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
7+
timeCursorOrderBy,
78
timeCursorPredicate,
89
} from '@/lib/data-drains/sources/cursor'
910
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
@@ -33,7 +34,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<JobLogRow[]> {
3334
cursorClause
3435
)
3536
)
36-
.orderBy(asc(jobExecutionLogs.endedAt), asc(jobExecutionLogs.id))
37+
.orderBy(...timeCursorOrderBy(jobExecutionLogs.endedAt, jobExecutionLogs.id))
3738
.limit(input.chunkSize)
3839

3940
if (rows.length === 0) return

apps/sim/lib/data-drains/sources/workflow-logs.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import { db } from '@sim/db'
22
import { workflowExecutionLogs } from '@sim/db/schema'
3-
import { and, asc, inArray, isNotNull } from 'drizzle-orm'
3+
import { and, inArray, isNotNull } from 'drizzle-orm'
44
import {
55
decodeTimeCursor,
66
encodeTimeCursor,
7+
timeCursorOrderBy,
78
timeCursorPredicate,
89
} from '@/lib/data-drains/sources/cursor'
910
import { getOrganizationWorkspaceIds } from '@/lib/data-drains/sources/helpers'
@@ -40,7 +41,7 @@ async function* pages(input: SourcePageInput): AsyncIterable<WorkflowLogRow[]> {
4041
cursorClause
4142
)
4243
)
43-
.orderBy(asc(workflowExecutionLogs.endedAt), asc(workflowExecutionLogs.id))
44+
.orderBy(...timeCursorOrderBy(workflowExecutionLogs.endedAt, workflowExecutionLogs.id))
4445
.limit(input.chunkSize)
4546

4647
if (rows.length === 0) return

0 commit comments

Comments (0)