Skip to content

Commit 22ccaf1

Browse files
feat(block): Allow wait block to wait up to 30 days
1 parent 3afcad2 commit 22ccaf1

16 files changed

Lines changed: 15665 additions & 157 deletions

File tree

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
import { db } from '@sim/db'
2+
import { pausedExecutions } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { toError } from '@sim/utils/errors'
5+
import { generateShortId } from '@sim/utils/id'
6+
import { and, eq, isNotNull, lte } from 'drizzle-orm'
7+
import { type NextRequest, NextResponse } from 'next/server'
8+
import { verifyCronAuth } from '@/lib/auth/internal'
9+
import { acquireLock, releaseLock } from '@/lib/core/config/redis'
10+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11+
import { PauseResumeManager } from '@/lib/workflows/executor/human-in-the-loop-manager'
12+
13+
const logger = createLogger('TimePauseResumePoll')
14+
15+
export const dynamic = 'force-dynamic'
16+
export const maxDuration = 120
17+
18+
const LOCK_KEY = 'time-pause-resume-poll-lock'
19+
const LOCK_TTL_SECONDS = 120
20+
const POLL_BATCH_LIMIT = 200
21+
22+
interface StoredPausePoint {
23+
contextId?: string
24+
resumeStatus?: string
25+
pauseKind?: string
26+
resumeAt?: string
27+
}
28+
29+
export const GET = withRouteHandler(async (request: NextRequest) => {
30+
const requestId = generateShortId()
31+
32+
const authError = verifyCronAuth(request, 'Time-pause resume poll')
33+
if (authError) return authError
34+
35+
const lockAcquired = await acquireLock(LOCK_KEY, requestId, LOCK_TTL_SECONDS)
36+
if (!lockAcquired) {
37+
return NextResponse.json(
38+
{ success: true, message: 'Polling already in progress – skipped', requestId },
39+
{ status: 202 }
40+
)
41+
}
42+
43+
let claimedRows = 0
44+
let dispatched = 0
45+
const failures: { executionId: string; contextId: string; error: string }[] = []
46+
47+
try {
48+
const now = new Date()
49+
50+
const dueRows = await db
51+
.select({
52+
id: pausedExecutions.id,
53+
executionId: pausedExecutions.executionId,
54+
workflowId: pausedExecutions.workflowId,
55+
pausePoints: pausedExecutions.pausePoints,
56+
metadata: pausedExecutions.metadata,
57+
})
58+
.from(pausedExecutions)
59+
.where(
60+
and(
61+
eq(pausedExecutions.status, 'paused'),
62+
isNotNull(pausedExecutions.nextResumeAt),
63+
lte(pausedExecutions.nextResumeAt, now)
64+
)
65+
)
66+
.limit(POLL_BATCH_LIMIT)
67+
68+
claimedRows = dueRows.length
69+
70+
for (const row of dueRows) {
71+
const points = (row.pausePoints ?? {}) as Record<string, StoredPausePoint>
72+
const metadata = (row.metadata ?? {}) as Record<string, unknown>
73+
const userId = typeof metadata.executorUserId === 'string' ? metadata.executorUserId : ''
74+
75+
const duePoints: StoredPausePoint[] = []
76+
let nextRemaining: Date | null = null
77+
78+
for (const point of Object.values(points)) {
79+
if (point.pauseKind !== 'time' || !point.resumeAt) continue
80+
if (point.resumeStatus && point.resumeStatus !== 'paused') continue
81+
82+
const resumeAt = new Date(point.resumeAt)
83+
if (Number.isNaN(resumeAt.getTime())) continue
84+
85+
if (resumeAt <= now) {
86+
duePoints.push(point)
87+
} else if (!nextRemaining || resumeAt < nextRemaining) {
88+
nextRemaining = resumeAt
89+
}
90+
}
91+
92+
for (const point of duePoints) {
93+
const contextId = point.contextId
94+
if (!contextId) continue
95+
try {
96+
const enqueueResult = await PauseResumeManager.enqueueOrStartResume({
97+
executionId: row.executionId,
98+
contextId,
99+
resumeInput: {},
100+
userId,
101+
})
102+
103+
if (enqueueResult.status === 'starting') {
104+
PauseResumeManager.startResumeExecution({
105+
resumeEntryId: enqueueResult.resumeEntryId,
106+
resumeExecutionId: enqueueResult.resumeExecutionId,
107+
pausedExecution: enqueueResult.pausedExecution,
108+
contextId: enqueueResult.contextId,
109+
resumeInput: enqueueResult.resumeInput,
110+
userId: enqueueResult.userId,
111+
}).catch((error) => {
112+
logger.error('Background time-pause resume failed', {
113+
executionId: row.executionId,
114+
contextId,
115+
error: toError(error).message,
116+
})
117+
})
118+
}
119+
dispatched++
120+
} catch (error) {
121+
const message = toError(error).message
122+
logger.warn('Failed to dispatch time-pause resume', {
123+
executionId: row.executionId,
124+
contextId,
125+
error: message,
126+
})
127+
failures.push({ executionId: row.executionId, contextId, error: message })
128+
}
129+
}
130+
131+
await db
132+
.update(pausedExecutions)
133+
.set({ nextResumeAt: nextRemaining })
134+
.where(eq(pausedExecutions.id, row.id))
135+
}
136+
137+
logger.info('Time-pause resume poll completed', {
138+
requestId,
139+
claimedRows,
140+
dispatched,
141+
failureCount: failures.length,
142+
})
143+
144+
return NextResponse.json({
145+
success: true,
146+
requestId,
147+
claimedRows,
148+
dispatched,
149+
failures,
150+
})
151+
} catch (error) {
152+
const message = toError(error).message
153+
logger.error('Time-pause resume poll failed', { requestId, error: message })
154+
return NextResponse.json({ success: false, requestId, error: message }, { status: 500 })
155+
} finally {
156+
await releaseLock(LOCK_KEY, requestId).catch(() => {})
157+
}
158+
})

apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ interface PausePointWithQueue {
8686
latestResumeEntry?: ResumeQueueEntrySummary | null
8787
parallelScope?: any
8888
loopScope?: any
89+
pauseKind?: 'human' | 'time'
90+
resumeAt?: string
8991
}
9092

9193
interface PausedExecutionSummary {
@@ -220,7 +222,9 @@ export default function ResumeExecutionPage({
220222
const [executionDetail, setExecutionDetail] = useState<PausedExecutionDetail | null>(
221223
initialExecutionDetail
222224
)
223-
const pausePoints = executionDetail?.pausePoints ?? []
225+
const pausePoints = (executionDetail?.pausePoints ?? []).filter(
226+
(point) => point.pauseKind !== 'time'
227+
)
224228

225229
const defaultContextId = useMemo(() => {
226230
if (initialContextId) return initialContextId

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/panel/components/editor/components/sub-block/components/short-input/short-input.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,7 +347,7 @@ export const ShortInput = memo(function ShortInput({
347347
<>
348348
<Input
349349
ref={ref as React.RefObject<HTMLInputElement>}
350-
className='allow-scroll w-full overflow-auto text-transparent selection:text-transparent caret-foreground [-ms-overflow-style:none] [scrollbar-width:none] placeholder:text-muted-foreground/50 [&::-webkit-scrollbar]:hidden'
350+
className='allow-scroll w-full overflow-auto text-transparent caret-foreground [-ms-overflow-style:none] [scrollbar-width:none] selection:text-transparent placeholder:text-muted-foreground/50 [&::-webkit-scrollbar]:hidden'
351351
readOnly={readOnly}
352352
placeholder={placeholder ?? ''}
353353
type='text'

apps/sim/blocks/blocks/wait.ts

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,14 @@ const WaitIcon = (props: SVGProps<SVGSVGElement>) => createElement(PauseCircle,
88
export const WaitBlock: BlockConfig = {
99
type: 'wait',
1010
name: 'Wait',
11-
description: 'Pause workflow execution for a specified time delay',
11+
description: 'Pause workflow execution for up to 30 days',
1212
longDescription:
13-
'Pauses workflow execution for a specified time interval. The wait executes a simple sleep for the configured duration.',
13+
'Pauses workflow execution for a specified time interval. Waits up to five minutes are held in-process; longer waits suspend the workflow and resume automatically once the configured duration elapses.',
1414
bestPractices: `
15-
- Use for simple time delays (max 10 minutes)
16-
- Configure the wait amount and unit (seconds or minutes)
17-
- Time-based waits are interruptible via workflow cancellation
15+
- Configure the wait amount and unit (seconds, minutes, hours, or days)
16+
- Maximum wait duration is 30 days
17+
- Waits up to 5 minutes execute in-process and are interruptible via workflow cancellation
18+
- Longer waits suspend the workflow; the execution resumes automatically when the timer fires
1819
- Enter a positive number for the wait amount
1920
`,
2021
category: 'blocks',
@@ -26,7 +27,7 @@ export const WaitBlock: BlockConfig = {
2627
id: 'timeValue',
2728
title: 'Wait Amount',
2829
type: 'short-input',
29-
description: 'Max: 600 seconds or 10 minutes',
30+
description: 'Max: 30 days',
3031
placeholder: '10',
3132
value: () => '10',
3233
required: true,
@@ -38,6 +39,8 @@ export const WaitBlock: BlockConfig = {
3839
options: [
3940
{ label: 'Seconds', id: 'seconds' },
4041
{ label: 'Minutes', id: 'minutes' },
42+
{ label: 'Hours', id: 'hours' },
43+
{ label: 'Days', id: 'days' },
4144
],
4245
value: () => 'seconds',
4346
required: true,
@@ -53,7 +56,7 @@ export const WaitBlock: BlockConfig = {
5356
},
5457
timeUnit: {
5558
type: 'string',
56-
description: 'Wait duration unit (seconds or minutes)',
59+
description: 'Wait duration unit (seconds, minutes, hours, or days)',
5760
},
5861
},
5962
outputs: {
@@ -65,5 +68,9 @@ export const WaitBlock: BlockConfig = {
6568
type: 'string',
6669
description: 'Status of the wait block (waiting, completed, cancelled)',
6770
},
71+
resumeAt: {
72+
type: 'string',
73+
description: 'ISO timestamp at which a suspended wait will resume (long waits only)',
74+
},
6875
},
6976
}

apps/sim/executor/execution/engine.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,8 @@ export class ExecutionEngine {
484484
parallelScope: pause.parallelScope,
485485
loopScope: pause.loopScope,
486486
resumeLinks: pause.resumeLinks,
487+
pauseKind: pause.pauseKind,
488+
resumeAt: pause.resumeAt,
487489
}))
488490

489491
return {

apps/sim/executor/handlers/human-in-the-loop/human-in-the-loop-handler.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,7 @@ export class HumanInTheLoopBlockHandler implements BlockHandler {
188188
parallelScope,
189189
loopScope,
190190
resumeLinks,
191+
pauseKind: 'human',
191192
}
192193

193194
const responseOutput: Record<string, any> = {

0 commit comments

Comments
 (0)