Skip to content

Commit d94cd42

Browse files
committed
Refactor Pulsar service integration in CustomizedPulsarResourcesExtended and clean up unused variables
1 parent 6b904e5 commit d94cd42

4 files changed

Lines changed: 8 additions & 12 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/DefaultPulsarResourcesExtended.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.List;
2222
import java.util.Map;
2323
import java.util.concurrent.CompletableFuture;
24+
import lombok.Getter;
2425
import org.apache.pulsar.broker.namespace.NamespaceService;
2526
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
2627
import org.apache.pulsar.common.naming.NamespaceName;
@@ -33,6 +34,7 @@
3334
*/
3435
public class DefaultPulsarResourcesExtended implements PulsarResourcesExtended {
3536

37+
@Getter
3638
private PulsarService pulsarService;
3739

3840
@Override

pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -901,6 +901,8 @@ public void start() throws PulsarServerException {
901901
shouldShutdownConfigurationMetadataStore = false;
902902
}
903903
pulsarResources = newPulsarResources();
904+
// Initialize PulsarResourcesExtended
905+
pulsarResourcesExtended = loadPulsarResourcesExtended();
904906

905907
orderedExecutor = newOrderedExecutor();
906908

@@ -921,9 +923,6 @@ public void start() throws PulsarServerException {
921923
// needs load management service and before start broker service,
922924
this.startNamespaceService();
923925

924-
// Initialize PulsarResourcesExtended
925-
pulsarResourcesExtended = loadPulsarResourcesExtended();
926-
927926
schemaStorage = createAndStartSchemaStorage();
928927
schemaRegistryService = SchemaRegistryService.create(
929928
schemaStorage, config.getSchemaRegistryCompatibilityCheckers(), this);

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import org.apache.commons.lang3.StringUtils;
6060
import org.apache.commons.lang3.mutable.MutableBoolean;
6161
import org.apache.commons.lang3.tuple.Pair;
62-
import org.apache.pulsar.broker.PulsarResourcesExtended;
6362
import org.apache.pulsar.broker.PulsarServerException;
6463
import org.apache.pulsar.broker.PulsarService;
6564
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -195,8 +194,6 @@ public class NamespaceService implements AutoCloseable {
195194
private ConcurrentHashMap<String, CompletableFuture<List<String>>> inProgressQueryUserTopics =
196195
new ConcurrentHashMap<>();
197196

198-
private PulsarResourcesExtended pulsarResourcesExtended;
199-
200197
/**
201198
* Default constructor.
202199
*/
@@ -211,7 +208,6 @@ public NamespaceService(PulsarService pulsar) {
211208
this.bundleSplitListeners = new CopyOnWriteArrayList<>();
212209
this.localBrokerDataCache = pulsar.getLocalMetadataStore().getMetadataCache(LocalBrokerData.class);
213210
this.redirectManager = new RedirectManager(pulsar);
214-
this.pulsarResourcesExtended = pulsar.getPulsarResourcesExtended();
215211

216212
this.lookupLatencyHistogram = pulsar.getOpenTelemetry().getMeter()
217213
.histogramBuilder(LOOKUP_REQUEST_DURATION_METRIC_NAME)
@@ -1539,7 +1535,7 @@ public CompletableFuture<List<String>> getListOfTopicsByProperties(NamespaceName
15391535
if (MapUtils.isEmpty(properties)) {
15401536
return getListOfTopics(namespaceName, mode);
15411537
} else {
1542-
return pulsarResourcesExtended.listTopicOfNamespace(namespaceName, mode, properties);
1538+
return pulsar.getPulsarResourcesExtended().listTopicOfNamespace(namespaceName, mode, properties);
15431539
}
15441540
}
15451541

pulsar-broker/src/test/java/org/apache/pulsar/broker/intercept/CustomizedPulsarResourcesExtended.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@
3939
@Slf4j
4040
public class CustomizedPulsarResourcesExtended extends DefaultPulsarResourcesExtended {
4141

42-
private PulsarService pulsar;
4342
private boolean enabledTopicWatcher;
4443

4544
// Map<Namespace, Map<Topic, Map<PropertyKey, PropertyValue>>>
@@ -89,9 +88,9 @@ public List<String> queryTopicListByProperties(String namespace, Map<String, Str
8988

9089
public void setCustomProperties(TopicName topicName, Map<String, String> properties)
9190
throws ExecutionException, InterruptedException, TimeoutException {
92-
int timeoutSeconds = pulsar.getConfiguration().getMetadataStoreOperationTimeoutSeconds();
91+
int timeoutSeconds = getPulsarService().getConfiguration().getMetadataStoreOperationTimeoutSeconds();
9392
PartitionedTopicMetadata partitionedTopicMetadata =
94-
pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
93+
getPulsarService().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)
9594
.get(timeoutSeconds, TimeUnit.SECONDS);
9695
if (partitionedTopicMetadata.partitions == 0) {
9796
setNonPartitionedTopicCustomProperties(topicName, properties);
@@ -115,7 +114,7 @@ private void setNonPartitionedTopicCustomProperties(TopicName topicName, Map<Str
115114

116115
@Override
117116
public void initialize(PulsarService pulsarService) {
118-
this.pulsar = pulsarService;
117+
super.initialize(pulsarService);
119118
this.enabledTopicWatcher = pulsarService.getConfiguration().isEnableBrokerTopicListWatcher();
120119
this.customTopicPropertiesMap = new ConcurrentHashMap<>();
121120
}

0 commit comments

Comments
 (0)