From ad17a4ec471f339dc8a184156f33ee31a4cf54fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 27 Feb 2026 00:51:12 +0100 Subject: [PATCH 1/4] Resource subscriptions MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../server/McpAsyncServer.java | 98 +++++++-- ...HttpServletSseServerTransportProvider.java | 18 ++ ...vletStreamableServerTransportProvider.java | 12 ++ .../StdioServerTransportProvider.java | 16 +- ...aultMcpStreamableServerSessionFactory.java | 39 +++- .../spec/McpServerSession.java | 28 ++- .../spec/McpServerTransportProviderBase.java | 17 ++ .../spec/McpStreamableServerSession.java | 31 ++- ...stractMcpClientServerIntegrationTests.java | 89 +++++++++ .../MockMcpServerTransport.java | 24 +++ .../MockMcpServerTransportProvider.java | 8 + .../server/ResourceSubscriptionTests.java | 188 ++++++++++++++++++ 12 files changed, 538 insertions(+), 30 deletions(-) create mode 100644 mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java index 32256987a..b078493ef 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/McpAsyncServer.java @@ -5,10 +5,12 @@ package io.modelcontextprotocol.server; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -25,7 +27,6 @@ import io.modelcontextprotocol.spec.McpSchema.CompleteResult.CompleteCompletion; import io.modelcontextprotocol.spec.McpSchema.ErrorCodes; import io.modelcontextprotocol.spec.McpSchema.LoggingLevel; -import io.modelcontextprotocol.spec.McpSchema.LoggingMessageNotification; import io.modelcontextprotocol.spec.McpSchema.PromptReference; import io.modelcontextprotocol.spec.McpSchema.ResourceReference; import io.modelcontextprotocol.spec.McpSchema.SetLevelRequest; @@ -111,12 +112,10 @@ public class McpAsyncServer { private final ConcurrentHashMap prompts = new ConcurrentHashMap<>(); - // FIXME: this field is deprecated and should be remvoed together with the - // broadcasting loggingNotification. - private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG; - private final ConcurrentHashMap completions = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> resourceSubscriptions = new ConcurrentHashMap<>(); + private List protocolVersions; private McpUriTemplateManagerFactory uriTemplateManagerFactory = new DefaultMcpUriTemplateManagerFactory(); @@ -149,8 +148,11 @@ public class McpAsyncServer { this.protocolVersions = mcpTransportProvider.protocolVersions(); - mcpTransportProvider.setSessionFactory(transport -> new McpServerSession(UUID.randomUUID().toString(), - requestTimeout, transport, this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers)); + mcpTransportProvider.setSessionFactory(transport -> { + String sessionId = UUID.randomUUID().toString(); + return new McpServerSession(sessionId, requestTimeout, transport, this::asyncInitializeRequestHandler, + requestHandlers, notificationHandlers, () -> this.cleanupForSession(sessionId)); + }); } McpAsyncServer(McpStreamableServerTransportProvider mcpTransportProvider, McpJsonMapper jsonMapper, @@ -174,8 +176,9 @@ public class McpAsyncServer { this.protocolVersions = mcpTransportProvider.protocolVersions(); - mcpTransportProvider.setSessionFactory(new DefaultMcpStreamableServerSessionFactory(requestTimeout, - this::asyncInitializeRequestHandler, requestHandlers, notificationHandlers)); + mcpTransportProvider.setSessionFactory( + new DefaultMcpStreamableServerSessionFactory(requestTimeout, this::asyncInitializeRequestHandler, + requestHandlers, notificationHandlers, sessionId -> this.cleanupForSession(sessionId))); } private Map prepareNotificationHandlers(McpServerFeatures.Async features) { @@ -215,6 +218,10 @@ private Map> prepareRequestHandlers() { requestHandlers.put(McpSchema.METHOD_RESOURCES_LIST, resourcesListRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_READ, resourcesReadRequestHandler()); requestHandlers.put(McpSchema.METHOD_RESOURCES_TEMPLATES_LIST, resourceTemplateListRequestHandler()); + if (Boolean.TRUE.equals(this.serverCapabilities.resources().subscribe())) { + requestHandlers.put(McpSchema.METHOD_RESOURCES_SUBSCRIBE, resourcesSubscribeRequestHandler()); + requestHandlers.put(McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, resourcesUnsubscribeRequestHandler()); + } } // Add prompts API handlers if provider exists @@ -685,12 +692,73 @@ public Mono notifyResourcesListChanged() { } /** - * Notifies clients that the resources have updated. - * @return A Mono that completes when all clients have been notified + * Notifies only the sessions that have subscribed to the updated resource URI. + * @param resourcesUpdatedNotification the notification containing the updated + * resource URI + * @return A Mono that completes when all subscribed sessions have been notified */ public Mono notifyResourcesUpdated(McpSchema.ResourcesUpdatedNotification resourcesUpdatedNotification) { - return this.mcpTransportProvider.notifyClients(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, - resourcesUpdatedNotification); + return Mono.defer(() -> { + String uri = resourcesUpdatedNotification.uri(); + Set subscribedSessions = this.resourceSubscriptions.get(uri); + if (subscribedSessions == null || subscribedSessions.isEmpty()) { + logger.debug("No sessions subscribed to resource URI: {}", uri); + return Mono.empty(); + } + return Flux.fromIterable(subscribedSessions) + .flatMap(sessionId -> this.mcpTransportProvider + .notifyClient(sessionId, McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED, + resourcesUpdatedNotification) + .doOnError(e -> logger.error("Failed to notify session {} of resource update for {}", sessionId, + uri, e)) + .onErrorComplete()) + .then(); + }); + } + + private Mono cleanupForSession(String sessionId) { + return Mono.fromRunnable(() -> { + removeSessionSubscriptions(sessionId); + }); + } + + private void removeSessionSubscriptions(String sessionId) { + this.resourceSubscriptions.forEach((uri, sessions) -> sessions.remove(sessionId)); + this.resourceSubscriptions.entrySet().removeIf(entry -> entry.getValue().isEmpty()); + } + + private McpRequestHandler resourcesSubscribeRequestHandler() { + return (exchange, params) -> Mono.defer(() -> { + McpSchema.SubscribeRequest subscribeRequest = jsonMapper.convertValue(params, + new TypeRef() { + }); + String uri = subscribeRequest.uri(); + String sessionId = exchange.sessionId(); + this.resourceSubscriptions.computeIfAbsent(uri, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())) + .add(sessionId); + logger.debug("Session {} subscribed to resource URI: {}", sessionId, uri); + + return Mono.just(Map.of()); + }); + } + + private McpRequestHandler resourcesUnsubscribeRequestHandler() { + return (exchange, params) -> Mono.defer(() -> { + McpSchema.UnsubscribeRequest unsubscribeRequest = jsonMapper.convertValue(params, + new TypeRef() { + }); + String uri = unsubscribeRequest.uri(); + String sessionId = exchange.sessionId(); + Set sessions = this.resourceSubscriptions.get(uri); + if (sessions != null) { + sessions.remove(sessionId); + if (sessions.isEmpty()) { + this.resourceSubscriptions.remove(uri, sessions); + } + } + logger.debug("Session {} unsubscribed from resource URI: {}", sessionId, uri); + return Mono.just(Map.of()); + }); } private McpRequestHandler resourcesListRequestHandler() { @@ -878,10 +946,6 @@ private McpRequestHandler setLoggerRequestHandler() { exchange.setMinLoggingLevel(newMinLoggingLevel.level()); - // FIXME: this field is deprecated and should be removed together - // with the broadcasting loggingNotification. - this.minLoggingLevel = newMinLoggingLevel.level(); - return Mono.just(Map.of()); }); }; diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java index 7037ff293..decd2ff77 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java @@ -228,6 +228,24 @@ public Mono notifyClients(String method, Object params) { .then(); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + // Need to iterate in O(n) because the transport session id + // is different from the server-logical session id + McpServerSession session = sessions.values() + .stream() + .filter(s -> sessionId.equals(s.getId())) + .findFirst() + .orElse(null); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Handles GET requests to establish SSE connections. *

diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java index d7561188c..95edb63a0 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletStreamableServerTransportProvider.java @@ -206,6 +206,18 @@ public Mono notifyClients(String method, Object params) { }); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + McpStreamableServerSession session = this.sessions.get(sessionId); + if (session == null) { + logger.debug("Session {} not found", sessionId); + return Mono.empty(); + } + return session.sendNotification(method, params); + }); + } + /** * Initiates a graceful shutdown of the transport. * @return A Mono that completes when all cleanup operations are finished diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java index d288ea3d6..79be014a6 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/StdioServerTransportProvider.java @@ -98,12 +98,26 @@ public void setSessionFactory(McpServerSession.Factory sessionFactory) { @Override public Mono notifyClients(String method, Object params) { if (this.session == null) { - return Mono.error(new IllegalStateException("No session to close")); + return Mono.error(new IllegalStateException("No session to notify")); } return this.session.sendNotification(method, params) .doOnError(e -> logger.error("Failed to send notification: {}", e.getMessage())); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + return Mono.defer(() -> { + if (this.session == null) { + return Mono.error(new IllegalStateException("No session to notify")); + } + if (!this.session.getId().equals(sessionId)) { + return Mono.error(new IllegalStateException("Existing session id " + this.session.getId() + + " doesn't match the notification target: " + sessionId)); + } + return this.session.sendNotification(method, params); + }); + } + @Override public Mono closeGracefully() { if (this.session == null) { diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java index f497afd43..65da43202 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/DefaultMcpStreamableServerSessionFactory.java @@ -10,6 +10,9 @@ import java.time.Duration; import java.util.Map; import java.util.UUID; +import java.util.function.Function; + +import reactor.core.publisher.Mono; /** * A default implementation of {@link McpStreamableServerSession.Factory}. @@ -26,29 +29,53 @@ public class DefaultMcpStreamableServerSessionFactory implements McpStreamableSe Map notificationHandlers; + private final Function> onClose; + /** - * Constructs an instance + * Constructs an instance. * @param requestTimeout timeout for requests * @param initRequestHandler initialization request handler * @param requestHandlers map of MCP request handlers keyed by method name * @param notificationHandlers map of MCP notification handlers keyed by method name + * @param onClose reactive callback invoked with the session ID when a session is + * closed */ public DefaultMcpStreamableServerSessionFactory(Duration requestTimeout, McpStreamableServerSession.InitRequestHandler initRequestHandler, - Map> requestHandlers, - Map notificationHandlers) { + Map> requestHandlers, Map notificationHandlers, + Function> onClose) { this.requestTimeout = requestTimeout; this.initRequestHandler = initRequestHandler; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Constructs an instance. + * @param requestTimeout timeout for requests + * @param initRequestHandler initialization request handler + * @param requestHandlers map of MCP request handlers keyed by method name + * @param notificationHandlers map of MCP notification handlers keyed by method name + * @deprecated Use + * {@link #DefaultMcpStreamableServerSessionFactory(Duration, McpStreamableServerSession.InitRequestHandler, Map, Map, Function)} + * instead + */ + @Deprecated + public DefaultMcpStreamableServerSessionFactory(Duration requestTimeout, + McpStreamableServerSession.InitRequestHandler initRequestHandler, + Map> requestHandlers, + Map notificationHandlers) { + this(requestTimeout, initRequestHandler, requestHandlers, notificationHandlers, sessionId -> Mono.empty()); } @Override public McpStreamableServerSession.McpStreamableServerSessionInit startSession( McpSchema.InitializeRequest initializeRequest) { - return new McpStreamableServerSession.McpStreamableServerSessionInit( - new McpStreamableServerSession(UUID.randomUUID().toString(), initializeRequest.capabilities(), - initializeRequest.clientInfo(), requestTimeout, requestHandlers, notificationHandlers), + String sessionId = UUID.randomUUID().toString(); + return new McpStreamableServerSession.McpStreamableServerSessionInit(new McpStreamableServerSession(sessionId, + initializeRequest.capabilities(), initializeRequest.clientInfo(), requestTimeout, requestHandlers, + notificationHandlers, () -> this.onClose.apply(sessionId)), this.initRequestHandler.handle(initializeRequest)); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index ecb1dafd8..fc011a4e3 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -7,6 +7,7 @@ import java.time.Duration; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -65,25 +66,47 @@ public class McpServerSession implements McpLoggableSession { private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO; + private final Supplier> onClose; + /** * Creates a new server session with the given parameters and the transport to use. * @param id session id + * @param requestTimeout duration to wait for request responses before timing out * @param transport the transport to use * @param initHandler called when a * {@link io.modelcontextprotocol.spec.McpSchema.InitializeRequest} is received by the * server * @param requestHandlers map of request handlers to use * @param notificationHandlers map of notification handlers to use + * @param onClose supplier of a reactive callback invoked when the session is closed */ public McpServerSession(String id, Duration requestTimeout, McpServerTransport transport, McpInitRequestHandler initHandler, Map> requestHandlers, - Map notificationHandlers) { + Map notificationHandlers, Supplier> onClose) { this.id = id; this.requestTimeout = requestTimeout; this.transport = transport; this.initRequestHandler = initHandler; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Creates a new server session with the given parameters and the transport to use. + * @param id session id + * @param requestTimeout duration to wait for request responses before timing out + * @param transport the transport to use + * @param initHandler called when a + * {@link io.modelcontextprotocol.spec.McpSchema.InitializeRequest} is received by the + * server + * @param requestHandlers map of request handlers to use + * @param notificationHandlers map of notification handlers to use + */ + public McpServerSession(String id, Duration requestTimeout, McpServerTransport transport, + McpInitRequestHandler initHandler, Map> requestHandlers, + Map notificationHandlers) { + this(id, requestTimeout, transport, initHandler, requestHandlers, notificationHandlers, Mono::empty); } /** @@ -318,12 +341,13 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { // TODO: clear pendingResponses and emit errors? - return this.transport.closeGracefully(); + return this.onClose.get().onErrorComplete().then(this.transport.closeGracefully()); } @Override public void close() { // TODO: clear pendingResponses and emit errors? + this.onClose.get().onErrorComplete().subscribe(); this.transport.close(); } diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java index acb1ecac6..8d5e0f847 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpServerTransportProviderBase.java @@ -45,6 +45,23 @@ public interface McpServerTransportProviderBase { */ Mono notifyClients(String method, Object params); + /** + * Sends a notification to a specific client session. Transport providers that support + * resource subscriptions must override this method to enable per-session + * notifications. The default implementation returns an error indicating that this + * operation is not supported. + * @param sessionId the id of the session to notify + * @param method the name of the notification method to be called on the client + * @param params parameters to be sent with the notification + * @return a Mono that completes when the notification has been sent, or empty if the + * session is not found + */ + default Mono notifyClient(String sessionId, String method, Object params) { + return Mono.error( + new UnsupportedOperationException("This transport provider does not support per-session notifications. " + + "Override notifyClient() to enable resource subscription support.")); + } + /** * Immediately closes all the transports with connected clients and releases any * associated resources. diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java index 95f8959f5..9ec2117bb 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/spec/McpStreamableServerSession.java @@ -62,6 +62,8 @@ public class McpStreamableServerSession implements McpLoggableSession { private volatile McpSchema.LoggingLevel minLoggingLevel = McpSchema.LoggingLevel.INFO; + private final Supplier> onClose; + /** * Create an instance of the streamable session. * @param id session ID @@ -71,11 +73,12 @@ public class McpStreamableServerSession implements McpLoggableSession { * @param requestHandlers the map of MCP request handlers keyed by method name * @param notificationHandlers the map of MCP notification handlers keyed by method * name + * @param onClose supplier of a reactive callback invoked when the session is closed */ public McpStreamableServerSession(String id, McpSchema.ClientCapabilities clientCapabilities, McpSchema.Implementation clientInfo, Duration requestTimeout, - Map> requestHandlers, - Map notificationHandlers) { + Map> requestHandlers, Map notificationHandlers, + Supplier> onClose) { this.id = id; this.missingMcpTransportSession = new MissingMcpTransportSession(id); this.listeningStreamRef = new AtomicReference<>(this.missingMcpTransportSession); @@ -84,6 +87,24 @@ public McpStreamableServerSession(String id, McpSchema.ClientCapabilities client this.requestTimeout = requestTimeout; this.requestHandlers = requestHandlers; this.notificationHandlers = notificationHandlers; + this.onClose = onClose; + } + + /** + * Create an instance of the streamable session. + * @param id session ID + * @param clientCapabilities client capabilities + * @param clientInfo client info + * @param requestTimeout timeout to use for requests + * @param requestHandlers the map of MCP request handlers keyed by method name + * @param notificationHandlers the map of MCP notification handlers keyed by method + * name + */ + public McpStreamableServerSession(String id, McpSchema.ClientCapabilities clientCapabilities, + McpSchema.Implementation clientInfo, Duration requestTimeout, + Map> requestHandlers, + Map notificationHandlers) { + this(id, clientCapabilities, clientInfo, requestTimeout, requestHandlers, notificationHandlers, Mono::empty); } @Override @@ -126,6 +147,7 @@ public Mono sendNotification(String method, Object params) { } public Mono delete() { + // onClose is invoked inside closeGracefully return this.closeGracefully().then(Mono.fromRunnable(() -> { // TODO: review in the context of history storage // delete history, etc. @@ -258,15 +280,16 @@ private MethodNotFoundError getMethodNotFoundError(String method) { @Override public Mono closeGracefully() { - return Mono.defer(() -> { + return this.onClose.get().onErrorComplete().then(Mono.defer(() -> { McpLoggableSession listeningStream = this.listeningStreamRef.getAndSet(missingMcpTransportSession); return listeningStream.closeGracefully(); // TODO: Also close all the open streams - }); + })); } @Override public void close() { + this.onClose.get().onErrorComplete().subscribe(); McpLoggableSession listeningStream = this.listeningStreamRef.getAndSet(missingMcpTransportSession); if (listeningStream != null) { listeningStream.close(); diff --git a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java index 270bc4308..1ed9b270a 100644 --- a/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java +++ b/mcp-test/src/main/java/io/modelcontextprotocol/AbstractMcpClientServerIntegrationTests.java @@ -1746,6 +1746,95 @@ void testStructuredOutputRuntimeToolAddition(String clientType) { } } + // --------------------------------------- + // Resource Subscription Tests + // --------------------------------------- + + @ParameterizedTest(name = "{0} : {displayName} ") + @MethodSource("clientsForTesting") + void testResourceSubscription(String clientType) throws InterruptedException { + + var clientBuilder = clientBuilders.get(clientType); + + String resourceUri = "test://subscribable-resource"; + var receivedContents = new AtomicReference>(); + var latch = new CountDownLatch(1); + + McpServerFeatures.SyncResourceSpecification resourceSpec = new McpServerFeatures.SyncResourceSpecification( + McpSchema.Resource.builder() + .uri(resourceUri) + .name("Subscribable Resource") + .mimeType("text/plain") + .build(), + (exchange, req) -> new McpSchema.ReadResourceResult( + List.of(new McpSchema.TextResourceContents(resourceUri, "text/plain", "initial content")))); + + McpSyncServer mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .resources(resourceSpec) + .build(); + + try (var mcpClient = clientBuilder.resourcesUpdateConsumer(contents -> { + receivedContents.set(contents); + latch.countDown(); + }).build()) { + + mcpClient.initialize(); + + mcpClient.subscribeResource(new McpSchema.SubscribeRequest(resourceUri)); + + mcpServer.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(resourceUri)); + + assertThat(latch.await(5, TimeUnit.SECONDS)) + .as("client should receive the resources/updated notification within 5 seconds") + .isTrue(); + assertThat(receivedContents.get()).isNotEmpty(); + } + finally { + mcpServer.closeGracefully(); + } + } + + @ParameterizedTest(name = "{0} : {displayName} ") + @MethodSource("clientsForTesting") + void testResourceSubscription_afterUnsubscribe_noNotification(String clientType) throws InterruptedException { + + var clientBuilder = clientBuilders.get(clientType); + + String resourceUri = "test://subscribable-resource-unsub"; + var notificationCount = new java.util.concurrent.atomic.AtomicInteger(0); + + McpServerFeatures.SyncResourceSpecification resourceSpec = new McpServerFeatures.SyncResourceSpecification( + McpSchema.Resource.builder() + .uri(resourceUri) + .name("Subscribable Resource") + .mimeType("text/plain") + .build(), + (exchange, req) -> new McpSchema.ReadResourceResult( + List.of(new McpSchema.TextResourceContents(resourceUri, "text/plain", "content")))); + + McpSyncServer mcpServer = prepareSyncServerBuilder().serverInfo("test-server", "1.0.0") + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .resources(resourceSpec) + .build(); + + try (var mcpClient = clientBuilder.resourcesUpdateConsumer(contents -> notificationCount.incrementAndGet()) + .build()) { + + mcpClient.initialize(); + + mcpClient.subscribeResource(new McpSchema.SubscribeRequest(resourceUri)); + mcpClient.unsubscribeResource(new McpSchema.UnsubscribeRequest(resourceUri)); + + mcpServer.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(resourceUri)); + + assertThat(notificationCount.get()).as("no notification should be received after unsubscribing").isZero(); + } + finally { + mcpServer.closeGracefully(); + } + } + private double evaluateExpression(String expression) { // Simple expression evaluator for testing return switch (expression) { diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java index fac26596a..78b607e87 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.function.BiConsumer; import io.modelcontextprotocol.json.McpJsonDefaults; @@ -25,6 +26,10 @@ public class MockMcpServerTransport implements McpServerTransport { private final BiConsumer interceptor; + private volatile String awaitedResponseId; + + private volatile CountDownLatch responseLatch; + public MockMcpServerTransport() { this((t, msg) -> { }); @@ -34,10 +39,29 @@ public MockMcpServerTransport(BiConsumer sendMessage(McpSchema.JSONRPCMessage message) { sent.add(message); interceptor.accept(this, message); + if (message instanceof McpSchema.JSONRPCResponse r && r.id() != null + && r.id().toString().equals(awaitedResponseId)) { + CountDownLatch latch = responseLatch; + if (latch != null) { + awaitedResponseId = null; + responseLatch = null; + latch.countDown(); + } + } return Mono.empty(); } diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java index e955be89f..9488870e5 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransportProvider.java @@ -38,6 +38,14 @@ public Mono notifyClients(String method, Object params) { return session.sendNotification(method, params); } + @Override + public Mono notifyClient(String sessionId, String method, Object params) { + if (session != null && session.getId().equals(sessionId)) { + return session.sendNotification(method, params); + } + return Mono.empty(); + } + @Override public Mono closeGracefully() { return session.closeGracefully(); diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java new file mode 100644 index 000000000..564a53698 --- /dev/null +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java @@ -0,0 +1,188 @@ +/* + * Copyright 2025-2025 the original author or authors. + */ + +package io.modelcontextprotocol.server; + +import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.modelcontextprotocol.MockMcpServerTransport; +import io.modelcontextprotocol.MockMcpServerTransportProvider; +import io.modelcontextprotocol.spec.McpSchema; +import io.modelcontextprotocol.spec.ProtocolVersions; +import org.junit.jupiter.api.Test; +import reactor.test.StepVerifier; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for resource subscription logic in {@link McpAsyncServer}. Uses + * {@link MockMcpServerTransportProvider} to drive sessions directly without a real + * network stack. + * + *

+ * Each test creates a transport with a response-interceptor latch so that + * {@code simulateIncomingMessage} (fire-and-forget) can be reliably awaited before + * asserting subscription state. + */ +class ResourceSubscriptionTests { + + private static final String RESOURCE_URI = "test://resource/1"; + + private static final McpSchema.Implementation SERVER_INFO = new McpSchema.Implementation("test-server", "1.0.0"); + + private static final McpSchema.Implementation CLIENT_INFO = new McpSchema.Implementation("test-client", "1.0.0"); + + private static McpAsyncServer buildServer(MockMcpServerTransportProvider transportProvider) { + return McpServer.async(transportProvider) + .serverInfo(SERVER_INFO) + .capabilities(McpSchema.ServerCapabilities.builder().resources(true, false).build()) + .build(); + } + + /** + * Sends a request through the transport provider and blocks until the session has + * sent a response for that request ID, guaranteeing the handler has fully executed. + */ + private static void sendAndAwait(MockMcpServerTransport transport, MockMcpServerTransportProvider transportProvider, + McpSchema.JSONRPCRequest request) throws InterruptedException { + String requestId = request.id().toString(); + CountDownLatch latch = new CountDownLatch(1); + transport.setInterceptorForNextResponse(requestId, latch); + transportProvider.simulateIncomingMessage(request); + assertThat(latch.await(5, TimeUnit.SECONDS)) + .as("server should have responded to request " + requestId + " within 5 s") + .isTrue(); + } + + private static McpSchema.JSONRPCRequest initRequest() { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, + UUID.randomUUID().toString(), + new McpSchema.InitializeRequest(ProtocolVersions.MCP_2025_11_25, null, CLIENT_INFO)); + } + + private static McpSchema.JSONRPCNotification initializedNotification() { + return new McpSchema.JSONRPCNotification(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_NOTIFICATION_INITIALIZED, + null); + } + + private static McpSchema.JSONRPCRequest subscribeRequest(String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_SUBSCRIBE, + UUID.randomUUID().toString(), new McpSchema.SubscribeRequest(uri)); + } + + private static McpSchema.JSONRPCRequest unsubscribeRequest(String uri) { + return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_RESOURCES_UNSUBSCRIBE, + UUID.randomUUID().toString(), new McpSchema.UnsubscribeRequest(uri)); + } + + @Test + void notifyResourcesUpdated_noSubscribers_completesEmpty() throws InterruptedException { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent when nobody is subscribed") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterSubscribe_notifiesSession() throws InterruptedException { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + McpSchema.JSONRPCMessage sent = transport.getLastSentMessage(); + assertThat(sent).isInstanceOf(McpSchema.JSONRPCNotification.class); + McpSchema.JSONRPCNotification notification = (McpSchema.JSONRPCNotification) sent; + assertThat(notification.method()).isEqualTo(McpSchema.METHOD_NOTIFICATION_RESOURCES_UPDATED); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_differentUri_doesNotNotifySession() throws InterruptedException { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier + .create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification("test://other/resource"))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()) + .as("notification for a different URI should not reach a session subscribed to a different URI") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() throws InterruptedException { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + sendAndAwait(transport, transportProvider, unsubscribeRequest(RESOURCE_URI)); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent after the session unsubscribed") + .isEmpty(); + + server.closeGracefully().block(); + } + + @Test + void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession() throws InterruptedException { + MockMcpServerTransport transport = new MockMcpServerTransport(); + MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); + McpAsyncServer server = buildServer(transportProvider); + + sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initializedNotification()); + sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + + // Close the session; onClose must fire and remove the subscription + transportProvider.closeGracefully().block(); + transport.clearSentMessages(); + + StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) + .verifyComplete(); + + assertThat(transport.getAllSentMessages()).as("no notification should be sent after the session has closed") + .isEmpty(); + + server.closeGracefully().block(); + } + +} From 96c7d488cfd878f3e25bb6739c1058dad2b47ab6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 27 Feb 2026 08:26:48 +0100 Subject: [PATCH 2/4] Remove unnecessary test utils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../MockMcpServerTransport.java | 23 --------- .../server/ResourceSubscriptionTests.java | 47 ++++++------------- 2 files changed, 15 insertions(+), 55 deletions(-) diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java index 78b607e87..9d43968e5 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/MockMcpServerTransport.java @@ -26,10 +26,6 @@ public class MockMcpServerTransport implements McpServerTransport { private final BiConsumer interceptor; - private volatile String awaitedResponseId; - - private volatile CountDownLatch responseLatch; - public MockMcpServerTransport() { this((t, msg) -> { }); @@ -39,29 +35,10 @@ public MockMcpServerTransport(BiConsumer sendMessage(McpSchema.JSONRPCMessage message) { sent.add(message); interceptor.accept(this, message); - if (message instanceof McpSchema.JSONRPCResponse r && r.id() != null - && r.id().toString().equals(awaitedResponseId)) { - CountDownLatch latch = responseLatch; - if (latch != null) { - awaitedResponseId = null; - responseLatch = null; - latch.countDown(); - } - } return Mono.empty(); } diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java index 564a53698..7325b47d0 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java @@ -5,8 +5,6 @@ package io.modelcontextprotocol.server; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import io.modelcontextprotocol.MockMcpServerTransport; import io.modelcontextprotocol.MockMcpServerTransportProvider; @@ -42,21 +40,6 @@ private static McpAsyncServer buildServer(MockMcpServerTransportProvider transpo .build(); } - /** - * Sends a request through the transport provider and blocks until the session has - * sent a response for that request ID, guaranteeing the handler has fully executed. - */ - private static void sendAndAwait(MockMcpServerTransport transport, MockMcpServerTransportProvider transportProvider, - McpSchema.JSONRPCRequest request) throws InterruptedException { - String requestId = request.id().toString(); - CountDownLatch latch = new CountDownLatch(1); - transport.setInterceptorForNextResponse(requestId, latch); - transportProvider.simulateIncomingMessage(request); - assertThat(latch.await(5, TimeUnit.SECONDS)) - .as("server should have responded to request " + requestId + " within 5 s") - .isTrue(); - } - private static McpSchema.JSONRPCRequest initRequest() { return new McpSchema.JSONRPCRequest(McpSchema.JSONRPC_VERSION, McpSchema.METHOD_INITIALIZE, UUID.randomUUID().toString(), @@ -79,12 +62,12 @@ private static McpSchema.JSONRPCRequest unsubscribeRequest(String uri) { } @Test - void notifyResourcesUpdated_noSubscribers_completesEmpty() throws InterruptedException { + void notifyResourcesUpdated_noSubscribers_completesEmpty() { MockMcpServerTransport transport = new MockMcpServerTransport(); MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); McpAsyncServer server = buildServer(transportProvider); - sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initRequest()); transportProvider.simulateIncomingMessage(initializedNotification()); transport.clearSentMessages(); @@ -98,14 +81,14 @@ void notifyResourcesUpdated_noSubscribers_completesEmpty() throws InterruptedExc } @Test - void notifyResourcesUpdated_afterSubscribe_notifiesSession() throws InterruptedException { + void notifyResourcesUpdated_afterSubscribe_notifiesSession() { MockMcpServerTransport transport = new MockMcpServerTransport(); MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); McpAsyncServer server = buildServer(transportProvider); - sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initRequest()); transportProvider.simulateIncomingMessage(initializedNotification()); - sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); transport.clearSentMessages(); StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) @@ -120,14 +103,14 @@ void notifyResourcesUpdated_afterSubscribe_notifiesSession() throws InterruptedE } @Test - void notifyResourcesUpdated_differentUri_doesNotNotifySession() throws InterruptedException { + void notifyResourcesUpdated_differentUri_doesNotNotifySession() { MockMcpServerTransport transport = new MockMcpServerTransport(); MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); McpAsyncServer server = buildServer(transportProvider); - sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initRequest()); transportProvider.simulateIncomingMessage(initializedNotification()); - sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); transport.clearSentMessages(); StepVerifier @@ -142,15 +125,15 @@ void notifyResourcesUpdated_differentUri_doesNotNotifySession() throws Interrupt } @Test - void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() throws InterruptedException { + void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() { MockMcpServerTransport transport = new MockMcpServerTransport(); MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); McpAsyncServer server = buildServer(transportProvider); - sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initRequest()); transportProvider.simulateIncomingMessage(initializedNotification()); - sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); - sendAndAwait(transport, transportProvider, unsubscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(unsubscribeRequest(RESOURCE_URI)); transport.clearSentMessages(); StepVerifier.create(server.notifyResourcesUpdated(new McpSchema.ResourcesUpdatedNotification(RESOURCE_URI))) @@ -163,14 +146,14 @@ void notifyResourcesUpdated_afterUnsubscribe_doesNotNotifySession() throws Inter } @Test - void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession() throws InterruptedException { + void notifyResourcesUpdated_afterSessionClose_doesNotNotifySession() { MockMcpServerTransport transport = new MockMcpServerTransport(); MockMcpServerTransportProvider transportProvider = new MockMcpServerTransportProvider(transport); McpAsyncServer server = buildServer(transportProvider); - sendAndAwait(transport, transportProvider, initRequest()); + transportProvider.simulateIncomingMessage(initRequest()); transportProvider.simulateIncomingMessage(initializedNotification()); - sendAndAwait(transport, transportProvider, subscribeRequest(RESOURCE_URI)); + transportProvider.simulateIncomingMessage(subscribeRequest(RESOURCE_URI)); // Close the session; onClose must fire and remove the subscription transportProvider.closeGracefully().block(); From 3af9568254c27396fd519c2ec5c763140502bdc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 27 Feb 2026 08:53:16 +0100 Subject: [PATCH 3/4] Improve comments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- .../transport/HttpServletSseServerTransportProvider.java | 3 ++- .../server/ResourceSubscriptionTests.java | 5 ----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java index decd2ff77..d3648a06f 100644 --- a/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java +++ b/mcp-core/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java @@ -232,7 +232,8 @@ public Mono notifyClients(String method, Object params) { public Mono notifyClient(String sessionId, String method, Object params) { return Mono.defer(() -> { // Need to iterate in O(n) because the transport session id - // is different from the server-logical session id + // is different from the server-logical session id (in streamable http this + // design issue was solved) McpServerSession session = sessions.values() .stream() .filter(s -> sessionId.equals(s.getId())) diff --git a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java index 7325b47d0..016e25e9f 100644 --- a/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java +++ b/mcp-test/src/test/java/io/modelcontextprotocol/server/ResourceSubscriptionTests.java @@ -19,11 +19,6 @@ * Unit tests for resource subscription logic in {@link McpAsyncServer}. Uses * {@link MockMcpServerTransportProvider} to drive sessions directly without a real * network stack. - * - *

- * Each test creates a transport with a response-interceptor latch so that - * {@code simulateIncomingMessage} (fire-and-forget) can be reliably awaited before - * asserting subscription state. */ class ResourceSubscriptionTests { From 13a95c3e2841f64cd1090bb76fb0375b639b34a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Fri, 27 Feb 2026 09:33:27 +0100 Subject: [PATCH 4/4] Update conformance tests docs and settings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Dariusz Jędrzejczyk --- conformance-tests/VALIDATION_RESULTS.md | 23 ++++++++-------------- conformance-tests/conformance-baseline.yml | 5 ----- conformance-tests/server-servlet/README.md | 14 ++++--------- 3 files changed, 12 insertions(+), 30 deletions(-) diff --git a/conformance-tests/VALIDATION_RESULTS.md b/conformance-tests/VALIDATION_RESULTS.md index 19e74330c..0cba495e0 100644 --- a/conformance-tests/VALIDATION_RESULTS.md +++ b/conformance-tests/VALIDATION_RESULTS.md @@ -2,27 +2,22 @@ ## Summary -**Server Tests:** 37/40 passed (92.5%) +**Server Tests:** 40/40 passed (100%) **Client Tests:** 3/4 scenarios passed (9/10 checks passed) **Auth Tests:** 12/14 scenarios fully passing (178 passed, 1 failed, 1 warning, 85.7% scenarios, 98.9% checks) ## Server Test Results -### Passing (37/40) +### Passing (40/40) - **Lifecycle & Utilities (4/4):** initialize, ping, logging-set-level, completion-complete - **Tools (11/11):** All scenarios including progress notifications ✨ - **Elicitation (10/10):** SEP-1034 defaults (5 checks), SEP-1330 enums (5 checks) -- **Resources (4/6):** list, read-text, read-binary, templates-read +- **Resources (6/6):** list, read-text, read-binary, templates-read, subscribe, unsubscribe - **Prompts (4/4):** list, simple, with-args, embedded-resource, with-image - **SSE Transport (2/2):** Multiple streams - **Security (2/2):** Localhost validation passes, DNS rebinding protection -### Failing (3/40) - -1. **resources-subscribe** - Not implemented in SDK -2. **resources-unsubscribe** - Not implemented in SDK - ## Client Test Results ### Passing (3/4 scenarios, 9/10 checks) @@ -68,10 +63,9 @@ Uses the `client-spring-http-client` module with Spring Security OAuth2 and the ## Known Limitations -1. **Resource Subscriptions:** SDK doesn't implement `resources/subscribe` and `resources/unsubscribe` handlers -2. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header -3. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization -4. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow +1. **Client SSE Retry:** Client doesn't parse or respect the `retry:` field, reconnects immediately, and doesn't send Last-Event-ID header +2. **Auth Scope Step-Up:** Client does not fully handle scope step-up challenges where the server requests additional scopes after initial authorization +3. **Auth Basic CIMD:** Minor conformance warning in the basic Client-Initiated Metadata Discovery flow ## Running Tests @@ -119,6 +113,5 @@ npx @modelcontextprotocol/conformance@0.1.15 client \ ### High Priority 1. Fix client SSE retry field handling in `HttpClientStreamableHttpTransport` -2. Implement resource subscription handlers in `McpStatelessAsyncServer` -3. Implement CIMD -4. Implement scope step up +2. Implement CIMD +3. Implement scope step up diff --git a/conformance-tests/conformance-baseline.yml b/conformance-tests/conformance-baseline.yml index 4ab144063..d2990c155 100644 --- a/conformance-tests/conformance-baseline.yml +++ b/conformance-tests/conformance-baseline.yml @@ -2,11 +2,6 @@ # This file lists known failing scenarios that are expected to fail until fixed. # See: https://github.com/modelcontextprotocol/conformance/blob/main/SDK_INTEGRATION.md -server: - # Resource subscription not implemented in SDK - - resources-subscribe - - resources-unsubscribe - client: # SSE retry field handling not implemented # - Client does not parse or respect retry: field timing diff --git a/conformance-tests/server-servlet/README.md b/conformance-tests/server-servlet/README.md index bd86636b6..ef327ecf6 100644 --- a/conformance-tests/server-servlet/README.md +++ b/conformance-tests/server-servlet/README.md @@ -4,7 +4,7 @@ This module contains a comprehensive MCP (Model Context Protocol) server impleme ## Conformance Test Results -**Status: 37 out of 40 tests passing (92.5%)** +**Status: 40 out of 40 tests passing (100%)** The server has been validated against the official [MCP conformance test suite](https://github.com/modelcontextprotocol/conformance). See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for detailed results. @@ -22,9 +22,8 @@ The server has been validated against the official [MCP conformance test suite]( - SEP-1034: Default values for all primitive types - SEP-1330: All enum schema variants -✅ **Resources** (4/6) -- List, read text/binary, templates -- ⚠️ Subscribe/unsubscribe (SDK limitation) +✅ **Resources** (6/6) +- List, read text/binary, templates, subscribe, unsubscribe ✅ **Prompts** (4/4) - Simple, parameterized, embedded resources, images @@ -191,12 +190,7 @@ curl -X POST http://localhost:8080/mcp \ ## Known Limitations -See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on: - -1. **Resource Subscriptions** - Not implemented in Java SDK -2. **DNS Rebinding Protection** - Missing Host/Origin validation - -These are SDK-level limitations that require fixes in the core framework. +See [VALIDATION_RESULTS.md](../VALIDATION_RESULTS.md) for details on remaining client-side limitations. ## References