Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 68 additions & 9 deletions src/Simplex/Messaging/Agent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ module Simplex.Messaging.Agent
connRequestPQSupport,
createConnectionAsync,
setConnShortLinkAsync,
getConnShortLinkAsync,
joinConnectionAsync,
allowConnectionAsync,
acceptContactAsync,
Expand Down Expand Up @@ -351,9 +352,15 @@ setConnShortLinkAsync :: ConnectionModeI c => AgentClient -> ACorrId -> ConnId -
setConnShortLinkAsync c = withAgentEnv c .::. setConnShortLinkAsync' c
{-# INLINE setConnShortLinkAsync #-}

-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id
joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
joinConnectionAsync c userId aCorrId enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId enableNtfs
-- | Get and verify data from short link (LGET/LKEY command) asynchronously, synchronous response is new connection id
getConnShortLinkAsync :: ConnectionModeI c => AgentClient -> UserId -> ACorrId -> ConnShortLink c -> AE ConnId
getConnShortLinkAsync c = withAgentEnv c .:. getConnShortLinkAsync' c
{-# INLINE getConnShortLinkAsync #-}

-- | Join SMP agent connection (JOIN command) asynchronously, synchronous response is new connection id.
-- If connId is provided (for contact URIs), it updates the existing connection record created by getConnShortLinkAsync.
joinConnectionAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AE ConnId
joinConnectionAsync c userId aCorrId connId_ enableNtfs = withAgentEnv c .:: joinConnAsync c userId aCorrId connId_ enableNtfs
{-# INLINE joinConnectionAsync #-}

-- | Allow connection to continue after CONF notification (LET command), no synchronous response
Expand Down Expand Up @@ -784,8 +791,9 @@ newConnNoQueues c userId enableNtfs cMode pqSupport = do

-- TODO [short links] TBC, but probably we will need async join for contact addresses as the contact will be created after user confirming the connection,
-- and join should retry, the same as 1-time invitation joins.
joinConnAsync :: AgentClient -> UserId -> ACorrId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
joinConnAsync c userId corrId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do
joinConnAsync :: AgentClient -> UserId -> ACorrId -> Maybe ConnId -> Bool -> ConnectionRequestUri c -> ConnInfo -> PQSupport -> SubscriptionMode -> AM ConnId
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup subMode = do
when (isJust connId_) $ throwE $ CMD PROHIBITED "joinConnAsync: connId not allowed for invitation URI"
withInvLock c (strEncode cReqUri) "joinConnAsync" $ do
lift (compatibleInvitationUri cReqUri) >>= \case
Just (_, Compatible (CR.E2ERatchetParams v _ _ _), Compatible connAgentVersion) -> do
Expand All @@ -796,8 +804,22 @@ joinConnAsync c userId corrId enableNtfs cReqUri@CRInvitationUri {} cInfo pqSup
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
pure connId
Nothing -> throwE $ AGENT A_VERSION
joinConnAsync _c _userId _corrId _enableNtfs (CRContactUri _) _subMode _cInfo _pqEncryption =
throwE $ CMD PROHIBITED "joinConnAsync"
joinConnAsync c userId corrId connId_ enableNtfs cReqUri@(CRContactUri _) cInfo pqSup subMode = do
lift (compatibleContactUri cReqUri) >>= \case
Just (_, Compatible connAgentVersion) -> do
let pqSupport = pqSup `CR.pqSupportAnd` versionPQSupport_ connAgentVersion Nothing
connId <- case connId_ of
Just cId -> do
-- update connection record created by getConnShortLinkAsync
withStore' c $ \db -> updateNewConnJoin db cId connAgentVersion pqSupport enableNtfs
pure cId
Nothing -> do
g <- asks random
let cData = ConnData {userId, connId = "", connAgentVersion, enableNtfs, lastExternalSndId = 0, deleted = False, ratchetSyncState = RSOk, pqSupport}
withStore c $ \db -> createNewConn db g cData SCMInvitation
enqueueCommand c corrId connId Nothing $ AClientCommand $ JOIN enableNtfs (ACR sConnectionMode cReqUri) pqSupport subMode cInfo
pure connId
Nothing -> throwE $ AGENT A_VERSION

allowConnectionAsync' :: AgentClient -> ACorrId -> ConnId -> ConfirmationId -> ConnInfo -> AM ()
allowConnectionAsync' c corrId connId confId ownConnInfo =
Expand All @@ -816,7 +838,7 @@ acceptContactAsync' :: AgentClient -> UserId -> ACorrId -> Bool -> InvitationId
acceptContactAsync' c userId corrId enableNtfs invId ownConnInfo pqSupport subMode = do
Invitation {connReq} <- withStore c $ \db -> getInvitation db "acceptContactAsync'" invId
withStore' c $ \db -> acceptInvitation db invId ownConnInfo
joinConnAsync c userId corrId enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
joinConnAsync c userId corrId Nothing enableNtfs connReq ownConnInfo pqSupport subMode `catchAllErrors` \err -> do
withStore' c (`unacceptInvitation` invId)
throwE err

Expand Down Expand Up @@ -902,6 +924,33 @@ setConnShortLinkAsync' c corrId connId cMode userLinkData clientData =
_ -> throwE $ CMD PROHIBITED "setConnShortLinkAsync: invalid connection or mode"
enqueueCommand c corrId connId (Just srv) $ AClientCommand $ LSET (AUCLD cMode userLinkData) clientData

getConnShortLinkAsync' :: forall c. ConnectionModeI c => AgentClient -> UserId -> ACorrId -> ConnShortLink c -> AM ConnId
getConnShortLinkAsync' c userId corrId shortLink = do
g <- asks random
connId <- withStore c $ \db -> do
-- server is created so the command is processed in server queue,
-- not blocking other "no server" commands
void $ createServer db srv
prepareNewConn db g
enqueueCommand c corrId connId (Just srv) $ AClientCommand $ LGET (ACSL (sConnectionMode @c) shortLink)
pure connId
where
srv = case shortLink of
CSLInvitation _ s _ _ -> s
CSLContact _ _ s _ -> s
prepareNewConn db g = do
let cData = ConnData
{ userId,
connId = "",
connAgentVersion = currentSMPAgentVersion,
enableNtfs = False,
lastExternalSndId = 0,
deleted = False,
ratchetSyncState = RSOk,
pqSupport = PQSupportOff
}
createNewConn db g cData SCMInvitation

setConnShortLink' :: AgentClient -> NetworkRequestMode -> ConnId -> SConnectionMode c -> UserConnLinkData c -> Maybe CRClientData -> AM (ConnShortLink c)
setConnShortLink' c nm connId cMode userLinkData clientData =
withConnLock c connId "setConnShortLink" $ do
Expand Down Expand Up @@ -1680,11 +1729,22 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
withServer' . tryCommand $ do
link <- setConnShortLink' c NRMBackground connId cMode userLinkData clientData
notify $ LINK (ACSL cMode link) auData
LGET (ACSL cMode shortLink) ->
withServer' . tryCommand $ do
(connReq, linkData) <- getConnShortLink' c NRMBackground userId shortLink
notify $ LDATA (ACR cMode connReq) (ACLD cMode linkData)
JOIN enableNtfs (ACR _ cReq@(CRInvitationUri ConnReqUriData {crSmpQueues = q :| _} _)) pqEnc subMode connInfo -> noServer $ do
triedHosts <- newTVarIO S.empty
tryCommand . withNextSrv c userId storageSrvs triedHosts [qServer q] $ \srv -> do
(sqSecured, service) <- joinConnSrvAsync c userId connId enableNtfs cReq connInfo pqEnc subMode srv
notify $ JOINED sqSecured service
-- TODO TBC using joinConnSrvAsync for contact URIs, with receive queue created asynchronously.
-- Currently joinConnSrv is used because even joinConnSrvAsync for invitation URIs creates receive queue synchronously.
JOIN enableNtfs (ACR _ cReq@(CRContactUri ConnReqUriData {crSmpQueues = q :| _})) pqEnc subMode connInfo -> noServer $ do
triedHosts <- newTVarIO S.empty
tryCommand . withNextSrv c userId storageSrvs triedHosts [qServer q] $ \srv -> do
(sqSecured, service) <- joinConnSrv c NRMBackground userId connId enableNtfs cReq connInfo pqEnc subMode srv
notify $ JOINED sqSecured service
LET confId ownCInfo -> withServer' . tryCommand $ allowConnection' c connId confId ownCInfo >> notify OK
ACK msgId rcptInfo_ -> withServer' . tryCommand $ ackMessage' c connId msgId rcptInfo_ >> notify OK
SWCH ->
Expand All @@ -1694,7 +1754,6 @@ runCommandProcessing c@AgentClient {subQ} connId server_ Worker {doWork} = do
switchDuplexConnection c NRMBackground conn replaced >>= notify . SWITCH QDRcv SPStarted
_ -> throwE $ CMD PROHIBITED "SWCH: not duplex"
DEL -> withServer' . tryCommand $ deleteConnection' c NRMBackground connId >> notify OK
_ -> notify $ ERR $ INTERNAL $ "unsupported async command " <> show (aCommandTag cmd)
AInternalCommand cmd -> case cmd of
ICAckDel rId srvMsgId msgId -> withServer $ \srv -> tryWithLock "ICAckDel" $ ack srv rId srvMsgId >> withStore' c (\db -> deleteMsg db connId msgId)
ICAck rId srvMsgId -> withServer $ \srv -> tryWithLock "ICAck" $ ack srv rId srvMsgId
Expand Down
22 changes: 22 additions & 0 deletions src/Simplex/Messaging/Agent/Protocol.hs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ module Simplex.Messaging.Agent.Protocol
CRClientData,
ServiceScheme,
FixedLinkData (..),
AConnLinkData (..),
ConnLinkData (..),
AUserConnLinkData (..),
UserConnLinkData (..),
Expand Down Expand Up @@ -384,6 +385,7 @@ type SndQueueSecured = Bool
data AEvent (e :: AEntity) where
INV :: AConnectionRequestUri -> Maybe ClientServiceId -> AEvent AEConn
LINK :: AConnShortLink -> AUserConnLinkData -> AEvent AEConn
LDATA :: AConnectionRequestUri -> AConnLinkData -> AEvent AEConn
CONF :: ConfirmationId -> PQSupport -> [SMPServer] -> ConnInfo -> AEvent AEConn -- ConnInfo is from sender, [SMPServer] will be empty only in v1 handshake
REQ :: InvitationId -> PQSupport -> NonEmpty SMPServer -> ConnInfo -> AEvent AEConn -- ConnInfo is from sender
INFO :: PQSupport -> ConnInfo -> AEvent AEConn
Expand Down Expand Up @@ -438,6 +440,7 @@ deriving instance Show AEvtTag
data ACommand
= NEW Bool AConnectionMode InitialKeys SubscriptionMode -- response INV
| LSET AUserConnLinkData (Maybe CRClientData) -- response LINK
| LGET AConnShortLink -- response LDATA
| JOIN Bool AConnectionRequestUri PQSupport SubscriptionMode ConnInfo
| LET ConfirmationId ConnInfo -- ConnInfo is from client
| ACK AgentMsgId (Maybe MsgReceiptInfo)
Expand All @@ -448,6 +451,7 @@ data ACommand
data ACommandTag
= NEW_
| LSET_
| LGET_
| JOIN_
| LET_
| ACK_
Expand All @@ -458,6 +462,7 @@ data ACommandTag
data AEventTag (e :: AEntity) where
INV_ :: AEventTag AEConn
LINK_ :: AEventTag AEConn
LDATA_ :: AEventTag AEConn
CONF_ :: AEventTag AEConn
REQ_ :: AEventTag AEConn
INFO_ :: AEventTag AEConn
Expand Down Expand Up @@ -505,6 +510,7 @@ aCommandTag :: ACommand -> ACommandTag
aCommandTag = \case
NEW {} -> NEW_
LSET {} -> LSET_
LGET _ -> LGET_
JOIN {} -> JOIN_
LET {} -> LET_
ACK {} -> ACK_
Expand All @@ -515,6 +521,7 @@ aEventTag :: AEvent e -> AEventTag e
aEventTag = \case
INV {} -> INV_
LINK {} -> LINK_
LDATA {} -> LDATA_
CONF {} -> CONF_
REQ {} -> REQ_
INFO {} -> INFO_
Expand Down Expand Up @@ -1701,6 +1708,10 @@ data ConnLinkData c where
InvitationLinkData :: VersionRangeSMPA -> UserLinkData -> ConnLinkData 'CMInvitation
ContactLinkData :: VersionRangeSMPA -> UserContactData -> ConnLinkData 'CMContact

deriving instance Eq (ConnLinkData c)

deriving instance Show (ConnLinkData c)

data UserContactData = UserContactData
{ -- direct connection via connReq in fixed data is allowed.
direct :: Bool,
Expand All @@ -1717,6 +1728,13 @@ newtype UserLinkData = UserLinkData ByteString

data AConnLinkData = forall m. ConnectionModeI m => ACLD (SConnectionMode m) (ConnLinkData m)

instance Eq AConnLinkData where
ACLD m d == ACLD m' d' = case testEquality m m' of
Just Refl -> d == d'
Nothing -> False

deriving instance Show AConnLinkData

data UserConnLinkData c where
UserInvLinkData :: UserLinkData -> UserConnLinkData 'CMInvitation
UserContactLinkData :: UserContactData -> UserConnLinkData 'CMContact
Expand Down Expand Up @@ -2029,6 +2047,7 @@ instance StrEncoding ACommandTag where
A.takeTill (== ' ') >>= \case
"NEW" -> pure NEW_
"LSET" -> pure LSET_
"LGET" -> pure LGET_
"JOIN" -> pure JOIN_
"LET" -> pure LET_
"ACK" -> pure ACK_
Expand All @@ -2038,6 +2057,7 @@ instance StrEncoding ACommandTag where
strEncode = \case
NEW_ -> "NEW"
LSET_ -> "LSET"
LGET_ -> "LGET"
JOIN_ -> "JOIN"
LET_ -> "LET"
ACK_ -> "ACK"
Expand All @@ -2050,6 +2070,7 @@ commandP binaryP =
>>= \case
NEW_ -> s (NEW <$> strP_ <*> strP_ <*> pqIKP <*> (strP <|> pure SMP.SMSubscribe))
LSET_ -> s (LSET <$> strP <*> optional (A.space *> strP))
LGET_ -> s (LGET <$> strP)
JOIN_ -> s (JOIN <$> strP_ <*> strP_ <*> pqSupP <*> (strP_ <|> pure SMP.SMSubscribe) <*> binaryP)
LET_ -> s (LET <$> A.takeTill (== ' ') <* A.space <*> binaryP)
ACK_ -> s (ACK <$> A.decimal <*> optional (A.space *> binaryP))
Expand All @@ -2068,6 +2089,7 @@ serializeCommand :: ACommand -> ByteString
serializeCommand = \case
NEW ntfs cMode pqIK subMode -> s (NEW_, ntfs, cMode, pqIK, subMode)
LSET uld cd_ -> s (LSET_, uld) <> maybe "" (B.cons ' ' . s) cd_
LGET sl -> s (LGET_, sl)
JOIN ntfs cReq pqSup subMode cInfo -> s (JOIN_, ntfs, cReq, pqSup, subMode, Str $ serializeBinary cInfo)
LET confId cInfo -> B.unwords [s LET_, confId, serializeBinary cInfo]
ACK mId rcptInfo_ -> s (ACK_, mId) <> maybe "" (B.cons ' ' . serializeBinary) rcptInfo_
Expand Down
18 changes: 12 additions & 6 deletions src/Simplex/Messaging/Agent/Store/AgentStore.hs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
checkUser,

-- * Queues and connections
createServer,
createNewConn,
updateNewConnRcv,
updateNewConnSnd,
Expand All @@ -57,6 +58,7 @@ module Simplex.Messaging.Agent.Store.AgentStore
setConnUserId,
setConnAgentVersion,
setConnPQSupport,
updateNewConnJoin,
getDeletedConnIds,
getDeletedWaitingDeliveryConnIds,
setConnRatchetSync,
Expand Down Expand Up @@ -432,7 +434,7 @@ createSndConn db gVar cData q@SndQueue {server} =
-- check confirmed snd queue doesn't already exist, to prevent it being deleted by REPLACE in insertSndQueue_
ifM (liftIO $ checkConfirmedSndQueueExists_ db q) (pure $ Left SESndQueueExists) $
createConn_ db gVar cData $ \connId -> do
serverKeyHash_ <- createServer_ db server
serverKeyHash_ <- createServer db server
createConnRecord db connId cData SCMInvitation
insertSndQueue_ db connId q serverKeyHash_

Expand Down Expand Up @@ -519,7 +521,7 @@ addConnRcvQueue db connId rq subMode =

addConnRcvQueue_ :: DB.Connection -> ConnId -> NewRcvQueue -> SubscriptionMode -> IO RcvQueue
addConnRcvQueue_ db connId rq@RcvQueue {server} subMode = do
serverKeyHash_ <- createServer_ db server
serverKeyHash_ <- createServer db server
insertRcvQueue_ db connId rq subMode serverKeyHash_

addConnSndQueue :: DB.Connection -> ConnId -> NewSndQueue -> IO (Either StoreError SndQueue)
Expand All @@ -531,7 +533,7 @@ addConnSndQueue db connId sq =

addConnSndQueue_ :: DB.Connection -> ConnId -> NewSndQueue -> IO SndQueue
addConnSndQueue_ db connId sq@SndQueue {server} = do
serverKeyHash_ <- createServer_ db server
serverKeyHash_ <- createServer db server
insertSndQueue_ db connId sq serverKeyHash_

setRcvQueueStatus :: DB.Connection -> RcvQueue -> QueueStatus -> IO ()
Expand Down Expand Up @@ -829,7 +831,7 @@ deleteInvShortLink db srv lnkId =

createInvShortLink :: DB.Connection -> InvShortLink -> IO ()
createInvShortLink db InvShortLink {server, linkId, linkKey, sndPrivateKey, sndId} = do
serverKeyHash_ <- createServer_ db server
serverKeyHash_ <- createServer db server
DB.execute
db
[sql|
Expand Down Expand Up @@ -2024,8 +2026,8 @@ instance (ToField a, ToField b, ToField c, ToField d, ToField e, ToField f,
-- * Server helper

-- | Creates a new server, if it doesn't exist, and returns the passed key hash if it is different from stored.
createServer_ :: DB.Connection -> SMPServer -> IO (Maybe C.KeyHash)
createServer_ db newSrv@ProtocolServer {host, port, keyHash} = do
createServer :: DB.Connection -> SMPServer -> IO (Maybe C.KeyHash)
createServer db newSrv@ProtocolServer {host, port, keyHash} = do
r <- insertNewServer_
if null r
then getServerKeyHash_ db newSrv >>= either E.throwIO pure
Expand Down Expand Up @@ -2406,6 +2408,10 @@ setConnPQSupport :: DB.Connection -> ConnId -> PQSupport -> IO ()
setConnPQSupport db connId pqSupport =
DB.execute db "UPDATE connections SET pq_support = ? WHERE conn_id = ?" (pqSupport, connId)

updateNewConnJoin :: DB.Connection -> ConnId -> VersionSMPA -> PQSupport -> Bool -> IO ()
updateNewConnJoin db connId aVersion pqSupport enableNtfs =
DB.execute db "UPDATE connections SET smp_agent_version = ?, pq_support = ?, enable_ntfs = ? WHERE conn_id = ?" (aVersion, pqSupport, BI enableNtfs, connId)

getDeletedConnIds :: DB.Connection -> IO [ConnId]
getDeletedConnIds db = map fromOnly <$> DB.query db "SELECT conn_id FROM connections WHERE deleted = ?" (Only (BI True))

Expand Down
4 changes: 0 additions & 4 deletions tests/AgentTests/EqInstances.hs
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ deriving instance Eq ClientNtfCreds

deriving instance Eq ShortLinkCreds

deriving instance Show (ConnLinkData c)

deriving instance Eq (ConnLinkData c)

deriving instance Show ProxiedRelay

deriving instance Eq ProxiedRelay
Loading
Loading