Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,15 @@ async def close(self) -> None:
self._consumer.close_async(functools.partial(_set_future, future, value=None))
await future

async def get_last_message_id(self) -> _pulsar.MessageId:
"""
Asynchronously get the last message id.
"""
future = asyncio.get_running_loop().create_future()
self._consumer.get_last_message_id_async(functools.partial(_set_future, future))
id = await future
return id

def topic(self) -> str:
"""
Return the topic this consumer is subscribed to.
Expand Down
8 changes: 7 additions & 1 deletion src/consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,11 @@ MessageId Consumer_get_last_message_id(Consumer& consumer) {
return msgId;
}

void Consumer_get_last_message_id_async(Consumer& consumer, GetLastMessageIdCallback callback) {
py::gil_scoped_release release;
consumer.getLastMessageIdAsync(callback);
}

void Consumer_receiveAsync(Consumer& consumer, ReceiveCallback callback) {
py::gil_scoped_release release;
consumer.receiveAsync(callback);
Expand Down Expand Up @@ -194,7 +199,8 @@ void export_consumer(py::module_& m) {
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync)
.def("acknowledge_cumulative_async", &Consumer_acknowledgeCumulativeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
.def("negative_acknowledge_async", &Consumer_negative_acknowledgeAsync_message_id)
.def("get_last_message_id_async", &Consumer_get_last_message_id_async)
.def("close_async", &Consumer_closeAsync)
.def("unsubscribe_async", &Consumer_unsubscribeAsync)
.def("seek_async", &Consumer_seekAsync)
Expand Down
16 changes: 16 additions & 0 deletions tests/asyncio_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
)

import pulsar # pylint: disable=import-error
import _pulsar # pylint: disable=import-error
from pulsar.asyncio import ( # pylint: disable=import-error
Client,
Consumer,
Expand Down Expand Up @@ -267,6 +268,21 @@ async def verify_receive(consumer: Consumer):
await verify_receive(consumer)
await consumer.close()

async def test_consumer_get_last_message_id(self):
topic = f'asyncio-test-get-last-message-id-{time.time()}'
sub = 'sub'
consumer = await self._client.subscribe(topic, sub,
consumer_type=pulsar.ConsumerType.Shared)
producer = await self._client.create_producer(topic)
for i in range(5):
msg = f'msg-{i}'.encode()
await producer.send(msg)
last_msg_id = await consumer.get_last_message_id()
assert isinstance(last_msg_id, _pulsar.MessageId)
assert last_msg_id.entry_id() == i
await consumer.acknowledge(last_msg_id)
await consumer.close()

async def test_unsubscribe(self):
topic = f'asyncio-test-unsubscribe-{time.time()}'
sub = 'sub'
Expand Down
Loading