Skip to content

Commit aaf6f19

Browse files
committed
deal with trigger worker crashes
1 parent c3513c2 commit aaf6f19

File tree

5 files changed

+240
-4
lines changed

5 files changed

+240
-4
lines changed
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
import { db } from '@sim/db'
2+
import { workflowExecutionLogs } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq, lt, sql } from 'drizzle-orm'
5+
import { type NextRequest, NextResponse } from 'next/server'
6+
import { verifyCronAuth } from '@/lib/auth/internal'
7+
8+
const logger = createLogger('CleanupStaleExecutions')
9+
10+
const STALE_THRESHOLD_MINUTES = 30
11+
12+
export async function GET(request: NextRequest) {
13+
try {
14+
const authError = verifyCronAuth(request, 'Stale execution cleanup')
15+
if (authError) {
16+
return authError
17+
}
18+
19+
logger.info('Starting stale execution cleanup job')
20+
21+
const staleThreshold = new Date(Date.now() - STALE_THRESHOLD_MINUTES * 60 * 1000)
22+
23+
const staleExecutions = await db
24+
.select({
25+
id: workflowExecutionLogs.id,
26+
executionId: workflowExecutionLogs.executionId,
27+
workflowId: workflowExecutionLogs.workflowId,
28+
startedAt: workflowExecutionLogs.startedAt,
29+
})
30+
.from(workflowExecutionLogs)
31+
.where(
32+
and(
33+
eq(workflowExecutionLogs.status, 'running'),
34+
lt(workflowExecutionLogs.startedAt, staleThreshold)
35+
)
36+
)
37+
.limit(100)
38+
39+
logger.info(`Found ${staleExecutions.length} stale executions to clean up`)
40+
41+
let cleaned = 0
42+
let failed = 0
43+
44+
for (const execution of staleExecutions) {
45+
try {
46+
const staleDurationMs = Date.now() - new Date(execution.startedAt).getTime()
47+
const staleDurationMinutes = Math.round(staleDurationMs / 60000)
48+
49+
await db
50+
.update(workflowExecutionLogs)
51+
.set({
52+
status: 'failed',
53+
endedAt: new Date(),
54+
totalDurationMs: staleDurationMs,
55+
executionData: sql`jsonb_set(
56+
COALESCE(execution_data, '{}'::jsonb),
57+
ARRAY['error'],
58+
to_jsonb(${'Execution terminated: worker timeout or crash after ' + staleDurationMinutes + ' minutes'}::text)
59+
)`,
60+
})
61+
.where(eq(workflowExecutionLogs.id, execution.id))
62+
63+
logger.info(`Cleaned up stale execution ${execution.executionId}`, {
64+
workflowId: execution.workflowId,
65+
staleDurationMinutes,
66+
})
67+
68+
cleaned++
69+
} catch (error) {
70+
logger.error(`Failed to clean up execution ${execution.executionId}:`, {
71+
error: error instanceof Error ? error.message : String(error),
72+
})
73+
failed++
74+
}
75+
}
76+
77+
logger.info(`Stale execution cleanup completed. Cleaned: ${cleaned}, Failed: ${failed}`)
78+
79+
return NextResponse.json({
80+
success: true,
81+
found: staleExecutions.length,
82+
cleaned,
83+
failed,
84+
thresholdMinutes: STALE_THRESHOLD_MINUTES,
85+
})
86+
} catch (error) {
87+
logger.error('Error in stale execution cleanup job:', error)
88+
return NextResponse.json({ error: 'Internal server error' }, { status: 500 })
89+
}
90+
}

apps/sim/executor/execution/snapshot.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,12 @@ export interface ExecutionCallbacks {
3232
blockId: string,
3333
blockName: string,
3434
blockType: string,
35-
output: any
35+
output: any,
36+
iterationContext?: {
37+
iterationCurrent: number
38+
iterationTotal: number
39+
iterationType: 'loop' | 'parallel'
40+
}
3641
) => Promise<void>
3742
}
3843

apps/sim/lib/logs/execution/logger.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
import { createLogger } from '@sim/logger'
1010
import { eq, sql } from 'drizzle-orm'
1111
import { v4 as uuidv4 } from 'uuid'
12+
import { BASE_EXECUTION_CHARGE } from '@/lib/billing/constants'
1213
import { getHighestPrioritySubscription } from '@/lib/billing/core/subscription'
1314
import {
1415
checkUsageStatus,
@@ -130,6 +131,13 @@ export class ExecutionLogger implements IExecutionLoggerService {
130131
environment,
131132
trigger,
132133
},
134+
cost: {
135+
total: BASE_EXECUTION_CHARGE,
136+
input: 0,
137+
output: 0,
138+
tokens: { input: 0, output: 0, total: 0 },
139+
models: {},
140+
},
133141
})
134142
.returning()
135143

apps/sim/lib/logs/execution/logging-session.ts

Lines changed: 122 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,22 @@ export interface SessionPausedParams {
6060
workflowInput?: any
6161
}
6262

63+
interface AccumulatedCost {
64+
total: number
65+
input: number
66+
output: number
67+
tokens: { input: number; output: number; total: number }
68+
models: Record<
69+
string,
70+
{
71+
input: number
72+
output: number
73+
total: number
74+
tokens: { input: number; output: number; total: number }
75+
}
76+
>
77+
}
78+
6379
export class LoggingSession {
6480
private workflowId: string
6581
private executionId: string
@@ -70,6 +86,14 @@ export class LoggingSession {
7086
private workflowState?: WorkflowState
7187
private isResume = false
7288
private completed = false
89+
private accumulatedCost: AccumulatedCost = {
90+
total: BASE_EXECUTION_CHARGE,
91+
input: 0,
92+
output: 0,
93+
tokens: { input: 0, output: 0, total: 0 },
94+
models: {},
95+
}
96+
private costFlushed = false
7397

7498
constructor(
7599
workflowId: string,
@@ -83,6 +107,102 @@ export class LoggingSession {
83107
this.requestId = requestId
84108
}
85109

110+
async onBlockComplete(
111+
blockId: string,
112+
blockName: string,
113+
blockType: string,
114+
output: any
115+
): Promise<void> {
116+
if (!output?.cost || typeof output.cost.total !== 'number' || output.cost.total <= 0) {
117+
return
118+
}
119+
120+
const { cost, tokens, model } = output
121+
122+
this.accumulatedCost.total += cost.total || 0
123+
this.accumulatedCost.input += cost.input || 0
124+
this.accumulatedCost.output += cost.output || 0
125+
126+
if (tokens) {
127+
this.accumulatedCost.tokens.input += tokens.input || 0
128+
this.accumulatedCost.tokens.output += tokens.output || 0
129+
this.accumulatedCost.tokens.total += tokens.total || 0
130+
}
131+
132+
if (model) {
133+
if (!this.accumulatedCost.models[model]) {
134+
this.accumulatedCost.models[model] = {
135+
input: 0,
136+
output: 0,
137+
total: 0,
138+
tokens: { input: 0, output: 0, total: 0 },
139+
}
140+
}
141+
this.accumulatedCost.models[model].input += cost.input || 0
142+
this.accumulatedCost.models[model].output += cost.output || 0
143+
this.accumulatedCost.models[model].total += cost.total || 0
144+
if (tokens) {
145+
this.accumulatedCost.models[model].tokens.input += tokens.input || 0
146+
this.accumulatedCost.models[model].tokens.output += tokens.output || 0
147+
this.accumulatedCost.models[model].tokens.total += tokens.total || 0
148+
}
149+
}
150+
151+
await this.flushAccumulatedCost()
152+
}
153+
154+
private async flushAccumulatedCost(): Promise<void> {
155+
try {
156+
await db
157+
.update(workflowExecutionLogs)
158+
.set({
159+
cost: {
160+
total: this.accumulatedCost.total,
161+
input: this.accumulatedCost.input,
162+
output: this.accumulatedCost.output,
163+
tokens: this.accumulatedCost.tokens,
164+
models: this.accumulatedCost.models,
165+
},
166+
})
167+
.where(eq(workflowExecutionLogs.executionId, this.executionId))
168+
169+
this.costFlushed = true
170+
} catch (error) {
171+
logger.error(`Failed to flush accumulated cost for execution ${this.executionId}:`, {
172+
error: error instanceof Error ? error.message : String(error),
173+
})
174+
}
175+
}
176+
177+
private async loadExistingCost(): Promise<void> {
178+
try {
179+
const [existing] = await db
180+
.select({ cost: workflowExecutionLogs.cost })
181+
.from(workflowExecutionLogs)
182+
.where(eq(workflowExecutionLogs.executionId, this.executionId))
183+
.limit(1)
184+
185+
if (existing?.cost) {
186+
const cost = existing.cost as any
187+
this.accumulatedCost = {
188+
total: cost.total || BASE_EXECUTION_CHARGE,
189+
input: cost.input || 0,
190+
output: cost.output || 0,
191+
tokens: {
192+
input: cost.tokens?.input || 0,
193+
output: cost.tokens?.output || 0,
194+
total: cost.tokens?.total || 0,
195+
},
196+
models: cost.models || {},
197+
}
198+
}
199+
} catch (error) {
200+
logger.error(`Failed to load existing cost for execution ${this.executionId}:`, {
201+
error: error instanceof Error ? error.message : String(error),
202+
})
203+
}
204+
}
205+
86206
async start(params: SessionStartParams): Promise<void> {
87207
const { userId, workspaceId, variables, triggerData, skipLogCreation, deploymentVersionId } =
88208
params
@@ -102,7 +222,6 @@ export class LoggingSession {
102222
? await loadDeployedWorkflowStateForLogging(this.workflowId)
103223
: await loadWorkflowStateForExecution(this.workflowId)
104224

105-
// Only create a new log entry if not resuming
106225
if (!skipLogCreation) {
107226
await executionLogger.startWorkflowExecution({
108227
workflowId: this.workflowId,
@@ -118,7 +237,8 @@ export class LoggingSession {
118237
logger.debug(`[${this.requestId}] Started logging for execution ${this.executionId}`)
119238
}
120239
} else {
121-
this.isResume = true // Mark as resume
240+
this.isResume = true
241+
await this.loadExistingCost()
122242
if (this.requestId) {
123243
logger.debug(
124244
`[${this.requestId}] Resuming logging for existing execution ${this.executionId}`

apps/sim/lib/workflows/executor/execution-core.ts

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,19 @@ export async function executeWorkflowCore(
316316
})
317317
}
318318

319+
const wrappedOnBlockComplete = async (
320+
blockId: string,
321+
blockName: string,
322+
blockType: string,
323+
output: any,
324+
iterationContext?: any
325+
) => {
326+
await loggingSession.onBlockComplete(blockId, blockName, blockType, output)
327+
if (onBlockComplete) {
328+
await onBlockComplete(blockId, blockName, blockType, output, iterationContext)
329+
}
330+
}
331+
319332
const contextExtensions: any = {
320333
stream: !!onStream,
321334
selectedOutputs,
@@ -324,7 +337,7 @@ export async function executeWorkflowCore(
324337
userId,
325338
isDeployedContext: triggerType !== 'manual',
326339
onBlockStart,
327-
onBlockComplete,
340+
onBlockComplete: wrappedOnBlockComplete,
328341
onStream,
329342
resumeFromSnapshot,
330343
resumePendingQueue,

0 commit comments

Comments
 (0)