-
Notifications
You must be signed in to change notification settings - Fork 754
Expand file tree
/
Copy pathNodeToNode.hs
More file actions
204 lines (188 loc) · 9.03 KB
/
NodeToNode.hs
File metadata and controls
204 lines (188 loc) · 9.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
{-# LANGUAGE TypeOperators #-}
{-# OPTIONS_GHC -fno-warn-orphans -Wno-unticked-promoted-constructors -Wno-all-missed-specialisations #-}
module Cardano.Benchmarking.GeneratorTx.NodeToNode
( ConnectClient
, benchmarkConnectTxSubmit
) where
import Cardano.Prelude (forever, liftIO, throwIO)
import Prelude
import "contra-tracer" Control.Tracer (Tracer (..))
import Codec.Serialise (DeserialiseFailure)
import Control.Concurrent.Class.MonadSTM.Strict (newTVarIO)
import Control.Monad.Class.MonadTimer (MonadTimer, threadDelay)
import Data.ByteString.Lazy (ByteString)
import Data.Foldable (fold)
import qualified Data.Map.Strict as Map
import Data.Proxy (Proxy (..))
import Data.Void (Void, absurd)
import qualified Network.Mux as Mux
import Network.Socket (AddrInfo (..))
import System.Random (newStdGen)
import Ouroboros.Consensus.Block.Abstract
import Ouroboros.Consensus.Byron.Ledger.Mempool (GenTx)
import qualified Ouroboros.Consensus.Cardano as Consensus (CardanoBlock)
import Ouroboros.Consensus.Ledger.SupportsMempool (GenTxId)
import Ouroboros.Consensus.Network.NodeToNode (Codecs (..), defaultCodecs)
import Ouroboros.Consensus.Node.NetworkProtocolVersion
import Ouroboros.Consensus.Node.Run (RunNode)
import Ouroboros.Consensus.Shelley.Eras (StandardCrypto)
import Ouroboros.Network.Channel (Channel (..))
import Ouroboros.Network.Context
import Ouroboros.Network.ControlMessage (continueForever)
import Ouroboros.Network.DeltaQ (defaultGSV)
import Ouroboros.Network.Driver (runPeer, runPeerWithLimits)
import Ouroboros.Network.KeepAlive
import Ouroboros.Network.Magic
import Ouroboros.Network.Mux (MiniProtocolCb (..),
OuroborosApplication (..), OuroborosBundle, RunMiniProtocol (..))
import Cardano.Network.NodeToClient (chainSyncPeerNull)
import Cardano.Network.NodeToNode (NetworkConnectTracers (..))
import qualified Cardano.Network.NodeToNode as NtN
import Ouroboros.Network.PeerSelection.PeerSharing (PeerSharing (..))
import Ouroboros.Network.PeerSelection.PeerSharing.Codec (decodeRemoteAddress,
encodeRemoteAddress)
import Ouroboros.Network.Protocol.BlockFetch.Client (BlockFetchClient (..),
blockFetchClientPeer)
import Ouroboros.Network.Protocol.Handshake.Version (simpleSingletonVersions)
import Ouroboros.Network.Protocol.KeepAlive.Client hiding (SendMsgDone)
import Ouroboros.Network.Protocol.KeepAlive.Codec
import Ouroboros.Network.Protocol.TxSubmission2.Client (TxSubmissionClient,
txSubmissionClientPeer)
import Ouroboros.Network.Protocol.PeerSharing.Client (PeerSharingClient (..),
peerSharingClientPeer)
import Ouroboros.Network.Snocket (socketSnocket)
import Cardano.Benchmarking.LogTypes (EnvConsts (..), SendRecvConnect, SendRecvTxSubmission2)
import Cardano.TxGenerator.Setup.NixService (defaultKeepaliveTimeout, getKeepaliveTimeout)
-- | The consensus-layer Cardano block type, instantiated with 'StandardCrypto'.
type CardanoBlock = Consensus.CardanoBlock StandardCrypto
-- | An 'IO' action parameterised by the remote address information and the
-- transaction submission client (over 'CardanoBlock' transactions and
-- transaction ids) to use.
-- NOTE(review): presumably the shape of 'benchmarkConnectTxSubmit' once its
-- leading arguments are applied — confirm against the call sites.
type ConnectClient = AddrInfo -> TxSubmissionClient (GenTxId CardanoBlock) (GenTx CardanoBlock) IO () -> IO ()
-- | Connect to a remote node with 'NtN.connectTo' and run the given
-- transaction submission client over that connection.
--
-- A single node-to-node version ('NodeToNodeV_14') is offered, in
-- initiator-only diffusion mode with peer sharing disabled.  Besides the
-- transaction submission client, the connection runs 'chainSyncPeerNull',
-- the null block fetch client, the null peer sharing client and a keep-alive
-- client.  Only the handshake and the transaction submission mini-protocol
-- are traced; the mux and the other mini-protocols use null tracers.
--
-- If 'NtN.connectTo' reports an error, that error is rethrown with 'throwIO'.
benchmarkConnectTxSubmit
  :: forall blk. (blk ~ CardanoBlock, RunNode blk )
  => EnvConsts
  -- ^ supplies 'envIOManager' (for the socket snocket) and 'envNixSvcOpts'
  --   (for the keep-alive interval)
  -> Tracer IO SendRecvConnect
  -- ^ tracer for the handshake
  -> Tracer IO SendRecvTxSubmission2
  -- ^ tracer for the transaction submission mini-protocol
  -> CodecConfig CardanoBlock
  -- ^ codec configuration used to build the mini-protocol codecs
  -> NetworkMagic
  -- ^ network magic placed in the version data
  -> AddrInfo
  -- ^ remote address information
  -> TxSubmissionClient (GenTxId blk) (GenTx blk) IO ()
  -- ^ the particular txSubmission peer
  -> IO ()
benchmarkConnectTxSubmit EnvConsts { .. } handshakeTracer submissionTracer codecConfig networkMagic remoteAddr myTxSubClient = do
  done <- NtN.connectTo
            (socketSnocket envIOManager)
            NetworkConnectTracers {
                nctMuxTracers      = Mux.nullTracers,
                nctHandshakeTracer = handshakeTracer
              }
            peerMultiplex
            Nothing  -- was @addrAddress <$> Nothing@, which is just 'Nothing'
            (addrAddress remoteAddr)
  case done of
    Left err           -> throwIO err
    Right (Left ())    -> return ()
    Right (Right void) -> absurd void
 where
  ownPeerSharing = PeerSharingDisabled

  -- Collapse a bundle of mini-protocols into a single application.
  mkApp :: OuroborosBundle      mode initiatorCtx responderCtx bs m a b
        -> OuroborosApplication mode initiatorCtx responderCtx bs m a b
  mkApp bundle =
    OuroborosApplication $ fold bundle

  -- The one node-to-node version this client offers.
  n2nVer :: NodeToNodeVersion
  n2nVer = NodeToNodeV_14

  -- Block-level version matching 'n2nVer'.  An explicit lookup is used
  -- instead of 'Map.!' so that a missing entry fails with a message naming
  -- the unsupported version rather than a generic "key not in map" error.
  blkN2nVer :: BlockNodeToNodeVersion blk
  blkN2nVer =
    case Map.lookup n2nVer supportedVers of
      Just ver -> ver
      Nothing  -> error $ "benchmarkConnectTxSubmit: " ++ show n2nVer
                       ++ " is not among the supported node-to-node versions"

  supportedVers :: Map.Map NodeToNodeVersion (BlockNodeToNodeVersion blk)
  supportedVers = supportedNodeToNodeVersions (Proxy @blk)

  myCodecs :: Codecs blk NtN.RemoteAddress DeserialiseFailure IO
                ByteString ByteString ByteString ByteString ByteString ByteString
                ByteString
  myCodecs = defaultCodecs codecConfig blkN2nVer encodeRemoteAddress decodeRemoteAddress n2nVer

  -- Version table with the single entry 'n2nVer', mapping to the
  -- initiator-only application that runs all mini-protocols.
  peerMultiplex :: NtN.Versions NodeToNodeVersion
                                NtN.NodeToNodeVersionData
                                (OuroborosApplication
                                  'Mux.InitiatorMode
                                  (MinimalInitiatorContext NtN.RemoteAddress)
                                  (ResponderContext NtN.RemoteAddress)
                                  ByteString IO () Void)
  peerMultiplex =
    simpleSingletonVersions
      n2nVer
      (NtN.NodeToNodeVersionData
        { NtN.networkMagic  = networkMagic
        , NtN.diffusionMode = NtN.InitiatorOnlyDiffusionMode
        , NtN.peerSharing   = ownPeerSharing
        , NtN.query         = False
        }) $
      \n2nData ->
        mkApp $
          NtN.nodeToNodeProtocols NtN.defaultMiniProtocolParameters
            NtN.NodeToNodeProtocols
              { NtN.chainSyncProtocol = InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
                  runPeer
                    mempty
                    (cChainSyncCodec myCodecs)
                    channel
                    chainSyncPeerNull
              , NtN.blockFetchProtocol = InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
                  runPeer
                    mempty
                    (cBlockFetchCodec myCodecs)
                    channel
                    (blockFetchClientPeer blockFetchClientNull)
              , NtN.keepAliveProtocol = InitiatorProtocolOnly $ MiniProtocolCb $ \ctx channel ->
                  kaClient n2nVer (remoteAddress $ micConnectionId ctx) channel
              , NtN.txSubmissionProtocol = InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
                  -- the only mini-protocol that is traced
                  runPeer
                    submissionTracer
                    (cTxSubmission2Codec myCodecs)
                    channel
                    (txSubmissionClientPeer myTxSubClient)
              , NtN.peerSharingProtocol = InitiatorProtocolOnly $ MiniProtocolCb $ \_ctx channel ->
                  runPeer
                    mempty
                    (cPeerSharingCodec myCodecs)
                    channel
                    (peerSharingClientPeer peerSharingClientNull)
              }
            n2nVer
            n2nData

  -- Stolen from: Ouroboros/Consensus/Network/NodeToNode.hs
  --
  -- Keep-alive client for the peer @them@.  The version argument is
  -- accepted but not used.  The keep-alive interval comes from
  -- 'envNixSvcOpts' via 'getKeepaliveTimeout' when the options are present,
  -- and is 'defaultKeepaliveTimeout' otherwise.
  kaClient
    :: Ord remotePeer
    => NodeToNodeVersion
    -> remotePeer
    -> Channel IO ByteString
    -> IO ((), Maybe ByteString)
  kaClient _version them channel = do
    keepAliveRng <- newStdGen
    peerGSVMap <- liftIO . newTVarIO $ Map.singleton them defaultGSV
    runPeerWithLimits
      mempty
      (cKeepAliveCodec myCodecs)
      (byteLimitsKeepAlive (const 0)) -- TODO: Real Bytelimits, see #1727
      timeLimitsKeepAlive
      channel
      $ keepAliveClientPeer
        $ keepAliveClient
            mempty
            keepAliveRng
            (continueForever (Proxy :: Proxy IO)) them peerGSVMap
            (KeepAliveInterval $ maybe defaultKeepaliveTimeout getKeepaliveTimeout envNixSvcOpts)
-- | The null block fetch client: its action never produces a request, it
-- only sleeps in an endless loop of day-long delays.
--
-- NOTE(review): this file imports 'threadDelay' from
-- "Control.Monad.Class.MonadTimer" (not the @.SI@ module), which is assumed
-- here to take its argument in microseconds; the previous bare
-- @24 * 60 * 60@ would then sleep for roughly 86 ms per iteration rather
-- than one day.  Confirm against the io-classes version in use.
blockFetchClientNull
  :: forall block point m a.  MonadTimer m
  => BlockFetchClient block point m a
blockFetchClientNull
  = BlockFetchClient $ forever $ threadDelay (24 * 60 * 60 * 1000000) {- one day in microseconds -}
-- | The null peer sharing client: the action guarding 'SendMsgDone' never
-- finishes, it only sleeps in an endless loop of day-long delays.
--
-- NOTE(review): this file imports 'threadDelay' from
-- "Control.Monad.Class.MonadTimer" (not the @.SI@ module), which is assumed
-- here to take its argument in microseconds; the previous bare
-- @24 * 60 * 60@ would then sleep for roughly 86 ms per iteration rather
-- than one day.  Confirm against the io-classes version in use.
peerSharingClientNull
  :: forall addr m a. MonadTimer m
  => PeerSharingClient addr m a
peerSharingClientNull = SendMsgDone $ forever $ threadDelay (24 * 60 * 60 * 1000000) {- one day in microseconds -}