Skip to content

Commit a48b43f

Browse files
fix(table): unify auto-fire and manual run paths in scheduler
scheduleWorkflowGroupRuns now owns eligibility, autoRun semantics, dep evaluation, and enqueue for both paths. Auto-fire callers omit opts; manual callers (triggerWorkflowGroupRun) pass { groupId, isManualRun: true } to bypass the autoRun=false skip and (for autoRun=false groups) the dep check. Per-row /run-workflow-group route delegates to triggerWorkflowGroupRun with rowIds=[rowId]. Single server-side path for both manual entry points. Also: optimisticallyScheduleNewlyEligibleGroups skips autoRun=false groups so editing a row's data doesn't phantom-mark autoRun=false output cells as Queued.
1 parent 306e122 commit a48b43f

4 files changed

Lines changed: 103 additions & 137 deletions

File tree

apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-group/route.ts

Lines changed: 17 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import { createLogger } from '@sim/logger'
2-
import { generateId } from '@sim/utils/id'
32
import { type NextRequest, NextResponse } from 'next/server'
43
import { runRowWorkflowGroupContract } from '@/lib/api/contracts/tables'
54
import { parseRequest } from '@/lib/api/server'
65
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
76
import { generateRequestId } from '@/lib/core/utils/request'
87
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
9-
import type { RowExecutionMetadata } from '@/lib/table'
10-
import { updateRow } from '@/lib/table'
8+
import { triggerWorkflowGroupRun } from '@/lib/table/workflow-columns'
119
import { accessError, checkAccess } from '@/app/api/table/utils'
1210

1311
const logger = createLogger('TableRunWorkflowGroupAPI')
@@ -16,13 +14,7 @@ interface RouteParams {
1614
params: Promise<{ tableId: string; rowId: string }>
1715
}
1816

19-
/**
20-
* POST /api/table/[tableId]/rows/[rowId]/run-workflow-group
21-
*
22-
* Manually (re-)runs a workflow group for a single row by force-resetting
23-
* `executions[groupId]` to `pending`. The `updateRow` call fires the
24-
* scheduler which enqueues the cell job.
25-
*/
17+
/** POST /api/table/[tableId]/rows/[rowId]/run-workflow-group */
2618
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
2719
const requestId = generateRequestId()
2820

@@ -39,58 +31,27 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
3931

4032
const result = await checkAccess(tableId, authResult.userId, 'write')
4133
if (!result.ok) return accessError(result, requestId, tableId)
42-
const { table } = result
4334

44-
if (table.workspaceId !== workspaceId) {
45-
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
46-
}
47-
48-
const group = (table.schema.workflowGroups ?? []).find((g) => g.id === groupId)
49-
if (!group) {
50-
return NextResponse.json({ error: 'Workflow group not found' }, { status: 404 })
51-
}
52-
53-
const executionId = generateId()
54-
const pendingExec: RowExecutionMetadata = {
55-
status: 'pending',
56-
executionId,
57-
jobId: null,
58-
workflowId: group.workflowId,
59-
error: null,
60-
}
61-
/**
62-
* Clear the group's output cells so the rerun starts visually fresh —
63-
* otherwise stale values from the previous run linger in the UI until the
64-
* new run writes new ones (or doesn't, on error/router-skip).
65-
*/
66-
const clearedData = Object.fromEntries(group.outputs.map((o) => [o.columnName, null]))
67-
const updated = await updateRow(
68-
{
69-
tableId,
70-
rowId,
71-
data: clearedData,
72-
workspaceId,
73-
executionsPatch: { [groupId]: pendingExec },
74-
},
75-
table,
76-
requestId
77-
)
78-
if (updated === null) {
79-
// The cell-task cancellation guard rejected the write — typically a
80-
// racing stop click that already wrote `cancelled` for this run.
81-
// Surface 409 so the caller doesn't poll indefinitely for a run that
82-
// was never enqueued.
83-
return NextResponse.json(
84-
{ error: 'Run was cancelled before it could be scheduled' },
85-
{ status: 409 }
86-
)
87-
}
35+
const { triggered } = await triggerWorkflowGroupRun({
36+
tableId,
37+
groupId,
38+
workspaceId,
39+
mode: 'all',
40+
requestId,
41+
rowIds: [rowId],
42+
})
8843

89-
return NextResponse.json({ success: true, data: { executionId } })
44+
return NextResponse.json({ success: true, data: { triggered } })
9045
} catch (error) {
9146
if (error instanceof Error && error.message === 'Row not found') {
9247
return NextResponse.json({ error: 'Row not found' }, { status: 404 })
9348
}
49+
if (error instanceof Error && error.message === 'Workflow group not found') {
50+
return NextResponse.json({ error: 'Workflow group not found' }, { status: 404 })
51+
}
52+
if (error instanceof Error && error.message === 'Invalid workspace ID') {
53+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
54+
}
9455
logger.error(`run-workflow-group failed:`, error)
9556
return NextResponse.json({ error: 'Failed to run workflow group' }, { status: 500 })
9657
}

apps/sim/lib/api/contracts/tables.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -909,7 +909,7 @@ export const runRowWorkflowGroupContract = defineRouteContract({
909909
body: runRowWorkflowGroupBodySchema,
910910
response: {
911911
mode: 'json',
912-
schema: successResponseSchema(z.object({ executionId: z.string() })),
912+
schema: successResponseSchema(z.object({ triggered: z.number() })),
913913
},
914914
})
915915

apps/sim/lib/table/deps.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ export function optimisticallyScheduleNewlyEligibleGroups(
7575

7676
let next: RowExecutions | null = null
7777
for (const group of groups) {
78+
// autoRun=false groups don't auto-fire on dep-fill — leave them empty.
79+
if (group.autoRun === false) continue
7880
const wasSatisfied = areGroupDepsSatisfied(group, beforeRow)
7981
if (wasSatisfied) continue
8082
if (!areGroupDepsSatisfied(group, afterRow)) continue

apps/sim/lib/table/workflow-columns.ts

Lines changed: 83 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ export {
3636
} from './deps'
3737

3838
/**
39-
* Per-(row, group) eligibility for the **automatic** scheduler path. Skip
40-
* when the group has `autoRun: false` set (manual-only), when the group is
41-
* in flight (`queued`, `running`, or `pending` with a `jobId` already
42-
* stamped), or in a terminal state. Plain `pending` without a jobId is the
43-
* "ready to dispatch" state — the run route sets it and the scheduler is
44-
* what actually enqueues the job.
45-
*
46-
* The manual "Run all" path (`triggerWorkflowGroupRun`) uses
47-
* `areGroupDepsSatisfied` directly and bypasses this guard, so the user can
48-
* still kick off a run on a group that's set to manual-only.
39+
* Per-(row, group) eligibility for both the auto-fire reactor and manual
40+
* runs. Manual runs bypass the `autoRun === false` skip, and additionally
41+
* bypass the dep check for `autoRun === false` groups (those are user-model
42+
* "no deps, manual only").
4943
*/
50-
export function isGroupEligible(group: WorkflowGroup, row: TableRow): boolean {
51-
if (group.autoRun === false) return false
44+
export function isGroupEligible(
45+
group: WorkflowGroup,
46+
row: TableRow,
47+
opts?: { isManualRun?: boolean }
48+
): boolean {
49+
const isManualRun = opts?.isManualRun ?? false
50+
if (group.autoRun === false && !isManualRun) return false
51+
5252
const exec = row.executions?.[group.id]
5353
const status = exec?.status
5454
if (
@@ -60,33 +60,45 @@ export function isGroupEligible(group: WorkflowGroup, row: TableRow): boolean {
6060
) {
6161
return false
6262
}
63-
if (status === 'pending' && exec?.jobId) {
64-
return false
65-
}
63+
if (status === 'pending' && exec?.jobId) return false
64+
65+
if (isManualRun && group.autoRun === false) return true
6666
return areGroupDepsSatisfied(group, row)
6767
}
6868

6969
/**
7070
* Iterates workflow groups × rows and enqueues eligible cell jobs. Safe to
71-
* call after any row-write; errors are logged. Concurrency is bounded by the
72-
* trigger.dev queue (`concurrencyKey: tableId`), so this just enqueues.
71+
* call after any row-write; errors are logged. Auto-fire callers omit
72+
* `opts`; manual callers (`triggerWorkflowGroupRun`) pass `{ groupId,
73+
* isManualRun: true }`.
7374
*/
75+
export interface ScheduleOpts {
76+
groupId?: string
77+
isManualRun?: boolean
78+
}
79+
7480
export async function scheduleWorkflowGroupRuns(
7581
table: TableDefinition,
76-
rows: TableRow[]
77-
): Promise<void> {
82+
rows: TableRow[],
83+
opts?: ScheduleOpts
84+
): Promise<{ triggered: number }> {
7885
try {
79-
const groups = table.schema.workflowGroups ?? []
80-
if (groups.length === 0) return
81-
if (rows.length === 0) return
86+
const allGroups = table.schema.workflowGroups ?? []
87+
if (allGroups.length === 0) return { triggered: 0 }
88+
if (rows.length === 0) return { triggered: 0 }
89+
90+
const groups = opts?.groupId
91+
? allGroups.filter((g) => g.id === opts.groupId)
92+
: allGroups
93+
if (groups.length === 0) return { triggered: 0 }
8294

8395
const orderedRows = rows.length <= 1 ? rows : [...rows].sort((a, b) => a.position - b.position)
8496

8597
const pendingRuns: RunGroupCellOptions[] = []
8698

8799
for (const row of orderedRows) {
88100
for (const group of groups) {
89-
if (!isGroupEligible(group, row)) continue
101+
if (!isGroupEligible(group, row, { isManualRun: opts?.isManualRun })) continue
90102
pendingRuns.push({
91103
tableId: table.id,
92104
tableName: table.name,
@@ -99,29 +111,33 @@ export async function scheduleWorkflowGroupRuns(
99111
}
100112
}
101113

102-
if (pendingRuns.length === 0) return
114+
if (pendingRuns.length === 0) return { triggered: 0 }
103115

104116
logger.info(`Scheduling ${pendingRuns.length} workflow group cell run(s) for table=${table.id}`)
105117

106118
const queue = await getJobQueue()
107119
const { executeWorkflowGroupCellJob } = await import('@/background/workflow-column-execution')
108-
const items = pendingRuns.map((opts) => ({
109-
payload: opts,
120+
const items = pendingRuns.map((runOpts) => ({
121+
payload: runOpts,
110122
options: {
111123
metadata: {
112-
workflowId: opts.workflowId,
113-
workspaceId: opts.workspaceId,
124+
workflowId: runOpts.workflowId,
125+
workspaceId: runOpts.workspaceId,
114126
correlation: {
115-
executionId: opts.executionId,
116-
requestId: `wfgrp-${opts.executionId}`,
127+
executionId: runOpts.executionId,
128+
requestId: `wfgrp-${runOpts.executionId}`,
117129
source: 'workflow' as const,
118-
workflowId: opts.workflowId,
130+
workflowId: runOpts.workflowId,
119131
triggerType: 'table',
120132
},
121133
},
122-
concurrencyKey: opts.tableId,
134+
concurrencyKey: runOpts.tableId,
123135
concurrencyLimit: TABLE_CONCURRENCY_LIMIT,
124-
tags: [`tableId:${opts.tableId}`, `rowId:${opts.rowId}`, `group:${opts.groupId}`],
136+
tags: [
137+
`tableId:${runOpts.tableId}`,
138+
`rowId:${runOpts.rowId}`,
139+
`group:${runOpts.groupId}`,
140+
],
125141
runner: executeWorkflowGroupCellJob as EnqueueOptions['runner'],
126142
},
127143
}))
@@ -132,26 +148,28 @@ export async function scheduleWorkflowGroupRuns(
132148
} catch (err) {
133149
logger.error(`Batch enqueue failed for table=${table.id}:`, err)
134150
await Promise.allSettled(
135-
pendingRuns.map((opts) =>
136-
writeWorkflowGroupState(opts, {
151+
pendingRuns.map((runOpts) =>
152+
writeWorkflowGroupState(runOpts, {
137153
executionState: {
138154
status: 'error',
139-
executionId: opts.executionId,
155+
executionId: runOpts.executionId,
140156
jobId: null,
141-
workflowId: opts.workflowId,
157+
workflowId: runOpts.workflowId,
142158
error: toError(err).message,
143159
},
144160
})
145161
)
146162
)
147-
return
163+
return { triggered: 0 }
148164
}
149165

150166
for (let i = 0; i < pendingRuns.length; i++) {
151167
await stampQueuedOrCancel(queue, pendingRuns[i], jobIds[i])
152168
}
169+
return { triggered: pendingRuns.length }
153170
} catch (err) {
154171
logger.error('scheduleWorkflowGroupRuns failed:', err)
172+
return { triggered: 0 }
155173
}
156174
}
157175

@@ -298,16 +316,10 @@ export async function cancelWorkflowGroupRuns(tableId: string, rowId?: string):
298316
}
299317

300318
/**
301-
* Manually triggers a workflow group for every dep-satisfied row in a table.
302-
* `mode: 'all'` re-runs every eligible row; `mode: 'incomplete'` skips rows
303-
* whose group is already `completed`. When `rowIds` is provided, only those
304-
* rows are candidates — the same eligibility predicate still applies, so a
305-
* mid-run row or one with unmet deps is silently skipped. Eligible rows have
306-
* their output cells cleared and their `executions[groupId]` reset to
307-
* `pending`; the scheduler picks them up and enqueues per-cell jobs. Returns
308-
* the number of rows that were marked for re-run. Used by the
309-
* `groups/[groupId]/run` HTTP route and the Copilot/Mothership
310-
* `run_workflow_group` op so both share one eligibility predicate.
319+
* Manually runs a workflow group on the user-selected rows. `mode:
320+
* 'incomplete'` skips already-completed rows; `rowIds` narrows the
321+
* candidate set further. Clears output cells, then delegates to
322+
* `scheduleWorkflowGroupRuns` with `isManualRun: true`.
311323
*/
312324
export async function triggerWorkflowGroupRun(opts: {
313325
tableId: string
@@ -326,10 +338,8 @@ export async function triggerWorkflowGroupRun(opts: {
326338
const group = (table.schema.workflowGroups ?? []).find((g) => g.id === groupId)
327339
if (!group) throw new Error('Workflow group not found')
328340

329-
// Push the in-flight / terminal-state filters into SQL so we don't pull
330-
// every row in the table into Node just to discard most of them. Dependency
331-
// satisfaction is still checked in JS afterwards (it can span multiple
332-
// columns and other groups' statuses, so it's awkward to express in JSONB).
341+
// SQL pre-filter so we don't pull the whole table into Node;
342+
// isGroupEligible re-checks every survivor.
333343
const filters = [
334344
eq(userTableRows.tableId, tableId),
335345
eq(userTableRows.workspaceId, workspaceId),
@@ -358,42 +368,35 @@ export async function triggerWorkflowGroupRun(opts: {
358368

359369
if (candidateRows.length === 0) return { triggered: 0 }
360370

361-
const eligibleRows = candidateRows.filter((r) => {
362-
const tableRow: TableRow = {
371+
// Clear output values + drop the group's exec entry so the cell goes
372+
// back to empty before the scheduler stamps `queued`. `null` in the
373+
// executionsPatch deletes the entry.
374+
const clearedData = Object.fromEntries(group.outputs.map((o) => [o.columnName, null])) as RowData
375+
const updates = candidateRows.map((r) => ({
376+
rowId: r.id,
377+
data: clearedData,
378+
executionsPatch: { [groupId]: null },
379+
}))
380+
381+
await batchUpdateRows({ tableId, updates, workspaceId }, table, requestId)
382+
383+
const clearedRows: TableRow[] = candidateRows.map((r) => {
384+
const existingExec = (r.executions as RowExecutions) ?? {}
385+
const { [groupId]: _, ...remaining } = existingExec
386+
return {
363387
id: r.id,
364-
data: r.data as RowData,
365-
executions: (r.executions as RowExecutions) ?? {},
388+
data: { ...(r.data as RowData), ...clearedData },
389+
executions: remaining,
366390
position: r.position,
367391
createdAt: r.createdAt,
368392
updatedAt: r.updatedAt,
369393
}
370-
try {
371-
return areGroupDepsSatisfied(group, tableRow)
372-
} catch {
373-
return false
374-
}
375394
})
376395

377-
if (eligibleRows.length === 0) return { triggered: 0 }
378-
379-
const clearedData = Object.fromEntries(group.outputs.map((o) => [o.columnName, null])) as RowData
380-
const updates = eligibleRows.map((r) => {
381-
const pendingExec: RowExecutionMetadata = {
382-
status: 'pending',
383-
executionId: generateId(),
384-
jobId: null,
385-
workflowId: group.workflowId,
386-
error: null,
387-
}
388-
return {
389-
rowId: r.id,
390-
data: clearedData,
391-
executionsPatch: { [groupId]: pendingExec },
392-
}
396+
return scheduleWorkflowGroupRuns(table, clearedRows, {
397+
groupId,
398+
isManualRun: true,
393399
})
394-
395-
const opResult = await batchUpdateRows({ tableId, updates, workspaceId }, table, requestId)
396-
return { triggered: opResult.affectedCount }
397400
}
398401

399402
// ───────────────────────────── Validation ─────────────────────────────

0 commit comments

Comments
 (0)