diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java index 801f578c..b24e8575 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java @@ -75,7 +75,7 @@ void testForEachEmit() { .build(); List publishedEvents = new CopyOnWriteArrayList<>(); - InMemoryEvents eventBroker = new InMemoryEvents(); + InMemoryEvents eventBroker = new LaggedInMemoryEvents(); eventBroker.register(eventType, ce -> publishedEvents.add(ce)); try (WorkflowApplication app = diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java new file mode 100644 index 00000000..d2e4151e --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import java.util.concurrent.CompletableFuture; + +public class LaggedInMemoryEvents extends InMemoryEvents { + + @Override + public CompletableFuture publish(CloudEvent ce) { + + return super.publish(ce) + .thenRun( + () -> { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 26ca371b..b4c024be 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -75,23 +75,30 @@ protected ForExecutor(ForExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - Iterator iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(); - int i = 0; - CompletableFuture future = - CompletableFuture.completedFuture(taskContext.input()); - while (iter.hasNext()) { + return buildLoopFuture( + workflow, + taskContext, + taskContext.input(), + collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(), + -1); + } + + private CompletableFuture buildLoopFuture( + WorkflowContext workflow, + TaskContext taskContext, + WorkflowModel input, + Iterator iter, + int index) { + final int newIndex = index + 1; + if (iter.hasNext()) { taskContext.variables().put(task.getFor().getEach(), iter.next()); - taskContext.variables().put(task.getFor().getAt(), i++); - if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) { - future = - future.thenCompose( - input -> - TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), input)); - } else { - break; + taskContext.variables().put(task.getFor().getAt(), newIndex); + if (whileExpr.map(w -> w.test(workflow, taskContext, input)).orElse(true)) { + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input) + .thenCompose(output -> buildLoopFuture(workflow, taskContext, output, iter, newIndex)); } } - return future; + return CompletableFuture.completedFuture(input); } }