diff --git a/dash-spv/src/client/transactions.rs b/dash-spv/src/client/transactions.rs index a89129cff..62dd580d1 100644 --- a/dash-spv/src/client/transactions.rs +++ b/dash-spv/src/client/transactions.rs @@ -1,6 +1,6 @@ //! Transaction-related client APIs (e.g., broadcasting) -use crate::error::{Result, SpvError}; +use crate::error::{NetworkError, Result, SpvError, SyncError}; use crate::network::NetworkManager; use crate::storage::StorageManager; use dashcore::network::message::NetworkMessage; @@ -12,14 +12,26 @@ impl DashSpvClient { /// Broadcast a transaction to all connected peers. + /// + /// The transaction is also injected into the local message pipeline so that + /// the mempool manager processes it immediately. pub async fn broadcast_transaction(&self, tx: &dashcore::Transaction) -> Result<()> { + if !self.sync_progress().await.is_synced() { + return Err(SpvError::Sync(SyncError::NotSynced)); + } + let network_guard = self.network.lock().await; if network_guard.peer_count() == 0 { - return Err(SpvError::Network(crate::error::NetworkError::NotConnected)); + return Err(SpvError::Network(NetworkError::NotConnected)); } let message = NetworkMessage::Tx(tx.clone()); - Ok(network_guard.broadcast(message).await?) + network_guard.broadcast(message).await?; + + // Inject locally so the mempool manager picks it up through handle_tx. + network_guard.dispatch_local(NetworkMessage::Tx(tx.clone())).await; + + Ok(()) } } diff --git a/dash-spv/src/error.rs b/dash-spv/src/error.rs index 59340fa99..4fbdb5b75 100644 --- a/dash-spv/src/error.rs +++ b/dash-spv/src/error.rs @@ -232,13 +232,19 @@ pub enum SyncError { /// Masternode sync failed (QRInfo or MnListDiff processing error) #[error("Masternode sync failed: {0}")] MasternodeSyncFailed(String), + + /// Operation requires the client to be fully synced + #[error("Client is not synced")] + NotSynced, } impl SyncError { /// Returns a static string representing the error category based on the variant pub fn category(&self) -> &'static str { match self { - SyncError::SyncInProgress(_) | SyncError::InvalidState(_) => "state", + SyncError::SyncInProgress(_) | SyncError::InvalidState(_) | SyncError::NotSynced => { + "state" + } SyncError::Timeout(_) => "timeout", SyncError::Validation(_) => "validation", SyncError::MissingDependency(_) => "dependency", @@ -339,6 +345,7 @@ mod tests { assert_eq!(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader).category(), "state"); assert_eq!(SyncError::InvalidState("test".to_string()).category(), "state"); assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency"); + assert_eq!(SyncError::NotSynced.category(), "state"); // Test deprecated SyncFailed always returns "unknown" #[allow(deprecated)] diff --git a/dash-spv/src/network/manager.rs b/dash-spv/src/network/manager.rs index fa85dec8f..4a519f3e8 100644 --- a/dash-spv/src/network/manager.rs +++ b/dash-spv/src/network/manager.rs @@ -1,7 +1,7 @@ //! Peer network manager for SPV client use std::collections::{HashMap, HashSet}; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; @@ -1329,6 +1329,12 @@ impl NetworkManager for PeerNetworkManager { Ok(()) } + async fn dispatch_local(&self, message: NetworkMessage) { + let local_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let msg = Message::new(local_addr, message); + self.message_dispatcher.lock().await.dispatch(&msg); + } + async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> NetworkResult<()> { PeerNetworkManager::disconnect_peer(self, addr, reason) .await diff --git a/dash-spv/src/network/mod.rs b/dash-spv/src/network/mod.rs index 50c3d1907..495dfa01f 100644 --- a/dash-spv/src/network/mod.rs +++ b/dash-spv/src/network/mod.rs @@ -223,6 +223,12 @@ pub trait NetworkManager: Send + Sync + 'static { /// Broadcast a message to all connected peers. async fn broadcast(&self, _message: NetworkMessage) -> NetworkResult<()>; + /// Inject a message into the local message dispatcher as if received from a peer. + /// + /// Used for locally-originated messages (e.g., self-broadcast transactions) that + /// should be processed through the same pipeline as peer-received messages. + async fn dispatch_local(&self, message: NetworkMessage); + /// Disconnect a specific peer by address. async fn disconnect_peer(&self, _addr: &SocketAddr, _reason: &str) -> NetworkResult<()>; diff --git a/dash-spv/src/sync/mempool/manager.rs b/dash-spv/src/sync/mempool/manager.rs index cee25f68c..2d55fab6b 100644 --- a/dash-spv/src/sync/mempool/manager.rs +++ b/dash-spv/src/sync/mempool/manager.rs @@ -296,9 +296,27 @@ impl MempoolManager { } /// Handle a received transaction. - pub(super) async fn handle_tx(&mut self, tx: Transaction) -> SyncResult> { + /// + /// When `peer` is the local sentinel address (`0.0.0.0:0`), the transaction + /// is treated as self-originated and recorded in `recent_sends`. + pub(super) async fn handle_tx( + &mut self, + tx: Transaction, + peer: SocketAddr, + ) -> SyncResult> { let txid = tx.txid(); self.pending_requests.remove(&txid); + let is_local = peer.ip().is_unspecified(); + + // Skip if already tracked (e.g., locally broadcast then received from a peer) + if self.mempool_state.read().await.transactions.contains_key(&txid) { + self.seen_txids.insert(txid, Instant::now()); + if is_local { + self.mempool_state.write().await.record_send(txid); + } + return Ok(vec![]); + } + self.seen_txids.insert(txid, Instant::now()); self.progress.add_received(1); @@ -331,6 +349,9 @@ impl MempoolManager { { let mut state = self.mempool_state.write().await; state.add_transaction(unconfirmed_tx); + if is_local { + state.record_send(txid); + } self.progress.set_tracked(state.transactions.len() as u32); } @@ -345,6 +366,7 @@ impl MempoolManager { let mut state = self.mempool_state.write().await; for txid in txids { if state.remove_transaction(txid).is_some() { + state.recent_sends.remove(txid); removed.push(*txid); } } @@ -365,6 +387,7 @@ impl MempoolManager { let mut state = self.mempool_state.write().await; let instant_lock_opt = if let Some(tx) = state.transactions.get_mut(&txid) { tx.is_instant_send = true; + state.recent_sends.remove(&txid); tracing::debug!("Marked mempool tx {} as InstantSend-locked", txid); Some(instant_lock) } else if self.pending_is_locks.len() < MAX_PENDING_IS_LOCKS { @@ -712,7 +735,7 @@ mod tests { }; let txid = tx.txid(); - let events = manager.handle_tx(tx).await.unwrap(); + let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); // MockWallet returns is_relevant=false by default assert!(events.is_empty()); assert_eq!(manager.progress.received(), 1); @@ -831,7 +854,7 @@ mod tests { }; let txid = tx.txid(); - let events = manager.handle_tx(tx).await.unwrap(); + let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); assert!(events.is_empty()); // Verify transaction was stored in mempool state @@ -840,6 +863,72 @@ mod tests { assert_eq!(manager.progress.received(), 1); assert_eq!(manager.progress.relevant(), 1); assert_eq!(manager.progress.tracked(), 1); + drop(state); + + // Processing the same transaction again should be a no-op (dedup guard) + let tx2 = Transaction { + version: 1, + lock_time: 0, + input: vec![], + output: vec![], + special_transaction_payload: None, + }; + let events = manager.handle_tx(tx2, test_socket_address(1)).await.unwrap(); + assert!(events.is_empty()); + + let state = manager.mempool_state.read().await; + assert_eq!(state.transactions.len(), 1); + // Progress counters should not have incremented + assert_eq!(manager.progress.received(), 1); + assert_eq!(manager.progress.relevant(), 1); + } + + #[tokio::test] + async fn test_handle_tx_local_records_send() { + let (mut manager, _requests, _wallet) = create_relevant_manager(); + + let tx = Transaction { + version: 2, + lock_time: 0, + input: vec![], + output: vec![], + special_transaction_payload: None, + }; + let txid = tx.txid(); + + // Use the unspecified address to simulate a locally broadcast transaction + let local_addr = SocketAddr::from(([0, 0, 0, 0], 0)); + manager.handle_tx(tx, local_addr).await.unwrap(); + + let state = manager.mempool_state.read().await; + assert!(state.transactions.contains_key(&txid)); + assert!( + state.is_recent_send(&txid, Duration::from_secs(10)), + "locally dispatched transaction should be recorded as a recent send" + ); + } + + #[tokio::test] + async fn test_handle_tx_remote_does_not_record_send() { + let (mut manager, _requests, _wallet) = create_relevant_manager(); + + let tx = Transaction { + version: 3, + lock_time: 0, + input: vec![], + output: vec![], + special_transaction_payload: None, + }; + let txid = tx.txid(); + + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); + + let state = manager.mempool_state.read().await; + assert!(state.transactions.contains_key(&txid)); + assert!( + !state.is_recent_send(&txid, Duration::from_secs(10)), + "peer-received transaction should not be recorded as a recent send" + ); } #[tokio::test] @@ -859,7 +948,7 @@ mod tests { manager.pending_requests.insert(txid, Instant::now()); assert!(manager.pending_requests.contains_key(&txid)); - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); // Pending request should be cleared regardless of relevance assert!(!manager.pending_requests.contains_key(&txid)); @@ -933,13 +1022,18 @@ mod tests { Vec::new(), 0, )); + state.record_send(txid); } manager.process_instant_send(dummy_instant_lock(txid)).await; - // Verify mempool state also reflects IS flag + // Verify mempool state reflects IS flag and recent_sends is cleaned up let state = manager.mempool_state.read().await; assert!(state.transactions.get(&txid).unwrap().is_instant_send); + assert!( + !state.recent_sends.contains_key(&txid), + "IS-locked transaction should be removed from recent_sends" + ); drop(state); let wallet = manager.wallet.read().await; @@ -1127,7 +1221,7 @@ mod tests { assert!(manager.pending_is_locks.contains_key(&txid)); // Transaction arrives - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); // Pending IS lock consumed assert!(manager.pending_is_locks.is_empty()); @@ -1167,7 +1261,7 @@ mod tests { assert!(manager.pending_is_locks.contains_key(&txid)); // Transaction arrives but wallet says it's not relevant - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); // Pending IS lock cleaned up (no leak) assert!(manager.pending_is_locks.is_empty()); @@ -1349,7 +1443,7 @@ mod tests { w.set_mempool_addresses(vec![addr.clone()]); } - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); let state = manager.mempool_state.read().await; let stored = state.transactions.get(&txid).unwrap(); @@ -1378,7 +1472,7 @@ mod tests { w.set_mempool_net_amount(-30000); } - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); let state = manager.mempool_state.read().await; let stored = state.transactions.get(&txid).unwrap(); @@ -1491,6 +1585,9 @@ mod tests { )); } assert_eq!(state.transactions.len(), 3); + // Mark two as recent sends + state.record_send(txids[0]); + state.record_send(txids[1]); } // Remove 2 of the 3 transactions @@ -1499,6 +1596,8 @@ mod tests { let state = manager.mempool_state.read().await; assert_eq!(state.transactions.len(), 1); assert!(state.transactions.contains_key(&txids[2])); + assert!(!state.recent_sends.contains_key(&txids[0])); + assert!(!state.recent_sends.contains_key(&txids[1])); drop(state); assert_eq!(manager.progress.removed(), 2); diff --git a/dash-spv/src/sync/mempool/sync_manager.rs b/dash-spv/src/sync/mempool/sync_manager.rs index fea5e6091..61a1f324a 100644 --- a/dash-spv/src/sync/mempool/sync_manager.rs +++ b/dash-spv/src/sync/mempool/sync_manager.rs @@ -50,7 +50,7 @@ impl SyncManager for MempoolManager { ) -> SyncResult> { match msg.inner() { NetworkMessage::Inv(inv) => self.handle_inv(inv, msg.peer_address(), requests).await, - NetworkMessage::Tx(tx) => self.handle_tx(tx.clone()).await, + NetworkMessage::Tx(tx) => self.handle_tx(tx.clone(), msg.peer_address()).await, _ => Ok(vec![]), } } @@ -816,7 +816,7 @@ mod tests { output: vec![], special_transaction_payload: None, }; - manager.handle_tx(tx).await.unwrap(); + manager.handle_tx(tx, test_socket_address(1)).await.unwrap(); let has_filter_load = std::iter::from_fn(|| rx.try_recv().ok()).any(|msg| { matches!(msg, NetworkRequest::SendMessageToPeer(NetworkMessage::FilterLoad(_), _)) diff --git a/dash-spv/src/test_utils/network.rs b/dash-spv/src/test_utils/network.rs index 38f64ee8a..1eff5aecf 100644 --- a/dash-spv/src/test_utils/network.rs +++ b/dash-spv/src/test_utils/network.rs @@ -10,10 +10,11 @@ use dashcore::{ network::message_blockdata::GetHeadersMessage, BlockHash, Network, }; use dashcore_hashes::Hash; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use std::time::Duration; use tokio::sync::broadcast; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::Mutex; pub fn test_socket_address(id: u8) -> SocketAddr { SocketAddr::from(([127, 0, 0, id], id as u16)) @@ -24,7 +25,7 @@ pub struct MockNetworkManager { connected: bool, connected_peer: SocketAddr, headers_chain: Vec, - message_dispatcher: MessageDispatcher, + message_dispatcher: Mutex, sent_messages: Vec, /// Request sender for outgoing messages. request_tx: UnboundedSender, @@ -42,7 +43,7 @@ impl MockNetworkManager { connected: true, connected_peer: SocketAddr::new(std::net::Ipv4Addr::LOCALHOST.into(), 9999), headers_chain: Vec::new(), - message_dispatcher: MessageDispatcher::default(), + message_dispatcher: Mutex::new(MessageDispatcher::default()), sent_messages: Vec::new(), request_tx, request_rx: Some(request_rx), @@ -123,7 +124,7 @@ impl Default for MockNetworkManager { #[async_trait] impl NetworkManager for MockNetworkManager { async fn message_receiver(&mut self, types: &[MessageType]) -> UnboundedReceiver { - self.message_dispatcher.message_receiver(types) + self.message_dispatcher.lock().await.message_receiver(types) } fn request_sender(&self) -> RequestSender { @@ -149,8 +150,8 @@ impl NetworkManager for MockNetworkManager { if let NetworkMessage::GetHeaders(ref getheaders) = message { let headers = self.process_getheaders(getheaders); if !headers.is_empty() { - let message = Message::new(self.connected_peer, NetworkMessage::Headers(headers)); - self.message_dispatcher.dispatch(&message); + let msg = Message::new(self.connected_peer, NetworkMessage::Headers(headers)); + self.message_dispatcher.lock().await.dispatch(&msg); } } @@ -170,6 +171,12 @@ impl NetworkManager for MockNetworkManager { panic!("Broadcast not implemented for MockNetworkManager"); } + async fn dispatch_local(&self, message: NetworkMessage) { + let local_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let msg = Message::new(local_addr, message); + self.message_dispatcher.lock().await.dispatch(&msg); + } + async fn disconnect_peer(&self, _addr: &SocketAddr, _reason: &str) -> NetworkResult<()> { panic!("Disconnect peer not implemented for MockNetworkManager"); } diff --git a/dash-spv/src/test_utils/node.rs b/dash-spv/src/test_utils/node.rs index db9a9ef6a..829697c43 100644 --- a/dash-spv/src/test_utils/node.rs +++ b/dash-spv/src/test_utils/node.rs @@ -369,6 +369,41 @@ impl DashCoreNode { txid } + /// Create and sign a raw transaction without broadcasting it. + /// + /// Returns the signed transaction for use with `broadcast_transaction()`. + pub fn create_signed_transaction( + &self, + wallet_name: &str, + input_txid: Txid, + input_vout: u32, + input_amount: Amount, + destination: &Address, + fee: Amount, + ) -> Transaction { + let client = self.rpc_client_for_wallet(wallet_name); + + let inputs = vec![rpc_json::CreateRawTransactionInput { + txid: input_txid, + vout: input_vout, + sequence: None, + }]; + let send_amount = input_amount.checked_sub(fee).expect("fee exceeds input amount"); + let mut outputs = HashMap::new(); + outputs.insert(destination.to_string(), send_amount); + + let raw_tx: Transaction = client + .create_raw_transaction(&inputs, &outputs, None) + .expect("failed to create raw tx"); + + let signed = client + .sign_raw_transaction_with_wallet(&raw_tx, None, None) + .expect("failed to sign raw tx"); + assert!(signed.complete, "raw transaction signing incomplete"); + + signed.transaction().expect("invalid signed tx") + } + /// Connect this dashd node to another dashd node via P2P and wait for the /// connection to be established. pub async fn connect_to_node(&self, addr: SocketAddr) { diff --git a/dash-spv/tests/dashd_sync/tests_mempool.rs b/dash-spv/tests/dashd_sync/tests_mempool.rs index 640f6db51..845f31fda 100644 --- a/dash-spv/tests/dashd_sync/tests_mempool.rs +++ b/dash-spv/tests/dashd_sync/tests_mempool.rs @@ -496,3 +496,79 @@ async fn test_mempool_peer_disconnect_reactivation() { bf.stop().await; tracing::info!("test_mempool_peer_disconnect_reactivation passed"); } + +/// Verify that a locally broadcast transaction is immediately visible in mempool state. +#[tokio::test] +async fn test_broadcast_transaction_local_detection() { + let Some(ctx) = TestContext::new(TestChain::Minimal).await else { + return; + }; + if !ctx.dashd.supports_mining { + eprintln!("Skipping test (dashd RPC miner not available)"); + return; + } + + let (mut fa, _fa_dir) = ctx.spawn_client(MempoolStrategy::FetchAll).await; + let (mut bf, _bf_dir) = ctx.spawn_client(MempoolStrategy::BloomFilter).await; + wait_for_sync_both(&mut fa, &mut bf, ctx.dashd.initial_height).await; + + // Step 1: Fund the SPV wallet with a confirmed UTXO + let receive_address = ctx.receive_address().await; + let funding_amount = Amount::from_sat(200_000_000); + let funding_txid = ctx.dashd.node.send_to_address(&receive_address, funding_amount); + + wait_for_mempool_tx_both(&mut fa, &mut bf, MEMPOOL_TIMEOUT) + .await + .expect("Expected mempool event for funding tx"); + + let miner_address = ctx.dashd.node.get_new_address_from_wallet("default"); + ctx.dashd.node.generate_blocks(1, &miner_address); + let mined_height = ctx.dashd.initial_height + 1; + wait_for_sync_both(&mut fa, &mut bf, mined_height).await; + + // Step 2: Create a signed transaction without broadcasting via dashd + let wallet_name = &ctx.dashd.wallet.wallet_name; + let utxos = ctx.dashd.node.list_unspent_from_wallet(wallet_name); + let utxo = + utxos.iter().find(|u| u.txid == funding_txid).expect("Funding tx UTXO not found in wallet"); + + let external_address = ctx.dashd.node.get_new_address_from_wallet("default"); + let fee = Amount::from_sat(10_000); + let signed_tx = ctx.dashd.node.create_signed_transaction( + wallet_name, + utxo.txid, + utxo.vout, + utxo.amount, + &external_address, + fee, + ); + let txid = signed_tx.txid(); + tracing::info!("Created signed tx for SPV broadcast, txid: {}", txid); + + // Step 3: Broadcast via the SPV client (not via dashd) + fa.client.broadcast_transaction(&signed_tx).await.expect("broadcast_transaction failed"); + tracing::info!("Broadcast tx via FetchAll client"); + + // The locally dispatched transaction should be picked up by the mempool manager + let detected = wait_for_mempool_tx_both(&mut fa, &mut bf, MEMPOOL_TIMEOUT) + .await + .expect("Expected mempool TransactionReceived event after broadcast"); + assert_eq!(detected, txid, "Detected txid should match broadcast txid"); + + // Step 4: Mine the broadcast tx and verify it transitions to confirmed + ctx.dashd.node.generate_blocks(1, &miner_address); + let confirmed_height = mined_height + 1; + wait_for_sync_both(&mut fa, &mut bf, confirmed_height).await; + assert!( + client_has_transaction(&fa.client, &ctx.wallet_id, &txid).await, + "FetchAll: broadcast tx should be confirmed in wallet" + ); + assert!( + client_has_transaction(&bf.client, &ctx.wallet_id, &txid).await, + "BloomFilter: broadcast tx should be confirmed in wallet" + ); + + fa.stop().await; + bf.stop().await; + tracing::info!("test_broadcast_transaction_local_detection passed"); +}