diff --git a/vertx-core/src/main/asciidoc/eventbus.adoc b/vertx-core/src/main/asciidoc/eventbus.adoc index c94011962af..5d61a8f1c48 100644 --- a/vertx-core/src/main/asciidoc/eventbus.adoc +++ b/vertx-core/src/main/asciidoc/eventbus.adoc @@ -240,6 +240,25 @@ persisted in storage, or `false` if not. * A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed so it can be deleted from the database +==== Observability and Message Processing + +When messages are consumed, Vert.x automatically manages observability signals such as tracing to track the message processing lifecycle. +By default, these signals are completed when the message handler returns. + +This works well for synchronous handlers but may not be ideal for asynchronous or blocking operations. + +For example, if your handler performs work on a worker thread, the tracer span would be closed before the actual work completes, resulting in inaccurate trace timing and context loss. + +For asynchronous processing, you can register a processor with {@link io.vertx.core.eventbus.MessageConsumer#processor(java.util.function.Function)}. +The future returned by the processor signals the completion of message processing. + +[source,$lang] +---- +{@link examples.EventBusExamples#example13} +---- + +IMPORTANT: When a processor is used, the returned future completion defines the end of processing for all event bus messaging paradigms except request/reply. + ==== Sending with timeouts When sending a message with a reply handler, you can specify a timeout in the {@link io.vertx.core.eventbus.DeliveryOptions}. diff --git a/vertx-core/src/main/java/examples/EventBusExamples.java b/vertx-core/src/main/java/examples/EventBusExamples.java index b4fa8054a08..05f26514330 100644 --- a/vertx-core/src/main/java/examples/EventBusExamples.java +++ b/vertx-core/src/main/java/examples/EventBusExamples.java @@ -11,6 +11,7 @@ package examples; +import io.vertx.core.Future; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.eventbus.*; @@ -173,4 +174,19 @@ public void headers(EventBus eventBus) { class MyPOJO { } + + public void example13(Vertx vertx) { + EventBus eb = vertx.eventBus(); + + eb.consumer("order.process").processor(message -> { + String orderId = message.body(); + Future future = processOrderAsync(orderId); + return future; + }); + } + + private Future processOrderAsync(String orderId) { + // Simulated async operation + return Future.succeededFuture(); + } } diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/MessageConsumer.java b/vertx-core/src/main/java/io/vertx/core/eventbus/MessageConsumer.java index 4956130a80e..f6e783deb8d 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/MessageConsumer.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/MessageConsumer.java @@ -11,11 +11,16 @@ package io.vertx.core.eventbus; +import io.vertx.codegen.annotations.GenIgnore; import io.vertx.codegen.annotations.VertxGen; import io.vertx.core.Future; import io.vertx.core.Handler; import io.vertx.core.streams.ReadStream; +import java.util.function.Function; + +import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE; + /** * An event bus consumer object representing a stream of message to an {@link EventBus} address that can * be read from. @@ -38,6 +43,17 @@ public interface MessageConsumer extends ReadStream> { @Override MessageConsumer handler(Handler> handler); + /** + * Set a processor for the consumer. + *

+ * The returned future completion signals the completion of message processing. + * + * @param processor the processor + * @return this consumer + */ + @GenIgnore(PERMITTED_TYPE) + MessageConsumer processor(Function, Future> processor); + @Override MessageConsumer pause(); diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java index dbca4538e46..4f5b983aa24 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/HandlerRegistration.java @@ -22,6 +22,7 @@ import io.vertx.core.tracing.TracingPolicy; import java.util.function.Consumer; +import java.util.function.Function; public abstract class HandlerRegistration implements Closeable { @@ -59,7 +60,7 @@ public String address() { protected abstract void doReceive(Message msg); - protected abstract void dispatchMessage(Message msg, ContextInternal context, Handler> handler); + protected abstract void dispatchMessage(Message msg, ContextInternal context, Function, Future> processor, Completable completion); synchronized void register(boolean broadcast, boolean localOnly, Completable promise) { if (registered != null) { @@ -91,18 +92,18 @@ public Future unregister() { return promise.future(); } - void dispatchMessage(Handler> handler, MessageImpl message, ContextInternal context) { + void dispatchMessage(Function, Future> processor, MessageImpl message, ContextInternal context) { Handler>[] interceptors = message.bus.inboundInterceptors(); if (interceptors.length > 0) { - Runnable dispatch = () -> dispatch(context, message, handler); + Runnable dispatch = () -> dispatch(context, message, processor); DeliveryContextImpl deliveryCtx = new DeliveryContextImpl<>(message, interceptors, context, message.receivedBody, dispatch); deliveryCtx.next(); } else { - dispatch(context, message, handler); + dispatch(context, message, processor); } } - private void dispatch(ContextInternal ctx, MessageImpl message, Handler> handler) { + private void dispatch(ContextInternal ctx, MessageImpl message, Function, Future> processor) { Object m = metric; VertxTracer tracer = ctx.tracer(); if (bus.metrics != null) { @@ -110,16 +111,43 @@ private void dispatch(ContextInternal ctx, MessageImpl message, Handler completion = ctx.promise(); + dispatchMessage(message, ctx, processor, completion); + if (message.replyAddress == null && message.trace != null) { + completion.future().onComplete(new TraceNoReplyCompletion(tracer, ctx, message.trace)); } } else { - dispatchMessage(message, ctx, handler); + dispatchMessage(message, ctx, processor, NO_OP); + } + } + + private static class TraceNoReplyCompletion implements Handler> { + final VertxTracer tracer; + final ContextInternal ctx; + final Object trace; + + TraceNoReplyCompletion(VertxTracer tracer, ContextInternal ctx, Object trace) { + this.tracer = tracer; + this.ctx = ctx; + this.trace = trace; + } + + @Override + public void handle(AsyncResult ar) { + if (ar.succeeded()) { + tracer.sendResponse(ctx, null, trace, null, TagExtractor.empty()); + } else { + tracer.sendResponse(ctx, null, trace, ar.cause(), TagExtractor.empty()); + } } } + private static final Completable NO_OP = new Completable<>() { + @Override + public void complete(Void result, Throwable failure) { + } + }; + void discardMessage(Message msg) { if (bus.metrics != null) { bus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg); diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java index 319259497c0..dc337ce5875 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/MessageConsumerImpl.java @@ -20,6 +20,8 @@ import io.vertx.core.internal.logging.LoggerFactory; import io.vertx.core.streams.ReadStream; +import java.util.function.Function; + /** */ public class MessageConsumerImpl extends HandlerRegistration implements MessageConsumer { @@ -27,7 +29,7 @@ public class MessageConsumerImpl extends HandlerRegistration implements Me private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class); private final boolean localOnly; - private Handler> handler; + private Function, Future> processor; private Handler endHandler; private Handler> discardHandler; private final int maxBufferedMessages; @@ -52,12 +54,12 @@ protected void handlePause() { } @Override protected void handleMessage(Message msg) { - Handler> handler; + Function, Future> processor; synchronized (MessageConsumerImpl.this) { - handler = MessageConsumerImpl.this.handler; + processor = MessageConsumerImpl.this.processor; } - if (handler != null) { - dispatchMessage(handler, (MessageImpl) msg, context.duplicate()); + if (processor != null) { + dispatchMessage(processor, (MessageImpl) msg, context.duplicate()); } else { handleDiscard(msg, false); } @@ -77,7 +79,7 @@ public synchronized Future completion() { @Override public synchronized Future unregister() { - handler = null; + processor = null; if (endHandler != null) { endHandler.handle(null); } @@ -118,11 +120,22 @@ protected void doReceive(Message message) { } @Override - protected void dispatchMessage(Message msg, ContextInternal context, Handler> handler) { - if (handler == null) { + protected void dispatchMessage(Message msg, ContextInternal context, Function, Future> processor, Completable completion) { + if (processor == null) { throw new NullPointerException(); } - context.dispatch(msg, handler); + context.dispatch(msg, message -> { + try { + Future future = processor.apply(message); + if (future == null) { + completion.fail(new NullPointerException("processor returned null")); + } else { + future.onComplete(completion); + } + } catch (Exception e) { + completion.fail(e); + } + }); } /* @@ -135,28 +148,55 @@ public synchronized void discardHandler(Handler> handler) { @Override public synchronized MessageConsumer handler(Handler> h) { if (h != null) { - synchronized (this) { - handler = h; - if (!registered) { - registered = true; - Promise p = result; - Promise registration = context.promise(); - register(true, localOnly, registration); - registration.future().onComplete(ar -> { - if (ar.succeeded()) { - p.tryComplete(); - } else { - p.tryFail(ar.cause()); - } - }); - } - } + processor = new HandlerAdapter<>(h); + registerIfNeeded(); + } else { + unregister(); + } + return this; + } + + private static class HandlerAdapter implements Function, Future> { + final Handler> h; + + HandlerAdapter(Handler> h) { + this.h = h; + } + + @Override + public Future apply(Message msg) { + h.handle(msg); + return Future.succeededFuture(); + } + } + + @Override + public synchronized MessageConsumer processor(Function, Future> processor) { + if (processor != null) { + this.processor = processor; + registerIfNeeded(); } else { unregister(); } return this; } + private void registerIfNeeded() { + if (!registered) { + registered = true; + Promise p = result; + Promise registration = context.promise(); + register(true, localOnly, registration); + registration.future().onComplete(ar -> { + if (ar.succeeded()) { + p.tryComplete(); + } else { + p.tryFail(ar.cause()); + } + }); + } + } + @Override public ReadStream bodyStream() { return new BodyReadStream<>(this); @@ -195,8 +235,4 @@ public synchronized MessageConsumer endHandler(Handler endHandler) { public synchronized MessageConsumer exceptionHandler(Handler handler) { return this; } - - public synchronized Handler> getHandler() { - return handler; - } } diff --git a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java index 730b5e3a917..efdeff328d4 100644 --- a/vertx-core/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java +++ b/vertx-core/src/main/java/io/vertx/core/eventbus/impl/ReplyHandler.java @@ -22,6 +22,8 @@ import io.vertx.core.spi.tracing.TagExtractor; import io.vertx.core.spi.tracing.VertxTracer; +import java.util.function.Function; + class ReplyHandler extends HandlerRegistration implements Handler { private static final Completable NULL_COMPLETABLE = (res, err) -> {}; @@ -83,7 +85,7 @@ void register() { } @Override - protected void dispatchMessage(Message reply, ContextInternal context, Handler> handler /* null */) { + protected void dispatchMessage(Message reply, ContextInternal context, Function, Future> processor /* null */, Completable completion) { if (context.owner().cancelTimer(timeoutID)) { unregister(); if (reply.body() instanceof ReplyException) { @@ -93,5 +95,6 @@ protected void dispatchMessage(Message reply, ContextInternal context, Handle result.complete(reply); } } + completion.succeed(); } } diff --git a/vertx-core/src/test/java/io/vertx/tests/tracing/EventBusTracerTestBase.java b/vertx-core/src/test/java/io/vertx/tests/tracing/EventBusTracerTestBase.java index 14211cca3f1..3dcfe4e46be 100644 --- a/vertx-core/src/test/java/io/vertx/tests/tracing/EventBusTracerTestBase.java +++ b/vertx-core/src/test/java/io/vertx/tests/tracing/EventBusTracerTestBase.java @@ -11,6 +11,7 @@ package io.vertx.tests.tracing; import io.vertx.core.Context; +import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.eventbus.DeliveryOptions; import io.vertx.core.eventbus.Message; @@ -301,4 +302,68 @@ public void testEventBusRequestReplyReply() throws Exception { assertEquals(Arrays.asList("sendRequest[the_address]", "receiveResponse[]", "sendRequest[generated]", "receiveResponse[]"), ebTracer.sendEvents); assertEquals(Arrays.asList("receiveRequest[the_address]", "sendResponse[]", "receiveRequest[generated]", "sendResponse[]"), ebTracer.receiveEvents); } + + @Test + public void testEventBusSendProcessor() throws Exception { + EventBusTracer ebTracer = new EventBusTracer(); + tracer = ebTracer; + + Promise completion = Promise.promise(); + + vertx2.eventBus().consumer("the_address") + .processor(msg -> { + assertEquals("msg", msg.body()); + return completion.future(); + }).completion().await(); + + vertx1.runOnContext(v -> { + Context ctx = vertx1.getOrCreateContext(); + sendKey.put(ctx, ebTracer.sendVal); + vertx1.eventBus().send("the_address", "msg"); + }); + + waitUntil(() -> ebTracer.receiveEvents.contains("receiveRequest[the_address]")); + assertFalse(ebTracer.receiveEvents.contains("sendResponse[]")); + + completion.complete(); + waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 4); + assertEquals(Arrays.asList("receiveRequest[the_address]", "sendResponse[]"), ebTracer.receiveEvents); + } + + @Test + public void testEventBusPublishProcessor() throws Exception { + EventBusTracer ebTracer = new EventBusTracer(); + tracer = ebTracer; + + Promise completion1 = Promise.promise(); + Promise completion2 = Promise.promise(); + + vertx2.eventBus().consumer("the_address") + .processor(msg -> { + assertEquals("msg", msg.body()); + return completion1.future(); + }).completion().await(); + + vertx2.eventBus().consumer("the_address") + .processor(msg -> { + assertEquals("msg", msg.body()); + return completion2.future(); + }).completion().await(); + + vertx1.runOnContext(v -> { + Context ctx = vertx1.getOrCreateContext(); + sendKey.put(ctx, ebTracer.sendVal); + vertx1.eventBus().publish("the_address", "msg"); + }); + + waitUntil(() -> ebTracer.receiveEvents.stream().filter(e -> e.equals("receiveRequest[the_address]")).count() == 2); + assertEquals(0, ebTracer.receiveEvents.stream().filter(e -> e.equals("sendResponse[]")).count()); + + completion1.complete(); + waitUntil(() -> ebTracer.receiveEvents.stream().filter(e -> e.equals("sendResponse[]")).count() == 1); + + completion2.complete(); + waitUntil(() -> ebTracer.sendEvents.size() + ebTracer.receiveEvents.size() == 6); + assertEquals(2, ebTracer.receiveEvents.stream().filter(e -> e.equals("sendResponse[]")).count()); + } }