diff --git a/packages/server/api/src/app/flows/flow-version/flow-version.service.ts b/packages/server/api/src/app/flows/flow-version/flow-version.service.ts index d71aff80106..3e11ad975aa 100644 --- a/packages/server/api/src/app/flows/flow-version/flow-version.service.ts +++ b/packages/server/api/src/app/flows/flow-version/flow-version.service.ts @@ -64,6 +64,18 @@ export const flowVersionService = (log: FastifyBaseLogger) => ({ notes: previousVersion.notes, }, }] + if ( + previousVersion.trigger.type === FlowTriggerType.PIECE && + !isNil(previousVersion.trigger.settings.sampleData) + ) { + operations.push({ + type: FlowOperationType.UPDATE_SAMPLE_DATA_INFO, + request: { + stepName: previousVersion.trigger.name, + sampleDataSettings: previousVersion.trigger.settings.sampleData, + }, + }) + } break } case FlowOperationType.SAVE_SAMPLE_DATA: { diff --git a/packages/server/api/src/app/flows/flow/flow.service.ts b/packages/server/api/src/app/flows/flow/flow.service.ts index 3262912a385..e0c05dd4440 100644 --- a/packages/server/api/src/app/flows/flow/flow.service.ts +++ b/packages/server/api/src/app/flows/flow/flow.service.ts @@ -13,6 +13,7 @@ import { FlowOperationType, flowPieceUtil, FlowStatus, + FlowTriggerType, FlowVersion, FlowVersionId, FlowVersionState, @@ -865,16 +866,31 @@ async function createNewDraftIfVersionIsPublished({ displayName: lockedVersion.displayName, notes: lockedVersion.notes, }) - lastVersion = await flowVersionService(log).applyOperation({ - userId, - projectId, - platformId, - flowVersion: lastVersion, - userOperation: { - type: FlowOperationType.IMPORT_FLOW, - request: lockedVersion, - }, - }) + const operations: FlowOperationRequest[] = [{ + type: FlowOperationType.IMPORT_FLOW, + request: lockedVersion, + }] + if ( + lockedVersion.trigger.type === FlowTriggerType.PIECE && + !isNil(lockedVersion.trigger.settings.sampleData) + ) { + operations.push({ + type: FlowOperationType.UPDATE_SAMPLE_DATA_INFO, + request: { + stepName: lockedVersion.trigger.name, + sampleDataSettings: lockedVersion.trigger.settings.sampleData, + }, + }) + } + for (const operation of operations) { + lastVersion = await flowVersionService(log).applyOperation({ + userId, + projectId, + platformId, + flowVersion: lastVersion, + userOperation: operation, + }) + } } return lastVersion } diff --git a/packages/server/api/test/unit/app/flows/flow-version/flow-version.service.test.ts b/packages/server/api/test/unit/app/flows/flow-version/flow-version.service.test.ts index e2e32b116de..9ab97513b58 100644 --- a/packages/server/api/test/unit/app/flows/flow-version/flow-version.service.test.ts +++ b/packages/server/api/test/unit/app/flows/flow-version/flow-version.service.test.ts @@ -1,19 +1,25 @@ import { describe, it, expect, vi, beforeEach } from 'vitest' import { FlowActionType, + FlowOperationType, FlowTriggerType, FlowVersionState, + PieceTrigger, + SampleDataSettings, } from '@activepieces/shared' import type { FlowVersion } from '@activepieces/shared' const mockGetPiece = vi.fn() const mockGetPlatformId = vi.fn().mockResolvedValue('platform-1') +const mockRepoFindOne = vi.fn() +const mockRepoSave = vi.fn() +const mockRepoExists = vi.fn() vi.mock('../../../../../src/app/core/db/repo-factory', () => ({ repoFactory: vi.fn(() => () => ({ - findOne: vi.fn().mockResolvedValue(null), - save: vi.fn(), - exists: vi.fn().mockResolvedValue(false), + findOne: mockRepoFindOne, + save: mockRepoSave, + exists: mockRepoExists, })), })) @@ -55,11 +61,12 @@ vi.mock('../../../../../src/app/flows/flow-version/flow-version-side-effects', ( vi.mock('../../../../../src/app/flows/flow-version/flow-version-validator-util', () => ({ flowVersionValidationUtil: vi.fn(() => ({ - prepareRequest: vi.fn((r: unknown) => Promise.resolve(r)), + prepareRequest: vi.fn(({ request }: { request: unknown }) => Promise.resolve(request)), })), })) import { flowVersionService } from '../../../../../src/app/flows/flow-version/flow-version.service' +import type { FastifyBaseLogger } from 'fastify' const mockLog = { info: vi.fn(), @@ -71,28 +78,33 @@ const mockLog = { trace: vi.fn(), silent: vi.fn(), level: 'info', -} as any +} as unknown as FastifyBaseLogger -function makeFlowVersionWithPiece(): FlowVersion { +function makePieceTriggerSettings(extras: Partial = {}): PieceTrigger['settings'] { return { - id: 'fv-1', + pieceName: '@activepieces/piece-gmail', + pieceVersion: '~0.1.0', + triggerName: 'new_email', + input: {}, + propertySettings: {}, + ...extras, + } +} + +function makeFlowVersion(overrides: { id?: string, trigger?: FlowVersion['trigger'] } = {}): FlowVersion { + return { + id: overrides.id ?? 'fv-1', created: '2024-01-01T00:00:00Z', updated: '2024-01-01T00:00:00Z', flowId: 'flow-1', displayName: 'Test Flow', - trigger: { - name: 'trigger_1', + trigger: overrides.trigger ?? { + name: 'trigger', valid: true, displayName: 'Gmail Trigger', lastUpdatedDate: '2024-01-01T00:00:00Z', type: FlowTriggerType.PIECE, - settings: { - pieceName: '@activepieces/piece-gmail', - pieceVersion: '~0.1.0', - triggerName: 'new_email', - input: {}, - propertySettings: {}, - }, + settings: makePieceTriggerSettings(), nextAction: { name: 'step_1', valid: true, @@ -119,47 +131,94 @@ function makeFlowVersionWithPiece(): FlowVersion { } } -describe('lockPieceVersions', () => { +describe('flowVersionService.applyOperation - USE_AS_DRAFT', () => { beforeEach(() => { vi.clearAllMocks() mockGetPlatformId.mockResolvedValue('platform-1') + mockRepoFindOne.mockResolvedValue(null) + mockRepoSave.mockImplementation((v: FlowVersion) => Promise.resolve(v)) + mockRepoExists.mockResolvedValue(false) }) - it('returns success:false when a piece is not found', async () => { - mockGetPiece.mockResolvedValue(null) + it('preserves PIECE trigger sample data from the previous version', async () => { + const sampleData: SampleDataSettings = { + sampleDataFileId: 'sd-file-1', + sampleDataInputFileId: 'sdi-file-1', + lastTestDate: '2024-01-01T00:00:00Z', + } + const currentDraft = makeFlowVersion() + const previousVersion = makeFlowVersion({ + id: 'fv-prev', + trigger: { + ...makeFlowVersion().trigger, + settings: makePieceTriggerSettings({ sampleData }), + } as PieceTrigger, + }) + mockRepoFindOne.mockResolvedValue(previousVersion) - const result = await flowVersionService(mockLog).lockPieceVersions({ + const result = await flowVersionService(mockLog).applyOperation({ projectId: 'proj-1', - flowVersion: makeFlowVersionWithPiece(), + platformId: 'platform-1', + userId: 'user-1', + flowVersion: currentDraft, + userOperation: { + type: FlowOperationType.USE_AS_DRAFT, + request: { versionId: 'fv-prev' }, + }, }) - expect(result.success).toBe(false) - expect(result.message).toContain('@activepieces/piece-gmail') + expect(result.trigger.type).toBe(FlowTriggerType.PIECE) + const settings = (result.trigger as PieceTrigger).settings + expect(settings.sampleData?.sampleDataFileId).toBe(sampleData.sampleDataFileId) + expect(settings.sampleData?.sampleDataInputFileId).toBe(sampleData.sampleDataInputFileId) }) - it('returns success:true with locked versions when all pieces are found', async () => { - mockGetPiece.mockImplementation(({ name }: { name: string }) => - Promise.resolve({ version: name.includes('gmail') ? '1.2.3' : '4.5.6' }), - ) + it('does not set trigger sample data when previous version has no sampleData', async () => { + const currentDraft = makeFlowVersion() + const previousVersion = makeFlowVersion({ id: 'fv-prev' }) + mockRepoFindOne.mockResolvedValue(previousVersion) - const result = await flowVersionService(mockLog).lockPieceVersions({ + const result = await flowVersionService(mockLog).applyOperation({ projectId: 'proj-1', - flowVersion: makeFlowVersionWithPiece(), + platformId: 'platform-1', + userId: 'user-1', + flowVersion: currentDraft, + userOperation: { + type: FlowOperationType.USE_AS_DRAFT, + request: { versionId: 'fv-prev' }, + }, }) - expect(result.success).toBe(true) - expect(result.data?.trigger.settings.pieceVersion).toBe('1.2.3') + expect(result.trigger.type).toBe(FlowTriggerType.PIECE) + expect((result.trigger as PieceTrigger).settings.sampleData).toBeUndefined() }) - it('returns the flow as-is when already LOCKED', async () => { - const locked: FlowVersion = { ...makeFlowVersionWithPiece(), state: FlowVersionState.LOCKED } + it('skips the sample data preservation when previous version has an EMPTY trigger', async () => { + const currentDraft = makeFlowVersion() + const previousVersion = makeFlowVersion({ + id: 'fv-prev', + trigger: { + name: 'trigger', + valid: false, + displayName: 'Select Trigger', + lastUpdatedDate: '2024-01-01T00:00:00Z', + type: FlowTriggerType.EMPTY, + settings: {}, + }, + }) + mockRepoFindOne.mockResolvedValue(previousVersion) - const result = await flowVersionService(mockLog).lockPieceVersions({ + const result = await flowVersionService(mockLog).applyOperation({ projectId: 'proj-1', - flowVersion: locked, + platformId: 'platform-1', + userId: 'user-1', + flowVersion: currentDraft, + userOperation: { + type: FlowOperationType.USE_AS_DRAFT, + request: { versionId: 'fv-prev' }, + }, }) - expect(result.success).toBe(true) - expect(result.data).toBe(locked) + expect(result.trigger.type).toBe(FlowTriggerType.EMPTY) }) }) diff --git a/packages/server/engine/src/lib/handler/code-executor.ts b/packages/server/engine/src/lib/handler/code-executor.ts index eaaf8899d1f..282305c6d3b 100644 --- a/packages/server/engine/src/lib/handler/code-executor.ts +++ b/packages/server/engine/src/lib/handler/code-executor.ts @@ -3,9 +3,9 @@ import { LATEST_CONTEXT_VERSION } from '@activepieces/pieces-framework' import { CodeAction, EngineGenericError, FlowActionType, FlowRunStatus, GenericStepOutput, isNil, StepOutputStatus } from '@activepieces/shared' import { initCodeSandbox } from '../core/code/code-sandbox' import { continueIfFailureHandler, runWithExponentialBackoff } from '../helper/error-handling' +import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter' import { utils } from '../utils' import { ActionHandler, BaseExecutor } from './base-executor' -import { runProgressService } from './run-progress' export const codeExecutor: BaseExecutor = { async handle({ @@ -35,7 +35,7 @@ const executeAction: ActionHandler = async ({ action, executionState }) const { data: executionStateResult, error: executionStateError } = await utils.tryCatchAndThrowOnEngineError((async () => { - await runProgressService.sendUpdate({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: executionState.upsertStep(action.name, stepOutput), stepNameToUpdate: action.name, diff --git a/packages/server/engine/src/lib/handler/flow-executor.ts b/packages/server/engine/src/lib/handler/flow-executor.ts index 5eb0687e4ab..7fac445f2e9 100644 --- a/packages/server/engine/src/lib/handler/flow-executor.ts +++ b/packages/server/engine/src/lib/handler/flow-executor.ts @@ -1,6 +1,7 @@ import { performance } from 'node:perf_hooks' import { EngineGenericError, ExecuteFlowOperation, ExecutionType, FlowAction, FlowActionType, FlowRunStatus, FlowTrigger, GenericStepOutput, isNil, StepOutputStatus } from '@activepieces/shared' import dayjs from 'dayjs' +import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter' import { loggingUtils } from '../helper/logging-utils' import { triggerHelper } from '../helper/trigger-helper' import { BaseExecutor } from './base-executor' @@ -10,7 +11,6 @@ import { FlowExecutorContext } from './context/flow-execution-context' import { loopExecutor } from './loop-executor' import { pieceExecutor } from './piece-executor' import { routerExecuter } from './router-executor' -import { runProgressService } from './run-progress' function getExecuteFunction(): Record> { return { @@ -39,14 +39,15 @@ export const flowExecutor = { }): Promise { const trigger = input.flowVersion.trigger if (input.executionType === ExecutionType.BEGIN) { - void runProgressService.backup({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: executionState, - }).catch((err) => { + }) + void flowRunProgressReporter.backup().catch((err) => { console.error('[Progress] Initial payload upload failed', err) }) await triggerHelper.executeOnStart(trigger, constants, input.triggerPayload) - await runProgressService.sendUpdate({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: executionState, stepNameToUpdate: trigger.name, @@ -82,7 +83,7 @@ export const flowExecutor = { } const handler = this.getExecutorForAction(currentAction.type) - await runProgressService.sendUpdate({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: flowExecutionContext, stepNameToUpdate: previousAction!.name, @@ -108,7 +109,7 @@ export const flowExecutor = { } - await runProgressService.sendUpdate({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: flowExecutionContext, stepNameToUpdate: previousAction?.name, diff --git a/packages/server/engine/src/lib/handler/piece-executor.ts b/packages/server/engine/src/lib/handler/piece-executor.ts index ebae767cbae..58b043116a8 100644 --- a/packages/server/engine/src/lib/handler/piece-executor.ts +++ b/packages/server/engine/src/lib/handler/piece-executor.ts @@ -3,6 +3,7 @@ import { AUTHENTICATION_PROPERTY_NAME, EngineGenericError, ExecutionType, FlowAc import type { ToolSet } from 'ai' import dayjs from 'dayjs' import { continueIfFailureHandler, runWithExponentialBackoff } from '../helper/error-handling' +import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter' import { pieceLoader } from '../helper/piece-loader' import { createFileUploader } from '../piece-context/file-uploader' import { createFlowsContext } from '../piece-context/flows' @@ -14,7 +15,6 @@ import { propsProcessor } from '../variables/props-processor' import { workerSocket } from '../worker-socket' import { ActionHandler, BaseExecutor } from './base-executor' import { EngineConstants } from './context/engine-constants' -import { runProgressService } from './run-progress' const AP_PAUSED_FLOW_TIMEOUT_DAYS = Number(process.env.AP_PAUSED_FLOW_TIMEOUT_DAYS) @@ -73,7 +73,7 @@ const executeAction: ActionHandler = async ({ action, executionStat tags: [], }, } - const outputContext = runProgressService.createOutputContext({ + const outputContext = flowRunProgressReporter.createOutputContext({ engineConstants: constants, flowExecutorContext: executionState, stepName: action.name, @@ -82,7 +82,7 @@ const executeAction: ActionHandler = async ({ action, executionStat const isPaused = executionState.isPaused({ stepName: action.name }) if (!isPaused) { - await runProgressService.sendUpdate({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: executionState.upsertStep(action.name, stepOutput), stepNameToUpdate: action.name, diff --git a/packages/server/engine/src/lib/handler/run-progress.ts b/packages/server/engine/src/lib/helper/flow-run-progress-reporter.ts similarity index 77% rename from packages/server/engine/src/lib/handler/run-progress.ts rename to packages/server/engine/src/lib/helper/flow-run-progress-reporter.ts index 9476998c847..ec8695fc2a2 100644 --- a/packages/server/engine/src/lib/handler/run-progress.ts +++ b/packages/server/engine/src/lib/helper/flow-run-progress-reporter.ts @@ -2,57 +2,36 @@ import { promisify } from 'node:util' import { zstdCompress as zstdCompressCallback } from 'node:zlib' import { setTimeout } from 'timers/promises' import { OutputContext } from '@activepieces/pieces-framework' -import { CONTENT_ENCODING_ZSTD, DEFAULT_MCP_DATA, EngineGenericError, FlowActionType, GenericStepOutput, isFlowRunStateTerminal, isNil, logSerializer, RunEnvironment, StepOutput, StepOutputStatus, StepRunResponse, UpdateRunProgressRequest, UploadRunLogsRequest } from '@activepieces/shared' +import { CONTENT_ENCODING_ZSTD, DEFAULT_MCP_DATA, EngineGenericError, FlowActionType, GenericStepOutput, isFlowRunStateTerminal, isNil, logSerializer, RunEnvironment, StepOutput, StepOutputStatus, StepRunResponse, tryCatch, UpdateRunProgressRequest, UploadRunLogsRequest } from '@activepieces/shared' import { Mutex } from 'async-mutex' import dayjs from 'dayjs' import fetchRetry from 'fetch-retry' +import { EngineConstants } from '../handler/context/engine-constants' +import { FlowExecutorContext } from '../handler/context/flow-execution-context' import { utils } from '../utils' import { workerSocket } from '../worker-socket' -import { EngineConstants } from './context/engine-constants' -import { FlowExecutorContext } from './context/flow-execution-context' const zstdCompress = promisify(zstdCompressCallback) -const lock = new Mutex() -const updateLock = new Mutex() +const stateLock = new Mutex() const fetchWithRetry = fetchRetry(global.fetch) -const BACKUP_INTERVAL_MS = 15000 -export let latestUpdateParams: UpdateStepProgressParams | null = null +const SNAPSHOT_FLUSH_INTERVAL_MS = 15000 +let latestUpdateParams: UpdateStepProgressParams | null = null let savedStartTime: string | null = null -let backupController: AbortController | null = null -let backupLoopPromise: Promise | null = null +let flushController: AbortController | null = null +let flushLoopPromise: Promise | null = null -async function backupLoop(signal: AbortSignal): Promise { - while (!signal.aborted) { - try { - if (latestUpdateParams) { - await runProgressService.backup(latestUpdateParams) - } - } - catch (err) { - console.error('[Progress] Backup failed', err) - } - - try { - await setTimeout(BACKUP_INTERVAL_MS, undefined, { signal }) - } - catch { - // sleep aborted → loop will exit naturally - } - } -} - -export const runProgressService = { +export const flowRunProgressReporter = { init: (): void => { - if (backupController) { + if (flushController) { return } - backupController = new AbortController() - backupLoopPromise = backupLoop(backupController.signal) + flushController = new AbortController() + flushLoopPromise = runFlushLoop(flushController.signal) }, sendUpdate: async (params: UpdateStepProgressParams): Promise => { - return updateLock.runExclusive(async () => { + return stateLock.runExclusive(async () => { const { engineConstants, flowExecutorContext, stepNameToUpdate } = params if (params.startTime) { savedStartTime = params.startTime @@ -109,13 +88,19 @@ export const runProgressService = { }, } }, - backup: async (updateParams: BackUpLogsParams): Promise => { - const isRunningMcp = updateParams.engineConstants.flowRunId === DEFAULT_MCP_DATA.flowRunId - if (isRunningMcp) { - return - } - await lock.runExclusive(async () => { - const { flowExecutorContext, engineConstants } = updateParams + backup: async (): Promise => { + await stateLock.runExclusive(async () => { + const params = latestUpdateParams + if (isNil(params)) { + return + } + const { flowExecutorContext, engineConstants } = params + if (engineConstants.flowRunId === DEFAULT_MCP_DATA.flowRunId) { + return + } + const status = flowExecutorContext.verdict.status + const isTerminal = isFlowRunStateTerminal({ status, ignoreInternalError: false }) + const serialized = await logSerializer.serialize({ executionState: { steps: flowExecutorContext.steps, @@ -142,17 +127,14 @@ export const runProgressService = { const request: UploadRunLogsRequest = { runId: engineConstants.flowRunId, projectId: engineConstants.projectId, - status: flowExecutorContext.verdict.status, + status, streamStepProgress: engineConstants.streamStepProgress, logsFileId: engineConstants.logsFileId, failedStep: 'failedStep' in flowExecutorContext.verdict ? flowExecutorContext.verdict.failedStep : undefined, stepNameToTest: engineConstants.stepNameToTest, stepResponse, startTime: savedStartTime ?? undefined, - finishTime: isFlowRunStateTerminal({ - status: flowExecutorContext.verdict.status, - ignoreInternalError: false, - }) ? dayjs().toISOString() : undefined, + finishTime: isTerminal ? dayjs().toISOString() : undefined, tags: Array.from(flowExecutorContext.tags), stepsCount: flowExecutorContext.stepsCount, } @@ -160,25 +142,37 @@ export const runProgressService = { }) }, shutdown: async () => { - if (!backupController) { + if (!flushController) { return } - backupController.abort() + flushController.abort() - if (backupLoopPromise) { - await backupLoopPromise + if (flushLoopPromise) { + await flushLoopPromise } - backupController = null - backupLoopPromise = null + flushController = null + flushLoopPromise = null latestUpdateParams = null savedStartTime = null }, } -process.on('SIGTERM', () => void runProgressService.shutdown()) -process.on('SIGINT', () => void runProgressService.shutdown()) +process.on('SIGTERM', () => void flowRunProgressReporter.shutdown()) +process.on('SIGINT', () => void flowRunProgressReporter.shutdown()) + +async function runFlushLoop(signal: AbortSignal): Promise { + while (!signal.aborted) { + const { error: flushError } = await tryCatch(() => flowRunProgressReporter.backup()) + if (flushError) { + console.error('[Progress] Snapshot flush failed', flushError) + } + + // sleep aborted → loop will exit naturally on the next signal check + await tryCatch(() => setTimeout(SNAPSHOT_FLUSH_INTERVAL_MS, undefined, { signal })) + } +} const sendUpdateProgress = async (request: UpdateRunProgressRequest): Promise => { const result = await utils.tryCatchAndThrowOnEngineError(() => @@ -243,12 +237,6 @@ type UpdateStepProgressParams = { startTime?: string } -type BackUpLogsParams = { - engineConstants: EngineConstants - flowExecutorContext: FlowExecutorContext - stepNameToUpdate?: string -} - type CreateOutputContextParams = { engineConstants: EngineConstants flowExecutorContext: FlowExecutorContext diff --git a/packages/server/engine/src/lib/operations/flow.operation.ts b/packages/server/engine/src/lib/operations/flow.operation.ts index e084dcd7ce1..09953f39e5f 100644 --- a/packages/server/engine/src/lib/operations/flow.operation.ts +++ b/packages/server/engine/src/lib/operations/flow.operation.ts @@ -21,7 +21,7 @@ import { EngineConstants } from '../handler/context/engine-constants' import { FlowExecutorContext } from '../handler/context/flow-execution-context' import { testExecutionContext } from '../handler/context/test-execution-context' import { flowExecutor } from '../handler/flow-executor' -import { runProgressService } from '../handler/run-progress' +import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter' import { triggerHelper } from '../helper/trigger-helper' export const flowOperation = { @@ -29,10 +29,11 @@ export const flowOperation = { const input = operation as ExecuteFlowOperation const constants = EngineConstants.fromExecuteFlowInput(input) const output: FlowExecutorContext = (await executieSingleStepOrFlowOperation(input)).finishExecution() - await runProgressService.backup({ + await flowRunProgressReporter.sendUpdate({ engineConstants: constants, flowExecutorContext: output, }) + await flowRunProgressReporter.backup() const status = output.verdict.status === FlowRunStatus.LOG_SIZE_EXCEEDED ? EngineResponseStatus.LOG_SIZE_EXCEEDED : EngineResponseStatus.OK diff --git a/packages/server/engine/src/lib/worker-socket.ts b/packages/server/engine/src/lib/worker-socket.ts index 6a2bdc02bba..255b0d38066 100644 --- a/packages/server/engine/src/lib/worker-socket.ts +++ b/packages/server/engine/src/lib/worker-socket.ts @@ -12,7 +12,7 @@ import { WorkerNotifyContract, } from '@activepieces/shared' import { io, type ManagerOptions, type Socket, type SocketOptions } from 'socket.io-client' -import { runProgressService } from './handler/run-progress' +import { flowRunProgressReporter } from './helper/flow-run-progress-reporter' import { execute } from './operations' let socket: Socket | undefined @@ -51,13 +51,13 @@ export const workerSocket = { createRpcServer(socket, { executeOperation: async ({ operationType, operation }): Promise> => { - runProgressService.init() + flowRunProgressReporter.init() try { const response = await execute(operationType, operation) return JSON.parse(JSON.stringify(response)) as EngineResponse } finally { - await runProgressService.shutdown() + await flowRunProgressReporter.shutdown() } }, }) diff --git a/packages/server/engine/src/main.ts b/packages/server/engine/src/main.ts index 4aa52b82af9..b24abab76a3 100755 --- a/packages/server/engine/src/main.ts +++ b/packages/server/engine/src/main.ts @@ -1,5 +1,5 @@ import { isNil } from '@activepieces/shared' -import { runProgressService } from './lib/handler/run-progress' +import { flowRunProgressReporter } from './lib/helper/flow-run-progress-reporter' import { ssrfGuard } from './lib/network/ssrf-guard' import { workerSocket } from './lib/worker-socket' @@ -10,7 +10,7 @@ process.title = `sandbox-${SANDBOX_ID}` if (!isNil(SANDBOX_ID)) { workerSocket.init(SANDBOX_ID) - runProgressService.init() + flowRunProgressReporter.init() } process.on('uncaughtException', (error) => { diff --git a/packages/server/engine/test/handler/flow-log-size.test.ts b/packages/server/engine/test/handler/flow-log-size.test.ts index 2b558d99bae..2bfb6c602f2 100644 --- a/packages/server/engine/test/handler/flow-log-size.test.ts +++ b/packages/server/engine/test/handler/flow-log-size.test.ts @@ -4,8 +4,8 @@ import { FlowExecutorContext } from '../../src/lib/handler/context/flow-executio import { flowExecutor } from '../../src/lib/handler/flow-executor' import { buildCodeAction, buildMockBeginExecuteFlowOperation, buildSimpleLoopAction, generateMockEngineConstants } from './test-helper' -vi.mock('../../src/lib/handler/run-progress', () => ({ - runProgressService: { +vi.mock('../../src/lib/helper/flow-run-progress-reporter', () => ({ + flowRunProgressReporter: { sendUpdate: vi.fn().mockResolvedValue(undefined), backup: vi.fn().mockResolvedValue(undefined), init: vi.fn(), diff --git a/packages/server/engine/test/handler/test-helper.ts b/packages/server/engine/test/handler/test-helper.ts index 713dee02bdb..42a74b5a6e4 100644 --- a/packages/server/engine/test/handler/test-helper.ts +++ b/packages/server/engine/test/handler/test-helper.ts @@ -27,6 +27,8 @@ export const generateMockEngineConstants = (params?: Partial): runEnvironment: params?.runEnvironment ?? RunEnvironment.TESTING, stepNameToTest: params?.stepNameToTest ?? undefined, stepNames: params?.stepNames ?? [], + logsUploadUrl: params?.logsUploadUrl, + logsFileId: params?.logsFileId, }) } diff --git a/packages/server/engine/test/helper/flow-run-progress-reporter.test.ts b/packages/server/engine/test/helper/flow-run-progress-reporter.test.ts new file mode 100644 index 00000000000..0e0c5fd2fdc --- /dev/null +++ b/packages/server/engine/test/helper/flow-run-progress-reporter.test.ts @@ -0,0 +1,112 @@ +import { FlowRunStatus, StreamStepProgress, UpdateRunProgressRequest, UploadRunLogsRequest } from '@activepieces/shared' +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { FlowExecutorContext } from '../../src/lib/handler/context/flow-execution-context' +import { generateMockEngineConstants } from '../handler/test-helper' + +const { uploadRunLogMock, updateRunProgressMock } = vi.hoisted(() => ({ + uploadRunLogMock: vi.fn<(request: UploadRunLogsRequest) => Promise>(async () => undefined), + updateRunProgressMock: vi.fn<(request: UpdateRunProgressRequest) => Promise>(async () => undefined), +})) + +vi.mock('../../src/lib/worker-socket', () => ({ + workerSocket: { + getWorkerClient: () => ({ + uploadRunLog: uploadRunLogMock, + updateRunProgress: updateRunProgressMock, + updateStepProgress: vi.fn(), + }), + }, +})) + +vi.mock('fetch-retry', () => ({ + default: () => async () => new Response(null, { status: 200 }), +})) + +import { flowRunProgressReporter } from '../../src/lib/helper/flow-run-progress-reporter' + +const buildUpdateParams = ({ status }: { status: FlowRunStatus }) => { + const engineConstants = generateMockEngineConstants({ + streamStepProgress: StreamStepProgress.NONE, + logsUploadUrl: 'http://127.0.0.1:65535/upload', + }) + const flowExecutorContext = new FlowExecutorContext() + flowExecutorContext.verdict = status === FlowRunStatus.RUNNING + ? { status: FlowRunStatus.RUNNING } + : { status: FlowRunStatus.SUCCEEDED, stopResponse: undefined } + return { engineConstants, flowExecutorContext } +} + +const uploadStatuses = (): FlowRunStatus[] => + uploadRunLogMock.mock.calls.map(([request]) => request.status) + +const lastUploadStatus = (): FlowRunStatus | undefined => uploadStatuses().at(-1) + +describe('flow-run-progress-reporter backup ordering', () => { + beforeEach(() => { + uploadRunLogMock.mockClear() + updateRunProgressMock.mockClear() + }) + + afterEach(async () => { + await flowRunProgressReporter.shutdown() + }) + + it('the last write wins: a periodic backup firing after the terminal sendUpdate cannot overwrite SUCCEEDED', async () => { + flowRunProgressReporter.init() + + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.RUNNING })) + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.SUCCEEDED })) + await flowRunProgressReporter.backup() + + // Simulate the periodic loop firing one more time after the terminal + // state is set. It must read the current latest state (SUCCEEDED) — not + // a stale RUNNING — so the run never reverts to running. + await flowRunProgressReporter.backup() + + expect(lastUploadStatus()).toBe(FlowRunStatus.SUCCEEDED) + expect(uploadStatuses()).not.toContain(FlowRunStatus.RUNNING) + }) + + it('preserves order under concurrent backup calls', async () => { + flowRunProgressReporter.init() + + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.RUNNING })) + const firstBackup = flowRunProgressReporter.backup() + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.SUCCEEDED })) + const secondBackup = flowRunProgressReporter.backup() + await Promise.all([firstBackup, secondBackup]) + + expect(lastUploadStatus()).toBe(FlowRunStatus.SUCCEEDED) + const allStatuses = uploadStatuses() + const terminalIndex = allStatuses.indexOf(FlowRunStatus.SUCCEEDED) + const runningAfterTerminal = allStatuses + .slice(terminalIndex + 1) + .some((s) => s === FlowRunStatus.RUNNING) + expect(runningAfterTerminal).toBe(false) + }) + + it('still uploads RUNNING progress while the flow is in progress', async () => { + flowRunProgressReporter.init() + + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.RUNNING })) + await flowRunProgressReporter.backup() + + expect(lastUploadStatus()).toBe(FlowRunStatus.RUNNING) + }) + + it('clears state on shutdown so the next run starts clean', async () => { + flowRunProgressReporter.init() + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.SUCCEEDED })) + await flowRunProgressReporter.backup() + await flowRunProgressReporter.shutdown() + + flowRunProgressReporter.init() + const before = uploadRunLogMock.mock.calls.length + await flowRunProgressReporter.backup() + expect(uploadRunLogMock.mock.calls.length).toBe(before) + + await flowRunProgressReporter.sendUpdate(buildUpdateParams({ status: FlowRunStatus.RUNNING })) + await flowRunProgressReporter.backup() + expect(lastUploadStatus()).toBe(FlowRunStatus.RUNNING) + }) +}) diff --git a/packages/server/engine/test/operations/flow-operation-invariants.test.ts b/packages/server/engine/test/operations/flow-operation-invariants.test.ts index 4c5246792c8..4a15d346ccd 100644 --- a/packages/server/engine/test/operations/flow-operation-invariants.test.ts +++ b/packages/server/engine/test/operations/flow-operation-invariants.test.ts @@ -10,9 +10,10 @@ import { } from '@activepieces/shared' import type { BeginExecuteFlowOperation, FlowVersion } from '@activepieces/shared' -vi.mock('../../src/lib/handler/run-progress', () => ({ - runProgressService: { - backup: vi.fn(), +vi.mock('../../src/lib/helper/flow-run-progress-reporter', () => ({ + flowRunProgressReporter: { + sendUpdate: vi.fn().mockResolvedValue(undefined), + backup: vi.fn().mockResolvedValue(undefined), }, }))