diff --git a/plans/2026-05-18-parallel-message-processing.md b/plans/2026-05-18-parallel-message-processing.md
new file mode 100644
index 0000000000..4c9a34ecba
--- /dev/null
+++ b/plans/2026-05-18-parallel-message-processing.md
@@ -0,0 +1,161 @@
+# Parallel Message Processing - Eliminate Single-Thread Bottlenecks
+
+## Problem
+
+Message reception flows through two single-thread bottlenecks:
+
+1. **Agent `msgQ` bottleneck**: Multiple SMP server connections write to one shared `TBQueue` (`AgentClient.msgQ` / `SMPClientAgent.msgQ`). A single `subscriber` thread reads and processes all messages sequentially - DB lookups, double-ratchet decryption, DB writes - regardless of which connection they came from.
+
+2. **Chat `subQ` bottleneck**: The agent's `subscriber` thread writes processed events to one shared `TBQueue` (`AgentClient.subQ`). A single `agentSubscriber` thread in simplex-chat reads and processes all events sequentially.
+
+Both bottlenecks serialize work that could run in parallel, since messages from different connections are independent.
+
+## Solution
+
+Replace queues with callbacks at both layers. The producer calls a processing function directly in its own thread.
+
+### Layer 1: SMP client - eliminate `msgQ`
+
+**Current flow:**
+```
+SMP connection thread -> writeTBQueue msgQ -> subscriber thread -> processSMPTransmissions
+```
+
+**New flow:**
+```
+SMP connection thread -> processMsg callback (with per-client MVar lock)
+```
+
+**Why the MVar lock:** Within one SMP client, two threads produce messages:
+- The receive loop (`processMsgs` in `Client.hs:686`)
+- `writeSMPMessage` (`Client.hs:874`) - called from `processSUBResponse_` when a SUB response includes an inline MSG
+
+These two must be serialized within one client. An MVar lock ensures they take turns calling the callback. Across different clients (different server connections), no lock is shared - natural parallelism.
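The per-client locking described above can be illustrated with a minimal, self-contained sketch. It is not the code from `Client.hs`: `Client`, `newClient`, `deliver`, and `runDemo` are hypothetical names, and only the two fields the plan adds (`processServerMsg`, `processLock`) are modeled. Two producer threads stand in for the receive loop and `writeSMPMessage`; the callback does a deliberately non-atomic read-modify-write, so calls that were not serialized could lose updates.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Monad (forM_, replicateM_)
import Data.IORef

-- Hypothetical stand-in for PClient: only the two fields from the plan.
data Client msg = Client
  { processServerMsg :: Maybe (msg -> IO ()),
    processLock :: MVar ()
  }

newClient :: Maybe (msg -> IO ()) -> IO (Client msg)
newClient cb = Client cb <$> newMVar ()

-- Every producer inside one client goes through this helper, so the
-- callback never runs concurrently with itself for that client.
deliver :: Client msg -> msg -> IO ()
deliver c msg =
  forM_ (processServerMsg c) $ \process ->
    withMVar (processLock c) $ \_ -> process msg

-- Two producers share one client; the callback is not atomic on its own.
runDemo :: IO Int
runDemo = do
  counter <- newIORef (0 :: Int)
  c <- newClient $ Just $ \n -> do
    v <- readIORef counter
    writeIORef counter (v + n)
  done <- newEmptyMVar
  forM_ [1, 2 :: Int] $ \_ -> forkIO $ do
    replicateM_ 1000 (deliver c 1)
    putMVar done ()
  replicateM_ 2 (takeMVar done) -- wait for both producers
  readIORef counter

main :: IO ()
main = runDemo >>= print
```

Each client owns its own `MVar`, so a second value created with `newClient` would never contend with the first. That is the cross-connection parallelism the plan relies on.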
+
+#### Changes
+
+**`src/Simplex/Messaging/Client.hs`:**
+- In `PClient`: replace `msgQ :: Maybe (TBQueue ...)` with `processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ())` and `processLock :: MVar ()`
+- `processMsgs`: acquire `processLock`, call `processServerMsg` with the batch
+- `writeSMPMessage`: acquire `processLock`, call `processServerMsg`
+- `getProtocolClient`: takes `Maybe (ServerTransmissionBatch v err msg -> IO ())` instead of `Maybe (TBQueue ...)`
+- `smpClientStub`: sets `processServerMsg = Nothing`
+- `serverTransmission`: unchanged
+
+**`src/Simplex/Messaging/Agent/Client.hs`:**
+- Remove `msgQ` field from `AgentClient`
+- `smpConnectClient`: pass `processSMPTransmissions` wrapper as callback instead of `Just msgQ`
+- Remove `AgentQueuesInfo` and `getAgentQueuesInfo` entirely (dead with no queues to monitor)
+- Add `inflightCallbacks :: TVar Int` for monitoring instead - increment before callback, decrement in bracket
+
+**`src/Simplex/Messaging/Agent.hs`:**
+- Remove `subscriber` function
+- Remove `subscriber` from `runAgentThreads`
+- `processSMPTransmissions` stays, called directly from SMP client threads
+- `agentOperationBracket c AORcvNetwork` moves into the callback wrapper
+- Exception handling: wrap callback with `catchOwn` matching current `subscriber`'s error handling
+
+**`src/Simplex/Messaging/Client/Agent.hs`:**
+- `SMPClientAgent`: replace `msgQ` with callback field `processServerMsg :: ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()`
+- `newSMPClientAgent`: takes callback parameter instead of creating `msgQ`
+- `connectClient`: passes callback to `getProtocolClient`
+
+**`src/Simplex/Messaging/Notifications/Server.hs`:**
+- `ntfSubscriber`: remove `receiveSMP` loop; the processing logic becomes the callback passed via `SMPClientAgent`
+- Processing stays in M (via `UnliftIO` or pre-bound env)
+
+**Tests (`tests/SMPProxyTests.hs`):**
+- 2 sites: change `getProtocolClient ... (Just msgQ) ...` to pass a callback that writes to a local test TBQueue
+
+### Layer 2: Agent to chat - eliminate `subQ`
+
+**Current flow:**
+```
+agent processSMPTransmissions -> writeTBQueue subQ -> chat agentSubscriber -> process
+```
+
+**New flow:**
+```
+agent processSMPTransmissions -> processEvent callback [events]
+```
+
+**Key design decisions:**
+- Callback takes `[ATransmission]` (list), not single event. All events from one connection batch are passed together to maintain ordering within a connection.
+- Error notifications (currently `nonBlockingWriteTBQueue`) use `forkIO $ callback [event]` - fire-and-forget, order doesn't matter for errors.
+- The `isFullTBQueue subQ` / pending mechanism disappears - the callback receives the full list directly, no need to buffer/flush.
+- `AgentClient` keeps `testQ :: Maybe (TBQueue ATransmission)` for tests only.
+
+#### Changes
+
+**`src/Simplex/Messaging/Agent/Client.hs`:**
+- Replace `subQ :: TBQueue ATransmission` with:
+  - `processEvent :: [ATransmission] -> IO ()` - callback, accepts event list
+  - `testQ :: Maybe (TBQueue ATransmission)` - test-only, `Nothing` in production
+- Remove `AgentQueuesInfo` / `getAgentQueuesInfo`
+- Add `inflightCallbacks :: TVar Int` with bracket: `withInflight c $ processEvent c events`
+
+**`src/Simplex/Messaging/Agent.hs`:**
+- `processSMPTransmissions`: accumulate events in a local list (currently uses `pendingMsgs` TVar + flush pattern). Call `processEvent` once at end with the full list.
+- `runCommandProcessing`: same - call `processEvent` once with all events for the command batch. Remove `isFullTBQueue`/pending logic.
+- All `notify`/`notify'` helpers within `processSMPTransmissions` write to a local `TVar [ATransmission]` instead of directly to `subQ`. Flushed at end as single `processEvent` call.
+- Error sites (currently `nonBlockingWriteTBQueue`): use `forkIO $ processEvent c [event]`
+- Other direct `writeTBQueue subQ` sites (CONNECT/DISCONNECT events, SUSPENDED, etc.): call `processEvent c [event]` directly.
+- Remove `subscriber` function entirely.
+- Exception safety: `processEvent` call wrapped in bracket that catches "own" exceptions and logs them.
+
+**`src/Simplex/Messaging/Agent/Client.hs`:**
+- `notifySub'` (line 838): change to `forkIO $ processEvent c [event]` (non-blocking error notification)
+
+**`src/Simplex/Messaging/Agent/NtfSubSupervisor.hs`:**
+- 1 site: change `nonBlockingWriteTBQueue subQ event` to `forkIO $ processEvent c [event]`
+
+**`src/Simplex/FileTransfer/Agent.hs`:**
+- 1 site (line 351): `notify` helper changes to `processEvent c [event]`
+
+**`simplex-chat/src/Simplex/Chat/Library/Commands.hs`:**
+- Remove `agentSubscriber` thread
+- Pass chat's `process` function (adapted to accept `[ATransmission]`) as `processEvent` callback at agent initialization
+
+**Tests:**
+- `pGet` changes from `readTBQueue (subQ c)` to `readTBQueue (fromJust $ testQ c)` - 1 line
+- Agent test setup: `processEvent = mapM_ (atomically . writeTBQueue q)` where `q` is `testQ`
+- ~348 test call sites unchanged
+
+## Concurrency Safety
+
+- **Per-SMP-connection:** MVar in each SMP client serializes `processMsgs` and `writeSMPMessage`
+- **Cross-connection:** Different SMP clients have different MVars, run in different threads - fully parallel
+- **Per-connection-id:** `withConnLock connId` in `processSMPTransmissions` handles per-connection locking
+- **Chat callback:** Must be safe for concurrent calls from different agent threads. Chat dispatches by entity type and connection ID; individual handlers use their own locks.
+- **Exception safety:** Callback wrapped with bracket pattern - catches own exceptions, logs, decrements inflight counter. Exceptions don't kill SMP client threads.
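The exception-safety and inflight-counter points above can be sketched roughly as follows. This is an illustration under assumptions, not the planned implementation: `withInflight` is named in the plan, but its body here is a guess; `safeCallback`, `isAsync`, `logErr`, and `runDemo` are hypothetical; and the codebase's `catchOwn` is approximated by rethrowing asynchronous exceptions and logging everything else. The sketch uses `Control.Concurrent.STM` from the `stm` package, which ships with GHC.

```haskell
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, readTVarIO)
import Control.Exception (ErrorCall (..), SomeAsyncException, SomeException, bracket_, fromException, throwIO, try)
import Data.Maybe (isJust)

-- Count callbacks in flight; bracket_ decrements even if the action throws.
withInflight :: TVar Int -> IO a -> IO a
withInflight counter =
  bracket_
    (atomically $ modifyTVar' counter (+ 1))
    (atomically $ modifyTVar' counter (subtract 1))

isAsync :: SomeException -> Bool
isAsync e = isJust (fromException e :: Maybe SomeAsyncException)

-- Run a callback so that its own failures are logged instead of
-- propagating to the calling (SMP client) thread.
-- Assumption: async exceptions (e.g. thread kill) are rethrown.
safeCallback :: TVar Int -> (String -> IO ()) -> IO () -> IO ()
safeCallback counter logErr action = do
  r <- try (withInflight counter action)
  case r of
    Right () -> pure ()
    Left e
      | isAsync e -> throwIO e
      | otherwise -> logErr $ "callback error: " <> show e

-- One successful and one failing callback; returns the final inflight
-- count and the collected log lines.
runDemo :: IO (Int, [String])
runDemo = do
  counter <- newTVarIO 0
  logs <- newTVarIO []
  let logErr s = atomically $ modifyTVar' logs (s :)
  safeCallback counter logErr (pure ())
  safeCallback counter logErr (throwIO (ErrorCall "boom"))
  inflight <- readTVarIO counter
  msgs <- readTVarIO logs
  pure (inflight, msgs)

main :: IO ()
main = runDemo >>= print
```

After both calls the counter is back at zero and exactly one error has been logged, so a failing callback neither leaks an inflight slot nor terminates its caller.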
+
+## Implementation Order
+
+Both layers change in one PR since they share `Client.hs`.
+
+### Phase 1: SMP client callback (`Client.hs` + both agent types)
+
+- [ ] 1.1 `Client.hs`: Replace `msgQ` with `processServerMsg` callback + `processLock` MVar in `PClient`
+- [ ] 1.2 `Client.hs`: Update `processMsgs`, `writeSMPMessage`, `getProtocolClient`, `smpClientStub`
+- [ ] 1.3 `Client/Agent.hs`: Replace `msgQ` in `SMPClientAgent` with callback field, update `newSMPClientAgent`, `connectClient`
+- [ ] 1.4 `Agent/Client.hs`: Remove `msgQ` from `AgentClient`, update `smpConnectClient` to pass `processSMPTransmissions` as callback
+- [ ] 1.5 `Agent.hs`: Remove `subscriber` thread from `runAgentThreads`, add exception wrapper to callback
+- [ ] 1.6 `Notifications/Server.hs`: Convert `receiveSMP` from loop to callback passed to `SMPClientAgent`
+- [ ] 1.7 `SMPProxyTests.hs`: Update 2 call sites to use callback + local test queue
+
+### Phase 2: Agent event callback (`subQ` -> `processEvent`)
+
+- [ ] 2.1 `Agent/Client.hs`: Add `processEvent :: [ATransmission] -> IO ()` and `testQ :: Maybe (TBQueue ATransmission)`, remove `subQ`, remove `AgentQueuesInfo`
+- [ ] 2.2 `Agent.hs`: Rewrite `processSMPTransmissions` to accumulate events in local list and call `processEvent` once at end
+- [ ] 2.3 `Agent.hs`: Update `runCommandProcessing` - remove pending/isFullTBQueue pattern, call `processEvent` with list
+- [ ] 2.4 `Agent.hs`, `Agent/Client.hs`, `NtfSubSupervisor.hs`, `FileTransfer/Agent.hs`: Update all `writeTBQueue subQ` / `nonBlockingWriteTBQueue subQ` sites (~32 total)
+- [ ] 2.5 `Agent/Client.hs`: Add inflight counter with bracket
+- [ ] 2.6 Update `pGet` to read from `testQ` (1 line), update test agent setup
+- [ ] 2.7 `simplex-chat`: Pass chat's `process` as callback, remove `agentSubscriber`
+- [ ] 2.8 Fix any multi-server test ordering issues
+
+## Risks
+
+- **Chat thread safety:** Chat's `process` may not be safe for concurrent calls. Audit needed.
+- **Backpressure:** Slow callback blocks SMP client receive thread. Acceptable - the connection that produced the message waits. Cross-connection interference eliminated.
+- **Ordering:** Within one SMP connection - preserved (MVar + list callback). Across connections - non-deterministic (same as today, since `msgQ` interleaving was arbitrary). Most tests use 1 server.
diff --git a/src/Simplex/Messaging/Agent.hs b/src/Simplex/Messaging/Agent.hs
index bd77b892a1..b8adb8315c 100644
--- a/src/Simplex/Messaging/Agent.hs
+++ b/src/Simplex/Messaging/Agent.hs
@@ -270,19 +270,22 @@ getSMPAgentClient_ clientId cfg initServers@InitialAgentServers {smp, xftp, netC liftIO $ checkServers "SMP" smp >> checkServers "XFTP" xftp currentTs <- liftIO getCurrentTime notices <- liftIO $ withTransaction store (`getClientNotices` presetServers) `catchAll_` pure [] - c@AgentClient {acThread} <- liftIO . newAgentClient clientId initServers currentTs notices =<< ask + env <- ask + let processMsg c t = + agentOperationBracket c AORcvNetwork waitUntilActive (processSMPTransmissions c t) `runReaderT` env + `catchOwn` \e -> atomically $ writeTBQueue (subQ c) ("", "", AEvt SAEConn $ ERR $ CRITICAL True $ "subscriber error: " <> show e) + c@AgentClient {acThread} <- liftIO $ newAgentClient clientId initServers currentTs notices processMsg env t <- runAgentThreads c `forkFinally` const (liftIO $ disconnectAgentClient c) atomically . writeTVar acThread . 
Just =<< mkWeakThreadId t pure c checkServers protocol srvs = forM_ (M.assocs srvs) $ \(userId, srvs') -> checkUserServers ("getSMPAgentClient " <> protocol <> " " <> tshow userId) srvs' runAgentThreads c - | backgroundMode = run c "subscriber" $ subscriber c + | backgroundMode = forever $ liftIO $ threadDelay maxBound | otherwise = do restoreServersStats c raceAny_ - [ run c "subscriber" $ subscriber c, - run c "runNtfSupervisor" $ runNtfSupervisor c, + [ run c "runNtfSupervisor" $ runNtfSupervisor c, run c "cleanupManager" $ cleanupManager c, run c "logServersStats" $ logServersStats c ] @@ -2982,14 +2985,6 @@ getNextSMPServer :: AgentClient -> UserId -> [SMPServer] -> AM SMPServerWithAuth getNextSMPServer c userId = getNextServer c userId storageSrvs {-# INLINE getNextSMPServer #-} -subscriber :: AgentClient -> AM' () -subscriber c@AgentClient {msgQ, subQ} = run $ forever $ do - t <- atomically $ readTBQueue msgQ - agentOperationBracket c AORcvNetwork waitUntilActive $ - processSMPTransmissions c t - where - run a = a `catchOwn` \e -> notify $ CRITICAL True $ "Agent subscriber stopped: " <> show e - notify err = atomically $ writeTBQueue subQ ("", "", AEvt SAEConn $ ERR err) cleanupManager :: AgentClient -> AM' () cleanupManager c@AgentClient {subQ} = do diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index d33794006b..e70004a562 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -338,7 +338,7 @@ data AgentClient = AgentClient { acThread :: TVar (Maybe (Weak ThreadId)), active :: TVar Bool, subQ :: TBQueue ATransmission, - msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), + processServerMsg :: AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), smpServers :: TMap UserId (UserServers 'PSMP), smpClients :: TMap SMPTransportSession SMPClientVar, useClientServices :: TMap UserId Bool, @@ -505,15 +505,14 @@ data UserNetworkType 
= UNNone | UNCellular | UNWifi | UNEthernet | UNOther deriving (Eq, Show) -- | Creates an SMP agent client instance that receives commands and sends responses via 'TBQueue's. -newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> Env -> IO AgentClient -newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices agentEnv = do +newAgentClient :: Int -> InitialAgentServers -> UTCTime -> Map (Maybe SMPServer) (Maybe SystemSeconds) -> (AgentClient -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Env -> IO AgentClient +newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices, presetDomains, presetServers} currentTs notices processServerMsg agentEnv = do let cfg = config agentEnv qSize = tbqSize cfg proxySessTs <- newTVarIO =<< getCurrentTime acThread <- newTVarIO Nothing active <- newTVarIO True subQ <- newTBQueueIO qSize - msgQ <- newTBQueueIO qSize smpServers <- newTVarIO $ M.map mkUserServers smp smpClients <- TM.emptyIO useClientServices <- newTVarIO useServices @@ -553,7 +552,7 @@ newAgentClient clientId InitialAgentServers {smp, ntf, xftp, netCfg, useServices { acThread, active, subQ, - msgQ, + processServerMsg, smpServers, smpClients, useClientServices, @@ -733,7 +732,7 @@ getSMPProxyClient c@AgentClient {active, smpClients, smpProxiedRelays, workerSeq Nothing -> Left $ BROKER (B.unpack $ strEncode srv) TIMEOUT smpConnectClient :: AgentClient -> NetworkRequestMode -> SMPTransportSession -> TMap SMPServer ProxiedRelayVar -> SMPClientVar -> AM SMPConnectedClient -smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = +smpConnectClient c@AgentClient {processServerMsg, smpClients, proxySessTs, presetDomains} nm tSess@(userId, srv, _) prs v = newProtocolClient c tSess smpClients connectClient v `catchAllErrors` \e -> lift (resubscribeSMPSession c 
tSess) >> throwE e where @@ -746,7 +745,7 @@ smpConnectClient c@AgentClient {smpClients, msgQ, proxySessTs, presetDomains} nm env <- ask smp <- liftError (protocolClientError SMP $ B.unpack $ strEncode srv) $ do ts <- readTVarIO proxySessTs - ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just msgQ) ts $ smpClientDisconnected c tSess env v' prs + ExceptT $ getProtocolClient g nm tSess cfg' presetDomains (Just $ processServerMsg c) ts $ smpClientDisconnected c tSess env v' prs atomically $ SS.setSessionId tSess (sessionId $ thParams smp) $ currentSubs c updateClientService service smp pure SMPConnectedClient {connectedClient = smp, proxiedRelays = prs} @@ -2835,8 +2834,8 @@ data ClientInfo deriving (Show) getAgentQueuesInfo :: AgentClient -> IO AgentQueuesInfo -getAgentQueuesInfo AgentClient {msgQ, subQ, smpClients} = do - msgQInfo <- atomically $ getTBQueueInfo msgQ +getAgentQueuesInfo AgentClient {subQ, smpClients} = do + let msgQInfo = TBQueueInfo {qLength = 0, qFull = False} subQInfo <- atomically $ getTBQueueInfo subQ smpClientsMap <- readTVarIO smpClients let smpClientsMap' = M.mapKeys (decodeLatin1 . 
strEncode) smpClientsMap diff --git a/src/Simplex/Messaging/Client.hs b/src/Simplex/Messaging/Client.hs index 67b31de186..79ef392c81 100644 --- a/src/Simplex/Messaging/Client.hs +++ b/src/Simplex/Messaging/Client.hs @@ -128,6 +128,7 @@ where import Control.Applicative ((<|>)) import Control.Concurrent (ThreadId, forkFinally, forkIO, killThread, mkWeakThreadId) +import Control.Concurrent.MVar import Control.Concurrent.Async import Control.Concurrent.STM import Control.Exception (Exception, Handler (..), IOException, SomeAsyncException, SomeException) @@ -199,7 +200,8 @@ data PClient v err msg = PClient sentCommands :: TMap CorrId (Request err msg), sndQ :: TBQueue (Maybe (Request err msg), ByteString), rcvQ :: TBQueue (NonEmpty (Transmission (Either err msg))), - msgQ :: Maybe (TBQueue (ServerTransmissionBatch v err msg)) + processServerMsg :: Maybe (ServerTransmissionBatch v err msg -> IO ()), + processLock :: MVar () } smpClientStub :: TVar ChaChaDRG -> ByteString -> VersionSMP -> Maybe (THandleAuth 'TClient) -> IO SMPClient @@ -213,6 +215,7 @@ smpClientStub g sessionId thVersion thAuth = do timeoutErrorCount <- newTVarIO 0 sndQ <- newTBQueueIO 100 rcvQ <- newTBQueueIO 100 + processLock <- newMVar () let NetworkConfig {tcpConnectTimeout, tcpTimeout} = defaultNetworkConfig return ProtocolClient @@ -244,7 +247,8 @@ smpClientStub g sessionId thVersion thAuth = do sentCommands, sndQ, rcvQ, - msgQ = Nothing + processServerMsg = Nothing, + processLock } } @@ -562,10 +566,10 @@ type SMPTransportSession = TransportSession BrokerMsg -- | Connects to 'ProtocolServer' using passed client configuration -- and queue for messages and notifications. -- --- A single queue can be used for multiple 'SMPClient' instances, +-- A single callback can be used for multiple 'SMPClient' instances, -- as 'SMPServerTransmission' includes server information. -getProtocolClient :: forall v err msg. 
Protocol v err msg => TVar ChaChaDRG -> NetworkRequestMode -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (TBQueue (ServerTransmissionBatch v err msg)) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) -getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serviceCredentials, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains msgQ proxySessTs disconnected = do +getProtocolClient :: forall v err msg. Protocol v err msg => TVar ChaChaDRG -> NetworkRequestMode -> TransportSession msg -> ProtocolClientConfig v -> [HostName] -> Maybe (ServerTransmissionBatch v err msg -> IO ()) -> UTCTime -> (ProtocolClient v err msg -> IO ()) -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) +getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qSize, networkConfig, clientALPN, serviceCredentials, serverVRange, agreeSecret, proxyServer, useSNI} presetDomains processServerMsg proxySessTs disconnected = do case chooseTransportHost networkConfig (host srv) of Right useHost -> (getCurrentTime >>= mkProtocolClient useHost >>= runClient useTransport useHost) @@ -583,6 +587,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS sentCommands <- TM.emptyIO sndQ <- newTBQueueIO qSize rcvQ <- newTBQueueIO qSize + processLock <- newMVar () return PClient { connected, @@ -597,7 +602,8 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS sentCommands, sndQ, rcvQ, - msgQ + processServerMsg, + processLock } runClient :: (ServiceName, ATransport 'TClient) -> TransportHost -> PClient v err msg -> IO (Either (ProtocolClientError err) (ProtocolClient v err msg)) @@ -686,8 +692,10 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS processMsgs :: ProtocolClient v err msg -> NonEmpty (Transmission (Either err 
msg)) -> IO () processMsgs c ts = do ts' <- catMaybes <$> mapM (processMsg c) (L.toList ts) - forM_ msgQ $ \q -> - mapM_ (atomically . writeTBQueue q . serverTransmission c) (L.nonEmpty ts') + forM_ processServerMsg $ \process -> + forM_ (L.nonEmpty ts') $ \ts'' -> + withMVar (processLock $ client_ c) $ \_ -> + process $ serverTransmission c ts'' processMsg :: ProtocolClient v err msg -> Transmission (Either err msg) -> IO (Maybe (EntityId, ServerTransmission err msg)) processMsg ProtocolClient {client_ = PClient {sentCommands}} (corrId, entId, respOrErr) @@ -714,7 +722,7 @@ getProtocolClient g nm transportSession@(_, srv, _) cfg@ProtocolClientConfig {qS Just e -> Left $ PCEProtocolError e _ -> Right r sendMsg :: ServerTransmission err msg -> IO (Maybe (EntityId, ServerTransmission err msg)) - sendMsg t = case msgQ of + sendMsg t = case processServerMsg of Just _ -> pure $ Just (entId, t) Nothing -> Nothing <$ case clientResp of @@ -872,7 +880,10 @@ processSUBResponse_ c rId = \case r' -> pure . 
Left $ unexpectedResponse r' writeSMPMessage :: SMPClient -> RecipientId -> BrokerMsg -> IO () -writeSMPMessage c rId msg = atomically $ mapM_ (`writeTBQueue` serverTransmission c [(rId, STEvent (Right msg))]) (msgQ $ client_ c) +writeSMPMessage c rId msg = + forM_ (processServerMsg $ client_ c) $ \process -> + withMVar (processLock $ client_ c) $ \_ -> + process $ serverTransmission c [(rId, STEvent (Right msg))] serverTransmission :: ProtocolClient v err msg -> NonEmpty (RecipientId, ServerTransmission err msg) -> ServerTransmissionBatch v err msg serverTransmission ProtocolClient {thParams, client_ = PClient {transportSession}} ts = (transportSession, thParams, ts) diff --git a/src/Simplex/Messaging/Client/Agent.hs b/src/Simplex/Messaging/Client/Agent.hs index 76b2a7cf93..f035a800f4 100644 --- a/src/Simplex/Messaging/Client/Agent.hs +++ b/src/Simplex/Messaging/Client/Agent.hs @@ -138,7 +138,7 @@ data SMPClientAgent p = SMPClientAgent dbService :: Maybe DBService, active :: TVar Bool, startedAt :: UTCTime, - msgQ :: TBQueue (ServerTransmissionBatch SMPVersion ErrorType BrokerMsg), + processMsg :: SMPClientAgent p -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO (), agentQ :: TBQueue SMPClientAgentEvent, randomDrg :: TVar ChaChaDRG, smpClients :: TMap SMPServer SMPClientVar, @@ -158,11 +158,10 @@ data SMPClientAgent p = SMPClientAgent type OwnServer = Bool -newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) -newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize} dbService randomDrg = do +newSMPClientAgent :: SParty p -> SMPClientAgentConfig -> (SMPClientAgent p -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO (SMPClientAgent p) +newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {agentQSize} processMsg dbService randomDrg = do active <- newTVarIO True startedAt <- getCurrentTime - msgQ <- 
newTBQueueIO msgQSize agentQ <- newTBQueueIO agentQSize smpClients <- TM.emptyIO smpSessions <- TM.emptyIO @@ -179,7 +178,7 @@ newSMPClientAgent agentParty agentCfg@SMPClientAgentConfig {msgQSize, agentQSize dbService, active, startedAt, - msgQ, + processMsg, agentQ, randomDrg, smpClients, @@ -257,7 +256,7 @@ isOwnServer SMPClientAgent {agentCfg} ProtocolServer {host} = -- | Run an SMP client for SMPClientVar connectClient :: SMPClientAgent p -> SMPServer -> SMPClientVar -> IO (Either SMPClientError SMPClient) -connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, msgQ, randomDrg, startedAt} srv v = case dbService of +connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, processMsg, randomDrg, startedAt} srv v = case dbService of Just dbs -> runExceptT $ do creds <- ExceptT $ getCredentials dbs srv smp <- ExceptT $ getClient cfg {serviceCredentials = Just creds} @@ -267,7 +266,7 @@ connectClient ca@SMPClientAgent {agentCfg, dbService, smpClients, smpSessions, m Nothing -> getClient cfg where cfg = smpCfg agentCfg - getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just msgQ) startedAt clientDisconnected + getClient cfg' = getProtocolClient randomDrg NRMBackground (1, srv, Nothing) cfg' [] (Just $ processMsg ca) startedAt clientDisconnected clientDisconnected :: SMPClient -> IO () clientDisconnected smp = do diff --git a/src/Simplex/Messaging/Notifications/Server.hs b/src/Simplex/Messaging/Notifications/Server.hs index 02429e9108..e4d20acff5 100644 --- a/src/Simplex/Messaging/Notifications/Server.hs +++ b/src/Simplex/Messaging/Notifications/Server.hs @@ -54,7 +54,7 @@ import GHC.IORef (atomicSwapIORef) import GHC.Stats (getRTSStats) import Network.Socket (ServiceName, Socket, socketToHandle) import Numeric.Natural (Natural) -import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmission (..)) +import Simplex.Messaging.Client (ProtocolClientError (..), 
SMPClientError, ServerTransmission (..), ServerTransmissionBatch) import Simplex.Messaging.Client.Agent import qualified Simplex.Messaging.Crypto as C import Simplex.Messaging.Encoding.String @@ -68,7 +68,7 @@ import Simplex.Messaging.Notifications.Server.Store (NtfSTMStore, TokenNtfMessag import Simplex.Messaging.Notifications.Server.Store.Postgres import Simplex.Messaging.Notifications.Server.Store.Types import Simplex.Messaging.Notifications.Transport -import Simplex.Messaging.Protocol (EntityId (..), ErrorType (..), NotifierId, Party (..), ProtocolServer (host), SMPServer, ServiceSub (..), SignedTransmission, Transmission, pattern NoEntity, pattern SMPServer, encodeTransmission, tGetServer, tPut) +import Simplex.Messaging.Protocol (BrokerMsg, EntityId (..), ErrorType (..), NotifierId, Party (..), ProtocolServer (host), SMPServer, ServiceSub (..), SignedTransmission, Transmission, pattern NoEntity, pattern SMPServer, encodeTransmission, tGetServer, tPut) import qualified Simplex.Messaging.Protocol as SMP import Simplex.Messaging.Server import Simplex.Messaging.Server.Control (CPClientRole (..)) @@ -77,7 +77,7 @@ import Simplex.Messaging.Server.Stats (PeriodStats (..), PeriodStatCounts (..), import Simplex.Messaging.Session import Simplex.Messaging.SystemTime import Simplex.Messaging.TMap (TMap) -import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) +import Simplex.Messaging.Transport (ASrvTransport, ATransport (..), SMPVersion, THandle (..), THandleAuth (..), THandleParams (..), TProxy, Transport (..), TransportPeer (..), defaultSupportedParams) import Simplex.Messaging.Transport.Buffer (trimCR) import Simplex.Messaging.Transport.Server (AddHTTP, runTransportServer, runLocalTCPServer) import Simplex.Messaging.Util @@ -101,7 +101,7 @@ runNtfServer cfg = do runNtfServerBlocking started cfg runNtfServerBlocking :: TMVar Bool -> 
NtfServerConfig -> IO () -runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg +runNtfServerBlocking started cfg = runReaderT (ntfServer cfg started) =<< newNtfServerEnv cfg receiveSMPMessage type M a = ReaderT NtfEnv IO a @@ -525,92 +525,83 @@ subscribeNtfs NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent = ca} st sm void $ updateSubStatus st srvId' nId NSPending subscribeQueuesNtfs ca smpServer' [sub] +receiveSMPMessage :: NtfPostgresStore -> NtfPushServer -> NtfServerStats -> SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO () +receiveSMPMessage st ps stats ca ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) = + forM_ ts $ \(ntfId, t) -> case t of + STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen + STResponse {} -> pure () -- it was already reported as timeout error + STEvent msgOrErr -> do + let smpQueue = SMPQueueNtf srv ntfId + case msgOrErr of + Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do + ntfTs <- getSystemTime + updatePeriodStats (activeSubs stats) ntfId + let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta} + srvHost = safeDecodeUtf8 $ strEncode h + isOwn = isOwnServer ca srv + addTokenLastNtf st newNtf >>= \case + Right (tkn, lastNtfs) -> do + pushNotification st stats ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs + incNtfStat_ stats ntfReceived + when isOwn $ incServerStat srvHost (ntfReceivedOwn stats) + Left AUTH -> do + incNtfStat_ stats ntfReceivedAuth + when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats) + Left _ -> pure () + Right SMP.END -> + whenM (atomically $ activeClientSession' ca sessionId srv) $ + void $ updateSrvSubStatus st smpQueue NSEnd + Right SMP.DELD -> + void $ updateSrvSubStatus st smpQueue NSDeleted + Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e + Right _ -> logError "SMP server unexpected 
response" + Left e -> logError $ "SMP client error: " <> tshow e + ntfSubscriber :: NtfSubscriber -> M () -ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {msgQ, agentQ}} = - race_ receiveSMP receiveAgent +ntfSubscriber NtfSubscriber {smpAgent = ca@SMPClientAgent {agentQ}} = do + st <- asks store + batchSize <- asks $ subsBatchSize . config + liftIO $ forever $ + atomically (readTBQueue agentQ) >>= \case + CAConnected srv serviceId -> do + let asService = if isJust serviceId then "as service " else "" + logInfo $ "SMP server reconnected " <> asService <> showServer' srv + CADisconnected srv nIds -> do + updated <- batchUpdateSrvSubStatus st srv Nothing nIds NSInactive + logSubStatus srv "disconnected" (L.length nIds) updated + CASubscribed srv serviceId nIds -> do + updated <- batchUpdateSrvSubStatus st srv serviceId nIds NSActive + let asService = if isJust serviceId then " as service" else "" + logSubStatus srv ("subscribed" <> asService) (L.length nIds) updated + CASubError srv errs -> do + forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> queueSubErrorStatus err) $ L.toList errs) $ \subStatuses -> do + updated <- batchUpdateSrvSubErrors st srv subStatuses + logSubErrors srv subStatuses updated + -- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService + CAServiceDisconnected srv serviceSub -> + logNote $ "SMP server service disconnected " <> showService srv serviceSub + CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash') + | n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n' + | idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash" + | otherwise -> logNote msg + where + msg = "SMP server service subscribed " <> showService srv serviceSub + CAServiceSubError srv serviceSub e -> + -- Errors that require re-subscribing queues directly are reported as CAServiceUnavailable. 
+        -- See smpSubscribeService in Simplex.Messaging.Client.Agent
+        logError $ "SMP server service subscription error " <> showService srv serviceSub <> ": " <> tshow e
+      CAServiceUnavailable srv serviceSub -> do
+        logError $ "SMP server service unavailable: " <> showService srv serviceSub
+        removeServiceAndAssociations st srv >>= \case
+          Right (srvId, updated) -> do
+            logSubStatus srv "removed service association" updated updated
+            void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing)
+          Left e -> logError $ "SMP server update and resubscription error " <> tshow e
   where
-    receiveSMP = do
-      st <- asks store
-      ps <- asks pushServer
-      stats <- asks serverStats
-      forever $ do
-        ((_, srv@(SMPServer (h :| _) _ _), _), THandleParams {sessionId}, ts) <- atomically $ readTBQueue msgQ
-        forM_ ts $ \(ntfId, t) -> case t of
-          STUnexpectedError e -> logError $ "SMP client unexpected error: " <> tshow e -- uncorrelated response, should not happen
-          STResponse {} -> pure () -- it was already reported as timeout error
-          STEvent msgOrErr -> do
-            let smpQueue = SMPQueueNtf srv ntfId
-            case msgOrErr of
-              Right (SMP.NMSG nmsgNonce encNMsgMeta) -> do
-                ntfTs <- liftIO getSystemTime
-                liftIO $ updatePeriodStats (activeSubs stats) ntfId
-                let newNtf = PNMessageData {smpQueue, ntfTs, nmsgNonce, encNMsgMeta}
-                    srvHost = safeDecodeUtf8 $ strEncode h
-                    isOwn = isOwnServer ca srv
-                liftIO (addTokenLastNtf st newNtf) >>= \case
-                  Right (tkn, lastNtfs) -> do
-                    pushNotification ps (Just srvHost) isOwn tkn $ PNMessage lastNtfs
-                    liftIO $ incNtfStat_ stats ntfReceived
-                    when isOwn $ liftIO $ incServerStat srvHost (ntfReceivedOwn stats)
-                  Left AUTH -> liftIO $ do
-                    incNtfStat_ stats ntfReceivedAuth
-                    when isOwn $ incServerStat srvHost (ntfReceivedAuthOwn stats)
-                  Left _ -> pure ()
-              Right SMP.END ->
-                whenM (atomically $ activeClientSession' ca sessionId srv) $
-                  void $ liftIO $ updateSrvSubStatus st smpQueue NSEnd
-              Right SMP.DELD ->
-                void $ liftIO $ updateSrvSubStatus st smpQueue NSDeleted
-              Right (SMP.ERR e) -> logError $ "SMP server error: " <> tshow e
-              Right _ -> logError "SMP server unexpected response"
-              Left e -> logError $ "SMP client error: " <> tshow e
-
-    receiveAgent = do
-      st <- asks store
-      batchSize <- asks $ subsBatchSize . config
-      liftIO $ forever $
-        atomically (readTBQueue agentQ) >>= \case
-          CAConnected srv serviceId -> do
-            let asService = if isJust serviceId then "as service " else ""
-            logInfo $ "SMP server reconnected " <> asService <> showServer' srv
-          CADisconnected srv nIds -> do
-            updated <- batchUpdateSrvSubStatus st srv Nothing nIds NSInactive
-            logSubStatus srv "disconnected" (L.length nIds) updated
-          CASubscribed srv serviceId nIds -> do
-            updated <- batchUpdateSrvSubStatus st srv serviceId nIds NSActive
-            let asService = if isJust serviceId then " as service" else ""
-            logSubStatus srv ("subscribed" <> asService) (L.length nIds) updated
-          CASubError srv errs -> do
-            forM_ (L.nonEmpty $ mapMaybe (\(nId, err) -> (nId,) <$> queueSubErrorStatus err) $ L.toList errs) $ \subStatuses -> do
-              updated <- batchUpdateSrvSubErrors st srv subStatuses
-              logSubErrors srv subStatuses updated
-            -- TODO [certs rcv] resubscribe queues with statuses NSErr and NSService
-          CAServiceDisconnected srv serviceSub ->
-            logNote $ "SMP server service disconnected " <> showService srv serviceSub
-          CAServiceSubscribed srv serviceSub@(ServiceSub _ n idsHash) (ServiceSub _ n' idsHash')
-            | n /= n' -> logWarn $ msg <> ", confirmed subs: " <> tshow n'
-            | idsHash /= idsHash' -> logWarn $ msg <> ", different IDs hash"
-            | otherwise -> logNote msg
-            where
-              msg = "SMP server service subscribed " <> showService srv serviceSub
-          CAServiceSubError srv serviceSub e ->
-            -- Errors that require re-subscribing queues directly are reported as CAServiceUnavailable.
-            -- See smpSubscribeService in Simplex.Messaging.Client.Agent
-            logError $ "SMP server service subscription error " <> showService srv serviceSub <> ": " <> tshow e
-          CAServiceUnavailable srv serviceSub -> do
-            logError $ "SMP server service unavailable: " <> showService srv serviceSub
-            removeServiceAndAssociations st srv >>= \case
-              Right (srvId, updated) -> do
-                logSubStatus srv "removed service association" updated updated
-                void $ subscribeSrvSubs ca st batchSize (srv, srvId, Nothing)
-              Left e -> logError $ "SMP server update and resubscription error " <> tshow e
-      where
-        showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"
-
+    showService srv (ServiceSub serviceId n _) = showServer' srv <> ", service ID " <> decodeLatin1 (strEncode serviceId) <> ", " <> tshow n <> " subs"
     logSubErrors :: SMPServer -> NonEmpty (SMP.NotifierId, NtfSubStatus) -> Int -> IO ()
-    logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss -> do
+    logSubErrors srv subs updated = forM_ (L.group $ L.sort $ L.map snd subs) $ \ss ->
       logError $ "SMP server subscription errors " <> showServer' srv <> ": " <> tshow (L.head ss) <> " (" <> tshow (length ss) <> " errors, " <> tshow updated <> " subs updated)"
-
     queueSubErrorStatus :: SMPClientError -> Maybe NtfSubStatus
     queueSubErrorStatus = \case
       PCEProtocolError AUTH -> Just NSAuth
@@ -639,55 +630,53 @@ logSubStatus srv event n updated =
 showServer' :: SMPServer -> Text
 showServer' = decodeLatin1 . strEncode . host

-pushNotification :: NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> M ()
-pushNotification s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do
-  q <- getOrCreatePushWorker s (srvHost_, pp) isOwn
+pushNotification :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> Maybe T.Text -> OwnServer -> NtfTknRec -> PushNotification -> IO ()
+pushNotification st stats s srvHost_ isOwn tkn@NtfTknRec {token = DeviceToken pp _} ntf = do
+  q <- getOrCreatePushWorker st stats s (srvHost_, pp) isOwn
   atomically $ writeTBQueue q (tkn, ntf)

-getOrCreatePushWorker :: NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> M (TBQueue (NtfTknRec, PushNotification))
-getOrCreatePushWorker s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do
-  ts <- liftIO getCurrentTime
+getOrCreatePushWorker :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> (Maybe T.Text, PushProvider) -> OwnServer -> IO (TBQueue (NtfTknRec, PushNotification))
+getOrCreatePushWorker st stats s@NtfPushServer {pushWorkers, pushWorkerSeq, pushQSize} key@(srvHost_, _) isOwn = do
+  ts <- getCurrentTime
   atomically (getSessVar pushWorkerSeq key pushWorkers ts) >>= \case
     Left v -> do
-      q <- liftIO $ newTBQueueIO pushQSize
-      tId <- mkWeakThreadId =<< forkIO (runPushWorker s srvHost_ isOwn q)
+      q <- newTBQueueIO pushQSize
+      tId <- mkWeakThreadId =<< forkIO (runPushWorker st stats s srvHost_ isOwn q)
       atomically $ putTMVar (sessionVar v) PushWorker {workerQ = q, workerThreadId = tId}
       pure q
     Right v -> workerQ <$> atomically (readTMVar $ sessionVar v)

-runPushWorker :: NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> M ()
-runPushWorker s srvHost_ isOwn q = forever $ do
+runPushWorker :: NtfPostgresStore -> NtfServerStats -> NtfPushServer -> Maybe T.Text -> OwnServer -> TBQueue (NtfTknRec, PushNotification) -> IO ()
+runPushWorker st stats s srvHost_ isOwn q = forever $ do
   (tkn@NtfTknRec {ntfTknId, token = t@(DeviceToken pp _), tknStatus}, ntf) <- atomically (readTBQueue q)
-  liftIO $ logDebug $ "sending push notification to " <> T.pack (show pp)
-  st <- asks store
+  logDebug $ "sending push notification to " <> T.pack (show pp)
   case ntf of
     PNVerification _ ->
-      liftIO (deliverNotification st pp tkn ntf) >>= \case
+      deliverNotification st pp tkn ntf >>= \case
         Right _ -> do
-          void $ liftIO $ setTknStatusConfirmed st tkn
-          incNtfStatT t ntfVrfDelivered
-        Left _ -> incNtfStatT t ntfVrfFailed
+          void $ setTknStatusConfirmed st tkn
+          incNtfStatT_ stats t ntfVrfDelivered
+        Left _ -> incNtfStatT_ stats t ntfVrfFailed
     PNCheckMessages ->
-      liftIO (deliverNotification st pp tkn ntf) >>= \case
+      deliverNotification st pp tkn ntf >>= \case
         Right _ -> do
-          void $ liftIO $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime
-          incNtfStatT t ntfCronDelivered
-        Left _ -> incNtfStatT t ntfCronFailed
+          void $ updateTokenCronSentAt st ntfTknId . systemSeconds =<< getSystemTime
+          incNtfStatT_ stats t ntfCronDelivered
+        Left _ -> incNtfStatT_ stats t ntfCronFailed
     PNMessage {} -> checkActiveTkn tknStatus $ do
-      stats <- asks serverStats
-      liftIO $ updatePeriodStats (activeTokens stats) ntfTknId
-      liftIO (deliverNotification st pp tkn ntf) >>= \case
+      updatePeriodStats (activeTokens stats) ntfTknId
+      deliverNotification st pp tkn ntf >>= \case
         Left _ -> do
-          incNtfStatT t ntfFailed
-          when isOwn $ liftIO $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
+          incNtfStatT_ stats t ntfFailed
+          when isOwn $ mapM_ (`incServerStat` ntfFailedOwn stats) srvHost_
         Right () -> do
-          incNtfStatT t ntfDelivered
-          when isOwn $ liftIO $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
+          incNtfStatT_ stats t ntfDelivered
+          when isOwn $ mapM_ (`incServerStat` ntfDeliveredOwn stats) srvHost_
   where
-    checkActiveTkn :: NtfTknStatus -> M () -> M ()
+    checkActiveTkn :: NtfTknStatus -> IO () -> IO ()
     checkActiveTkn status action
       | status == NTActive = action
-      | otherwise = liftIO $ logError "bad notification token status"
+      | otherwise = logError "bad notification token status"
     deliverNotification :: NtfPostgresStore -> PushProvider -> NtfTknRec -> PushNotification -> IO (Either PushProviderError ())
     deliverNotification st pp tkn@NtfTknRec {ntfTknId} ntf' = do
       (deliver, clientVar) <- getPushClient s pp
@@ -730,13 +719,13 @@ pushWorkersQLength workers = do
 periodicNtfsThread :: NtfPushServer -> M ()
 periodicNtfsThread s = do
   st <- asks store
+  stats <- asks serverStats
   ntfsInterval <- asks $ periodicNtfsInterval . config
   let interval = 1000000 * ntfsInterval
-  UnliftIO unlift <- askUnliftIO
   liftIO $ forever $ do
     threadDelay interval
     now <- systemSeconds <$> getSystemTime
-    cnt <- withPeriodicNtfTokens st now $ \tkn -> unlift $ pushNotification s Nothing False tkn PNCheckMessages
+    cnt <- withPeriodicNtfTokens st now $ \tkn -> pushNotification st stats s Nothing False tkn PNCheckMessages
     logNote $ "Scheduled periodic notifications: " <> tshow cnt

 runNtfClientTransport :: Transport c => THandleNTF c 'TServer -> M ()
@@ -826,14 +815,16 @@ verifyNtfTransmission st thAuth (tAuth, authorized, (corrId, entId, cmd)) = case
       e -> VRFailed e

 client :: NtfServerClient -> NtfSubscriber -> NtfPushServer -> M ()
-client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
+client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps = do
+  st <- asks store
+  stats <- asks serverStats
   forever $
     atomically (readTBQueue rcvQ)
-      >>= mapM processCommand
+      >>= mapM (processCommand st stats)
       >>= atomically . writeTBQueue sndQ
   where
-    processCommand :: NtfRequest -> M (Transmission NtfResponse)
-    processCommand = \case
+    processCommand :: NtfPostgresStore -> NtfServerStats -> NtfRequest -> M (Transmission NtfResponse)
+    processCommand st stats = \case
       NtfReqNew corrId (ANE SToken newTkn@(NewNtfTkn token _ dhPubKey)) -> (corrId,NoEntity,) <$> do
         logDebug "TNEW - new token"
         (srvDhPubKey, srvDhPrivKey) <- atomically . C.generateKeyPair =<< asks random
@@ -843,7 +834,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
         ts <- liftIO $ getSystemDate
         let tkn = mkNtfTknRec tknId newTkn srvDhPrivKey dhSecret regCode ts
         withNtfStore (`addNtfToken` tkn) $ \_ -> do
-          pushNotification ps Nothing False tkn $ PNVerification regCode
+          liftIO $ pushNotification st stats ps Nothing False tkn $ PNVerification regCode
           incNtfStatT token ntfVrfQueued
           incNtfStatT token tknCreated
           pure $ NRTknId tknId srvDhPubKey
@@ -859,7 +850,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
               | otherwise -> withNtfStore (\st -> updateTknStatus st tkn NTRegistered) $ \_ -> sendVerification
             where
               sendVerification = do
-                pushNotification ps Nothing False tkn $ PNVerification tknRegCode
+                liftIO $ pushNotification st stats ps Nothing False tkn $ PNVerification tknRegCode
                 incNtfStatT token ntfVrfQueued
                 pure $ NRTknId ntfTknId $ C.publicKey tknDhPrivKey
           TVFY code -- this allows repeated verification for cases when client connection dropped before server response
@@ -877,7 +868,7 @@ client NtfServerClient {rcvQ, sndQ} ns@NtfSubscriber {smpAgent = ca} ps =
            regCode <- getRegCode
            let tkn' = tkn {token = token', tknStatus = NTRegistered, tknRegCode = regCode}
            withNtfStore (`replaceNtfToken` tkn') $ \_ -> do
-              pushNotification ps Nothing False tkn' $ PNVerification regCode
+              liftIO $ pushNotification st stats ps Nothing False tkn' $ PNVerification regCode
               incNtfStatT token ntfVrfQueued
               incNtfStatT token tknReplaced
               pure NROk
@@ -949,6 +940,11 @@ incNtfStatT (DeviceToken PPApnsNull _) _ = pure ()
 incNtfStatT _ statSel = incNtfStat statSel
 {-# INLINE incNtfStatT #-}

+incNtfStatT_ :: NtfServerStats -> DeviceToken -> (NtfServerStats -> IORef Int) -> IO ()
+incNtfStatT_ _ (DeviceToken PPApnsNull _) _ = pure ()
+incNtfStatT_ stats _ statSel = incNtfStat_ stats statSel
+{-# INLINE incNtfStatT_ #-}
+
 incNtfStat :: (NtfServerStats -> IORef Int) -> M ()
 incNtfStat statSel = asks serverStats >>= liftIO . (`incNtfStat_` statSel)
 {-# INLINE incNtfStat #-}
diff --git a/src/Simplex/Messaging/Notifications/Server/Env.hs b/src/Simplex/Messaging/Notifications/Server/Env.hs
index 365d464c85..7ece78609f 100644
--- a/src/Simplex/Messaging/Notifications/Server/Env.hs
+++ b/src/Simplex/Messaging/Notifications/Server/Env.hs
@@ -45,7 +45,7 @@ import qualified Data.X509.Validation as XV
 import Network.Socket
 import qualified Network.TLS as TLS
 import Numeric.Natural
-import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError)
+import Simplex.Messaging.Client (ProtocolClientError (..), SMPClientError, ServerTransmissionBatch)
 import Simplex.Messaging.Client.Agent
 import qualified Simplex.Messaging.Crypto as C
 import Simplex.Messaging.Notifications.Protocol
@@ -54,14 +54,14 @@ import Simplex.Messaging.Notifications.Server.Stats
 import Simplex.Messaging.Notifications.Server.Store.Postgres
 import Simplex.Messaging.Notifications.Server.Store.Types
 import Simplex.Messaging.Notifications.Transport (NTFVersion, VersionRangeNTF)
-import Simplex.Messaging.Protocol (BasicAuth, CorrId, Party (..), SMPServer, SParty (..), ServiceId, Transmission)
+import Simplex.Messaging.Protocol (BasicAuth, BrokerMsg, CorrId, ErrorType, Party (..), SMPServer, SParty (..), ServiceId, Transmission)
 import Simplex.Messaging.Server.Env.STM (StartOptions (..))
 import Simplex.Messaging.Server.Expiration
 import Simplex.Messaging.Server.QueueStore.Postgres.Config (PostgresStoreCfg (..))
 import Simplex.Messaging.Session
 import Simplex.Messaging.TMap (TMap)
 import qualified Simplex.Messaging.TMap as TM
-import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), ServiceCredentials (..), THandleParams, TransportPeer (..))
+import Simplex.Messaging.Transport (ASrvTransport, SMPServiceRole (..), SMPVersion, ServiceCredentials (..), THandleParams, TransportPeer (..))
 import Simplex.Messaging.Transport.Credentials (genCredentials, tlsCredentials)
 import Simplex.Messaging.Transport.Server (AddHTTP, ServerCredentials, TransportServerConfig, loadFingerprint, loadServerCredential)
 import Simplex.Messaging.Util (liftEitherWith, tshow)
@@ -119,16 +119,17 @@ data NtfEnv = NtfEnv
     serverStats :: NtfServerStats
   }

-newNtfServerEnv :: NtfServerConfig -> IO NtfEnv
-newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, useServiceCreds} = do
+newNtfServerEnv :: NtfServerConfig -> (NtfPostgresStore -> NtfPushServer -> NtfServerStats -> SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> IO NtfEnv
+newNtfServerEnv config@NtfServerConfig {pushQSize, smpAgentCfg, apnsConfig, dbStoreConfig, ntfCredentials, useServiceCreds} mkProcessMsg = do
   random <- C.newRandom
   store <- newNtfDbStore dbStoreConfig
   tlsServerCreds <- loadServerCredential ntfCredentials
   XV.Fingerprint fp <- loadFingerprint ntfCredentials
-  let dbService = if useServiceCreds then Just $ mkDbService random store else Nothing
-  subscriber <- newNtfSubscriber smpAgentCfg dbService random
   pushServer <- newNtfPushServer pushQSize apnsConfig
   serverStats <- newNtfServerStats =<< getCurrentTime
+  let dbService = if useServiceCreds then Just $ mkDbService random store else Nothing
+      processMsg = mkProcessMsg store pushServer serverStats
+  subscriber <- newNtfSubscriber smpAgentCfg processMsg dbService random
   pure NtfEnv {config, subscriber, pushServer, store, random, tlsServerCreds, serverIdentity = C.KeyHash fp, serverStats}
   where
     mkDbService g st = DBService {getCredentials, updateServiceId}
@@ -158,11 +159,11 @@ data NtfSubscriber = NtfSubscriber

 type SMPSubscriberVar = SessionVar SMPSubscriber

-newNtfSubscriber :: SMPClientAgentConfig -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber
-newNtfSubscriber smpAgentCfg dbService random = do
+newNtfSubscriber :: SMPClientAgentConfig -> (SMPClientAgent 'NotifierService -> ServerTransmissionBatch SMPVersion ErrorType BrokerMsg -> IO ()) -> Maybe DBService -> TVar ChaChaDRG -> IO NtfSubscriber
+newNtfSubscriber smpAgentCfg processMsg dbService random = do
   smpSubscribers <- TM.emptyIO
   subscriberSeq <- newTVarIO 0
-  smpAgent <- newSMPClientAgent SNotifierService smpAgentCfg dbService random
+  smpAgent <- newSMPClientAgent SNotifierService smpAgentCfg processMsg dbService random
   pure NtfSubscriber {smpSubscribers, subscriberSeq, smpAgent}

 data SMPSubscriber = SMPSubscriber
diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs
index 574111c15e..123ccd545c 100644
--- a/src/Simplex/Messaging/Server/Env/STM.hs
+++ b/src/Simplex/Messaging/Server/Env/STM.hs
@@ -706,7 +706,7 @@ mkJournalStoreConfig queueStoreCfg storePath msgQueueQuota maxJournalMsgCount ma

 newSMPProxyAgent :: SMPClientAgentConfig -> TVar ChaChaDRG -> IO ProxyAgent
 newSMPProxyAgent smpAgentCfg random = do
-  smpAgent <- newSMPClientAgent SSender smpAgentCfg Nothing random
+  smpAgent <- newSMPClientAgent SSender smpAgentCfg (\_ _ -> pure ()) Nothing random
   pure ProxyAgent {smpAgent}

 readWriteQueueStore :: forall q. StoreQueueClass q => Bool -> (RecipientId -> QueueRec -> IO q) -> FilePath -> STMQueueStore q -> IO (StoreLog 'WriteMode)
diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs
index 0d8ccdf89e..1edc95766f 100644
--- a/tests/SMPProxyTests.hs
+++ b/tests/SMPProxyTests.hs
@@ -172,7 +172,7 @@ deliverMessagesViaProxy proxyServ relayServ alg unsecuredMsgs securedMsgs = do
   THAuthClient {} <- maybe (fail "getProtocolClient returned no thAuth") pure $ thAuth $ thParams pc
   -- set up relay
   msgQ <- newTBQueueIO 1024
-  rc' <- getProtocolClient g NRMInteractive (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} [] (Just msgQ) ts (\_ -> pure ())
+  rc' <- getProtocolClient g NRMInteractive (2, relayServ, Nothing) defaultSMPClientConfig {serverVRange = mkVersionRange minServerSMPRelayVersion currentClientSMPRelayVersion} [] (Just $ atomically . writeTBQueue msgQ) ts (\_ -> pure ())
   rc <- either (fail . show) pure rc'
   -- prepare receiving queue
   (rPub, rPriv) <- atomically $ C.generateAuthKeyPair alg g