Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions impl/a2a/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl</artifactId>
<version>8.0.0-SNAPSHOT</version>
</parent>
<artifactId>serverlessworkflow-impl-a2a</artifactId>
<name>Serverless Workflow :: Impl :: A2A</name>
<dependencies>
<dependency>
<groupId>io.serverlessworkflow</groupId>
<artifactId>serverlessworkflow-impl-core</artifactId>
</dependency>
<dependency>
<groupId>org.a2aproject.sdk</groupId>
<artifactId>a2a-java-sdk-client</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.a2a;

import io.serverlessworkflow.impl.WorkflowError;
import io.serverlessworkflow.impl.WorkflowException;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowPosition;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

class A2AExceptionHandler implements Consumer<Throwable> {

private final CompletableFuture<WorkflowModel> future;
private final WorkflowPosition position;

A2AExceptionHandler(CompletableFuture<WorkflowModel> future, WorkflowPosition position) {
this.future = future;
this.position = position;
}

@Override
public void accept(Throwable ex) {

future.completeExceptionally(
new WorkflowException(
A2AUtils.workflowError(position)
.title(ex.getMessage())
.details(WorkflowError.getStackTrace(ex))
.build(),
ex));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.a2a;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.executors.CallableTask;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.a2aproject.sdk.client.Client;
import org.a2aproject.sdk.client.config.ClientConfig;
import org.a2aproject.sdk.client.http.A2ACardResolver;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransport;
import org.a2aproject.sdk.client.transport.jsonrpc.JSONRPCTransportConfig;

class A2AExecutor implements CallableTask {

private final WorkflowValueResolver<URI> uriSupplier;
private final A2ARequestDispatcher dispatcher;
private final WorkflowValueResolver<Map<String, Object>> mapResolver;

public A2AExecutor(
WorkflowValueResolver<URI> uriSupplier,
A2ARequestDispatcher dispatcher,
WorkflowValueResolver<Map<String, Object>> mapResolver) {
this.uriSupplier = uriSupplier;
this.dispatcher = dispatcher;
this.mapResolver = mapResolver;
}

@Override
public CompletableFuture<WorkflowModel> apply(
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
URI uri = uriSupplier.apply(workflowContext, taskContext, input);
return dispatcher.apply(
Client.builder(
A2ACardResolver.builder()
.baseUrl(uri.resolve("/").toString())
.agentCardPath(uri.getPath())
.build()
.getAgentCard())
.clientConfig(new ClientConfig.Builder().build())
.withTransport(JSONRPCTransport.class, new JSONRPCTransportConfig())
.build(),
mapResolver.apply(workflowContext, taskContext, input),
workflowContext,
taskContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.a2a;

import io.serverlessworkflow.api.types.A2AArguments;
import io.serverlessworkflow.api.types.CallA2A;
import io.serverlessworkflow.api.types.TaskBase;
import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowDefinition;
import io.serverlessworkflow.impl.WorkflowMutablePosition;
import io.serverlessworkflow.impl.WorkflowUtils;
import io.serverlessworkflow.impl.WorkflowValueResolver;
import io.serverlessworkflow.impl.executors.CallableTaskBuilder;
import io.serverlessworkflow.impl.executors.CallableTaskFactory;
import java.net.URI;

public class A2AExecutorBuilder implements CallableTaskBuilder<CallA2A> {

@Override
public boolean accept(Class<? extends TaskBase> clazz) {
return CallA2A.class.equals(clazz);
}

@Override
public CallableTaskFactory init(
CallA2A task, WorkflowDefinition definition, WorkflowMutablePosition position) {
A2AArguments args = task.getWith();

WorkflowValueResolver<URI> uriSupplier;
if (args.getServer() != null) {
uriSupplier = definition.resourceLoader().uriSupplier(args.getServer());
} else if (args.getAgentCard() != null) {
uriSupplier = definition.resourceLoader().uriSupplier(args.getAgentCard().getEndpoint());
} else {
throw new IllegalArgumentException("Neither server or agent card is set for task" + task);
}

A2ARequestDispatcher dispatcher =
switch (args.getMethod()) {
case MESSAGE_SEND ->
new MessageDispatcher(
new MessageConsumerFactory() {
@Override
protected MessageConsumer buildConsumer(
WorkflowContext workflowContext, TaskContext taskContext) {
return new MessageSendConsumer(workflowContext.definition());
}
});
case MESSAGE_STREAM ->
new MessageDispatcher(
new MessageConsumerFactory() {

@Override
protected MessageConsumer buildConsumer(
WorkflowContext workflowContext, TaskContext taskContext) {
return new MessageStreamConsumer(
workflowContext.definition(), taskContext.position());
}
});
case TASKS_LIST -> new ListTaskParamsDispatcher();
case TASKS_GET -> new GetTaskParamsDispatcher();
case TASKS_CANCEL -> new CancelTaskParamsDispatcher();
// TODO handle missing cases
case AGENT_GET_AUTHENTICATED_EXTENDED_CARD,
TASKS_PUSH_NOTIFICATION_CONFIG_DELETE,
TASKS_PUSH_NOTIFICATION_CONFIG_GET,
TASKS_PUSH_NOTIFICATION_CONFIG_LIST,
TASKS_PUSH_NOTIFICATION_CONFIG_SET,
TASKS_RESUBSCRIBE ->
throw new UnsupportedOperationException("Unimplemented case: " + args.getMethod());
};

return () ->
new A2AExecutor(
uriSupplier,
dispatcher,
WorkflowUtils.buildMapResolver(
definition.application(),
args.getParameters().getString(),
args.getParameters().getWithA2AParameters().getAdditionalProperties()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2020-Present The Serverless Workflow Specification Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.serverlessworkflow.impl.executors.a2a;

import io.serverlessworkflow.impl.TaskContext;
import io.serverlessworkflow.impl.WorkflowContext;
import io.serverlessworkflow.impl.WorkflowModel;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.a2aproject.sdk.client.Client;

@FunctionalInterface
interface A2ARequestDispatcher {
CompletableFuture<WorkflowModel> apply(
Client client,
Map<String, Object> parameters,
WorkflowContext workflowContext,
TaskContext taskCtontext);
}
Loading
Loading