@@ -15,31 +15,41 @@ import {
1515import { createLogger } from '@sim/logger'
1616import { task } from '@trigger.dev/sdk'
1717import { and , inArray , isNotNull , lt } from 'drizzle-orm'
18+ import type { PgColumn , PgTable } from 'drizzle-orm/pg-core'
1819import { type CleanupJobPayload , resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
1920import {
2021 batchDeleteByWorkspaceAndTimestamp ,
2122 DEFAULT_BATCH_SIZE ,
2223 DEFAULT_MAX_BATCHES_PER_TABLE ,
24+ type TableCleanupResult ,
2325} from '@/lib/cleanup/batch-delete'
2426import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
2527import type { StorageContext } from '@/lib/uploads'
2628import { isUsingCloudStorage , StorageService } from '@/lib/uploads'
2729
2830const logger = createLogger ( 'CleanupSoftDeletes' )
2931
30- async function cleanupWorkspaceFileStorage (
32+ interface WorkspaceFileScope {
33+ /** Rows from `workspace_file` (singular, legacy workspace-context only). */
34+ legacyRows : Array < { id : string ; key : string } >
35+ /** Rows from `workspace_files` (plural, multi-context). */
36+ multiContextRows : Array < { id : string ; key : string ; context : StorageContext } >
37+ }
38+
39+ /**
40+ * Select every soft-deleted file row that's eligible for permanent removal.
41+ * Returned once and reused for both S3 deletion and DB deletion so the external
42+ * cleanup cannot drift from the row-level cleanup.
43+ */
44+ async function selectExpiredWorkspaceFiles (
3145 workspaceIds : string [ ] ,
3246 retentionDate : Date
33- ) : Promise < { filesDeleted : number ; filesFailed : number } > {
34- const stats = { filesDeleted : 0 , filesFailed : 0 }
35-
36- if ( ! isUsingCloudStorage ( ) || workspaceIds . length === 0 ) return stats
37-
47+ ) : Promise < WorkspaceFileScope > {
3848 const limit = DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE
3949
40- const [ legacyFiles , multiContextFiles ] = await Promise . all ( [
50+ const [ legacyRows , multiContextRows ] = await Promise . all ( [
4151 db
42- . select ( { key : workspaceFile . key } )
52+ . select ( { id : workspaceFile . id , key : workspaceFile . key } )
4353 . from ( workspaceFile )
4454 . where (
4555 and (
@@ -50,7 +60,11 @@ async function cleanupWorkspaceFileStorage(
5060 )
5161 . limit ( limit ) ,
5262 db
53- . select ( { key : workspaceFiles . key , context : workspaceFiles . context } )
63+ . select ( {
64+ id : workspaceFiles . id ,
65+ key : workspaceFiles . key ,
66+ context : workspaceFiles . context ,
67+ } )
5468 . from ( workspaceFiles )
5569 . where (
5670 and (
@@ -62,9 +76,25 @@ async function cleanupWorkspaceFileStorage(
6276 . limit ( limit ) ,
6377 ] )
6478
79+ return {
80+ legacyRows,
81+ multiContextRows : multiContextRows . map ( ( r ) => ( {
82+ id : r . id ,
83+ key : r . key ,
84+ context : r . context as StorageContext ,
85+ } ) ) ,
86+ }
87+ }
88+
89+ async function cleanupWorkspaceFileStorage (
90+ scope : WorkspaceFileScope
91+ ) : Promise < { filesDeleted : number ; filesFailed : number } > {
92+ const stats = { filesDeleted : 0 , filesFailed : 0 }
93+ if ( ! isUsingCloudStorage ( ) ) return stats
94+
6595 const toDelete : Array < { key : string ; context : StorageContext } > = [
66- ...legacyFiles . map ( ( f ) => ( { key : f . key , context : 'workspace' as StorageContext } ) ) ,
67- ...multiContextFiles . map ( ( f ) => ( { key : f . key , context : f . context as StorageContext } ) ) ,
96+ ...scope . legacyRows . map ( ( r ) => ( { key : r . key , context : 'workspace' as StorageContext } ) ) ,
97+ ...scope . multiContextRows . map ( ( r ) => ( { key : r . key , context : r . context } ) ) ,
6898 ]
6999
70100 await Promise . all (
@@ -82,13 +112,32 @@ async function cleanupWorkspaceFileStorage(
82112 return stats
83113}
84114
115+ async function deleteRowsById (
116+ tableDef : PgTable ,
117+ idCol : PgColumn ,
118+ ids : string [ ] ,
119+ tableName : string
120+ ) : Promise < TableCleanupResult > {
121+ const result : TableCleanupResult = { table : tableName , deleted : 0 , failed : 0 }
122+ if ( ids . length === 0 ) return result
123+ try {
124+ const deleted = await db . delete ( tableDef ) . where ( inArray ( idCol , ids ) ) . returning ( { id : idCol } )
125+ result . deleted = deleted . length
126+ logger . info ( `[${ tableName } ] Deleted ${ deleted . length } rows` )
127+ } catch ( error ) {
128+ result . failed ++
129+ logger . error ( `[${ tableName } ] Delete failed:` , { error } )
130+ }
131+ return result
132+ }
133+
134+ /**
135+ * Tables cleaned by the generic workspace-scoped batched DELETE. Tables whose
136+ * hard-delete triggers external side effects (workflow → copilot chats cascade,
137+ * workspace files → S3 storage) are handled explicitly so the SELECT that drives
138+ * the external cleanup and the SELECT that drives the DB delete see the same rows.
139+ */
85140const CLEANUP_TARGETS = [
86- {
87- table : workflow ,
88- softDeleteCol : workflow . archivedAt ,
89- wsCol : workflow . workspaceId ,
90- name : 'workflow' ,
91- } ,
92141 {
93142 table : workflowFolder ,
94143 softDeleteCol : workflowFolder . archivedAt ,
@@ -107,18 +156,6 @@ const CLEANUP_TARGETS = [
107156 wsCol : userTableDefinitions . workspaceId ,
108157 name : 'userTableDefinitions' ,
109158 } ,
110- {
111- table : workspaceFile ,
112- softDeleteCol : workspaceFile . deletedAt ,
113- wsCol : workspaceFile . workspaceId ,
114- name : 'workspaceFile' ,
115- } ,
116- {
117- table : workspaceFiles ,
118- softDeleteCol : workspaceFiles . deletedAt ,
119- wsCol : workspaceFiles . workspaceId ,
120- name : 'workspaceFiles' ,
121- } ,
122159 { table : memory , softDeleteCol : memory . deletedAt , wsCol : memory . workspaceId , name : 'memory' } ,
123160 {
124161 table : mcpServers ,
@@ -161,18 +198,23 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
161198 `[${ label } ] Processing ${ workspaceIds . length } workspaces, cutoff: ${ retentionDate . toISOString ( ) } `
162199 )
163200
164- // Find chats linked to workflows that are about to be cascade-deleted
165- const doomedWorkflows = await db
166- . select ( { id : workflow . id } )
167- . from ( workflow )
168- . where (
169- and (
170- inArray ( workflow . workspaceId , workspaceIds ) ,
171- isNotNull ( workflow . archivedAt ) ,
172- lt ( workflow . archivedAt , retentionDate )
201+ // Select workflows + files once. These sets drive BOTH external cleanup
202+ // (chats + S3) AND the DB deletes below — selecting twice could return
203+ // different subsets above the LIMIT cap and orphan or prematurely purge data.
204+ const [ doomedWorkflows , fileScope ] = await Promise . all ( [
205+ db
206+ . select ( { id : workflow . id } )
207+ . from ( workflow )
208+ . where (
209+ and (
210+ inArray ( workflow . workspaceId , workspaceIds ) ,
211+ isNotNull ( workflow . archivedAt ) ,
212+ lt ( workflow . archivedAt , retentionDate )
213+ )
173214 )
174- )
175- . limit ( DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE )
215+ . limit ( DEFAULT_BATCH_SIZE * DEFAULT_MAX_BATCHES_PER_TABLE ) ,
216+ selectExpiredWorkspaceFiles ( workspaceIds , retentionDate ) ,
217+ ] )
176218
177219 const doomedWorkflowIds = doomedWorkflows . map ( ( w ) => w . id )
178220 let chatCleanup : { execute : ( ) => Promise < void > } | null = null
@@ -190,9 +232,35 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
190232 }
191233 }
192234
193- const fileStats = await cleanupWorkspaceFileStorage ( workspaceIds , retentionDate )
235+ const fileStats = await cleanupWorkspaceFileStorage ( fileScope )
194236
195237 let totalDeleted = 0
238+
239+ // Delete the workflow + file rows using the exact IDs we already selected.
240+ const workflowResult = await deleteRowsById (
241+ workflow ,
242+ workflow . id ,
243+ doomedWorkflowIds ,
244+ `${ label } /workflow`
245+ )
246+ totalDeleted += workflowResult . deleted
247+
248+ const legacyFileResult = await deleteRowsById (
249+ workspaceFile ,
250+ workspaceFile . id ,
251+ fileScope . legacyRows . map ( ( r ) => r . id ) ,
252+ `${ label } /workspaceFile`
253+ )
254+ totalDeleted += legacyFileResult . deleted
255+
256+ const multiContextFileResult = await deleteRowsById (
257+ workspaceFiles ,
258+ workspaceFiles . id ,
259+ fileScope . multiContextRows . map ( ( r ) => r . id ) ,
260+ `${ label } /workspaceFiles`
261+ )
262+ totalDeleted += multiContextFileResult . deleted
263+
196264 for ( const target of CLEANUP_TARGETS ) {
197265 const result = await batchDeleteByWorkspaceAndTimestamp ( {
198266 tableDef : target . table ,
0 commit comments