Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxOperator;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Sinks;

Expand Down Expand Up @@ -127,10 +128,15 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber<Eve
this.currentBatch = batchSupplier.get();

this.eventSink = Sinks.many().unicast().onBackpressureError();
this.disposable = Flux
.switchOnNext(eventSink.asFlux()
.map(e -> Flux.interval(options.getMaxWaitTime()).takeUntil(index -> isCompleted.get())))
.subscribe(index -> {
this.disposable
= eventSink.asFlux().switchMap(ignored -> Mono.delay(options.getMaxWaitTime())).subscribe(index -> {
if (isCompleted.get()) {
logger.atVerbose()
.addKeyValue(PARTITION_ID_KEY, partitionId)
.log("Aggregator already completed. Skipping timed publish.");
return;
}

logger.atVerbose()
.addKeyValue(PARTITION_ID_KEY, partitionId)
.log("Time elapsed. Attempt to publish downstream.");
Expand Down Expand Up @@ -180,10 +186,10 @@ public void cancel() {
// Do not keep requesting more events upstream
logger.atVerbose().addKeyValue(PARTITION_ID_KEY, partitionId).log("Disposing of aggregator.");
subscription.cancel();
disposeTimeoutSubscription();

updateOrPublishBatch(null, true);
downstream.onComplete();
disposable.dispose();
}

@Override
Expand Down Expand Up @@ -219,6 +225,7 @@ public void onError(Throwable t) {
return;
}

disposeTimeoutSubscription();
updateOrPublishBatch(null, true);
downstream.onError(t);
}
Expand All @@ -229,6 +236,7 @@ public void onError(Throwable t) {
@Override
public void onComplete() {
if (isCompleted.compareAndSet(false, true)) {
disposeTimeoutSubscription();
updateOrPublishBatch(null, true);
downstream.onComplete();
}
Expand Down Expand Up @@ -279,6 +287,11 @@ private void publishDownstream(boolean isFlush) {
synchronized (lock) {
previous = this.currentBatch;
if (previous == null) {
if (isCompleted.get()) {
logger.verbose("Aggregator is completed. Skipping batch recreation.");
return;
}

logger.warning("Batch should not be null, setting a new batch.");
this.currentBatch = batchSupplier.get();
if (isFlush) {
Expand Down Expand Up @@ -343,5 +356,9 @@ private void publishDownstream(boolean isFlush) {
/**
 * Returns {@code true} when the event is a {@code FlushSignal} marker rather
 * than a regular payload event.
 */
private static boolean isFlushSignal(EventData eventData) {
    return FlushSignal.class.isInstance(eventData);
}

// Cancels the timed-publish subscription created in the constructor so the
// max-wait timer stops firing once the aggregator has terminated.
private void disposeTimeoutSubscription() {
disposable.dispose();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public final class EventDataBatch {
private final String partitionKey;
private final ErrorContextProvider contextProvider;
private final List<EventData> events;
private final List<Message> amqpMessages;
private final String partitionId;
private int sizeInBytes;
private final EventHubsTracer tracer;
Expand All @@ -53,6 +54,7 @@ public final class EventDataBatch {
this.partitionId = partitionId;
this.contextProvider = contextProvider;
this.events = new LinkedList<>();
this.amqpMessages = new LinkedList<>();
this.sizeInBytes = (maxMessageSize / 65536) * 1024; // reserve 1KB for every 64KB
this.tracer = instrumentation.getTracer();
}
Expand Down Expand Up @@ -106,7 +108,8 @@ public boolean tryAdd(final EventData eventData) {

tracer.reportMessageSpan(eventData, eventData.getContext());

final int size = getSize(eventData, events.isEmpty());
final Message amqpMessage = createAmqpMessage(eventData, partitionKey);
final int size = getSize(eventData, amqpMessage, events.isEmpty());
if (this.sizeInBytes + size > this.maxMessageSize) {
return false;
}
Expand All @@ -119,13 +122,18 @@ public boolean tryAdd(final EventData eventData) {
}

this.events.add(eventData);
this.amqpMessages.add(amqpMessage);
return true;
}

/**
 * Gets the events added to this batch via {@code tryAdd}.
 *
 * @return the live backing list (not a copy).
 *     NOTE(review): callers could mutate the batch through this list — confirm intended.
 */
List<EventData> getEvents() {
return events;
}

/**
 * Gets the AMQP messages serialized when each event was added via {@code tryAdd},
 * in the same order as the events list.
 *
 * @return the live backing list of pre-serialized AMQP messages (not a copy).
 */
List<Message> getMessages() {
return amqpMessages;
}

/**
 * Gets the partition key configured for this batch.
 *
 * @return the partition key as passed to the constructor; may be {@code null} —
 *     nullability is not enforced here, TODO confirm against callers.
 */
String getPartitionKey() {
return partitionKey;
}
Expand All @@ -134,20 +142,20 @@ String getPartitionId() {
return partitionId;
}

private int getSize(final EventData eventData, final boolean isFirst) {
private int getSize(final EventData eventData, final Message amqpMessage, final boolean isFirst) {
Objects.requireNonNull(eventData, "'eventData' cannot be null.");

final Message amqpMessage = createAmqpMessage(eventData, partitionKey);
int eventSize = encodedSize(amqpMessage); // actual encoded bytes size
eventSize += 16; // data section overhead

if (isFirst) {
amqpMessage.setBody(null);
amqpMessage.setApplicationProperties(null);
amqpMessage.setProperties(null);
amqpMessage.setDeliveryAnnotations(null);
final Message batchEnvelopeMessage = createAmqpMessage(eventData, partitionKey);
batchEnvelopeMessage.setBody(null);
batchEnvelopeMessage.setApplicationProperties(null);
batchEnvelopeMessage.setProperties(null);
batchEnvelopeMessage.setDeliveryAnnotations(null);

eventSize += encodedSize(amqpMessage);
eventSize += encodedSize(batchEnvelopeMessage);
}

return eventSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

import java.io.Closeable;
import java.time.Duration;
Expand Down Expand Up @@ -79,7 +78,7 @@ class EventHubBufferedPartitionProducer implements Closeable {
this.publishResultSubscriber = new PublishResultSubscriber(partitionId, this.eventSink,
options.getSendSucceededContext(), options.getSendFailedContext(), retryOptions.getTryTimeout(), logger);

this.publishSubscription = publishEvents(eventDataAggregator).publishOn(Schedulers.boundedElastic(), 1)
this.publishSubscription = publishEvents(eventDataAggregator).publishOn(client.getScheduler(), 1)
.subscribeWith(publishResultSubscriber);

this.tracer = new EventHubsTracer(tracer, client.getFullyQualifiedNamespace(), client.getEventHubName(), null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;

import static com.azure.core.amqp.implementation.RetryUtil.withRetry;
import static com.azure.core.util.FluxUtil.monoError;
Expand Down Expand Up @@ -580,8 +579,7 @@ public Mono<Void> send(EventDataBatch batch) {
.log("Sending batch to be distributed round-robin in service.");
}

final List<Message> messages
= batch.getEvents().stream().map(messageSerializer::serialize).collect(Collectors.toList());
final List<Message> messages = batch.getMessages();

final Mono<Void> sendMessage = getSendLink(batch.getPartitionId())
.flatMap(link -> messages.size() == 1 ? link.send(messages.get(0)) : link.send(messages));
Expand All @@ -593,6 +591,10 @@ public Mono<Void> send(EventDataBatch batch) {
return instrumentation.sendBatch(send, batch);
}

/**
 * Gets the scheduler this client uses, so callers (e.g. the buffered partition
 * producer's {@code publishOn}) can publish on the client's scheduler rather
 * than a separately created one.
 */
Scheduler getScheduler() {
return scheduler;
}

private Mono<Void> sendInternal(Flux<EventData> events, SendOptions options) {
final String partitionKey = options.getPartitionKey();
final String partitionId = options.getPartitionId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Locale;
Expand Down Expand Up @@ -106,37 +107,21 @@ public static Message toProtonJMessage(AmqpAnnotatedMessage message) {
}

if (properties.getTo() != null) {
if (protonJMessage.getProperties() == null) {
protonJMessage.setProperties(new Properties());
}

protonJMessage.getProperties().setTo(properties.getTo().toString());
getOrCreateProperties(protonJMessage).setTo(properties.getTo().toString());
}

// The default is byte[0] when getting a user id that has not been set.
if (properties.getUserId() != null && properties.getUserId().length > 0) {
if (protonJMessage.getProperties() == null) {
protonJMessage.setProperties(new Properties());
}

protonJMessage.getProperties().setUserId(new Binary(properties.getUserId()));
getOrCreateProperties(protonJMessage).setUserId(new Binary(properties.getUserId()));
}

if (properties.getAbsoluteExpiryTime() != null) {
if (protonJMessage.getProperties() == null) {
protonJMessage.setProperties(new Properties());
}

protonJMessage.getProperties()
getOrCreateProperties(protonJMessage)
.setAbsoluteExpiryTime(Date.from(properties.getAbsoluteExpiryTime().toInstant()));
}

if (properties.getCreationTime() != null) {
if (protonJMessage.getProperties() == null) {
protonJMessage.setProperties(new Properties());
}

protonJMessage.getProperties().setCreationTime(Date.from(properties.getCreationTime().toInstant()));
getOrCreateProperties(protonJMessage).setCreationTime(Date.from(properties.getCreationTime().toInstant()));
}

// Set header
Expand Down Expand Up @@ -338,16 +323,21 @@ public static AmqpAnnotatedMessage toAmqpAnnotatedMessage(Message message) {
public static Map<Symbol, Object> convert(Map<String, Object> sourceMap) {
if (sourceMap == null) {
return null;
} else if (sourceMap.isEmpty()) {
return Collections.emptyMap();
}

return sourceMap.entrySet().stream().collect(HashMap::new, (existing, entry) -> {
final Map<Symbol, Object> converted = new HashMap<>(calculateInitialCapacity(sourceMap.size()));
for (Map.Entry<String, Object> entry : sourceMap.entrySet()) {
if (entry.getValue() instanceof Instant) {
final long epochMilli = ((Instant) entry.getValue()).toEpochMilli();
existing.put(Symbol.valueOf(entry.getKey()), new Date(epochMilli));
converted.put(Symbol.valueOf(entry.getKey()), new Date(epochMilli));
} else {
existing.put(Symbol.valueOf(entry.getKey()), entry.getValue());
converted.put(Symbol.valueOf(entry.getKey()), entry.getValue());
}
}, (HashMap::putAll));
}

return converted;
}

/**
Expand Down Expand Up @@ -381,6 +371,18 @@ private static void setValues(Map<Symbol, Object> sourceMap, Map<String, Object>
}
}

/**
 * Calculates an initial {@link HashMap} capacity that can hold {@code expectedSize}
 * entries without resizing, assuming the default 0.75 load factor.
 *
 * @param expectedSize number of entries the map is expected to hold.
 * @return a capacity of at least 16 sized for {@code expectedSize} entries.
 */
private static int calculateInitialCapacity(int expectedSize) {
    final int capacity = (int) (expectedSize / 0.75f) + 1;
    // The (int) cast saturates at Integer.MAX_VALUE for very large sizes, so the
    // "+ 1" can wrap negative; saturate instead of falling back to the 16 floor.
    return capacity < 0 ? Integer.MAX_VALUE : Math.max(16, capacity);
}

/**
 * Returns the proton-j message's {@code Properties} section, lazily creating
 * and attaching an empty one when the message does not have it yet.
 */
private static Properties getOrCreateProperties(Message protonJMessage) {
    final Properties existing = protonJMessage.getProperties();
    if (existing != null) {
        return existing;
    }

    final Properties created = new Properties();
    protonJMessage.setProperties(created);
    return created;
}

/**
* Private constructor.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,24 @@

import java.util.Collections;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import static com.azure.messaging.eventhubs.implementation.ClientConstants.SUBSCRIBER_ID_KEY;

/**
* Subscriber that takes {@link SynchronousReceiveWork} and publishes events to them in the order received.
*/
public class SynchronousEventSubscriber extends BaseSubscriber<PartitionEvent> {
private final Timer timer = new Timer();
private static final ScheduledExecutorService TIMEOUT_SCHEDULER = createTimeoutScheduler();

private final ClientLogger logger;
private final SynchronousReceiveWork work;
private volatile Subscription subscription;
private volatile ScheduledFuture<?> timeoutTask;
private final Context context;
private final String subscriberId;

Expand Down Expand Up @@ -55,7 +60,8 @@ protected void hookOnSubscribe(Subscription subscription) {
logger.atInfo().addKeyValue("pendingEvents", work.getNumberOfEvents()).log("Scheduling receive timeout task.");
subscription.request(work.getNumberOfEvents());

timer.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), work.getTimeout().toMillis());
timeoutTask = TIMEOUT_SCHEDULER.schedule(new ReceiveTimeoutTask(this::dispose, this.logger),
work.getTimeout().toMillis(), TimeUnit.MILLISECONDS);
}

/**
Expand Down Expand Up @@ -96,12 +102,29 @@ protected void hookOnError(Throwable throwable) {
@Override
public void dispose() {
work.complete();
subscription.cancel();
timer.cancel();
if (subscription != null) {
subscription.cancel();
}

if (timeoutTask != null) {
timeoutTask.cancel(false);
}

super.dispose();
}

private static class ReceiveTimeoutTask extends TimerTask {
// Exposes the shared single-thread timeout scheduler on which all subscriber
// instances schedule their receive-timeout tasks.
// NOTE(review): no in-file caller visible — presumably used by tests; confirm.
static ScheduledExecutorService getTimeoutScheduler() {
return TIMEOUT_SCHEDULER;
}

/**
 * Builds the single-threaded scheduler shared by all subscriber instances for
 * receive-timeout tasks. Cancelled tasks are removed from the work queue
 * immediately ({@code setRemoveOnCancelPolicy(true)}) so completed receives do
 * not leave stale timeout entries queued.
 */
private static ScheduledExecutorService createTimeoutScheduler() {
    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new ReceiveTimeoutThreadFactory());
    executor.setRemoveOnCancelPolicy(true);
    return executor;
}

private static class ReceiveTimeoutTask implements Runnable {
private final ClientLogger logger;
private final Runnable onDispose;

Expand All @@ -110,10 +133,18 @@ private static class ReceiveTimeoutTask extends TimerTask {
this.logger = logger;
}

@Override
public void run() {
logger.info("Timeout encountered, disposing of subscriber.");
onDispose.run();
}
}

/**
 * Creates the worker thread for the shared receive-timeout scheduler. The
 * thread is marked daemon so an idle timeout thread never keeps the JVM alive.
 */
private static class ReceiveTimeoutThreadFactory implements ThreadFactory {
    @Override
    public Thread newThread(Runnable runnable) {
        final Thread timeoutThread = new Thread(runnable, "eventhubs-sync-receive-timeout");
        timeoutThread.setDaemon(true);
        return timeoutThread;
    }
}
}
Loading
Loading