Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,22 @@ class EventLoop

//! External context pointer.
void* m_context;

//! Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::function<void()> testing_hook_makethread;

//! Hook called on the worker thread inside makeThread(), after the thread
//! context is set up and thread_context promise is fulfilled, but before it
//! starts waiting for requests.
std::function<void()> testing_hook_makethread_created;

//! Hook called on the worker thread when it starts to execute an async
//! request. Used by tests to control timing or inject behavior at this
//! point in execution.
std::function<void()> testing_hook_async_request_start;

//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_async_request_done;
};

//! Single element task queue used to handle recursive capnp calls. (If the
Expand Down
241 changes: 126 additions & 115 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
kj::Promise<typename ServerContext::CallContext>>::type
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto& server = server_context.proxy_server;
EventLoop& loop = *server.m_context.loop;
int req = server_context.req;
// Keep a reference to the ProxyServer instance by assigning it to the self
// variable. ProxyServer instances are reference-counted and if the client
Expand All @@ -72,136 +71,148 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// needs to be destroyed on the event loop thread so it is freed in a sync()
// call below.
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
ServerContext server_context{server, call_context, req};
{
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
// thread_local.request_threads map. This way, if this
// server thread needs to execute any RPCs that call back to
// the client, they will happen on the same client thread
// that is waiting for this function, just like what would
// happen if this were a normal function call made on the
// local stack.
//
// If the request_threads map already has an entry for this
// connection, it will be left unchanged, and it indicates
// that the current thread is an RPC client thread which is
// in the middle of an RPC call, and the current RPC call is
// a nested call from the remote thread handling that RPC
// call. In this case, the callbackThread value should point
// to the same thread already in the map, so there is no
// need to update the map.
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
ConnThread request_thread;
bool inserted{false};
Mutex cancel_mutex;
Lock cancel_lock{cancel_mutex};
server_context.cancel_lock = &cancel_lock;
server.m_context.loop->sync([&] {
// Detect request being canceled before it executes.
if (cancel_monitor.m_canceled) {
server_context.request_canceled = true;
return;
}
// Detect request being canceled while it executes.
assert(!cancel_monitor.m_on_cancel);
cancel_monitor.m_on_cancel = [&server, &server_context, &cancel_mutex, req]() {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
// Lock cancel_mutex here to block the event loop
// thread and prevent it from deleting the request's
// params and response structs while the execution
// thread is accessing them. Because this lock is
// released before the event loop thread does delete
// the structs, the mutex does not provide any
// protection from the event loop deleting the
// structs _before_ the execution thread acquires
// it. So in addition to locking the mutex, the
// execution thread always checks request_canceled
// as well before accessing the structs.
Lock cancel_lock{cancel_mutex};
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled.
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
});
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, &loop, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(loop, Log::Debug) << "IPC server executing request #" << req;
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
KJ_DEFER(if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done());
ServerContext server_context{server, call_context, req};
// Before invoking the function, store a reference to the
// callbackThread provided by the client in the
// thread_local.request_threads map. This way, if this
// server thread needs to execute any RPCs that call back to
// the client, they will happen on the same client thread
// that is waiting for this function, just like what would
// happen if this were a normal function call made on the
// local stack.
//
// If the request_threads map already has an entry for this
// connection, it will be left unchanged, and it indicates
// that the current thread is an RPC client thread which is
// in the middle of an RPC call, and the current RPC call is
// a nested call from the remote thread handling that RPC
// call. In this case, the callbackThread value should point
// to the same thread already in the map, so there is no
// need to update the map.
auto& thread_context = g_thread_context;
auto& request_threads = thread_context.request_threads;
ConnThread request_thread;
bool inserted{false};
Mutex cancel_mutex;
Lock cancel_lock{cancel_mutex};
server_context.cancel_lock = &cancel_lock;
loop.sync([&] {
// Detect request being canceled before it executes.
if (cancel_monitor.m_canceled) {
server_context.request_canceled = true;
return;
}
// Detect request being canceled while it executes.
assert(!cancel_monitor.m_on_cancel);
cancel_monitor.m_on_cancel = [&loop, &server_context, &cancel_mutex, req]() {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled while executing.";
// Lock cancel_mutex here to block the event loop
// thread and prevent it from deleting the request's
// params and response structs while the execution
// thread is accessing them. Because this lock is
// released before the event loop thread does delete
// the structs, the mutex does not provide any
// protection from the event loop deleting the
// structs _before_ the execution thread acquires
// it. So in addition to locking the mutex, the
// execution thread always checks request_canceled
// as well before accessing the structs.
Lock cancel_lock{cancel_mutex};
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled. We know
// the request is not canceled currently because
// cancel_monitor.m_canceled was checked above and this
// code is running on the event loop thread.
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return Accessor::get(call_context.getParams()).getCallbackThread(); });
});

// If an entry was inserted into the request_threads map,
// remove it after calling fn.invoke. If an entry was not
// inserted, one already existed, meaning this must be a
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(
// Release the cancel lock before calling loop->sync and
// waiting for the event loop thread, because if a
// cancellation happened, it needs to run the on_cancel
// callback above. It's safe to release cancel_lock at
// this point because the fn.invoke() call below will be
// finished and no longer accessing the params or
// results structs.
cancel_lock.m_lock.unlock();
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
// iterator since entry may no longer be there after
// a disconnect. Destroy node after releasing
// Waiter::m_mutex, so the ProxyClient<Thread>
// destructor is able to use EventLoop::mutex
// without violating lock order.
ConnThreads::node_type removed;
{
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
}
});
);
if (server_context.request_canceled) {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
try {
fn.invoke(server_context, args...);
} catch (const InterruptException& e) {
MP_LOG(*server.m_context.loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
}
})) {
MP_LOG(*server.m_context.loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
throw kj::mv(*exception);
// If an entry was inserted into the request_threads map,
// remove it after calling fn.invoke. If an entry was not
// inserted, one already existed, meaning this must be a
// recursive call (IPC call calling back to the caller which
// makes another IPC call), so avoid modifying the map.
const bool erase_thread{inserted};
KJ_DEFER(
// Release the cancel lock before calling loop->sync and
// waiting for the event loop thread, because if a
// cancellation happened, it needs to run the on_cancel
// callback above. It's safe to release cancel_lock at
// this point because the fn.invoke() call below will be
// finished and no longer accessing the params or
// results structs.
cancel_lock.m_lock.unlock();
// Erase the request_threads entry on the event loop
// thread with loop->sync(), so if the connection is
// broken there is not a race between this thread and
// the disconnect handler trying to destroy the thread
// client object.
loop.sync([&] {
// Clear cancellation callback. At this point the
// method invocation finished and the result is
// either being returned, or discarded if a
// cancellation happened. So we do not need to be
// notified of cancellations after this point. Also
// we do not want to be notified because
// cancel_mutex and server_context could be out of
// scope when it happens.
cancel_monitor.m_on_cancel = nullptr;
auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
// iterator since entry may no longer be there after
// a disconnect. Destroy node after releasing
// Waiter::m_mutex, so the ProxyClient<Thread>
// destructor is able to use EventLoop::mutex
// without violating lock order.
ConnThreads::node_type removed;
{
Lock lock(thread_context.waiter->m_mutex);
removed = request_threads.extract(server.m_context.connection);
}
// End of scope: if KJ_DEFER was reached, it runs here
}
return call_context;
};
});
);
if (server_context.request_canceled) {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " canceled before it could be executed";
} else KJ_IF_MAYBE(exception, kj::runCatchingExceptions([&]{
try {
fn.invoke(server_context, args...);
} catch (const InterruptException& e) {
MP_LOG(loop, Log::Info) << "IPC server request #" << req << " interrupted (" << e.what() << ")";
}
})) {
MP_LOG(loop, Log::Error) << "IPC server request #" << req << " uncaught exception (" << kj::str(*exception).cStr() << ")";
throw kj::mv(*exception);
}
return call_context;
// End of scope: if KJ_DEFER was reached, it runs here
};

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
.then([&loop, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
// Assuming the thread object is found, pass it a pointer to the
// `invoke` lambda above which will invoke the function on that
// thread.
KJ_IF_MAYBE (thread_server, perhaps) {
auto& thread = static_cast<ProxyServer<Thread>&>(*thread_server);
MP_LOG(*server.m_context.loop, Log::Debug)
MP_LOG(loop, Log::Debug)
<< "IPC server post request #" << req << " {" << thread.m_thread_context.thread_name << "}";
return thread.template post<typename ServerContext::CallContext>(std::move(invoke));
} else {
MP_LOG(*server.m_context.loop, Log::Error)
MP_LOG(loop, Log::Error)
<< "IPC server error request #" << req << ", missing thread to execute request";
throw std::runtime_error("invalid thread handle");
}
Expand Down
9 changes: 6 additions & 3 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,16 @@ ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(conne

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
EventLoop& loop{*m_connection.m_loop};
if (loop.testing_hook_makethread) loop.testing_hook_makethread();
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
std::thread thread([&loop, &thread_context, from]() {
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
Lock lock(g_thread_context.waiter->m_mutex);
thread_context.set_value(&g_thread_context);
if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
Expand Down
Loading
Loading