Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions simplexmq.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ library
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
else
exposed-modules:
Simplex.Messaging.Agent.Store.SQLite
Expand Down Expand Up @@ -223,6 +224,7 @@ library
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
Simplex.Messaging.Agent.Store.SQLite.Util
if flag(client_postgres) || flag(server_postgres)
exposed-modules:
Expand Down
26 changes: 18 additions & 8 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3158,18 +3158,28 @@ processSMPTransmissions c@AgentClient {subQ} (tSess@(userId, srv, _), _v, sessId
pure conn''
| otherwise = pure conn'
Right Nothing -> prohibited "msg: bad agent msg" >> ack
Left e@(AGENT A_DUPLICATE) -> do
Left e@(AGENT A_DUPLICATE {}) -> do
atomically $ incSMPServerStat c userId srv recvDuplicates
withStore' c (\db -> getLastMsg db connId srvMsgId) >>= \case
Just RcvMsg {internalId, msgMeta, msgBody = agentMsgBody, userAck}
| userAck -> ackDel internalId
| otherwise ->
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
_ -> ack
| otherwise -> do
attempts <- withStore' c $ \db -> incMsgRcvAttempts db connId internalId
AgentConfig {rcvExpireCount, rcvExpireInterval} <- asks config
let firstTs = snd $ recipient msgMeta
brokerTs = snd $ broker msgMeta
now <- liftIO getCurrentTime
if attempts >= rcvExpireCount && diffUTCTime now firstTs >= rcvExpireInterval
then do
notify $ ERR (AGENT $ A_DUPLICATE $ Just DroppedMsg {brokerTs, attempts})
ackDel internalId
else
liftEither (parse smpP (AGENT A_MESSAGE) agentMsgBody) >>= \case
AgentMessage _ (A_MSG body) -> do
logServer "<--" c srv rId $ "MSG <MSG>:" <> logSecret' srvMsgId
notify $ MSG msgMeta msgFlags body
pure ACKPending
_ -> ack
_ -> checkDuplicateHash e encryptedMsgHash >> ack
Left (AGENT (A_CRYPTO e)) -> do
atomically $ incSMPServerStat c userId srv recvCryptoErrs
Expand Down
2 changes: 1 addition & 1 deletion src/Simplex/Messaging/Agent/Client.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2105,7 +2105,7 @@ cryptoError :: C.CryptoError -> AgentErrorType
cryptoError = \case
C.CryptoLargeMsgError -> CMD LARGE "CryptoLargeMsgError"
C.CryptoHeaderError _ -> AGENT A_MESSAGE -- parsing error
C.CERatchetDuplicateMessage -> AGENT A_DUPLICATE
C.CERatchetDuplicateMessage -> AGENT $ A_DUPLICATE Nothing
C.AESDecryptError -> c DECRYPT_AES
C.CBDecryptError -> c DECRYPT_CB
C.CERatchetHeader -> c RATCHET_HEADER
Expand Down
4 changes: 4 additions & 0 deletions src/Simplex/Messaging/Agent/Env/SQLite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,8 @@ data AgentConfig = AgentConfig
caCertificateFile :: FilePath,
privateKeyFile :: FilePath,
certificateFile :: FilePath,
rcvExpireCount :: Int,
rcvExpireInterval :: NominalDiffTime,
e2eEncryptVRange :: VersionRangeE2E,
smpAgentVRange :: VersionRangeSMPA,
smpClientVRange :: VersionRangeSMPC
Expand Down Expand Up @@ -247,6 +249,8 @@ defaultAgentConfig =
caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
privateKeyFile = "/etc/opt/simplex-agent/agent.key",
certificateFile = "/etc/opt/simplex-agent/agent.crt",
rcvExpireCount = 8,
rcvExpireInterval = nominalDay,
e2eEncryptVRange = supportedE2EEncryptVRange,
smpAgentVRange = supportedSMPAgentVRange,
smpClientVRange = supportedSMPClientVRange
Expand Down
18 changes: 14 additions & 4 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ module Simplex.Messaging.Agent.Protocol
ConnectionErrorType (..),
BrokerErrorType (..),
SMPAgentError (..),
DroppedMsg (..),
AgentCryptoError (..),
cryptoErrToSyncState,
ATransmission,
Expand Down Expand Up @@ -788,6 +789,12 @@ data MsgMeta = MsgMeta
}
deriving (Eq, Show)

data DroppedMsg = DroppedMsg
{ brokerTs :: UTCTime,
attempts :: Int
}
deriving (Eq, Show)

data SMPConfirmation = SMPConfirmation
{ -- | sender's public key to use for authentication of sender's commands at the recepient's server
senderKey :: Maybe SndPublicAuthKey,
Expand Down Expand Up @@ -2050,12 +2057,13 @@ data SMPAgentError
A_LINK {linkErr :: String}
| -- | cannot decrypt message
A_CRYPTO {cryptoErr :: AgentCryptoError}
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet)
A_DUPLICATE
| -- | duplicate message - this error is detected by ratchet decryption - this message will be ignored and not shown.
-- it may also indicate a loss of ratchet synchronization (when only one message is sent via copied ratchet).
-- when message is dropped after too many reception attempts, DroppedMsg is included.
A_DUPLICATE {droppedMsg_ :: Maybe DroppedMsg}
| -- | error in the message to add/delete/etc queue in connection
A_QUEUE {queueErr :: String}
deriving (Eq, Read, Show, Exception)
deriving (Eq, Show, Exception)

data AgentCryptoError
= -- | AES decryption error
Expand Down Expand Up @@ -2165,6 +2173,8 @@ $(J.deriveJSON (sumTypeJSON id) ''ConnectionErrorType)

$(J.deriveJSON (sumTypeJSON id) ''AgentCryptoError)

$(J.deriveJSON defaultJSON ''DroppedMsg)

$(J.deriveJSON (sumTypeJSON id) ''SMPAgentError)

$(J.deriveJSON (sumTypeJSON id) ''AgentErrorType)
Expand Down
14 changes: 14 additions & 0 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
setMsgUserAck,
getRcvMsg,
getLastMsg,
incMsgRcvAttempts,
checkRcvMsgHashExists,
getRcvMsgBrokerTs,
deleteMsg,
Expand Down Expand Up @@ -1110,6 +1111,19 @@ toRcvMsg ((agentMsgId, internalTs, brokerId, brokerTs) :. (sndMsgId, integrity,
msgReceipt = MsgReceipt <$> rcptInternalId_ <*> rcptStatus_
in RcvMsg {internalId = InternalId agentMsgId, msgMeta, msgType, msgBody, internalHash, msgReceipt, userAck}

incMsgRcvAttempts :: DB.Connection -> ConnId -> InternalId -> IO Int
incMsgRcvAttempts db connId (InternalId msgId) =
fromOnly . head
<$> DB.query
db
[sql|
UPDATE rcv_messages
SET receive_attempts = receive_attempts + 1
WHERE conn_id = ? AND internal_id = ?
RETURNING receive_attempts
|]
(connId, msgId)

checkRcvMsgHashExists :: DB.Connection -> ConnId -> ByteString -> IO Bool
checkRcvMsgHashExists db connId hash =
maybeFirstRow' False fromOnlyBI $
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20250702_conn_invitati
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts
import Simplex.Messaging.Agent.Store.Shared (Migration (..))

schemaMigrations :: [(String, Text, Maybe Text)]
Expand All @@ -21,7 +22,8 @@ schemaMigrations =
("20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
("20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
("20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts)
]

-- | The list of migrations in ascending order by date
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.Postgres.Migrations.M20260410_receive_attempts where

import Data.Text (Text)
import Text.RawString.QQ (r)

m20260410_receive_attempts :: Text
m20260410_receive_attempts =
[r|
ALTER TABLE rcv_messages ADD COLUMN receive_attempts SMALLINT NOT NULL DEFAULT 0;
|]

down_m20260410_receive_attempts :: Text
down_m20260410_receive_attempts =
[r|
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|]
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20250702_conn_invitation
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251009_queue_to_subscribe
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251010_client_notices
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20251230_strict_tables
import Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts
import Simplex.Messaging.Agent.Store.Shared (Migration (..))

schemaMigrations :: [(String, Query, Maybe Query)]
Expand Down Expand Up @@ -93,7 +94,8 @@ schemaMigrations =
("m20250702_conn_invitations_remove_cascade_delete", m20250702_conn_invitations_remove_cascade_delete, Just down_m20250702_conn_invitations_remove_cascade_delete),
("m20251009_queue_to_subscribe", m20251009_queue_to_subscribe, Just down_m20251009_queue_to_subscribe),
("m20251010_client_notices", m20251010_client_notices, Just down_m20251010_client_notices),
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables)
("m20251230_strict_tables", m20251230_strict_tables, Just down_m20251230_strict_tables),
("m20260410_receive_attempts", m20260410_receive_attempts, Just down_m20260410_receive_attempts)
]

-- | The list of migrations in ascending order by date
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{-# LANGUAGE QuasiQuotes #-}

module Simplex.Messaging.Agent.Store.SQLite.Migrations.M20260410_receive_attempts where

import Database.SQLite.Simple (Query)
import Database.SQLite.Simple.QQ (sql)

m20260410_receive_attempts :: Query
m20260410_receive_attempts =
[sql|
ALTER TABLE rcv_messages ADD COLUMN receive_attempts INTEGER NOT NULL DEFAULT 0;
|]

down_m20260410_receive_attempts :: Query
down_m20260410_receive_attempts =
[sql|
ALTER TABLE rcv_messages DROP COLUMN receive_attempts;
|]
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ CREATE TABLE rcv_messages(
integrity BLOB NOT NULL,
user_ack INTEGER NULL DEFAULT 0,
rcv_queue_id INTEGER CHECK(rcv_queue_id NOT NULL),
receive_attempts INTEGER NOT NULL DEFAULT 0,
PRIMARY KEY(conn_id, internal_rcv_id),
FOREIGN KEY(conn_id, internal_id) REFERENCES messages
ON DELETE CASCADE
Expand Down
35 changes: 34 additions & 1 deletion tests/AgentTests/FunctionalAPITests.hs
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ functionalAPITests ps = do
it "should expire multiple messages" $ testExpireManyMessages ps
it "should expire one message if quota is exceeded" $ testExpireMessageQuota ps
it "should expire multiple messages if quota is exceeded" $ testExpireManyMessagesQuota ps
it "should drop message after too many receive attempts" $ testDropMsgAfterRcvAttempts ps
#if !defined(dbPostgres)
-- TODO [postgres] restore from outdated db backup (we use copyFile/renameFile for sqlite)
describe "Ratchet synchronization" $ do
Expand Down Expand Up @@ -2101,6 +2102,38 @@ testExpireManyMessagesQuota (t, msType) = withSmpServerConfigOn t cfg' testPort
where
cfg' = updateCfg (cfgMS msType) $ \cfg_ -> cfg_ {msgQueueQuota = 1, maxJournalMsgCount = 2}

testDropMsgAfterRcvAttempts :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testDropMsgAfterRcvAttempts ps =
withSmpServerStoreLogOn ps testPort $ \_ -> do
let rcvCfg = agentCfg {rcvExpireCount = 2, rcvExpireInterval = 1}
alice <- getSMPAgentClient' 1 agentCfg initAgentServers testDB
bob <- getSMPAgentClient' 2 rcvCfg initAgentServers testDB2
(aliceId, bobId) <- runRight $ makeConnection alice bob
-- alice sends, bob receives but does NOT ack
runRight_ $ do
2 <- sendMessage alice bobId SMP.noMsgFlags "hello"
get alice ##> ("", bobId, SENT 2)
get bob =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
-- bob disconnects without acking
disposeAgentClient bob
threadDelay 500000
-- bob reconnects, agent sees duplicate, counter=1
bob2 <- getSMPAgentClient' 3 rcvCfg initAgentServers testDB2
runRight_ $ do
subscribeConnection bob2 aliceId
get bob2 =##> \case ("", c, Msg "hello") -> c == aliceId; _ -> False
-- bob disconnects again without acking
disposeAgentClient bob2
-- wait for rcvExpireInterval (1 second)
threadDelay 500000
-- bob reconnects, agent sees duplicate, counter=2, interval exceeded -> drops
bob3 <- getSMPAgentClient' 4 rcvCfg initAgentServers testDB2
runRight_ $ do
subscribeConnection bob3 aliceId
get bob3 =##> \case ("", c, ERR (AGENT (A_DUPLICATE (Just DroppedMsg {})))) -> c == aliceId; _ -> False
disposeAgentClient bob3
disposeAgentClient alice

testRatchetSync :: HasCallStack => (ASrvTransport, AStoreType) -> IO ()
testRatchetSync ps = withAgentClients2 $ \alice bob ->
withSmpServerStoreMsgLogOn ps testPort $ \_ -> do
Expand Down Expand Up @@ -3224,7 +3257,7 @@ phase c connId d p statsExpectation =
d `shouldBe` d'
p `shouldBe` p'
statsExpectation stats
ERR (AGENT A_DUPLICATE) -> phase c connId d p statsExpectation
ERR (AGENT A_DUPLICATE {}) -> phase c connId d p statsExpectation
r -> do
liftIO . putStrLn $ "expected: " <> show p <> ", received: " <> show r
SWITCH {} <- pure r
Expand Down
Loading