Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion experimental/fluent/func/pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.serverlessworkflow</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.fluent.func.dsl.internal;
package io.serverlessworkflow.fluent.func.dsl;

import io.cloudevents.CloudEventData;
import io.serverlessworkflow.api.types.FlowDirectiveEnum;
Expand All @@ -22,19 +22,17 @@
import io.serverlessworkflow.fluent.func.FuncSwitchTaskBuilder;
import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer;
import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer;
import io.serverlessworkflow.fluent.func.dsl.ReflectionUtils;
import io.serverlessworkflow.fluent.func.dsl.SwitchCaseSpec;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

public interface CommonFuncOps {
interface CommonFuncOps {

default <T, V> Consumer<FuncCallTaskBuilder> fn(Function<T, V> function, Class<T> argClass) {
return f -> f.function(function, argClass);
}

default <T, V> Consumer<FuncCallTaskBuilder> fn(Function<T, V> function) {
default <T, V> Consumer<FuncCallTaskBuilder> fn(SerializableFunction<T, V> function) {
Class<T> clazz = ReflectionUtils.inferInputType(function);
return f -> f.function(function, clazz);
}
Expand All @@ -51,8 +49,8 @@ default <T> SwitchCaseSpec<T> caseOf(Predicate<T> when, Class<T> whenClass) {
return new SwitchCaseSpec<T>().when(when, whenClass);
}

default <T> SwitchCaseSpec<T> caseOf(Predicate<T> when) {
return new SwitchCaseSpec<T>().when(when);
default <T> SwitchCaseSpec<T> caseOf(SerializablePredicate<T> when) {
return new SwitchCaseSpec<T>().when(when, ReflectionUtils.inferInputType(when));
}

default SwitchCaseConfigurer caseDefault(String task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.serverlessworkflow.fluent.func.configurers.FuncPredicateEventConfigurer;
import io.serverlessworkflow.fluent.func.configurers.FuncTaskConfigurer;
import io.serverlessworkflow.fluent.func.configurers.SwitchCaseConfigurer;
import io.serverlessworkflow.fluent.func.dsl.internal.CommonFuncOps;
import io.serverlessworkflow.fluent.spec.configurers.AuthenticationConfigurer;
import io.serverlessworkflow.impl.TaskContextData;
import io.serverlessworkflow.impl.WorkflowContextData;
Expand Down Expand Up @@ -97,15 +96,15 @@ public static <T, V> Consumer<FuncCallTaskBuilder> fn(
* @param <V> output type
* @return a consumer that configures a {@code FuncCallTaskBuilder}
*/
public static <T, V> Consumer<FuncCallTaskBuilder> fn(Function<T, V> function) {
return f -> f.function(function);
public static <T, V> Consumer<FuncCallTaskBuilder> fn(SerializableFunction<T, V> function) {
return f -> f.function(function, ReflectionUtils.inferInputType(function));
}

/**
* Compose multiple switch cases into a single configurer for {@link FuncSwitchTaskBuilder}.
*
* @param cases one or more {@link SwitchCaseConfigurer} built via {@link #caseOf(Predicate)} or
* {@link #caseDefault(String)}
* @param cases one or more {@link SwitchCaseConfigurer} built via {@link
* #caseOf(SerializablePredicate)} or {@link #caseDefault(String)}
* @return a consumer to apply on a switch task builder
*/
public static Consumer<FuncSwitchTaskBuilder> cases(SwitchCaseConfigurer... cases) {
Expand All @@ -131,7 +130,7 @@ public static <T> SwitchCaseSpec<T> caseOf(Predicate<T> when, Class<T> whenClass
* @param <T> predicate input type
* @return a fluent builder to set the consequent action (e.g., {@code then("taskName")})
*/
public static <T> SwitchCaseSpec<T> caseOf(Predicate<T> when) {
public static <T> SwitchCaseSpec<T> caseOf(SerializablePredicate<T> when) {
return OPS.caseOf(when);
}

Expand Down Expand Up @@ -213,12 +212,13 @@ public static FuncListenSpec toAny(String... types) {
* @return a consumer to configure {@link FuncEmitTaskBuilder}
*/
public static <T> Consumer<FuncEmitTaskBuilder> event(
String type, Function<T, CloudEventData> function) {
return OPS.event(type, function);
String type, SerializableFunction<T, CloudEventData> function) {
return OPS.event(type, function, ReflectionUtils.inferInputType(function));
}

/**
* Same as {@link #event(String, Function)} but with an explicit input class to guide conversion.
* Same as {@link #event(String, SerializableFunction)} but with an explicit input class to guide
* conversion.
*
* @param type CloudEvent type
* @param function function that maps workflow input to {@link CloudEventData}
Expand Down Expand Up @@ -279,20 +279,6 @@ public static FuncPredicateEventConfigurer event(String type) {
return OPS.event(type);
}

/**
* Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input
* type.
*
* @param fn the function to execute at runtime
* @param inputClass expected input class for model conversion
* @param <T> input type
* @param <R> result type
* @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)})
*/
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> inputClass) {
return new FuncCallStep<>(fn, inputClass);
}

/**
* Build a call step for functions that need {@link WorkflowContextData} as the first parameter.
* The DSL wraps it as a {@link JavaContextFunction} and injects the runtime context.
Expand All @@ -309,6 +295,10 @@ public static <T, R> FuncCallStep<T, R> withContext(JavaContextFunction<T, R> fn
return withContext(null, fn, in);
}

public static <T, R> FuncCallStep<T, R> withContext(JavaContextFunction<T, R> fn) {
return withContext(null, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Build a call step for functions that expect the workflow instance ID as the first parameter.
* The instance ID is extracted from the runtime context.
Expand Down Expand Up @@ -341,6 +331,10 @@ public static <T, R> FuncCallStep<T, R> withContext(
return new FuncCallStep<>(name, fn, in);
}

public static <T, R> FuncCallStep<T, R> withContext(String name, JavaContextFunction<T, R> fn) {
return new FuncCallStep<>(name, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Build a call step for functions that need {@link WorkflowContextData} and {@link
* TaskContextData} as the first and second parameter. The DSL wraps it as a {@link
Expand Down Expand Up @@ -373,6 +367,14 @@ public static <T, R> FuncCallStep<T, R> withFilter(
return new FuncCallStep<>(name, fn, in);
}

public static <T, R> FuncCallStep<T, R> withFilter(JavaFilterFunction<T, R> fn) {
return withFilter(null, fn, ReflectionUtils.inferInputType(fn));
}

public static <T, R> FuncCallStep<T, R> withFilter(String name, JavaFilterFunction<T, R> fn) {
return withFilter(name, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Named variant of {@link #withInstanceId(InstanceIdBiFunction, Class)}.
*
Expand All @@ -389,6 +391,11 @@ public static <T, R> FuncCallStep<T, R> withInstanceId(
return new FuncCallStep<>(name, jcf, in);
}

public static <T, R> FuncCallStep<T, R> withInstanceId(
String name, InstanceIdBiFunction<T, R> fn) {
return withInstanceId(name, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Builds a composition of the current workflow instance id and the definition of the task
* position as a JSON pointer, used as a stable "unique id" for the task.
Expand Down Expand Up @@ -422,6 +429,10 @@ public static <T, R> FuncCallStep<T, R> withUniqueId(
return new FuncCallStep<>(name, jff, in);
}

public static <T, R> FuncCallStep<T, R> withUniqueId(String name, UniqueIdBiFunction<T, R> fn) {
return withUniqueId(name, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Variant of {@link #withUniqueId(String, UniqueIdBiFunction, Class)} without an explicit task
* name.
Expand All @@ -436,6 +447,10 @@ public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn
return withUniqueId(null, fn, in);
}

public static <T, R> FuncCallStep<T, R> withUniqueId(UniqueIdBiFunction<T, R> fn) {
return withUniqueId(null, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Create a fire-and-forget side-effect step (unnamed). The consumer receives the typed input.
*
Expand All @@ -449,6 +464,10 @@ public static <T> ConsumeStep<T> consume(Consumer<T> consumer, Class<T> inputCla
return new ConsumeStep<>(consumer, inputClass);
}

public static <T> ConsumeStep<T> consume(SerializableConsumer<T> consumer) {
return consume(consumer, ReflectionUtils.inferInputType(consumer));
}

/**
* Named variant of {@link #consume(Consumer, Class)}.
*
Expand All @@ -462,6 +481,10 @@ public static <T> ConsumeStep<T> consume(String name, Consumer<T> consumer, Clas
return new ConsumeStep<>(name, consumer, inputClass);
}

public static <T> ConsumeStep<T> consume(String name, SerializableConsumer<T> consumer) {
return consume(name, consumer, ReflectionUtils.inferInputType(consumer));
}

/**
* Agent-style sugar for methods that receive a "memory id" as first parameter. The DSL uses a
* derived unique id composed of the workflow instance id and the task position (JSON pointer),
Expand All @@ -479,6 +502,10 @@ public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn, Class
return withUniqueId(fn, in);
}

public static <T, R> FuncCallStep<T, R> agent(UniqueIdBiFunction<T, R> fn) {
return withUniqueId(fn, ReflectionUtils.inferInputType(fn));
}

/**
* Named agent-style sugar. See {@link #agent(UniqueIdBiFunction, Class)}.
*
Expand All @@ -496,6 +523,24 @@ public static <T, R> FuncCallStep<T, R> agent(
return withUniqueId(name, fn, in);
}

public static <T, R> FuncCallStep<T, R> agent(String name, UniqueIdBiFunction<T, R> fn) {
return withUniqueId(name, fn, ReflectionUtils.inferInputType(fn));
}

/**
* Create a {@link FuncCallStep} that calls a simple Java {@link Function} with explicit input
* type.
*
* @param fn the function to execute at runtime
* @param inputClass expected input class for model conversion
* @param <T> input type
* @param <R> result type
* @return a call step which supports chaining (e.g., {@code .exportAs(...).when(...)})
*/
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn, Class<T> inputClass) {
return new FuncCallStep<>(fn, inputClass);
}

/**
* Create a {@link FuncCallStep} that invokes a plain Java {@link Function} with inferred input
* type.
Expand All @@ -505,21 +550,21 @@ public static <T, R> FuncCallStep<T, R> agent(
* @param <R> output type
* @return a call step
*/
public static <T, R> FuncCallStep<T, R> function(Function<T, R> fn) {
public static <T, R> FuncCallStep<T, R> function(SerializableFunction<T, R> fn) {
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(fn, inputClass);
}
Comment on lines +553 to 556
Copy link

Copilot AI Mar 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing the inferred-type overload from function(Function<T,R>) to function(SerializableFunction<T,R>) is a source-breaking API change for callers that hold functions in java.util.function.Function variables (or return them from APIs). If backward compatibility is desired, consider keeping an overload that accepts Function and either (a) falls back to requiring an explicit Class<T> when inference isn't possible, or (b) throws a targeted error message explaining the need for a SerializableFunction/method reference.

Copilot uses AI. Check for mistakes.

/**
* Named variant of {@link #function(Function)} with inferred input type.
* Named variant of {@link #function(SerializableFunction)} with inferred input type.
*
* @param name task name
* @param fn the function to execute
* @param <T> input type
* @param <R> output type
* @return a named call step
*/
public static <T, R> FuncCallStep<T, R> function(String name, Function<T, R> fn) {
public static <T, R> FuncCallStep<T, R> function(String name, SerializableFunction<T, R> fn) {
Class<T> inputClass = ReflectionUtils.inferInputType(fn);
return new FuncCallStep<>(name, fn, inputClass);
}
Expand Down Expand Up @@ -584,20 +629,21 @@ public static EmitStep emit(String name, Consumer<FuncEmitTaskBuilder> cfg) {
* @param <T> input type
* @return an {@link EmitStep}
*/
public static <T> EmitStep emit(String type, Function<T, CloudEventData> fn) {
public static <T> EmitStep emit(String type, SerializableFunction<T, CloudEventData> fn) {
return new EmitStep(null, event(type, fn));
}

/**
* Named variant of {@link #emit(String, Function)}.
* Named variant of {@link #emit(String, SerializableFunction)}.
*
* @param name task name
* @param type CloudEvent type
* @param fn function producing {@link CloudEventData}
* @param <T> input type
* @return a named {@link EmitStep}
*/
public static <T> EmitStep emit(String name, String type, Function<T, CloudEventData> fn) {
public static <T> EmitStep emit(
String name, String type, SerializableFunction<T, CloudEventData> fn) {
return new EmitStep(name, event(type, fn));
}

Expand Down Expand Up @@ -678,7 +724,7 @@ public static ListenStep listen(String name, FuncListenSpec spec) {

/**
* Low-level switch case configurer using a custom builder consumer. Prefer the {@link
* #caseOf(Predicate)} helpers when possible.
* #caseOf(SerializablePredicate)} helpers when possible.
*
* @param taskName optional task name
* @param switchCase consumer to configure the {@link FuncSwitchTaskBuilder}
Expand All @@ -701,7 +747,7 @@ public static FuncTaskConfigurer switchCase(Consumer<FuncSwitchTaskBuilder> swit

/**
* Convenience to apply multiple {@link SwitchCaseConfigurer} built via {@link
* #caseOf(Predicate)}.
* #caseOf(SerializablePredicate)}.
*
* @param cases case configurers
* @return list configurer
Expand Down Expand Up @@ -773,6 +819,11 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
FuncDSL.cases(caseOf(pred, predClass).then(thenTask), caseDefault(otherwise)));
}

public static <T> FuncTaskConfigurer switchWhenOrElse(
SerializablePredicate<T> pred, String thenTask, FlowDirectiveEnum otherwise) {
return switchWhenOrElse(pred, thenTask, otherwise, ReflectionUtils.inferInputType(pred));
}

/**
* Sugar for a single-case switch with a default task fallback.
*
Expand All @@ -789,6 +840,11 @@ public static <T> FuncTaskConfigurer switchWhenOrElse(
list.switchCase(cases(caseOf(pred, predClass).then(thenTask), caseDefault(otherwiseTask)));
}

public static <T> FuncTaskConfigurer switchWhenOrElse(
SerializablePredicate<T> pred, String thenTask, String otherwiseTask) {
return switchWhenOrElse(pred, thenTask, otherwiseTask, ReflectionUtils.inferInputType(pred));
}

/**
* JQ-based condition: if the JQ expression evaluates truthy → jump to {@code thenTask}, otherwise
* follow the {@link FlowDirectiveEnum} given in {@code otherwise}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public abstract class FuncEventFilterSpec<SELF>
}

/** Sets the event data and the contentType to `application/json` */
public <T> SELF jsonData(Function<T, CloudEventData> function) {
public <T> SELF jsonData(SerializableFunction<T, CloudEventData> function) {
Class<T> clazz = ReflectionUtils.inferInputType(function);
addStep(e -> e.data(new EventDataFunction().withFunction(function, clazz)));
return JSON();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.serverlessworkflow.fluent.func.dsl;

import java.io.Serializable;

/**
* Functions that expect a workflow instance ID injection in runtime
*
* @param <T> The task payload input
* @param <R> The task result output
*/
@FunctionalInterface
public interface InstanceIdBiFunction<T, R> {
public interface InstanceIdBiFunction<T, R> extends Serializable {
R apply(String instanceId, T payload);
}
Loading