Skip to content

Commit 3d8a404

Browse files
waleedlatif1 and claude committed
fix(data-drains): bugbot fixes for cursor precision, invisible failures, audit-log index
- cursor: compare in millisecond buckets so PG's microsecond-precision timestamps don't cause the cursor row to re-emit forever. The JS Date round-trip truncates 00:00:00.123456 to 00:00:00.123, which made gt(col, cursor) match the cursor row itself. - service: insert the run row before parse/decrypt so encryption-key rotation or schema drift surface as a failed run in the UI instead of vanishing into background-job logs while lastRunAt advances. - audit_log: add (workspace_id, created_at, id) composite index so the audit-logs source's tie-breaking ORDER BY is satisfied by the index without a heap fetch. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent fb19077 commit 3d8a404

5 files changed

Lines changed: 74 additions & 19 deletions

File tree

apps/sim/lib/data-drains/service.ts

Lines changed: 23 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,10 +56,6 @@ export async function runDrain(
5656

5757
const source = getSource(drain.source)
5858
const destination = getDestination(drain.destinationType)
59-
const config = destination.configSchema.parse(drain.destinationConfig)
60-
const credentials = destination.credentialsSchema.parse(
61-
await decryptCredentials(drain.destinationCredentials)
62-
)
6359

6460
const runId = generateId()
6561
const startedAt = new Date()
@@ -79,9 +75,21 @@ export async function runDrain(
7975
let sequence = 0
8076
const locators: string[] = []
8177

82-
const session = destination.openSession({ config, credentials })
78+
/**
79+
* Schema-parse and decrypt happen *after* the run row is created so failures
80+
* in either (e.g. encryption-key rotation, schema drift across versions)
81+
* surface as a `failed` run row in the UI rather than vanishing into the
82+
* background-job logs while `lastRunAt` quietly advances.
83+
*/
84+
let session: ReturnType<typeof destination.openSession> | null = null
8385

8486
try {
87+
const config = destination.configSchema.parse(drain.destinationConfig)
88+
const credentials = destination.credentialsSchema.parse(
89+
await decryptCredentials(drain.destinationCredentials)
90+
)
91+
session = destination.openSession({ config, credentials })
92+
8593
for await (const chunk of source.pages({
8694
organizationId: drain.organizationId,
8795
cursor,
@@ -203,14 +211,16 @@ export async function runDrain(
203211

204212
throw error
205213
} finally {
206-
try {
207-
await session.close()
208-
} catch (closeError) {
209-
logger.warn('Data drain session close failed', {
210-
drainId,
211-
runId,
212-
error: toError(closeError).message,
213-
})
214+
if (session) {
215+
try {
216+
await session.close()
217+
} catch (closeError) {
218+
logger.warn('Data drain session close failed', {
219+
drainId,
220+
runId,
221+
error: toError(closeError).message,
222+
})
223+
}
214224
}
215225
}
216226
}

apps/sim/lib/data-drains/sources/cursor.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { and, eq, gt, or, type SQL } from 'drizzle-orm'
1+
import { and, gt, gte, lt, or, type SQL } from 'drizzle-orm'
22
import type { PgColumn } from 'drizzle-orm/pg-core'
33
import type { Cursor } from '@/lib/data-drains/types'
44

@@ -29,17 +29,29 @@ export function decodeTimeCursor(cursor: Cursor): TimeCursor | null {
2929

3030
/**
3131
* Builds a strict-greater-than predicate over a `(timestampCol, idCol)` pair.
32-
* Equivalent to `(timestampCol, idCol) > (cursor.ts, cursor.id)` — rows on the
33-
* cursor itself are excluded; ties on `timestampCol` break by `idCol`.
32+
* Semantically: `(timestampCol, idCol) > (cursor.ts, cursor.id)`.
33+
*
34+
* Postgres `timestamp` columns store microsecond precision but JS `Date`
35+
* round-trips at millisecond precision, so a row with PG value
36+
* `00:00:00.123456` survives the JS → cursor → JS round-trip as
37+
* `00:00:00.123000`. A naive `eq(col, new Date(cursor.ts))` would never match
38+
* such rows, and `gt(col, new Date(cursor.ts))` would match the cursor row
39+
* itself — re-emitting the last row of every chunk forever.
40+
*
41+
* Fix: compare in millisecond buckets. Anything in a strictly later ms bucket
42+
* is included; anything inside the cursor's own ms bucket is included only
43+
* when `idCol > cursor.id`.
3444
*/
3545
export function timeCursorPredicate(
3646
timestampCol: PgColumn,
3747
idCol: PgColumn,
3848
cursor: TimeCursor | null
3949
): SQL | undefined {
4050
if (!cursor) return undefined
51+
const cursorTs = new Date(cursor.ts)
52+
const nextMs = new Date(cursorTs.getTime() + 1)
4153
return or(
42-
gt(timestampCol, new Date(cursor.ts)),
43-
and(eq(timestampCol, new Date(cursor.ts)), gt(idCol, cursor.id))
54+
gte(timestampCol, nextMs),
55+
and(gte(timestampCol, cursorTs), lt(timestampCol, nextMs), gt(idCol, cursor.id))
4456
)
4557
}

packages/db/migrations/0202_panoramic_dreaming_celestial.sql

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,5 @@ CREATE UNIQUE INDEX "data_drains_org_name_unique" ON "data_drains" USING btree (
4646
CREATE INDEX "copilot_chats_workspace_created_at_id_idx" ON "copilot_chats" USING btree ("workspace_id","created_at","id");--> statement-breakpoint
4747
CREATE INDEX "copilot_runs_workspace_completed_at_id_idx" ON "copilot_runs" USING btree ("workspace_id","completed_at","id");--> statement-breakpoint
4848
CREATE INDEX "job_execution_logs_workspace_ended_at_id_idx" ON "job_execution_logs" USING btree ("workspace_id","ended_at","id");--> statement-breakpoint
49-
CREATE INDEX "workflow_execution_logs_workspace_ended_at_id_idx" ON "workflow_execution_logs" USING btree ("workspace_id","ended_at","id");
49+
CREATE INDEX "workflow_execution_logs_workspace_ended_at_id_idx" ON "workflow_execution_logs" USING btree ("workspace_id","ended_at","id");--> statement-breakpoint
50+
CREATE INDEX "audit_log_workspace_created_at_id_idx" ON "audit_log" USING btree ("workspace_id","created_at","id");

packages/db/migrations/meta/0202_snapshot.json

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1259,6 +1259,33 @@
12591259
"method": "btree",
12601260
"with": {}
12611261
},
1262+
"audit_log_workspace_created_at_id_idx": {
1263+
"name": "audit_log_workspace_created_at_id_idx",
1264+
"columns": [
1265+
{
1266+
"expression": "workspace_id",
1267+
"isExpression": false,
1268+
"asc": true,
1269+
"nulls": "last"
1270+
},
1271+
{
1272+
"expression": "created_at",
1273+
"isExpression": false,
1274+
"asc": true,
1275+
"nulls": "last"
1276+
},
1277+
{
1278+
"expression": "id",
1279+
"isExpression": false,
1280+
"asc": true,
1281+
"nulls": "last"
1282+
}
1283+
],
1284+
"isUnique": false,
1285+
"concurrently": false,
1286+
"method": "btree",
1287+
"with": {}
1288+
},
12621289
"audit_log_actor_created_idx": {
12631290
"name": "audit_log_actor_created_idx",
12641291
"columns": [

packages/db/schema.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2416,6 +2416,11 @@ export const auditLog = pgTable(
24162416
table.workspaceId,
24172417
table.createdAt
24182418
),
2419+
workspaceCreatedIdIdx: index('audit_log_workspace_created_at_id_idx').on(
2420+
table.workspaceId,
2421+
table.createdAt,
2422+
table.id
2423+
),
24192424
actorCreatedIdx: index('audit_log_actor_created_idx').on(table.actorId, table.createdAt),
24202425
resourceIdx: index('audit_log_resource_idx').on(table.resourceType, table.resourceId),
24212426
actionIdx: index('audit_log_action_idx').on(table.action),

0 commit comments

Comments (0)