Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions vertx-core/src/main/asciidoc/eventbus.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,25 @@ persisted in storage, or `false` if not.
* A message consumer which processes an order might acknowledge with `true` when the order has been successfully processed
so it can be deleted from the database

==== Observability and Message Processing

When messages are consumed, Vert.x automatically manages observability signals such as tracing to track the message processing lifecycle.
By default, these signals are completed when the message handler returns.

This works well for synchronous handlers but may not be ideal for asynchronous or blocking operations.

For example, if your handler performs work on a worker thread, the tracer span would be closed before the actual work completes, resulting in inaccurate trace timing and context loss.

For asynchronous processing, you can register a processor with {@link io.vertx.core.eventbus.MessageConsumer#processor(java.util.function.Function)}.
The future returned by the processor signals the completion of message processing.

[source,$lang]
----
{@link examples.EventBusExamples#example13}
----

IMPORTANT: When a processor is used, the returned future completion defines the end of processing for all event bus messaging paradigms except request/reply.

==== Sending with timeouts

When sending a message with a reply handler, you can specify a timeout in the {@link io.vertx.core.eventbus.DeliveryOptions}.
Expand Down
16 changes: 16 additions & 0 deletions vertx-core/src/main/java/examples/EventBusExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

package examples;

import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.eventbus.*;
Expand Down Expand Up @@ -173,4 +174,19 @@ public void headers(EventBus eventBus) {
class MyPOJO {

}

public void example13(Vertx vertx) {
EventBus eb = vertx.eventBus();

eb.<String>consumer("order.process").processor(message -> {
String orderId = message.body();
Future<Void> future = processOrderAsync(orderId);
return future;
});
}

private Future<Void> processOrderAsync(String orderId) {
// Simulated async operation
return Future.succeededFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,16 @@

package io.vertx.core.eventbus;

import io.vertx.codegen.annotations.GenIgnore;
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.streams.ReadStream;

import java.util.function.Function;

import static io.vertx.codegen.annotations.GenIgnore.PERMITTED_TYPE;

/**
* An event bus consumer object representing a stream of message to an {@link EventBus} address that can
* be read from.
Expand All @@ -38,6 +43,17 @@ public interface MessageConsumer<T> extends ReadStream<Message<T>> {
@Override
MessageConsumer<T> handler(Handler<Message<T>> handler);

/**
* Set a processor for the consumer.
* <p>
* The returned future completion signals the completion of message processing.
*
* @param processor the processor
* @return this consumer
*/
@GenIgnore(PERMITTED_TYPE)
MessageConsumer<T> processor(Function<Message<T>, Future<Void>> processor);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we should use instead Function<Message<T>, Future<?>>


@Override
MessageConsumer<T> pause();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.vertx.core.tracing.TracingPolicy;

import java.util.function.Consumer;
import java.util.function.Function;

public abstract class HandlerRegistration<T> implements Closeable {

Expand Down Expand Up @@ -59,7 +60,7 @@ public String address() {

protected abstract void doReceive(Message<T> msg);

protected abstract void dispatchMessage(Message<T> msg, ContextInternal context, Handler<Message<T>> handler);
protected abstract void dispatchMessage(Message<T> msg, ContextInternal context, Function<Message<T>, Future<Void>> processor, Completable<Void> completion);

synchronized void register(boolean broadcast, boolean localOnly, Completable<Void> promise) {
if (registered != null) {
Expand Down Expand Up @@ -91,35 +92,62 @@ public Future<Void> unregister() {
return promise.future();
}

void dispatchMessage(Handler<Message<T>> handler, MessageImpl<?, T> message, ContextInternal context) {
void dispatchMessage(Function<Message<T>, Future<Void>> processor, MessageImpl<?, T> message, ContextInternal context) {
Handler<DeliveryContext<?>>[] interceptors = message.bus.inboundInterceptors();
if (interceptors.length > 0) {
Runnable dispatch = () -> dispatch(context, message, handler);
Runnable dispatch = () -> dispatch(context, message, processor);
DeliveryContextImpl<T> deliveryCtx = new DeliveryContextImpl<>(message, interceptors, context, message.receivedBody, dispatch);
deliveryCtx.next();
} else {
dispatch(context, message, handler);
dispatch(context, message, processor);
}
}

private void dispatch(ContextInternal ctx, MessageImpl<?, T> message, Handler<Message<T>> handler) {
private void dispatch(ContextInternal ctx, MessageImpl<?, T> message, Function<Message<T>, Future<Void>> processor) {
Object m = metric;
VertxTracer tracer = ctx.tracer();
if (bus.metrics != null) {
bus.metrics.messageDelivered(m, message.isLocal());
}
if (tracer != null && !src) {
message.trace = tracer.receiveRequest(ctx, SpanKind.RPC, TracingPolicy.PROPAGATE, message, message.isSend() ? "send" : "publish", message.headers(), MessageTagExtractor.INSTANCE);
dispatchMessage(message, ctx, handler);
Object trace = message.trace;
if (message.replyAddress == null && trace != null) {
tracer.sendResponse(ctx, null, trace, null, TagExtractor.empty());
Promise<Void> completion = ctx.promise();
dispatchMessage(message, ctx, processor, completion);
if (message.replyAddress == null && message.trace != null) {
completion.future().onComplete(new TraceNoReplyCompletion(tracer, ctx, message.trace));
}
} else {
dispatchMessage(message, ctx, handler);
dispatchMessage(message, ctx, processor, NO_OP);
}
}

private static class TraceNoReplyCompletion implements Handler<AsyncResult<Void>> {
final VertxTracer tracer;
final ContextInternal ctx;
final Object trace;

TraceNoReplyCompletion(VertxTracer tracer, ContextInternal ctx, Object trace) {
this.tracer = tracer;
this.ctx = ctx;
this.trace = trace;
}

@Override
public void handle(AsyncResult<Void> ar) {
if (ar.succeeded()) {
tracer.sendResponse(ctx, null, trace, null, TagExtractor.empty());
} else {
tracer.sendResponse(ctx, null, trace, ar.cause(), TagExtractor.empty());
}
}
}

private static final Completable<Void> NO_OP = new Completable<>() {
@Override
public void complete(Void result, Throwable failure) {
}
};

void discardMessage(Message<T> msg) {
if (bus.metrics != null) {
bus.metrics.discardMessage(metric, ((MessageImpl)msg).isLocal(), msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;

import java.util.function.Function;

/**
*/
public class MessageConsumerImpl<T> extends HandlerRegistration<T> implements MessageConsumer<T> {

private static final Logger log = LoggerFactory.getLogger(MessageConsumerImpl.class);

private final boolean localOnly;
private Handler<Message<T>> handler;
private Function<Message<T>, Future<Void>> processor;
private Handler<Void> endHandler;
private Handler<Message<T>> discardHandler;
private final int maxBufferedMessages;
Expand All @@ -52,12 +54,12 @@ protected void handlePause() {
}
@Override
protected void handleMessage(Message<T> msg) {
Handler<Message<T>> handler;
Function<Message<T>, Future<Void>> processor;
synchronized (MessageConsumerImpl.this) {
handler = MessageConsumerImpl.this.handler;
processor = MessageConsumerImpl.this.processor;
}
if (handler != null) {
dispatchMessage(handler, (MessageImpl<?, T>) msg, context.duplicate());
if (processor != null) {
dispatchMessage(processor, (MessageImpl<?, T>) msg, context.duplicate());
} else {
handleDiscard(msg, false);
}
Expand All @@ -77,7 +79,7 @@ public synchronized Future<Void> completion() {

@Override
public synchronized Future<Void> unregister() {
handler = null;
processor = null;
if (endHandler != null) {
endHandler.handle(null);
}
Expand Down Expand Up @@ -118,11 +120,22 @@ protected void doReceive(Message<T> message) {
}

@Override
protected void dispatchMessage(Message<T> msg, ContextInternal context, Handler<Message<T>> handler) {
if (handler == null) {
protected void dispatchMessage(Message<T> msg, ContextInternal context, Function<Message<T>, Future<Void>> processor, Completable<Void> completion) {
if (processor == null) {
throw new NullPointerException();
}
context.dispatch(msg, handler);
context.dispatch(msg, message -> {
try {
Future<Void> future = processor.apply(message);
if (future == null) {
completion.fail(new NullPointerException("processor returned null"));
} else {
future.onComplete(completion);
}
} catch (Exception e) {
completion.fail(e);
}
});
}

/*
Expand All @@ -135,28 +148,55 @@ public synchronized void discardHandler(Handler<Message<T>> handler) {
@Override
public synchronized MessageConsumer<T> handler(Handler<Message<T>> h) {
if (h != null) {
synchronized (this) {
handler = h;
if (!registered) {
registered = true;
Promise<Void> p = result;
Promise<Void> registration = context.promise();
register(true, localOnly, registration);
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
} else {
p.tryFail(ar.cause());
}
});
}
}
processor = new HandlerAdapter<>(h);
registerIfNeeded();
} else {
unregister();
}
return this;
}

private static class HandlerAdapter<BODY_TYPE> implements Function<Message<BODY_TYPE>, Future<Void>> {
final Handler<Message<BODY_TYPE>> h;

HandlerAdapter(Handler<Message<BODY_TYPE>> h) {
this.h = h;
}

@Override
public Future<Void> apply(Message<BODY_TYPE> msg) {
h.handle(msg);
return Future.succeededFuture();
}
}

@Override
public synchronized MessageConsumer<T> processor(Function<Message<T>, Future<Void>> processor) {
if (processor != null) {
this.processor = processor;
registerIfNeeded();
} else {
unregister();
}
return this;
}

private void registerIfNeeded() {
if (!registered) {
registered = true;
Promise<Void> p = result;
Promise<Void> registration = context.promise();
register(true, localOnly, registration);
registration.future().onComplete(ar -> {
if (ar.succeeded()) {
p.tryComplete();
} else {
p.tryFail(ar.cause());
}
});
}
}

@Override
public ReadStream<T> bodyStream() {
return new BodyReadStream<>(this);
Expand Down Expand Up @@ -195,8 +235,4 @@ public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
return this;
}

public synchronized Handler<Message<T>> getHandler() {
return handler;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.vertx.core.spi.tracing.TagExtractor;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.function.Function;

class ReplyHandler<T> extends HandlerRegistration<T> implements Handler<Long> {

private static final Completable<Void> NULL_COMPLETABLE = (res, err) -> {};
Expand Down Expand Up @@ -83,7 +85,7 @@ void register() {
}

@Override
protected void dispatchMessage(Message<T> reply, ContextInternal context, Handler<Message<T>> handler /* null */) {
protected void dispatchMessage(Message<T> reply, ContextInternal context, Function<Message<T>, Future<Void>> processor /* null */, Completable<Void> completion) {
if (context.owner().cancelTimer(timeoutID)) {
unregister();
if (reply.body() instanceof ReplyException) {
Expand All @@ -93,5 +95,6 @@ protected void dispatchMessage(Message<T> reply, ContextInternal context, Handle
result.complete(reply);
}
}
completion.succeed();
}
}
Loading
Loading