Skip to content

Commit 7a2103e

Browse files
authored
improvement(logs): move per-block progress markers to Redis to cut write amplification (#5248)
* improvement(logs): move per-block progress markers to Redis to cut write amplification Per-block lastStartedBlock/lastCompletedBlock markers were persisted via a jsonb_set UPDATE on workflow_execution_logs on every block start and complete (~2N UPDATEs per run) — the heaviest write query in the DB. These are live progress breadcrumbs with no DB-polling consumer (live progress comes from the executor over WebSocket); their only durable value is a breadcrumb folded into the final record. Behind the redis-progress-markers flag, markers now live in Redis during the run and are folded into the single terminal UPDATE at completion, dropping per-run row UPDATEs from ~2N+1 to 1. - New progress-markers module: HASH execution:progress:{id}, atomic Lua monotonic-guard writes preserving the existing <= ordering, reservation-aligned TTL backstop, graceful no-op when Redis is unavailable - Deterministic GC: cleared at every terminal/pause boundary; TTL covers crashes - Flag resolved once per logging session so a run never mixes write paths - Fold markers into the completion record (Redis wins, falls back to row markers) - Merge live markers for in-flight detail reads - Extract shared getExecutionReservationTtlMs so marker and admission-slot TTLs share one source of truth * fix(logs): SQL fallback when Redis marker write fails, fold markers on force-fail, validate marker shape Addresses review feedback on the redis-progress-markers PR: - persistLast* now falls back to the jsonb_set UPDATE when Redis is unavailable or the write fails (setLast* returns whether it persisted), so a marker is never dropped when the flag is on without a healthy Redis. - markExecutionAsFailed folds live Redis markers into execution_data before clearing, so the last-started/last-completed breadcrumb survives the force-fail path. - getProgressMarkers validates marker shape (rebuilds from typed fields), so a stale or wrong-shaped Redis value can never reach API consumers. * chore(logs): convert inline marker comments to TSDoc * fix(logs): preserve markers when the completion read fails getProgressMarkers now returns null on a Redis read error (vs {} for genuinely empty). completeWorkflowExecution and markExecutionAsFailed skip clearProgressMarkers when the read returns null, so a transient read error at completion no longer wipes markers that are still durably in Redis — the TTL reclaims them instead. * fix(logs): resolve marker store split-brain by latest-timestamp-wins + drain on force-fail - When a Redis marker write falls back to SQL, Redis and the row can each hold a marker for a different block; reads/folds previously preferred Redis unconditionally and could pick a stale value. Now the completion fold, the in-flight detail read, and the force-fail fold all pick the marker with the later timestamp (pickLatestStartedMarker/pickLatestCompletedMarker; markExecutionAsFailed uses a monotonic SQL guard). - markAsFailed now drains pending per-block marker writes (not just the completion promise) before folding, so a force-fail racing onBlockStart/onBlockComplete still captures the latest breadcrumb. * fix(logs): harden Lua marker guard against non-table decoded values Guard the monotonic-check index with type(decoded) == 'table' so a corrupted Redis field that decodes to a non-table (e.g. a number) can't error the eval; our write path only ever stores JSON objects, so this is defense-in-depth. * perf(logs): skip completion Redis read/clear when markers went to SQL completeWorkflowExecution now takes readProgressMarkers (the session's resolved marker mode); when the flag is off it skips the per-completion HGETALL+DEL entirely instead of probing a key that was never written. Sticky to the session so it stays flip-safe (an execution that wrote to Redis always folds+clears Redis). Non-session callers default to true (safe read-and-fold). Also hardened the Lua guard with type(decoded)=='table'.
1 parent b8d0b4f commit 7a2103e

11 files changed

Lines changed: 773 additions & 19 deletions

File tree

apps/sim/lib/billing/calculations/usage-reservation.ts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { getPlanTypeForLimits } from '@/lib/billing/plan-helpers'
55
import { isOrgScopedSubscription } from '@/lib/billing/subscriptions/utils'
66
import { isBillingEnabled } from '@/lib/core/config/env-flags'
77
import { getRedisClient } from '@/lib/core/config/redis'
8-
import { getMaxExecutionTimeout } from '@/lib/core/execution-limits'
8+
import { getExecutionReservationTtlMs } from '@/lib/core/execution-limits'
99
import type { SubscriptionPlan } from '@/lib/core/rate-limiter/types'
1010

1111
const logger = createLogger('UsageReservation')
@@ -41,9 +41,6 @@ const MAX_CONCURRENT_EXECUTIONS: Record<SubscriptionPlan, number> = {
4141
*/
4242
const SLOT_COST_ESTIMATE = BASE_EXECUTION_CHARGE
4343

44-
/** Safety buffer added to the reservation TTL beyond the max execution timeout. */
45-
const RESERVATION_TTL_BUFFER_MS = 60_000
46-
4744
const INFLIGHT_KEY_PREFIX = 'usage:inflight:'
4845
const POINTER_KEY_PREFIX = 'usage:reservation:'
4946

@@ -135,7 +132,7 @@ export async function reserveExecutionSlot(
135132
const maxConcurrency = getMaxConcurrentExecutions(subscription?.plan)
136133
const headroom = Math.max(0, limit - currentUsage)
137134
const headroomSlots = Math.floor(headroom / SLOT_COST_ESTIMATE)
138-
const ttlMs = getMaxExecutionTimeout() + RESERVATION_TTL_BUFFER_MS
135+
const ttlMs = getExecutionReservationTtlMs()
139136
const now = Date.now()
140137
const expiryScore = now + ttlMs
141138

apps/sim/lib/core/config/env.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ export const env = createEnv({
7777
TABLE_SNAPSHOT_CACHE: z.boolean().optional(), // Mount tables into sandboxes by reference via a version-keyed CSV snapshot in object storage instead of draining the whole table into web-process heap
7878
PII_REDACTION: z.boolean().optional(), // Redact PII from workflow logs via configurable Data Retention rules (Presidio at the logger persist choke point) and expose the Data Retention config UI
7979
TRIGGER_EU_REGION: z.boolean().optional(), // Route Trigger.dev runs to eu-central-1 instead of the default us-east-1 (fallback for the trigger-eu-region flag when AppConfig is not the source of truth)
80+
REDIS_PROGRESS_MARKERS: z.boolean().optional(), // Write per-block live progress markers to Redis instead of jsonb_set UPDATEs on workflow_execution_logs (fallback for the redis-progress-markers flag when AppConfig is not the source of truth)
8081

8182
// Table feature limits (per plan). Apply when billing is disabled (free tier defaults) or for billed plans.
8283
FREE_TABLES_LIMIT: z.number().optional(), // Max user tables per workspace on free tier (default: 5)

apps/sim/lib/core/config/feature-flags.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,14 @@ const FEATURE_FLAGS = {
9797
'resolveTriggerRegion, so the whole deployment switches regions together.',
9898
fallback: 'TRIGGER_EU_REGION',
9999
},
100+
'redis-progress-markers': {
101+
description:
102+
'Write per-block live progress markers (lastStartedBlock/lastCompletedBlock) to Redis ' +
103+
'instead of jsonb_set UPDATEs on workflow_execution_logs, folding them into the single ' +
104+
'terminal UPDATE at completion. Eliminates the heaviest write query. Resolved once per ' +
105+
'logging session (no user/org context) so an execution never mixes write paths.',
106+
fallback: 'REDIS_PROGRESS_MARKERS',
107+
},
100108
} satisfies Record<string, FeatureFlagDefinition>
101109

102110
/**

apps/sim/lib/core/execution-limits/types.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,19 @@ export function getMaxExecutionTimeout(): number {
7575
return EXECUTION_TIMEOUTS.enterprise.async
7676
}
7777

78+
/** Safety buffer added beyond the max execution timeout for execution-lifetime TTLs. */
79+
export const RESERVATION_TTL_BUFFER_MS = 60_000
80+
81+
/**
82+
* TTL (ms) bounding how long a single execution can remain in flight: the max
83+
* execution timeout plus a safety buffer. Shared source of truth for the
84+
* admission-reservation key and the live progress-marker key so they expire on
85+
* the same timeline.
86+
*/
87+
export function getExecutionReservationTtlMs(): number {
88+
return getMaxExecutionTimeout() + RESERVATION_TTL_BUFFER_MS
89+
}
90+
7891
export const DEFAULT_EXECUTION_TIMEOUT_MS = EXECUTION_TIMEOUTS.free.sync
7992

8093
export function isTimeoutError(error: unknown): boolean {

apps/sim/lib/logs/execution/logger.ts

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ import {
3737
replaceLargeValueReferenceKeysWithClient,
3838
} from '@/lib/execution/payloads/large-value-metadata'
3939
import { type RedactablePayload, redactPIIFromExecution } from '@/lib/logs/execution/pii-redaction'
40+
import {
41+
clearProgressMarkers,
42+
type ExecutionProgressMarkers,
43+
getProgressMarkers,
44+
pickLatestCompletedMarker,
45+
pickLatestStartedMarker,
46+
} from '@/lib/logs/execution/progress-markers'
4047
import { snapshotService } from '@/lib/logs/execution/snapshot/service'
4148
import {
4249
externalizeExecutionData,
@@ -417,8 +424,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
417424
return minimalWithSize.executionData
418425
}
419426

427+
/**
428+
* Assemble the final `execution_data` for the terminal UPDATE. Live progress
429+
* markers are sourced from `progressMarkers` (Redis, current run) and fall back
430+
* to markers already on the row — the legacy SQL path and resumed rows that
431+
* folded markers in at their prior pause boundary.
432+
*/
420433
private buildCompletedExecutionData(params: {
421434
existingExecutionData?: WorkflowExecutionLog['executionData']
435+
progressMarkers?: ExecutionProgressMarkers
422436
traceSpans?: TraceSpan[]
423437
finalOutput: BlockOutputData
424438
finalizationPath?: ExecutionFinalizationPath
@@ -436,6 +450,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
436450
}): WorkflowExecutionLog['executionData'] {
437451
const {
438452
existingExecutionData,
453+
progressMarkers,
439454
traceSpans,
440455
finalOutput,
441456
finalizationPath,
@@ -446,6 +461,15 @@ export class ExecutionLogger implements IExecutionLoggerService {
446461
} = params
447462
const traceSpanCount = countTraceSpans(traceSpans)
448463

464+
const lastStartedBlock = pickLatestStartedMarker(
465+
progressMarkers?.lastStartedBlock,
466+
existingExecutionData?.lastStartedBlock
467+
)
468+
const lastCompletedBlock = pickLatestCompletedMarker(
469+
progressMarkers?.lastCompletedBlock,
470+
existingExecutionData?.lastCompletedBlock
471+
)
472+
449473
return {
450474
...(existingExecutionData?.environment
451475
? { environment: existingExecutionData.environment }
@@ -459,12 +483,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
459483
}
460484
: {}),
461485
...(existingExecutionData?.error ? { error: existingExecutionData.error } : {}),
462-
...(existingExecutionData?.lastStartedBlock
463-
? { lastStartedBlock: existingExecutionData.lastStartedBlock }
464-
: {}),
465-
...(existingExecutionData?.lastCompletedBlock
466-
? { lastCompletedBlock: existingExecutionData.lastCompletedBlock }
467-
: {}),
486+
...(lastStartedBlock ? { lastStartedBlock } : {}),
487+
...(lastCompletedBlock ? { lastCompletedBlock } : {}),
468488
...(completionFailure ? { completionFailure } : {}),
469489
...(finalizationPath ? { finalizationPath } : {}),
470490
hasTraceSpans: traceSpanCount > 0,
@@ -659,6 +679,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
659679
isResume?: boolean
660680
level?: 'info' | 'error'
661681
status?: 'completed' | 'failed' | 'cancelled' | 'pending'
682+
readProgressMarkers?: boolean
662683
}): Promise<WorkflowExecutionLog> {
663684
const {
664685
executionId,
@@ -674,6 +695,7 @@ export class ExecutionLogger implements IExecutionLoggerService {
674695
isResume,
675696
level: levelOverride,
676697
status: statusOverride,
698+
readProgressMarkers = true,
677699
} = params
678700

679701
let execLog = logger.withMetadata({ executionId })
@@ -731,8 +753,11 @@ export class ExecutionLogger implements IExecutionLoggerService {
731753
models: costSummary.models,
732754
}
733755

756+
const progressMarkers = readProgressMarkers ? await getProgressMarkers(executionId) : null
757+
734758
const builtExecutionData = this.buildCompletedExecutionData({
735759
existingExecutionData,
760+
progressMarkers: progressMarkers ?? undefined,
736761
traceSpans: mergedTraceSpans,
737762
finalOutput,
738763
finalizationPath,
@@ -893,6 +918,8 @@ export class ExecutionLogger implements IExecutionLoggerService {
893918
return log
894919
})
895920

921+
if (progressMarkers !== null) void clearProgressMarkers(executionId)
922+
896923
try {
897924
// Skip workflow lookup if workflow was deleted.
898925
const wf = updatedLog.workflowId

apps/sim/lib/logs/execution/logging-session.test.ts

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,31 @@ vi.mock('@/lib/logs/execution/logger', () => ({
6666
},
6767
}))
6868

69+
const {
70+
isFeatureEnabledMock,
71+
setLastStartedBlockMock,
72+
setLastCompletedBlockMock,
73+
getProgressMarkersMock,
74+
clearProgressMarkersMock,
75+
} = vi.hoisted(() => ({
76+
isFeatureEnabledMock: vi.fn().mockResolvedValue(false),
77+
setLastStartedBlockMock: vi.fn().mockResolvedValue(false),
78+
setLastCompletedBlockMock: vi.fn().mockResolvedValue(false),
79+
getProgressMarkersMock: vi.fn().mockResolvedValue({}),
80+
clearProgressMarkersMock: vi.fn().mockResolvedValue(undefined),
81+
}))
82+
83+
vi.mock('@/lib/core/config/feature-flags', () => ({
84+
isFeatureEnabled: isFeatureEnabledMock,
85+
}))
86+
87+
vi.mock('@/lib/logs/execution/progress-markers', () => ({
88+
setLastStartedBlock: setLastStartedBlockMock,
89+
setLastCompletedBlock: setLastCompletedBlockMock,
90+
getProgressMarkers: getProgressMarkersMock,
91+
clearProgressMarkers: clearProgressMarkersMock,
92+
}))
93+
6994
vi.mock('@/lib/logs/execution/logging-factory', () => ({
7095
calculateCostSummary: vi.fn().mockReturnValue({
7196
totalCost: 0,
@@ -647,4 +672,126 @@ describe('LoggingSession.markExecutionAsFailed workflowId scoping', () => {
647672
const combined = String(Array.from(strings)).toLowerCase() + values.join(' ').toLowerCase()
648673
expect(combined).toContain('force_failed')
649674
})
675+
676+
it('clears Redis markers when marking failed (terminal boundary outside completeWorkflowExecution)', async () => {
677+
await LoggingSession.markExecutionAsFailed('exec-3', 'boom', undefined, 'wf-3')
678+
expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-3')
679+
})
680+
681+
it('folds live Redis markers into the row before clearing on force-fail', async () => {
682+
getProgressMarkersMock.mockResolvedValueOnce({
683+
lastStartedBlock: { blockId: 'b1', blockName: 'Fetch', blockType: 'api', startedAt: 't1' },
684+
lastCompletedBlock: {
685+
blockId: 'b1',
686+
blockName: 'Fetch',
687+
blockType: 'api',
688+
endedAt: 't2',
689+
success: false,
690+
},
691+
})
692+
693+
await LoggingSession.markExecutionAsFailed('exec-9', 'boom', undefined, 'wf-9')
694+
695+
const folded = dbMocks.sql.mock.calls
696+
.map((c) => String(Array.from(c[0] as TemplateStringsArray)))
697+
.join(' ')
698+
expect(folded).toContain('lastStartedBlock')
699+
expect(folded).toContain('lastCompletedBlock')
700+
expect(clearProgressMarkersMock).toHaveBeenCalledWith('exec-9')
701+
})
702+
703+
it('does not clear markers when the Redis read fails (avoids wiping the only copy)', async () => {
704+
getProgressMarkersMock.mockResolvedValueOnce(null)
705+
await LoggingSession.markExecutionAsFailed('exec-readfail', 'boom', undefined, 'wf-x')
706+
expect(clearProgressMarkersMock).not.toHaveBeenCalled()
707+
})
708+
})
709+
710+
describe('LoggingSession progress-marker write path', () => {
711+
beforeEach(() => {
712+
vi.clearAllMocks()
713+
startWorkflowExecutionMock.mockResolvedValue({})
714+
loadWorkflowStateForExecutionMock.mockResolvedValue({
715+
blocks: {},
716+
edges: [],
717+
loops: {},
718+
parallels: {},
719+
})
720+
dbMocks.execute.mockResolvedValue(undefined)
721+
})
722+
723+
it('writes markers to Redis (not the row) when the flag is on and Redis accepts the write', async () => {
724+
isFeatureEnabledMock.mockResolvedValue(true)
725+
setLastStartedBlockMock.mockResolvedValue(true)
726+
setLastCompletedBlockMock.mockResolvedValue(true)
727+
const session = new LoggingSession('wf-1', 'exec-redis', 'manual', 'req-1')
728+
await session.start({ workspaceId: 'ws-1' })
729+
730+
await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')
731+
await session.onBlockComplete('b1', 'Fetch', 'api', { endedAt: '2026-06-27T10:00:01.000Z' })
732+
733+
expect(setLastStartedBlockMock).toHaveBeenCalledWith(
734+
'exec-redis',
735+
expect.objectContaining({ blockId: 'b1', startedAt: '2026-06-27T10:00:00.000Z' })
736+
)
737+
expect(setLastCompletedBlockMock).toHaveBeenCalledWith(
738+
'exec-redis',
739+
expect.objectContaining({ blockId: 'b1', success: true })
740+
)
741+
expect(dbMocks.execute).not.toHaveBeenCalled()
742+
})
743+
744+
it('falls back to the SQL UPDATE when the flag is on but the Redis write fails', async () => {
745+
isFeatureEnabledMock.mockResolvedValue(true)
746+
setLastStartedBlockMock.mockResolvedValue(false)
747+
const session = new LoggingSession('wf-1', 'exec-redis-down', 'manual', 'req-1')
748+
await session.start({ workspaceId: 'ws-1' })
749+
750+
await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')
751+
752+
expect(setLastStartedBlockMock).toHaveBeenCalled()
753+
expect(dbMocks.execute).toHaveBeenCalledTimes(1)
754+
})
755+
756+
it('writes markers via jsonb_set UPDATE when the flag is off', async () => {
757+
isFeatureEnabledMock.mockResolvedValue(false)
758+
const session = new LoggingSession('wf-1', 'exec-sql', 'manual', 'req-1')
759+
await session.start({ workspaceId: 'ws-1' })
760+
761+
await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')
762+
763+
expect(dbMocks.execute).toHaveBeenCalledTimes(1)
764+
expect(setLastStartedBlockMock).not.toHaveBeenCalled()
765+
})
766+
767+
it('falls back to the SQL path when flag resolution throws', async () => {
768+
isFeatureEnabledMock.mockRejectedValue(new Error('appconfig unavailable'))
769+
const session = new LoggingSession('wf-1', 'exec-fallback', 'manual', 'req-1')
770+
await session.start({ workspaceId: 'ws-1' })
771+
772+
await session.onBlockStart('b1', 'Fetch', 'api', '2026-06-27T10:00:00.000Z')
773+
774+
expect(dbMocks.execute).toHaveBeenCalledTimes(1)
775+
expect(setLastStartedBlockMock).not.toHaveBeenCalled()
776+
})
777+
778+
it('tells completion to read Redis markers only when the flag is on (no wasted ops when off)', async () => {
779+
completeWorkflowExecutionMock.mockResolvedValue({})
780+
781+
isFeatureEnabledMock.mockResolvedValue(true)
782+
const onSession = new LoggingSession('wf-1', 'exec-on', 'manual', 'req-1')
783+
await onSession.start({ workspaceId: 'ws-1' })
784+
await onSession.safeComplete({ finalOutput: { ok: true } })
785+
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
786+
expect.objectContaining({ executionId: 'exec-on', readProgressMarkers: true })
787+
)
788+
789+
isFeatureEnabledMock.mockResolvedValue(false)
790+
const offSession = new LoggingSession('wf-1', 'exec-off', 'manual', 'req-1')
791+
await offSession.start({ workspaceId: 'ws-1' })
792+
await offSession.safeComplete({ finalOutput: { ok: true } })
793+
expect(completeWorkflowExecutionMock).toHaveBeenLastCalledWith(
794+
expect.objectContaining({ executionId: 'exec-off', readProgressMarkers: false })
795+
)
796+
})
650797
})

0 commit comments

Comments
 (0)