diff --git a/python/tests/test_callback_subscription.py b/python/tests/test_callback_subscription.py index b945348..43e3d31 100644 --- a/python/tests/test_callback_subscription.py +++ b/python/tests/test_callback_subscription.py @@ -52,6 +52,7 @@ async def test_callback_wait_method(nats: Nats) -> None: limit = 10 async def callback(msg: Message) -> None: + await asyncio.sleep(0.1) received.append(msg.payload) async with nats.subscribe(subject=subj, callback=callback) as sub: diff --git a/src/subscriptions/callback.rs b/src/subscriptions/callback.rs index 2f86495..0ea424c 100644 --- a/src/subscriptions/callback.rs +++ b/src/subscriptions/callback.rs @@ -47,13 +47,18 @@ async fn start_py_sub( mut unsub_receiver: tokio::sync::mpsc::Receiver, end_event: AsyncEvent, ) { + // Required to wait for completion of processing tasks. + // + // This will ensure that end_event is only set after + // processing of all messages is finished. + let mut tasks = tokio::task::JoinSet::new(); loop { tokio::select! { msg = sub.next() => { match msg { Some(message) => { let py_cb = py_callback.clone(); - tokio::spawn(pyo3_async_runtimes::tokio::scope( + tasks.spawn(pyo3_async_runtimes::tokio::scope( locals.clone(), process_message(message, py_cb), )); @@ -77,6 +82,7 @@ async fn start_py_sub( } } } + tasks.join_all().await; end_event.set(); }