diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
index e02f2a803441..15464d1cb6f5 100644
--- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
+++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java
@@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -10280,4 +10281,357 @@ void test_SelfApprovalPrevention(TestNamespace ns) throws Exception {
LOG.info("test_SelfApprovalPrevention completed successfully");
}
+
+ /**
+ * Bug 2: a no-op change event must not invalidate a pending approval.
+ *
+ *
CE1 changes a gated field (tag) → approval task created and parked. CE2 changes a different
+ * trigger field (domain) that passes the entity-level trigger filter but fails the workflow's
+ * checkChangeDescription rule, so it creates no task. The pending task must survive AND remain
+ * resolvable — resolving it is the discriminator: before the fix, CE2 eagerly terminated CE1's
+ * Flowable process at trigger time, leaving the OM task orphaned and unresolvable.
+ */
+ @Test
+ void test_NoOpEventDoesNotInvalidatePendingApproval(TestNamespace ns) throws Exception {
+ LOG.info("Starting test_NoOpEventDoesNotInvalidatePendingApproval");
+ OpenMetadataClient client = SdkClients.adminClient();
+ ensureWorkflowEventConsumerIsActive(client);
+ String suffix = String.valueOf(System.currentTimeMillis());
+
+ SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "noop");
+ String schemaFqn = fx.schema.getFullyQualifiedName();
+
+ patchAddTag(client, fx.schema.getId(), "PII.Sensitive");
+ Task originalTask = awaitSingleOpenApprovalTask(client, schemaFqn);
+ LOG.debug("CE1 created approval task {}", originalTask.getId());
+
+ patchAddDomain(client, fx.schema.getId(), fx.financeDomain);
+
+ await("no-op event must leave the original approval task untouched")
+ .atMost(Duration.ofSeconds(45))
+ .pollInterval(Duration.ofSeconds(2))
+ .untilAsserted(
+ () -> {
+ ListResponse open = listOpenApprovalTasks(client, schemaFqn);
+ assertEquals(
+ 1, open.getData().size(), "Exactly one approval task should remain open");
+ assertEquals(
+ originalTask.getId(),
+ open.getData().getFirst().getId(),
+ "The original pending approval task must be the one still open");
+ });
+
+ OpenMetadataClient ownerClient =
+ SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {});
+ ownerClient
+ .tasks()
+ .resolve(
+ originalTask.getId().toString(),
+ new org.openmetadata.schema.api.tasks.ResolveTask()
+ .withResolutionType(TaskResolutionType.Approved));
+
+ await("surviving task must resolve and drive the workflow to Approved")
+ .atMost(Duration.ofSeconds(45))
+ .pollInterval(Duration.ofSeconds(2))
+ .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));
+
+ LOG.info("test_NoOpEventDoesNotInvalidatePendingApproval completed successfully");
+ cleanupSupersedeFixtures(client, fx);
+ }
+
+ /**
+ * Bug 1: a new approval run supersedes the stale pending task.
+ *
+ * CE1 creates and parks an approval task. CE3 makes another gated change, producing a new
+ * approval run. When the new run materializes its task, the prior task must be Cancelled (no
+ * orphaned OM task) and exactly one new open task must remain. The new task must be resolvable,
+ * proving its Flowable process is live.
+ */
+ @Test
+ void test_NewApprovalRunSupersedesStalePendingTask(TestNamespace ns) throws Exception {
+ LOG.info("Starting test_NewApprovalRunSupersedesStalePendingTask");
+ OpenMetadataClient client = SdkClients.adminClient();
+ ensureWorkflowEventConsumerIsActive(client);
+ String suffix = String.valueOf(System.currentTimeMillis());
+
+ SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "supersede");
+ String schemaFqn = fx.schema.getFullyQualifiedName();
+
+ patchAddTag(client, fx.schema.getId(), "PII.Sensitive");
+ Task firstTask = awaitSingleOpenApprovalTask(client, schemaFqn);
+ LOG.debug("CE1 created approval task {}", firstTask.getId());
+
+ patchAddTag(client, fx.schema.getId(), "PII.None");
+
+ await("new approval run must supersede the prior task")
+ .atMost(Duration.ofSeconds(60))
+ .pollInterval(Duration.ofSeconds(2))
+ .untilAsserted(
+ () -> {
+ ListResponse open = listOpenApprovalTasks(client, schemaFqn);
+ assertEquals(1, open.getData().size(), "Exactly one approval task should be open");
+ assertNotEquals(
+ firstTask.getId(),
+ open.getData().getFirst().getId(),
+ "A new approval task should have replaced the prior one");
+ assertTrue(
+ hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Cancelled),
+ "The superseded approval task must be Cancelled, not orphaned");
+ });
+
+ Task supersedingTask = listOpenApprovalTasks(client, schemaFqn).getData().getFirst();
+ OpenMetadataClient ownerClient =
+ SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {});
+ ownerClient
+ .tasks()
+ .resolve(
+ supersedingTask.getId().toString(),
+ new org.openmetadata.schema.api.tasks.ResolveTask()
+ .withResolutionType(TaskResolutionType.Approved));
+
+ await("superseding task must resolve and drive the workflow to Approved")
+ .atMost(Duration.ofSeconds(45))
+ .pollInterval(Duration.ofSeconds(2))
+ .until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));
+
+ LOG.info("test_NewApprovalRunSupersedesStalePendingTask completed successfully");
+ cleanupSupersedeFixtures(client, fx);
+ }
+
+ private static class SupersedeFixtures {
+ final User owner;
+ final DatabaseService dbService;
+ final Database database;
+ final DatabaseSchema schema;
+ final Domain financeDomain;
+ final WorkflowDefinition workflow;
+
+ SupersedeFixtures(
+ User owner,
+ DatabaseService dbService,
+ Database database,
+ DatabaseSchema schema,
+ Domain financeDomain,
+ WorkflowDefinition workflow) {
+ this.owner = owner;
+ this.dbService = dbService;
+ this.database = database;
+ this.schema = schema;
+ this.financeDomain = financeDomain;
+ this.workflow = workflow;
+ }
+ }
+
+ private SupersedeFixtures createTagApprovalFixtures(
+ OpenMetadataClient client, TestNamespace ns, String suffix, String label) throws Exception {
+ User owner =
+ client
+ .users()
+ .create(
+ new CreateUser()
+ .withName(label + "_owner_" + suffix)
+ .withEmail(label + "_owner_" + suffix + "@example.com")
+ .withDisplayName("Supersede Owner"));
+
+ DatabaseService dbService =
+ client
+ .databaseServices()
+ .create(
+ new CreateDatabaseService()
+ .withName(ns.prefix(label + "-db-service"))
+ .withServiceType(DatabaseServiceType.Mysql)
+ .withConnection(
+ new DatabaseConnection()
+ .withConfig(
+ new MysqlConnection()
+ .withHostPort("localhost:3306")
+ .withUsername("test")
+ .withAuthType(new basicAuth().withPassword("test")))));
+
+ Database database =
+ client
+ .databases()
+ .create(
+ new CreateDatabase()
+ .withName(ns.prefix(label + "-database"))
+ .withService(dbService.getFullyQualifiedName()));
+
+ DatabaseSchema schema =
+ client
+ .databaseSchemas()
+ .create(
+ new CreateDatabaseSchema()
+ .withName(ns.prefix(label + "-schema"))
+ .withDatabase(database.getFullyQualifiedName())
+ .withOwners(List.of(owner.getEntityReference())));
+
+ Domain financeDomain =
+ client
+ .domains()
+ .create(
+ new CreateDomain()
+ .withName(ns.prefix(label + "-Finance"))
+ .withDescription("Finance domain for supersede testing")
+ .withDomainType(CreateDomain.DomainType.CONSUMER_ALIGNED));
+
+ String workflowName = "SupersedeApprovalWorkflow_" + label + "_" + suffix;
+ CreateWorkflowDefinition workflowRequest =
+ MAPPER.readValue(tagApprovalWorkflowJson(workflowName), CreateWorkflowDefinition.class);
+ WorkflowDefinition workflow = client.workflowDefinitions().create(workflowRequest);
+ trackWorkflow(workflowName, workflow.getId().toString());
+ waitForWorkflowDeployment(client, workflowName);
+
+ return new SupersedeFixtures(owner, dbService, database, schema, financeDomain, workflow);
+ }
+
+ private String tagApprovalWorkflowJson(String workflowName) {
+ return """
+ {
+ "name": "%s",
+ "displayName": "Supersede Approval Workflow",
+ "description": "Creates an approval task only when gated tags change",
+ "type": "eventBasedEntity",
+ "trigger": {
+ "type": "eventBasedEntity",
+ "config": { "events": ["Updated"], "entityTypes": ["databaseSchema"] },
+ "output": ["relatedEntity", "updatedBy"]
+ },
+ "nodes": [
+ { "name": "start", "type": "startEvent", "subType": "startEvent" },
+ {
+ "name": "checkChangeDesc",
+ "type": "automatedTask",
+ "subType": "checkChangeDescriptionTask",
+ "displayName": "Check Gated Tags Changed",
+ "config": { "condition": "OR", "rules": { "tags": ["PII.Sensitive", "PII.None"] } },
+ "input": ["relatedEntity"],
+ "inputNamespaceMap": { "relatedEntity": "global" },
+ "branches": ["true", "false"]
+ },
+ {
+ "name": "userApproval",
+ "type": "userTask",
+ "subType": "userApprovalTask",
+ "displayName": "Approve Changes",
+ "input": ["relatedEntity"],
+ "output": ["updatedBy"],
+ "branches": ["true", "false"],
+ "config": {
+ "assignees": { "addReviewers": true, "addOwners": true, "candidates": [] },
+ "approvalThreshold": 1,
+ "rejectionThreshold": 1
+ },
+ "inputNamespaceMap": { "relatedEntity": "global" }
+ },
+ {
+ "name": "setApproved",
+ "type": "automatedTask",
+ "subType": "setEntityAttributeTask",
+ "displayName": "Set Status to Approved",
+ "config": { "fieldName": "status", "fieldValue": "Approved" },
+ "input": ["relatedEntity", "updatedBy"],
+ "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" }
+ },
+ {
+ "name": "setRejected",
+ "type": "automatedTask",
+ "subType": "setEntityAttributeTask",
+ "displayName": "Set Status to Rejected",
+ "config": { "fieldName": "status", "fieldValue": "Rejected" },
+ "input": ["relatedEntity", "updatedBy"],
+ "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" }
+ },
+ {
+ "name": "setDraft",
+ "type": "automatedTask",
+ "subType": "setEntityAttributeTask",
+ "displayName": "Set Status to Draft",
+ "config": { "fieldName": "status", "fieldValue": "Draft" },
+ "input": ["relatedEntity", "updatedBy"],
+ "inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "global" }
+ },
+ { "name": "end", "type": "endEvent", "subType": "endEvent" }
+ ],
+ "edges": [
+ { "from": "start", "to": "checkChangeDesc" },
+ { "from": "checkChangeDesc", "to": "userApproval", "condition": "true" },
+ { "from": "checkChangeDesc", "to": "setDraft", "condition": "false" },
+ { "from": "userApproval", "to": "setApproved", "condition": "true" },
+ { "from": "userApproval", "to": "setRejected", "condition": "false" },
+ { "from": "setApproved", "to": "end" },
+ { "from": "setRejected", "to": "end" },
+ { "from": "setDraft", "to": "end" }
+ ]
+ }
+ """
+ .formatted(workflowName);
+ }
+
+ private void patchAddTag(OpenMetadataClient client, UUID schemaId, String tagFqn)
+ throws Exception {
+ String patch =
+ String.format(
+ "[{\"op\":\"add\",\"path\":\"/tags\",\"value\":[{\"tagFQN\":\"%s\","
+ + "\"source\":\"Classification\",\"labelType\":\"Manual\",\"state\":\"Confirmed\"}]}]",
+ tagFqn);
+ client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch));
+ }
+
+ private void patchAddDomain(OpenMetadataClient client, UUID schemaId, Domain domain)
+ throws Exception {
+ String patch =
+ String.format(
+ """
+ [
+ {
+ "op": "add",
+ "path": "/domains",
+ "value": [ { "id": "%s", "type": "domain", "name": "%s", "fullyQualifiedName": "%s" } ]
+ }
+ ]
+ """,
+ domain.getId(), domain.getName(), domain.getFullyQualifiedName());
+ client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch));
+ }
+
+ private Task awaitSingleOpenApprovalTask(OpenMetadataClient client, String schemaFqn) {
+ await("approval task to be created for " + schemaFqn)
+ .atMost(Duration.ofSeconds(60))
+ .pollInterval(Duration.ofSeconds(2))
+ .ignoreExceptions()
+ .until(() -> !listOpenApprovalTasks(client, schemaFqn).getData().isEmpty());
+ try {
+ return listOpenApprovalTasks(client, schemaFqn).getData().getFirst();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private boolean hasApprovalTaskWithStatus(
+ OpenMetadataClient client, String schemaFqn, TaskEntityStatus status) {
+ boolean found;
+ try {
+ Map filters = new HashMap<>();
+ filters.put("status", status.value());
+ filters.put("category", TaskCategory.Approval.value());
+ filters.put("aboutEntity", schemaFqn);
+ found = !client.tasks().listWithFilters(filters).getData().isEmpty();
+ } catch (Exception e) {
+ found = false;
+ }
+ return found;
+ }
+
+ private void cleanupSupersedeFixtures(OpenMetadataClient client, SupersedeFixtures fx) {
+ try {
+ client.workflowDefinitions().delete(fx.workflow.getId().toString());
+ client.databaseSchemas().delete(fx.schema.getId().toString());
+ client.databases().delete(fx.database.getId().toString());
+ client.databaseServices().delete(fx.dbService.getId().toString());
+ client.domains().delete(fx.financeDomain.getId().toString());
+ client.users().delete(fx.owner.getId().toString());
+ } catch (Exception e) {
+ LOG.warn("Cleanup error: {}", e.getMessage());
+ }
+ }
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java
index bbf1f88a816b..98b9847e5ad9 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/WorkflowHandler.java
@@ -40,7 +40,6 @@
import org.flowable.engine.runtime.ProcessInstance;
import org.flowable.job.api.Job;
import org.flowable.task.api.Task;
-import org.jdbi.v3.core.transaction.TransactionIsolationLevel;
import org.openmetadata.schema.configuration.WorkflowSettings;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.governance.workflows.WorkflowDefinition;
@@ -49,7 +48,6 @@
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.Include;
import org.openmetadata.schema.utils.JsonUtils;
-import org.openmetadata.schema.utils.ResultList;
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.clients.pipeline.PipelineServiceClientFactory;
@@ -59,7 +57,6 @@
import org.openmetadata.service.governance.workflows.flowable.sql.UnlockJobSql;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.DeadlockRetry;
-import org.openmetadata.service.jdbi3.ListFilter;
import org.openmetadata.service.jdbi3.SystemRepository;
import org.openmetadata.service.jdbi3.TaskRepository;
import org.openmetadata.service.jdbi3.WorkflowDefinitionRepository;
@@ -67,7 +64,6 @@
import org.openmetadata.service.jdbi3.WorkflowInstanceStateRepository;
import org.openmetadata.service.jdbi3.locator.ConnectionType;
import org.openmetadata.service.resources.services.ingestionpipelines.IngestionPipelineMapper;
-import org.openmetadata.service.util.EntityUtil;
@Slf4j
public class WorkflowHandler {
@@ -1621,143 +1617,55 @@ public void terminateWorkflow(String workflowName) {
instance.getId(), "Terminating all instances due to user request."));
}
- public void terminateDuplicateInstances(
- String mainWorkflowDefinitionName, String entityLink, String currentProcessInstanceId) {
+ /**
+ * Terminates a single, specific workflow instance that has been superseded by a newer run.
+ *
+ * Invoked from {@code CreateTask} when a fresh approval run produces a new task for an entity
+ * that already had an open approval task from an earlier run. Only the superseded run's main
+ * process instance (matched by business key = workflow instance id and the main process
+ * definition key) is deleted, and the corresponding {@link WorkflowInstance} record is marked
+ * FAILED for audit. Failures are swallowed: this is best-effort cleanup of an orphaned Flowable
+ * process and must never propagate into — or roll back — the caller's transaction.
+ */
+ public void terminateWorkflowInstance(
+ UUID workflowInstanceId, String mainWorkflowName, String reason) {
try {
- WorkflowInstanceRepository workflowInstanceRepository =
- (WorkflowInstanceRepository)
- Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
- WorkflowInstanceStateRepository workflowInstanceStateRepository =
- (WorkflowInstanceStateRepository)
- Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
-
- ListFilter filter = new ListFilter(null);
- filter.addQueryParam("entityLink", entityLink);
-
- long endTs = System.currentTimeMillis();
- long startTs = endTs - (7L * 24 * 60 * 60 * 1000);
-
- ResultList allInstances =
- workflowInstanceRepository.list(null, startTs, endTs, 100, filter, false);
-
- List candidateInstances =
- allInstances.getData().stream()
- .filter(
- instance -> WorkflowInstance.WorkflowStatus.RUNNING.equals(instance.getStatus()))
- .filter(
- instance -> {
- try {
- WorkflowDefinitionRepository repo =
- (WorkflowDefinitionRepository)
- Entity.getEntityRepository(Entity.WORKFLOW_DEFINITION);
- var def =
- repo.get(
- null,
- instance.getWorkflowDefinitionId(),
- EntityUtil.Fields.EMPTY_FIELDS);
- return mainWorkflowDefinitionName.equals(def.getName());
- } catch (Exception e) {
- return false;
- }
- })
- .toList();
-
- RuntimeService runtimeService = getInstance().getRuntimeService();
- List runningProcessInstances =
- runtimeService
+ ProcessInstance mainInstance =
+ getRuntimeService()
.createProcessInstanceQuery()
- .processDefinitionKey(mainWorkflowDefinitionName)
- .list();
-
- List conflictingInstances =
- candidateInstances.stream()
- .filter(
- instance -> {
- return runningProcessInstances.stream()
- .filter(pi -> !pi.getId().equals(currentProcessInstanceId))
- .anyMatch(pi -> pi.getBusinessKey().equals(instance.getId().toString()));
- })
- .toList();
-
- if (conflictingInstances.isEmpty()) {
- LOG.debug("No conflicting instances found to terminate for {}", mainWorkflowDefinitionName);
- return;
- }
-
- // Terminate Flowable process instances OUTSIDE any JDBI transaction.
- // Calling runtimeService.deleteProcessInstance() inside a JDBI transaction causes a race
- // condition: the uncommitted DELETE on ACT_RU_EXECUTION holds an X-lock, Flowable's async
- // job executor concurrently tries to INSERT a timer job referencing that execution (FK
- // S-lock wait), and when the JDBI tx commits the execution is gone, so the timer INSERT
- // fails with SQLIntegrityConstraintViolationException.
- for (WorkflowInstance instance : conflictingInstances) {
- ProcessInstance mainInstance =
- runningProcessInstances.stream()
- .filter(
- pi ->
- pi.getBusinessKey() != null
- && pi.getBusinessKey().equals(instance.getId().toString()))
- .findFirst()
- .orElse(null);
-
- if (mainInstance != null) {
- String processId = mainInstance.getId();
- long activeUserTasks =
- processEngine
- .getTaskService()
- .createTaskQuery()
- .processInstanceId(processId)
- .active()
- .count();
- if (activeUserTasks == 0) {
- LOG.debug(
- "Process instance {} has no active user tasks — it is auto-completing; skipping external deletion",
- processId);
- continue;
- }
- LOG.info(
- "Terminating main workflow instance {} for conflicting instance {}",
- mainInstance.getId(),
- instance.getId());
- try {
- runtimeService.deleteProcessInstance(
- processId, "Terminated due to conflicting workflow instance");
- } catch (FlowableObjectNotFoundException e) {
- LOG.debug(
- "Process instance {} already completed before termination, skipping", processId);
- }
- }
+ .processInstanceBusinessKey(workflowInstanceId.toString())
+ .processDefinitionKey(mainWorkflowName)
+ .singleResult();
+ if (mainInstance != null) {
+ deleteProcessInstanceQuietly(mainInstance.getId(), reason);
}
-
- Entity.getJdbi()
- .inTransaction(
- TransactionIsolationLevel.READ_COMMITTED,
- handle -> {
- try {
- for (WorkflowInstance instance : conflictingInstances) {
- workflowInstanceStateRepository.markInstanceStatesAsFailed(
- instance.getId(), "Terminated due to conflicting workflow instance");
- workflowInstanceRepository.markInstanceAsFailed(
- instance.getId(), "Terminated due to conflicting workflow instance");
- }
- return null;
- } catch (Exception e) {
- LOG.error("Failed to update instance states in transaction: {}", e.getMessage());
- throw e;
- }
- });
-
- LOG.info(
- "Terminated {} conflicting instances of {} for entity {}",
- conflictingInstances.size(),
- mainWorkflowDefinitionName,
- entityLink);
-
+ markWorkflowInstanceFailed(workflowInstanceId, reason);
} catch (Exception e) {
- LOG.warn("Failed to terminate conflicting instances: {}", e.getMessage());
+ LOG.warn(
+ "Failed to terminate superseded workflow instance {}: {}",
+ workflowInstanceId,
+ e.getMessage());
+ }
+ }
+
+ private void deleteProcessInstanceQuietly(String processInstanceId, String reason) {
+ try {
+ getRuntimeService().deleteProcessInstance(processInstanceId, reason);
+ } catch (FlowableObjectNotFoundException e) {
+ LOG.debug("Process instance {} already completed before termination", processInstanceId);
}
}
+ private void markWorkflowInstanceFailed(UUID workflowInstanceId, String reason) {
+ WorkflowInstanceStateRepository workflowInstanceStateRepository =
+ (WorkflowInstanceStateRepository)
+ Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE_STATE);
+ WorkflowInstanceRepository workflowInstanceRepository =
+ (WorkflowInstanceRepository) Entity.getEntityTimeSeriesRepository(Entity.WORKFLOW_INSTANCE);
+ workflowInstanceStateRepository.markInstanceStatesAsFailed(workflowInstanceId, reason);
+ workflowInstanceRepository.markInstanceAsFailed(workflowInstanceId, reason);
+ }
+
public RuntimeService getRuntimeService() {
return processEngine.getRuntimeService();
}
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java
index 670e3703503d..f9f66f759779 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTask.java
@@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
@@ -88,6 +89,8 @@
public class CreateTask implements TaskListener {
static final String PENDING_WORKFLOW_START_STAGE_ID = "pending-workflow-start";
private static final String DEFAULT_SYSTEM_USER = "admin";
+ private static final String SUPERSEDED_BY_NEWER_RUN =
+ "Superseded by a newer approval workflow run for the same entity";
private static final int WORKFLOW_MANAGED_DRAFT_LOOKUP_MAX_ATTEMPTS = 6;
private static final long INITIAL_WORKFLOW_MANAGED_DRAFT_LOOKUP_DELAY_MILLIS = 25L;
private static final long MAX_WORKFLOW_MANAGED_DRAFT_LOOKUP_DELAY_MILLIS = 250L;
@@ -585,6 +588,19 @@ private Task createOrUpdateTask(
new com.fasterxml.jackson.core.type.TypeReference>() {}));
}
+ // A genuinely new approval run is materializing a task for this entity. Supersede any approval
+ // task still open from an earlier run of the same workflow before creating the new one, so a
+ // single live approval is maintained and the earlier run's orphaned Flowable process is cleaned
+ // up. Cancel-before-create keeps the "exactly one open" invariant.
+ supersedePriorApprovalTask(
+ delegateTask,
+ taskRepository,
+ entity,
+ taskCategory,
+ resolvedWorkflowDefinitionId,
+ workflowInstanceId,
+ updatedBy);
+
// Use the repository to create (handles taskId generation, FQN, relationships)
task = taskRepository.create(null, task);
@@ -608,6 +624,73 @@ private Task createOrUpdateTask(
return task;
}
+ private void supersedePriorApprovalTask(
+ DelegateTask delegateTask,
+ TaskRepository taskRepository,
+ EntityInterface entity,
+ TaskCategory taskCategory,
+ UUID currentWorkflowDefinitionId,
+ UUID currentWorkflowInstanceId,
+ String updatedBy) {
+ // Best-effort cleanup: failing to supersede the prior task must never abort creation of the new
+ // approval task, so all exceptions are contained here instead of bubbling up as a BpmnError.
+ try {
+ Task prior =
+ taskCategory == TaskCategory.Approval
+ ? taskRepository.findOpenTaskByEntityAndCategory(
+ entity.getFullyQualifiedName(), taskCategory)
+ : null;
+ if (isSupersedablePriorApprovalTask(
+ prior, currentWorkflowDefinitionId, currentWorkflowInstanceId)) {
+ cancelAndTerminatePriorApproval(delegateTask, taskRepository, prior, updatedBy);
+ }
+ } catch (Exception e) {
+ LOG.warn(
+ "[CreateTask] Failed to supersede prior approval task for entity '{}': {}",
+ entity.getFullyQualifiedName(),
+ e.getMessage());
+ }
+ }
+
+ static boolean isSupersedablePriorApprovalTask(
+ Task prior, UUID currentWorkflowDefinitionId, UUID currentWorkflowInstanceId) {
+ return prior != null
+ && prior.getWorkflowInstanceId() != null
+ && !prior.getWorkflowInstanceId().equals(currentWorkflowInstanceId)
+ && currentWorkflowDefinitionId != null
+ && currentWorkflowDefinitionId.equals(prior.getWorkflowDefinitionId());
+ }
+
+ private void cancelAndTerminatePriorApproval(
+ DelegateTask delegateTask, TaskRepository taskRepository, Task prior, String updatedBy) {
+ LOG.info(
+ "[CreateTask] Superseding prior approval task '{}' (workflowInstance '{}') with a newer run",
+ prior.getId(),
+ prior.getWorkflowInstanceId());
+ taskRepository.closeTask(prior, updatedBy, SUPERSEDED_BY_NEWER_RUN);
+ dispatchPriorInstanceTermination(delegateTask, prior.getWorkflowInstanceId());
+ }
+
+ private void dispatchPriorInstanceTermination(DelegateTask delegateTask, UUID priorInstanceId) {
+ String mainWorkflowName = inferWorkflowDefinitionRef(delegateTask);
+ // Dispatched async on purpose: deleteProcessInstance() then runs in its own Flowable command
+ // context, so a DB-level failure terminating the superseded process can never poison — or roll
+ // back — the current run's transaction, which has just created the replacement task.
+ CompletableFuture.runAsync(
+ () ->
+ WorkflowHandler.getInstance()
+ .terminateWorkflowInstance(
+ priorInstanceId, mainWorkflowName, SUPERSEDED_BY_NEWER_RUN))
+ .exceptionally(
+ ex -> {
+ LOG.error(
+ "[CreateTask] Failed to terminate superseded workflow instance '{}'",
+ priorInstanceId,
+ ex);
+ return null;
+ });
+ }
+
static List resolveExistingTaskAssignees(
Task existingTask,
List workflowAssignees,
diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java
index 928beb6050a8..355e53e60965 100644
--- a/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java
+++ b/openmetadata-service/src/main/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImpl.java
@@ -10,7 +10,6 @@
import java.util.List;
import java.util.Optional;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import org.flowable.common.engine.api.delegate.Expression;
import org.flowable.engine.delegate.DelegateExecution;
import org.flowable.engine.delegate.JavaDelegate;
@@ -25,7 +24,6 @@
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.governance.workflows.WorkflowVariableHandler;
-import org.openmetadata.service.governance.workflows.elements.TriggerFactory;
import org.openmetadata.service.jdbi3.RecognizerFeedbackRepository;
import org.openmetadata.service.resources.feeds.MessageParser;
import org.openmetadata.service.rules.RuleEngine;
@@ -74,37 +72,12 @@ public void execute(DelegateExecution execution) {
passesExcludedFilter(entityLinkStr, excludedFilter, includeFields, filterLogic);
}
- if (passesFilter) {
- String triggerWorkflowDefinitionKey =
- WorkflowHandler.getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
- String mainWorkflowDefinitionName =
- TriggerFactory.getMainWorkflowDefinitionNameFromTrigger(triggerWorkflowDefinitionKey);
- String currentProcessInstanceId = execution.getProcessInstanceId();
- // Terminate duplicate instances asynchronously to prevent a MySQL FK violation.
- // This JavaDelegate runs inside Flowable's signalEventReceived command context (TX_A).
- // Calling deleteProcessInstance() from within TX_A reuses the same DB transaction; the
- // uncommitted execution DELETE holds an X-lock, and Flowable's job executor may try to
- // INSERT a timer job referencing that execution (FK wait), causing a constraint violation
- // when TX_A commits. Running in a separate thread gives terminateDuplicateInstances its
- // own Flowable command context and independent DB transaction, avoiding that FK issue.
- // The deadlock (PostgreSQL lock-order reversal between deleteProcessInstance and a
- // concurrently auto-completing process) is prevented inside terminateDuplicateInstances
- // by skipping deletion for processes that have no active user tasks.
- final String workflowName = mainWorkflowDefinitionName;
- final String entityLinkStrFinal = entityLinkStr;
- final String processInstanceId = currentProcessInstanceId;
- CompletableFuture.runAsync(
- () ->
- WorkflowHandler.getInstance()
- .terminateDuplicateInstances(
- workflowName, entityLinkStrFinal, processInstanceId))
- .exceptionally(
- ex -> {
- log.error("Async termination of duplicate instances failed", ex);
- return null;
- });
- }
-
+ // Duplicate-instance supersede is intentionally NOT done here. Deciding "the new event
+ // supersedes the old" at trigger time is too early: this filter runs before the workflow
+ // evaluates checkChangeDescription/checkEntityAttributes, so a no-op event that passes the
+ // entity filter but creates no task would still kill a valid pending approval. The supersede
+ // now happens at task-creation time in CreateTask, where the run has genuinely produced a new
+ // approval task. See CreateTask#supersedePriorApprovalTask.
String workflowKey =
WorkflowHandler.getProcessDefinitionKeyFromId(execution.getProcessDefinitionId());
log.debug("Trigger {} - Entity {} passes filter: {}", workflowKey, entityLinkStr, passesFilter);
diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java
index 96214bc45a12..1cae5cf3e7dd 100644
--- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java
+++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/nodes/userTask/CreateTaskTest.java
@@ -585,4 +585,73 @@ void testParseMillisFromIso8601DurationReturnsFallbackForInvalidInput() {
void testParseMillisFromIso8601DurationReturnsFallbackForNullFallback() {
assertNull(CreateTask.parseMillisFromIso8601Duration("garbage", null));
}
+
+ // ---- isSupersedablePriorApprovalTask ----
+
+ @Test
+ void testIsSupersedableWhenPriorBelongsToEarlierRunOfSameWorkflow() {
+ UUID workflowDefinitionId = UUID.randomUUID();
+ Task prior =
+ new Task()
+ .withId(UUID.randomUUID())
+ .withWorkflowDefinitionId(workflowDefinitionId)
+ .withWorkflowInstanceId(UUID.randomUUID());
+
+ assertTrue(
+ CreateTask.isSupersedablePriorApprovalTask(prior, workflowDefinitionId, UUID.randomUUID()));
+ }
+
+ @Test
+ void testIsNotSupersedableWhenNoPriorTaskExists() {
+ assertFalse(
+ CreateTask.isSupersedablePriorApprovalTask(null, UUID.randomUUID(), UUID.randomUUID()));
+ }
+
+ @Test
+ void testIsNotSupersedableWhenPriorHasNoWorkflowInstance() {
+ UUID workflowDefinitionId = UUID.randomUUID();
+ Task prior =
+ new Task().withId(UUID.randomUUID()).withWorkflowDefinitionId(workflowDefinitionId);
+
+ assertFalse(
+ CreateTask.isSupersedablePriorApprovalTask(prior, workflowDefinitionId, UUID.randomUUID()));
+ }
+
+ @Test
+ void testIsNotSupersedableWhenPriorIsTheSameRun() {
+ UUID workflowDefinitionId = UUID.randomUUID();
+ UUID workflowInstanceId = UUID.randomUUID();
+ Task prior =
+ new Task()
+ .withId(UUID.randomUUID())
+ .withWorkflowDefinitionId(workflowDefinitionId)
+ .withWorkflowInstanceId(workflowInstanceId);
+
+ assertFalse(
+ CreateTask.isSupersedablePriorApprovalTask(
+ prior, workflowDefinitionId, workflowInstanceId));
+ }
+
+ @Test
+ void testIsNotSupersedableAcrossDifferentWorkflowDefinitions() {
+ Task prior =
+ new Task()
+ .withId(UUID.randomUUID())
+ .withWorkflowDefinitionId(UUID.randomUUID())
+ .withWorkflowInstanceId(UUID.randomUUID());
+
+ assertFalse(
+ CreateTask.isSupersedablePriorApprovalTask(prior, UUID.randomUUID(), UUID.randomUUID()));
+ }
+
+ @Test
+ void testIsNotSupersedableWhenCurrentWorkflowDefinitionUnknown() {
+ Task prior =
+ new Task()
+ .withId(UUID.randomUUID())
+ .withWorkflowDefinitionId(UUID.randomUUID())
+ .withWorkflowInstanceId(UUID.randomUUID());
+
+ assertFalse(CreateTask.isSupersedablePriorApprovalTask(prior, null, UUID.randomUUID()));
+ }
}