Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Function;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.LocalPolicies;
Expand All @@ -47,7 +48,19 @@ public Optional<LocalPolicies> getLocalPolicies(NamespaceName ns) throws Metadat
}

public CompletableFuture<Optional<LocalPolicies>> getLocalPoliciesAsync(NamespaceName ns) {
return getCache().get(joinPath(LOCAL_POLICIES_ROOT, ns.toString()));
return getCache().get(joinPath(LOCAL_POLICIES_ROOT, ns.toString()))
.exceptionally(ex -> {
// Handle empty or corrupted LocalPolicies gracefully
// Walk the exception chain to find ContentDeserializationException
Throwable cause = ex;
while (cause != null) {
if (cause instanceof MetadataStoreException.ContentDeserializationException) {
return Optional.empty();
}
cause = cause.getCause();
}
throw new CompletionException(ex);
});
}

public void setLocalPoliciesWithCreate(NamespaceName ns, Function<Optional<LocalPolicies>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,56 +316,42 @@ protected WorkerService validateAndGetWorkerService() {
*/
@Deprecated
protected Policies getNamespacePolicies(NamespaceName namespaceName) {
try {
Policies policies = namespaceResources().getPolicies(namespaceName)
.orElseThrow(() -> new RestException(Status.NOT_FOUND, "Namespace does not exist"));
// fetch bundles from LocalZK-policies
BundlesData bundleData = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName).getBundlesData();
Optional<LocalPolicies> localPolicies = getLocalPolicies().getLocalPolicies(namespaceName);
policies.bundles = bundleData != null ? bundleData : policies.bundles;
policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig().isAllowAutoUpdateSchemaEnabled();
}

return policies;
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to get namespace policies {}", clientAppId(), namespaceName, e);
throw new RestException(e);
}

return sync(() -> getNamespacePoliciesAsync(namespaceName));
}

protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName namespaceName) {
CompletableFuture<Policies> result = new CompletableFuture<>();
namespaceResources().getPoliciesAsync(namespaceName)
.thenCombine(getLocalPolicies().getLocalPoliciesAsync(namespaceName), (pl, localPolicies) -> {
if (pl.isPresent()) {
Policies policies = pl.get();
if (localPolicies.isPresent()) {
policies.bundles = localPolicies.get().bundles;
policies.migrated = localPolicies.get().migrated;
}
return namespaceResources().getPoliciesAsync(namespaceName)
.thenCompose(pl -> {
if (pl.isEmpty()) {
return FutureUtil.failedFuture(
new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
Policies policies = pl.get();
// fetch bundles from NamespaceBundleFactory (aligns with sync method)
CompletableFuture<BundlesData> bundlesFuture = pulsar().getNamespaceService()
.getNamespaceBundleFactory()
.getBundlesAsync(namespaceName)
.thenApply(bundles -> bundles.getBundlesData());
CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
getLocalPolicies().getLocalPoliciesAsync(namespaceName)
.exceptionally(ex -> {
log.warn("Failed to load local policies for {}, using empty: {}",
namespaceName, ex.getMessage());
return Optional.empty();
});

return bundlesFuture.thenCombine(localPoliciesFuture, (bundleData, localPolicies) -> {
policies.bundles = bundleData != null ? bundleData : policies.bundles;
policies.migrated = localPolicies.isPresent() ? localPolicies.get().migrated : false;
if (policies.is_allow_auto_update_schema == null) {
// the type changed from boolean to Boolean. return
// broker value here for keeping compatibility.
policies.is_allow_auto_update_schema = pulsar().getConfig()
.isAllowAutoUpdateSchemaEnabled();
}
result.complete(policies);
} else {
result.completeExceptionally(new RestException(Status.NOT_FOUND, "Namespace does not exist"));
}
return null;
}).exceptionally(ex -> {
result.completeExceptionally(ex.getCause());
return null;
return policies;
});
});
return result;
}

protected BacklogQuota namespaceBacklogQuota(NamespaceName namespace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,12 @@ private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName na
return pulsar.getPulsarResources().getLocalPolicies()
.createLocalPoliciesAsync(namespace, localPolicies)
.thenApply(stat -> getBundles(namespace,
Optional.of(Pair.of(localPolicies, 0L))));
Optional.of(Pair.of(localPolicies, 0L))))
.exceptionally(ex -> {
// If LocalPolicies already exists (possibly with empty content),
// just use the bundles from global policies
return getBundles(namespace, Optional.empty());
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3429,6 +3429,54 @@ public void testCreateAndDeleteNamespaceWithBundles() throws Exception {
deleteNamespaceWithRetry(ns, false);
}

@Test
public void testGetNamespacePoliciesAsyncReflectsBundleSplit() throws Exception {
// Test that getPoliciesAsync reflects latest bundles from NamespaceBundleFactory after bundle split.
// This verifies that the async method uses NamespaceBundleFactory.getBundlesAsync()
// instead of LocalPolicies.bundles (which may be stale after split).
String ns = BrokerTestUtil.newUniqueName("prop-xyz/ns");

// Create namespace with 1 bundle
admin.namespaces().createNamespace(ns, 1);

try {
// Verify initial state - should have 1 bundle
Policies initialPolicies = admin.namespaces().getPoliciesAsync(ns).get();
assertEquals(initialPolicies.bundles.getNumBundles(), 1, "Should start with 1 bundle");
assertEquals(initialPolicies.bundles.getBoundaries().size(), 2,
"1 bundle should have 2 boundaries");

// Split the bundle into 2
admin.namespaces().splitNamespaceBundle(ns, "0x00000000_0xffffffff", true, null);

// Wait for split to complete in NamespaceBundleFactory
Awaitility.await().untilAsserted(() -> {
NamespaceBundles bundles = pulsar.getNamespaceService()
.getNamespaceBundleFactory()
.getBundles(NamespaceName.get(ns));
assertEquals(bundles.getBundles().size(), 2,
"NamespaceBundleFactory should have 2 bundles after split");
});

// getPoliciesAsync should now reflect the split bundles from NamespaceBundleFactory
// Before the fix: LocalPolicies.bundles would still show 1 bundle (stale data)
// After the fix: NamespaceBundleFactory.getBundlesAsync() provides latest 2 bundles
Policies afterSplitPolicies = admin.namespaces().getPoliciesAsync(ns).get();
assertEquals(afterSplitPolicies.bundles.getNumBundles(), 2,
"getPoliciesAsync should reflect bundles from NamespaceBundleFactory after split");
assertEquals(afterSplitPolicies.bundles.getBoundaries().size(), 3,
"2 bundles should have 3 boundaries");

// Verify the boundaries are correct (0x00000000, 0x7fffffff, 0xffffffff)
List<String> boundaries = afterSplitPolicies.bundles.getBoundaries();
assertEquals(boundaries.get(0), "0x00000000");
assertEquals(boundaries.get(1), "0x7fffffff");
assertEquals(boundaries.get(2), "0xffffffff");
} finally {
deleteNamespaceWithRetry(ns, false);
}
}

@Test
public void testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages() throws Exception {
final String topic = "persistent://prop-xyz/ns1/testBacklogSizeShouldBeZeroWhenConsumerAckedAllMessages";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,15 @@ private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String

try {
GetResult res = optRes.get();
T obj = serde.deserialize(path, res.getValue(), res.getStat());
byte[] value = res.getValue();

// Handle empty content before attempting deserialization
if (value == null || value.length == 0) {
log.warn("Empty content found for key '{}', treating as non-existent", path);
return FutureUtils.value(Optional.empty());
}

T obj = serde.deserialize(path, value, res.getStat());
return FutureUtils
.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
} catch (Throwable t) {
Expand Down