Skip to content

[fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval#25188

Open
lhotari wants to merge 26 commits intoapache:masterfrom
lhotari:lh-topiclistwatcher-should-poll
Open

[fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval#25188
lhotari wants to merge 26 commits intoapache:masterfrom
lhotari:lh-topiclistwatcher-should-poll

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Jan 27, 2026

Fixes #25020
Fixes #25191
Fixes #25192

Motivation

The TopicListService implementation does not handle metadata store session events (SessionEvent), causing the client-side topic list to become out of sync when the metadata store session experiences temporary disruptions. During metadata store disconnections, topic add/delete events are missed, leading to inconsistent state between the broker and pattern topic consumers. Additionally, metadata store change event delivery is not guaranteed, making state reconciliation necessary to maintain consistency.

This PR also addresses several related issues:

Modifications

This PR introduces session event handling and hash-based verification to ensure topic list consistency:

Server-side changes:

  • Added TopicListener interface to handle both topic events and session events
  • Modified TopicResources to register session listeners and dispatch session events to topic listeners
  • Enhanced TopicListService.TopicListWatcher to:
    • Retrieve the topic list after metadata store reconnection
    • Reconstruct missed add/delete events by comparing pre-disconnection and post-reconnection states
    • Send synthetic events to clients to synchronize their topic lists
  • Changed PulsarResources to use MetadataStoreExtended instead of MetadataStore to access session event functionality
  • Modified TopicListService to accept topic list watcher commands for already existing watchers, enabling state reconciliation when the client-side topic listing hash mismatches
  • Postponed topic list watcher creation until after initial topic subscription completes to prevent race conditions
  • Fixed NamespaceService.getListOfUserTopics to consistently exclude system topics from results, ensuring hash consistency between topic listing methods
  • Added new feature flag supportsTopicWatcherReconcile for backwards compatibility
    • the broker now supports calling CommandWatchTopicList for existing watchers so that the topic list watcher state can be reconciled.

Client-side changes:

  • Schedule periodic state refresh using the configured patternAutoDiscoveryPeriod interval to detect and correct any drift
  • Implemented hash-based verification to detect inconsistencies:
    • Updated CommandWatchTopicList to include the client's current topics hash
    • Modified PatternConsumerUpdateQueue to combine add and delete operations into a single task, enabling hash comparison after applying changes to local state
    • Added hash mismatch detection in PatternMultiTopicsConsumerImpl that triggers reconciliation via recheckTopicsChange() when the broker's hash differs from the expected local state
    • Enhanced reconciliation logic to compare local state with broker state and trigger a refresh when differences are detected
  • Use the new feature flag supportsTopicWatcherReconcile for backwards compatibility
    • Issue reconciliation operation via CommandWatchTopicList when the feature flag is set
    • Older brokers don't support issuing CommandWatchTopicList for existing watchers

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.2.0 milestone Jan 27, 2026
@lhotari lhotari self-assigned this Jan 27, 2026
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Jan 27, 2026
@lhotari lhotari changed the title [fix][broker] Handle metadata store session events in TopicListService to prevent missed topic changes [fix][broker] Prevent missed topic changes in topic watchers and schedule periodic refresh with patternAutoDiscoveryPeriod interval Jan 27, 2026
@lhotari lhotari requested a review from Technoboy- January 27, 2026 14:19
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR fixes an issue where the TopicListService implementation doesn't handle metadata store session events, causing client-side topic lists to become out of sync during metadata store disruptions. The solution introduces session event handling and hash-based verification to ensure consistency between broker and client.

Changes:

  • Added TopicListener interface to handle both topic events and session events, with session reconnection triggering topic list reconciliation
  • Implemented hash-based verification to detect state inconsistencies between client and broker, triggering reconciliation when mismatches are detected
  • Modified client-side pattern consumer to schedule periodic state refresh and postpone topic list watcher creation until after initial subscription completes

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 1 comment.

Show a summary per file
File Description
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicListener.java New interface defining contract for topic and session event handling
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/TopicResources.java Updated to use MetadataStoreExtended and register session listeners, dispatch events to TopicListener implementations
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/resources/PulsarResources.java Changed constructor parameters to accept MetadataStoreExtended instead of MetadataStore for session event support
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java Enhanced TopicListWatcher to handle session events, reconcile state after reconnection, and support hash-based refresh commands
pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicListWatcher.java Added reconcile() method and changed to use supplier for local state hash, updated to pass hash in watch commands
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java Postponed watcher creation, added periodic reconciliation scheduling, implemented hash calculation and watcher-based reconciliation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueue.java Combined separate add/remove operations into single TOPICS_CHANGED task with hash verification and mismatch detection
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java Added convenience method for creating WatchTopicList requests with topics hash parameter
pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java Added String deduplication for filtered topic results using TopicName cache
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListServiceTest.java Added comprehensive tests for session event handling, hash matching, and reconciliation scenarios
pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java Added test for periodic reconciliation and improved existing test methods
pulsar-broker-common/src/test/java/org/apache/pulsar/broker/resources/TopicResourcesTest.java Updated tests for new TopicListener interface
pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicListWatcherTest.java Added test for hash mismatch reconciliation
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternConsumerUpdateQueueTest.java Updated tests for combined topics changed operations
pulsar-client/src/test/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImplTest.java Simplified tests after removing topicsHashSetter parameter
pulsar-proxy/src/test/java/org/apache/pulsar/client/impl/AutoCloseUselessClientConProxyTest.java Added helper method to wait for topic list watcher startup
pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/NonStartableTestPulsarService.java Updated test resources to use MetadataStoreExtended
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicListWatcherTest.java Updated tests for renamed method onTopicEvent
Comments suppressed due to low confidence (1)

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/TopicListService.java:132

  • The onTopicEvent method accesses and modifies matchingTopics without synchronization, which can lead to race conditions. The method is not synchronized, but it:
  1. Calls matchingTopics.remove(topicName) on line 121
  2. Calls matchingTopics.add(topicName) on line 124
  3. Passes matchingTopics to TopicList.calculateHash() on line 128

Meanwhile, other synchronized methods like updateTopics() (line 185) and prepareUpdateTopics() (line 178) also access matchingTopics. This creates a data race where unsynchronized access in onTopicEvent can interleave with synchronized access in other methods, potentially causing:

  • Lost updates
  • Inconsistent hash calculations
  • ConcurrentModificationException if the Set implementation isn't thread-safe

The method should be declared synchronized to ensure thread-safe access to the shared matchingTopics field.

        public void onTopicEvent(String topicName, NotificationType notificationType) {
            if (closed) {
                return;
            }
            String partitionedTopicName = TopicName.get(topicName).getPartitionedTopicName();
            String domainLessTopicName = TopicList.removeTopicDomainScheme(partitionedTopicName);

            if (topicsPattern.matches(domainLessTopicName)) {
                List<String> newTopics = Collections.emptyList();
                List<String> deletedTopics = Collections.emptyList();
                if (notificationType == NotificationType.Deleted) {
                    if (matchingTopics.remove(topicName)) {
                        deletedTopics = Collections.singletonList(topicName);
                    }
                } else if (notificationType == NotificationType.Created && matchingTopics.add(topicName)) {
                    newTopics = Collections.singletonList(topicName);
                }
                if (!newTopics.isEmpty() || !deletedTopics.isEmpty()) {
                    String hash = TopicList.calculateHash(matchingTopics);
                    sendTopicListUpdate(hash, deletedTopics, newTopics);
                }
            }
        }

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 24 out of 24 changed files in this pull request and generated 7 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@lhotari lhotari requested a review from Denovo1998 January 29, 2026 13:09
@nodece
Copy link
Member

nodece commented Feb 5, 2026

The change from MetadataStore to MetadataStoreExtended in PulsarResources and TopicResources is acceptable.

Third-party plugins usually do not directly depend on the concrete MetadataStore type, so this change alone is unlikely to cause real compatibility issues.

The main concern is the change of public methods that changing the TopicResources listener APIs from
BiConsumer<String, NotificationType> to the new TopicListener interface will break existing third-party plugins that register listeners.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

3 participants