From 2e9e35ba9d7a6af61d19d828ef6fd17b12630ec1 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Fri, 9 Jan 2026 23:48:25 -0800 Subject: [PATCH 1/5] Adding raw event subscription. Signed-off-by: Artur Ciocanu --- .../dapr/it/pubsub/stream/PubSubStreamIT.java | 44 ++++++++++ .../java/io/dapr/client/DaprClientImpl.java | 9 ++ .../io/dapr/client/DaprPreviewClient.java | 12 +++ .../client/DaprPreviewClientGrpcTest.java | 82 +++++++++++++++++++ 4 files changed, 147 insertions(+) diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java index 6c01befdef..8055b75526 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java @@ -124,4 +124,48 @@ public void onError(RuntimeException exception) { } } } + + @Test + public void testPubSubRawData() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName() + "-rawdata", + 60000)); + + var runId = UUID.randomUUID().toString(); + try (DaprClient client = daprRun.newDaprClient(); + DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { + + // Publish messages + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("Raw message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); + System.out.println( + String.format("Published raw message: '%s' to topic '%s'", message, TOPIC_NAME)); + } + + System.out.println("Starting raw data subscription for " + TOPIC_NAME); + + Set messages = Collections.synchronizedSet(new HashSet<>()); + + // Use new subscribeToEventsData - receives String directly, not CloudEvent + var disposable = previewClient.subscribeToEventsData(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING) + .doOnNext(rawMessage -> { + // rawMessage is String directly + if (rawMessage.contains(runId)) { + messages.add(rawMessage); + System.out.println("Received raw message: " + rawMessage); + } + }) + .subscribe(); + + callWithRetry(() -> { + var messageCount = messages.size(); + System.out.println( + String.format("Got %d raw messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); + assertEquals(NUM_MESSAGES, messages.size()); + }, 60000); + + disposable.dispose(); + } + } } diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index 05b555b5e9..f41df9e80d 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -514,6 +514,15 @@ public Flux> subscribeToEvents(String pubsubName, String topic }, FluxSink.OverflowStrategy.BUFFER); } + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToEventsData(String pubsubName, String topic, TypeRef type) { + return subscribeToEvents(pubsubName, topic, type) + .map(CloudEvent::getData); + } + @Nonnull private Subscription buildSubscription( SubscriptionListener listener, diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 17ca136492..9677327efb 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -293,6 +293,18 @@ Subscription subscribeToEvents( */ Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type); + /** + * Subscribe to pubsub events via streaming using Project Reactor Flux. + * Returns only the deserialized event data without CloudEvent metadata wrapper. + * + * @param pubsubName Name of the pubsub component. + * @param topic Name of the topic to subscribe to. + * @param type Type for object deserialization. + * @return A Flux of deserialized event payloads (no CloudEvent wrapper). + * @param Type of the event payload. + */ + Flux subscribeToEventsData(String pubsubName, String topic, TypeRef type); + /* * Converse with an LLM. * diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index af54b64904..0d7309b167 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -659,6 +659,88 @@ public void onCompleted() { assertEquals(numEvents, eventCount.get()); } + @Test + public void subscribeToEventsDataTest() throws Exception { + var numEvents = 100; + var pubsubName = "pubsubName"; + var topicName = "topicName"; + var data = "my message"; + var started = new Semaphore(0); + + doAnswer((Answer>) invocation -> { + StreamObserver observer = + (StreamObserver) invocation.getArguments()[0]; + + var emitterThread = new Thread(() -> { + try { + started.acquire(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + observer.onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1.getDefaultInstance()); + + for (int i = 0; i < numEvents; i++) { + DaprProtos.SubscribeTopicEventsResponseAlpha1 response = + DaprProtos.SubscribeTopicEventsResponseAlpha1.newBuilder() + .setEventMessage(DaprAppCallbackProtos.TopicEventRequest.newBuilder() + .setId(Integer.toString(i)) + .setPubsubName(pubsubName) + .setTopic(topicName) + .setData(ByteString.copyFromUtf8("\"" + data + "\"")) + .setDataContentType("application/json") + .build()) + .build(); + observer.onNext(response); + } + + observer.onCompleted(); + }); + + emitterThread.start(); + + return new StreamObserver<>() { + @Override + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + started.release(); + } + + @Override + public void onError(Throwable throwable) { + // No-op + } + + @Override + public void onCompleted() { + // No-op + } + }; + }).when(daprStub).subscribeTopicEventsAlpha1(any(StreamObserver.class)); + + final AtomicInteger eventCount = new AtomicInteger(0); + final Semaphore gotAll = new Semaphore(0); + + // Use new subscribeToEventsData - receives raw String, not CloudEvent + var disposable = previewClient.subscribeToEventsData(pubsubName, topicName, TypeRef.STRING) + .doOnNext(rawData -> { + // rawData is String directly, not CloudEvent + assertEquals(data, rawData); + assertTrue(rawData instanceof String); + + int count = eventCount.incrementAndGet(); + + if (count >= numEvents) { + gotAll.release(); + } + }) + .subscribe(); + + gotAll.acquire(); + disposable.dispose(); + + assertEquals(numEvents, eventCount.get()); + } + @Test public void converseShouldThrowIllegalArgumentExceptionWhenComponentNameIsNull() throws Exception { List inputs = new ArrayList<>(); From e5dc0ec383643177277634603b7b567250c19653 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Wed, 14 Jan 2026 21:40:41 -0500 Subject: [PATCH 2/5] Use proper method overloads for subscribeToEvents() Signed-off-by: Artur Ciocanu --- .../dapr/it/pubsub/stream/PubSubStreamIT.java | 106 ++++++++++++++++-- .../java/io/dapr/client/DaprClientImpl.java | 31 ++--- .../io/dapr/client/DaprPreviewClient.java | 23 +++- .../EventSubscriberStreamObserver.java | 31 ++--- .../client/DaprPreviewClientGrpcTest.java | 33 ++++-- .../EventSubscriberStreamObserverTest.java | 65 +++++------ 6 files changed, 189 insertions(+), 100 deletions(-) diff --git a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java index 8055b75526..410b0da80c 100644 --- a/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java +++ b/sdk-tests/src/test/java/io/dapr/it/pubsub/stream/PubSubStreamIT.java @@ -28,6 +28,7 @@ import java.util.Collections; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; import java.util.UUID; @@ -126,9 +127,9 @@ public void onError(RuntimeException exception) { } @Test - public void testPubSubRawData() throws Exception { + public void testPubSubFlux() throws Exception { final DaprRun daprRun = closeLater(startDaprApp( - this.getClass().getSimpleName() + "-rawdata", + this.getClass().getSimpleName() + "-flux", 60000)); var runId = UUID.randomUUID().toString(); @@ -137,18 +138,18 @@ public void testPubSubRawData() throws Exception { // Publish messages for (int i = 0; i < NUM_MESSAGES; i++) { - String message = String.format("Raw message #%d for run %s", i, runId); + String message = String.format("Flux message #%d for run %s", i, runId); client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); System.out.println( - String.format("Published raw message: '%s' to topic '%s'", message, TOPIC_NAME)); + String.format("Published flux message: '%s' to topic '%s'", message, TOPIC_NAME)); } - System.out.println("Starting raw data subscription for " + TOPIC_NAME); + System.out.println("Starting Flux subscription for " + TOPIC_NAME); Set messages = Collections.synchronizedSet(new HashSet<>()); - // Use new subscribeToEventsData - receives String directly, not CloudEvent - var disposable = previewClient.subscribeToEventsData(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING) + // subscribeToEvents now returns Flux directly (raw data) + var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING) .doOnNext(rawMessage -> { // rawMessage is String directly if (rawMessage.contains(runId)) { @@ -161,7 +162,96 @@ public void testPubSubRawData() throws Exception { callWithRetry(() -> { var messageCount = messages.size(); System.out.println( - String.format("Got %d raw messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); + String.format("Got %d flux messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); + assertEquals(NUM_MESSAGES, messages.size()); + }, 60000); + + disposable.dispose(); + } + } + + @Test + public void testPubSubCloudEvent() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName() + "-cloudevent", + 60000)); + + var runId = UUID.randomUUID().toString(); + try (DaprClient client = daprRun.newDaprClient(); + DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { + + // Publish messages + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("CloudEvent message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message).block(); + System.out.println( + String.format("Published CloudEvent message: '%s' to topic '%s'", message, TOPIC_NAME)); + } + + System.out.println("Starting CloudEvent subscription for " + TOPIC_NAME); + + Set messageIds = Collections.synchronizedSet(new HashSet<>()); + + // Use TypeRef> to receive full CloudEvent with metadata + var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, new TypeRef>(){}) + .doOnNext(cloudEvent -> { + if (cloudEvent.getData() != null && cloudEvent.getData().contains(runId)) { + messageIds.add(cloudEvent.getId()); + System.out.println("Received CloudEvent with ID: " + cloudEvent.getId() + + ", topic: " + cloudEvent.getTopic() + + ", data: " + cloudEvent.getData()); + } + }) + .subscribe(); + + callWithRetry(() -> { + var messageCount = messageIds.size(); + System.out.println( + String.format("Got %d CloudEvent messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); + assertEquals(NUM_MESSAGES, messageIds.size()); + }, 60000); + + disposable.dispose(); + } + } + + @Test + public void testPubSubRawPayload() throws Exception { + final DaprRun daprRun = closeLater(startDaprApp( + this.getClass().getSimpleName() + "-rawpayload", + 60000)); + + var runId = UUID.randomUUID().toString(); + try (DaprClient client = daprRun.newDaprClient(); + DaprPreviewClient previewClient = daprRun.newDaprPreviewClient()) { + + // Publish messages with rawPayload metadata + for (int i = 0; i < NUM_MESSAGES; i++) { + String message = String.format("RawPayload message #%d for run %s", i, runId); + client.publishEvent(PUBSUB_NAME, TOPIC_NAME, message, Map.of("rawPayload", "true")).block(); + System.out.println( + String.format("Published raw payload message: '%s' to topic '%s'", message, TOPIC_NAME)); + } + + System.out.println("Starting raw payload subscription for " + TOPIC_NAME); + + Set messages = Collections.synchronizedSet(new HashSet<>()); + Map metadata = Map.of("rawPayload", "true"); + + // Use subscribeToEvents with rawPayload metadata + var disposable = previewClient.subscribeToEvents(PUBSUB_NAME, TOPIC_NAME, TypeRef.STRING, metadata) + .doOnNext(rawMessage -> { + if (rawMessage.contains(runId)) { + messages.add(rawMessage); + System.out.println("Received raw payload message: " + rawMessage); + } + }) + .subscribe(); + + callWithRetry(() -> { + var messageCount = messages.size(); + System.out.println( + String.format("Got %d raw payload messages out of %d for topic %s.", messageCount, NUM_MESSAGES, TOPIC_NAME)); assertEquals(NUM_MESSAGES, messages.size()); }, 60000); diff --git a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java index f41df9e80d..f36b59c683 100644 --- a/sdk/src/main/java/io/dapr/client/DaprClientImpl.java +++ b/sdk/src/main/java/io/dapr/client/DaprClientImpl.java @@ -482,15 +482,27 @@ public Subscription subscribeToEvents( * {@inheritDoc} */ @Override - public Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type) { - DaprProtos.SubscribeTopicEventsRequestInitialAlpha1 initialRequest = + public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type) { + return subscribeToEvents(pubsubName, topic, type, null); + } + + /** + * {@inheritDoc} + */ + @Override + public Flux subscribeToEvents(String pubsubName, String topic, TypeRef type, Map metadata) { + DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.Builder initialRequestBuilder = DaprProtos.SubscribeTopicEventsRequestInitialAlpha1.newBuilder() .setTopic(topic) - .setPubsubName(pubsubName) - .build(); + .setPubsubName(pubsubName); + + if (metadata != null && !metadata.isEmpty()) { + initialRequestBuilder.putAllMetadata(metadata); + } + DaprProtos.SubscribeTopicEventsRequestAlpha1 request = DaprProtos.SubscribeTopicEventsRequestAlpha1.newBuilder() - .setInitialRequest(initialRequest) + .setInitialRequest(initialRequestBuilder.build()) .build(); return Flux.create(sink -> { @@ -514,15 +526,6 @@ public Flux> subscribeToEvents(String pubsubName, String topic }, FluxSink.OverflowStrategy.BUFFER); } - /** - * {@inheritDoc} - */ - @Override - public Flux subscribeToEventsData(String pubsubName, String topic, TypeRef type) { - return subscribeToEvents(pubsubName, topic, type) - .map(CloudEvent::getData); - } - @Nonnull private Subscription buildSubscription( SubscriptionListener listener, diff --git a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java index 9677327efb..fa4a1eb9b7 100644 --- a/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java +++ b/sdk/src/main/java/io/dapr/client/DaprPreviewClient.java @@ -285,25 +285,36 @@ Subscription subscribeToEvents( /** * Subscribe to pubsub events via streaming using Project Reactor Flux. + * + *

The type parameter determines what is deserialized from the event data: + *

    + *
  • Use {@code TypeRef.STRING} or similar for raw payload data
  • + *
  • Use {@code new TypeRef>(){}} to receive CloudEvent with metadata
  • + *
+ * * @param pubsubName Name of the pubsub component. * @param topic Name of the topic to subscribe to. * @param type Type for object deserialization. - * @return A Flux of CloudEvents containing deserialized event payloads and metadata. + * @return A Flux of deserialized event payloads. * @param Type of the event payload. */ - Flux> subscribeToEvents(String pubsubName, String topic, TypeRef type); + Flux subscribeToEvents(String pubsubName, String topic, TypeRef type); /** - * Subscribe to pubsub events via streaming using Project Reactor Flux. - * Returns only the deserialized event data without CloudEvent metadata wrapper. + * Subscribe to pubsub events via streaming using Project Reactor Flux with metadata support. + * + *

If metadata is null or empty, this method delegates to {@link #subscribeToEvents(String, String, TypeRef)}. + * Use metadata {@code {"rawPayload": "true"}} for raw payload subscriptions where Dapr + * delivers messages without CloudEvent wrapping. * * @param pubsubName Name of the pubsub component. * @param topic Name of the topic to subscribe to. * @param type Type for object deserialization. - * @return A Flux of deserialized event payloads (no CloudEvent wrapper). + * @param metadata Subscription metadata (e.g., {"rawPayload": "true"}). + * @return A Flux of deserialized event payloads. * @param Type of the event payload. */ - Flux subscribeToEventsData(String pubsubName, String topic, TypeRef type); + Flux subscribeToEvents(String pubsubName, String topic, TypeRef type, Map metadata); /* * Converse with an LLM. diff --git a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java index 56131882b8..c15f5a94e4 100644 --- a/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java +++ b/sdk/src/main/java/io/dapr/internal/subscription/EventSubscriberStreamObserver.java @@ -13,7 +13,6 @@ package io.dapr.internal.subscription; -import io.dapr.client.domain.CloudEvent; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.utils.TypeRef; @@ -40,7 +39,7 @@ public class EventSubscriberStreamObserver implements StreamObserver> sink; + private final FluxSink sink; private final TypeRef type; private final DaprObjectSerializer objectSerializer; @@ -50,13 +49,13 @@ public class EventSubscriberStreamObserver implements StreamObserver> sink, + FluxSink sink, TypeRef type, DaprObjectSerializer objectSerializer) { this.stub = stub; @@ -91,8 +90,7 @@ public void onNext(DaprProtos.SubscribeTopicEventsResponseAlpha1 response) { try { T data = deserializeEventData(message); - CloudEvent cloudEvent = buildCloudEvent(message, data); - emitEventAndAcknowledge(cloudEvent, eventId); + emitDataAndAcknowledge(data, eventId); } catch (IOException e) { // Deserialization failure - send DROP ack handleDeserializationError(eventId, e); @@ -142,22 +140,11 @@ private T deserializeEventData(DaprAppCallbackProtos.TopicEventRequest message) return objectSerializer.deserialize(message.getData().toByteArray(), type); } - private CloudEvent buildCloudEvent(DaprAppCallbackProtos.TopicEventRequest message, T data) { - CloudEvent cloudEvent = new CloudEvent<>(); - - cloudEvent.setId(message.getId()); - cloudEvent.setType(message.getType()); - cloudEvent.setSpecversion(message.getSpecVersion()); - cloudEvent.setDatacontenttype(message.getDataContentType()); - cloudEvent.setTopic(message.getTopic()); - cloudEvent.setPubsubName(message.getPubsubName()); - cloudEvent.setData(data); - - return cloudEvent; - } - - private void emitEventAndAcknowledge(CloudEvent cloudEvent, String eventId) { - sink.next(cloudEvent); + private void emitDataAndAcknowledge(T data, String eventId) { + // Only emit if data is not null (Reactor doesn't allow null values in Flux) + if (data != null) { + sink.next(data); + } // Send SUCCESS acknowledgment requestStream.onNext(buildSuccessAck(eventId)); diff --git a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java index 0d7309b167..dff71794ea 100644 --- a/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java +++ b/sdk/src/test/java/io/dapr/client/DaprPreviewClientGrpcTest.java @@ -88,6 +88,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import static io.dapr.utils.TestUtils.assertThrowsDaprException; import static org.junit.Assert.assertTrue; @@ -638,12 +639,13 @@ public void onCompleted() { final AtomicInteger eventCount = new AtomicInteger(0); final Semaphore gotAll = new Semaphore(0); + + // subscribeToEvents now returns Flux directly (raw data) var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING) - .doOnNext(cloudEvent -> { - assertEquals(data, cloudEvent.getData()); - assertEquals(pubsubName, cloudEvent.getPubsubName()); - assertEquals(topicName, cloudEvent.getTopic()); - assertNotNull(cloudEvent.getId()); + .doOnNext(rawData -> { + // rawData is String directly, not CloudEvent + assertEquals(data, rawData); + assertTrue(rawData instanceof String); int count = eventCount.incrementAndGet(); @@ -660,12 +662,13 @@ public void onCompleted() { } @Test - public void subscribeToEventsDataTest() throws Exception { - var numEvents = 100; + public void subscribeEventsWithMetadataTest() throws Exception { + var numEvents = 10; var pubsubName = "pubsubName"; var topicName = "topicName"; var data = "my message"; var started = new Semaphore(0); + var capturedMetadata = new AtomicReference>(); doAnswer((Answer>) invocation -> { StreamObserver observer = @@ -701,7 +704,11 @@ public void subscribeToEventsDataTest() throws Exception { return new StreamObserver<>() { @Override - public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 subscribeTopicEventsRequestAlpha1) { + public void onNext(DaprProtos.SubscribeTopicEventsRequestAlpha1 request) { + // Capture metadata from initial request + if (request.hasInitialRequest()) { + capturedMetadata.set(request.getInitialRequest().getMetadataMap()); + } started.release(); } @@ -719,11 +726,11 @@ public void onCompleted() { final AtomicInteger eventCount = new AtomicInteger(0); final Semaphore gotAll = new Semaphore(0); + Map metadata = Map.of("rawPayload", "true"); - // Use new subscribeToEventsData - receives raw String, not CloudEvent - var disposable = previewClient.subscribeToEventsData(pubsubName, topicName, TypeRef.STRING) + // Use subscribeToEvents with rawPayload metadata + var disposable = previewClient.subscribeToEvents(pubsubName, topicName, TypeRef.STRING, metadata) .doOnNext(rawData -> { - // rawData is String directly, not CloudEvent assertEquals(data, rawData); assertTrue(rawData instanceof String); @@ -739,6 +746,10 @@ public void onCompleted() { disposable.dispose(); assertEquals(numEvents, eventCount.get()); + + // Verify metadata was passed to gRPC request + assertNotNull(capturedMetadata.get()); + assertEquals("true", capturedMetadata.get().get("rawPayload")); } @Test diff --git a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java index 7328f79e51..d3566a9922 100644 --- a/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java +++ b/sdk/src/test/java/io/dapr/internal/subscription/EventSubscriberStreamObserverTest.java @@ -14,7 +14,6 @@ package io.dapr.internal.subscription; import com.google.protobuf.ByteString; -import io.dapr.client.domain.CloudEvent; import io.dapr.exceptions.DaprException; import io.dapr.serializer.DaprObjectSerializer; import io.dapr.serializer.DefaultObjectSerializer; @@ -62,7 +61,7 @@ void setUp() { @Test @DisplayName("Should successfully process events and send SUCCESS acks") void testSuccessfulEventProcessing() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -87,11 +86,8 @@ void testSuccessfulEventProcessing() { }); StepVerifier.create(flux) - .assertNext(cloudEvent -> { - assertEquals("Hello World", cloudEvent.getData()); - assertEquals("event-1", cloudEvent.getId()); - assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); - assertEquals(TOPIC_NAME, cloudEvent.getTopic()); + .assertNext(data -> { + assertEquals("Hello World", data); }) .verifyComplete(); @@ -115,7 +111,7 @@ void testSuccessfulEventProcessing() { @Test @DisplayName("Should handle multiple consecutive events correctly") void testMultipleEvents() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -135,17 +131,14 @@ void testMultipleEvents() { }); StepVerifier.create(flux) - .assertNext(cloudEvent -> { - assertEquals("Message 1", cloudEvent.getData()); - assertEquals("event-1", cloudEvent.getId()); + .assertNext(data -> { + assertEquals("Message 1", data); }) - .assertNext(cloudEvent -> { - assertEquals("Message 2", cloudEvent.getData()); - assertEquals("event-2", cloudEvent.getId()); + .assertNext(data -> { + assertEquals("Message 2", data); }) - .assertNext(cloudEvent -> { - assertEquals("Message 3", cloudEvent.getData()); - assertEquals("event-3", cloudEvent.getId()); + .assertNext(data -> { + assertEquals("Message 3", data); }) .verifyComplete(); @@ -155,7 +148,7 @@ void testMultipleEvents() { @Test @DisplayName("Should send DROP ack when deserialization fails") void testDeserializationError() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -209,7 +202,7 @@ void testDeserializationError() { @Test @DisplayName("Should send RETRY ack when non-deserialization error occurs") void testProcessingError() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -240,7 +233,7 @@ void testProcessingError() { @Test @DisplayName("Should propagate gRPC errors as DaprException") void testGrpcError() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -263,7 +256,7 @@ void testGrpcError() { @Test @DisplayName("Should handle null event messages gracefully without emitting events") void testNullEventMessage() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -291,7 +284,7 @@ void testNullEventMessage() { @Test @DisplayName("Should skip events with empty pubsub name") void testEmptyPubsubName() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -327,7 +320,7 @@ void testEmptyPubsubName() { @Test @DisplayName("Should skip events with empty event ID") void testEmptyEventId() { - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -361,13 +354,13 @@ void testEmptyEventId() { } @Test - @DisplayName("Should handle null type parameter and emit CloudEvent with null data") - void testNullData() { - Flux> flux = Flux.create(sink -> { + @DisplayName("Should handle null type parameter by skipping emission but still sending ack") + void testNullTypeSkipsEmission() { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, - null, // null type + null, // null type - deserialize returns null objectSerializer ); @@ -379,15 +372,11 @@ void testNullData() { observer.onCompleted(); }); + // No events emitted since null values are skipped (Reactor doesn't allow null) StepVerifier.create(flux) - .assertNext(cloudEvent -> { - assertNull(cloudEvent.getData()); - assertEquals("event-1", cloudEvent.getId()); - assertEquals(PUBSUB_NAME, cloudEvent.getPubsubName()); - assertEquals(TOPIC_NAME, cloudEvent.getTopic()); - }) .verifyComplete(); + // But ack is still sent verify(mockRequestStream, times(2)).onNext(any()); } @@ -397,7 +386,7 @@ void testComplexObjectSerialization() throws IOException { TestEvent testEvent = new TestEvent("test-name", 42); byte[] serializedEvent = objectSerializer.serialize(testEvent); - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -425,11 +414,9 @@ void testComplexObjectSerialization() throws IOException { }); StepVerifier.create(flux) - .assertNext(cloudEvent -> { - TestEvent event = cloudEvent.getData(); + .assertNext(event -> { assertEquals("test-name", event.name); assertEquals(42, event.value); - assertEquals("event-1", cloudEvent.getId()); }) .verifyComplete(); } @@ -441,7 +428,7 @@ void testErrorDuringSendingAck() { .when(mockRequestStream) .onNext(argThat(DaprProtos.SubscribeTopicEventsRequestAlpha1::hasEventProcessed)); - Flux> flux = Flux.create(sink -> { + Flux flux = Flux.create(sink -> { EventSubscriberStreamObserver observer = new EventSubscriberStreamObserver<>( mockStub, sink, @@ -456,7 +443,7 @@ void testErrorDuringSendingAck() { }); StepVerifier.create(flux) - .assertNext(cloudEvent -> assertEquals("Hello", cloudEvent.getData())) // Event is emitted before ack + .assertNext(data -> assertEquals("Hello", data)) // Event is emitted before ack .expectError(DaprException.class) // Then error when sending ack .verify(); } From 4a033d8e8955bb17d11b0dc4422613322e679038 Mon Sep 17 00:00:00 2001 From: Artur Ciocanu Date: Wed, 14 Jan 2026 22:40:42 -0500 Subject: [PATCH 3/5] Updating the examples to use the latest changes. Signed-off-by: Artur Ciocanu --- .../io/dapr/examples/pubsub/stream/README.md | 81 ++++++++++++++----- .../examples/pubsub/stream/Subscriber.java | 46 +++++------ .../pubsub/stream/SubscriberCloudEvent.java | 78 ++++++++++++++++++ 3 files changed, 160 insertions(+), 45 deletions(-) create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index da3e4e2482..7916f62783 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -3,7 +3,7 @@ In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture. Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub. - + ## Pub-Sub Sample using the Java-SDK This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5). @@ -41,45 +41,80 @@ cd examples Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. -### Running the subscriber - -The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface. +## Running the Subscriber +The subscriber uses the `DaprPreviewClient` interface to subscribe to events via streaming and process them using reactive operators. +The SDK provides two ways to subscribe to events: -The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. +### Option 1: Raw Data Subscription -In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. +Use `TypeRef.STRING` (or any other type) to receive the deserialized message data directly: ```java public class Subscriber { - // ... + public static void main(String[] args) throws Exception { + try (var client = new DaprClientBuilder().buildPreviewClient()) { + // Subscribe to events - receives raw String data directly + client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING) + .doOnNext(message -> { + System.out.println("Subscriber got: " + message); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); + } + } +} +``` + +### Option 2: CloudEvent Subscription + +Use `TypeRef>` to receive the full CloudEvent with metadata (ID, source, type, etc.): + +```java +public class SubscriberCloudEvent { public static void main(String[] args) throws Exception { - String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - // Subscribe to events using the Flux-based reactive API - // The stream will emit CloudEvent objects as they arrive - client.subscribeToEvents( - PUBSUB_NAME, - topicName, - TypeRef.STRING) - .doOnNext(event -> { - System.out.println("Subscriber got: " + event.getData()); + // Subscribe to events - receives CloudEvent with full metadata + client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef>() {}) + .doOnNext(cloudEvent -> { + System.out.println("Received CloudEvent:"); + System.out.println(" ID: " + cloudEvent.getId()); + System.out.println(" Type: " + cloudEvent.getType()); + System.out.println(" Data: " + cloudEvent.getData()); }) .doOnError(throwable -> { System.out.println("Subscriber got exception: " + throwable.getMessage()); }) - .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) + .blockLast(); } } - - // ... } ``` -Execute the following command to run the Subscriber example: +### Subscription with Metadata + +You can also pass metadata to the subscription, for example to enable raw payload mode: + +```java +client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true")) + .doOnNext(message -> { + System.out.println("Subscriber got: " + message); + }) + .blockLast(); +``` + +### Subscription Lifecycle + +The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. + +## Running the Examples + +Execute the following command to run the raw data Subscriber example: +Or run the CloudEvent Subscriber example: + +```bash +dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent +``` + Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side: