-
Notifications
You must be signed in to change notification settings - Fork 148
fix: prevent SSE stream closing before last write completes #888
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -458,11 +458,16 @@ private MultiSseSupport() { | |
| // Avoid direct instantiation. | ||
| } | ||
|
|
||
| private static final int IDLE = 0; | ||
| private static final int WRITE_PENDING = 1; | ||
| private static final int COMPLETE_DEFERRED = 2; | ||
|
Comment on lines
+461
to
+463
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication. References
|
||
|
|
||
| public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) { | ||
| HttpServerResponse response = rc.response(); | ||
|
|
||
| sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() { | ||
| Flow.@Nullable Subscription upstream; | ||
| final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); | ||
|
|
||
| @Override | ||
| public void onSubscribe(Flow.Subscription subscription) { | ||
|
|
@@ -496,14 +501,17 @@ public void onNext(String sseEvent) { | |
| response.write(": SSE stream started\n\n"); | ||
| } | ||
|
|
||
| writeState.set(WRITE_PENDING); | ||
| response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() { | ||
| @Override | ||
| public void handle(AsyncResult<Void> ar) { | ||
| if (ar.failed()) { | ||
| java.util.Objects.requireNonNull(upstream).cancel(); | ||
| rc.fail(ar.cause()); | ||
| } else { | ||
| } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { | ||
| java.util.Objects.requireNonNull(upstream).request(1); | ||
| } else { | ||
| endResponse(response); | ||
| } | ||
| } | ||
| }); | ||
|
|
@@ -517,13 +525,27 @@ public void onError(Throwable throwable) { | |
|
|
||
| @Override | ||
| public void onComplete() { | ||
| if (response.bytesWritten() == 0) { | ||
| MultiMap headers = response.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { | ||
| endResponse(response); | ||
| } | ||
| } | ||
|
|
||
| private void endResponse(HttpServerResponse resp) { | ||
| Runnable doEnd = () -> { | ||
| if (resp.ended()) return; | ||
| if (resp.bytesWritten() == 0) { | ||
| MultiMap headers = resp.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| } | ||
| } | ||
| resp.end(); | ||
| }; | ||
| if (io.vertx.core.Context.isOnEventLoopThread()) { | ||
| doEnd.run(); | ||
| } else { | ||
| rc.vertx().runOnContext(v -> doEnd.run()); | ||
| } | ||
| response.end(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -81,6 +81,7 @@ | |
| import org.a2aproject.sdk.spec.TransportProtocol; | ||
| import org.a2aproject.sdk.spec.UnsupportedOperationError; | ||
| import org.a2aproject.sdk.transport.jsonrpc.handler.JSONRPCHandler; | ||
| import org.jspecify.annotations.Nullable; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
||
|
|
@@ -807,11 +808,16 @@ private MultiSseSupport() { | |
| * @param context the A2A server call context (for EventConsumer cancellation) | ||
| * @see SseFormatter#formatResponseAsSSE | ||
| */ | ||
| private static final int IDLE = 0; | ||
| private static final int WRITE_PENDING = 1; | ||
| private static final int COMPLETE_DEFERRED = 2; | ||
|
Comment on lines
+811
to
+813
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication. References
|
||
|
|
||
| public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) { | ||
| HttpServerResponse response = rc.response(); | ||
|
|
||
| sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() { | ||
| Flow.Subscription upstream; | ||
| Flow.@Nullable Subscription upstream; | ||
| final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); | ||
|
|
||
| @Override | ||
| public void onSubscribe(Flow.Subscription subscription) { | ||
|
|
@@ -820,7 +826,7 @@ public void onSubscribe(Flow.Subscription subscription) { | |
|
|
||
| // Detect client disconnect and call EventConsumer.cancel() directly | ||
| response.closeHandler(v -> { | ||
| logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); | ||
| logger.debug("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop"); | ||
| context.invokeEventConsumerCancelCallback(); | ||
| subscription.cancel(); | ||
| }); | ||
|
|
@@ -840,51 +846,58 @@ public void onNext(String sseEvent) { | |
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| } | ||
| // Additional SSE headers to prevent buffering | ||
| headers.set("Cache-Control", "no-cache"); | ||
| headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering | ||
| headers.set("X-Accel-Buffering", "no"); | ||
| response.setChunked(true); | ||
|
|
||
| // CRITICAL: Disable write queue max size to prevent buffering | ||
| // Vert.x buffers writes by default - we need immediate flushing for SSE | ||
| response.setWriteQueueMaxSize(1); | ||
|
|
||
| // Send initial SSE comment to kickstart the stream | ||
| response.write(": SSE stream started\n\n"); | ||
| } | ||
|
|
||
| // Write SSE-formatted string to response | ||
| writeState.set(WRITE_PENDING); | ||
| response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() { | ||
| @Override | ||
| public void handle(AsyncResult<Void> ar) { | ||
| if (ar.failed()) { | ||
| // Client disconnected or write failed - cancel upstream to stop EventConsumer | ||
| upstream.cancel(); | ||
| java.util.Objects.requireNonNull(upstream).cancel(); | ||
| rc.fail(ar.cause()); | ||
| } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { | ||
| java.util.Objects.requireNonNull(upstream).request(1); | ||
| } else { | ||
| upstream.request(1); | ||
| endResponse(response); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable throwable) { | ||
| // Cancel upstream to stop EventConsumer when error occurs | ||
| upstream.cancel(); | ||
| java.util.Objects.requireNonNull(upstream).cancel(); | ||
| rc.fail(throwable); | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete() { | ||
| if (response.bytesWritten() == 0) { | ||
| // No events written - still set SSE content type | ||
| MultiMap headers = response.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { | ||
| endResponse(response); | ||
| } | ||
| } | ||
|
|
||
| private void endResponse(HttpServerResponse resp) { | ||
| Runnable doEnd = () -> { | ||
| if (resp.ended()) return; | ||
| if (resp.bytesWritten() == 0) { | ||
| MultiMap headers = resp.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| } | ||
| } | ||
| resp.end(); | ||
| }; | ||
| if (io.vertx.core.Context.isOnEventLoopThread()) { | ||
| doEnd.run(); | ||
| } else { | ||
| rc.vertx().runOnContext(v -> doEnd.run()); | ||
| } | ||
| response.end(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -998,11 +998,16 @@ private MultiSseSupport() { | |
| * @param rc Vert.x routing context providing HTTP response | ||
| * @param context A2A server call context (for EventConsumer cancellation on disconnect) | ||
| */ | ||
| private static final int IDLE = 0; | ||
| private static final int WRITE_PENDING = 1; | ||
| private static final int COMPLETE_DEFERRED = 2; | ||
|
Comment on lines
+1001
to
+1003
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication. References
|
||
|
|
||
| public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) { | ||
| HttpServerResponse response = rc.response(); | ||
|
|
||
| sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() { | ||
| Flow.@Nullable Subscription upstream; | ||
| final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE); | ||
|
|
||
| @Override | ||
| public void onSubscribe(Flow.Subscription subscription) { | ||
|
|
@@ -1031,55 +1036,61 @@ public void onNext(String sseEvent) { | |
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| } | ||
| // Additional SSE headers to prevent buffering | ||
| headers.set("Cache-Control", "no-cache"); | ||
| headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering | ||
| headers.set("X-Accel-Buffering", "no"); | ||
| response.setChunked(true); | ||
|
|
||
| // CRITICAL: Disable write queue max size to prevent buffering | ||
| // Vert.x buffers writes by default - we need immediate flushing for SSE | ||
| response.setWriteQueueMaxSize(1); // Force immediate flush | ||
|
|
||
| // Send initial SSE comment to kickstart the stream | ||
| // This forces Vert.x to send headers and start the stream immediately | ||
| response.setWriteQueueMaxSize(1); | ||
| response.write(": SSE stream started\n\n"); | ||
| } | ||
|
|
||
| // Write SSE-formatted string to response | ||
| writeState.set(WRITE_PENDING); | ||
| response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() { | ||
| @Override | ||
| public void handle(AsyncResult<Void> ar) { | ||
| if (ar.failed()) { | ||
| // Client disconnected or write failed - cancel upstream to stop EventConsumer | ||
| // NullAway: upstream is guaranteed non-null after onSubscribe | ||
| java.util.Objects.requireNonNull(upstream).cancel(); | ||
| rc.fail(ar.cause()); | ||
| } else { | ||
| // NullAway: upstream is guaranteed non-null after onSubscribe | ||
| } else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) { | ||
| java.util.Objects.requireNonNull(upstream).request(1); | ||
| } else { | ||
| // onComplete() arrived while write was pending — end the response now | ||
| endResponse(response); | ||
| } | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| @Override | ||
| public void onError(Throwable throwable) { | ||
| // Cancel upstream to stop EventConsumer when error occurs | ||
| // NullAway: upstream is guaranteed non-null after onSubscribe | ||
| java.util.Objects.requireNonNull(upstream).cancel(); | ||
| rc.fail(throwable); | ||
| } | ||
|
|
||
| @Override | ||
| public void onComplete() { | ||
| if (response.bytesWritten() == 0) { | ||
| // No events written - still set SSE content type | ||
| MultiMap headers = response.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) { | ||
| // No write in-flight — end immediately | ||
| endResponse(response); | ||
| } | ||
| // else: write handler will call endResponse when the write completes | ||
| } | ||
|
|
||
| private void endResponse(HttpServerResponse resp) { | ||
| Runnable doEnd = () -> { | ||
| if (resp.ended()) return; | ||
| if (resp.bytesWritten() == 0) { | ||
| MultiMap headers = resp.headers(); | ||
| if (headers.get(CONTENT_TYPE) == null) { | ||
| headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS); | ||
| } | ||
| } | ||
| resp.end(); | ||
| }; | ||
| if (io.vertx.core.Context.isOnEventLoopThread()) { | ||
| doEnd.run(); | ||
| } else { | ||
| rc.vertx().runOnContext(v -> doEnd.run()); | ||
| } | ||
| response.end(); | ||
| } | ||
| }); | ||
| } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication.
References