diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java index 8fc7227fd15c4..047532259f71b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueue.java @@ -223,6 +223,10 @@ public SubscriptionEvent poll(final String consumerId) { private SubscriptionEvent pollInternal(final String consumerId) { states.markPollRequest(); + if (shouldThrottlePollByInFlightEvents()) { + return null; + } + if (prefetchingQueue.isEmpty()) { states.markMissingPrefetch(); try { @@ -254,6 +258,22 @@ private SubscriptionEvent pollInternal(final String consumerId) { return null; } + private boolean shouldThrottlePollByInFlightEvents() { + if (!states.shouldThrottlePoll()) { + return false; + } + + remapInFlightEventsSnapshot(committedCleaner, pollableNacker); + if (!states.shouldThrottlePoll()) { + return false; + } + + LOGGER.debug( + "Subscription: SubscriptionPrefetchingQueue {} throttles poll because too many events are in flight.", + this); + return true; + } + public SubscriptionEvent pollV2(final String consumerId, final PollTimer timer) { acquireReadLock(); try { @@ -268,6 +288,10 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime // do-while ensures at least one poll do { + if (shouldThrottlePollByInFlightEvents()) { + return null; + } + SubscriptionEvent event; try { if (prefetchingQueue.isEmpty()) { @@ -976,6 +1000,10 @@ public long getSubscriptionUncommittedEventCount() { return inFlightEvents.size(); } + public long getSubscriptionRetainedEventCount() { + return prefetchingQueue.size() + inFlightEvents.size(); + } + public long getCurrentCommitId() { return commitIdGenerator.get(); } @@ -1023,6 +1051,7 @@ public Map coreReportMessage() { result.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size())); result.put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size())); result.put("size of inFlightEvents", String.valueOf(inFlightEvents.size())); + result.put("size of retainedEvents", String.valueOf(getSubscriptionRetainedEventCount())); result.put("commitIdGenerator", commitIdGenerator.toString()); result.put("states", states.toString()); result.put("isCompleted", String.valueOf(isCompleted)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java index 70eacd2eaaae9..29beedbae7228 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStates.java @@ -30,6 +30,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.function.IntSupplier; + import static com.google.common.base.MoreObjects.toStringHelper; /** @@ -54,6 +56,7 @@ public class SubscriptionPrefetchingQueueStates { SubscriptionConfig.getInstance().getSubscriptionPrefetchEventGlobalCountThreshold(); private final SubscriptionPrefetchingQueue prefetchingQueue; + private final IntSupplier prefetchingQueueCountSupplier; private volatile long lastPollRequestTimestamp; private final Meter pollRequestMeter; @@ -61,7 +64,14 @@ public class SubscriptionPrefetchingQueueStates { private final Counter disorderCauseCounter; // TODO: use meter public SubscriptionPrefetchingQueueStates(final SubscriptionPrefetchingQueue prefetchingQueue) { + this(prefetchingQueue, () -> SubscriptionAgent.broker().getPrefetchingQueueCount()); + } + + SubscriptionPrefetchingQueueStates( + final SubscriptionPrefetchingQueue prefetchingQueue, + final IntSupplier prefetchingQueueCountSupplier) { this.prefetchingQueue = prefetchingQueue; + this.prefetchingQueueCountSupplier = prefetchingQueueCountSupplier; this.lastPollRequestTimestamp = -1; this.pollRequestMeter = new Meter(new IoTDBMovingAverage(), Clock.defaultClock()); @@ -106,12 +116,12 @@ public boolean shouldPrefetch() { } // 1.3. local event count - if (hasTooManyPrefetchedLocalEvent()) { + if (hasTooManyRetainedLocalEvent()) { return false; } // 1.4. global event count - if (hasTooManyPrefetchedGlobalEvent()) { + if (hasTooManyRetainedGlobalEvent()) { return false; } @@ -132,24 +142,40 @@ public boolean shouldPrefetch() { return (System.currentTimeMillis() - lastPollRequestTimestamp) * pollRate() > 1000; } + public boolean shouldThrottlePoll() { + return hasTooManyInFlightLocalEvent() || hasTooManyInFlightGlobalEvent(); + } + private boolean isMemoryEnough() { return PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes() * PREFETCH_MEMORY_THRESHOLD > PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes(); } - private boolean hasTooManyPrefetchedLocalEvent() { - return prefetchingQueue.getPrefetchedEventCount() > PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD; + private boolean hasTooManyRetainedLocalEvent() { + return prefetchingQueue.getSubscriptionRetainedEventCount() + > PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD; } - private boolean hasTooManyPrefetchedGlobalEvent() { - // The number of prefetched events in the current prefetching queue > floor(t / number of + private boolean hasTooManyRetainedGlobalEvent() { + // The number of retained events in the current prefetching queue > floor(t / number of // prefetching queues), where t is an adjustable parameter. - return prefetchingQueue.getPrefetchedEventCount() - * SubscriptionAgent.broker().getPrefetchingQueueCount() + return prefetchingQueue.getSubscriptionRetainedEventCount() + * prefetchingQueueCountSupplier.getAsInt() > PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD; } + private boolean hasTooManyInFlightLocalEvent() { + return prefetchingQueue.getSubscriptionUncommittedEventCount() + >= PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD; + } + + private boolean hasTooManyInFlightGlobalEvent() { + return prefetchingQueue.getSubscriptionUncommittedEventCount() + * prefetchingQueueCountSupplier.getAsInt() + >= PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD; + } + private boolean isMissingRateTooHigh() { return missingRate() > PREFETCH_MISSING_RATE_THRESHOLD; } @@ -169,6 +195,8 @@ public String toString() { .add("pollRate", pollRate()) .add("missingRate", missingRate()) .add("disorderCause", disorderCauseCounter.getCount()) + .add("retainedEventCount", prefetchingQueue.getSubscriptionRetainedEventCount()) + .add("inFlightEventCount", prefetchingQueue.getSubscriptionUncommittedEventCount()) .toString(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java index fcd5cc4c3b654..eca5b4f282126 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/broker/consensus/ConsensusPrefetchingQueue.java @@ -586,8 +586,11 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi if (pendingSeekRequest != null) { return null; } + if (shouldThrottlePollByInFlightEvents()) { + return null; + } final SubscriptionEvent event = pollInternal(consumerId); - if (Objects.nonNull(event) && prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) { + if (Objects.nonNull(event) && hasAvailableEventSlot()) { requestPrefetch(); } else if (Objects.isNull(event) && shouldRecoverPrefetchBindingAfterEmptyPoll()) { requestPrefetch(); @@ -598,6 +601,25 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi } } + private boolean shouldThrottlePollByInFlightEvents() { + if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) { + return false; + } + + recycleInFlightEvents(); + if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) { + return false; + } + + LOGGER.debug( + "ConsensusPrefetchingQueue {}: throttles poll because too many events are in flight", this); + return true; + } + + private boolean hasAvailableEventSlot() { + return getSubscriptionRetainedEventCount() < MAX_PREFETCHING_QUEUE_SIZE; + } + private synchronized void initPrefetch(final RegionProgress regionProgress) { if (prefetchInitialized) { return; // double-check under synchronization @@ -1197,7 +1219,7 @@ public PrefetchRoundResult drivePrefetchOnce() { applyPendingSubscriptionWalReset(observedSeekGeneration); recycleInFlightEvents(); - if (!isActive || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) { + if (!isActive || !hasAvailableEventSlot()) { return computeIdleRoundResult(); } @@ -1244,7 +1266,7 @@ public PrefetchRoundResult drivePrefetchOnce() { return PrefetchRoundResult.rescheduleNow(); } - if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L) { + if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L && hasAvailableEventSlot()) { final long lingerElapsedMs = System.currentTimeMillis() - lingerBatch.firstTabletTimeMs; if (lingerElapsedMs >= batchMaxDelayMs) { if (seekGeneration.get() != observedSeekGeneration) { @@ -1331,8 +1353,11 @@ private PrefetchRoundResult computeIdleRoundResult() { if (isClosed || !prefetchInitialized || !isActive) { return PrefetchRoundResult.dormant(); } - if (prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) { - return PrefetchRoundResult.dormant(); + if (!hasAvailableEventSlot()) { + return inFlightEvents.isEmpty() + ? PrefetchRoundResult.dormant() + : PrefetchRoundResult.rescheduleAfter( + SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs()); } if (hasImmediatePrefetchableWork()) { return PrefetchRoundResult.rescheduleNow(); @@ -1598,7 +1623,7 @@ private boolean pumpFromSubscriptionWAL( int entriesRead = 0; while (entriesRead < maxWalEntries && subscriptionWALIterator.hasNext() - && prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) { + && hasAvailableEventSlot()) { try { final IndexedConsensusRequest walEntry = subscriptionWALIterator.next(); entriesRead++; @@ -1879,7 +1904,7 @@ private boolean drainBufferedRealtimeWriters( drainRealtimeWriters(batchState, maxTablets, maxBatchBytes); final int bufferedAfter = getRealtimeBufferedEntryCount(); - if (bufferedAfter == 0 || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) { + if (bufferedAfter == 0 || !hasAvailableEventSlot()) { return true; } @@ -2066,13 +2091,19 @@ private boolean canAcceptCommitContext( } public boolean ack(final String consumerId, final SubscriptionCommitContext commitContext) { + final boolean acked; acquireReadLock(); try { - return canAcceptCommitContext(commitContext, "ack", false) - && ackInternal(consumerId, commitContext); + acked = + canAcceptCommitContext(commitContext, "ack", false) + && ackInternal(consumerId, commitContext); } finally { releaseReadLock(); } + if (acked && hasAvailableEventSlot()) { + requestPrefetch(); + } + return acked; } private boolean ackInternal( @@ -2148,6 +2179,7 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com * in multi-region iteration where only one queue owns the event. */ public boolean ackSilent(final String consumerId, final SubscriptionCommitContext commitContext) { + final boolean ackedResult; acquireReadLock(); try { if (!canAcceptCommitContext(commitContext, "ack", true)) { @@ -2190,10 +2222,14 @@ public boolean ackSilent(final String consumerId, final SubscriptionCommitContex ev.cleanUp(false); return null; }); - return acked.get(); + ackedResult = acked.get(); } finally { releaseReadLock(); } + if (ackedResult && hasAvailableEventSlot()) { + requestPrefetch(); + } + return ackedResult; } private WriterId extractCommitWriterId(final SubscriptionCommitContext commitContext) { @@ -2754,8 +2790,8 @@ private void maybeInjectWatermark() { } final long intervalMs = SubscriptionConfig.getInstance().getSubscriptionConsensusWatermarkIntervalMs(); - if (intervalMs <= 0) { - return; // Watermark disabled + if (intervalMs <= 0 || !hasAvailableEventSlot()) { + return; // Watermark disabled or the queue is full } final long now = System.currentTimeMillis(); if (now - lastWatermarkEmitTimeMs >= intervalMs) { @@ -3111,6 +3147,10 @@ public long getSubscriptionUncommittedEventCount() { return inFlightEvents.size(); } + public long getSubscriptionRetainedEventCount() { + return prefetchingQueue.size() + inFlightEvents.size(); + } + /** Exposes the current seek generation for runtime tests and metrics. */ public long getCurrentSeekGeneration() { return seekGeneration.get(); diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStatesTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStatesTest.java new file mode 100644 index 0000000000000..d8878f17d9046 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/subscription/broker/SubscriptionPrefetchingQueueStatesTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.subscription.broker; + +import org.apache.iotdb.commons.subscription.config.SubscriptionConfig; + +import org.junit.Test; + +import java.lang.reflect.Method; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class SubscriptionPrefetchingQueueStatesTest { + + private static final int LOCAL_COUNT_THRESHOLD = + SubscriptionConfig.getInstance().getSubscriptionPrefetchEventLocalCountThreshold(); + private static final int GLOBAL_COUNT_THRESHOLD = + SubscriptionConfig.getInstance().getSubscriptionPrefetchEventGlobalCountThreshold(); + + @Test + public void testShouldThrottlePollByLocalInFlightEventCount() { + assertFalse(newStates(queueWithCounts(0, LOCAL_COUNT_THRESHOLD - 1L), 1).shouldThrottlePoll()); + assertTrue(newStates(queueWithCounts(0, LOCAL_COUNT_THRESHOLD), 1).shouldThrottlePoll()); + } + + @Test + public void testShouldThrottlePollByGlobalInFlightEventCount() { + final long inFlightEventCount = LOCAL_COUNT_THRESHOLD - 1L; + final int belowGlobalQueueCount = + Math.max(1, (int) ((GLOBAL_COUNT_THRESHOLD - 1L) / inFlightEventCount)); + final int reachingGlobalQueueCount = + (int) ((GLOBAL_COUNT_THRESHOLD + inFlightEventCount - 1L) / inFlightEventCount); + + assertFalse( + newStates(queueWithCounts(0, inFlightEventCount), belowGlobalQueueCount) + .shouldThrottlePoll()); + assertTrue( + newStates(queueWithCounts(0, inFlightEventCount), reachingGlobalQueueCount) + .shouldThrottlePoll()); + } + + @Test + public void testRetainedLocalEventLimitCountsAllRetainedEvents() throws Exception { + assertFalse( + invokeStatePredicate( + newStates(queueWithCounts(LOCAL_COUNT_THRESHOLD, 0), 1), + "hasTooManyRetainedLocalEvent")); + assertTrue( + invokeStatePredicate( + newStates(queueWithCounts(LOCAL_COUNT_THRESHOLD + 1L, 0), 1), + "hasTooManyRetainedLocalEvent")); + } + + @Test + public void testRetainedGlobalEventLimitUsesInjectedQueueCount() throws Exception { + final long retainedEventCount = Math.max(1L, GLOBAL_COUNT_THRESHOLD / 2L); + final int belowOrAtGlobalQueueCount = + Math.max(1, (int) (GLOBAL_COUNT_THRESHOLD / retainedEventCount)); + final int aboveGlobalQueueCount = belowOrAtGlobalQueueCount + 1; + + assertFalse( + invokeStatePredicate( + newStates(queueWithCounts(retainedEventCount, 0), belowOrAtGlobalQueueCount), + "hasTooManyRetainedGlobalEvent")); + assertTrue( + invokeStatePredicate( + newStates(queueWithCounts(retainedEventCount, 0), aboveGlobalQueueCount), + "hasTooManyRetainedGlobalEvent")); + } + + private static SubscriptionPrefetchingQueueStates newStates( + final SubscriptionPrefetchingQueue queue, final int prefetchingQueueCount) { + return new SubscriptionPrefetchingQueueStates(queue, () -> prefetchingQueueCount); + } + + private static SubscriptionPrefetchingQueue queueWithCounts( + final long retainedEventCount, final long inFlightEventCount) { + final SubscriptionPrefetchingQueue queue = mock(SubscriptionPrefetchingQueue.class); + when(queue.getSubscriptionRetainedEventCount()).thenReturn(retainedEventCount); + when(queue.getSubscriptionUncommittedEventCount()).thenReturn(inFlightEventCount); + return queue; + } + + private static boolean invokeStatePredicate( + final SubscriptionPrefetchingQueueStates states, final String methodName) throws Exception { + final Method method = SubscriptionPrefetchingQueueStates.class.getDeclaredMethod(methodName); + method.setAccessible(true); + return (boolean) method.invoke(states); + } +}