Skip to content

Commit 66a66d5

Browse files
fix(retention-job): add chunking strategy for cleanup
1 parent f330fe2 commit 66a66d5

4 files changed

Lines changed: 248 additions & 240 deletions

File tree

apps/sim/background/cleanup-logs.ts

Lines changed: 47 additions & 139 deletions
Original file line number | Diff line number | Diff line change
@@ -4,169 +4,71 @@ import { createLogger } from '@sim/logger'
44
import { task } from '@trigger.dev/sdk'
55
import { and, inArray, lt } from 'drizzle-orm'
66
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
7+
import {
8+
batchDeleteByWorkspaceAndTimestamp,
9+
chunkedBatchDelete,
10+
type TableCleanupResult,
11+
} from '@/lib/cleanup/batch-delete'
712
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
813
import { isUsingCloudStorage, StorageService } from '@/lib/uploads'
914
import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
1015

1116
const logger = createLogger('CleanupLogs')
1217

13-
const BATCH_SIZE = 2000
14-
const MAX_BATCHES_PER_TIER = 10
15-
16-
interface TierResults {
17-
total: number
18-
deleted: number
19-
deleteFailed: number
18+
interface FileDeleteStats {
2019
filesTotal: number
2120
filesDeleted: number
2221
filesDeleteFailed: number
2322
}
2423

25-
function emptyTierResults(): TierResults {
26-
return {
27-
total: 0,
28-
deleted: 0,
29-
deleteFailed: 0,
30-
filesTotal: 0,
31-
filesDeleted: 0,
32-
filesDeleteFailed: 0,
33-
}
34-
}
35-
36-
async function deleteExecutionFiles(files: unknown, results: TierResults): Promise<void> {
24+
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
3725
if (!isUsingCloudStorage() || !files || !Array.isArray(files)) return
3826

3927
const keys = files.filter((f) => f && typeof f === 'object' && f.key).map((f) => f.key as string)
40-
results.filesTotal += keys.length
28+
stats.filesTotal += keys.length
4129

4230
await Promise.all(
4331
keys.map(async (key) => {
4432
try {
4533
await StorageService.deleteFile({ key, context: 'execution' })
4634
await deleteFileMetadata(key)
47-
results.filesDeleted++
35+
stats.filesDeleted++
4836
} catch (fileError) {
49-
results.filesDeleteFailed++
37+
stats.filesDeleteFailed++
5038
logger.error(`Failed to delete file ${key}:`, { fileError })
5139
}
5240
})
5341
)
5442
}
5543

56-
async function cleanupTier(
44+
async function cleanupWorkflowExecutionLogs(
5745
workspaceIds: string[],
5846
retentionDate: Date,
5947
label: string
60-
): Promise<TierResults> {
61-
const results = emptyTierResults()
62-
if (workspaceIds.length === 0) return results
63-
64-
let batchesProcessed = 0
65-
let hasMore = true
66-
67-
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
68-
const batch = await db
69-
.select({
70-
id: workflowExecutionLogs.id,
71-
files: workflowExecutionLogs.files,
72-
})
73-
.from(workflowExecutionLogs)
74-
.where(
75-
and(
76-
inArray(workflowExecutionLogs.workspaceId, workspaceIds),
77-
lt(workflowExecutionLogs.startedAt, retentionDate)
48+
): Promise<TableCleanupResult & FileDeleteStats> {
49+
const fileStats: FileDeleteStats = { filesTotal: 0, filesDeleted: 0, filesDeleteFailed: 0 }
50+
51+
const dbStats = await chunkedBatchDelete({
52+
tableDef: workflowExecutionLogs,
53+
workspaceIds,
54+
tableName: `${label}/workflow_execution_logs`,
55+
selectChunk: (chunkIds, limit) =>
56+
db
57+
.select({ id: workflowExecutionLogs.id, files: workflowExecutionLogs.files })
58+
.from(workflowExecutionLogs)
59+
.where(
60+
and(
61+
inArray(workflowExecutionLogs.workspaceId, chunkIds),
62+
lt(workflowExecutionLogs.startedAt, retentionDate)
63+
)
7864
)
79-
)
80-
.limit(BATCH_SIZE)
81-
82-
results.total += batch.length
65+
.limit(limit),
66+
onBatch: async (rows) => {
67+
for (const row of rows) await deleteExecutionFiles(row.files, fileStats)
68+
},
69+
})
8370

84-
if (batch.length === 0) {
85-
hasMore = false
86-
break
87-
}
88-
89-
for (const log of batch) {
90-
await deleteExecutionFiles(log.files, results)
91-
}
92-
93-
const logIds = batch.map((log) => log.id)
94-
try {
95-
const deleted = await db
96-
.delete(workflowExecutionLogs)
97-
.where(inArray(workflowExecutionLogs.id, logIds))
98-
.returning({ id: workflowExecutionLogs.id })
99-
100-
results.deleted += deleted.length
101-
} catch (deleteError) {
102-
results.deleteFailed += logIds.length
103-
logger.error(`Batch delete failed for ${label}:`, { deleteError })
104-
}
105-
106-
batchesProcessed++
107-
hasMore = batch.length === BATCH_SIZE
108-
109-
logger.info(`[${label}] Batch ${batchesProcessed}: ${batch.length} logs processed`)
110-
}
111-
112-
return results
113-
}
114-
115-
interface JobLogCleanupResults {
116-
deleted: number
117-
deleteFailed: number
118-
}
119-
120-
async function cleanupJobExecutionLogsTier(
121-
workspaceIds: string[],
122-
retentionDate: Date,
123-
label: string
124-
): Promise<JobLogCleanupResults> {
125-
const results: JobLogCleanupResults = { deleted: 0, deleteFailed: 0 }
126-
if (workspaceIds.length === 0) return results
127-
128-
let batchesProcessed = 0
129-
let hasMore = true
130-
131-
while (hasMore && batchesProcessed < MAX_BATCHES_PER_TIER) {
132-
const batch = await db
133-
.select({ id: jobExecutionLogs.id })
134-
.from(jobExecutionLogs)
135-
.where(
136-
and(
137-
inArray(jobExecutionLogs.workspaceId, workspaceIds),
138-
lt(jobExecutionLogs.startedAt, retentionDate)
139-
)
140-
)
141-
.limit(BATCH_SIZE)
142-
143-
if (batch.length === 0) {
144-
hasMore = false
145-
break
146-
}
147-
148-
const logIds = batch.map((log) => log.id)
149-
try {
150-
const deleted = await db
151-
.delete(jobExecutionLogs)
152-
.where(inArray(jobExecutionLogs.id, logIds))
153-
.returning({ id: jobExecutionLogs.id })
154-
155-
results.deleted += deleted.length
156-
} catch (deleteError) {
157-
results.deleteFailed += logIds.length
158-
logger.error(`Batch delete failed for ${label} (job_execution_logs):`, { deleteError })
159-
}
160-
161-
batchesProcessed++
162-
hasMore = batch.length === BATCH_SIZE
163-
164-
logger.info(
165-
`[${label}] job_execution_logs batch ${batchesProcessed}: ${batch.length} rows processed`
166-
)
167-
}
168-
169-
return results
71+
return { ...dbStats, ...fileStats }
17072
}
17173

17274
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
@@ -190,15 +92,21 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
19092
`[${label}] Cleaning ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
19193
)
19294

193-
const results = await cleanupTier(workspaceIds, retentionDate, label)
194-
logger.info(
195-
`[${label}] workflow_execution_logs: ${results.deleted} deleted, ${results.deleteFailed} failed out of ${results.total} candidates`
196-
)
95+
const workflowResults = await cleanupWorkflowExecutionLogs(workspaceIds, retentionDate, label)
96+
if (workflowResults.filesTotal > 0) {
97+
logger.info(
98+
`[${label}] workflow_execution_logs files: ${workflowResults.filesDeleted}/${workflowResults.filesTotal} deleted, ${workflowResults.filesDeleteFailed} failed`
99+
)
100+
}
197101

198-
const jobLogResults = await cleanupJobExecutionLogsTier(workspaceIds, retentionDate, label)
199-
logger.info(
200-
`[${label}] job_execution_logs: ${jobLogResults.deleted} deleted, ${jobLogResults.deleteFailed} failed`
201-
)
102+
await batchDeleteByWorkspaceAndTimestamp({
103+
tableDef: jobExecutionLogs,
104+
workspaceIdCol: jobExecutionLogs.workspaceId,
105+
timestampCol: jobExecutionLogs.startedAt,
106+
workspaceIds,
107+
retentionDate,
108+
tableName: `${label}/job_execution_logs`,
109+
})
202110

203111
// Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
204112
if (payload.plan === 'free') {

apps/sim/background/cleanup-soft-deletes.ts

Lines changed: 41 additions & 36 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ import {
2121
DEFAULT_BATCH_SIZE,
2222
DEFAULT_MAX_BATCHES_PER_TABLE,
2323
deleteRowsById,
24+
selectRowsByWorkspaceChunks,
2425
} from '@/lib/cleanup/batch-delete'
2526
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
2627
import type { StorageContext } from '@/lib/uploads'
@@ -44,35 +45,37 @@ async function selectExpiredWorkspaceFiles(
4445
workspaceIds: string[],
4546
retentionDate: Date
4647
): Promise<WorkspaceFileScope> {
47-
const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
48-
4948
const [legacyRows, multiContextRows] = await Promise.all([
50-
db
51-
.select({ id: workspaceFile.id, key: workspaceFile.key })
52-
.from(workspaceFile)
53-
.where(
54-
and(
55-
inArray(workspaceFile.workspaceId, workspaceIds),
56-
isNotNull(workspaceFile.deletedAt),
57-
lt(workspaceFile.deletedAt, retentionDate)
49+
selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
50+
db
51+
.select({ id: workspaceFile.id, key: workspaceFile.key })
52+
.from(workspaceFile)
53+
.where(
54+
and(
55+
inArray(workspaceFile.workspaceId, chunkIds),
56+
isNotNull(workspaceFile.deletedAt),
57+
lt(workspaceFile.deletedAt, retentionDate)
58+
)
5859
)
59-
)
60-
.limit(limit),
61-
db
62-
.select({
63-
id: workspaceFiles.id,
64-
key: workspaceFiles.key,
65-
context: workspaceFiles.context,
66-
})
67-
.from(workspaceFiles)
68-
.where(
69-
and(
70-
inArray(workspaceFiles.workspaceId, workspaceIds),
71-
isNotNull(workspaceFiles.deletedAt),
72-
lt(workspaceFiles.deletedAt, retentionDate)
60+
.limit(chunkLimit)
61+
),
62+
selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
63+
db
64+
.select({
65+
id: workspaceFiles.id,
66+
key: workspaceFiles.key,
67+
context: workspaceFiles.context,
68+
})
69+
.from(workspaceFiles)
70+
.where(
71+
and(
72+
inArray(workspaceFiles.workspaceId, chunkIds),
73+
isNotNull(workspaceFiles.deletedAt),
74+
lt(workspaceFiles.deletedAt, retentionDate)
75+
)
7376
)
74-
)
75-
.limit(limit),
77+
.limit(chunkLimit)
78+
),
7679
])
7780

7881
return {
@@ -182,17 +185,19 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
182185
// (chats + S3) AND the DB deletes below — selecting twice could return
183186
// different subsets above the LIMIT cap and orphan or prematurely purge data.
184187
const [doomedWorkflows, fileScope] = await Promise.all([
185-
db
186-
.select({ id: workflow.id })
187-
.from(workflow)
188-
.where(
189-
and(
190-
inArray(workflow.workspaceId, workspaceIds),
191-
isNotNull(workflow.archivedAt),
192-
lt(workflow.archivedAt, retentionDate)
188+
selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
189+
db
190+
.select({ id: workflow.id })
191+
.from(workflow)
192+
.where(
193+
and(
194+
inArray(workflow.workspaceId, chunkIds),
195+
isNotNull(workflow.archivedAt),
196+
lt(workflow.archivedAt, retentionDate)
197+
)
193198
)
194-
)
195-
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE),
199+
.limit(chunkLimit)
200+
),
196201
selectExpiredWorkspaceFiles(workspaceIds, retentionDate),
197202
])
198203

apps/sim/background/cleanup-tasks.ts

Lines changed: 18 additions & 19 deletions
Original file line number | Diff line number | Diff line change
@@ -13,9 +13,8 @@ import { and, inArray, lt, sql } from 'drizzle-orm'
1313
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
1414
import {
1515
batchDeleteByWorkspaceAndTimestamp,
16-
DEFAULT_BATCH_SIZE,
17-
DEFAULT_MAX_BATCHES_PER_TABLE,
1816
deleteRowsById,
17+
selectRowsByWorkspaceChunks,
1918
type TableCleanupResult,
2019
} from '@/lib/cleanup/batch-delete'
2120
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
@@ -67,13 +66,15 @@ async function cleanupRunChildren(
6766
): Promise<TableCleanupResult[]> {
6867
if (workspaceIds.length === 0) return []
6968

70-
const runIds = await db
71-
.select({ id: copilotRuns.id })
72-
.from(copilotRuns)
73-
.where(
74-
and(inArray(copilotRuns.workspaceId, workspaceIds), lt(copilotRuns.updatedAt, retentionDate))
75-
)
76-
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
69+
const runIds = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
70+
db
71+
.select({ id: copilotRuns.id })
72+
.from(copilotRuns)
73+
.where(
74+
and(inArray(copilotRuns.workspaceId, chunkIds), lt(copilotRuns.updatedAt, retentionDate))
75+
)
76+
.limit(chunkLimit)
77+
)
7778

7879
if (runIds.length === 0) {
7980
return RUN_CHILD_TABLES.map((t) => ({ table: `${label}/${t.name}`, deleted: 0, failed: 0 }))
@@ -107,17 +108,15 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
107108
`[${label}] Processing ${workspaceIds.length} workspaces, cutoff: ${retentionDate.toISOString()}`
108109
)
109110

110-
// Collect chat IDs before deleting so we can clean up the copilot backend after
111-
const doomedChats = await db
112-
.select({ id: copilotChats.id })
113-
.from(copilotChats)
114-
.where(
115-
and(
116-
inArray(copilotChats.workspaceId, workspaceIds),
117-
lt(copilotChats.updatedAt, retentionDate)
111+
const doomedChats = await selectRowsByWorkspaceChunks(workspaceIds, (chunkIds, chunkLimit) =>
112+
db
113+
.select({ id: copilotChats.id })
114+
.from(copilotChats)
115+
.where(
116+
and(inArray(copilotChats.workspaceId, chunkIds), lt(copilotChats.updatedAt, retentionDate))
118117
)
119-
)
120-
.limit(DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE)
118+
.limit(chunkLimit)
119+
)
121120

122121
const doomedChatIds = doomedChats.map((c) => c.id)
123122

0 commit comments

Comments (0)