
Commit 3c48b24

change stats to be per-job, not per-chunk
1 parent 66a66d5 commit 3c48b24

4 files changed

Lines changed: 60 additions & 37 deletions


apps/sim/background/cleanup-logs.ts

Lines changed: 3 additions & 5 deletions
```diff
@@ -93,11 +93,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
   )
 
   const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
-  if (workflowResults.filesTotal > 0) {
-    logger.info(
-      `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
-    )
-  }
+  logger.info(
+    `[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
+  )
 
   await batchDeleteByWorkspaceAndTimestamp({
     tableDef: jobExecutionLogs,
```

apps/sim/background/cleanup-soft-deletes.ts

Lines changed: 11 additions & 11 deletions
```diff
@@ -18,10 +18,8 @@ import { and, inArray, isNotNull, lt } from 'drizzle-orm'
 import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
 import {
   batchDeleteByWorkspaceAndTimestamp,
-  DEFAULT_BATCH_SIZE,
-  DEFAULT_MAX_BATCHES_PER_TABLE,
   deleteRowsById,
-  selectRowsByWorkspaceChunks,
+  selectRowsByIdChunks,
 } from '@/lib/cleanup/batch-delete'
 import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
 import type { StorageContext } from '@/lib/uploads'
@@ -46,7 +44,7 @@ async function selectExpiredWorkspaceFiles(
   retentionDate: Date
 ): Promise<WorkspaceFileScope> {
   const [legacyRows, multiContextRows] = await Promise.all([
-    selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
       db
         .select({ id: workspaceFile.id, key: workspaceFile.key })
         .from(workspaceFile)
@@ -59,7 +57,7 @@ async function selectExpiredWorkspaceFiles(
         )
         .limit(chunkLimit)
     ),
-    selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
       db
         .select({
           id: workspaceFiles.id,
@@ -185,7 +183,7 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
   // (chats + S3) AND the DB deletes below — selecting twice could return
   // different subsets above the LIMIT cap and orphan or prematurely purge data.
   const [doomedWorkflows, fileScope] = await Promise.all([
-    selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
+    selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
       db
         .select({ id: workflow.id })
         .from(workflow)
@@ -205,11 +203,13 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
   let chatCleanup: { execute: () => Promise<void> } | null = null
 
   if (doomedWorkflowIds.length > 0) {
-    const doomedChats = await db
-      .select({ id: copilotChats.id })
-      .from(copilotChats)
-      .where(inArray(copilotChats.workflowId, doomedWorkflowIds))
-      .limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
+    const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, (chunkIds, chunkLimit) =>
+      db
+        .select({ id: copilotChats.id })
+        .from(copilotChats)
+        .where(inArray(copilotChats.workflowId, chunkIds))
+        .limit(chunkLimit)
+    )
 
     const doomedChatIds = doomedChats.map((c) => c.id)
     if (doomedChatIds.length > 0) {
```
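The final hunk is where the rename earns its keep: `doomedWorkflowIds` holds workflow IDs, not workspace IDs, so the old helper name no longer described the call, and the old one-shot query could hand Postgres an enormous `IN (...)` list. A rough sketch of the new call shape, with a hypothetical stub standing in for the drizzle query above:

```typescript
// Hypothetical stand-in for the drizzle query: anything with the
// (chunkIds, chunkLimit) => Promise<rows> shape plugs into the helper.
const fetchChatIds = async (
  chunkIds: string[],
  chunkLimit: number
): Promise<{ id: string }[]> =>
  // Real code: db.select(...).where(inArray(copilotChats.workflowId, chunkIds)).limit(chunkLimit)
  chunkIds.slice(0, chunkLimit).map((id) => ({ id: `chat-of-${id}` }))

const doomedWorkflowIds = Array.from({ length: 1200 }, (_, i) => `wf-${i}`)

// The helper splits the 1200 IDs into chunks and threads the remaining row
// budget into each query, so no single statement sees the full ID list.
const doomedChats = await selectRowsByIdChunks(doomedWorkflowIds, fetchChatIds)
```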

apps/sim/background/cleanup-tasks.ts

Lines changed: 3 additions & 3 deletions
```diff
@@ -14,7 +14,7 @@ import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/clean
 import {
   batchDeleteByWorkspaceAndTimestamp,
   deleteRowsById,
-  selectRowsByWorkspaceChunks,
+  selectRowsByIdChunks,
   type TableCleanupResult,
 } from '@/lib/cleanup/batch-delete'
 import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
@@ -66,7 +66,7 @@ async function cleanupRunChildren(
 ): Promise<TableCleanupResult[]> {
   if (workspaceIds.length === 0) return []
 
-  const runIds = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
+  const runIds = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
     db
       .select({ id: copilotRuns.id })
       .from(copilotRuns)
@@ -108,7 +108,7 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
     `[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
   )
 
-  const doomedChats = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
+  const doomedChats = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
     db
       .select({ id: copilotChats.id })
       .from(copilotChats)
```

apps/sim/lib/cleanup/batch-delete.ts

Lines changed: 43 additions & 18 deletions
```diff
@@ -21,30 +21,34 @@ export function chunkArray<T>(arr: T[], size: number): T[][] {
   return out
 }
 
-export interface SelectByWorkspaceChunksOptions {
+export interface SelectByIdChunksOptions {
   /** Cap on rows returned across all chunks. Defaults to a full per-table cleanup budget. */
   overallLimit?: number
-  workspaceChunkSize?: number
+  chunkSize?: number
 }
 
 /**
- * Run a SELECT query once per workspace chunk and concatenate results up to
+ * Run a SELECT query once per ID chunk and concatenate results up to
  * `overallLimit`. Each chunk's query is passed the remaining row budget so the
  * total never exceeds the cap. Use this when you need the selected row set
  * (e.g. to drive S3 or copilot-backend cleanup alongside the DB delete).
+ *
+ * Works for any large ID set — workspace IDs, workflow IDs, etc. Avoids
+ * sending one massive `IN (...)` list that would blow Postgres's statement
+ * timeout.
  */
-export async function selectRowsByWorkspaceChunks<T>(
-  workspaceIds: string[],
+export async function selectRowsByIdChunks<T>(
+  ids: string[],
   query: (chunkIds: string[], chunkLimit: number) => Promise<T[]>,
   {
     overallLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
-    workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
-  }: SelectByWorkspaceChunksOptions = {}
+    chunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
+  }: SelectByIdChunksOptions = {}
 ): Promise<T[]> {
-  if (workspaceIds.length === 0) return []
+  if (ids.length === 0) return []
 
   const rows: T[] = []
-  for (const chunkIds of chunkArray(workspaceIds, workspaceChunkSize)) {
+  for (const chunkIds of chunkArray(ids, chunkSize)) {
     if (rows.length >= overallLimit) break
     const remaining = overallLimit - rows.length
     const chunkRows = await query(chunkIds, remaining)
```
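To make the budget threading concrete, here is a toy run with the database stubbed out; the IDs, chunk size, and limit are invented for the example, and the callback just pretends every ID matches one row:

```typescript
const ids = Array.from({ length: 3500 }, (_, i) => `id-${i}`)

const rows = await selectRowsByIdChunks(
  ids,
  async (chunkIds, chunkLimit) => {
    // A real caller would run a SELECT here; the stub respects the limit.
    console.log(`query: ${chunkIds.length} ids, limit ${chunkLimit}`)
    return chunkIds.slice(0, chunkLimit)
  },
  { overallLimit: 1200, chunkSize: 500 }
)

// Logs three queries, not seven:
//   query: 500 ids, limit 1200
//   query: 500 ids, limit 700
//   query: 500 ids, limit 200
// rows.length === 1200; the remaining four chunks are never queried.
```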
```diff
@@ -68,8 +72,14 @@ export interface ChunkedBatchDeleteOptions<TRow extends { id: string }> {
   /** Runs between SELECT and DELETE; receives the just-selected rows. */
   onBatch?: (rows: TRow[]) => Promise<void>
   batchSize?: number
-  /** Max batches per workspace chunk. Total per-run cap = chunks * maxBatches * batchSize. */
+  /** Max batches per workspace chunk. */
   maxBatches?: number
+  /**
+   * Hard cap on rows processed (deleted + failed) across all chunks per call.
+   * Defaults to `DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE`. Cron
+   * runs frequently enough to catch up the backlog over multiple invocations.
+   */
+  totalRowLimit?: number
   workspaceChunkSize?: number
 }
 
@@ -78,7 +88,8 @@ export interface ChunkedBatchDeleteOptions<TRow extends { id: string }> {
  *
  * For each workspace chunk: SELECT a batch of eligible rows → run optional
  * `onBatch` hook (e.g. to delete S3 files) → DELETE those rows by ID. Repeats
- * until exhausted or `maxBatches` is hit, then moves to the next chunk.
+ * until exhausted or `maxBatches` is hit, then moves to the next chunk. Stops
+ * the whole call once `totalRowLimit` rows have been processed.
  *
  * Workspace IDs are chunked before the SELECT — see
  * `DEFAULT_WORKSPACE_CHUNK_SIZE` for why.
@@ -91,6 +102,7 @@ export async function chunkedBatchDelete<TRow extends { id: string }>({
   onBatch,
   batchSize = DEFAULT_BATCH_SIZE,
   maxBatches = DEFAULT_MAX_BATCHES_PER_TABLE,
+  totalRowLimit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE,
   workspaceChunkSize = DEFAULT_WORKSPACE_CHUNK_SIZE,
 }: ChunkedBatchDeleteOptions<TRow>): Promise<TableCleanupResult> {
   const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
@@ -101,14 +113,25 @@ export async function chunkedBatchDelete<TRow extends { id: string }>({
   }
 
   const chunks = chunkArray(workspaceIds, workspaceChunkSize)
+  let stoppedEarly = false
 
   for (const [chunkIdx, chunkIds] of chunks.entries()) {
+    if (result.deleted + result.failed >= totalRowLimit) {
+      stoppedEarly = true
+      break
+    }
+
     let batchesProcessed = 0
     let hasMore = true
 
-    while (hasMore && batchesProcessed < maxBatches) {
+    while (
+      hasMore &&
+      batchesProcessed < maxBatches &&
+      result.deleted + result.failed < totalRowLimit
+    ) {
+      let rows: TRow[] = []
       try {
-        const rows = await selectChunk(chunkIds, batchSize)
+        rows = await selectChunk(chunkIds, batchSize)
 
         if (rows.length === 0) {
           hasMore = false
@@ -127,17 +150,19 @@ export async function chunkedBatchDelete<TRow extends { id: string }>({
         hasMore = rows.length === batchSize
         batchesProcessed++
       } catch (error) {
-        result.failed++
-        logger.error(`[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}):`, {
-          error,
-        })
+        // Count rows we tried to delete; SELECT-stage errors leave rows=[].
+        result.failed += rows.length
+        logger.error(
+          `[${tableName}] Batch failed (chunk ${chunkIdx + 1}/${chunks.length}, ${rows.length} rows):`,
+          { error }
+        )
         hasMore = false
       }
     }
   }
 
   logger.info(
-    `[${tableName}] Complete: ${result.deleted} rows deleted across ${chunks.length} chunks (${result.failed} chunk failures)`
+    `[${tableName}] Complete: ${result.deleted} deleted, ${result.failed} failed across ${chunks.length} chunks${stoppedEarly ? ' (row-limit reached, remaining chunks deferred to next run)' : ''}`
   )
 
   return result
```
