From 5d43fe0f7cc22e33cb07db8b61f6663559e7fc10 Mon Sep 17 00:00:00 2001 From: Emmanuel Hugonnet Date: Tue, 19 May 2026 16:33:26 +0200 Subject: [PATCH] fix: prevent SSE stream closing before last write completes Replace the fragile 150ms sleep workaround in EventConsumer with a CAS-based state machine in the SSE subscribers that defers response.end() until the last response.write() handler fires. This fixes the race where onComplete() could close the HTTP response while a write was still in-flight, causing clients to see COMPLETED tasks with empty artifacts. Signed-off-by: Emmanuel Hugonnet --- .../apps/quarkus/A2AServerRoutes_v0_3.java | 36 +++++++++--- .../rest/quarkus/A2AServerRoutes_v0_3.java | 34 ++++++++++-- .../server/apps/quarkus/A2AServerRoutes.java | 55 ++++++++++++------- .../server/rest/quarkus/A2AServerRoutes.java | 55 +++++++++++-------- .../sdk/server/events/EventConsumer.java | 25 --------- 5 files changed, 124 insertions(+), 81 deletions(-) diff --git a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java index 56ffc021f..79e1c079d 100644 --- a/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/jsonrpc/src/main/java/org/a2aproject/sdk/compat03/server/apps/quarkus/A2AServerRoutes_v0_3.java @@ -342,11 +342,16 @@ private MultiSseSupport() { // Avoid direct instantiation. } + private static final int IDLE = 0; + private static final int WRITE_PENDING = 1; + private static final int COMPLETE_DEFERRED = 2; + public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { HttpServerResponse response = rc.response(); sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { Flow.@Nullable Subscription upstream; + final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); @Override public void onSubscribe(Flow.Subscription subscription) { @@ -354,7 +359,7 @@ public void onSubscribe(Flow.Subscription subscription) { this.upstream.request(1); response.closeHandler(v -> { - logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); + logger.debug("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); context.invokeEventConsumerCancelCallback(); subscription.cancel(); }); @@ -380,14 +385,17 @@ public void onNext(String sseEvent) { response.write(": SSE stream started\n\n"); } + writeState.set(WRITE_PENDING); response.write(Buffer.buffer(sseEvent), new Handler>() { @Override public void handle(AsyncResult ar) { if (ar.failed()) { java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(ar.cause()); - } else { + } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { java.util.Objects.requireNonNull(upstream).request(1); + } else { + endResponse(response); } } }); @@ -401,13 +409,27 @@ public void onError(Throwable throwable) { @Override public void onComplete() { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { + endResponse(response); + } + } + + private void endResponse(HttpServerResponse resp) { + Runnable doEnd = () -> { + if (resp.ended()) return; + if (resp.bytesWritten() == 0) { + MultiMap headers = resp.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } } + resp.end(); + }; + if (io.vertx.core.Context.isOnEventLoopThread()) { + doEnd.run(); + } else { + rc.vertx().runOnContext(v -> doEnd.run()); } - response.end(); } }); } diff --git a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java index 505685ed5..3fce4b4cb 100644 --- a/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java +++ b/compat-0.3/reference/rest/src/main/java/org/a2aproject/sdk/compat03/server/rest/quarkus/A2AServerRoutes_v0_3.java @@ -458,11 +458,16 @@ private MultiSseSupport() { // Avoid direct instantiation. } + private static final int IDLE = 0; + private static final int WRITE_PENDING = 1; + private static final int COMPLETE_DEFERRED = 2; + public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { HttpServerResponse response = rc.response(); sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { Flow.@Nullable Subscription upstream; + final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); @Override public void onSubscribe(Flow.Subscription subscription) { @@ -496,14 +501,17 @@ public void onNext(String sseEvent) { response.write(": SSE stream started\n\n"); } + writeState.set(WRITE_PENDING); response.write(Buffer.buffer(sseEvent), new Handler>() { @Override public void handle(AsyncResult ar) { if (ar.failed()) { java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(ar.cause()); - } else { + } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { java.util.Objects.requireNonNull(upstream).request(1); + } else { + endResponse(response); } } }); @@ -517,13 +525,27 @@ public void onError(Throwable throwable) { @Override public void onComplete() { - if (response.bytesWritten() == 0) { - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { + endResponse(response); + } + } + + private void endResponse(HttpServerResponse resp) { + Runnable doEnd = () -> { + if (resp.ended()) return; + if (resp.bytesWritten() == 0) { + MultiMap headers = resp.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } } + resp.end(); + }; + if (io.vertx.core.Context.isOnEventLoopThread()) { + doEnd.run(); + } else { + rc.vertx().runOnContext(v -> doEnd.run()); } - response.end(); } }); } diff --git a/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java index e9df1bf06..838349867 100644 --- a/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java +++ b/reference/jsonrpc/src/main/java/org/a2aproject/sdk/server/apps/quarkus/A2AServerRoutes.java @@ -81,6 +81,7 @@ import org.a2aproject.sdk.spec.TransportProtocol; import org.a2aproject.sdk.spec.UnsupportedOperationError; import org.a2aproject.sdk.transport.jsonrpc.handler.JSONRPCHandler; +import org.jspecify.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -807,11 +808,16 @@ private MultiSseSupport() { * @param context the A2A server call context (for EventConsumer cancellation) * @see SseFormatter#formatResponseAsSSE */ + private static final int IDLE = 0; + private static final int WRITE_PENDING = 1; + private static final int COMPLETE_DEFERRED = 2; + public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { HttpServerResponse response = rc.response(); sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { - Flow.Subscription upstream; + Flow.@Nullable Subscription upstream; + final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); @Override public void onSubscribe(Flow.Subscription subscription) { @@ -820,7 +826,7 @@ public void onSubscribe(Flow.Subscription subscription) { // Detect client disconnect and call EventConsumer.cancel() directly response.closeHandler(v -> { - logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); + logger.debug("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); context.invokeEventConsumerCancelCallback(); subscription.cancel(); }); @@ -840,29 +846,24 @@ public void onNext(String sseEvent) { if (headers.get(CONTENT_TYPE) == null) { headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); } - // Additional SSE headers to prevent buffering headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering + headers.set("X-Accel-Buffering", "no"); response.setChunked(true); - - // CRITICAL: Disable write queue max size to prevent buffering - // Vert.x buffers writes by default - we need immediate flushing for SSE response.setWriteQueueMaxSize(1); - - // Send initial SSE comment to kickstart the stream response.write(": SSE stream started\n\n"); } - // Write SSE-formatted string to response + writeState.set(WRITE_PENDING); response.write(Buffer.buffer(sseEvent), new Handler>() { @Override public void handle(AsyncResult ar) { if (ar.failed()) { - // Client disconnected or write failed - cancel upstream to stop EventConsumer - upstream.cancel(); + java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(ar.cause()); + } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { + java.util.Objects.requireNonNull(upstream).request(1); } else { - upstream.request(1); + endResponse(response); } } }); @@ -870,21 +871,33 @@ public void handle(AsyncResult ar) { @Override public void onError(Throwable throwable) { - // Cancel upstream to stop EventConsumer when error occurs - upstream.cancel(); + java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(throwable); } @Override public void onComplete() { - if (response.bytesWritten() == 0) { - // No events written - still set SSE content type - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { + endResponse(response); + } + } + + private void endResponse(HttpServerResponse resp) { + Runnable doEnd = () -> { + if (resp.ended()) return; + if (resp.bytesWritten() == 0) { + MultiMap headers = resp.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } } + resp.end(); + }; + if (io.vertx.core.Context.isOnEventLoopThread()) { + doEnd.run(); + } else { + rc.vertx().runOnContext(v -> doEnd.run()); } - response.end(); } }); } diff --git a/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java b/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java index 170aab63f..d148b30aa 100644 --- a/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java +++ b/reference/rest/src/main/java/org/a2aproject/sdk/server/rest/quarkus/A2AServerRoutes.java @@ -998,11 +998,16 @@ private MultiSseSupport() { * @param rc Vert.x routing context providing HTTP response * @param context A2A server call context (for EventConsumer cancellation on disconnect) */ + private static final int IDLE = 0; + private static final int WRITE_PENDING = 1; + private static final int COMPLETE_DEFERRED = 2; + public static void writeSseStrings(Multi sseStrings, RoutingContext rc, ServerCallContext context) { HttpServerResponse response = rc.response(); sseStrings.subscribe().withSubscriber(new Flow.Subscriber() { Flow.@Nullable Subscription upstream; + final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); @Override public void onSubscribe(Flow.Subscription subscription) { @@ -1031,32 +1036,25 @@ public void onNext(String sseEvent) { if (headers.get(CONTENT_TYPE) == null) { headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); } - // Additional SSE headers to prevent buffering headers.set("Cache-Control", "no-cache"); - headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering + headers.set("X-Accel-Buffering", "no"); response.setChunked(true); - - // CRITICAL: Disable write queue max size to prevent buffering - // Vert.x buffers writes by default - we need immediate flushing for SSE - response.setWriteQueueMaxSize(1); // Force immediate flush - - // Send initial SSE comment to kickstart the stream - // This forces Vert.x to send headers and start the stream immediately + response.setWriteQueueMaxSize(1); response.write(": SSE stream started\n\n"); } - // Write SSE-formatted string to response + writeState.set(WRITE_PENDING); response.write(Buffer.buffer(sseEvent), new Handler>() { @Override public void handle(AsyncResult ar) { if (ar.failed()) { - // Client disconnected or write failed - cancel upstream to stop EventConsumer - // NullAway: upstream is guaranteed non-null after onSubscribe java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(ar.cause()); - } else { - // NullAway: upstream is guaranteed non-null after onSubscribe + } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { java.util.Objects.requireNonNull(upstream).request(1); + } else { + // onComplete() arrived while write was pending — end the response now + endResponse(response); } } }); @@ -1064,22 +1062,35 @@ public void handle(AsyncResult ar) { @Override public void onError(Throwable throwable) { - // Cancel upstream to stop EventConsumer when error occurs - // NullAway: upstream is guaranteed non-null after onSubscribe java.util.Objects.requireNonNull(upstream).cancel(); rc.fail(throwable); } @Override public void onComplete() { - if (response.bytesWritten() == 0) { - // No events written - still set SSE content type - MultiMap headers = response.headers(); - if (headers.get(CONTENT_TYPE) == null) { - headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { + // No write in-flight — end immediately + endResponse(response); + } + // else: write handler will call endResponse when the write completes + } + + private void endResponse(HttpServerResponse resp) { + Runnable doEnd = () -> { + if (resp.ended()) return; + if (resp.bytesWritten() == 0) { + MultiMap headers = resp.headers(); + if (headers.get(CONTENT_TYPE) == null) { + headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); + } } + resp.end(); + }; + if (io.vertx.core.Context.isOnEventLoopThread()) { + doEnd.run(); + } else { + rc.vertx().runOnContext(v -> doEnd.run()); } - response.end(); } }); } diff --git a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java index aa357618f..ebda5821a 100644 --- a/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java +++ b/server-common/src/main/java/org/a2aproject/sdk/server/events/EventConsumer.java @@ -41,12 +41,6 @@ public class EventConsumer { private static final int MAX_AWAITING_FINAL_TIMEOUT_MS = 3000; private static final int MAX_POLL_TIMEOUTS_AWAITING_FINAL = (MAX_AWAITING_FINAL_TIMEOUT_MS + QUEUE_WAIT_MILLISECONDS - 1) / QUEUE_WAIT_MILLISECONDS; - // WORKAROUND: Sleep delay to allow SSE buffer flush before stream completion - // This is a temporary workaround for a race condition where tube.complete() can arrive - // before the final event is flushed from the SSE buffer. Ideally, this should be handled - // at the transport layer (e.g., MultiSseSupport) with proper write completion callbacks. - // TODO: Move buffer flush handling to transport layer to avoid this latency penalty - private static final int BUFFER_FLUSH_DELAY_MS = 150; public EventConsumer(EventQueue queue, Executor executor) { this.queue = queue; @@ -215,33 +209,14 @@ public Flow.Publisher consumeAll() { isFinalEvent = true; } - // Only send event if it's not a QueueClosedEvent - // QueueClosedEvent is an internal coordination event used for replication - // and should not be exposed to API consumers - boolean isFinalSent = false; if (!(event instanceof QueueClosedEvent)) { tube.send(item); - isFinalSent = isFinalEvent; } if (isFinalEvent) { LOGGER.debug("Final or interrupted event detected, closing queue and breaking loop for queue {}", System.identityHashCode(queue)); queue.close(); LOGGER.debug("Queue closed, breaking loop for queue {}", System.identityHashCode(queue)); - - // CRITICAL: Allow tube buffer to flush before calling tube.complete() - // tube.send() buffers events asynchronously. If we call tube.complete() immediately, - // the stream-end signal can reach the client BEFORE the buffered final event, - // causing the client to close the connection and never receive the final event. - // This is especially important in replicated scenarios where events arrive via Kafka - // and timing is less deterministic. - if (isFinalSent) { - try { - Thread.sleep(BUFFER_FLUSH_DELAY_MS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } break; } } catch (EventQueueClosedException e) {