[fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval#25188
…e to prevent missed topic changes
Pull request overview
This PR fixes an issue where the TopicListService implementation doesn't handle metadata store session events, causing client-side topic lists to become out of sync during metadata store disruptions. The solution introduces session event handling and hash-based verification to ensure consistency between broker and client.
Changes:
- Added `TopicListener` interface to handle both topic events and session events, with session reconnection triggering topic list reconciliation
- Implemented hash-based verification to detect state inconsistencies between client and broker, triggering reconciliation when mismatches are detected
- Modified client-side pattern consumer to schedule periodic state refresh and postpone topic list watcher creation until after initial subscription completes
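A minimal sketch of the hash-based verification summarized above, assuming a CRC32 over sorted topic names. `TopicsHashSketch` and `applyAndVerify` are hypothetical names, and this `calculateHash` is only a stand-in: the hash the PR relies on (`TopicList.calculateHash`) may be computed differently.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.zip.CRC32;

// Illustrative only: a stand-in for the real hash helper; algorithm and encoding are assumptions.
final class TopicsHashSketch {
    private TopicsHashSketch() {}

    // Order-independent hash: names are sorted so equal sets always produce equal hashes.
    static String calculateHash(Collection<String> topics) {
        CRC32 crc = new CRC32();
        for (String topic : new TreeSet<>(topics)) {
            crc.update(topic.getBytes(StandardCharsets.UTF_8));
            crc.update(0); // separator byte so ["ab","c"] and ["a","bc"] hash differently
        }
        return Long.toHexString(crc.getValue());
    }

    // Applies a broker update to the local topic set, then reports whether the
    // resulting local hash equals the hash the broker sent with the update.
    static boolean applyAndVerify(Set<String> localTopics, List<String> added,
                                  List<String> removed, String brokerHash) {
        localTopics.removeAll(removed);
        localTopics.addAll(added);
        return calculateHash(localTopics).equals(brokerHash);
    }
}
```

When `applyAndVerify` returns `false`, the local state has drifted (for example because an earlier event was missed), and the client would fall back to fetching the full topic list.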
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicListener.java | New interface defining contract for topic and session event handling |
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java | Updated to use MetadataStoreExtended and register session listeners, dispatch events to TopicListener implementations |
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java | Changed constructor parameters to accept MetadataStoreExtended instead of MetadataStore for session event support |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java | Enhanced TopicListWatcher to handle session events, reconcile state after reconnection, and support hash-based refresh commands |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java | Added reconcile() method and changed to use supplier for local state hash, updated to pass hash in watch commands |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java | Postponed watcher creation, added periodic reconciliation scheduling, implemented hash calculation and watcher-based reconciliation |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java | Combined separate add/remove operations into single TOPICS_CHANGED task with hash verification and mismatch detection |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java | Added convenience method for creating WatchTopicList requests with topics hash parameter |
| pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java | Added String deduplication for filtered topic results using TopicName cache |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java | Added comprehensive tests for session event handling, hash matching, and reconciliation scenarios |
| pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java | Added test for periodic reconciliation and improved existing test methods |
| pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java | Updated tests for new TopicListener interface |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java | Added test for hash mismatch reconciliation |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java | Updated tests for combined topics changed operations |
| pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java | Simplified tests after removing topicsHashSetter parameter |
| pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java | Added helper method to wait for topic list watcher startup |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java | Updated test resources to use MetadataStoreExtended |
| pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java | Updated tests for renamed method onTopicEvent |
Comments suppressed due to low confidence (1)
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:132
- The `onTopicEvent` method accesses and modifies `matchingTopics` without synchronization, which can lead to race conditions. The method is not synchronized, but it:
  - Calls `matchingTopics.remove(topicName)` on line 121
  - Calls `matchingTopics.add(topicName)` on line 124
  - Passes `matchingTopics` to `TopicList.calculateHash()` on line 128
Meanwhile, other synchronized methods like updateTopics() (line 185) and prepareUpdateTopics() (line 178) also access matchingTopics. This creates a data race where unsynchronized access in onTopicEvent can interleave with synchronized access in other methods, potentially causing:
- Lost updates
- Inconsistent hash calculations
- ConcurrentModificationException if the Set implementation isn't thread-safe
The method should be declared synchronized to ensure thread-safe access to the shared matchingTopics field.
public void onTopicEvent(String topicName, NotificationType notificationType) {
    if (closed) {
        return;
    }
    String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
    String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);
    if (topicsPattern.matches(domainLessTopicName)) {
        List<String> newTopics = Collections.emptyList();
        List<String> deletedTopics = Collections.emptyList();
        if (notificationType == NotificationType.Deleted) {
            if (matchingTopics.remove(topicName)) {
                deletedTopics = Collections.singletonList(topicName);
            }
        } else if (notificationType == NotificationType.Created && matchingTopics.add(topicName)) {
            newTopics = Collections.singletonList(topicName);
        }
        if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
            String hash = TopicList.calculateHash(matchingTopics);
            sendTopicListUpdate(hash, deletedTopics, newTopics);
        }
    }
}
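The suggested fix can be illustrated with a simplified model. This is a sketch under stated assumptions, not the real `TopicListService.TopicListWatcher`: the class `SynchronizedWatcherSketch`, its `EventType` enum, and the `sentUpdates` list (standing in for `sendTopicListUpdate`) are hypothetical. The point is only that `onTopicEvent` and `updateTopics` share one monitor, so their accesses to `matchingTopics` cannot interleave.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Simplified, hypothetical model of a topic list watcher.
final class SynchronizedWatcherSketch {
    enum EventType { CREATED, DELETED }

    private final Set<String> matchingTopics = new HashSet<>();
    // Stands in for sendTopicListUpdate(...) in the reviewed code.
    private final List<String> sentUpdates = new ArrayList<>();

    // synchronized: uses the same monitor as updateTopics(), so the mutation
    // and the follow-up read of matchingTopics happen atomically.
    synchronized void onTopicEvent(String topicName, EventType type) {
        boolean changed = (type == EventType.DELETED)
                ? matchingTopics.remove(topicName)
                : matchingTopics.add(topicName);
        if (changed) {
            sentUpdates.add(type + ":" + topicName + ":size=" + matchingTopics.size());
        }
    }

    // Replaces the whole set, as a reconciliation step would.
    synchronized void updateTopics(Set<String> topics) {
        matchingTopics.clear();
        matchingTopics.addAll(topics);
    }

    // Returns a defensive copy so callers never iterate the live set.
    synchronized Set<String> snapshot() {
        return Collections.unmodifiableSet(new HashSet<>(matchingTopics));
    }

    synchronized int updateCount() {
        return sentUpdates.size();
    }
}
```

A plain `HashSet` is enough here because every access goes through a synchronized method; a concurrent set alone would not make the "modify, then hash, then send" sequence atomic.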
…pic list watcher reconcile
…urns the full list of matched topics
Pull request overview
Copilot reviewed 24 out of 24 changed files in this pull request and generated 7 comments.
…n't support topic list watcher reconcile
…elease the connection
…cWatchers is true
|
The change from Third-party plugins usually do not directly depend on the concrete The main concern is the change of public methods that changing the |
Fixes #25020
Fixes #25191
Fixes #25192
Motivation
The `TopicListService` implementation does not handle metadata store session events (`SessionEvent`), causing the client-side topic list to become out of sync when the metadata store session experiences temporary disruptions. During metadata store disconnections, topic add/delete events are missed, leading to inconsistent state between the broker and pattern topic consumers. Additionally, metadata store change event delivery is not guaranteed, making state reconciliation necessary to maintain consistency.
This PR also addresses several related issues:
Modifications
This PR introduces session event handling and hash-based verification to ensure topic list consistency:
Server-side changes:
- Added `TopicListener` interface to handle both topic events and session events
- Updated `TopicResources` to register session listeners and dispatch session events to topic listeners
- Enhanced `TopicListService.TopicListWatcher` to handle session events, reconcile state after reconnection, and support hash-based refresh commands
- Changed `PulsarResources` to use `MetadataStoreExtended` instead of `MetadataStore` to access session event functionality
- Modified `TopicListService` to accept topic list watcher commands for already existing watchers, enabling state reconciliation when the client-side topic listing hash mismatches
- Modified `NamespaceService.getListOfUserTopics` to consistently exclude system topics from results, ensuring hash consistency between topic listing methods
- Added `supportsTopicWatcherReconcile` feature flag for backwards compatibility
- Handles `CommandWatchTopicList` for existing watchers so that the topic list watcher state can be reconciled
Client-side changes:
- Scheduled periodic refresh at the `patternAutoDiscoveryPeriod` interval to detect and correct any drift
- Updated `CommandWatchTopicList` to include the client's current topics hash
- Updated `PatternConsumerUpdateQueue` to combine add and delete operations into a single task, enabling hash comparison after applying changes to local state
- Added hash verification in `PatternMultiTopicsConsumerImpl` that triggers reconciliation via `recheckTopicsChange()` when the broker's hash differs from the expected local state
- … `supportsTopicWatcherReconcile` for backwards compatibility
- … `CommandWatchTopicList` when the feature flag is set
- … `CommandWatchTopicList` for existing watchers
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete
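The periodic refresh listed under the client-side changes can be sketched roughly as follows. This is a hypothetical model, not code from `PatternMultiTopicsConsumerImpl`: `PeriodicReconcileSketch`, its suppliers, and the `reconcile` callback are illustrative names, and `brokerHash` stands in for whatever round trip the real client makes to the broker.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical model of a client-side periodic consistency check.
final class PeriodicReconcileSketch {
    private final Supplier<String> localHash;  // hash of the client's current topic set
    private final Supplier<String> brokerHash; // assumed stand-in for a broker round trip
    private final Runnable reconcile;          // e.g. re-fetch the full topic list
    private final AtomicInteger reconcileCount = new AtomicInteger();

    PeriodicReconcileSketch(Supplier<String> localHash, Supplier<String> brokerHash,
                            Runnable reconcile) {
        this.localHash = localHash;
        this.brokerHash = brokerHash;
        this.reconcile = reconcile;
    }

    // One refresh cycle: reconcile only when the two hashes differ.
    boolean runOnce() {
        if (localHash.get().equals(brokerHash.get())) {
            return false;
        }
        reconcile.run();
        reconcileCount.incrementAndGet();
        return true;
    }

    // Schedules runOnce() with a fixed delay; the period plays the role of
    // patternAutoDiscoveryPeriod. Exceptions are swallowed because an uncaught
    // exception would cancel all later executions of the scheduled task.
    ScheduledFuture<?> schedule(ScheduledExecutorService executor, long periodSeconds) {
        return executor.scheduleWithFixedDelay(() -> {
            try {
                runOnce();
            } catch (RuntimeException e) {
                // keep the schedule alive; a real client would log this
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    int reconcileCount() {
        return reconcileCount.get();
    }
}
```

Keeping the check in `runOnce()` separate from the scheduling makes the mismatch logic testable without waiting for a timer.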