11import { db } from '@sim/db'
2- import { workflowExecutionLogs , workspace } from '@sim/db/schema'
2+ import { workflowExecutionLogs } from '@sim/db/schema'
33import { createLogger } from '@sim/logger'
44import { task } from '@trigger.dev/sdk'
5- import { and , eq , inArray , lt } from 'drizzle-orm'
6- import {
7- type CleanupJobPayload ,
8- getRetentionDefaultHours ,
9- resolveTierWorkspaceIds ,
10- } from '@/lib/billing/cleanup-dispatcher'
5+ import { and , inArray , lt } from 'drizzle-orm'
6+ import { type CleanupJobPayload , resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
117import { snapshotService } from '@/lib/logs/execution/snapshot/service'
128import { isUsingCloudStorage , StorageService } from '@/lib/uploads'
139import { deleteFileMetadata } from '@/lib/uploads/server/metadata'
@@ -40,24 +36,27 @@ function emptyTierResults(): TierResults {
4036async function deleteExecutionFiles ( files : unknown , results : TierResults ) : Promise < void > {
4137 if ( ! isUsingCloudStorage ( ) || ! files || ! Array . isArray ( files ) ) return
4238
43- for ( const file of files ) {
44- if ( ! file || typeof file !== 'object' || ! file . key ) continue
45- results . filesTotal ++
46- try {
47- await StorageService . deleteFile ( { key : file . key , context : 'execution' } )
48- await deleteFileMetadata ( file . key )
49- results . filesDeleted ++
50- } catch ( fileError ) {
51- results . filesDeleteFailed ++
52- logger . error ( `Failed to delete file ${ file . key } :` , { fileError } )
53- }
54- }
39+ const keys = files . filter ( ( f ) => f && typeof f === 'object' && f . key ) . map ( ( f ) => f . key as string )
40+ results . filesTotal += keys . length
41+
42+ await Promise . all (
43+ keys . map ( async ( key ) => {
44+ try {
45+ await StorageService . deleteFile ( { key, context : 'execution' } )
46+ await deleteFileMetadata ( key )
47+ results . filesDeleted ++
48+ } catch ( fileError ) {
49+ results . filesDeleteFailed ++
50+ logger . error ( `Failed to delete file ${ key } :` , { fileError } )
51+ }
52+ } )
53+ )
5554}
5655
5756async function cleanupTier (
5857 workspaceIds : string [ ] ,
5958 retentionDate : Date ,
60- tierLabel : string
59+ label : string
6160) : Promise < TierResults > {
6261 const results = emptyTierResults ( )
6362 if ( workspaceIds . length === 0 ) return results
@@ -101,77 +100,46 @@ async function cleanupTier(
101100 results . deleted += deleted . length
102101 } catch ( deleteError ) {
103102 results . deleteFailed += logIds . length
104- logger . error ( `Batch delete failed for ${ tierLabel } :` , { deleteError } )
103+ logger . error ( `Batch delete failed for ${ label } :` , { deleteError } )
105104 }
106105
107106 batchesProcessed ++
108107 hasMore = batch . length === BATCH_SIZE
109108
110- logger . info ( `[${ tierLabel } ] Batch ${ batchesProcessed } : ${ batch . length } logs processed` )
109+ logger . info ( `[${ label } ] Batch ${ batchesProcessed } : ${ batch . length } logs processed` )
111110 }
112111
113112 return results
114113}
115114
116- async function resolvePayload ( payload : CleanupJobPayload ) : Promise < {
117- workspaceIds : string [ ]
118- retentionHours : number
119- tierLabel : string
120- } | null > {
121- if ( payload . tier === 'free' || payload . tier === 'paid' ) {
122- const retentionHours = getRetentionDefaultHours ( payload . tier , 'logRetentionHours' )
123- if ( retentionHours === null ) {
124- logger . info ( `[${ payload . tier } ] No default retention, skipping` )
125- return null
126- }
127- const workspaceIds = await resolveTierWorkspaceIds ( payload . tier )
128- return { workspaceIds, retentionHours, tierLabel : payload . tier }
129- }
130-
131- // enterprise
132- const [ ws ] = await db
133- . select ( { logRetentionHours : workspace . logRetentionHours } )
134- . from ( workspace )
135- . where ( eq ( workspace . id , payload . workspaceId ) )
136- . limit ( 1 )
137-
138- if ( ! ws ?. logRetentionHours ) {
139- logger . info ( `[enterprise/${ payload . workspaceId } ] No retention configured, skipping` )
140- return null
141- }
142-
143- return {
144- workspaceIds : [ payload . workspaceId ] ,
145- retentionHours : ws . logRetentionHours ,
146- tierLabel : `enterprise/${ payload . workspaceId } ` ,
147- }
148- }
149-
150115export async function runCleanupLogs ( payload : CleanupJobPayload ) : Promise < void > {
151116 const startTime = Date . now ( )
152117
153- const resolved = await resolvePayload ( payload )
154- if ( ! resolved ) return
118+ const scope = await resolveCleanupScope ( 'cleanup-logs' , payload )
119+ if ( ! scope ) {
120+ logger . info ( `[${ payload . plan } ] No retention configured, skipping` )
121+ return
122+ }
155123
156- const { workspaceIds, retentionHours, tierLabel } = resolved
124+ const { workspaceIds, retentionHours, label } = scope
157125
158126 if ( workspaceIds . length === 0 ) {
159- logger . info ( `[${ tierLabel } ] No workspaces to process` )
127+ logger . info ( `[${ label } ] No workspaces to process` )
160128 return
161129 }
162130
163131 const retentionDate = new Date ( Date . now ( ) - retentionHours * 60 * 60 * 1000 )
164132 logger . info (
165- `[${ tierLabel } ] Cleaning ${ workspaceIds . length } workspaces, cutoff: ${ retentionDate . toISOString ( ) } `
133+ `[${ label } ] Cleaning ${ workspaceIds . length } workspaces, cutoff: ${ retentionDate . toISOString ( ) } `
166134 )
167135
168- const results = await cleanupTier ( workspaceIds , retentionDate , tierLabel )
136+ const results = await cleanupTier ( workspaceIds , retentionDate , label )
169137 logger . info (
170- `[${ tierLabel } ] Result: ${ results . deleted } deleted, ${ results . deleteFailed } failed out of ${ results . total } candidates`
138+ `[${ label } ] Result: ${ results . deleted } deleted, ${ results . deleteFailed } failed out of ${ results . total } candidates`
171139 )
172140
173141 // Snapshot cleanup runs only on the free job to avoid running it N times for N enterprise workspaces.
174- if ( payload . tier === 'free' ) {
142+ if ( payload . plan === 'free' ) {
175143 try {
176144 const retentionDays = Math . floor ( retentionHours / 24 )
177145 const snapshotsCleaned = await snapshotService . cleanupOrphanedSnapshots ( retentionDays + 1 )
@@ -182,7 +150,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
182150 }
183151
184152 const timeElapsed = ( Date . now ( ) - startTime ) / 1000
185- logger . info ( `[${ tierLabel } ] Job completed in ${ timeElapsed . toFixed ( 2 ) } s` )
153+ logger . info ( `[${ label } ] Job completed in ${ timeElapsed . toFixed ( 2 ) } s` )
186154}
187155
188156export const cleanupLogsTask = task ( {
0 commit comments