[feat][pip] PIP-452: Customizable topic listing of namespace with properties #25134
Merged
13 commits
d9ebc1a coderzc: [feat][broker] PIP-452: Customizable Topic Listing in GetTopicsOfName…
cc06f10 coderzc: Add interceptGetTopicsOfNamespace method on BrokerInterceptor
3f6c0b1 coderzc: Enhance GetTopicsOfNamespace with customizable topic listing using pr…
5e91453 coderzc: update interceptGetTopicsOfNamespace to return Optional and handle pr…
7c064cb coderzc: address comment & update title
6b61215 coderzc: improve doc
50a5a63 coderzc: Add `enableBrokerTopicListWatcher` config and explained that topic li…
e6e548c coderzc: Add Mailing List voting thread link
c0a1802 coderzc: address comment
3eaded2 coderzc: Introduce `PulsarResourcesExtended` interface for customizable topic …
cf14cdf coderzc: Enhance `PulsarResourcesExtended` interface with detailed documentati…
60cdd53 coderzc: Update properties parameter description in listTopicOfNamespace
67b9bb9 coderzc: Update pip-452.md
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,280 @@ | ||
| # PIP-452: Customizable topic listing of namespace with properties | ||
|
|
||
| # Motivation | ||
| Currently, the `CommandGetTopicsOfNamespace` logic in the Pulsar Broker is hard-coded to scan the metadata store (ZooKeeper) for all child nodes under a namespace. | ||
|
|
||
| This implementation limits the flexibility required for complex multi-tenant scenarios: | ||
|
|
||
| No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these client properties may correspond to, or be derived from, topic properties). | ||
|
|
||
| Inefficient Filtering: For namespaces with millions of topics, the broker must fetch the full list into memory before applying the topics_pattern regex. There is no way to "push down" the filtering to the data source (e.g., a database with an index). | ||
|
|
||
| To address these issues, I propose making the topic listing logic pluggable and extending the protocol to accept client properties. | ||
|
|
||
| # Goals | ||
|
|
||
| ## In Scope | ||
|
|
||
| Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry client-side context. | ||
|
|
||
| Broker: Introduce a new pluggable interface `PulsarResourcesExtended` to allow customization of resource management logic, starting with the "Get Topics" request. | ||
|
|
||
| Client: Update the Java Client to forward Consumer properties to the lookup service when using Regex subscriptions. | ||
|
|
||
| Admin API & CLI: Update the REST API and CLI to accept properties for listing topics in a namespace. | ||
|
|
||
| Configuration: Add a broker configuration to enable/disable topic list watching. | ||
|
|
||
| ## Out of Scope | ||
|
|
||
| Topic list watching is not supported for customized topic listing in this PIP; it can be considered in future work. | ||
| To use this feature, disable the topic list watcher by setting `enableBrokerTopicListWatcher = false`. | ||
|
|
||
| # High Level Design | ||
| We will modify the Pulsar Protocol to carry a properties map. On the Broker side, we will add a new pluggable interface `PulsarResourcesExtended`. | ||
| The default implementation will preserve the existing behavior (delegating to `NamespaceService`). The Broker's connection handler will delegate the request to `NamespaceService`, which in turn delegates to the `PulsarResourcesExtended` interface. | ||
|
|
||
| # Detailed Design | ||
|
|
||
| ## Public-facing Changes | ||
|
|
||
| ### Configuration | ||
|
|
||
| broker.conf | ||
| ```properties | ||
| # Enables watching topic add/remove events on the broker side. It is separate from enableBrokerSideSubscriptionPatternEvaluation. | ||
| enableBrokerTopicListWatcher = true | ||
|
|
||
|
|
||
| # Class name for the extended Pulsar resources. | ||
| # This class must implement org.apache.pulsar.broker.PulsarResourcesExtended. | ||
| pulsarResourcesExtendedClassName = org.apache.pulsar.broker.DefaultPulsarResourcesExtended | ||
| ``` | ||
|
|
||
| ### Protocol Changes | ||
| Update `PulsarApi.proto` to include the properties field. | ||
|
|
||
| PulsarApi.proto | ||
| ```protobuf | ||
| message CommandGetTopicsOfNamespace { | ||
| required uint64 request_id = 1; | ||
| required string namespace = 2; | ||
| optional Mode mode = 3 [default = PERSISTENT]; | ||
|
|
||
| // Existing fields for filtering and hash optimization | ||
| optional string topics_pattern = 4; | ||
| optional string topics_hash = 5; | ||
|
|
||
| // New field: Context properties from the client | ||
| repeated KeyValue properties = 6; | ||
| } | ||
| ``` | ||
|
|
||
| ### REST API & CLI Changes | ||
|
|
||
| Add a `properties` parameter to the REST API endpoint that lists the topics of a namespace, so that callers can pass properties to the customizable topic listing. | ||
|
|
||
| REST API (URL encoding is required for values): | ||
| ``` | ||
| GET /admin/v2/persistent/{tenant}/{namespace}?properties=k1=v1,k2=v2 | ||
|
|
||
| ``` | ||
|
|
||
| CLI: | ||
| ``` | ||
| pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2 | ||
| ``` | ||
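The `properties=k1=v1,k2=v2` query value above could be turned into a map along the following lines. This is a minimal sketch, not the PIP's implementation: the class `TopicListProperties` and its `parse` method are hypothetical names, and it assumes the handler receives the raw, still URL-encoded string so that an encoded comma (`%2C`) inside a value survives the split.

```java
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Pulsar code base. */
final class TopicListProperties {

    private TopicListProperties() {
    }

    /**
     * Parses a raw (still URL-encoded) "k1=v1,k2=v2" string into an ordered map.
     * Pairs are split on ',' first; keys and values are URL-decoded afterwards.
     */
    static Map<String, String> parse(String raw) {
        Map<String, String> result = new LinkedHashMap<>();
        if (raw == null || raw.isBlank()) {
            return result; // no properties supplied
        }
        for (String pair : raw.split(",")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                // Reject entries without a key or without '='.
                throw new IllegalArgumentException("Expected key=value but got: " + pair);
            }
            String key = URLDecoder.decode(pair.substring(0, eq), StandardCharsets.UTF_8);
            String value = URLDecoder.decode(pair.substring(eq + 1), StandardCharsets.UTF_8);
            result.put(key, value);
        }
        return result;
    }
}
```

With this sketch, the repeated `-p k1=v1 -p k2=v2` CLI options and the single comma-separated REST parameter would both end up as the same `Map<String, String>` on the broker side.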
|
|
||
| ## Design & Implementation Details | ||
| ### Broker Changes | ||
|
|
||
| Introduce `PulsarResourcesExtended` Interface | ||
|
|
||
| This interface is designed to be pluggable, allowing custom implementations to provide extended capabilities such as custom topic listing strategies. | ||
|
|
||
| ```java | ||
| package org.apache.pulsar.broker; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; | ||
| import org.apache.pulsar.common.classification.InterfaceStability; | ||
| import org.apache.pulsar.common.naming.NamespaceName; | ||
| import org.jspecify.annotations.Nullable; | ||
|
|
||
| /** | ||
| * Extended PulsarResources that provides additional functionality beyond PulsarResources. | ||
| * | ||
| * <p>This interface is designed to be pluggable, allowing custom implementations | ||
| * to provide extended PulsarResources capabilities such as custom topic listing strategies.</p> | ||
| * | ||
| * <p>Implementations of this interface can be registered with PulsarService to | ||
| * provide extended resource management capabilities.</p> | ||
| */ | ||
| @InterfaceStability.Evolving | ||
| public interface PulsarResourcesExtended { | ||
|
|
||
| /** | ||
| * Lists topics in a namespace with optional property-based filtering. | ||
| * | ||
| * <p>This method provides a flexible way to list topics in a namespace, | ||
| * supporting property-based filtering through the properties parameter.</p> | ||
| * | ||
| * @param namespaceName the namespace to list topics from | ||
| * @param mode the listing mode (ALL, PERSISTENT, NON_PERSISTENT) | ||
| * @param properties optional property filters for topic listing; if null or empty, no filtering is applied | ||
| * @return a CompletableFuture containing the list of topic names | ||
| */ | ||
| CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName, | ||
| CommandGetTopicsOfNamespace.Mode mode, | ||
| @Nullable Map<String, String> properties); | ||
|
|
||
| /** | ||
| * Initializes this extended resources instance with the PulsarService. | ||
| * | ||
| * <p>This method is called during broker startup to provide the implementation | ||
| * with access to the PulsarService and its dependencies.</p> | ||
| * | ||
| * @param pulsarService the PulsarService instance | ||
| */ | ||
| void initialize(PulsarService pulsarService); | ||
|
|
||
| /** | ||
| * Closes this extended resources instance and releases any resources. | ||
| * | ||
| * <p>This method should be called during broker shutdown to clean up | ||
| * any resources held by the implementation.</p> | ||
| */ | ||
| void close(); | ||
| } | ||
| ``` | ||
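To make the idea of a custom listing strategy concrete, here is a minimal sketch of property-based filtering. It is deliberately not an implementation of `PulsarResourcesExtended`: the class `InMemoryPropertyTopicLister`, its `register` method, the plain `String` namespace, and the in-memory index are all assumptions chosen so the example compiles without Pulsar on the classpath. A real plugin would implement the interface above and would more likely query an indexed external store.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/** Hypothetical, simplified stand-in for a custom topic listing strategy. */
final class InMemoryPropertyTopicLister {

    // namespace -> (topic name -> topic properties)
    private final Map<String, Map<String, Map<String, String>>> index = new ConcurrentHashMap<>();

    /** Records a topic and its properties under a namespace. */
    void register(String namespace, String topic, Map<String, String> topicProperties) {
        index.computeIfAbsent(namespace, ns -> new ConcurrentHashMap<>())
                .put(topic, Map.copyOf(topicProperties));
    }

    /**
     * Returns the topics whose properties contain every requested key/value pair.
     * A null or empty filter matches all topics of the namespace.
     */
    CompletableFuture<List<String>> listTopicOfNamespace(String namespace, Map<String, String> properties) {
        Map<String, Map<String, String>> topics = index.getOrDefault(namespace, Map.of());
        List<String> matched = topics.entrySet().stream()
                .filter(e -> properties == null
                        || e.getValue().entrySet().containsAll(properties.entrySet()))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
        return CompletableFuture.completedFuture(matched);
    }
}
```

The "all requested pairs must match" rule is only one possible semantics; the PIP leaves the interpretation of `properties` to the implementation.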
|
|
||
| Add `DefaultPulsarResourcesExtended` | ||
|
|
||
| The default implementation delegates to the existing `NamespaceService`. | ||
|
|
||
| ```java | ||
| package org.apache.pulsar.broker; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CompletableFuture; | ||
| import org.apache.pulsar.broker.namespace.NamespaceService; | ||
| import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace; | ||
| import org.apache.pulsar.common.naming.NamespaceName; | ||
|
|
||
| public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended { | ||
|
|
||
| private NamespaceService namespaceService; | ||
|
|
||
| @Override | ||
| public CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName, | ||
| CommandGetTopicsOfNamespace.Mode mode, | ||
| Map<String, String> properties) { | ||
| // Delegate to the standard NamespaceService method to preserve existing behavior; properties are ignored here. | ||
| return namespaceService.getListOfTopics(namespaceName, mode); | ||
| } | ||
|
|
||
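| @Override | ||
| public void initialize(PulsarService pulsarService) { | ||
| this.namespaceService = pulsarService.getNamespaceService(); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| // Nothing to release: the default implementation holds no resources of its own. | ||
| } | ||
| } | ||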
| ``` | ||
|
|
||
| NamespaceService Changes | ||
|
|
||
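| Add `getListOfTopicsByProperties` and `getListOfUserTopicsByProperties` to `NamespaceService` as the entry points for topic listing with properties support. `getListOfTopicsByProperties` is the hook that calls the pluggable resource extension; `getListOfUserTopicsByProperties` wraps it, filtering out system topics and deduplicating in-flight queries by a key that includes the properties. | ||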
|
|
||
| ```java | ||
| // In NamespaceService.java | ||
|
|
||
| public CompletableFuture<List<String>> getListOfTopicsByProperties(NamespaceName namespaceName, Mode mode, | ||
| Map<String, String> properties) { | ||
| return pulsarResourcesExtended.listTopicOfNamespace(namespaceName, mode, properties); | ||
| } | ||
|
|
||
| public CompletableFuture<List<String>> getListOfUserTopicsByProperties(NamespaceName namespaceName, Mode mode, | ||
| Map<String, String> properties) { | ||
| return getListOfUserTopicsInternal(cacheKeyWithProperties(namespaceName, mode, properties), | ||
| () -> getListOfTopicsByProperties(namespaceName, mode, properties)); | ||
| } | ||
|
|
||
| private CompletableFuture<List<String>> getListOfUserTopicsInternal( | ||
| String key, | ||
| Supplier<CompletableFuture<List<String>>> topicsSupplier) { | ||
| final MutableBoolean initializedByCurrentThread = new MutableBoolean(); | ||
| CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> { | ||
| initializedByCurrentThread.setTrue(); | ||
| return topicsSupplier.get().thenApplyAsync(TopicList::filterSystemTopic, pulsar.getExecutor()); | ||
| }); | ||
| ..... | ||
| return queryRes; | ||
| } | ||
| ``` | ||
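The snippet above calls `cacheKeyWithProperties` without showing its body. One possible shape is sketched below, purely as an assumption: the class name `TopicListCacheKeys`, the `String` parameters for namespace and mode, and the key layout are all hypothetical. The point it illustrates is that the key should not depend on map iteration order, otherwise two identical requests would not share the same in-flight query.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; the key format in the actual implementation may differ. */
final class TopicListCacheKeys {

    private TopicListCacheKeys() {
    }

    /** Builds a deterministic key from namespace, mode and the (optional) properties. */
    static String cacheKeyWithProperties(String namespace, String mode, Map<String, String> properties) {
        StringBuilder key = new StringBuilder(namespace).append('|').append(mode);
        if (properties != null && !properties.isEmpty()) {
            // Sort by key so that insertion order does not change the result.
            // Length prefixes keep keys/values containing '|' or '=' from colliding.
            new TreeMap<>(properties).forEach((k, v) -> key
                    .append('|').append(k.length()).append(':').append(k)
                    .append('=').append(v.length()).append(':').append(v));
        }
        return key.toString();
    }
}
```

In this sketch a null map and an empty map produce the same key, which matches the interface contract that both mean "no filtering".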
|
|
||
| Update CommandGetTopicsOfNamespace Handling | ||
|
|
||
| Use `getListOfUserTopicsByProperties` instead of `getListOfUserTopics` to get topic listing with properties support. | ||
|
|
||
| ```java | ||
| private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId, | ||
| CommandGetTopicsOfNamespace.Mode mode, | ||
| Optional<String> topicsPattern, Optional<String> topicsHash, | ||
| Semaphore lookupSemaphore, Map<String, String> properties) { | ||
| listSizeHolder.getSizeAsync().thenAccept(initialSize -> { | ||
| maxTopicListInFlightLimiter.withAcquiredPermits(initialSize, | ||
| AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> { | ||
| return getBrokerService().pulsar().getNamespaceService() | ||
| .getListOfUserTopicsByProperties(namespaceName, mode, properties); | ||
| ...... | ||
| }); | ||
| }); | ||
| } | ||
| ``` | ||
|
|
||
| ### Client Changes | ||
| Update LookupService | ||
| The internal LookupService interface is updated to accept the properties map. | ||
|
|
||
| ```Java | ||
| CompletableFuture<List<String>> getTopicsUnderNamespace( | ||
| NamespaceName namespace, | ||
| Mode mode, | ||
| String topicsPattern, | ||
| String topicsHash, | ||
| Map<String, String> properties | ||
| ); | ||
| ``` | ||
| Update `PatternMultiTopicsConsumerImpl` | ||
| The regex consumer implementation (PatternMultiTopicsConsumerImpl) will be updated to extract the properties from ConsumerConfigurationData and pass them to the LookupService. | ||
|
|
||
| ```Java | ||
| // In PatternMultiTopicsConsumerImpl.java | ||
| ConsumerConfigurationData conf; | ||
| Map<String, String> contextProperties = conf.getProperties(); | ||
|
|
||
| lookup.getTopicsUnderNamespace( | ||
| namespace, | ||
| mode, | ||
| topicsPattern.pattern(), | ||
| topicsHash, | ||
| contextProperties // Pass consumer properties here | ||
| ).thenAccept(topics -> { | ||
| // ... update subscriptions ... | ||
| }); | ||
| ``` | ||
|
|
||
|
|
||
| # Backward Compatibility | ||
| Protocol: Adding the new `repeated` field (`properties`) to the Protobuf definition is non-breaking. Old clients will not send this field; old brokers will ignore it. | ||
|
|
||
| Behavior: The default strategy mimics the current behavior exactly. The Broker retains the final filtering logic, ensuring that even custom strategies cannot return topics that violate the client's requested pattern. | ||
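The guarantee that the broker keeps the final filtering step can be pictured as a post-filter applied to whatever the custom strategy returns. The following is a simplified sketch under stated assumptions: `PatternPostFilter` and `retainMatching` are hypothetical names, and the regex is matched against the full topic name, which may differ from how the broker's own topic-list filtering normalizes names.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Hypothetical illustration of a broker-side post-filter; not the broker's actual code. */
final class PatternPostFilter {

    private PatternPostFilter() {
    }

    /**
     * Keeps only the topics that fully match the client's pattern.
     * A null pattern means the client did not request pattern filtering.
     */
    static List<String> retainMatching(List<String> topicsFromStrategy, String topicsPattern) {
        if (topicsPattern == null) {
            return topicsFromStrategy;
        }
        Pattern pattern = Pattern.compile(topicsPattern);
        return topicsFromStrategy.stream()
                .filter(topic -> pattern.matcher(topic).matches())
                .collect(Collectors.toList());
    }
}
```

Because the filter runs after the pluggable call, a strategy that over-returns topics affects efficiency but not the correctness of the result sent to the client.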
|
|
||
| # Security Considerations | ||
| Input Validation: The properties map is user-supplied input. Implementers of the customizable strategy MUST validate and sanitize these inputs before using them, especially if they are used to construct database queries. | ||
|
|
||
| Authorization: This PIP only controls the discovery (listing) of topics. It does not bypass the Authorization Service for consuming or producing to those topics. | ||
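As an illustration of the input validation point, a custom strategy might reject suspicious properties before using them. This is a sketch only: `PropertyValidator`, the limits, and the allowed key characters are hypothetical choices, not values defined by this PIP. Validation like this complements, and does not replace, parameterized queries when the values reach a database.

```java
import java.util.Map;
import java.util.regex.Pattern;

/** Hypothetical validator; limits and allowed characters are illustrative assumptions. */
final class PropertyValidator {

    private static final int MAX_ENTRIES = 16;
    private static final int MAX_VALUE_LENGTH = 256;
    private static final Pattern KEY_PATTERN = Pattern.compile("[A-Za-z0-9_.-]{1,64}");

    private PropertyValidator() {
    }

    /** Throws IllegalArgumentException if the client-supplied properties look unsafe. */
    static void validate(Map<String, String> properties) {
        if (properties == null) {
            return; // null means "no filtering", nothing to validate
        }
        if (properties.size() > MAX_ENTRIES) {
            throw new IllegalArgumentException("Too many properties: " + properties.size());
        }
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key == null || !KEY_PATTERN.matcher(key).matches()) {
                throw new IllegalArgumentException("Invalid property key");
            }
            if (value == null || value.length() > MAX_VALUE_LENGTH) {
                throw new IllegalArgumentException("Invalid value for property: " + key);
            }
        }
    }
}
```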
|
|
||
|
|
||
| # Links | ||
|
|
||
| * Mailing List discussion thread: https://lists.apache.org/thread/nf11qzn6r9wps6l81trxwtdr1m9xszpm | ||
| * Mailing List voting thread: https://lists.apache.org/thread/fx9spdm11c1491064gzocfvz7ptlbs4t | ||