From a1d643348f4eb462814c0679b8c7dabb6a1b5b4f Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Wed, 11 Mar 2026 13:39:38 +0100 Subject: [PATCH 1/8] test: worker thread destroyed before it is initialized Add test for race condition in makeThread that can currently trigger segfaults as reported: https://github.com/bitcoin/bitcoin/issues/34711 https://github.com/bitcoin/bitcoin/issues/34756 The test currently crashes and will be fixed in the next commit. Co-authored-by: Ryan Ofsky git-bisect-skip: yes --- include/mp/proxy-io.h | 8 ++++++++ src/mp/proxy.cpp | 7 +++++-- test/mp/test/test.cpp | 47 +++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 12442cf..03b7c4d 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -340,6 +340,14 @@ class EventLoop //! External context pointer. void* m_context; + + //! Hook called when ProxyServer::makeThread() is called. + std::function testing_hook_makethread; + + //! Hook called on the worker thread inside makeThread(), after the thread + //! context is set up and thread_context promise is fulfilled, but before it + //! starts waiting for requests. + std::function testing_hook_makethread_created; }; //! Single element task queue used to handle recursive capnp calls. 
(If the diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..a627147 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -411,12 +411,15 @@ ProxyServer::ProxyServer(Connection& connection) : m_connection(conne kj::Promise ProxyServer::makeThread(MakeThreadContext context) { + EventLoop& loop{*m_connection.m_loop}; + if (loop.testing_hook_makethread) loop.testing_hook_makethread(); const std::string from = context.getParams().getName(); std::promise thread_context; - std::thread thread([&thread_context, from, this]() { - g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; + std::thread thread([&loop, &thread_context, from]() { + g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); thread_context.set_value(&g_thread_context); + if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created(); Lock lock(g_thread_context.waiter->m_mutex); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index bf41663..cab712d 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -49,8 +50,8 @@ static_assert(std::is_integral_v, "MP_MINOR_VERSION * Test setup class creating a two way connection between a * ProxyServer object and a ProxyClient. * - * Provides client_disconnect and server_disconnect lambdas that can be used to - * trigger disconnects and test handling of broken and closed connections. + * Provides disconnection lambdas that can be used to trigger + * disconnects and test handling of broken and closed connections. 
* * Accepts a client_owns_connection option to test different ProxyClient * destroy_connection values and control whether destroying the ProxyClient @@ -63,6 +64,7 @@ class TestSetup { public: std::function server_disconnect; + std::function server_disconnect_later; std::function client_disconnect; std::promise>> client_promise; std::unique_ptr> client; @@ -88,6 +90,10 @@ class TestSetup return capnp::Capability::Client(kj::mv(server_proxy)); }); server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; + server_disconnect_later = [&] { + assert(std::this_thread::get_id() == loop.m_thread_id); + loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); })); + }; // Set handler to destroy the server when the client disconnects. This // is ignored if server_disconnect() is called instead. server_connection->onDisconnect([&] { server_connection.reset(); }); @@ -325,6 +331,43 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") signal.set_value(); } +KJ_TEST("Worker thread destroyed before it is initialized") +{ + // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756 where a + // worker thread is destroyed before it starts waiting for work. + // + // The test uses the `makethread` hook to trigger a disconnect as soon as + // ProxyServer::makeThread is called, so without the bugfix, + // ProxyServer::~ProxyServer would run and destroy the waiter before + // the worker thread started waiting, causing a SIGSEGV when it did start. + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_makethread = [&] { + // Use disconnect_later to queue the disconnect, because the makethread + // hook is called on the event loop thread. The disconnect should happen + // as soon as the event loop is idle. 
+ setup.server_disconnect_later(); + }; + loop.testing_hook_makethread_created = [&] { + // Sleep to allow event loop to run and process the queued disconnect + // before the worker thread starts waiting. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + bool disconnected{false}; + try { + foo->callFnAsync(); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + disconnected = true; + } + KJ_EXPECT(disconnected); +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; From f11ec29ed20896cc95246d126adb414268ba4c69 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Mon, 2 Mar 2026 14:20:44 -0500 Subject: [PATCH 2/8] race fix: worker thread destroyed before it is initialized This fixes a race condition in makeThread that can currently trigger segfaults as reported: https://github.com/bitcoin/bitcoin/issues/34711 https://github.com/bitcoin/bitcoin/issues/34756 The problem is a segfault in ProxyServer::makeThread calling `Lock lock(g_thread_context.waiter->m_mutex);` that happens because the waiter pointer is null. The waiter pointer can be null if the worker thread is destroyed immediately after it is created, because `~ProxyServer` sets it to null. The fix works by moving the lock line above the `thread_context.set_value()` line so the worker thread can't be destroyed before it is fully initialized. A more detailed description of the bug and fix can be found in https://github.com/bitcoin-core/libmultiprocess/pull/249#discussion_r2953134565 The bug can be reproduced by running the unit test added in the previous commit or by calling makeThread and immediately disconnecting or destroying the returned thread. The bug is not new and has existed since makeThread was implemented, but it was found due to a new functional test in bitcoin core and with antithesis testing (see details in linked issues). 
The fix was originally posted in https://github.com/bitcoin/bitcoin/issues/34711#issuecomment-3986380070 --- src/mp/proxy.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index a627147..d24208d 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -418,9 +418,9 @@ kj::Promise ProxyServer::makeThread(MakeThreadContext context) std::thread thread([&loop, &thread_context, from]() { g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); + Lock lock(g_thread_context.waiter->m_mutex); thread_context.set_value(&g_thread_context); if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created(); - Lock lock(g_thread_context.waiter->m_mutex); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); From 4a60c39f24a5f6b65781226774c974cd0fbf2bbe Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Tue, 10 Mar 2026 13:34:03 -0400 Subject: [PATCH 3/8] test: getParams() called after request cancel Add test for disconnect race condition in the mp.Context PassField() overload that can currently trigger segfaults as reported in https://github.com/bitcoin/bitcoin/issues/34777. The test currently crashes and will be fixed in the next commit. Co-authored-by: Ryan Ofsky git-bisect-skip: yes --- include/mp/proxy-io.h | 5 +++++ include/mp/type-context.h | 2 ++ test/mp/test/test.cpp | 30 ++++++++++++++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 03b7c4d..152e599 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -348,6 +348,11 @@ class EventLoop //! context is set up and thread_context promise is fulfilled, but before it //! starts waiting for requests. std::function testing_hook_makethread_created; + + //! 
Hook called on the worker thread when it starts to execute an async + //! request. Used by tests to control timing or inject behavior at this + //! point in execution. + std::function testing_hook_async_request_start; }; //! Single element task queue used to handle recursive capnp calls. (If the diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 72c3963..86a80d3 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -74,6 +74,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& auto self = server.thisCap(); auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; + EventLoop& loop = *server.m_context.loop; + if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); const auto& params = call_context.getParams(); Context::Reader context_arg = Accessor::get(params); ServerContext server_context{server, call_context, req}; diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index cab712d..d6a93b9 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -368,6 +368,36 @@ KJ_TEST("Worker thread destroyed before it is initialized") KJ_EXPECT(disconnected); } +KJ_TEST("Calling async IPC method, with server disconnect racing the call") +{ + // Regression test for bitcoin/bitcoin#34777 heap-use-after-free where + // an async request is canceled before it starts to execute. + // + // Use testing_hook_async_request_start to trigger a disconnect from the + // worker thread as soon as it begins to execute an async request. Without + // the bugfix, the worker thread would trigger a SIGSEGV after this by + // calling call_context.getParams(). 
+ TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_start = [&] { + setup.server_disconnect(); + // Sleep is necessary to let the event loop fully clean up after the + // disconnect and trigger the SIGSEGV. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; From f5509a31fccaebbb0587dbe0d99b62837d870773 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Mon, 9 Mar 2026 10:43:37 -0400 Subject: [PATCH 4/8] race fix: getParams() called after request cancel This fixes a race condition in the mp.Context PassField() overload which is used to execute async requests, that can currently trigger segfaults as reported in https://github.com/bitcoin/bitcoin/issues/34777 when it calls call_context.getParams() after a disconnect. The bug can be reproduced by running the unit test added in the previous commit and was also seen in antithesis (see details in linked issue), but should be unlikely to happen normally because PassField checks for cancellation and returns early before actually using the getParams() result. This bug was introduced in commit 0174450ca2e95a4bd1f22e4fd38d83b1d432ac1f which started to cancel requests on disconnects. Before that commit, requests were not canceled and would continue to execute (Cap'n Proto would just discard the responses) so it was ok to call getParams().
This fix was originally posted in https://github.com/bitcoin/bitcoin/issues/34777#issuecomment-4024285314 --- include/mp/type-context.h | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 86a80d3..78932f8 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -76,8 +76,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; EventLoop& loop = *server.m_context.loop; if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); - const auto& params = call_context.getParams(); - Context::Reader context_arg = Accessor::get(params); ServerContext server_context{server, call_context, req}; { // Before invoking the function, store a reference to the @@ -128,10 +126,13 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& Lock cancel_lock{cancel_mutex}; server_context.request_canceled = true; }; - // Update requests_threads map if not canceled. + // Update requests_threads map if not canceled. We know + // the request is not canceled currently because + // cancel_monitor.m_canceled was checked above and this + // code is running on the event loop thread. 
std::tie(request_thread, inserted) = SetThread( GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, - [&] { return context_arg.getCallbackThread(); }); + [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); }); }); // If an entry was inserted into the request_threads map, From 1643d05ba0751ff8194f94a9cde773e922456b79 Mon Sep 17 00:00:00 2001 From: Sjors Provoost Date: Wed, 11 Mar 2026 11:33:14 +0100 Subject: [PATCH 5/8] test: m_on_cancel called after request finishes Add test for disconnect race condition in the mp.Context PassField() overload reported in https://github.com/bitcoin/bitcoin/issues/34782. The test crashes currently with AddressSanitizer, but will be fixed in the next commit. It's also possible to reproduce the bug without AddressSanitizer by adding an assert: ```diff --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -101,2 +101,3 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& server_context.cancel_lock = &cancel_lock; + KJ_DEFER(server_context.cancel_lock = nullptr); server.m_context.loop->sync([&] { @@ -111,2 +112,3 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; + assert(server_context.cancel_lock); // Lock cancel_mutex here to block the event loop ``` Co-authored-by: Ryan Ofsky git-bisect-skip: yes --- include/mp/proxy-io.h | 3 +++ include/mp/type-context.h | 1 + test/mp/test/test.cpp | 29 +++++++++++++++++++++++++++++ 3 files changed, 33 insertions(+) diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h index 152e599..d7b9f0e 100644 --- a/include/mp/proxy-io.h +++ b/include/mp/proxy-io.h @@ -353,6 +353,9 @@ class EventLoop //! request. Used by tests to control timing or inject behavior at this //! point in execution. std::function testing_hook_async_request_start; + + //!
Hook called on the worker thread just before returning results. + std::function testing_hook_async_request_done; }; //! Single element task queue used to handle recursive capnp calls. (If the diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 78932f8..0a21b0f 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -76,6 +76,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; EventLoop& loop = *server.m_context.loop; if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); + KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done()); ServerContext server_context{server, call_context, req}; { // Before invoking the function, store a reference to the diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index d6a93b9..d91edb4 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -398,6 +398,35 @@ KJ_TEST("Calling async IPC method, with server disconnect racing the call") } } +KJ_TEST("Calling async IPC method, with server disconnect after cleanup") +{ + // Regression test for bitcoin/bitcoin#34782 stack-use-after-return where + // an async request is canceled after it finishes executing but before the + // response is sent. + // + // Use testing_hook_async_request_done to trigger a disconnect from the + // worker thread after it executes an async request but before it returns. + // Without the bugfix, the m_on_cancel callback would be called at this + // point, accessing the cancel_mutex stack variable that had gone out of + // scope. 
+ TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_done = [&] { + setup.server_disconnect(); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup; From 1dbc59a4aa3893e500cc256999c582bdc8a977e7 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Tue, 10 Mar 2026 13:25:03 -0400 Subject: [PATCH 6/8] race fix: m_on_cancel called after request finishes This fixes a race condition in the mp.Context PassField() overload which is used to execute async requests, that can currently trigger segfaults as reported in https://github.com/bitcoin/bitcoin/issues/34782 when a cancellation happens after the request executes but before it returns. The bug can be reproduced by running the unit test added in the previous commit and was also seen in antithesis (see details in linked issue), but should be unlikely to happen normally because the cancellation would have to happen in a very short window for there to be a problem. This bug was introduced in commit 0174450ca2e95a4bd1f22e4fd38d83b1d432ac1f which started to cancel requests on disconnects. Before that commit a cancellation callback was not present.
This fix was originally posted in https://github.com/bitcoin/bitcoin/issues/34782#issuecomment-4033169085 and there is a sequence diagram explaining the bug in https://github.com/bitcoin-core/libmultiprocess/pull/249#discussion_r2953306851 --- include/mp/type-context.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 0a21b0f..405794f 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -157,6 +157,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // the disconnect handler trying to destroy the thread // client object. server.m_context.loop->sync([&] { + // Clear cancellation callback. At this point the + // method invocation finished and the result is + // either being returned, or discarded if a + // cancellation happened. So we do not need to be + // notified of cancellations after this point. Also + // we do not want to be notified because + // cancel_mutex and server_context could be out of + // scope when it happens. + cancel_monitor.m_on_cancel = nullptr; auto self_dispose{kj::mv(self)}; if (erase_thread) { // Look up the thread again without using existing From ff1d8ba172ad4b16ce58729971b7a49a3933c073 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Mon, 9 Mar 2026 10:43:37 -0400 Subject: [PATCH 7/8] refactor: Move type-context.h getParams() call closer to use This change has no effect on behavior, it just narrows the scope of the params variable to avoid potential bugs if a cancellation happens and makes them no longer valid. 
--- include/mp/type-context.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 405794f..315d932 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& std::is_same::value, kj::Promise>::type { - const auto& params = server_context.call_context.getParams(); - Context::Reader context_arg = Accessor::get(params); auto& server = server_context.proxy_server; int req = server_context.req; // Keep a reference to the ProxyServer instance by assigning it to the self @@ -202,6 +200,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). + const auto& params = server_context.call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); auto thread_client = context_arg.getThread(); auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { From ff0eed1bf183627c89007e71d631f039bd61bfb5 Mon Sep 17 00:00:00 2001 From: Ryan Ofsky Date: Wed, 25 Mar 2026 14:59:34 -0400 Subject: [PATCH 8/8] refactor: Use loop variable in type-context.h Replace `server.m_context.loop` references with `loop` in Context PassField implementation after a `loop` variable was introduced in a recent commit. Also adjust PassField scopes and indentation without changing behavior. This commit is easiest to review ignoring whitespace.
--- include/mp/type-context.h | 250 +++++++++++++++++++------------------- 1 file changed, 124 insertions(+), 126 deletions(-) diff --git a/include/mp/type-context.h b/include/mp/type-context.h index 315d932..8efd4fa 100644 --- a/include/mp/type-context.h +++ b/include/mp/type-context.h @@ -62,6 +62,7 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& kj::Promise>::type { auto& server = server_context.proxy_server; + EventLoop& loop = *server.m_context.loop; int req = server_context.req; // Keep a reference to the ProxyServer instance by assigning it to the self // variable. ProxyServer instances are reference-counted and if the client @@ -70,132 +71,129 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& // needs to be destroyed on the event loop thread so it is freed in a sync() // call below. auto self = server.thisCap(); - auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { - MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; - EventLoop& loop = *server.m_context.loop; - if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); - KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done()); - ServerContext server_context{server, call_context, req}; - { - // Before invoking the function, store a reference to the - // callbackThread provided by the client in the - // thread_local.request_threads map. This way, if this - // server thread needs to execute any RPCs that call back to - // the client, they will happen on the same client thread - // that is waiting for this function, just like what would - // happen if this were a normal function call made on the - // local stack. 
- // - // If the request_threads map already has an entry for this - // connection, it will be left unchanged, and it indicates - // that the current thread is an RPC client thread which is - // in the middle of an RPC call, and the current RPC call is - // a nested call from the remote thread handling that RPC - // call. In this case, the callbackThread value should point - // to the same thread already in the map, so there is no - // need to update the map. - auto& thread_context = g_thread_context; - auto& request_threads = thread_context.request_threads; - ConnThread request_thread; - bool inserted{false}; - Mutex cancel_mutex; - Lock cancel_lock{cancel_mutex}; - server_context.cancel_lock = &cancel_lock; - server.m_context.loop->sync([&] { - // Detect request being canceled before it executes. - if (cancel_monitor.m_canceled) { - server_context.request_canceled = true; - return; - } - // Detect request being canceled while it executes. - assert(!cancel_monitor.m_on_cancel); - cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; - // Lock cancel_mutex here to block the event loop - // thread and prevent it from deleting the request's - // params and response structs while the execution - // thread is accessing them. Because this lock is - // released before the event loop thread does delete - // the structs, the mutex does not provide any - // protection from the event loop deleting the - // structs _before_ the execution thread acquires - // it. So in addition to locking the mutex, the - // execution thread always checks request_canceled - // as well before accessing the structs. - Lock cancel_lock{cancel_mutex}; - server_context.request_canceled = true; - }; - // Update requests_threads map if not canceled. 
We know - // the request is not canceled currently because - // cancel_monitor.m_canceled was checked above and this - // code is running on the event loop thread. - std::tie(request_thread, inserted) = SetThread( - GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, - [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); }); - }); + auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable { + MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req; + if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); + KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done()); + ServerContext server_context{server, call_context, req}; + // Before invoking the function, store a reference to the + // callbackThread provided by the client in the + // thread_local.request_threads map. This way, if this + // server thread needs to execute any RPCs that call back to + // the client, they will happen on the same client thread + // that is waiting for this function, just like what would + // happen if this were a normal function call made on the + // local stack. + // + // If the request_threads map already has an entry for this + // connection, it will be left unchanged, and it indicates + // that the current thread is an RPC client thread which is + // in the middle of an RPC call, and the current RPC call is + // a nested call from the remote thread handling that RPC + // call. In this case, the callbackThread value should point + // to the same thread already in the map, so there is no + // need to update the map. 
+ auto& thread_context = g_thread_context; + auto& request_threads = thread_context.request_threads; + ConnThread request_thread; + bool inserted{false}; + Mutex cancel_mutex; + Lock cancel_lock{cancel_mutex}; + server_context.cancel_lock = &cancel_lock; + loop.sync([&] { + // Detect request being canceled before it executes. + if (cancel_monitor.m_canceled) { + server_context.request_canceled = true; + return; + } + // Detect request being canceled while it executes. + assert(!cancel_monitor.m_on_cancel); + cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; + // Lock cancel_mutex here to block the event loop + // thread and prevent it from deleting the request's + // params and response structs while the execution + // thread is accessing them. Because this lock is + // released before the event loop thread does delete + // the structs, the mutex does not provide any + // protection from the event loop deleting the + // structs _before_ the execution thread acquires + // it. So in addition to locking the mutex, the + // execution thread always checks request_canceled + // as well before accessing the structs. + Lock cancel_lock{cancel_mutex}; + server_context.request_canceled = true; + }; + // Update requests_threads map if not canceled. We know + // the request is not canceled currently because + // cancel_monitor.m_canceled was checked above and this + // code is running on the event loop thread. + std::tie(request_thread, inserted) = SetThread( + GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, + [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); }); + }); - // If an entry was inserted into the request_threads map, - // remove it after calling fn.invoke. 
If an entry was not - // inserted, one already existed, meaning this must be a - // recursive call (IPC call calling back to the caller which - // makes another IPC call), so avoid modifying the map. - const bool erase_thread{inserted}; - KJ_DEFER( - // Release the cancel lock before calling loop->sync and - // waiting for the event loop thread, because if a - // cancellation happened, it needs to run the on_cancel - // callback above. It's safe to release cancel_lock at - // this point because the fn.invoke() call below will be - // finished and no longer accessing the params or - // results structs. - cancel_lock.m_lock.unlock(); - // Erase the request_threads entry on the event loop - // thread with loop->sync(), so if the connection is - // broken there is not a race between this thread and - // the disconnect handler trying to destroy the thread - // client object. - server.m_context.loop->sync([&] { - // Clear cancellation callback. At this point the - // method invocation finished and the result is - // either being returned, or discarded if a - // cancellation happened. So we do not need to be - // notified of cancellations after this point. Also - // we do not want to be notified because - // cancel_mutex and server_context could be out of - // scope when it happens. - cancel_monitor.m_on_cancel = nullptr; - auto self_dispose{kj::mv(self)}; - if (erase_thread) { - // Look up the thread again without using existing - // iterator since entry may no longer be there after - // a disconnect. Destroy node after releasing - // Waiter::m_mutex, so the ProxyClient - // destructor is able to use EventLoop::mutex - // without violating lock order. 
- ConnThreads::node_type removed; - { - Lock lock(thread_context.waiter->m_mutex); - removed = request_threads.extract(server.m_context.connection); - } - } - }); - ); - if (server_context.request_canceled) { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed"; - } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{ - try { - fn.invoke(server_context, args...); - } catch (const InterruptException& e) { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")"; - } - })) { - MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")"; - throw kj::mv(*exception); + // If an entry was inserted into the request_threads map, + // remove it after calling fn.invoke. If an entry was not + // inserted, one already existed, meaning this must be a + // recursive call (IPC call calling back to the caller which + // makes another IPC call), so avoid modifying the map. + const bool erase_thread{inserted}; + KJ_DEFER( + // Release the cancel lock before calling loop->sync and + // waiting for the event loop thread, because if a + // cancellation happened, it needs to run the on_cancel + // callback above. It's safe to release cancel_lock at + // this point because the fn.invoke() call below will be + // finished and no longer accessing the params or + // results structs. + cancel_lock.m_lock.unlock(); + // Erase the request_threads entry on the event loop + // thread with loop->sync(), so if the connection is + // broken there is not a race between this thread and + // the disconnect handler trying to destroy the thread + // client object. + loop.sync([&] { + // Clear cancellation callback. At this point the + // method invocation finished and the result is + // either being returned, or discarded if a + // cancellation happened. 
So we do not need to be + // notified of cancellations after this point. Also + // we do not want to be notified because + // cancel_mutex and server_context could be out of + // scope when it happens. + cancel_monitor.m_on_cancel = nullptr; + auto self_dispose{kj::mv(self)}; + if (erase_thread) { + // Look up the thread again without using existing + // iterator since entry may no longer be there after + // a disconnect. Destroy node after releasing + // Waiter::m_mutex, so the ProxyClient + // destructor is able to use EventLoop::mutex + // without violating lock order. + ConnThreads::node_type removed; + { + Lock lock(thread_context.waiter->m_mutex); + removed = request_threads.extract(server.m_context.connection); } - // End of scope: if KJ_DEFER was reached, it runs here } - return call_context; - }; + }); + ); + if (server_context.request_canceled) { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed"; + } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{ + try { + fn.invoke(server_context, args...); + } catch (const InterruptException& e) { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")"; + } + })) { + MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")"; + throw kj::mv(*exception); + } + return call_context; + // End of scope: if KJ_DEFER was reached, it runs here + }; // Lookup Thread object specified by the client. 
The specified thread should // be a local Thread::Server object, but it needs to be looked up @@ -204,17 +202,17 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& Context::Reader context_arg = Accessor::get(params); auto thread_client = context_arg.getThread(); auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) - .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { // Assuming the thread object is found, pass it a pointer to the // `invoke` lambda above which will invoke the function on that // thread. KJ_IF_MAYBE (thread_server, perhaps) { auto& thread = static_cast&>(*thread_server); - MP_LOG(*server.m_context.loop, Log::Debug) + MP_LOG(loop, Log::Debug) << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; return thread.template post(std::move(invoke)); } else { - MP_LOG(*server.m_context.loop, Log::Error) + MP_LOG(loop, Log::Error) << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); }