diff --git a/package.yaml b/package.yaml index ef747da0d..a7c530bf8 100644 --- a/package.yaml +++ b/package.yaml @@ -47,6 +47,7 @@ dependencies: - direct-sqlcipher == 2.3.* - directory == 1.3.* - filepath == 1.4.* + - hashable == 1.4.* - hourglass == 0.2.* - http-types == 0.12.* - http2 >= 4.2.2 && < 4.3 @@ -61,6 +62,7 @@ dependencies: - network-udp >= 0.0 && < 0.1 - optparse-applicative >= 0.15 && < 0.17 - process == 1.6.* + - psqueues == 0.2.8.* - random >= 1.1 && < 1.3 - simple-logger == 0.1.* - socks == 0.6.* diff --git a/simplexmq.cabal b/simplexmq.cabal index bbe7583fa..e2c6ac032 100644 --- a/simplexmq.cabal +++ b/simplexmq.cabal @@ -174,6 +174,8 @@ library Simplex.Messaging.Server.QueueStore.QueueInfo Simplex.Messaging.Server.QueueStore.STM Simplex.Messaging.Server.Stats + Simplex.Messaging.Server.Stats.Client + Simplex.Messaging.Server.Stats.Timeline Simplex.Messaging.Server.StoreLog Simplex.Messaging.ServiceScheme Simplex.Messaging.Session @@ -233,6 +235,7 @@ library , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -247,6 +250,7 @@ library , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , socks ==0.6.* @@ -307,6 +311,7 @@ executable ntf-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -321,6 +326,7 @@ executable ntf-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -386,6 +392,7 @@ executable smp-server , directory ==1.3.* , file-embed , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -400,6 +407,7 @@ executable smp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -464,6 +472,7 @@ executable xftp , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -478,6 +487,7 @@ executable xftp , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -539,6 +549,7 @@ executable xftp-server , direct-sqlcipher ==2.3.* , directory ==1.3.* , filepath ==1.4.* + , hashable ==1.4.* , hourglass ==0.2.* , http-types ==0.12.* , http2 >=4.2.2 && <4.3 @@ -553,6 +564,7 @@ executable xftp-server , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , simple-logger ==0.1.* , simplexmq @@ -650,6 +662,7 @@ test-suite simplexmq-test , directory ==1.3.* , filepath ==1.4.* , generic-random ==1.5.* + , hashable ==1.4.* , hourglass ==0.2.* , hspec ==2.11.* , hspec-core ==2.11.* @@ -667,6 +680,7 @@ test-suite simplexmq-test , network-udp ==0.0.* , optparse-applicative >=0.15 && <0.17 , process ==1.6.* + , psqueues ==0.2.8.* , random >=1.1 && <1.3 , silently ==1.2.* , simple-logger ==0.1.* diff --git a/src/Simplex/Messaging/Server.hs b/src/Simplex/Messaging/Server.hs index 88e3bf4a3..9b7433471 100644 --- a/src/Simplex/Messaging/Server.hs +++ b/src/Simplex/Messaging/Server.hs @@ -52,14 +52,19 @@ import Data.ByteString.Char8 (ByteString) import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Either (fromRight, partitionEithers) +import Data.Foldable (toList) import Data.Functor (($>)) import Data.Int (Int64) +import Data.IntMap.Strict (IntMap) import qualified Data.IntMap.Strict as IM +import qualified Data.IntSet as IS import Data.List (intercalate, mapAccumR) import Data.List.NonEmpty (NonEmpty (..)) import qualified Data.List.NonEmpty as L import qualified Data.Map.Strict as M -import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing) +import Data.Set (Set) +import qualified Data.Set as S +import Data.Maybe (catMaybes, fromMaybe, isJust, isNothing, listToMaybe) import qualified Data.Text as T import Data.Text.Encoding (decodeLatin1) import Data.Time.Clock (UTCTime (..), diffTimeToPicoseconds, getCurrentTime) @@ -85,6 +90,7 @@ import Simplex.Messaging.Server.QueueStore import Simplex.Messaging.Server.QueueStore.QueueInfo import Simplex.Messaging.Server.QueueStore.STM as QS import Simplex.Messaging.Server.Stats +import qualified Simplex.Messaging.Server.Stats.Client as CS import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM @@ -94,11 +100,12 @@ import Simplex.Messaging.Transport.Server import Simplex.Messaging.Util import Simplex.Messaging.Version import System.Exit (exitFailure) +import System.FilePath (takeDirectory) import System.IO (hPrint, hPutStrLn, hSetNewlineMode, universalNewlineMode) import System.Mem.Weak (deRefWeak) import UnliftIO (timeout) import UnliftIO.Concurrent -import UnliftIO.Directory (doesFileExist, renameFile) +import UnliftIO.Directory (createDirectoryIfMissing, doesFileExist, renameFile) import UnliftIO.Exception import UnliftIO.IO import UnliftIO.STM @@ -135,7 +142,7 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do ( serverThread s "server subscribedQ" subscribedQ subscribers subscriptions cancelSub : serverThread s "server ntfSubscribedQ" ntfSubscribedQ Env.notifiers ntfSubscriptions (\_ -> pure ()) : receiveFromProxyAgent pa - : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> controlPortThread_ cfg + : map runServer transports <> expireMessagesThread_ cfg <> serverStatsThread_ cfg <> rateStatsThread_ cfg <> controlPortThread_ cfg ) `finally` withLock' (savingLock s) "final" (saveServer False >> closeServer) where @@ -223,6 +230,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do [logServerStats logStatsStartTime interval serverStatsLogFile] serverStatsThread_ _ = [] + rateStatsThread_ :: ServerConfig -> [M ()] + rateStatsThread_ ServerConfig {rateStatsLength = nBuckets, rateStatsInterval = Just bucketWidth, logStatsInterval = Just logInterval, logStatsStartTime, rateStatsLogFile} = + [ monitorServerRates nBuckets bucketWidth, -- roll windows, collect counters, runs at a faster rate so the measurements can be used for online anomaly detection + logServerRates logStatsStartTime logInterval rateStatsLogFile -- log current distributions once in a while + ] + rateStatsThread_ _ = [] + logServerStats :: Int64 -> Int64 -> FilePath -> M () logServerStats startAt logInterval statsFilePath = do labelMyThread "logServerStats" @@ -304,6 +318,66 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do showProxyStats ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} = [show _pRequests, show _pSuccesses, show _pErrorsConnect, show _pErrorsCompat, show _pErrorsOther] + monitorServerRates :: Int -> Int64 -> M () + monitorServerRates nBuckets bucketWidth = do + labelMyThread "monitorServerRates" + stats' <- asks clientStats + rates' <- asks serverRates + liftIO . forever $ do + -- now <- getCurrentTime + -- TODO: calculate delay for the next bucket closing time + threadDelay' $ bucketWidth * 1000000 + stats <- readTVarIO stats' >>= mapM (CS.readClientStatsData readTVarIO) + let !rates = distribution . histogram <$> collect stats + atomically . modifyTVar' rates' $ \old -> + let timeline = take nBuckets old + in length timeline `seq` rates : timeline + where + collect :: IntMap CS.ClientStatsData -> CS.ClientStatsC (IntMap Int) + collect = IM.foldlWithKey' toColumns (CS.clientStatsC IM.empty) + where + toColumns acc statsId csd = + CS.ClientStatsC + { peerAddressesC = IS.size _peerAddresses +> CS.peerAddressesC acc, + socketCountC = _socketCount +> CS.socketCountC acc, + -- created/updated skpped + qCreatedC = S.size _qCreated +> CS.qCreatedC acc, + qSentSignedC = S.size _qSentSigned +> CS.qSentSignedC acc, + msgSentSignedC = _msgSentSigned +> CS.msgSentSignedC acc, + msgSentUnsignedC = _msgSentUnsigned +> CS.msgSentUnsignedC acc, + msgDeliveredSignedC = _msgDeliveredSigned +> CS.msgDeliveredSignedC acc, + proxyRelaysRequestedC = _proxyRelaysRequested +> CS.proxyRelaysRequestedC acc, + proxyRelaysConnectedC = _proxyRelaysConnected +> CS.proxyRelaysConnectedC acc, + msgSentViaProxyC = _msgSentViaProxy +> CS.msgSentViaProxyC acc + } + where + (+>) = IM.insertWith (+) statsId + CS.ClientStatsData {_peerAddresses, _socketCount, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd + + logServerRates :: Int64 -> Int64 -> FilePath -> M () + logServerRates startAt logInterval statsFilePath = do + labelMyThread "logServerStats" + liftIO . unlessM (doesFileExist statsFilePath) $ do + createDirectoryIfMissing True (takeDirectory statsFilePath) + B.writeFile statsFilePath $ B.intercalate "," csvLabels <> "\n" + initialDelay <- (startAt -) . fromIntegral . (`div` 1000000_000000) . diffTimeToPicoseconds . utctDayTime <$> liftIO getCurrentTime + liftIO $ putStrLn $ "server rates log enabled: " <> statsFilePath + liftIO $ threadDelay' $ 1000000 * (initialDelay + if initialDelay < 0 then 86400 else 0) + let interval = 1000000 * logInterval + rates' <- asks serverRates + liftIO . forever $ do + -- write the thing + rates <- readTVarIO rates' + forM_ (listToMaybe rates) $ \cs -> do + ts <- getCurrentTime + let values = concatMap (map bshow . toList) cs + withFile statsFilePath AppendMode $ \h -> liftIO $ do + hSetBuffering h LineBuffering + B.hPut h $ B.intercalate "," (strEncode ts : values) <> "\n" + threadDelay' interval + where + csvLabels = "ts" : concatMap (\s -> concatMap (\d -> [s <> "." <> d]) distributionLabels) CS.clientStatsLabels + runClient :: Transport c => C.APrivateSignKey -> TProxy c -> c -> M () runClient signKey tp h = do kh <- asks serverIdentity @@ -396,6 +470,13 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do putProxyStat "pMsgFwds" pMsgFwds putProxyStat "pMsgFwdsOwn" pMsgFwdsOwn putStat "pMsgFwdsRecv" pMsgFwdsRecv + CPStatsClients -> withAdminRole $ do + stats' <- unliftIO u (asks clientStats) >>= readTVarIO + B.hPutStr h "peerAddresses,socketCount,createdAt,updatedAt,qCreated,qSentSigned,msgSentSigned,msgSentUnsigned,msgDeliveredSigned,proxyRelaysRequested,proxyRelaysConnected,msgSentViaProxy\n" + forM_ stats' $ \cs -> do + csd <- CS.readClientStatsData readTVarIO cs + let CS.ClientStatsData {_peerAddresses, _socketCount, _createdAt, _updatedAt, _qCreated, _qSentSigned, _msgSentSigned, _msgSentUnsigned, _msgDeliveredSigned, _proxyRelaysRequested, _proxyRelaysConnected, _msgSentViaProxy} = csd + B.hPutStrLn h $ B.intercalate "," [bshow $ IS.size _peerAddresses, bshow _socketCount, strEncode _createdAt, strEncode _updatedAt, bshow $ S.size _qCreated, bshow $ S.size _qSentSigned, bshow _msgSentSigned, bshow _msgSentUnsigned, bshow _msgDeliveredSigned, bshow _proxyRelaysRequested, bshow _proxyRelaysConnected, bshow _msgSentViaProxy] CPStatsRTS -> getRTSStats >>= hPrint h CPThreads -> withAdminRole $ do #if MIN_VERSION_base(4,18,0) @@ -466,13 +547,17 @@ smpServer started cfg@ServerConfig {transports, transportConfig = tCfg} = do hPutStrLn h "AUTH" runClientTransport :: Transport c => THandleSMP c 'TServer -> M () -runClientTransport h@THandle {params = thParams@THandleParams {thVersion, sessionId}} = do +runClientTransport h@THandle {connection, params = thParams@THandleParams {thVersion, sessionId}} = do q <- asks $ tbqSize . config ts <- liftIO getSystemTime active <- asks clients nextClientId <- asks clientSeq + let peerId = getPeerId connection + skipStats = False -- TODO: check peerId + statsIds' <- asks statsClients c <- atomically $ do - new@Client {clientId} <- newClient nextClientId q thVersion sessionId ts + new@Client {clientId} <- newClient peerId nextClientId q thVersion sessionId ts + unless skipStats $ modifyTVar' statsIds' $ IM.insert clientId clientId -- until merged, its own fresh id is its stats id modifyTVar' active $ IM.insert clientId new pure new s <- asks server @@ -497,6 +582,7 @@ clientDisconnected c@Client {clientId, subscriptions, connected, sessionId, endT atomically $ modifyTVar' srvSubs $ \cs -> M.foldrWithKey (\sub _ -> M.update deleteCurrentClient sub) cs subs asks clients >>= atomically . (`modifyTVar'` IM.delete clientId) + asks statsClients >>= atomically . (`modifyTVar'` IM.delete clientId) tIds <- atomically $ swapTVar endThreads IM.empty liftIO $ mapM_ (mapM_ killThread <=< deRefWeak) tIds where @@ -676,7 +762,7 @@ forkClient Client {endThreads, endThreadSeq} label action = do mkWeakThreadId t >>= atomically . modifyTVar' endThreads . IM.insert tId client :: THandleParams SMPVersion 'TServer -> Client -> Server -> M () -client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, notifiers} = do +client thParams' clnt@Client {clientId, peerId, subscriptions, ntfSubscriptions, rcvQ, sndQ, sessionId, procThreads} Server {subscribedQ, ntfSubscribedQ, notifiers} = do labelMyThread . B.unpack $ "client $" <> encode sessionId <> " commands" forever $ atomically (readTBQueue rcvQ) @@ -708,7 +794,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi Right (own, smp) -> do inc own pRequests case proxyResp smp of - r@PKEY {} -> r <$ inc own pSuccesses + r@PKEY {} -> do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.proxyRelaysConnected cs) (+ 1) + r <$ inc own pSuccesses r -> r <$ inc own pErrorsCompat Left e -> do let own = isOwnServer a srv @@ -736,7 +824,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi if v >= sendingProxySMPVersion then forkProxiedCmd $ do liftIO (runExceptT (forwardSMPMessage smp corrId fwdV pubKey encBlock) `catch` (pure . Left . PCEIOError)) >>= \case - Right r -> PRES r <$ inc own pSuccesses + Right r -> do + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentViaProxy cs) (+ 1) + PRES r <$ inc own pSuccesses Left e -> ERR (smpProxyError e) <$ case e of PCEProtocolError {} -> inc own pSuccesses _ -> inc own pErrorsOther @@ -797,6 +887,10 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi where createQueue :: QueueStore -> RcvPublicAuthKey -> RcvPublicDhKey -> SubscriptionMode -> M (Transmission BrokerMsg) createQueue st recipientKey dhKey subMode = time "NEW" $ do + -- TODO: read client Q rate + -- TODO: read server Q rate for peerId + -- TODO: read global server Q rate + -- TODO: add throttling delay/blackhole request if needed (rcvPublicDhKey, privDhKey) <- atomically . C.generateKeyPair =<< asks random let rcvDhSecret = C.dh' dhKey privDhKey qik (rcvId, sndId) = QIK {rcvId, sndId, rcvPublicDhKey} @@ -827,6 +921,9 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi stats <- asks serverStats atomically $ modifyTVar' (qCreated stats) (+ 1) atomically $ modifyTVar' (qCount stats) (+ 1) + withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.qCreated cs) $ S.insert rId + -- TODO: increment current Q counter in IP timeline + -- TODO: increment current Q counter in server timeline case subMode of SMOnlyCreate -> pure () SMSubscribe -> void $ subscribeQueue qr rId @@ -983,6 +1080,13 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi when (notification msgFlags) $ do atomically $ modifyTVar' (msgRecvNtf stats) (+ 1) atomically $ updatePeriodStats (activeQueuesNtf stats) queueId + senders' <- asks sendSignedClients + stats' <- asks clientStats + atomically $ do + sender_ <- mapM readTVar =<< TM.lookup (recipientId qr) senders' + forM_ sender_ $ \statsId -> do + cs_ <- IM.lookup statsId <$> readTVar stats' + forM_ cs_ $ \cs -> modifyTVar' (CS.msgDeliveredSigned cs) (+ 1) sendMessage :: QueueRec -> MsgFlags -> MsgBody -> M (Transmission BrokerMsg) sendMessage qr msgFlags msgBody @@ -1000,6 +1104,10 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi case C.maxLenBS msgBody of Left _ -> pure $ err LARGE_MSG Right body -> do + -- TODO: read client S rate + -- TODO: read server S rate for peerId + -- TODO: read global server S rate + -- TODO: add throttling delay/blackhole request if needed msg_ <- time "SEND" $ do q <- getStoreMsgQueue "SEND" $ recipientId qr expireMessages q @@ -1016,6 +1124,14 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi atomically $ modifyTVar' (msgSent stats) (+ 1) atomically $ modifyTVar' (msgCount stats) (+ 1) atomically $ updatePeriodStats (activeQueues stats) (recipientId qr) + case senderKey qr of + Nothing -> withClientStatsId $ \cs -> atomically $ modifyTVar' (CS.msgSentUnsigned cs) (+ 1) + Just _secured -> do + withMergedClientStatsId qr $ \cs -> do + atomically $ modifyTVar' (CS.qSentSigned cs) $ S.insert (recipientId qr) + atomically $ modifyTVar' (CS.msgSentSigned cs) (+ 1) + -- TODO: increment current S counter in IP timeline + -- TODO: increment current S counter in server timeline pure ok where THandleParams {thVersion} = thParams' @@ -1184,7 +1300,7 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi ProhibitSub -> QProhibitSub qDelivered <- decodeLatin1 . encode <$$> tryReadTMVar delivered pure QSub {qSubThread, qDelivered} - + ok :: Transmission BrokerMsg ok = (corrId, queueId, OK) @@ -1194,6 +1310,62 @@ client thParams' clnt@Client {subscriptions, ntfSubscriptions, rcvQ, sndQ, sessi okResp :: Either ErrorType () -> Transmission BrokerMsg okResp = either err $ const ok + -- missing clientId entry means the client is exempt from stats + withClientStatsId_ statsIds' getCS = IM.lookup clientId <$> readTVar statsIds' >>= mapM getCS + + withClientStatsId updateCS = do + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getClientStats stats' now) >>= mapM_ updateCS + + getClientStats stats' now statsId = do + stats <- readTVar stats' + case IM.lookup statsId stats of + Nothing -> do + new <- CS.newClientStats newTVar peerId now + writeTVar stats' $ IM.insert statsId new stats + pure new + Just cs -> cs <$ writeTVar (CS.updatedAt cs) now + + withMergedClientStatsId qr updateCS = do + senders' <- asks sendSignedClients + statsIds' <- asks statsClients + stats' <- asks clientStats + now <- liftIO getCurrentTime + atomically (withClientStatsId_ statsIds' $ getMergeClientStats senders' statsIds' stats' now qr) >>= mapM_ updateCS + + getMergeClientStats senders' statsIds' stats' now qr currentStatsId = do + senders <- readTVar senders' + statsId <- case M.lookup (recipientId qr) senders of + Nothing -> do + newOwner <- newTVar currentStatsId + writeTVar senders' $ M.insert (recipientId qr) newOwner senders + pure currentStatsId + Just sender -> do + prevStatsId <- readTVar sender + unless (prevStatsId == currentStatsId) $ do + modifyTVar' statsIds' $ IM.insert clientId prevStatsId + qsToTransfer <- mergeClientStats stats' prevStatsId currentStatsId + unless (S.null qsToTransfer) $ writeTVar senders' $ S.foldl' (\os k -> M.insert k sender os) senders qsToTransfer + pure prevStatsId + getClientStats stats' now statsId + + mergeClientStats :: TVar (IntMap CS.ClientStats) -> CS.ClientStatsId -> CS.ClientStatsId -> STM (Set RecipientId) + mergeClientStats stats' prevId curId = do + stats <- readTVar stats' + case (IM.lookup prevId stats, IM.lookup curId stats) of + (_, Nothing) -> pure mempty + (Nothing, Just cur@CS.ClientStats {qCreated}) -> do + writeTVar stats' $ IM.insert prevId cur (IM.delete curId stats) + readTVar qCreated + (Just prev, Just cur) -> do + curData@CS.ClientStatsData {_qCreated} <- CS.readClientStatsData readTVar cur + prevData <- CS.readClientStatsData readTVar prev + CS.writeClientStatsData prev $ CS.mergeClientStatsData prevData curData + writeTVar stats' $ IM.delete curId stats + pure _qCreated + updateDeletedStats :: QueueRec -> M () updateDeletedStats q = do stats <- asks serverStats diff --git a/src/Simplex/Messaging/Server/Control.hs b/src/Simplex/Messaging/Server/Control.hs index 9463fa777..bf703832e 100644 --- a/src/Simplex/Messaging/Server/Control.hs +++ b/src/Simplex/Messaging/Server/Control.hs @@ -16,6 +16,7 @@ data ControlProtocol | CPResume | CPClients | CPStats + | CPStatsClients | CPStatsRTS | CPThreads | CPSockets @@ -33,6 +34,7 @@ instance StrEncoding ControlProtocol where CPResume -> "resume" CPClients -> "clients" CPStats -> "stats" + CPStatsClients -> "stats-clients" CPStatsRTS -> "stats-rts" CPThreads -> "threads" CPSockets -> "sockets" @@ -49,6 +51,7 @@ instance StrEncoding ControlProtocol where "resume" -> pure CPResume "clients" -> pure CPClients "stats" -> pure CPStats + "stats-clients" -> pure CPStatsClients "stats-rts" -> pure CPStatsRTS "threads" -> pure CPThreads "sockets" -> pure CPSockets diff --git a/src/Simplex/Messaging/Server/Env/STM.hs b/src/Simplex/Messaging/Server/Env/STM.hs index 4217ea9b9..6c7179161 100644 --- a/src/Simplex/Messaging/Server/Env/STM.hs +++ b/src/Simplex/Messaging/Server/Env/STM.hs @@ -18,6 +18,7 @@ import Data.Map.Strict (Map) import qualified Data.Map.Strict as M import Data.Maybe (isJust, isNothing) import Data.Time.Clock (getCurrentTime) +import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Time.Clock.System (SystemTime) import Data.X509.Validation (Fingerprint (..)) import Network.Socket (ServiceName) @@ -34,10 +35,12 @@ import Simplex.Messaging.Server.MsgStore.STM import Simplex.Messaging.Server.QueueStore (NtfCreds (..), QueueRec (..)) import Simplex.Messaging.Server.QueueStore.STM import Simplex.Messaging.Server.Stats +import Simplex.Messaging.Server.Stats.Client (ClientStats, ClientStatsC, ClientStatsId) +import Simplex.Messaging.Server.Stats.Timeline (Timeline, newTimeline, perMinute) import Simplex.Messaging.Server.StoreLog import Simplex.Messaging.TMap (TMap) import qualified Simplex.Messaging.TMap as TM -import Simplex.Messaging.Transport (ATransport, VersionRangeSMP, VersionSMP) +import Simplex.Messaging.Transport (ATransport, PeerId, VersionRangeSMP, VersionSMP) import Simplex.Messaging.Transport.Server (SocketState, TransportServerConfig, alpn, loadFingerprint, loadTLSServerParams, newSocketState) import System.IO (IOMode (..)) import System.Mem.Weak (Weak) @@ -74,6 +77,12 @@ data ServerConfig = ServerConfig serverStatsLogFile :: FilePath, -- | file to save and restore stats serverStatsBackupFile :: Maybe FilePath, + -- | rate limit monitoring interval / bucket width, seconds + rateStatsInterval :: Maybe Int64, + -- | number of rate limit samples to keep + rateStatsLength :: Int, + rateStatsLogFile :: FilePath, + rateStatsBackupFile :: Maybe FilePath, -- | CA certificate private key is not needed for initialization caCertificateFile :: FilePath, privateKeyFile :: FilePath, @@ -123,6 +132,12 @@ data Env = Env storeLog :: Maybe (StoreLog 'WriteMode), tlsServerParams :: T.ServerParams, serverStats :: ServerStats, + qCreatedByIp :: Timeline Int, + msgSentByIp :: Timeline Int, + clientStats :: TVar (IntMap ClientStats), -- transitive session stats + statsClients :: TVar (IntMap ClientStatsId), -- reverse index from sockets + sendSignedClients :: TMap RecipientId (TVar ClientStatsId), -- reverse index from queues to their senders + serverRates :: TVar [ClientStatsC (Distribution Int)], -- current (head) + historical distributions extracted from clientStats for logging and assessing ClientStatsData deviations sockets :: SocketState, clientSeq :: TVar ClientId, clients :: TVar (IntMap Client), @@ -145,6 +160,7 @@ type ClientId = Int data Client = Client { clientId :: ClientId, + peerId :: PeerId, -- send updates for this Id to time series subscriptions :: TMap RecipientId (TVar Sub), ntfSubscriptions :: TMap NotifierId (), rcvQ :: TBQueue (NonEmpty (Maybe QueueRec, Transmission Cmd)), @@ -177,8 +193,8 @@ newServer = do savingLock <- createLock return Server {subscribedQ, subscribers, ntfSubscribedQ, notifiers, savingLock} -newClient :: TVar ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client -newClient nextClientId qSize thVersion sessionId createdAt = do +newClient :: PeerId -> TVar ClientId -> Natural -> VersionSMP -> ByteString -> SystemTime -> STM Client +newClient peerId nextClientId qSize thVersion sessionId createdAt = do clientId <- stateTVar nextClientId $ \next -> (next, next + 1) subscriptions <- TM.empty ntfSubscriptions <- TM.empty @@ -191,7 +207,7 @@ newClient nextClientId qSize thVersion sessionId createdAt = do connected <- newTVar True rcvActiveAt <- newTVar createdAt sndActiveAt <- newTVar createdAt - return Client {clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} + return Client {peerId, clientId, subscriptions, ntfSubscriptions, rcvQ, sndQ, msgQ, procThreads, endThreads, endThreadSeq, thVersion, sessionId, connected, createdAt, rcvActiveAt, sndActiveAt} newSubscription :: SubscriptionThread -> STM Sub newSubscription subThread = do @@ -213,7 +229,14 @@ newEnv config@ServerConfig {caCertificateFile, certificateFile, privateKeyFile, clientSeq <- newTVarIO 0 clients <- newTVarIO mempty proxyAgent <- atomically $ newSMPProxyAgent smpAgentCfg random - pure Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent} + now <- getPOSIXTime + qCreatedByIp <- atomically $ newTimeline perMinute now + msgSentByIp <- atomically $ newTimeline perMinute now + clientStats <- newTVarIO mempty + statsClients <- newTVarIO mempty + sendSignedClients <- newTVarIO mempty + serverRates <- newTVarIO mempty + return Env {config, serverInfo, server, serverIdentity, queueStore, msgStore, random, storeLog, tlsServerParams, serverStats, sockets, clientSeq, clients, proxyAgent, qCreatedByIp, msgSentByIp, clientStats, statsClients, sendSignedClients, serverRates} where restoreQueues :: QueueStore -> FilePath -> IO (StoreLog 'WriteMode) restoreQueues QueueStore {queues, senders, notifiers} f = do diff --git a/src/Simplex/Messaging/Server/Main.hs b/src/Simplex/Messaging/Server/Main.hs index 7af57ba25..853bbd4e7 100644 --- a/src/Simplex/Messaging/Server/Main.hs +++ b/src/Simplex/Messaging/Server/Main.hs @@ -290,6 +290,10 @@ smpServerCLI_ generateSite serveStaticFiles cfgPath logPath = logStatsStartTime = 0, -- seconds from 00:00 UTC serverStatsLogFile = combine logPath "smp-server-stats.daily.log", serverStatsBackupFile = logStats $> combine logPath "smp-server-stats.log", + rateStatsInterval = Just 60, -- TODO: add to options + rateStatsLength = 0, -- Just (24 * 60), -- TODO: add to options + rateStatsLogFile = combine logPath "smp-server-rates.daily.log", + rateStatsBackupFile = Just $ combine logPath "smp-server-rates.log", smpServerVRange = supportedServerSMPRelayVRange, transportConfig = defaultTransportServerConfig diff --git a/src/Simplex/Messaging/Server/Stats.hs b/src/Simplex/Messaging/Server/Stats.hs index 880791c3d..08c2be841 100644 --- a/src/Simplex/Messaging/Server/Stats.hs +++ b/src/Simplex/Messaging/Server/Stats.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} {-# LANGUAGE LambdaCase #-} {-# LANGUAGE NamedFieldPuns #-} {-# LANGUAGE OverloadedStrings #-} @@ -10,8 +12,14 @@ module Simplex.Messaging.Server.Stats where import Control.Applicative (optional, (<|>)) import qualified Data.Attoparsec.ByteString.Char8 as A import qualified Data.ByteString.Char8 as B +import Data.Foldable (toList) +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.List (find) +import Data.Maybe (listToMaybe) import Data.Set (Set) import qualified Data.Set as S +import Data.String (IsString) import Data.Time.Calendar.Month (pattern MonthDay) import Data.Time.Calendar.OrdinalDate (mondayStartWeek) import Data.Time.Clock (UTCTime (..)) @@ -121,7 +129,7 @@ getServerStatsData s = do _qDeletedSecured <- readTVar $ qDeletedSecured s _qSub <- readTVar $ qSub s _qSubAuth <- readTVar $ qSubAuth s - _qSubDuplicate <- readTVar $ qSubDuplicate s + _qSubDuplicate <- readTVar $ qSubDuplicate s _qSubProhibited <- readTVar $ qSubProhibited s _msgSent <- readTVar $ msgSent s _msgSentAuth <- readTVar $ msgSentAuth s @@ -151,7 +159,7 @@ setServerStats s d = do writeTVar (qDeletedNew s) $! _qDeletedNew d writeTVar (qDeletedSecured s) $! _qDeletedSecured d writeTVar (qSub s) $! _qSub d - writeTVar (qSubAuth s) $! _qSubAuth d + writeTVar (qSubAuth s) $! _qSubAuth d writeTVar (qSubDuplicate s) $! _qSubDuplicate d writeTVar (qSubProhibited s) $! _qSubProhibited d writeTVar (msgSent s) $! _msgSent d @@ -400,3 +408,61 @@ instance StrEncoding ProxyStatsData where _pErrorsCompat <- "errorsCompat=" *> strP <* A.endOfLine _pErrorsOther <- "errorsOther=" *> strP pure ProxyStatsData {_pRequests, _pSuccesses, _pErrorsConnect, _pErrorsCompat, _pErrorsOther} + +-- counter -> occurences +newtype Histogram = Histogram (IntMap Int) + deriving (Show) + +histogram :: Foldable t => t Int -> Histogram +histogram = Histogram . IM.fromListWith (+) . map (,1) . toList +{-# INLINE histogram #-} + +distribution :: Histogram -> Distribution Int +distribution h = + Distribution + { minimal = maybe 0 fst $ listToMaybe cdf', + bottom50p = bot 0.5, -- std median + top50p = top 0.5, + top20p = top 0.2, + top10p = top 0.1, + top5p = top 0.05, + top1p = top 0.01, + maximal = maybe 0 fst $ listToMaybe rcdf' + } + where + bot p = maybe 0 fst $ find (\(_, p') -> p' >= p) cdf' + top p = maybe 0 fst $ find (\(_, p') -> p' <= 1 - p) rcdf' + cdf' = cdf h + rcdf' = reverse cdf' -- allow find to work from the smaller end + +cdf :: Histogram -> [(Int, Float)] +cdf (Histogram h) = map (\(v, cc) -> (v, fromIntegral cc / total)) . scanl1 cumulative $ IM.assocs h + where + total :: Float + total = fromIntegral $ sum h + cumulative (_, acc) (v, c) = (v, acc + c) + +data Distribution a = Distribution + { minimal :: a, + bottom50p :: a, + top50p :: a, + top20p :: a, + top10p :: a, + top5p :: a, + top1p :: a, + maximal :: a + } + deriving (Show, Functor, Foldable, Traversable) + +distributionLabels :: IsString a => Distribution a +distributionLabels = + Distribution + { minimal = "minimal", + bottom50p = "bottom50p", + top50p = "top50p", + top20p = "top20p", + top10p = "top10p", + top5p = "top5p", + top1p = "top1p", + maximal = "maximal" + } diff --git a/src/Simplex/Messaging/Server/Stats/Client.hs b/src/Simplex/Messaging/Server/Stats/Client.hs new file mode 100644 index 000000000..a45984011 --- /dev/null +++ b/src/Simplex/Messaging/Server/Stats/Client.hs @@ -0,0 +1,195 @@ +{-# LANGUAGE DeriveFunctor #-} +{-# LANGUAGE DeriveTraversable #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} + +module Simplex.Messaging.Server.Stats.Client where + +import Data.IntSet (IntSet) +import qualified Data.IntSet as IS +import Data.Set (Set) +import Data.String (IsString) +import Data.Time.Clock (UTCTime (..)) +import Simplex.Messaging.Protocol (RecipientId) +import Simplex.Messaging.Transport (PeerId) +import UnliftIO.STM + +-- | Ephemeral client ID across reconnects +type ClientStatsId = Int + +data ClientStats = ClientStats + { peerAddresses :: TVar IntSet, -- cumulative set of used PeerIds + socketCount :: TVar Int, + createdAt :: TVar UTCTime, + updatedAt :: TVar UTCTime, + qCreated :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs, for dumping into suspicous + qSentSigned :: TVar (Set RecipientId), -- can be IntSet with QueueRecIDs + msgSentSigned :: TVar Int, + msgSentUnsigned :: TVar Int, + msgDeliveredSigned :: TVar Int, + proxyRelaysRequested :: TVar Int, + proxyRelaysConnected :: TVar Int, + msgSentViaProxy :: TVar Int + } + +-- may be combined with session duration to produce average rates (q/s, msg/s) +data ClientStatsData = ClientStatsData + { _peerAddresses :: IntSet, + _socketCount :: Int, + _createdAt :: UTCTime, + _updatedAt :: UTCTime, + _qCreated :: Set RecipientId, + _qSentSigned :: Set RecipientId, + _msgSentSigned :: Int, + _msgSentUnsigned :: Int, + _msgDeliveredSigned :: Int, + _proxyRelaysRequested :: Int, + _proxyRelaysConnected :: Int, + _msgSentViaProxy :: Int + } + +newClientStats :: Monad m => (forall a. a -> m (TVar a)) -> PeerId -> UTCTime -> m ClientStats +newClientStats newF peerId ts = do + peerAddresses <- newF $ IS.singleton peerId + socketCount <- newF 1 + createdAt <- newF ts + updatedAt <- newF ts + qCreated <- newF mempty + qSentSigned <- newF mempty + msgSentSigned <- newF 0 + msgSentUnsigned <- newF 0 + msgDeliveredSigned <- newF 0 + proxyRelaysRequested <- newF 0 + proxyRelaysConnected <- newF 0 + msgSentViaProxy <- newF 0 + pure + ClientStats + { peerAddresses, + socketCount, + createdAt, + updatedAt, + qCreated, + qSentSigned, + msgSentSigned, + msgSentUnsigned, + msgDeliveredSigned, + proxyRelaysRequested, + proxyRelaysConnected, + msgSentViaProxy + } +{-# INLINE newClientStats #-} + +readClientStatsData :: Monad m => (forall a. TVar a -> m a) -> ClientStats -> m ClientStatsData +readClientStatsData readF cs = do + _peerAddresses <- readF $ peerAddresses cs + _socketCount <- readF $ socketCount cs + _createdAt <- readF $ createdAt cs + _updatedAt <- readF $ updatedAt cs + _qCreated <- readF $ qCreated cs + _qSentSigned <- readF $ qSentSigned cs + _msgSentSigned <- readF $ msgSentSigned cs + _msgSentUnsigned <- readF $ msgSentUnsigned cs + _msgDeliveredSigned <- readF $ msgDeliveredSigned cs + _proxyRelaysRequested <- readF $ proxyRelaysRequested cs + _proxyRelaysConnected <- readF $ proxyRelaysConnected cs + _msgSentViaProxy <- readF $ msgSentViaProxy cs + pure + ClientStatsData + { _peerAddresses, + _socketCount, + _createdAt, + _updatedAt, + _qCreated, + _qSentSigned, + _msgSentSigned, + _msgSentUnsigned, + _msgDeliveredSigned, + _proxyRelaysRequested, + _proxyRelaysConnected, + _msgSentViaProxy + } +{-# INLINE readClientStatsData #-} + +writeClientStatsData :: ClientStats -> ClientStatsData -> STM () +writeClientStatsData cs csd = do + writeTVar (peerAddresses cs) (_peerAddresses csd) + writeTVar (socketCount cs) (_socketCount csd) + writeTVar (createdAt cs) (_createdAt csd) + writeTVar (updatedAt cs) (_updatedAt csd) + writeTVar (qCreated cs) (_qCreated csd) + writeTVar (qSentSigned cs) (_qSentSigned csd) + writeTVar (msgSentSigned cs) (_msgSentSigned csd) + writeTVar (msgSentUnsigned cs) (_msgSentUnsigned csd) + writeTVar (msgDeliveredSigned cs) (_msgDeliveredSigned csd) + writeTVar (proxyRelaysRequested cs) (_proxyRelaysRequested csd) + writeTVar (proxyRelaysConnected cs) (_proxyRelaysConnected csd) + writeTVar (msgSentViaProxy cs) (_msgSentViaProxy csd) + +mergeClientStatsData :: ClientStatsData -> ClientStatsData -> ClientStatsData +mergeClientStatsData a b = + ClientStatsData + { _peerAddresses = _peerAddresses a <> _peerAddresses b, + _socketCount = _socketCount a + _socketCount b, + _createdAt = min (_createdAt a) (_createdAt b), + _updatedAt = max (_updatedAt a) (_updatedAt b), + _qCreated = _qCreated a <> _qCreated b, + _qSentSigned = _qSentSigned a <> _qSentSigned b, + _msgSentSigned = _msgSentSigned a + _msgSentSigned b, + _msgSentUnsigned = _msgSentUnsigned a + _msgSentUnsigned b, + _msgDeliveredSigned = _msgDeliveredSigned a + _msgDeliveredSigned b, + _proxyRelaysRequested = _proxyRelaysRequested a + _proxyRelaysRequested b, + _proxyRelaysConnected = _proxyRelaysConnected a + _proxyRelaysConnected b, + _msgSentViaProxy = _msgSentViaProxy a + _msgSentViaProxy b + } + +-- | A column-based collection of ClientStats-related data. +data ClientStatsC a = ClientStatsC + { peerAddressesC :: a, + socketCountC :: a, + qCreatedC :: a, + qSentSignedC :: a, + msgSentSignedC :: a, + msgSentUnsignedC :: a, + msgDeliveredSignedC :: a, + proxyRelaysRequestedC :: a, + proxyRelaysConnectedC :: a, + msgSentViaProxyC :: a + } + deriving (Show, Functor, Foldable, Traversable) + +clientStatsC :: a -> ClientStatsC a +clientStatsC x = + ClientStatsC + { peerAddressesC = x, + socketCountC = x, + qCreatedC = x, + qSentSignedC = x, + msgSentSignedC = x, + msgSentUnsignedC = x, + msgDeliveredSignedC = x, + proxyRelaysRequestedC = x, + proxyRelaysConnectedC = x, + msgSentViaProxyC = x + } +{-# INLINE clientStatsC #-} + +clientStatsLabels :: IsString a => ClientStatsC a +clientStatsLabels = + ClientStatsC + { peerAddressesC = "peerAddresses", + socketCountC = "socketCount", + qCreatedC = "qCreated", + qSentSignedC = "qSentSigned", + msgSentSignedC = "msgSentSigned", + msgSentUnsignedC = "msgSentUnsigned", + msgDeliveredSignedC = "msgDeliveredSigned", + proxyRelaysRequestedC = "proxyRelaysRequested", + proxyRelaysConnectedC = "proxyRelaysConnected", + msgSentViaProxyC = "msgSentViaProxy" + } +{-# INLINE clientStatsLabels #-} diff --git a/src/Simplex/Messaging/Server/Stats/Timeline.hs b/src/Simplex/Messaging/Server/Stats/Timeline.hs new file mode 100644 index 000000000..504f82f08 --- /dev/null +++ b/src/Simplex/Messaging/Server/Stats/Timeline.hs @@ -0,0 +1,60 @@ +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE PatternSynonyms #-} +{-# LANGUAGE ScopedTypeVariables #-} +{-# LANGUAGE TupleSections #-} + +module Simplex.Messaging.Server.Stats.Timeline where + +import Data.IntMap (IntMap) +import qualified Data.IntMap.Strict as IM +import Data.IntPSQ (IntPSQ) +import qualified Data.IntPSQ as IP +import Data.List (find, sortOn) +import Data.Maybe (listToMaybe) +import Data.Time.Clock (NominalDiffTime) +import Data.Time.Clock.POSIX (POSIXTime) +import Data.Word (Word32) +import UnliftIO.STM + +-- A time series of counters with an active head +type Timeline a = (TVar SparseSeries, Current a) + +newTimeline :: forall a. QuantFun -> POSIXTime -> STM (Timeline a) +newTimeline quantF now = (,current) <$> newTVar IP.empty + where + current :: Current a + current = (quantF, quantF now, mempty) + +-- Sparse timeseries with 1 second resolution (or more coarse): +-- priotity - time/bucket +-- key -- PeerId +-- value -- final counter value of the bucket that was current +-- May be combined with bucket width to produce rolling rates. +type SparseSeries = IntPSQ BucketId Int + +-- POSIXTime, or quantized +type BucketId = Word32 + +type QuantFun = POSIXTime -> BucketId + +-- Current bucket that gets filled +type Current a = (QuantFun, BucketId, IntMap (TVar a)) + +perSecond :: POSIXTime -> BucketId +perSecond = truncate + +perMinute :: POSIXTime -> BucketId +perMinute = (60 `secondsWidth`) + +secondsWidth :: NominalDiffTime -> POSIXTime -> BucketId +secondsWidth w t = truncate $ t / w + +finishCurrent :: POSIXTime -> Timeline a -> STM (Timeline a) +finishCurrent now (series, current) = error "TODO: read/reset current, push into series, evict minimal when it falls out of scope" + +type WindowData = IntMap Int -- PeerId -> counter + +window :: BucketId -> BucketId -> SparseSeries -> WindowData +window = error "TODO: pick elements inside the range and drop bucket ids" diff --git a/src/Simplex/Messaging/Transport.hs b/src/Simplex/Messaging/Transport.hs index 6eddcabf8..d7d6f1993 100644 --- a/src/Simplex/Messaging/Transport.hs +++ b/src/Simplex/Messaging/Transport.hs @@ -56,6 +56,9 @@ module Simplex.Messaging.Transport ATransport (..), TransportPeer (..), getServerVerifyKey, + PeerId, + clientPeerId, + addrPeerId, -- * TLS Transport TLS (..), @@ -96,12 +99,14 @@ import qualified Data.ByteString.Char8 as B import qualified Data.ByteString.Lazy.Char8 as LB import Data.Default (def) import Data.Functor (($>)) +import Data.Hashable (hash) import Data.Version (showVersion) import Data.Word (Word16) import qualified Data.X509 as X import qualified Data.X509.Validation as XV import GHC.IO.Handle.Internals (ioe_EOF) import Network.Socket +import qualified Network.Socket.Address as SA import qualified Network.TLS as T import qualified Network.TLS.Extra as TE import qualified Paths_simplexmq as SMQ @@ -212,13 +217,15 @@ class Transport c where transportConfig :: c -> TransportConfig -- | Upgrade server TLS context to connection (used in the server) - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO c -- | Upgrade client TLS context to connection (used in the client) getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO c getServerCerts :: c -> X.CertificateChain + getPeerId :: c -> PeerId + -- | tls-unique channel binding per RFC5929 tlsUnique :: c -> SessionId @@ -259,6 +266,7 @@ getServerVerifyKey c = data TLS = TLS { tlsContext :: T.Context, tlsPeer :: TransportPeer, + tlsPeerId :: PeerId, tlsUniq :: ByteString, tlsBuffer :: TBuffer, tlsALPN :: Maybe ALPN, @@ -277,13 +285,13 @@ connectTLS host_ TransportConfig {logTLSErrors} params sock = logThrow e = putStrLn ("TLS error" <> host <> ": " <> show e) >> E.throwIO e host = maybe "" (\h -> " (" <> h <> ")") host_ -getTLS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS -getTLS tlsPeer cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS +getTLS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO TLS +getTLS tlsPeer tlsPeerId cfg tlsServerCerts cxt = withTlsUnique tlsPeer cxt newTLS where newTLS tlsUniq = do tlsBuffer <- atomically newTBuffer tlsALPN <- T.getNegotiatedProtocol cxt - pure TLS {tlsContext = cxt, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} + pure TLS {tlsContext = cxt, tlsPeerId, tlsALPN, tlsTransportConfig = cfg, tlsServerCerts, tlsPeer, tlsUniq, tlsBuffer} withTlsUnique :: TransportPeer -> T.Context -> (ByteString -> IO c) -> IO c withTlsUnique peer cxt f = @@ -317,7 +325,8 @@ instance Transport TLS where transportPeer = tlsPeer transportConfig = tlsTransportConfig getServerConnection = getTLS TServer - getClientConnection = getTLS TClient + getClientConnection = getTLS TClient 0 + getPeerId = tlsPeerId getServerCerts = tlsServerCerts getSessionALPN = tlsALPN tlsUnique = tlsUniq @@ -582,6 +591,23 @@ smpTHandle c = THandle {connection = c, params} batch = True } +-- | Stats key, hashed from IPs, circuits etc. We don't want to keep actual identities, just attach counters to them. +type PeerId = Int -- XXX: perhaps more fields needed if we want subnet escalation + +clientPeerId :: Socket -> IO PeerId +clientPeerId = fmap addrPeerId . SA.getPeerName + +addrPeerId :: SockAddr -> PeerId +addrPeerId peer = hash peer6 -- XXX: for extra paranoia can be salted with a seed randomized on server start + where + -- ingore ports and fluff, normalize to ipv6 address space + -- most of the ipv6 is unused as clients get /64 subnets for a few IPs, so 128bit to 64bit hashing is ok for using as intmap keys + peer6 = case peer of + SockAddrInet _port a -> embed4in6 a + SockAddrInet6 _port _flow a _scope -> a + SockAddrUnix _path -> error "use for TOR circuits?" + embed4in6 v4 = (0, 0, 0xFFFF, v4) -- RFC4038: the IPv6 address ::FFFF:x.y.z.w represents the IPv4 address x.y.z.w. + $(J.deriveJSON (sumTypeJSON id) ''HandshakeError) $(J.deriveJSON (sumTypeJSON $ dropPrefix "TE") ''TransportError) diff --git a/src/Simplex/Messaging/Transport/Server.hs b/src/Simplex/Messaging/Transport/Server.hs index ffde39991..bdb8f77ab 100644 --- a/src/Simplex/Messaging/Transport/Server.hs +++ b/src/Simplex/Messaging/Transport/Server.hs @@ -98,8 +98,9 @@ runTransportServerSocketState ss started getSocket threadLabel serverParams cfg tCfg = serverTransportConfig cfg setup conn = timeout (tlsSetupTimeout cfg) $ do labelMyThread $ threadLabel <> "/setup" + peerId <- clientPeerId conn tls <- connectTLS Nothing tCfg serverParams conn - getServerConnection tCfg (fst $ tlsServerCredentials serverParams) tls + getServerConnection peerId tCfg (fst $ tlsServerCredentials serverParams) tls tlsServerCredentials :: T.ServerParams -> (X.CertificateChain, X.PrivKey) tlsServerCredentials serverParams = case T.sharedCredentials $ T.serverShared serverParams of diff --git a/src/Simplex/Messaging/Transport/WebSockets.hs b/src/Simplex/Messaging/Transport/WebSockets.hs index 0883fcc28..817cc3796 100644 --- a/src/Simplex/Messaging/Transport/WebSockets.hs +++ b/src/Simplex/Messaging/Transport/WebSockets.hs @@ -20,6 +20,7 @@ import Simplex.Messaging.Transport TransportConfig (..), TransportError (..), TransportPeer (..), + PeerId, closeTLS, smpBlockSize, withTlsUnique, @@ -28,6 +29,7 @@ import Simplex.Messaging.Transport.Buffer (trimCR) data WS = WS { wsPeer :: TransportPeer, + wsPeerId :: PeerId, tlsUniq :: ByteString, wsALPN :: Maybe ALPN, wsStream :: Stream, @@ -54,11 +56,14 @@ instance Transport WS where transportConfig :: WS -> TransportConfig transportConfig = wsTransportConfig - getServerConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS + getServerConnection :: PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS getServerConnection = getWS TServer getClientConnection :: TransportConfig -> X.CertificateChain -> T.Context -> IO WS - getClientConnection = getWS TClient + getClientConnection = getWS TClient 0 + + getPeerId :: WS -> PeerId + getPeerId = wsPeerId getServerCerts :: WS -> X.CertificateChain getServerCerts = wsServerCerts @@ -89,14 +94,14 @@ instance Transport WS where then E.throwIO TEBadBlock else pure $ B.init s -getWS :: TransportPeer -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS -getWS wsPeer cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS +getWS :: TransportPeer -> PeerId -> TransportConfig -> X.CertificateChain -> T.Context -> IO WS +getWS wsPeer wsPeerId cfg wsServerCerts cxt = withTlsUnique wsPeer cxt connectWS where connectWS tlsUniq = do s <- makeTLSContextStream cxt wsConnection <- connectPeer wsPeer s wsALPN <- T.getNegotiatedProtocol cxt - pure $ WS {wsPeer, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} + pure $ WS {wsPeer, wsPeerId, tlsUniq, wsALPN, wsStream = s, wsConnection, wsTransportConfig = cfg, wsServerCerts} connectPeer :: TransportPeer -> Stream -> IO Connection connectPeer TServer = acceptClientRequest connectPeer TClient = sendClientRequest diff --git a/tests/SMPClient.hs b/tests/SMPClient.hs index 6bc36c29a..adfb5acb9 100644 --- a/tests/SMPClient.hs +++ b/tests/SMPClient.hs @@ -115,6 +115,10 @@ cfg = logStatsStartTime = 0, serverStatsLogFile = "tests/smp-server-stats.daily.log", serverStatsBackupFile = Nothing, + rateStatsInterval = Nothing, + rateStatsLength = 0, + rateStatsLogFile = "", + rateStatsBackupFile = Nothing, caCertificateFile = "tests/fixtures/ca.crt", privateKeyFile = "tests/fixtures/server.key", certificateFile = "tests/fixtures/server.crt",