Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,10 @@ public SubscriptionEvent poll(final String consumerId) {
private SubscriptionEvent pollInternal(final String consumerId) {
states.markPollRequest();

if (shouldThrottlePollByInFlightEvents()) {
return null;
}

if (prefetchingQueue.isEmpty()) {
states.markMissingPrefetch();
try {
Expand Down Expand Up @@ -254,6 +258,22 @@ private SubscriptionEvent pollInternal(final String consumerId) {
return null;
}

private boolean shouldThrottlePollByInFlightEvents() {
if (!states.shouldThrottlePoll()) {
return false;
}

remapInFlightEventsSnapshot(committedCleaner, pollableNacker);
if (!states.shouldThrottlePoll()) {
return false;
}

LOGGER.debug(
"Subscription: SubscriptionPrefetchingQueue {} throttles poll because too many events are in flight.",
this);
return true;
}

public SubscriptionEvent pollV2(final String consumerId, final PollTimer timer) {
acquireReadLock();
try {
Expand All @@ -268,6 +288,10 @@ private SubscriptionEvent pollInternalV2(final String consumerId, final PollTime

// do-while ensures at least one poll
do {
if (shouldThrottlePollByInFlightEvents()) {
return null;
}

SubscriptionEvent event;
try {
if (prefetchingQueue.isEmpty()) {
Expand Down Expand Up @@ -976,6 +1000,10 @@ public long getSubscriptionUncommittedEventCount() {
return inFlightEvents.size();
}

public long getSubscriptionRetainedEventCount() {
return prefetchingQueue.size() + inFlightEvents.size();
}

public long getCurrentCommitId() {
return commitIdGenerator.get();
}
Expand Down Expand Up @@ -1023,6 +1051,7 @@ public Map<String, String> coreReportMessage() {
result.put("size of inputPendingQueue", String.valueOf(inputPendingQueue.size()));
result.put("size of prefetchingQueue", String.valueOf(prefetchingQueue.size()));
result.put("size of inFlightEvents", String.valueOf(inFlightEvents.size()));
result.put("size of retainedEvents", String.valueOf(getSubscriptionRetainedEventCount()));
result.put("commitIdGenerator", commitIdGenerator.toString());
result.put("states", states.toString());
result.put("isCompleted", String.valueOf(isCompleted));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.IntSupplier;

import static com.google.common.base.MoreObjects.toStringHelper;

/**
Expand All @@ -54,14 +56,22 @@ public class SubscriptionPrefetchingQueueStates {
SubscriptionConfig.getInstance().getSubscriptionPrefetchEventGlobalCountThreshold();

private final SubscriptionPrefetchingQueue prefetchingQueue;
private final IntSupplier prefetchingQueueCountSupplier;

private volatile long lastPollRequestTimestamp;
private final Meter pollRequestMeter;
private final Meter missingPrefechMeter;
private final Counter disorderCauseCounter; // TODO: use meter

public SubscriptionPrefetchingQueueStates(final SubscriptionPrefetchingQueue prefetchingQueue) {
this(prefetchingQueue, () -> SubscriptionAgent.broker().getPrefetchingQueueCount());
}

SubscriptionPrefetchingQueueStates(
final SubscriptionPrefetchingQueue prefetchingQueue,
final IntSupplier prefetchingQueueCountSupplier) {
this.prefetchingQueue = prefetchingQueue;
this.prefetchingQueueCountSupplier = prefetchingQueueCountSupplier;

this.lastPollRequestTimestamp = -1;
this.pollRequestMeter = new Meter(new IoTDBMovingAverage(), Clock.defaultClock());
Expand Down Expand Up @@ -106,12 +116,12 @@ public boolean shouldPrefetch() {
}

// 1.3. local event count
if (hasTooManyPrefetchedLocalEvent()) {
if (hasTooManyRetainedLocalEvent()) {
return false;
}

// 1.4. global event count
if (hasTooManyPrefetchedGlobalEvent()) {
if (hasTooManyRetainedGlobalEvent()) {
return false;
}

Expand All @@ -132,24 +142,40 @@ public boolean shouldPrefetch() {
return (System.currentTimeMillis() - lastPollRequestTimestamp) * pollRate() > 1000;
}

public boolean shouldThrottlePoll() {
return hasTooManyInFlightLocalEvent() || hasTooManyInFlightGlobalEvent();
}

private boolean isMemoryEnough() {
return PipeDataNodeResourceManager.memory().getTotalNonFloatingMemorySizeInBytes()
* PREFETCH_MEMORY_THRESHOLD
> PipeDataNodeResourceManager.memory().getUsedMemorySizeInBytes();
}

private boolean hasTooManyPrefetchedLocalEvent() {
return prefetchingQueue.getPrefetchedEventCount() > PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
private boolean hasTooManyRetainedLocalEvent() {
return prefetchingQueue.getSubscriptionRetainedEventCount()
> PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
}

private boolean hasTooManyPrefetchedGlobalEvent() {
// The number of prefetched events in the current prefetching queue > floor(t / number of
private boolean hasTooManyRetainedGlobalEvent() {
// The number of retained events in the current prefetching queue > floor(t / number of
// prefetching queues), where t is an adjustable parameter.
return prefetchingQueue.getPrefetchedEventCount()
* SubscriptionAgent.broker().getPrefetchingQueueCount()
return prefetchingQueue.getSubscriptionRetainedEventCount()
* prefetchingQueueCountSupplier.getAsInt()
> PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
}

private boolean hasTooManyInFlightLocalEvent() {
return prefetchingQueue.getSubscriptionUncommittedEventCount()
>= PREFETCH_EVENT_LOCAL_COUNT_THRESHOLD;
}

private boolean hasTooManyInFlightGlobalEvent() {
return prefetchingQueue.getSubscriptionUncommittedEventCount()
* prefetchingQueueCountSupplier.getAsInt()
>= PREFETCH_EVENT_GLOBAL_COUNT_THRESHOLD;
}

private boolean isMissingRateTooHigh() {
return missingRate() > PREFETCH_MISSING_RATE_THRESHOLD;
}
Expand All @@ -169,6 +195,8 @@ public String toString() {
.add("pollRate", pollRate())
.add("missingRate", missingRate())
.add("disorderCause", disorderCauseCounter.getCount())
.add("retainedEventCount", prefetchingQueue.getSubscriptionRetainedEventCount())
.add("inFlightEventCount", prefetchingQueue.getSubscriptionUncommittedEventCount())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,11 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi
if (pendingSeekRequest != null) {
return null;
}
if (shouldThrottlePollByInFlightEvents()) {
return null;
}
final SubscriptionEvent event = pollInternal(consumerId);
if (Objects.nonNull(event) && prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) {
if (Objects.nonNull(event) && hasAvailableEventSlot()) {
requestPrefetch();
} else if (Objects.isNull(event) && shouldRecoverPrefetchBindingAfterEmptyPoll()) {
requestPrefetch();
Expand All @@ -598,6 +601,25 @@ public SubscriptionEvent poll(final String consumerId, final RegionProgress regi
}
}

private boolean shouldThrottlePollByInFlightEvents() {
if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) {
return false;
}

recycleInFlightEvents();
if (inFlightEvents.size() < MAX_PREFETCHING_QUEUE_SIZE) {
return false;
}

LOGGER.debug(
"ConsensusPrefetchingQueue {}: throttles poll because too many events are in flight", this);
return true;
}

private boolean hasAvailableEventSlot() {
return getSubscriptionRetainedEventCount() < MAX_PREFETCHING_QUEUE_SIZE;
}

private synchronized void initPrefetch(final RegionProgress regionProgress) {
if (prefetchInitialized) {
return; // double-check under synchronization
Expand Down Expand Up @@ -1197,7 +1219,7 @@ public PrefetchRoundResult drivePrefetchOnce() {
applyPendingSubscriptionWalReset(observedSeekGeneration);
recycleInFlightEvents();

if (!isActive || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
if (!isActive || !hasAvailableEventSlot()) {
return computeIdleRoundResult();
}

Expand Down Expand Up @@ -1244,7 +1266,7 @@ public PrefetchRoundResult drivePrefetchOnce() {
return PrefetchRoundResult.rescheduleNow();
}

if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L) {
if (!lingerBatch.isEmpty() && lingerBatch.firstTabletTimeMs > 0L && hasAvailableEventSlot()) {
final long lingerElapsedMs = System.currentTimeMillis() - lingerBatch.firstTabletTimeMs;
if (lingerElapsedMs >= batchMaxDelayMs) {
if (seekGeneration.get() != observedSeekGeneration) {
Expand Down Expand Up @@ -1331,8 +1353,11 @@ private PrefetchRoundResult computeIdleRoundResult() {
if (isClosed || !prefetchInitialized || !isActive) {
return PrefetchRoundResult.dormant();
}
if (prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
return PrefetchRoundResult.dormant();
if (!hasAvailableEventSlot()) {
return inFlightEvents.isEmpty()
? PrefetchRoundResult.dormant()
: PrefetchRoundResult.rescheduleAfter(
SubscriptionConfig.getInstance().getSubscriptionRecycleUncommittedEventIntervalMs());
}
if (hasImmediatePrefetchableWork()) {
return PrefetchRoundResult.rescheduleNow();
Expand Down Expand Up @@ -1598,7 +1623,7 @@ private boolean pumpFromSubscriptionWAL(
int entriesRead = 0;
while (entriesRead < maxWalEntries
&& subscriptionWALIterator.hasNext()
&& prefetchingQueue.size() < MAX_PREFETCHING_QUEUE_SIZE) {
&& hasAvailableEventSlot()) {
try {
final IndexedConsensusRequest walEntry = subscriptionWALIterator.next();
entriesRead++;
Expand Down Expand Up @@ -1879,7 +1904,7 @@ private boolean drainBufferedRealtimeWriters(
drainRealtimeWriters(batchState, maxTablets, maxBatchBytes);

final int bufferedAfter = getRealtimeBufferedEntryCount();
if (bufferedAfter == 0 || prefetchingQueue.size() >= MAX_PREFETCHING_QUEUE_SIZE) {
if (bufferedAfter == 0 || !hasAvailableEventSlot()) {
return true;
}

Expand Down Expand Up @@ -2066,13 +2091,19 @@ private boolean canAcceptCommitContext(
}

public boolean ack(final String consumerId, final SubscriptionCommitContext commitContext) {
final boolean acked;
acquireReadLock();
try {
return canAcceptCommitContext(commitContext, "ack", false)
&& ackInternal(consumerId, commitContext);
acked =
canAcceptCommitContext(commitContext, "ack", false)
&& ackInternal(consumerId, commitContext);
} finally {
releaseReadLock();
}
if (acked && hasAvailableEventSlot()) {
requestPrefetch();
}
return acked;
}

private boolean ackInternal(
Expand Down Expand Up @@ -2148,6 +2179,7 @@ public boolean nack(final String consumerId, final SubscriptionCommitContext com
* in multi-region iteration where only one queue owns the event.
*/
public boolean ackSilent(final String consumerId, final SubscriptionCommitContext commitContext) {
final boolean ackedResult;
acquireReadLock();
try {
if (!canAcceptCommitContext(commitContext, "ack", true)) {
Expand Down Expand Up @@ -2190,10 +2222,14 @@ public boolean ackSilent(final String consumerId, final SubscriptionCommitContex
ev.cleanUp(false);
return null;
});
return acked.get();
ackedResult = acked.get();
} finally {
releaseReadLock();
}
if (ackedResult && hasAvailableEventSlot()) {
requestPrefetch();
}
return ackedResult;
}

private WriterId extractCommitWriterId(final SubscriptionCommitContext commitContext) {
Expand Down Expand Up @@ -2754,8 +2790,8 @@ private void maybeInjectWatermark() {
}
final long intervalMs =
SubscriptionConfig.getInstance().getSubscriptionConsensusWatermarkIntervalMs();
if (intervalMs <= 0) {
return; // Watermark disabled
if (intervalMs <= 0 || !hasAvailableEventSlot()) {
return; // Watermark disabled or the queue is full
}
final long now = System.currentTimeMillis();
if (now - lastWatermarkEmitTimeMs >= intervalMs) {
Expand Down Expand Up @@ -3111,6 +3147,10 @@ public long getSubscriptionUncommittedEventCount() {
return inFlightEvents.size();
}

public long getSubscriptionRetainedEventCount() {
return prefetchingQueue.size() + inFlightEvents.size();
}

/** Exposes the current seek generation for runtime tests and metrics. */
public long getCurrentSeekGeneration() {
return seekGeneration.get();
Expand Down
Loading
Loading