Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ export const flowVersionService = (log: FastifyBaseLogger) => ({
notes: previousVersion.notes,
},
}]
if (
previousVersion.trigger.type === FlowTriggerType.PIECE &&
!isNil(previousVersion.trigger.settings.sampleData)
) {
operations.push({
type: FlowOperationType.UPDATE_SAMPLE_DATA_INFO,
request: {
stepName: previousVersion.trigger.name,
sampleDataSettings: previousVersion.trigger.settings.sampleData,
},
})
}
break
}
case FlowOperationType.SAVE_SAMPLE_DATA: {
Expand Down
36 changes: 26 additions & 10 deletions packages/server/api/src/app/flows/flow/flow.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
FlowOperationType,
flowPieceUtil,
FlowStatus,
FlowTriggerType,
FlowVersion,
FlowVersionId,
FlowVersionState,
Expand Down Expand Up @@ -865,16 +866,31 @@ async function createNewDraftIfVersionIsPublished({
displayName: lockedVersion.displayName,
notes: lockedVersion.notes,
})
lastVersion = await flowVersionService(log).applyOperation({
userId,
projectId,
platformId,
flowVersion: lastVersion,
userOperation: {
type: FlowOperationType.IMPORT_FLOW,
request: lockedVersion,
},
})
const operations: FlowOperationRequest[] = [{
type: FlowOperationType.IMPORT_FLOW,
request: lockedVersion,
}]
if (
lockedVersion.trigger.type === FlowTriggerType.PIECE &&
!isNil(lockedVersion.trigger.settings.sampleData)
) {
operations.push({
type: FlowOperationType.UPDATE_SAMPLE_DATA_INFO,
request: {
stepName: lockedVersion.trigger.name,
sampleDataSettings: lockedVersion.trigger.settings.sampleData,
},
})
}
for (const operation of operations) {
lastVersion = await flowVersionService(log).applyOperation({
userId,
projectId,
platformId,
flowVersion: lastVersion,
userOperation: operation,
})
}
}
return lastVersion
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
import { describe, it, expect, vi, beforeEach } from 'vitest'
import {
FlowActionType,
FlowOperationType,
FlowTriggerType,
FlowVersionState,
PieceTrigger,
SampleDataSettings,
} from '@activepieces/shared'
import type { FlowVersion } from '@activepieces/shared'

const mockGetPiece = vi.fn()
const mockGetPlatformId = vi.fn().mockResolvedValue('platform-1')
const mockRepoFindOne = vi.fn()
const mockRepoSave = vi.fn()
const mockRepoExists = vi.fn()

vi.mock('../../../../../src/app/core/db/repo-factory', () => ({
repoFactory: vi.fn(() => () => ({
findOne: vi.fn().mockResolvedValue(null),
save: vi.fn(),
exists: vi.fn().mockResolvedValue(false),
findOne: mockRepoFindOne,
save: mockRepoSave,
exists: mockRepoExists,
})),
}))

Expand Down Expand Up @@ -55,11 +61,12 @@ vi.mock('../../../../../src/app/flows/flow-version/flow-version-side-effects', (

vi.mock('../../../../../src/app/flows/flow-version/flow-version-validator-util', () => ({
flowVersionValidationUtil: vi.fn(() => ({
prepareRequest: vi.fn((r: unknown) => Promise.resolve(r)),
prepareRequest: vi.fn(({ request }: { request: unknown }) => Promise.resolve(request)),
})),
}))

import { flowVersionService } from '../../../../../src/app/flows/flow-version/flow-version.service'
import type { FastifyBaseLogger } from 'fastify'

const mockLog = {
info: vi.fn(),
Expand All @@ -71,28 +78,33 @@ const mockLog = {
trace: vi.fn(),
silent: vi.fn(),
level: 'info',
} as any
} as unknown as FastifyBaseLogger

function makeFlowVersionWithPiece(): FlowVersion {
function makePieceTriggerSettings(extras: Partial<PieceTrigger['settings']> = {}): PieceTrigger['settings'] {
return {
id: 'fv-1',
pieceName: '@activepieces/piece-gmail',
pieceVersion: '~0.1.0',
triggerName: 'new_email',
input: {},
propertySettings: {},
...extras,
}
}

function makeFlowVersion(overrides: { id?: string, trigger?: FlowVersion['trigger'] } = {}): FlowVersion {
return {
id: overrides.id ?? 'fv-1',
created: '2024-01-01T00:00:00Z',
updated: '2024-01-01T00:00:00Z',
flowId: 'flow-1',
displayName: 'Test Flow',
trigger: {
name: 'trigger_1',
trigger: overrides.trigger ?? {
name: 'trigger',
valid: true,
displayName: 'Gmail Trigger',
lastUpdatedDate: '2024-01-01T00:00:00Z',
type: FlowTriggerType.PIECE,
settings: {
pieceName: '@activepieces/piece-gmail',
pieceVersion: '~0.1.0',
triggerName: 'new_email',
input: {},
propertySettings: {},
},
settings: makePieceTriggerSettings(),
nextAction: {
name: 'step_1',
valid: true,
Expand All @@ -119,47 +131,94 @@ function makeFlowVersionWithPiece(): FlowVersion {
}
}

describe('lockPieceVersions', () => {
describe('flowVersionService.applyOperation - USE_AS_DRAFT', () => {
beforeEach(() => {
vi.clearAllMocks()
mockGetPlatformId.mockResolvedValue('platform-1')
mockRepoFindOne.mockResolvedValue(null)
mockRepoSave.mockImplementation((v: FlowVersion) => Promise.resolve(v))
mockRepoExists.mockResolvedValue(false)
})

it('returns success:false when a piece is not found', async () => {
mockGetPiece.mockResolvedValue(null)
it('preserves PIECE trigger sample data from the previous version', async () => {
const sampleData: SampleDataSettings = {
sampleDataFileId: 'sd-file-1',
sampleDataInputFileId: 'sdi-file-1',
lastTestDate: '2024-01-01T00:00:00Z',
}
const currentDraft = makeFlowVersion()
const previousVersion = makeFlowVersion({
id: 'fv-prev',
trigger: {
...makeFlowVersion().trigger,
settings: makePieceTriggerSettings({ sampleData }),
} as PieceTrigger,
})
mockRepoFindOne.mockResolvedValue(previousVersion)

const result = await flowVersionService(mockLog).lockPieceVersions({
const result = await flowVersionService(mockLog).applyOperation({
projectId: 'proj-1',
flowVersion: makeFlowVersionWithPiece(),
platformId: 'platform-1',
userId: 'user-1',
flowVersion: currentDraft,
userOperation: {
type: FlowOperationType.USE_AS_DRAFT,
request: { versionId: 'fv-prev' },
},
})

expect(result.success).toBe(false)
expect(result.message).toContain('@activepieces/piece-gmail')
expect(result.trigger.type).toBe(FlowTriggerType.PIECE)
const settings = (result.trigger as PieceTrigger).settings
expect(settings.sampleData?.sampleDataFileId).toBe(sampleData.sampleDataFileId)
expect(settings.sampleData?.sampleDataInputFileId).toBe(sampleData.sampleDataInputFileId)
})

it('returns success:true with locked versions when all pieces are found', async () => {
mockGetPiece.mockImplementation(({ name }: { name: string }) =>
Promise.resolve({ version: name.includes('gmail') ? '1.2.3' : '4.5.6' }),
)
it('does not set trigger sample data when previous version has no sampleData', async () => {
const currentDraft = makeFlowVersion()
const previousVersion = makeFlowVersion({ id: 'fv-prev' })
mockRepoFindOne.mockResolvedValue(previousVersion)

const result = await flowVersionService(mockLog).lockPieceVersions({
const result = await flowVersionService(mockLog).applyOperation({
projectId: 'proj-1',
flowVersion: makeFlowVersionWithPiece(),
platformId: 'platform-1',
userId: 'user-1',
flowVersion: currentDraft,
userOperation: {
type: FlowOperationType.USE_AS_DRAFT,
request: { versionId: 'fv-prev' },
},
})

expect(result.success).toBe(true)
expect(result.data?.trigger.settings.pieceVersion).toBe('1.2.3')
expect(result.trigger.type).toBe(FlowTriggerType.PIECE)
expect((result.trigger as PieceTrigger).settings.sampleData).toBeUndefined()
})

it('returns the flow as-is when already LOCKED', async () => {
const locked: FlowVersion = { ...makeFlowVersionWithPiece(), state: FlowVersionState.LOCKED }
it('skips the sample data preservation when previous version has an EMPTY trigger', async () => {
const currentDraft = makeFlowVersion()
const previousVersion = makeFlowVersion({
id: 'fv-prev',
trigger: {
name: 'trigger',
valid: false,
displayName: 'Select Trigger',
lastUpdatedDate: '2024-01-01T00:00:00Z',
type: FlowTriggerType.EMPTY,
settings: {},
},
})
mockRepoFindOne.mockResolvedValue(previousVersion)

const result = await flowVersionService(mockLog).lockPieceVersions({
const result = await flowVersionService(mockLog).applyOperation({
projectId: 'proj-1',
flowVersion: locked,
platformId: 'platform-1',
userId: 'user-1',
flowVersion: currentDraft,
userOperation: {
type: FlowOperationType.USE_AS_DRAFT,
request: { versionId: 'fv-prev' },
},
})

expect(result.success).toBe(true)
expect(result.data).toBe(locked)
expect(result.trigger.type).toBe(FlowTriggerType.EMPTY)
})
})
4 changes: 2 additions & 2 deletions packages/server/engine/src/lib/handler/code-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ import { LATEST_CONTEXT_VERSION } from '@activepieces/pieces-framework'
import { CodeAction, EngineGenericError, FlowActionType, FlowRunStatus, GenericStepOutput, isNil, StepOutputStatus } from '@activepieces/shared'
import { initCodeSandbox } from '../core/code/code-sandbox'
import { continueIfFailureHandler, runWithExponentialBackoff } from '../helper/error-handling'
import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter'
import { utils } from '../utils'
import { ActionHandler, BaseExecutor } from './base-executor'
import { runProgressService } from './run-progress'

export const codeExecutor: BaseExecutor<CodeAction> = {
async handle({
Expand Down Expand Up @@ -35,7 +35,7 @@ const executeAction: ActionHandler<CodeAction> = async ({ action, executionState
})

const { data: executionStateResult, error: executionStateError } = await utils.tryCatchAndThrowOnEngineError((async () => {
await runProgressService.sendUpdate({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: executionState.upsertStep(action.name, stepOutput),
stepNameToUpdate: action.name,
Expand Down
13 changes: 7 additions & 6 deletions packages/server/engine/src/lib/handler/flow-executor.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { performance } from 'node:perf_hooks'
import { EngineGenericError, ExecuteFlowOperation, ExecutionType, FlowAction, FlowActionType, FlowRunStatus, FlowTrigger, GenericStepOutput, isNil, StepOutputStatus } from '@activepieces/shared'
import dayjs from 'dayjs'
import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter'
import { loggingUtils } from '../helper/logging-utils'
import { triggerHelper } from '../helper/trigger-helper'
import { BaseExecutor } from './base-executor'
Expand All @@ -10,7 +11,6 @@ import { FlowExecutorContext } from './context/flow-execution-context'
import { loopExecutor } from './loop-executor'
import { pieceExecutor } from './piece-executor'
import { routerExecuter } from './router-executor'
import { runProgressService } from './run-progress'

function getExecuteFunction(): Record<FlowActionType, BaseExecutor<FlowAction>> {
return {
Expand Down Expand Up @@ -39,14 +39,15 @@ export const flowExecutor = {
}): Promise<FlowExecutorContext> {
const trigger = input.flowVersion.trigger
if (input.executionType === ExecutionType.BEGIN) {
void runProgressService.backup({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: executionState,
}).catch((err) => {
})
void flowRunProgressReporter.backup().catch((err) => {
console.error('[Progress] Initial payload upload failed', err)
})
await triggerHelper.executeOnStart(trigger, constants, input.triggerPayload)
await runProgressService.sendUpdate({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: executionState,
stepNameToUpdate: trigger.name,
Expand Down Expand Up @@ -82,7 +83,7 @@ export const flowExecutor = {
}
const handler = this.getExecutorForAction(currentAction.type)

await runProgressService.sendUpdate({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: flowExecutionContext,
stepNameToUpdate: previousAction!.name,
Expand All @@ -108,7 +109,7 @@ export const flowExecutor = {

}

await runProgressService.sendUpdate({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: flowExecutionContext,
stepNameToUpdate: previousAction?.name,
Expand Down
6 changes: 3 additions & 3 deletions packages/server/engine/src/lib/handler/piece-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AUTHENTICATION_PROPERTY_NAME, EngineGenericError, ExecutionType, FlowAc
import type { ToolSet } from 'ai'
import dayjs from 'dayjs'
import { continueIfFailureHandler, runWithExponentialBackoff } from '../helper/error-handling'
import { flowRunProgressReporter } from '../helper/flow-run-progress-reporter'
import { pieceLoader } from '../helper/piece-loader'
import { createFileUploader } from '../piece-context/file-uploader'
import { createFlowsContext } from '../piece-context/flows'
Expand All @@ -14,7 +15,6 @@ import { propsProcessor } from '../variables/props-processor'
import { workerSocket } from '../worker-socket'
import { ActionHandler, BaseExecutor } from './base-executor'
import { EngineConstants } from './context/engine-constants'
import { runProgressService } from './run-progress'

const AP_PAUSED_FLOW_TIMEOUT_DAYS = Number(process.env.AP_PAUSED_FLOW_TIMEOUT_DAYS)

Expand Down Expand Up @@ -73,7 +73,7 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat
tags: [],
},
}
const outputContext = runProgressService.createOutputContext({
const outputContext = flowRunProgressReporter.createOutputContext({
engineConstants: constants,
flowExecutorContext: executionState,
stepName: action.name,
Expand All @@ -82,7 +82,7 @@ const executeAction: ActionHandler<PieceAction> = async ({ action, executionStat

const isPaused = executionState.isPaused({ stepName: action.name })
if (!isPaused) {
await runProgressService.sendUpdate({
await flowRunProgressReporter.sendUpdate({
engineConstants: constants,
flowExecutorContext: executionState.upsertStep(action.name, stepOutput),
stepNameToUpdate: action.name,
Expand Down
Loading
Loading