Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 280 additions & 0 deletions pip/pip-452.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,280 @@
# PIP-452: Customizable topic listing of namespace with properties

# Motivation
Currently, the CommandGetTopicsOfNamespace logic in the Pulsar Broker is hard-coded to scan the metadata store (ZooKeeper) for all children nodes under a namespace.

This implementation limits the flexibility required for complex multi-tenant scenarios:

No Client Context: The broker cannot distinguish who is asking for the topics or why. It cannot filter topics based on client properties (these client properties may correspond to, or be derived from topic properties).

Inefficient Filtering: For namespaces with millions of topics, the broker must fetch the full list into memory before applying the topics_pattern regex. There is no way to "push down" the filtering to the data source (e.g., a database with an index).

To address these issues, I propose making the topic listing logic pluggable and extending the protocol to accept client properties.

# Goals

## In Scope

Protocol: Add a properties field to `CommandGetTopicsOfNamespace` to carry client-side context.
Comment thread
lhotari marked this conversation as resolved.

Broker: Introduce a new pluggable interface `PulsarResourcesExtended` to allow customization of resource management logic, starting with the "Get Topics" request.

Client: Update the Java Client to forward Consumer properties to the lookup service when using Regex subscriptions.

Admin API & CLI: Update the REST API and CLI to accept properties for listing topics in a namespace.

Configuration: Add a broker configuration to enable/disable topic list watching.

## Out of Scope

Not support topic list watching for customized topic listing in this PIP. It can be considered in future work.
If you want to use this feature, you need to disable topic list watcher by set `enableBrokerTopicListWatcher = false`.

# High Level Design
We will modify the Pulsar Protocol to carry a properties map. On the Broker side, we will add a new pluggable interface `PulsarResourcesExtended`.
The default implementation will preserve the existing behavior (delegating to `NamespaceService`). The Broker's connection handler will delegate the request to `NamespaceService`, which in turn delegates to the `PulsarResourcesExtended` interface.

# Detailed Design

## Public-facing Changes

### Configuration

broker.conf
```properties
# Enables watching topic add/remove events on broker side. It is separated from enableBrokerSideSubscriptionPatternEvaluation.
enableBrokerTopicListWatcher = true
Comment thread
coderzc marked this conversation as resolved.

# Class name for the extended Pulsar resources.
# This class must implement org.apache.pulsar.broker.PulsarResourcesExtended.
pulsarResourcesExtendedClassName = org.apache.pulsar.broker.DefaultPulsarResourcesExtended
```

### Protocol Changes
Update `PulsarApi.proto` to include the properties field.

PulsarApi.proto
```protobuf
message CommandGetTopicsOfNamespace {
required uint64 request_id = 1;
required string namespace = 2;
optional Mode mode = 3 [default = PERSISTENT];

// Existing fields for filtering and hash optimization
optional string topics_pattern = 4;
optional string topics_hash = 5;

// New field: Context properties from the client
repeated KeyValue properties = 6;
}
```

### REST API & CLI Changes

Add `properties` parameter to the REST API endpoint for listing topics in a namespace to list topic with specific properties for customizable topic listing.

REST API (URL encoding is required for values):
```
GET /admin/v2/persistent/{tenant}/{namespace}?properties=k1=v1,k2=v2
Comment thread
coderzc marked this conversation as resolved.
```

CLI:
```
pulsar-admin topics list <tenant>/<namespace> -p k1=v1 -p k2=v2
```

## Design & Implementation Details
### Broker Changes

Introduce `PulsarResourcesExtended` Interface

This interface is designed to be pluggable, allowing custom implementations to provide extended capabilities such as custom topic listing strategies.

```java
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;
import org.jspecify.annotations.Nullable;

/**
* Extended PulsarResources that provides additional functionality beyond PulsarResources.
*
* <p>This interface is designed to be pluggable, allowing custom implementations
* to provide extended PulsarResources capabilities such as custom topic listing strategies</p>
*
* <p>Implementations of this interface can be registered with PulsarService to
* provide extended resource management capabilities.</p>
*/
@InterfaceStability.Evolving
public interface PulsarResourcesExtended {

/**
* Lists topics in a namespace with optional property-based filtering.
*
* <p>This method provides a flexible way to list topics in a namespace,
* supporting property-based filtering through the properties parameter.</p>
*
* @param namespaceName the namespace to list topics from
* @param mode the listing mode (ALL, PERSISTENT, NON_PERSISTENT)
* @param properties optional property filters for topic listing, if null or empty, no filtering is applied
* @return a CompletableFuture containing the list of topic names
*/
CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
@Nullable Map<String, String> properties);

/**
* Initializes this extended resources instance with the PulsarService.
*
* <p>This method is called during broker startup to provide the implementation
* with access to the PulsarService and its dependencies.</p>
*
* @param pulsarService the PulsarService instance
*/
void initialize(PulsarService pulsarService);

/**
* Closes this extended resources instance and releases any resources.
*
* <p>This method should be called during broker shutdown to clean up
* any resources held by the implementation.</p>
*/
void close();
}
```

Add `DefaultPulsarResourcesExtended`

The default implementation delegates to the existing `NamespaceService`.

```java
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;

public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended {

private NamespaceService namespaceService;

@Override
public CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
Map<String, String> properties) {
// Delegate to standard NamespaceService methods to preserve existing behavior
return namespaceService.getListOfTopics(namespaceName, mode);
}

@Override
public void initialize(PulsarService pulsarService) {
this.namespaceService = pulsarService.getNamespaceService();
}
}
```

NamespaceService Changes

Add `getListOfTopicsByProperties` and `getListOfUserTopicsByProperties` in `NamespaceService` to serve as the entry point for topic listing with properties support. This method acts as the hook to call the pluggable resource extension.

```java
// In NamespaceService.java

public CompletableFuture<List<String>> getListOfTopicsByProperties(NamespaceName namespaceName, Mode mode,
Map<String, String> properties) {
return pulsarResourcesExtended.listTopicOfNamespace(namespaceName, mode, properties);
}

public CompletableFuture<List<String>> getListOfUserTopicsByProperties(NamespaceName namespaceName, Mode mode,
Map<String, String> properties) {
return getListOfUserTopicsInternal(cacheKeyWithProperties(namespaceName, mode, properties),
() -> getListOfTopicsByProperties(namespaceName, mode, properties));
}

private CompletableFuture<List<String>> getListOfUserTopicsInternal(
String key,
Supplier<CompletableFuture<List<String>>> topicsSupplier) {
final MutableBoolean initializedByCurrentThread = new MutableBoolean();
CompletableFuture<List<String>> queryRes = inProgressQueryUserTopics.computeIfAbsent(key, k -> {
initializedByCurrentThread.setTrue();
return topicsSupplier.get().thenApplyAsync(TopicList::filterSystemTopic, pulsar.getExecutor());
});
.....
return queryRes;
}
```

Update CommandGetTopicsOfNamespace Handling

Use `getListOfUserTopicsByProperties` instead of `getListOfUserTopics` to get topic listing with properties support.

```java
private void internalHandleGetTopicsOfNamespace(String namespace, NamespaceName namespaceName, long requestId,
CommandGetTopicsOfNamespace.Mode mode,
Optional<String> topicsPattern, Optional<String> topicsHash,
Semaphore lookupSemaphore, Map<String, String> properties) {
listSizeHolder.getSizeAsync().thenAccept(initialSize -> {
maxTopicListInFlightLimiter.withAcquiredPermits(initialSize,
AsyncDualMemoryLimiter.LimitType.HEAP_MEMORY, isPermitRequestCancelled, initialPermits -> {
return getBrokerService().pulsar().getNamespaceService()
.getListOfUserTopicsByProperties(namespaceName, mode, properties);
......
});
});
}
```

### Client Changes
Update LookupService
The internal LookupService interface is updated to accept the properties map.

```Java
CompletableFuture<List<String>> getTopicsUnderNamespace(
NamespaceName namespace,
Mode mode,
String topicsPattern,
String topicsHash,
Map<String, String> properties
);
```
Update `PatternMultiTopicsConsumerImpl`
The regex consumer implementation (PatternMultiTopicsConsumerImpl) will be updated to extract the properties from ConsumerConfigurationData and pass them to the LookupService.

```Java
// In PatternMultiTopicsConsumerImpl.java
ConsumerConfigurationData conf;
Map<String, String> contextProperties = conf.getProperties();

lookup.getTopicsUnderNamespace(
namespace,
mode,
topicsPattern.pattern(),
topicsHash,
contextProperties // Pass consumer properties here
).thenAccept(topics -> {
// ... update subscriptions ...
});
```


# Backward Compatibility
Protocol: Adding an optional field (properties) to the Protobuf definition is non-breaking. Old clients will not send this field; old brokers will ignore it.

Behavior: The default strategy mimics the current behavior exactly. The Broker retains the final filtering logic, ensuring that even custom strategies cannot return topics that violate the client's requested pattern.

# Security Considerations
Input Validation: The properties map is user-supplied input. Implementers of the customizable strategy MUST validate and sanitize these inputs before using them, especially if they are used to construct database queries.

Authorization: This PIP only controls the discovery (listing) of topics. It does not bypass the Authorization Service for consuming or producing to those topics.

Comment thread
lhotari marked this conversation as resolved.
# Links

* Mailing List discussion thread: https://lists.apache.org/thread/nf11qzn6r9wps6l81trxwtdr1m9xszpm
* Mailing List voting thread: https://lists.apache.org/thread/fx9spdm11c1491064gzocfvz7ptlbs4t