Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -9415,7 +9416,7 @@
.untilAsserted(
() -> {
ListResponse<Task> tasks = listOpenApprovalTasks(client, tableFqn);
assertTrue(

Check failure on line 9419 in openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java

View workflow job for this annotation

GitHub Actions / Test Report

WorkflowDefinitionResourceIT.test_MultipleFieldChangesWithIncludeFields(TestNamespace)

Assertion condition Approval task should be created for domain field change ==> expected: <true> but was: <false> within 1 minutes.
Raw output
org.awaitility.core.ConditionTimeoutException: Assertion condition Approval task should be created for domain field change ==> expected: <true> but was: <false> within 1 minutes.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at org.openmetadata.it.tests.WorkflowDefinitionResourceIT.test_MultipleFieldChangesWithIncludeFields(WorkflowDefinitionResourceIT.java:9416)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.opentest4j.AssertionFailedError: Approval task should be created for domain field change ==> expected: <true> but was: <false>
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertTrue.failNotTrue(AssertTrue.java:63)
	at org.junit.jupiter.api.AssertTrue.assertTrue(AssertTrue.java:36)
	at org.junit.jupiter.api.Assertions.assertTrue(Assertions.java:214)
	at org.openmetadata.it.tests.WorkflowDefinitionResourceIT.lambda$test_MultipleFieldChangesWithIncludeFields$123(WorkflowDefinitionResourceIT.java:9419)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
tasks.getData().size() >= 2,
"Approval task should be created for domain field change");
});
Expand Down Expand Up @@ -10280,4 +10281,357 @@

LOG.info("test_SelfApprovalPrevention completed successfully");
}

/**
* Bug 2: a no-op change event must not invalidate a pending approval.
*
* <p>CE1 changes a gated field (tag) → approval task created and parked. CE2 changes a different
* trigger field (domain) that passes the entity-level trigger filter but fails the workflow's
* checkChangeDescription rule, so it creates no task. The pending task must survive AND remain
* resolvable — resolving it is the discriminator: before the fix, CE2 eagerly terminated CE1's
* Flowable process at trigger time, leaving the OM task orphaned and unresolvable.
*/
@Test
void test_NoOpEventDoesNotInvalidatePendingApproval(TestNamespace ns) throws Exception {
LOG.info("Starting test_NoOpEventDoesNotInvalidatePendingApproval");
OpenMetadataClient client = SdkClients.adminClient();
ensureWorkflowEventConsumerIsActive(client);
String suffix = String.valueOf(System.currentTimeMillis());

SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "noop");
String schemaFqn = fx.schema.getFullyQualifiedName();

patchAddTag(client, fx.schema.getId(), "PII.Sensitive");
Task originalTask = awaitSingleOpenApprovalTask(client, schemaFqn);
LOG.debug("CE1 created approval task {}", originalTask.getId());

patchAddDomain(client, fx.schema.getId(), fx.financeDomain);

await("no-op event must leave the original approval task untouched")
.atMost(Duration.ofSeconds(45))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ListResponse<Task> open = listOpenApprovalTasks(client, schemaFqn);
assertEquals(
1, open.getData().size(), "Exactly one approval task should remain open");
assertEquals(

Check failure on line 10318 in openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java

View workflow job for this annotation

GitHub Actions / Test Report

WorkflowDefinitionResourceIT.test_NoOpEventDoesNotInvalidatePendingApproval(TestNamespace)

Condition with alias 'no-op event must leave the original approval task untouched' didn't complete within 45 seconds because assertion condition with alias no-op event must leave the original approval task untouched The original pending approval task must be the one still open ==> expected: <9eac21e4-a69f-4027-b1bd-47c0ec63e13d> but was: <de9f8a6f-614a-4450-ab97-4aa0a67750f5>.
Raw output
org.awaitility.core.ConditionTimeoutException: Condition with alias 'no-op event must leave the original approval task untouched' didn't complete within 45 seconds because assertion condition with alias no-op event must leave the original approval task untouched The original pending approval task must be the one still open ==> expected: <9eac21e4-a69f-4027-b1bd-47c0ec63e13d> but was: <de9f8a6f-614a-4450-ab97-4aa0a67750f5>.
	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
	at org.openmetadata.it.tests.WorkflowDefinitionResourceIT.test_NoOpEventDoesNotInvalidatePendingApproval(WorkflowDefinitionResourceIT.java:10313)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
Caused by: org.opentest4j.AssertionFailedError: The original pending approval task must be the one still open ==> expected: <9eac21e4-a69f-4027-b1bd-47c0ec63e13d> but was: <de9f8a6f-614a-4450-ab97-4aa0a67750f5>
	at org.junit.jupiter.api.AssertionFailureBuilder.build(AssertionFailureBuilder.java:151)
	at org.junit.jupiter.api.AssertionFailureBuilder.buildAndThrow(AssertionFailureBuilder.java:132)
	at org.junit.jupiter.api.AssertEquals.failNotEqual(AssertEquals.java:197)
	at org.junit.jupiter.api.AssertEquals.assertEquals(AssertEquals.java:182)
	at org.junit.jupiter.api.Assertions.assertEquals(Assertions.java:1156)
	at org.openmetadata.it.tests.WorkflowDefinitionResourceIT.lambda$test_NoOpEventDoesNotInvalidatePendingApproval$130(WorkflowDefinitionResourceIT.java:10318)
	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
originalTask.getId(),
open.getData().getFirst().getId(),
"The original pending approval task must be the one still open");
});

OpenMetadataClient ownerClient =
SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {});
ownerClient
.tasks()
.resolve(
originalTask.getId().toString(),
new org.openmetadata.schema.api.tasks.ResolveTask()
.withResolutionType(TaskResolutionType.Approved));

await("surviving task must resolve and drive the workflow to Approved")
.atMost(Duration.ofSeconds(45))
.pollInterval(Duration.ofSeconds(2))
.until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));

LOG.info("test_NoOpEventDoesNotInvalidatePendingApproval completed successfully");
cleanupSupersedeFixtures(client, fx);
}
Comment on lines +10333 to +10340

/**
* Bug 1: a new approval run supersedes the stale pending task.
*
* <p>CE1 creates and parks an approval task. CE3 makes another gated change, producing a new
* approval run. When the new run materializes its task, the prior task must be Cancelled (no
* orphaned OM task) and exactly one new open task must remain. The new task must be resolvable,
* proving its Flowable process is live.
*/
@Test
void test_NewApprovalRunSupersedesStalePendingTask(TestNamespace ns) throws Exception {
LOG.info("Starting test_NewApprovalRunSupersedesStalePendingTask");
OpenMetadataClient client = SdkClients.adminClient();
ensureWorkflowEventConsumerIsActive(client);
String suffix = String.valueOf(System.currentTimeMillis());

SupersedeFixtures fx = createTagApprovalFixtures(client, ns, suffix, "supersede");
String schemaFqn = fx.schema.getFullyQualifiedName();

patchAddTag(client, fx.schema.getId(), "PII.Sensitive");
Task firstTask = awaitSingleOpenApprovalTask(client, schemaFqn);
LOG.debug("CE1 created approval task {}", firstTask.getId());

patchAddTag(client, fx.schema.getId(), "PII.None");

await("new approval run must supersede the prior task")
.atMost(Duration.ofSeconds(60))
.pollInterval(Duration.ofSeconds(2))
.untilAsserted(
() -> {
ListResponse<Task> open = listOpenApprovalTasks(client, schemaFqn);
assertEquals(1, open.getData().size(), "Exactly one approval task should be open");
assertNotEquals(
firstTask.getId(),
open.getData().getFirst().getId(),
"A new approval task should have replaced the prior one");
assertTrue(
hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Cancelled),
"The superseded approval task must be Cancelled, not orphaned");
});

Task supersedingTask = listOpenApprovalTasks(client, schemaFqn).getData().getFirst();
OpenMetadataClient ownerClient =
SdkClients.createClient(fx.owner.getName(), fx.owner.getEmail(), new String[] {});
ownerClient
.tasks()
.resolve(
supersedingTask.getId().toString(),
new org.openmetadata.schema.api.tasks.ResolveTask()
.withResolutionType(TaskResolutionType.Approved));

await("superseding task must resolve and drive the workflow to Approved")
.atMost(Duration.ofSeconds(45))
.pollInterval(Duration.ofSeconds(2))
.until(() -> hasApprovalTaskWithStatus(client, schemaFqn, TaskEntityStatus.Approved));

LOG.info("test_NewApprovalRunSupersedesStalePendingTask completed successfully");
cleanupSupersedeFixtures(client, fx);
}
Comment on lines +10392 to +10399

private static class SupersedeFixtures {
final User owner;
final DatabaseService dbService;
final Database database;
final DatabaseSchema schema;
final Domain financeDomain;
final WorkflowDefinition workflow;

SupersedeFixtures(
User owner,
DatabaseService dbService,
Database database,
DatabaseSchema schema,
Domain financeDomain,
WorkflowDefinition workflow) {
this.owner = owner;
this.dbService = dbService;
this.database = database;
this.schema = schema;
this.financeDomain = financeDomain;
this.workflow = workflow;
}
}

private SupersedeFixtures createTagApprovalFixtures(
OpenMetadataClient client, TestNamespace ns, String suffix, String label) throws Exception {
User owner =
client
.users()
.create(
new CreateUser()
.withName(label + "_owner_" + suffix)
.withEmail(label + "_owner_" + suffix + "@example.com")
.withDisplayName("Supersede Owner"));

DatabaseService dbService =
client
.databaseServices()
.create(
new CreateDatabaseService()
.withName(ns.prefix(label + "-db-service"))
.withServiceType(DatabaseServiceType.Mysql)
.withConnection(
new DatabaseConnection()
.withConfig(
new MysqlConnection()
.withHostPort("localhost:3306")
.withUsername("test")
.withAuthType(new basicAuth().withPassword("test")))));

Database database =
client
.databases()
.create(
new CreateDatabase()
.withName(ns.prefix(label + "-database"))
.withService(dbService.getFullyQualifiedName()));

DatabaseSchema schema =
client
.databaseSchemas()
.create(
new CreateDatabaseSchema()
.withName(ns.prefix(label + "-schema"))
.withDatabase(database.getFullyQualifiedName())
.withOwners(List.of(owner.getEntityReference())));

Domain financeDomain =
client
.domains()
.create(
new CreateDomain()
.withName(ns.prefix(label + "-Finance"))
.withDescription("Finance domain for supersede testing")
.withDomainType(CreateDomain.DomainType.CONSUMER_ALIGNED));

String workflowName = "SupersedeApprovalWorkflow_" + label + "_" + suffix;
CreateWorkflowDefinition workflowRequest =
MAPPER.readValue(tagApprovalWorkflowJson(workflowName), CreateWorkflowDefinition.class);
WorkflowDefinition workflow = client.workflowDefinitions().create(workflowRequest);
trackWorkflow(workflowName, workflow.getId().toString());
waitForWorkflowDeployment(client, workflowName);

return new SupersedeFixtures(owner, dbService, database, schema, financeDomain, workflow);
}

private String tagApprovalWorkflowJson(String workflowName) {
return """
{
"name": "%s",
"displayName": "Supersede Approval Workflow",
"description": "Creates an approval task only when gated tags change",
"type": "eventBasedEntity",
"trigger": {
"type": "eventBasedEntity",
"config": { "events": ["Updated"], "entityTypes": ["databaseSchema"] },
"output": ["relatedEntity", "updatedBy"]
},
"nodes": [
{ "name": "start", "type": "startEvent", "subType": "startEvent" },
{
"name": "checkChangeDesc",
"type": "automatedTask",
"subType": "checkChangeDescriptionTask",
"displayName": "Check Gated Tags Changed",
"config": { "condition": "OR", "rules": { "tags": ["PII.Sensitive", "PII.None"] } },
"input": ["relatedEntity"],
"inputNamespaceMap": { "relatedEntity": "global" },
"branches": ["true", "false"]
},
{
"name": "userApproval",
"type": "userTask",
"subType": "userApprovalTask",
"displayName": "Approve Changes",
"input": ["relatedEntity"],
"output": ["updatedBy"],
"branches": ["true", "false"],
"config": {
"assignees": { "addReviewers": true, "addOwners": true, "candidates": [] },
"approvalThreshold": 1,
"rejectionThreshold": 1
},
"inputNamespaceMap": { "relatedEntity": "global" }
},
{
"name": "setApproved",
"type": "automatedTask",
"subType": "setEntityAttributeTask",
"displayName": "Set Status to Approved",
"config": { "fieldName": "status", "fieldValue": "Approved" },
"input": ["relatedEntity", "updatedBy"],
"inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" }
},
{
"name": "setRejected",
"type": "automatedTask",
"subType": "setEntityAttributeTask",
"displayName": "Set Status to Rejected",
"config": { "fieldName": "status", "fieldValue": "Rejected" },
"input": ["relatedEntity", "updatedBy"],
"inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "userApproval" }
},
{
"name": "setDraft",
"type": "automatedTask",
"subType": "setEntityAttributeTask",
"displayName": "Set Status to Draft",
"config": { "fieldName": "status", "fieldValue": "Draft" },
"input": ["relatedEntity", "updatedBy"],
"inputNamespaceMap": { "relatedEntity": "global", "updatedBy": "global" }
},
{ "name": "end", "type": "endEvent", "subType": "endEvent" }
],
"edges": [
{ "from": "start", "to": "checkChangeDesc" },
{ "from": "checkChangeDesc", "to": "userApproval", "condition": "true" },
{ "from": "checkChangeDesc", "to": "setDraft", "condition": "false" },
{ "from": "userApproval", "to": "setApproved", "condition": "true" },
{ "from": "userApproval", "to": "setRejected", "condition": "false" },
{ "from": "setApproved", "to": "end" },
{ "from": "setRejected", "to": "end" },
{ "from": "setDraft", "to": "end" }
]
}
"""
.formatted(workflowName);
}

private void patchAddTag(OpenMetadataClient client, UUID schemaId, String tagFqn)
throws Exception {
String patch =
String.format(
"[{\"op\":\"add\",\"path\":\"/tags\",\"value\":[{\"tagFQN\":\"%s\","
+ "\"source\":\"Classification\",\"labelType\":\"Manual\",\"state\":\"Confirmed\"}]}]",
tagFqn);
client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch));
}

private void patchAddDomain(OpenMetadataClient client, UUID schemaId, Domain domain)
throws Exception {
String patch =
String.format(
"""
[
{
"op": "add",
"path": "/domains",
"value": [ { "id": "%s", "type": "domain", "name": "%s", "fullyQualifiedName": "%s" } ]
}
]
""",
domain.getId(), domain.getName(), domain.getFullyQualifiedName());
client.databaseSchemas().patch(schemaId, MAPPER.readTree(patch));
}

private Task awaitSingleOpenApprovalTask(OpenMetadataClient client, String schemaFqn) {
await("approval task to be created for " + schemaFqn)
.atMost(Duration.ofSeconds(60))
.pollInterval(Duration.ofSeconds(2))
.ignoreExceptions()
.until(() -> !listOpenApprovalTasks(client, schemaFqn).getData().isEmpty());
try {
return listOpenApprovalTasks(client, schemaFqn).getData().getFirst();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private boolean hasApprovalTaskWithStatus(
OpenMetadataClient client, String schemaFqn, TaskEntityStatus status) {
boolean found;
try {
Map<String, String> filters = new HashMap<>();
filters.put("status", status.value());
filters.put("category", TaskCategory.Approval.value());
filters.put("aboutEntity", schemaFqn);
found = !client.tasks().listWithFilters(filters).getData().isEmpty();
} catch (Exception e) {
found = false;
}
return found;
}

private void cleanupSupersedeFixtures(OpenMetadataClient client, SupersedeFixtures fx) {
try {
client.workflowDefinitions().delete(fx.workflow.getId().toString());
client.databaseSchemas().delete(fx.schema.getId().toString());
client.databases().delete(fx.database.getId().toString());
client.databaseServices().delete(fx.dbService.getId().toString());
client.domains().delete(fx.financeDomain.getId().toString());
client.users().delete(fx.owner.getId().toString());
} catch (Exception e) {
LOG.warn("Cleanup error: {}", e.getMessage());
}
}
}
Loading
Loading