diff --git a/dash-spv/src/sync/filters/manager.rs b/dash-spv/src/sync/filters/manager.rs index d1e961a71..dbf53695a 100644 --- a/dash-spv/src/sync/filters/manager.rs +++ b/dash-spv/src/sync/filters/manager.rs @@ -191,15 +191,28 @@ impl 0 { + (stored_filters_tip + 1).max(header_start_height) + } else { + scan_start + }; + + // Anchor the scan and store frontiers even when we return early below. + // A wallet that boots already synced takes the early return, and a + // later new block would otherwise create a lookahead batch from a + // stale `processing_height` of 0 and wedge the in-order store loop on + // a `next_batch_to_store` of 0. self.processing_height = scan_start; + self.next_batch_to_store = download_start; // Check if already at target (nothing to download) if scan_start > self.progress.filter_header_tip_height() { + // Park the idle pipeline at the download frontier so a later + // `extend_target` from a new block queues from here instead of + // re-requesting every filter from an uninitialized position. + self.filter_pipeline.init(download_start, download_start.saturating_sub(1)); // Only emit FiltersSyncComplete if we've also reached the chain tip // This prevents premature sync complete while filter headers are still syncing if self.progress.committed_height() >= self.progress.target_height() { @@ -216,16 +229,6 @@ impl 0 { - (stored_filters_tip + 1).max(header_start_height) - } else { - scan_start - }; - - self.next_batch_to_store = download_start; - tracing::info!( "Starting filter download (scan_start={}, download_start={}, stored_filters_tip={}, target={})", scan_start, @@ -902,10 +905,13 @@ impl { // In-progress work preserved across a disconnect cycle. Don't @@ -934,19 +940,23 @@ impl (TestFiltersManager, Vec
, BlockFilter) { + let mut manager = create_test_manager().await; + + let headers = Header::dummy_batch(0..102); + manager + .header_storage + .write() + .await + .store_headers(&headers.iter().map(HashedBlockHeader::from).collect::>()) + .await + .unwrap(); + + let boundary_filter = BlockFilter::new(&[0u8; 32]); + { + let mut fs = manager.filter_storage.write().await; + for h in 0..=100 { + fs.store_filter(h, &boundary_filter.content).await.unwrap(); + } + } + + // The prev header is non-zero because the segment store treats an + // all-zero filter header as an empty (sentinel) slot. + let prev_filter_header = FilterHeader::from_byte_array([1u8; 32]); + manager + .filter_header_storage + .write() + .await + .store_filter_headers_at_height( + &[prev_filter_header, boundary_filter.filter_header(&prev_filter_header)], + 100, + ) + .await + .unwrap(); + + manager.wallet.write().await.update_wallet_synced_height(&MOCK_WALLET_ID, 100); + manager.progress.update_committed_height(100); + manager.progress.update_stored_height(100); + manager.progress.update_filter_header_tip_height(100); + manager.progress.update_target_height(100); + + (manager, headers, boundary_filter) + } + /// Build a real `BlockFilter` for a single-output block paying `address`. fn filter_for_address( height: u32, @@ -2060,7 +2118,9 @@ mod tests { // Pipeline target at 150 with no pending batches, so extend_target(150) // is a no-op and send_pending returns immediately (no headers needed) manager.filter_pipeline.init(151, 150); - // Active batch prevents try_create_lookahead_batches from running + // The active batch ends at 200, so try_create_lookahead_batches chains + // its next start off 201, past the filter header tip 150, and the bound + // stops it before creating any batch or emitting an event. manager.active_batches.insert(101, FiltersBatch::new(101, 200, HashMap::new())); let (tx, _rx) = unbounded_channel(); @@ -2170,6 +2230,206 @@ mod tests { assert_eq!(manager.active_batches.keys().next(), Some(&101)); } + /// A fully synced node that reconnects and then sees one new block must + /// commit it. `start_sync` takes the `stored == committed == tip` branch, + /// reports `Synced`, and anchors the store and processing cursors at the + /// frontier; the boundary block's filter header, body, and commit then flow + /// through to the new tip. If those cursors were left at 0, the boundary + /// block would create a batch from height 0 and wedge the in-order store + /// loop so `committed_height` freezes one below tip. + #[tokio::test] + async fn test_start_sync_synced_restart_then_boundary_block_commits() { + let (mut manager, headers, boundary_filter) = setup_synced_manager_at_tip().await; + + // Fully-synced restart: `start_sync` requires `WaitingForConnections`. + manager.set_state(SyncState::WaitingForConnections); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + // Reconnect: the already-synced branch reports completion and anchors the + // store/processing cursors at the frontier. + let events = manager.start_sync(&requests).await.unwrap(); + assert_eq!(manager.state(), SyncState::Synced); + assert!(events.iter().any(|e| matches!( + e, + SyncEvent::FiltersSyncComplete { + tip_height: 100 + } + ))); + + // Block 101's filter header lands: the manager queues its body download + // and a processing batch starting at the frontier (not at height 0). + manager + .handle_sync_event( + &SyncEvent::FilterHeadersStored { + start_height: 101, + end_height: 101, + tip_height: 101, + }, + &requests, + ) + .await + .unwrap(); + + // The peer answers with the boundary filter body over the real path. + let peer: SocketAddr = "127.0.0.1:19999".parse().unwrap(); + let cfilter = CFilter { + filter_type: 0, + block_hash: headers[101].block_hash(), + filter: boundary_filter.content.clone(), + }; + manager + .handle_message(Message::new(peer, NetworkMessage::CFilter(cfilter)), &requests) + .await + .unwrap(); + + // A trailing tick drives any residual processing to completion. + manager.tick(&requests).await.unwrap(); + + assert_eq!(manager.progress.committed_height(), 101); + assert_eq!(manager.state(), SyncState::Synced); + assert!(manager.active_batches.is_empty()); + assert!(manager.next_batch_to_store > 100); + } + + /// A node that boots already synced (default `WaitForEvents` state, which + /// `start_sync` never runs from) must commit a new block that lands after + /// the initial `FilterHeadersSyncComplete`. That event drives + /// `start_download`'s early return, which must anchor the store cursor and + /// park the pipeline at the frontier. Left at their defaults, the next + /// block makes `extend_target` re-request every filter from height 1 and + /// wedges the in-order store loop on a `next_batch_to_store` of 0, freezing + /// `committed_height` one below tip. + #[tokio::test] + async fn test_synced_boot_then_boundary_block_commits() { + let (mut manager, headers, boundary_filter) = setup_synced_manager_at_tip().await; + + // Fully-synced boot leaves the manager in its default state. + assert_eq!(manager.state(), SyncState::WaitForEvents); + + let (tx, mut rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + // Filter header sync completing at the stored tip reports Synced via + // `start_download`'s early return, anchoring the frontier cursors. + let events = manager + .handle_sync_event( + &SyncEvent::FilterHeadersSyncComplete { + tip_height: 100, + }, + &requests, + ) + .await + .unwrap(); + assert_eq!(manager.state(), SyncState::Synced); + assert!(events.iter().any(|e| matches!( + e, + SyncEvent::FiltersSyncComplete { + tip_height: 100 + } + ))); + assert_eq!(manager.next_batch_to_store, 101); + + // Block 101's filter header lands. + manager + .handle_sync_event( + &SyncEvent::FilterHeadersStored { + start_height: 101, + end_height: 101, + tip_height: 101, + }, + &requests, + ) + .await + .unwrap(); + + // Only the boundary body may be requested: a pipeline left unparked + // would re-request every filter from height 1 here. + while let Ok(request) = rx.try_recv() { + let (NetworkRequest::SendMessage(msg) + | NetworkRequest::SendMessageToPeer(msg, _) + | NetworkRequest::BroadcastMessage(msg)) = request; + if let NetworkMessage::GetCFilters(get) = msg { + assert_eq!(get.start_height, 101, "unexpected filter re-download"); + } + } + + // The peer answers with the boundary filter body over the real path. + let peer: SocketAddr = "127.0.0.1:19999".parse().unwrap(); + let cfilter = CFilter { + filter_type: 0, + block_hash: headers[101].block_hash(), + filter: boundary_filter.content.clone(), + }; + manager + .handle_message(Message::new(peer, NetworkMessage::CFilter(cfilter)), &requests) + .await + .unwrap(); + + // A trailing tick drives any residual processing to completion. + manager.tick(&requests).await.unwrap(); + + assert_eq!(manager.progress.committed_height(), 101); + assert_eq!(manager.state(), SyncState::Synced); + assert!(manager.active_batches.is_empty()); + } + + /// A node that restores its filter header tip ahead of its stored filter + /// bodies (headers at 101, bodies and commit at 100) must download and + /// commit the boundary body on reconnect. `start_sync` delegates to the + /// download path, which sees the scan frontier at or below the restored + /// tip and requests the boundary body rather than parking in + /// `WaitForEvents` and never asking for it. + #[tokio::test] + async fn test_start_sync_filter_headers_ahead_of_bodies_commits_boundary() { + let (mut manager, headers, boundary_filter) = setup_synced_manager_at_tip().await; + + // Filter headers restored ahead of the stored filter bodies. + manager.progress.update_filter_header_tip_height(101); + manager.set_state(SyncState::WaitingForConnections); + + let (tx, mut rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + // Reconnect delegates to the download path, which must request the + // boundary body at 101 rather than parking without asking for it. + let events = manager.start_sync(&requests).await.unwrap(); + assert_eq!(manager.state(), SyncState::Syncing); + assert!(!events.iter().any(|e| matches!(e, SyncEvent::SyncStart { .. }))); + assert!(manager.active_batches.contains_key(&101)); + + let mut requested_boundary = false; + while let Ok(request) = rx.try_recv() { + let (NetworkRequest::SendMessage(msg) + | NetworkRequest::SendMessageToPeer(msg, _) + | NetworkRequest::BroadcastMessage(msg)) = request; + if let NetworkMessage::GetCFilters(get) = msg { + assert_eq!(get.start_height, 101, "unexpected filter re-download"); + requested_boundary = true; + } + } + assert!(requested_boundary, "boundary filter body 101 must be requested"); + + // The peer answers with the boundary filter body over the real path. + let peer: SocketAddr = "127.0.0.1:19999".parse().unwrap(); + let cfilter = CFilter { + filter_type: 0, + block_hash: headers[101].block_hash(), + filter: boundary_filter.content.clone(), + }; + manager + .handle_message(Message::new(peer, NetworkMessage::CFilter(cfilter)), &requests) + .await + .unwrap(); + + manager.tick(&requests).await.unwrap(); + + assert_eq!(manager.progress.committed_height(), 101); + assert_eq!(manager.state(), SyncState::Synced); + assert!(manager.active_batches.is_empty()); + } + #[tokio::test] async fn test_handle_new_filter_headers_stays_synced_when_already_synced() { let mut manager = create_test_manager().await; @@ -2466,6 +2726,57 @@ mod tests { ); } + /// When the filter header tip grows while a historical rescan is still + /// draining `active_batches`, the boundary block must still get a + /// processing batch. `handle_new_filter_headers` must create the lookahead + /// batch even with a non-empty active set, or a block landing during a + /// restart reinit window gets its body downloaded but never a processing + /// batch, freezing `committed_height` one below tip. Asserted directly + /// after the call (no `tick`) so the deterministic path is verified in + /// isolation. + #[tokio::test] + async fn test_handle_new_filter_headers_extends_boundary_during_rescan() { + let mut manager = create_test_manager().await; + + let headers = Header::dummy_batch(0..102); + manager + .header_storage + .write() + .await + .store_headers(&headers.iter().map(HashedBlockHeader::from).collect::>()) + .await + .unwrap(); + { + let dummy_filter = BlockFilter::new(&[0u8; 32]); + let mut fs = manager.filter_storage.write().await; + for h in 0..=101 { + fs.store_filter(h, &dummy_filter.content).await.unwrap(); + } + } + + // Historical rescan in progress: an active batch ends at the old tip + // 100, filters stored through 101, committed still climbing. + manager.set_state(SyncState::Syncing); + manager.progress.update_committed_height(0); + manager.progress.update_stored_height(101); + manager.progress.update_filter_header_tip_height(100); + manager.progress.update_target_height(101); + manager.active_batches.insert(0, FiltersBatch::new(0, 100, HashMap::new())); + + let (tx, _rx) = unbounded_channel(); + let requests = RequestSender::new(tx); + + // New block 101 arrives: tip grows past the in-flight rescan boundary. + manager.handle_new_filter_headers(101, &requests).await.unwrap(); + + assert!( + manager.active_batches.contains_key(&101), + "boundary block 101 must get a processing batch even while a rescan \ + is draining active_batches, got keys: {:?}", + manager.active_batches.keys().collect::>() + ); + } + #[tokio::test] async fn test_handle_new_filter_headers_skips_when_fully_committed() { let mut manager = create_test_manager().await; diff --git a/dash-spv/src/sync/filters/sync_manager.rs b/dash-spv/src/sync/filters/sync_manager.rs index 3e080880a..ecce171fa 100644 --- a/dash-spv/src/sync/filters/sync_manager.rs +++ b/dash-spv/src/sync/filters/sync_manager.rs @@ -1,7 +1,6 @@ use crate::error::{SyncError, SyncResult}; use crate::network::{Message, MessageType, RequestSender}; use crate::storage::{BlockHeaderStorage, FilterHeaderStorage, FilterStorage}; -use crate::sync::progress::ProgressPercentage; use crate::sync::sync_manager::ensure_not_started; use crate::sync::{ FiltersManager, ManagerIdentifier, SyncEvent, SyncManager, SyncManagerProgress, SyncState, @@ -81,26 +80,13 @@ impl< return Ok(events); } - // Already at or beyond stored filters tip - check if fully synced + // Already at or beyond stored filters tip - delegate to start_download, + // whose early-return path anchors the store and processing cursors at + // the frontier and parks the idle pipeline there. Unlike the branch + // above this must not emit a SyncStart. if stored_filters_tip > 0 && stored_filters_tip == self.progress.committed_height() { self.progress.update_filter_header_tip_height(stored_filters_tip); - // Initialize the pipeline at the current tip. On full disconnect in-flight state gets - // reset, so we need to initialize the pipeline otherwise it would re-queue from height 1. - self.filter_pipeline.init(stored_filters_tip + 1, stored_filters_tip); - // Only emit SyncComplete if we've also reached the chain tip - if self.progress.committed_height() >= self.progress.target_height() { - self.set_state(SyncState::Synced); - tracing::info!( - "FiltersManager: already synced at height {}", - self.progress.committed_height() - ); - return Ok(vec![SyncEvent::FiltersSyncComplete { - tip_height: stored_filters_tip, - }]); - } - // Caught up to stored filters but chain tip not reached yet - self.set_state(SyncState::WaitForEvents); - return Ok(vec![]); + return self.start_download(requests).await; } // No stored filters to process - wait for FilterHeadersSyncComplete events diff --git a/dash-spv/tests/dashd_sync/tests_restart.rs b/dash-spv/tests/dashd_sync/tests_restart.rs index 9fe6c63f7..89bce2421 100644 --- a/dash-spv/tests/dashd_sync/tests_restart.rs +++ b/dash-spv/tests/dashd_sync/tests_restart.rs @@ -47,6 +47,26 @@ async fn test_sync_restart_consistency() { assert_eq!(first_balance, second_balance, "Balance mismatch after restart"); assert_eq!(first_tx_count, second_tx_count, "Transaction count mismatch after restart"); tracing::info!("State consistent after restart"); + + // A block mined right after a fully-synced restart must commit through the + // Filters phase instead of freezing `committed_height` one below the new + // tip. `wait_for_sync` checks the filters committed height, so the wedge + // surfaces as a timeout here. + if ctx.dashd.supports_mining { + drop(client_handle); + tokio::time::sleep(Duration::from_millis(100)).await; + + tracing::info!("Restarting fully synced, then mining a boundary block"); + let mut client_handle = ctx.spawn_new_client().await; + wait_for_sync(&mut client_handle.progress_receiver, ctx.dashd.initial_height).await; + + let miner_address = ctx.dashd.node.get_new_address_from_wallet("default"); + ctx.dashd.node.generate_blocks(1, &miner_address); + wait_for_sync(&mut client_handle.progress_receiver, ctx.dashd.initial_height + 1).await; + client_handle.stop().await; + } else { + eprintln!("Skipping boundary-block restart check (dashd RPC miner not available)"); + } } /// Verify correct rescan behavior when restarting with a fresh wallet but existing storage.