From fa3205516bcab377b127973e7ddd4824f897306c Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Sun, 4 Jan 2026 14:35:16 +0800
Subject: [PATCH 01/15] [improve][cli] Add client side looping to
analyze-backlog in Topics
---
.../apache/pulsar/client/admin/Topics.java | 69 +++++++++++++++-
.../client/admin/internal/TopicsImpl.java | 81 +++++++++++++++++++
2 files changed, 149 insertions(+), 1 deletion(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index fbcf0b4a07b1f..2ab685100ad5a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String
* This is a potentially expensive operation, as it requires
* to read the messages from storage.
* This function takes into consideration batch messages
- * and also Subscription filters.
+ * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
* @param topic
* Topic name
* @param subscriptionName
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan before terminating its loop
* @return an accurate analysis of the backlog
* @throws PulsarAdminException
* Unexpected error
*/
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+ Optional startPosition,
+ long backlogScanMaxEntries) throws PulsarAdminException;
+
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @return an accurate analysis of the backlog
+ */
CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
Optional startPosition);
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ *
+ *
+ * What's the purpose of this overloaded method?
+ * There are broker side configurable maximum limits how many entries will be read and how long the scanning can
+ * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries
+ * (default 10000) control this behavior.
+ * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout
+ * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few
+ * minutes.
+ *
+ *
+ *
+ * How does this method work?
+ * 1. Add a new parameter backlogScanMaxEntries in client side method to control the client-side loop termination
+ * condition.
+ * 2. If subscriptionBacklogScanMaxEntries(server side) >= backlogScanMaxEntries(client side), then
+ * backlogScanMaxEntries parameter will take no effect.
+ * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call analyze-backlog method in
+ * a loop until server return ScanOutcome.COMPLETED or the total entries exceeds backlogScanMaxEntries.
+ * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by
+ * the server, it only serves to determine when the loop should terminate.
+ * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and
+ * subscriptionBacklogScanMaxEntries, and then retrieve the desired number of backlog entries through
+ * client-side looping.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @param backlogScanMaxEntries
+ * the maximum number of backlog entries the client will scan before terminating its loop
+ * @return an accurate analysis of the backlog
+ */
+ CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName,
+ Optional startPosition,
+ long backlogScanMaxEntries);
+
/**
* Get backlog size by a message ID.
* @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 11dd69a23ce58..7b351396de60f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -34,6 +34,9 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.ws.rs.client.Entity;
@@ -1560,6 +1563,15 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition));
}
+ @Override
+ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+ Optional startPosition,
+ long backlogScanMaxEntries)
+ throws PulsarAdminException {
+ return sync(
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries));
+ }
+
@Override
public CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
@@ -1591,6 +1603,75 @@ public void failed(Throwable throwable) {
return future;
}
+ @Override
+ public CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName,
+ Optional startPosition,
+ long backlogScanMaxEntries) {
+ final CompletableFuture future = new CompletableFuture<>();
+ AtomicReference resultRef = new AtomicReference<>();
+ int partitionIndex = TopicName.get(topic).getPartitionIndex();
+ AtomicReference> startPositionRef = new AtomicReference<>(startPosition);
+
+ Supplier> resultSupplier =
+ () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get());
+ BiConsumer completeAction = new BiConsumer<>() {
+ @Override
+ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) {
+ if (throwable != null) {
+ future.completeExceptionally(throwable);
+ return;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
+ resultRef.set(mergedResult);
+ if (mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
+ future.complete(mergedResult);
+ return;
+ }
+
+ String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
+ MessageIdImpl nextScanMessageId =
+ new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
+ partitionIndex);
+ startPositionRef.set(Optional.of(nextScanMessageId));
+
+ resultSupplier.get().whenComplete(this);
+ }
+ };
+
+ resultSupplier.get().whenComplete(completeAction);
+ return future;
+ }
+
+ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscriptionBacklogResult current,
+ AnalyzeSubscriptionBacklogResult previous) {
+ if (previous == null) {
+ return current;
+ }
+
+ AnalyzeSubscriptionBacklogResult mergedRes = new AnalyzeSubscriptionBacklogResult();
+ mergedRes.setEntries(current.getEntries() + previous.getEntries());
+ mergedRes.setMessages(current.getMessages() + previous.getMessages());
+ mergedRes.setMarkerMessages(current.getMarkerMessages() + previous.getMarkerMessages());
+
+ mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() + previous.getFilterAcceptedEntries());
+ mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() + previous.getFilterRejectedEntries());
+ mergedRes.setFilterRescheduledEntries(
+ current.getFilterRescheduledEntries() + previous.getFilterRescheduledEntries());
+
+ mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + previous.getFilterAcceptedMessages());
+ mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + previous.getFilterRejectedMessages());
+ mergedRes.setFilterRescheduledMessages(
+ current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages());
+
+ mergedRes.setAborted(current.isAborted());
+ mergedRes.setFirstMessageId(current.getFirstMessageId());
+ mergedRes.setLastMessageId(current.getLastMessageId());
+
+ return mergedRes;
+ }
+
@Override
public Long getBacklogSizeByMessageId(String topic, MessageId messageId)
throws PulsarAdminException {
From dfd12840c4b301acdbe5772a217ad16641db96a3 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Sun, 4 Jan 2026 21:23:36 +0800
Subject: [PATCH 02/15] [improve][cli] Modify logic
---
.../org/apache/pulsar/client/admin/internal/TopicsImpl.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 7b351396de60f..e7f1d21d41b8b 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1625,7 +1625,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
resultRef.set(mergedResult);
- if (mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
+ if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
future.complete(mergedResult);
return;
}
From ba907c7ab5e323cf1054c7c4096c661d0b09afcc Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Mon, 5 Jan 2026 12:38:49 +0800
Subject: [PATCH 03/15] [improve][admin] Add test
---
.../admin/AnalyzeBacklogSubscriptionTest.java | 99 +++++++++++++++++++
1 file changed, 99 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index acea913204999..df608f2d2c560 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -27,6 +27,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
+import org.apache.commons.lang3.RandomUtils;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -189,4 +190,102 @@ public void partitionedTopicNotAllowed() throws Exception {
assertEquals(0, analyzeSubscriptionBacklogResult.getEntries());
}
+ @Test
+ public void analyzeBacklogUsingClientSideLooping() throws Exception {
+ int serverSubscriptionBacklogScanMaxEntries = 10;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic = "persistent://my-property/my-ns/analyze-backlog-using-client-side-looping-topic";
+ String subName = "sub-1";
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+ verifyBacklog(topic, subName, 0, 0);
+
+ @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+
+ // Test server returns false aborted flag.
+ int firstSendMessages = 5;
+ List> firstSendFutures = new ArrayList<>();
+ for (int i = 0; i < firstSendMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("first-" + i).getBytes());
+ firstSendFutures.add(future);
+ }
+ FutureUtil.waitForAll(firstSendFutures).get();
+
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 15);
+
+ assertEquals(backlogResult.getEntries(), firstSendMessages);
+ assertEquals(backlogResult.getMessages(), firstSendMessages);
+
+ // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server returns
+ // true aborted flag.
+ int secondSendMessages = 10;
+ List> secondSendFutures = new ArrayList<>();
+ for (int i = 0; i < secondSendMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("second-" + i).getBytes());
+ secondSendFutures.add(future);
+ }
+ FutureUtil.waitForAll(secondSendFutures).get();
+
+ backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(),
+ serverSubscriptionBacklogScanMaxEntries - 1);
+
+ assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries);
+ assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries);
+
+ // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
+ // condition is that server returns false aborted flag.
+ int thirdSendMessages = 10;
+ List> thirdFutures = new ArrayList<>();
+ for (int i = 0; i < thirdSendMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("third-" + i).getBytes());
+ thirdFutures.add(future);
+ }
+ FutureUtil.waitForAll(thirdFutures).get();
+
+ int thirdTotalMessages = firstSendMessages + secondSendMessages + thirdSendMessages;
+ backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages + 1);
+
+ assertEquals(backlogResult.getEntries(), thirdTotalMessages);
+ assertEquals(backlogResult.getMessages(), thirdTotalMessages);
+
+ // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
+ // condition is that total entries exceeds backlogScanMaxEntries.
+ int fourthSendMessages = 10;
+ List> fourthFutures = new ArrayList<>();
+ for (int i = 0; i < fourthSendMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("fourth-" + i).getBytes());
+ fourthFutures.add(future);
+ }
+ FutureUtil.waitForAll(fourthFutures).get();
+
+ backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages);
+
+ int fourthExpectedMessages = (thirdTotalMessages / serverSubscriptionBacklogScanMaxEntries)
+ + serverSubscriptionBacklogScanMaxEntries;
+ assertEquals(backlogResult.getEntries(), fourthExpectedMessages);
+ assertEquals(backlogResult.getMessages(), fourthExpectedMessages);
+
+ // Test client side loop with topic unload.
+ admin.topics().unload(topic);
+ int fifthSendMessages = 10;
+ List> fifthFutures = new ArrayList<>();
+ for (int i = 0; i < fifthSendMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("fifth-" + i).getBytes());
+ fifthFutures.add(future);
+ if (RandomUtils.secure().randomBoolean()) {
+ admin.topics().unload(topic);
+ }
+ }
+ FutureUtil.waitForAll(fifthFutures).get();
+
+ int fifthTotalMessages = thirdTotalMessages + fourthSendMessages + fifthSendMessages;
+ backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), fifthTotalMessages);
+
+ assertEquals(backlogResult.getEntries(), fifthTotalMessages);
+ assertEquals(backlogResult.getMessages(), fifthTotalMessages);
+ }
}
From 6ec6b93a8deac1453e32b57cbac63e9171fae0d4 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Mon, 5 Jan 2026 16:25:55 +0800
Subject: [PATCH 04/15] [improve][admin] Optimize test
---
.../bookkeeper/mledger/impl/OpScan.java | 3 +-
.../admin/AnalyzeBacklogSubscriptionTest.java | 225 +++++++++++++-----
2 files changed, 161 insertions(+), 67 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 732071ee01a6a..6f9b87b245ec3 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -91,7 +91,8 @@ public void readEntriesComplete(List entries, Object ctx) {
searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
PositionBound.startExcluded);
if (log.isDebugEnabled()) {
- log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition);
+ log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch,
+ searchPosition);
}
if (searchPosition.compareTo(lastPositionForBatch) == 0) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index df608f2d2c560..1231d54332e07 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertThrows;
@@ -51,6 +52,12 @@ public void setup() throws Exception {
producerBaseSetup();
}
+ @Override
+ protected void doInitConf() throws Exception {
+ super.doInitConf();
+ conf.setDispatcherMaxReadBatchSize(10);
+ }
+
@AfterMethod(alwaysRun = true)
@Override
public void cleanup() throws Exception {
@@ -191,101 +198,187 @@ public void partitionedTopicNotAllowed() throws Exception {
}
@Test
- public void analyzeBacklogUsingClientSideLooping() throws Exception {
- int serverSubscriptionBacklogScanMaxEntries = 10;
+ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
- String topic = "persistent://my-property/my-ns/analyze-backlog-using-client-side-looping-topic";
+ String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
String subName = "sub-1";
- admin.topics().createSubscription(topic, subName, MessageId.latest);
-
- assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
- verifyBacklog(topic, subName, 0, 0);
-
- @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ int numMessages = 10;
// Test server returns false aborted flag.
- int firstSendMessages = 5;
- List> firstSendFutures = new ArrayList<>();
- for (int i = 0; i < firstSendMessages; i++) {
- CompletableFuture future = producer.sendAsync(("first-" + i).getBytes());
- firstSendFutures.add(future);
- }
- FutureUtil.waitForAll(firstSendFutures).get();
+ clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 15);
-
- assertEquals(backlogResult.getEntries(), firstSendMessages);
- assertEquals(backlogResult.getMessages(), firstSendMessages);
-
- // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server returns
- // true aborted flag.
- int secondSendMessages = 10;
- List> secondSendFutures = new ArrayList<>();
- for (int i = 0; i < secondSendMessages; i++) {
- CompletableFuture future = producer.sendAsync(("second-" + i).getBytes());
- secondSendFutures.add(future);
- }
- FutureUtil.waitForAll(secondSendFutures).get();
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages - 1);
- backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(),
- serverSubscriptionBacklogScanMaxEntries - 1);
+ assertEquals(backlogResult.getEntries(), numMessages);
+ assertEquals(backlogResult.getMessages(), numMessages);
+ }
+
+ @Test
+ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 20;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
+ String subName = "sub-1";
+ int numMessages = 25;
+
+ // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server
+ // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10.
+ clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+
+ AnalyzeSubscriptionBacklogResult backlogResult = admin.topics()
+ .analyzeSubscriptionBacklog(topic, subName, Optional.empty(),
+ serverSubscriptionBacklogScanMaxEntries - 1);
assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries);
assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries);
+ }
+
+ @Test
+ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 20;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
+ String subName = "sub-1";
+ int numMessages = 45;
// Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
// condition is that server returns false aborted flag.
- int thirdSendMessages = 10;
- List> thirdFutures = new ArrayList<>();
- for (int i = 0; i < thirdSendMessages; i++) {
- CompletableFuture future = producer.sendAsync(("third-" + i).getBytes());
- thirdFutures.add(future);
- }
- FutureUtil.waitForAll(thirdFutures).get();
+ clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
- int thirdTotalMessages = firstSendMessages + secondSendMessages + thirdSendMessages;
- backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages + 1);
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages);
+
+ assertEquals(backlogResult.getEntries(), numMessages);
+ assertEquals(backlogResult.getMessages(), numMessages);
+ }
+
+ @Test
+ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 15;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
- assertEquals(backlogResult.getEntries(), thirdTotalMessages);
- assertEquals(backlogResult.getMessages(), thirdTotalMessages);
+ String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
+ String subName = "sub-1";
+ int numMessages = 55;
+ int backlogScanMaxEntries = 40;
// Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
// condition is that total entries exceeds backlogScanMaxEntries.
- int fourthSendMessages = 10;
- List> fourthFutures = new ArrayList<>();
- for (int i = 0; i < fourthSendMessages; i++) {
- CompletableFuture future = producer.sendAsync(("fourth-" + i).getBytes());
- fourthFutures.add(future);
- }
- FutureUtil.waitForAll(fourthFutures).get();
+ // Server dispatcherMaxReadBatchSize is set to 10.
+ clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
- backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages);
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
- int fourthExpectedMessages = (thirdTotalMessages / serverSubscriptionBacklogScanMaxEntries)
+ // Broker returns 15 + 15 + 15 = 45 entries.
+ long expectedEntries = backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries
+ serverSubscriptionBacklogScanMaxEntries;
- assertEquals(backlogResult.getEntries(), fourthExpectedMessages);
- assertEquals(backlogResult.getMessages(), fourthExpectedMessages);
+ assertEquals(backlogResult.getEntries(), expectedEntries);
+ assertEquals(backlogResult.getMessages(), expectedEntries);
+ }
+
+ @Test
+ public void analyzeBacklogWithTopicUnload() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 10;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
+ String subName = "sub-1";
+ int numMessages = 35;
+
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+ verifyBacklog(topic, subName, 0, 0);
// Test client side loop with topic unload.
- admin.topics().unload(topic);
- int fifthSendMessages = 10;
- List> fifthFutures = new ArrayList<>();
- for (int i = 0; i < fifthSendMessages; i++) {
- CompletableFuture future = producer.sendAsync(("fifth-" + i).getBytes());
- fifthFutures.add(future);
+ @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("test-" + i).getBytes());
+ futures.add(future);
if (RandomUtils.secure().randomBoolean()) {
admin.topics().unload(topic);
}
}
- FutureUtil.waitForAll(fifthFutures).get();
+ FutureUtil.waitForAll(futures).get();
+
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages);
+
+ assertEquals(backlogResult.getEntries(), numMessages);
+ assertEquals(backlogResult.getMessages(), numMessages);
+ }
+
+ @Test
+ public void analyzeBacklogWithIndividualAck() throws Exception {
+ long serverSubscriptionBacklogScanMaxEntries = 20;
+ conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
+
+ String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-Unloaded";
+ String subName = "sub-1";
+ int messages = 55;
+
+ // Test client side loop with individual ack.
+ clientSideLoopAnalyzeBacklogSetup(topic, subName, messages);
+
+ // we want to wait for the server to process acks, in order to not have a flaky test.
+ @Cleanup Consumer consumer =
+ pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName)
+ .subscriptionType(SubscriptionType.Shared).subscribe();
+
+ // Individual ack message2.
+ Message message1 = consumer.receive();
+ Message message2 = consumer.receive();
+ consumer.acknowledge(message2);
+
+ long backlogScanMaxEntries = 20;
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
+
+ // Broker will filter deleted entries, total entries is supposed to be 19 + 20 = 39.
+ assertThat(backlogResult.getEntries()).isBetween(backlogScanMaxEntries,
+ backlogScanMaxEntries + serverSubscriptionBacklogScanMaxEntries - 1);
+
+ // Ack message1 and message2.
+ consumer.acknowledge(message1);
+
+ backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
+
+ assertEquals(backlogResult.getEntries(), backlogScanMaxEntries);
+ assertEquals(backlogResult.getMessages(), backlogScanMaxEntries);
+
+ // Ack all messages.
+ for (int i = 2; i < messages; i++) {
+ Message message = consumer.receive();
+ consumer.acknowledge(message);
+ }
+
+ backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
+ assertEquals(backlogResult.getEntries(), 0);
+ assertEquals(backlogResult.getMessages(), 0);
+ }
- int fifthTotalMessages = thirdTotalMessages + fourthSendMessages + fifthSendMessages;
- backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), fifthTotalMessages);
+ private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception {
+ admin.topics().createSubscription(topic, subName, MessageId.latest);
+
+ assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
+ verifyBacklog(topic, subName, 0, 0);
- assertEquals(backlogResult.getEntries(), fifthTotalMessages);
- assertEquals(backlogResult.getMessages(), fifthTotalMessages);
+ @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < numMessages; i++) {
+ CompletableFuture future = producer.sendAsync(("test-" + i).getBytes());
+ futures.add(future);
+ }
+ FutureUtil.waitForAll(futures).get();
}
+
}
From 6d7d2785d2156ab5ffee3a91a4dd020e7bd4f2c5 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Mon, 5 Jan 2026 16:45:53 +0800
Subject: [PATCH 05/15] [improve][admin] Optimize test
---
.../pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 1231d54332e07..5ccca4d429134 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -320,7 +320,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
long serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
- String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-Unloaded";
+ String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
String subName = "sub-1";
int messages = 55;
From 5929036d5c01b6de90bdd981994399974784c65d Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Mon, 5 Jan 2026 22:05:46 +0800
Subject: [PATCH 06/15] [improve][admin] Optimize code, fix test
---
.../bookkeeper/mledger/impl/OpScan.java | 3 ++-
.../admin/AnalyzeBacklogSubscriptionTest.java | 23 +++++++++----------
.../client/admin/internal/TopicsImpl.java | 10 ++++++++
3 files changed, 23 insertions(+), 13 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 6f9b87b245ec3..90d5dd0dffe7f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -69,6 +69,7 @@ public void readEntriesComplete(List entries, Object ctx) {
try {
Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
lastSeenPosition = lastPositionForBatch;
+ // TODO This filter operation is not needed, already handled by OpReadEntry.
// filter out the entry if it has been already deleted
// filterReadEntries will call entry.release if the entry is filtered out
List entriesFiltered = this.cursor.filterReadEntries(entries);
@@ -76,7 +77,7 @@ public void readEntriesComplete(List entries, Object ctx) {
remainingEntries.addAndGet(-skippedEntries);
if (!entriesFiltered.isEmpty()) {
for (Entry entry : entriesFiltered) {
- if (remainingEntries.decrementAndGet() <= 0) {
+ if (remainingEntries.getAndDecrement() <= 0) {
log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 5ccca4d429134..b43cb0c69a25f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -276,8 +276,8 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
// Broker returns 15 + 15 + 15 = 45 entries.
- long expectedEntries = backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries
- + serverSubscriptionBacklogScanMaxEntries;
+ long expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1)
+ * serverSubscriptionBacklogScanMaxEntries;
assertEquals(backlogResult.getEntries(), expectedEntries);
assertEquals(backlogResult.getMessages(), expectedEntries);
}
@@ -296,17 +296,14 @@ public void analyzeBacklogWithTopicUnload() throws Exception {
assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
verifyBacklog(topic, subName, 0, 0);
- // Test client side loop with topic unload.
+ // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication.
@Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
- List> futures = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- CompletableFuture future = producer.sendAsync(("test-" + i).getBytes());
- futures.add(future);
+ producer.send(("test-" + i).getBytes());
if (RandomUtils.secure().randomBoolean()) {
admin.topics().unload(topic);
}
}
- FutureUtil.waitForAll(futures).get();
AnalyzeSubscriptionBacklogResult backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages);
@@ -341,11 +338,10 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
AnalyzeSubscriptionBacklogResult backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
- // Broker will filter deleted entries, total entries is supposed to be 19 + 20 = 39.
- assertThat(backlogResult.getEntries()).isBetween(backlogScanMaxEntries,
- backlogScanMaxEntries + serverSubscriptionBacklogScanMaxEntries - 1);
+ assertThat(backlogResult.getEntries()).isEqualTo(backlogScanMaxEntries);
+ assertThat(backlogResult.getMessages()).isEqualTo(backlogScanMaxEntries);
- // Ack message1 and message2.
+ // Ack message1.
consumer.acknowledge(message1);
backlogResult =
@@ -370,7 +366,10 @@ private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int
admin.topics().createSubscription(topic, subName, MessageId.latest);
assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
- verifyBacklog(topic, subName, 0, 0);
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 1);
+ assertEquals(backlogResult.getEntries(), 0);
+ assertEquals(backlogResult.getMessages(), 0);
@Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
List> futures = new ArrayList<>();
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index e7f1d21d41b8b..dfb7a6cfb07b8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -47,6 +47,7 @@
import javax.ws.rs.core.MultivaluedMap;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.GetStatsOptions;
import org.apache.pulsar.client.admin.ListTopicsOptions;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
@@ -1630,6 +1631,15 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
return;
}
+ // In analyze-backlog, lastMessageId is null only when: total entries is 0,
+ // with false aborted flag returned.
+ if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
+ log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect last message id returned from server"));
+ }
+
String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
MessageIdImpl nextScanMessageId =
new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1,
From 6309d4b23d9deb4b65e6bb5bae9e4803a7592253 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Tue, 6 Jan 2026 09:50:56 +0800
Subject: [PATCH 07/15] [improve][admin] Add guard safe
---
.../apache/pulsar/client/admin/internal/TopicsImpl.java | 8 ++++++++
1 file changed, 8 insertions(+)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index dfb7a6cfb07b8..2f95812f41b4f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1631,6 +1631,14 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
return;
}
+ // To avoid infinite loops, we ensure the entry count is incremented after each loop.
+ if (currentResult.getEntries() <= 0) {
+ log.warn("[{}][{}] Scanned total entry count is null, abort analyze backlog, start position is: {}",
+ topic, subscriptionName, startPositionRef.get());
+ future.completeExceptionally(
+ new PulsarAdminException("Incorrect total entry count returned from server"));
+ }
+
// In analyze-backlog, lastMessageId is null only when: total entries is 0,
// with false aborted flag returned.
if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
From e1962091f3fb29810513b7f4ebb73c8c7f9216ab Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Tue, 6 Jan 2026 10:28:30 +0800
Subject: [PATCH 08/15] [improve][admin] Optimize comments
---
.../pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java | 2 +-
.../src/main/java/org/apache/pulsar/client/admin/Topics.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index b43cb0c69a25f..8016cf1ed6c36 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -324,7 +324,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
// Test client side loop with individual ack.
clientSideLoopAnalyzeBacklogSetup(topic, subName, messages);
- // we want to wait for the server to process acks, in order to not have a flaky test.
+ // We want to wait for the server to process acks, in order to not have a flaky test.
@Cleanup Consumer consumer =
pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName)
.subscriptionType(SubscriptionType.Shared).subscribe();
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 2ab685100ad5a..63a817dca309b 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2284,7 +2284,7 @@ CompletableFuture analyzeSubscriptionBacklogAs
* 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by
* the server, it only serves to determine when the loop should terminate.
* 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and
- * subscriptionBacklogScanMaxEntries, and then retrieve the desired number of backlog entries through
+ * subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through
* client-side looping.
*
* @param topic
From 825286da3bad5349c41cc2630694b43bef973bc3 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Tue, 6 Jan 2026 12:53:48 +0800
Subject: [PATCH 09/15] [improve][admin] Remove entry filter logic in OpScan
---
.../bookkeeper/mledger/impl/OpScan.java | 28 +++++++------------
1 file changed, 10 insertions(+), 18 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 90d5dd0dffe7f..202ba099c5de2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -69,24 +69,16 @@ public void readEntriesComplete(List entries, Object ctx) {
try {
Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition();
lastSeenPosition = lastPositionForBatch;
- // TODO This filter operation is not needed, already handled by OpReadEntry.
- // filter out the entry if it has been already deleted
- // filterReadEntries will call entry.release if the entry is filtered out
- List entriesFiltered = this.cursor.filterReadEntries(entries);
- int skippedEntries = entries.size() - entriesFiltered.size();
- remainingEntries.addAndGet(-skippedEntries);
- if (!entriesFiltered.isEmpty()) {
- for (Entry entry : entriesFiltered) {
- if (remainingEntries.getAndDecrement() <= 0) {
- log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
- return;
- }
- if (!condition.test(entry)) {
- log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
- callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
- return;
- }
+ for (Entry entry : entries) {
+ if (remainingEntries.getAndDecrement() <= 0) {
+ log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
+ return;
+ }
+ if (!condition.test(entry)) {
+ log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
+ callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
+ return;
}
}
searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1,
From 9bc2cba004f90121c630a7cbd87ebdf018756fc3 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Thu, 8 Jan 2026 19:25:38 +0800
Subject: [PATCH 10/15] [improve][admin] Apply copilot comments, optimize tests
---
.../admin/AnalyzeBacklogSubscriptionTest.java | 124 +++++++++---------
.../apache/pulsar/client/admin/Topics.java | 4 +-
.../client/admin/internal/TopicsImpl.java | 8 +-
3 files changed, 71 insertions(+), 65 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 8016cf1ed6c36..1304417d29a7e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -18,9 +18,9 @@
*/
package org.apache.pulsar.broker.admin;
-import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertThrows;
import java.util.ArrayList;
import java.util.List;
@@ -33,6 +33,7 @@
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.SubscriptionType;
@@ -199,7 +200,7 @@ public void partitionedTopicNotAllowed() throws Exception {
@Test
public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 20;
+ int serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop";
@@ -207,18 +208,15 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Excep
int numMessages = 10;
// Test server returns false aborted flag.
- clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages - 1);
-
- assertEquals(backlogResult.getEntries(), numMessages);
- assertEquals(backlogResult.getMessages(), numMessages);
+ verifyClientSideLoopBacklog(topic, subName, numMessages - 1, numMessages, messageIds.get(0),
+ messageIds.get(numMessages - 1));
}
@Test
public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 20;
+ int serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop";
@@ -227,19 +225,17 @@ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception {
// Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server
// returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10.
- clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
- AnalyzeSubscriptionBacklogResult backlogResult = admin.topics()
- .analyzeSubscriptionBacklog(topic, subName, Optional.empty(),
- serverSubscriptionBacklogScanMaxEntries - 1);
+ verifyClientSideLoopBacklog(topic, subName, serverSubscriptionBacklogScanMaxEntries - 1,
+ serverSubscriptionBacklogScanMaxEntries, messageIds.get(0),
+ messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1));
- assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries);
- assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries);
}
@Test
public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 20;
+ int serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop";
@@ -248,18 +244,15 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exceptio
// Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
// condition is that server returns false aborted flag.
- clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
-
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages);
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
- assertEquals(backlogResult.getEntries(), numMessages);
- assertEquals(backlogResult.getMessages(), numMessages);
+ verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0),
+ messageIds.get(numMessages - 1));
}
@Test
public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 15;
+ int serverSubscriptionBacklogScanMaxEntries = 15;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop";
@@ -270,21 +263,18 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception {
// Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination
// condition is that total entries exceeds backlogScanMaxEntries.
// Server dispatcherMaxReadBatchSize is set to 10.
- clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
-
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages);
// Broker returns 15 + 15 + 15 = 45 entries.
- long expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1)
+ int expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1)
* serverSubscriptionBacklogScanMaxEntries;
- assertEquals(backlogResult.getEntries(), expectedEntries);
- assertEquals(backlogResult.getMessages(), expectedEntries);
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, expectedEntries, messageIds.get(0),
+ messageIds.get(expectedEntries - 1));
}
@Test
public void analyzeBacklogWithTopicUnload() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 10;
+ int serverSubscriptionBacklogScanMaxEntries = 10;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload";
@@ -298,23 +288,22 @@ public void analyzeBacklogWithTopicUnload() throws Exception {
// Test client side loop with topic unload. Use sync send method here to avoid potential message duplication.
@Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
+ List messageIds = new ArrayList<>();
for (int i = 0; i < numMessages; i++) {
- producer.send(("test-" + i).getBytes());
+ MessageId messageId = producer.send(("test-" + i).getBytes());
+ messageIds.add(messageId);
if (RandomUtils.secure().randomBoolean()) {
admin.topics().unload(topic);
}
}
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages);
-
- assertEquals(backlogResult.getEntries(), numMessages);
- assertEquals(backlogResult.getMessages(), numMessages);
+ verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0),
+ messageIds.get(numMessages - 1));
}
@Test
public void analyzeBacklogWithIndividualAck() throws Exception {
- long serverSubscriptionBacklogScanMaxEntries = 20;
+ int serverSubscriptionBacklogScanMaxEntries = 20;
conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries);
String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack";
@@ -322,7 +311,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
int messages = 55;
// Test client side loop with individual ack.
- clientSideLoopAnalyzeBacklogSetup(topic, subName, messages);
+ List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, messages);
// We want to wait for the server to process acks, in order to not have a flaky test.
@Cleanup Consumer consumer =
@@ -334,21 +323,14 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
Message message2 = consumer.receive();
consumer.acknowledge(message2);
- long backlogScanMaxEntries = 20;
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
-
- assertThat(backlogResult.getEntries()).isEqualTo(backlogScanMaxEntries);
- assertThat(backlogResult.getMessages()).isEqualTo(backlogScanMaxEntries);
+ int backlogScanMaxEntries = 20;
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(0),
+ messageIds.get(backlogScanMaxEntries));
// Ack message1.
consumer.acknowledge(message1);
-
- backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
-
- assertEquals(backlogResult.getEntries(), backlogScanMaxEntries);
- assertEquals(backlogResult.getMessages(), backlogScanMaxEntries);
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(2),
+ messageIds.get(backlogScanMaxEntries + 1));
// Ack all messages.
for (int i = 2; i < messages; i++) {
@@ -356,20 +338,15 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
consumer.acknowledge(message);
}
- backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries);
- assertEquals(backlogResult.getEntries(), 0);
- assertEquals(backlogResult.getMessages(), 0);
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null,
+ null);
}
- private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception {
+ private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception {
admin.topics().createSubscription(topic, subName, MessageId.latest);
assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
- AnalyzeSubscriptionBacklogResult backlogResult =
- admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 1);
- assertEquals(backlogResult.getEntries(), 0);
- assertEquals(backlogResult.getMessages(), 0);
+ verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null);
@Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create();
List> futures = new ArrayList<>();
@@ -378,6 +355,33 @@ private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int
futures.add(future);
}
FutureUtil.waitForAll(futures).get();
+ return futures.stream().map(CompletableFuture::join).toList();
+ }
+
+ private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries,
+ int expectedEntries, MessageId firstMessageId,
+ MessageId lastMessageId) throws Exception {
+ AnalyzeSubscriptionBacklogResult backlogResult =
+ admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries);
+
+ assertEquals(backlogResult.getEntries(), expectedEntries);
+ assertEquals(backlogResult.getMessages(), expectedEntries);
+
+ if (firstMessageId == null) {
+ assertNull(backlogResult.getFirstMessageId());
+ } else {
+ MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId;
+ assertEquals(backlogResult.getFirstMessageId(),
+ firstMessageIdAdv.getLedgerId() + ":" + firstMessageIdAdv.getEntryId());
+ }
+
+ if (lastMessageId == null) {
+ assertNull(backlogResult.getLastMessageId());
+ } else {
+ MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId;
+ assertEquals(backlogResult.getLastMessageId(),
+ lastMessageIdAdv.getLedgerId() + ":" + lastMessageIdAdv.getEntryId());
+ }
}
}
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 63a817dca309b..3ac148191dce6 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2271,7 +2271,7 @@ CompletableFuture analyzeSubscriptionBacklogAs
* Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout
* in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few
* minutes.
- *
+ *
*
*
* How does this method work?
@@ -2286,7 +2286,7 @@ CompletableFuture analyzeSubscriptionBacklogAs
* 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and
* subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through
* client-side looping.
- *
+ *
* @param topic
* Topic name
* @param subscriptionName
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 2f95812f41b4f..12c3d3a9896a4 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1633,10 +1633,11 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
// To avoid infinite loops, we ensure the entry count is incremented after each loop.
if (currentResult.getEntries() <= 0) {
- log.warn("[{}][{}] Scanned total entry count is null, abort analyze backlog, start position is: {}",
- topic, subscriptionName, startPositionRef.get());
+ log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start "
+ + "position is: {}", topic, subscriptionName, startPositionRef.get());
future.completeExceptionally(
new PulsarAdminException("Incorrect total entry count returned from server"));
+ return;
}
// In analyze-backlog, lastMessageId is null only when: total entries is 0,
@@ -1646,6 +1647,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
topic, subscriptionName, startPositionRef.get());
future.completeExceptionally(
new PulsarAdminException("Incorrect last message id returned from server"));
+ return;
}
String[] messageIdSplits = mergedResult.getLastMessageId().split(":");
@@ -1684,7 +1686,7 @@ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscription
current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages());
mergedRes.setAborted(current.isAborted());
- mergedRes.setFirstMessageId(current.getFirstMessageId());
+ mergedRes.setFirstMessageId(previous.getFirstMessageId());
mergedRes.setLastMessageId(current.getLastMessageId());
return mergedRes;
From 6047f048a5010fe56f854d68499c089d4afa69a0 Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Thu, 8 Jan 2026 22:34:59 +0800
Subject: [PATCH 11/15] [improve][admin] Fix checkstyle
---
.../broker/admin/AnalyzeBacklogSubscriptionTest.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
index 1304417d29a7e..4425436954aa6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java
@@ -338,11 +338,11 @@ public void analyzeBacklogWithIndividualAck() throws Exception {
consumer.acknowledge(message);
}
- verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null,
- null);
+ verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, null);
}
- private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception {
+ private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages)
+ throws Exception {
admin.topics().createSubscription(topic, subName, MessageId.latest);
assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1"));
@@ -359,8 +359,8 @@ private List clientSideLoopAnalyzeBacklogSetup(String topic, String s
}
private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries,
- int expectedEntries, MessageId firstMessageId,
- MessageId lastMessageId) throws Exception {
+ int expectedEntries, MessageId firstMessageId, MessageId lastMessageId)
+ throws Exception {
AnalyzeSubscriptionBacklogResult backlogResult =
admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries);
From 21cdb9047adbf46e6dc9268596cfd4a4a401a03c Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Wed, 21 Jan 2026 10:02:11 +0800
Subject: [PATCH 12/15] [fix][admin] Add continuePredicate overload method
---
.../apache/pulsar/client/admin/Topics.java | 47 +++++++++++++++++++
.../client/admin/internal/TopicsImpl.java | 24 ++++++++--
2 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 3ac148191dce6..47c22df8d2fc0 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -23,6 +23,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Predicate;
import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException;
import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException;
@@ -2238,6 +2239,29 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String
Optional startPosition,
long backlogScanMaxEntries) throws PulsarAdminException;
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, Predicate)}
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @param continuePredicate
+ * the predicate to determine whether to continue the loop
+ * @return an accurate analysis of the backlog
+ */
+ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+ Optional startPosition,
+ Predicate continuePredicate)
+ throws PulsarAdminException;
+
/**
* Analyze subscription backlog.
* This is a potentially expensive operation, as it requires
@@ -2302,6 +2326,29 @@ CompletableFuture analyzeSubscriptionBacklogAs
Optional startPosition,
long backlogScanMaxEntries);
+ /**
+ * Analyze subscription backlog.
+ * This is a potentially expensive operation, as it requires
+ * to read the messages from storage.
+ * This function takes into consideration batch messages
+ * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
+ * User can control the loop termination condition by continuePredicate.
+ *
+ * @param topic
+ * Topic name
+ * @param subscriptionName
+ * the subscription
+ * @param startPosition
+ * the position to start the scan from (empty means the last processed message)
+ * @param continuePredicate
+ * the predicate to determine whether to continue the loop
+ * @return an accurate analysis of the backlog
+ */
+ CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName, Optional startPosition,
+ Predicate continuePredicate);
+
/**
* Get backlog size by a message ID.
* @param topic
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 12c3d3a9896a4..5e4f8502613bb 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -36,6 +36,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
+import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -1573,6 +1574,14 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries));
}
+ @Override
+ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
+ Optional startPosition,
+ Predicate continuePredicate)
+ throws PulsarAdminException {
+ return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, continuePredicate));
+ }
+
@Override
public CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName,
@@ -1609,6 +1618,14 @@ public CompletableFuture analyzeSubscriptionBa
String subscriptionName,
Optional startPosition,
long backlogScanMaxEntries) {
+ return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition,
+ (backlogResult) -> backlogResult.getEntries() >= backlogScanMaxEntries);
+ }
+
+ @Override
+ public CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
+ String subscriptionName, Optional startPosition,
+ Predicate continuePredicate) {
final CompletableFuture future = new CompletableFuture<>();
AtomicReference resultRef = new AtomicReference<>();
int partitionIndex = TopicName.get(topic).getPartitionIndex();
@@ -1626,12 +1643,13 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
resultRef.set(mergedResult);
- if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) {
+ if (!mergedResult.isAborted() || continuePredicate.test(mergedResult)) {
future.complete(mergedResult);
return;
}
// To avoid infinite loops, we ensure the entry count is incremented after each loop.
+ // Should never happen.
if (currentResult.getEntries() <= 0) {
log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start "
+ "position is: {}", topic, subscriptionName, startPositionRef.get());
@@ -1640,8 +1658,8 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
return;
}
- // In analyze-backlog, lastMessageId is null only when: total entries is 0,
- // with false aborted flag returned.
+ // In analyze-backlog, lastMessageId is null only when: total entries is 0, with false aborted flag
+ // returned. Should never happen.
if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}",
topic, subscriptionName, startPositionRef.get());
From a32ed4790e30dff372f4fdc20498d52bacd5db9e Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Wed, 21 Jan 2026 14:27:08 +0800
Subject: [PATCH 13/15] [fix][admin] Change continuePredicate to
terminatePredicate
---
.../org/apache/pulsar/client/admin/Topics.java | 14 +++++++-------
.../pulsar/client/admin/internal/TopicsImpl.java | 8 ++++----
2 files changed, 11 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
index 47c22df8d2fc0..e68be8fd2e805 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java
@@ -2253,13 +2253,13 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last processed message)
- * @param continuePredicate
- * the predicate to determine whether to continue the loop
+ * @param terminatePredicate
+ * the predicate to determine whether to terminate the loop
* @return an accurate analysis of the backlog
*/
AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
Optional startPosition,
- Predicate continuePredicate)
+ Predicate terminatePredicate)
throws PulsarAdminException;
/**
@@ -2333,7 +2333,7 @@ CompletableFuture analyzeSubscriptionBacklogAs
* This function takes into consideration batch messages
* and also Subscription filters.
* See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
- * User can control the loop termination condition by continuePredicate.
+ * User can control the loop termination condition by terminatePredicate.
*
* @param topic
* Topic name
@@ -2341,13 +2341,13 @@ CompletableFuture analyzeSubscriptionBacklogAs
* the subscription
* @param startPosition
* the position to start the scan from (empty means the last processed message)
- * @param continuePredicate
- * the predicate to determine whether to continue the loop
+ * @param terminatePredicate
+ * the predicate to determine whether to terminate the loop
* @return an accurate analysis of the backlog
*/
CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName, Optional startPosition,
- Predicate continuePredicate);
+ Predicate terminatePredicate);
/**
* Get backlog size by a message ID.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 5e4f8502613bb..530a126d1b1fa 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1577,9 +1577,9 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic,
@Override
public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName,
Optional startPosition,
- Predicate continuePredicate)
+ Predicate terminatePredicate)
throws PulsarAdminException {
- return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, continuePredicate));
+ return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, terminatePredicate));
}
@Override
@@ -1625,7 +1625,7 @@ public CompletableFuture analyzeSubscriptionBa
@Override
public CompletableFuture analyzeSubscriptionBacklogAsync(String topic,
String subscriptionName, Optional startPosition,
- Predicate continuePredicate) {
+ Predicate terminatePredicate) {
final CompletableFuture future = new CompletableFuture<>();
AtomicReference resultRef = new AtomicReference<>();
int partitionIndex = TopicName.get(topic).getPartitionIndex();
@@ -1643,7 +1643,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get());
resultRef.set(mergedResult);
- if (!mergedResult.isAborted() || continuePredicate.test(mergedResult)) {
+ if (!mergedResult.isAborted() || terminatePredicate.test(mergedResult)) {
future.complete(mergedResult);
return;
}
From da82b1b321d043dc4515ff5f46e99701a7bf008d Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Thu, 5 Feb 2026 12:41:05 +0800
Subject: [PATCH 14/15] deal with 0 entries and null lastMessageId
---
.../client/admin/internal/TopicsImpl.java | 29 +++++++------------
1 file changed, 11 insertions(+), 18 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
index 530a126d1b1fa..78b7329159c1a 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
@@ -1648,23 +1648,13 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
return;
}
- // To avoid infinite loops, we ensure the entry count is incremented after each loop.
- // Should never happen.
- if (currentResult.getEntries() <= 0) {
- log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start "
- + "position is: {}", topic, subscriptionName, startPositionRef.get());
- future.completeExceptionally(
- new PulsarAdminException("Incorrect total entry count returned from server"));
- return;
- }
-
- // In analyze-backlog, lastMessageId is null only when: total entries is 0, with false aborted flag
- // returned. Should never happen.
- if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
- log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}",
- topic, subscriptionName, startPositionRef.get());
- future.completeExceptionally(
- new PulsarAdminException("Incorrect last message id returned from server"));
+ // In analyze-backlog, we treat 0 entries or null lastMessageId as scan completed for mere safety.
+ // 0 entries or a null lastMessageId indicates no entries were scanned.
+ if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) {
+ log.info("[{}][{}] complete scan due total entry <= 0 or last message id is blank, "
+ + "start position is: {}, current result: {}", topic, subscriptionName,
+ startPositionRef.get(), currentResult);
+ future.complete(mergedResult);
return;
}
@@ -1705,7 +1695,10 @@ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscription
mergedRes.setAborted(current.isAborted());
mergedRes.setFirstMessageId(previous.getFirstMessageId());
- mergedRes.setLastMessageId(current.getLastMessageId());
+ String lastMessageId = current.getLastMessageId();
+ if (StringUtils.isNotBlank(lastMessageId)) {
+ mergedRes.setLastMessageId(lastMessageId);
+ }
return mergedRes;
}
From 674ba6fe4e2c6b6184b0434e280fee556d86223d Mon Sep 17 00:00:00 2001
From: Oneby Wang <891734032@qq.com>
Date: Thu, 5 Feb 2026 16:06:35 +0800
Subject: [PATCH 15/15] modify log level
---
.../java/org/apache/bookkeeper/mledger/impl/OpScan.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
index 202ba099c5de2..413bb5c018e84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java
@@ -71,12 +71,12 @@ public void readEntriesComplete(List entries, Object ctx) {
lastSeenPosition = lastPositionForBatch;
for (Entry entry : entries) {
if (remainingEntries.getAndDecrement() <= 0) {
- log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (!condition.test(entry)) {
- log.warn("[{}] Scan abort due to user code", OpScan.this.cursor);
+ log.info("[{}] Scan abort due to user code", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx);
return;
}
@@ -111,12 +111,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
public void find() {
if (remainingEntries.get() <= 0) {
- log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
+ log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}
if (System.currentTimeMillis() - startTime > timeOutMs) {
- log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
+ log.info("[{}] Scan abort after hitting the deadline", OpScan.this.cursor);
callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx);
return;
}