From ce2c61251e5ebcbdf230e9e50168280bc3d7d1eb Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 9 Apr 2026 20:43:53 +0200 Subject: [PATCH 1/2] Fixed wait method for callbacks. --- python/tests/test_callback_subscription.py | 1 + src/subscriptions/callback.rs | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/python/tests/test_callback_subscription.py b/python/tests/test_callback_subscription.py index b945348..e37cb4e 100644 --- a/python/tests/test_callback_subscription.py +++ b/python/tests/test_callback_subscription.py @@ -52,6 +52,7 @@ async def test_callback_wait_method(nats: Nats) -> None: limit = 10 async def callback(msg: Message) -> None: + await asyncio.sleep(1) received.append(msg.payload) async with nats.subscribe(subject=subj, callback=callback) as sub: diff --git a/src/subscriptions/callback.rs b/src/subscriptions/callback.rs index 2f86495..0ea424c 100644 --- a/src/subscriptions/callback.rs +++ b/src/subscriptions/callback.rs @@ -47,13 +47,18 @@ async fn start_py_sub( mut unsub_receiver: tokio::sync::mpsc::Receiver, end_event: AsyncEvent, ) { + // Required to wait for completion of processing tasks. + // + // This will ensure that end_event is only set after + // processing of all messages is finished. + let mut tasks = tokio::task::JoinSet::new(); loop { tokio::select! { msg = sub.next() => { match msg { Some(message) => { let py_cb = py_callback.clone(); - tokio::spawn(pyo3_async_runtimes::tokio::scope( + tasks.spawn(pyo3_async_runtimes::tokio::scope( locals.clone(), process_message(message, py_cb), )); @@ -77,6 +82,7 @@ async fn start_py_sub( } } } + tasks.join_all().await; end_event.set(); } From 79a0719fb8ac39ee8d64dcad2f7ff752d22a1fb7 Mon Sep 17 00:00:00 2001 From: Pavel Kirilin Date: Thu, 9 Apr 2026 20:46:36 +0200 Subject: [PATCH 2/2] Lowered down sleep to minimize test times. --- python/tests/test_callback_subscription.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/test_callback_subscription.py b/python/tests/test_callback_subscription.py index e37cb4e..43e3d31 100644 --- a/python/tests/test_callback_subscription.py +++ b/python/tests/test_callback_subscription.py @@ -52,7 +52,7 @@ async def test_callback_wait_method(nats: Nats) -> None: limit = 10 async def callback(msg: Message) -> None: - await asyncio.sleep(1) + await asyncio.sleep(0.1) received.append(msg.payload) async with nats.subscribe(subject=subj, callback=callback) as sub: