Skip to content

Commit 9977b76

Browse files
committed
[fix][broker] Handle empty LocalPolicies content gracefully (#25158)
1 parent 356a0f4 commit 9977b76

3 files changed

Lines changed: 26 additions & 3 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,12 @@ protected CompletableFuture<Policies> getNamespacePoliciesAsync(NamespaceName na
333333
.getBundlesAsync(namespaceName)
334334
.thenApply(bundles -> bundles.getBundlesData());
335335
CompletableFuture<Optional<LocalPolicies>> localPoliciesFuture =
336-
getLocalPolicies().getLocalPoliciesAsync(namespaceName);
336+
getLocalPolicies().getLocalPoliciesAsync(namespaceName)
337+
.exceptionally(ex -> {
338+
log.warn("Failed to load local policies for {}, using empty: {}",
339+
namespaceName, ex.getMessage());
340+
return Optional.empty();
341+
});
337342

338343
return bundlesFuture.thenCombine(localPoliciesFuture, (bundleData, localPolicies) -> {
339344
policies.bundles = bundleData != null ? bundleData : policies.bundles;
@@ -839,6 +844,11 @@ protected CompletableFuture<SchemaCompatibilityStrategy> getSchemaCompatibilityS
839844
}
840845
}
841846
return schemaCompatibilityStrategy;
847+
}).exceptionally(ex -> {
848+
// If namespace policies cannot be loaded, fall back to broker config
849+
log.warn("Failed to load namespace policies for {}, using broker config default: {}",
850+
namespaceName, ex.getMessage());
851+
return pulsar().getConfig().getSchemaCompatibilityStrategy();
842852
});
843853
});
844854
}

pulsar-broker/src/main/java/org/apache/pulsar/common/naming/NamespaceBundleFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,12 @@ private CompletableFuture<NamespaceBundles> copyToLocalPolicies(NamespaceName na
177177
return pulsar.getPulsarResources().getLocalPolicies()
178178
.createLocalPoliciesAsync(namespace, localPolicies)
179179
.thenApply(stat -> getBundles(namespace,
180-
Optional.of(Pair.of(localPolicies, 0L))));
180+
Optional.of(Pair.of(localPolicies, 0L))))
181+
.exceptionally(ex -> {
182+
// If LocalPolicies already exists (possibly with empty content),
183+
// just use the bundles from global policies
184+
return getBundles(namespace, Optional.empty());
185+
});
181186
});
182187
}
183188

pulsar-metadata/src/main/java/org/apache/pulsar/metadata/cache/impl/MetadataCacheImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,15 @@ private CompletableFuture<Optional<CacheGetResult<T>>> readValueFromStore(String
136136

137137
try {
138138
GetResult res = optRes.get();
139-
T obj = serde.deserialize(path, res.getValue(), res.getStat());
139+
byte[] value = res.getValue();
140+
141+
// Handle empty content before attempting deserialization
142+
if (value == null || value.length == 0) {
143+
log.warn("Empty content found for key '{}', treating as non-existent", path);
144+
return FutureUtils.value(Optional.empty());
145+
}
146+
147+
T obj = serde.deserialize(path, value, res.getStat());
140148
return FutureUtils
141149
.value(Optional.of(new CacheGetResult<>(obj, res.getStat())));
142150
} catch (Throwable t) {

0 commit comments

Comments
 (0)