Skip to content

Commit b03252a

Browse files
committed
refactor: detect bloom filter staleness via monitor_revision polling
Instead of requiring consumers to call an external `notify_wallet_addresses_changed()` API whenever wallet addresses change, the mempool manager now internally detects staleness by polling a revision counter on each tick. `WalletManager` maintains a `monitor_revision` counter that increments whenever the monitored address/outpoint set changes (wallet creation/removal, gap limit maintenance). The mempool manager snapshots this counter after each filter build and compares it on tick — a single u64 read behind a read lock. When the revision diverges, the bloom filter is rebuilt automatically. This eliminates a foot-gun where forgetting to call the external API would silently leave the bloom filter stale.
1 parent d2fed4e commit b03252a

12 files changed

Lines changed: 550 additions & 103 deletions

File tree

dash-spv/src/client/lifecycle.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,13 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager> DashSpvClient<W,
125125
// Create mempool state and build mempool manager if tracking is enabled
126126
let mempool_state = Arc::new(RwLock::new(MempoolState::default()));
127127
if config.enable_mempool_tracking {
128+
let initial_revision = wallet.read().await.monitor_revision();
128129
managers.mempool = Some(MempoolManager::new(
129130
wallet.clone(),
130131
mempool_state.clone(),
131132
config.mempool_strategy,
132133
config.max_mempool_transactions,
134+
initial_revision,
133135
));
134136
}
135137

dash-spv/src/sync/mempool/manager.rs

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ pub(crate) struct MempoolManager<W: WalletInterface> {
6262
/// Prevents duplicate downloads when multiple peers announce the same transactions.
6363
/// Entries expire after `SEEN_TXID_EXPIRY`.
6464
seen_txids: HashMap<Txid, Instant>,
65+
/// Wallet monitor revision at the time of the last filter build.
66+
/// Compared on each tick to detect when the wallet's monitored set has changed.
67+
pub(super) last_monitor_revision: u64,
6568
}
6669

6770
impl<W: WalletInterface> MempoolManager<W> {
@@ -72,6 +75,7 @@ impl<W: WalletInterface> MempoolManager<W> {
7275
mempool_state: Arc<RwLock<MempoolState>>,
7376
strategy: MempoolStrategy,
7477
max_transactions: usize,
78+
initial_monitor_revision: u64,
7579
) -> Self {
7680
Self {
7781
progress: MempoolProgress::default(),
@@ -83,6 +87,7 @@ impl<W: WalletInterface> MempoolManager<W> {
8387
peers: HashMap::new(),
8488
pending_is_locks: HashMap::new(),
8589
seen_txids: HashMap::new(),
90+
last_monitor_revision: initial_monitor_revision,
8691
}
8792
}
8893

@@ -290,11 +295,7 @@ impl<W: WalletInterface> MempoolManager<W> {
290295
}
291296

292297
/// Handle a received transaction.
293-
pub(super) async fn handle_tx(
294-
&mut self,
295-
tx: Transaction,
296-
requests: &RequestSender,
297-
) -> SyncResult<Vec<SyncEvent>> {
298+
pub(super) async fn handle_tx(&mut self, tx: Transaction) -> SyncResult<Vec<SyncEvent>> {
298299
let txid = tx.txid();
299300
self.pending_requests.remove(&txid);
300301
self.seen_txids.insert(txid, Instant::now());
@@ -331,10 +332,6 @@ impl<W: WalletInterface> MempoolManager<W> {
331332
self.progress.set_tracked(state.transactions.len() as u32);
332333
}
333334

334-
// Wallet-relevant transactions change the monitored set (new UTXOs, spent
335-
// inputs, potentially new addresses from gap limit maintenance).
336-
self.rebuild_filter(requests).await?;
337-
338335
Ok(vec![])
339336
}
340337

@@ -515,7 +512,7 @@ mod tests {
515512
let requests = RequestSender::new(tx);
516513

517514
let mut manager =
518-
MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 1000);
515+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 1000, 0);
519516
manager.progress.set_state(SyncState::Synced);
520517

521518
(manager, requests, rx)
@@ -529,7 +526,7 @@ mod tests {
529526
let requests = RequestSender::new(tx);
530527

531528
let manager =
532-
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000);
529+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000, 0);
533530

534531
(manager, requests, rx)
535532
}
@@ -599,6 +596,7 @@ mod tests {
599596
mempool_state.clone(),
600597
MempoolStrategy::FetchAll,
601598
2, // Very small capacity
599+
0,
602600
);
603601
let peer = test_socket_address(1);
604602
manager.peers.insert(peer, Some(VecDeque::new()));
@@ -640,7 +638,8 @@ mod tests {
640638
let (tx, _rx) = mpsc::unbounded_channel::<NetworkRequest>();
641639
let requests = RequestSender::new(tx);
642640

643-
let mut manager = MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 2);
641+
let mut manager =
642+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 2, 0);
644643
manager.progress.set_state(SyncState::Synced);
645644
let peer = test_socket_address(1);
646645
manager.peers.insert(peer, Some(VecDeque::new()));
@@ -666,7 +665,7 @@ mod tests {
666665
let _requests = RequestSender::new(tx);
667666

668667
let mut manager =
669-
MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 1000);
668+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::FetchAll, 1000, 0);
670669

671670
let fresh_txid = Txid::from_byte_array([1; 32]);
672671
let stale_txid = Txid::from_byte_array([2; 32]);
@@ -684,7 +683,7 @@ mod tests {
684683

685684
#[tokio::test]
686685
async fn test_handle_tx_irrelevant() {
687-
let (mut manager, requests, _rx) = create_test_manager();
686+
let (mut manager, _requests, _rx) = create_test_manager();
688687

689688
let tx = Transaction {
690689
version: 1,
@@ -695,7 +694,7 @@ mod tests {
695694
};
696695
let txid = tx.txid();
697696

698-
let events = manager.handle_tx(tx, &requests).await.unwrap();
697+
let events = manager.handle_tx(tx).await.unwrap();
699698
// MockWallet returns is_relevant=false by default
700699
assert!(events.is_empty());
701700
assert_eq!(manager.progress.received(), 1);
@@ -790,15 +789,20 @@ mod tests {
790789
let (tx, _rx) = mpsc::unbounded_channel::<NetworkRequest>();
791790
let requests = RequestSender::new(tx);
792791

793-
let manager =
794-
MempoolManager::new(wallet.clone(), mempool_state, MempoolStrategy::BloomFilter, 1000);
792+
let manager = MempoolManager::new(
793+
wallet.clone(),
794+
mempool_state,
795+
MempoolStrategy::BloomFilter,
796+
1000,
797+
0,
798+
);
795799

796800
(manager, requests, wallet)
797801
}
798802

799803
#[tokio::test]
800804
async fn test_handle_tx_relevant_stores_transaction() {
801-
let (mut manager, requests, _wallet) = create_relevant_manager();
805+
let (mut manager, _requests, _wallet) = create_relevant_manager();
802806

803807
let tx = Transaction {
804808
version: 1,
@@ -809,7 +813,7 @@ mod tests {
809813
};
810814
let txid = tx.txid();
811815

812-
let events = manager.handle_tx(tx, &requests).await.unwrap();
816+
let events = manager.handle_tx(tx).await.unwrap();
813817
assert!(events.is_empty());
814818

815819
// Verify transaction was stored in mempool state
@@ -822,7 +826,7 @@ mod tests {
822826

823827
#[tokio::test]
824828
async fn test_handle_tx_clears_pending_request() {
825-
let (mut manager, requests, _wallet) = create_relevant_manager();
829+
let (mut manager, _requests, _wallet) = create_relevant_manager();
826830

827831
let tx = Transaction {
828832
version: 1,
@@ -837,7 +841,7 @@ mod tests {
837841
manager.pending_requests.insert(txid, Instant::now());
838842
assert!(manager.pending_requests.contains_key(&txid));
839843

840-
manager.handle_tx(tx, &requests).await.unwrap();
844+
manager.handle_tx(tx).await.unwrap();
841845
// Pending request should be cleared regardless of relevance
842846
assert!(!manager.pending_requests.contains_key(&txid));
843847

@@ -857,7 +861,7 @@ mod tests {
857861
let requests = RequestSender::new(tx);
858862

859863
let manager =
860-
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000);
864+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000, 0);
861865

862866
(manager, requests, rx)
863867
}
@@ -1088,7 +1092,7 @@ mod tests {
10881092

10891093
#[tokio::test]
10901094
async fn test_instant_send_before_transaction() {
1091-
let (mut manager, requests, _wallet) = create_relevant_manager();
1095+
let (mut manager, _requests, _wallet) = create_relevant_manager();
10921096

10931097
let tx = Transaction {
10941098
version: 1,
@@ -1104,7 +1108,7 @@ mod tests {
11041108
assert!(manager.pending_is_locks.contains_key(&txid));
11051109

11061110
// Transaction arrives
1107-
manager.handle_tx(tx, &requests).await.unwrap();
1111+
manager.handle_tx(tx).await.unwrap();
11081112

11091113
// Pending IS lock consumed
11101114
assert!(manager.pending_is_locks.is_empty());
@@ -1116,7 +1120,7 @@ mod tests {
11161120

11171121
#[tokio::test]
11181122
async fn test_instant_send_before_irrelevant_transaction() {
1119-
let (mut manager, requests, _rx) = create_test_manager();
1123+
let (mut manager, _requests, _rx) = create_test_manager();
11201124

11211125
let tx = Transaction {
11221126
version: 1,
@@ -1132,7 +1136,7 @@ mod tests {
11321136
assert!(manager.pending_is_locks.contains_key(&txid));
11331137

11341138
// Transaction arrives but wallet says it's not relevant
1135-
manager.handle_tx(tx, &requests).await.unwrap();
1139+
manager.handle_tx(tx).await.unwrap();
11361140

11371141
// Pending IS lock cleaned up (no leak)
11381142
assert!(manager.pending_is_locks.is_empty());
@@ -1274,7 +1278,7 @@ mod tests {
12741278
let requests = RequestSender::new(tx);
12751279

12761280
let mut manager =
1277-
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000);
1281+
MempoolManager::new(wallet, mempool_state, MempoolStrategy::BloomFilter, 1000, 0);
12781282

12791283
// Drop receiver so send_filter_load fails
12801284
drop(rx);
@@ -1286,7 +1290,7 @@ mod tests {
12861290

12871291
#[tokio::test]
12881292
async fn test_handle_tx_relevant_populates_wallet_effect_fields() {
1289-
let (mut manager, requests, wallet) = create_relevant_manager();
1293+
let (mut manager, _requests, wallet) = create_relevant_manager();
12901294

12911295
let tx = Transaction {
12921296
version: 1,
@@ -1303,7 +1307,7 @@ mod tests {
13031307
w.set_effect(txid, 50000, vec!["yWdXnYxGbouNoo8yMvcbZmZ3Gdp6BpySxL".into()]).await;
13041308
}
13051309

1306-
manager.handle_tx(tx, &requests).await.unwrap();
1310+
manager.handle_tx(tx).await.unwrap();
13071311

13081312
let state = manager.mempool_state.read().await;
13091313
let stored = state.transactions.get(&txid).unwrap();
@@ -1316,7 +1320,7 @@ mod tests {
13161320

13171321
#[tokio::test]
13181322
async fn test_handle_tx_outgoing_transaction() {
1319-
let (mut manager, requests, wallet) = create_relevant_manager();
1323+
let (mut manager, _requests, wallet) = create_relevant_manager();
13201324

13211325
let tx = Transaction {
13221326
version: 1,
@@ -1332,7 +1336,7 @@ mod tests {
13321336
w.set_effect(txid, -30000, vec![]).await;
13331337
}
13341338

1335-
manager.handle_tx(tx, &requests).await.unwrap();
1339+
manager.handle_tx(tx).await.unwrap();
13361340

13371341
let state = manager.mempool_state.read().await;
13381342
let stored = state.transactions.get(&txid).unwrap();

0 commit comments

Comments
 (0)