diff --git a/include/mp/proxy-io.h b/include/mp/proxy-io.h
index 12442cf..d7b9f0e 100644
--- a/include/mp/proxy-io.h
+++ b/include/mp/proxy-io.h
@@ -340,6 +340,22 @@ class EventLoop
     //! External context pointer.
     void* m_context;
+
+    //! Hook called when ProxyServer::makeThread() is called.
+    std::function<void()> testing_hook_makethread;
+
+    //! Hook called on the worker thread inside makeThread(), after the thread
+    //! context is set up and thread_context promise is fulfilled, but before it
+    //! starts waiting for requests.
+    std::function<void()> testing_hook_makethread_created;
+
+    //! Hook called on the worker thread when it starts to execute an async
+    //! request. Used by tests to control timing or inject behavior at this
+    //! point in execution.
+    std::function<void()> testing_hook_async_request_start;
+
+    //! Hook called on the worker thread just before returning results.
+    std::function<void()> testing_hook_async_request_done;
 };
 
 //! Single element task queue used to handle recursive capnp calls. (If the
diff --git a/include/mp/type-context.h b/include/mp/type-context.h
index 72c3963..8efd4fa 100644
--- a/include/mp/type-context.h
+++ b/include/mp/type-context.h
@@ -61,9 +61,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
     std::is_same::value, kj::Promise>::type
 {
-    const auto& params = server_context.call_context.getParams();
-    Context::Reader context_arg = Accessor::get(params);
     auto& server = server_context.proxy_server;
+    EventLoop& loop = *server.m_context.loop;
     int req = server_context.req;
     // Keep a reference to the ProxyServer instance by assigning it to the self
     // variable. ProxyServer instances are reference-counted and if the client
     // disconnects, the server will be destroyed. Since the timing of the
     // disconnect is unpredictable, the server reference also potentially
     // needs to be destroyed on the event loop thread so it is freed in a sync()
     // call below.
auto self = server.thisCap(); - auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { - MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; - const auto& params = call_context.getParams(); - Context::Reader context_arg = Accessor::get(params); - ServerContext server_context{server, call_context, req}; - { - // Before invoking the function, store a reference to the - // callbackThread provided by the client in the - // thread_local.request_threads map. This way, if this - // server thread needs to execute any RPCs that call back to - // the client, they will happen on the same client thread - // that is waiting for this function, just like what would - // happen if this were a normal function call made on the - // local stack. - // - // If the request_threads map already has an entry for this - // connection, it will be left unchanged, and it indicates - // that the current thread is an RPC client thread which is - // in the middle of an RPC call, and the current RPC call is - // a nested call from the remote thread handling that RPC - // call. In this case, the callbackThread value should point - // to the same thread already in the map, so there is no - // need to update the map. - auto& thread_context = g_thread_context; - auto& request_threads = thread_context.request_threads; - ConnThread request_thread; - bool inserted{false}; - Mutex cancel_mutex; - Lock cancel_lock{cancel_mutex}; - server_context.cancel_lock = &cancel_lock; - server.m_context.loop->sync([&] { - // Detect request being canceled before it executes. - if (cancel_monitor.m_canceled) { - server_context.request_canceled = true; - return; - } - // Detect request being canceled while it executes. 
- assert(!cancel_monitor.m_on_cancel); - cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; - // Lock cancel_mutex here to block the event loop - // thread and prevent it from deleting the request's - // params and response structs while the execution - // thread is accessing them. Because this lock is - // released before the event loop thread does delete - // the structs, the mutex does not provide any - // protection from the event loop deleting the - // structs _before_ the execution thread acquires - // it. So in addition to locking the mutex, the - // execution thread always checks request_canceled - // as well before accessing the structs. - Lock cancel_lock{cancel_mutex}; - server_context.request_canceled = true; - }; - // Update requests_threads map if not canceled. - std::tie(request_thread, inserted) = SetThread( - GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, - [&] { return context_arg.getCallbackThread(); }); - }); + auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable { + MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req; + if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); + KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done()); + ServerContext server_context{server, call_context, req}; + // Before invoking the function, store a reference to the + // callbackThread provided by the client in the + // thread_local.request_threads map. 
This way, if this + // server thread needs to execute any RPCs that call back to + // the client, they will happen on the same client thread + // that is waiting for this function, just like what would + // happen if this were a normal function call made on the + // local stack. + // + // If the request_threads map already has an entry for this + // connection, it will be left unchanged, and it indicates + // that the current thread is an RPC client thread which is + // in the middle of an RPC call, and the current RPC call is + // a nested call from the remote thread handling that RPC + // call. In this case, the callbackThread value should point + // to the same thread already in the map, so there is no + // need to update the map. + auto& thread_context = g_thread_context; + auto& request_threads = thread_context.request_threads; + ConnThread request_thread; + bool inserted{false}; + Mutex cancel_mutex; + Lock cancel_lock{cancel_mutex}; + server_context.cancel_lock = &cancel_lock; + loop.sync([&] { + // Detect request being canceled before it executes. + if (cancel_monitor.m_canceled) { + server_context.request_canceled = true; + return; + } + // Detect request being canceled while it executes. + assert(!cancel_monitor.m_on_cancel); + cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing."; + // Lock cancel_mutex here to block the event loop + // thread and prevent it from deleting the request's + // params and response structs while the execution + // thread is accessing them. Because this lock is + // released before the event loop thread does delete + // the structs, the mutex does not provide any + // protection from the event loop deleting the + // structs _before_ the execution thread acquires + // it. So in addition to locking the mutex, the + // execution thread always checks request_canceled + // as well before accessing the structs. 
+ Lock cancel_lock{cancel_mutex}; + server_context.request_canceled = true; + }; + // Update requests_threads map if not canceled. We know + // the request is not canceled currently because + // cancel_monitor.m_canceled was checked above and this + // code is running on the event loop thread. + std::tie(request_thread, inserted) = SetThread( + GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, + [&] { return Accessor::get(call_context.getParams()).getCallbackThread(); }); + }); - // If an entry was inserted into the request_threads map, - // remove it after calling fn.invoke. If an entry was not - // inserted, one already existed, meaning this must be a - // recursive call (IPC call calling back to the caller which - // makes another IPC call), so avoid modifying the map. - const bool erase_thread{inserted}; - KJ_DEFER( - // Release the cancel lock before calling loop->sync and - // waiting for the event loop thread, because if a - // cancellation happened, it needs to run the on_cancel - // callback above. It's safe to release cancel_lock at - // this point because the fn.invoke() call below will be - // finished and no longer accessing the params or - // results structs. - cancel_lock.m_lock.unlock(); - // Erase the request_threads entry on the event loop - // thread with loop->sync(), so if the connection is - // broken there is not a race between this thread and - // the disconnect handler trying to destroy the thread - // client object. - server.m_context.loop->sync([&] { - auto self_dispose{kj::mv(self)}; - if (erase_thread) { - // Look up the thread again without using existing - // iterator since entry may no longer be there after - // a disconnect. Destroy node after releasing - // Waiter::m_mutex, so the ProxyClient - // destructor is able to use EventLoop::mutex - // without violating lock order. 
- ConnThreads::node_type removed; - { - Lock lock(thread_context.waiter->m_mutex); - removed = request_threads.extract(server.m_context.connection); - } - } - }); - ); - if (server_context.request_canceled) { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed"; - } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{ - try { - fn.invoke(server_context, args...); - } catch (const InterruptException& e) { - MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")"; - } - })) { - MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")"; - throw kj::mv(*exception); + // If an entry was inserted into the request_threads map, + // remove it after calling fn.invoke. If an entry was not + // inserted, one already existed, meaning this must be a + // recursive call (IPC call calling back to the caller which + // makes another IPC call), so avoid modifying the map. + const bool erase_thread{inserted}; + KJ_DEFER( + // Release the cancel lock before calling loop->sync and + // waiting for the event loop thread, because if a + // cancellation happened, it needs to run the on_cancel + // callback above. It's safe to release cancel_lock at + // this point because the fn.invoke() call below will be + // finished and no longer accessing the params or + // results structs. + cancel_lock.m_lock.unlock(); + // Erase the request_threads entry on the event loop + // thread with loop->sync(), so if the connection is + // broken there is not a race between this thread and + // the disconnect handler trying to destroy the thread + // client object. + loop.sync([&] { + // Clear cancellation callback. At this point the + // method invocation finished and the result is + // either being returned, or discarded if a + // cancellation happened. 
So we do not need to be + // notified of cancellations after this point. Also + // we do not want to be notified because + // cancel_mutex and server_context could be out of + // scope when it happens. + cancel_monitor.m_on_cancel = nullptr; + auto self_dispose{kj::mv(self)}; + if (erase_thread) { + // Look up the thread again without using existing + // iterator since entry may no longer be there after + // a disconnect. Destroy node after releasing + // Waiter::m_mutex, so the ProxyClient + // destructor is able to use EventLoop::mutex + // without violating lock order. + ConnThreads::node_type removed; + { + Lock lock(thread_context.waiter->m_mutex); + removed = request_threads.extract(server.m_context.connection); } - // End of scope: if KJ_DEFER was reached, it runs here } - return call_context; - }; + }); + ); + if (server_context.request_canceled) { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed"; + } else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{ + try { + fn.invoke(server_context, args...); + } catch (const InterruptException& e) { + MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")"; + } + })) { + MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")"; + throw kj::mv(*exception); + } + return call_context; + // End of scope: if KJ_DEFER was reached, it runs here + }; // Lookup Thread object specified by the client. The specified thread should // be a local Thread::Server object, but it needs to be looked up // asynchronously with getLocalServer(). 
+ const auto& params = server_context.call_context.getParams(); + Context::Reader context_arg = Accessor::get(params); auto thread_client = context_arg.getThread(); auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) - .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { + .then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe& perhaps) mutable { // Assuming the thread object is found, pass it a pointer to the // `invoke` lambda above which will invoke the function on that // thread. KJ_IF_MAYBE (thread_server, perhaps) { auto& thread = static_cast&>(*thread_server); - MP_LOG(*server.m_context.loop, Log::Debug) + MP_LOG(loop, Log::Debug) << "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}"; return thread.template post(std::move(invoke)); } else { - MP_LOG(*server.m_context.loop, Log::Error) + MP_LOG(loop, Log::Error) << "IPC server error request #" << req << ", missing thread to execute request"; throw std::runtime_error("invalid thread handle"); } diff --git a/src/mp/proxy.cpp b/src/mp/proxy.cpp index da22ae6..d24208d 100644 --- a/src/mp/proxy.cpp +++ b/src/mp/proxy.cpp @@ -411,13 +411,16 @@ ProxyServer::ProxyServer(Connection& connection) : m_connection(conne kj::Promise ProxyServer::makeThread(MakeThreadContext context) { + EventLoop& loop{*m_connection.m_loop}; + if (loop.testing_hook_makethread) loop.testing_hook_makethread(); const std::string from = context.getParams().getName(); std::promise thread_context; - std::thread thread([&thread_context, from, this]() { - g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; + std::thread thread([&loop, &thread_context, from]() { + g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")"; g_thread_context.waiter = std::make_unique(); - thread_context.set_value(&g_thread_context); Lock lock(g_thread_context.waiter->m_mutex); + 
thread_context.set_value(&g_thread_context); + if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created(); // Wait for shutdown signal from ProxyServer destructor (signal // is just waiter getting set to null.) g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); diff --git a/test/mp/test/test.cpp b/test/mp/test/test.cpp index bf41663..d91edb4 100644 --- a/test/mp/test/test.cpp +++ b/test/mp/test/test.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -49,8 +50,8 @@ static_assert(std::is_integral_v, "MP_MINOR_VERSION * Test setup class creating a two way connection between a * ProxyServer object and a ProxyClient. * - * Provides client_disconnect and server_disconnect lambdas that can be used to - * trigger disconnects and test handling of broken and closed connections. + * Provides disconnection lambdas that can be used to trigger + * disconnects and test handling of broken and closed connections. * * Accepts a client_owns_connection option to test different ProxyClient * destroy_connection values and control whether destroying the ProxyClient @@ -63,6 +64,7 @@ class TestSetup { public: std::function server_disconnect; + std::function server_disconnect_later; std::function client_disconnect; std::promise>> client_promise; std::unique_ptr> client; @@ -88,6 +90,10 @@ class TestSetup return capnp::Capability::Client(kj::mv(server_proxy)); }); server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; + server_disconnect_later = [&] { + assert(std::this_thread::get_id() == loop.m_thread_id); + loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); })); + }; // Set handler to destroy the server when the client disconnects. This // is ignored if server_disconnect() is called instead. 
server_connection->onDisconnect([&] { server_connection.reset(); }); @@ -325,6 +331,102 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") signal.set_value(); } +KJ_TEST("Worker thread destroyed before it is initialized") +{ + // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756 where a + // worker thread is destroyed before it starts waiting for work. + // + // The test uses the `makethread` hook to trigger a disconnect as soon as + // ProxyServer::makeThread is called, so without the bugfix, + // ProxyServer::~ProxyServer would run and destroy the waiter before + // the worker thread started waiting, causing a SIGSEGV when it did start. + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_makethread = [&] { + // Use disconnect_later to queue the disconnect, because the makethread + // hook is called on the event loop thread. The disconnect should happen + // as soon as the event loop is idle. + setup.server_disconnect_later(); + }; + loop.testing_hook_makethread_created = [&] { + // Sleep to allow event loop to run and process the queued disconnect + // before the worker thread starts waiting. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + bool disconnected{false}; + try { + foo->callFnAsync(); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + disconnected = true; + } + KJ_EXPECT(disconnected); +} + +KJ_TEST("Calling async IPC method, with server disconnect racing the call") +{ + // Regression test for bitcoin/bitcoin#34777 heap-use-after-free where + // an async request is canceled before it starts to execute. + // + // Use testing_hook_async_request_start to trigger a disconnect from the + // worker thread as soon as it begins to execute an async request. 
Without + // the bugfix, the worker thread would trigger a SIGSEGV after this by + // calling call_context.getParams(). + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_start = [&] { + setup.server_disconnect(); + // Sleep is necessary to let the event loop fully clean up after the + // disconnect and trigger the SIGSEGV. + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + +KJ_TEST("Calling async IPC method, with server disconnect after cleanup") +{ + // Regression test for bitcoin/bitcoin#34782 stack-use-after-return where + // an async request is canceled after it finishes executing but before the + // response is sent. + // + // Use testing_hook_async_request_done to trigger a disconnect from the + // worker thread after it executes an async request but before it returns. + // Without the bugfix, the m_on_cancel callback would be called at this + // point, accessing the cancel_mutex stack variable that had gone out of + // scope. + TestSetup setup; + ProxyClient* foo = setup.client.get(); + foo->initThreadMap(); + setup.server->m_impl->m_fn = [] {}; + + EventLoop& loop = *setup.server->m_context.connection->m_loop; + loop.testing_hook_async_request_done = [&] { + setup.server_disconnect(); + }; + + try { + foo->callFnAsync(); + KJ_EXPECT(false); + } catch (const std::runtime_error& e) { + KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); + } +} + KJ_TEST("Make simultaneous IPC calls on single remote thread") { TestSetup setup;