Skip to content

Commit 8b40252

Browse files
committed
improvements and shift to outbox policy with eager call
1 parent 53b2a06 commit 8b40252

47 files changed

Lines changed: 2006 additions & 556 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

apps/sim/app/api/chat/manage/[id]/route.ts

Lines changed: 25 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,12 @@ import { isDev } from '@/lib/core/config/feature-flags'
1111
import { encryptSecret } from '@/lib/core/security/encryption'
1212
import { getEmailDomain } from '@/lib/core/utils/urls'
1313
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
14-
import { notifySocketDeploymentChanged, performChatUndeploy } from '@/lib/workflows/orchestration'
15-
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
14+
import { performChatUndeploy, performFullDeploy } from '@/lib/workflows/orchestration'
1615
import { checkChatAccess } from '@/app/api/chat/utils'
1716
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
1817

1918
export const dynamic = 'force-dynamic'
19+
export const maxDuration = 120
2020

2121
const logger = createLogger('ChatDetailAPI')
2222

@@ -122,21 +122,8 @@ export const PATCH = withRouteHandler(
122122
}
123123
}
124124

125-
// Redeploy the workflow to ensure latest version is active
126-
const deployResult = await deployWorkflow({
127-
workflowId: existingChat[0].workflowId,
128-
deployedBy: session.user.id,
129-
})
130-
131-
if (!deployResult.success) {
132-
logger.warn(
133-
`Failed to redeploy workflow for chat update: ${deployResult.error}, continuing with chat update`
134-
)
135-
} else {
136-
logger.info(
137-
`Redeployed workflow ${existingChat[0].workflowId} for chat update (v${deployResult.version})`
138-
)
139-
await notifySocketDeploymentChanged(existingChat[0].workflowId)
125+
if (workflowId && workflowId !== existingChat[0].workflowId) {
126+
return createErrorResponse('Changing a chat deployment workflow is not supported', 400)
140127
}
141128

142129
let encryptedPassword
@@ -152,11 +139,31 @@ export const PATCH = withRouteHandler(
152139
logger.info('Keeping existing password')
153140
}
154141

142+
// Redeploy the workflow to ensure latest version is active
143+
const deployResult = await performFullDeploy({
144+
workflowId: existingChat[0].workflowId,
145+
userId: session.user.id,
146+
request,
147+
})
148+
149+
if (!deployResult.success) {
150+
logger.warn(`Failed to redeploy workflow for chat update: ${deployResult.error}`)
151+
const status =
152+
deployResult.errorCode === 'validation'
153+
? 400
154+
: deployResult.errorCode === 'not_found'
155+
? 404
156+
: 500
157+
return createErrorResponse(deployResult.error || 'Failed to redeploy workflow', status)
158+
}
159+
logger.info(
160+
`Redeployed workflow ${existingChat[0].workflowId} for chat update (v${deployResult.version})`
161+
)
162+
155163
const updateData: Record<string, unknown> = {
156164
updatedAt: new Date(),
157165
}
158166

159-
if (workflowId) updateData.workflowId = workflowId
160167
if (identifier) updateData.identifier = identifier
161168
if (title) updateData.title = title
162169
if (description !== undefined) updateData.description = description

apps/sim/app/api/form/route.ts

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ import { isDev } from '@/lib/core/config/feature-flags'
1212
import { encryptSecret } from '@/lib/core/security/encryption'
1313
import { getEmailDomain } from '@/lib/core/utils/urls'
1414
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
15-
import { notifySocketDeploymentChanged } from '@/lib/workflows/orchestration'
16-
import { deployWorkflow } from '@/lib/workflows/persistence/utils'
15+
import { performFullDeploy } from '@/lib/workflows/orchestration'
1716
import {
1817
checkWorkflowAccessForFormCreation,
1918
DEFAULT_FORM_CUSTOMIZATIONS,
2019
} from '@/app/api/form/utils'
2120
import { createErrorResponse, createSuccessResponse } from '@/app/api/workflows/utils'
2221

2322
const logger = createLogger('FormAPI')
23+
export const maxDuration = 120
2424

2525
function getErrorMessage(error: unknown, fallback: string): string {
2626
return error instanceof Error ? error.message : fallback
@@ -106,21 +106,6 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
106106
return createErrorResponse('Workflow not found or access denied', 404)
107107
}
108108

109-
const result = await deployWorkflow({
110-
workflowId,
111-
deployedBy: session.user.id,
112-
})
113-
114-
if (!result.success) {
115-
return createErrorResponse(result.error || 'Failed to deploy workflow', 500)
116-
}
117-
118-
logger.info(
119-
`${workflowRecord.isDeployed ? 'Redeployed' : 'Auto-deployed'} workflow ${workflowId} for form (v${result.version})`
120-
)
121-
122-
await notifySocketDeploymentChanged(workflowId)
123-
124109
let encryptedPassword = null
125110
if (authType === 'password' && password) {
126111
const { encrypted } = await encryptSecret(password)
@@ -161,6 +146,23 @@ export const POST = withRouteHandler(async (request: NextRequest) => {
161146
updatedAt: new Date(),
162147
})
163148

149+
const result = await performFullDeploy({
150+
workflowId,
151+
userId: session.user.id,
152+
request,
153+
})
154+
155+
if (!result.success) {
156+
await db.delete(form).where(eq(form.id, id))
157+
const status =
158+
result.errorCode === 'validation' ? 400 : result.errorCode === 'not_found' ? 404 : 500
159+
return createErrorResponse(result.error || 'Failed to deploy workflow', status)
160+
}
161+
162+
logger.info(
163+
`${workflowRecord.isDeployed ? 'Redeployed' : 'Auto-deployed'} workflow ${workflowId} for form (v${result.version})`
164+
)
165+
164166
const baseDomain = getEmailDomain()
165167
const protocol = isDev ? 'http' : 'https'
166168
const formUrl = `${protocol}://${baseDomain}/form/${identifier}`

apps/sim/app/api/schedules/execute/route.test.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ vi.mock('drizzle-orm', () => ({
6969
ne: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'ne' })),
7070
lte: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'lte' })),
7171
lt: vi.fn((field: unknown, value: unknown) => ({ field, value, type: 'lt' })),
72+
inArray: vi.fn((field: unknown, values: unknown[]) => ({ field, values, type: 'inArray' })),
7273
not: vi.fn((condition: unknown) => ({ type: 'not', condition })),
7374
isNull: vi.fn((field: unknown) => ({ type: 'isNull', field })),
7475
or: vi.fn((...conditions: unknown[]) => ({ type: 'or', conditions })),
@@ -166,6 +167,8 @@ function createMockRequest(): NextRequest {
166167
describe('Scheduled Workflow Execution API Route', () => {
167168
beforeEach(() => {
168169
vi.clearAllMocks()
170+
dbChainMockFns.limit.mockReset()
171+
dbChainMockFns.returning.mockReset()
169172
resetDbChainMock()
170173
requestUtilsMockFns.mockGenerateRequestId.mockReturnValue('test-request-id')
171174
workflowsUtilsMockFns.mockGetWorkflowById.mockResolvedValue({
@@ -180,6 +183,7 @@ describe('Scheduled Workflow Execution API Route', () => {
180183
})
181184

182185
it('should execute scheduled workflows with Trigger.dev disabled', async () => {
186+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
183187
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
184188

185189
const response = await GET(createMockRequest())
@@ -193,6 +197,7 @@ describe('Scheduled Workflow Execution API Route', () => {
193197

194198
it('should queue schedules to Trigger.dev when enabled', async () => {
195199
mockFeatureFlags.isTriggerDevEnabled = true
200+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
196201
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
197202

198203
const response = await GET(createMockRequest())
@@ -215,6 +220,9 @@ describe('Scheduled Workflow Execution API Route', () => {
215220
})
216221

217222
it('should execute multiple schedules in parallel', async () => {
223+
dbChainMockFns.limit
224+
.mockResolvedValueOnce([{ id: 'schedule-1' }, { id: 'schedule-2' }])
225+
.mockResolvedValueOnce([])
218226
dbChainMockFns.returning.mockReturnValueOnce(MULTIPLE_SCHEDULES).mockReturnValueOnce([])
219227

220228
const response = await GET(createMockRequest())
@@ -225,7 +233,8 @@ describe('Scheduled Workflow Execution API Route', () => {
225233
})
226234

227235
it('should execute mothership jobs inline', async () => {
228-
dbChainMockFns.returning.mockReturnValueOnce([]).mockReturnValueOnce(SINGLE_JOB)
236+
dbChainMockFns.limit.mockResolvedValueOnce([]).mockResolvedValueOnce([{ id: 'job-1' }])
237+
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_JOB)
229238

230239
const response = await GET(createMockRequest())
231240

@@ -241,6 +250,7 @@ describe('Scheduled Workflow Execution API Route', () => {
241250
})
242251

243252
it('should enqueue schedule with correlation metadata via job queue', async () => {
253+
dbChainMockFns.limit.mockResolvedValueOnce([{ id: 'schedule-1' }]).mockResolvedValueOnce([])
244254
dbChainMockFns.returning.mockReturnValueOnce(SINGLE_SCHEDULE).mockReturnValueOnce([])
245255

246256
const response = await GET(createMockRequest())

apps/sim/app/api/schedules/execute/route.ts

Lines changed: 84 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { db, workflowDeploymentVersion, workflowSchedule } from '@sim/db'
22
import { createLogger } from '@sim/logger'
33
import { toError } from '@sim/utils/errors'
44
import { generateId } from '@sim/utils/id'
5-
import { and, eq, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
5+
import { and, eq, inArray, isNull, lt, lte, ne, not, or, sql } from 'drizzle-orm'
66
import { type NextRequest, NextResponse } from 'next/server'
77
import { verifyCronAuth } from '@/lib/auth/internal'
88
import { getJobQueue, shouldExecuteInline } from '@/lib/core/async-jobs'
@@ -17,6 +17,9 @@ import {
1717
export const dynamic = 'force-dynamic'
1818

1919
const logger = createLogger('ScheduledExecuteAPI')
20+
const MAX_CRON_CLAIMS = 20
21+
const RESERVED_WORKFLOW_CLAIMS = 10
22+
const RESERVED_JOB_CLAIMS = MAX_CRON_CLAIMS - RESERVED_WORKFLOW_CLAIMS
2023

2124
const dueFilter = (queuedAt: Date) =>
2225
and(
@@ -30,27 +33,42 @@ const dueFilter = (queuedAt: Date) =>
3033
)
3134
)
3235

33-
export const GET = withRouteHandler(async (request: NextRequest) => {
34-
const requestId = generateRequestId()
35-
logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`)
36+
const activeWorkflowDeploymentFilter = () =>
37+
sql`${workflowSchedule.deploymentVersionId} = (select ${workflowDeploymentVersion.id} from ${workflowDeploymentVersion} where ${workflowDeploymentVersion.workflowId} = ${workflowSchedule.workflowId} and ${workflowDeploymentVersion.isActive} = true)`
3638

37-
const authError = verifyCronAuth(request, 'Schedule execution')
38-
if (authError) {
39-
return authError
40-
}
39+
const workflowScheduleFilter = (queuedAt: Date) =>
40+
and(
41+
dueFilter(queuedAt),
42+
or(eq(workflowSchedule.sourceType, 'workflow'), isNull(workflowSchedule.sourceType)),
43+
activeWorkflowDeploymentFilter()
44+
)
4145

42-
const queuedAt = new Date()
46+
const jobScheduleFilter = (queuedAt: Date) =>
47+
and(dueFilter(queuedAt), eq(workflowSchedule.sourceType, 'job'))
4348

44-
try {
45-
// Workflow schedules (require active deployment)
46-
const dueSchedules = await db
49+
async function claimWorkflowSchedules(queuedAt: Date, limit: number) {
50+
if (limit <= 0) return []
51+
52+
return db.transaction(async (tx) => {
53+
const rows = await tx
54+
.select({ id: workflowSchedule.id })
55+
.from(workflowSchedule)
56+
.where(workflowScheduleFilter(queuedAt))
57+
.for('update', { skipLocked: true })
58+
.limit(limit)
59+
60+
if (rows.length === 0) return []
61+
62+
return tx
4763
.update(workflowSchedule)
4864
.set({ lastQueuedAt: queuedAt, updatedAt: queuedAt })
4965
.where(
5066
and(
51-
dueFilter(queuedAt),
52-
or(eq(workflowSchedule.sourceType, 'workflow'), isNull(workflowSchedule.sourceType)),
53-
sql`${workflowSchedule.deploymentVersionId} = (select ${workflowDeploymentVersion.id} from ${workflowDeploymentVersion} where ${workflowDeploymentVersion.workflowId} = ${workflowSchedule.workflowId} and ${workflowDeploymentVersion.isActive} = true)`
67+
workflowScheduleFilter(queuedAt),
68+
inArray(
69+
workflowSchedule.id,
70+
rows.map((row) => row.id)
71+
)
5472
)
5573
)
5674
.returning({
@@ -62,21 +80,68 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
6280
failedCount: workflowSchedule.failedCount,
6381
nextRunAt: workflowSchedule.nextRunAt,
6482
lastQueuedAt: workflowSchedule.lastQueuedAt,
83+
deploymentVersionId: workflowSchedule.deploymentVersionId,
6584
sourceType: workflowSchedule.sourceType,
6685
})
86+
})
87+
}
6788

68-
// Jobs (no deployment, dispatch inline)
69-
const dueJobs = await db
89+
async function claimJobSchedules(queuedAt: Date, limit: number) {
90+
if (limit <= 0) return []
91+
92+
return db.transaction(async (tx) => {
93+
const rows = await tx
94+
.select({ id: workflowSchedule.id })
95+
.from(workflowSchedule)
96+
.where(jobScheduleFilter(queuedAt))
97+
.for('update', { skipLocked: true })
98+
.limit(limit)
99+
100+
if (rows.length === 0) return []
101+
102+
return tx
70103
.update(workflowSchedule)
71104
.set({ lastQueuedAt: queuedAt, updatedAt: queuedAt })
72-
.where(and(dueFilter(queuedAt), eq(workflowSchedule.sourceType, 'job')))
105+
.where(
106+
and(
107+
jobScheduleFilter(queuedAt),
108+
inArray(
109+
workflowSchedule.id,
110+
rows.map((row) => row.id)
111+
)
112+
)
113+
)
73114
.returning({
74115
id: workflowSchedule.id,
75116
cronExpression: workflowSchedule.cronExpression,
76117
failedCount: workflowSchedule.failedCount,
77118
lastQueuedAt: workflowSchedule.lastQueuedAt,
78119
sourceType: workflowSchedule.sourceType,
79120
})
121+
})
122+
}
123+
124+
export const GET = withRouteHandler(async (request: NextRequest) => {
125+
const requestId = generateRequestId()
126+
logger.info(`[${requestId}] Scheduled execution triggered at ${new Date().toISOString()}`)
127+
128+
const authError = verifyCronAuth(request, 'Schedule execution')
129+
if (authError) {
130+
return authError
131+
}
132+
133+
const queuedAt = new Date()
134+
135+
try {
136+
const dueSchedules = await claimWorkflowSchedules(queuedAt, RESERVED_WORKFLOW_CLAIMS)
137+
const dueJobs = await claimJobSchedules(queuedAt, RESERVED_JOB_CLAIMS)
138+
const remainingClaimBudget = Math.max(0, MAX_CRON_CLAIMS - dueSchedules.length - dueJobs.length)
139+
140+
if (remainingClaimBudget > 0 && dueSchedules.length === RESERVED_WORKFLOW_CLAIMS) {
141+
dueSchedules.push(...(await claimWorkflowSchedules(queuedAt, remainingClaimBudget)))
142+
} else if (remainingClaimBudget > 0 && dueJobs.length === RESERVED_JOB_CLAIMS) {
143+
dueJobs.push(...(await claimJobSchedules(queuedAt, remainingClaimBudget)))
144+
}
80145

81146
const totalCount = dueSchedules.length + dueJobs.length
82147
logger.info(
@@ -108,6 +173,7 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
108173
requestId,
109174
correlation,
110175
blockId: schedule.blockId || undefined,
176+
deploymentVersionId: schedule.deploymentVersionId || undefined,
111177
cronExpression: schedule.cronExpression || undefined,
112178
lastRanAt: schedule.lastRanAt?.toISOString(),
113179
failedCount: schedule.failedCount || 0,

apps/sim/app/api/v1/admin/types.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,7 @@ export interface AdminDeployResult {
645645

646646
export interface AdminUndeployResult {
647647
isDeployed: boolean
648+
warnings?: string[]
648649
}
649650

650651
// =============================================================================

apps/sim/app/api/v1/admin/workflows/[id]/deploy/route.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
import type { AdminDeployResult, AdminUndeployResult } from '@/app/api/v1/admin/types'
1919

2020
const logger = createLogger('AdminWorkflowDeployAPI')
21+
export const maxDuration = 120
2122

2223
interface RouteParams {
2324
id: string
@@ -109,6 +110,7 @@ export const DELETE = withRouteHandler(
109110

110111
const response: AdminUndeployResult = {
111112
isDeployed: false,
113+
warnings: result.warnings,
112114
}
113115

114116
return singleResponse(response)

apps/sim/app/api/v1/admin/workflows/[id]/versions/[versionId]/activate/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
} from '@/app/api/v1/admin/responses'
1515

1616
const logger = createLogger('AdminWorkflowActivateVersionAPI')
17+
export const maxDuration = 120
1718

1819
interface RouteParams {
1920
id: string

0 commit comments

Comments
 (0)