From 7acd86f1f4821bd4bf8419351c1a0ef8826cd081 Mon Sep 17 00:00:00 2001 From: xdustinface Date: Mon, 2 Mar 2026 09:15:45 +0700 Subject: [PATCH 1/4] fix: avoid extra `GetHeaders` after post-sync header processing When synced, `handle_headers_pipeline` always calls `send_pending()` after receiving each new block header, triggering a follow-up `GetHeaders` that always returned empty. This causes lot of unnecessary round-trips and leaves the tip segment in a pending state that rejected the next actual header as "unrequested headers". Use `tip_was_complete` to distinguish unsolicited headers from active catch-up responses. Only skip `send_pending()` and mark the tip complete for the unsolicited case, so multi-batch catch-up still works correctly. --- dash-spv/src/sync/block_headers/manager.rs | 60 +++++++++++++-- dash-spv/src/sync/block_headers/pipeline.rs | 82 +++++++++++++++++++++ dash/src/test_utils/block.rs | 21 ++++++ 3 files changed, 158 insertions(+), 5 deletions(-) diff --git a/dash-spv/src/sync/block_headers/manager.rs b/dash-spv/src/sync/block_headers/manager.rs index 592e07f67..b44eee004 100644 --- a/dash-spv/src/sync/block_headers/manager.rs +++ b/dash-spv/src/sync/block_headers/manager.rs @@ -127,6 +127,7 @@ impl BlockHeadersManager { } let was_syncing = self.state() == SyncState::Syncing; + let tip_was_complete = self.pipeline.is_tip_complete(); // Route headers to the pipeline, validates checkpoint match. let matched = self.pipeline.receive_headers(headers)?; @@ -138,10 +139,13 @@ impl BlockHeadersManager { ); } - // Send more requests if capacity available - let sent = self.pipeline.send_pending(requests)?; - if sent > 0 { - tracing::debug!("Pipeline sent {} more requests", sent); + // Send more requests during initial sync or active post-sync catch-up. + // Skip for unsolicited headers. + if was_syncing || !tip_was_complete { + let sent = self.pipeline.send_pending(requests)?; + if sent > 0 { + tracing::debug!("Pipeline sent {} more requests", sent); + } } // Process ready-to-store segments @@ -176,6 +180,12 @@ impl BlockHeadersManager { } } + // After storing unsolicited post-sync headers, mark the tip complete so the next header goes through + // the clean reset path. Don't mark complete during active catch-up. + if !was_syncing && tip_was_complete && !events.is_empty() { + self.pipeline.mark_tip_complete(); + } + if was_syncing && self.pipeline.is_complete() { // If blocks were announced during sync, request them before finalizing the sync if !self.pending_announcements.is_empty() { @@ -241,11 +251,12 @@ impl BlockHeadersManager { mod tests { use super::*; use crate::chain::checkpoints::testnet_checkpoints; - use crate::network::MessageType; + use crate::network::{MessageType, NetworkRequest, RequestSender}; use crate::storage::{ DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager, }; use crate::sync::{ManagerIdentifier, SyncManagerProgress}; + use tokio::sync::mpsc::unbounded_channel; type TestBlockHeadersManager = BlockHeadersManager; @@ -298,4 +309,43 @@ mod tests { assert!(!manager.pipeline.is_initialized()); assert_eq!(manager.pipeline.segment_count(), 0); } + + fn create_test_request_sender( + ) -> (RequestSender, tokio::sync::mpsc::UnboundedReceiver) { + let (tx, rx) = unbounded_channel(); + (RequestSender::new(tx), rx) + } + + #[tokio::test] + async fn test_unsolicited_post_sync_header_does_not_trigger_get_headers() { + let mut manager = create_test_manager().await; + let tip = manager.tip().await.unwrap(); + let tip_hash = *tip.hash(); + + // Simulate completed sync: pipeline initialized with tip segment marked complete + manager.pipeline.init(0, tip_hash, 0); + manager.pipeline.mark_tip_complete(); + manager.progress.set_state(SyncState::Synced); + + let (sender, mut rx) = create_test_request_sender(); + + let header = Header::dummy_chain(1, tip_hash).remove(0); + + let events = manager.handle_headers_pipeline(&[header], &sender).await.unwrap(); + + // Header should have been stored + assert_eq!(events.len(), 1); + assert!(matches!( + events[0], + SyncEvent::BlockHeadersStored { + tip_height: 1 + } + )); + + // No GetHeaders request should have been sent + assert!(rx.try_recv().is_err()); + + // Tip segment marked complete again for the next unsolicited header + assert!(manager.pipeline.is_tip_complete()); + } } diff --git a/dash-spv/src/sync/block_headers/pipeline.rs b/dash-spv/src/sync/block_headers/pipeline.rs index cf516f4bf..12af2f41c 100644 --- a/dash-spv/src/sync/block_headers/pipeline.rs +++ b/dash-spv/src/sync/block_headers/pipeline.rs @@ -192,6 +192,14 @@ impl HeadersPipeline { } } + // Check if these are duplicate headers from another peer. The first + // header's hash matches a segment's current tip, meaning we already have it. + let first_hash = headers[0].block_hash(); + if self.segments.iter().any(|s| s.current_tip_hash == first_hash) { + tracing::debug!("Ignoring duplicate header {} from another peer", first_hash); + return Ok(None); + } + tracing::warn!( "Received {} headers with prev_hash {} but no segment matched", headers.len(), @@ -270,6 +278,26 @@ impl HeadersPipeline { self.initialized } + /// Check if the tip segment is currently marked complete. + pub(super) fn is_tip_complete(&self) -> bool { + self.segments.iter().any(|s| s.target_height.is_none() && s.complete) + } + + /// Mark the tip segment as complete. + pub(super) fn mark_tip_complete(&mut self) { + for segment in &mut self.segments { + if segment.target_height.is_none() && !segment.complete { + segment.complete = true; + tracing::debug!( + "Tip segment {} marked complete at height {}", + segment.segment_id, + segment.current_height + ); + break; + } + } + } + /// Reset the tip segment for continued syncing after initial sync completes. /// This allows the pipeline to be reused for post-sync header updates. /// Returns true if the tip segment was reset, false if not found or not complete. @@ -475,4 +503,58 @@ mod tests { // Segment 1 should have the header assert_eq!(pipeline.segments[1].buffered_headers.len(), 1); } + + #[test] + fn test_duplicate_headers_from_another_peer_are_ignored() { + let tip_hash = BlockHash::dummy(50); + + let mut tip_seg = SegmentState::new(0, 1000, tip_hash, None, None); + tip_seg.current_height = 1001; + // Simulate that we already received a header and advanced the tip + let mut first_header = Header::dummy(1); + first_header.prev_blockhash = tip_hash; + let new_tip_hash = first_header.block_hash(); + tip_seg.current_tip_hash = new_tip_hash; + + let cm = create_test_checkpoint_manager(true); + let mut pipeline = HeadersPipeline::new(cm); + pipeline.initialized = true; + pipeline.segments = vec![tip_seg]; + + // Another peer sends the same header (prev_blockhash is old tip, first + // header hash matches the segment's current tip) + let matched = pipeline.receive_headers(&[first_header]).unwrap(); + assert_eq!(matched, None, "Duplicate headers should be silently ignored"); + assert!(pipeline.segments[0].buffered_headers.is_empty()); + } + + #[test] + fn test_tip_complete_lifecycle() { + let cm = create_test_checkpoint_manager(true); + let mut pipeline = HeadersPipeline::new(cm); + pipeline.initialized = true; + + // No tip segment → not complete + let checkpoint_seg = SegmentState::new(0, 0, BlockHash::dummy(0), Some(100), None); + pipeline.segments = vec![checkpoint_seg]; + assert!(!pipeline.is_tip_complete()); + + // Add a complete tip segment + let mut tip_seg = SegmentState::new(1, 500, BlockHash::dummy(77), None, None); + tip_seg.complete = true; + pipeline.segments.push(tip_seg); + assert!(pipeline.is_tip_complete()); + + // mark_tip_complete is a no-op when already complete + pipeline.mark_tip_complete(); + assert!(pipeline.is_tip_complete()); + + // Simulate receive_headers resetting the tip segment + pipeline.segments[1].complete = false; + assert!(!pipeline.is_tip_complete()); + + // mark_tip_complete restores the state after storing headers + pipeline.mark_tip_complete(); + assert!(pipeline.is_tip_complete()); + } } diff --git a/dash/src/test_utils/block.rs b/dash/src/test_utils/block.rs index 645c279e6..93a2ae497 100644 --- a/dash/src/test_utils/block.rs +++ b/dash/src/test_utils/block.rs @@ -40,6 +40,27 @@ impl Header { pub fn dummy_batch(height_range: Range) -> Vec { height_range.map(Self::dummy).collect() } + + /// Create a chain of headers where each header's `prev_blockhash` is the + /// actual `block_hash()` of the previous one. Uses an easy PoW target so + /// headers pass validation. + pub fn dummy_chain(count: usize, prev_blockhash: BlockHash) -> Vec { + let mut headers = Vec::with_capacity(count); + let mut prev = prev_blockhash; + for i in 0..count { + let header = Header { + version: Version::ONE, + prev_blockhash: prev, + merkle_root: TxMerkleNode::from_byte_array([0u8; 32]), + time: i as u32, + bits: CompactTarget::from_consensus(0x2100ffff), + nonce: i as u32, + }; + prev = header.block_hash(); + headers.push(header); + } + headers + } } impl BlockFilter { From 993e56bfb85942484312fe3867a806a6b13e18ef Mon Sep 17 00:00:00 2001 From: xdustinface Date: Tue, 3 Mar 2026 15:58:09 +0700 Subject: [PATCH 2/4] refactor: extract shared `send_message_to_peer` helper MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Deduplicate `GetHeaders` → `GetHeaders2` upgrade logic from `send_to_single_peer` and `send_distributed` into a shared `send_message_to_peer` method. --- dash-spv/src/network/manager.rs | 84 +++++++++------------------------ 1 file changed, 22 insertions(+), 62 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index ee7153ef1..015a772d5 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -6,7 +6,7 @@ use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::{broadcast, Mutex, RwLock}; use tokio::task::JoinSet; use tokio::time; @@ -1068,54 +1068,7 @@ impl PeerNetworkManager { .find(|(a, _)| *a == selected_peer) .ok_or_else(|| NetworkError::ConnectionFailed("Selected peer not found".to_string()))?; - // Upgrade GetHeaders to GetHeaders2 if this specific peer supports it and not disabled - let peer_supports_headers2 = { - let peer_guard = peer.read().await; - peer_guard.can_request_headers2() - }; - let message = match message { - NetworkMessage::GetHeaders(get_headers) - if !self.headers2_disabled.lock().await.contains(addr) - && peer_supports_headers2 => - { - log::debug!( - "Upgrading GetHeaders to GetHeaders2 for peer {}: {:?}", - addr, - get_headers - ); - NetworkMessage::GetHeaders2(get_headers) - } - other => other, - }; - // Reduce verbosity for common sync messages - match &message { - NetworkMessage::GetHeaders(_) - | NetworkMessage::GetCFilters(_) - | NetworkMessage::GetCFHeaders(_) => { - log::debug!("Sending {} to {}", message.cmd(), addr); - } - NetworkMessage::GetHeaders2(gh2) => { - log::info!("📤 Sending GetHeaders2 to {} - version: {}, locator_count: {}, locator: {:?}, stop: {}", - addr, - gh2.version, - gh2.locator_hashes.len(), - gh2.locator_hashes.iter().take(2).collect::>(), - gh2.stop_hash - ); - } - NetworkMessage::SendHeaders2 => { - log::info!("🤝 Sending SendHeaders2 to {} - requesting compressed headers", addr); - } - _ => { - log::trace!("Sending {:?} to {}", message.cmd(), addr); - } - } - - let mut peer_guard = peer.write().await; - peer_guard - .send_message(message) - .await - .map_err(|e| NetworkError::ProtocolError(format!("Failed to send to {}: {}", addr, e))) + self.send_message_to_peer(addr, peer, message).await } /// Send a message distributed across connected peers using round-robin selection. @@ -1185,14 +1138,28 @@ impl PeerNetworkManager { let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % selected_peers.len(); let (addr, peer) = &selected_peers[idx]; - // Upgrade GetHeaders to GetHeaders2 if peer supports it + log::debug!( + "Distributing {} request to peer {} (round-robin idx {})", + message.cmd(), + addr, + idx + ); + + self.send_message_to_peer(addr, peer, message).await + } + + /// Send a message to the given peer. + /// For GetHeaders messages upgrade to GetHeaders2 if the peer supports it. + async fn send_message_to_peer( + &self, + addr: &SocketAddr, + peer: &Arc>, + message: NetworkMessage, + ) -> NetworkResult<()> { let message = match message { NetworkMessage::GetHeaders(get_headers) => { - let peer_supports_headers2 = { - let peer_guard = peer.read().await; - peer_guard.can_request_headers2() - }; - if peer_supports_headers2 && !self.headers2_disabled.lock().await.contains(addr) { + let supports_headers2 = peer.read().await.can_request_headers2(); + if supports_headers2 && !self.headers2_disabled.lock().await.contains(addr) { log::debug!("Upgrading GetHeaders to GetHeaders2 for peer {}", addr); NetworkMessage::GetHeaders2(get_headers) } else { @@ -1202,13 +1169,6 @@ impl PeerNetworkManager { other => other, }; - log::debug!( - "Distributing {} request to peer {} (round-robin idx {})", - message.cmd(), - addr, - idx - ); - let mut peer_guard = peer.write().await; peer_guard .send_message(message) From 4140d31dcc8ad0eb0db3f2231c38542dfab9cf9a Mon Sep 17 00:00:00 2001 From: xdustinface Date: Tue, 3 Mar 2026 16:00:14 +0700 Subject: [PATCH 3/4] feat: add `request_block_headers_from_peer` to `RequestSender` Add `SendMessageToPeer` variant to `NetworkRequest` and `request_block_headers_from_peer` to `RequestSender` so sync managers can request block headers from specific peers. --- dash-spv/src/network/manager.rs | 21 ++++++++++++++++ dash-spv/src/network/mod.rs | 25 ++++++++++++++++++- dash-spv/src/network/tests.rs | 35 +++++++++++++++++++++++++++ dash-spv/src/sync/filters/pipeline.rs | 4 ++- 4 files changed, 83 insertions(+), 2 deletions(-) diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index 015a772d5..ae58749cb 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -791,6 +791,15 @@ impl PeerNetworkManager { } }); } + Some(NetworkRequest::SendMessageToPeer(addr, msg)) => { + log::debug!("Request processor: sending {} to {}", msg.cmd(), addr); + let this = this.clone(); + tokio::spawn(async move { + if let Err(e) = this.send_to_specific_peer(addr, msg).await { + log::error!("Request processor: failed to send to {}: {}", addr, e); + } + }); + } None => { log::info!("Request processor: channel closed"); break; @@ -1148,6 +1157,18 @@ impl PeerNetworkManager { self.send_message_to_peer(addr, peer, message).await } + /// Send a message to a specific peer by address. + async fn send_to_specific_peer( + &self, + addr: SocketAddr, + message: NetworkMessage, + ) -> NetworkResult<()> { + let peer = self.pool.get_peer(&addr).await.ok_or_else(|| { + NetworkError::ConnectionFailed(format!("Peer {} not connected", addr)) + })?; + self.send_message_to_peer(&addr, &peer, message).await + } + /// Send a message to the given peer. /// For GetHeaders messages upgrade to GetHeaders2 if the peer supports it. async fn send_message_to_peer( diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index 70a12b477..2f6d9882b 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -43,8 +43,10 @@ const FILTER_TYPE_DEFAULT: u8 = 0; /// Request to send to network. #[derive(Debug)] pub enum NetworkRequest { - /// Send a message to the network. + /// Send a message to the network (distributed across peers). SendMessage(NetworkMessage), + /// Send a message to a specific peer by address. + SendMessageToPeer(SocketAddr, NetworkMessage), } /// Handle for managers to queue outgoing network requests. @@ -68,6 +70,13 @@ impl RequestSender { .map_err(|e| NetworkError::ProtocolError(e.to_string())) } + /// Queue a message to be sent to a specific peer by address. + fn send_message_to_peer(&self, address: SocketAddr, msg: NetworkMessage) -> NetworkResult<()> { + self.tx + .send(NetworkRequest::SendMessageToPeer(address, msg)) + .map_err(|e| NetworkError::ProtocolError(e.to_string())) + } + pub fn request_inventory(&self, inventory: Vec) -> NetworkResult<()> { self.send_message(NetworkMessage::GetData(inventory)) } @@ -79,6 +88,20 @@ impl RequestSender { ))) } + pub fn request_block_headers_from_peer( + &self, + start_hash: BlockHash, + address: SocketAddr, + ) -> NetworkResult<()> { + self.send_message_to_peer( + address, + NetworkMessage::GetHeaders(GetHeadersMessage::new( + vec![start_hash], + BlockHash::all_zeros(), + )), + ) + } + pub fn request_filter_headers( &self, start_height: u32, diff --git a/dash-spv/src/network/tests.rs b/dash-spv/src/network/tests.rs index ffb992fd1..8f4ba5f1f 100644 --- a/dash-spv/src/network/tests.rs +++ b/dash-spv/src/network/tests.rs @@ -37,3 +37,38 @@ mod pool_tests { // Verify pool limits indirectly through methods; avoid constant assertions } } + +#[cfg(test)] +mod request_sender_tests { + use crate::network::{NetworkRequest, RequestSender}; + use dashcore::network::message::NetworkMessage; + use tokio::sync::mpsc; + + #[test] + fn test_send_message_to_peer_queues_correct_variant() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let sender = RequestSender::new(tx); + let addr = "192.168.1.1:9999".parse().unwrap(); + let msg = NetworkMessage::Verack; + + sender.send_message_to_peer(addr, msg).unwrap(); + + let request = rx.try_recv().unwrap(); + let NetworkRequest::SendMessageToPeer(recv_addr, recv_msg) = request else { + panic!("Expected SendMessageToPeer variant"); + }; + assert_eq!(recv_addr, addr); + assert!(matches!(recv_msg, NetworkMessage::Verack)); + } + + #[test] + fn test_send_message_to_peer_returns_error_on_closed_channel() { + let (tx, rx) = mpsc::unbounded_channel(); + let sender = RequestSender::new(tx); + drop(rx); + + let addr = "192.168.1.1:9999".parse().unwrap(); + let result = sender.send_message_to_peer(addr, NetworkMessage::Verack); + assert!(result.is_err()); + } +} diff --git a/dash-spv/src/sync/filters/pipeline.rs b/dash-spv/src/sync/filters/pipeline.rs index 6906afef1..e83e0ad65 100644 --- a/dash-spv/src/sync/filters/pipeline.rs +++ b/dash-spv/src/sync/filters/pipeline.rs @@ -785,7 +785,9 @@ mod tests { // Verify message was sent let request = rx.try_recv().unwrap(); - let NetworkRequest::SendMessage(msg) = request; + let NetworkRequest::SendMessage(msg) = request else { + panic!("Expected SendMessage"); + }; if let NetworkMessage::GetCFilters(gcf) = msg { assert_eq!(gcf.start_height, 0); assert_eq!(gcf.filter_type, 0); From 527f938fbd0cf4e2428d6698b303b6a49a8e38c5 Mon Sep 17 00:00:00 2001 From: xdustinface Date: Tue, 3 Mar 2026 12:11:14 +0700 Subject: [PATCH 4/4] fix: announce tip to new peers when synced When already synced, send a `GetHeaders` with our tip locator to each newly connected peer. Without this, Dash Core never learns our best-known header and falls back to inv announcements instead of pushing unsolicited header messages. --- dash-spv/src/sync/block_headers/manager.rs | 98 ++++++++++++++++++- .../src/sync/block_headers/sync_manager.rs | 76 ++++++++------ 2 files changed, 141 insertions(+), 33 deletions(-) diff --git a/dash-spv/src/sync/block_headers/manager.rs b/dash-spv/src/sync/block_headers/manager.rs index b44eee004..08b0a8bf2 100644 --- a/dash-spv/src/sync/block_headers/manager.rs +++ b/dash-spv/src/sync/block_headers/manager.rs @@ -6,7 +6,8 @@ //! Uses HeadersPipeline for parallel downloads across checkpoint-defined segments //! during initial sync. The same pipeline is reused for post-sync updates. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::net::SocketAddr; use std::sync::Arc; use std::time::Instant; @@ -41,6 +42,9 @@ pub struct BlockHeadersManager { pub(super) pipeline: HeadersPipeline, /// Pending block announcements waiting for headers message (post-sync). pub(super) pending_announcements: HashMap, + /// Peers we've sent a GetHeaders to after sync, so Dash Core knows our tip + /// and can send us header announcements instead of inv. + pub(super) announced_peers: HashSet, } impl std::fmt::Debug for BlockHeadersManager { @@ -83,6 +87,7 @@ impl BlockHeadersManager { metadata_storage, pipeline: HeadersPipeline::new(checkpoint_manager), pending_announcements: HashMap::new(), + announced_peers: HashSet::new(), }) } @@ -251,12 +256,12 @@ impl BlockHeadersManager { mod tests { use super::*; use crate::chain::checkpoints::testnet_checkpoints; - use crate::network::{MessageType, NetworkRequest, RequestSender}; + use crate::network::{MessageType, NetworkEvent, NetworkRequest, RequestSender}; use crate::storage::{ DiskStorageManager, PersistentBlockHeaderStorage, PersistentMetadataStorage, StorageManager, }; - use crate::sync::{ManagerIdentifier, SyncManagerProgress}; - use tokio::sync::mpsc::unbounded_channel; + use crate::sync::{ManagerIdentifier, SyncManager, SyncManagerProgress}; + use tokio::sync::mpsc; type TestBlockHeadersManager = BlockHeadersManager; @@ -276,6 +281,15 @@ mod tests { .expect("Failed to create BlockHeadersManager") } + /// Create a manager in synced state with an initialized pipeline. + async fn create_synced_manager() -> TestBlockHeadersManager { + let mut manager = create_test_manager().await; + let tip = manager.tip().await.unwrap(); + manager.pipeline.init(tip.height(), *tip.hash(), tip.height()); + manager.progress.set_state(SyncState::Synced); + manager + } + #[tokio::test] async fn test_block_headers_manager_new() { let manager = create_test_manager().await; @@ -348,4 +362,80 @@ mod tests { // Tip segment marked complete again for the next unsolicited header assert!(manager.pipeline.is_tip_complete()); } + + #[tokio::test] + async fn test_peer_connected_when_synced_sends_getheaders() { + let mut manager = create_synced_manager().await; + let (requests, mut rx) = create_test_request_sender(); + + let addr: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let event = NetworkEvent::PeerConnected { address: addr }; + + let events = manager.handle_network_event(&event, &requests).await.unwrap(); + assert!(events.is_empty()); + assert!(manager.announced_peers.contains(&addr)); + + // Verify a peer-targeted GetHeaders request was sent to the specific peer + let request = rx.try_recv().unwrap(); + match request { + NetworkRequest::SendMessageToPeer(target_addr, _) => { + assert_eq!(target_addr, addr); + } + other => panic!("Expected SendMessageToPeer, got {:?}", other), + } + } + + #[tokio::test] + async fn test_peer_connected_when_synced_skips_already_announced() { + let mut manager = create_synced_manager().await; + let (requests, mut rx) = create_test_request_sender(); + + let addr: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let event = NetworkEvent::PeerConnected { address: addr }; + + // First connection sends GetHeaders + manager.handle_network_event(&event, &requests).await.unwrap(); + assert!(rx.try_recv().is_ok()); + + // Second connection for same peer sends nothing + manager.handle_network_event(&event, &requests).await.unwrap(); + assert!(rx.try_recv().is_err()); + } + + #[tokio::test] + async fn test_peer_disconnected_removes_from_announced() { + let mut manager = create_synced_manager().await; + let (requests, mut rx) = create_test_request_sender(); + + let addr: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + + // Connect + let connect_event = NetworkEvent::PeerConnected { address: addr }; + manager.handle_network_event(&connect_event, &requests).await.unwrap(); + assert!(manager.announced_peers.contains(&addr)); + rx.try_recv().unwrap(); // drain the request + + // Disconnect + let disconnect_event = NetworkEvent::PeerDisconnected { address: addr }; + manager.handle_network_event(&disconnect_event, &requests).await.unwrap(); + assert!(!manager.announced_peers.contains(&addr)); + + // Reconnect sends GetHeaders again + manager.handle_network_event(&connect_event, &requests).await.unwrap(); + assert!(manager.announced_peers.contains(&addr)); + assert!(rx.try_recv().is_ok()); + } + + #[tokio::test] + async fn test_peer_connected_when_not_synced_does_nothing() { + let mut manager = create_test_manager().await; + let (requests, mut rx) = create_test_request_sender(); + + let addr: SocketAddr = "1.2.3.4:9999".parse().unwrap(); + let event = NetworkEvent::PeerConnected { address: addr }; + + manager.handle_network_event(&event, &requests).await.unwrap(); + assert!(!manager.announced_peers.contains(&addr)); + assert!(rx.try_recv().is_err()); + } } diff --git a/dash-spv/src/sync/block_headers/sync_manager.rs b/dash-spv/src/sync/block_headers/sync_manager.rs index 7a35c933a..9d0405d3a 100644 --- a/dash-spv/src/sync/block_headers/sync_manager.rs +++ b/dash-spv/src/sync/block_headers/sync_manager.rs @@ -41,6 +41,7 @@ impl SyncManager for BlockHeadersMana let checkpoint_manager = self.pipeline.checkpoint_manager().clone(); self.pipeline = HeadersPipeline::new(checkpoint_manager); self.pending_announcements.clear(); + self.announced_peers.clear(); } async fn start_sync(&mut self, requests: &RequestSender) -> SyncResult> { @@ -151,37 +152,54 @@ impl SyncManager for BlockHeadersMana event: &NetworkEvent, requests: &RequestSender, ) -> SyncResult> { - if let NetworkEvent::PeersUpdated { - connected_count, - best_height, - .. - } = event - { - if let Some(best_height) = best_height { - self.progress.update_target_height(*best_height); - let mut metadata_storage = self.metadata_storage.write().await; - metadata_storage.store_last_target_height(*best_height).await?; + match event { + NetworkEvent::PeerConnected { address } => { + // When synced, send GetHeaders to new peers so Dash Core learns our tip + // and sends headers2 announcements instead of inv. + if self.state() == SyncState::Synced + && self.pipeline.is_initialized() + && !self.announced_peers.contains(address) + { + let tip = self.tip().await?; + tracing::info!("Announcing tip {} to new peer {}", tip.height(), address); + requests.request_block_headers_from_peer(*tip.hash(), *address)?; + self.announced_peers.insert(*address); + } + } + NetworkEvent::PeerDisconnected { address } => { + self.announced_peers.remove(address); } - if *connected_count == 0 { - self.stop_sync(); - } else if *connected_count > 0 { - if self.state() == SyncState::WaitingForConnections { - return self.start_sync(requests).await; + NetworkEvent::PeersUpdated { + connected_count, + best_height, + .. + } => { + if let Some(best_height) = best_height { + self.progress.update_target_height(*best_height); + let mut metadata_storage = self.metadata_storage.write().await; + metadata_storage.store_last_target_height(*best_height).await?; } - // When already synced but behind peer height, request missing headers - if self.state() == SyncState::Synced { - if let Some(best_height) = best_height { - if *best_height > self.progress.tip_height() - && !self.pipeline.tip_segment_has_pending_request() - { - tracing::info!( - "Peer height {} > our height {}, requesting headers to catch up", - best_height, - self.progress.tip_height() - ); - // Reset tip segment and send requests via pipeline - self.pipeline.reset_tip_segment(); - self.pipeline.send_pending(requests)?; + if *connected_count == 0 { + self.stop_sync(); + } else if *connected_count > 0 { + if self.state() == SyncState::WaitingForConnections { + return self.start_sync(requests).await; + } + // When already synced but behind peer height, request missing headers + if self.state() == SyncState::Synced { + if let Some(best_height) = best_height { + if *best_height > self.progress.tip_height() + && !self.pipeline.tip_segment_has_pending_request() + { + tracing::info!( + "Peer height {} > our height {}, requesting headers to catch up", + best_height, + self.progress.tip_height() + ); + // Reset tip segment and send requests via pipeline + self.pipeline.reset_tip_segment(); + self.pipeline.send_pending(requests)?; + } } } }