diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java index 50b9692e9052..447bb53eb655 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java @@ -15,6 +15,7 @@ import reactor.core.Disposable; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxOperator; +import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.core.publisher.Sinks; @@ -127,10 +128,15 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber Flux.interval(options.getMaxWaitTime()).takeUntil(index -> isCompleted.get()))) - .subscribe(index -> { + this.disposable + = eventSink.asFlux().switchMap(ignored -> Mono.delay(options.getMaxWaitTime())).subscribe(index -> { + if (isCompleted.get()) { + logger.atVerbose() + .addKeyValue(PARTITION_ID_KEY, partitionId) + .log("Aggregator already completed. Skipping timed publish."); + return; + } + logger.atVerbose() .addKeyValue(PARTITION_ID_KEY, partitionId) .log("Time elapsed. Attempt to publish downstream."); @@ -180,10 +186,10 @@ public void cancel() { // Do not keep requesting more events upstream logger.atVerbose().addKeyValue(PARTITION_ID_KEY, partitionId).log("Disposing of aggregator."); subscription.cancel(); + disposeTimeoutSubscription(); updateOrPublishBatch(null, true); downstream.onComplete(); - disposable.dispose(); } @Override @@ -219,6 +225,7 @@ public void onError(Throwable t) { return; } + disposeTimeoutSubscription(); updateOrPublishBatch(null, true); downstream.onError(t); } @@ -229,6 +236,7 @@ public void onError(Throwable t) { @Override public void onComplete() { if (isCompleted.compareAndSet(false, true)) { + disposeTimeoutSubscription(); updateOrPublishBatch(null, true); downstream.onComplete(); } @@ -279,6 +287,11 @@ private void publishDownstream(boolean isFlush) { synchronized (lock) { previous = this.currentBatch; if (previous == null) { + if (isCompleted.get()) { + logger.verbose("Aggregator is completed. Skipping batch recreation."); + return; + } + logger.warning("Batch should not be null, setting a new batch."); this.currentBatch = batchSupplier.get(); if (isFlush) { @@ -343,5 +356,9 @@ private void publishDownstream(boolean isFlush) { private static boolean isFlushSignal(EventData eventData) { return eventData instanceof FlushSignal; } + + private void disposeTimeoutSubscription() { + disposable.dispose(); + } } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java index 090ac2a24728..076399828fa9 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataBatch.java @@ -42,6 +42,7 @@ public final class EventDataBatch { private final String partitionKey; private final ErrorContextProvider contextProvider; private final List events; + private final List amqpMessages; private final String partitionId; private int sizeInBytes; private final EventHubsTracer tracer; @@ -53,6 +54,7 @@ public final class EventDataBatch { this.partitionId = partitionId; this.contextProvider = contextProvider; this.events = new LinkedList<>(); + this.amqpMessages = new LinkedList<>(); this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB this.tracer = instrumentation.getTracer(); } @@ -106,7 +108,8 @@ public boolean tryAdd(final EventData eventData) { tracer.reportMessageSpan(eventData, eventData.getContext()); - final int size = getSize(eventData, events.isEmpty()); + final Message amqpMessage = createAmqpMessage(eventData, partitionKey); + final int size = getSize(eventData, amqpMessage, events.isEmpty()); if (this.sizeInBytes + size > this.maxMessageSize) { return false; } @@ -119,6 +122,7 @@ public boolean tryAdd(final EventData eventData) { } this.events.add(eventData); + this.amqpMessages.add(amqpMessage); return true; } @@ -126,6 +130,10 @@ List getEvents() { return events; } + List getMessages() { + return amqpMessages; + } + String getPartitionKey() { return partitionKey; } @@ -134,20 +142,20 @@ String getPartitionId() { return partitionId; } - private int getSize(final EventData eventData, final boolean isFirst) { + private int getSize(final EventData eventData, final Message amqpMessage, final boolean isFirst) { Objects.requireNonNull(eventData, "'eventData' cannot be null."); - final Message amqpMessage = createAmqpMessage(eventData, partitionKey); int eventSize = encodedSize(amqpMessage); // actual encoded bytes size eventSize += 16; // data section overhead if (isFirst) { - amqpMessage.setBody(null); - amqpMessage.setApplicationProperties(null); - amqpMessage.setProperties(null); - amqpMessage.setDeliveryAnnotations(null); + final Message batchEnvelopeMessage = createAmqpMessage(eventData, partitionKey); + batchEnvelopeMessage.setBody(null); + batchEnvelopeMessage.setApplicationProperties(null); + batchEnvelopeMessage.setProperties(null); + batchEnvelopeMessage.setDeliveryAnnotations(null); - eventSize += encodedSize(amqpMessage); + eventSize += encodedSize(batchEnvelopeMessage); } return eventSize; diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java index ef1eb6510383..e7c416938d1f 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducer.java @@ -21,7 +21,6 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; -import reactor.core.scheduler.Schedulers; import java.io.Closeable; import java.time.Duration; @@ -79,7 +78,7 @@ class EventHubBufferedPartitionProducer implements Closeable { this.publishResultSubscriber = new PublishResultSubscriber(partitionId, this.eventSink, options.getSendSucceededContext(), options.getSendFailedContext(), retryOptions.getTryTimeout(), logger); - this.publishSubscription = publishEvents(eventDataAggregator).publishOn(Schedulers.boundedElastic(), 1) + this.publishSubscription = publishEvents(eventDataAggregator).publishOn(client.getScheduler(), 1) .subscribeWith(publishResultSubscriber); this.tracer = new EventHubsTracer(tracer, client.getFullyQualifiedNamespace(), client.getEventHubName(), null); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java index 17370532d93e..a41011c5fa01 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClient.java @@ -35,7 +35,6 @@ import java.util.function.Function; import java.util.function.Supplier; import java.util.stream.Collector; -import java.util.stream.Collectors; import static com.azure.core.amqp.implementation.RetryUtil.withRetry; import static com.azure.core.util.FluxUtil.monoError; @@ -580,8 +579,7 @@ public Mono send(EventDataBatch batch) { .log("Sending batch to be distributed round-robin in service."); } - final List messages - = batch.getEvents().stream().map(messageSerializer::serialize).collect(Collectors.toList()); + final List messages = batch.getMessages(); final Mono sendMessage = getSendLink(batch.getPartitionId()) .flatMap(link -> messages.size() == 1 ? link.send(messages.get(0)) : link.send(messages)); @@ -593,6 +591,10 @@ public Mono send(EventDataBatch batch) { return instrumentation.sendBatch(send, batch); } + Scheduler getScheduler() { + return scheduler; + } + private Mono sendInternal(Flux events, SendOptions options) { final String partitionKey = options.getPartitionKey(); final String partitionId = options.getPartitionId(); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/MessageUtils.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/MessageUtils.java index 321832a05a58..f1cc1911e225 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/MessageUtils.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/MessageUtils.java @@ -25,6 +25,7 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.Locale; @@ -106,37 +107,21 @@ public static Message toProtonJMessage(AmqpAnnotatedMessage message) { } if (properties.getTo() != null) { - if (protonJMessage.getProperties() == null) { - protonJMessage.setProperties(new Properties()); - } - - protonJMessage.getProperties().setTo(properties.getTo().toString()); + getOrCreateProperties(protonJMessage).setTo(properties.getTo().toString()); } // The default is byte[0] when getting a user id that has not been set. if (properties.getUserId() != null && properties.getUserId().length > 0) { - if (protonJMessage.getProperties() == null) { - protonJMessage.setProperties(new Properties()); - } - - protonJMessage.getProperties().setUserId(new Binary(properties.getUserId())); + getOrCreateProperties(protonJMessage).setUserId(new Binary(properties.getUserId())); } if (properties.getAbsoluteExpiryTime() != null) { - if (protonJMessage.getProperties() == null) { - protonJMessage.setProperties(new Properties()); - } - - protonJMessage.getProperties() + getOrCreateProperties(protonJMessage) .setAbsoluteExpiryTime(Date.from(properties.getAbsoluteExpiryTime().toInstant())); } if (properties.getCreationTime() != null) { - if (protonJMessage.getProperties() == null) { - protonJMessage.setProperties(new Properties()); - } - - protonJMessage.getProperties().setCreationTime(Date.from(properties.getCreationTime().toInstant())); + getOrCreateProperties(protonJMessage).setCreationTime(Date.from(properties.getCreationTime().toInstant())); } // Set header @@ -338,16 +323,21 @@ public static AmqpAnnotatedMessage toAmqpAnnotatedMessage(Message message) { public static Map convert(Map sourceMap) { if (sourceMap == null) { return null; + } else if (sourceMap.isEmpty()) { + return Collections.emptyMap(); } - return sourceMap.entrySet().stream().collect(HashMap::new, (existing, entry) -> { + final Map converted = new HashMap<>(calculateInitialCapacity(sourceMap.size())); + for (Map.Entry entry : sourceMap.entrySet()) { if (entry.getValue() instanceof Instant) { final long epochMilli = ((Instant) entry.getValue()).toEpochMilli(); - existing.put(Symbol.valueOf(entry.getKey()), new Date(epochMilli)); + converted.put(Symbol.valueOf(entry.getKey()), new Date(epochMilli)); } else { - existing.put(Symbol.valueOf(entry.getKey()), entry.getValue()); + converted.put(Symbol.valueOf(entry.getKey()), entry.getValue()); } - }, (HashMap::putAll)); + } + + return converted; } /** @@ -381,6 +371,18 @@ private static void setValues(Map sourceMap, Map } } + private static int calculateInitialCapacity(int expectedSize) { + return Math.max(16, (int) (expectedSize / 0.75f) + 1); + } + + private static Properties getOrCreateProperties(Message protonJMessage) { + if (protonJMessage.getProperties() == null) { + protonJMessage.setProperties(new Properties()); + } + + return protonJMessage.getProperties(); + } + /** * Private constructor. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java index 96f9b03d4ebf..cfe1708e9abc 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriber.java @@ -12,8 +12,11 @@ import java.util.Collections; import java.util.Objects; -import java.util.Timer; -import java.util.TimerTask; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import static com.azure.messaging.eventhubs.implementation.ClientConstants.SUBSCRIBER_ID_KEY; @@ -21,10 +24,12 @@ * Subscriber that takes {@link SynchronousReceiveWork} and publishes events to them in the order received. */ public class SynchronousEventSubscriber extends BaseSubscriber { - private final Timer timer = new Timer(); + private static final ScheduledExecutorService TIMEOUT_SCHEDULER = createTimeoutScheduler(); + private final ClientLogger logger; private final SynchronousReceiveWork work; private volatile Subscription subscription; + private volatile ScheduledFuture timeoutTask; private final Context context; private final String subscriberId; @@ -55,7 +60,8 @@ protected void hookOnSubscribe(Subscription subscription) { logger.atInfo().addKeyValue("pendingEvents", work.getNumberOfEvents()).log("Scheduling receive timeout task."); subscription.request(work.getNumberOfEvents()); - timer.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), work.getTimeout().toMillis()); + timeoutTask = TIMEOUT_SCHEDULER.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), + work.getTimeout().toMillis(), TimeUnit.MILLISECONDS); } /** @@ -96,12 +102,29 @@ protected void hookOnError(Throwable throwable) { @Override public void dispose() { work.complete(); - subscription.cancel(); - timer.cancel(); + if (subscription != null) { + subscription.cancel(); + } + + if (timeoutTask != null) { + timeoutTask.cancel(false); + } + super.dispose(); } - private static class ReceiveTimeoutTask extends TimerTask { + static ScheduledExecutorService getTimeoutScheduler() { + return TIMEOUT_SCHEDULER; + } + + private static ScheduledExecutorService createTimeoutScheduler() { + final ScheduledThreadPoolExecutor scheduler + = new ScheduledThreadPoolExecutor(1, new ReceiveTimeoutThreadFactory()); + scheduler.setRemoveOnCancelPolicy(true); + return scheduler; + } + + private static class ReceiveTimeoutTask implements Runnable { private final ClientLogger logger; private final Runnable onDispose; @@ -110,10 +133,18 @@ private static class ReceiveTimeoutTask extends TimerTask { this.logger = logger; } - @Override public void run() { logger.info("Timeout encountered, disposing of subscriber."); onDispose.run(); } } + + private static class ReceiveTimeoutThreadFactory implements ThreadFactory { + @Override + public Thread newThread(Runnable runnable) { + final Thread thread = new Thread(runnable, "eventhubs-sync-receive-timeout"); + thread.setDaemon(true); + return thread; + } + } } diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java index a28b6b29934a..ee606face685 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataAggregatorTest.java @@ -204,6 +204,37 @@ public void pushesBatchAfterMaxTime() { }).thenCancel().verify(); } + @Test + public void doesNotCreateNewBatchAfterCompletion() throws InterruptedException { + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1); + + final Duration waitTime = Duration.ofMillis(50); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(waitTime); + + final AtomicInteger batchSupplierInvocations = new AtomicInteger(); + final Supplier supplier = () -> { + batchSupplierInvocations.incrementAndGet(); + return batch; + }; + + final TestPublisher publisher = TestPublisher.createCold(); + final EventDataAggregator aggregator + = new EventDataAggregator(publisher.flux(), supplier, NAMESPACE, options, PARTITION_ID); + + StepVerifier.create(aggregator) + .then(() -> publisher.next(event1)) + .then(publisher::complete) + .expectNext(batch) + .expectComplete() + .verify(Duration.ofSeconds(5)); + + Thread.sleep(waitTime.multipliedBy(3).toMillis()); + + assertEquals(1, batchSupplierInvocations.get(), "No batch should be recreated after completion."); + } + /** * Tests that it pushes partial batches downstream when flush signal arrives. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java index 766d276f4e57..1f10b09abd49 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventDataBatchTest.java @@ -6,8 +6,12 @@ import com.azure.core.amqp.exception.AmqpErrorCondition; import com.azure.core.amqp.exception.AmqpErrorContext; import com.azure.core.amqp.exception.AmqpException; +import com.azure.core.amqp.implementation.AmqpConstants; import com.azure.core.amqp.implementation.ErrorContextProvider; import com.azure.messaging.eventhubs.implementation.ClientConstants; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.message.Message; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -15,6 +19,9 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.when; @@ -81,6 +88,23 @@ public void withinPayloadSize() { Assertions.assertEquals(1, batch.getCount()); } + @Test + public void cachesSerializedMessageWhenAdded() { + final EventDataBatch batch = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, PARTITION_KEY, + null, DEFAULT_INSTRUMENTATION); + final EventData event = new EventData("payload".getBytes(UTF_8)); + + Assertions.assertTrue(batch.tryAdd(event)); + Assertions.assertEquals(1, batch.getMessages().size()); + + final Message message = batch.getMessages().get(0); + final Data body = (Data) message.getBody(); + final Binary binary = body.getValue(); + + assertArrayEquals(event.getBody(), binary.getArray()); + assertEquals(PARTITION_KEY, message.getMessageAnnotations().getValue().get(AmqpConstants.PARTITION_KEY)); + } + /** * Verify that we can create a batch with partition id and key. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java index c719998cd4c7..878387d8e4b4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubBufferedPartitionProducerTest.java @@ -23,6 +23,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.core.publisher.Sinks; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.test.StepVerifier; import reactor.util.concurrent.Queues; @@ -35,6 +37,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import static com.azure.messaging.eventhubs.EventDataAggregatorTest.setupBatchMock; @@ -61,6 +64,7 @@ public class EventHubBufferedPartitionProducerTest { private static final int QUEUE_SIZE = 5; private AutoCloseable mockCloseable; + private Scheduler scheduler; private final Semaphore successSemaphore = new Semaphore(1); private final Semaphore failedSemaphore = new Semaphore(1); @@ -98,6 +102,7 @@ public class EventHubBufferedPartitionProducerTest { @BeforeEach public void beforeEach() { mockCloseable = MockitoAnnotations.openMocks(this); + scheduler = Schedulers.newSingle("buffered-producer-test"); returnedBatches.add(batch); returnedBatches.add(batch2); @@ -107,6 +112,7 @@ public void beforeEach() { when(client.getFullyQualifiedNamespace()).thenReturn(NAMESPACE); when(client.getEventHubName()).thenReturn(EVENT_HUB_NAME); + when(client.getScheduler()).thenReturn(scheduler); when(client.getPartitionIds()).thenReturn(Flux.fromIterable(PARTITION_IDS)); when(client.createBatch(any(CreateBatchOptions.class))).thenAnswer(invocation -> { @@ -125,9 +131,44 @@ public void afterEach() throws Exception { mockCloseable.close(); } + if (scheduler != null) { + scheduler.dispose(); + } + Mockito.framework().clearInlineMock(this); } + @Test + public void publishesUsingClientScheduler() throws InterruptedException { + successSemaphore.acquire(); + failedSemaphore.acquire(); + + final AtomicReference callbackThread = new AtomicReference<>(); + final BufferedProducerClientOptions options = new BufferedProducerClientOptions(); + options.setMaxWaitTime(Duration.ofSeconds(5)); + options.setSendSucceededContext(context -> { + callbackThread.set(Thread.currentThread().getName()); + successSemaphore.release(); + }); + options.setSendFailedContext(context -> failedSemaphore.release()); + + final List batchEvents = new ArrayList<>(); + setupBatchMock(batch, batchEvents, event1, event2); + when(client.send(any(EventDataBatch.class))).thenReturn(Mono.empty()); + + final EventHubBufferedPartitionProducer producer = new EventHubBufferedPartitionProducer(client, PARTITION_ID, + options, DEFAULT_RETRY_OPTIONS, eventSink, null); + + StepVerifier.create(Mono.when(producer.enqueueEvent(event1), producer.enqueueEvent(event2))) + .expectComplete() + .verify(DEFAULT_RETRY_OPTIONS.getTryTimeout()); + + final Duration waitTime = options.getMaxWaitTime().plus(options.getMaxWaitTime()); + assertTrue(successSemaphore.tryAcquire(waitTime.toMillis(), TimeUnit.MILLISECONDS)); + assertNotNull(callbackThread.get()); + assertTrue(callbackThread.get().contains("buffered-producer-test")); + } + @Test public void publishesEvents() throws InterruptedException { // Arrange diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java index 55d3f5a2c4d9..8773bf7cafe4 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubProducerAsyncClientTest.java @@ -879,6 +879,32 @@ void sendEventIterableRequired() { .verify(DEFAULT_TIMEOUT); } + @Test + void sendBatchReusesCachedBatchMessages() { + final MessageSerializer serializer = mock(MessageSerializer.class); + final EventHubProducerAsyncClient asyncProducer + = new EventHubProducerAsyncClient(HOSTNAME, EVENT_HUB_NAME, connectionProcessor, retryOptions, serializer, + Schedulers.parallel(), false, onClientClosed, CLIENT_IDENTIFIER, DEFAULT_INSTRUMENTATION); + final EventDataBatch batch + = new EventDataBatch(ClientConstants.MAX_MESSAGE_LENGTH_BYTES, null, null, null, DEFAULT_INSTRUMENTATION); + final EventData event = new EventData("payload"); + + when(connection.createSendLink(eq(EVENT_HUB_NAME), eq(EVENT_HUB_NAME), eq(retryOptions), eq(CLIENT_IDENTIFIER))) + .thenReturn(Mono.just(sendLink)); + when(sendLink.getHostname()).thenReturn(HOSTNAME); + when(sendLink.getEntityPath()).thenReturn(EVENT_HUB_NAME); + when(sendLink.send(any(Message.class))).thenReturn(Mono.empty()); + + Assertions.assertTrue(batch.tryAdd(event)); + final Message expected = batch.getMessages().get(0); + + StepVerifier.create(asyncProducer.send(batch)).expectComplete().verify(DEFAULT_TIMEOUT); + + verify(sendLink).send(singleMessageCaptor.capture()); + assertSame(expected, singleMessageCaptor.getValue()); + verify(serializer, never()).serialize(any()); + } + @Test void sendEventFluxRequired() { // Arrange diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessageUtilsTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessageUtilsTest.java index dfc193ffd793..bcbb3c7e94a2 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessageUtilsTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/MessageUtilsTest.java @@ -26,10 +26,13 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneOffset; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -193,6 +196,28 @@ public void toProtonJMessageNoHeaderPropertiesSet() { assertProperties(actual); } + @Test + public void convertReturnsEmptyMapWhenSourceEmpty() { + final Map actual = MessageUtils.convert(Collections.emptyMap()); + + assertNotNull(actual); + assertTrue(actual.isEmpty()); + } + + @Test + public void convertConvertsInstantValuesToDate() { + final Instant instant = Instant.ofEpochSecond(1825644760L); + final Map source = new HashMap<>(); + source.put("instant", instant); + source.put("value", 5L); + + final Map actual = MessageUtils.convert(source); + + assertInstanceOf(java.util.Date.class, actual.get(Symbol.valueOf("instant"))); + assertEquals(instant.toEpochMilli(), ((java.util.Date) actual.get(Symbol.valueOf("instant"))).getTime()); + assertEquals(5L, actual.get(Symbol.valueOf("value"))); + } + private void assertBody(Message actual) { final Section body = actual.getBody(); assertTrue(body instanceof Data); diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java index 1efa23daa1fe..19da68cb4c59 100644 --- a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/AmqpReceiveLinkProcessorTest.java @@ -25,12 +25,15 @@ import org.mockito.MockitoAnnotations; import org.reactivestreams.Subscription; import reactor.core.Disposable; +import reactor.core.publisher.BaseSubscriber; import reactor.core.publisher.Flux; import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; import reactor.test.publisher.TestPublisher; +import java.lang.reflect.Field; +import java.lang.reflect.Method; import java.time.Duration; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -114,6 +117,42 @@ void constructor() { () -> new AmqpReceiveLinkProcessor(null, PREFETCH, "partition", parentConnection, DEFAULT_INSTRUMENTATION)); } + @Test + void boundsBufferedMessagesToTwicePrefetch() throws Exception { + final Field maxQueueSize = AmqpReceiveLinkProcessor.class.getDeclaredField("maxQueueSize"); + maxQueueSize.setAccessible(true); + + Assertions.assertEquals(PREFETCH * 2, maxQueueSize.get(linkProcessor)); + } + + @Test + void creditCalculationAccountsForQueuedMessages() throws Exception { + final Field requestedField = AmqpReceiveLinkProcessor.class.getDeclaredField("requested"); + requestedField.setAccessible(true); + requestedField.setLong(linkProcessor, 3L); + + final Field downstreamField = AmqpReceiveLinkProcessor.class.getDeclaredField("downstream"); + downstreamField.setAccessible(true); + @SuppressWarnings("unchecked") + AtomicReference> downstream + = (AtomicReference>) downstreamField.get(linkProcessor); + downstream.set(new BaseSubscriber() { + }); + + final Field messageQueueField = AmqpReceiveLinkProcessor.class.getDeclaredField("messageQueue"); + messageQueueField.setAccessible(true); + @SuppressWarnings("unchecked") + java.util.Deque queue = (java.util.Deque) messageQueueField.get(linkProcessor); + queue.add(mock(Message.class)); + queue.add(mock(Message.class)); + queue.add(mock(Message.class)); + + final Method getCreditsToAdd = AmqpReceiveLinkProcessor.class.getDeclaredMethod("getCreditsToAdd"); + getCreditsToAdd.setAccessible(true); + + Assertions.assertEquals(0, getCreditsToAdd.invoke(linkProcessor)); + } + /** * Verifies that we can get a new AMQP receive link and fetch a few messages. */ diff --git a/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriberTest.java b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriberTest.java new file mode 100644 index 000000000000..716b1cf4c991 --- /dev/null +++ b/sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/implementation/SynchronousEventSubscriberTest.java @@ -0,0 +1,55 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.messaging.eventhubs.implementation; + +import com.azure.messaging.eventhubs.models.PartitionEvent; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.reactivestreams.Subscription; +import reactor.core.publisher.FluxSink; + +import java.time.Duration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class SynchronousEventSubscriberTest { + @Test + @SuppressWarnings("unchecked") + void timeoutDisposesSubscriberWithoutDedicatedTimerThread() throws InterruptedException { + final CountDownLatch completed = new CountDownLatch(1); + final FluxSink emitter = Mockito.mock(FluxSink.class); + final Subscription subscription = Mockito.mock(Subscription.class); + + when(emitter.isCancelled()).thenReturn(false); + doAnswer(invocation -> { + completed.countDown(); + return null; + }).when(emitter).complete(); + + final SynchronousReceiveWork work = new SynchronousReceiveWork(1L, 1, Duration.ofMillis(25), emitter); + final SynchronousEventSubscriber subscriber = new SynchronousEventSubscriber(work); + + subscriber.onSubscribe(subscription); + + assertTrue(completed.await(5, TimeUnit.SECONDS), "The timeout should complete the work."); + verify(subscription, atLeastOnce()).request(1L); + verify(subscription, atLeastOnce()).cancel(); + } + + @Test + void timeoutSchedulerRemovesCancelledTasksImmediately() { + final ScheduledThreadPoolExecutor scheduler + = assertInstanceOf(ScheduledThreadPoolExecutor.class, SynchronousEventSubscriber.getTimeoutScheduler()); + + assertTrue(scheduler.getRemoveOnCancelPolicy(), "Canceled timeout tasks should be removed immediately."); + } +}