Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ The format is loosely based on [Keep a Changelog](https://keepachangelog.com/en/
- Node bootstrapping:
- The format of the bootstrap file was changed and the legacy format is no longer supported.

- P2p:
- The logic of initiating new outbound connections has been improved to prevent the node from
constantly attempting to re-establish a connection with a peer that has banned it.

### Fixed
- P2p: when a peer sends a message that can't be decoded, it will now be discouraged (which is what
is normally done for misbehaving peers) and the node won't try connecting to it again.\
Expand Down
18 changes: 13 additions & 5 deletions dns-server/src/crawler_p2p/crawler_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use p2p::{
PingResponse,
},
net::{
types::{ConnectivityEvent, SyncingEvent},
types::{ConnectivityEvent, PeerManagerMessageExt, SyncingEvent},
ConnectivityService, NetworkingService, SyncingEventReceiver,
},
peer_manager::{
Expand Down Expand Up @@ -211,9 +211,17 @@ where
fn handle_conn_message(
&mut self,
peer_id: PeerId,
message: PeerManagerMessage,
message: PeerManagerMessageExt,
) -> p2p::Result<()> {
match message {
let peer_mgr_message = match message {
PeerManagerMessageExt::PeerManagerMessage(message) => message,
PeerManagerMessageExt::FirstSyncMessageReceived => {
// Ignored
return Ok(());
}
};

match peer_mgr_message {
PeerManagerMessage::AddrListRequest(_) => {
// Ignored
Ok(())
Expand All @@ -231,7 +239,7 @@ where
}
PeerManagerMessage::PingRequest(PingRequest { nonce }) => {
self.conn
.send_message(
.send_peer_manager_message(
peer_id,
PeerManagerMessage::PingResponse(PingResponse { nonce }),
)
Expand Down Expand Up @@ -382,7 +390,7 @@ where
CrawlerCommand::RequestAddresses { peer_id } => {
log::debug!("Requesting addresses from peer {peer_id}");

conn.send_message(
conn.send_peer_manager_message(
peer_id,
PeerManagerMessage::AddrListRequest(AddrListRequest {}),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ impl MockStateRef {
peer_id,
message: PeerManagerMessage::AnnounceAddrRequest(AnnounceAddrRequest {
address: announced_ip.as_peer_address(),
}),
})
.into(),
})
.unwrap();
}
Expand Down Expand Up @@ -271,7 +272,11 @@ impl ConnectivityService<MockNetworkingService> for MockConnectivityHandle {
Ok(())
}

fn send_message(&mut self, _peer_id: PeerId, _request: PeerManagerMessage) -> p2p::Result<()> {
fn send_peer_manager_message(
&mut self,
_peer_id: PeerId,
_request: PeerManagerMessage,
) -> p2p::Result<()> {
Ok(())
}

Expand Down
9 changes: 6 additions & 3 deletions p2p/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ use serialization::{Decode, Encode};

use crate::types::peer_address::PeerAddress;

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
#[strum_discriminants(name(BlockSyncMessageTag), derive(strum::EnumIter))]
pub enum BlockSyncMessage {
HeaderListRequest(HeaderListRequest),
BlockListRequest(BlockListRequest),
Expand All @@ -40,14 +41,16 @@ pub enum BlockSyncMessage {
TestSentinel(Id<()>),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
#[strum_discriminants(name(TransactionSyncMessageTag), derive(strum::EnumIter))]
pub enum TransactionSyncMessage {
NewTransaction(Id<Transaction>),
TransactionRequest(Id<Transaction>),
TransactionResponse(TransactionResponse),
}

#[derive(Debug, Clone, PartialEq, Eq)]
#[derive(Debug, Clone, PartialEq, Eq, strum::EnumDiscriminants)]
#[strum_discriminants(name(PeerManagerMessageTag), derive(strum::EnumIter))]
pub enum PeerManagerMessage {
AddrListRequest(AddrListRequest),
AnnounceAddrRequest(AnnounceAddrRequest),
Expand Down
9 changes: 5 additions & 4 deletions p2p/src/net/default_backend/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,14 @@ use crate::{
config::P2pConfig,
disconnection_reason::DisconnectionReason,
error::{DialError, P2pError, PeerError},
message::PeerManagerMessage,
net::{
default_backend::{
peer,
types::{BackendEvent, Command, PeerEvent},
},
types::{services::Services, ConnectivityEvent, PeerInfo, SyncingEvent},
types::{
services::Services, ConnectivityEvent, PeerInfo, PeerManagerMessageExt, SyncingEvent,
},
},
protocol::{ProtocolVersion, SupportedProtocolVersion},
types::{peer_address::PeerAddress, peer_id::PeerId},
Expand Down Expand Up @@ -613,7 +614,7 @@ where
Ok(())
}

PeerEvent::MessageReceived { message } => {
PeerEvent::MessageReceived(message) => {
if self.networking_enabled {
self.handle_message(peer_id, message)?;
}
Expand Down Expand Up @@ -696,7 +697,7 @@ where
fn handle_message(
&mut self,
peer_id: PeerId,
message: PeerManagerMessage,
message: PeerManagerMessageExt,
) -> crate::Result<()> {
// Do not process remaining messages if the peer has been forcibly disconnected (for example, after being banned).
// Without this check, the backend might send messages to the sync and peer managers after sending the disconnect notification.
Expand Down
6 changes: 5 additions & 1 deletion p2p/src/net/default_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ where
Ok(self.cmd_sender.send(types::Command::Disconnect { peer_id, reason })?)
}

fn send_message(&mut self, peer_id: PeerId, message: PeerManagerMessage) -> crate::Result<()> {
fn send_peer_manager_message(
&mut self,
peer_id: PeerId,
message: PeerManagerMessage,
) -> crate::Result<()> {
Ok(self.cmd_sender.send(types::Command::SendMessage {
peer_id,
message: message.into(),
Expand Down
55 changes: 43 additions & 12 deletions p2p/src/net/default_backend/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ use crate::{
disconnection_reason::DisconnectionReason,
error::{ConnectionValidationError, P2pError, PeerError, ProtocolError},
message::{BlockSyncMessage, TransactionSyncMessage, WillDisconnectMessage},
net::default_backend::types::{BackendEvent, PeerEvent},
net::{
default_backend::types::{BackendEvent, PeerEvent},
types::PeerManagerMessageExt,
},
protocol::{choose_common_protocol_version, ProtocolVersion, SupportedProtocolVersion},
types::peer_id::PeerId,
};
Expand Down Expand Up @@ -80,6 +83,10 @@ pub struct Peer<T: TransportSocket> {

/// Time getter
time_getter: TimeGetter,

/// Will be set to true once at least one BlockSyncMessage or TransactionSyncMessage has been
/// received from the peer.
sync_message_received: bool,
}

impl<T> Peer<T>
Expand Down Expand Up @@ -112,6 +119,7 @@ where
node_protocol_version,
time_getter,
common_protocol_version: None,
sync_message_received: false,
}
}

Expand Down Expand Up @@ -390,17 +398,17 @@ where
// Note: the channels used by this function to propagate messages to other parts of p2p
// must be bounded; this is important to prevent DoS attacks.
async fn handle_socket_msg(
&mut self,
peer_id: PeerId,
msg: Message,
peer_event_sender: &mut mpsc::Sender<PeerEvent>,
block_sync_msg_sender: &mut mpsc::Sender<BlockSyncMessage>,
transaction_sync_msg_sender: &mut mpsc::Sender<TransactionSyncMessage>,
block_sync_msg_sender: &mpsc::Sender<BlockSyncMessage>,
transaction_sync_msg_sender: &mpsc::Sender<TransactionSyncMessage>,
) -> crate::Result<()> {
match msg.categorize() {
CategorizedMessage::Handshake(_) => {
log::error!("Peer {peer_id} sent unexpected handshake message");

peer_event_sender
self.peer_event_sender
.send(PeerEvent::Misbehaved {
error: P2pError::ProtocolError(ProtocolError::UnexpectedMessage(
"Unexpected handshake message".to_owned(),
Expand All @@ -409,11 +417,35 @@ where
.await?;
}
CategorizedMessage::PeerManagerMessage(msg) => {
peer_event_sender.send(PeerEvent::MessageReceived { message: msg }).await?
self.peer_event_sender
.send(PeerEvent::MessageReceived(
PeerManagerMessageExt::PeerManagerMessage(msg),
))
.await?
}
CategorizedMessage::BlockSyncMessage(msg) => {
if !self.sync_message_received {
self.peer_event_sender
.send(PeerEvent::MessageReceived(
PeerManagerMessageExt::FirstSyncMessageReceived,
))
.await?;
self.sync_message_received = true;
}

block_sync_msg_sender.send(msg).await?;
}
CategorizedMessage::BlockSyncMessage(msg) => block_sync_msg_sender.send(msg).await?,
CategorizedMessage::TransactionSyncMessage(msg) => {
transaction_sync_msg_sender.send(msg).await?
if !self.sync_message_received {
self.peer_event_sender
.send(PeerEvent::MessageReceived(
PeerManagerMessageExt::FirstSyncMessageReceived,
))
.await?;
self.sync_message_received = true;
}

transaction_sync_msg_sender.send(msg).await?;
}
}

Expand Down Expand Up @@ -486,12 +518,11 @@ where
event = self.socket.recv(), if sync_msg_senders_opt.is_some() => match event {
Ok(message) => {
let sync_msg_senders = sync_msg_senders_opt.as_mut().expect("sync_msg_senders_opt is some");
Self::handle_socket_msg(
self.handle_socket_msg(
self.peer_id,
message,
&mut self.peer_event_sender,
&mut sync_msg_senders.0,
&mut sync_msg_senders.1,
&sync_msg_senders.0,
&sync_msg_senders.1,
).await?;
}
Err(err) => {
Expand Down
8 changes: 4 additions & 4 deletions p2p/src/net/default_backend/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
BlockSyncMessage, HeaderList, HeaderListRequest, PeerManagerMessage, PingRequest,
PingResponse, TransactionResponse, TransactionSyncMessage, WillDisconnectMessage,
},
net::types::services::Services,
net::types::{services::Services, PeerManagerMessageExt},
protocol::{ProtocolVersion, SupportedProtocolVersion},
types::{peer_address::PeerAddress, peer_id::PeerId},
};
Expand Down Expand Up @@ -109,7 +109,7 @@ pub mod peer_event {
}
}

/// Events sent by Peer to Backend
/// Events sent by `Peer` to `Backend`.
#[derive(Debug)]
pub enum PeerEvent {
/// Peer information received from remote
Expand All @@ -119,7 +119,7 @@ pub enum PeerEvent {
ConnectionClosed,

/// Message received from remote
MessageReceived { message: PeerManagerMessage },
MessageReceived(PeerManagerMessageExt),

/// Protocol violation
Misbehaved { error: P2pError },
Expand All @@ -135,7 +135,7 @@ pub enum PeerEvent {
},
}

/// Events sent by Backend to Peer
/// Events sent by `Backend` to `Peer`.
#[derive(Debug)]
pub enum BackendEvent {
Accepted {
Expand Down
13 changes: 6 additions & 7 deletions p2p/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,17 @@ where
reason: Option<DisconnectionReason>,
) -> crate::Result<()>;

/// Sends a message to the given peer.
fn send_message(&mut self, peer: PeerId, message: PeerManagerMessage) -> crate::Result<()>;
/// Sends a peer manager message to the given peer.
fn send_peer_manager_message(
&mut self,
peer: PeerId,
message: PeerManagerMessage,
) -> crate::Result<()>;

/// Return the socket addresses of the network service provider
fn local_addresses(&self) -> &[SocketAddress];

/// Poll events from the network service provider
///
/// There are three types of events that can be received:
/// - incoming peer connections
/// - new discovered peers
/// - peer expiration events
async fn poll_next(&mut self) -> crate::Result<types::ConnectivityEvent>;
}

Expand Down
Loading
Loading