From 788de6cfc7e0d6f291f9f4f51d7132cd1ab0ade3 Mon Sep 17 00:00:00 2001 From: Ram Narayan Balaji Date: Fri, 5 Jun 2026 19:13:23 +0530 Subject: [PATCH] fix(governance): supersede stale approval tasks at task-creation, not trigger time Event-based approval workflows mishandled sequential change events for the same entity, both rooted in WorkflowHandler.terminateDuplicateInstances. - Bug 1 (stale OM task on supersede): terminating a superseded run cleaned the Flowable side but never cancelled the backing task_entity, leaving an orphaned approval task. - Bug 2 (no-op event kills a valid approval): terminateDuplicateInstances ran in FilterEntityImpl at trigger time, before the run evaluated checkChangeDescription, so a no-op event that passed the entity filter destroyed a legitimate pending approval. Move the supersede from trigger time to task-creation time: - FilterEntityImpl: drop the eager terminateDuplicateInstances call and its async FK-race workaround. - CreateTask: when a new approval run materializes a task for an entity that already has an open Approval-category task from an earlier run of the same workflow, cancel the stale task synchronously and async-terminate the superseded Flowable process + mark its WorkflowInstance FAILED. Best-effort: cleanup failures never abort creation of the new task. - WorkflowHandler: replace the 7-day-scan terminateDuplicateInstances (and its FK-race / PG-deadlock workarounds) with a targeted terminateWorkflowInstance. Tests: unit coverage for the supersede predicate; integration tests for the no-op-survives and new-run-supersedes scenarios. --- .../tests/WorkflowDefinitionResourceIT.java | 354 ++++++++++++++++++ .../governance/workflows/WorkflowHandler.java | 176 +++------ .../elements/nodes/userTask/CreateTask.java | 83 ++++ .../triggers/impl/FilterEntityImpl.java | 39 +- .../nodes/userTask/CreateTaskTest.java | 69 ++++ 5 files changed, 554 insertions(+), 167 deletions(-) diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index e02f2a803441..15464d1cb6f5 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -4,6 +4,7 @@ import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -10280,4 +10281,357 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws Exception { LOG.info("test_SelfApprovalPrevention completed successfully"); } + + /** + * Bug 2: a no-op change event must not invalidate a pending approval. + * + *

CE1 changes a gated field (tag) → approval task created and parked. CE2 changes a different + * trigger field (domain) that passes the entity-level trigger filter but fails the workflow's + * checkChangeDescription rule, so it creates no task. The pending task must survive AND remain + * resolvable — resolving it is the discriminator: before the fix, CE2 eagerly terminated CE1's + * Flowable process at trigger time, leaving the OM task orphaned and unresolvable. + */ + @Test + void test_NoOpEventDoesNotInvalidatePendingApproval(TestNamespace ns) throws Exception { + LOG.info("Starting test_NoOpEventDoesNotInvalidatePendingApproval"); + OpenMetadataClient client = SdkClients.adminClient(); + ensureWorkflowEventConsumerIsActive(client); + String suffix = String.valueOf(System.currentTimeMillis()); + + SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "noop"); + String schemaFqn = fx.schema.getFullyQualifiedName(); + + patchAddTag(client, fx.schema.getId(), "PII.Sensitive"); + Task originalTask = awaitSingleOpenApprovalTask(client, schemaFqn); + LOG.debug("CE1 created approval task {}", originalTask.getId()); + + patchAddDomain(client, fx.schema.getId(), fx.financeDomain); + + await("no-op event must leave the original approval task untouched") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ListResponse open = listOpenApprovalTasks(client, schemaFqn); + assertEquals( + 1, open.getData().size(), "Exactly one approval task should remain open"); + assertEquals( + originalTask.getId(), + open.getData().getFirst().getId(), + "The original pending approval task must be the one still open"); + }); + + OpenMetadataClient ownerClient = + SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {}); + ownerClient + .tasks() + .resolve( + originalTask.getId().toString(), + new org.openmetadata.schema.api.tasks.ResolveTask() + .withResolutionType(TaskResolutionType.Approved)); + + await("surviving task must resolve and drive the workflow to Approved") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved)); + + LOG.info("test_NoOpEventDoesNotInvalidatePendingApproval completed successfully"); + cleanupSupersedeFixtures(client, fx); + } + + /** + * Bug 1: a new approval run supersedes the stale pending task. + * + *

CE1 creates and parks an approval task. CE3 makes another gated change, producing a new + * approval run. When the new run materializes its task, the prior task must be Cancelled (no + * orphaned OM task) and exactly one new open task must remain. The new task must be resolvable, + * proving its Flowable process is live. + */ + @Test + void test_NewApprovalRunSupersedesStalePendingTask(TestNamespace ns) throws Exception { + LOG.info("Starting test_NewApprovalRunSupersedesStalePendingTask"); + OpenMetadataClient client = SdkClients.adminClient(); + ensureWorkflowEventConsumerIsActive(client); + String suffix = String.valueOf(System.currentTimeMillis()); + + SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "supersede"); + String schemaFqn = fx.schema.getFullyQualifiedName(); + + patchAddTag(client, fx.schema.getId(), "PII.Sensitive"); + Task firstTask = awaitSingleOpenApprovalTask(client, schemaFqn); + LOG.debug("CE1 created approval task {}", firstTask.getId()); + + patchAddTag(client, fx.schema.getId(), "PII.None"); + + await("new approval run must supersede the prior task") + .atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofSeconds(2)) + .untilAsserted( + () -> { + ListResponse open = listOpenApprovalTasks(client, schemaFqn); + assertEquals(1, open.getData().size(), "Exactly one approval task should be open"); + assertNotEquals( + firstTask.getId(), + open.getData().getFirst().getId(), + "A new approval task should have replaced the prior one"); + assertTrue( + hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Cancelled), + "The superseded approval task must be Cancelled, not orphaned"); + }); + + Task supersedingTask = listOpenApprovalTasks(client, schemaFqn).getData().getFirst(); + OpenMetadataClient ownerClient = + SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {}); + ownerClient + .tasks() + .resolve( + supersedingTask.getId().toString(), + new org.openmetadata.schema.api.tasks.ResolveTask() + .withResolutionType(TaskResolutionType.Approved)); + + await("superseding task must resolve and drive the workflow to Approved") + .atMost(Duration.ofSeconds(45)) + .pollInterval(Duration.ofSeconds(2)) + .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved)); + + LOG.info("test_NewApprovalRunSupersedesStalePendingTask completed successfully"); + cleanupSupersedeFixtures(client, fx); + } + + private static class SupersedeFixtures { + final User owner; + final DatabaseService dbService; + final Database database; + final DatabaseSchema schema; + final Domain financeDomain; + final WorkflowDefinition workflow; + + SupersedeFixtures( + User owner, + DatabaseService dbService, + Database database, + DatabaseSchema schema, + Domain financeDomain, + WorkflowDefinition workflow) { + this.owner = owner; + this.dbService = dbService; + this.database = database; + this.schema = schema; + this.financeDomain = financeDomain; + this.workflow = workflow; + } + } + + private SupersedeFixtures createTagApprovalFixtures( + OpenMetadataClient client, TestNamespace ns, String suffix, String label) throws Exception { + User owner = + client + .users() + .create( + new CreateUser() + .withName(label + "_owner_" + suffix) + .withEmail(label + "_owner_" + suffix + "@example.com") + .withDisplayName("Supersede Owner")); + + DatabaseService dbService = + client + .databaseServices() + .create( + new CreateDatabaseService() + .withName(ns.prefix(label + "-db-service")) + .withServiceType(DatabaseServiceType.Mysql) + .withConnection( + new DatabaseConnection() + .withConfig( + new MysqlConnection() + .withHostPort("localhost:3306") + .withUsername("test") + .withAuthType(new basicAuth().withPassword("test"))))); + + Database database = + client + .databases() + .create( + new CreateDatabase() + .withName(ns.prefix(label + "-database")) + .withService(dbService.getFullyQualifiedName())); + + DatabaseSchema schema = + client + .databaseSchemas() + .create( + new CreateDatabaseSchema() + .withName(ns.prefix(label + "-schema")) + .withDatabase(database.getFullyQualifiedName()) + .withOwners(List.of(owner.getEntityReference()))); + + Domain financeDomain = + client + .domains() + .create( + new CreateDomain() + .withName(ns.prefix(label + "-Finance")) + .withDescription("Finance domain for supersede testing") + .withDomainType(CreateDomain.DomainType.CONSUMER_ALIGNED)); + + String workflowName = "SupersedeApprovalWorkflow_" + label + "_" + suffix; + CreateWorkflowDefinition workflowRequest = + MAPPER.readValue(tagApprovalWorkflowJson(workflowName), CreateWorkflowDefinition.class); + WorkflowDefinition workflow = client.workflowDefinitions().create(workflowRequest); + trackWorkflow(workflowName, workflow.getId().toString()); + waitForWorkflowDeployment(client, workflowName); + + return new SupersedeFixtures(owner, dbService, database, schema, financeDomain, workflow); + } + + private String tagApprovalWorkflowJson(String workflowName) { + return """ + { + "name": "%s", + "displayName": "Supersede Approval Workflow", + "description": "Creates an approval task only when gated tags change", + "type": "eventBasedEntity", + "trigger": { + "type": "eventBasedEntity", + "config": { "events": ["Updated"], "entityTypes": ["databaseSchema"] }, + "output": ["relatedEntity", "updatedBy"] + }, + "nodes": [ + { "name": "start", "type": "startEvent", "subType": "startEvent" }, + { + "name": "checkChangeDesc", + "type": "automatedTask", + "subType": "checkChangeDescriptionTask", + "displayName": "Check Gated Tags Changed", + "config": { "condition": "OR", "rules": { "tags": ["PII.Sensitive", "PII.None"] } }, + "input": ["relatedEntity"], + "inputNamespaceMap": { "relatedEntity": "global" }, + "branches": ["true", "false"] + }, + { + "name": "userApproval", + "type": "userTask", + "subType": "userApprovalTask", + "displayName": "Approve Changes", + "input": ["relatedEntity"], + "output": ["updatedBy"], + "branches": ["true", "false"], + "config": { + "assignees": { "addReviewers": true, "addOwners": true, "candidates": [] }, + "approvalThreshold": 1, + "rejectionThreshold": 1 + }, + "inputNamespaceMap": { "relatedEntity": "global" } + }, + { + "name": "setApproved", + "type": "automatedTask", + "subType": "setEntityAttributeTask", + "displayName": "Set Status to Approved", + "config": { "fieldName": "status", "fieldValue": "Approved" }, + "input": ["relatedEntity", "updatedBy"], + "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" } + }, + { + "name": "setRejected", + "type": "automatedTask", + "subType": "setEntityAttributeTask", + "displayName": "Set Status to Rejected", + "config": { "fieldName": "status", "fieldValue": "Rejected" }, + "input": ["relatedEntity", "updatedBy"], + "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" } + }, + { + "name": "setDraft", + "type": "automatedTask", + "subType": "setEntityAttributeTask", + "displayName": "Set Status to Draft", + "config": { "fieldName": "status", "fieldValue": "Draft" }, + "input": ["relatedEntity", "updatedBy"], + "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "global" } + }, + { "name": "end", "type": "endEvent", "subType": "endEvent" } + ], + "edges": [ + { "from": "start", "to": "checkChangeDesc" }, + { "from": "checkChangeDesc", "to": "userApproval", "condition": "true" }, + { "from": "checkChangeDesc", "to": "setDraft", "condition": "false" }, + { "from": "userApproval", "to": "setApproved", "condition": "true" }, + { "from": "userApproval", "to": "setRejected", "condition": "false" }, + { "from": "setApproved", "to": "end" }, + { "from": "setRejected", "to": "end" }, + { "from": "setDraft", "to": "end" } + ] + } + """ + .formatted(workflowName); + } + + private void patchAddTag(OpenMetadataClient client, UUID schemaId, String tagFqn) + throws Exception { + String patch = + String.format( + "[{\"op\":\"add\",\"path\":\"/tags\",\"value\":[{\"tagFQN\":\"%s\"," + + "\"source\":\"Classification\",\"labelType\":\"Manual\",\"state\":\"Confirmed\"}]}]", + tagFqn); + client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch)); + } + + private void patchAddDomain(OpenMetadataClient client, UUID schemaId, Domain domain) + throws Exception { + String patch = + String.format( + """ + [ + { + "op": "add", + "path": "/domains", + "value": [ { "id": "%s", "type": "domain", "name": "%s", "fullyQualifiedName": "%s" } ] + } + ] + """, + domain.getId(), domain.getName(), domain.getFullyQualifiedName()); + client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch)); + } + + private Task awaitSingleOpenApprovalTask(OpenMetadataClient client, String schemaFqn) { + await("approval task to be created for " + schemaFqn) + .atMost(Duration.ofSeconds(60)) + .pollInterval(Duration.ofSeconds(2)) + .ignoreExceptions() + .until(() -> !listOpenApprovalTasks(client, schemaFqn).getData().isEmpty()); + try { + return listOpenApprovalTasks(client, schemaFqn).getData().getFirst(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private boolean hasApprovalTaskWithStatus( + OpenMetadataClient client, String schemaFqn, TaskEntityStatus status) { + boolean found; + try { + Map filters = new HashMap<>(); + filters.put("status", status.value()); + filters.put("category", TaskCategory.Approval.value()); + filters.put("aboutEntity", schemaFqn); + found = !client.tasks().listWithFilters(filters).getData().isEmpty(); + } catch (Exception e) { + found = false; + } + return found; + } + + private void cleanupSupersedeFixtures(OpenMetadataClient client, SupersedeFixtures fx) { + try { + client.workflowDefinitions().delete(fx.workflow.getId().toString()); + client.databaseSchemas().delete(fx.schema.getId().toString()); + client.databases().delete(fx.database.getId().toString()); + client.databaseServices().delete(fx.dbService.getId().toString()); + client.domains().delete(fx.financeDomain.getId().toString()); + client.users().delete(fx.owner.getId().toString()); + } catch (Exception e) { + LOG.warn("Cleanup error: {}", e.getMessage()); + } + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java index bbf1f88a816b..98b9847e5ad9 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java @@ -40,7 +40,6 @@ import org.flowable.engine.runtime.ProcessInstance; import org.flowable.job.api.Job; import org.flowable.task.api.Task; -import org.jdbi.v3.core.transaction.TransactionIsolationLevel; import org.openmetadata.schema.configuration.WorkflowSettings; import org.openmetadata.schema.entity.teams.User; import org.openmetadata.schema.governance.workflows.WorkflowDefinition; @@ -49,7 +48,6 @@ import org.openmetadata.schema.type.EntityReference; import org.openmetadata.schema.type.Include; import org.openmetadata.schema.utils.JsonUtils; -import org.openmetadata.schema.utils.ResultList; import org.openmetadata.service.Entity; import org.openmetadata.service.OpenMetadataApplicationConfig; import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory; @@ -59,7 +57,6 @@ import org.openmetadata.service.governance.workflows.flowable.sql.UnlockJobSql; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.DeadlockRetry; -import org.openmetadata.service.jdbi3.ListFilter; import org.openmetadata.service.jdbi3.SystemRepository; import org.openmetadata.service.jdbi3.TaskRepository; import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository; @@ -67,7 +64,6 @@ import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository; import org.openmetadata.service.jdbi3.locator.ConnectionType; import org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineMapper; -import org.openmetadata.service.util.EntityUtil; @Slf4j public class WorkflowHandler { @@ -1621,143 +1617,55 @@ public void terminateWorkflow(String workflowName) { instance.getId(), "Terminating all instances due to user request.")); } - public void terminateDuplicateInstances( - String mainWorkflowDefinitionName, String entityLink, String currentProcessInstanceId) { + /** + * Terminates a single, specific workflow instance that has been superseded by a newer run. + * + *

Invoked from {@code CreateTask} when a fresh approval run produces a new task for an entity + * that already had an open approval task from an earlier run. Only the superseded run's main + * process instance (matched by business key = workflow instance id and the main process + * definition key) is deleted, and the corresponding {@link WorkflowInstance} record is marked + * FAILED for audit. Failures are swallowed: this is best-effort cleanup of an orphaned Flowable + * process and must never propagate into — or roll back — the caller's transaction. + */ + public void terminateWorkflowInstance( + UUID workflowInstanceId, String mainWorkflowName, String reason) { try { - WorkflowInstanceRepository workflowInstanceRepository = - (WorkflowInstanceRepository) - Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); - WorkflowInstanceStateRepository workflowInstanceStateRepository = - (WorkflowInstanceStateRepository) - Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); - - ListFilter filter = new ListFilter(null); - filter.addQueryParam("entityLink", entityLink); - - long endTs = System.currentTimeMillis(); - long startTs = endTs - (7L * 24 * 60 * 60 * 1000); - - ResultList allInstances = - workflowInstanceRepository.list(null, startTs, endTs, 100, filter, false); - - List candidateInstances = - allInstances.getData().stream() - .filter( - instance -> WorkflowInstance.WorkflowStatus.RUNNING.equals(instance.getStatus())) - .filter( - instance -> { - try { - WorkflowDefinitionRepository repo = - (WorkflowDefinitionRepository) - Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION); - var def = - repo.get( - null, - instance.getWorkflowDefinitionId(), - EntityUtil.Fields.EMPTY_FIELDS); - return mainWorkflowDefinitionName.equals(def.getName()); - } catch (Exception e) { - return false; - } - }) - .toList(); - - RuntimeService runtimeService = getInstance().getRuntimeService(); - List runningProcessInstances = - runtimeService + ProcessInstance mainInstance = + getRuntimeService() .createProcessInstanceQuery() - .processDefinitionKey(mainWorkflowDefinitionName) - .list(); - - List conflictingInstances = - candidateInstances.stream() - .filter( - instance -> { - return runningProcessInstances.stream() - .filter(pi -> !pi.getId().equals(currentProcessInstanceId)) - .anyMatch(pi -> pi.getBusinessKey().equals(instance.getId().toString())); - }) - .toList(); - - if (conflictingInstances.isEmpty()) { - LOG.debug("No conflicting instances found to terminate for {}", mainWorkflowDefinitionName); - return; - } - - // Terminate Flowable process instances OUTSIDE any JDBI transaction. - // Calling runtimeService.deleteProcessInstance() inside a JDBI transaction causes a race - // condition: the uncommitted DELETE on ACT_RU_EXECUTION holds an X-lock, Flowable's async - // job executor concurrently tries to INSERT a timer job referencing that execution (FK - // S-lock wait), and when the JDBI tx commits the execution is gone, so the timer INSERT - // fails with SQLIntegrityConstraintViolationException. - for (WorkflowInstance instance : conflictingInstances) { - ProcessInstance mainInstance = - runningProcessInstances.stream() - .filter( - pi -> - pi.getBusinessKey() != null - && pi.getBusinessKey().equals(instance.getId().toString())) - .findFirst() - .orElse(null); - - if (mainInstance != null) { - String processId = mainInstance.getId(); - long activeUserTasks = - processEngine - .getTaskService() - .createTaskQuery() - .processInstanceId(processId) - .active() - .count(); - if (activeUserTasks == 0) { - LOG.debug( - "Process instance {} has no active user tasks — it is auto-completing; skipping external deletion", - processId); - continue; - } - LOG.info( - "Terminating main workflow instance {} for conflicting instance {}", - mainInstance.getId(), - instance.getId()); - try { - runtimeService.deleteProcessInstance( - processId, "Terminated due to conflicting workflow instance"); - } catch (FlowableObjectNotFoundException e) { - LOG.debug( - "Process instance {} already completed before termination, skipping", processId); - } - } + .processInstanceBusinessKey(workflowInstanceId.toString()) + .processDefinitionKey(mainWorkflowName) + .singleResult(); + if (mainInstance != null) { + deleteProcessInstanceQuietly(mainInstance.getId(), reason); } - - Entity.getJdbi() - .inTransaction( - TransactionIsolationLevel.READ_COMMITTED, - handle -> { - try { - for (WorkflowInstance instance : conflictingInstances) { - workflowInstanceStateRepository.markInstanceStatesAsFailed( - instance.getId(), "Terminated due to conflicting workflow instance"); - workflowInstanceRepository.markInstanceAsFailed( - instance.getId(), "Terminated due to conflicting workflow instance"); - } - return null; - } catch (Exception e) { - LOG.error("Failed to update instance states in transaction: {}", e.getMessage()); - throw e; - } - }); - - LOG.info( - "Terminated {} conflicting instances of {} for entity {}", - conflictingInstances.size(), - mainWorkflowDefinitionName, - entityLink); - + markWorkflowInstanceFailed(workflowInstanceId, reason); } catch (Exception e) { - LOG.warn("Failed to terminate conflicting instances: {}", e.getMessage()); + LOG.warn( + "Failed to terminate superseded workflow instance {}: {}", + workflowInstanceId, + e.getMessage()); + } + } + + private void deleteProcessInstanceQuietly(String processInstanceId, String reason) { + try { + getRuntimeService().deleteProcessInstance(processInstanceId, reason); + } catch (FlowableObjectNotFoundException e) { + LOG.debug("Process instance {} already completed before termination", processInstanceId); } } + private void markWorkflowInstanceFailed(UUID workflowInstanceId, String reason) { + WorkflowInstanceStateRepository workflowInstanceStateRepository = + (WorkflowInstanceStateRepository) + Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE); + WorkflowInstanceRepository workflowInstanceRepository = + (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE); + workflowInstanceStateRepository.markInstanceStatesAsFailed(workflowInstanceId, reason); + workflowInstanceRepository.markInstanceAsFailed(workflowInstanceId, reason); + } + public RuntimeService getRuntimeService() { return processEngine.getRuntimeService(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java index 670e3703503d..f9f66f759779 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java @@ -34,6 +34,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -88,6 +89,8 @@ public class CreateTask implements TaskListener { static final String PENDING_WORKFLOW_START_STAGE_ID = "pending-workflow-start"; private static final String DEFAULT_SYSTEM_USER = "admin"; + private static final String SUPERSEDED_BY_NEWER_RUN = + "Superseded by a newer approval workflow run for the same entity"; private static final int WORKFLOW_MANAGED_DRAFT_LOOKUP_MAX_ATTEMPTS = 6; private static final long INITIAL_WORKFLOW_MANAGED_DRAFT_LOOKUP_DELAY_MILLIS = 25L; private static final long MAX_WORKFLOW_MANAGED_DRAFT_LOOKUP_DELAY_MILLIS = 250L; @@ -585,6 +588,19 @@ private Task createOrUpdateTask( new com.fasterxml.jackson.core.type.TypeReference>() {})); } + // A genuinely new approval run is materializing a task for this entity. Supersede any approval + // task still open from an earlier run of the same workflow before creating the new one, so a + // single live approval is maintained and the earlier run's orphaned Flowable process is cleaned + // up. Cancel-before-create keeps the "exactly one open" invariant. + supersedePriorApprovalTask( + delegateTask, + taskRepository, + entity, + taskCategory, + resolvedWorkflowDefinitionId, + workflowInstanceId, + updatedBy); + // Use the repository to create (handles taskId generation, FQN, relationships) task = taskRepository.create(null, task); @@ -608,6 +624,73 @@ private Task createOrUpdateTask( return task; } + private void supersedePriorApprovalTask( + DelegateTask delegateTask, + TaskRepository taskRepository, + EntityInterface entity, + TaskCategory taskCategory, + UUID currentWorkflowDefinitionId, + UUID currentWorkflowInstanceId, + String updatedBy) { + // Best-effort cleanup: failing to supersede the prior task must never abort creation of the new + // approval task, so all exceptions are contained here instead of bubbling up as a BpmnError. + try { + Task prior = + taskCategory == TaskCategory.Approval + ? taskRepository.findOpenTaskByEntityAndCategory( + entity.getFullyQualifiedName(), taskCategory) + : null; + if (isSupersedablePriorApprovalTask( + prior, currentWorkflowDefinitionId, currentWorkflowInstanceId)) { + cancelAndTerminatePriorApproval(delegateTask, taskRepository, prior, updatedBy); + } + } catch (Exception e) { + LOG.warn( + "[CreateTask] Failed to supersede prior approval task for entity '{}': {}", + entity.getFullyQualifiedName(), + e.getMessage()); + } + } + + static boolean isSupersedablePriorApprovalTask( + Task prior, UUID currentWorkflowDefinitionId, UUID currentWorkflowInstanceId) { + return prior != null + && prior.getWorkflowInstanceId() != null + && !prior.getWorkflowInstanceId().equals(currentWorkflowInstanceId) + && currentWorkflowDefinitionId != null + && currentWorkflowDefinitionId.equals(prior.getWorkflowDefinitionId()); + } + + private void cancelAndTerminatePriorApproval( + DelegateTask delegateTask, TaskRepository taskRepository, Task prior, String updatedBy) { + LOG.info( + "[CreateTask] Superseding prior approval task '{}' (workflowInstance '{}') with a newer run", + prior.getId(), + prior.getWorkflowInstanceId()); + taskRepository.closeTask(prior, updatedBy, SUPERSEDED_BY_NEWER_RUN); + dispatchPriorInstanceTermination(delegateTask, prior.getWorkflowInstanceId()); + } + + private void dispatchPriorInstanceTermination(DelegateTask delegateTask, UUID priorInstanceId) { + String mainWorkflowName = inferWorkflowDefinitionRef(delegateTask); + // Dispatched async on purpose: deleteProcessInstance() then runs in its own Flowable command + // context, so a DB-level failure terminating the superseded process can never poison — or roll + // back — the current run's transaction, which has just created the replacement task. + CompletableFuture.runAsync( + () -> + WorkflowHandler.getInstance() + .terminateWorkflowInstance( + priorInstanceId, mainWorkflowName, SUPERSEDED_BY_NEWER_RUN)) + .exceptionally( + ex -> { + LOG.error( + "[CreateTask] Failed to terminate superseded workflow instance '{}'", + priorInstanceId, + ex); + return null; + }); + } + static List resolveExistingTaskAssignees( Task existingTask, List workflowAssignees, diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java index 928beb6050a8..355e53e60965 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java @@ -10,7 +10,6 @@ import java.util.List; import java.util.Optional; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import org.flowable.common.engine.api.delegate.Expression; import org.flowable.engine.delegate.DelegateExecution; import org.flowable.engine.delegate.JavaDelegate; @@ -25,7 +24,6 @@ import org.openmetadata.service.exception.EntityNotFoundException; import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.governance.workflows.WorkflowVariableHandler; -import org.openmetadata.service.governance.workflows.elements.TriggerFactory; import org.openmetadata.service.jdbi3.RecognizerFeedbackRepository; import org.openmetadata.service.resources.feeds.MessageParser; import org.openmetadata.service.rules.RuleEngine; @@ -74,37 +72,12 @@ public void execute(DelegateExecution execution) { passesExcludedFilter(entityLinkStr, excludedFilter, includeFields, filterLogic); } - if (passesFilter) { - String triggerWorkflowDefinitionKey = - WorkflowHandler.getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); - String mainWorkflowDefinitionName = - TriggerFactory.getMainWorkflowDefinitionNameFromTrigger(triggerWorkflowDefinitionKey); - String currentProcessInstanceId = execution.getProcessInstanceId(); - // Terminate duplicate instances asynchronously to prevent a MySQL FK violation. - // This JavaDelegate runs inside Flowable's signalEventReceived command context (TX_A). - // Calling deleteProcessInstance() from within TX_A reuses the same DB transaction; the - // uncommitted execution DELETE holds an X-lock, and Flowable's job executor may try to - // INSERT a timer job referencing that execution (FK wait), causing a constraint violation - // when TX_A commits. Running in a separate thread gives terminateDuplicateInstances its - // own Flowable command context and independent DB transaction, avoiding that FK issue. - // The deadlock (PostgreSQL lock-order reversal between deleteProcessInstance and a - // concurrently auto-completing process) is prevented inside terminateDuplicateInstances - // by skipping deletion for processes that have no active user tasks. - final String workflowName = mainWorkflowDefinitionName; - final String entityLinkStrFinal = entityLinkStr; - final String processInstanceId = currentProcessInstanceId; - CompletableFuture.runAsync( - () -> - WorkflowHandler.getInstance() - .terminateDuplicateInstances( - workflowName, entityLinkStrFinal, processInstanceId)) - .exceptionally( - ex -> { - log.error("Async termination of duplicate instances failed", ex); - return null; - }); - } - + // Duplicate-instance supersede is intentionally NOT done here. Deciding "the new event + // supersedes the old" at trigger time is too early: this filter runs before the workflow + // evaluates checkChangeDescription/checkEntityAttributes, so a no-op event that passes the + // entity filter but creates no task would still kill a valid pending approval. The supersede + // now happens at task-creation time in CreateTask, where the run has genuinely produced a new + // approval task. See CreateTask#supersedePriorApprovalTask. String workflowKey = WorkflowHandler.getProcessDefinitionKeyFromId(execution.getProcessDefinitionId()); log.debug("Trigger {} - Entity {} passes filter: {}", workflowKey, entityLinkStr, passesFilter); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java index 96214bc45a12..1cae5cf3e7dd 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java @@ -585,4 +585,73 @@ void testParseMillisFromIso8601DurationReturnsFallbackForInvalidInput() { void testParseMillisFromIso8601DurationReturnsFallbackForNullFallback() { assertNull(CreateTask.parseMillisFromIso8601Duration("garbage", null)); } + + // ---- isSupersedablePriorApprovalTask ---- + + @Test + void testIsSupersedableWhenPriorBelongsToEarlierRunOfSameWorkflow() { + UUID workflowDefinitionId = UUID.randomUUID(); + Task prior = + new Task() + .withId(UUID.randomUUID()) + .withWorkflowDefinitionId(workflowDefinitionId) + .withWorkflowInstanceId(UUID.randomUUID()); + + assertTrue( + CreateTask.isSupersedablePriorApprovalTask(prior, workflowDefinitionId, UUID.randomUUID())); + } + + @Test + void testIsNotSupersedableWhenNoPriorTaskExists() { + assertFalse( + CreateTask.isSupersedablePriorApprovalTask(null, UUID.randomUUID(), UUID.randomUUID())); + } + + @Test + void testIsNotSupersedableWhenPriorHasNoWorkflowInstance() { + UUID workflowDefinitionId = UUID.randomUUID(); + Task prior = + new Task().withId(UUID.randomUUID()).withWorkflowDefinitionId(workflowDefinitionId); + + assertFalse( + CreateTask.isSupersedablePriorApprovalTask(prior, workflowDefinitionId, UUID.randomUUID())); + } + + @Test + void testIsNotSupersedableWhenPriorIsTheSameRun() { + UUID workflowDefinitionId = UUID.randomUUID(); + UUID workflowInstanceId = UUID.randomUUID(); + Task prior = + new Task() + .withId(UUID.randomUUID()) + .withWorkflowDefinitionId(workflowDefinitionId) + .withWorkflowInstanceId(workflowInstanceId); + + assertFalse( + CreateTask.isSupersedablePriorApprovalTask( + prior, workflowDefinitionId, workflowInstanceId)); + } + + @Test + void testIsNotSupersedableAcrossDifferentWorkflowDefinitions() { + Task prior = + new Task() + .withId(UUID.randomUUID()) + .withWorkflowDefinitionId(UUID.randomUUID()) + .withWorkflowInstanceId(UUID.randomUUID()); + + assertFalse( + CreateTask.isSupersedablePriorApprovalTask(prior, UUID.randomUUID(), UUID.randomUUID())); + } + + @Test + void testIsNotSupersedableWhenCurrentWorkflowDefinitionUnknown() { + Task prior = + new Task() + .withId(UUID.randomUUID()) + .withWorkflowDefinitionId(UUID.randomUUID()) + .withWorkflowInstanceId(UUID.randomUUID()); + + assertFalse(CreateTask.isSupersedablePriorApprovalTask(prior, null, UUID.randomUUID())); + } }