Skip to content

Commit 339a6b8

Browse files
committed
[improve][broker] Add http produce backlog quota check
- Add http produce backlog quota check for both destination_storage and message_age Signed-off-by: Dream95 <zhou_8621@163.com>
1 parent b15f53b commit 339a6b8

2 files changed

Lines changed: 117 additions & 26 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/rest/TopicsBase.java

Lines changed: 34 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.apache.pulsar.common.compression.CompressionCodecProvider;
8989
import org.apache.pulsar.common.naming.TopicName;
9090
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
91+
import org.apache.pulsar.common.policies.data.BacklogQuota;
9192
import org.apache.pulsar.common.policies.data.TopicOperation;
9293
import org.apache.pulsar.common.protocol.Commands;
9394
import org.apache.pulsar.common.protocol.schema.SchemaData;
@@ -272,35 +273,42 @@ private void internalPublishMessages(TopicName topicName, ProducerMessages reque
272273
private CompletableFuture<Position> publishSingleMessageToPartition(String topic, Message message) {
273274
CompletableFuture<Position> publishResult = new CompletableFuture<>();
274275
pulsar().getBrokerService().getTopic(topic, false)
275-
.thenAccept(t -> {
276-
// TODO: Check message backlog and fail if backlog too large.
277-
if (!t.isPresent()) {
278-
// Topic not found, and remove from owning partition list.
279-
publishResult.completeExceptionally(new BrokerServiceException.TopicNotFoundException("Topic not "
280-
+ "owned by current broker."));
281-
TopicName topicName = TopicName.get(topic);
282-
pulsar().getBrokerService().getOwningTopics().get(topicName.getPartitionedTopicName())
283-
.remove(topicName.getPartitionIndex());
284-
} else {
285-
try {
286-
ByteBuf headersAndPayload = messageToByteBuf(message);
287-
try {
288-
Topic topicObj = t.get();
289-
topicObj.publishMessage(headersAndPayload,
290-
RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime()));
291-
} finally {
292-
headersAndPayload.release();
276+
.thenCompose(tOpt -> {
277+
if (tOpt.isEmpty()) {
278+
publishResult.completeExceptionally(
279+
new BrokerServiceException.TopicNotFoundException("Topic not "
280+
+ "owned by current broker."));
281+
TopicName tn = TopicName.get(topic);
282+
pulsar().getBrokerService().getOwningTopics().get(tn.getPartitionedTopicName())
283+
.remove(tn.getPartitionIndex());
284+
return CompletableFuture.completedFuture(null);
293285
}
294-
} catch (Exception e) {
286+
Topic topicObj = tOpt.get();
287+
CompletableFuture<Void> backlogQuotaCheckFuture = CompletableFuture.allOf(
288+
topicObj.checkBacklogQuotaExceeded(message.getProducerName(),
289+
BacklogQuota.BacklogQuotaType.destination_storage),
290+
topicObj.checkBacklogQuotaExceeded(message.getProducerName(),
291+
BacklogQuota.BacklogQuotaType.message_age));
292+
return backlogQuotaCheckFuture.thenRun(() -> {
293+
ByteBuf headersAndPayload = messageToByteBuf(message);
294+
try {
295+
topicObj.publishMessage(headersAndPayload,
296+
RestMessagePublishContext.get(publishResult, topicObj, System.nanoTime()));
297+
} finally {
298+
headersAndPayload.release();
299+
}
300+
});
301+
})
302+
.exceptionally(ex -> {
303+
Throwable cause = FutureUtil.unwrapCompletionException(ex);
295304
if (log.isDebugEnabled()) {
296-
log.debug("Fail to publish single messages to topic {}: {} ",
297-
topicName, e.getCause());
305+
log.debug("Fail to publish single message to topic {}: {}", topic, cause.getMessage());
298306
}
299-
publishResult.completeExceptionally(e);
300-
}
301-
}
302-
});
303-
307+
if (!publishResult.isDone()) {
308+
publishResult.completeExceptionally(cause);
309+
}
310+
return null;
311+
});
304312
return publishResult;
305313
}
306314

pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicsTest.java

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.broker.admin;
2020

21+
import static org.awaitility.Awaitility.await;
2122
import static org.mockito.ArgumentMatchers.any;
2223
import static org.mockito.ArgumentMatchers.anyBoolean;
2324
import static org.mockito.ArgumentMatchers.anyString;
@@ -66,6 +67,7 @@
6667
import org.apache.pulsar.broker.web.RestException;
6768
import org.apache.pulsar.client.api.Consumer;
6869
import org.apache.pulsar.client.api.Message;
70+
import org.apache.pulsar.client.api.MessageId;
6971
import org.apache.pulsar.client.api.Producer;
7072
import org.apache.pulsar.client.api.Schema;
7173
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
@@ -82,6 +84,7 @@
8284
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
8385
import org.apache.pulsar.client.impl.schema.generic.GenericSchemaImpl;
8486
import org.apache.pulsar.common.naming.TopicDomain;
87+
import org.apache.pulsar.common.policies.data.BacklogQuota;
8588
import org.apache.pulsar.common.policies.data.ClusterDataImpl;
8689
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
8790
import org.apache.pulsar.common.schema.KeyValue;
@@ -931,4 +934,84 @@ public void testProduceWithAutoConsumeSchema() throws Exception {
931934
}
932935
}
933936

937+
@Test
938+
public void testProduceWithBacklogQuotaSizeExceeded() throws Exception {
939+
String namespaceName = testTenant + "/" + testNamespace;
940+
String topicName = "persistent://" + namespaceName + "/" + testTopicName;
941+
admin.topics().createNonPartitionedTopic(topicName);
942+
BacklogQuota backlogQuota = BacklogQuota.builder()
943+
.limitSize(0)
944+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
945+
.build();
946+
admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
947+
BacklogQuota.BacklogQuotaType.destination_storage);
948+
949+
AsyncResponse asyncResponse = mock(AsyncResponse.class);
950+
ProducerMessages producerMessages = new ProducerMessages();
951+
String message = "[{\"payload\":\"rest-produce\"}]";
952+
producerMessages.setMessages(createMessages(message));
953+
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
954+
false, producerMessages);
955+
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
956+
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
957+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
958+
Object responseEntity = responseCaptor.getValue().getEntity();
959+
Assert.assertTrue(responseEntity instanceof ProducerAcks);
960+
ProducerAcks response = (ProducerAcks) responseEntity;
961+
Assert.assertEquals(response.getMessagePublishResults().size(), 1);
962+
for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
963+
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2);
964+
Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg()
965+
.contains("Cannot create producer on topic with backlog quota exceeded"));
966+
}
967+
}
968+
969+
@Test
970+
public void testProduceWithBacklogQuotaTimeExceeded() throws Exception {
971+
pulsar.getConfiguration().setPreciseTimeBasedBacklogQuotaCheck(true);
972+
String namespaceName = testTenant + "/" + testNamespace;
973+
String topicName = "persistent://" + namespaceName + "/" + testTopicName;
974+
admin.topics().createNonPartitionedTopic(topicName);
975+
BacklogQuota backlogQuota = BacklogQuota.builder()
976+
.limitTime(1)
977+
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
978+
.build();
979+
admin.namespaces().setBacklogQuota(namespaceName, backlogQuota,
980+
BacklogQuota.BacklogQuotaType.message_age);
981+
admin.topics().createSubscription(topicName, "time-quota-sub", MessageId.earliest);
982+
983+
@Cleanup
984+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
985+
.topic(topicName)
986+
.create();
987+
producer.send("backlog-message");
988+
Topic topic = pulsar.getBrokerService().getTopic(topicName, false)
989+
.get()
990+
.orElseThrow(() -> new IllegalStateException("Topic not loaded: " + topicName));
991+
PersistentTopic persistentTopic = (PersistentTopic) topic;
992+
await().atMost(5, TimeUnit.SECONDS).untilAsserted(() ->
993+
Assert.assertTrue(persistentTopic.checkTimeBacklogExceeded(true).get()));
994+
995+
AsyncResponse asyncResponse = mock(AsyncResponse.class);
996+
ProducerMessages producerMessages = new ProducerMessages();
997+
String message = "["
998+
+ "{\"key\":\"my-key\",\"payload\":\"RestProducer:1\",\"eventTime\":1603045262772,\"sequenceId\":1},"
999+
+ "{\"key\":\"my-key\",\"payload\":\"RestProducer:2\",\"eventTime\":1603045262772,\"sequenceId\":2}]";
1000+
producerMessages.setMessages(createMessages(message));
1001+
topics.produceOnPersistentTopic(asyncResponse, testTenant, testNamespace, testTopicName,
1002+
false, producerMessages);
1003+
ArgumentCaptor<Response> responseCaptor = ArgumentCaptor.forClass(Response.class);
1004+
verify(asyncResponse, timeout(5000).times(1)).resume(responseCaptor.capture());
1005+
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.OK.getStatusCode());
1006+
Object responseEntity = responseCaptor.getValue().getEntity();
1007+
Assert.assertTrue(responseEntity instanceof ProducerAcks);
1008+
ProducerAcks response = (ProducerAcks) responseEntity;
1009+
Assert.assertEquals(response.getMessagePublishResults().size(), 2);
1010+
for (int index = 0; index < response.getMessagePublishResults().size(); index++) {
1011+
Assert.assertEquals(response.getMessagePublishResults().get(index).getErrorCode(), 2);
1012+
Assert.assertTrue(response.getMessagePublishResults().get(index).getErrorMsg()
1013+
.contains("Cannot create producer on topic with backlog quota exceeded"));
1014+
}
1015+
}
1016+
9341017
}

0 commit comments

Comments
 (0)