forked from modelcontextprotocol/java-sdk
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMockMcpClientTransport.java
More file actions
99 lines (79 loc) · 2.85 KB
/
MockMcpClientTransport.java
File metadata and controls
99 lines (79 loc) · 2.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
/*
* Copyright 2024-2024 the original author or authors.
*/
package io.modelcontextprotocol;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.modelcontextprotocol.spec.McpClientTransport;
import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCNotification;
import io.modelcontextprotocol.spec.McpSchema.JSONRPCRequest;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
/**
* A mock implementation of the {@link McpClientTransport} interfaces.
*/
public class MockMcpClientTransport implements McpClientTransport {
private final Sinks.Many<McpSchema.JSONRPCMessage> inbound = Sinks.many().unicast().onBackpressureBuffer();
private final List<McpSchema.JSONRPCMessage> sent = new ArrayList<>();
private final BiConsumer<MockMcpClientTransport, McpSchema.JSONRPCMessage> interceptor;
public MockMcpClientTransport() {
this((t, msg) -> {
});
}
public MockMcpClientTransport(BiConsumer<MockMcpClientTransport, McpSchema.JSONRPCMessage> interceptor) {
this.interceptor = interceptor;
}
public void simulateIncomingMessage(McpSchema.JSONRPCMessage message) {
if (inbound.tryEmitNext(message).isFailure()) {
throw new RuntimeException("Failed to process incoming message " + message);
}
}
@Override
public Mono<Void> sendMessage(McpSchema.JSONRPCMessage message) {
sent.add(message);
interceptor.accept(this, message);
return Mono.empty();
}
public McpSchema.JSONRPCRequest getLastSentMessageAsRequest() {
return (JSONRPCRequest) getLastSentMessage();
}
public McpSchema.JSONRPCNotification getLastSentMessageAsNotification() {
return (JSONRPCNotification) getLastSentMessage();
}
public McpSchema.JSONRPCMessage getLastSentMessage() {
return !sent.isEmpty() ? sent.get(sent.size() - 1) : null;
}
public McpSchema.JSONRPCBatchResponse getSentMessagesAsBatchResponse() {
return new McpSchema.JSONRPCBatchResponse(sent);
}
private volatile boolean connected = false;
@Override
public Mono<Void> connect(Function<Mono<McpSchema.JSONRPCMessage>, Mono<McpSchema.JSONRPCMessage>> handler) {
if (connected) {
return Mono.error(new IllegalStateException("Already connected"));
}
connected = true;
return inbound.asFlux()
.flatMap(message -> Mono.just(message).transform(handler))
.doFinally(signal -> connected = false)
.then();
}
@Override
public Mono<Void> closeGracefully() {
return Mono.defer(() -> {
connected = false;
inbound.tryEmitComplete();
// Wait for all subscribers to complete
return Mono.empty();
});
}
@Override
public <T> T unmarshalFrom(Object data, TypeReference<T> typeRef) {
return new ObjectMapper().convertValue(data, typeRef);
}
}