-
-
Notifications
You must be signed in to change notification settings - Fork 94
Expand file tree
/
Copy pathSQLite.hs
More file actions
343 lines (311 loc) · 11.7 KB
/
SQLite.hs
File metadata and controls
343 lines (311 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
{-# LANGUAGE ConstraintKinds #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TemplateHaskell #-}
{-# LANGUAGE TypeApplications #-}
{-# OPTIONS_GHC -fno-warn-unticked-promoted-constructors #-}
module Simplex.Messaging.Agent.Env.SQLite
( AM',
AM,
AgentConfig (..),
InitialAgentServers (..),
ServerCfg (..),
ServerRoles (..),
OperatorId,
UserServers (..),
NetworkConfig (..),
presetServerCfg,
allRoles,
mkUserServers,
serverHosts,
defaultAgentConfig,
defaultReconnectInterval,
Env (..),
newSMPAgentEnv,
createAgentStore,
NtfSupervisor (..),
NtfSupervisorCommand (..),
XFTPAgent (..),
Worker (..),
RestartCount (..),
updateRestartCount,
)
where
import Control.Concurrent (ThreadId)
import Control.Monad.Except
import Control.Monad.IO.Unlift
import Control.Monad.Reader
import Crypto.Random
import Data.Aeson (FromJSON (..), ToJSON (..))
import qualified Data.Aeson.TH as JQ
import Data.Int (Int64)
import Data.List.NonEmpty (NonEmpty)
import qualified Data.List.NonEmpty as L
import Data.Map.Strict (Map)
import Data.Maybe (fromMaybe)
import Data.Set (Set)
import qualified Data.Set as S
import Data.Time.Clock (NominalDiffTime, nominalDay)
import Data.Time.Clock.System (SystemTime (..))
import Data.Word (Word16)
import Network.Socket
import Numeric.Natural
import Simplex.FileTransfer.Client (XFTPClientConfig (..), defaultXFTPClientConfig)
import Simplex.Messaging.Agent.Protocol
import Simplex.Messaging.Agent.RetryInterval
import Simplex.Messaging.Agent.Store (createStore)
import Simplex.Messaging.Agent.Store.Common (DBStore)
import Simplex.Messaging.Agent.Store.Interface (DBOpts)
import Simplex.Messaging.Agent.Store.Shared (MigrationConfig (..), MigrationError (..))
import Simplex.Messaging.Client
import qualified Simplex.Messaging.Crypto as C
import Simplex.Messaging.Crypto.Ratchet (VersionRangeE2E, supportedE2EEncryptVRange)
import Simplex.Messaging.Notifications.Client (defaultNTFClientConfig)
import Simplex.Messaging.Notifications.Transport (NTFVersion)
import Simplex.Messaging.Notifications.Types
import Simplex.Messaging.Parsers (defaultJSON)
import Simplex.Messaging.Protocol (NtfServer, ProtoServerWithAuth (..), ProtocolServer (..), ProtocolType (..), ProtocolTypeI, VersionRangeSMPC, XFTPServer, supportedSMPClientVRange)
import Simplex.Messaging.TMap (TMap)
import qualified Simplex.Messaging.TMap as TM
import Simplex.Messaging.Transport (SMPVersion)
import Simplex.Messaging.Transport.Client (TransportHost)
import System.Mem.Weak (Weak)
import System.Random (StdGen, newStdGen)
import UnliftIO.STM
-- | Agent computation without an error layer: a reader over 'Env' on top of 'IO'.
type AM' a = ReaderT Env IO a
-- | Agent computation that can fail with 'AgentErrorType', layered over the same reader-over-IO stack.
type AM a = ExceptT AgentErrorType (ReaderT Env IO) a
-- | Servers and network settings the agent is given at start-up.
data InitialAgentServers = InitialAgentServers
  { -- SMP server configurations per user (at least one per user, as the list is NonEmpty)
    smp :: Map UserId (NonEmpty (ServerCfg 'PSMP)),
    -- notification servers (may be empty)
    ntf :: [NtfServer],
    -- XFTP server configurations per user (at least one per user)
    xftp :: Map UserId (NonEmpty (ServerCfg 'PXFTP)),
    -- network configuration
    netCfg :: NetworkConfig,
    -- NOTE(review): presumably domains of the preset servers; usage is not visible in this module
    presetDomains :: [HostName],
    -- NOTE(review): presumably the preset SMP servers; usage is not visible in this module
    presetServers :: [SMPServer]
  }
-- | Configuration of one server of protocol @p@.
data ServerCfg p = ServerCfg
  { -- server address together with its optional auth
    server :: ProtoServerWithAuth p,
    -- operator of the server, if any
    operator :: Maybe OperatorId,
    -- disabled servers are skipped by 'mkUserServers' unless no enabled server has the required role
    enabled :: Bool,
    -- roles this server may be used for
    roles :: ServerRoles
  }
  deriving (Show)
-- | Roles a server may be used for; 'mkUserServers' selects servers per role using these flags.
data ServerRoles = ServerRoles
  { -- server is a candidate for 'storageSrvs'
    storage :: Bool,
    -- server is a candidate for 'proxySrvs'
    proxy :: Bool
  }
  deriving (Show)
-- | Roles value with both the storage and the proxy role switched on.
allRoles :: ServerRoles
allRoles = ServerRoles {storage = True, proxy = True}
-- | Build a 'ServerCfg' from its parts.
--
-- Arguments, in order: enabled flag, roles, optional operator, server.
presetServerCfg :: Bool -> ServerRoles -> Maybe OperatorId -> ProtoServerWithAuth p -> ServerCfg p
presetServerCfg isEnabled srvRoles operatorId srv =
  ServerCfg
    { server = srv,
      operator = operatorId,
      enabled = isEnabled,
      roles = srvRoles
    }
-- | Servers of protocol @p@ split by role, as produced by 'mkUserServers'.
data UserServers p = UserServers
  { -- (operator, server) pairs usable for the storage role
    storageSrvs :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p),
    -- (operator, server) pairs usable for the proxy role
    proxySrvs :: NonEmpty (Maybe OperatorId, ProtoServerWithAuth p),
    -- hosts of all configured servers
    knownHosts :: Set TransportHost
  }
-- | Identifier of a server operator, as stored in the 'operator' field of 'ServerCfg'.
type OperatorId = Int64
-- | Split the configured servers into per-role lists.
--
-- For each role only the enabled servers that have this role are used.
-- If there is no such server, all passed servers are used for that role instead,
-- regardless of their enabled flags and roles, so the result is never empty.
-- 'knownHosts' is the union of the hosts of all passed servers.
mkUserServers :: NonEmpty (ServerCfg p) -> UserServers p
mkUserServers srvs =
  UserServers
    { storageSrvs = srvsWithRole storage,
      proxySrvs = srvsWithRole proxy,
      knownHosts = S.unions (L.map cfgHosts srvs)
    }
  where
    srvsWithRole hasRole =
      let usable ServerCfg {enabled, roles} = enabled && hasRole roles
          -- fall back to the full list when the filter leaves nothing
          chosen = fromMaybe srvs (L.nonEmpty (L.filter usable srvs))
       in L.map operatorAndServer chosen
    operatorAndServer ServerCfg {operator, server} = (operator, server)
    cfgHosts ServerCfg {server = ProtoServerWithAuth srv _} = serverHosts srv
-- | Set of all hosts listed in the server address.
serverHosts :: ProtocolServer p -> Set TransportHost
serverHosts ProtocolServer {host = hosts} = S.fromList (L.toList hosts)
-- | Static configuration of the agent; 'defaultAgentConfig' supplies a value for every field.
--
-- Fields of type 'NominalDiffTime' are durations in seconds.
-- Units of the integer fields are noted only where the annotations on the defaults establish them.
data AgentConfig = AgentConfig
  { tcpPort :: Maybe ServiceName,
    rcvAuthAlg :: C.AuthAlg,
    sndAuthAlg :: C.AuthAlg,
    connIdBytes :: Int,
    -- also used as the size of the notification supervisor queue (see 'newSMPAgentEnv')
    tbqSize :: Natural,
    -- client configurations per protocol
    smpCfg :: ProtocolClientConfig SMPVersion,
    ntfCfg :: ProtocolClientConfig NTFVersion,
    xftpCfg :: XFTPClientConfig,
    reconnectInterval :: RetryInterval,
    messageRetryInterval :: RetryInterval2,
    -- microseconds (default 1800_000000 is annotated as 30 minutes); should stay below the Int32 maximum
    userNetworkInterval :: Int,
    userOfflineDelay :: NominalDiffTime,
    messageTimeout :: NominalDiffTime,
    connDeleteDeliveryTimeout :: NominalDiffTime,
    helloTimeout :: NominalDiffTime,
    quotaExceededTimeout :: NominalDiffTime,
    persistErrorInterval :: NominalDiffTime,
    -- microseconds (default is annotated as 30 seconds)
    initialCleanupDelay :: Int64,
    -- microseconds (default is annotated as 5 minutes)
    cleanupInterval :: Int64,
    cleanupBatchSize :: Int,
    -- microseconds (default is annotated as 10 seconds)
    initialLogStatsDelay :: Int64,
    -- microseconds (default is annotated as 10 seconds)
    logStatsInterval :: Int64,
    -- microseconds (default 200000 is annotated as 200ms)
    cleanupStepInterval :: Int,
    maxWorkerRestartsPerMin :: Int,
    storedMsgDataTTL :: NominalDiffTime,
    rcvFilesTTL :: NominalDiffTime,
    sndFilesTTL :: NominalDiffTime,
    xftpConsecutiveRetries :: Int,
    xftpMaxRecipientsPerRequest :: Int,
    deleteErrorCount :: Int,
    -- minutes (per the annotation on the default)
    ntfCron :: Word16,
    ntfBatchSize :: Int,
    ntfSubFirstCheckInterval :: NominalDiffTime,
    ntfSubCheckInterval :: NominalDiffTime,
    subsBatchSize :: Int,
    -- paths of the certificate and key files
    caCertificateFile :: FilePath,
    privateKeyFile :: FilePath,
    certificateFile :: FilePath,
    rcvExpireCount :: Int,
    rcvExpireInterval :: NominalDiffTime,
    -- supported version ranges
    e2eEncryptVRange :: VersionRangeE2E,
    smpAgentVRange :: VersionRangeSMPA,
    smpClientVRange :: VersionRangeSMPC
  }
-- | Default retry interval used for reconnections.
--
-- NOTE(review): values are presumably microseconds (2 s / 10 s / 180 s), matching the
-- "5 minutes" annotation on 300_000000 in the message retry interval; confirm against 'RetryInterval'.
defaultReconnectInterval :: RetryInterval
defaultReconnectInterval =
  RetryInterval
    { initialInterval = 2_000_000,
      increaseAfter = 10_000_000,
      maxInterval = 180_000_000
    }
-- | Default pair of retry intervals (fast and slow) used for message delivery.
defaultMessageRetryInterval :: RetryInterval2
defaultMessageRetryInterval = RetryInterval2 {riFast = fast, riSlow = slow}
  where
    fast =
      RetryInterval
        { initialInterval = 2_000_000,
          increaseAfter = 10_000_000,
          maxInterval = 120_000_000
        }
    slow =
      RetryInterval
        { initialInterval = 300_000_000, -- 5 minutes
          increaseAfter = 60_000_000,
          maxInterval = 6 * 3_600_000_000 -- 6 hours
        }
-- | Default agent configuration.
--
-- 'NominalDiffTime' values are in seconds ('nominalDay' is one day);
-- integer delays and intervals are annotated with the durations they represent.
defaultAgentConfig :: AgentConfig
defaultAgentConfig =
  AgentConfig
    { tcpPort = Just "5224",
      -- The current client version already supports X25519, but it can only be enabled
      -- once support for SMP v6 is dropped and all servers are required to support v7.
      rcvAuthAlg = C.AuthAlg C.SEd25519, -- this will stay as Ed25519
      sndAuthAlg = C.AuthAlg C.SEd25519, -- TODO replace with X25519 when switching to v7
      connIdBytes = 12,
      tbqSize = 128,
      smpCfg = defaultSMPClientConfig,
      ntfCfg = defaultNTFClientConfig,
      xftpCfg = defaultXFTPClientConfig,
      reconnectInterval = defaultReconnectInterval,
      messageRetryInterval = defaultMessageRetryInterval,
      userNetworkInterval = 1_800_000_000, -- 30 minutes, should be less than Int32 max value
      -- a network offline event is ignored if it happens less than 2 seconds after the network was set online
      userOfflineDelay = 2,
      messageTimeout = 2 * nominalDay,
      connDeleteDeliveryTimeout = 2 * nominalDay,
      helloTimeout = 2 * nominalDay,
      quotaExceededTimeout = 7 * nominalDay,
      persistErrorInterval = 3, -- seconds
      initialCleanupDelay = 30_000_000, -- 30 seconds
      cleanupInterval = 300_000_000, -- 5 minutes
      cleanupBatchSize = 10_000,
      initialLogStatsDelay = 10_000_000, -- 10 seconds
      logStatsInterval = 10_000_000, -- 10 seconds
      cleanupStepInterval = 200_000, -- 200ms
      maxWorkerRestartsPerMin = 5,
      storedMsgDataTTL = 21 * nominalDay,
      rcvFilesTTL = 2 * nominalDay,
      sndFilesTTL = nominalDay,
      xftpConsecutiveRetries = 3,
      xftpMaxRecipientsPerRequest = 200,
      deleteErrorCount = 10,
      ntfCron = 20, -- minutes
      ntfBatchSize = 150,
      ntfSubFirstCheckInterval = nominalDay,
      ntfSubCheckInterval = 3 * nominalDay,
      subsBatchSize = 1350,
      -- CA certificate private key is not needed for initialization
      -- ! we do not generate these
      caCertificateFile = "/etc/opt/simplex-agent/ca.crt",
      privateKeyFile = "/etc/opt/simplex-agent/agent.key",
      certificateFile = "/etc/opt/simplex-agent/agent.crt",
      rcvExpireCount = 8,
      rcvExpireInterval = nominalDay,
      e2eEncryptVRange = supportedE2EEncryptVRange,
      smpAgentVRange = supportedSMPAgentVRange,
      smpClientVRange = supportedSMPClientVRange
    }
-- | Runtime environment of the agent, created by 'newSMPAgentEnv'.
data Env = Env
  { -- configuration the environment was created with
    config :: AgentConfig,
    -- database store passed to 'newSMPAgentEnv'
    store :: DBStore,
    -- DRG created with 'C.newRandom'
    random :: TVar ChaChaDRG,
    -- generator created with 'newStdGen'; NOTE(review): presumably used to pick servers; confirm
    randomServer :: TVar StdGen,
    ntfSupervisor :: NtfSupervisor,
    xftpAgent :: XFTPAgent,
    -- initialised to 0
    multicastSubscribers :: TMVar Int
  }
-- | Create the agent environment for the given configuration and store.
--
-- Allocates fresh random generators, the notification supervisor (its queue is sized by
-- 'tbqSize' from the config), the XFTP agent state and a zeroed multicast subscriber counter.
newSMPAgentEnv :: AgentConfig -> DBStore -> IO Env
newSMPAgentEnv config store = do
  drg <- C.newRandom
  srvGen <- liftIO newStdGen >>= newTVarIO
  supervisor <- newNtfSubSupervisor (tbqSize config)
  fileAgent <- newXFTPAgent
  subscribers <- newTMVarIO 0
  pure
    Env
      { config,
        store,
        random = drg,
        randomServer = srvGen,
        ntfSupervisor = supervisor,
        xftpAgent = fileAgent,
        multicastSubscribers = subscribers
      }
-- | Create the agent database store; delegates to 'createStore' with the same arguments.
--
-- Returns 'Left' with a 'MigrationError' on failure, otherwise the store.
createAgentStore :: DBOpts -> MigrationConfig -> IO (Either MigrationError DBStore)
createAgentStore dbOpts migrationCfg = createStore dbOpts migrationCfg
-- | Mutable state of the notification supervisor, created by 'newNtfSubSupervisor'.
data NtfSupervisor = NtfSupervisor
  { -- current notification token, if any (initially Nothing)
    ntfTkn :: TVar (Maybe NtfToken),
    -- bounded queue of supervisor commands, each with the connections it applies to
    ntfSubQ :: TBQueue (NtfSupervisorCommand, NonEmpty ConnId),
    -- workers keyed by server (all three maps start empty)
    ntfWorkers :: TMap NtfServer Worker,
    ntfSMPWorkers :: TMap SMPServer Worker,
    ntfTknDelWorkers :: TMap NtfServer Worker
  }
-- | Commands that are put into the 'ntfSubQ' queue of 'NtfSupervisor'.
data NtfSupervisorCommand = NSCCreate | NSCSmpDelete | NSCDeleteSub
  deriving (Show)
-- | Create an empty notification supervisor state: no token, an empty command queue
-- bounded by @qSize@, and no workers.
newNtfSubSupervisor :: Natural -> IO NtfSupervisor
newNtfSubSupervisor qSize =
  -- constructor arguments follow the field order of 'NtfSupervisor'
  NtfSupervisor
    <$> newTVarIO Nothing -- ntfTkn
    <*> newTBQueueIO qSize -- ntfSubQ
    <*> TM.emptyIO -- ntfWorkers
    <*> TM.emptyIO -- ntfSMPWorkers
    <*> TM.emptyIO -- ntfTknDelWorkers
-- | Mutable state of the XFTP agent, created by 'newXFTPAgent'.
data XFTPAgent = XFTPAgent
  { -- if set, XFTP file paths will be considered as relative to this directory
    xftpWorkDir :: TVar (Maybe FilePath),
    -- receive and send workers; the key may be Nothing, i.e. a worker not tied to a server
    xftpRcvWorkers :: TMap (Maybe XFTPServer) Worker,
    xftpSndWorkers :: TMap (Maybe XFTPServer) Worker,
    -- delete workers, always keyed by server
    xftpDelWorkers :: TMap XFTPServer Worker
  }
-- | Create an empty XFTP agent state: no working directory and no workers.
newXFTPAgent :: IO XFTPAgent
newXFTPAgent =
  -- constructor arguments follow the field order of 'XFTPAgent'
  XFTPAgent
    <$> newTVarIO Nothing -- xftpWorkDir
    <*> TM.emptyIO -- xftpRcvWorkers
    <*> TM.emptyIO -- xftpSndWorkers
    <*> TM.emptyIO -- xftpDelWorkers
-- | State of a single background worker.
data Worker = Worker
  { workerId :: Int,
    -- NOTE(review): presumably filled to signal that there is work to do; confirm against the worker loop
    doWork :: TMVar (),
    -- weak reference to the worker thread, if any; NOTE(review): the exact protocol is not visible in this module
    action :: TMVar (Maybe (Weak ThreadId)),
    -- restart counter, see 'updateRestartCount'
    restarts :: TVar RestartCount
  }
-- | Number of worker restarts counted within a single minute.
data RestartCount = RestartCount
  { -- minute in which the restarts were counted (system seconds divided by 60)
    restartMinute :: Int64,
    -- number of restarts counted in that minute
    restartCount :: Int
  }

-- | Register a restart that happened at time @t@.
--
-- If @t@ falls into the minute already stored in the counter, the count is incremented;
-- otherwise counting starts again from 1 for the new minute.
updateRestartCount :: SystemTime -> RestartCount -> RestartCount
updateRestartCount t (RestartCount minute count) =
  RestartCount min' $ if minute == min' then count + 1 else 1
  where
    -- minute of the restart time
    min' = systemSeconds t `div` 60
-- Empty top-level splice: it ends the current declaration group, so that the types declared above
-- are visible to the Template Haskell splices below.
$(pure [])
-- JSON instances for 'ServerRoles' generated with the 'defaultJSON' options.
$(JQ.deriveJSON defaultJSON ''ServerRoles)
-- JSON instances for 'ServerCfg' are written by hand (with generated method bodies)
-- so that the instance context can require 'ProtocolTypeI' for the protocol parameter.
instance ProtocolTypeI p => ToJSON (ServerCfg p) where
  toEncoding = $(JQ.mkToEncoding defaultJSON ''ServerCfg)
  toJSON = $(JQ.mkToJSON defaultJSON ''ServerCfg)
instance ProtocolTypeI p => FromJSON (ServerCfg p) where
  parseJSON = $(JQ.mkParseJSON defaultJSON ''ServerCfg)