Skip to content

Commit cbe3b30

Browse files
committed
peer reputation module cleanup
1 parent 0530c9b commit cbe3b30

7 files changed

Lines changed: 272 additions & 514 deletions

File tree

dash-spv/src/client/chainlock.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
4141
.await
4242
{
4343
// Penalize the peer that relayed the invalid ChainLock
44-
let reason = format!("Invalid ChainLock: {}", e);
45-
self.network.penalize_peer_invalid_chainlock(peer_address, &reason).await;
44+
self.network.penalize_peer_invalid_chainlock(peer_address).await;
4645
return Err(SpvError::Validation(e));
4746
}
4847
}
@@ -110,7 +109,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
110109
tracing::warn!("{}", reason);
111110

112111
// Ban the peer using the reputation system
113-
self.network.penalize_peer_invalid_instantlock(peer_address, &reason).await;
112+
self.network.penalize_peer_invalid_instantlock(peer_address).await;
114113

115114
return Err(SpvError::Validation(e));
116115
}

dash-spv/src/client/lifecycle.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,7 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
294294
// Update storage with chain state including sync_base_height
295295
{
296296
let mut storage = self.storage.lock().await;
297+
297298
storage
298299
.store_headers_at_height(&[checkpoint_header], checkpoint.height)
299300
.await?;

dash-spv/src/network/manager.rs

Lines changed: 61 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ use crate::network::addrv2::AddrV2Handler;
1717
use crate::network::constants::*;
1818
use crate::network::discovery::DnsDiscovery;
1919
use crate::network::pool::PeerPool;
20-
use crate::network::reputation::{
21-
misbehavior_scores, positive_scores, PeerReputationManager, ReputationAware,
22-
};
20+
use crate::network::reputation::{ChangeReason, PeerReputationManager};
2321
use crate::network::{
2422
HandshakeManager, Message, MessageDispatcher, MessageType, NetworkManager, Peer,
2523
};
@@ -43,7 +41,7 @@ pub struct PeerNetworkManager {
4341
/// Peer persistence
4442
peer_store: Arc<PersistentPeerStorage>,
4543
/// Peer reputation manager
46-
reputation_manager: Arc<PeerReputationManager>,
44+
reputation_manager: Arc<Mutex<PeerReputationManager>>,
4745
/// Network type
4846
network: Network,
4947
/// Shutdown token
@@ -80,11 +78,7 @@ impl PeerNetworkManager {
8078

8179
let peer_store = PersistentPeerStorage::open(data_dir.clone()).await?;
8280

83-
let reputation_manager = Arc::new(PeerReputationManager::new());
84-
85-
if let Err(e) = reputation_manager.load_from_storage(&peer_store).await {
86-
log::warn!("Failed to load peer reputation data: {}", e);
87-
}
81+
let reputation_manager = PeerReputationManager::load_or_new(&peer_store).await;
8882

8983
// Determine exclusive mode: either explicitly requested or peers were provided
9084
let exclusive_mode = config.restrict_to_configured_peers || !config.peers.is_empty();
@@ -94,7 +88,7 @@ impl PeerNetworkManager {
9488
discovery: Arc::new(discovery),
9589
addrv2_handler: Arc::new(AddrV2Handler::new()),
9690
peer_store: Arc::new(peer_store),
97-
reputation_manager,
91+
reputation_manager: Arc::new(Mutex::new(reputation_manager)),
9892
network: config.network,
9993
shutdown_token: CancellationToken::new(),
10094
tasks: Arc::new(Mutex::new(JoinSet::new())),
@@ -175,7 +169,7 @@ impl PeerNetworkManager {
175169
/// Connect to a specific peer
176170
async fn connect_to_peer(&self, addr: SocketAddr) {
177171
// Check reputation first
178-
if !self.reputation_manager.should_connect_to_peer(&addr).await {
172+
if !self.reputation_manager.lock().await.should_connect_to_peer(&addr).await {
179173
log::warn!("Not connecting to {} due to bad reputation", addr);
180174
return;
181175
}
@@ -191,7 +185,7 @@ impl PeerNetworkManager {
191185
}
192186

193187
// Record connection attempt
194-
self.reputation_manager.record_connection_attempt(addr).await;
188+
self.reputation_manager.lock().await.record_connection_attempt(addr).await;
195189

196190
let pool = self.pool.clone();
197191
let network = self.network;
@@ -218,9 +212,6 @@ impl PeerNetworkManager {
218212
Ok(_) => {
219213
log::info!("Successfully connected to {}", addr);
220214

221-
// Record successful connection
222-
reputation_manager.record_successful_connection(addr).await;
223-
224215
// Add to pool
225216
if let Err(e) = pool.add_peer(addr, peer).await {
226217
log::error!("Failed to add peer to pool: {}", e);
@@ -250,11 +241,9 @@ impl PeerNetworkManager {
250241
log::warn!("Handshake failed with {}: {}", addr, e);
251242
// Update reputation for handshake failure
252243
reputation_manager
253-
.update_reputation(
254-
addr,
255-
misbehavior_scores::INVALID_MESSAGE,
256-
"Handshake failed",
257-
)
244+
.lock()
245+
.await
246+
.update_reputation(addr, ChangeReason::HandshakeFailure)
258247
.await;
259248
// For handshake failures, try again later
260249
tokio::time::sleep(RECONNECT_DELAY).await;
@@ -265,11 +254,9 @@ impl PeerNetworkManager {
265254
log::debug!("Failed to connect to {}: {}", addr, e);
266255
// Minor reputation penalty for connection failure
267256
reputation_manager
268-
.update_reputation(
269-
addr,
270-
misbehavior_scores::TIMEOUT / 2,
271-
"Connection failed",
272-
)
257+
.lock()
258+
.await
259+
.update_reputation(addr, ChangeReason::ConnectionFailure)
273260
.await;
274261
}
275262
}
@@ -283,7 +270,7 @@ impl PeerNetworkManager {
283270
pool: Arc<PeerPool>,
284271
addrv2_handler: Arc<AddrV2Handler>,
285272
shutdown_token: CancellationToken,
286-
reputation_manager: Arc<PeerReputationManager>,
273+
reputation_manager: Arc<Mutex<PeerReputationManager>>,
287274
connected_peer_count: Arc<AtomicUsize>,
288275
headers2_disabled: Arc<Mutex<HashSet<SocketAddr>>>,
289276
message_dispatcher: Arc<Mutex<MessageDispatcher>>,
@@ -457,11 +444,9 @@ impl PeerNetworkManager {
457444
headers2_disabled.lock().await.insert(addr);
458445
// Apply reputation penalty
459446
reputation_manager
460-
.update_reputation(
461-
addr,
462-
misbehavior_scores::INVALID_MESSAGE,
463-
"Headers2 decompression failed",
464-
)
447+
.lock()
448+
.await
449+
.update_reputation(addr, ChangeReason::InvalidHeaders2)
465450
.await;
466451
continue; // Don't forward corrupted message
467452
}
@@ -515,11 +500,9 @@ impl PeerNetworkManager {
515500
log::debug!("Timeout reading from {}, continuing...", addr);
516501
// Minor reputation penalty for timeout
517502
reputation_manager
518-
.update_reputation(
519-
addr,
520-
misbehavior_scores::TIMEOUT,
521-
"Read timeout",
522-
)
503+
.lock()
504+
.await
505+
.update_reputation(addr, ChangeReason::Timeout)
523506
.await;
524507
continue;
525508
}
@@ -537,10 +520,11 @@ impl PeerNetworkManager {
537520
);
538521
// Reputation penalty for invalid data
539522
reputation_manager
523+
.lock()
524+
.await
540525
.update_reputation(
541526
addr,
542-
misbehavior_scores::INVALID_TRANSACTION,
543-
"Invalid transaction type in block",
527+
ChangeReason::InvalidTransaction,
544528
)
545529
.await;
546530
} else if error_msg
@@ -599,7 +583,9 @@ impl PeerNetworkManager {
599583
if conn_duration > Duration::from_secs(3600) {
600584
// 1 hour
601585
reputation_manager
602-
.update_reputation(addr, positive_scores::LONG_UPTIME, "Long connection uptime")
586+
.lock()
587+
.await
588+
.update_reputation(addr, ChangeReason::LongUptime)
603589
.await;
604590
}
605591
});
@@ -676,7 +662,7 @@ impl PeerNetworkManager {
676662
let known = addrv2_handler.get_known_addresses().await;
677663
let needed = TARGET_PEERS.saturating_sub(count);
678664
// Select best peers based on reputation
679-
let best_peers = reputation_manager.select_best_peers(known, needed * 2).await;
665+
let best_peers = reputation_manager.lock().await.select_best_peers(known, needed * 2).await;
680666
let mut attempted = 0;
681667

682668
for addr in best_peers {
@@ -750,10 +736,9 @@ impl PeerNetworkManager {
750736
if let Err(e) = peer_guard.send_ping().await {
751737
log::error!("Failed to ping {}: {}", addr, e);
752738
// Update reputation for ping failure
753-
reputation_manager.update_reputation(
739+
reputation_manager.lock().await.update_reputation(
754740
addr,
755-
misbehavior_scores::TIMEOUT,
756-
"Ping failed",
741+
ChangeReason::PingFailure,
757742
).await;
758743
}
759744
}
@@ -770,7 +755,7 @@ impl PeerNetworkManager {
770755
}
771756

772757
// Save reputation data periodically
773-
if let Err(e) = reputation_manager.save_to_storage(&*peer_store).await {
758+
if let Err(e) = reputation_manager.lock().await.save_to_storage(&*peer_store).await {
774759
log::warn!("Failed to save reputation data: {}", e);
775760
}
776761
}
@@ -994,8 +979,9 @@ impl PeerNetworkManager {
994979

995980
/// Get reputation information for all peers
996981
pub async fn get_peer_reputations(&self) -> HashMap<SocketAddr, (i32, bool)> {
997-
let reputations = self.reputation_manager.get_all_reputations().await;
998-
reputations.into_iter().map(|(addr, rep)| (addr, (rep.score, rep.is_banned()))).collect()
982+
let mut lock = self.reputation_manager.lock().await;
983+
let reputations = lock.get_all_reputations().await;
984+
reputations.iter().map(|(addr, rep)| (*addr, (rep.score(), rep.is_banned()))).collect()
999985
}
1000986

1001987
/// Ban a specific peer manually
@@ -1007,19 +993,17 @@ impl PeerNetworkManager {
1007993

1008994
// Update reputation to trigger ban
1009995
self.reputation_manager
1010-
.update_reputation(
1011-
*addr,
1012-
misbehavior_scores::INVALID_HEADER * 2, // Severe penalty
1013-
reason,
1014-
)
996+
.lock()
997+
.await
998+
.update_reputation(*addr, ChangeReason::ManuallyBanned)
1015999
.await;
10161000

10171001
Ok(())
10181002
}
10191003

10201004
/// Unban a specific peer
10211005
pub async fn unban_peer(&self, addr: &SocketAddr) {
1022-
self.reputation_manager.unban_peer(addr).await;
1006+
self.reputation_manager.lock().await.unban_peer(addr).await;
10231007
}
10241008

10251009
/// Shutdown the network manager
@@ -1036,7 +1020,9 @@ impl PeerNetworkManager {
10361020
}
10371021

10381022
// Save reputation data before shutdown
1039-
if let Err(e) = self.reputation_manager.save_to_storage(&*self.peer_store).await {
1023+
if let Err(e) =
1024+
self.reputation_manager.lock().await.save_to_storage(&*self.peer_store).await
1025+
{
10401026
log::warn!("Failed to save reputation data on shutdown: {}", e);
10411027
}
10421028

@@ -1131,64 +1117,62 @@ impl NetworkManager for PeerNetworkManager {
11311117
} // end match
11321118
} // end send_message
11331119

1134-
async fn penalize_peer(&self, address: SocketAddr, score_change: i32, reason: &str) {
1135-
self.reputation_manager.update_reputation(address, score_change, reason).await;
1120+
async fn penalize_peer(&self, address: SocketAddr, reason: ChangeReason) {
1121+
self.reputation_manager.lock().await.update_reputation(address, reason).await;
11361122
}
11371123

1138-
async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr, reason: &str) {
1139-
match self.disconnect_peer(&address, reason).await {
1124+
async fn penalize_peer_invalid_chainlock(&self, address: SocketAddr) {
1125+
match self.disconnect_peer(&address, &ChangeReason::InvalidChainLock.to_string()).await {
11401126
Ok(()) => {
1141-
log::warn!(
1142-
"Peer {} disconnected for invalid ChainLock enforcement: {}",
1143-
address,
1144-
reason
1145-
);
1127+
log::warn!("Peer {} disconnected for invalid ChainLock enforcement", address,);
11461128
}
11471129
Err(err) => {
11481130
log::error!(
1149-
"Failed to disconnect peer {} after invalid ChainLock enforcement ({}): {}",
1131+
"Failed to disconnect peer {} after invalid ChainLock enforcement: {}",
11501132
address,
1151-
reason,
11521133
err
11531134
);
11541135
}
11551136
}
11561137

11571138
// Apply misbehavior score and a short temporary ban
11581139
self.reputation_manager
1159-
.update_reputation(address, misbehavior_scores::INVALID_CHAINLOCK, reason)
1140+
.lock()
1141+
.await
1142+
.update_reputation(address, ChangeReason::InvalidChainLock)
11601143
.await;
11611144

11621145
// Short ban: 10 minutes for relaying invalid ChainLock
11631146
self.reputation_manager
1164-
.temporary_ban_peer(address, Duration::from_secs(10 * 60), reason)
1147+
.lock()
1148+
.await
1149+
.temporary_ban_peer(address, Duration::from_secs(10 * 60))
11651150
.await;
11661151
}
11671152

1168-
async fn penalize_peer_invalid_instantlock(&self, address: SocketAddr, reason: &str) {
1153+
async fn penalize_peer_invalid_instantlock(&self, address: SocketAddr) {
11691154
// Apply misbehavior score and a short temporary ban
11701155
self.reputation_manager
1171-
.update_reputation(address, misbehavior_scores::INVALID_INSTANTLOCK, reason)
1156+
.lock()
1157+
.await
1158+
.update_reputation(address, ChangeReason::InvalidInstantLock)
11721159
.await;
11731160

11741161
// Short ban: 10 minutes for relaying invalid InstantLock
11751162
self.reputation_manager
1176-
.temporary_ban_peer(address, Duration::from_secs(10 * 60), reason)
1163+
.lock()
1164+
.await
1165+
.temporary_ban_peer(address, Duration::from_secs(10 * 60))
11771166
.await;
11781167

1179-
match self.disconnect_peer(&address, reason).await {
1168+
match self.disconnect_peer(&address, &ChangeReason::InvalidInstantLock.to_string()).await {
11801169
Ok(()) => {
1181-
log::warn!(
1182-
"Peer {} disconnected for invalid InstantLock enforcement: {}",
1183-
address,
1184-
reason
1185-
);
1170+
log::warn!("Peer {} disconnected for invalid InstantLock enforcement", address,);
11861171
}
11871172
Err(err) => {
11881173
log::error!(
1189-
"Failed to disconnect peer {} after invalid InstantLock enforcement ({}): {}",
1174+
"Failed to disconnect peer {} after invalid InstantLock enforcement: {}",
11901175
address,
1191-
reason,
11921176
err
11931177
);
11941178
}

0 commit comments

Comments
 (0)