Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.temporal.workflow.childWorkflowTests;

import static org.junit.Assert.assertTrue;

import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.enums.v1.EventType;
import io.temporal.client.WorkflowFailedException;
import io.temporal.client.WorkflowStub;
import io.temporal.failure.ChildWorkflowFailure;
import io.temporal.failure.TerminatedFailure;
import io.temporal.internal.Signal;
import io.temporal.testing.internal.SDKTestWorkflowRule;
import io.temporal.workflow.ChildWorkflowOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.shared.TestWorkflows.NoArgsWorkflow;
import io.temporal.workflow.shared.TestWorkflows.TestWorkflowReturnString;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

public class ChildWorkflowTerminationTest {

private static final Signal childStarted = new Signal();

@Rule
public SDKTestWorkflowRule testWorkflowRule =
SDKTestWorkflowRule.newBuilder()
.setWorkflowTypes(TestParentWorkflowImpl.class, TestChildWorkflowImpl.class)
.build();

@Before
public void setUp() throws Exception {
childStarted.clearSignal();
}

@Test
public void testChildWorkflowTermination() throws Exception {
WorkflowStub client =
testWorkflowRule.newUntypedWorkflowStubTimeoutOptions("TestWorkflowReturnString");
WorkflowExecution execution = client.start();
childStarted.waitForSignal();
testWorkflowRule
.getWorkflowClient()
.newUntypedWorkflowStub("test-terminated-child-workflow-id")
.terminate("Terminating child for test");
try {
client.getResult(String.class);
Assert.fail("unreachable");
} catch (WorkflowFailedException e) {
assertTrue(e.getCause() instanceof ChildWorkflowFailure);
assertTrue(e.getCause().getCause() instanceof TerminatedFailure);
}
testWorkflowRule.assertHistoryEvent(
execution.getWorkflowId(), EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED);
}

public static class TestParentWorkflowImpl implements TestWorkflowReturnString {

@Override
public String execute() {
NoArgsWorkflow child =
Workflow.newChildWorkflowStub(
NoArgsWorkflow.class,
ChildWorkflowOptions.newBuilder()
.setWorkflowId("test-terminated-child-workflow-id")
.build());
child.execute();
return "unreachable";
}
}

public static class TestChildWorkflowImpl implements NoArgsWorkflow {

@Override
public void execute() {
childStarted.signal();
Workflow.await(() -> false);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,8 @@ public static StateMachine<ChildWorkflowData> newChildWorkflowStateMachine(
.add(STARTED, COMPLETE, COMPLETED, StateMachines::childWorkflowCompleted)
.add(STARTED, FAIL, FAILED, StateMachines::childWorkflowFailed)
.add(STARTED, TIME_OUT, TIMED_OUT, StateMachines::timeoutChildWorkflow)
.add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled);
.add(STARTED, CANCEL, CANCELED, StateMachines::childWorkflowCanceled)
.add(STARTED, TERMINATE, TERMINATED, StateMachines::childWorkflowTerminated);
}

public static StateMachine<UpdateWorkflowExecutionData> newUpdateWorkflowExecution(
Expand Down Expand Up @@ -1171,6 +1172,24 @@ private static void childWorkflowCanceled(
ctx.addEvent(event);
}

private static void childWorkflowTerminated(
RequestContext ctx,
ChildWorkflowData data,
ChildWorkflowExecutionTerminatedEventAttributes a,
long notUsed) {
ChildWorkflowExecutionTerminatedEventAttributes updatedAttr =
a.toBuilder()
.setInitiatedEventId(data.initiatedEventId)
.setStartedEventId(data.startedEventId)
.build();
HistoryEvent event =
HistoryEvent.newBuilder()
.setEventType(EventType.EVENT_TYPE_CHILD_WORKFLOW_EXECUTION_TERMINATED)
.setChildWorkflowExecutionTerminatedEventAttributes(updatedAttr)
.build();
ctx.addEvent(event);
}

@SuppressWarnings("deprecation")
private static void initiateChildWorkflow(
RequestContext ctx,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ void failStartChildWorkflow(

void childWorkflowCanceled(String workflowId, ChildWorkflowExecutionCanceledEventAttributes a);

void childWorkflowTerminated(
String workflowId, ChildWorkflowExecutionTerminatedEventAttributes a);

@Nullable
PollWorkflowTaskQueueResponse startWorkflow(
boolean continuedAsNew,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1441,6 +1441,19 @@ public void childWorkflowCanceled(
});
}

@Override
public void childWorkflowTerminated(
String activityId, ChildWorkflowExecutionTerminatedEventAttributes a) {
update(
ctx -> {
StateMachine<ChildWorkflowData> child = getChildWorkflow(a.getInitiatedEventId());
child.action(StateMachines.Action.TERMINATE, ctx, a, 0);
childWorkflows.remove(a.getInitiatedEventId());
scheduleWorkflowTask(ctx);
ctx.unlockTimer("childWorkflowTerminated");
});
}

private void processStartTimer(
RequestContext ctx,
StartTimerCommandAttributes a,
Expand Down Expand Up @@ -2964,6 +2977,33 @@ public void terminateWorkflowExecution(TerminateWorkflowExecutionRequest request
workflow.action(Action.TERMINATE, ctx, request, 0);
workflowTaskStateMachine.getData().workflowCompleted = true;
processWorkflowCompletionCallbacks(ctx);
if (parent.isPresent()) {
ctx.lockTimer("terminateWorkflowExecution notify parent"); // unlocked by the parent
ChildWorkflowExecutionTerminatedEventAttributes a =
ChildWorkflowExecutionTerminatedEventAttributes.newBuilder()
.setInitiatedEventId(parentChildInitiatedEventId.getAsLong())
.setNamespace(ctx.getNamespace())
.setWorkflowExecution(ctx.getExecution())
.setWorkflowType(startRequest.getWorkflowType())
.build();
ForkJoinPool.commonPool()
.execute(
() -> {
try {
parent
.get()
.childWorkflowTerminated(
ctx.getExecutionId().getWorkflowId().getWorkflowId(), a);
} catch (StatusRuntimeException e) {
// Parent might already close
if (e.getStatus().getCode() != Status.Code.NOT_FOUND) {
log.error("Failure reporting child termination", e);
}
} catch (Throwable e) {
log.error("Failure reporting child termination", e);
}
});
}
});
}

Expand Down