diff --git a/conformance-tests/VALIDATION_RESULTS.md b/conformance-tests/VALIDATION_RESULTS.md index 19e74330c..0cba495e0 100644 --- a/conformance-tests/VALIDATION_RESULTS.md +++ b/conformance-tests/VALIDATION_RESULTS.md @@ -2,27 +2,22 @@ ## Summary -**Server Tests:** 37/40 passed (92.5%) +**Server Tests:** 40/40 passed (100%) **Client Tests:** 3/4 scenarios passed (9/10 checks passed) **Auth Tests:** 12/14 scenarios fully passing (178 passed, 1 failed, 1 warning, 85.7% scenarios, 98.9% checks) ## Server Test Results -### Passing (37/40) +### Passing (40/40) - **Lifecycle & Utilities (4/4):** initialize, ping, logging-set-level, completion-complete - **Tools (11/11):** All scenarios including progress notifications ✨ - **Elicitation (10/10):** SEP-1034 defaults (5 checks), SEP-1330 enums (5 checks) -- **Resources (4/6):** list, read-text, read-binary, templates-read +- **Resources (6/6):** list, read-text, read-binary, templates-read, subscribe, unsubscribe - **Prompts (4/4):** list, simple, with-args, embedded-resource, with-image - **SSE Transport (2/2):** Multiple streams - **Security (2/2):** Localhost validation passes, DNS rebinding protection -### Failing (3/40) - -1. **resources-subscribe** - Not implemented in SDK -2. **resources-unsubscribe** - Not implemented in SDK - ## Client Test Results ### Passing (3/4 scenarios, 9/10 checks) @@ -68,10 +63,9 @@ Uses the `client-spring-http-client` module with Spring Security OAuth2 and the ## Known Limitations -1. **Resource Subscriptions:** SDK doesn't implement `resources/subscribe` and `resources/unsubscribe` handlers -2. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header -3. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization -4. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow +1. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header +2. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization +3. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow ## Running Tests @@ -119,6 +113,5 @@ npx @modelcontextprotocol/conformance@0.1.15 client \ ### High Priority 1. Fix client SSE retry field handling in `HttpClientStreamableHttpTransport` -2. Implement resource subscription handlers in `McpStatelessAsyncServer` -3. Implement CIMD -4. Implement scope step up +2. Implement CIMD +3. Implement scope step up diff --git a/conformance-tests/conformance-baseline.yml b/conformance-tests/conformance-baseline.yml index 4ab144063..d2990c155 100644 --- a/conformance-tests/conformance-baseline.yml +++ b/conformance-tests/conformance-baseline.yml @@ -2,11 +2,6 @@ # This file lists known failing scenarios that are expected to fail until fixed. # See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md -server: - # Resource subscription not implemented in SDK - - resources-subscribe - - resources-unsubscribe - client: # SSE retry field handling not implemented # - Client does not parse or respect retry: field timing diff --git a/conformance-tests/server-servlet/README.md b/conformance-tests/server-servlet/README.md index bd86636b6..ef327ecf6 100644 --- a/conformance-tests/server-servlet/README.md +++ b/conformance-tests/server-servlet/README.md @@ -4,7 +4,7 @@ This module contains a comprehensive MCP (Model Context Protocol) server impleme ## Conformance Test Results -**Status: 37 out of 40 tests passing (92.5%)** +**Status: 40 out of 40 tests passing (100%)** The server has been validated against the official [MCP conformance test suite](https://github.com/modelcontextprotocol/conformance). See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for detailed results. @@ -22,9 +22,8 @@ The server has been validated against the official [MCP conformance test suite]( - SEP-1034: Default values for all primitive types - SEP-1330: All enum schema variants -✅ **Resources** (4/6) -- List, read text/binary, templates -- ⚠️ Subscribe/unsubscribe (SDK limitation) +✅ **Resources** (6/6) +- List, read text/binary, templates, subscribe, unsubscribe ✅ **Prompts** (4/4) - Simple, parameterized, embedded resources, images @@ -191,12 +190,7 @@ curl -X POST http://localhost:8080/mcp \ ## Known Limitations -See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on: - -1. **Resource Subscriptions** - Not implemented in Java SDK -2. **DNS Rebinding Protection** - Missing Host/Origin validation - -These are SDK-level limitations that require fixes in the core framework. +See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on remaining client-side limitations. ## References diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 32256987a..b078493ef 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -5,10 +5,12 @@ package io.modelcontextprotocol.server; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -25,7 +27,6 @@ import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion; import io.modelcontextprotocol.spec.McpSchema.ErrorCodes; import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; -import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.spec.McpSchema.PromptReference; import io.modelcontextprotocol.spec.McpSchema.ResourceReference; import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; @@ -111,12 +112,10 @@ public class McpAsyncServer { private final ConcurrentHashMap prompts = new ConcurrentHashMap<>(); - // FIXME: this field is deprecated and should be remvoed together with the - // broadcasting loggingNotification. - private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG; - private final ConcurrentHashMap completions = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> resourceSubscriptions = new ConcurrentHashMap<>(); + private List protocolVersions; private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory(); @@ -149,8 +148,11 @@ public class McpAsyncServer { this.protocolVersions = mcpTransportProvider.protocolVersions(); - mcpTransportProvider.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), - requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers)); + mcpTransportProvider.setSessionFactory(transport -> { + String sessionId = UUID.randomUUID().toString(); + return new McpServerSession(sessionId, requestTimeout, transport, this::asyncInitializeRequestHandler, + requestHandlers, notificationHandlers, () -> this.cleanupForSession(sessionId)); + }); } McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper, @@ -174,8 +176,9 @@ public class McpAsyncServer { this.protocolVersions = mcpTransportProvider.protocolVersions(); - mcpTransportProvider.setSessionFactory(new DefaultMcpStreamableServerSessionFactory(requestTimeout, - this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers)); + mcpTransportProvider.setSessionFactory( + new DefaultMcpStreamableServerSessionFactory(requestTimeout, this::asyncInitializeRequestHandler, + requestHandlers, notificationHandlers, sessionId -> this.cleanupForSession(sessionId))); } private Map prepareNotificationHandlers(McpServerFeatures.Async features) { @@ -215,6 +218,10 @@ private Map> prepareRequestHandlers() { requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler()); + if (Boolean.TRUE.equals(this.serverCapabilities.resources().subscribe())) { + requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeRequestHandler()); + requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeRequestHandler()); + } } // Add prompts API handlers if provider exists @@ -685,12 +692,73 @@ public Mono notifyResourcesListChanged() { } /** - * Notifies clients that the resources have updated. - * @return A Mono that completes when all clients have been notified + * Notifies only the sessions that have subscribed to the updated resource URI. + * @param resourcesUpdatedNotification the notification containing the updated + * resource URI + * @return A Mono that completes when all subscribed sessions have been notified */ public Mono notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { - return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, - resourcesUpdatedNotification); + return Mono.defer(() -> { + String uri = resourcesUpdatedNotification.uri(); + Set subscribedSessions = this.resourceSubscriptions.get(uri); + if (subscribedSessions == null || subscribedSessions.isEmpty()) { + logger.debug("No sessions subscribed to resource URI: {}", uri); + return Mono.empty(); + } + return Flux.fromIterable(subscribedSessions) + .flatMap(sessionId -> this.mcpTransportProvider + .notifyClient(sessionId, McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + resourcesUpdatedNotification) + .doOnError(e -> logger.error("Failed to notify session {} of resource update for {}", sessionId, + uri, e)) + .onErrorComplete()) + .then(); + }); + } + + private Mono cleanupForSession(String sessionId) { + return Mono.fromRunnable(() -> { + removeSessionSubscriptions(sessionId); + }); + } + + private void removeSessionSubscriptions(String sessionId) { + this.resourceSubscriptions.forEach((uri, sessions) -> sessions.remove(sessionId)); + this.resourceSubscriptions.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + private McpRequestHandler resourcesSubscribeRequestHandler() { + return (exchange, params) -> Mono.defer(() -> { + McpSchema.SubscribeRequest subscribeRequest = jsonMapper.convertValue(params, + new TypeRef() { + }); + String uri = subscribeRequest.uri(); + String sessionId = exchange.sessionId(); + this.resourceSubscriptions.computeIfAbsent(uri, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())) + .add(sessionId); + logger.debug("Session {} subscribed to resource URI: {}", sessionId, uri); + + return Mono.just(Map.of()); + }); + } + + private McpRequestHandler resourcesUnsubscribeRequestHandler() { + return (exchange, params) -> Mono.defer(() -> { + McpSchema.UnsubscribeRequest unsubscribeRequest = jsonMapper.convertValue(params, + new TypeRef() { + }); + String uri = unsubscribeRequest.uri(); + String sessionId = exchange.sessionId(); + Set sessions = this.resourceSubscriptions.get(uri); + if (sessions != null) { + sessions.remove(sessionId); + if (sessions.isEmpty()) { + this.resourceSubscriptions.remove(uri, sessions); + } + } + logger.debug("Session {} unsubscribed from resource URI: {}", sessionId, uri); + return Mono.just(Map.of()); + }); } private McpRequestHandler resourcesListRequestHandler() { @@ -878,10 +946,6 @@ private McpRequestHandler setLoggerRequestHandler() { exchange.setMinLoggingLevel(newMinLoggingLevel.level()); - // FIXME: this field is deprecated and should be removed together - // with the broadcasting loggingNotification. - this.minLoggingLevel = newMinLoggingLevel.level(); - return Mono.just(Map.of()); }); }; diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java index 7037ff293..d3648a06f 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java @@ -228,6 +228,25 @@ public Mono notifyClients(String method, Object params) { .then(); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + // Need to iterate in O(n) because the transport session id + // is different from the server-logical session id (in streamable http this + // design issue was solved) + McpServerSession session = sessions.values() + .stream() + .filter(s -> sessionId.equals(s.getId())) + .findFirst() + .orElse(null); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Handles GET requests to establish SSE connections. *

diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java index d7561188c..95edb63a0 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java @@ -206,6 +206,18 @@ public Mono notifyClients(String method, Object params) { }); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpStreamableServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Initiates a graceful shutdown of the transport. * @return A Mono that completes when all cleanup operations are finished diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java index d288ea3d6..79be014a6 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java @@ -98,12 +98,26 @@ public void setSessionFactory(McpServerSession.Factory sessionFactory) { @Override public Mono notifyClients(String method, Object params) { if (this.session == null) { - return Mono.error(new IllegalStateException("No session to close")); + return Mono.error(new IllegalStateException("No session to notify")); } return this.session.sendNotification(method, params) .doOnError(e -> logger.error("Failed to send notification: {}", e.getMessage())); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + if (this.session == null) { + return Mono.error(new IllegalStateException("No session to notify")); + } + if (!this.session.getId().equals(sessionId)) { + return Mono.error(new IllegalStateException("Existing session id " + this.session.getId() + + " doesn't match the notification target: " + sessionId)); + } + return this.session.sendNotification(method, params); + }); + } + @Override public Mono closeGracefully() { if (this.session == null) { diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java index f497afd43..65da43202 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java @@ -10,6 +10,9 @@ import java.time.Duration; import java.util.Map; import java.util.UUID; +import java.util.function.Function; + +import reactor.core.publisher.Mono; /** * A default implementation of {@link McpStreamableServerSession.Factory}. @@ -26,29 +29,53 @@ public class DefaultMcpStreamableServerSessionFactory implements McpStreamableSe Map notificationHandlers; + private final Function> onClose; + /** - * Constructs an instance + * Constructs an instance. * @param requestTimeout timeout for requests * @param initRequestHandler initialization request handler * @param requestHandlers map of MCP request handlers keyed by method name * @param notificationHandlers map of MCP notification handlers keyed by method name + * @param onClose reactive callback invoked with the session ID when a session is + * closed */ public DefaultMcpStreamableServerSessionFactory(Duration requestTimeout, McpStreamableServerSession.InitRequestHandler initRequestHandler, - Map> requestHandlers, - Map notificationHandlers) { + Map> requestHandlers, Map notificationHandlers, + Function> onClose) { this.requestTimeout = requestTimeout; this.initRequestHandler = initRequestHandler; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Constructs an instance. + * @param requestTimeout timeout for requests + * @param initRequestHandler initialization request handler + * @param requestHandlers map of MCP request handlers keyed by method name + * @param notificationHandlers map of MCP notification handlers keyed by method name + * @deprecated Use + * {@link #DefaultMcpStreamableServerSessionFactory(Duration, McpStreamableServerSession.InitRequestHandler, Map, Map, Function)} + * instead + */ + @Deprecated + public DefaultMcpStreamableServerSessionFactory(Duration requestTimeout, + McpStreamableServerSession.InitRequestHandler initRequestHandler, + Map> requestHandlers, + Map notificationHandlers) { + this(requestTimeout, initRequestHandler, requestHandlers, notificationHandlers, sessionId -> Mono.empty()); } @Override public McpStreamableServerSession.McpStreamableServerSessionInit startSession( McpSchema.InitializeRequest initializeRequest) { - return new McpStreamableServerSession.McpStreamableServerSessionInit( - new McpStreamableServerSession(UUID.randomUUID().toString(), initializeRequest.capabilities(), - initializeRequest.clientInfo(), requestTimeout, requestHandlers, notificationHandlers), + String sessionId = UUID.randomUUID().toString(); + return new McpStreamableServerSession.McpStreamableServerSessionInit(new McpStreamableServerSession(sessionId, + initializeRequest.capabilities(), initializeRequest.clientInfo(), requestTimeout, requestHandlers, + notificationHandlers, () -> this.onClose.apply(sessionId)), this.initRequestHandler.handle(initializeRequest)); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index ecb1dafd8..fc011a4e3 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -65,25 +66,47 @@ public class McpServerSession implements McpLoggableSession { private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO; + private final Supplier> onClose; + /** * Creates a new server session with the given parameters and the transport to use. * @param id session id + * @param requestTimeout duration to wait for request responses before timing out * @param transport the transport to use * @param initHandler called when a * {@link io.modelcontextprotocol.spec.McpSchema.InitializeRequest} is received by the * server * @param requestHandlers map of request handlers to use * @param notificationHandlers map of notification handlers to use + * @param onClose supplier of a reactive callback invoked when the session is closed */ public McpServerSession(String id, Duration requestTimeout, McpServerTransport transport, McpInitRequestHandler initHandler, Map> requestHandlers, - Map notificationHandlers) { + Map notificationHandlers, Supplier> onClose) { this.id = id; this.requestTimeout = requestTimeout; this.transport = transport; this.initRequestHandler = initHandler; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Creates a new server session with the given parameters and the transport to use. + * @param id session id + * @param requestTimeout duration to wait for request responses before timing out + * @param transport the transport to use + * @param initHandler called when a + * {@link io.modelcontextprotocol.spec.McpSchema.InitializeRequest} is received by the + * server + * @param requestHandlers map of request handlers to use + * @param notificationHandlers map of notification handlers to use + */ + public McpServerSession(String id, Duration requestTimeout, McpServerTransport transport, + McpInitRequestHandler initHandler, Map> requestHandlers, + Map notificationHandlers) { + this(id, requestTimeout, transport, initHandler, requestHandlers, notificationHandlers, Mono::empty); } /** @@ -318,12 +341,13 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { // TODO: clear pendingResponses and emit errors? - return this.transport.closeGracefully(); + return this.onClose.get().onErrorComplete().then(this.transport.closeGracefully()); } @Override public void close() { // TODO: clear pendingResponses and emit errors? + this.onClose.get().onErrorComplete().subscribe(); this.transport.close(); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java index acb1ecac6..8d5e0f847 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java @@ -45,6 +45,23 @@ public interface McpServerTransportProviderBase { */ Mono notifyClients(String method, Object params); + /** + * Sends a notification to a specific client session. Transport providers that support + * resource subscriptions must override this method to enable per-session + * notifications. The default implementation returns an error indicating that this + * operation is not supported. + * @param sessionId the id of the session to notify + * @param method the name of the notification method to be called on the client + * @param params parameters to be sent with the notification + * @return a Mono that completes when the notification has been sent, or empty if the + * session is not found + */ + default Mono notifyClient(String sessionId, String method, Object params) { + return Mono.error( + new UnsupportedOperationException("This transport provider does not support per-session notifications. " + + "Override notifyClient() to enable resource subscription support.")); + } + /** * Immediately closes all the transports with connected clients and releases any * associated resources. diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java index 95f8959f5..9ec2117bb 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java @@ -62,6 +62,8 @@ public class McpStreamableServerSession implements McpLoggableSession { private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO; + private final Supplier> onClose; + /** * Create an instance of the streamable session. * @param id session ID @@ -71,11 +73,12 @@ public class McpStreamableServerSession implements McpLoggableSession { * @param requestHandlers the map of MCP request handlers keyed by method name * @param notificationHandlers the map of MCP notification handlers keyed by method * name + * @param onClose supplier of a reactive callback invoked when the session is closed */ public McpStreamableServerSession(String id, McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo, Duration requestTimeout, - Map> requestHandlers, - Map notificationHandlers) { + Map> requestHandlers, Map notificationHandlers, + Supplier> onClose) { this.id = id; this.missingMcpTransportSession = new MissingMcpTransportSession(id); this.listeningStreamRef = new AtomicReference<>(this.missingMcpTransportSession); @@ -84,6 +87,24 @@ public McpStreamableServerSession(String id, McpSchema.ClientCapabilities client this.requestTimeout = requestTimeout; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Create an instance of the streamable session. + * @param id session ID + * @param clientCapabilities client capabilities + * @param clientInfo client info + * @param requestTimeout timeout to use for requests + * @param requestHandlers the map of MCP request handlers keyed by method name + * @param notificationHandlers the map of MCP notification handlers keyed by method + * name + */ + public McpStreamableServerSession(String id, McpSchema.ClientCapabilities clientCapabilities, + McpSchema.Implementation clientInfo, Duration requestTimeout, + Map> requestHandlers, + Map notificationHandlers) { + this(id, clientCapabilities, clientInfo, requestTimeout, requestHandlers, notificationHandlers, Mono::empty); } @Override @@ -126,6 +147,7 @@ public Mono sendNotification(String method, Object params) { } public Mono delete() { + // onClose is invoked inside closeGracefully return this.closeGracefully().then(Mono.fromRunnable(() -> { // TODO: review in the context of history storage // delete history, etc. @@ -258,15 +280,16 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { - return Mono.defer(() -> { + return this.onClose.get().onErrorComplete().then(Mono.defer(() -> { McpLoggableSession listeningStream = this.listeningStreamRef.getAndSet(missingMcpTransportSession); return listeningStream.closeGracefully(); // TODO: Also close all the open streams - }); + })); } @Override public void close() { + this.onClose.get().onErrorComplete().subscribe(); McpLoggableSession listeningStream = this.listeningStreamRef.getAndSet(missingMcpTransportSession); if (listeningStream != null) { listeningStream.close(); diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java index 270bc4308..1ed9b270a 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java @@ -1746,6 +1746,95 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { } } + // --------------------------------------- + // Resource Subscription Tests + // --------------------------------------- + + @ParameterizedTest(name = "{0} : {displayName} ") + @MethodSource("clientsForTesting") + void testResourceSubscription(String clientType) throws InterruptedException { + + var clientBuilder = clientBuilders.get(clientType); + + String resourceUri = "test://subscribable-resource"; + var receivedContents = new AtomicReference>(); + var latch = new CountDownLatch(1); + + McpServerFeatures.SyncResourceSpecification resourceSpec = new McpServerFeatures.SyncResourceSpecification( + McpSchema.Resource.builder() + .uri(resourceUri) + .name("Subscribable Resource") + .mimeType("text/plain") + .build(), + (exchange, req) -> new McpSchema.ReadResourceResult( + List.of(new McpSchema.TextResourceContents(resourceUri, "text/plain", "initial content")))); + + McpSyncServer mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .resources(resourceSpec) + .build(); + + try (var mcpClient = clientBuilder.resourcesUpdateConsumer(contents -> { + receivedContents.set(contents); + latch.countDown(); + }).build()) { + + mcpClient.initialize(); + + mcpClient.subscribeResource(new McpSchema.SubscribeRequest(resourceUri)); + + mcpServer.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(resourceUri)); + + assertThat(latch.await(5, TimeUnit.SECONDS)) + .as("client should receive the resources/updated notification within 5 seconds") + .isTrue(); + assertThat(receivedContents.get()).isNotEmpty(); + } + finally { + mcpServer.closeGracefully(); + } + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @MethodSource("clientsForTesting") + void testResourceSubscription_afterUnsubscribe_noNotification(String clientType) throws InterruptedException { + + var clientBuilder = clientBuilders.get(clientType); + + String resourceUri = "test://subscribable-resource-unsub"; + var notificationCount = new java.util.concurrent.atomic.AtomicInteger(0); + + McpServerFeatures.SyncResourceSpecification resourceSpec = new McpServerFeatures.SyncResourceSpecification( + McpSchema.Resource.builder() + .uri(resourceUri) + .name("Subscribable Resource") + .mimeType("text/plain") + .build(), + (exchange, req) -> new McpSchema.ReadResourceResult( + List.of(new McpSchema.TextResourceContents(resourceUri, "text/plain", "content")))); + + McpSyncServer mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .resources(resourceSpec) + .build(); + + try (var mcpClient = clientBuilder.resourcesUpdateConsumer(contents -> notificationCount.incrementAndGet()) + .build()) { + + mcpClient.initialize(); + + mcpClient.subscribeResource(new McpSchema.SubscribeRequest(resourceUri)); + mcpClient.unsubscribeResource(new McpSchema.UnsubscribeRequest(resourceUri)); + + mcpServer.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(resourceUri)); + + assertThat(notificationCount.get()).as("no notification should be received after unsubscribing").isZero(); + } + finally { + mcpServer.closeGracefully(); + } + } + private double evaluateExpression(String expression) { // Simple expression evaluator for testing return switch (expression) { diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java index fac26596a..9d43968e5 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; import io.modelcontextprotocol.json.McpJsonDefaults; diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java index e955be89f..9488870e5 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java @@ -38,6 +38,14 @@ public Mono notifyClients(String method, Object params) { return session.sendNotification(method, params); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + if (session != null && session.getId().equals(sessionId)) { + return session.sendNotification(method, params); + } + return Mono.empty(); + } + @Override public Mono closeGracefully() { return session.closeGracefully(); diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java new file mode 100644 index 000000000..016e25e9f --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java @@ -0,0 +1,166 @@ +/* + * Copyright 2025-2025 the original author or authors. + */ + +package io.modelcontextprotocol.server; + +import java.util.UUID; + +import io.modelcontextprotocol.MockMcpServerTransport; +import io.modelcontextprotocol.MockMcpServerTransportProvider; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.ProtocolVersions; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for resource subscription logic in {@link McpAsyncServer}. Uses + * {@link MockMcpServerTransportProvider} to drive sessions directly without a real + * network stack. + */ +class ResourceSubscriptionTests { + + private static final String RESOURCE_URI = "test://resource/1"; + + private static final McpSchema.Implementation SERVER_INFO = new McpSchema.Implementation("test-server", "1.0.0"); + + private static final McpSchema.Implementation CLIENT_INFO = new McpSchema.Implementation("test-client", "1.0.0"); + + private static McpAsyncServer buildServer(MockMcpServerTransportProvider transportProvider) { + return McpServer.async(transportProvider) + .serverInfo(SERVER_INFO) + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .build(); + } + + private static McpSchema.JSONRPCRequest initRequest() { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, + UUID.randomUUID().toString(), + new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_11_25, null, CLIENT_INFO)); + } + + private static McpSchema.JSONRPCNotification initializedNotification() { + return new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_NOTIFICATION_INITIALIZED, + null); + } + + private static McpSchema.JSONRPCRequest subscribeRequest(String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, + UUID.randomUUID().toString(), new McpSchema.SubscribeRequest(uri)); + } + + private static McpSchema.JSONRPCRequest unsubscribeRequest(String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, + UUID.randomUUID().toString(), new McpSchema.UnsubscribeRequest(uri)); + } + + @Test + void notifyResourcesUpdated_noSubscribers_completesEmpty() { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + transportProvider.simulateIncomingMessage(initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent when nobody is subscribed") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterSubscribe_notifiesSession() { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + transportProvider.simulateIncomingMessage(initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + McpSchema.JSONRPCMessage sent = transport.getLastSentMessage(); + assertThat(sent).isInstanceOf(McpSchema.JSONRPCNotification.class); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent; + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_differentUri_doesNotNotifySession() { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + transportProvider.simulateIncomingMessage(initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier + .create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification("test://other/resource"))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()) + .as("notification for a different URI should not reach a session subscribed to a different URI") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + transportProvider.simulateIncomingMessage(initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(unsubscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent after the session unsubscribed") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession() { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + transportProvider.simulateIncomingMessage(initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); + + // Close the session; onClose must fire and remove the subscription + transportProvider.closeGracefully().block(); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent after the session has closed") + .isEmpty(); + + server.closeGracefully().block(); + } + +}