Skip to content

Commit 79e641d

Browse files
committed
Made IPC MessageDispatcher and its subclasses private details of Connection implementation
1 parent 8280f24 commit 79e641d

3 files changed

Lines changed: 145 additions & 146 deletions

File tree

IPC/ARAIPCAudioUnit_v3.mm

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@
8282
#endif
8383

8484
public:
85-
void routeReceivedMessage (NSDictionary * _Nonnull message)
85+
void routeReceivedDictionary (NSDictionary * _Nonnull message)
8686
{
8787
ARA_INTERNAL_ASSERT (![NSThread isMainThread]);
8888
const MessageID messageID { [(NSNumber *) [message objectForKey:_messageIDKey] intValue] };
8989
auto decoder { std::make_unique<CFMessageDecoder> ((__bridge CFDictionaryRef) message) };
90-
getMessageDispatcher ()->routeReceivedMessage (messageID, std::move (decoder));
90+
routeReceivedMessage (messageID, std::move (decoder));
9191
}
9292

9393
void sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder) override
@@ -150,7 +150,7 @@ void sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encode
150150
_audioUnitChannel.callHostBlock =
151151
^NSDictionary * _Nullable (NSDictionary * _Nonnull message)
152152
{
153-
routeReceivedMessage (message);
153+
routeReceivedDictionary (message);
154154
return [NSDictionary dictionary]; // \todo it would yield better performance if the callHostBlock would allow nil as return value
155155
};
156156

@@ -265,7 +265,7 @@ ARAIPCProxyPlugInRef ARA_CALL ARAIPCAUProxyPlugInInitialize (AUAudioUnit * _Nonn
265265

266266
void ARA_CALL ARAIPCAUProxyPlugInPerformPendingMainThreadTasks (ARAIPCProxyPlugInRef _Nonnull proxyPlugInRef)
267267
{
268-
fromIPCRef (proxyPlugInRef)->getConnection ()->getMainThreadDispatcher ()->processPendingMessageIfNeeded ();
268+
fromIPCRef (proxyPlugInRef)->getConnection ()->processPendingMessageOnCreationThreadIfNeeded ();
269269
}
270270

271271
const ARAPlugInExtensionInstance * _Nonnull ARA_CALL ARAIPCAUProxyPlugInBindToDocumentController (AUAudioUnit * _Nonnull audioUnit,
@@ -346,7 +346,7 @@ ARAIPCMessageChannelRef _Nullable ARA_CALL ARAIPCAUProxyHostInitializeMessageCha
346346
NSDictionary * _Nonnull ARA_CALL ARAIPCAUProxyHostCommandHandler (ARAIPCMessageChannelRef _Nonnull messageChannelRef, NSDictionary * _Nonnull message)
347347
{
348348
if ([message count]) // \todo filter dummy message sent in ProxyPlugInMessageChannel(), see there
349-
static_cast<ProxyHostMessageChannel *> (fromIPCRef (messageChannelRef))->routeReceivedMessage (message);
349+
static_cast<ProxyHostMessageChannel *> (fromIPCRef (messageChannelRef))->routeReceivedDictionary (message);
350350
return [NSDictionary dictionary]; // \todo it would yield better performance if -callAudioUnit: would allow nil as return value
351351
}
352352

IPC/ARAIPCConnection.cpp

Lines changed: 128 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,50 @@ namespace IPC {
136136

137137
static bool _debugAsHost {};
138138

139+
//------------------------------------------------------------------------------
140+
141+
class MessageDispatcher
142+
{
143+
public: // needs to be public for thread-local variables (which cannot be class members)
144+
#if defined (_WIN32)
145+
using ThreadRef = int32_t;
146+
#elif defined (__APPLE__)
147+
using ThreadRef = size_t;
148+
#else
149+
#error "not yet implemented on this platform"
150+
#endif
151+
static constexpr ThreadRef _invalidThread { 0 };
152+
153+
public:
154+
explicit MessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel);
155+
virtual ~MessageDispatcher () = default;
156+
157+
// protected: this does not work with thread_local...
158+
struct PendingReplyHandler
159+
{
160+
ReplyHandler* const _replyHandler;
161+
const PendingReplyHandler* _prevPendingReplyHandler;
162+
};
163+
164+
static bool isReply (MessageID messageID) { return messageID == 0; }
165+
166+
static ThreadRef getCurrentThread ();
167+
168+
protected:
169+
Connection* getConnection () const { return _connection; }
170+
MessageChannel* getMessageChannel () const { return _messageChannel.get (); }
171+
172+
void _sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder, bool isNewTransaction);
173+
bool _waitForMessage ();
174+
void _handleReply (std::unique_ptr<const MessageDecoder> && decoder, ReplyHandler && replyHandler);
175+
std::unique_ptr<MessageEncoder> _handleReceivedMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
176+
177+
private:
178+
Connection* const _connection;
179+
const std::unique_ptr<MessageChannel> _messageChannel;
180+
};
181+
182+
139183
MessageDispatcher::MessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel)
140184
: _connection { connection },
141185
_messageChannel { std::move (messageChannel) }
@@ -144,8 +188,6 @@ MessageDispatcher::MessageDispatcher (Connection* connection, std::unique_ptr<Me
144188
// unfortunately at least in clang std::thread::id's c'tor isn't constexpr
145189
// static_assert (std::thread::id {} == *reinterpret_cast<const std::thread::id*>(&_invalidThread), "the current implementation relies on invalid thread IDs being 0");
146190
ARA_INTERNAL_ASSERT (std::thread::id {} == *reinterpret_cast<const std::thread::id*>(&_invalidThread));
147-
148-
_messageChannel->setMessageDispatcher (this);
149191
}
150192

151193
MessageDispatcher::ThreadRef MessageDispatcher::getCurrentThread ()
@@ -189,6 +231,44 @@ std::unique_ptr<MessageEncoder> MessageDispatcher::_handleReceivedMessage (Messa
189231
return replyEncoder;
190232
}
191233

234+
//------------------------------------------------------------------------------
235+
236+
// single-threaded variant for main thread communication only
237+
class MainThreadMessageDispatcher : public MessageDispatcher
238+
{
239+
public:
240+
MainThreadMessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel)
241+
: MessageDispatcher { connection, std::move (messageChannel) }
242+
{
243+
getMessageChannel ()->_receivedMessageRouter = [this] (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
244+
{ routeReceivedMessage (messageID, std::move (decoder)); };
245+
}
246+
247+
void sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder, ReplyHandler && replyHandler);
248+
249+
void routeReceivedMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
250+
251+
void processPendingMessageIfNeeded ();
252+
253+
private:
254+
// key to indicate whether an outgoing call is made in response to a currently handled incoming call
255+
// or a new call, which is necessary to deal with the decoupled main threads concurrency
256+
// to optimize for performance, it is only added when the call is a response, being a new call is implicit
257+
// (it is also never added to replies because they always are a response anyways)
258+
static constexpr MessageArgumentKey kIsResponseKey { -1 };
259+
260+
private:
261+
int32_t _processingMessagesCount { 0 };
262+
263+
// \todo this is not allowed for some reason, so we must cast at every use of _noPendingMessageDecoder...
264+
// static constexpr auto _noPendingMessageDecoder { reinterpret_cast<const MessageDecoder*> (static_cast<intptr_t> (-1)) };
265+
static constexpr auto _noPendingMessageDecoder { static_cast<intptr_t> (-1) };
266+
MessageID _pendingMessageID { 0 }; // read/write _pendingMessageDecoder with proper barrier before/after reading/writing this
267+
std::atomic<const MessageDecoder*> _pendingMessageDecoder { reinterpret_cast<const MessageDecoder*> (_noPendingMessageDecoder) };
268+
269+
const PendingReplyHandler* _pendingReplyHandler { nullptr };
270+
};
271+
192272

193273
void MainThreadMessageDispatcher::sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder,
194274
ReplyHandler && replyHandler)
@@ -281,6 +361,46 @@ void MainThreadMessageDispatcher::routeReceivedMessage (MessageID messageID, std
281361
}
282362
}
283363

364+
//------------------------------------------------------------------------------
365+
366+
// multi-threaded variant for all non-main thread communication
367+
class OtherThreadsMessageDispatcher : public MessageDispatcher
368+
{
369+
public:
370+
OtherThreadsMessageDispatcher (Connection* connection, std::unique_ptr<MessageChannel> && messageChannel)
371+
: MessageDispatcher { connection, std::move (messageChannel) }
372+
{
373+
getMessageChannel ()->_receivedMessageRouter = [this] (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder)
374+
{ routeReceivedMessage (messageID, std::move (decoder)); };
375+
}
376+
377+
void sendMessage (MessageID messageID, std::unique_ptr<MessageEncoder> && encoder, ReplyHandler && replyHandler);
378+
379+
void routeReceivedMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
380+
381+
private:
382+
// keys to store the threading information in the IPC messages
383+
static constexpr MessageArgumentKey kSendThreadKey { -1 };
384+
static constexpr MessageArgumentKey kReceiveThreadKey { -2 };
385+
386+
struct RoutedMessage
387+
{
388+
MessageID _messageID { 0 };
389+
std::unique_ptr<const MessageDecoder> _decoder;
390+
ThreadRef _targetThread { _invalidThread };
391+
};
392+
RoutedMessage* _getRoutedMessageForThread (ThreadRef thread);
393+
394+
void _processReceivedMessage (MessageID messageID, std::unique_ptr<const MessageDecoder> && decoder);
395+
396+
private:
397+
// incoming data is stored in _routedMessages by the receive handler for the
398+
// sending threads waiting to pick it up (signalled via _routeReceiveCondition)
399+
std::condition_variable _routeReceiveCondition;
400+
std::vector<RoutedMessage> _routedMessages { 12 }; // we shouldn't use more than a handful of threads concurrently for the IPC
401+
std::mutex _sendLock;
402+
std::mutex _routeLock;
403+
};
284404

285405
// actually "static" members of OtherThreadsMessageDispatcher, but for some reason C++ doesn't allow this...
286406
thread_local OtherThreadsMessageDispatcher::ThreadRef _remoteTargetThread { 0 };
@@ -416,6 +536,7 @@ void OtherThreadsMessageDispatcher::_processReceivedMessage (MessageID messageID
416536
_remoteTargetThread = previousRemoteTargetThread;
417537
}
418538

539+
//------------------------------------------------------------------------------
419540

420541
#if defined (_WIN32)
421542

@@ -572,6 +693,11 @@ bool Connection::waitForMessageOnCreationThread ()
572693
return false;
573694
}
574695

696+
void Connection::processPendingMessageOnCreationThreadIfNeeded ()
697+
{
698+
_mainThreadDispatcher->processPendingMessageIfNeeded ();
699+
}
700+
575701
void Connection::signalMesssageReceived ()
576702
{
577703
#if __cplusplus >= 202002L

0 commit comments

Comments
 (0)