Skip to content

Commit 7f4ecaf

Browse files
committed
perf(eventhubs): reset buffered timeout with a single delay stream
1 parent 7fc5ddd commit 7f4ecaf

1 file changed

Lines changed: 3 additions & 4 deletions

File tree

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventDataAggregator.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import reactor.core.Disposable;
1616
import reactor.core.publisher.Flux;
1717
import reactor.core.publisher.FluxOperator;
18+
import reactor.core.publisher.Mono;
1819
import reactor.core.publisher.Operators;
1920
import reactor.core.publisher.Sinks;
2021

@@ -127,10 +128,8 @@ static class EventDataAggregatorMain implements Subscription, CoreSubscriber<Eve
127128
this.currentBatch = batchSupplier.get();
128129

129130
this.eventSink = Sinks.many().unicast().onBackpressureError();
130-
this.disposable = Flux
131-
.switchOnNext(eventSink.asFlux()
132-
.map(e -> Flux.interval(options.getMaxWaitTime()).takeUntil(index -> isCompleted.get())))
133-
.subscribe(index -> {
131+
this.disposable
132+
= eventSink.asFlux().switchMap(ignored -> Mono.delay(options.getMaxWaitTime())).subscribe(index -> {
134133
logger.atVerbose()
135134
.addKeyValue(PARTITION_ID_KEY, partitionId)
136135
.log("Time elapsed. Attempt to publish downstream.");

0 commit comments

Comments
 (0)