From fa3205516bcab377b127973e7ddd4824f897306c Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 4 Jan 2026 14:35:16 +0800 Subject: [PATCH 01/15] [improve][cli] Add client side looping to analyze-backlog in Topics --- .../apache/pulsar/client/admin/Topics.java | 69 +++++++++++++++- .../client/admin/internal/TopicsImpl.java | 81 +++++++++++++++++++ 2 files changed, 149 insertions(+), 1 deletion(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index fbcf0b4a07b1f..2ab685100ad5a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2220,21 +2220,88 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String * This is a potentially expensive operation, as it requires * to read the messages from storage. * This function takes into consideration batch messages - * and also Subscription filters. + * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)} * @param topic * Topic name * @param subscriptionName * the subscription * @param startPosition * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop * @return an accurate analysis of the backlog * @throws PulsarAdminException * Unexpected error */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) throws PulsarAdminException; + + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @return an accurate analysis of the backlog + */ CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, Optional startPosition); + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters. + * + *

+ * What's the purpose of this overloaded method?
+ * There are broker side configurable maximum limits how many entries will be read and how long the scanning can + * take. The subscriptionBacklogScanMaxTimeMs (default 2 minutes) and subscriptionBacklogScanMaxEntries + * (default 10000) control this behavior.
+ * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout + * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few + * minutes. + *

+ * + *

+ * How does this method work?
+ * 1. Add a new parameter backlogScanMaxEntries in client side method to control the client-side loop termination + * condition.
+ * 2. If subscriptionBacklogScanMaxEntries(server side) >= backlogScanMaxEntries(client side), then + * backlogScanMaxEntries parameter will take no effect.
+ * 3. If subscriptionBacklogScanMaxEntries < backlogScanMaxEntries, the client will call analyze-backlog method in + * a loop until server return ScanOutcome.COMPLETED or the total entries exceeds backlogScanMaxEntries.
+ * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by + * the server, it only serves to determine when the loop should terminate.
+ * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and + * subscriptionBacklogScanMaxEntries, and then retrieve the desired number of backlog entries through + * client-side looping. + *

+ * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param backlogScanMaxEntries + * the maximum number of backlog entries the client will scan before terminating its loop + * @return an accurate analysis of the backlog + */ + CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries); + /** * Get backlog size by a message ID. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 11dd69a23ce58..7b351396de60f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -34,6 +34,9 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.BiConsumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; import javax.ws.rs.client.Entity; @@ -1560,6 +1563,15 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition)); } + @Override + public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) + throws PulsarAdminException { + return sync( + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries)); + } + @Override public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, @@ -1591,6 +1603,75 @@ public void failed(Throwable throwable) { return future; } + @Override + public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, + Optional startPosition, + long backlogScanMaxEntries) { + final CompletableFuture future = new CompletableFuture<>(); + AtomicReference resultRef = new AtomicReference<>(); + int partitionIndex = TopicName.get(topic).getPartitionIndex(); + AtomicReference> startPositionRef = new AtomicReference<>(startPosition); + + Supplier> resultSupplier = + () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPositionRef.get()); + BiConsumer completeAction = new BiConsumer<>() { + @Override + public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable throwable) { + if (throwable != null) { + future.completeExceptionally(throwable); + return; + } + + AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get()); + resultRef.set(mergedResult); + if (mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) { + future.complete(mergedResult); + return; + } + + String[] messageIdSplits = mergedResult.getLastMessageId().split(":"); + MessageIdImpl nextScanMessageId = + new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1, + partitionIndex); + startPositionRef.set(Optional.of(nextScanMessageId)); + + resultSupplier.get().whenComplete(this); + } + }; + + resultSupplier.get().whenComplete(completeAction); + return future; + } + + private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscriptionBacklogResult current, + AnalyzeSubscriptionBacklogResult previous) { + if (previous == null) { + return current; + } + + AnalyzeSubscriptionBacklogResult mergedRes = new AnalyzeSubscriptionBacklogResult(); + mergedRes.setEntries(current.getEntries() + previous.getEntries()); + mergedRes.setMessages(current.getMessages() + previous.getMessages()); + mergedRes.setMarkerMessages(current.getMarkerMessages() + previous.getMarkerMessages()); + + mergedRes.setFilterAcceptedEntries(current.getFilterAcceptedEntries() + previous.getFilterAcceptedEntries()); + mergedRes.setFilterRejectedEntries(current.getFilterRejectedEntries() + previous.getFilterRejectedEntries()); + mergedRes.setFilterRescheduledEntries( + current.getFilterRescheduledEntries() + previous.getFilterRescheduledEntries()); + + mergedRes.setFilterAcceptedMessages(current.getFilterAcceptedMessages() + previous.getFilterAcceptedMessages()); + mergedRes.setFilterRejectedMessages(current.getFilterRejectedMessages() + previous.getFilterRejectedMessages()); + mergedRes.setFilterRescheduledMessages( + current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages()); + + mergedRes.setAborted(current.isAborted()); + mergedRes.setFirstMessageId(current.getFirstMessageId()); + mergedRes.setLastMessageId(current.getLastMessageId()); + + return mergedRes; + } + @Override public Long getBacklogSizeByMessageId(String topic, MessageId messageId) throws PulsarAdminException { From dfd12840c4b301acdbe5772a217ad16641db96a3 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 4 Jan 2026 21:23:36 +0800 Subject: [PATCH 02/15] [improve][cli] Modify logic --- .../org/apache/pulsar/client/admin/internal/TopicsImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 7b351396de60f..e7f1d21d41b8b 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1625,7 +1625,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get()); resultRef.set(mergedResult); - if (mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) { + if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) { future.complete(mergedResult); return; } From ba907c7ab5e323cf1054c7c4096c661d0b09afcc Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 5 Jan 2026 12:38:49 +0800 Subject: [PATCH 03/15] [improve][admin] Add test --- .../admin/AnalyzeBacklogSubscriptionTest.java | 99 +++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index acea913204999..df608f2d2c560 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.Cleanup; +import org.apache.commons.lang3.RandomUtils; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; @@ -189,4 +190,102 @@ public void partitionedTopicNotAllowed() throws Exception { assertEquals(0, analyzeSubscriptionBacklogResult.getEntries()); } + @Test + public void analyzeBacklogUsingClientSideLooping() throws Exception { + int serverSubscriptionBacklogScanMaxEntries = 10; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-using-client-side-looping-topic"; + String subName = "sub-1"; + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyBacklog(topic, subName, 0, 0); + + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + + // Test server returns false aborted flag. + int firstSendMessages = 5; + List> firstSendFutures = new ArrayList<>(); + for (int i = 0; i < firstSendMessages; i++) { + CompletableFuture future = producer.sendAsync(("first-" + i).getBytes()); + firstSendFutures.add(future); + } + FutureUtil.waitForAll(firstSendFutures).get(); + + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 15); + + assertEquals(backlogResult.getEntries(), firstSendMessages); + assertEquals(backlogResult.getMessages(), firstSendMessages); + + // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server returns + // true aborted flag. + int secondSendMessages = 10; + List> secondSendFutures = new ArrayList<>(); + for (int i = 0; i < secondSendMessages; i++) { + CompletableFuture future = producer.sendAsync(("second-" + i).getBytes()); + secondSendFutures.add(future); + } + FutureUtil.waitForAll(secondSendFutures).get(); + + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), + serverSubscriptionBacklogScanMaxEntries - 1); + + assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries); + assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries); + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that server returns false aborted flag. + int thirdSendMessages = 10; + List> thirdFutures = new ArrayList<>(); + for (int i = 0; i < thirdSendMessages; i++) { + CompletableFuture future = producer.sendAsync(("third-" + i).getBytes()); + thirdFutures.add(future); + } + FutureUtil.waitForAll(thirdFutures).get(); + + int thirdTotalMessages = firstSendMessages + secondSendMessages + thirdSendMessages; + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages + 1); + + assertEquals(backlogResult.getEntries(), thirdTotalMessages); + assertEquals(backlogResult.getMessages(), thirdTotalMessages); + + // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination + // condition is that total entries exceeds backlogScanMaxEntries. + int fourthSendMessages = 10; + List> fourthFutures = new ArrayList<>(); + for (int i = 0; i < fourthSendMessages; i++) { + CompletableFuture future = producer.sendAsync(("fourth-" + i).getBytes()); + fourthFutures.add(future); + } + FutureUtil.waitForAll(fourthFutures).get(); + + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages); + + int fourthExpectedMessages = (thirdTotalMessages / serverSubscriptionBacklogScanMaxEntries) + + serverSubscriptionBacklogScanMaxEntries; + assertEquals(backlogResult.getEntries(), fourthExpectedMessages); + assertEquals(backlogResult.getMessages(), fourthExpectedMessages); + + // Test client side loop with topic unload. + admin.topics().unload(topic); + int fifthSendMessages = 10; + List> fifthFutures = new ArrayList<>(); + for (int i = 0; i < fifthSendMessages; i++) { + CompletableFuture future = producer.sendAsync(("fifth-" + i).getBytes()); + fifthFutures.add(future); + if (RandomUtils.secure().randomBoolean()) { + admin.topics().unload(topic); + } + } + FutureUtil.waitForAll(fifthFutures).get(); + + int fifthTotalMessages = thirdTotalMessages + fourthSendMessages + fifthSendMessages; + backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), fifthTotalMessages); + + assertEquals(backlogResult.getEntries(), fifthTotalMessages); + assertEquals(backlogResult.getMessages(), fifthTotalMessages); + } } From 6ec6b93a8deac1453e32b57cbac63e9171fae0d4 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 5 Jan 2026 16:25:55 +0800 Subject: [PATCH 04/15] [improve][admin] Optimize test --- .../bookkeeper/mledger/impl/OpScan.java | 3 +- .../admin/AnalyzeBacklogSubscriptionTest.java | 225 +++++++++++++----- 2 files changed, 161 insertions(+), 67 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 732071ee01a6a..6f9b87b245ec3 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -91,7 +91,8 @@ public void readEntriesComplete(List entries, Object ctx) { searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1, PositionBound.startExcluded); if (log.isDebugEnabled()) { - log.debug("readEntryComplete {} at {} next is {}", lastPositionForBatch, searchPosition); + log.debug("[{}] readEntryComplete at {} next is {}", OpScan.this.cursor, lastPositionForBatch, + searchPosition); } if (searchPosition.compareTo(lastPositionForBatch) == 0) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index df608f2d2c560..1231d54332e07 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.admin; +import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertThrows; @@ -51,6 +52,12 @@ public void setup() throws Exception { producerBaseSetup(); } + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + conf.setDispatcherMaxReadBatchSize(10); + } + @AfterMethod(alwaysRun = true) @Override public void cleanup() throws Exception { @@ -191,101 +198,187 @@ public void partitionedTopicNotAllowed() throws Exception { } @Test - public void analyzeBacklogUsingClientSideLooping() throws Exception { - int serverSubscriptionBacklogScanMaxEntries = 10; + public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); - String topic = "persistent://my-property/my-ns/analyze-backlog-using-client-side-looping-topic"; + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop"; String subName = "sub-1"; - admin.topics().createSubscription(topic, subName, MessageId.latest); - - assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); - verifyBacklog(topic, subName, 0, 0); - - @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + int numMessages = 10; // Test server returns false aborted flag. - int firstSendMessages = 5; - List> firstSendFutures = new ArrayList<>(); - for (int i = 0; i < firstSendMessages; i++) { - CompletableFuture future = producer.sendAsync(("first-" + i).getBytes()); - firstSendFutures.add(future); - } - FutureUtil.waitForAll(firstSendFutures).get(); + clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 15); - - assertEquals(backlogResult.getEntries(), firstSendMessages); - assertEquals(backlogResult.getMessages(), firstSendMessages); - - // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server returns - // true aborted flag. - int secondSendMessages = 10; - List> secondSendFutures = new ArrayList<>(); - for (int i = 0; i < secondSendMessages; i++) { - CompletableFuture future = producer.sendAsync(("second-" + i).getBytes()); - secondSendFutures.add(future); - } - FutureUtil.waitForAll(secondSendFutures).get(); + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages - 1); - backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), - serverSubscriptionBacklogScanMaxEntries - 1); + assertEquals(backlogResult.getEntries(), numMessages); + assertEquals(backlogResult.getMessages(), numMessages); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop"; + String subName = "sub-1"; + int numMessages = 25; + + // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server + // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10. + clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + + AnalyzeSubscriptionBacklogResult backlogResult = admin.topics() + .analyzeSubscriptionBacklog(topic, subName, Optional.empty(), + serverSubscriptionBacklogScanMaxEntries - 1); assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries); assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries); + } + + @Test + public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop"; + String subName = "sub-1"; + int numMessages = 45; // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination // condition is that server returns false aborted flag. - int thirdSendMessages = 10; - List> thirdFutures = new ArrayList<>(); - for (int i = 0; i < thirdSendMessages; i++) { - CompletableFuture future = producer.sendAsync(("third-" + i).getBytes()); - thirdFutures.add(future); - } - FutureUtil.waitForAll(thirdFutures).get(); + clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - int thirdTotalMessages = firstSendMessages + secondSendMessages + thirdSendMessages; - backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages + 1); + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages); + + assertEquals(backlogResult.getEntries(), numMessages); + assertEquals(backlogResult.getMessages(), numMessages); + } + + @Test + public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 15; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); - assertEquals(backlogResult.getEntries(), thirdTotalMessages); - assertEquals(backlogResult.getMessages(), thirdTotalMessages); + String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop"; + String subName = "sub-1"; + int numMessages = 55; + int backlogScanMaxEntries = 40; // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination // condition is that total entries exceeds backlogScanMaxEntries. - int fourthSendMessages = 10; - List> fourthFutures = new ArrayList<>(); - for (int i = 0; i < fourthSendMessages; i++) { - CompletableFuture future = producer.sendAsync(("fourth-" + i).getBytes()); - fourthFutures.add(future); - } - FutureUtil.waitForAll(fourthFutures).get(); + // Server dispatcherMaxReadBatchSize is set to 10. + clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), thirdTotalMessages); + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); - int fourthExpectedMessages = (thirdTotalMessages / serverSubscriptionBacklogScanMaxEntries) + // Broker returns 15 + 15 + 15 = 45 entries. + long expectedEntries = backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + serverSubscriptionBacklogScanMaxEntries; - assertEquals(backlogResult.getEntries(), fourthExpectedMessages); - assertEquals(backlogResult.getMessages(), fourthExpectedMessages); + assertEquals(backlogResult.getEntries(), expectedEntries); + assertEquals(backlogResult.getMessages(), expectedEntries); + } + + @Test + public void analyzeBacklogWithTopicUnload() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 10; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload"; + String subName = "sub-1"; + int numMessages = 35; + + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyBacklog(topic, subName, 0, 0); // Test client side loop with topic unload. - admin.topics().unload(topic); - int fifthSendMessages = 10; - List> fifthFutures = new ArrayList<>(); - for (int i = 0; i < fifthSendMessages; i++) { - CompletableFuture future = producer.sendAsync(("fifth-" + i).getBytes()); - fifthFutures.add(future); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + CompletableFuture future = producer.sendAsync(("test-" + i).getBytes()); + futures.add(future); if (RandomUtils.secure().randomBoolean()) { admin.topics().unload(topic); } } - FutureUtil.waitForAll(fifthFutures).get(); + FutureUtil.waitForAll(futures).get(); + + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages); + + assertEquals(backlogResult.getEntries(), numMessages); + assertEquals(backlogResult.getMessages(), numMessages); + } + + @Test + public void analyzeBacklogWithIndividualAck() throws Exception { + long serverSubscriptionBacklogScanMaxEntries = 20; + conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); + + String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-Unloaded"; + String subName = "sub-1"; + int messages = 55; + + // Test client side loop with individual ack. + clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); + + // we want to wait for the server to process acks, in order to not have a flaky test. + @Cleanup Consumer consumer = + pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName) + .subscriptionType(SubscriptionType.Shared).subscribe(); + + // Individual ack message2. + Message message1 = consumer.receive(); + Message message2 = consumer.receive(); + consumer.acknowledge(message2); + + long backlogScanMaxEntries = 20; + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); + + // Broker will filter deleted entries, total entries is supposed to be 19 + 20 = 39. + assertThat(backlogResult.getEntries()).isBetween(backlogScanMaxEntries, + backlogScanMaxEntries + serverSubscriptionBacklogScanMaxEntries - 1); + + // Ack message1 and message2. + consumer.acknowledge(message1); + + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); + + assertEquals(backlogResult.getEntries(), backlogScanMaxEntries); + assertEquals(backlogResult.getMessages(), backlogScanMaxEntries); + + // Ack all messages. + for (int i = 2; i < messages; i++) { + Message message = consumer.receive(); + consumer.acknowledge(message); + } + + backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); + assertEquals(backlogResult.getEntries(), 0); + assertEquals(backlogResult.getMessages(), 0); + } - int fifthTotalMessages = thirdTotalMessages + fourthSendMessages + fifthSendMessages; - backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), fifthTotalMessages); + private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception { + admin.topics().createSubscription(topic, subName, MessageId.latest); + + assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); + verifyBacklog(topic, subName, 0, 0); - assertEquals(backlogResult.getEntries(), fifthTotalMessages); - assertEquals(backlogResult.getMessages(), fifthTotalMessages); + @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List> futures = new ArrayList<>(); + for (int i = 0; i < numMessages; i++) { + CompletableFuture future = producer.sendAsync(("test-" + i).getBytes()); + futures.add(future); + } + FutureUtil.waitForAll(futures).get(); } + } From 6d7d2785d2156ab5ffee3a91a4dd020e7bd4f2c5 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 5 Jan 2026 16:45:53 +0800 Subject: [PATCH 05/15] [improve][admin] Optimize test --- .../pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index 1231d54332e07..5ccca4d429134 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -320,7 +320,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception { long serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); - String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-Unloaded"; + String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack"; String subName = "sub-1"; int messages = 55; From 5929036d5c01b6de90bdd981994399974784c65d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 5 Jan 2026 22:05:46 +0800 Subject: [PATCH 06/15] [improve][admin] Optimize code, fix test --- .../bookkeeper/mledger/impl/OpScan.java | 3 ++- .../admin/AnalyzeBacklogSubscriptionTest.java | 23 +++++++++---------- .../client/admin/internal/TopicsImpl.java | 10 ++++++++ 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 6f9b87b245ec3..90d5dd0dffe7f 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -69,6 +69,7 @@ public void readEntriesComplete(List entries, Object ctx) { try { Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition(); lastSeenPosition = lastPositionForBatch; + // TODO This filter operation is not needed, already handled by OpReadEntry. // filter out the entry if it has been already deleted // filterReadEntries will call entry.release if the entry is filtered out List entriesFiltered = this.cursor.filterReadEntries(entries); @@ -76,7 +77,7 @@ public void readEntriesComplete(List entries, Object ctx) { remainingEntries.addAndGet(-skippedEntries); if (!entriesFiltered.isEmpty()) { for (Entry entry : entriesFiltered) { - if (remainingEntries.decrementAndGet() <= 0) { + if (remainingEntries.getAndDecrement() <= 0) { log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index 5ccca4d429134..b43cb0c69a25f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -276,8 +276,8 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); // Broker returns 15 + 15 + 15 = 45 entries. - long expectedEntries = backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries - + serverSubscriptionBacklogScanMaxEntries; + long expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1) + * serverSubscriptionBacklogScanMaxEntries; assertEquals(backlogResult.getEntries(), expectedEntries); assertEquals(backlogResult.getMessages(), expectedEntries); } @@ -296,17 +296,14 @@ public void analyzeBacklogWithTopicUnload() throws Exception { assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); verifyBacklog(topic, subName, 0, 0); - // Test client side loop with topic unload. + // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication. @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); - List> futures = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - CompletableFuture future = producer.sendAsync(("test-" + i).getBytes()); - futures.add(future); + producer.send(("test-" + i).getBytes()); if (RandomUtils.secure().randomBoolean()) { admin.topics().unload(topic); } } - FutureUtil.waitForAll(futures).get(); AnalyzeSubscriptionBacklogResult backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages); @@ -341,11 +338,10 @@ public void analyzeBacklogWithIndividualAck() throws Exception { AnalyzeSubscriptionBacklogResult backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); - // Broker will filter deleted entries, total entries is supposed to be 19 + 20 = 39. - assertThat(backlogResult.getEntries()).isBetween(backlogScanMaxEntries, - backlogScanMaxEntries + serverSubscriptionBacklogScanMaxEntries - 1); + assertThat(backlogResult.getEntries()).isEqualTo(backlogScanMaxEntries); + assertThat(backlogResult.getMessages()).isEqualTo(backlogScanMaxEntries); - // Ack message1 and message2. + // Ack message1. consumer.acknowledge(message1); backlogResult = @@ -370,7 +366,10 @@ private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int admin.topics().createSubscription(topic, subName, MessageId.latest); assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); - verifyBacklog(topic, subName, 0, 0); + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 1); + assertEquals(backlogResult.getEntries(), 0); + assertEquals(backlogResult.getMessages(), 0); @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); List> futures = new ArrayList<>(); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index e7f1d21d41b8b..dfb7a6cfb07b8 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -47,6 +47,7 @@ import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.Response.Status; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.GetStatsOptions; import org.apache.pulsar.client.admin.ListTopicsOptions; import org.apache.pulsar.client.admin.LongRunningProcessStatus; @@ -1630,6 +1631,15 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr return; } + // In analyze-backlog, lastMessageId is null only when: total entries is 0, + // with false aborted flag returned. + if (StringUtils.isBlank(mergedResult.getLastMessageId())) { + log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}", + topic, subscriptionName, startPositionRef.get()); + future.completeExceptionally( + new PulsarAdminException("Incorrect last message id returned from server")); + } + String[] messageIdSplits = mergedResult.getLastMessageId().split(":"); MessageIdImpl nextScanMessageId = new MessageIdImpl(Long.parseLong(messageIdSplits[0]), Long.parseLong(messageIdSplits[1]) + 1, From 6309d4b23d9deb4b65e6bb5bae9e4803a7592253 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 6 Jan 2026 09:50:56 +0800 Subject: [PATCH 07/15] [improve][admin] Add guard safe --- .../apache/pulsar/client/admin/internal/TopicsImpl.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index dfb7a6cfb07b8..2f95812f41b4f 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1631,6 +1631,14 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr return; } + // To avoid infinite loops, we ensure the entry count is incremented after each loop. + if (currentResult.getEntries() <= 0) { + log.warn("[{}][{}] Scanned total entry count is null, abort analyze backlog, start position is: {}", + topic, subscriptionName, startPositionRef.get()); + future.completeExceptionally( + new PulsarAdminException("Incorrect total entry count returned from server")); + } + // In analyze-backlog, lastMessageId is null only when: total entries is 0, // with false aborted flag returned. if (StringUtils.isBlank(mergedResult.getLastMessageId())) { From e1962091f3fb29810513b7f4ebb73c8c7f9216ab Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 6 Jan 2026 10:28:30 +0800 Subject: [PATCH 08/15] [improve][admin] Optimize comments --- .../pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java | 2 +- .../src/main/java/org/apache/pulsar/client/admin/Topics.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index b43cb0c69a25f..8016cf1ed6c36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -324,7 +324,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception { // Test client side loop with individual ack. clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); - // we want to wait for the server to process acks, in order to not have a flaky test. + // We want to wait for the server to process acks, in order to not have a flaky test. @Cleanup Consumer consumer = pulsarClient.newConsumer().topic(topic).isAckReceiptEnabled(true).subscriptionName(subName) .subscriptionType(SubscriptionType.Shared).subscribe(); diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 2ab685100ad5a..63a817dca309b 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2284,7 +2284,7 @@ CompletableFuture analyzeSubscriptionBacklogAs * 4. This means that backlogScanMaxEntries cannot be used to precisely control the number of entries scanned by * the server, it only serves to determine when the loop should terminate.
* 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and - * subscriptionBacklogScanMaxEntries, and then retrieve the desired number of backlog entries through + * subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through * client-side looping. *

* @param topic From 825286da3bad5349c41cc2630694b43bef973bc3 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Tue, 6 Jan 2026 12:53:48 +0800 Subject: [PATCH 09/15] [improve][admin] Remove entry filter logic in OpScan --- .../bookkeeper/mledger/impl/OpScan.java | 28 +++++++------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 90d5dd0dffe7f..202ba099c5de2 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -69,24 +69,16 @@ public void readEntriesComplete(List entries, Object ctx) { try { Position lastPositionForBatch = entries.get(entries.size() - 1).getPosition(); lastSeenPosition = lastPositionForBatch; - // TODO This filter operation is not needed, already handled by OpReadEntry. - // filter out the entry if it has been already deleted - // filterReadEntries will call entry.release if the entry is filtered out - List entriesFiltered = this.cursor.filterReadEntries(entries); - int skippedEntries = entries.size() - entriesFiltered.size(); - remainingEntries.addAndGet(-skippedEntries); - if (!entriesFiltered.isEmpty()) { - for (Entry entry : entriesFiltered) { - if (remainingEntries.getAndDecrement() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); - return; - } - if (!condition.test(entry)) { - log.warn("[{}] Scan abort due to user code", OpScan.this.cursor); - callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); - return; - } + for (Entry entry : entries) { + if (remainingEntries.getAndDecrement() <= 0) { + log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); + return; + } + if (!condition.test(entry)) { + log.warn("[{}] Scan abort due to user code", OpScan.this.cursor); + callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); + return; } } searchPosition = ledger.getPositionAfterN(lastPositionForBatch, 1, From 9bc2cba004f90121c630a7cbd87ebdf018756fc3 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 8 Jan 2026 19:25:38 +0800 Subject: [PATCH 10/15] [improve][admin] Apply copilot comments, optimize tests --- .../admin/AnalyzeBacklogSubscriptionTest.java | 124 +++++++++--------- .../apache/pulsar/client/admin/Topics.java | 4 +- .../client/admin/internal/TopicsImpl.java | 8 +- 3 files changed, 71 insertions(+), 65 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index 8016cf1ed6c36..1304417d29a7e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -18,9 +18,9 @@ */ package org.apache.pulsar.broker.admin; -import static org.assertj.core.api.Assertions.assertThat; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import java.util.ArrayList; import java.util.List; @@ -33,6 +33,7 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageIdAdv; import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.ProducerConsumerBase; import org.apache.pulsar.client.api.SubscriptionType; @@ -199,7 +200,7 @@ public void partitionedTopicNotAllowed() throws Exception { @Test public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 20; + int serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-without-loop"; @@ -207,18 +208,15 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithoutLoop() throws Excep int numMessages = 10; // Test server returns false aborted flag. - clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages - 1); - - assertEquals(backlogResult.getEntries(), numMessages); - assertEquals(backlogResult.getMessages(), numMessages); + verifyClientSideLoopBacklog(topic, subName, numMessages - 1, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); } @Test public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 20; + int serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-without-loop"; @@ -227,19 +225,17 @@ public void analyzeBacklogMaxEntriesExceedWithoutLoop() throws Exception { // Test backlogScanMaxEntries(client side) <= subscriptionBacklogScanMaxEntries(server side), but server // returns true aborted flag. Server dispatcherMaxReadBatchSize is set to 10. - clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - AnalyzeSubscriptionBacklogResult backlogResult = admin.topics() - .analyzeSubscriptionBacklog(topic, subName, Optional.empty(), - serverSubscriptionBacklogScanMaxEntries - 1); + verifyClientSideLoopBacklog(topic, subName, serverSubscriptionBacklogScanMaxEntries - 1, + serverSubscriptionBacklogScanMaxEntries, messageIds.get(0), + messageIds.get(serverSubscriptionBacklogScanMaxEntries - 1)); - assertEquals(backlogResult.getEntries(), serverSubscriptionBacklogScanMaxEntries); - assertEquals(backlogResult.getMessages(), serverSubscriptionBacklogScanMaxEntries); } @Test public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 20; + int serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-server-return-false-aborted-flag-with-loop"; @@ -248,18 +244,15 @@ public void analyzeBacklogServerReturnFalseAbortedFlagWithLoop() throws Exceptio // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination // condition is that server returns false aborted flag. - clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages); + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - assertEquals(backlogResult.getEntries(), numMessages); - assertEquals(backlogResult.getMessages(), numMessages); + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); } @Test public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 15; + int serverSubscriptionBacklogScanMaxEntries = 15; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-max-entries-exceed-with-loop"; @@ -270,21 +263,18 @@ public void analyzeBacklogMaxEntriesExceedWithLoop() throws Exception { // Test client side loop: backlogScanMaxEntries > subscriptionBacklogScanMaxEntries, the loop termination // condition is that total entries exceeds backlogScanMaxEntries. // Server dispatcherMaxReadBatchSize is set to 10. - clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); - - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, numMessages); // Broker returns 15 + 15 + 15 = 45 entries. - long expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1) + int expectedEntries = (backlogScanMaxEntries / serverSubscriptionBacklogScanMaxEntries + 1) * serverSubscriptionBacklogScanMaxEntries; - assertEquals(backlogResult.getEntries(), expectedEntries); - assertEquals(backlogResult.getMessages(), expectedEntries); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, expectedEntries, messageIds.get(0), + messageIds.get(expectedEntries - 1)); } @Test public void analyzeBacklogWithTopicUnload() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 10; + int serverSubscriptionBacklogScanMaxEntries = 10; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-with-topic-unload"; @@ -298,23 +288,22 @@ public void analyzeBacklogWithTopicUnload() throws Exception { // Test client side loop with topic unload. Use sync send method here to avoid potential message duplication. @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); + List messageIds = new ArrayList<>(); for (int i = 0; i < numMessages; i++) { - producer.send(("test-" + i).getBytes()); + MessageId messageId = producer.send(("test-" + i).getBytes()); + messageIds.add(messageId); if (RandomUtils.secure().randomBoolean()) { admin.topics().unload(topic); } } - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), numMessages); - - assertEquals(backlogResult.getEntries(), numMessages); - assertEquals(backlogResult.getMessages(), numMessages); + verifyClientSideLoopBacklog(topic, subName, numMessages, numMessages, messageIds.get(0), + messageIds.get(numMessages - 1)); } @Test public void analyzeBacklogWithIndividualAck() throws Exception { - long serverSubscriptionBacklogScanMaxEntries = 20; + int serverSubscriptionBacklogScanMaxEntries = 20; conf.setSubscriptionBacklogScanMaxEntries(serverSubscriptionBacklogScanMaxEntries); String topic = "persistent://my-property/my-ns/analyze-backlog-with-individual-ack"; @@ -322,7 +311,7 @@ public void analyzeBacklogWithIndividualAck() throws Exception { int messages = 55; // Test client side loop with individual ack. - clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); + List messageIds = clientSideLoopAnalyzeBacklogSetup(topic, subName, messages); // We want to wait for the server to process acks, in order to not have a flaky test. @Cleanup Consumer consumer = @@ -334,21 +323,14 @@ public void analyzeBacklogWithIndividualAck() throws Exception { Message message2 = consumer.receive(); consumer.acknowledge(message2); - long backlogScanMaxEntries = 20; - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); - - assertThat(backlogResult.getEntries()).isEqualTo(backlogScanMaxEntries); - assertThat(backlogResult.getMessages()).isEqualTo(backlogScanMaxEntries); + int backlogScanMaxEntries = 20; + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(0), + messageIds.get(backlogScanMaxEntries)); // Ack message1. consumer.acknowledge(message1); - - backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); - - assertEquals(backlogResult.getEntries(), backlogScanMaxEntries); - assertEquals(backlogResult.getMessages(), backlogScanMaxEntries); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, backlogScanMaxEntries, messageIds.get(2), + messageIds.get(backlogScanMaxEntries + 1)); // Ack all messages. for (int i = 2; i < messages; i++) { @@ -356,20 +338,15 @@ public void analyzeBacklogWithIndividualAck() throws Exception { consumer.acknowledge(message); } - backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogScanMaxEntries); - assertEquals(backlogResult.getEntries(), 0); - assertEquals(backlogResult.getMessages(), 0); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, + null); } - private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception { + private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception { admin.topics().createSubscription(topic, subName, MessageId.latest); assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); - AnalyzeSubscriptionBacklogResult backlogResult = - admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), 1); - assertEquals(backlogResult.getEntries(), 0); - assertEquals(backlogResult.getMessages(), 0); + verifyClientSideLoopBacklog(topic, subName, -1, 0, null, null); @Cleanup Producer producer = pulsarClient.newProducer().topic(topic).enableBatching(false).create(); List> futures = new ArrayList<>(); @@ -378,6 +355,33 @@ private void clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int futures.add(future); } FutureUtil.waitForAll(futures).get(); + return futures.stream().map(CompletableFuture::join).toList(); + } + + private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries, + int expectedEntries, MessageId firstMessageId, + MessageId lastMessageId) throws Exception { + AnalyzeSubscriptionBacklogResult backlogResult = + admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries); + + assertEquals(backlogResult.getEntries(), expectedEntries); + assertEquals(backlogResult.getMessages(), expectedEntries); + + if (firstMessageId == null) { + assertNull(backlogResult.getFirstMessageId()); + } else { + MessageIdAdv firstMessageIdAdv = (MessageIdAdv) firstMessageId; + assertEquals(backlogResult.getFirstMessageId(), + firstMessageIdAdv.getLedgerId() + ":" + firstMessageIdAdv.getEntryId()); + } + + if (lastMessageId == null) { + assertNull(backlogResult.getLastMessageId()); + } else { + MessageIdAdv lastMessageIdAdv = (MessageIdAdv) lastMessageId; + assertEquals(backlogResult.getLastMessageId(), + lastMessageIdAdv.getLedgerId() + ":" + lastMessageIdAdv.getEntryId()); + } } } diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 63a817dca309b..3ac148191dce6 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2271,7 +2271,7 @@ CompletableFuture analyzeSubscriptionBacklogAs * Increasing these settings is possible. However, it's possible that the HTTP request times out (also idle timeout * in NAT/firewall etc.) before the command completes so increasing the limits might not be useful beyond a few * minutes. - *

+ *

* *

* How does this method work?
@@ -2286,7 +2286,7 @@ CompletableFuture analyzeSubscriptionBacklogAs * 5. With this method, the server can reduce the values of the two parameters subscriptionBacklogScanMaxTimeMs and * subscriptionBacklogScanMaxEntries, so user can retrieve the desired number of backlog entries through * client-side looping. - *

+ *

* @param topic * Topic name * @param subscriptionName diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 2f95812f41b4f..12c3d3a9896a4 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1633,10 +1633,11 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr // To avoid infinite loops, we ensure the entry count is incremented after each loop. if (currentResult.getEntries() <= 0) { - log.warn("[{}][{}] Scanned total entry count is null, abort analyze backlog, start position is: {}", - topic, subscriptionName, startPositionRef.get()); + log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start " + + "position is: {}", topic, subscriptionName, startPositionRef.get()); future.completeExceptionally( new PulsarAdminException("Incorrect total entry count returned from server")); + return; } // In analyze-backlog, lastMessageId is null only when: total entries is 0, @@ -1646,6 +1647,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr topic, subscriptionName, startPositionRef.get()); future.completeExceptionally( new PulsarAdminException("Incorrect last message id returned from server")); + return; } String[] messageIdSplits = mergedResult.getLastMessageId().split(":"); @@ -1684,7 +1686,7 @@ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscription current.getFilterRescheduledMessages() + previous.getFilterRescheduledMessages()); mergedRes.setAborted(current.isAborted()); - mergedRes.setFirstMessageId(current.getFirstMessageId()); + mergedRes.setFirstMessageId(previous.getFirstMessageId()); mergedRes.setLastMessageId(current.getLastMessageId()); return mergedRes; From 6047f048a5010fe56f854d68499c089d4afa69a0 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 8 Jan 2026 22:34:59 +0800 Subject: [PATCH 11/15] [improve][admin] Fix checkstyle --- .../broker/admin/AnalyzeBacklogSubscriptionTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java index 1304417d29a7e..4425436954aa6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AnalyzeBacklogSubscriptionTest.java @@ -338,11 +338,11 @@ public void analyzeBacklogWithIndividualAck() throws Exception { consumer.acknowledge(message); } - verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, - null); + verifyClientSideLoopBacklog(topic, subName, backlogScanMaxEntries, 0, null, null); } - private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) throws Exception { + private List clientSideLoopAnalyzeBacklogSetup(String topic, String subName, int numMessages) + throws Exception { admin.topics().createSubscription(topic, subName, MessageId.latest); assertEquals(admin.topics().getSubscriptions(topic), List.of("sub-1")); @@ -359,8 +359,8 @@ private List clientSideLoopAnalyzeBacklogSetup(String topic, String s } private void verifyClientSideLoopBacklog(String topic, String subName, int backlogMaxScanEntries, - int expectedEntries, MessageId firstMessageId, - MessageId lastMessageId) throws Exception { + int expectedEntries, MessageId firstMessageId, MessageId lastMessageId) + throws Exception { AnalyzeSubscriptionBacklogResult backlogResult = admin.topics().analyzeSubscriptionBacklog(topic, subName, Optional.empty(), backlogMaxScanEntries); From 21cdb9047adbf46e6dc9268596cfd4a4a401a03c Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 21 Jan 2026 10:02:11 +0800 Subject: [PATCH 12/15] [fix][admin] Add continuePredicate overload method --- .../apache/pulsar/client/admin/Topics.java | 47 +++++++++++++++++++ .../client/admin/internal/TopicsImpl.java | 24 ++++++++-- 2 files changed, 68 insertions(+), 3 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 3ac148191dce6..47c22df8d2fc0 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -23,6 +23,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.function.Predicate; import org.apache.pulsar.client.admin.PulsarAdminException.ConflictException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAllowedException; import org.apache.pulsar.client.admin.PulsarAdminException.NotAuthorizedException; @@ -2238,6 +2239,29 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String Optional startPosition, long backlogScanMaxEntries) throws PulsarAdminException; + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, Predicate)}
+ * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param continuePredicate + * the predicate to determine whether to continue the loop + * @return an accurate analysis of the backlog + */ + AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + Predicate continuePredicate) + throws PulsarAdminException; + /** * Analyze subscription backlog. * This is a potentially expensive operation, as it requires @@ -2302,6 +2326,29 @@ CompletableFuture analyzeSubscriptionBacklogAs Optional startPosition, long backlogScanMaxEntries); + /** + * Analyze subscription backlog. + * This is a potentially expensive operation, as it requires + * to read the messages from storage. + * This function takes into consideration batch messages + * and also Subscription filters.
+ * See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
+ * User can control the loop termination condition by continuePredicate. + * + * @param topic + * Topic name + * @param subscriptionName + * the subscription + * @param startPosition + * the position to start the scan from (empty means the last processed message) + * @param continuePredicate + * the predicate to determine whether to continue the loop + * @return an accurate analysis of the backlog + */ + CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional startPosition, + Predicate continuePredicate); + /** * Get backlog size by a message ID. * @param topic diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 12c3d3a9896a4..5e4f8502613bb 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -36,6 +36,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -1573,6 +1574,14 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, () -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, backlogScanMaxEntries)); } + @Override + public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, + Optional startPosition, + Predicate continuePredicate) + throws PulsarAdminException { + return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, continuePredicate)); + } + @Override public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, @@ -1609,6 +1618,14 @@ public CompletableFuture analyzeSubscriptionBa String subscriptionName, Optional startPosition, long backlogScanMaxEntries) { + return analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, + (backlogResult) -> backlogResult.getEntries() >= backlogScanMaxEntries); + } + + @Override + public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, + String subscriptionName, Optional startPosition, + Predicate continuePredicate) { final CompletableFuture future = new CompletableFuture<>(); AtomicReference resultRef = new AtomicReference<>(); int partitionIndex = TopicName.get(topic).getPartitionIndex(); @@ -1626,12 +1643,13 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get()); resultRef.set(mergedResult); - if (!mergedResult.isAborted() || mergedResult.getEntries() >= backlogScanMaxEntries) { + if (!mergedResult.isAborted() || continuePredicate.test(mergedResult)) { future.complete(mergedResult); return; } // To avoid infinite loops, we ensure the entry count is incremented after each loop. + // Should never happen. if (currentResult.getEntries() <= 0) { log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start " + "position is: {}", topic, subscriptionName, startPositionRef.get()); @@ -1640,8 +1658,8 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr return; } - // In analyze-backlog, lastMessageId is null only when: total entries is 0, - // with false aborted flag returned. + // In analyze-backlog, lastMessageId is null only when: total entries is 0, with false aborted flag + // returned. Should never happen. if (StringUtils.isBlank(mergedResult.getLastMessageId())) { log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}", topic, subscriptionName, startPositionRef.get()); From a32ed4790e30dff372f4fdc20498d52bacd5db9e Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Wed, 21 Jan 2026 14:27:08 +0800 Subject: [PATCH 13/15] [fix][admin] Change continuePredicate to terminatePredicate --- .../org/apache/pulsar/client/admin/Topics.java | 14 +++++++------- .../pulsar/client/admin/internal/TopicsImpl.java | 8 ++++---- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java index 47c22df8d2fc0..e68be8fd2e805 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java @@ -2253,13 +2253,13 @@ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String * the subscription * @param startPosition * the position to start the scan from (empty means the last processed message) - * @param continuePredicate - * the predicate to determine whether to continue the loop + * @param terminatePredicate + * the predicate to determine whether to terminate the loop * @return an accurate analysis of the backlog */ AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, Optional startPosition, - Predicate continuePredicate) + Predicate terminatePredicate) throws PulsarAdminException; /** @@ -2333,7 +2333,7 @@ CompletableFuture analyzeSubscriptionBacklogAs * This function takes into consideration batch messages * and also Subscription filters.
* See also: {@link #analyzeSubscriptionBacklogAsync(String, String, Optional, long)}
- * User can control the loop termination condition by continuePredicate. + * User can control the loop termination condition by terminatePredicate. * * @param topic * Topic name @@ -2341,13 +2341,13 @@ CompletableFuture analyzeSubscriptionBacklogAs * the subscription * @param startPosition * the position to start the scan from (empty means the last processed message) - * @param continuePredicate - * the predicate to determine whether to continue the loop + * @param terminatePredicate + * the predicate to determine whether to terminate the loop * @return an accurate analysis of the backlog */ CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, Optional startPosition, - Predicate continuePredicate); + Predicate terminatePredicate); /** * Get backlog size by a message ID. diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 5e4f8502613bb..530a126d1b1fa 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1577,9 +1577,9 @@ public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, @Override public AnalyzeSubscriptionBacklogResult analyzeSubscriptionBacklog(String topic, String subscriptionName, Optional startPosition, - Predicate continuePredicate) + Predicate terminatePredicate) throws PulsarAdminException { - return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, continuePredicate)); + return sync(() -> analyzeSubscriptionBacklogAsync(topic, subscriptionName, startPosition, terminatePredicate)); } @Override @@ -1625,7 +1625,7 @@ public CompletableFuture analyzeSubscriptionBa @Override public CompletableFuture analyzeSubscriptionBacklogAsync(String topic, String subscriptionName, Optional startPosition, - Predicate continuePredicate) { + Predicate terminatePredicate) { final CompletableFuture future = new CompletableFuture<>(); AtomicReference resultRef = new AtomicReference<>(); int partitionIndex = TopicName.get(topic).getPartitionIndex(); @@ -1643,7 +1643,7 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr AnalyzeSubscriptionBacklogResult mergedResult = mergeBacklogResults(currentResult, resultRef.get()); resultRef.set(mergedResult); - if (!mergedResult.isAborted() || continuePredicate.test(mergedResult)) { + if (!mergedResult.isAborted() || terminatePredicate.test(mergedResult)) { future.complete(mergedResult); return; } From da82b1b321d043dc4515ff5f46e99701a7bf008d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 5 Feb 2026 12:41:05 +0800 Subject: [PATCH 14/15] deal with 0 entries and null lastMessageId --- .../client/admin/internal/TopicsImpl.java | 29 +++++++------------ 1 file changed, 11 insertions(+), 18 deletions(-) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java index 530a126d1b1fa..78b7329159c1a 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java @@ -1648,23 +1648,13 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr return; } - // To avoid infinite loops, we ensure the entry count is incremented after each loop. - // Should never happen. - if (currentResult.getEntries() <= 0) { - log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start " - + "position is: {}", topic, subscriptionName, startPositionRef.get()); - future.completeExceptionally( - new PulsarAdminException("Incorrect total entry count returned from server")); - return; - } - - // In analyze-backlog, lastMessageId is null only when: total entries is 0, with false aborted flag - // returned. Should never happen. - if (StringUtils.isBlank(mergedResult.getLastMessageId())) { - log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}", - topic, subscriptionName, startPositionRef.get()); - future.completeExceptionally( - new PulsarAdminException("Incorrect last message id returned from server")); + // In analyze-backlog, we treat 0 entries or null lastMessageId as scan completed for mere safety. + // 0 entries or a null lastMessageId indicates no entries were scanned. + if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) { + log.info("[{}][{}] complete scan due total entry <= 0 or last message id is blank, " + + "start position is: {}, current result: {}", topic, subscriptionName, + startPositionRef.get(), currentResult); + future.complete(mergedResult); return; } @@ -1705,7 +1695,10 @@ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscription mergedRes.setAborted(current.isAborted()); mergedRes.setFirstMessageId(previous.getFirstMessageId()); - mergedRes.setLastMessageId(current.getLastMessageId()); + String lastMessageId = current.getLastMessageId(); + if (StringUtils.isNotBlank(lastMessageId)) { + mergedRes.setLastMessageId(lastMessageId); + } return mergedRes; } From 674ba6fe4e2c6b6184b0434e280fee556d86223d Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Thu, 5 Feb 2026 16:06:35 +0800 Subject: [PATCH 15/15] modify log level --- .../java/org/apache/bookkeeper/mledger/impl/OpScan.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java index 202ba099c5de2..413bb5c018e84 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpScan.java @@ -71,12 +71,12 @@ public void readEntriesComplete(List entries, Object ctx) { lastSeenPosition = lastPositionForBatch; for (Entry entry : entries) { if (remainingEntries.getAndDecrement() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } if (!condition.test(entry)) { - log.warn("[{}] Scan abort due to user code", OpScan.this.cursor); + log.info("[{}] Scan abort due to user code", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.USER_INTERRUPTED, OpScan.this.ctx); return; } @@ -111,12 +111,12 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { public void find() { if (remainingEntries.get() <= 0) { - log.warn("[{}] Scan abort after reading too many entries", OpScan.this.cursor); + log.info("[{}] Scan abort after reading too many entries", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; } if (System.currentTimeMillis() - startTime > timeOutMs) { - log.warn("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); + log.info("[{}] Scan abort after hitting the deadline", OpScan.this.cursor); callback.scanComplete(lastSeenPosition, ScanOutcome.ABORTED, OpScan.this.ctx); return; }