Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,16 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private int metadataStoreCacheExpirySeconds = 300;

private static final String DEFAULT_EXTENDED_RESOURCES_CLASS_NAME =
"org.apache.pulsar.broker.DefaultPulsarResourcesExtended";

@FieldContext(
category = CATEGORY_SERVER,
doc = "The class name of the PulsarResourcesExtended implementation. "
+ "This class must implement org.apache.pulsar.broker.PulsarResourcesExtended."
)
private String pulsarResourcesExtendedClassName = DEFAULT_EXTENDED_RESOURCES_CLASS_NAME;

@FieldContext(
category = CATEGORY_SERVER,
doc = "Is metadata store read-only operations."
Expand Down Expand Up @@ -1265,10 +1275,20 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Enables evaluating subscription pattern on broker side."
doc = "Enables evaluating subscription pattern on broker side. "
+ "Note: This config no longer controls watching topic list. "
+ "Please use `enableBrokerTopicListWatcher` to control that behavior."
)
private boolean enableBrokerSideSubscriptionPatternEvaluation = true;

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
doc = "Enables watching topic add/remove events on broker side for "
+ "subscription pattern evaluation."
)
private boolean enableBrokerTopicListWatcher = true;
Comment thread
shibd marked this conversation as resolved.

@FieldContext(
dynamic = false,
category = CATEGORY_POLICIES,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import lombok.Getter;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.naming.NamespaceName;

/**
* Default implementation of {@link PulsarResourcesExtended}.
*
* <p>This implementation provides the standard topic listing functionality
* by delegating to the {@link NamespaceService}.</p>
*/
public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended {

@Getter
private PulsarService pulsarService;

@Override
public CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
Map<String, String> properties) {
return pulsarService.getNamespaceService().getListOfTopics(namespaceName, mode);
}

@Override
public void initialize(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}

@Override
public void close() {
// No specific resources to close in the default implementation
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker;

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.classification.InterfaceStability;
import org.apache.pulsar.common.naming.NamespaceName;
import org.jspecify.annotations.Nullable;

/**
* Extended PulsarResources that provides additional functionality beyond PulsarResources.
*
* <p>This interface is designed to be pluggable, allowing custom implementations
* to provide extended PulsarResources capabilities such as custom topic listing strategies</p>
*
* <p>Implementations of this interface can be registered with PulsarService to
* provide extended resource management capabilities.</p>
*/
@InterfaceStability.Evolving
public interface PulsarResourcesExtended {

/**
* Lists topics in a namespace with optional property-based filtering.
*
* <p>This method provides a flexible way to list topics in a namespace,
* supporting property-based filtering through the properties parameter.</p>
*
* @param namespaceName the namespace to list topics from
* @param mode the listing mode (ALL, PERSISTENT, NON_PERSISTENT)
* @param properties optional property filters for topic listing, if null or empty, no filtering is applied
* @return a CompletableFuture containing the list of topic names
*/
CompletableFuture<List<String>> listTopicOfNamespace(NamespaceName namespaceName,
CommandGetTopicsOfNamespace.Mode mode,
@Nullable Map<String, String> properties);

/**
* Initializes this extended resources instance with the PulsarService.
*
* <p>This method is called during broker startup to provide the implementation
* with access to the PulsarService and its dependencies.</p>
*
* @param pulsarService the PulsarService instance
*/
void initialize(PulsarService pulsarService);

/**
* Closes this extended resources instance and releases any resources.
*
* <p>This method should be called during broker shutdown to clean up
* any resources held by the implementation.</p>
*/
void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ public class PulsarService implements AutoCloseable, ShutdownService {
private boolean shouldShutdownConfigurationMetadataStore;

private PulsarResources pulsarResources;
private PulsarResourcesExtended pulsarResourcesExtended;

private TransactionPendingAckStoreProvider transactionPendingAckStoreProvider;
private final ExecutorProvider transactionExecutorProvider;
Expand Down Expand Up @@ -1032,6 +1033,9 @@ public void start() throws PulsarServerException {

this.metricsGenerator = new MetricsGenerator(this);

// Initialize PulsarResourcesExtended
pulsarResourcesExtended = loadPulsarResourcesExtended();

// the broker is ready to accept incoming requests by Pulsar binary protocol and http/https
final List<Runnable> runnables;
synchronized (pendingTasksBeforeReadyForIncomingRequests) {
Expand Down Expand Up @@ -1158,6 +1162,14 @@ protected PulsarResources newPulsarResources() {
return pulsarResources;
}

protected PulsarResourcesExtended loadPulsarResourcesExtended() {
String className = config.getPulsarResourcesExtendedClassName();
PulsarResourcesExtended extendedResources = Reflections.createInstance(className,
PulsarResourcesExtended.class, Thread.currentThread().getContextClassLoader());
extendedResources.initialize(this);
return extendedResources;
}

private synchronized void createMetricsServlet() {
this.metricsServlet = new PulsarPrometheusMetricsServlet(
this, config.isExposeTopicLevelMetricsInPrometheus(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
Expand Down Expand Up @@ -151,6 +152,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -163,15 +165,17 @@ public class PersistentTopicsBase extends AdminResource {
private static final String DEPRECATED_CLIENT_VERSION_PREFIX = "Pulsar-CPP-v";
private static final Version LEAST_SUPPORTED_CLIENT_VERSION_PREFIX = Version.forIntegers(1, 21);

protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle) {
protected CompletableFuture<List<String>> internalGetListAsync(Optional<String> bundle,
@Nullable Map<String, String> properties) {
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.GET_TOPICS)
.thenCompose(__ -> namespaceResources().namespaceExistsAsync(namespaceName))
.thenAccept(exists -> {
if (!exists) {
throw new RestException(Status.NOT_FOUND, "Namespace does not exist");
}
})
.thenCompose(__ -> topicResources().listPersistentTopicsAsync(namespaceName))
.thenCompose(__ -> pulsar().getNamespaceService().getListOfTopicsByProperties(namespaceName,
CommandGetTopicsOfNamespace.Mode.PERSISTENT, properties))
.thenApply(topics ->
topics.stream()
.filter(topic -> {
Expand Down Expand Up @@ -4423,7 +4427,7 @@ private CompletableFuture<Topic> topicNotFoundReasonAsync(TopicName topicName) {
"Partitioned Topic not found: %s %s", topicName.toString(), topicErrorType));
}
})
.thenCompose(__ -> internalGetListAsync(Optional.empty()))
.thenCompose(__ -> internalGetListAsync(Optional.empty(), null))
.thenApply(topics -> {
if (!topics.contains(topicName.toString())) {
throw new RestException(Status.NOT_FOUND, "Topic partitions were not yet created");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void getList(@Suspended final AsyncResponse asyncResponse, @PathParam("pr
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle) {
validateNamespaceName(property, cluster, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), null)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String nsBundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@QueryParam("properties") String propertiesStr) {
Policies policies = null;
try {
validateNamespaceName(tenant, namespace);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -121,9 +123,11 @@ public void getList(
@ApiParam(value = "Specify the bundle name", required = false)
@QueryParam("bundle") String bundle,
@ApiParam(value = "Include system topic")
@QueryParam("includeSystemTopic") boolean includeSystemTopic) {
@QueryParam("includeSystemTopic") boolean includeSystemTopic,
@ApiParam(value = "properties for customized topic listing plugin, format: k1=v1,k2=v2")
@QueryParam("properties") String propertiesStr) {
Comment thread
coderzc marked this conversation as resolved.
validateNamespaceName(tenant, namespace);
internalGetListAsync(Optional.ofNullable(bundle))
internalGetListAsync(Optional.ofNullable(bundle), parseProperties(propertiesStr))
.thenAccept(topicList -> asyncResponse.resume(filterSystemTopic(topicList, includeSystemTopic)))
.exceptionally(ex -> {
if (isNot307And404Exception(ex)) {
Expand Down Expand Up @@ -5131,5 +5135,28 @@ public void getMessageIDByIndex(@Suspended final AsyncResponse asyncResponse,
});
}

private Map<String, String> parseProperties(String propertiesStr) {
if (propertiesStr == null || propertiesStr.trim().isEmpty()) {
return Collections.emptyMap();
}
Map<String, String> map = new HashMap<>();
String[] pairs = propertiesStr.split(",");
for (String pair : pairs) {
String[] parts = pair.split("=", 2);
if (parts.length == 2) {
try {
String key = Codec.decode(parts[0].trim());
String value = Codec.decode(parts[1].trim());
if (!key.isEmpty()) {
map.put(key, value);
}
} catch (Exception e) {
log.warn("Failed to decode property: {}", pair, e);
}
}
}
return map;
}

private static final Logger log = LoggerFactory.getLogger(PersistentTopics.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.pulsar.broker.intercept;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
Expand Down Expand Up @@ -66,7 +66,8 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
BrokerInterceptorUtils.searchForInterceptors(conf.getBrokerInterceptorsDirectory(),
conf.getNarExtractionDirectory());

ImmutableMap.Builder<String, BrokerInterceptorWithClassLoader> builder = ImmutableMap.builder();
// Use LinkedHashMap as a temporary container to ensure insertion order
Map<String, BrokerInterceptorWithClassLoader> orderedInterceptorMap = new LinkedHashMap<>();

conf.getBrokerInterceptors().forEach(interceptorName -> {

Expand All @@ -80,7 +81,7 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
try {
interceptor = BrokerInterceptorUtils.load(definition, conf.getNarExtractionDirectory());
if (interceptor != null) {
builder.put(interceptorName, interceptor);
orderedInterceptorMap.put(interceptorName, interceptor);
}
log.info("Successfully loaded broker interceptor for name `{}`", interceptorName);
} catch (IOException e) {
Expand All @@ -89,9 +90,8 @@ public static BrokerInterceptor load(ServiceConfiguration conf) throws IOExcepti
}
});

Map<String, BrokerInterceptorWithClassLoader> interceptors = builder.build();
if (!interceptors.isEmpty()) {
return new BrokerInterceptors(interceptors);
if (!orderedInterceptorMap.isEmpty()) {
return new BrokerInterceptors(Map.copyOf(orderedInterceptorMap));
} else {
return null;
}
Expand Down
Loading
Loading