diff --git a/experimental/fluent/func/pom.xml b/experimental/fluent/func/pom.xml index dcf2aa96f..c113290f3 100644 --- a/experimental/fluent/func/pom.xml +++ b/experimental/fluent/func/pom.xml @@ -1,5 +1,6 @@ - + 4.0.0 io.serverlessworkflow diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java similarity index 86% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java index fe0c05dd7..eb35b6b99 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/internal/CommonFuncOps.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/CommonFuncOps.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.serverlessworkflow.fluent.func.dsl.internal; +package io.serverlessworkflow.fluent.func.dsl; import io.cloudevents.CloudEventData; import io.serverlessworkflow.api.types.FlowDirectiveEnum; @@ -22,19 +22,17 @@ import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder; import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; -import io.serverlessworkflow.fluent.func.dsl.ReflectionUtils; -import io.serverlessworkflow.fluent.func.dsl.SwitchCaseSpec; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -public interface CommonFuncOps { +interface CommonFuncOps { default Consumer fn(Function function, Class argClass) { return f -> f.function(function, argClass); } - default Consumer fn(Function function) { + default Consumer fn(SerializableFunction function) { Class clazz = ReflectionUtils.inferInputType(function); return f -> f.function(function, clazz); } @@ -51,8 +49,8 @@ default SwitchCaseSpec caseOf(Predicate when, Class whenClass) { return new SwitchCaseSpec().when(when, whenClass); } - default SwitchCaseSpec caseOf(Predicate when) { - return new SwitchCaseSpec().when(when); + default SwitchCaseSpec caseOf(SerializablePredicate when) { + return new SwitchCaseSpec().when(when, ReflectionUtils.inferInputType(when)); } default SwitchCaseConfigurer caseDefault(String task) { diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java index 55b9a289e..eeaea6792 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncDSL.java @@ -28,7 +28,6 @@ import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer; import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer; import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer; -import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps; import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer; import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; @@ -97,15 +96,15 @@ public static Consumer fn( * @param output type * @return a consumer that configures a {@code FuncCallTaskBuilder} */ - public static Consumer fn(Function function) { - return f -> f.function(function); + public static Consumer fn(SerializableFunction function) { + return f -> f.function(function, ReflectionUtils.inferInputType(function)); } /** * Compose multiple switch cases into a single configurer for {@link FuncSwitchTaskBuilder}. * - * @param cases one or more {@link SwitchCaseConfigurer} built via {@link #caseOf(Predicate)} or - * {@link #caseDefault(String)} + * @param cases one or more {@link SwitchCaseConfigurer} built via {@link + * #caseOf(SerializablePredicate)} or {@link #caseDefault(String)} * @return a consumer to apply on a switch task builder */ public static Consumer cases(SwitchCaseConfigurer... cases) { @@ -131,7 +130,7 @@ public static SwitchCaseSpec caseOf(Predicate when, Class whenClass * @param predicate input type * @return a fluent builder to set the consequent action (e.g., {@code then("taskName")}) */ - public static SwitchCaseSpec caseOf(Predicate when) { + public static SwitchCaseSpec caseOf(SerializablePredicate when) { return OPS.caseOf(when); } @@ -213,12 +212,13 @@ public static FuncListenSpec toAny(String... types) { * @return a consumer to configure {@link FuncEmitTaskBuilder} */ public static Consumer event( - String type, Function function) { - return OPS.event(type, function); + String type, SerializableFunction function) { + return OPS.event(type, function, ReflectionUtils.inferInputType(function)); } /** - * Same as {@link #event(String, Function)} but with an explicit input class to guide conversion. + * Same as {@link #event(String, SerializableFunction)} but with an explicit input class to guide + * conversion. * * @param type CloudEvent type * @param function function that maps workflow input to {@link CloudEventData} @@ -279,20 +279,6 @@ public static FuncPredicateEventConfigurer event(String type) { return OPS.event(type); } - /** - * Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input - * type. - * - * @param fn the function to execute at runtime - * @param inputClass expected input class for model conversion - * @param input type - * @param result type - * @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)}) - */ - public static FuncCallStep function(Function fn, Class inputClass) { - return new FuncCallStep<>(fn, inputClass); - } - /** * Build a call step for functions that need {@link WorkflowContextData} as the first parameter. * The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context. @@ -309,21 +295,8 @@ public static FuncCallStep withContext(JavaContextFunction fn return withContext(null, fn, in); } - /** - * Build a call step for functions that expect the workflow instance ID as the first parameter. - * The instance ID is extracted from the runtime context. - * - *

Signature expected: {@code (instanceId, payload) -> result} - * - * @param fn instance-id-aware function - * @param in payload input class - * @param input type - * @param result type - * @return a call step - */ - public static FuncCallStep withInstanceId( - InstanceIdBiFunction fn, Class in) { - return withInstanceId(null, fn, in); + public static FuncCallStep withContext(JavaContextFunction fn) { + return withContext(null, fn, ReflectionUtils.inferInputType(fn)); } /** @@ -341,6 +314,10 @@ public static FuncCallStep withContext( return new FuncCallStep<>(name, fn, in); } + public static FuncCallStep withContext(String name, JavaContextFunction fn) { + return new FuncCallStep<>(name, fn, ReflectionUtils.inferInputType(fn)); + } + /** * Build a call step for functions that need {@link WorkflowContextData} and {@link * TaskContextData} as the first and second parameter. The DSL wraps it as a {@link @@ -373,8 +350,16 @@ public static FuncCallStep withFilter( return new FuncCallStep<>(name, fn, in); } + public static FuncCallStep withFilter(JavaFilterFunction fn) { + return withFilter(null, fn, ReflectionUtils.inferInputType(fn)); + } + + public static FuncCallStep withFilter(String name, JavaFilterFunction fn) { + return withFilter(name, fn, ReflectionUtils.inferInputType(fn)); + } + /** - * Named variant of {@link #withInstanceId(InstanceIdBiFunction, Class)}. + * Named variant of {@link #withInstanceId(InstanceIdFunction, Class)}. * * @param name task name * @param fn instance-id-aware function @@ -384,11 +369,35 @@ public static FuncCallStep withFilter( * @return a named call step */ public static FuncCallStep withInstanceId( - String name, InstanceIdBiFunction fn, Class in) { + String name, InstanceIdFunction fn, Class in) { JavaContextFunction jcf = (payload, wctx) -> fn.apply(wctx.instanceData().id(), payload); return new FuncCallStep<>(name, jcf, in); } + /** + * Build a call step for functions that expect the workflow instance ID as the first parameter. + * The instance ID is extracted from the runtime context. + * + *

Signature expected: {@code (instanceId, payload) -> result} + * + * @param fn instance-id-aware function + * @param in payload input class + * @param input type + * @param result type + * @return a call step + */ + public static FuncCallStep withInstanceId(InstanceIdFunction fn, Class in) { + return withInstanceId(null, fn, in); + } + + public static FuncCallStep withInstanceId(String name, InstanceIdFunction fn) { + return withInstanceId(name, fn, ReflectionUtils.inferInputType(fn)); + } + + public static FuncCallStep withInstanceId(InstanceIdFunction fn) { + return withInstanceId(null, fn, ReflectionUtils.inferInputType(fn)); + } + /** * Builds a composition of the current workflow instance id and the definition of the task * position as a JSON pointer, used as a stable "unique id" for the task. @@ -422,6 +431,10 @@ public static FuncCallStep withUniqueId( return new FuncCallStep<>(name, jff, in); } + public static FuncCallStep withUniqueId(String name, UniqueIdBiFunction fn) { + return withUniqueId(name, fn, ReflectionUtils.inferInputType(fn)); + } + /** * Variant of {@link #withUniqueId(String, UniqueIdBiFunction, Class)} without an explicit task * name. @@ -436,6 +449,10 @@ public static FuncCallStep withUniqueId(UniqueIdBiFunction fn return withUniqueId(null, fn, in); } + public static FuncCallStep withUniqueId(UniqueIdBiFunction fn) { + return withUniqueId(null, fn, ReflectionUtils.inferInputType(fn)); + } + /** * Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input. * @@ -449,6 +466,10 @@ public static ConsumeStep consume(Consumer consumer, Class inputCla return new ConsumeStep<>(consumer, inputClass); } + public static ConsumeStep consume(SerializableConsumer consumer) { + return consume(consumer, ReflectionUtils.inferInputType(consumer)); + } + /** * Named variant of {@link #consume(Consumer, Class)}. * @@ -462,6 +483,10 @@ public static ConsumeStep consume(String name, Consumer consumer, Clas return new ConsumeStep<>(name, consumer, inputClass); } + public static ConsumeStep consume(String name, SerializableConsumer consumer) { + return consume(name, consumer, ReflectionUtils.inferInputType(consumer)); + } + /** * Agent-style sugar for methods that receive a "memory id" as first parameter. The DSL uses a * derived unique id composed of the workflow instance id and the task position (JSON pointer), @@ -479,6 +504,10 @@ public static FuncCallStep agent(UniqueIdBiFunction fn, Class return withUniqueId(fn, in); } + public static FuncCallStep agent(UniqueIdBiFunction fn) { + return withUniqueId(fn, ReflectionUtils.inferInputType(fn)); + } + /** * Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}. * @@ -496,6 +525,24 @@ public static FuncCallStep agent( return withUniqueId(name, fn, in); } + public static FuncCallStep agent(String name, UniqueIdBiFunction fn) { + return withUniqueId(name, fn, ReflectionUtils.inferInputType(fn)); + } + + /** + * Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input + * type. + * + * @param fn the function to execute at runtime + * @param inputClass expected input class for model conversion + * @param input type + * @param result type + * @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)}) + */ + public static FuncCallStep function(Function fn, Class inputClass) { + return new FuncCallStep<>(fn, inputClass); + } + /** * Create a {@link FuncCallStep} that invokes a plain Java {@link Function} with inferred input * type. @@ -505,13 +552,13 @@ public static FuncCallStep agent( * @param output type * @return a call step */ - public static FuncCallStep function(Function fn) { + public static FuncCallStep function(SerializableFunction fn) { Class inputClass = ReflectionUtils.inferInputType(fn); return new FuncCallStep<>(fn, inputClass); } /** - * Named variant of {@link #function(Function)} with inferred input type. + * Named variant of {@link #function(SerializableFunction)} with inferred input type. * * @param name task name * @param fn the function to execute @@ -519,7 +566,7 @@ public static FuncCallStep function(Function fn) { * @param output type * @return a named call step */ - public static FuncCallStep function(String name, Function fn) { + public static FuncCallStep function(String name, SerializableFunction fn) { Class inputClass = ReflectionUtils.inferInputType(fn); return new FuncCallStep<>(name, fn, inputClass); } @@ -584,12 +631,12 @@ public static EmitStep emit(String name, Consumer cfg) { * @param input type * @return an {@link EmitStep} */ - public static EmitStep emit(String type, Function fn) { + public static EmitStep emit(String type, SerializableFunction fn) { return new EmitStep(null, event(type, fn)); } /** - * Named variant of {@link #emit(String, Function)}. + * Named variant of {@link #emit(String, SerializableFunction)}. * * @param name task name * @param type CloudEvent type @@ -597,7 +644,8 @@ public static EmitStep emit(String type, Function fn) { * @param input type * @return a named {@link EmitStep} */ - public static EmitStep emit(String name, String type, Function fn) { + public static EmitStep emit( + String name, String type, SerializableFunction fn) { return new EmitStep(name, event(type, fn)); } @@ -678,7 +726,7 @@ public static ListenStep listen(String name, FuncListenSpec spec) { /** * Low-level switch case configurer using a custom builder consumer. Prefer the {@link - * #caseOf(Predicate)} helpers when possible. + * #caseOf(SerializablePredicate)} helpers when possible. * * @param taskName optional task name * @param switchCase consumer to configure the {@link FuncSwitchTaskBuilder} @@ -701,7 +749,7 @@ public static FuncTaskConfigurer switchCase(Consumer swit /** * Convenience to apply multiple {@link SwitchCaseConfigurer} built via {@link - * #caseOf(Predicate)}. + * #caseOf(SerializablePredicate)}. * * @param cases case configurers * @return list configurer @@ -773,6 +821,11 @@ public static FuncTaskConfigurer switchWhenOrElse( FuncDSL.cases(caseOf(pred, predClass).then(thenTask), caseDefault(otherwise))); } + public static FuncTaskConfigurer switchWhenOrElse( + SerializablePredicate pred, String thenTask, FlowDirectiveEnum otherwise) { + return switchWhenOrElse(pred, thenTask, otherwise, ReflectionUtils.inferInputType(pred)); + } + /** * Sugar for a single-case switch with a default task fallback. * @@ -789,6 +842,11 @@ public static FuncTaskConfigurer switchWhenOrElse( list.switchCase(cases(caseOf(pred, predClass).then(thenTask), caseDefault(otherwiseTask))); } + public static FuncTaskConfigurer switchWhenOrElse( + SerializablePredicate pred, String thenTask, String otherwiseTask) { + return switchWhenOrElse(pred, thenTask, otherwiseTask, ReflectionUtils.inferInputType(pred)); + } + /** * JQ-based condition: if the JQ expression evaluates truthy → jump to {@code thenTask}, otherwise * follow the {@link FlowDirectiveEnum} given in {@code otherwise}. diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java index 1e254467f..a7921273f 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/FuncEventFilterSpec.java @@ -34,7 +34,7 @@ public abstract class FuncEventFilterSpec } /** Sets the event data and the contentType to `application/json` */ - public SELF jsonData(Function function) { + public SELF jsonData(SerializableFunction function) { Class clazz = ReflectionUtils.inferInputType(function); addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz))); return JSON(); diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdBiFunction.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java similarity index 90% rename from experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdBiFunction.java rename to experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java index e74876c98..8a9fe2916 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdBiFunction.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/InstanceIdFunction.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import java.io.Serializable; + /** * Functions that expect a workflow instance ID injection in runtime * @@ -22,6 +24,6 @@ * @param The task result output */ @FunctionalInterface -public interface InstanceIdBiFunction { +public interface InstanceIdFunction extends Serializable { R apply(String instanceId, T payload); } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java index d9f3793a1..34617d787 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/ReflectionUtils.java @@ -15,10 +15,11 @@ */ package io.serverlessworkflow.fluent.func.dsl; -import java.lang.invoke.MethodHandleInfo; +import io.serverlessworkflow.api.types.func.JavaContextFunction; +import io.serverlessworkflow.api.types.func.JavaFilterFunction; import java.lang.invoke.MethodType; +import java.lang.invoke.SerializedLambda; import java.lang.reflect.Method; -import java.util.Optional; import java.util.function.Function; /** @@ -26,57 +27,75 @@ * * @see Serialize a Lambda in Java */ -public final class ReflectionUtils { +final class ReflectionUtils { private ReflectionUtils() {} @SuppressWarnings("unchecked") - static Optional> safeInferInputType(Function fn) { + static Class inferInputType(JavaContextFunction fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(JavaFilterFunction fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(SerializableFunction fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(SerializablePredicate fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(InstanceIdFunction fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 1)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(UniqueIdBiFunction fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 1)); + } + + @SuppressWarnings("unchecked") + static Class inferInputType(SerializableConsumer fn) { + return throwIllegalStateIfNull((Class) inferInputTypeFromAny(fn, 0)); + } + + /** + * Extracts the input type using the resolved interface signature. * @param fn The serializable + * lambda + * + * @param lambdaParamIndex The index of the payload parameter in the interface's apply method + */ + private static Class inferInputTypeFromAny(Object fn, int lambdaParamIndex) { try { Method m = fn.getClass().getDeclaredMethod("writeReplace"); m.setAccessible(true); - java.lang.invoke.SerializedLambda sl = (java.lang.invoke.SerializedLambda) m.invoke(fn); + SerializedLambda sl = (SerializedLambda) m.invoke(fn); - // Owner class of the referenced implementation - String ownerName = sl.getImplClass().replace('/', '.'); ClassLoader cl = fn.getClass().getClassLoader(); - Class owner = Class.forName(ownerName, false, cl); - - // Parse the impl method descriptor to get raw param types - MethodType mt = MethodType.fromMethodDescriptorString(sl.getImplMethodSignature(), cl); - Class[] params = mt.parameterArray(); - int kind = sl.getImplMethodKind(); - - switch (kind) { - case MethodHandleInfo.REF_invokeStatic: - case MethodHandleInfo.REF_newInvokeSpecial: - // static method or constructor: T is the first parameter - return params.length >= 1 ? Optional.of((Class) params[0]) : Optional.empty(); - - case MethodHandleInfo.REF_invokeVirtual: - case MethodHandleInfo.REF_invokeInterface: - case MethodHandleInfo.REF_invokeSpecial: - // instance method ref like Foo::bar - // For Function, if bar has no params, T is the receiver type (owner). - // If bar has one param, that pattern would usually map to BiFunction, not Function, - // but keep a defensive branch anyway: - return (params.length == 0) - ? Optional.of((Class) owner) - : Optional.of((Class) params[0]); - - default: - return Optional.empty(); - } + + // getInstantiatedMethodType() provides the exact generic signature resolved + // by the compiler, completely bypassing captured variables and method kind switches! + MethodType mt = MethodType.fromMethodDescriptorString(sl.getInstantiatedMethodType(), cl); + + return mt.parameterArray()[lambdaParamIndex]; + } catch (Exception ignore) { - return Optional.empty(); + return null; } } - public static Class inferInputType(Function fn) { - return safeInferInputType(fn) - .orElseThrow( - () -> - new IllegalStateException( - "Cannot infer input type from lambda. Pass Class or use a method reference.")); + private static Class throwIllegalStateIfNull(Class clazz) { + if (clazz == null) { + throw new IllegalStateException( + "Cannot infer input type from lambda. Pass Class or use a method reference."); + } + return clazz; } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java new file mode 100644 index 000000000..0de743402 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableConsumer.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import java.io.Serializable; +import java.util.function.Consumer; + +@FunctionalInterface +public interface SerializableConsumer extends Consumer, Serializable {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java new file mode 100644 index 000000000..e5d48d1d2 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializableFunction.java @@ -0,0 +1,29 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import java.io.Serializable; +import java.util.function.Function; + +/** + * Alternative to Function for our DSL to discover the input parameter class in runtime via + * reflection. + * + * @param + * @param + */ +@FunctionalInterface +public interface SerializableFunction extends Function, Serializable {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java new file mode 100644 index 000000000..62d36d018 --- /dev/null +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/SerializablePredicate.java @@ -0,0 +1,22 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.fluent.func.dsl; + +import java.io.Serializable; +import java.util.function.Predicate; + +@FunctionalInterface +public interface SerializablePredicate extends Predicate, Serializable {} diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java index 229ed7160..f1f0d7880 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/UniqueIdBiFunction.java @@ -15,6 +15,7 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import java.io.Serializable; import java.util.function.BiFunction; /** @@ -25,6 +26,6 @@ * @param The task result output */ @FunctionalInterface -public interface UniqueIdBiFunction extends BiFunction { +public interface UniqueIdBiFunction extends BiFunction, Serializable { R apply(String uniqueId, T object); } diff --git a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java index fe7a9b07d..7797556e8 100644 --- a/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java +++ b/experimental/fluent/func/src/test/java/io/serverlessworkflow/fluent/func/FuncDSLUniqueIdTest.java @@ -53,7 +53,7 @@ private static JavaFilterFunction extractJavaFilterFunction(Call @Test @DisplayName( "withUniqueId(name, fn, in) composes uniqueId = instanceId-jsonPointer and passes it") - void withUniqueId_uses_json_pointer_for_unique_id() throws Exception { + void withUniqueId_uses_json_pointer_for_unique_id() { AtomicReference receivedUniqueId = new AtomicReference<>(); AtomicReference receivedPayload = new AtomicReference<>(); @@ -104,7 +104,7 @@ void withUniqueId_uses_json_pointer_for_unique_id() throws Exception { @Test @DisplayName("agent(fn, in) composes uniqueId = instanceId-jsonPointer and passes it") - void agent_uses_json_pointer_for_unique_id() throws Exception { + void agent_uses_json_pointer_for_unique_id() { AtomicReference receivedUniqueId = new AtomicReference<>(); AtomicReference receivedPayload = new AtomicReference<>(); diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLReflectionsTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLReflectionsTest.java new file mode 100644 index 000000000..bdb959390 --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/FuncDSLReflectionsTest.java @@ -0,0 +1,186 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.agent; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.function; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.switchWhenOrElse; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withContext; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withFilter; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.withInstanceId; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.serverlessworkflow.api.types.FlowDirectiveEnum; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.TaskContextData; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowContextData; +import java.util.Optional; +import org.junit.jupiter.api.Test; + +public class FuncDSLReflectionsTest { + + @Test + void check_serializable_function() { + Workflow wf = + FuncWorkflowBuilder.workflow("strip-function").tasks(function(String::strip)).build(); + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = + app.workflowDefinition(wf).instance("Hello World! ").start().join().asText(); + assertTrue(output.isPresent()); + assertEquals("Hello World!", output.get()); + } + } + + @Test + void check_serializable_function_with_non_serializable_capture() { + // 1. Create a clearly non-serializable object + class NonSerializableService { + String appendSuffix(String text) { + return text + " - Processed successfully"; + } + } + + NonSerializableService service = new NonSerializableService(); + + Workflow wf = + FuncWorkflowBuilder.workflow("capture-function") + .tasks(function(service::appendSuffix)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = + app.workflowDefinition(wf).instance("Test Input").start().join().asText(); + + assertTrue(output.isPresent()); + assertEquals("Test Input - Processed successfully", output.get()); + } + } + + @Test + void check_serializable_predicate_switch() { + Workflow wf = + FuncWorkflowBuilder.workflow("predicate-test") + .tasks( + // Infers Integer.class automatically + switchWhenOrElse((Integer v) -> v > 10, "highValueTask", FlowDirectiveEnum.END), + // Only executes if > 10 + function("highValueTask", (Integer v) -> v * 2)) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + // Test True path + Optional highOutput = + app.workflowDefinition(wf).instance(15).start().join().as(Integer.class); + + assertTrue(highOutput.isPresent()); + assertEquals(30, highOutput.get()); + + // Test False path (ends immediately, returning original input) + Optional lowOutput = + app.workflowDefinition(wf).instance(5).start().join().as(Integer.class); + + assertTrue(lowOutput.isPresent()); + assertEquals(5, lowOutput.get()); + } + } + + @Test + void check_serializable_unique_id_bifunction() { + Workflow wf = + FuncWorkflowBuilder.workflow("agent-unique-id-test") + .tasks( + // Infers String.class for the payload (the second parameter) + agent( + (String uniqueId, Integer payload) -> + "ID=[" + uniqueId + "] Payload=[" + payload + "]")) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = app.workflowDefinition(wf).instance(123).start().join().asText(); + + assertTrue(output.isPresent()); + // The uniqueId should contain the workflow instance ID and the JSON pointer + assertTrue(output.get().contains("ID=[")); + assertTrue(output.get().contains("] Payload=[123]")); + } + } + + @Test + void check_serializable_java_context_function() { + Workflow wf = + FuncWorkflowBuilder.workflow("context-function-test") + .tasks( + // Infers String.class for the payload (the first parameter) + withContext( + (String payload, WorkflowContextData wctx) -> + payload + " processed by " + wctx.instanceData().id())) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = + app.workflowDefinition(wf).instance("Context Data").start().join().asText(); + + assertTrue(output.isPresent()); + assertTrue(output.get().startsWith("Context Data processed by ")); + } + } + + @Test + void check_serializable_java_filter_function() { + Workflow wf = + FuncWorkflowBuilder.workflow("filter-function-test") + .tasks( + // Infers String.class for the payload (the first parameter) + withFilter( + (String payload, WorkflowContextData wctx, TaskContextData tctx) -> + payload + " at position " + tctx.position().jsonPointer())) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = + app.workflowDefinition(wf).instance("Filter Data").start().join().asText(); + + assertTrue(output.isPresent()); + // It should append the task JSON pointer (likely "/tasks/0" or similar depending on spec) + assertTrue(output.get().startsWith("Filter Data at position do/")); + } + } + + @Test + void check_serializable_instance_id_function() { + Workflow wf = + FuncWorkflowBuilder.workflow("instance-id-test") + .tasks( + // Infers Integer.class for the payload (the second parameter) + withInstanceId( + "", + (String instanceId, Integer payload) -> + "Instance=[" + instanceId + "] Payload=[" + payload + "]")) + .build(); + + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Optional output = app.workflowDefinition(wf).instance(456).start().join().asText(); + + assertTrue(output.isPresent()); + // The instanceId should be populated automatically by the workflow runtime + assertTrue(output.get().contains("Instance=[")); + assertTrue(output.get().contains("] Payload=[456]")); + } + } +} diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/WorkflowThenTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/WorkflowThenTest.java index b84b27e98..7940eee71 100644 --- a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/WorkflowThenTest.java +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/WorkflowThenTest.java @@ -40,7 +40,7 @@ void consume_then_skips_next_task_and_jumps_to_target() { Workflow wf = FuncWorkflowBuilder.workflow("intelligent-newsletter") .tasks( - consume("sendNewsletter", input -> log.debug("Consuming: {}", input), Object.class) + consume("sendNewsletter", input -> log.debug("Consuming: {}", input)) .then("otherTask"), function("nextTask", v -> "nextTask: " + v, String.class), function("otherTask", v -> "otherTask: " + v, String.class)) diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaContextFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaContextFunction.java index e74a090cb..aae14302b 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaContextFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaContextFunction.java @@ -16,8 +16,9 @@ package io.serverlessworkflow.api.types.func; import io.serverlessworkflow.impl.WorkflowContextData; +import java.io.Serializable; @FunctionalInterface -public interface JavaContextFunction { +public interface JavaContextFunction extends Serializable { R apply(T object, WorkflowContextData workflowContext); } diff --git a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaFilterFunction.java b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaFilterFunction.java index f6b9d3787..7bbb8e337 100644 --- a/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaFilterFunction.java +++ b/experimental/types/src/main/java/io/serverlessworkflow/api/types/func/JavaFilterFunction.java @@ -17,8 +17,9 @@ import io.serverlessworkflow.impl.TaskContextData; import io.serverlessworkflow.impl.WorkflowContextData; +import java.io.Serializable; @FunctionalInterface -public interface JavaFilterFunction { +public interface JavaFilterFunction extends Serializable { R apply(T object, WorkflowContextData workflowContext, TaskContextData taskContext); }