Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -342,19 +342,24 @@ private MultiSseSupport() {
// Avoid direct instantiation.
}

private static final int IDLE = 0;
private static final int WRITE_PENDING = 1;
private static final int COMPLETE_DEFERRED = 2;
Comment on lines +345 to +347
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication.

References
  1. Constants that are used across multiple modules or components should be moved to a shared location (e.g., a common constants class or a shared module) to avoid duplication and ensure consistency.


public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) {
HttpServerResponse response = rc.response();

sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
Flow.@Nullable Subscription upstream;
final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE);

@Override
public void onSubscribe(Flow.Subscription subscription) {
this.upstream = subscription;
this.upstream.request(1);

response.closeHandler(v -> {
logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
logger.debug("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
context.invokeEventConsumerCancelCallback();
subscription.cancel();
});
Expand All @@ -380,14 +385,17 @@ public void onNext(String sseEvent) {
response.write(": SSE stream started\n\n");
}

writeState.set(WRITE_PENDING);
response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
if (ar.failed()) {
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(ar.cause());
} else {
} else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) {
java.util.Objects.requireNonNull(upstream).request(1);
} else {
endResponse(response);
}
}
});
Expand All @@ -401,13 +409,27 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) {
endResponse(response);
}
}

private void endResponse(HttpServerResponse resp) {
Runnable doEnd = () -> {
if (resp.ended()) return;
if (resp.bytesWritten() == 0) {
MultiMap headers = resp.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
}
resp.end();
};
if (io.vertx.core.Context.isOnEventLoopThread()) {
doEnd.run();
} else {
rc.vertx().runOnContext(v -> doEnd.run());
}
response.end();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -458,11 +458,16 @@ private MultiSseSupport() {
// Avoid direct instantiation.
}

private static final int IDLE = 0;
private static final int WRITE_PENDING = 1;
private static final int COMPLETE_DEFERRED = 2;
Comment on lines +461 to +463
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication.

References
  1. Constants that are used across multiple modules or components should be moved to a shared location (e.g., a common constants class or a shared module) to avoid duplication and ensure consistency.


public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) {
HttpServerResponse response = rc.response();

sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
Flow.@Nullable Subscription upstream;
final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE);

@Override
public void onSubscribe(Flow.Subscription subscription) {
Expand Down Expand Up @@ -496,14 +501,17 @@ public void onNext(String sseEvent) {
response.write(": SSE stream started\n\n");
}

writeState.set(WRITE_PENDING);
response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
if (ar.failed()) {
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(ar.cause());
} else {
} else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) {
java.util.Objects.requireNonNull(upstream).request(1);
} else {
endResponse(response);
}
}
});
Expand All @@ -517,13 +525,27 @@ public void onError(Throwable throwable) {

@Override
public void onComplete() {
if (response.bytesWritten() == 0) {
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) {
endResponse(response);
}
}

private void endResponse(HttpServerResponse resp) {
Runnable doEnd = () -> {
if (resp.ended()) return;
if (resp.bytesWritten() == 0) {
MultiMap headers = resp.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
}
resp.end();
};
if (io.vertx.core.Context.isOnEventLoopThread()) {
doEnd.run();
} else {
rc.vertx().runOnContext(v -> doEnd.run());
}
response.end();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.a2aproject.sdk.spec.TransportProtocol;
import org.a2aproject.sdk.spec.UnsupportedOperationError;
import org.a2aproject.sdk.transport.jsonrpc.handler.JSONRPCHandler;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -807,11 +808,16 @@ private MultiSseSupport() {
* @param context the A2A server call context (for EventConsumer cancellation)
* @see SseFormatter#formatResponseAsSSE
*/
private static final int IDLE = 0;
private static final int WRITE_PENDING = 1;
private static final int COMPLETE_DEFERRED = 2;
Comment on lines +811 to +813
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication.

References
  1. Constants that are used across multiple modules or components should be moved to a shared location (e.g., a common constants class or a shared module) to avoid duplication and ensure consistency.


public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) {
HttpServerResponse response = rc.response();

sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
Flow.Subscription upstream;
Flow.@Nullable Subscription upstream;
final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE);

@Override
public void onSubscribe(Flow.Subscription subscription) {
Expand All @@ -820,7 +826,7 @@ public void onSubscribe(Flow.Subscription subscription) {

// Detect client disconnect and call EventConsumer.cancel() directly
response.closeHandler(v -> {
logger.info("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
logger.debug("SSE connection closed by client, calling EventConsumer.cancel() to stop polling loop");
context.invokeEventConsumerCancelCallback();
subscription.cancel();
});
Expand All @@ -840,51 +846,58 @@ public void onNext(String sseEvent) {
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
// Additional SSE headers to prevent buffering
headers.set("Cache-Control", "no-cache");
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
headers.set("X-Accel-Buffering", "no");
response.setChunked(true);

// CRITICAL: Disable write queue max size to prevent buffering
// Vert.x buffers writes by default - we need immediate flushing for SSE
response.setWriteQueueMaxSize(1);

// Send initial SSE comment to kickstart the stream
response.write(": SSE stream started\n\n");
}

// Write SSE-formatted string to response
writeState.set(WRITE_PENDING);
response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
if (ar.failed()) {
// Client disconnected or write failed - cancel upstream to stop EventConsumer
upstream.cancel();
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(ar.cause());
} else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) {
java.util.Objects.requireNonNull(upstream).request(1);
} else {
upstream.request(1);
endResponse(response);
}
}
});
}

@Override
public void onError(Throwable throwable) {
// Cancel upstream to stop EventConsumer when error occurs
upstream.cancel();
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(throwable);
}

@Override
public void onComplete() {
if (response.bytesWritten() == 0) {
// No events written - still set SSE content type
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) {
endResponse(response);
}
}

private void endResponse(HttpServerResponse resp) {
Runnable doEnd = () -> {
if (resp.ended()) return;
if (resp.bytesWritten() == 0) {
MultiMap headers = resp.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
}
resp.end();
};
if (io.vertx.core.Context.isOnEventLoopThread()) {
doEnd.run();
} else {
rc.vertx().runOnContext(v -> doEnd.run());
}
response.end();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -998,11 +998,16 @@ private MultiSseSupport() {
* @param rc Vert.x routing context providing HTTP response
* @param context A2A server call context (for EventConsumer cancellation on disconnect)
*/
private static final int IDLE = 0;
private static final int WRITE_PENDING = 1;
private static final int COMPLETE_DEFERRED = 2;
Comment on lines +1001 to +1003
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

These constants are duplicated across multiple files. Please move them to a shared location (e.g., a common constants class or a shared module) to ensure consistency and avoid duplication.

References
  1. Constants that are used across multiple modules or components should be moved to a shared location (e.g., a common constants class or a shared module) to avoid duplication and ensure consistency.


public static void writeSseStrings(Multi<String> sseStrings, RoutingContext rc, ServerCallContext context) {
HttpServerResponse response = rc.response();

sseStrings.subscribe().withSubscriber(new Flow.Subscriber<String>() {
Flow.@Nullable Subscription upstream;
final java.util.concurrent.atomic.AtomicInteger writeState = new java.util.concurrent.atomic.AtomicInteger(IDLE);

@Override
public void onSubscribe(Flow.Subscription subscription) {
Expand Down Expand Up @@ -1031,55 +1036,61 @@ public void onNext(String sseEvent) {
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
// Additional SSE headers to prevent buffering
headers.set("Cache-Control", "no-cache");
headers.set("X-Accel-Buffering", "no"); // Disable nginx buffering
headers.set("X-Accel-Buffering", "no");
response.setChunked(true);

// CRITICAL: Disable write queue max size to prevent buffering
// Vert.x buffers writes by default - we need immediate flushing for SSE
response.setWriteQueueMaxSize(1); // Force immediate flush

// Send initial SSE comment to kickstart the stream
// This forces Vert.x to send headers and start the stream immediately
response.setWriteQueueMaxSize(1);
response.write(": SSE stream started\n\n");
}

// Write SSE-formatted string to response
writeState.set(WRITE_PENDING);
response.write(Buffer.buffer(sseEvent), new Handler<AsyncResult<Void>>() {
@Override
public void handle(AsyncResult<Void> ar) {
if (ar.failed()) {
// Client disconnected or write failed - cancel upstream to stop EventConsumer
// NullAway: upstream is guaranteed non-null after onSubscribe
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(ar.cause());
} else {
// NullAway: upstream is guaranteed non-null after onSubscribe
} else if (writeState.compareAndSet(WRITE_PENDING, IDLE)) {
java.util.Objects.requireNonNull(upstream).request(1);
} else {
// onComplete() arrived while write was pending — end the response now
endResponse(response);
}
}
});
}

@Override
public void onError(Throwable throwable) {
// Cancel upstream to stop EventConsumer when error occurs
// NullAway: upstream is guaranteed non-null after onSubscribe
java.util.Objects.requireNonNull(upstream).cancel();
rc.fail(throwable);
}

@Override
public void onComplete() {
if (response.bytesWritten() == 0) {
// No events written - still set SSE content type
MultiMap headers = response.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
if (!writeState.compareAndSet(WRITE_PENDING, COMPLETE_DEFERRED)) {
// No write in-flight — end immediately
endResponse(response);
}
// else: write handler will call endResponse when the write completes
}

private void endResponse(HttpServerResponse resp) {
Runnable doEnd = () -> {
if (resp.ended()) return;
if (resp.bytesWritten() == 0) {
MultiMap headers = resp.headers();
if (headers.get(CONTENT_TYPE) == null) {
headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
}
}
resp.end();
};
if (io.vertx.core.Context.isOnEventLoopThread()) {
doEnd.run();
} else {
rc.vertx().runOnContext(v -> doEnd.run());
}
response.end();
}
});
}
Expand Down
Loading
Loading