Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions dash-spv/src/client/transactions.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Transaction-related client APIs (e.g., broadcasting)

use crate::error::{Result, SpvError};
use crate::error::{NetworkError, Result, SpvError, SyncError};
use crate::network::NetworkManager;
use crate::storage::StorageManager;
use dashcore::network::message::NetworkMessage;
Expand All @@ -12,14 +12,26 @@ impl<W: WalletInterface, N: NetworkManager, S: StorageManager, H: EventHandler>
DashSpvClient<W, N, S, H>
{
/// Broadcast a transaction to all connected peers.
///
/// The transaction is also injected into the local message pipeline so that
/// the mempool manager processes it immediately.
pub async fn broadcast_transaction(&self, tx: &dashcore::Transaction) -> Result<()> {
if !self.sync_progress().await.is_synced() {
return Err(SpvError::Sync(SyncError::NotSynced));
}

let network_guard = self.network.lock().await;

if network_guard.peer_count() == 0 {
return Err(SpvError::Network(crate::error::NetworkError::NotConnected));
return Err(SpvError::Network(NetworkError::NotConnected));
}

let message = NetworkMessage::Tx(tx.clone());
Ok(network_guard.broadcast(message).await?)
network_guard.broadcast(message).await?;

// Inject locally so the mempool manager picks it up through handle_tx.
network_guard.dispatch_local(NetworkMessage::Tx(tx.clone())).await;

Ok(())
}
}
9 changes: 8 additions & 1 deletion dash-spv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,13 +232,19 @@ pub enum SyncError {
/// Masternode sync failed (QRInfo or MnListDiff processing error)
#[error("Masternode sync failed: {0}")]
MasternodeSyncFailed(String),

/// Operation requires the client to be fully synced
#[error("Client is not synced")]
NotSynced,
}

impl SyncError {
/// Returns a static string representing the error category based on the variant
pub fn category(&self) -> &'static str {
match self {
SyncError::SyncInProgress(_) | SyncError::InvalidState(_) => "state",
SyncError::SyncInProgress(_) | SyncError::InvalidState(_) | SyncError::NotSynced => {
"state"
}
SyncError::Timeout(_) => "timeout",
SyncError::Validation(_) => "validation",
SyncError::MissingDependency(_) => "dependency",
Expand Down Expand Up @@ -339,6 +345,7 @@ mod tests {
assert_eq!(SyncError::SyncInProgress(ManagerIdentifier::BlockHeader).category(), "state");
assert_eq!(SyncError::InvalidState("test".to_string()).category(), "state");
assert_eq!(SyncError::MissingDependency("test".to_string()).category(), "dependency");
assert_eq!(SyncError::NotSynced.category(), "state");

// Test deprecated SyncFailed always returns "unknown"
#[allow(deprecated)]
Expand Down
8 changes: 7 additions & 1 deletion dash-spv/src/network/manager.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Peer network manager for SPV client

use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -1329,6 +1329,12 @@ impl NetworkManager for PeerNetworkManager {
Ok(())
}

async fn dispatch_local(&self, message: NetworkMessage) {
let local_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let msg = Message::new(local_addr, message);
self.message_dispatcher.lock().await.dispatch(&msg);
}

async fn disconnect_peer(&self, addr: &SocketAddr, reason: &str) -> NetworkResult<()> {
PeerNetworkManager::disconnect_peer(self, addr, reason)
.await
Expand Down
6 changes: 6 additions & 0 deletions dash-spv/src/network/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,12 @@ pub trait NetworkManager: Send + Sync + 'static {
/// Broadcast a message to all connected peers.
async fn broadcast(&self, _message: NetworkMessage) -> NetworkResult<()>;

/// Inject a message into the local message dispatcher as if received from a peer.
///
/// Used for locally-originated messages (e.g., self-broadcast transactions) that
/// should be processed through the same pipeline as peer-received messages.
async fn dispatch_local(&self, message: NetworkMessage);

/// Disconnect a specific peer by address.
async fn disconnect_peer(&self, _addr: &SocketAddr, _reason: &str) -> NetworkResult<()>;

Expand Down
117 changes: 108 additions & 9 deletions dash-spv/src/sync/mempool/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,9 +296,27 @@ impl<W: WalletInterface> MempoolManager<W> {
}

/// Handle a received transaction.
pub(super) async fn handle_tx(&mut self, tx: Transaction) -> SyncResult<Vec<SyncEvent>> {
///
/// When `peer` is the local sentinel address (`0.0.0.0:0`), the transaction
/// is treated as self-originated and recorded in `recent_sends`.
pub(super) async fn handle_tx(
&mut self,
tx: Transaction,
peer: SocketAddr,
) -> SyncResult<Vec<SyncEvent>> {
let txid = tx.txid();
self.pending_requests.remove(&txid);
let is_local = peer.ip().is_unspecified();

// Skip if already tracked (e.g., locally broadcast then received from a peer)
if self.mempool_state.read().await.transactions.contains_key(&txid) {
self.seen_txids.insert(txid, Instant::now());
if is_local {
self.mempool_state.write().await.record_send(txid);
}
return Ok(vec![]);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

self.seen_txids.insert(txid, Instant::now());
self.progress.add_received(1);

Expand Down Expand Up @@ -331,6 +349,9 @@ impl<W: WalletInterface> MempoolManager<W> {
{
let mut state = self.mempool_state.write().await;
state.add_transaction(unconfirmed_tx);
if is_local {
state.record_send(txid);
}
self.progress.set_tracked(state.transactions.len() as u32);
}

Expand All @@ -345,6 +366,7 @@ impl<W: WalletInterface> MempoolManager<W> {
let mut state = self.mempool_state.write().await;
for txid in txids {
if state.remove_transaction(txid).is_some() {
state.recent_sends.remove(txid);
removed.push(*txid);
}
}
Expand All @@ -365,6 +387,7 @@ impl<W: WalletInterface> MempoolManager<W> {
let mut state = self.mempool_state.write().await;
let instant_lock_opt = if let Some(tx) = state.transactions.get_mut(&txid) {
tx.is_instant_send = true;
state.recent_sends.remove(&txid);
tracing::debug!("Marked mempool tx {} as InstantSend-locked", txid);
Some(instant_lock)
} else if self.pending_is_locks.len() < MAX_PENDING_IS_LOCKS {
Expand Down Expand Up @@ -712,7 +735,7 @@ mod tests {
};
let txid = tx.txid();

let events = manager.handle_tx(tx).await.unwrap();
let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
// MockWallet returns is_relevant=false by default
assert!(events.is_empty());
assert_eq!(manager.progress.received(), 1);
Expand Down Expand Up @@ -831,7 +854,7 @@ mod tests {
};
let txid = tx.txid();

let events = manager.handle_tx(tx).await.unwrap();
let events = manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
assert!(events.is_empty());

// Verify transaction was stored in mempool state
Expand All @@ -840,6 +863,72 @@ mod tests {
assert_eq!(manager.progress.received(), 1);
assert_eq!(manager.progress.relevant(), 1);
assert_eq!(manager.progress.tracked(), 1);
drop(state);

// Processing the same transaction again should be a no-op (dedup guard)
let tx2 = Transaction {
version: 1,
lock_time: 0,
input: vec![],
output: vec![],
special_transaction_payload: None,
};
let events = manager.handle_tx(tx2, test_socket_address(1)).await.unwrap();
assert!(events.is_empty());

let state = manager.mempool_state.read().await;
assert_eq!(state.transactions.len(), 1);
// Progress counters should not have incremented
assert_eq!(manager.progress.received(), 1);
assert_eq!(manager.progress.relevant(), 1);
}

#[tokio::test]
async fn test_handle_tx_local_records_send() {
let (mut manager, _requests, _wallet) = create_relevant_manager();

let tx = Transaction {
version: 2,
lock_time: 0,
input: vec![],
output: vec![],
special_transaction_payload: None,
};
let txid = tx.txid();

// Use the unspecified address to simulate a locally broadcast transaction
let local_addr = SocketAddr::from(([0, 0, 0, 0], 0));
manager.handle_tx(tx, local_addr).await.unwrap();

let state = manager.mempool_state.read().await;
assert!(state.transactions.contains_key(&txid));
assert!(
state.is_recent_send(&txid, Duration::from_secs(10)),
"locally dispatched transaction should be recorded as a recent send"
);
}

#[tokio::test]
async fn test_handle_tx_remote_does_not_record_send() {
let (mut manager, _requests, _wallet) = create_relevant_manager();

let tx = Transaction {
version: 3,
lock_time: 0,
input: vec![],
output: vec![],
special_transaction_payload: None,
};
let txid = tx.txid();

manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

let state = manager.mempool_state.read().await;
assert!(state.transactions.contains_key(&txid));
assert!(
!state.is_recent_send(&txid, Duration::from_secs(10)),
"peer-received transaction should not be recorded as a recent send"
);
}

#[tokio::test]
Expand All @@ -859,7 +948,7 @@ mod tests {
manager.pending_requests.insert(txid, Instant::now());
assert!(manager.pending_requests.contains_key(&txid));

manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();
// Pending request should be cleared regardless of relevance
assert!(!manager.pending_requests.contains_key(&txid));

Expand Down Expand Up @@ -933,13 +1022,18 @@ mod tests {
Vec::new(),
0,
));
state.record_send(txid);
}

manager.process_instant_send(dummy_instant_lock(txid)).await;

// Verify mempool state also reflects IS flag
// Verify mempool state reflects IS flag and recent_sends is cleaned up
let state = manager.mempool_state.read().await;
assert!(state.transactions.get(&txid).unwrap().is_instant_send);
assert!(
!state.recent_sends.contains_key(&txid),
"IS-locked transaction should be removed from recent_sends"
);
drop(state);

let wallet = manager.wallet.read().await;
Expand Down Expand Up @@ -1127,7 +1221,7 @@ mod tests {
assert!(manager.pending_is_locks.contains_key(&txid));

// Transaction arrives
manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

// Pending IS lock consumed
assert!(manager.pending_is_locks.is_empty());
Expand Down Expand Up @@ -1167,7 +1261,7 @@ mod tests {
assert!(manager.pending_is_locks.contains_key(&txid));

// Transaction arrives but wallet says it's not relevant
manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

// Pending IS lock cleaned up (no leak)
assert!(manager.pending_is_locks.is_empty());
Expand Down Expand Up @@ -1349,7 +1443,7 @@ mod tests {
w.set_mempool_addresses(vec![addr.clone()]);
}

manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

let state = manager.mempool_state.read().await;
let stored = state.transactions.get(&txid).unwrap();
Expand Down Expand Up @@ -1378,7 +1472,7 @@ mod tests {
w.set_mempool_net_amount(-30000);
}

manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

let state = manager.mempool_state.read().await;
let stored = state.transactions.get(&txid).unwrap();
Expand Down Expand Up @@ -1491,6 +1585,9 @@ mod tests {
));
}
assert_eq!(state.transactions.len(), 3);
// Mark two as recent sends
state.record_send(txids[0]);
state.record_send(txids[1]);
}

// Remove 2 of the 3 transactions
Expand All @@ -1499,6 +1596,8 @@ mod tests {
let state = manager.mempool_state.read().await;
assert_eq!(state.transactions.len(), 1);
assert!(state.transactions.contains_key(&txids[2]));
assert!(!state.recent_sends.contains_key(&txids[0]));
assert!(!state.recent_sends.contains_key(&txids[1]));
drop(state);

assert_eq!(manager.progress.removed(), 2);
Expand Down
4 changes: 2 additions & 2 deletions dash-spv/src/sync/mempool/sync_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl<W: WalletInterface + 'static> SyncManager for MempoolManager<W> {
) -> SyncResult<Vec<SyncEvent>> {
match msg.inner() {
NetworkMessage::Inv(inv) => self.handle_inv(inv, msg.peer_address(), requests).await,
NetworkMessage::Tx(tx) => self.handle_tx(tx.clone()).await,
NetworkMessage::Tx(tx) => self.handle_tx(tx.clone(), msg.peer_address()).await,
_ => Ok(vec![]),
}
}
Expand Down Expand Up @@ -816,7 +816,7 @@ mod tests {
output: vec![],
special_transaction_payload: None,
};
manager.handle_tx(tx).await.unwrap();
manager.handle_tx(tx, test_socket_address(1)).await.unwrap();

let has_filter_load = std::iter::from_fn(|| rx.try_recv().ok()).any(|msg| {
matches!(msg, NetworkRequest::SendMessageToPeer(NetworkMessage::FilterLoad(_), _))
Expand Down
Loading
Loading