diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java index 801f578cf..01737f035 100644 --- a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/ForEachFuncTest.java @@ -25,7 +25,7 @@ import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowModel; -import io.serverlessworkflow.impl.events.InMemoryEvents; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -35,13 +35,13 @@ public class ForEachFuncTest { - private static record Order(String id) {} + private record Order(String id) {} - private static record EnhancedOrder(String id, int salary) {} + private record EnhancedOrder(String id, int salary) {} - private static record OrdersPayload(List orders) {} + private record OrdersPayload(List orders) {} - private static record OrderName(String id, String name) {} + private record OrderName(String id, String name) {} @Test void testForEachIteration() { @@ -75,13 +75,14 @@ void testForEachEmit() { .build(); List publishedEvents = new CopyOnWriteArrayList<>(); - InMemoryEvents eventBroker = new InMemoryEvents(); + LaggedInMemoryEvents eventBroker = new LaggedInMemoryEvents(); eventBroker.register(eventType, ce -> publishedEvents.add(ce)); try (WorkflowApplication app = WorkflowApplication.builder() .withEventConsumer(eventBroker) .withEventPublisher(eventBroker) + .withListener(new TraceExecutionListener()) .build()) { app.workflowDefinition(workflow) .instance( diff --git a/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java new file mode 100644 index 000000000..d2e4151ed --- /dev/null +++ b/experimental/test/src/test/java/io/serverlessworkflow/fluent/test/LaggedInMemoryEvents.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.test; + +import io.cloudevents.CloudEvent; +import io.serverlessworkflow.impl.events.InMemoryEvents; +import java.util.concurrent.CompletableFuture; + +public class LaggedInMemoryEvents extends InMemoryEvents { + + @Override + public CompletableFuture publish(CloudEvent ce) { + + return super.publish(ce) + .thenRun( + () -> { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } +} diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java index 05473090c..f94e5511d 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/events/InMemoryEvents.java @@ -30,6 +30,10 @@ */ public class InMemoryEvents extends AbstractTypeConsumer implements EventPublisher { + private final ExecutorServiceFactory serviceFactory; + private final Map> topicMap = new ConcurrentHashMap<>(); + private final AtomicReference> allConsumerRef = new AtomicReference<>(); + public InMemoryEvents() { this(new DefaultExecutorServiceFactory()); } @@ -38,12 +42,6 @@ public InMemoryEvents(ExecutorServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; } - private ExecutorServiceFactory serviceFactory; - - private Map> topicMap = new ConcurrentHashMap<>(); - - private AtomicReference> allConsumerRef = new AtomicReference<>(); - @Override public void register(String topicName, Consumer consumer) { topicMap.put(topicName, consumer); diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java index 26ca371b2..b4c024bee 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/executors/ForExecutor.java @@ -75,23 +75,30 @@ protected ForExecutor(ForExecutorBuilder builder) { @Override protected CompletableFuture internalExecute( WorkflowContext workflow, TaskContext taskContext) { - Iterator iter = collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(); - int i = 0; - CompletableFuture future = - CompletableFuture.completedFuture(taskContext.input()); - while (iter.hasNext()) { + return buildLoopFuture( + workflow, + taskContext, + taskContext.input(), + collectionExpr.apply(workflow, taskContext, taskContext.input()).iterator(), + -1); + } + + private CompletableFuture buildLoopFuture( + WorkflowContext workflow, + TaskContext taskContext, + WorkflowModel input, + Iterator iter, + int index) { + final int newIndex = index + 1; + if (iter.hasNext()) { taskContext.variables().put(task.getFor().getEach(), iter.next()); - taskContext.variables().put(task.getFor().getAt(), i++); - if (whileExpr.map(w -> w.test(workflow, taskContext, taskContext.input())).orElse(true)) { - future = - future.thenCompose( - input -> - TaskExecutorHelper.processTaskList( - taskExecutor, workflow, Optional.of(taskContext), input)); - } else { - break; + taskContext.variables().put(task.getFor().getAt(), newIndex); + if (whileExpr.map(w -> w.test(workflow, taskContext, input)).orElse(true)) { + return TaskExecutorHelper.processTaskList( + taskExecutor, workflow, Optional.of(taskContext), input) + .thenCompose(output -> buildLoopFuture(workflow, taskContext, output, iter, newIndex)); } } - return future; + return CompletableFuture.completedFuture(input); } } diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java similarity index 83% rename from impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java rename to impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java index 9e75cd38f..762a60586 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/TraceExecutionListener.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/lifecycle/TraceExecutionListener.java @@ -13,19 +13,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.impl.test; +package io.serverlessworkflow.impl.lifecycle; -import io.serverlessworkflow.impl.lifecycle.TaskCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskFailedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskRetriedEvent; -import io.serverlessworkflow.impl.lifecycle.TaskStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowCompletedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowExecutionListener; -import io.serverlessworkflow.impl.lifecycle.WorkflowFailedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowResumedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStartedEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowStatusEvent; -import io.serverlessworkflow.impl.lifecycle.WorkflowSuspendedEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java index 9369edc85..d8273b3c4 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/DBGenerator.java @@ -20,6 +20,7 @@ import io.serverlessworkflow.impl.WorkflowApplication; import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers; diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java index a05149de4..5d0143f0c 100644 --- a/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/MvStorePersistenceTest.java @@ -22,6 +22,7 @@ import io.serverlessworkflow.impl.WorkflowDefinition; import io.serverlessworkflow.impl.WorkflowInstance; import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.lifecycle.TraceExecutionListener; import io.serverlessworkflow.impl.persistence.DefaultPersistenceInstanceHandlers; import io.serverlessworkflow.impl.persistence.PersistenceApplicationBuilder; import io.serverlessworkflow.impl.persistence.PersistenceInstanceHandlers;