From 2465930f85e4b0afd877cfc7ca56291f455ef410 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Fri, 24 May 2024 22:03:44 +0300 Subject: [PATCH 01/10] agent: send UP from initial subscriptions --- src/Simplex/Messaging/Agent/Client.hs | 51 +++++++++++++-------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index a99d957e3..74ae0055f 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -170,7 +170,7 @@ import Data.List.NonEmpty (NonEmpty (..), (<|)) import qualified Data.List.NonEmpty as L import Data.Map.Strict (Map) import qualified Data.Map.Strict as M -import Data.Maybe (isJust, isNothing, listToMaybe) +import Data.Maybe (catMaybes, isJust, isNothing, listToMaybe) import Data.Set (Set) import qualified Data.Set as S import Data.Text (Text) @@ -678,15 +678,12 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess serverDown :: ([RcvQueue], [ConnId]) -> IO () serverDown (qs, conns) = whenM (readTVarIO active) $ do incClientStat' c userId client "DISCONNECT" "" - notifySub "" $ hostEvent' DISCONNECT client - unless (null conns) $ notifySub "" $ DOWN srv conns + notifySub c "" $ hostEvent' DISCONNECT client + unless (null conns) $ notifySub c "" $ DOWN srv conns unless (null qs) $ do atomically $ mapM_ (releaseGetLock c) qs runReaderT (resubscribeSMPSession c tSess) env - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> IO () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) - resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ())) @@ -721,21 +718,22 @@ reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do (rs, sessId_) <- subscribeQueues c $ L.toList qs let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub "" $ UP srv conns + unless (null conns) $ notifySub c "" $ UP srv conns let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs - mapM_ (\(connId, e) -> notifySub connId $ ERR e) finalErrs + mapM_ (\(connId, e) -> notifySub c connId $ ERR e) finalErrs forM_ (listToMaybe tempErrs) $ \(connId, e) -> do when (null okConns && M.null cs && null finalErrs) . liftIO $ forM_ sessId_ $ \sessId -> do -- We only close the client session that was used to subscribe. v_ <- atomically $ ifM (activeClientSession c tSess sessId) (TM.lookupDelete tSess $ smpClients c) (pure Nothing) mapM_ (closeClient_ c) v_ - notifySub connId $ ERR e + notifySub c connId $ ERR e where handleNotify :: AM' () -> AM' () - handleNotify = E.handleAny $ notifySub "" . ERR . INTERNAL . show - notifySub :: forall e. AEntityI e => ConnId -> ACommand 'Agent e -> AM' () - notifySub connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) + handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show + +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> ACommand 'Agent e -> m () +notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do @@ -1291,14 +1289,10 @@ newRcvQueue c userId connId (ProtoServerWithAuth srv auth) vRange subMode = do qUri = SMPQueueUri vRange $ SMPQueueAddress srv sndId e2eDhKey pure (rq, qUri, tSess, sessId) -processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM () +processSubResult :: AgentClient -> RcvQueue -> Either SMPClientError () -> STM (Maybe ConnId) processSubResult c rq@RcvQueue {connId} = \case - Left e -> - unless (temporaryClientError e) $ - failSubscription c rq e - Right () -> - whenM (hasPendingSubscription c connId) $ - addSubscription c rq + Left e -> Nothing <$ unless (temporaryClientError e) (failSubscription c rq e) + Right () -> ifM (hasPendingSubscription c connId) (Just connId <$ addSubscription c rq) (pure Nothing) temporaryAgentError :: AgentErrorType -> Bool temporaryAgentError = \case @@ -1349,23 +1343,26 @@ subscribeQueues c qs = do subscribeQueues_ :: Env -> TVar (Maybe SessionId) -> SMPClient -> NonEmpty RcvQueue -> IO (BatchResponses SMPClientError ()) subscribeQueues_ env session smp qs' = do rs <- sendBatch subscribeSMPQueues smp qs' - active <- + (active, subResults) <- atomically $ ifM (activeClientSession c tSess sessId) - (writeTVar session (Just sessId) >> processSubResults rs $> True) - (pure False) + (writeTVar session (Just sessId) >> ((True,) <$> processSubResults rs)) + (pure (False, [])) if active - then when (hasTempErrors rs) resubscribe $> rs + then do + when (any isNothing subResults) resubscribe + let up = catMaybes $ L.toList subResults + unless (null up) $ notifySub c "" $ UP srv up + pure rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs where - tSess = transportSession' smp + tSess@(_, srv, _) = transportSession' smp sessId = sessionId $ thParams smp - hasTempErrors = any (either temporaryClientError (const False) . snd) - processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM () - processSubResults = mapM_ $ uncurry $ processSubResult c + processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM (NonEmpty (Maybe ConnId)) + processSubResults = mapM (uncurry $ processSubResult c) resubscribe = resubscribeSMPSession c tSess `runReaderT` env activeClientSession :: AgentClient -> SMPTransportSession -> SessionId -> STM Bool From 7f9b013a138bc056f8e62a80f3026c0bb4ff039a Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 12:59:01 +0300 Subject: [PATCH 02/10] delegate resubscribe UPs to subscribeQueues --- src/Simplex/Messaging/Agent/Client.hs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 74ae0055f..08912baa5 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -713,12 +713,10 @@ resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = removeSessVar v tSess smpSubWorkers reconnectSMPClient :: AgentClient -> SMPTransportSession -> NonEmpty RcvQueue -> AM' () -reconnectSMPClient c tSess@(_, srv, _) qs = handleNotify $ do +reconnectSMPClient c tSess qs = handleNotify $ do cs <- readTVarIO $ RQ.getConnections $ activeSubs c (rs, sessId_) <- subscribeQueues c $ L.toList qs let (errs, okConns) = partitionEithers $ map (\(RcvQueue {connId}, r) -> bimap (connId,) (const connId) r) rs - conns = filter (`M.notMember` cs) okConns - unless (null conns) $ notifySub c "" $ UP srv conns let (tempErrs, finalErrs) = partition (temporaryAgentError . snd) errs mapM_ (\(connId, e) -> notifySub c connId $ ERR e) finalErrs forM_ (listToMaybe tempErrs) $ \(connId, e) -> do From 2ce3c712e1cdf46e0bdecbc1da30c523fd604b49 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:20:51 +0300 Subject: [PATCH 03/10] fix functional tests --- tests/AgentTests/FunctionalAPITests.hs | 158 +++++++++++-------------- 1 file changed, 67 insertions(+), 91 deletions(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 2b21ff3f7..9cd0664c0 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -13,6 +13,7 @@ {-# LANGUAGE RankNTypes #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE TypeApplications #-} +{-# OPTIONS_GHC -Wno-ambiguous-fields #-} {-# OPTIONS_GHC -Wno-orphans #-} module AgentTests.FunctionalAPITests @@ -114,7 +115,7 @@ a ##> t = withTimeout a (`shouldBe` t) a =##> p = withTimeout a $ \r -> do unless (p r) $ liftIO $ putStrLn $ "value failed predicate: " <> show r - r `shouldSatisfy` p + withFrozenCallStack $ r `shouldSatisfy` p withTimeout :: (HasCallStack, MonadUnliftIO m) => m a -> (HasCallStack => a -> Expectation) -> m () withTimeout a test = @@ -134,6 +135,13 @@ sfGet c = withFrozenCallStack $ get' @'AESndFile c nGet :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone) nGet c = withFrozenCallStack $ get' @'AENone c +nGetUP :: (MonadIO m, HasCallStack) => AgentClient -> m (AEntityTransmission 'AENone) +nGetUP c = withFrozenCallStack $ liftIO $ do + timeout 15000000 (pGet_ c True) >>= \case + Just (corrId, connId, APC _ cmd@UP {}) -> pure (corrId, connId, cmd) + Just (_, _, APC _ cmd) -> error $ "unexpected command " <> show cmd + Nothing -> error "timed out waiting for UP" + get' :: forall e m. (MonadIO m, AEntityI e, HasCallStack) => AgentClient -> m (AEntityTransmission e) get' c = withFrozenCallStack $ do (corrId, connId, APC e cmd) <- pGet c @@ -141,14 +149,18 @@ get' c = withFrozenCallStack $ do Just Refl -> pure (corrId, connId, cmd) _ -> error $ "unexpected command " <> show cmd -pGet :: forall m. MonadIO m => AgentClient -> m (ATransmission 'Agent) -pGet c = do +pGet :: forall m. (MonadIO m, HasCallStack) => AgentClient -> m (ATransmission 'Agent) +pGet c = withFrozenCallStack $ pGet_ c False + +pGet_ :: forall m. (MonadIO m, HasCallStack) => AgentClient -> Bool -> m (ATransmission 'Agent) +pGet_ c expectUp = withFrozenCallStack $ do t@(_, _, APC _ cmd) <- atomically (readTBQueue $ subQ c) case cmd of - CONNECT {} -> pGet c - DISCONNECT {} -> pGet c - ERR (BROKER _ NETWORK) -> pGet c - MWARN {} -> pGet c + CONNECT {} -> pGet_ c expectUp + DISCONNECT {} -> pGet_ c expectUp + ERR (BROKER _ NETWORK) -> pGet_ c expectUp + MWARN {} -> pGet_ c expectUp + UP {} | not expectUp -> pGet_ c expectUp _ -> pure t pattern CONF :: ConfirmationId -> [SMPServer] -> ConnInfo -> ACommand 'Agent e @@ -767,7 +779,7 @@ testAsyncServerOffline t = withAgentClients2 $ \alice bob -> do conns `shouldBe` [bobId] -- connection succeeds after server start withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - ("", "", UP srv1 conns1) <- nGet alice + ("", "", UP srv1 conns1) <- nGetUP alice liftIO $ do srv1 `shouldBe` testSMPServer conns1 `shouldBe` [bobId] @@ -807,8 +819,7 @@ testAllowConnectionClientRestart t = do withSmpServerConfigOn t cfg {storeLogFile = Just testStoreLogFile2} testPort2 $ \_ -> do runRight $ do - ("", "", UP _ _) <- nGet bob - + ("", "", UP _ _) <- nGetUP bob subscribeConnection alice2 bobId get alice2 ##> ("", bobId, CON) @@ -955,7 +966,7 @@ testDeliverClientRestart t = do withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - ("", "", UP _ _) <- nGet alice + ("", "", UP _ _) <- nGetUP alice subscribeConnection bob2 aliceId @@ -1075,8 +1086,8 @@ testExpireMessage t = 5 <- runRight $ sendMessage a bId SMP.noMsgFlags "2" -- this won't expire get a =##> \case ("", c, MERR 4 (BROKER _ e)) -> bId == c && (e == TIMEOUT || e == NETWORK); _ -> False withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - withUP a bId $ \case ("", _, SENT 5) -> True; _ -> False - withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False + get a =##> \case ("", _, SENT 5) -> True; _ -> False + get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 3) "2") -> True; _ -> False ackMessage b aId 4 Nothing testExpireManyMessages :: HasCallStack => ATransport -> IO () @@ -1106,19 +1117,10 @@ testExpireManyMessages t = liftIO $ expected c e `shouldBe` True r -> error $ show r withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - withUP a bId $ \case ("", _, SENT 7) -> True; _ -> False - withUP b aId $ \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False + get a =##> \case ("", _, SENT 7) -> True; _ -> False + get b =##> \case ("", _, MsgErr 4 (MsgSkipped 3 5) "4") -> True; _ -> False ackMessage b aId 4 Nothing -withUP :: AgentClient -> ConnId -> (AEntityTransmission 'AEConn -> Bool) -> ExceptT AgentErrorType IO () -withUP a bId p = - liftIO $ - getInAnyOrder - a - [ \case ("", "", APC SAENone (UP _ [c])) -> c == bId; _ -> False, - \case (corrId, c, APC SAEConn cmd) -> c == bId && p (corrId, c, cmd); _ -> False - ] - testExpireMessageQuota :: HasCallStack => ATransport -> IO () testExpireMessageQuota t = withSmpServerConfigOn t cfg {msgQueueQuota = 1} testPort $ \_ -> do a <- getSMPAgentClient' 1 agentCfg {quotaExceededTimeout = 1, messageRetryInterval = fastMessageRetryInterval} initAgentServers testDB @@ -1268,19 +1270,14 @@ testRatchetSyncServerOffline t = withAgentClients2 $ \alice bob -> do withSmpServerStoreMsgLogOn t testPort $ \_ -> do concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) + (pGet alice =##> ratchetSyncP' bobId RSAgreed) + (pGet bob2 =##> ratchetSyncP' aliceId RSAgreed) runRight_ $ do get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient bob2 -serverUpP :: ATransmission 'Agent -> Bool -serverUpP = \case - ("", "", APC SAENone (UP _ _)) -> True - _ -> False - testRatchetSyncClientRestart :: HasCallStack => ATransport -> IO () testRatchetSyncClientRestart t = do alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB @@ -1295,7 +1292,7 @@ testRatchetSyncClientRestart t = do bob3 <- getSMPAgentClient' 3 agentCfg initAgentServers testDB2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do runRight_ $ do - ("", "", UP _ _) <- nGet alice + ("", "", UP _ _) <- nGetUP alice subscribeConnection bob3 aliceId get alice =##> ratchetSyncP bobId RSAgreed get bob3 =##> ratchetSyncP aliceId RSAgreed @@ -1324,13 +1321,11 @@ testRatchetSyncSuspendForeground t = do foregroundAgent bob2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do - concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) - runRight_ $ do - get alice =##> ratchetSyncP bobId RSOk - get bob2 =##> ratchetSyncP aliceId RSOk - exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + get alice =##> ratchetSyncP bobId RSAgreed + get bob2 =##> ratchetSyncP aliceId RSAgreed + get alice =##> ratchetSyncP bobId RSOk + get bob2 =##> ratchetSyncP aliceId RSOk + runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient alice disposeAgentClient bob disposeAgentClient bob2 @@ -1352,13 +1347,11 @@ testRatchetSyncSimultaneous t = do liftIO $ aRSS `shouldBe` RSStarted withSmpServerStoreMsgLogOn t testPort $ \_ -> do - concurrently_ - (getInAnyOrder alice [ratchetSyncP' bobId RSAgreed, serverUpP]) - (getInAnyOrder bob2 [ratchetSyncP' aliceId RSAgreed, serverUpP]) - runRight_ $ do - get alice =##> ratchetSyncP bobId RSOk - get bob2 =##> ratchetSyncP aliceId RSOk - exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 + get alice =##> ratchetSyncP bobId RSAgreed + get bob2 =##> ratchetSyncP aliceId RSAgreed + get alice =##> ratchetSyncP bobId RSOk + get bob2 =##> ratchetSyncP aliceId RSOk + runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 disposeAgentClient alice disposeAgentClient bob disposeAgentClient bob2 @@ -1454,6 +1447,7 @@ testActiveClientNotDisconnected t = do where keepSubscribing :: AgentClient -> ConnId -> SystemTime -> ExceptT AgentErrorType IO () keepSubscribing alice connId ts = do + atomically $ void . flushTBQueue $ subQ alice -- drain queue so subscribeConnection may proceed with UPs ts' <- liftIO getSystemTime if milliseconds ts' - milliseconds ts < 2200 then do @@ -1504,13 +1498,11 @@ testSuspendingAgentCompleteSending t = withAgentClients2 $ \a b -> do liftIO $ suspendAgent b 5000000 withSmpServerStoreLogOn t testPort $ \_ -> runRight_ @AgentErrorType $ do - pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False - pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False - pGet b =##> \case ("", c, APC SAEConn (SENT 6)) -> c == aId; ("", "", APC _ UP {}) -> True; _ -> False + pGet b =##> \case ("", c, APC SAEConn (SENT 5)) -> c == aId; _ -> False + pGet b =##> \case ("", c, APC SAEConn (SENT 6)) -> c == aId; _ -> False ("", "", SUSPENDED) <- nGet b - pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False - pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; ("", "", APC _ UP {}) -> True; _ -> False + pGet a =##> \case ("", c, APC _ (Msg "hello too")) -> c == bId; _ -> False ackMessage a bId 5 Nothing get a =##> \case ("", c, Msg "how are you?") -> c == bId; _ -> False ackMessage a bId 6 Nothing @@ -1536,7 +1528,7 @@ testSuspendingAgentTimeout t = withAgentClients2 $ \a b -> do testBatchedSubscriptions :: Int -> Int -> ATransport -> IO () testBatchedSubscriptions nCreate nDel t = - withAgentClientsCfgServers2 agentCfg agentCfg initAgentServers2 $ \a b -> do + withAgentClientsCfgServers2 agentCfgN agentCfgN initAgentServers2 $ \a b -> do conns <- runServers $ do conns <- replicateM nCreate $ makeConnection_ PQSupportOff a b forM_ conns $ \(aId, bId) -> exchangeGreetings_ PQEncOff a bId b aId @@ -1550,10 +1542,10 @@ testBatchedSubscriptions nCreate nDel t = ("", "", DOWN {}) <- nGet b ("", "", DOWN {}) <- nGet b runServers $ do - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet a - ("", "", UP {}) <- nGet b - ("", "", UP {}) <- nGet b + ("", "", UP {}) <- nGetUP a + ("", "", UP {}) <- nGetUP a + ("", "", UP {}) <- nGetUP b + ("", "", UP {}) <- nGetUP b liftIO $ threadDelay 1000000 let (aIds, bIds) = unzip conns conns' = drop nDel conns @@ -1569,6 +1561,8 @@ testBatchedSubscriptions nCreate nDel t = deleteFail a bIds' deleteFail b aIds' where + agentCfgN :: AgentConfig + agentCfgN = agentCfg {tbqSize = fromIntegral nCreate} -- without reader thread sub notifications would be flushed until all batches finish, blocking on `notifySub` subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do r <- subscribeConnections c cs @@ -1797,13 +1791,7 @@ testWaitDelivery t = get alice ##> ("", bobId, SENT $ baseId + 3) get alice ##> ("", bobId, SENT $ baseId + 4) get alice =##> \case ("", cId, DEL_CONN) -> cId == bobId; _ -> False - - liftIO $ - getInAnyOrder - bob - [ \case ("", "", APC SAENone (UP _ [cId])) -> cId == aliceId; _ -> False, - \case ("", cId, APC SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False - ] + get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False ackMessage bob aliceId (baseId + 3) Nothing get bob =##> \case ("", c, Msg "message 1") -> c == aliceId; _ -> False ackMessage bob aliceId (baseId + 4) Nothing @@ -1894,7 +1882,7 @@ testWaitDeliveryTimeout t = liftIO $ threadDelay 100000 withSmpServerStoreLogOn t testPort $ \_ -> do - nGet bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False + nGetUP bob =##> \case ("", "", UP _ [cId]) -> cId == aliceId; _ -> False liftIO $ noMessages alice "nothing else should be delivered to alice" liftIO $ noMessages bob "nothing else should be delivered to bob" where @@ -1935,12 +1923,7 @@ testWaitDeliveryTimeout2 t = get alice ##> ("", bobId, SENT $ baseId + 3) -- "message 1" not delivered - liftIO $ - getInAnyOrder - bob - [ \case ("", "", APC SAENone (UP _ [cId])) -> cId == aliceId; _ -> False, - \case ("", cId, APC SAEConn (Msg "how are you?")) -> cId == aliceId; _ -> False - ] + get bob =##> \case ("", cId, Msg "how are you?") -> cId == aliceId; _ -> False liftIO $ noMessages alice "nothing else should be delivered to alice" liftIO $ noMessages bob "nothing else should be delivered to bob" where @@ -1964,14 +1947,8 @@ testJoinConnectionAsyncReplyError t = do withSmpServerOn t testPort2 $ do get b =##> \case ("2", c, OK) -> c == aId; _ -> False confId <- withSmpServerStoreLogOn t testPort $ \_ -> do - pGet a >>= \case - ("", "", APC _ (UP _ [_])) -> do - ("", _, CONF confId _ "bob's connInfo") <- get a - pure confId - ("", _, APC _ (CONF confId _ "bob's connInfo")) -> do - ("", "", UP _ [_]) <- nGet a - pure confId - r -> error $ "unexpected response " <> show r + ("", _, CONF confId _ "bob's connInfo") <- get a + pure confId nGet a =##> \case ("", "", DOWN _ [c]) -> c == bId; _ -> False runRight_ $ do allowConnectionAsync a "3" bId confId "alice's connInfo" @@ -1979,8 +1956,7 @@ testJoinConnectionAsyncReplyError t = do ConnectionStats {rcvQueuesInfo = [RcvQueueInfo {}], sndQueuesInfo = [SndQueueInfo {}]} <- getConnectionServers b aId pure () withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - pGet a =##> \case ("3", c, APC _ OK) -> c == bId; ("", "", APC _ (UP _ [c])) -> c == bId; _ -> False - pGet a =##> \case ("3", c, APC _ OK) -> c == bId; ("", "", APC _ (UP _ [c])) -> c == bId; _ -> False + pGet a =##> \case ("3", c, APC _ OK) -> c == bId; _ -> False get a ##> ("", bId, CON) get b ##> ("", aId, INFO "alice's connInfo") get b ##> ("", aId, CON) @@ -2032,8 +2008,8 @@ testUsersNoServer t = withAgentClientsCfg2 aCfg agentCfg $ \a b -> do nGet a =##> \case ("", "", DEL_USER u) -> u == auId; _ -> False liftIO $ noMessages a "nothing else should be delivered to alice" withSmpServerStoreLogOn t testPort $ \_ -> runRight_ $ do - nGet a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False - nGet b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False + nGetUP a =##> \case ("", "", UP _ [c]) -> c == bId; _ -> False + nGetUP b =##> \case ("", "", UP _ cs) -> length cs == 2; _ -> False exchangeGreetingsMsgId 6 a bId b aId where aCfg = agentCfg {initialCleanupDelay = 10000, cleanupInterval = 10000, deleteErrorCount = 3} @@ -2600,7 +2576,7 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ setNetworkConfig a nc {sessionMode = TSMEntity} liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a a `hasClients` 2 exchangeGreetingsMsgId 6 a bId1 b aId1 @@ -2610,8 +2586,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 1 aUserId2 <- createUser a [noAuthSrv testSMPServer] [noAuthSrv testXFTPServer] @@ -2625,8 +2601,8 @@ testTwoUsers = withAgentClients2 $ \a b -> do liftIO $ threadDelay 250000 ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 4 exchangeGreetingsMsgId 8 a bId1 b aId1 exchangeGreetingsMsgId 8 a bId1' b aId1' @@ -2639,10 +2615,10 @@ testTwoUsers = withAgentClients2 $ \a b -> do ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a ("", "", DOWN _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a - ("", "", UP _ _) <- nGet a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a + ("", "", UP _ _) <- nGetUP a a `hasClients` 2 exchangeGreetingsMsgId 10 a bId1 b aId1 exchangeGreetingsMsgId 10 a bId1' b aId1' From c76d16758ef0fc4b220c07085fe0c450237008a2 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:26:14 +0300 Subject: [PATCH 04/10] log stalled subQ --- src/Simplex/Messaging/Agent/Client.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 08912baa5..6633a61d2 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -1351,12 +1351,13 @@ subscribeQueues c qs = do then do when (any isNothing subResults) resubscribe let up = catMaybes $ L.toList subResults - unless (null up) $ notifySub c "" $ UP srv up + unless (null up) $ notifyUP up pure rs else do logWarn "subcription batch result for replaced SMP client, resubscribing" resubscribe $> L.map (second $ \_ -> Left PCENetworkError) rs where + notifyUP up = maybe (logError "sndQ full" >> notifyUP up) pure =<< timeout 30000000 (notifySub c "" $ UP srv up) tSess@(_, srv, _) = transportSession' smp sessId = sessionId $ thParams smp processSubResults :: NonEmpty (RcvQueue, Either SMPClientError ()) -> STM (NonEmpty (Maybe ConnId)) From dcbda5ab693e969e0d476bed80640b7c3f8294d0 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:30:44 +0300 Subject: [PATCH 05/10] fix more nGet/UP --- tests/AgentTests/FunctionalAPITests.hs | 1 + tests/AgentTests/NotificationTests.hs | 13 +++++++------ tests/SMPProxyTests.hs | 2 +- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 9cd0664c0..6704a0fd8 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -39,6 +39,7 @@ module AgentTests.FunctionalAPITests rfGet, sfGet, nGet, + nGetUP, (##>), (=##>), pattern CON, diff --git a/tests/AgentTests/NotificationTests.hs b/tests/AgentTests/NotificationTests.hs index a7c3fb25a..b2e9fee52 100644 --- a/tests/AgentTests/NotificationTests.hs +++ b/tests/AgentTests/NotificationTests.hs @@ -19,6 +19,7 @@ import AgentTests.FunctionalAPITests joinConnection, makeConnection, nGet, + nGetUP, runRight, runRight_, sendMessage, @@ -594,8 +595,8 @@ testNotificationsSMPRestart t APNSMockServer {apnsQ} = withAgentClients2 $ \alic nGet bob =##> \case ("", "", DOWN _ [c]) -> c == aliceId; _ -> False withSmpServerStoreLogOn t testPort $ \threadId -> runRight_ $ do - nGet alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False - nGet bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False + nGetUP alice =##> \case ("", "", UP _ [c]) -> c == bobId; _ -> False + nGetUP bob =##> \case ("", "", UP _ [c]) -> c == aliceId; _ -> False liftIO $ threadDelay 1000000 5 <- sendMessage bob aliceId (SMP.MsgFlags True) "hello again" get bob ##> ("", aliceId, SENT 5) @@ -628,11 +629,11 @@ testNotificationsSMPRestartBatch n t APNSMockServer {apnsQ} = liftIO $ length (acs1 <> acs2) `shouldBe` length conns runServers $ do - ("", "", UP _ bcs1) <- nGet a - ("", "", UP _ bcs2) <- nGet a + ("", "", UP _ bcs1) <- nGetUP a + ("", "", UP _ bcs2) <- nGetUP a liftIO $ length (bcs1 <> bcs2) `shouldBe` length conns - ("", "", UP _ acs1) <- nGet b - ("", "", UP _ acs2) <- nGet b + ("", "", UP _ acs1) <- nGetUP b + ("", "", UP _ acs2) <- nGetUP b liftIO $ length (acs1 <> acs2) `shouldBe` length conns liftIO $ threadDelay 1500000 forM_ conns $ \(aliceId, bobId) -> do diff --git a/tests/SMPProxyTests.hs b/tests/SMPProxyTests.hs index 036c8b203..ea09e4045 100644 --- a/tests/SMPProxyTests.hs +++ b/tests/SMPProxyTests.hs @@ -348,7 +348,7 @@ agentViaProxyRetryOffline = do withServer2 = withServer_ testStoreLogFile2 testStoreMsgsFile2 testPort2 withServer_ storeLog storeMsgs port = withSmpServerConfigOn (transport @TLS) proxyCfg {storeLogFile = Just storeLog, storeMsgsFile = Just storeMsgs} port - a `up` cId = nGet a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False + a `up` cId = nGetUP a =##> \case ("", "", UP _ [c]) -> c == cId; _ -> False a `down` cId = nGet a =##> \case ("", "", DOWN _ [c]) -> c == cId; _ -> False aCfg = agentProxyCfg {messageRetryInterval = fastMessageRetryInterval} baseId = 3 From 1b183776b64b1750a0bc18ab6a78788f869195c8 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:44:04 +0300 Subject: [PATCH 06/10] fix agent tests --- tests/AgentTests.hs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index b200c3933..c9e008a26 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -93,10 +93,10 @@ type AEntityTransmission p e = (ACorrId, ConnId, ACommand p e) type AEntityTransmissionOrError p e = (ACorrId, ConnId, Either AgentErrorType (ACommand p e)) -tGetAgent :: Transport c => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn) +tGetAgent :: (Transport c, HasCallStack) => c -> IO (AEntityTransmissionOrError 'Agent 'AEConn) tGetAgent = tGetAgent' True -tGetAgent' :: forall c e. (Transport c, AEntityI e) => Bool -> c -> IO (AEntityTransmissionOrError 'Agent e) +tGetAgent' :: forall c e. (Transport c, AEntityI e, HasCallStack) => Bool -> c -> IO (AEntityTransmissionOrError 'Agent e) tGetAgent' skipErr h = do (corrId, connId, cmdOrErr) <- pGetAgent skipErr h case cmdOrErr of @@ -111,6 +111,7 @@ pGetAgent skipErr h = do case cmdOrErr of Right (APC _ CONNECT {}) -> pGetAgent skipErr h Right (APC _ DISCONNECT {}) -> pGetAgent skipErr h + Right (APC _ UP {}) -> pGetAgent skipErr h Right (APC _ (ERR (BROKER _ NETWORK))) | skipErr -> pGetAgent skipErr h cmd -> pure (corrId, connId, cmd) @@ -477,8 +478,7 @@ testMsgDeliveryAgentRestart t bob = do (corrId == "3" && cmd == OK) || (corrId == "" && cmd == SENT 5) _ -> False - bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False - bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello again")) -> True; _ -> False bob #: ("12", "alice", "ACK 5") #> ("12", "alice", OK) removeFile testStoreLogFile From fc48a5f5458ce7b3026d4cbbe357e7f703561394 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:44:15 +0300 Subject: [PATCH 07/10] restore concurrently_ --- tests/AgentTests/FunctionalAPITests.hs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 6704a0fd8..97da447b3 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1322,8 +1322,9 @@ testRatchetSyncSuspendForeground t = do foregroundAgent bob2 withSmpServerStoreMsgLogOn t testPort $ \_ -> do - get alice =##> ratchetSyncP bobId RSAgreed - get bob2 =##> ratchetSyncP aliceId RSAgreed + concurrently_ + (get alice =##> ratchetSyncP bobId RSAgreed) + (get bob2 =##> ratchetSyncP aliceId RSAgreed) get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 @@ -1348,8 +1349,9 @@ testRatchetSyncSimultaneous t = do liftIO $ aRSS `shouldBe` RSStarted withSmpServerStoreMsgLogOn t testPort $ \_ -> do - get alice =##> ratchetSyncP bobId RSAgreed - get bob2 =##> ratchetSyncP aliceId RSAgreed + concurrently_ + (get alice =##> ratchetSyncP bobId RSAgreed) + (get bob2 =##> ratchetSyncP aliceId RSAgreed) get alice =##> ratchetSyncP bobId RSOk get bob2 =##> ratchetSyncP aliceId RSOk runRight_ $ exchangeGreetingsMsgIds alice bobId 12 bob2 aliceId 9 From 8650fcc05c19ec57170ac73d5e471957392af42d Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 17:54:36 +0300 Subject: [PATCH 08/10] more redundant UPs --- tests/AgentTests.hs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/tests/AgentTests.hs b/tests/AgentTests.hs index c9e008a26..164c9fce5 100644 --- a/tests/AgentTests.hs +++ b/tests/AgentTests.hs @@ -408,7 +408,6 @@ testMsgDeliveryServerRestart (alice, aPQ) (bob, bPQ) = do withServer $ do bob <# ("", "alice", SENT 5) - alice <#. ("", "", UP server ["bob"]) alice <#= \case ("", "bob", Msg' _ pq' "hello again") -> pq == pq'; _ -> False alice #: ("12", "bob", "ACK 5") #> ("12", "bob", OK) @@ -433,10 +432,8 @@ testServerConnectionAfterError t _ = do bob #:! ("1", "alice", "SUB") =#> \case ("1", "alice", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT; _ -> False alice #:! ("1", "bob", "SUB") =#> \case ("1", "bob", ERR (BROKER _ e)) -> e == NETWORK || e == TIMEOUT; _ -> False withServer $ do - alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False - alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; ("", "", APC _ (UP s ["bob"])) -> s == server; _ -> False - bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False - bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; ("", "", APC _ (UP s ["alice"])) -> s == server; _ -> False + alice <#=? \case ("", "bob", APC SAEConn (SENT 4)) -> True; _ -> False + bob <#=? \case ("", "alice", APC _ (Msg "hello")) -> True; _ -> False bob #: ("2", "alice", "ACK 4") #> ("2", "alice", OK) alice #: ("1", "bob", "SEND F 11\nhello again") #> ("1", "bob", MID 5) alice <# ("", "bob", SENT 5) From 9666339699f36c8cbc009ac2ae79b17786c50e57 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 18:28:19 +0300 Subject: [PATCH 09/10] minify --- src/Simplex/Messaging/Agent/Client.hs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/Simplex/Messaging/Agent/Client.hs b/src/Simplex/Messaging/Agent/Client.hs index 6633a61d2..6a5495457 100644 --- a/src/Simplex/Messaging/Agent/Client.hs +++ b/src/Simplex/Messaging/Agent/Client.hs @@ -684,6 +684,9 @@ smpClientDisconnected c@AgentClient {active, smpClients, smpProxiedRelays} tSess atomically $ mapM_ (releaseGetLock c) qs runReaderT (resubscribeSMPSession c tSess) env +notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> ACommand 'Agent e -> m () +notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) + resubscribeSMPSession :: AgentClient -> SMPTransportSession -> AM' () resubscribeSMPSession c@AgentClient {smpSubWorkers, workerSeq} tSess = atomically getWorkerVar >>= mapM_ (either newSubWorker (\_ -> pure ())) @@ -730,9 +733,6 @@ reconnectSMPClient c tSess qs = handleNotify $ do handleNotify :: AM' () -> AM' () handleNotify = E.handleAny $ notifySub c "" . ERR . INTERNAL . show -notifySub :: forall e m. (AEntityI e, MonadIO m) => AgentClient -> ConnId -> ACommand 'Agent e -> m () -notifySub c connId cmd = atomically $ writeTBQueue (subQ c) ("", connId, APC (sAEntity @e) cmd) - getNtfServerClient :: AgentClient -> NtfTransportSession -> AM NtfClient getNtfServerClient c@AgentClient {active, ntfClients, workerSeq} tSess@(userId, srv, _) = do unlessM (readTVarIO active) . throwError $ INACTIVE From 12df6fbf4c38383033ba99fb414d09d777bf1596 Mon Sep 17 00:00:00 2001 From: Alexander Bondarenko <486682+dpwiz@users.noreply.github.com> Date: Thu, 30 May 2024 18:28:39 +0300 Subject: [PATCH 10/10] reword comment --- tests/AgentTests/FunctionalAPITests.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/AgentTests/FunctionalAPITests.hs b/tests/AgentTests/FunctionalAPITests.hs index 97da447b3..c82ae7c62 100644 --- a/tests/AgentTests/FunctionalAPITests.hs +++ b/tests/AgentTests/FunctionalAPITests.hs @@ -1565,7 +1565,7 @@ testBatchedSubscriptions nCreate nDel t = deleteFail b aIds' where agentCfgN :: AgentConfig - agentCfgN = agentCfg {tbqSize = fromIntegral nCreate} -- without reader thread sub notifications would be flushed until all batches finish, blocking on `notifySub` + agentCfgN = agentCfg {tbqSize = fromIntegral nCreate} -- without a reader thread subscriber would block on notifySub when the subQ gets full subscribe :: AgentClient -> [ConnId] -> ExceptT AgentErrorType IO () subscribe c cs = do r <- subscribeConnections c cs