Skip to content

Commit b7b44b3

Browse files
committed
feat: implement parallel filter matching
1 parent 9f1a9b4 commit b7b44b3

8 files changed

Lines changed: 356 additions & 4 deletions

File tree

dash-spv/src/sync/filters/matching.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,16 @@
88
//! - Efficient filter matching using BIP158 algorithms
99
//! - Block download coordination for matches
1010
11+
use crate::error::{SyncError, SyncResult};
12+
use crate::network::NetworkManager;
13+
use crate::storage::StorageManager;
1114
use dashcore::{
1215
bip158::{BlockFilterReader, Error as Bip158Error},
1316
network::message::NetworkMessage,
1417
network::message_blockdata::Inventory,
1518
BlockHash, ScriptBuf,
1619
};
17-
18-
use crate::error::{SyncError, SyncResult};
19-
use crate::network::NetworkManager;
20-
use crate::storage::StorageManager;
20+
use key_wallet_manager::wallet_manager::{FilterMatchInput, FilterMatchOutput};
2121

2222
impl<S: StorageManager, N: NetworkManager> super::manager::FilterSyncManager<S, N> {
2323
pub async fn check_filter_for_matches<
@@ -41,6 +41,16 @@ impl<S: StorageManager, N: NetworkManager> super::manager::FilterSyncManager<S,
4141
}
4242
}
4343

44+
pub async fn check_filters_for_matches<
45+
W: key_wallet_manager::wallet_interface::WalletInterface,
46+
>(
47+
&self,
48+
input_map: FilterMatchInput,
49+
wallet: &W,
50+
) -> FilterMatchOutput {
51+
wallet.check_compact_filters(input_map).await
52+
}
53+
4454
/// Check if filter matches any of the provided scripts using BIP158 GCS filter.
4555
#[allow(dead_code)]
4656
fn filter_matches_scripts(

key-wallet-manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ serde = { version = "1.0", default-features = false, features = ["derive"], opti
2525
async-trait = "0.1"
2626
bincode = { version = "=2.0.0-rc.3", optional = true }
2727
zeroize = { version = "1.8", features = ["derive"] }
28+
rayon = "1.11"
2829
tokio = { version = "1.32", features = ["full"] }
2930

3031
[dev-dependencies]

key-wallet-manager/src/test_utils/wallet.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::{collections::BTreeMap, sync::Arc};
33
use dashcore::{Block, Transaction, Txid};
44
use tokio::sync::Mutex;
55

6+
use crate::wallet_manager::{FilterMatchInput, FilterMatchOutput};
67
use crate::{wallet_interface::WalletInterface, BlockProcessingResult};
78

89
// Type alias for transaction effects map
@@ -74,6 +75,10 @@ impl WalletInterface for MockWallet {
7475
let map = self.effects.lock().await;
7576
map.get(&tx.txid()).cloned()
7677
}
78+
79+
async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput {
80+
FilterMatchOutput::default()
81+
}
7782
}
7883

7984
/// Mock wallet that returns false for filter checks
@@ -103,6 +108,10 @@ impl WalletInterface for NonMatchingMockWallet {
103108
false
104109
}
105110

111+
async fn check_compact_filters(&self, _input: FilterMatchInput) -> FilterMatchOutput {
112+
FilterMatchOutput::default()
113+
}
114+
106115
async fn describe(&self) -> String {
107116
"NonMatchingWallet (test implementation)".to_string()
108117
}

key-wallet-manager/src/wallet_interface.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//!
33
//! This module defines the trait that SPV clients use to interact with wallets.
44
5+
use crate::wallet_manager::{FilterMatchInput, FilterMatchOutput};
56
use alloc::string::String;
67
use alloc::vec::Vec;
78
use async_trait::async_trait;
@@ -41,6 +42,10 @@ pub trait WalletInterface: Send + Sync + 'static {
4142
block_hash: &dashcore::BlockHash,
4243
) -> bool;
4344

45+
/// Check compact filters against watched addresses in batch
46+
/// Returns map of filter keys to match results
47+
async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput;
48+
4449
/// Return the wallet's per-transaction net change and involved addresses if known.
4550
/// Returns (net_amount, addresses) where net_amount is received - sent in satoshis.
4651
/// If the wallet has no record for the transaction, returns None.
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
use alloc::vec::Vec;
2+
use dashcore::bip158::BlockFilter;
3+
use dashcore::prelude::CoreBlockHeight;
4+
use dashcore::{Address, BlockHash};
5+
use rayon::prelude::{IntoParallelIterator, ParallelIterator};
6+
use std::collections::{BTreeSet, HashMap};
7+
8+
pub type FilterMatchInput = HashMap<FilterMatchKey, BlockFilter>;
9+
pub type FilterMatchOutput = BTreeSet<FilterMatchKey>;
10+
11+
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
12+
pub struct FilterMatchKey {
13+
height: CoreBlockHeight,
14+
hash: BlockHash,
15+
}
16+
17+
impl FilterMatchKey {
18+
pub fn new(height: CoreBlockHeight, hash: BlockHash) -> Self {
19+
Self {
20+
height,
21+
hash,
22+
}
23+
}
24+
pub fn height(&self) -> CoreBlockHeight {
25+
self.height
26+
}
27+
pub fn hash(&self) -> &BlockHash {
28+
&self.hash
29+
}
30+
}
31+
32+
/// Check compact filters for addresses and return the keys that matched.
33+
pub fn check_compact_filters_for_addresses(
34+
input: FilterMatchInput,
35+
addresses: Vec<Address>,
36+
) -> FilterMatchOutput {
37+
let script_pubkey_bytes: Vec<Vec<u8>> =
38+
addresses.iter().map(|address| address.script_pubkey().to_bytes()).collect();
39+
40+
input
41+
.into_par_iter()
42+
.filter_map(|(key, filter)| {
43+
filter
44+
.match_any(key.hash(), script_pubkey_bytes.iter().map(|v| v.as_slice()))
45+
.unwrap_or(false)
46+
.then_some(key)
47+
})
48+
.collect()
49+
}
50+
51+
#[cfg(test)]
52+
mod tests {
53+
use super::*;
54+
use crate::Network;
55+
use dashcore::{Block, Transaction};
56+
57+
#[test]
58+
fn test_empty_input_returns_empty() {
59+
let result = check_compact_filters_for_addresses(FilterMatchInput::new(), vec![]);
60+
assert!(result.is_empty());
61+
}
62+
63+
#[test]
64+
fn test_empty_addresses_returns_empty() {
65+
let address = Address::dummy(Network::Regtest, 1);
66+
let tx = Transaction::dummy(&address, 0..0, &[1]);
67+
let block = Block::dummy(100, vec![tx]);
68+
let filter = BlockFilter::dummy(&block);
69+
let key = FilterMatchKey::new(100, block.block_hash());
70+
71+
let mut input = FilterMatchInput::new();
72+
input.insert(key.clone(), filter);
73+
74+
let output = check_compact_filters_for_addresses(input, vec![]);
75+
assert!(!output.contains(&key));
76+
}
77+
78+
#[test]
79+
fn test_matching_filter() {
80+
let address = Address::dummy(Network::Regtest, 1);
81+
let tx = Transaction::dummy(&address, 0..0, &[1]);
82+
let block = Block::dummy(100, vec![tx]);
83+
let filter = BlockFilter::dummy(&block);
84+
let key = FilterMatchKey::new(100, block.block_hash());
85+
86+
let mut input = FilterMatchInput::new();
87+
input.insert(key.clone(), filter);
88+
89+
let output = check_compact_filters_for_addresses(input, vec![address]);
90+
assert!(output.contains(&key));
91+
}
92+
93+
#[test]
94+
fn test_non_matching_filter() {
95+
let address = Address::dummy(Network::Regtest, 1);
96+
let address_other = Address::dummy(Network::Regtest, 2);
97+
98+
let tx = Transaction::dummy(&address_other, 0..0, &[1]);
99+
let block = Block::dummy(100, vec![tx]);
100+
let filter = BlockFilter::dummy(&block);
101+
let key = FilterMatchKey::new(100, block.block_hash());
102+
103+
let mut input = FilterMatchInput::new();
104+
input.insert(key.clone(), filter);
105+
106+
let output = check_compact_filters_for_addresses(input, vec![address]);
107+
assert!(!output.contains(&key));
108+
}
109+
110+
#[test]
111+
fn test_batch_mixed_results() {
112+
let unrelated_address = Address::dummy(Network::Regtest, 0);
113+
let address_1 = Address::dummy(Network::Regtest, 1);
114+
let address_2 = Address::dummy(Network::Regtest, 2);
115+
116+
let tx_1 = Transaction::dummy(&address_1, 0..0, &[1]);
117+
let block_1 = Block::dummy(100, vec![tx_1]);
118+
let filter_1 = BlockFilter::dummy(&block_1);
119+
let key_1 = FilterMatchKey::new(100, block_1.block_hash());
120+
121+
let tx_2 = Transaction::dummy(&address_2, 0..0, &[2]);
122+
let block_2 = Block::dummy(100, vec![tx_2]);
123+
let filter_2 = BlockFilter::dummy(&block_2);
124+
let key_2 = FilterMatchKey::new(200, block_2.block_hash());
125+
126+
let tx_3 = Transaction::dummy(&unrelated_address, 0..0, &[10]);
127+
let block_3 = Block::dummy(100, vec![tx_3]);
128+
let filter_3 = BlockFilter::dummy(&block_3);
129+
let key_3 = FilterMatchKey::new(300, block_3.block_hash());
130+
131+
let mut input = FilterMatchInput::new();
132+
input.insert(key_1.clone(), filter_1);
133+
input.insert(key_2.clone(), filter_2);
134+
input.insert(key_3.clone(), filter_3);
135+
136+
let output = check_compact_filters_for_addresses(input, vec![address_1, address_2]);
137+
assert_eq!(output.len(), 2);
138+
assert!(output.contains(&key_1));
139+
assert!(output.contains(&key_2));
140+
assert!(!output.contains(&key_3));
141+
}
142+
143+
#[test]
144+
fn test_output_sorted_by_height() {
145+
let address = Address::dummy(Network::Regtest, 1);
146+
147+
// Create blocks at different heights (inserted in non-sorted order)
148+
let heights = [500, 100, 300, 200, 400];
149+
let mut input = FilterMatchInput::new();
150+
151+
for (i, &height) in heights.iter().enumerate() {
152+
let tx = Transaction::dummy(&address, 0..0, &[i as u64]);
153+
let block = Block::dummy(height, vec![tx]);
154+
let filter = BlockFilter::dummy(&block);
155+
let key = FilterMatchKey::new(height, block.block_hash());
156+
input.insert(key, filter);
157+
}
158+
159+
let output = check_compact_filters_for_addresses(input, vec![address]);
160+
161+
// Verify output is sorted by height (ascending)
162+
let heights_out: Vec<CoreBlockHeight> = output.iter().map(|k| k.height()).collect();
163+
let mut sorted_heights = heights_out.clone();
164+
sorted_heights.sort();
165+
166+
assert_eq!(heights_out, sorted_heights);
167+
assert_eq!(heights_out, vec![100, 200, 300, 400, 500]);
168+
}
169+
}

key-wallet-manager/src/wallet_manager/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@
44
//! each of which can have multiple accounts. This follows the architecture
55
//! pattern where a manager oversees multiple distinct wallets.
66
7+
mod matching;
78
mod process_block;
89
mod transaction_building;
910

11+
pub use crate::wallet_manager::matching::{FilterMatchInput, FilterMatchKey, FilterMatchOutput};
1012
use alloc::collections::BTreeMap;
1113
use alloc::string::String;
1214
use alloc::vec::Vec;

key-wallet-manager/src/wallet_manager/process_block.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
use crate::wallet_interface::{BlockProcessingResult, WalletInterface};
2+
use crate::wallet_manager::matching::{
3+
check_compact_filters_for_addresses, FilterMatchInput, FilterMatchOutput,
4+
};
25
use crate::WalletManager;
36
use alloc::string::String;
47
use alloc::vec::Vec;
@@ -80,6 +83,10 @@ impl<T: WalletInfoInterface + Send + Sync + 'static> WalletInterface for WalletM
8083
hit
8184
}
8285

86+
async fn check_compact_filters(&self, input: FilterMatchInput) -> FilterMatchOutput {
87+
check_compact_filters_for_addresses(input, self.monitored_addresses())
88+
}
89+
8390
async fn transaction_effect(&self, tx: &Transaction) -> Option<(i64, Vec<String>)> {
8491
// Aggregate across all managed wallets. If any wallet considers it relevant,
8592
// compute net = total_received - total_sent and collect involved addresses.

0 commit comments

Comments
 (0)