Skip to content

Commit 0c32dd4

Browse files
Filter out waits from hitl endpoints
1 parent a4b5df1 commit 0c32dd4

4 files changed

Lines changed: 25 additions & 10 deletions

File tree

apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,7 @@ export const POST = withRouteHandler(
140140
contextId,
141141
resumeInput,
142142
userId,
143+
allowedPauseKinds: ['human'],
143144
})
144145

145146
if (enqueueResult.status === 'queued') {

apps/sim/app/api/resume/poll/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,9 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
128128
}
129129
}
130130

131+
// We never auto-retry a failed dispatch: workflow blocks aren't idempotent, and an
132+
// operator must investigate stranded rows by hand. Setting nextResumeAt to the next
133+
// future pause (or null) drops the row out of the poll, surfacing the failure.
131134
await db
132135
.update(pausedExecutions)
133136
.set({ nextResumeAt: nextRemaining })

apps/sim/app/resume/[workflowId]/[executionId]/resume-page-client.tsx

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,7 @@ export default function ResumeExecutionPage({
222222
const [executionDetail, setExecutionDetail] = useState<PausedExecutionDetail | null>(
223223
initialExecutionDetail
224224
)
225-
const pausePoints = (executionDetail?.pausePoints ?? []).filter(
226-
(point) => point.pauseKind !== 'time'
227-
)
225+
const pausePoints = executionDetail?.pausePoints ?? []
228226

229227
const defaultContextId = useMemo(() => {
230228
if (initialContextId) return initialContextId

apps/sim/lib/workflows/executor/human-in-the-loop-manager.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import type {
1919
} from '@/executor/execution/types'
2020
import type {
2121
ExecutionResult,
22+
PauseKind,
2223
PausePoint,
2324
SerializedSnapshot,
2425
StreamingExecution,
@@ -87,6 +88,8 @@ interface EnqueueResumeArgs {
8788
contextId: string
8889
resumeInput: unknown
8990
userId: string
91+
/** Restrict which `pauseKind`s are eligible to resume. Defaults to allowing any. */
92+
allowedPauseKinds?: PauseKind[]
9093
}
9194

9295
type EnqueueResumeResult =
@@ -191,7 +194,7 @@ export class PauseResumeManager {
191194
}
192195

193196
static async enqueueOrStartResume(args: EnqueueResumeArgs): Promise<EnqueueResumeResult> {
194-
const { executionId, contextId, resumeInput, userId } = args
197+
const { executionId, contextId, resumeInput, userId, allowedPauseKinds } = args
195198

196199
return await db.transaction(async (tx) => {
197200
const pausedExecution = await tx
@@ -222,6 +225,11 @@ export class PauseResumeManager {
222225
throw new Error('Snapshot not ready; execution still finalizing pause')
223226
}
224227

228+
const pauseKind: PauseKind = pausePoint.pauseKind ?? 'human'
229+
if (allowedPauseKinds && !allowedPauseKinds.includes(pauseKind)) {
230+
throw new Error(`Pause point cannot be resumed manually (pauseKind=${pauseKind})`)
231+
}
232+
225233
const activeResume = await tx
226234
.select({ id: resumeQueue.id })
227235
.from(resumeQueue)
@@ -1334,12 +1342,13 @@ export class PauseResumeManager {
13341342
.where(whereClause)
13351343
.orderBy(desc(pausedExecutions.pausedAt))
13361344

1337-
return rows.map((row) =>
1338-
PauseResumeManager.normalizePausedExecution(
1339-
row,
1340-
PauseResumeManager.mapPausePoints(row.pausePoints)
1345+
return rows.flatMap((row) => {
1346+
const humanPoints = PauseResumeManager.mapPausePoints(row.pausePoints).filter(
1347+
(point) => point.pauseKind !== 'time'
13411348
)
1342-
)
1349+
if (humanPoints.length === 0) return []
1350+
return [PauseResumeManager.normalizePausedExecution(row, humanPoints)]
1351+
})
13431352
}
13441353

13451354
static async getPausedExecutionById(
@@ -1391,7 +1400,11 @@ export class PauseResumeManager {
13911400
row.pausePoints,
13921401
queuePositions,
13931402
latestEntries
1394-
)
1403+
).filter((point) => point.pauseKind !== 'time')
1404+
1405+
if (pausePoints.length === 0) {
1406+
return null
1407+
}
13951408

13961409
const executionSummary = PauseResumeManager.normalizePausedExecution(row, pausePoints)
13971410

0 commit comments

Comments
 (0)