Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,9 @@ void MultiTopicsConsumerImpl::messageReceived(const Consumer& consumer, const Me

void MultiTopicsConsumerImpl::internalListener(const Consumer& consumer) {
Message m;
incomingMessages_.pop(m);
if (!incomingMessages_.pop(m)) {
return;
}
try {
Consumer self{get_shared_this_ptr()};
messageProcessed(m);
Expand Down
42 changes: 42 additions & 0 deletions tests/ConsumerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1519,4 +1519,46 @@ TEST(ConsumerTest, testDuplicatedTopics) {
}
}

TEST(ConsumerTest, testConsumerListenerShouldNotSegfaultAfterClose) {
Client client(lookupUrl);

const int MSG_COUNT = 100;
std::string topicName = "persistent://public/default/my-topic-" + std::to_string(time(nullptr));

// 1. Create producer send 100 msgs
Producer producer;
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
for (int i = 0; i < MSG_COUNT; ++i) {
std::string msg = "my-message-" + std::to_string(i);
Message message = MessageBuilder().setContent(msg).build();
ASSERT_EQ(ResultOk, producer.send(message));
}
ASSERT_EQ(ResultOk, producer.flush());
Comment thread
shibd marked this conversation as resolved.
Outdated

// 2. Create consumer with listener
Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setSubscriptionInitialPosition(InitialPositionEarliest);
Latch latchFirstReceiveMsg(1);
Latch latchAfterClosed(1);
consumerConfig.setMessageListener(
[&latchFirstReceiveMsg, &latchAfterClosed](Consumer consumer, const Message& msg) {
latchFirstReceiveMsg.countdown();
std::cout << "Consume message: " << msg.getDataAsString() << std::endl;
Comment thread
shibd marked this conversation as resolved.
Outdated
latchAfterClosed.wait();
});
auto result = client.subscribe(topicName, "test-sub", consumerConfig, consumer);
ASSERT_EQ(ResultOk, result);

// 3. wait first message consumed in listener and then close consumer.
latchFirstReceiveMsg.wait();
ASSERT_EQ(ResultOk, consumer.close());
latchAfterClosed.countdown();

ASSERT_EQ(ResultOk, producer.close());
ASSERT_EQ(ResultOk, client.close());
}

} // namespace pulsar
Loading