From 0e010576b412ce66b06e53cae33889ecf20010fe Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Wed, 11 Mar 2026 14:53:45 +0200 Subject: [PATCH 1/3] P2p: count connections without peer activity. Use the counter when calculating `next_connect_time` the same way `fail_count` has been used. --- .../src/crawler_p2p/crawler_manager/mod.rs | 19 +- .../crawler_manager/tests/mock_manager.rs | 9 +- p2p/src/message.rs | 9 +- p2p/src/net/default_backend/backend.rs | 9 +- p2p/src/net/default_backend/mod.rs | 6 +- p2p/src/net/default_backend/peer.rs | 27 +- p2p/src/net/default_backend/types.rs | 8 +- p2p/src/net/mod.rs | 13 +- p2p/src/net/types.rs | 93 +- p2p/src/peer_manager/mod.rs | 127 +- .../peer_manager/peerdb/address_data/mod.rs | 214 ++- .../peer_manager/peerdb/address_data/tests.rs | 64 +- p2p/src/peer_manager/peerdb/mod.rs | 34 +- p2p/src/peer_manager/peerdb/tests.rs | 30 +- p2p/src/peer_manager/tests/ban.rs | 8 +- p2p/src/peer_manager/tests/connections.rs | 8 +- p2p/src/peer_manager/tests/discouragement.rs | 8 +- p2p/src/peer_manager/tests/mod.rs | 12 +- p2p/src/peer_manager/tests/peer_types.rs | 3 +- .../unsuccessful_connection_counter_update.rs | 1276 +++++++++++++++++ p2p/src/peer_manager/tests/utils.rs | 84 +- p2p/src/tests/helpers/mod.rs | 29 +- test-utils/src/basic_test_time_getter.rs | 12 + utils/src/exp_rand/mod.rs | 4 +- 24 files changed, 1945 insertions(+), 161 deletions(-) create mode 100644 p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs diff --git a/dns-server/src/crawler_p2p/crawler_manager/mod.rs b/dns-server/src/crawler_p2p/crawler_manager/mod.rs index 73774ee860..9bc5d0f28a 100644 --- a/dns-server/src/crawler_p2p/crawler_manager/mod.rs +++ b/dns-server/src/crawler_p2p/crawler_manager/mod.rs @@ -34,7 +34,7 @@ use p2p::{ PingResponse, }, net::{ - types::{ConnectivityEvent, SyncingEvent}, + types::{ConnectivityEvent, PeerManagerMessageOrTag, SyncingEvent}, ConnectivityService, NetworkingService, SyncingEventReceiver, }, 
peer_manager::{ @@ -211,9 +211,18 @@ where fn handle_conn_message( &mut self, peer_id: PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageOrTag, ) -> p2p::Result<()> { - match message { + let peer_mgr_message = match message { + PeerManagerMessageOrTag::PeerManagerMessage(message) => message, + PeerManagerMessageOrTag::BlockSyncMessage(_) + | PeerManagerMessageOrTag::TransactionSyncMessage(_) => { + // Ignored + return Ok(()); + } + }; + + match peer_mgr_message { PeerManagerMessage::AddrListRequest(_) => { // Ignored Ok(()) @@ -231,7 +240,7 @@ where } PeerManagerMessage::PingRequest(PingRequest { nonce }) => { self.conn - .send_message( + .send_peer_manager_message( peer_id, PeerManagerMessage::PingResponse(PingResponse { nonce }), ) @@ -382,7 +391,7 @@ where CrawlerCommand::RequestAddresses { peer_id } => { log::debug!("Requesting addresses from peer {peer_id}"); - conn.send_message( + conn.send_peer_manager_message( peer_id, PeerManagerMessage::AddrListRequest(AddrListRequest {}), ) diff --git a/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs b/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs index 03d7de43f6..3d624ce1db 100644 --- a/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs +++ b/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs @@ -132,7 +132,8 @@ impl MockStateRef { peer_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: announced_ip.as_peer_address(), - }), + }) + .into(), }) .unwrap(); } @@ -271,7 +272,11 @@ impl ConnectivityService for MockConnectivityHandle { Ok(()) } - fn send_message(&mut self, _peer_id: PeerId, _request: PeerManagerMessage) -> p2p::Result<()> { + fn send_peer_manager_message( + &mut self, + _peer_id: PeerId, + _request: PeerManagerMessage, + ) -> p2p::Result<()> { Ok(()) } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 8d1f8263da..38d3bf92fe 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ 
-25,7 +25,8 @@ use serialization::{Decode, Encode}; use crate::types::peer_address::PeerAddress; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] +#[strum_discriminants(name(BlockSyncMessageTag))] pub enum BlockSyncMessage { HeaderListRequest(HeaderListRequest), BlockListRequest(BlockListRequest), @@ -40,14 +41,16 @@ pub enum BlockSyncMessage { TestSentinel(Id<()>), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] +#[strum_discriminants(name(TransactionSyncMessageTag))] pub enum TransactionSyncMessage { NewTransaction(Id), TransactionRequest(Id), TransactionResponse(TransactionResponse), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] +#[strum_discriminants(name(PeerManagerMessageTag))] pub enum PeerManagerMessage { AddrListRequest(AddrListRequest), AnnounceAddrRequest(AnnounceAddrRequest), diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 852d1a04ab..5093381236 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -44,13 +44,14 @@ use crate::{ config::P2pConfig, disconnection_reason::DisconnectionReason, error::{DialError, P2pError, PeerError}, - message::PeerManagerMessage, net::{ default_backend::{ peer, types::{BackendEvent, Command, PeerEvent}, }, - types::{services::Services, ConnectivityEvent, PeerInfo, SyncingEvent}, + types::{ + services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageOrTag, SyncingEvent, + }, }, protocol::{ProtocolVersion, SupportedProtocolVersion}, types::{peer_address::PeerAddress, peer_id::PeerId}, @@ -613,7 +614,7 @@ where Ok(()) } - PeerEvent::MessageReceived { message } => { + PeerEvent::MessageReceived(message) => { if self.networking_enabled { self.handle_message(peer_id, message)?; } @@ -696,7 +697,7 @@ where fn handle_message( &mut self, peer_id: 
PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageOrTag, ) -> crate::Result<()> { // Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned). // Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification. diff --git a/p2p/src/net/default_backend/mod.rs b/p2p/src/net/default_backend/mod.rs index 622a77b65c..5af7d88e53 100644 --- a/p2p/src/net/default_backend/mod.rs +++ b/p2p/src/net/default_backend/mod.rs @@ -136,7 +136,11 @@ where Ok(self.cmd_sender.send(types::Command::Disconnect { peer_id, reason })?) } - fn send_message(&mut self, peer_id: PeerId, message: PeerManagerMessage) -> crate::Result<()> { + fn send_peer_manager_message( + &mut self, + peer_id: PeerId, + message: PeerManagerMessage, + ) -> crate::Result<()> { Ok(self.cmd_sender.send(types::Command::SendMessage { peer_id, message: message.into(), diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 5cac02ec25..4c5ce5eb2a 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -31,7 +31,10 @@ use crate::{ disconnection_reason::DisconnectionReason, error::{ConnectionValidationError, P2pError, PeerError, ProtocolError}, message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage}, - net::default_backend::types::{BackendEvent, PeerEvent}, + net::{ + default_backend::types::{BackendEvent, PeerEvent}, + types::PeerManagerMessageOrTag, + }, protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion}, types::peer_id::PeerId, }; @@ -409,10 +412,28 @@ where .await?; } CategorizedMessage::PeerManagerMessage(msg) => { - peer_event_sender.send(PeerEvent::MessageReceived { message: msg }).await? + peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageOrTag::PeerManagerMessage(msg), + )) + .await? 
+ } + CategorizedMessage::BlockSyncMessage(msg) => { + peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageOrTag::BlockSyncMessage((&msg).into()), + )) + .await?; + + block_sync_msg_sender.send(msg).await? } - CategorizedMessage::BlockSyncMessage(msg) => block_sync_msg_sender.send(msg).await?, CategorizedMessage::TransactionSyncMessage(msg) => { + peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageOrTag::TransactionSyncMessage((&msg).into()), + )) + .await?; + transaction_sync_msg_sender.send(msg).await? } } diff --git a/p2p/src/net/default_backend/types.rs b/p2p/src/net/default_backend/types.rs index 7a1bbf2aad..236e1204e2 100644 --- a/p2p/src/net/default_backend/types.rs +++ b/p2p/src/net/default_backend/types.rs @@ -32,7 +32,7 @@ use crate::{ BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest, PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage, }, - net::types::services::Services, + net::types::{services::Services, PeerManagerMessageOrTag}, protocol::{ProtocolVersion, SupportedProtocolVersion}, types::{peer_address::PeerAddress, peer_id::PeerId}, }; @@ -109,7 +109,7 @@ pub mod peer_event { } } -/// Events sent by Peer to Backend +/// Events sent by `Peer` to `Backend`. #[derive(Debug)] pub enum PeerEvent { /// Peer information received from remote @@ -119,7 +119,7 @@ pub enum PeerEvent { ConnectionClosed, /// Message received from remote - MessageReceived { message: PeerManagerMessage }, + MessageReceived(PeerManagerMessageOrTag), /// Protocol violation Misbehaved { error: P2pError }, @@ -135,7 +135,7 @@ pub enum PeerEvent { }, } -/// Events sent by Backend to Peer +/// Events sent by `Backend` to `Peer`. 
#[derive(Debug)] pub enum BackendEvent { Accepted { diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index 5a32a4603a..10fda32f05 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -115,18 +115,17 @@ where reason: Option, ) -> crate::Result<()>; - /// Sends a message to the given peer. - fn send_message(&mut self, peer: PeerId, message: PeerManagerMessage) -> crate::Result<()>; + /// Sends a peer manager message to the given peer. + fn send_peer_manager_message( + &mut self, + peer: PeerId, + message: PeerManagerMessage, + ) -> crate::Result<()>; /// Return the socket addresses of the network service provider fn local_addresses(&self) -> &[SocketAddress]; /// Poll events from the network service provider - /// - /// There are three types of events that can be received: - /// - incoming peer connections - /// - new discovered peers - /// - peer expiration events async fn poll_next(&mut self) -> crate::Result; } diff --git a/p2p/src/net/types.rs b/p2p/src/net/types.rs index 571f41c1bc..2d726696f0 100644 --- a/p2p/src/net/types.rs +++ b/p2p/src/net/types.rs @@ -26,7 +26,10 @@ use tokio::sync::mpsc::Receiver; use crate::{ error::ConnectionValidationError, - message::{BlockSyncMessage, PeerManagerMessage, TransactionSyncMessage}, + message::{ + BlockSyncMessage, BlockSyncMessageTag, PeerManagerMessage, PeerManagerMessageTag, + TransactionSyncMessage, TransactionSyncMessageTag, + }, protocol::SupportedProtocolVersion, types::{peer_address::PeerAddress, peer_id::PeerId}, P2pError, @@ -45,7 +48,7 @@ use self::services::Services; serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, - enum_iterator::Sequence, + strum::EnumIter, )] pub enum PeerRole { Inbound, @@ -57,27 +60,42 @@ pub enum PeerRole { } impl PeerRole { - pub fn is_outbound(&self) -> bool { - use PeerRole::*; - + pub fn as_outbound(&self) -> Option { match self { - Inbound => false, - OutboundFullRelay | OutboundBlockRelay | OutboundReserved | OutboundManual | Feeler => { - true - } 
+ Self::Inbound => None, + Self::OutboundFullRelay => Some(OutboundPeerRole::OutboundFullRelay), + Self::OutboundBlockRelay => Some(OutboundPeerRole::OutboundBlockRelay), + Self::OutboundReserved => Some(OutboundPeerRole::OutboundReserved), + Self::OutboundManual => Some(OutboundPeerRole::OutboundManual), + Self::Feeler => Some(OutboundPeerRole::Feeler), } } - pub fn is_outbound_manual(&self) -> bool { - use PeerRole::*; + pub fn is_outbound(&self) -> bool { + self.as_outbound().is_some() + } + pub fn is_outbound_manual(&self) -> bool { match self { - OutboundManual => true, - Inbound | OutboundFullRelay | OutboundBlockRelay | OutboundReserved | Feeler => false, + Self::OutboundManual => true, + Self::Inbound + | Self::OutboundFullRelay + | Self::OutboundBlockRelay + | Self::OutboundReserved + | Self::Feeler => false, } } } +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, strum::EnumIter)] +pub enum OutboundPeerRole { + OutboundFullRelay, + OutboundBlockRelay, + OutboundReserved, + OutboundManual, + Feeler, +} + /// Peer information learned during handshaking /// /// When an inbound/outbound connection succeeds, the networking service handshakes with the remote @@ -142,13 +160,19 @@ impl Display for PeerInfo { } } -/// Connectivity-related events received from the network +/// Events available via the `ConnectivityService` trait (normally implemented by `NetworkingService::ConnectivityHandle`). +/// +/// Note: `PeerManager` is the main consumer of these events. #[derive(Debug)] pub enum ConnectivityEvent { + /// A message received from a peer. + /// + /// Note that only a message tag is present here for block and transaction sync messages. 
Message { peer_id: PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageOrTag, }, + /// Outbound connection accepted OutboundAccepted { /// Peer address @@ -224,7 +248,44 @@ pub enum ConnectivityEvent { }, } -/// Syncing-related events (sent from the backend) +/// Either a full `PeerManagerMessage` or, if it's a sync message, the corresponding tag. +#[derive(Debug, Clone)] +pub enum PeerManagerMessageOrTag { + PeerManagerMessage(PeerManagerMessage), + BlockSyncMessage(BlockSyncMessageTag), + TransactionSyncMessage(TransactionSyncMessageTag), +} + +impl From for PeerManagerMessageOrTag { + fn from(value: PeerManagerMessage) -> Self { + Self::PeerManagerMessage(value) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ConnectivityEventMessageTag { + PeerManagerMessage(PeerManagerMessageTag), + BlockSyncMessage(BlockSyncMessageTag), + TransactionSyncMessage(TransactionSyncMessageTag), +} + +impl From<&'_ PeerManagerMessageOrTag> for ConnectivityEventMessageTag { + fn from(value: &'_ PeerManagerMessageOrTag) -> Self { + match value { + PeerManagerMessageOrTag::PeerManagerMessage(msg) => { + Self::PeerManagerMessage(msg.into()) + } + PeerManagerMessageOrTag::BlockSyncMessage(tag) => Self::BlockSyncMessage(*tag), + PeerManagerMessageOrTag::TransactionSyncMessage(tag) => { + Self::TransactionSyncMessage(*tag) + } + } + } +} + +/// Events obtainable via the `SyncingEventReceiver` trait (normally implemented by `NetworkingService::SyncingEventReceiver`). +/// +/// Note: `SyncManager` is the consumer of these events. 
#[derive(Debug)] pub enum SyncingEvent { /// Peer connected diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index ee7223e73e..6495c7169e 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -45,7 +45,10 @@ use logging::log; use networking::types::ConnectionDirection; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress, IsGlobalIp}; use randomness::{seq::IteratorRandom, BoxedRngMutexWrapper, Rng, RngCore}; -use utils::{bloom_filters::rolling_bloom_filter::RollingBloomFilter, ensure, set_flag::SetFlag}; +use utils::{ + bloom_filters::rolling_bloom_filter::RollingBloomFilter, debug_panic_or_log, ensure, + set_flag::SetFlag, +}; use utils_networking::IpOrSocketAddress; use crate::{ @@ -60,7 +63,8 @@ use crate::{ net::{ types::{ services::{Service, Services}, - ConnectivityEvent, PeerInfo, PeerRole, + ConnectivityEvent, ConnectivityEventMessageTag, PeerInfo, PeerManagerMessageOrTag, + PeerRole, }, ConnectivityService, NetworkingService, }, @@ -1126,9 +1130,12 @@ where let old_value = self.peers.insert(peer_id, peer); assert!(old_value.is_none()); - if peer_role.is_outbound() { - self.peerdb - .outbound_peer_connected(peer_address, Self::lock_rng(&self.rng).deref_mut()); + if let Some(outbound_peer_role) = peer_role.as_outbound() { + self.peerdb.outbound_peer_connected( + peer_address, + outbound_peer_role, + Self::lock_rng(&self.rng).deref_mut(), + ); } if peer_role == PeerRole::OutboundBlockRelay { @@ -1265,9 +1272,13 @@ where | OutboundConnectType::Reserved | OutboundConnectType::Feeler => {} OutboundConnectType::Manual { response_sender } => { - response_sender.send(Err(error)); + response_sender.send(Err(error.clone())); } } + + if let Some(o) = self.observer.as_mut() { + o.outbound_error(address, error) + } } /// The connection to a remote peer is reported as closed. 
@@ -1310,6 +1321,10 @@ where } self.subscribed_to_peer_addresses.remove(&peer_id); + + if let Some(o) = self.observer.as_mut() { + o.connection_closed(peer_id) + } } } @@ -1320,7 +1335,7 @@ where ) { // `send_message` should not fail, but even if it does, the error can be ignored // because sending messages over the network does not guarantee that they will be received - let res = peer_connectivity_handle.send_message(peer_id, message); + let res = peer_connectivity_handle.send_peer_manager_message(peer_id, message); if let Err(err) = res { log::error!("send_message failed unexpectedly: {err:?}"); } @@ -1537,6 +1552,7 @@ where Self::lock_rng(&self.rng).deref_mut(), ); if let Some(address) = address_opt { + log::debug!("Need to establish a feeler connection to address {address}"); self.connect(address, OutboundConnectType::Feeler); self.next_feeler_connection_time = Self::choose_next_feeler_connection_time( &self.p2p_config, @@ -1547,20 +1563,67 @@ where } } - fn handle_incoming_message(&mut self, peer: PeerId, message: PeerManagerMessage) { - match message { - PeerManagerMessage::AddrListRequest(_) => self.handle_addr_list_request(peer), - PeerManagerMessage::AnnounceAddrRequest(r) => { - self.handle_announce_addr_request(peer, r.address) - } - PeerManagerMessage::PingRequest(r) => self.handle_ping_request(peer, r.nonce), - PeerManagerMessage::AddrListResponse(r) => { - self.handle_addr_list_response(peer, r.addresses) - } - PeerManagerMessage::PingResponse(r) => self.handle_ping_response(peer, r.nonce), - PeerManagerMessage::WillDisconnect(msg) => { - self.handle_will_disconnect_messgae(peer, msg) + fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageOrTag) { + let is_disconnection_message = match &message { + PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg { + PeerManagerMessage::WillDisconnect(_) => true, + + PeerManagerMessage::AddrListRequest(_) + | PeerManagerMessage::AnnounceAddrRequest(_) + | 
PeerManagerMessage::PingRequest(_) + | PeerManagerMessage::AddrListResponse(_) + | PeerManagerMessage::PingResponse(_) => false, + }, + PeerManagerMessageOrTag::BlockSyncMessage(_) + | PeerManagerMessageOrTag::TransactionSyncMessage(_) => false, + }; + + // Note: `PeerContext` must always exist when an incoming message arrives, and the individual + // "handle_" methods called below `expect` on this fact. Here we do an explicit check + // just in case, so that if things go horribly wrong, only the debug build will panic. + if let Some(peer) = self.peers.get(&peer_id) { + if !is_disconnection_message && peer.peer_role.is_outbound() { + self.peerdb.outbound_peer_has_activity( + peer.peer_address, + Self::lock_rng(&self.rng).deref_mut(), + ); } + } else { + debug_panic_or_log!( + "Peer context for {peer_id} not found when handling incoming message {message:?}" + ); + return; + } + + let message_tag: ConnectivityEventMessageTag = (&message).into(); + + match message { + PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg { + PeerManagerMessage::AddrListRequest(_) => { + self.handle_addr_list_request(peer_id); + } + PeerManagerMessage::AnnounceAddrRequest(msg) => { + self.handle_announce_addr_request(peer_id, msg.address); + } + PeerManagerMessage::PingRequest(msg) => { + self.handle_ping_request(peer_id, msg.nonce); + } + PeerManagerMessage::AddrListResponse(msg) => { + self.handle_addr_list_response(peer_id, msg.addresses); + } + PeerManagerMessage::PingResponse(msg) => { + self.handle_ping_response(peer_id, msg.nonce); + } + PeerManagerMessage::WillDisconnect(msg) => { + self.handle_will_disconnect_messgae(peer_id, msg); + } + }, + PeerManagerMessageOrTag::BlockSyncMessage(_) + | PeerManagerMessageOrTag::TransactionSyncMessage(_) => {} + }; + + if let Some(o) = self.observer.as_mut() { + o.message_received(peer_id, message_tag) } } @@ -2397,12 +2460,28 @@ where pub trait Observer { fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: 
u32); + fn on_peer_ban(&mut self, address: BannableAddress); + fn on_peer_discouragement(&mut self, address: BannableAddress); + // This will be called at the end of "heartbeat" function. fn on_heartbeat(&mut self); + // This will be called for both incoming and outgoing connections. fn on_connection_accepted(&mut self, address: SocketAddress, peer_role: PeerRole); + + // This will be called after `ConnectivityEvent::ConnectionError` has been handled by + // the peer manager. + fn outbound_error(&mut self, address: SocketAddress, error: P2pError); + + // This will be called after `ConnectivityEvent::ConnectionClosed` has been handled by + // the peer manager. + fn connection_closed(&mut self, peer_id: PeerId); + + // This will be called after `ConnectivityEvent::Message` has been handled by + // the peer manager. + fn message_received(&mut self, peer_id: PeerId, message_tag: ConnectivityEventMessageTag); } pub trait PeerManagerInterface { @@ -2417,6 +2496,9 @@ pub trait PeerManagerInterface { #[cfg(test)] fn peer_db_mut(&mut self) -> &mut dyn peerdb::PeerDbInterface; + + #[cfg(test)] + fn next_feeler_connection_time(&self) -> Time; } impl PeerManagerInterface for PeerManager @@ -2444,6 +2526,11 @@ where fn peer_db_mut(&mut self) -> &mut dyn peerdb::PeerDbInterface { &mut self.peerdb } + + #[cfg(test)] + fn next_feeler_connection_time(&self) -> Time { + self.next_feeler_connection_time + } } #[derive(Copy, Clone, Debug)] diff --git a/p2p/src/peer_manager/peerdb/address_data/mod.rs b/p2p/src/peer_manager/peerdb/address_data/mod.rs index c9ddcd9f96..6634a7a55b 100644 --- a/p2p/src/peer_manager/peerdb/address_data/mod.rs +++ b/p2p/src/peer_manager/peerdb/address_data/mod.rs @@ -17,6 +17,9 @@ use std::time::Duration; use common::primitives::time::Time; use randomness::Rng; +use utils::debug_panic_or_log; + +use crate::net::types::OutboundPeerRole; /// Maximum delay between reconnection attempts to reserved nodes const MAX_DELAY_RESERVED: Duration = 
Duration::from_secs(360); @@ -43,9 +46,17 @@ pub const PURGE_REACHABLE_FAIL_COUNT: u32 = /// -ln(0.0000000000000035527136788) which is about 33. const MAX_DELAY_FACTOR: u32 = 30; +// Note: AddressState/AddressData only track outbound connections, so if an inbound connection exists +// from a given address, its AddressState may still be Disconnected or even Unreachable. #[derive(Debug, Clone)] pub enum AddressState { - Connected {}, + Connected { + /// Whether the peer has shown some activity (i.e. sent us any message except for WillDisconnect) + /// during this connection. + had_activity: bool, + + peer_role: OutboundPeerRole, + }, Disconnected { /// Whether the address was reachable at least once. @@ -66,30 +77,26 @@ pub enum AddressState { }, } -#[derive(Copy, Clone, Debug)] -// Update `ALL_TRANSITIONS` if a new transition is added! +#[derive(Copy, Clone, Debug, strum::EnumDiscriminants)] +#[strum_discriminants(name(AddressStateTransitionToTag), derive(strum::EnumIter))] pub enum AddressStateTransitionTo { - Connected, + Connected { peer_role: OutboundPeerRole }, + HasActivity, Disconnected, ConnectionFailed, SetReserved, UnsetReserved, } -#[cfg(test)] -pub const ALL_TRANSITIONS: [AddressStateTransitionTo; 5] = [ - AddressStateTransitionTo::Connected, - AddressStateTransitionTo::Disconnected, - AddressStateTransitionTo::ConnectionFailed, - AddressStateTransitionTo::SetReserved, - AddressStateTransitionTo::UnsetReserved, -]; - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddressData { state: AddressState, reserved: bool, + + /// The number of consecutive successful completed connections during which the peer had no activity + /// (i.e. hadn't sent us any message except for WillDisconnect). 
+ connections_without_activity_count: u32, } impl AddressData { @@ -101,6 +108,7 @@ impl AddressData { next_connect_after: now, }, reserved, + connections_without_activity_count: 0, } } @@ -108,14 +116,26 @@ impl AddressData { &self.state } + #[cfg(test)] + pub fn state_mut(&mut self) -> &mut AddressState { + &mut self.state + } + pub fn reserved(&self) -> bool { self.reserved } + pub fn connections_without_activity_count(&self) -> u32 { + self.connections_without_activity_count + } + /// Returns true when it is time to attempt a new outbound connection pub fn connect_now(&self, now: Time) -> bool { match self.state { - AddressState::Connected {} => false, + AddressState::Connected { + had_activity: _, + peer_role: _, + } => false, // Once a peer is disconnected by the RPC command, it should remain disconnected // (at least until the RPC requests to connect). Otherwise, users may be surprised @@ -133,7 +153,10 @@ impl AddressData { /// Returns true if the address should be kept in memory pub fn retain(&self, now: Time) -> bool { match self.state { - AddressState::Connected {} => true, + AddressState::Connected { + had_activity: _, + peer_role: _, + } => true, AddressState::Disconnected { was_reachable: _, fail_count: _, @@ -151,7 +174,7 @@ impl AddressData { matches!(self.state, AddressState::Unreachable { .. }) } - fn next_connect_delay(fail_count: u32, reserved: bool) -> Duration { + fn next_connect_delay(effective_fail_count: u32, reserved: bool) -> Duration { let max_delay = if reserved { MAX_DELAY_RESERVED } else { @@ -160,14 +183,25 @@ impl AddressData { // 10, 20, 40, 80... 
seconds std::cmp::min( - Duration::from_secs(10).saturating_mul(2u32.saturating_pow(fail_count)), + Duration::from_secs(10).saturating_mul(2u32.saturating_pow(effective_fail_count)), max_delay, ) } - fn next_connect_time(now: Time, fail_count: u32, reserved: bool, rng: &mut impl Rng) -> Time { + fn next_connect_time( + now: Time, + fail_count: u32, + connections_without_activity_count: u32, + reserved: bool, + rng: &mut impl Rng, + ) -> Time { + // Note: fail_count is reset whenever any successful outbound connection is made, but + // connections_without_activity_count is not reset when an outbound connection fails, + // so it's possible for both of them to be non-zero. + let effective_fail_count = std::cmp::max(fail_count, connections_without_activity_count); + let factor = utils::exp_rand::exponential_rand(rng).clamp(0.0, MAX_DELAY_FACTOR as f64); - let offset = Self::next_connect_delay(fail_count, reserved).mul_f64(factor); + let offset = Self::next_connect_delay(effective_fail_count, reserved).mul_f64(factor); (now + offset).expect("Unexpected time addition overflow") } @@ -178,35 +212,124 @@ impl AddressData { rng: &mut impl Rng, ) { self.state = match transition { - AddressStateTransitionTo::Connected => match self.state { - AddressState::Connected {} => unreachable!(), + AddressStateTransitionTo::Connected { peer_role } => match self.state { + AddressState::Connected { + had_activity: _, + peer_role: _, + } => { + debug_panic_or_log!( + "Unexpected address state transition: Connected -> Connected" + ); + self.state.clone() + } AddressState::Disconnected { fail_count: _, next_connect_after: _, was_reachable: _, - } => AddressState::Connected {}, + } => AddressState::Connected { + had_activity: false, + peer_role, + }, AddressState::Unreachable { erase_after: _ } => { // Connection to an `Unreachable` node may be requested by RPC at any moment - AddressState::Connected {} + AddressState::Connected { + had_activity: false, + peer_role, + } } }, - 
AddressStateTransitionTo::Disconnected => match self.state { - AddressState::Connected {} => AddressState::Disconnected { - fail_count: 0, - next_connect_after: Self::next_connect_time(now, 0, self.reserved, rng), - was_reachable: true, + AddressStateTransitionTo::HasActivity => match self.state { + AddressState::Connected { + had_activity: _, + peer_role, + } => AddressState::Connected { + had_activity: true, + peer_role, }, AddressState::Disconnected { fail_count: _, next_connect_after: _, was_reachable: _, - } => unreachable!(), - AddressState::Unreachable { erase_after: _ } => unreachable!(), + } => { + debug_panic_or_log!( + "Unexpected address state transition: Disconnected -> HasActivity" + ); + self.state.clone() + } + AddressState::Unreachable { erase_after: _ } => { + debug_panic_or_log!( + "Unexpected address state transition: Unreachable -> HasActivity" + ); + self.state.clone() + } + }, + + AddressStateTransitionTo::Disconnected => match self.state { + AddressState::Connected { + had_activity, + peer_role, + } => { + if had_activity { + self.connections_without_activity_count = 0; + } else { + // We don't increase the counter for: + // 1) feeler connections (because it gets disconnected immediately, so the peer may + // not have the chance to send us anything); + // 2) manual connections; + let is_regular_auto_connection = match peer_role { + OutboundPeerRole::OutboundFullRelay + | OutboundPeerRole::OutboundBlockRelay + | OutboundPeerRole::OutboundReserved => true, + + OutboundPeerRole::Feeler | OutboundPeerRole::OutboundManual => false, + }; + + if is_regular_auto_connection { + self.connections_without_activity_count += 1; + } + } + + AddressState::Disconnected { + fail_count: 0, + next_connect_after: Self::next_connect_time( + now, + 0, + self.connections_without_activity_count, + self.reserved, + rng, + ), + was_reachable: true, + } + } + AddressState::Disconnected { + fail_count: _, + next_connect_after: _, + was_reachable: _, + } => { + 
debug_panic_or_log!( + "Unexpected address state transition: Disconnected -> Disconnected" + ); + self.state.clone() + } + AddressState::Unreachable { erase_after: _ } => { + debug_panic_or_log!( + "Unexpected address state transition: Unreachable -> Disconnected" + ); + self.state.clone() + } }, AddressStateTransitionTo::ConnectionFailed => match self.state { - AddressState::Connected {} => unreachable!(), + AddressState::Connected { + had_activity: _, + peer_role: _, + } => { + debug_panic_or_log!( + "Unexpected address state transition: Connected -> ConnectionFailed" + ); + self.state.clone() + } AddressState::Disconnected { fail_count, next_connect_after: _, @@ -218,6 +341,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count + 1, + self.connections_without_activity_count, self.reserved, rng, ), @@ -236,6 +360,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count + 1, + self.connections_without_activity_count, self.reserved, rng, ), @@ -254,7 +379,13 @@ impl AddressData { // Change to Disconnected if currently Unreachable match self.state { - AddressState::Connected {} => AddressState::Connected {}, + AddressState::Connected { + had_activity, + peer_role, + } => AddressState::Connected { + had_activity, + peer_role, + }, AddressState::Disconnected { was_reachable, fail_count, @@ -265,6 +396,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count, + self.connections_without_activity_count, self.reserved, rng, ), @@ -272,7 +404,13 @@ impl AddressData { // Reserved nodes should not be in the `Unreachable` state AddressState::Unreachable { erase_after: _ } => AddressState::Disconnected { fail_count: 0, - next_connect_after: Self::next_connect_time(now, 0, self.reserved, rng), + next_connect_after: Self::next_connect_time( + now, + 0, + self.connections_without_activity_count, + self.reserved, + rng, + ), was_reachable: false, }, } @@ -283,7 +421,13 @@ impl AddressData { // Do 
not change the state match self.state { - AddressState::Connected {} => AddressState::Connected {}, + AddressState::Connected { + had_activity, + peer_role, + } => AddressState::Connected { + had_activity, + peer_role, + }, AddressState::Disconnected { was_reachable, fail_count, diff --git a/p2p/src/peer_manager/peerdb/address_data/tests.rs b/p2p/src/peer_manager/peerdb/address_data/tests.rs index a4e119a04a..b7427cd119 100644 --- a/p2p/src/peer_manager/peerdb/address_data/tests.rs +++ b/p2p/src/peer_manager/peerdb/address_data/tests.rs @@ -14,10 +14,13 @@ // limitations under the License. use rstest::rstest; +use strum::IntoEnumIterator as _; +use logging::log; use randomness::{ distributions::{Distribution, WeightedIndex}, rngs::StepRng, + seq::IteratorRandom as _, Rng, }; use test_utils::random::{make_seedable_rng, Seed}; @@ -32,8 +35,9 @@ fn randomized(#[case] seed: Seed) { let mut rng = make_seedable_rng(seed); let started_at = Time::from_duration_since_epoch(Duration::ZERO); - let weights = [100, 100, 100, 10, 10]; - assert_eq!(weights.len(), ALL_TRANSITIONS.len()); + let weights = [100, 100, 100, 100, 10, 10]; + let all_transition_tags = AddressStateTransitionToTag::iter().collect::>(); + assert_eq!(weights.len(), all_transition_tags.len()); let weights = WeightedIndex::new(weights).unwrap(); for _ in 0..100 { @@ -43,10 +47,28 @@ fn randomized(#[case] seed: Seed) { let mut address_data = AddressData::new(was_reachable, reserved, started_at); for _ in 0..1000 { - let transition = ALL_TRANSITIONS[weights.sample(&mut rng)]; + let transition_tag = all_transition_tags[weights.sample(&mut rng)]; + + let transition = match transition_tag { + AddressStateTransitionToTag::Connected => AddressStateTransitionTo::Connected { + peer_role: OutboundPeerRole::iter().choose(&mut rng).unwrap(), + }, + AddressStateTransitionToTag::HasActivity => AddressStateTransitionTo::HasActivity, + AddressStateTransitionToTag::Disconnected => AddressStateTransitionTo::Disconnected, + 
AddressStateTransitionToTag::ConnectionFailed => { + AddressStateTransitionTo::ConnectionFailed + } + AddressStateTransitionToTag::SetReserved => AddressStateTransitionTo::SetReserved, + AddressStateTransitionToTag::UnsetReserved => { + AddressStateTransitionTo::UnsetReserved + } + }; let is_valid_transition = match transition { - AddressStateTransitionTo::Connected => !address_data.is_connected(), + AddressStateTransitionTo::Connected { peer_role: _ } => { + !address_data.is_connected() + } + AddressStateTransitionTo::HasActivity => address_data.is_connected(), AddressStateTransitionTo::Disconnected => address_data.is_connected(), AddressStateTransitionTo::ConnectionFailed => !address_data.is_connected(), AddressStateTransitionTo::SetReserved => true, @@ -100,17 +122,29 @@ fn next_connect_time_test_impl(rng: &mut impl Rng) { let max_time_reserved = (start_time + limit_reserved).unwrap(); let max_time_reachable = (start_time + limit_reachable).unwrap(); - let time = AddressData::next_connect_time(start_time, 0, true, rng); - assert!(time <= max_time_reserved); - - let time = AddressData::next_connect_time(start_time, 0, false, rng); - assert!(time <= max_time_reachable); - - let time = AddressData::next_connect_time(start_time, u32::MAX, true, rng); - assert!(time <= max_time_reserved); - - let time = AddressData::next_connect_time(start_time, u32::MAX, false, rng); - assert!(time <= max_time_reachable); + for fail_count in [0, u32::MAX] { + for connections_without_activity_count in [0, u32::MAX] { + log::debug!("fail_count = {fail_count}, connections_without_activity_count = {connections_without_activity_count}"); + + let time = AddressData::next_connect_time( + start_time, + fail_count, + connections_without_activity_count, + true, + rng, + ); + assert!(time <= max_time_reserved); + + let time = AddressData::next_connect_time( + start_time, + fail_count, + connections_without_activity_count, + false, + rng, + ); + assert!(time <= max_time_reachable); + } + } } 
#[test] diff --git a/p2p/src/peer_manager/peerdb/mod.rs b/p2p/src/peer_manager/peerdb/mod.rs index 2d6cde287f..1b4f414e7b 100644 --- a/p2p/src/peer_manager/peerdb/mod.rs +++ b/p2p/src/peer_manager/peerdb/mod.rs @@ -43,7 +43,7 @@ use logging::log; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress}; use randomness::{seq::IteratorRandom, Rng, SliceRandom}; -use crate::config::P2pConfig; +use crate::{config::P2pConfig, net::types::OutboundPeerRole}; use self::{ address_data::{AddressData, AddressStateTransitionTo}, @@ -404,10 +404,18 @@ impl PeerDb { /// Mark peer as connected /// - /// After `PeerManager` has established either an inbound or an outbound connection, - /// it informs the `PeerDb` about it. - pub fn outbound_peer_connected(&mut self, address: SocketAddress, rng: &mut impl Rng) { - self.change_address_state(address, AddressStateTransitionTo::Connected, rng); + /// After `PeerManager` has established an outbound connection, it informs `PeerDb` about it. + pub fn outbound_peer_connected( + &mut self, + address: SocketAddress, + peer_role: OutboundPeerRole, + rng: &mut impl Rng, + ) { + self.change_address_state( + address, + AddressStateTransitionTo::Connected { peer_role }, + rng, + ); self.move_addr_to_tried(&address); } @@ -416,6 +424,11 @@ impl PeerDb { self.change_address_state(address, AddressStateTransitionTo::Disconnected, rng); } + /// Report that the peer has sent us a message other than the one indicating an intent to disconnect. 
+ pub fn outbound_peer_has_activity(&mut self, address: SocketAddress, rng: &mut impl Rng) { + self.change_address_state(address, AddressStateTransitionTo::HasActivity, rng); + } + pub fn remove_address(&mut self, address: &SocketAddress) { if !self.reserved_nodes.contains(address) { self.addresses.remove(address); @@ -658,6 +671,9 @@ pub trait PeerDbInterface { fn is_address_discouraged(&self, address: &BannableAddress) -> bool; fn peer_discovered(&mut self, address: SocketAddress); + + fn address_data(&self, address: &SocketAddress) -> Option<&AddressData>; + fn address_data_mut(&mut self, address: &SocketAddress) -> Option<&mut AddressData>; } impl PeerDbInterface for PeerDb { @@ -672,6 +688,14 @@ impl PeerDbInterface for PeerDb { fn peer_discovered(&mut self, address: SocketAddress) { self.peer_discovered(address); } + + fn address_data(&self, address: &SocketAddress) -> Option<&AddressData> { + self.addresses.get(address) + } + + fn address_data_mut(&mut self, address: &SocketAddress) -> Option<&mut AddressData> { + self.addresses.get_mut(address) + } } #[cfg(test)] diff --git a/p2p/src/peer_manager/peerdb/tests.rs b/p2p/src/peer_manager/peerdb/tests.rs index 25742640b7..7fc6dec7f7 100644 --- a/p2p/src/peer_manager/peerdb/tests.rs +++ b/p2p/src/peer_manager/peerdb/tests.rs @@ -22,6 +22,7 @@ use std::{ use itertools::Itertools; use rstest::rstest; +use strum::IntoEnumIterator as _; use ::test_utils::{ random::{make_seedable_rng, Seed}, @@ -30,10 +31,11 @@ use ::test_utils::{ use common::{chain::config::create_unit_test_config, primitives::time::Time}; use networking::test_helpers::TestAddressMaker; use p2p_types::socket_addr_ext::SocketAddrExt; -use randomness::Rng; +use randomness::{seq::IteratorRandom as _, Rng}; use crate::{ ban_config::BanConfig, + net::types::OutboundPeerRole, peer_manager::{ peerdb::{ address_data::{self, PURGE_REACHABLE_FAIL_COUNT, PURGE_UNREACHABLE_TIME}, @@ -505,9 +507,10 @@ fn connected_unreachable(#[case] seed: Seed) { 
peerdb.report_outbound_failure(address, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_unreachable()); - // User requests connection to the currently unreachable node via RPC and connection succeeds. + // PeerManager attempts a connection to a currently unreachable node and the connection succeeds. // PeerDb should process that normally. - peerdb.outbound_peer_connected(address, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(address, peer_role, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_connected()); assert_addr_consistency(&peerdb); @@ -535,9 +538,10 @@ fn connected_unknown(#[case] seed: Seed) { let address = TestAddressMaker::new_random_address(&mut rng).into(); - // User requests connection to some unknown node via RPC and connection succeeds. + // PeerManager attempts a connection to some unknown node and the connection succeeds. // PeerDb should process that normally. - peerdb.outbound_peer_connected(address, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(address, peer_role, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_connected()); assert_addr_consistency(&peerdb); @@ -605,8 +609,7 @@ fn anchor_peers(#[case] seed: Seed) { assert_addr_consistency(&peerdb); } -// Call 'remove_address' on new and tried addresses, check that the db is -// in consistent state. +// Call 'remove_address' on new and tried addresses, check that the db is in a consistent state. 
#[tracing::instrument(skip(seed))] #[rstest] #[trace] @@ -652,7 +655,8 @@ fn remove_addr(#[case] seed: Seed) { } for addr in &tried_addrs { - peerdb.outbound_peer_connected(*addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(*addr, peer_role, &mut rng); } for addr in new_addrs_to_remove.iter().chain(tried_addrs_to_remove.iter()) { @@ -707,7 +711,8 @@ fn remove_unreachable(#[case] seed: Seed) { } for addr in &tried_addrs { - peerdb.outbound_peer_connected(*addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(*addr, peer_role, &mut rng); } assert_eq!(new_addr_table(&peerdb).addr_count(), addr_count); @@ -862,7 +867,8 @@ fn tried_addr_count_limit(#[case] seed: Seed, #[values(true, false)] use_reserve peerdb.add_reserved_node(addr, &mut rng); } - peerdb.outbound_peer_connected(addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(addr, peer_role, &mut rng); if use_reserved_nodes && i % 3 == 1 { peerdb.add_reserved_node(addr, &mut rng); @@ -934,7 +940,9 @@ fn new_tried_addr_selection_frequency() { peerdb.peer_discovered(addr); } for addr in tried_addrs { - peerdb.outbound_peer_connected(addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(addr, peer_role, &mut rng); + // Mark the address as disconnected, otherwise it won't be selected by // select_non_reserved_outbound_addresses. 
peerdb.outbound_peer_disconnected(addr, &mut rng); diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index 9ac55eebb1..5ea8dee7a3 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -478,7 +478,8 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: banned_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -490,7 +491,8 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: normal_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -595,7 +597,7 @@ async fn banned_address_not_in_addr_response(#[case] seed: Seed) { conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::AddrListRequest(AddrListRequest {}), + message: PeerManagerMessage::AddrListRequest(AddrListRequest {}).into(), }) .unwrap(); diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index c0a45fddc7..8661f790d0 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -70,7 +70,7 @@ use crate::{ tests::{ get_connected_peers, make_peer_manager, make_standalone_peer_manager, run_peer_manager, utils::{ - expect_cmd_connect_to, expect_cmd_connect_to_one_of, + connection_closed, expect_cmd_connect_to, expect_cmd_connect_to_one_of, inbound_block_relay_peer_accepted_by_backend, make_full_relay_peer_info, mutate_peer_manager, outbound_full_relay_peer_accepted_by_backend, query_peer_manager, recv_command_advance_time, start_manually_connecting, @@ -2037,11 +2037,7 @@ async fn feeler_connections_test_impl(seed: Seed) { } ); - conn_event_sender - .send(ConnectivityEvent::ConnectionClosed { - peer_id: cur_peer_id, - }) - .unwrap(); + connection_closed(&conn_event_sender, 
cur_peer_id); successful_conn_addresses.insert(addr); had_successful_feelers = true; diff --git a/p2p/src/peer_manager/tests/discouragement.rs b/p2p/src/peer_manager/tests/discouragement.rs index 7a6ea73865..6cd1d4d8be 100644 --- a/p2p/src/peer_manager/tests/discouragement.rs +++ b/p2p/src/peer_manager/tests/discouragement.rs @@ -550,7 +550,8 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: discouraged_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -562,7 +563,8 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: normal_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -666,7 +668,7 @@ async fn discouraged_address_not_in_addr_response(#[case] seed: Seed) { conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::AddrListRequest(AddrListRequest {}), + message: PeerManagerMessage::AddrListRequest(AddrListRequest {}).into(), }) .unwrap(); diff --git a/p2p/src/peer_manager/tests/mod.rs b/p2p/src/peer_manager/tests/mod.rs index 3e36ebc760..4658e7e6ee 100644 --- a/p2p/src/peer_manager/tests/mod.rs +++ b/p2p/src/peer_manager/tests/mod.rs @@ -21,6 +21,7 @@ mod discouragement; mod eviction; mod peer_types; mod ping; +mod unsuccessful_connection_counter_update; pub mod utils; mod whitelist; @@ -245,13 +246,18 @@ async fn send_and_sync( cmd_receiver: &mut UnboundedReceiver, rng: &mut impl Rng, ) { - conn_event_sender.send(ConnectivityEvent::Message { peer_id, message }).unwrap(); + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id, + message: message.into(), + }) + .unwrap(); let sent_nonce = rng.gen(); conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::PingRequest(PingRequest { nonce: sent_nonce }), + message: 
PeerManagerMessage::PingRequest(PingRequest { nonce: sent_nonce }).into(), }) .unwrap(); @@ -266,7 +272,7 @@ async fn send_and_sync( conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::PingResponse(PingResponse { nonce }), + message: PeerManagerMessage::PingResponse(PingResponse { nonce }).into(), }) .unwrap(); assert_eq!(nonce, sent_nonce); diff --git a/p2p/src/peer_manager/tests/peer_types.rs b/p2p/src/peer_manager/tests/peer_types.rs index d82520c894..c71bf5f32b 100644 --- a/p2p/src/peer_manager/tests/peer_types.rs +++ b/p2p/src/peer_manager/tests/peer_types.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use rstest::rstest; +use strum::IntoEnumIterator as _; use common::{chain::config, primitives::user_agent::mintlayer_core_user_agent}; use networking::transport::TcpTransportSocket; @@ -108,7 +109,7 @@ fn validate_services(#[case] seed: Seed) { let services = Services::from_u64(services); // List all peer roles - for peer_role in enum_iterator::all::() { + for peer_role in PeerRole::iter() { let peer_id_1 = PeerId::new(); let peer_info = PeerInfo { peer_id: peer_id_1, diff --git a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs new file mode 100644 index 0000000000..fde1303324 --- /dev/null +++ b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs @@ -0,0 +1,1276 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! These tests check that `AddressData`'s `fail_count` and `connections_without_activity_count` +//! are correctly updated after a failed connection or connection without peer activity, respectively. + +use std::{sync::Arc, time::Duration}; + +use rstest::rstest; +use tokio::sync::mpsc; + +use common::{ + chain::{ + config::{self}, + ChainConfig, + }, + primitives::time::Time, +}; +use logging::log; +use networking::test_helpers::{TestAddressMaker, TestTransportMaker, TestTransportTcp}; +use p2p_test_utils::{expect_no_recv, expect_recv}; +use p2p_types::socket_address::SocketAddress; +use randomness::Rng; +use test_utils::{ + assert_matches, assert_matches_return_val, + random::{make_seedable_rng, Seed}, + BasicTestTimeGetter, +}; +use utils::tokio_spawn_in_current_tracing_span; + +use crate::{ + config::P2pConfig, + disconnection_reason::DisconnectionReason, + error::{DialError, P2pError}, + message::BlockSyncMessageTag, + net::{ + default_backend::types::{Command, Message}, + types::{ + ConnectivityEvent, ConnectivityEventMessageTag, PeerManagerMessageOrTag, PeerRole, + }, + }, + peer_manager::{ + self, + config::PeerManagerConfig, + peerdb::address_data::AddressState, + test_utils::{add_reserved_peer, wait_for_heartbeat}, + tests::{ + make_standalone_peer_manager, + utils::{ + connection_closed, expect_cmd_connect_to, + inbound_full_relay_peer_accepted_by_backend, mutate_peer_manager, + outbound_peer_accepted_by_backend, query_peer_manager, start_manually_connecting, + }, + }, + }, + test_helpers::test_p2p_config_with_peer_mgr_config, + tests::helpers::PeerManagerNotification, + types::peer_id::PeerId, + PeerManagerEvent, +}; + +// Check the case of an automatic connection failure when the peer address would end up in the +// `Disconnected` state. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. 
+// 2) For non-reserved connections, force-set peer address's `was_reachable` field to true +// (otherwise the address would be put into `Unreachable` state after the first unsuccessful +// connection attempt). For reserved connections try both variants. +// 3) In a loop, advance the time so that the corresponding connection would be attempted; +// make the connection fail; check that `fail_count` has been incremented. +// 4) Occasionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current `fail_count`. +// 5) On the last iteration of the loop make the connection succeed; check that `fail_count` has been +// set to zero. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_fails_peer_state_becomes_disconnected( + #[case] seed: Seed, + #[values( + (AutoConnType::FullRelay, true), + (AutoConnType::BlockRelay, true), + (AutoConnType::Reserved, true), + (AutoConnType::Reserved, false), + (AutoConnType::Feeler, true), + )] + (conn_type, was_reachable): (AutoConnType, bool), +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_feeler_connection = matches!(conn_type, AutoConnType::Feeler); 
+ let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + let is_block_relay_connection = matches!(conn_type, AutoConnType::BlockRelay); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, was_reachable).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0, + } + ); + + if is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut last_connect_after = Time::from_duration_since_epoch(Duration::ZERO); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let connect_after = if is_feeler_connection { + let next_feeler_connection_time = + query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await; + + std::cmp::max(last_connect_after, next_feeler_connection_time) + } else { + last_connect_after + }; + + let wait_duration = std::cmp::max( + connect_after + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); 
+ + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + !is_block_relay_connection, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + conn_type.to_peer_role(), + ) + .await; + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + } else { + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + // An inbound connection shouldn't change the outcome even if it's successful. 
+ if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + } + + let expected_fail_count = if is_last_iteration { 0 } else { i + 1 }; + let expected_connections_without_activity_count = + if is_last_iteration && !is_feeler_connection { + 1 + } else { + 0 + }; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state.was_reachable, + was_reachable || is_last_iteration + ); + assert_eq!(peer_addr_state.fail_count, expected_fail_count); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!( + peer_addr_state.connections_without_activity_count, + expected_connections_without_activity_count + ); + + last_connect_after = peer_addr_state.next_connect_after; + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of an automatic connection failure when the peer address would end up in the +// `Unreachable` state. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. +// 2) Keep peer address's `was_reachable` field at false. +// 3) Advance the time so that the corresponding connection would be attempted; make the +// connection fail; check that the peer address state is now `Unreachable`. +// 4) Check that no further connection attempts are made.
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_fails_peer_state_becomes_unreachable( + #[case] seed: Seed, + #[values( + AutoConnType::FullRelay, + AutoConnType::BlockRelay, + AutoConnType::Feeler + )] + conn_type: AutoConnType, +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_feeler_connection = matches!(conn_type, AutoConnType::Feeler); + let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + if 
is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let time_before_wait = time_getter.get_time_getter().get_time(); + let connect_after = if is_feeler_connection { + query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await + } else { + Time::from_duration_since_epoch(Duration::ZERO) + }; + + let wait_duration = std::cmp::max( + connect_after + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + // An inbound connection shouldn't change the outcome even if it's successful. + if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = expect_peer_state_unreachable(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.erase_after > time_after_wait); + + // No further connection attempts should be made. 
+ time_getter.advance_time(peer_manager::HEARTBEAT_INTERVAL_MAX * 1000); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + expect_no_recv!(cmd_receiver); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a manual connection failure. +// 1) Set up the peer manager, optionally force-setting the peer address's `was_reachable` field +// to true. +// 2) In a loop, attempt a manual connection to the peer and make it fail. Check that: +// a) if `was_reachable` was true, the resulting address state is `Disconnected` and `fail_count` +// has been incremented; +// b) otherwise the resulting address state is `Unreachable`. +// 3) Occasionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current address state. +// 4) On the last iteration make the connection succeed. Check that the resulting address state is +// now `Disconnected` with zero `fail_count`.
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn manual_connection_fails(#[case] seed: Seed, #[values(false, true)] make_reachable: bool) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = { + let peer_mgr_config = PeerManagerConfig { + outbound_block_relay_count: 0.into(), + outbound_block_relay_extra_count: 0.into(), + outbound_full_relay_count: 0.into(), + outbound_full_relay_extra_count: 0.into(), + enable_feeler_connections: false.into(), + + max_inbound_connections: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + outbound_block_relay_connection_min_age: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + feeler_connections_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + allow_same_ip_connections: Default::default(), + peerdb_config: Default::default(), + min_peer_software_version: Default::default(), + }; + + Arc::new(test_p2p_config_with_peer_mgr_config(peer_mgr_config)) + }; + + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let peer_mgr_join_handle = 
tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + if make_reachable { + let peer_discovery_time = time_getter.get_time_getter().get_time(); + + discover_peer(&peer_mgr_event_sender, peer_address, true).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: true, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + } else { + expect_no_peer_state(&peer_mgr_event_sender, peer_address).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut erase_after = None; + + let now = time_getter.get_time_getter().get_time(); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + start_and_close_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } else { + attempt_and_fail_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + ) + .await; + + // An inbound connection shouldn't change the outcome even if it's successful. 
+ if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + } + + if make_reachable || is_last_iteration { + let fail_count = if is_last_iteration { 0 } else { i + 1 }; + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, fail_count); + assert!(peer_addr_state.next_connect_after > now); + assert_eq!(peer_addr_state.connections_without_activity_count, 0); + } else { + let peer_addr_state = + expect_peer_state_unreachable(&peer_mgr_event_sender, peer_address).await; + + if i == 0 { + assert!(peer_addr_state.erase_after > now); + erase_after = Some(peer_addr_state.erase_after); + } else { + // For unreachable addresses erase_after doesn't change on further unsuccessful connection attempts. + assert_eq!(peer_addr_state.erase_after, erase_after.unwrap()); + } + } + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a successful automatic connection without any peer activity. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. +// 2) In a loop, advance the time so that the corresponding connection would be attempted. +// Make the connection succeed and close it immediately; check that `connections_without_activity_count` +// has been incremented. +// 3) Occasionally, make a successful incoming or manual outgoing connection without peer activity, or an unsuccessful +// manual outgoing one. Expect that this doesn't affect `connections_without_activity_count`. +// 4) On the final iteration make the peer actually send a message. 
Check that `connections_without_activity_count` has +// been reset to zero. +// Note that feeler connections are not checked in this test because once a feeler connection succeeds, it won't +// be attempted again. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_without_peer_activity( + #[case] seed: Seed, + #[values( + AutoConnType::FullRelay, + AutoConnType::BlockRelay, + AutoConnType::Reserved + )] + conn_type: AutoConnType, +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, Duration::ZERO); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + let is_block_relay_connection = matches!(conn_type, AutoConnType::BlockRelay); + let is_full_relay_connection = matches!(conn_type, AutoConnType::FullRelay); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + 
peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + if is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut last_connect_after = Time::from_duration_since_epoch(Duration::ZERO); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let wait_duration = std::cmp::max( + last_connect_after + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + !is_block_relay_connection, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + conn_type.to_peer_role(), + ) + .await; + + if is_reserved_connection || is_full_relay_connection { + // In the case of a reserved or full relay connection, the peer manager will send + // AddrListRequest to the peer.
+ let cmd = expect_recv!(cmd_receiver); + let (peer_id_in_cmd, message) = assert_matches_return_val!( + cmd, + Command::SendMessage { peer_id, message }, + (peer_id, message) + ); + assert_eq!(peer_id_in_cmd, peer_id); + assert_matches!(message, Message::AddrListRequest(_)); + } + + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id, + message: PeerManagerMessageOrTag::BlockSyncMessage( + BlockSyncMessageTag::HeaderListRequest, + ), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::MessageReceived { + peer_id, + message_tag: ConnectivityEventMessageTag::BlockSyncMessage( + BlockSyncMessageTag::HeaderListRequest + ) + } + ); + } + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + + // An inbound or manual connection without peer activity shouldn't affect connections_without_activity_count. + // Same for a failed outbound connection. 
+ let extra_outbound_connection_failed = match rng.gen_range(0..4) { + 0 => { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + + false + } + 1 => { + log::debug!("Making an extra successful manual outbound connection"); + + start_and_close_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + + false + } + 2 => { + log::debug!("Making an extra unsuccessful manual outbound connection"); + + attempt_and_fail_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + ) + .await; + + true + } + + _ => false, + }; + + let expected_connections_without_activity_count = if is_last_iteration { 0 } else { i + 1 }; + let expected_fail_count = if extra_outbound_connection_failed { + 1 + } else { + 0 + }; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, expected_fail_count); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!( + peer_addr_state.connections_without_activity_count, + expected_connections_without_activity_count + ); + + last_connect_after = peer_addr_state.next_connect_after; + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a successful feeler connection without any peer activity. +// 1) Set up the peer manager so that a feeler connection would occur.
+// 2) Advance the time so that the connection would be attempted. +// Make it succeed and check that disconnection is initiated immediately. Close the connection. +// 3) `connections_without_activity_count` should remain zero. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn feeler_connection_without_peer_activity(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = + make_p2p_config_for_auto_connection(AutoConnType::Feeler, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let 
next_feeler_connection_time = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await; + let wait_duration = std::cmp::max( + next_feeler_connection_time + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + true, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::Feeler, + ) + .await; + + // Since it's a feeler connection, the peer manager will initiate disconnection right away. + let cmd = expect_recv!(cmd_receiver); + assert_eq!( + cmd, + Command::Disconnect { + peer_id, + reason: Some(DisconnectionReason::FeelerConnection) + } + ); + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, 0); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!(peer_addr_state.connections_without_activity_count, 0); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +#[derive(Debug, Copy, Clone)] +enum AutoConnType { + FullRelay, + BlockRelay, + Reserved, + Feeler, +} + +impl AutoConnType { + fn to_peer_role(self) -> PeerRole { + match self { + AutoConnType::FullRelay => PeerRole::OutboundFullRelay, + 
AutoConnType::BlockRelay => PeerRole::OutboundBlockRelay, + AutoConnType::Reserved => PeerRole::OutboundReserved, + AutoConnType::Feeler => PeerRole::Feeler, + } + } +} + +fn make_p2p_config_for_auto_connection( + conn_type: AutoConnType, + feeler_connections_interval: Duration, +) -> Arc { + let mut outbound_block_relay_count = 0; + let mut outbound_full_relay_count = 0; + let mut enable_feeler_connections = false; + + match conn_type { + AutoConnType::FullRelay => { + outbound_full_relay_count = 1; + } + AutoConnType::BlockRelay => { + outbound_block_relay_count = 1; + } + AutoConnType::Feeler => { + enable_feeler_connections = true; + } + AutoConnType::Reserved => {} + }; + + let peer_mgr_config = PeerManagerConfig { + outbound_block_relay_count: outbound_block_relay_count.into(), + outbound_block_relay_extra_count: 0.into(), + outbound_full_relay_count: outbound_full_relay_count.into(), + outbound_full_relay_extra_count: 0.into(), + enable_feeler_connections: enable_feeler_connections.into(), + feeler_connections_interval: feeler_connections_interval.into(), + + max_inbound_connections: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + outbound_block_relay_connection_min_age: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + allow_same_ip_connections: Default::default(), + peerdb_config: Default::default(), + min_peer_software_version: Default::default(), + }; + + Arc::new(test_p2p_config_with_peer_mgr_config(peer_mgr_config)) +} + +async fn discover_peer( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + make_reachable: bool, +) { + 
mutate_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db_mut().peer_discovered(peer_address); + + if make_reachable { + let addr_data = peer_mgr.peer_db_mut().address_data_mut(&peer_address).unwrap(); + + let was_reachable = assert_matches_return_val!( + addr_data.state_mut(), + AddressState::Disconnected { was_reachable, .. }, + was_reachable + ); + + *was_reachable = true + } + }) + .await; +} + +// Contents of AddressState::Disconnected and also the `connections_without_activity_count` +// field that is part of `AddressData`. +#[derive(Eq, PartialEq, Clone, Debug)] +struct AddressStateDisconnected { + was_reachable: bool, + fail_count: u32, + next_connect_after: Time, + connections_without_activity_count: u32, +} + +// Contents of AddressState::Unreachable +#[derive(Eq, PartialEq, Clone, Debug)] +struct AddressStateUnreachable { + erase_after: Time, +} + +#[must_use] +async fn expect_peer_state_disconnected( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) -> AddressStateDisconnected { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + let addr_data = peer_mgr.peer_db().address_data(&peer_address).unwrap(); + assert_matches_return_val!( + addr_data.state().clone(), + AddressState::Disconnected { + was_reachable, + fail_count, + next_connect_after + }, + AddressStateDisconnected { + was_reachable, + fail_count, + next_connect_after, + connections_without_activity_count: addr_data.connections_without_activity_count() + } + ) + }) + .await +} + +#[must_use] +async fn expect_peer_state_unreachable( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) -> AddressStateUnreachable { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + let addr_data = peer_mgr.peer_db().address_data(&peer_address).unwrap(); + assert_matches_return_val!( + addr_data.state().clone(), + AddressState::Unreachable { erase_after }, + AddressStateUnreachable { erase_after } + ) + }) + 
.await +} + +async fn expect_no_peer_state( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + assert!(peer_mgr.peer_db().address_data(&peer_address).is_none()); + }) + .await +} + +async fn peer_accepted_by_peer_mgr( + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_id: PeerId, + peer_address: SocketAddress, + peer_role: PeerRole, +) { + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id }); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::ConnectionAccepted { + address: peer_address, + peer_role + } + ); +} + +async fn close_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_id: PeerId, +) { + connection_closed(conn_event_sender, peer_id); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::ConnectionClosed { peer_id } + ); +} + +async fn inbound_full_relay_peer_connected_and_disconnected( + conn_event_sender: &mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) { + let peer_id = inbound_full_relay_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + ); + peer_accepted_by_peer_mgr( + cmd_receiver, + peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::Inbound, + ) + .await; + + close_connection(conn_event_sender, peer_mgr_notification_receiver, peer_id).await; +} + +async fn start_and_close_manual_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_event_sender: 
&mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) { + let result_recv = start_manually_connecting(peer_mgr_event_sender, peer_address); + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + true, + ); + + peer_accepted_by_peer_mgr( + cmd_receiver, + peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::OutboundManual, + ) + .await; + + result_recv.await.unwrap().unwrap(); + + // Since it's a manual connection, the peer manager will send AddrListRequest to the peer. + let cmd = expect_recv!(cmd_receiver); + let (peer_id_in_cmd, message) = assert_matches_return_val!( + cmd, + Command::SendMessage { peer_id, message }, + (peer_id, message) + ); + assert_eq!(peer_id_in_cmd, peer_id); + assert_matches!(message, Message::AddrListRequest(_)); + + close_connection(conn_event_sender, peer_mgr_notification_receiver, peer_id).await; +} + +async fn attempt_and_fail_manual_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_event_sender: &mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, +) { + let result_recv = start_manually_connecting(peer_mgr_event_sender, peer_address); + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + 
PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + assert_eq!(result_recv.await.unwrap(), Err(connection_error)); +} diff --git a/p2p/src/peer_manager/tests/utils.rs b/p2p/src/peer_manager/tests/utils.rs index f2dfa182e5..a681ef1fb4 100644 --- a/p2p/src/peer_manager/tests/utils.rs +++ b/p2p/src/peer_manager/tests/utils.rs @@ -34,6 +34,7 @@ use crate::{ types::{ConnectivityEvent, PeerInfo}, }, peer_manager::PeerManagerInterface, + peer_manager_event::PeerDisconnectionDbAction, test_helpers::TEST_PROTOCOL_VERSION, tests::helpers::PeerManagerNotification, utils::oneshot_nofail, @@ -173,17 +174,13 @@ pub fn outbound_block_relay_peer_accepted_by_backend( bind_address: SocketAddress, chain_config: &ChainConfig, ) -> PeerId { - let peer_id = PeerId::new(); - conn_event_sender - .send(ConnectivityEvent::OutboundAccepted { - peer_address, - bind_address, - peer_info: make_block_relay_peer_info(peer_id, chain_config), - node_address_as_seen_by_peer: None, - }) - .unwrap(); - - peer_id + outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + false, + ) } /// Send a ConnectivityEvent simulating a connection being accepted by the backend. 
@@ -192,13 +189,34 @@ pub fn outbound_full_relay_peer_accepted_by_backend( peer_address: SocketAddress, bind_address: SocketAddress, chain_config: &ChainConfig, +) -> PeerId { + outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + true, + ) +} + +pub fn outbound_peer_accepted_by_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, + is_full_relay: bool, ) -> PeerId { let peer_id = PeerId::new(); + let peer_info = if is_full_relay { + make_full_relay_peer_info(peer_id, chain_config) + } else { + make_block_relay_peer_info(peer_id, chain_config) + }; conn_event_sender .send(ConnectivityEvent::OutboundAccepted { peer_address, bind_address, - peer_info: make_full_relay_peer_info(peer_id, chain_config), + peer_info, node_address_as_seen_by_peer: None, }) .unwrap(); @@ -206,6 +224,13 @@ pub fn outbound_full_relay_peer_accepted_by_backend( peer_id } +pub fn connection_closed( + conn_event_sender: &mpsc::UnboundedSender, + peer_id: PeerId, +) { + conn_event_sender.send(ConnectivityEvent::ConnectionClosed { peer_id }).unwrap(); +} + pub async fn wait_for_heartbeat( peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, ) { @@ -274,6 +299,25 @@ pub fn start_manually_connecting( result_receiver } +pub fn disconnect_manually( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_id: PeerId, + peerdb_action: PeerDisconnectionDbAction, +) -> oneshot_nofail::Receiver> { + let (result_sender, result_receiver) = oneshot_nofail::channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::Disconnect( + peer_id, + peerdb_action, + None, + result_sender, + )) + .unwrap(); + + result_receiver +} + pub async fn adjust_peer_score( peer_mgr_event_sender: &mpsc::UnboundedSender, peer_id: PeerId, @@ -306,3 +350,19 @@ pub async fn ban_peer_manually( result_receiver.await.unwrap().unwrap(); } + +pub async fn add_reserved_peer( + 
peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_addr: SocketAddress, +) { + let (result_sender, result_receiver) = oneshot_nofail::channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::AddReserved( + IpOrSocketAddress::Socket(peer_addr.socket_addr()), + result_sender, + )) + .unwrap(); + + result_receiver.await.unwrap().unwrap(); +} diff --git a/p2p/src/tests/helpers/mod.rs b/p2p/src/tests/helpers/mod.rs index b08cf56af6..95153054e9 100644 --- a/p2p/src/tests/helpers/mod.rs +++ b/p2p/src/tests/helpers/mod.rs @@ -30,7 +30,8 @@ use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress use test_utils::BasicTestTimeGetter; use crate::{ - net::types::{PeerInfo, PeerRole}, + error::P2pError, + net::types::{ConnectivityEventMessageTag, PeerInfo, PeerRole}, peer_manager::{self, dns_seed::DnsSeed}, }; @@ -61,6 +62,17 @@ pub enum PeerManagerNotification { address: SocketAddress, peer_role: PeerRole, }, + OutboundError { + address: SocketAddress, + error: P2pError, + }, + ConnectionClosed { + peer_id: PeerId, + }, + MessageReceived { + peer_id: PeerId, + message_tag: ConnectivityEventMessageTag, + }, } pub struct PeerManagerObserver { @@ -103,6 +115,21 @@ impl peer_manager::Observer for PeerManagerObserver { fn on_connection_accepted(&mut self, address: SocketAddress, peer_role: PeerRole) { self.send_notification(PeerManagerNotification::ConnectionAccepted { address, peer_role }); } + + fn outbound_error(&mut self, address: SocketAddress, error: P2pError) { + self.send_notification(PeerManagerNotification::OutboundError { address, error }); + } + + fn connection_closed(&mut self, peer_id: PeerId) { + self.send_notification(PeerManagerNotification::ConnectionClosed { peer_id }); + } + + fn message_received(&mut self, peer_id: PeerId, message_tag: ConnectivityEventMessageTag) { + self.send_notification(PeerManagerNotification::MessageReceived { + peer_id, + message_tag, + }); + } } #[derive(Debug, PartialEq, Eq)] diff --git 
a/test-utils/src/basic_test_time_getter.rs b/test-utils/src/basic_test_time_getter.rs index 4a2c66b9b4..41b81e1945 100644 --- a/test-utils/src/basic_test_time_getter.rs +++ b/test-utils/src/basic_test_time_getter.rs @@ -46,6 +46,18 @@ impl BasicTestTimeGetter { self.current_time_millis.fetch_add(duration.as_millis() as u64); } + // Same as advance_time, except that if the passed duration's precision is not representable by + // the time getter (i.e. it's sub-millisecond), this function will round it up, ensuring that + // the resulting time is always bigger than the initial time plus duration. + pub fn advance_time_rounded_up(&self, duration: Duration) { + let mut millis = duration.as_millis() as u64; + if duration.subsec_nanos() % 1_000_000 != 0 { + millis += 1; + } + + self.current_time_millis.fetch_add(millis); + } + pub fn is_same_instance(&self, other: &BasicTestTimeGetter) -> bool { Arc::ptr_eq(&self.current_time_millis, &other.current_time_millis) } diff --git a/utils/src/exp_rand/mod.rs b/utils/src/exp_rand/mod.rs index 4e8d93c214..72d88b7b2a 100644 --- a/utils/src/exp_rand/mod.rs +++ b/utils/src/exp_rand/mod.rs @@ -15,7 +15,9 @@ use randomness::Rng; -/// Returns a value sampled from an exponential distribution with a mean of 1.0 +/// Returns a value sampled from an exponential distribution with a mean of 1.0. +/// +/// The result will be in the range (0, max], where max = -ln(f64::MIN_POSITIVE) ~= 708.396418532264. pub fn exponential_rand(rng: &mut impl Rng) -> f64 { let mut random_f64 = rng.gen::(); // The generated number will be in the range [0, 1). Turn it into (0, 1) to avoid From 0141c6493e4504d635c7eaa0fefb7355f7a64612 Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Thu, 12 Mar 2026 13:27:23 +0200 Subject: [PATCH 2/3] P2p: send a `FirstSyncMessageReceived` message to the peer manager once instead of sending `TransactionSyncMessageTag`/`BlockSyncMessageTag` on each sync message. Add a test to check that the message is actually sent. 
Some renaming. --- .../src/crawler_p2p/crawler_manager/mod.rs | 9 +- p2p/src/message.rs | 6 +- p2p/src/net/default_backend/backend.rs | 4 +- p2p/src/net/default_backend/peer.rs | 60 +-- p2p/src/net/default_backend/types.rs | 4 +- p2p/src/net/types.rs | 40 +- p2p/src/peer_manager/mod.rs | 19 +- .../unsuccessful_connection_counter_update.rs | 13 +- p2p/src/tests/helpers/mod.rs | 6 +- p2p/src/tests/helpers/test_node.rs | 6 + p2p/src/tests/mod.rs | 1 + p2p/src/tests/peer_mgr_events.rs | 351 ++++++++++++++++++ 12 files changed, 435 insertions(+), 84 deletions(-) create mode 100644 p2p/src/tests/peer_mgr_events.rs diff --git a/dns-server/src/crawler_p2p/crawler_manager/mod.rs b/dns-server/src/crawler_p2p/crawler_manager/mod.rs index 9bc5d0f28a..8d88c9d521 100644 --- a/dns-server/src/crawler_p2p/crawler_manager/mod.rs +++ b/dns-server/src/crawler_p2p/crawler_manager/mod.rs @@ -34,7 +34,7 @@ use p2p::{ PingResponse, }, net::{ - types::{ConnectivityEvent, PeerManagerMessageOrTag, SyncingEvent}, + types::{ConnectivityEvent, PeerManagerMessageExt, SyncingEvent}, ConnectivityService, NetworkingService, SyncingEventReceiver, }, peer_manager::{ @@ -211,12 +211,11 @@ where fn handle_conn_message( &mut self, peer_id: PeerId, - message: PeerManagerMessageOrTag, + message: PeerManagerMessageExt, ) -> p2p::Result<()> { let peer_mgr_message = match message { - PeerManagerMessageOrTag::PeerManagerMessage(message) => message, - PeerManagerMessageOrTag::BlockSyncMessage(_) - | PeerManagerMessageOrTag::TransactionSyncMessage(_) => { + PeerManagerMessageExt::PeerManagerMessage(message) => message, + PeerManagerMessageExt::FirstSyncMessageReceived => { // Ignored return Ok(()); } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 38d3bf92fe..fb91ecd506 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -26,7 +26,7 @@ use serialization::{Decode, Encode}; use crate::types::peer_address::PeerAddress; #[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] 
-#[strum_discriminants(name(BlockSyncMessageTag))] +#[strum_discriminants(name(BlockSyncMessageTag), derive(strum::EnumIter))] pub enum BlockSyncMessage { HeaderListRequest(HeaderListRequest), BlockListRequest(BlockListRequest), @@ -42,7 +42,7 @@ pub enum BlockSyncMessage { } #[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] -#[strum_discriminants(name(TransactionSyncMessageTag))] +#[strum_discriminants(name(TransactionSyncMessageTag), derive(strum::EnumIter))] pub enum TransactionSyncMessage { NewTransaction(Id), TransactionRequest(Id), @@ -50,7 +50,7 @@ pub enum TransactionSyncMessage { } #[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] -#[strum_discriminants(name(PeerManagerMessageTag))] +#[strum_discriminants(name(PeerManagerMessageTag), derive(strum::EnumIter))] pub enum PeerManagerMessage { AddrListRequest(AddrListRequest), AnnounceAddrRequest(AnnounceAddrRequest), diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 5093381236..ffd001735d 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -50,7 +50,7 @@ use crate::{ types::{BackendEvent, Command, PeerEvent}, }, types::{ - services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageOrTag, SyncingEvent, + services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent, }, }, protocol::{ProtocolVersion, SupportedProtocolVersion}, @@ -697,7 +697,7 @@ where fn handle_message( &mut self, peer_id: PeerId, - message: PeerManagerMessageOrTag, + message: PeerManagerMessageExt, ) -> crate::Result<()> { // Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned). // Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification. 
diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 4c5ce5eb2a..b46d7def15 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -33,7 +33,7 @@ use crate::{ message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage}, net::{ default_backend::types::{BackendEvent, PeerEvent}, - types::PeerManagerMessageOrTag, + types::PeerManagerMessageExt, }, protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion}, types::peer_id::PeerId, @@ -83,6 +83,10 @@ pub struct Peer { /// Time getter time_getter: TimeGetter, + + /// Will be set to true once at least one BlockSyncMessage or TransactionSyncMessage has been + /// received from the peer. + sync_message_received: bool, } impl Peer @@ -115,6 +119,7 @@ where node_protocol_version, time_getter, common_protocol_version: None, + sync_message_received: false, } } @@ -393,17 +398,17 @@ where // Note: the channels used by this function to propagate messages to other parts of p2p // must be bounded; this is important to prevent DoS attacks. 
async fn handle_socket_msg( + &mut self, peer_id: PeerId, msg: Message, - peer_event_sender: &mut mpsc::Sender, - block_sync_msg_sender: &mut mpsc::Sender, - transaction_sync_msg_sender: &mut mpsc::Sender, + block_sync_msg_sender: &mpsc::Sender, + transaction_sync_msg_sender: &mpsc::Sender, ) -> crate::Result<()> { match msg.categorize() { CategorizedMessage::Handshake(_) => { log::error!("Peer {peer_id} sent unexpected handshake message"); - peer_event_sender + self.peer_event_sender .send(PeerEvent::Misbehaved { error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage( "Unexpected handshake message".to_owned(), @@ -412,29 +417,35 @@ where .await?; } CategorizedMessage::PeerManagerMessage(msg) => { - peer_event_sender + self.peer_event_sender .send(PeerEvent::MessageReceived( - PeerManagerMessageOrTag::PeerManagerMessage(msg), + PeerManagerMessageExt::PeerManagerMessage(msg), )) .await? } CategorizedMessage::BlockSyncMessage(msg) => { - peer_event_sender - .send(PeerEvent::MessageReceived( - PeerManagerMessageOrTag::BlockSyncMessage((&msg).into()), - )) - .await?; - - block_sync_msg_sender.send(msg).await? + block_sync_msg_sender.send(msg).await?; + + if !self.sync_message_received { + self.peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageExt::FirstSyncMessageReceived, + )) + .await?; + self.sync_message_received = true; + } } CategorizedMessage::TransactionSyncMessage(msg) => { - peer_event_sender - .send(PeerEvent::MessageReceived( - PeerManagerMessageOrTag::TransactionSyncMessage((&msg).into()), - )) - .await?; - - transaction_sync_msg_sender.send(msg).await? 
+ transaction_sync_msg_sender.send(msg).await?; + + if !self.sync_message_received { + self.peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageExt::FirstSyncMessageReceived, + )) + .await?; + self.sync_message_received = true; + } } } @@ -507,12 +518,11 @@ where event = self.socket.recv(), if sync_msg_senders_opt.is_some() => match event { Ok(message) => { let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some"); - Self::handle_socket_msg( + self.handle_socket_msg( self.peer_id, message, - &mut self.peer_event_sender, - &mut sync_msg_senders.0, - &mut sync_msg_senders.1, + &sync_msg_senders.0, + &sync_msg_senders.1, ).await?; } Err(err) => { diff --git a/p2p/src/net/default_backend/types.rs b/p2p/src/net/default_backend/types.rs index 236e1204e2..3e26a357ac 100644 --- a/p2p/src/net/default_backend/types.rs +++ b/p2p/src/net/default_backend/types.rs @@ -32,7 +32,7 @@ use crate::{ BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest, PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage, }, - net::types::{services::Services, PeerManagerMessageOrTag}, + net::types::{services::Services, PeerManagerMessageExt}, protocol::{ProtocolVersion, SupportedProtocolVersion}, types::{peer_address::PeerAddress, peer_id::PeerId}, }; @@ -119,7 +119,7 @@ pub enum PeerEvent { ConnectionClosed, /// Message received from remote - MessageReceived(PeerManagerMessageOrTag), + MessageReceived(PeerManagerMessageExt), /// Protocol violation Misbehaved { error: P2pError }, diff --git a/p2p/src/net/types.rs b/p2p/src/net/types.rs index 2d726696f0..821a89d71f 100644 --- a/p2p/src/net/types.rs +++ b/p2p/src/net/types.rs @@ -27,8 +27,7 @@ use tokio::sync::mpsc::Receiver; use crate::{ error::ConnectionValidationError, message::{ - BlockSyncMessage, BlockSyncMessageTag, PeerManagerMessage, PeerManagerMessageTag, - TransactionSyncMessage, TransactionSyncMessageTag, + BlockSyncMessage, 
PeerManagerMessage, PeerManagerMessageTag, TransactionSyncMessage, }, protocol::SupportedProtocolVersion, types::{peer_address::PeerAddress, peer_id::PeerId}, @@ -166,11 +165,9 @@ impl Display for PeerInfo { #[derive(Debug)] pub enum ConnectivityEvent { /// A message received from a peer. - /// - /// Note that only a message tag is present here for block and transaction sync messages. Message { peer_id: PeerId, - message: PeerManagerMessageOrTag, + message: PeerManagerMessageExt, }, /// Outbound connection accepted @@ -248,37 +245,34 @@ pub enum ConnectivityEvent { }, } -/// Either a full `PeerManagerMessage` or, if it's a sync message, the corresponding tag. -#[derive(Debug, Clone)] -pub enum PeerManagerMessageOrTag { +#[derive(Debug)] +pub enum PeerManagerMessageExt { + // The complete PeerManagerMessage PeerManagerMessage(PeerManagerMessage), - BlockSyncMessage(BlockSyncMessageTag), - TransactionSyncMessage(TransactionSyncMessageTag), + + // An indicator that the first sync message (i.e. BlockSyncMessage or TransactionSyncMessage) + // has been received from the peer. + FirstSyncMessageReceived, } -impl From for PeerManagerMessageOrTag { +impl From for PeerManagerMessageExt { fn from(value: PeerManagerMessage) -> Self { Self::PeerManagerMessage(value) } } +/// Tag type for `PeerManagerMessageExt`. 
#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub enum ConnectivityEventMessageTag { +pub enum PeerManagerMessageExtTag { PeerManagerMessage(PeerManagerMessageTag), - BlockSyncMessage(BlockSyncMessageTag), - TransactionSyncMessage(TransactionSyncMessageTag), + FirstSyncMessageReceived, } -impl From<&'_ PeerManagerMessageOrTag> for ConnectivityEventMessageTag { - fn from(value: &'_ PeerManagerMessageOrTag) -> Self { +impl From<&'_ PeerManagerMessageExt> for PeerManagerMessageExtTag { + fn from(value: &'_ PeerManagerMessageExt) -> Self { match value { - PeerManagerMessageOrTag::PeerManagerMessage(msg) => { - Self::PeerManagerMessage(msg.into()) - } - PeerManagerMessageOrTag::BlockSyncMessage(tag) => Self::BlockSyncMessage(*tag), - PeerManagerMessageOrTag::TransactionSyncMessage(tag) => { - Self::TransactionSyncMessage(*tag) - } + PeerManagerMessageExt::PeerManagerMessage(msg) => Self::PeerManagerMessage(msg.into()), + PeerManagerMessageExt::FirstSyncMessageReceived => Self::FirstSyncMessageReceived, } } } diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 6495c7169e..13ecb74e9c 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -63,8 +63,7 @@ use crate::{ net::{ types::{ services::{Service, Services}, - ConnectivityEvent, ConnectivityEventMessageTag, PeerInfo, PeerManagerMessageOrTag, - PeerRole, + ConnectivityEvent, PeerInfo, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole, }, ConnectivityService, NetworkingService, }, @@ -1563,9 +1562,9 @@ where } } - fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageOrTag) { + fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageExt) { let is_disconnection_message = match &message { - PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg { + PeerManagerMessageExt::PeerManagerMessage(msg) => match msg { PeerManagerMessage::WillDisconnect(_) => true, PeerManagerMessage::AddrListRequest(_) @@ -1574,8 
+1573,7 @@ where | PeerManagerMessage::AddrListResponse(_) | PeerManagerMessage::PingResponse(_) => false, }, - PeerManagerMessageOrTag::BlockSyncMessage(_) - | PeerManagerMessageOrTag::TransactionSyncMessage(_) => false, + PeerManagerMessageExt::FirstSyncMessageReceived => false, }; // Note: `PeerContext` must always exist when an incoming message arrives, and the individual @@ -1595,10 +1593,10 @@ where return; } - let message_tag: ConnectivityEventMessageTag = (&message).into(); + let message_tag: PeerManagerMessageExtTag = (&message).into(); match message { - PeerManagerMessageOrTag::PeerManagerMessage(msg) => match msg { + PeerManagerMessageExt::PeerManagerMessage(msg) => match msg { PeerManagerMessage::AddrListRequest(_) => { self.handle_addr_list_request(peer_id); } @@ -1618,8 +1616,7 @@ where self.handle_will_disconnect_messgae(peer_id, msg); } }, - PeerManagerMessageOrTag::BlockSyncMessage(_) - | PeerManagerMessageOrTag::TransactionSyncMessage(_) => {} + PeerManagerMessageExt::FirstSyncMessageReceived => {} }; if let Some(o) = self.observer.as_mut() { @@ -2481,7 +2478,7 @@ pub trait Observer { // This will be called after `ConnectivityEvent::Message` has been handled by // the peer manager. 
- fn message_received(&mut self, peer_id: PeerId, message_tag: ConnectivityEventMessageTag); + fn message_received(&mut self, peer_id: PeerId, message_tag: PeerManagerMessageExtTag); } pub trait PeerManagerInterface { diff --git a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs index fde1303324..edd169d29e 100644 --- a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs +++ b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs @@ -44,12 +44,9 @@ use crate::{ config::P2pConfig, disconnection_reason::DisconnectionReason, error::{DialError, P2pError}, - message::BlockSyncMessageTag, net::{ default_backend::types::{Command, Message}, - types::{ - ConnectivityEvent, ConnectivityEventMessageTag, PeerManagerMessageOrTag, PeerRole, - }, + types::{ConnectivityEvent, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole}, }, peer_manager::{ self, @@ -745,9 +742,7 @@ async fn auto_connection_without_peer_activity( conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessageOrTag::BlockSyncMessage( - BlockSyncMessageTag::HeaderListRequest, - ), + message: PeerManagerMessageExt::FirstSyncMessageReceived, }) .unwrap(); @@ -756,9 +751,7 @@ async fn auto_connection_without_peer_activity( peer_mgr_notification, PeerManagerNotification::MessageReceived { peer_id, - message_tag: ConnectivityEventMessageTag::BlockSyncMessage( - BlockSyncMessageTag::HeaderListRequest - ) + message_tag: PeerManagerMessageExtTag::FirstSyncMessageReceived } ); } diff --git a/p2p/src/tests/helpers/mod.rs b/p2p/src/tests/helpers/mod.rs index 95153054e9..8a5e86a7a9 100644 --- a/p2p/src/tests/helpers/mod.rs +++ b/p2p/src/tests/helpers/mod.rs @@ -31,7 +31,7 @@ use test_utils::BasicTestTimeGetter; use crate::{ error::P2pError, - net::types::{ConnectivityEventMessageTag, PeerInfo, PeerRole}, + net::types::{PeerInfo, PeerManagerMessageExtTag, PeerRole}, 
peer_manager::{self, dns_seed::DnsSeed}, }; @@ -71,7 +71,7 @@ pub enum PeerManagerNotification { }, MessageReceived { peer_id: PeerId, - message_tag: ConnectivityEventMessageTag, + message_tag: PeerManagerMessageExtTag, }, } @@ -124,7 +124,7 @@ impl peer_manager::Observer for PeerManagerObserver { self.send_notification(PeerManagerNotification::ConnectionClosed { peer_id }); } - fn message_received(&mut self, peer_id: PeerId, message_tag: ConnectivityEventMessageTag) { + fn message_received(&mut self, peer_id: PeerId, message_tag: PeerManagerMessageExtTag) { self.send_notification(PeerManagerNotification::MessageReceived { peer_id, message_tag, diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 162dd75c6f..69f1e4dadb 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -320,6 +320,12 @@ where self.peer_mgr_notification_receiver.try_recv().ok() } + pub fn peer_mgr_notification_receiver( + &mut self, + ) -> &mut mpsc::UnboundedReceiver { + &mut self.peer_mgr_notification_receiver + } + pub async fn get_peers_info(&self) -> TestPeersInfo { query_peer_manager(&self.peer_mgr_event_sender, |peer_mgr| { TestPeersInfo::from_peer_mgr_peer_contexts(peer_mgr.peers()) diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index d7529d27d5..9a56e4a5e5 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -24,6 +24,7 @@ mod incorrect_handshake; mod min_peer_software_version; mod misbehavior; mod peer_discovery_on_stale_tip; +mod peer_mgr_events; mod same_handshake_nonce; mod unsupported_message; mod unsupported_version; diff --git a/p2p/src/tests/peer_mgr_events.rs b/p2p/src/tests/peer_mgr_events.rs new file mode 100644 index 0000000000..45156e64cf --- /dev/null +++ b/p2p/src/tests/peer_mgr_events.rs @@ -0,0 +1,351 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in 
compliance with the License. +// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, +}; + +use rstest::rstest; +use strum::IntoEnumIterator as _; + +use chainstate::{ChainstateConfig, Locator}; +use common::primitives::Id; +use networking::{ + test_helpers::{TestTransportChannel, TestTransportMaker}, + transport::{BufferedTranscoder, TransportListener, TransportSocket}, +}; +use p2p_test_utils::{run_with_timeout, MEDIUM_TIMEOUT, SHORT_TIMEOUT}; +use p2p_types::peer_address::PeerAddress; +use randomness::{seq::IteratorRandom as _, Rng}; +use test_utils::{ + assert_matches, + random::{gen_random_alnum_string, make_seedable_rng, Seed}, + BasicTestTimeGetter, +}; + +use crate::{ + message::{ + AddrListRequest, AddrListResponse, AnnounceAddrRequest, BlockListRequest, BlockResponse, + BlockSyncMessageTag, HeaderList, HeaderListRequest, PeerManagerMessageTag, PingRequest, + PingResponse, TransactionResponse, TransactionSyncMessageTag, WillDisconnectMessage, + }, + net::{ + default_backend::types::{HandshakeMessage, Message, P2pTimestamp}, + types::PeerManagerMessageExtTag, + }, + sync::test_helpers::make_new_block, + test_helpers::{test_p2p_config, TEST_PROTOCOL_VERSION}, + tests::helpers::{PeerManagerNotification, TestNode}, +}; + +type Transport = ::Transport; + +#[allow(clippy::enum_variant_names)] +#[derive(Debug)] +enum TestParamForFirstSyncMessageReceivedMustBeSent { + BlockSyncMsg, + TxSyncMsg, + PeerMgrMsg, +} + +// Check that once the peer sends us any block or transaction sync message, 
`PeerManagerMessageExt::FirstSyncMessageReceived` +// is sent to the peer manager. +// 1) Send a random message from the peer. +// 2) If the sent message was a sync one, expect that `PeerManagerNotification::MessageReceived` is emitted with +// message equal to `PeerManagerMessageExtTag::FirstSyncMessageReceived`. If the sent message was not a sync one, +// expect that no such notification is emitted. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn first_sync_message_received_must_be_sent( + #[case] seed: Seed, + #[values( + TestParamForFirstSyncMessageReceivedMustBeSent::BlockSyncMsg, + TestParamForFirstSyncMessageReceivedMustBeSent::TxSyncMsg, + TestParamForFirstSyncMessageReceivedMustBeSent::PeerMgrMsg + )] + param: TestParamForFirstSyncMessageReceivedMustBeSent, +) { + let mut rng = make_seedable_rng(seed); + + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + make_seedable_rng(rng.gen()), + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, 
Message::Handshake(HandshakeMessage::Hello { .. })); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + connect_result_receiver.await.unwrap().unwrap(); + + let (msg_to_send, is_sync_msg) = match param { + TestParamForFirstSyncMessageReceivedMustBeSent::BlockSyncMsg => { + let msg = match BlockSyncMessageTag::iter().choose(&mut rng).unwrap() { + BlockSyncMessageTag::HeaderListRequest => { + Message::HeaderListRequest(HeaderListRequest::new(Locator::new(vec![ + Id::random_using(&mut rng), + ]))) + } + BlockSyncMessageTag::BlockListRequest => Message::BlockListRequest( + BlockListRequest::new(vec![Id::random_using(&mut rng)]), + ), + BlockSyncMessageTag::HeaderList => { + Message::HeaderList(HeaderList::new(Vec::new())) + } + BlockSyncMessageTag::BlockResponse => { + Message::BlockResponse(BlockResponse::new(make_new_block( + &chain_config, + None, + &time_getter.get_time_getter(), + &mut rng, + ))) + } + BlockSyncMessageTag::TestSentinel => { + Message::TestBlockSyncMsgSentinel(Id::random_using(&mut rng)) + } + }; + + (msg, true) + } + TestParamForFirstSyncMessageReceivedMustBeSent::TxSyncMsg => { + let msg = match TransactionSyncMessageTag::iter().choose(&mut rng).unwrap() { + TransactionSyncMessageTag::NewTransaction => { + Message::NewTransaction(Id::random_using(&mut rng)) + } + TransactionSyncMessageTag::TransactionRequest => { + Message::TransactionRequest(Id::random_using(&mut rng)) + } + TransactionSyncMessageTag::TransactionResponse => Message::TransactionResponse( + TransactionResponse::NotFound(Id::random_using(&mut rng)), + ), + }; + + (msg, true) + } + 
TestParamForFirstSyncMessageReceivedMustBeSent::PeerMgrMsg => { + let msg = match PeerManagerMessageTag::iter().choose(&mut rng).unwrap() { + PeerManagerMessageTag::AddrListRequest => { + Message::AddrListRequest(AddrListRequest {}) + } + PeerManagerMessageTag::AnnounceAddrRequest => { + Message::AnnounceAddrRequest(AnnounceAddrRequest { + address: random_peer_addr(&mut rng), + }) + } + PeerManagerMessageTag::PingRequest => { + Message::PingRequest(PingRequest { nonce: rng.gen() }) + } + PeerManagerMessageTag::AddrListResponse => { + Message::AddrListResponse(AddrListResponse { + addresses: vec![random_peer_addr(&mut rng)], + }) + } + PeerManagerMessageTag::PingResponse => { + Message::PingResponse(PingResponse { nonce: rng.gen() }) + } + PeerManagerMessageTag::WillDisconnect => { + Message::WillDisconnect(WillDisconnectMessage { + reason: gen_random_alnum_string(&mut rng, 10, 20), + }) + } + }; + + (msg, false) + } + }; + + msg_stream.send(msg_to_send).await.unwrap(); + + if is_sync_msg { + tokio::time::timeout( + MEDIUM_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap(); + } else { + tokio::time::timeout( + SHORT_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap_err(); + } + + test_node.join().await; + }) + .await; +} + +// Check that `PeerManagerMessageExt::FirstSyncMessageReceived` will be sent only once even if +// many sync messages are received from the peer. +// 1) Send a few HeaderListRequest and/or NewTransaction (legit messages that a peer can send +// in any number). +// 2) Expect that only one `PeerManagerNotification::MessageReceived` with `PeerManagerMessageExtTag::FirstSyncMessageReceived` +// is produced. 
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn first_sync_message_received_must_be_sent_only_once(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + make_seedable_rng(rng.gen()), + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. 
})); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + connect_result_receiver.await.unwrap().unwrap(); + + for _ in 5..10 { + let msg = if rng.gen_bool(0.5) { + Message::HeaderListRequest(HeaderListRequest::new(Locator::new(vec![ + Id::random_using(&mut rng), + ]))) + } else { + Message::NewTransaction(Id::random_using(&mut rng)) + }; + + msg_stream.send(msg).await.unwrap(); + } + + tokio::time::timeout( + MEDIUM_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap(); + + tokio::time::timeout( + SHORT_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap_err(); + + test_node.join().await; + }) + .await; +} + +fn random_peer_addr(rng: &mut impl Rng) -> PeerAddress { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )) + .into() +} + +async fn recv_first_sync_message_received(test_node: &mut TestNode) { + while let Some(notif) = test_node.peer_mgr_notification_receiver().recv().await { + if let PeerManagerNotification::MessageReceived { + peer_id: _, + message_tag, + } = notif + { + match message_tag { + PeerManagerMessageExtTag::PeerManagerMessage(_) => {} + PeerManagerMessageExtTag::FirstSyncMessageReceived => { + break; + } + } + } + } +} From 5666d8818642ba6325a532d4ed2a1be512b70caa Mon Sep 17 00:00:00 2001 From: Mykhailo Kremniov Date: Thu, 12 Mar 2026 18:09:14 +0200 Subject: [PATCH 3/3] Cleanup --- CHANGELOG.md | 4 ++ p2p/src/net/default_backend/peer.rs | 8 ++-- p2p/src/net/types.rs | 38 +++++++++++++++---- p2p/src/peer_manager/mod.rs | 3 +- 
.../peer_manager/peerdb/address_data/mod.rs | 35 ++++++++--------- .../peer_manager/peerdb/address_data/tests.rs | 4 +- p2p/src/peer_manager/peerdb/mod.rs | 4 +- .../unsuccessful_connection_counter_update.rs | 11 +++--- p2p/src/peer_manager/tests/utils.rs | 20 ---------- p2p/src/tests/peer_mgr_events.rs | 4 +- test-utils/src/basic_test_time_getter.rs | 2 +- 11 files changed, 71 insertions(+), 62 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 440307887e..32b22c7718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,10 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/ - Node bootstrapping: - The format of the bootstrap file was changed and the legacy format is no longer supported. + - P2p: + - The logic of initiating new outbound connections has been improved to prevent the node from + constantly attempting to re-establish a connection with a peer that has banned it. + ### Fixed - P2p: when a peer sends a message that can't be decoded, it will now be discouraged (which is what is normally done for misbehaving peers) and the node won't try connecting to it again.\ diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index b46d7def15..d528c6e9a1 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -424,8 +424,6 @@ where .await? 
} CategorizedMessage::BlockSyncMessage(msg) => { - block_sync_msg_sender.send(msg).await?; - if !self.sync_message_received { self.peer_event_sender .send(PeerEvent::MessageReceived( @@ -434,10 +432,10 @@ where .await?; self.sync_message_received = true; } + + block_sync_msg_sender.send(msg).await?; } CategorizedMessage::TransactionSyncMessage(msg) => { - transaction_sync_msg_sender.send(msg).await?; - if !self.sync_message_received { self.peer_event_sender .send(PeerEvent::MessageReceived( @@ -446,6 +444,8 @@ where .await?; self.sync_message_received = true; } + + transaction_sync_msg_sender.send(msg).await?; } } diff --git a/p2p/src/net/types.rs b/p2p/src/net/types.rs index 821a89d71f..ddc7cf90dc 100644 --- a/p2p/src/net/types.rs +++ b/p2p/src/net/types.rs @@ -62,10 +62,10 @@ impl PeerRole { pub fn as_outbound(&self) -> Option { match self { Self::Inbound => None, - Self::OutboundFullRelay => Some(OutboundPeerRole::OutboundFullRelay), - Self::OutboundBlockRelay => Some(OutboundPeerRole::OutboundBlockRelay), - Self::OutboundReserved => Some(OutboundPeerRole::OutboundReserved), - Self::OutboundManual => Some(OutboundPeerRole::OutboundManual), + Self::OutboundFullRelay => Some(OutboundPeerRole::FullRelay), + Self::OutboundBlockRelay => Some(OutboundPeerRole::BlockRelay), + Self::OutboundReserved => Some(OutboundPeerRole::Reserved), + Self::OutboundManual => Some(OutboundPeerRole::Manual), Self::Feeler => Some(OutboundPeerRole::Feeler), } } @@ -88,13 +88,35 @@ impl PeerRole { #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, strum::EnumIter)] pub enum OutboundPeerRole { - OutboundFullRelay, - OutboundBlockRelay, - OutboundReserved, - OutboundManual, + FullRelay, + BlockRelay, + Reserved, + Manual, Feeler, } +impl OutboundPeerRole { + /// Return true if for this connection type some message exchange is expected (besides + /// the handshake and WillDisconnect), i.e. the node is supposed to send at least one message + /// and get back a response. 
+ /// + /// This is used by peerdb's AddressData to determine whether the "no activity" counter + /// should be increased after a connection with no peer activity. + pub fn is_message_exchange_expected(&self) -> bool { + match self { + Self::FullRelay | Self::BlockRelay | Self::Reserved | Self::Manual => true, + Self::Feeler => false, + } + } + + pub fn is_manual(&self) -> bool { + match self { + Self::Manual => true, + | Self::FullRelay | Self::BlockRelay | Self::Reserved | Self::Feeler => false, + } + } +} + /// Peer information learned during handshaking /// /// When an inbound/outbound connection succeeds, the networking service handshakes with the remote diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index 13ecb74e9c..66c25ba882 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -1552,6 +1552,7 @@ where ); if let Some(address) = address_opt { log::debug!("Need to establish a feeler connection to address {address}"); + self.connect(address, OutboundConnectType::Feeler); self.next_feeler_connection_time = Self::choose_next_feeler_connection_time( &self.p2p_config, @@ -1581,7 +1582,7 @@ where // just in case, so that if things go horribly wrong, only the debug build will panic. 
if let Some(peer) = self.peers.get(&peer_id) { if !is_disconnection_message && peer.peer_role.is_outbound() { - self.peerdb.outbound_peer_has_activity( + self.peerdb.outbound_peer_had_activity( peer.peer_address, Self::lock_rng(&self.rng).deref_mut(), ); diff --git a/p2p/src/peer_manager/peerdb/address_data/mod.rs b/p2p/src/peer_manager/peerdb/address_data/mod.rs index 6634a7a55b..d0bacbcf13 100644 --- a/p2p/src/peer_manager/peerdb/address_data/mod.rs +++ b/p2p/src/peer_manager/peerdb/address_data/mod.rs @@ -81,7 +81,7 @@ pub enum AddressState { #[strum_discriminants(name(AddressStateTransitionToTag), derive(strum::EnumIter))] pub enum AddressStateTransitionTo { Connected { peer_role: OutboundPeerRole }, - HasActivity, + HadActivity, Disconnected, ConnectionFailed, SetReserved, @@ -239,7 +239,7 @@ impl AddressData { } }, - AddressStateTransitionTo::HasActivity => match self.state { + AddressStateTransitionTo::HadActivity => match self.state { AddressState::Connected { had_activity: _, peer_role, @@ -253,13 +253,13 @@ impl AddressData { was_reachable: _, } => { debug_panic_or_log!( - "Unexpected address state transition: Disconnected -> HasActivity" + "Unexpected address state transition: Disconnected -> HadActivity" ); self.state.clone() } AddressState::Unreachable { erase_after: _ } => { debug_panic_or_log!( - "Unexpected address state transition: Unreachable -> HasActivity" + "Unexpected address state transition: Unreachable -> HadActivity" ); self.state.clone() } @@ -273,19 +273,20 @@ impl AddressData { if had_activity { self.connections_without_activity_count = 0; } else { - // We don't increase the counter for: - // 1) feeler connections (because it gets disconnected immediately, so the peer may - // not have the chance to send us anything); - // 2) manual connections; - let is_regular_auto_connection = match peer_role { - OutboundPeerRole::OutboundFullRelay - | OutboundPeerRole::OutboundBlockRelay - | OutboundPeerRole::OutboundReserved => true, - - 
OutboundPeerRole::Feeler | OutboundPeerRole::OutboundManual => false, - }; - - if is_regular_auto_connection { + // Note: + // 1) We don't increase the counter for manual connections. + // 2) Since `is_message_exchange_expected` doesn't know the actual services + // that the peer provides, it's technically possible to punish an + // "innocent" peer here, e.g. when the connection type is supposed to involve + // block exchange, but the peer's actual services don't include Blocks. + // However: + // a) The worst punishment it can get is that the next connection will be + // postponed for (roughly) MAX_DELAY_REACHABLE, which is currently 1 hour. + // b) Such a peer will be rather useless anyway. + // c) The connection has to be short enough, so that even a single PingRequest message + // (which requires a response) could not be sent. + // So it's not a big deal. + if peer_role.is_message_exchange_expected() && !peer_role.is_manual() { self.connections_without_activity_count += 1; } } diff --git a/p2p/src/peer_manager/peerdb/address_data/tests.rs b/p2p/src/peer_manager/peerdb/address_data/tests.rs index b7427cd119..7c7cc4cea7 100644 --- a/p2p/src/peer_manager/peerdb/address_data/tests.rs +++ b/p2p/src/peer_manager/peerdb/address_data/tests.rs @@ -53,7 +53,7 @@ fn randomized(#[case] seed: Seed) { AddressStateTransitionToTag::Connected => AddressStateTransitionTo::Connected { peer_role: OutboundPeerRole::iter().choose(&mut rng).unwrap(), }, - AddressStateTransitionToTag::HasActivity => AddressStateTransitionTo::HasActivity, + AddressStateTransitionToTag::HadActivity => AddressStateTransitionTo::HadActivity, AddressStateTransitionToTag::Disconnected => AddressStateTransitionTo::Disconnected, AddressStateTransitionToTag::ConnectionFailed => { AddressStateTransitionTo::ConnectionFailed @@ -68,7 +68,7 @@ fn randomized(#[case] seed: Seed) { AddressStateTransitionTo::Connected { peer_role: _ } => { !address_data.is_connected() } - AddressStateTransitionTo::HasActivity => 
address_data.is_connected(), + AddressStateTransitionTo::HadActivity => address_data.is_connected(), AddressStateTransitionTo::Disconnected => address_data.is_connected(), AddressStateTransitionTo::ConnectionFailed => !address_data.is_connected(), AddressStateTransitionTo::SetReserved => true, diff --git a/p2p/src/peer_manager/peerdb/mod.rs b/p2p/src/peer_manager/peerdb/mod.rs index 1b4f414e7b..d5224a0226 100644 --- a/p2p/src/peer_manager/peerdb/mod.rs +++ b/p2p/src/peer_manager/peerdb/mod.rs @@ -425,8 +425,8 @@ impl PeerDb { } /// Report that the peer has sent us a message other than the one indicating an intent to disconnect. - pub fn outbound_peer_has_activity(&mut self, address: SocketAddress, rng: &mut impl Rng) { - self.change_address_state(address, AddressStateTransitionTo::HasActivity, rng); + pub fn outbound_peer_had_activity(&mut self, address: SocketAddress, rng: &mut impl Rng) { + self.change_address_state(address, AddressStateTransitionTo::HadActivity, rng); } pub fn remove_address(&mut self, address: &SocketAddress) { diff --git a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs index edd169d29e..463141049c 100644 --- a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs +++ b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs @@ -286,7 +286,9 @@ async fn auto_connection_fails_peer_state_becomes_disconnected( // Keep peer address's `was_reachable` field at false. // 3) Advance the time so that the corresponding connection would be attempted; make the // connection fail; check that the peer address state is now `Unreachable`. -// 4) Check that no further connection attempts are made. +// 4) Optionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current address state. +// 5) Check that no further connection attempts are made. 
#[tracing::instrument(skip(seed))] #[rstest] #[trace] @@ -510,7 +512,6 @@ async fn manual_connection_fails(#[case] seed: Seed, #[values(false, true)] make if make_reachable { let peer_discovery_time = time_getter.get_time_getter().get_time(); - discover_peer(&peer_mgr_event_sender, peer_address, true).await; let peer_addr_state = @@ -609,7 +610,7 @@ async fn manual_connection_fails(#[case] seed: Seed, #[values(false, true)] make // Make the connection succeed and close it immediately; check that `connections_without_activity_count` // has been incremented. // 3) Occasionally, make a successful incoming or manual outgoing connection without peer activity, or an unsuccessful -// manual outgoing one. Expect that this doesn't affect `connections_without_activity_count`. +// manual outgoing connection. Expect that this doesn't affect `connections_without_activity_count`. // 4) On the final iteration make the peer actually send a message. Check that `connections_without_activity_count` has // been reset to zero. // Note that feeler connections are not checked in this test because once a feeler connection succeeds, it won't @@ -765,7 +766,7 @@ async fn auto_connection_without_peer_activity( // An inbound or manual connection without peer activity shouldn't affect connections_without_activity_count. // Same for a failed outbound connection. 
- let extra_outboun_connection_failed = match rng.gen_range(0..4) { + let extra_outbound_connection_failed = match rng.gen_range(0..4) { 0 => { log::debug!("Accepting an extra inbound connection"); @@ -816,7 +817,7 @@ async fn auto_connection_without_peer_activity( }; let expected_connections_without_activity_count = if is_last_iteration { 0 } else { i + 1 }; - let expected_fail_count = if extra_outboun_connection_failed { + let expected_fail_count = if extra_outbound_connection_failed { 1 } else { 0 diff --git a/p2p/src/peer_manager/tests/utils.rs b/p2p/src/peer_manager/tests/utils.rs index a681ef1fb4..19e202a6c6 100644 --- a/p2p/src/peer_manager/tests/utils.rs +++ b/p2p/src/peer_manager/tests/utils.rs @@ -34,7 +34,6 @@ use crate::{ types::{ConnectivityEvent, PeerInfo}, }, peer_manager::PeerManagerInterface, - peer_manager_event::PeerDisconnectionDbAction, test_helpers::TEST_PROTOCOL_VERSION, tests::helpers::PeerManagerNotification, utils::oneshot_nofail, @@ -299,25 +298,6 @@ pub fn start_manually_connecting( result_receiver } -pub fn disconnect_manually( - peer_mgr_event_sender: &mpsc::UnboundedSender, - peer_id: PeerId, - peerdb_action: PeerDisconnectionDbAction, -) -> oneshot_nofail::Receiver> { - let (result_sender, result_receiver) = oneshot_nofail::channel(); - - peer_mgr_event_sender - .send(PeerManagerEvent::Disconnect( - peer_id, - peerdb_action, - None, - result_sender, - )) - .unwrap(); - - result_receiver -} - pub async fn adjust_peer_score( peer_mgr_event_sender: &mpsc::UnboundedSender, peer_id: PeerId, diff --git a/p2p/src/tests/peer_mgr_events.rs b/p2p/src/tests/peer_mgr_events.rs index 45156e64cf..6ee74df9aa 100644 --- a/p2p/src/tests/peer_mgr_events.rs +++ b/p2p/src/tests/peer_mgr_events.rs @@ -65,7 +65,7 @@ enum TestParamForFirstSyncMessageReceivedMustBeSent { // is sent to the peer manager. // 1) Send a random message from the peer. 
// 2) If the sent message was a sync one, expect that `PeerManagerNotification::MessageReceived` is emitted with -// message equal to `PeerManagerMessageExtTag::FirstSyncMessageReceived`. If the sent message was not a sync one, +// the message equal to `PeerManagerMessageExtTag::FirstSyncMessageReceived`. If the sent message was not a sync one, // expect that no such notification is emitted. #[tracing::instrument(skip(seed))] #[rstest] @@ -233,7 +233,7 @@ async fn first_sync_message_received_must_be_sent( // Check that `PeerManagerMessageExt::FirstSyncMessageReceived` will be sent only once even if // many sync messages are received from the peer. -// 1) Send a few HeaderListRequest and/or NewTransaction (legit messages that a peer can send +// 1) Send a few `HeaderListRequest` and/or `NewTransaction` messages (i.e. some legit messages that a peer can send // in any number). // 2) Expect that only one `PeerManagerNotification::MessageReceived` with `PeerManagerMessageExtTag::FirstSyncMessageReceived` // is produced. diff --git a/test-utils/src/basic_test_time_getter.rs b/test-utils/src/basic_test_time_getter.rs index 41b81e1945..1af2161f5c 100644 --- a/test-utils/src/basic_test_time_getter.rs +++ b/test-utils/src/basic_test_time_getter.rs @@ -48,7 +48,7 @@ impl BasicTestTimeGetter { // Same as advance_time, except that if the passed duration's precision is not representable by // the time getter (i.e. it's sub-millisecond), this function will round it up, ensuring that - // the resulting time is always bigger than the initial time plus duration. + // the resulting time is always greater than or equal to the initial time plus the duration. pub fn advance_time_rounded_up(&self, duration: Duration) { let mut millis = duration.as_millis() as u64; if duration.subsec_nanos() % 1_000_000 != 0 {