diff --git a/impl/a2a/pom.xml b/impl/a2a/pom.xml new file mode 100644 index 000000000..91a8fac99 --- /dev/null +++ b/impl/a2a/pom.xml @@ -0,0 +1,30 @@ + + 4.0.0 + + io.serverlessworkflow + serverlessworkflow-impl + 8.0.0-SNAPSHOT + + serverlessworkflow-impl-a2a + Serverless Workflow :: Impl :: A2A + + + io.serverlessworkflow + serverlessworkflow-impl-core + + + org.a2aproject.sdk + a2a-java-sdk-client + + + org.junit.jupiter + junit-jupiter-engine + test + + + org.junit.jupiter + junit-jupiter-params + test + + + diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java new file mode 100644 index 000000000..05aaad38c --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExceptionHandler.java @@ -0,0 +1,46 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowPosition; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +class A2AExceptionHandler implements Consumer { + + private final CompletableFuture future; + private final WorkflowPosition position; + + A2AExceptionHandler(CompletableFuture future, WorkflowPosition position) { + this.future = future; + this.position = position; + } + + @Override + public void accept(Throwable ex) { + + future.completeExceptionally( + new WorkflowException( + A2AUtils.workflowError(position) + .title(ex.getMessage()) + .details(WorkflowError.getStackTrace(ex)) + .build(), + ex)); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java new file mode 100644 index 000000000..bb1b47712 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutor.java @@ -0,0 +1,65 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTask; +import java.net.URI; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.client.config.ClientConfig; +import org.a2aproject.sdk.client.http.A2ACardResolver; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport; +import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig; + +class A2AExecutor implements CallableTask { + + private final WorkflowValueResolver uriSupplier; + private final A2ARequestDispatcher dispatcher; + private final WorkflowValueResolver> mapResolver; + + public A2AExecutor( + WorkflowValueResolver uriSupplier, + A2ARequestDispatcher dispatcher, + WorkflowValueResolver> mapResolver) { + this.uriSupplier = uriSupplier; + this.dispatcher = dispatcher; + this.mapResolver = mapResolver; + } + + @Override + public CompletableFuture apply( + WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) { + URI uri = uriSupplier.apply(workflowContext, taskContext, input); + return dispatcher.apply( + Client.builder( + A2ACardResolver.builder() + .baseUrl(uri.resolve("/").toString()) + .agentCardPath(uri.getPath()) + .build() + .getAgentCard()) + .clientConfig(new ClientConfig.Builder().build()) + .withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig()) + .build(), + mapResolver.apply(workflowContext, taskContext, input), + workflowContext, + taskContext); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java new file mode 100644 index 000000000..a8a77009d --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AExecutorBuilder.java @@ -0,0 +1,96 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.api.types.A2AArguments; +import io.serverlessworkflow.api.types.CallA2A; +import io.serverlessworkflow.api.types.TaskBase; +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowMutablePosition; +import io.serverlessworkflow.impl.WorkflowUtils; +import io.serverlessworkflow.impl.WorkflowValueResolver; +import io.serverlessworkflow.impl.executors.CallableTaskBuilder; +import io.serverlessworkflow.impl.executors.CallableTaskFactory; +import java.net.URI; + +public class A2AExecutorBuilder implements CallableTaskBuilder { + + @Override + public boolean accept(Class clazz) { + return CallA2A.class.equals(clazz); + } + + @Override + public CallableTaskFactory init( + CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) { + A2AArguments args = task.getWith(); + + WorkflowValueResolver uriSupplier; + if (args.getServer() != null) { + uriSupplier = definition.resourceLoader().uriSupplier(args.getServer()); + } else if (args.getAgentCard() != null) { + uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint()); + } else { + throw new IllegalArgumentException("Neither server or agent card is set for task" + task); + } + + A2ARequestDispatcher dispatcher = + switch (args.getMethod()) { + case MESSAGE_SEND -> + new MessageDispatcher( + new MessageConsumerFactory() { + @Override + protected MessageConsumer buildConsumer( + WorkflowContext workflowContext, TaskContext taskContext) { + return new MessageSendConsumer(workflowContext.definition()); + } + }); + case MESSAGE_STREAM -> + new MessageDispatcher( + new MessageConsumerFactory() { + + @Override + protected MessageConsumer buildConsumer( + WorkflowContext workflowContext, TaskContext taskContext) { + return new MessageStreamConsumer( + workflowContext.definition(), taskContext.position()); + } + }); + case TASKS_LIST -> new ListTaskParamsDispatcher(); + case TASKS_GET -> new GetTaskParamsDispatcher(); + case TASKS_CANCEL -> new CancelTaskParamsDispatcher(); + // TODO handle missing cases + case AGENT_GET_AUTHENTICATED_EXTENDED_CARD, + TASKS_PUSH_NOTIFICATION_CONFIG_DELETE, + TASKS_PUSH_NOTIFICATION_CONFIG_GET, + TASKS_PUSH_NOTIFICATION_CONFIG_LIST, + TASKS_PUSH_NOTIFICATION_CONFIG_SET, + TASKS_RESUBSCRIBE -> + throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod()); + }; + + return () -> + new A2AExecutor( + uriSupplier, + dispatcher, + WorkflowUtils.buildMapResolver( + definition.application(), + args.getParameters().getString(), + args.getParameters().getWithA2AParameters().getAdditionalProperties())); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java new file mode 100644 index 000000000..91916368e --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2ARequestDispatcher.java @@ -0,0 +1,32 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; + +@FunctionalInterface +interface A2ARequestDispatcher { + CompletableFuture apply( + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskCtontext); +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java new file mode 100644 index 000000000..8778a51df --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/A2AUtils.java @@ -0,0 +1,126 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowError; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import io.serverlessworkflow.impl.WorkflowPosition; +import io.serverlessworkflow.types.Errors; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.function.Supplier; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class A2AUtils { + + static final String TASK_ID = "taskId"; + + private static final Logger logger = LoggerFactory.getLogger(A2AUtils.class); + + static T param(Map map, String key, Class instanceClass) { + Object obj = map.get(key); + return isInstanceOrThrow( + obj, + instanceClass, + () -> + "value " + + obj + + " for key " + + key + + " is not an instance of type of type " + + instanceClass); + } + + static T optionalParam( + Map map, String key, Class instanceClass, Supplier defaultValue) { + return isInstanceOrDefault(map.get(key), instanceClass, defaultValue); + } + + static > T enumParam( + Map map, String key, Class instanceClass, T defaultValue) { + Object obj = map.get(key); + + if (instanceClass.isInstance(obj)) { + return instanceClass.cast(obj); + } else if (String.class.isInstance(obj)) { + return Enum.valueOf(instanceClass, String.class.cast(obj)); + } else { + return defaultValue; + } + } + + static T optionalParam(Map map, String key, Class instanceClass) { + return isInstanceOrDefault(key, instanceClass, () -> null); + } + + static T isInstanceOrThrow(Object obj, Class instanceClass, Supplier message) { + return isInstance(obj, instanceClass) + .orElseThrow(() -> new IllegalArgumentException(message.get())); + } + + static T isInstanceOrDefault(Object obj, Class instanceClass, Supplier defaultValue) { + return isInstance(obj, instanceClass) + .orElseGet( + () -> { + if (obj != null) { + logger.warn( + "Object " + + obj + + " is expected to be of class" + + instanceClass + + " but it is of class " + + obj.getClass() + + " Using provided default"); + } + return defaultValue.get(); + }); + } + + private static Optional isInstance(Object obj, Class instanceClass) { + if (instanceClass.isInstance(obj)) { + return Optional.of(instanceClass.cast(obj)); + } else if (Instant.class.isAssignableFrom(instanceClass) && String.class.isInstance(obj)) { + return Optional.of(instanceClass.cast(Instant.parse(String.class.cast(obj)))); + } else { + return Optional.empty(); + } + } + + static WorkflowModel fromTask(WorkflowModelFactory factory, Task task) { + return factory.fromOther(task); + } + + static WorkflowModel fromTask(WorkflowContext context, Task task) { + return fromTask(context.definition().application().modelFactory(), task); + } + + static WorkflowModel fromMessage(WorkflowModelFactory factory, Message message) { + return factory.fromOther(message); + } + + static WorkflowError.Builder workflowError(WorkflowPosition position) { + return WorkflowError.error(Errors.RUNTIME.toString(), Errors.RUNTIME.status()) + .instance(position.jsonPointer().toString()); + } + + private A2AUtils() {} +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskParamsDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskParamsDispatcher.java new file mode 100644 index 000000000..cd0dbc01f --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/CancelTaskParamsDispatcher.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.CancelTaskParams; + +class CancelTaskParamsDispatcher implements A2ARequestDispatcher { + + @Override + public CompletableFuture apply( + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskCtontext) { + return CompletableFuture.completedFuture( + A2AUtils.fromTask( + workflowContext, + client.cancelTask( + new CancelTaskParams(param(parameters, A2AUtils.TASK_ID, String.class))))); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskParamsDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskParamsDispatcher.java new file mode 100644 index 000000000..ff4f0f35d --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/GetTaskParamsDispatcher.java @@ -0,0 +1,42 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.TaskQueryParams; + +class GetTaskParamsDispatcher implements A2ARequestDispatcher { + + @Override + public CompletableFuture apply( + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskCtontext) { + return CompletableFuture.completedFuture( + A2AUtils.fromTask( + workflowContext, + client.getTask( + new TaskQueryParams(param(parameters, A2AUtils.TASK_ID, String.class))))); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskParamsDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskParamsDispatcher.java new file mode 100644 index 000000000..45a5f3527 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/ListTaskParamsDispatcher.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.enumParam; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.jsonrpc.common.wrappers.ListTasksResult; +import org.a2aproject.sdk.spec.ListTasksParams; +import org.a2aproject.sdk.spec.TaskState; + +class ListTaskParamsDispatcher implements A2ARequestDispatcher { + + @Override + public CompletableFuture apply( + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskCtontext) { + ListTasksResult tasks = + client.listTasks( + new ListTasksParams( + optionalParam(parameters, "contextId", String.class), + enumParam(parameters, "status", TaskState.class, null), + optionalParam(parameters, "pageSize", Integer.class), + optionalParam(parameters, "pageToken", String.class), + optionalParam(parameters, "historyLength", Integer.class), + optionalParam(parameters, "statusTimestampAfter", Instant.class), + optionalParam(parameters, "includeArtifacts", Boolean.class), + optionalParam(parameters, "tenant", String.class))); + + WorkflowModelFactory factory = workflowContext.definition().application().modelFactory(); + WorkflowModelCollection model = factory.createCollection(); + tasks.tasks().forEach(t -> model.add(A2AUtils.fromTask(factory, t))); + return CompletableFuture.completedFuture(model); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java new file mode 100644 index 000000000..a975f6b8e --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumer.java @@ -0,0 +1,39 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelFactory; +import java.util.concurrent.CompletableFuture; +import java.util.function.BiConsumer; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.spec.AgentCard; + +abstract class MessageConsumer implements BiConsumer { + + protected final CompletableFuture completableFuture; + protected final WorkflowModelFactory factory; + + public MessageConsumer(WorkflowDefinition definition) { + completableFuture = new CompletableFuture<>(); + factory = definition.application().modelFactory(); + } + + public CompletableFuture future() { + return completableFuture; + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java new file mode 100644 index 000000000..00a398fc9 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageConsumerFactory.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +abstract class MessageConsumerFactory { + + protected final CompletableFuture completableFuture = new CompletableFuture<>(); + + protected abstract MessageConsumer buildConsumer( + WorkflowContext workflowContext, TaskContext taskContext); + + protected Consumer buildExceptionHandler( + WorkflowContext workflowContext, TaskContext taskContext) { + return new A2AExceptionHandler(completableFuture, taskContext.position()); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java new file mode 100644 index 000000000..5e8add9aa --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageDispatcher.java @@ -0,0 +1,87 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.isInstanceOrThrow; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.optionalParam; +import static io.serverlessworkflow.impl.executors.a2a.A2AUtils.param; + +import io.serverlessworkflow.impl.TaskContext; +import io.serverlessworkflow.impl.WorkflowContext; +import io.serverlessworkflow.impl.WorkflowModel; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.a2aproject.sdk.client.Client; +import org.a2aproject.sdk.spec.DataPart; +import org.a2aproject.sdk.spec.Message; +import org.a2aproject.sdk.spec.Message.Role; +import org.a2aproject.sdk.spec.Part; +import org.a2aproject.sdk.spec.TextPart; + +class MessageDispatcher implements A2ARequestDispatcher { + + private final MessageConsumerFactory consumerFactory; + + public MessageDispatcher(MessageConsumerFactory consumerFactory) { + this.consumerFactory = consumerFactory; + } + + @Override + public CompletableFuture apply( + Client client, + Map parameters, + WorkflowContext workflowContext, + TaskContext taskContext) { + + MessageConsumer consumer = consumerFactory.buildConsumer(workflowContext, taskContext); + Message message = buildMessage(parameters); + client.sendMessage( + message, + List.of(consumer), + consumerFactory.buildExceptionHandler(workflowContext, taskContext)); + return consumer.future(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private Message buildMessage(Map parameters) { + + Map message = param(parameters, "message", Map.class); + Message.Builder messageBuilder = Message.builder(); + + Collection items = param(message, "parts", Collection.class); + List> parts = new ArrayList<>(); + for (Object item : items) { + Map part = isInstanceOrThrow(item, Map.class, () -> "One item of parts is not an object"); + String kind = optionalParam(part, "kind", String.class, () -> "text"); + parts.add( + switch (kind) { + case TextPart.TEXT -> new TextPart(param(part, TextPart.TEXT, String.class)); + case DataPart.DATA -> new DataPart(param(part, DataPart.DATA, Object.class)); + default -> throw new UnsupportedOperationException("Unimplemented kind: " + kind); + }); + } + messageBuilder.parts(parts); + Object value = message.get("messageId"); + if (value instanceof String messageId) { + messageBuilder.messageId(messageId); + } + messageBuilder.role(A2AUtils.enumParam(message, "role", Role.class, Role.ROLE_USER)); + return messageBuilder.build(); + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java new file mode 100644 index 000000000..e8f395ffc --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageSendConsumer.java @@ -0,0 +1,38 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.MessageEvent; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.spec.AgentCard; + +class MessageSendConsumer extends MessageConsumer { + + public MessageSendConsumer(WorkflowDefinition definition) { + super(definition); + } + + @Override + public void accept(ClientEvent event, AgentCard card) { + if (event instanceof MessageEvent resp) { + completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage())); + } else if (event instanceof TaskEvent resp) { + completableFuture.complete(A2AUtils.fromTask(factory, resp.getTask())); + } + } +} diff --git a/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java new file mode 100644 index 000000000..d9d1268d3 --- /dev/null +++ b/impl/a2a/src/main/java/io/serverlessworkflow/impl/executors/a2a/MessageStreamConsumer.java @@ -0,0 +1,67 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.executors.a2a; + +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowException; +import io.serverlessworkflow.impl.WorkflowPosition; +import org.a2aproject.sdk.client.ClientEvent; +import org.a2aproject.sdk.client.MessageEvent; +import org.a2aproject.sdk.client.TaskEvent; +import org.a2aproject.sdk.client.TaskUpdateEvent; +import org.a2aproject.sdk.spec.AgentCard; +import org.a2aproject.sdk.spec.Task; + +class MessageStreamConsumer extends MessageConsumer { + + private final WorkflowPosition position; + + public MessageStreamConsumer(WorkflowDefinition definition, WorkflowPosition position) { + super(definition); + this.position = position; + } + + @Override + public void accept(ClientEvent event, AgentCard card) { + if (event instanceof MessageEvent resp) { + completableFuture.complete(A2AUtils.fromMessage(factory, resp.getMessage())); + } else if (event instanceof TaskUpdateEvent resp) { + checkTaskCompletion(resp.getTask()); + } else if (event instanceof TaskEvent resp) { + checkTaskCompletion(resp.getTask()); + } + } + + private void checkTaskCompletion(Task task) { + switch (task.status().state()) { + case TASK_STATE_REJECTED, TASK_STATE_FAILED, TASK_STATE_CANCELED: + completableFuture.completeExceptionally(exception(task)); + break; + case TASK_STATE_COMPLETED: + completableFuture.complete(A2AUtils.fromTask(factory, task)); + default: + // do nothing + } + } + + private WorkflowException exception(Task task) { + return new WorkflowException( + A2AUtils.workflowError(position) + .title(task.status().state().toString()) + .details(task.history().toString()) + .build()); + } +} diff --git a/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder new file mode 100644 index 000000000..bc458e2b9 --- /dev/null +++ b/impl/a2a/src/main/resources/META-INF/services/io.serverlessworkflow.impl.executors.CallableTaskBuilder @@ -0,0 +1 @@ +io.serverlessworkflow.impl.executors.a2a.A2AExecutorBuilder diff --git a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java index bd0d84d51..ee2c4a6c5 100644 --- a/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java +++ b/impl/core/src/main/java/io/serverlessworkflow/impl/WorkflowError.java @@ -83,16 +83,20 @@ private static WorkflowError error(Throwable cause, WorkflowPosition position) { } private static WorkflowError commonError(Throwable cause, WorkflowPosition position) { + return new WorkflowError( + cause.getClass().getTypeName(), + 500, + position == null ? null : position.jsonPointer(), + cause.getMessage(), + getStackTrace(cause)); + } + + public static String getStackTrace(Throwable cause) { StringWriter stackTrace = new StringWriter(); try (PrintWriter writer = new PrintWriter(stackTrace)) { cause.printStackTrace(writer); - return new WorkflowError( - cause.getClass().getTypeName(), - 500, - position == null ? null : position.jsonPointer(), - cause.getMessage(), - stackTrace.toString()); } + return stackTrace.toString(); } @Deprecated diff --git a/impl/pom.xml b/impl/pom.xml index 72737c6db..8c7b1249d 100644 --- a/impl/pom.xml +++ b/impl/pom.xml @@ -17,6 +17,8 @@ 9.2.1 3.7.1 25.0.3 + 1.0.0.Final + 2.14.0 @@ -123,6 +125,11 @@ serverlessworkflow-impl-grpc ${project.version} + + io.serverlessworkflow + serverlessworkflow-impl-a2a + ${project.version} + net.thisptr jackson-jq @@ -191,6 +198,16 @@ polyglot ${version.org.graalvm.polyglot} + + org.a2aproject.sdk + a2a-java-sdk-client + ${version.org.a2aproject.sdk} + + + com.google.code.gson + gson + ${version.com.google.code.gson} + @@ -213,5 +230,6 @@ python grpc openapi-jackson + a2a diff --git a/impl/test/pom.xml b/impl/test/pom.xml index 49ad1d893..165ef1a98 100644 --- a/impl/test/pom.xml +++ b/impl/test/pom.xml @@ -42,6 +42,10 @@ io.serverlessworkflow serverlessworkflow-impl-script-python + + io.serverlessworkflow + serverlessworkflow-impl-a2a + org.glassfish.jersey.media jersey-media-json-jackson diff --git a/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2ADefinitionTest.java b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2ADefinitionTest.java new file mode 100644 index 000000000..c44f776cc --- /dev/null +++ b/impl/test/src/test/java/io/serverlessworkflow/impl/test/A2ADefinitionTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverlessworkflow.impl.test; + +import static io.serverlessworkflow.api.WorkflowReader.readWorkflowFromClasspath; + +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class A2ADefinitionTest { + + @Test + void testHelloWorld() throws IOException { + + try (WorkflowApplication appl = WorkflowApplication.builder().build()) { + WorkflowDefinition def = + appl.workflowDefinition( + readWorkflowFromClasspath("workflows-samples/a2a/a2a-hello-world.yaml")); + System.out.println(def.instance().start().join().asJavaObject()); + } + } +} diff --git a/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml new file mode 100644 index 000000000..68867309b --- /dev/null +++ b/impl/test/src/test/resources/workflows-samples/a2a/a2a-hello-world.yaml @@ -0,0 +1,17 @@ +document: + dsl: '1.0.3' + namespace: test + name: a2a-hello-world + version: '0.1.0' +do: + - HowMuch: + call: a2a + with: + method: message/send + agentCard: + endpoint: http://localhost:9999 + parameters: + message: + parts: + - kind: text + text: how much is 10 USD in INR? \ No newline at end of file