Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/engine/src/lib/services/progress.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const sendUpdateRunRequest = async (flowRunId: FlowRunId): Promise<void> => {
workerHandlerId: engineConstants.serverHandlerId ?? null,
runDetails: await flowExecutorContext.toResponse(),
progressUpdateType: engineConstants.progressUpdateType,
flowId: engineConstants.flowId,
};

logger.debug(
Expand Down
23 changes: 23 additions & 0 deletions packages/engine/src/lib/workflow-deletion-validator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { logger, wasWorkflowDeletionRequested } from '@openops/server-shared';
import { FlowId } from '@openops/shared';

const defaultMessage =
'Workflow deletion was requested. Stopping workflow executions.';

export class WorkflowDeletionRequestedError extends Error {
constructor(message = defaultMessage) {
super(message);
this.name = 'WorkflowDeletionRequestedError';
}
}

export async function throwIfWorkflowDeletionRequested(
flowId: FlowId,
): Promise<void> {
const wasDeletionRequested = await wasWorkflowDeletionRequested(flowId);

if (wasDeletionRequested) {
logger.info(defaultMessage);
throw new WorkflowDeletionRequestedError();
}
}
1 change: 1 addition & 0 deletions packages/server/shared/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,4 @@ export * from './lib/system';
export * from './lib/truncation-utils';
export * from './lib/webhook-secrets-util';
export * from './lib/workflow-cancellation-manager';
export * from './lib/workflow-deletion-manager';
25 changes: 25 additions & 0 deletions packages/server/shared/src/lib/workflow-deletion-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { FlowId } from '@openops/shared';
import { cacheWrapper } from './cache/cache-wrapper';

const DELETED_PREFIX = 'workflow:deleted:';

function buildCancellationKey(flowId: FlowId): string {
return `${DELETED_PREFIX}${flowId}`;
}

export async function signalWorkflowDeletion(flowId: FlowId): Promise<void> {
const key = buildCancellationKey(flowId);
await cacheWrapper.setKey(key, 'true');
}

export async function wasWorkflowDeletionRequested(
flowId: FlowId,
): Promise<boolean> {
const key = buildCancellationKey(flowId);
const value: string | null = await cacheWrapper.getKey(key);
if (value === null) {
return false;
}

return value === 'true';
}
25 changes: 16 additions & 9 deletions packages/server/worker/src/lib/executors/flow-job-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,24 +88,25 @@ async function executeFlow(
timeout: (flowTimeoutSandbox + 3) * 1000, // Engine timeout plus 3 more seconds
});

const flow = await engineApiService(engineToken).getFlowWithExactBlocks({
versionId: jobData.flowVersionId,
type: GetFlowVersionForWorkerRequestType.EXACT,
});

if (isNil(flow)) {
return;
}

let jobStatus: JobStatus | undefined;
let jobFinalMessage: string | undefined;
try {
const flow = await engineApiService(engineToken).getFlowWithExactBlocks({
versionId: jobData.flowVersionId,
type: GetFlowVersionForWorkerRequestType.EXACT,
});

if (isNil(flow)) {
return;
}

if (jobData.executionType === ExecutionType.BEGIN) {
await setFirstRunningState(
jobData,
engineToken,
flow.version.trigger.name,
flow.version.trigger.type,
flow.id,
);
}

Expand Down Expand Up @@ -142,6 +143,7 @@ async function executeFlow(
await updateRunWithError(
jobData,
engineToken,
flow.id,
failedRunStatus,
terminationReason,
);
Expand Down Expand Up @@ -173,6 +175,7 @@ async function executeFlow(
await updateRunWithError(
jobData,
engineToken,
flow.id,
failedRunStatus,
terminationReason,
);
Expand All @@ -194,6 +197,7 @@ async function setFirstRunningState(
engineToken: string,
triggerName: string,
triggerType: string,
flowId: string,
): Promise<void> {
await engineApiService(engineToken).updateRunStatus({
runDetails: {
Expand All @@ -213,12 +217,14 @@ async function setFirstRunningState(
progressUpdateType: jobData.progressUpdateType,
workerHandlerId: jobData.synchronousHandlerId,
runId: jobData.runId,
flowId,
});
}

async function updateRunWithError(
jobData: OneTimeJobData,
engineToken: string,
flowId: string,
status:
| FlowRunStatus.TIMEOUT
| FlowRunStatus.STOPPED
Expand All @@ -238,6 +244,7 @@ async function updateRunWithError(
progressUpdateType: jobData.progressUpdateType,
workerHandlerId: jobData.synchronousHandlerId,
runId: jobData.runId,
flowId,
});
}

Expand Down
1 change: 1 addition & 0 deletions packages/shared/src/lib/engine/requests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export const UpdateRunProgressRequest = Type.Object({
runId: Type.String(),
progressUpdateType: Type.Optional(Type.Enum(ProgressUpdateType)),
workerHandlerId: Nullable(Type.String()),
flowId: Type.String(),
});

export type UpdateRunProgressRequest = Static<typeof UpdateRunProgressRequest>;
Expand Down
Loading