Skip to content

Commit 09f4c94

Browse files
TheodoreSpeakswaleedlatif1Sg312icecrasher321
authored
feat(block): Allow wait block to wait up to 30 days (#4331)
* v0.6.29: login improvements, posthog telemetry (#4026) * feat(posthog): Add tracking on mothership abort (#4023) Co-authored-by: Theodore Li <theo@sim.ai> * fix(login): fix captcha headers for manual login (#4025) * fix(signup): fix turnstile key loading * fix(login): fix captcha header passing * Catch user already exists, remove login form captcha * feat(block): Allow wait block to wait up to 30 days * restore ff * Filter out waits from hitl endpoints * Use correct count, filtering out wait blocks * improvement(wait): tighten poll route and pause-manager helpers - Parallelize per-row dispatch with Promise.all - Add status='paused' guard on nextResumeAt rewrite to prevent clobbering concurrent resumes - Extract computeEarliestResumeAt + PauseResumeManager.setNextResumeAt helpers - Use canonical PausePoint type in poll route (drop StoredPausePoint) - Narrow UNIT_TO_MS via as const + WaitUnit guard - Bump LOCK_TTL_SECONDS above route maxDuration - Clearer error when allowedPauseKinds rejects a resume --------- Co-authored-by: Waleed <walif6@gmail.com> Co-authored-by: Siddharth Ganesan <33737564+Sg312@users.noreply.github.com> Co-authored-by: Vikhyath Mondreti <vikhyathvikku@gmail.com>
1 parent 1baa580 commit 09f4c94

17 files changed

Lines changed: 15811 additions & 171 deletions

File tree

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,7 @@ export const POST = withRouteHandler(
145145
contextId,
146146
resumeInput,
147147
userId,
148+
allowedPauseKinds: ['human'],
148149
})
149150

150151
if (enqueueResult.status === 'queued') {
Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
import { db } from '@sim/db'
2+
import { pausedExecutions } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { toError } from '@sim/utils/errors'
5+
import { generateShortId } from '@sim/utils/id'
6+
import { and, asc, eq, isNotNull, lte } from 'drizzle-orm'
7+
import { type NextRequest, NextResponse } from 'next/server'
8+
import { verifyCronAuth } from '@/lib/auth/internal'
9+
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
10+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11+
import {
12+
computeEarliestResumeAt,
13+
PauseResumeManager,
14+
} from '@/lib/workflows/executor/human-in-the-loop-manager'
15+
import type { PausePoint } from '@/executor/types'
16+
17+
const logger = createLogger('TimePauseResumePoll')
18+
19+
export const dynamic = 'force-dynamic'
20+
export const maxDuration = 120
21+
22+
const LOCK_KEY = 'time-pause-resume-poll-lock'
23+
const LOCK_TTL_SECONDS = 180
24+
const POLL_BATCH_LIMIT = 200
25+
26+
interface DispatchFailure {
27+
executionId: string
28+
contextId: string
29+
error: string
30+
}
31+
32+
interface RowResult {
33+
dispatched: number
34+
failures: DispatchFailure[]
35+
}
36+
37+
export const GET = withRouteHandler(async (request: NextRequest) => {
38+
const requestId = generateShortId()
39+
40+
const authError = verifyCronAuth(request, 'Time-pause resume poll')
41+
if (authError) return authError
42+
43+
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS)
44+
if (!lockAcquired) {
45+
return NextResponse.json(
46+
{ success: true, message: 'Polling already in progress – skipped', requestId },
47+
{ status: 202 }
48+
)
49+
}
50+
51+
try {
52+
const now = new Date()
53+
54+
const dueRows = await db
55+
.select({
56+
id: pausedExecutions.id,
57+
executionId: pausedExecutions.executionId,
58+
workflowId: pausedExecutions.workflowId,
59+
pausePoints: pausedExecutions.pausePoints,
60+
metadata: pausedExecutions.metadata,
61+
})
62+
.from(pausedExecutions)
63+
.where(
64+
and(
65+
eq(pausedExecutions.status, 'paused'),
66+
isNotNull(pausedExecutions.nextResumeAt),
67+
lte(pausedExecutions.nextResumeAt, now)
68+
)
69+
)
70+
.orderBy(asc(pausedExecutions.nextResumeAt))
71+
.limit(POLL_BATCH_LIMIT)
72+
73+
const results = await Promise.all(dueRows.map((row) => dispatchRow(row, now)))
74+
const dispatched = results.reduce((sum, r) => sum + r.dispatched, 0)
75+
const failures = results.flatMap((r) => r.failures)
76+
77+
logger.info('Time-pause resume poll completed', {
78+
requestId,
79+
claimedRows: dueRows.length,
80+
dispatched,
81+
failureCount: failures.length,
82+
})
83+
84+
return NextResponse.json({
85+
success: true,
86+
requestId,
87+
claimedRows: dueRows.length,
88+
dispatched,
89+
failures,
90+
})
91+
} catch (error) {
92+
const message = toError(error).message
93+
logger.error('Time-pause resume poll failed', { requestId, error: message })
94+
return NextResponse.json({ success: false, requestId, error: message }, { status: 500 })
95+
} finally {
96+
await releaseLock(LOCK_KEY, requestId).catch(() => {})
97+
}
98+
})
99+
100+
interface DueRow {
101+
id: string
102+
executionId: string
103+
workflowId: string
104+
pausePoints: unknown
105+
metadata: unknown
106+
}
107+
108+
async function dispatchRow(row: DueRow, now: Date): Promise<RowResult> {
109+
const points = (row.pausePoints ?? {}) as Record<string, PausePoint>
110+
const metadata = (row.metadata ?? {}) as Record<string, unknown>
111+
const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''
112+
113+
const eligiblePoints = Object.values(points).filter(
114+
(point) =>
115+
point.pauseKind === 'time' && (!point.resumeStatus || point.resumeStatus === 'paused')
116+
)
117+
const duePoints = eligiblePoints.filter((point) => {
118+
if (!point.resumeAt) return false
119+
const at = new Date(point.resumeAt)
120+
return !Number.isNaN(at.getTime()) && at <= now
121+
})
122+
123+
const failures: DispatchFailure[] = []
124+
let dispatched = 0
125+
126+
for (const point of duePoints) {
127+
if (!point.contextId) continue
128+
try {
129+
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
130+
executionId: row.executionId,
131+
contextId: point.contextId,
132+
resumeInput: {},
133+
userId,
134+
allowedPauseKinds: ['time'],
135+
})
136+
137+
if (enqueueResult.status === 'starting') {
138+
PauseResumeManager.startResumeExecution({
139+
resumeEntryId: enqueueResult.resumeEntryId,
140+
resumeExecutionId: enqueueResult.resumeExecutionId,
141+
pausedExecution: enqueueResult.pausedExecution,
142+
contextId: enqueueResult.contextId,
143+
resumeInput: enqueueResult.resumeInput,
144+
userId: enqueueResult.userId,
145+
}).catch((error) => {
146+
logger.error('Background time-pause resume failed', {
147+
executionId: row.executionId,
148+
contextId: point.contextId,
149+
error: toError(error).message,
150+
})
151+
})
152+
}
153+
dispatched++
154+
} catch (error) {
155+
const message = toError(error).message
156+
logger.warn('Failed to dispatch time-pause resume', {
157+
executionId: row.executionId,
158+
contextId: point.contextId,
159+
error: message,
160+
})
161+
failures.push({ executionId: row.executionId, contextId: point.contextId, error: message })
162+
}
163+
}
164+
165+
// We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and
166+
// an operator must investigate stranded rows by hand. The status='paused' guard
167+
// also prevents clobbering when a concurrent manual resume has already advanced
168+
// the row's state since we read it.
169+
await PauseResumeManager.setNextResumeAt({
170+
pausedExecutionId: row.id,
171+
nextResumeAt: computeEarliestResumeAt(eligiblePoints, { after: now }),
172+
})
173+
174+
return { dispatched, failures }
175+
}

apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ interface PausePointWithQueue {
8888
latestResumeEntry?: ResumeQueueEntrySummary | null
8989
parallelScope?: any
9090
loopScope?: any
91+
pauseKind?: 'human' | 'time'
92+
resumeAt?: string
9193
}
9294

9395
interface PausedExecutionSummary {

apps/sim/blocks/blocks/wait.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ const WaitIcon = (props: SVGProps<SVGSVGElement>) => createElement(PauseCircle,
88
export const WaitBlock: BlockConfig = {
99
type: 'wait',
1010
name: 'Wait',
11-
description: 'Pause workflow execution for a specified time delay',
11+
description: 'Pause workflow execution for up to 30 days',
1212
longDescription:
13-
'Pauses workflow execution for a specified time interval. The wait executes a simple sleep for the configured duration.',
13+
'Pauses workflow execution for a specified time interval. Waits up to five minutes are held in-process; longer waits suspend the workflow and resume automatically once the configured duration elapses.',
1414
bestPractices: `
15-
- Use for simple time delays (max 10 minutes)
16-
- Configure the wait amount and unit (seconds or minutes)
17-
- Time-based waits are interruptible via workflow cancellation
15+
- Configure the wait amount and unit (seconds, minutes, hours, or days)
16+
- Maximum wait duration is 30 days
17+
- Waits up to 5 minutes execute in-process and are interruptible via workflow cancellation
18+
- Longer waits suspend the workflow; the execution resumes automatically when the timer fires
1819
- Enter a positive number for the wait amount
1920
`,
2021
category: 'blocks',
@@ -26,7 +27,7 @@ export const WaitBlock: BlockConfig = {
2627
id: 'timeValue',
2728
title: 'Wait Amount',
2829
type: 'short-input',
29-
description: 'Max: 600 seconds or 10 minutes',
30+
description: 'Max: 30 days',
3031
placeholder: '10',
3132
value: () => '10',
3233
required: true,
@@ -38,6 +39,8 @@ export const WaitBlock: BlockConfig = {
3839
options: [
3940
{ label: 'Seconds', id: 'seconds' },
4041
{ label: 'Minutes', id: 'minutes' },
42+
{ label: 'Hours', id: 'hours' },
43+
{ label: 'Days', id: 'days' },
4144
],
4245
value: () => 'seconds',
4346
required: true,
@@ -53,7 +56,7 @@ export const WaitBlock: BlockConfig = {
5356
},
5457
timeUnit: {
5558
type: 'string',
56-
description: 'Wait duration unit (seconds or minutes)',
59+
description: 'Wait duration unit (seconds, minutes, hours, or days)',
5760
},
5861
},
5962
outputs: {
@@ -65,5 +68,9 @@ export const WaitBlock: BlockConfig = {
6568
type: 'string',
6669
description: 'Status of the wait block (waiting, completed, cancelled)',
6770
},
71+
resumeAt: {
72+
type: 'string',
73+
description: 'ISO timestamp at which a suspended wait will resume (long waits only)',
74+
},
6875
},
6976
}

apps/sim/executor/execution/engine.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,8 @@ export class ExecutionEngine {
484484
parallelScope: pause.parallelScope,
485485
loopScope: pause.loopScope,
486486
resumeLinks: pause.resumeLinks,
487+
pauseKind: pause.pauseKind,
488+
resumeAt: pause.resumeAt,
487489
}))
488490

489491
return {

apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
188188
parallelScope,
189189
loopScope,
190190
resumeLinks,
191+
pauseKind: 'human',
191192
}
192193

193194
const responseOutput: Record<string, any> = {

0 commit comments

Comments
 (0)