diff --git a/CHANGELOG.md b/CHANGELOG.md index 440307887e..32b22c7718 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,6 +68,10 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/ - Node bootstrapping: - The format of the bootstrap file was changed and the legacy format is no longer supported. + - P2p: + - The logic of initiating new outbound connections has been improved to prevent the node from + constantly attempting to re-establish a connection with a peer that has banned it. + ### Fixed - P2p: when a peer sends a message that can't be decoded, it will now be discouraged (which is what is normally done for misbehaving peers) and the node won't try connecting to it again.\ diff --git a/dns-server/src/crawler_p2p/crawler_manager/mod.rs b/dns-server/src/crawler_p2p/crawler_manager/mod.rs index 73774ee860..8d88c9d521 100644 --- a/dns-server/src/crawler_p2p/crawler_manager/mod.rs +++ b/dns-server/src/crawler_p2p/crawler_manager/mod.rs @@ -34,7 +34,7 @@ use p2p::{ PingResponse, }, net::{ - types::{ConnectivityEvent, SyncingEvent}, + types::{ConnectivityEvent, PeerManagerMessageExt, SyncingEvent}, ConnectivityService, NetworkingService, SyncingEventReceiver, }, peer_manager::{ @@ -211,9 +211,17 @@ where fn handle_conn_message( &mut self, peer_id: PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageExt, ) -> p2p::Result<()> { - match message { + let peer_mgr_message = match message { + PeerManagerMessageExt::PeerManagerMessage(message) => message, + PeerManagerMessageExt::FirstSyncMessageReceived => { + // Ignored + return Ok(()); + } + }; + + match peer_mgr_message { PeerManagerMessage::AddrListRequest(_) => { // Ignored Ok(()) @@ -231,7 +239,7 @@ where } PeerManagerMessage::PingRequest(PingRequest { nonce }) => { self.conn - .send_message( + .send_peer_manager_message( peer_id, PeerManagerMessage::PingResponse(PingResponse { nonce }), ) @@ -382,7 +390,7 @@ where CrawlerCommand::RequestAddresses { peer_id } => { 
log::debug!("Requesting addresses from peer {peer_id}"); - conn.send_message( + conn.send_peer_manager_message( peer_id, PeerManagerMessage::AddrListRequest(AddrListRequest {}), ) diff --git a/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs b/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs index 03d7de43f6..3d624ce1db 100644 --- a/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs +++ b/dns-server/src/crawler_p2p/crawler_manager/tests/mock_manager.rs @@ -132,7 +132,8 @@ impl MockStateRef { peer_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: announced_ip.as_peer_address(), - }), + }) + .into(), }) .unwrap(); } @@ -271,7 +272,11 @@ impl ConnectivityService for MockConnectivityHandle { Ok(()) } - fn send_message(&mut self, _peer_id: PeerId, _request: PeerManagerMessage) -> p2p::Result<()> { + fn send_peer_manager_message( + &mut self, + _peer_id: PeerId, + _request: PeerManagerMessage, + ) -> p2p::Result<()> { Ok(()) } diff --git a/p2p/src/message.rs b/p2p/src/message.rs index 8d1f8263da..fb91ecd506 100644 --- a/p2p/src/message.rs +++ b/p2p/src/message.rs @@ -25,7 +25,8 @@ use serialization::{Decode, Encode}; use crate::types::peer_address::PeerAddress; -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] +#[strum_discriminants(name(BlockSyncMessageTag), derive(strum::EnumIter))] pub enum BlockSyncMessage { HeaderListRequest(HeaderListRequest), BlockListRequest(BlockListRequest), @@ -40,14 +41,16 @@ pub enum BlockSyncMessage { TestSentinel(Id<()>), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)] +#[strum_discriminants(name(TransactionSyncMessageTag), derive(strum::EnumIter))] pub enum TransactionSyncMessage { NewTransaction(Id), TransactionRequest(Id), TransactionResponse(TransactionResponse), } -#[derive(Debug, Clone, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq, Eq, 
strum::EnumDiscriminants)] +#[strum_discriminants(name(PeerManagerMessageTag), derive(strum::EnumIter))] pub enum PeerManagerMessage { AddrListRequest(AddrListRequest), AnnounceAddrRequest(AnnounceAddrRequest), diff --git a/p2p/src/net/default_backend/backend.rs b/p2p/src/net/default_backend/backend.rs index 852d1a04ab..ffd001735d 100644 --- a/p2p/src/net/default_backend/backend.rs +++ b/p2p/src/net/default_backend/backend.rs @@ -44,13 +44,14 @@ use crate::{ config::P2pConfig, disconnection_reason::DisconnectionReason, error::{DialError, P2pError, PeerError}, - message::PeerManagerMessage, net::{ default_backend::{ peer, types::{BackendEvent, Command, PeerEvent}, }, - types::{services::Services, ConnectivityEvent, PeerInfo, SyncingEvent}, + types::{ + services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent, + }, }, protocol::{ProtocolVersion, SupportedProtocolVersion}, types::{peer_address::PeerAddress, peer_id::PeerId}, @@ -613,7 +614,7 @@ where Ok(()) } - PeerEvent::MessageReceived { message } => { + PeerEvent::MessageReceived(message) => { if self.networking_enabled { self.handle_message(peer_id, message)?; } @@ -696,7 +697,7 @@ where fn handle_message( &mut self, peer_id: PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageExt, ) -> crate::Result<()> { // Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned). // Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification. diff --git a/p2p/src/net/default_backend/mod.rs b/p2p/src/net/default_backend/mod.rs index 622a77b65c..5af7d88e53 100644 --- a/p2p/src/net/default_backend/mod.rs +++ b/p2p/src/net/default_backend/mod.rs @@ -136,7 +136,11 @@ where Ok(self.cmd_sender.send(types::Command::Disconnect { peer_id, reason })?) 
} - fn send_message(&mut self, peer_id: PeerId, message: PeerManagerMessage) -> crate::Result<()> { + fn send_peer_manager_message( + &mut self, + peer_id: PeerId, + message: PeerManagerMessage, + ) -> crate::Result<()> { Ok(self.cmd_sender.send(types::Command::SendMessage { peer_id, message: message.into(), diff --git a/p2p/src/net/default_backend/peer.rs b/p2p/src/net/default_backend/peer.rs index 5cac02ec25..d528c6e9a1 100644 --- a/p2p/src/net/default_backend/peer.rs +++ b/p2p/src/net/default_backend/peer.rs @@ -31,7 +31,10 @@ use crate::{ disconnection_reason::DisconnectionReason, error::{ConnectionValidationError, P2pError, PeerError, ProtocolError}, message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage}, - net::default_backend::types::{BackendEvent, PeerEvent}, + net::{ + default_backend::types::{BackendEvent, PeerEvent}, + types::PeerManagerMessageExt, + }, protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion}, types::peer_id::PeerId, }; @@ -80,6 +83,10 @@ pub struct Peer { /// Time getter time_getter: TimeGetter, + + /// Will be set to true once at least one BlockSyncMessage or TransactionSyncMessage has been + /// received from the peer. + sync_message_received: bool, } impl Peer @@ -112,6 +119,7 @@ where node_protocol_version, time_getter, common_protocol_version: None, + sync_message_received: false, } } @@ -390,17 +398,17 @@ where // Note: the channels used by this function to propagate messages to other parts of p2p // must be bounded; this is important to prevent DoS attacks. 
async fn handle_socket_msg( + &mut self, peer_id: PeerId, msg: Message, - peer_event_sender: &mut mpsc::Sender, - block_sync_msg_sender: &mut mpsc::Sender, - transaction_sync_msg_sender: &mut mpsc::Sender, + block_sync_msg_sender: &mpsc::Sender, + transaction_sync_msg_sender: &mpsc::Sender, ) -> crate::Result<()> { match msg.categorize() { CategorizedMessage::Handshake(_) => { log::error!("Peer {peer_id} sent unexpected handshake message"); - peer_event_sender + self.peer_event_sender .send(PeerEvent::Misbehaved { error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage( "Unexpected handshake message".to_owned(), @@ -409,11 +417,35 @@ where .await?; } CategorizedMessage::PeerManagerMessage(msg) => { - peer_event_sender.send(PeerEvent::MessageReceived { message: msg }).await? + self.peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageExt::PeerManagerMessage(msg), + )) + .await? + } + CategorizedMessage::BlockSyncMessage(msg) => { + if !self.sync_message_received { + self.peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageExt::FirstSyncMessageReceived, + )) + .await?; + self.sync_message_received = true; + } + + block_sync_msg_sender.send(msg).await?; } - CategorizedMessage::BlockSyncMessage(msg) => block_sync_msg_sender.send(msg).await?, CategorizedMessage::TransactionSyncMessage(msg) => { - transaction_sync_msg_sender.send(msg).await? 
+ if !self.sync_message_received { + self.peer_event_sender + .send(PeerEvent::MessageReceived( + PeerManagerMessageExt::FirstSyncMessageReceived, + )) + .await?; + self.sync_message_received = true; + } + + transaction_sync_msg_sender.send(msg).await?; } } @@ -486,12 +518,11 @@ where event = self.socket.recv(), if sync_msg_senders_opt.is_some() => match event { Ok(message) => { let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some"); - Self::handle_socket_msg( + self.handle_socket_msg( self.peer_id, message, - &mut self.peer_event_sender, - &mut sync_msg_senders.0, - &mut sync_msg_senders.1, + &sync_msg_senders.0, + &sync_msg_senders.1, ).await?; } Err(err) => { diff --git a/p2p/src/net/default_backend/types.rs b/p2p/src/net/default_backend/types.rs index 7a1bbf2aad..3e26a357ac 100644 --- a/p2p/src/net/default_backend/types.rs +++ b/p2p/src/net/default_backend/types.rs @@ -32,7 +32,7 @@ use crate::{ BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest, PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage, }, - net::types::services::Services, + net::types::{services::Services, PeerManagerMessageExt}, protocol::{ProtocolVersion, SupportedProtocolVersion}, types::{peer_address::PeerAddress, peer_id::PeerId}, }; @@ -109,7 +109,7 @@ pub mod peer_event { } } -/// Events sent by Peer to Backend +/// Events sent by `Peer` to `Backend`. #[derive(Debug)] pub enum PeerEvent { /// Peer information received from remote @@ -119,7 +119,7 @@ pub enum PeerEvent { ConnectionClosed, /// Message received from remote - MessageReceived { message: PeerManagerMessage }, + MessageReceived(PeerManagerMessageExt), /// Protocol violation Misbehaved { error: P2pError }, @@ -135,7 +135,7 @@ pub enum PeerEvent { }, } -/// Events sent by Backend to Peer +/// Events sent by `Backend` to `Peer`. 
#[derive(Debug)] pub enum BackendEvent { Accepted { diff --git a/p2p/src/net/mod.rs b/p2p/src/net/mod.rs index 5a32a4603a..10fda32f05 100644 --- a/p2p/src/net/mod.rs +++ b/p2p/src/net/mod.rs @@ -115,18 +115,17 @@ where reason: Option, ) -> crate::Result<()>; - /// Sends a message to the given peer. - fn send_message(&mut self, peer: PeerId, message: PeerManagerMessage) -> crate::Result<()>; + /// Sends a peer manager message to the given peer. + fn send_peer_manager_message( + &mut self, + peer: PeerId, + message: PeerManagerMessage, + ) -> crate::Result<()>; /// Return the socket addresses of the network service provider fn local_addresses(&self) -> &[SocketAddress]; /// Poll events from the network service provider - /// - /// There are three types of events that can be received: - /// - incoming peer connections - /// - new discovered peers - /// - peer expiration events async fn poll_next(&mut self) -> crate::Result; } diff --git a/p2p/src/net/types.rs b/p2p/src/net/types.rs index 571f41c1bc..ddc7cf90dc 100644 --- a/p2p/src/net/types.rs +++ b/p2p/src/net/types.rs @@ -26,7 +26,9 @@ use tokio::sync::mpsc::Receiver; use crate::{ error::ConnectionValidationError, - message::{BlockSyncMessage, PeerManagerMessage, TransactionSyncMessage}, + message::{ + BlockSyncMessage, PeerManagerMessage, PeerManagerMessageTag, TransactionSyncMessage, + }, protocol::SupportedProtocolVersion, types::{peer_address::PeerAddress, peer_id::PeerId}, P2pError, @@ -45,7 +47,7 @@ use self::services::Services; serde::Serialize, serde::Deserialize, rpc_description::HasValueHint, - enum_iterator::Sequence, + strum::EnumIter, )] pub enum PeerRole { Inbound, @@ -57,23 +59,60 @@ pub enum PeerRole { } impl PeerRole { + pub fn as_outbound(&self) -> Option { + match self { + Self::Inbound => None, + Self::OutboundFullRelay => Some(OutboundPeerRole::FullRelay), + Self::OutboundBlockRelay => Some(OutboundPeerRole::BlockRelay), + Self::OutboundReserved => Some(OutboundPeerRole::Reserved), + 
Self::OutboundManual => Some(OutboundPeerRole::Manual), + Self::Feeler => Some(OutboundPeerRole::Feeler), + } + } + pub fn is_outbound(&self) -> bool { - use PeerRole::*; + self.as_outbound().is_some() + } + pub fn is_outbound_manual(&self) -> bool { match self { - Inbound => false, - OutboundFullRelay | OutboundBlockRelay | OutboundReserved | OutboundManual | Feeler => { - true - } + Self::OutboundManual => true, + Self::Inbound + | Self::OutboundFullRelay + | Self::OutboundBlockRelay + | Self::OutboundReserved + | Self::Feeler => false, } } +} - pub fn is_outbound_manual(&self) -> bool { - use PeerRole::*; +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, strum::EnumIter)] +pub enum OutboundPeerRole { + FullRelay, + BlockRelay, + Reserved, + Manual, + Feeler, +} + +impl OutboundPeerRole { + /// Return true if for this connection type some message exchange is expected (besides + /// the handshake and WillDisconnect), i.e. the node is supposed to send at least one message + /// and get back a response. + /// + /// This is used by peerdb's AddressData to determine whether the "no activity" counter + /// should be increased after a connection with no peer activity. + pub fn is_message_exchange_expected(&self) -> bool { + match self { + Self::FullRelay | Self::BlockRelay | Self::Reserved | Self::Manual => true, + Self::Feeler => false, + } + } + pub fn is_manual(&self) -> bool { match self { - OutboundManual => true, - Inbound | OutboundFullRelay | OutboundBlockRelay | OutboundReserved | Feeler => false, + Self::Manual => true, + Self::FullRelay | Self::BlockRelay | Self::Reserved | Self::Feeler => false, } } } @@ -142,13 +181,17 @@ impl Display for PeerInfo { } } -/// Connectivity-related events received from the network +/// Events available via the `ConnectivityService` trait (normally implemented by `NetworkingService::ConnectivityHandle`). +/// +/// Note: `PeerManager` is the main consumer of these events.
#[derive(Debug)] pub enum ConnectivityEvent { + /// A message received from a peer. Message { peer_id: PeerId, - message: PeerManagerMessage, + message: PeerManagerMessageExt, }, + + /// Outbound connection accepted OutboundAccepted { /// Peer address @@ -224,7 +267,41 @@ pub enum ConnectivityEvent { }, } -/// Syncing-related events (sent from the backend) +#[derive(Debug)] +pub enum PeerManagerMessageExt { + /// The complete PeerManagerMessage + PeerManagerMessage(PeerManagerMessage), + + /// An indicator that the first sync message (i.e. BlockSyncMessage or TransactionSyncMessage) + /// has been received from the peer. + FirstSyncMessageReceived, +} + +impl From<PeerManagerMessage> for PeerManagerMessageExt { + fn from(value: PeerManagerMessage) -> Self { + Self::PeerManagerMessage(value) + } +} + +/// Tag type for `PeerManagerMessageExt`. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PeerManagerMessageExtTag { + PeerManagerMessage(PeerManagerMessageTag), + FirstSyncMessageReceived, +} + +impl From<&'_ PeerManagerMessageExt> for PeerManagerMessageExtTag { + fn from(value: &'_ PeerManagerMessageExt) -> Self { + match value { + PeerManagerMessageExt::PeerManagerMessage(msg) => Self::PeerManagerMessage(msg.into()), + PeerManagerMessageExt::FirstSyncMessageReceived => Self::FirstSyncMessageReceived, + } + } +} + +/// Events obtainable via the `SyncingEventReceiver` trait (normally implemented by `NetworkingService::SyncingEventReceiver`). +/// +/// Note: `SyncManager` is the consumer of these events.
#[derive(Debug)] pub enum SyncingEvent { /// Peer connected diff --git a/p2p/src/peer_manager/mod.rs b/p2p/src/peer_manager/mod.rs index ee7223e73e..66c25ba882 100644 --- a/p2p/src/peer_manager/mod.rs +++ b/p2p/src/peer_manager/mod.rs @@ -45,7 +45,10 @@ use logging::log; use networking::types::ConnectionDirection; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress, IsGlobalIp}; use randomness::{seq::IteratorRandom, BoxedRngMutexWrapper, Rng, RngCore}; -use utils::{bloom_filters::rolling_bloom_filter::RollingBloomFilter, ensure, set_flag::SetFlag}; +use utils::{ + bloom_filters::rolling_bloom_filter::RollingBloomFilter, debug_panic_or_log, ensure, + set_flag::SetFlag, +}; use utils_networking::IpOrSocketAddress; use crate::{ @@ -60,7 +63,7 @@ use crate::{ net::{ types::{ services::{Service, Services}, - ConnectivityEvent, PeerInfo, PeerRole, + ConnectivityEvent, PeerInfo, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole, }, ConnectivityService, NetworkingService, }, @@ -1126,9 +1129,12 @@ where let old_value = self.peers.insert(peer_id, peer); assert!(old_value.is_none()); - if peer_role.is_outbound() { - self.peerdb - .outbound_peer_connected(peer_address, Self::lock_rng(&self.rng).deref_mut()); + if let Some(outbound_peer_role) = peer_role.as_outbound() { + self.peerdb.outbound_peer_connected( + peer_address, + outbound_peer_role, + Self::lock_rng(&self.rng).deref_mut(), + ); } if peer_role == PeerRole::OutboundBlockRelay { @@ -1265,9 +1271,13 @@ where | OutboundConnectType::Reserved | OutboundConnectType::Feeler => {} OutboundConnectType::Manual { response_sender } => { - response_sender.send(Err(error)); + response_sender.send(Err(error.clone())); } } + + if let Some(o) = self.observer.as_mut() { + o.outbound_error(address, error) + } } /// The connection to a remote peer is reported as closed. 
@@ -1310,6 +1320,10 @@ where } self.subscribed_to_peer_addresses.remove(&peer_id); + + if let Some(o) = self.observer.as_mut() { + o.connection_closed(peer_id) + } } } @@ -1320,7 +1334,7 @@ where ) { // `send_message` should not fail, but even if it does, the error can be ignored // because sending messages over the network does not guarantee that they will be received - let res = peer_connectivity_handle.send_message(peer_id, message); + let res = peer_connectivity_handle.send_peer_manager_message(peer_id, message); if let Err(err) = res { log::error!("send_message failed unexpectedly: {err:?}"); } @@ -1537,6 +1551,8 @@ where Self::lock_rng(&self.rng).deref_mut(), ); if let Some(address) = address_opt { + log::debug!("Need to establish a feeler connection to address {address}"); + self.connect(address, OutboundConnectType::Feeler); self.next_feeler_connection_time = Self::choose_next_feeler_connection_time( &self.p2p_config, @@ -1547,20 +1563,65 @@ where } } - fn handle_incoming_message(&mut self, peer: PeerId, message: PeerManagerMessage) { - match message { - PeerManagerMessage::AddrListRequest(_) => self.handle_addr_list_request(peer), - PeerManagerMessage::AnnounceAddrRequest(r) => { - self.handle_announce_addr_request(peer, r.address) - } - PeerManagerMessage::PingRequest(r) => self.handle_ping_request(peer, r.nonce), - PeerManagerMessage::AddrListResponse(r) => { - self.handle_addr_list_response(peer, r.addresses) - } - PeerManagerMessage::PingResponse(r) => self.handle_ping_response(peer, r.nonce), - PeerManagerMessage::WillDisconnect(msg) => { - self.handle_will_disconnect_messgae(peer, msg) + fn handle_incoming_message(&mut self, peer_id: PeerId, message: PeerManagerMessageExt) { + let is_disconnection_message = match &message { + PeerManagerMessageExt::PeerManagerMessage(msg) => match msg { + PeerManagerMessage::WillDisconnect(_) => true, + + PeerManagerMessage::AddrListRequest(_) + | PeerManagerMessage::AnnounceAddrRequest(_) + | 
PeerManagerMessage::PingRequest(_) + | PeerManagerMessage::AddrListResponse(_) + | PeerManagerMessage::PingResponse(_) => false, + }, + PeerManagerMessageExt::FirstSyncMessageReceived => false, + }; + + // Note: `PeerContext` must always exist when an incoming message arrives, and the individual + // "handle_" methods called below `expect` on this fact. Here we do an explicit check + // just in case, so that if things go horribly wrong, only the debug build will panic. + if let Some(peer) = self.peers.get(&peer_id) { + if !is_disconnection_message && peer.peer_role.is_outbound() { + self.peerdb.outbound_peer_had_activity( + peer.peer_address, + Self::lock_rng(&self.rng).deref_mut(), + ); } + } else { + debug_panic_or_log!( + "Peer context for {peer_id} not found when handling incoming message {message:?}" + ); + return; + } + + let message_tag: PeerManagerMessageExtTag = (&message).into(); + + match message { + PeerManagerMessageExt::PeerManagerMessage(msg) => match msg { + PeerManagerMessage::AddrListRequest(_) => { + self.handle_addr_list_request(peer_id); + } + PeerManagerMessage::AnnounceAddrRequest(msg) => { + self.handle_announce_addr_request(peer_id, msg.address); + } + PeerManagerMessage::PingRequest(msg) => { + self.handle_ping_request(peer_id, msg.nonce); + } + PeerManagerMessage::AddrListResponse(msg) => { + self.handle_addr_list_response(peer_id, msg.addresses); + } + PeerManagerMessage::PingResponse(msg) => { + self.handle_ping_response(peer_id, msg.nonce); + } + PeerManagerMessage::WillDisconnect(msg) => { + self.handle_will_disconnect_messgae(peer_id, msg); + } + }, + PeerManagerMessageExt::FirstSyncMessageReceived => {} + }; + + if let Some(o) = self.observer.as_mut() { + o.message_received(peer_id, message_tag) } } @@ -2397,12 +2458,28 @@ where pub trait Observer { fn on_peer_ban_score_adjustment(&mut self, address: SocketAddress, new_score: u32); + fn on_peer_ban(&mut self, address: BannableAddress); + fn on_peer_discouragement(&mut self, address: 
BannableAddress); + // This will be called at the end of "heartbeat" function. fn on_heartbeat(&mut self); + // This will be called for both incoming and outgoing connections. fn on_connection_accepted(&mut self, address: SocketAddress, peer_role: PeerRole); + + // This will be called after `ConnectivityEvent::ConnectionError` has been handled by + // the peer manager. + fn outbound_error(&mut self, address: SocketAddress, error: P2pError); + + // This will be called after `ConnectivityEvent::ConnectionClosed` has been handled by + // the peer manager. + fn connection_closed(&mut self, peer_id: PeerId); + + // This will be called after `ConnectivityEvent::Message` has been handled by + // the peer manager. + fn message_received(&mut self, peer_id: PeerId, message_tag: PeerManagerMessageExtTag); } pub trait PeerManagerInterface { @@ -2417,6 +2494,9 @@ pub trait PeerManagerInterface { #[cfg(test)] fn peer_db_mut(&mut self) -> &mut dyn peerdb::PeerDbInterface; + + #[cfg(test)] + fn next_feeler_connection_time(&self) -> Time; } impl PeerManagerInterface for PeerManager @@ -2444,6 +2524,11 @@ where fn peer_db_mut(&mut self) -> &mut dyn peerdb::PeerDbInterface { &mut self.peerdb } + + #[cfg(test)] + fn next_feeler_connection_time(&self) -> Time { + self.next_feeler_connection_time + } } #[derive(Copy, Clone, Debug)] diff --git a/p2p/src/peer_manager/peerdb/address_data/mod.rs b/p2p/src/peer_manager/peerdb/address_data/mod.rs index c9ddcd9f96..d0bacbcf13 100644 --- a/p2p/src/peer_manager/peerdb/address_data/mod.rs +++ b/p2p/src/peer_manager/peerdb/address_data/mod.rs @@ -17,6 +17,9 @@ use std::time::Duration; use common::primitives::time::Time; use randomness::Rng; +use utils::debug_panic_or_log; + +use crate::net::types::OutboundPeerRole; /// Maximum delay between reconnection attempts to reserved nodes const MAX_DELAY_RESERVED: Duration = Duration::from_secs(360); @@ -43,9 +46,17 @@ pub const PURGE_REACHABLE_FAIL_COUNT: u32 = /// -ln(0.0000000000000035527136788) which 
is about 33. const MAX_DELAY_FACTOR: u32 = 30; +// Note: AddressState/AddressData only track outbound connections, so if an inbound connection exists +// from a given address, its AddressState may still be Disconnected or even Unreachable. #[derive(Debug, Clone)] pub enum AddressState { - Connected {}, + Connected { + /// Whether the peer has shown some activity (i.e. sent us any message except for WillDisconnect) + /// during this connection. + had_activity: bool, + + peer_role: OutboundPeerRole, + }, Disconnected { /// Whether the address was reachable at least once. @@ -66,30 +77,26 @@ pub enum AddressState { }, } -#[derive(Copy, Clone, Debug)] -// Update `ALL_TRANSITIONS` if a new transition is added! +#[derive(Copy, Clone, Debug, strum::EnumDiscriminants)] +#[strum_discriminants(name(AddressStateTransitionToTag), derive(strum::EnumIter))] pub enum AddressStateTransitionTo { - Connected, + Connected { peer_role: OutboundPeerRole }, + HadActivity, Disconnected, ConnectionFailed, SetReserved, UnsetReserved, } -#[cfg(test)] -pub const ALL_TRANSITIONS: [AddressStateTransitionTo; 5] = [ - AddressStateTransitionTo::Connected, - AddressStateTransitionTo::Disconnected, - AddressStateTransitionTo::ConnectionFailed, - AddressStateTransitionTo::SetReserved, - AddressStateTransitionTo::UnsetReserved, -]; - -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddressData { state: AddressState, reserved: bool, + + /// The number of consecutive successful completed connections during which the peer had no activity + /// (i.e. hadn't sent us any message except for WillDisconnect). 
+ connections_without_activity_count: u32, } impl AddressData { @@ -101,6 +108,7 @@ impl AddressData { next_connect_after: now, }, reserved, + connections_without_activity_count: 0, } } @@ -108,14 +116,26 @@ impl AddressData { &self.state } + #[cfg(test)] + pub fn state_mut(&mut self) -> &mut AddressState { + &mut self.state + } + pub fn reserved(&self) -> bool { self.reserved } + pub fn connections_without_activity_count(&self) -> u32 { + self.connections_without_activity_count + } + /// Returns true when it is time to attempt a new outbound connection pub fn connect_now(&self, now: Time) -> bool { match self.state { - AddressState::Connected {} => false, + AddressState::Connected { + had_activity: _, + peer_role: _, + } => false, // Once a peer is disconnected by the RPC command, it should remain disconnected // (at least until the RPC requests to connect). Otherwise, users may be surprised @@ -133,7 +153,10 @@ impl AddressData { /// Returns true if the address should be kept in memory pub fn retain(&self, now: Time) -> bool { match self.state { - AddressState::Connected {} => true, + AddressState::Connected { + had_activity: _, + peer_role: _, + } => true, AddressState::Disconnected { was_reachable: _, fail_count: _, @@ -151,7 +174,7 @@ impl AddressData { matches!(self.state, AddressState::Unreachable { .. }) } - fn next_connect_delay(fail_count: u32, reserved: bool) -> Duration { + fn next_connect_delay(effective_fail_count: u32, reserved: bool) -> Duration { let max_delay = if reserved { MAX_DELAY_RESERVED } else { @@ -160,14 +183,25 @@ impl AddressData { // 10, 20, 40, 80... 
seconds std::cmp::min( - Duration::from_secs(10).saturating_mul(2u32.saturating_pow(fail_count)), + Duration::from_secs(10).saturating_mul(2u32.saturating_pow(effective_fail_count)), max_delay, ) } - fn next_connect_time(now: Time, fail_count: u32, reserved: bool, rng: &mut impl Rng) -> Time { + fn next_connect_time( + now: Time, + fail_count: u32, + connections_without_activity_count: u32, + reserved: bool, + rng: &mut impl Rng, + ) -> Time { + // Note: fail_count is reset whenever any successful outbound connection is made, but + // connections_without_activity_count is not reset when an outbound connection fails, + // so it's possible for both of them to be non-zero. + let effective_fail_count = std::cmp::max(fail_count, connections_without_activity_count); + let factor = utils::exp_rand::exponential_rand(rng).clamp(0.0, MAX_DELAY_FACTOR as f64); - let offset = Self::next_connect_delay(fail_count, reserved).mul_f64(factor); + let offset = Self::next_connect_delay(effective_fail_count, reserved).mul_f64(factor); (now + offset).expect("Unexpected time addition overflow") } @@ -178,35 +212,125 @@ impl AddressData { rng: &mut impl Rng, ) { self.state = match transition { - AddressStateTransitionTo::Connected => match self.state { - AddressState::Connected {} => unreachable!(), + AddressStateTransitionTo::Connected { peer_role } => match self.state { + AddressState::Connected { + had_activity: _, + peer_role: _, + } => { + debug_panic_or_log!( + "Unexpected address state transition: Connected -> Connected" + ); + self.state.clone() + } AddressState::Disconnected { fail_count: _, next_connect_after: _, was_reachable: _, - } => AddressState::Connected {}, + } => AddressState::Connected { + had_activity: false, + peer_role, + }, AddressState::Unreachable { erase_after: _ } => { // Connection to an `Unreachable` node may be requested by RPC at any moment - AddressState::Connected {} + AddressState::Connected { + had_activity: false, + peer_role, + } } }, - 
AddressStateTransitionTo::Disconnected => match self.state { - AddressState::Connected {} => AddressState::Disconnected { - fail_count: 0, - next_connect_after: Self::next_connect_time(now, 0, self.reserved, rng), - was_reachable: true, + AddressStateTransitionTo::HadActivity => match self.state { + AddressState::Connected { + had_activity: _, + peer_role, + } => AddressState::Connected { + had_activity: true, + peer_role, }, AddressState::Disconnected { fail_count: _, next_connect_after: _, was_reachable: _, - } => unreachable!(), - AddressState::Unreachable { erase_after: _ } => unreachable!(), + } => { + debug_panic_or_log!( + "Unexpected address state transition: Disconnected -> HadActivity" + ); + self.state.clone() + } + AddressState::Unreachable { erase_after: _ } => { + debug_panic_or_log!( + "Unexpected address state transition: Unreachable -> HadActivity" + ); + self.state.clone() + } + }, + + AddressStateTransitionTo::Disconnected => match self.state { + AddressState::Connected { + had_activity, + peer_role, + } => { + if had_activity { + self.connections_without_activity_count = 0; + } else { + // Note: + // 1) We don't increase the counter for manual connections. + // 2) Since `is_message_exchange_expected` doesn't know the actual services + // that the peer provides, it's technically possible to punish an + // "innocent" peer here, e.g. when the connection type is supposed to involve + // block exchange, but the peer's actual services don't include Blocks. + // However: + // a) The worst punishment it can get is that the next connection will be + // postponed for (roughly) MAX_DELAY_REACHABLE, which is currently 1 hour. + // b) Such a peer will be rather useless anyway. + // c) The connection has to be short enough, so that even a single PingRequest message + // (which requires a response) could not be sent. + // So it's not a big deal. 
+ if peer_role.is_message_exchange_expected() && !peer_role.is_manual() { + self.connections_without_activity_count += 1; + } + } + + AddressState::Disconnected { + fail_count: 0, + next_connect_after: Self::next_connect_time( + now, + 0, + self.connections_without_activity_count, + self.reserved, + rng, + ), + was_reachable: true, + } + } + AddressState::Disconnected { + fail_count: _, + next_connect_after: _, + was_reachable: _, + } => { + debug_panic_or_log!( + "Unexpected address state transition: Disconnected -> Disconnected" + ); + self.state.clone() + } + AddressState::Unreachable { erase_after: _ } => { + debug_panic_or_log!( + "Unexpected address state transition: Unreachable -> Disconnected" + ); + self.state.clone() + } }, AddressStateTransitionTo::ConnectionFailed => match self.state { - AddressState::Connected {} => unreachable!(), + AddressState::Connected { + had_activity: _, + peer_role: _, + } => { + debug_panic_or_log!( + "Unexpected address state transition: Connected -> ConnectionFailed" + ); + self.state.clone() + } AddressState::Disconnected { fail_count, next_connect_after: _, @@ -218,6 +342,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count + 1, + self.connections_without_activity_count, self.reserved, rng, ), @@ -236,6 +361,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count + 1, + self.connections_without_activity_count, self.reserved, rng, ), @@ -254,7 +380,13 @@ impl AddressData { // Change to Disconnected if currently Unreachable match self.state { - AddressState::Connected {} => AddressState::Connected {}, + AddressState::Connected { + had_activity, + peer_role, + } => AddressState::Connected { + had_activity, + peer_role, + }, AddressState::Disconnected { was_reachable, fail_count, @@ -265,6 +397,7 @@ impl AddressData { next_connect_after: Self::next_connect_time( now, fail_count, + self.connections_without_activity_count, self.reserved, rng, ), @@ -272,7 +405,13 @@ 
impl AddressData { // Reserved nodes should not be in the `Unreachable` state AddressState::Unreachable { erase_after: _ } => AddressState::Disconnected { fail_count: 0, - next_connect_after: Self::next_connect_time(now, 0, self.reserved, rng), + next_connect_after: Self::next_connect_time( + now, + 0, + self.connections_without_activity_count, + self.reserved, + rng, + ), was_reachable: false, }, } @@ -283,7 +422,13 @@ impl AddressData { // Do not change the state match self.state { - AddressState::Connected {} => AddressState::Connected {}, + AddressState::Connected { + had_activity, + peer_role, + } => AddressState::Connected { + had_activity, + peer_role, + }, AddressState::Disconnected { was_reachable, fail_count, diff --git a/p2p/src/peer_manager/peerdb/address_data/tests.rs b/p2p/src/peer_manager/peerdb/address_data/tests.rs index a4e119a04a..7c7cc4cea7 100644 --- a/p2p/src/peer_manager/peerdb/address_data/tests.rs +++ b/p2p/src/peer_manager/peerdb/address_data/tests.rs @@ -14,10 +14,13 @@ // limitations under the License. 
use rstest::rstest; +use strum::IntoEnumIterator as _; +use logging::log; use randomness::{ distributions::{Distribution, WeightedIndex}, rngs::StepRng, + seq::IteratorRandom as _, Rng, }; use test_utils::random::{make_seedable_rng, Seed}; @@ -32,8 +35,9 @@ fn randomized(#[case] seed: Seed) { let mut rng = make_seedable_rng(seed); let started_at = Time::from_duration_since_epoch(Duration::ZERO); - let weights = [100, 100, 100, 10, 10]; - assert_eq!(weights.len(), ALL_TRANSITIONS.len()); + let weights = [100, 100, 100, 100, 10, 10]; + let all_transition_tags = AddressStateTransitionToTag::iter().collect::>(); + assert_eq!(weights.len(), all_transition_tags.len()); let weights = WeightedIndex::new(weights).unwrap(); for _ in 0..100 { @@ -43,10 +47,28 @@ fn randomized(#[case] seed: Seed) { let mut address_data = AddressData::new(was_reachable, reserved, started_at); for _ in 0..1000 { - let transition = ALL_TRANSITIONS[weights.sample(&mut rng)]; + let transition_tag = all_transition_tags[weights.sample(&mut rng)]; + + let transition = match transition_tag { + AddressStateTransitionToTag::Connected => AddressStateTransitionTo::Connected { + peer_role: OutboundPeerRole::iter().choose(&mut rng).unwrap(), + }, + AddressStateTransitionToTag::HadActivity => AddressStateTransitionTo::HadActivity, + AddressStateTransitionToTag::Disconnected => AddressStateTransitionTo::Disconnected, + AddressStateTransitionToTag::ConnectionFailed => { + AddressStateTransitionTo::ConnectionFailed + } + AddressStateTransitionToTag::SetReserved => AddressStateTransitionTo::SetReserved, + AddressStateTransitionToTag::UnsetReserved => { + AddressStateTransitionTo::UnsetReserved + } + }; let is_valid_transition = match transition { - AddressStateTransitionTo::Connected => !address_data.is_connected(), + AddressStateTransitionTo::Connected { peer_role: _ } => { + !address_data.is_connected() + } + AddressStateTransitionTo::HadActivity => address_data.is_connected(), 
AddressStateTransitionTo::Disconnected => address_data.is_connected(), AddressStateTransitionTo::ConnectionFailed => !address_data.is_connected(), AddressStateTransitionTo::SetReserved => true, @@ -100,17 +122,29 @@ fn next_connect_time_test_impl(rng: &mut impl Rng) { let max_time_reserved = (start_time + limit_reserved).unwrap(); let max_time_reachable = (start_time + limit_reachable).unwrap(); - let time = AddressData::next_connect_time(start_time, 0, true, rng); - assert!(time <= max_time_reserved); - - let time = AddressData::next_connect_time(start_time, 0, false, rng); - assert!(time <= max_time_reachable); - - let time = AddressData::next_connect_time(start_time, u32::MAX, true, rng); - assert!(time <= max_time_reserved); - - let time = AddressData::next_connect_time(start_time, u32::MAX, false, rng); - assert!(time <= max_time_reachable); + for fail_count in [0, u32::MAX] { + for connections_without_activity_count in [0, u32::MAX] { + log::debug!("fail_count = {fail_count}, connections_without_activity_count = {connections_without_activity_count}"); + + let time = AddressData::next_connect_time( + start_time, + fail_count, + connections_without_activity_count, + true, + rng, + ); + assert!(time <= max_time_reserved); + + let time = AddressData::next_connect_time( + start_time, + fail_count, + connections_without_activity_count, + false, + rng, + ); + assert!(time <= max_time_reachable); + } + } } #[test] diff --git a/p2p/src/peer_manager/peerdb/mod.rs b/p2p/src/peer_manager/peerdb/mod.rs index 2d6cde287f..d5224a0226 100644 --- a/p2p/src/peer_manager/peerdb/mod.rs +++ b/p2p/src/peer_manager/peerdb/mod.rs @@ -43,7 +43,7 @@ use logging::log; use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress}; use randomness::{seq::IteratorRandom, Rng, SliceRandom}; -use crate::config::P2pConfig; +use crate::{config::P2pConfig, net::types::OutboundPeerRole}; use self::{ address_data::{AddressData, AddressStateTransitionTo}, @@ -404,10 +404,18 @@ 
impl PeerDb { /// Mark peer as connected /// - /// After `PeerManager` has established either an inbound or an outbound connection, - /// it informs the `PeerDb` about it. - pub fn outbound_peer_connected(&mut self, address: SocketAddress, rng: &mut impl Rng) { - self.change_address_state(address, AddressStateTransitionTo::Connected, rng); + /// After `PeerManager` has established an outbound connection, it informs `PeerDb` about it. + pub fn outbound_peer_connected( + &mut self, + address: SocketAddress, + peer_role: OutboundPeerRole, + rng: &mut impl Rng, + ) { + self.change_address_state( + address, + AddressStateTransitionTo::Connected { peer_role }, + rng, + ); self.move_addr_to_tried(&address); } @@ -416,6 +424,11 @@ impl PeerDb { self.change_address_state(address, AddressStateTransitionTo::Disconnected, rng); } + /// Report that the peer has sent us a message other than the one indicating an intent to disconnect. + pub fn outbound_peer_had_activity(&mut self, address: SocketAddress, rng: &mut impl Rng) { + self.change_address_state(address, AddressStateTransitionTo::HadActivity, rng); + } + pub fn remove_address(&mut self, address: &SocketAddress) { if !self.reserved_nodes.contains(address) { self.addresses.remove(address); @@ -658,6 +671,9 @@ pub trait PeerDbInterface { fn is_address_discouraged(&self, address: &BannableAddress) -> bool; fn peer_discovered(&mut self, address: SocketAddress); + + fn address_data(&self, address: &SocketAddress) -> Option<&AddressData>; + fn address_data_mut(&mut self, address: &SocketAddress) -> Option<&mut AddressData>; } impl PeerDbInterface for PeerDb { @@ -672,6 +688,14 @@ impl PeerDbInterface for PeerDb { fn peer_discovered(&mut self, address: SocketAddress) { self.peer_discovered(address); } + + fn address_data(&self, address: &SocketAddress) -> Option<&AddressData> { + self.addresses.get(address) + } + + fn address_data_mut(&mut self, address: &SocketAddress) -> Option<&mut AddressData> { + 
self.addresses.get_mut(address) + } } #[cfg(test)] diff --git a/p2p/src/peer_manager/peerdb/tests.rs b/p2p/src/peer_manager/peerdb/tests.rs index 25742640b7..7fc6dec7f7 100644 --- a/p2p/src/peer_manager/peerdb/tests.rs +++ b/p2p/src/peer_manager/peerdb/tests.rs @@ -22,6 +22,7 @@ use std::{ use itertools::Itertools; use rstest::rstest; +use strum::IntoEnumIterator as _; use ::test_utils::{ random::{make_seedable_rng, Seed}, @@ -30,10 +31,11 @@ use ::test_utils::{ use common::{chain::config::create_unit_test_config, primitives::time::Time}; use networking::test_helpers::TestAddressMaker; use p2p_types::socket_addr_ext::SocketAddrExt; -use randomness::Rng; +use randomness::{seq::IteratorRandom as _, Rng}; use crate::{ ban_config::BanConfig, + net::types::OutboundPeerRole, peer_manager::{ peerdb::{ address_data::{self, PURGE_REACHABLE_FAIL_COUNT, PURGE_UNREACHABLE_TIME}, @@ -505,9 +507,10 @@ fn connected_unreachable(#[case] seed: Seed) { peerdb.report_outbound_failure(address, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_unreachable()); - // User requests connection to the currently unreachable node via RPC and connection succeeds. + // PeerManager attempts a connection to a currently unreachable node and the connection succeeds. // PeerDb should process that normally. - peerdb.outbound_peer_connected(address, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(address, peer_role, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_connected()); assert_addr_consistency(&peerdb); @@ -535,9 +538,10 @@ fn connected_unknown(#[case] seed: Seed) { let address = TestAddressMaker::new_random_address(&mut rng).into(); - // User requests connection to some unknown node via RPC and connection succeeds. + // PeerManager attempts a connection to some unknown node and the connection succeeds. // PeerDb should process that normally. 
- peerdb.outbound_peer_connected(address, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(address, peer_role, &mut rng); assert!(peerdb.addresses.get(&address).unwrap().is_connected()); assert_addr_consistency(&peerdb); @@ -605,8 +609,7 @@ fn anchor_peers(#[case] seed: Seed) { assert_addr_consistency(&peerdb); } -// Call 'remove_address' on new and tried addresses, check that the db is -// in consistent state. +// Call 'remove_address' on new and tried addresses, check that the db is in a consistent state. #[tracing::instrument(skip(seed))] #[rstest] #[trace] @@ -652,7 +655,8 @@ fn remove_addr(#[case] seed: Seed) { } for addr in &tried_addrs { - peerdb.outbound_peer_connected(*addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(*addr, peer_role, &mut rng); } for addr in new_addrs_to_remove.iter().chain(tried_addrs_to_remove.iter()) { @@ -707,7 +711,8 @@ fn remove_unreachable(#[case] seed: Seed) { } for addr in &tried_addrs { - peerdb.outbound_peer_connected(*addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(*addr, peer_role, &mut rng); } assert_eq!(new_addr_table(&peerdb).addr_count(), addr_count); @@ -862,7 +867,8 @@ fn tried_addr_count_limit(#[case] seed: Seed, #[values(true, false)] use_reserve peerdb.add_reserved_node(addr, &mut rng); } - peerdb.outbound_peer_connected(addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + peerdb.outbound_peer_connected(addr, peer_role, &mut rng); if use_reserved_nodes && i % 3 == 1 { peerdb.add_reserved_node(addr, &mut rng); @@ -934,7 +940,9 @@ fn new_tried_addr_selection_frequency() { peerdb.peer_discovered(addr); } for addr in tried_addrs { - peerdb.outbound_peer_connected(addr, &mut rng); + let peer_role = OutboundPeerRole::iter().choose(&mut rng).unwrap(); + 
peerdb.outbound_peer_connected(addr, peer_role, &mut rng); + // Mark the address as disconnected, otherwise it won't be selected by // select_non_reserved_outbound_addresses. peerdb.outbound_peer_disconnected(addr, &mut rng); diff --git a/p2p/src/peer_manager/tests/ban.rs b/p2p/src/peer_manager/tests/ban.rs index 9ac55eebb1..5ea8dee7a3 100644 --- a/p2p/src/peer_manager/tests/ban.rs +++ b/p2p/src/peer_manager/tests/ban.rs @@ -478,7 +478,8 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: banned_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -490,7 +491,8 @@ async fn banned_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: normal_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -595,7 +597,7 @@ async fn banned_address_not_in_addr_response(#[case] seed: Seed) { conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::AddrListRequest(AddrListRequest {}), + message: PeerManagerMessage::AddrListRequest(AddrListRequest {}).into(), }) .unwrap(); diff --git a/p2p/src/peer_manager/tests/connections.rs b/p2p/src/peer_manager/tests/connections.rs index c0a45fddc7..8661f790d0 100644 --- a/p2p/src/peer_manager/tests/connections.rs +++ b/p2p/src/peer_manager/tests/connections.rs @@ -70,7 +70,7 @@ use crate::{ tests::{ get_connected_peers, make_peer_manager, make_standalone_peer_manager, run_peer_manager, utils::{ - expect_cmd_connect_to, expect_cmd_connect_to_one_of, + connection_closed, expect_cmd_connect_to, expect_cmd_connect_to_one_of, inbound_block_relay_peer_accepted_by_backend, make_full_relay_peer_info, mutate_peer_manager, outbound_full_relay_peer_accepted_by_backend, query_peer_manager, recv_command_advance_time, start_manually_connecting, @@ -2037,11 +2037,7 @@ async fn 
feeler_connections_test_impl(seed: Seed) { } ); - conn_event_sender - .send(ConnectivityEvent::ConnectionClosed { - peer_id: cur_peer_id, - }) - .unwrap(); + connection_closed(&conn_event_sender, cur_peer_id); successful_conn_addresses.insert(addr); had_successful_feelers = true; diff --git a/p2p/src/peer_manager/tests/discouragement.rs b/p2p/src/peer_manager/tests/discouragement.rs index 7a6ea73865..6cd1d4d8be 100644 --- a/p2p/src/peer_manager/tests/discouragement.rs +++ b/p2p/src/peer_manager/tests/discouragement.rs @@ -550,7 +550,8 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: discouraged_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -562,7 +563,8 @@ async fn discouraged_address_is_not_announced(#[case] seed: Seed) { peer_id: peer1_id, message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest { address: normal_addr.as_peer_address(), - }), + }) + .into(), }) .unwrap(); @@ -666,7 +668,7 @@ async fn discouraged_address_not_in_addr_response(#[case] seed: Seed) { conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::AddrListRequest(AddrListRequest {}), + message: PeerManagerMessage::AddrListRequest(AddrListRequest {}).into(), }) .unwrap(); diff --git a/p2p/src/peer_manager/tests/mod.rs b/p2p/src/peer_manager/tests/mod.rs index 3e36ebc760..4658e7e6ee 100644 --- a/p2p/src/peer_manager/tests/mod.rs +++ b/p2p/src/peer_manager/tests/mod.rs @@ -21,6 +21,7 @@ mod discouragement; mod eviction; mod peer_types; mod ping; +mod unsuccessful_connection_counter_update; pub mod utils; mod whitelist; @@ -245,13 +246,18 @@ async fn send_and_sync( cmd_receiver: &mut UnboundedReceiver, rng: &mut impl Rng, ) { - conn_event_sender.send(ConnectivityEvent::Message { peer_id, message }).unwrap(); + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id, + message: message.into(), + }) + 
.unwrap(); let sent_nonce = rng.gen(); conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::PingRequest(PingRequest { nonce: sent_nonce }), + message: PeerManagerMessage::PingRequest(PingRequest { nonce: sent_nonce }).into(), }) .unwrap(); @@ -266,7 +272,7 @@ async fn send_and_sync( conn_event_sender .send(ConnectivityEvent::Message { peer_id, - message: PeerManagerMessage::PingResponse(PingResponse { nonce }), + message: PeerManagerMessage::PingResponse(PingResponse { nonce }).into(), }) .unwrap(); assert_eq!(nonce, sent_nonce); diff --git a/p2p/src/peer_manager/tests/peer_types.rs b/p2p/src/peer_manager/tests/peer_types.rs index d82520c894..c71bf5f32b 100644 --- a/p2p/src/peer_manager/tests/peer_types.rs +++ b/p2p/src/peer_manager/tests/peer_types.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use rstest::rstest; +use strum::IntoEnumIterator as _; use common::{chain::config, primitives::user_agent::mintlayer_core_user_agent}; use networking::transport::TcpTransportSocket; @@ -108,7 +109,7 @@ fn validate_services(#[case] seed: Seed) { let services = Services::from_u64(services); // List all peer roles - for peer_role in enum_iterator::all::() { + for peer_role in PeerRole::iter() { let peer_id_1 = PeerId::new(); let peer_info = PeerInfo { peer_id: peer_id_1, diff --git a/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs new file mode 100644 index 0000000000..463141049c --- /dev/null +++ b/p2p/src/peer_manager/tests/unsuccessful_connection_counter_update.rs @@ -0,0 +1,1270 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! These tests check that `AddressData`'s `fail_count` and `connections_without_activity_count` +//! are correctly updated after a failed connection or connection without peer activity, respectively. + +use std::{sync::Arc, time::Duration}; + +use rstest::rstest; +use tokio::sync::mpsc; + +use common::{ + chain::{ + config::{self}, + ChainConfig, + }, + primitives::time::Time, +}; +use logging::log; +use networking::test_helpers::{TestAddressMaker, TestTransportMaker, TestTransportTcp}; +use p2p_test_utils::{expect_no_recv, expect_recv}; +use p2p_types::socket_address::SocketAddress; +use randomness::Rng; +use test_utils::{ + assert_matches, assert_matches_return_val, + random::{make_seedable_rng, Seed}, + BasicTestTimeGetter, +}; +use utils::tokio_spawn_in_current_tracing_span; + +use crate::{ + config::P2pConfig, + disconnection_reason::DisconnectionReason, + error::{DialError, P2pError}, + net::{ + default_backend::types::{Command, Message}, + types::{ConnectivityEvent, PeerManagerMessageExt, PeerManagerMessageExtTag, PeerRole}, + }, + peer_manager::{ + self, + config::PeerManagerConfig, + peerdb::address_data::AddressState, + test_utils::{add_reserved_peer, wait_for_heartbeat}, + tests::{ + make_standalone_peer_manager, + utils::{ + connection_closed, expect_cmd_connect_to, + inbound_full_relay_peer_accepted_by_backend, mutate_peer_manager, + outbound_peer_accepted_by_backend, query_peer_manager, start_manually_connecting, + }, + }, + }, + test_helpers::test_p2p_config_with_peer_mgr_config, + 
tests::helpers::PeerManagerNotification, + types::peer_id::PeerId, + PeerManagerEvent, +}; + +// Check the case of an automatic connection failure when the peer address would end up in the +// `Disconnected` state. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. +// 2) For non-reserved connections, force-set peer address's `was_reachable` field to true +// (otherwise the address would be put into `Unreachable` state after the first unsuccessful +// connection attempt). For reserved connections try both variants. +// 3) In a loop, advance the time so that the corresponding connection would be attempted; +// make the connection fail; check that `fail_count` has been incremented. +// 4) Occasionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current `fail_count`. +// 5) On the last iteration of the loop make the connection succeed; check that `fail_count` has been +// set to zero. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_fails_peer_state_becomes_disconnected( + #[case] seed: Seed, + #[values( + (AutoConnType::FullRelay, true), + (AutoConnType::BlockRelay, true), + (AutoConnType::Reserved, true), + (AutoConnType::Reserved, false), + (AutoConnType::Feeler, true), + )] + (conn_type, was_reachable): (AutoConnType, bool), +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + 
Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_feeler_connection = matches!(conn_type, AutoConnType::Feeler); + let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + let is_block_relay_connection = matches!(conn_type, AutoConnType::BlockRelay); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, was_reachable).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0, + } + ); + + if is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut last_connect_after = Time::from_duration_since_epoch(Duration::ZERO); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let connect_after = if is_feeler_connection { + let next_feeler_connection_time = + query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await; + + std::cmp::max(last_connect_after, next_feeler_connection_time) + } else { + last_connect_after + }; + + let wait_duration = std::cmp::max( + connect_after + .as_duration_since_epoch() + 
.checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + !is_block_relay_connection, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + conn_type.to_peer_role(), + ) + .await; + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + } else { + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + // An inbound connection shouldn't change the outcome even if it's successful. 
+ if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + } + + let expected_fail_count = if is_last_iteration { 0 } else { i + 1 }; + let expected_connections_without_activity_count = + if is_last_iteration && !is_feeler_connection { + 1 + } else { + 0 + }; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state.was_reachable, + was_reachable || is_last_iteration + ); + assert_eq!(peer_addr_state.fail_count, expected_fail_count); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!( + peer_addr_state.connections_without_activity_count, + expected_connections_without_activity_count + ); + + last_connect_after = peer_addr_state.next_connect_after; + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of an automatic connection failure when the peer address would end up in the +// `Unreachable` state. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. +// Keep peer address's `was_reachable` field at false. +// 3) Advance the time so that the corresponding connection would be attempted; make the +// connection fail; check that the peer address state is now `Unreachable`. +// 4) Optionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current address state. +// 5) Check that no further connection attempts are made. 
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_fails_peer_state_becomes_unreachable( + #[case] seed: Seed, + #[values( + AutoConnType::FullRelay, + AutoConnType::BlockRelay, + AutoConnType::Feeler + )] + conn_type: AutoConnType, +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_feeler_connection = matches!(conn_type, AutoConnType::Feeler); + let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + if 
is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let time_before_wait = time_getter.get_time_getter().get_time(); + let connect_after = if is_feeler_connection { + query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await + } else { + Time::from_duration_since_epoch(Duration::ZERO) + }; + + let wait_duration = std::cmp::max( + connect_after + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + // An inbound connection shouldn't change the outcome even if it's successful. + if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = expect_peer_state_unreachable(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.erase_after > time_after_wait); + + // No further connection attempts should be made. 
+ time_getter.advance_time(peer_manager::HEARTBEAT_INTERVAL_MAX * 1000); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + expect_no_recv!(cmd_receiver); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a manual connection failure. +// 1) Setup the peer manager, optionally force-setting the peer address's `was_reachable` field +// to true. +// 2) In a loop, attempt a manual connection to the peer and make it fail. Check that: +// a) if `was_reachable` was true, the resulting address state is `Disconnected` and `fail_count` +// has been incremented; +// b) otherwise the resulting address state is `Unreachable`. +// 3) Occasionally, accept an inbound connection from the peer address; expect that it doesn't +// affect the current address state. +// 4) On the last iteration make the connection succeed. Check that the resulting address state is +// now `Disconnected` with zero `fail_count`. 
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn manual_connection_fails(#[case] seed: Seed, #[values(false, true)] make_reachable: bool) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = { + let peer_mgr_config = PeerManagerConfig { + outbound_block_relay_count: 0.into(), + outbound_block_relay_extra_count: 0.into(), + outbound_full_relay_count: 0.into(), + outbound_full_relay_extra_count: 0.into(), + enable_feeler_connections: false.into(), + + max_inbound_connections: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + outbound_block_relay_connection_min_age: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + feeler_connections_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + allow_same_ip_connections: Default::default(), + peerdb_config: Default::default(), + min_peer_software_version: Default::default(), + }; + + Arc::new(test_p2p_config_with_peer_mgr_config(peer_mgr_config)) + }; + + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let peer_mgr_join_handle = 
tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + if make_reachable { + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, true).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: true, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + } else { + expect_no_peer_state(&peer_mgr_event_sender, peer_address).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut erase_after = None; + + let now = time_getter.get_time_getter().get_time(); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + start_and_close_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } else { + attempt_and_fail_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + ) + .await; + + // An inbound connection shouldn't change the outcome even if it's successful. 
+ if rng.gen_bool(0.5) { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + } + } + + if make_reachable || is_last_iteration { + let fail_count = if is_last_iteration { 0 } else { i + 1 }; + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, fail_count); + assert!(peer_addr_state.next_connect_after > now); + assert_eq!(peer_addr_state.connections_without_activity_count, 0); + } else { + let peer_addr_state = + expect_peer_state_unreachable(&peer_mgr_event_sender, peer_address).await; + + if i == 0 { + assert!(peer_addr_state.erase_after > now); + erase_after = Some(peer_addr_state.erase_after); + } else { + // For unreachable addresses erase_after doesn't change on further unsuccessful connection attempts. + assert_eq!(peer_addr_state.erase_after, erase_after.unwrap()); + } + } + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a successful automatic connection without any peer activity. +// 1) Set up the peer manager so that the connection of the corresponding type would occur. +// 2) In a loop, advance the time so that the corresponding connection would be attempted. +// Make the connection succeed and close it immediately; check that `connections_without_activity_count` +// has been incremented. +// 3) Occasionally, make a successful incoming or manual outgoing connection without peer activity, or an unsuccessful +// manual outgoing connection. Expect that this doesn't affect `connections_without_activity_count`. +// 4) On the final iteration make the peer actually send a message. 
Check that `connections_without_activity_count` has +// been reset to zero. +// Note that feeler connections are not checked in this test because once a feeler connection succeeds, it won't +// be attempted again. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn auto_connection_without_peer_activity( + #[case] seed: Seed, + #[values( + AutoConnType::FullRelay, + AutoConnType::BlockRelay, + AutoConnType::Reserved + )] + conn_type: AutoConnType, +) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let p2p_config = make_p2p_config_for_auto_connection(conn_type, Duration::ZERO); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let is_reserved_connection = matches!(conn_type, AutoConnType::Reserved); + let is_block_relay_connection = matches!(conn_type, AutoConnType::BlockRelay); + let is_full_relay_connection = matches!(conn_type, AutoConnType::FullRelay); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + 
peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + if is_reserved_connection { + add_reserved_peer(&peer_mgr_event_sender, peer_address.socket_addr().into()).await; + } + + let num_iterations = rng.gen_range(5..10); + let mut last_connect_after = Time::from_duration_since_epoch(Duration::ZERO); + + for i in 0..num_iterations { + log::debug!("Running iteration {i} out of {num_iterations}"); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let wait_duration = std::cmp::max( + last_connect_after + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + !is_block_relay_connection, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + conn_type.to_peer_role(), + ) + .await; + + if is_reserved_connection || is_full_relay_connection { + // If the case of a reserved or full relay connection, the peer manager will send + // AddrListRequest to the peer. 
+ let cmd = expect_recv!(cmd_receiver); + let (peer_id_in_cmd, message) = assert_matches_return_val!( + cmd, + Command::SendMessage { peer_id, message }, + (peer_id, message) + ); + assert_eq!(peer_id_in_cmd, peer_id); + assert_matches!(message, Message::AddrListRequest(_)); + } + + let is_last_iteration = i == num_iterations - 1; + + if is_last_iteration { + conn_event_sender + .send(ConnectivityEvent::Message { + peer_id, + message: PeerManagerMessageExt::FirstSyncMessageReceived, + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::MessageReceived { + peer_id, + message_tag: PeerManagerMessageExtTag::FirstSyncMessageReceived + } + ); + } + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + + // An inbound or manual connection without peer activity shouldn't affect connections_without_activity_count. + // Same for a failed outbound connection. 
+ let extra_outbound_connection_failed = match rng.gen_range(0..4) { + 0 => { + log::debug!("Accepting an extra inbound connection"); + + inbound_full_relay_peer_connected_and_disconnected( + &conn_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + + false + } + 1 => { + log::debug!("Making an extra successful manual outbound connection"); + + start_and_close_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + bind_address, + &chain_config, + ) + .await; + + false + } + 2 => { + log::debug!("Making an extra unsuccessful manual outbound connection"); + + attempt_and_fail_manual_connection( + &conn_event_sender, + &peer_mgr_event_sender, + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_address, + ) + .await; + + true + } + + _ => false, + }; + + let expected_connections_without_activity_count = if is_last_iteration { 0 } else { i + 1 }; + let expected_fail_count = if extra_outbound_connection_failed { + 1 + } else { + 0 + }; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, expected_fail_count); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!( + peer_addr_state.connections_without_activity_count, + expected_connections_without_activity_count + ); + + last_connect_after = peer_addr_state.next_connect_after; + } + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +// Check the case of a successful feeler connection without any peer activity. +// 1) Set up the peer manager so that a feeler connection would occur. 
+// 2) Advance the time so that the connection would be attempted. +// Make it succeed and check that disconnection is initiated immediately. Close the connection. +// 3) `connections_without_activity_count` should remain zero. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test] +async fn feeler_connection_without_peer_activity(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + let chain_config = Arc::new(config::create_unit_test_config()); + let feeler_connections_interval = Duration::from_secs(rng.gen_range(1..100)); + let p2p_config = + make_p2p_config_for_auto_connection(AutoConnType::Feeler, feeler_connections_interval); + let time_getter = BasicTestTimeGetter::new(); + + let bind_address: SocketAddress = TestTransportTcp::make_address().into(); + let ( + peer_mgr, + conn_event_sender, + peer_mgr_event_sender, + mut cmd_receiver, + mut peer_mgr_notification_receiver, + ) = make_standalone_peer_manager( + Arc::clone(&chain_config), + Arc::clone(&p2p_config), + vec![bind_address], + time_getter.get_time_getter(), + make_seedable_rng(rng.gen()), + ); + + let peer_address: SocketAddress = TestAddressMaker::new_random_address(&mut rng).into(); + + let peer_mgr_join_handle = tokio_spawn_in_current_tracing_span( + async move { + let mut peer_mgr = peer_mgr; + let _ = peer_mgr.run_internal(None).await; + peer_mgr + }, + "", + ); + + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let peer_discovery_time = time_getter.get_time_getter().get_time(); + discover_peer(&peer_mgr_event_sender, peer_address, false).await; + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert_eq!( + peer_addr_state, + AddressStateDisconnected { + was_reachable: false, + fail_count: 0, + next_connect_after: peer_discovery_time, + connections_without_activity_count: 0 + } + ); + + let time_before_wait = time_getter.get_time_getter().get_time(); + let 
next_feeler_connection_time = query_peer_manager(&peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.next_feeler_connection_time() + }) + .await; + let wait_duration = std::cmp::max( + next_feeler_connection_time + .as_duration_since_epoch() + .checked_sub(time_before_wait.as_duration_since_epoch()) + .unwrap_or(Duration::ZERO), + peer_manager::HEARTBEAT_INTERVAL_MAX, + ); + + time_getter.advance_time_rounded_up(wait_duration); + wait_for_heartbeat(&mut peer_mgr_notification_receiver).await; + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + &conn_event_sender, + peer_address, + bind_address, + &chain_config, + true, + ); + + peer_accepted_by_peer_mgr( + &mut cmd_receiver, + &mut peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::Feeler, + ) + .await; + + // Since it's a feeler connection, the peer manager will initiate disconnection right away. + let cmd = expect_recv!(cmd_receiver); + assert_eq!( + cmd, + Command::Disconnect { + peer_id, + reason: Some(DisconnectionReason::FeelerConnection) + } + ); + + close_connection( + &conn_event_sender, + &mut peer_mgr_notification_receiver, + peer_id, + ) + .await; + + let time_after_wait = time_getter.get_time_getter().get_time(); + + let peer_addr_state = + expect_peer_state_disconnected(&peer_mgr_event_sender, peer_address).await; + assert!(peer_addr_state.was_reachable); + assert_eq!(peer_addr_state.fail_count, 0); + assert!(peer_addr_state.next_connect_after > time_after_wait); + assert_eq!(peer_addr_state.connections_without_activity_count, 0); + + drop(conn_event_sender); + drop(peer_mgr_event_sender); + + let _peer_mgr = peer_mgr_join_handle.await.unwrap(); +} + +#[derive(Debug, Copy, Clone)] +enum AutoConnType { + FullRelay, + BlockRelay, + Reserved, + Feeler, +} + +impl AutoConnType { + fn to_peer_role(self) -> PeerRole { + match self { + AutoConnType::FullRelay => PeerRole::OutboundFullRelay, + 
AutoConnType::BlockRelay => PeerRole::OutboundBlockRelay, + AutoConnType::Reserved => PeerRole::OutboundReserved, + AutoConnType::Feeler => PeerRole::Feeler, + } + } +} + +fn make_p2p_config_for_auto_connection( + conn_type: AutoConnType, + feeler_connections_interval: Duration, +) -> Arc { + let mut outbound_block_relay_count = 0; + let mut outbound_full_relay_count = 0; + let mut enable_feeler_connections = false; + + match conn_type { + AutoConnType::FullRelay => { + outbound_full_relay_count = 1; + } + AutoConnType::BlockRelay => { + outbound_block_relay_count = 1; + } + AutoConnType::Feeler => { + enable_feeler_connections = true; + } + AutoConnType::Reserved => {} + }; + + let peer_mgr_config = PeerManagerConfig { + outbound_block_relay_count: outbound_block_relay_count.into(), + outbound_block_relay_extra_count: 0.into(), + outbound_full_relay_count: outbound_full_relay_count.into(), + outbound_full_relay_extra_count: 0.into(), + enable_feeler_connections: enable_feeler_connections.into(), + feeler_connections_interval: feeler_connections_interval.into(), + + max_inbound_connections: Default::default(), + preserved_inbound_count_address_group: Default::default(), + preserved_inbound_count_ping: Default::default(), + preserved_inbound_count_new_blocks: Default::default(), + preserved_inbound_count_new_transactions: Default::default(), + outbound_block_relay_connection_min_age: Default::default(), + outbound_full_relay_connection_min_age: Default::default(), + stale_tip_time_diff: Default::default(), + main_loop_tick_interval: Default::default(), + force_dns_query_if_no_global_addresses_known: Default::default(), + allow_same_ip_connections: Default::default(), + peerdb_config: Default::default(), + min_peer_software_version: Default::default(), + }; + + Arc::new(test_p2p_config_with_peer_mgr_config(peer_mgr_config)) +} + +async fn discover_peer( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + make_reachable: bool, +) { + 
mutate_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + peer_mgr.peer_db_mut().peer_discovered(peer_address); + + if make_reachable { + let addr_data = peer_mgr.peer_db_mut().address_data_mut(&peer_address).unwrap(); + + let was_reachable = assert_matches_return_val!( + addr_data.state_mut(), + AddressState::Disconnected { was_reachable, .. }, + was_reachable + ); + + *was_reachable = true + } + }) + .await; +} + +// Contents of AddressState::Disconnected and also the `connections_without_activity_count` +// field that is part of `AddressData`. +#[derive(Eq, PartialEq, Clone, Debug)] +struct AddressStateDisconnected { + was_reachable: bool, + fail_count: u32, + next_connect_after: Time, + connections_without_activity_count: u32, +} + +// Contents of AddressState::Unreachable +#[derive(Eq, PartialEq, Clone, Debug)] +struct AddressStateUnreachable { + erase_after: Time, +} + +#[must_use] +async fn expect_peer_state_disconnected( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) -> AddressStateDisconnected { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + let addr_data = peer_mgr.peer_db().address_data(&peer_address).unwrap(); + assert_matches_return_val!( + addr_data.state().clone(), + AddressState::Disconnected { + was_reachable, + fail_count, + next_connect_after + }, + AddressStateDisconnected { + was_reachable, + fail_count, + next_connect_after, + connections_without_activity_count: addr_data.connections_without_activity_count() + } + ) + }) + .await +} + +#[must_use] +async fn expect_peer_state_unreachable( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) -> AddressStateUnreachable { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + let addr_data = peer_mgr.peer_db().address_data(&peer_address).unwrap(); + assert_matches_return_val!( + addr_data.state().clone(), + AddressState::Unreachable { erase_after }, + AddressStateUnreachable { erase_after } + ) + }) + 
.await +} + +async fn expect_no_peer_state( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, +) { + query_peer_manager(peer_mgr_event_sender, move |peer_mgr| { + assert!(peer_mgr.peer_db().address_data(&peer_address).is_none()); + }) + .await +} + +async fn peer_accepted_by_peer_mgr( + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_id: PeerId, + peer_address: SocketAddress, + peer_role: PeerRole, +) { + let cmd = expect_recv!(cmd_receiver); + assert_eq!(cmd, Command::Accept { peer_id }); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::ConnectionAccepted { + address: peer_address, + peer_role + } + ); +} + +async fn close_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_id: PeerId, +) { + connection_closed(conn_event_sender, peer_id); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + PeerManagerNotification::ConnectionClosed { peer_id } + ); +} + +async fn inbound_full_relay_peer_connected_and_disconnected( + conn_event_sender: &mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) { + let peer_id = inbound_full_relay_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + ); + peer_accepted_by_peer_mgr( + cmd_receiver, + peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::Inbound, + ) + .await; + + close_connection(conn_event_sender, peer_mgr_notification_receiver, peer_id).await; +} + +async fn start_and_close_manual_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_event_sender: 
&mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, +) { + let result_recv = start_manually_connecting(peer_mgr_event_sender, peer_address); + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let peer_id = outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + true, + ); + + peer_accepted_by_peer_mgr( + cmd_receiver, + peer_mgr_notification_receiver, + peer_id, + peer_address, + PeerRole::OutboundManual, + ) + .await; + + result_recv.await.unwrap().unwrap(); + + // Since it's a manual connection, the peer manager will send AddrListRequest to the peer. + let cmd = expect_recv!(cmd_receiver); + let (peer_id_in_cmd, message) = assert_matches_return_val!( + cmd, + Command::SendMessage { peer_id, message }, + (peer_id, message) + ); + assert_eq!(peer_id_in_cmd, peer_id); + assert_matches!(message, Message::AddrListRequest(_)); + + close_connection(conn_event_sender, peer_mgr_notification_receiver, peer_id).await; +} + +async fn attempt_and_fail_manual_connection( + conn_event_sender: &mpsc::UnboundedSender, + peer_mgr_event_sender: &mpsc::UnboundedSender, + cmd_receiver: &mut mpsc::UnboundedReceiver, + peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, + peer_address: SocketAddress, +) { + let result_recv = start_manually_connecting(peer_mgr_event_sender, peer_address); + + let cmd = expect_recv!(cmd_receiver); + expect_cmd_connect_to(&cmd, &peer_address); + + let connection_error = P2pError::DialError(DialError::ConnectionRefusedOrTimedOut); + + conn_event_sender + .send(ConnectivityEvent::ConnectionError { + peer_address, + error: connection_error.clone(), + }) + .unwrap(); + + let peer_mgr_notification = expect_recv!(peer_mgr_notification_receiver); + assert_eq!( + peer_mgr_notification, + 
PeerManagerNotification::OutboundError { + address: peer_address, + error: connection_error.clone() + } + ); + + assert_eq!(result_recv.await.unwrap(), Err(connection_error)); +} diff --git a/p2p/src/peer_manager/tests/utils.rs b/p2p/src/peer_manager/tests/utils.rs index f2dfa182e5..19e202a6c6 100644 --- a/p2p/src/peer_manager/tests/utils.rs +++ b/p2p/src/peer_manager/tests/utils.rs @@ -173,17 +173,13 @@ pub fn outbound_block_relay_peer_accepted_by_backend( bind_address: SocketAddress, chain_config: &ChainConfig, ) -> PeerId { - let peer_id = PeerId::new(); - conn_event_sender - .send(ConnectivityEvent::OutboundAccepted { - peer_address, - bind_address, - peer_info: make_block_relay_peer_info(peer_id, chain_config), - node_address_as_seen_by_peer: None, - }) - .unwrap(); - - peer_id + outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + false, + ) } /// Send a ConnectivityEvent simulating a connection being accepted by the backend. @@ -192,13 +188,34 @@ pub fn outbound_full_relay_peer_accepted_by_backend( peer_address: SocketAddress, bind_address: SocketAddress, chain_config: &ChainConfig, +) -> PeerId { + outbound_peer_accepted_by_backend( + conn_event_sender, + peer_address, + bind_address, + chain_config, + true, + ) +} + +pub fn outbound_peer_accepted_by_backend( + conn_event_sender: &mpsc::UnboundedSender, + peer_address: SocketAddress, + bind_address: SocketAddress, + chain_config: &ChainConfig, + is_full_relay: bool, ) -> PeerId { let peer_id = PeerId::new(); + let peer_info = if is_full_relay { + make_full_relay_peer_info(peer_id, chain_config) + } else { + make_block_relay_peer_info(peer_id, chain_config) + }; conn_event_sender .send(ConnectivityEvent::OutboundAccepted { peer_address, bind_address, - peer_info: make_full_relay_peer_info(peer_id, chain_config), + peer_info, node_address_as_seen_by_peer: None, }) .unwrap(); @@ -206,6 +223,13 @@ pub fn outbound_full_relay_peer_accepted_by_backend( peer_id } 
+pub fn connection_closed( + conn_event_sender: &mpsc::UnboundedSender, + peer_id: PeerId, +) { + conn_event_sender.send(ConnectivityEvent::ConnectionClosed { peer_id }).unwrap(); +} + pub async fn wait_for_heartbeat( peer_mgr_notification_receiver: &mut mpsc::UnboundedReceiver, ) { @@ -306,3 +330,19 @@ pub async fn ban_peer_manually( result_receiver.await.unwrap().unwrap(); } + +pub async fn add_reserved_peer( + peer_mgr_event_sender: &mpsc::UnboundedSender, + peer_addr: SocketAddress, +) { + let (result_sender, result_receiver) = oneshot_nofail::channel(); + + peer_mgr_event_sender + .send(PeerManagerEvent::AddReserved( + IpOrSocketAddress::Socket(peer_addr.socket_addr()), + result_sender, + )) + .unwrap(); + + result_receiver.await.unwrap().unwrap(); +} diff --git a/p2p/src/tests/helpers/mod.rs b/p2p/src/tests/helpers/mod.rs index b08cf56af6..8a5e86a7a9 100644 --- a/p2p/src/tests/helpers/mod.rs +++ b/p2p/src/tests/helpers/mod.rs @@ -30,7 +30,8 @@ use p2p_types::{bannable_address::BannableAddress, socket_address::SocketAddress use test_utils::BasicTestTimeGetter; use crate::{ - net::types::{PeerInfo, PeerRole}, + error::P2pError, + net::types::{PeerInfo, PeerManagerMessageExtTag, PeerRole}, peer_manager::{self, dns_seed::DnsSeed}, }; @@ -61,6 +62,17 @@ pub enum PeerManagerNotification { address: SocketAddress, peer_role: PeerRole, }, + OutboundError { + address: SocketAddress, + error: P2pError, + }, + ConnectionClosed { + peer_id: PeerId, + }, + MessageReceived { + peer_id: PeerId, + message_tag: PeerManagerMessageExtTag, + }, } pub struct PeerManagerObserver { @@ -103,6 +115,21 @@ impl peer_manager::Observer for PeerManagerObserver { fn on_connection_accepted(&mut self, address: SocketAddress, peer_role: PeerRole) { self.send_notification(PeerManagerNotification::ConnectionAccepted { address, peer_role }); } + + fn outbound_error(&mut self, address: SocketAddress, error: P2pError) { + self.send_notification(PeerManagerNotification::OutboundError { address, 
error }); + } + + fn connection_closed(&mut self, peer_id: PeerId) { + self.send_notification(PeerManagerNotification::ConnectionClosed { peer_id }); + } + + fn message_received(&mut self, peer_id: PeerId, message_tag: PeerManagerMessageExtTag) { + self.send_notification(PeerManagerNotification::MessageReceived { + peer_id, + message_tag, + }); + } } #[derive(Debug, PartialEq, Eq)] diff --git a/p2p/src/tests/helpers/test_node.rs b/p2p/src/tests/helpers/test_node.rs index 162dd75c6f..69f1e4dadb 100644 --- a/p2p/src/tests/helpers/test_node.rs +++ b/p2p/src/tests/helpers/test_node.rs @@ -320,6 +320,12 @@ where self.peer_mgr_notification_receiver.try_recv().ok() } + pub fn peer_mgr_notification_receiver( + &mut self, + ) -> &mut mpsc::UnboundedReceiver { + &mut self.peer_mgr_notification_receiver + } + pub async fn get_peers_info(&self) -> TestPeersInfo { query_peer_manager(&self.peer_mgr_event_sender, |peer_mgr| { TestPeersInfo::from_peer_mgr_peer_contexts(peer_mgr.peers()) diff --git a/p2p/src/tests/mod.rs b/p2p/src/tests/mod.rs index d7529d27d5..9a56e4a5e5 100644 --- a/p2p/src/tests/mod.rs +++ b/p2p/src/tests/mod.rs @@ -24,6 +24,7 @@ mod incorrect_handshake; mod min_peer_software_version; mod misbehavior; mod peer_discovery_on_stale_tip; +mod peer_mgr_events; mod same_handshake_nonce; mod unsupported_message; mod unsupported_version; diff --git a/p2p/src/tests/peer_mgr_events.rs b/p2p/src/tests/peer_mgr_events.rs new file mode 100644 index 0000000000..6ee74df9aa --- /dev/null +++ b/p2p/src/tests/peer_mgr_events.rs @@ -0,0 +1,351 @@ +// Copyright (c) 2026 RBB S.r.l +// opensource@mintlayer.org +// SPDX-License-Identifier: MIT +// Licensed under the MIT License; +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://github.com/mintlayer/mintlayer-core/blob/master/LICENSE +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + net::{Ipv4Addr, SocketAddr, SocketAddrV4}, + sync::Arc, +}; + +use rstest::rstest; +use strum::IntoEnumIterator as _; + +use chainstate::{ChainstateConfig, Locator}; +use common::primitives::Id; +use networking::{ + test_helpers::{TestTransportChannel, TestTransportMaker}, + transport::{BufferedTranscoder, TransportListener, TransportSocket}, +}; +use p2p_test_utils::{run_with_timeout, MEDIUM_TIMEOUT, SHORT_TIMEOUT}; +use p2p_types::peer_address::PeerAddress; +use randomness::{seq::IteratorRandom as _, Rng}; +use test_utils::{ + assert_matches, + random::{gen_random_alnum_string, make_seedable_rng, Seed}, + BasicTestTimeGetter, +}; + +use crate::{ + message::{ + AddrListRequest, AddrListResponse, AnnounceAddrRequest, BlockListRequest, BlockResponse, + BlockSyncMessageTag, HeaderList, HeaderListRequest, PeerManagerMessageTag, PingRequest, + PingResponse, TransactionResponse, TransactionSyncMessageTag, WillDisconnectMessage, + }, + net::{ + default_backend::types::{HandshakeMessage, Message, P2pTimestamp}, + types::PeerManagerMessageExtTag, + }, + sync::test_helpers::make_new_block, + test_helpers::{test_p2p_config, TEST_PROTOCOL_VERSION}, + tests::helpers::{PeerManagerNotification, TestNode}, +}; + +type Transport = ::Transport; + +#[allow(clippy::enum_variant_names)] +#[derive(Debug)] +enum TestParamForFirstSyncMessageReceivedMustBeSent { + BlockSyncMsg, + TxSyncMsg, + PeerMgrMsg, +} + +// Check that once the peer sends us any block or transaction sync message, `PeerManagerMessageExt::FirstSyncMessageReceived` 
+// is sent to the peer manager. +// 1) Send a random message from the peer. +// 2) If the sent message was a sync one, expect that `PeerManagerNotification::MessageReceived` is emitted with +// the message equal to `PeerManagerMessageExtTag::FirstSyncMessageReceived`. If the sent message was not a sync one, +// expect that no such notification is emitted. +#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn first_sync_message_received_must_be_sent( + #[case] seed: Seed, + #[values( + TestParamForFirstSyncMessageReceivedMustBeSent::BlockSyncMsg, + TestParamForFirstSyncMessageReceivedMustBeSent::TxSyncMsg, + TestParamForFirstSyncMessageReceivedMustBeSent::PeerMgrMsg + )] + param: TestParamForFirstSyncMessageReceivedMustBeSent, +) { + let mut rng = make_seedable_rng(seed); + + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + make_seedable_rng(rng.gen()), + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. 
})); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + connect_result_receiver.await.unwrap().unwrap(); + + let (msg_to_send, is_sync_msg) = match param { + TestParamForFirstSyncMessageReceivedMustBeSent::BlockSyncMsg => { + let msg = match BlockSyncMessageTag::iter().choose(&mut rng).unwrap() { + BlockSyncMessageTag::HeaderListRequest => { + Message::HeaderListRequest(HeaderListRequest::new(Locator::new(vec![ + Id::random_using(&mut rng), + ]))) + } + BlockSyncMessageTag::BlockListRequest => Message::BlockListRequest( + BlockListRequest::new(vec![Id::random_using(&mut rng)]), + ), + BlockSyncMessageTag::HeaderList => { + Message::HeaderList(HeaderList::new(Vec::new())) + } + BlockSyncMessageTag::BlockResponse => { + Message::BlockResponse(BlockResponse::new(make_new_block( + &chain_config, + None, + &time_getter.get_time_getter(), + &mut rng, + ))) + } + BlockSyncMessageTag::TestSentinel => { + Message::TestBlockSyncMsgSentinel(Id::random_using(&mut rng)) + } + }; + + (msg, true) + } + TestParamForFirstSyncMessageReceivedMustBeSent::TxSyncMsg => { + let msg = match TransactionSyncMessageTag::iter().choose(&mut rng).unwrap() { + TransactionSyncMessageTag::NewTransaction => { + Message::NewTransaction(Id::random_using(&mut rng)) + } + TransactionSyncMessageTag::TransactionRequest => { + Message::TransactionRequest(Id::random_using(&mut rng)) + } + TransactionSyncMessageTag::TransactionResponse => Message::TransactionResponse( + TransactionResponse::NotFound(Id::random_using(&mut rng)), + ), + }; + + (msg, true) + } + TestParamForFirstSyncMessageReceivedMustBeSent::PeerMgrMsg => 
{ + let msg = match PeerManagerMessageTag::iter().choose(&mut rng).unwrap() { + PeerManagerMessageTag::AddrListRequest => { + Message::AddrListRequest(AddrListRequest {}) + } + PeerManagerMessageTag::AnnounceAddrRequest => { + Message::AnnounceAddrRequest(AnnounceAddrRequest { + address: random_peer_addr(&mut rng), + }) + } + PeerManagerMessageTag::PingRequest => { + Message::PingRequest(PingRequest { nonce: rng.gen() }) + } + PeerManagerMessageTag::AddrListResponse => { + Message::AddrListResponse(AddrListResponse { + addresses: vec![random_peer_addr(&mut rng)], + }) + } + PeerManagerMessageTag::PingResponse => { + Message::PingResponse(PingResponse { nonce: rng.gen() }) + } + PeerManagerMessageTag::WillDisconnect => { + Message::WillDisconnect(WillDisconnectMessage { + reason: gen_random_alnum_string(&mut rng, 10, 20), + }) + } + }; + + (msg, false) + } + }; + + msg_stream.send(msg_to_send).await.unwrap(); + + if is_sync_msg { + tokio::time::timeout( + MEDIUM_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap(); + } else { + tokio::time::timeout( + SHORT_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap_err(); + } + + test_node.join().await; + }) + .await; +} + +// Check that `PeerManagerMessageExt::FirstSyncMessageReceived` will be sent only once even if +// many sync messages are received from the peer. +// 1) Send a few `HeaderListRequest` and/or `NewTransaction` messages (i.e. some legit messages that a peer can send +// in any number). +// 2) Expect that only one `PeerManagerNotification::MessageReceived` with `PeerManagerMessageExtTag::FirstSyncMessageReceived` +// is produced. 
+#[tracing::instrument(skip(seed))] +#[rstest] +#[trace] +#[case(Seed::from_entropy())] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn first_sync_message_received_must_be_sent_only_once(#[case] seed: Seed) { + let mut rng = make_seedable_rng(seed); + + run_with_timeout(async { + let time_getter = BasicTestTimeGetter::new(); + let chain_config = Arc::new(common::chain::config::create_unit_test_config()); + let p2p_config = Arc::new(test_p2p_config()); + + let mut test_node = TestNode::::start( + true, + time_getter.clone(), + Arc::clone(&chain_config), + ChainstateConfig::new(), + Arc::clone(&p2p_config), + TestTransportChannel::make_transport(), + TestTransportChannel::make_address().into(), + TEST_PROTOCOL_VERSION.into(), + None, + make_seedable_rng(rng.gen()), + ) + .await; + + let transport = TestTransportChannel::make_transport(); + let mut listener = + transport.bind(vec![TestTransportChannel::make_address()]).await.unwrap(); + + let connect_result_receiver = + test_node.start_connecting(listener.local_addresses().unwrap()[0].into()); + + let (stream, _) = listener.accept().await.unwrap(); + + let mut msg_stream = + BufferedTranscoder::new(stream, Some(*p2p_config.protocol_config.max_message_size)); + + let msg = msg_stream.recv().await.unwrap(); + assert_matches!(msg, Message::Handshake(HandshakeMessage::Hello { .. 
})); + + msg_stream + .send(Message::Handshake(HandshakeMessage::HelloAck { + protocol_version: TEST_PROTOCOL_VERSION.into(), + network: *chain_config.magic_bytes(), + user_agent: p2p_config.user_agent.clone(), + software_version: *chain_config.software_version(), + services: (*p2p_config.node_type).into(), + receiver_address: None, + current_time: P2pTimestamp::from_time(time_getter.get_time_getter().get_time()), + })) + .await + .unwrap(); + + connect_result_receiver.await.unwrap().unwrap(); + + for _ in 5..10 { + let msg = if rng.gen_bool(0.5) { + Message::HeaderListRequest(HeaderListRequest::new(Locator::new(vec![ + Id::random_using(&mut rng), + ]))) + } else { + Message::NewTransaction(Id::random_using(&mut rng)) + }; + + msg_stream.send(msg).await.unwrap(); + } + + tokio::time::timeout( + MEDIUM_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap(); + + tokio::time::timeout( + SHORT_TIMEOUT, + recv_first_sync_message_received(&mut test_node), + ) + .await + .unwrap_err(); + + test_node.join().await; + }) + .await; +} + +fn random_peer_addr(rng: &mut impl Rng) -> PeerAddress { + SocketAddr::V4(SocketAddrV4::new( + Ipv4Addr::new(rng.gen(), rng.gen(), rng.gen(), rng.gen()), + rng.gen(), + )) + .into() +} + +async fn recv_first_sync_message_received(test_node: &mut TestNode) { + while let Some(notif) = test_node.peer_mgr_notification_receiver().recv().await { + if let PeerManagerNotification::MessageReceived { + peer_id: _, + message_tag, + } = notif + { + match message_tag { + PeerManagerMessageExtTag::PeerManagerMessage(_) => {} + PeerManagerMessageExtTag::FirstSyncMessageReceived => { + break; + } + } + } + } +} diff --git a/test-utils/src/basic_test_time_getter.rs b/test-utils/src/basic_test_time_getter.rs index 4a2c66b9b4..1af2161f5c 100644 --- a/test-utils/src/basic_test_time_getter.rs +++ b/test-utils/src/basic_test_time_getter.rs @@ -46,6 +46,18 @@ impl BasicTestTimeGetter { 
self.current_time_millis.fetch_add(duration.as_millis() as u64); } + // Same as advance_time, except that if the passed duration's precision is not representable by + // the time getter (i.e. it's sub-millisecond), this function will round it up, ensuring that + // the resulting time is always greater-or-equal than the initial time plus the duration. + pub fn advance_time_rounded_up(&self, duration: Duration) { + let mut millis = duration.as_millis() as u64; + if duration.subsec_nanos() % 1_000_000 != 0 { + millis += 1; + } + + self.current_time_millis.fetch_add(millis); + } + pub fn is_same_instance(&self, other: &BasicTestTimeGetter) -> bool { Arc::ptr_eq(&self.current_time_millis, &other.current_time_millis) } diff --git a/utils/src/exp_rand/mod.rs b/utils/src/exp_rand/mod.rs index 4e8d93c214..72d88b7b2a 100644 --- a/utils/src/exp_rand/mod.rs +++ b/utils/src/exp_rand/mod.rs @@ -15,7 +15,9 @@ use randomness::Rng; -/// Returns a value sampled from an exponential distribution with a mean of 1.0 +/// Returns a value sampled from an exponential distribution with a mean of 1.0. +/// +/// The result will be in the range (0, max], where max = -ln(f64::MIN_POSITIVE) ~= 708.396418532264. pub fn exponential_rand(rng: &mut impl Rng) -> f64 { let mut random_f64 = rng.gen::(); // The generated number will be in the range [0, 1). Turn it into (0, 1) to avoid