Skip to content

Commit 215bf27

Browse files
committed
deal with 0 entries and null lastMessageId
1 parent a32ed47 commit 215bf27

1 file changed

Lines changed: 11 additions & 18 deletions

File tree

  • pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal

pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java

Lines changed: 11 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1648,23 +1648,13 @@ public void accept(AnalyzeSubscriptionBacklogResult currentResult, Throwable thr
16481648
return;
16491649
}
16501650

1651-
// To avoid infinite loops, we ensure the entry count is incremented after each loop.
1652-
// Should never happen.
1653-
if (currentResult.getEntries() <= 0) {
1654-
log.warn("[{}][{}] Scanned total entry count is zero or negative, abort analyze backlog, start "
1655-
+ "position is: {}", topic, subscriptionName, startPositionRef.get());
1656-
future.completeExceptionally(
1657-
new PulsarAdminException("Incorrect total entry count returned from server"));
1658-
return;
1659-
}
1660-
1661-
// In analyze-backlog, lastMessageId is null only when: total entries is 0, with false aborted flag
1662-
// returned. Should never happen.
1663-
if (StringUtils.isBlank(mergedResult.getLastMessageId())) {
1664-
log.warn("[{}][{}] Scanned last message id is blank, abort analyze backlog, start position is: {}",
1665-
topic, subscriptionName, startPositionRef.get());
1666-
future.completeExceptionally(
1667-
new PulsarAdminException("Incorrect last message id returned from server"));
1651+
// In analyze-backlog, we treat 0 entries or null lastMessageId as scan completed for mere safety.
1652+
// 0 entries or a null lastMessageId indicates no entries were scanned.
1653+
if (currentResult.getEntries() <= 0 || StringUtils.isBlank(currentResult.getLastMessageId())) {
1654+
log.info("[{}][{}] complete scan due total entry <= 0 or last message id is blank, "
1655+
+ "start position is: {}, current result: {}", topic, subscriptionName,
1656+
startPositionRef.get(), currentResult);
1657+
future.complete(mergedResult);
16681658
return;
16691659
}
16701660

@@ -1705,7 +1695,10 @@ private AnalyzeSubscriptionBacklogResult mergeBacklogResults(AnalyzeSubscription
17051695

17061696
mergedRes.setAborted(current.isAborted());
17071697
mergedRes.setFirstMessageId(previous.getFirstMessageId());
1708-
mergedRes.setLastMessageId(current.getLastMessageId());
1698+
String lastMessageId = current.getLastMessageId();
1699+
if (lastMessageId != null) {
1700+
mergedRes.setLastMessageId(lastMessageId);
1701+
}
17091702

17101703
return mergedRes;
17111704
}

0 commit comments

Comments
 (0)