Skip to content

Commit 80202dd

Browse files
improvement(wait): tighten poll route and pause-manager helpers
- Parallelize per-row dispatch with Promise.all - Add status='paused' guard on nextResumeAt rewrite to prevent clobbering concurrent resumes - Extract computeEarliestResumeAt + PauseResumeManager.setNextResumeAt helpers - Use canonical PausePoint type in poll route (drop StoredPausePoint) - Narrow UNIT_TO_MS via as const + WaitUnit guard - Bump LOCK_TTL_SECONDS above route maxDuration - Clearer error when allowedPauseKinds rejects a resume
1 parent c234b01 commit 80202dd

5 files changed

Lines changed: 497 additions & 1187 deletions

File tree

apps/sim/app/api/resume/poll/route.ts

Lines changed: 96 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,30 @@ import { type NextRequest, NextResponse } from 'next/server'
88
import { verifyCronAuth } from '@/lib/auth/internal'
99
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
1010
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11-
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
11+
import {
12+
computeEarliestResumeAt,
13+
PauseResumeManager,
14+
} from '@/lib/workflows/executor/human-in-the-loop-manager'
15+
import type { PausePoint } from '@/executor/types'
1216

1317
const logger = createLogger('TimePauseResumePoll')
1418

1519
export const dynamic = 'force-dynamic'
1620
export const maxDuration = 120
1721

1822
const LOCK_KEY = 'time-pause-resume-poll-lock'
19-
const LOCK_TTL_SECONDS = 120
23+
const LOCK_TTL_SECONDS = 180
2024
const POLL_BATCH_LIMIT = 200
2125

22-
interface StoredPausePoint {
23-
contextId?: string
24-
resumeStatus?: string
25-
pauseKind?: string
26-
resumeAt?: string
26+
interface DispatchFailure {
27+
executionId: string
28+
contextId: string
29+
error: string
30+
}
31+
32+
interface RowResult {
33+
dispatched: number
34+
failures: DispatchFailure[]
2735
}
2836

2937
export const GET = withRouteHandler(async (request: NextRequest) => {
@@ -40,10 +48,6 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
4048
)
4149
}
4250

43-
let claimedRows = 0
44-
let dispatched = 0
45-
const failures: { executionId: string; contextId: string; error: string }[] = []
46-
4751
try {
4852
const now = new Date()
4953

@@ -65,89 +69,21 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
6569
)
6670
.limit(POLL_BATCH_LIMIT)
6771

68-
claimedRows = dueRows.length
69-
70-
for (const row of dueRows) {
71-
const points = (row.pausePoints ?? {}) as Record<string, StoredPausePoint>
72-
const metadata = (row.metadata ?? {}) as Record<string, unknown>
73-
const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''
74-
75-
const duePoints: StoredPausePoint[] = []
76-
let nextRemaining: Date | null = null
77-
78-
for (const point of Object.values(points)) {
79-
if (point.pauseKind !== 'time' || !point.resumeAt) continue
80-
if (point.resumeStatus && point.resumeStatus !== 'paused') continue
81-
82-
const resumeAt = new Date(point.resumeAt)
83-
if (Number.isNaN(resumeAt.getTime())) continue
84-
85-
if (resumeAt <= now) {
86-
duePoints.push(point)
87-
} else if (!nextRemaining || resumeAt < nextRemaining) {
88-
nextRemaining = resumeAt
89-
}
90-
}
91-
92-
for (const point of duePoints) {
93-
const contextId = point.contextId
94-
if (!contextId) continue
95-
try {
96-
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
97-
executionId: row.executionId,
98-
contextId,
99-
resumeInput: {},
100-
userId,
101-
})
102-
103-
if (enqueueResult.status === 'starting') {
104-
PauseResumeManager.startResumeExecution({
105-
resumeEntryId: enqueueResult.resumeEntryId,
106-
resumeExecutionId: enqueueResult.resumeExecutionId,
107-
pausedExecution: enqueueResult.pausedExecution,
108-
contextId: enqueueResult.contextId,
109-
resumeInput: enqueueResult.resumeInput,
110-
userId: enqueueResult.userId,
111-
}).catch((error) => {
112-
logger.error('Background time-pause resume failed', {
113-
executionId: row.executionId,
114-
contextId,
115-
error: toError(error).message,
116-
})
117-
})
118-
}
119-
dispatched++
120-
} catch (error) {
121-
const message = toError(error).message
122-
logger.warn('Failed to dispatch time-pause resume', {
123-
executionId: row.executionId,
124-
contextId,
125-
error: message,
126-
})
127-
failures.push({ executionId: row.executionId, contextId, error: message })
128-
}
129-
}
130-
131-
// We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and an
132-
// operator must investigate stranded rows by hand. Setting nextResumeAt to the next
133-
// future pause (or null) drops the row out of the poll, surfacing the failure.
134-
await db
135-
.update(pausedExecutions)
136-
.set({ nextResumeAt: nextRemaining })
137-
.where(eq(pausedExecutions.id, row.id))
138-
}
72+
const results = await Promise.all(dueRows.map((row) => dispatchRow(row, now)))
73+
const dispatched = results.reduce((sum, r) => sum + r.dispatched, 0)
74+
const failures = results.flatMap((r) => r.failures)
13975

14076
logger.info('Time-pause resume poll completed', {
14177
requestId,
142-
claimedRows,
78+
claimedRows: dueRows.length,
14379
dispatched,
14480
failureCount: failures.length,
14581
})
14682

14783
return NextResponse.json({
14884
success: true,
14985
requestId,
150-
claimedRows,
86+
claimedRows: dueRows.length,
15187
dispatched,
15288
failures,
15389
})
@@ -159,3 +95,79 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
15995
await releaseLock(LOCK_KEY, requestId).catch(() => {})
16096
}
16197
})
98+
99+
interface DueRow {
100+
id: string
101+
executionId: string
102+
workflowId: string
103+
pausePoints: unknown
104+
metadata: unknown
105+
}
106+
107+
async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
108+
const points = (row.pausePoints ?? {}) as Record<string, PausePoint>
109+
const metadata = (row.metadata ?? {}) as Record<string, unknown>
110+
const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''
111+
112+
const eligiblePoints = Object.values(points).filter(
113+
(point) =>
114+
point.pauseKind === 'time' && (!point.resumeStatus || point.resumeStatus === 'paused')
115+
)
116+
const duePoints = eligiblePoints.filter((point) => {
117+
if (!point.resumeAt) return false
118+
const at = new Date(point.resumeAt)
119+
return !Number.isNaN(at.getTime()) && at <= now
120+
})
121+
122+
const failures: DispatchFailure[] = []
123+
let dispatched = 0
124+
125+
for (const point of duePoints) {
126+
if (!point.contextId) continue
127+
try {
128+
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
129+
executionId: row.executionId,
130+
contextId: point.contextId,
131+
resumeInput: {},
132+
userId,
133+
})
134+
135+
if (enqueueResult.status === 'starting') {
136+
PauseResumeManager.startResumeExecution({
137+
resumeEntryId: enqueueResult.resumeEntryId,
138+
resumeExecutionId: enqueueResult.resumeExecutionId,
139+
pausedExecution: enqueueResult.pausedExecution,
140+
contextId: enqueueResult.contextId,
141+
resumeInput: enqueueResult.resumeInput,
142+
userId: enqueueResult.userId,
143+
}).catch((error) => {
144+
logger.error('Background time-pause resume failed', {
145+
executionId: row.executionId,
146+
contextId: point.contextId,
147+
error: toError(error).message,
148+
})
149+
})
150+
}
151+
dispatched++
152+
} catch (error) {
153+
const message = toError(error).message
154+
logger.warn('Failed to dispatch time-pause resume', {
155+
executionId: row.executionId,
156+
contextId: point.contextId,
157+
error: message,
158+
})
159+
failures.push({ executionId: row.executionId, contextId: point.contextId, error: message })
160+
}
161+
}
162+
163+
// We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and
164+
// an operator must investigate stranded rows by hand. The status='paused' guard
165+
// also prevents clobbering when a concurrent manual resume has already advanced
166+
// the row's state since we read it.
167+
await PauseResumeManager.setNextResumeAt({
168+
pausedExecutionId: row.id,
169+
nextResumeAt: computeEarliestResumeAt(eligiblePoints, { after: now }),
170+
})
171+
172+
return { dispatched, failures }
173+
}

apps/sim/executor/handlers/wait/wait-handler.ts

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,17 @@ const sleep = async (ms: number, options: SleepOptions = {}): Promise<boolean> =
7575
})
7676
}
7777

78-
const UNIT_TO_MS: Record<string, number> = {
78+
const UNIT_TO_MS = {
7979
seconds: 1000,
8080
minutes: 60 * 1000,
8181
hours: 60 * 60 * 1000,
8282
days: 24 * 60 * 60 * 1000,
83+
} as const satisfies Record<string, number>
84+
85+
type WaitUnit = keyof typeof UNIT_TO_MS
86+
87+
function isWaitUnit(value: string): value is WaitUnit {
88+
return value in UNIT_TO_MS
8389
}
8490

8591
/**
@@ -122,12 +128,10 @@ export class WaitBlockHandler implements BlockHandler {
122128
throw new Error('Wait amount must be a positive number')
123129
}
124130

125-
const unitMs = UNIT_TO_MS[timeUnit]
126-
if (!unitMs) {
131+
if (!isWaitUnit(timeUnit)) {
127132
throw new Error(`Unknown wait unit: ${timeUnit}`)
128133
}
129-
130-
const waitMs = timeValue * unitMs
134+
const waitMs = timeValue * UNIT_TO_MS[timeUnit]
131135

132136
if (waitMs > MAX_WAIT_MS) {
133137
throw new Error('Wait time exceeds maximum of 30 days')

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 42 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,27 @@ interface StartResumeExecutionArgs {
126126
abortSignal?: AbortSignal
127127
}
128128

129+
/**
130+
* Returns the earliest `resumeAt` across `pauseKind: 'time'` pause points whose
131+
* `resumeAt` is a valid date and (when `after` is provided) strictly later than it.
132+
* Returns `null` when no candidate exists.
133+
*/
134+
export function computeEarliestResumeAt(
135+
points: Iterable<Pick<PausePoint, 'pauseKind' | 'resumeAt'>>,
136+
options: { after?: Date } = {}
137+
): Date | null {
138+
const { after } = options
139+
let earliest: Date | null = null
140+
for (const point of points) {
141+
if (point.pauseKind !== 'time' || !point.resumeAt) continue
142+
const candidate = new Date(point.resumeAt)
143+
if (Number.isNaN(candidate.getTime())) continue
144+
if (after && candidate <= after) continue
145+
if (!earliest || candidate < earliest) earliest = candidate
146+
}
147+
return earliest
148+
}
149+
129150
export class PauseResumeManager {
130151
static async persistPauseResult(args: PersistPauseResultArgs): Promise<void> {
131152
const { workflowId, executionId, pausePoints, snapshotSeed, executorUserId } = args
@@ -147,13 +168,7 @@ export class PauseResumeManager {
147168
return acc
148169
}, {})
149170

150-
const nextResumeAt = pausePoints.reduce<Date | null>((earliest, point) => {
151-
if (point.pauseKind !== 'time' || !point.resumeAt) return earliest
152-
const candidate = new Date(point.resumeAt)
153-
if (Number.isNaN(candidate.getTime())) return earliest
154-
if (!earliest || candidate < earliest) return candidate
155-
return earliest
156-
}, null)
171+
const nextResumeAt = computeEarliestResumeAt(pausePoints)
157172

158173
const now = new Date()
159174

@@ -232,7 +247,9 @@ export class PauseResumeManager {
232247

233248
const pauseKind: PauseKind = pausePoint.pauseKind ?? 'human'
234249
if (allowedPauseKinds && !allowedPauseKinds.includes(pauseKind)) {
235-
throw new Error(`Pause point cannot be resumed manually (pauseKind=${pauseKind})`)
250+
throw new Error(
251+
`Pause kind '${pauseKind}' is not allowed for this resume endpoint (allowed: ${allowedPauseKinds.join(', ')})`
252+
)
236253
}
237254

238255
const activeResume = await tx
@@ -1329,6 +1346,23 @@ export class PauseResumeManager {
13291346
})
13301347
}
13311348

1349+
/**
1350+
* Updates `next_resume_at` only when the row is still `status='paused'`.
1351+
* Guard prevents the cron poller from clobbering a freshly-written value when a
1352+
* concurrent manual resume has already advanced the row's state.
1353+
*/
1354+
static async setNextResumeAt(args: {
1355+
pausedExecutionId: string
1356+
nextResumeAt: Date | null
1357+
}): Promise<void> {
1358+
await db
1359+
.update(pausedExecutions)
1360+
.set({ nextResumeAt: args.nextResumeAt })
1361+
.where(
1362+
and(eq(pausedExecutions.id, args.pausedExecutionId), eq(pausedExecutions.status, 'paused'))
1363+
)
1364+
}
1365+
13321366
static async listPausedExecutions(options: {
13331367
workflowId: string
13341368
status?: string | string[]

0 commit comments

Comments
 (0)