From 1c2b2f613de431889b91d3d5f6a0a63121b1a826 Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Tue, 9 Dec 2025 19:26:51 +0100 Subject: [PATCH 1/5] refactor: Extract payment creation logic into `create_payment_from_tx` --- src/wallet/mod.rs | 81 ++++++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 36 deletions(-) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 5fd7b3d8e..0540c5a0a 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -192,45 +192,14 @@ impl Wallet { (PaymentStatus::Pending, ConfirmationStatus::Unconfirmed) }, }; - // TODO: It would be great to introduce additional variants for - // `ChannelFunding` and `ChannelClosing`. For the former, we could just - // take a reference to `ChannelManager` here and check against - // `list_channels`. But for the latter the best approach is much less - // clear: for force-closes/HTLC spends we should be good querying - // `OutputSweeper::tracked_spendable_outputs`, but regular channel closes - // (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly - // spent to a wallet address. The only solution I can come up with is to - // create and persist a list of 'static pending outputs' that we could use - // here to determine the `PaymentKind`, but that's not really satisfactory, so - // we're punting on it until we can come up with a better solution. - let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status }; - let fee = locked_wallet.calculate_fee(&wtx.tx_node.tx).unwrap_or(Amount::ZERO); - let (sent, received) = locked_wallet.sent_and_received(&wtx.tx_node.tx); - let (direction, amount_msat) = if sent > received { - let direction = PaymentDirection::Outbound; - let amount_msat = Some( - sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat()) - * 1000, - ); - (direction, amount_msat) - } else { - let direction = PaymentDirection::Inbound; - let amount_msat = Some( - received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat())) - * 1000, - ); - (direction, amount_msat) - }; - let fee_paid_msat = Some(fee.to_sat() * 1000); - - let payment = PaymentDetails::new( + let payment = self.create_payment_from_tx( + locked_wallet, + txid, id, - kind, - amount_msat, - fee_paid_msat, - direction, + &wtx.tx_node.tx, payment_status, + confirmation_status, ); self.payment_store.insert_or_update(payment)?; @@ -806,6 +775,46 @@ impl Wallet { Ok(tx) } + + fn create_payment_from_tx( + &self, locked_wallet: &PersistedWallet, txid: Txid, + payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, + confirmation_status: ConfirmationStatus, + ) -> PaymentDetails { + // TODO: It would be great to introduce additional variants for + // `ChannelFunding` and `ChannelClosing`. For the former, we could just + // take a reference to `ChannelManager` here and check against + // `list_channels`. But for the latter the best approach is much less + // clear: for force-closes/HTLC spends we should be good querying + // `OutputSweeper::tracked_spendable_outputs`, but regular channel closes + // (i.e., `SpendableOutputDescriptor::StaticOutput` variants) are directly + // spent to a wallet address. The only solution I can come up with is to + // create and persist a list of 'static pending outputs' that we could use + // here to determine the `PaymentKind`, but that's not really satisfactory, so + // we're punting on it until we can come up with a better solution. + + let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status }; + + let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); + let (sent, received) = locked_wallet.sent_and_received(tx); + let (direction, amount_msat) = if sent > received { + let direction = PaymentDirection::Outbound; + let amount_msat = Some( + sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat()) * 1000, + ); + (direction, amount_msat) + } else { + let direction = PaymentDirection::Inbound; + let amount_msat = Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat())) * 1000, + ); + (direction, amount_msat) + }; + + let fee_paid_msat = Some(fee.to_sat() * 1000); + + PaymentDetails::new(payment_id, kind, amount_msat, fee_paid_msat, direction, payment_status) + } } impl Listen for Wallet { From 5f9141da60d764017697f1596572d478876f2f03 Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Tue, 9 Dec 2025 15:44:43 +0100 Subject: [PATCH 2/5] Bump BDK_Wallet to 2.3.0 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 09ae4b03a..d5416d9f6 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,7 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]} -bdk_wallet = { version = "2.2.0", default-features = false, features = ["std", "keys-bip39"]} +bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]} reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } rustls = { version = "0.23", default-features = false } From 281f6a413affdac4f6737e9130ef31b15f155202 Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Tue, 9 Dec 2025 15:45:21 +0100 Subject: [PATCH 3/5] Use BDK events in `update_payment_store` instead of scanning all transactions Replace the full transaction list scan in `update_payment_store` with handling of BDK's `WalletEvent` stream during sync. This leverages the new events in BDK 2.2, reduces redundant work, and prepares the foundation for reliable RBF/CPFP tracking via `WalletEvent::TxReplaced` --- bindings/ldk_node.udl | 2 +- src/payment/store.rs | 23 ++- src/wallet/mod.rs | 253 ++++++++++++++++++++++++++------ tests/integration_tests_rust.rs | 4 +- 4 files changed, 233 insertions(+), 49 deletions(-) diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index c881dbe09..a323bbc7d 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -450,7 +450,7 @@ interface ClosureReason { [Enum] interface PaymentKind { - Onchain(Txid txid, ConfirmationStatus status); + Onchain(Txid txid, ConfirmationStatus status, sequence conflicting_txids); Bolt11(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret); Bolt11Jit(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret, u64? counterparty_skimmed_fee_msat, LSPFeeLimits lsp_fee_limits); Bolt12Offer(PaymentHash? hash, PaymentPreimage? preimage, PaymentSecret? secret, OfferId offer_id, UntrustedString? payer_note, u64? quantity); diff --git a/src/payment/store.rs b/src/payment/store.rs index 15e94190c..f4ceab4bb 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -291,6 +291,15 @@ impl StorableObject for PaymentDetails { } } + if let Some(conflicting_txids_opt) = &update.conflicting_txids { + match self.kind { + PaymentKind::Onchain { ref mut conflicting_txids, .. } => { + update_if_necessary!(*conflicting_txids, conflicting_txids_opt.to_vec()); + }, + _ => {}, + } + } + if updated { self.latest_update_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -351,6 +360,8 @@ pub enum PaymentKind { txid: Txid, /// The confirmation status of this payment. status: ConfirmationStatus, + /// Transaction IDs that have replaced or conflict with this payment. + conflicting_txids: Vec, }, /// A [BOLT 11] payment. /// @@ -448,6 +459,7 @@ pub enum PaymentKind { impl_writeable_tlv_based_enum!(PaymentKind, (0, Onchain) => { (0, txid, required), + (1, conflicting_txids, optional_vec), (2, status, required), }, (2, Bolt11) => { @@ -540,6 +552,7 @@ pub(crate) struct PaymentDetailsUpdate { pub direction: Option, pub status: Option, pub confirmation_status: Option, + pub conflicting_txids: Option>, } impl PaymentDetailsUpdate { @@ -555,6 +568,7 @@ impl PaymentDetailsUpdate { direction: None, status: None, confirmation_status: None, + conflicting_txids: None, } } } @@ -570,9 +584,11 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { _ => (None, None, None), }; - let confirmation_status = match value.kind { - PaymentKind::Onchain { status, .. } => Some(status), - _ => None, + let (confirmation_status, conflicting_txids) = match &value.kind { + PaymentKind::Onchain { status, conflicting_txids, .. } => { + (Some(*status), conflicting_txids.clone()) + }, + _ => (None, Vec::new()), }; let counterparty_skimmed_fee_msat = match value.kind { @@ -593,6 +609,7 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { direction: Some(value.direction), status: Some(value.status), confirmation_status, + conflicting_txids: Some(conflicting_txids), } } } diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 0540c5a0a..d7ea83545 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -12,6 +12,7 @@ use std::sync::{Arc, Mutex}; use bdk_chain::spk_client::{FullScanRequest, SyncRequest}; use bdk_wallet::descriptor::ExtendedDescriptor; +use bdk_wallet::event::WalletEvent; #[allow(deprecated)] use bdk_wallet::SignOptions; use bdk_wallet::{Balance, KeychainKind, PersistedWallet, Update}; @@ -49,7 +50,7 @@ use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::payment::store::ConfirmationStatus; -use crate::payment::{PaymentDetails, PaymentDirection, PaymentStatus}; +use crate::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; use crate::types::{Broadcaster, PaymentStore}; use crate::Error; @@ -114,15 +115,15 @@ impl Wallet { pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { let mut locked_wallet = self.inner.lock().unwrap(); - match locked_wallet.apply_update(update) { - Ok(()) => { + match locked_wallet.apply_update_events(update) { + Ok(events) => { let mut locked_persister = self.persister.lock().unwrap(); locked_wallet.persist(&mut locked_persister).map_err(|e| { log_error!(self.logger, "Failed to persist wallet: {}", e); Error::PersistenceFailed })?; - self.update_payment_store(&mut *locked_wallet).map_err(|e| { + self.update_payment_store(&mut *locked_wallet, events).map_err(|e| { log_error!(self.logger, "Failed to update payment store: {}", e); Error::PersistenceFailed })?; @@ -167,42 +168,161 @@ impl Wallet { fn update_payment_store<'a>( &self, locked_wallet: &'a mut PersistedWallet, + mut events: Vec, ) -> Result<(), Error> { - for wtx in locked_wallet.transactions() { - let id = PaymentId(wtx.tx_node.txid.to_byte_array()); - let txid = wtx.tx_node.txid; - let (payment_status, confirmation_status) = match wtx.chain_position { - bdk_chain::ChainPosition::Confirmed { anchor, .. } => { - let confirmation_height = anchor.block_id.height; + if events.is_empty() { + return Ok(()); + } + + if events.len() > 1 { + events.sort_by_key(|e| match e { + WalletEvent::TxReplaced { .. } => 0, + WalletEvent::TxUnconfirmed { .. } => 1, + WalletEvent::TxConfirmed { .. } => 2, + WalletEvent::ChainTipChanged { .. } => 3, + WalletEvent::TxDropped { .. } => 4, + _ => 5, + }); + } + + for event in events { + match event { + WalletEvent::TxConfirmed { txid, tx, block_time, .. } => { let cur_height = locked_wallet.latest_checkpoint().height(); + let confirmation_height = block_time.block_id.height; let payment_status = if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 { PaymentStatus::Succeeded } else { PaymentStatus::Pending }; + let confirmation_status = ConfirmationStatus::Confirmed { - block_hash: anchor.block_id.hash, + block_hash: block_time.block_id.hash, height: confirmation_height, - timestamp: anchor.confirmation_time, + timestamp: block_time.confirmation_time, }; - (payment_status, confirmation_status) + + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + payment_status, + confirmation_status, + None, + ); + self.payment_store.insert_or_update(payment)?; + }, + WalletEvent::ChainTipChanged { new_tip, .. } => { + // Get all payments that are Pending with Confirmed status + let pending_payments: Vec = + self.payment_store.list_filter(|p| { + p.status == PaymentStatus::Pending + && matches!( + p.kind, + crate::payment::PaymentKind::Onchain { + status: ConfirmationStatus::Confirmed { .. }, + .. + } + ) + }); + + for mut payment in pending_payments { + if let crate::payment::PaymentKind::Onchain { + status: ConfirmationStatus::Confirmed { height, .. }, + .. + } = payment.kind + { + if new_tip.height >= height + ANTI_REORG_DELAY - 1 { + payment.status = PaymentStatus::Succeeded; + self.payment_store.insert_or_update(payment)?; + } + } + } + }, + WalletEvent::TxUnconfirmed { txid, tx, old_block_time: None } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + None, + ); + self.payment_store.insert_or_update(payment)?; + }, + WalletEvent::TxReplaced { txid, tx, conflicts } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + + if let Some(mut payment) = self.payment_store.get(&payment_id) { + if let PaymentKind::Onchain { + ref mut conflicting_txids, + txid: current_txid, + .. + } = payment.kind + { + let existing_set: std::collections::HashSet<_> = + conflicting_txids.iter().collect(); + + let new_conflicts: Vec<_> = conflicts + .iter() + .map(|(_, conflict_txid)| *conflict_txid) + .filter(|conflict_txid| { + *conflict_txid != current_txid + && !existing_set.contains(conflict_txid) + }) + .collect(); + + conflicting_txids.extend(new_conflicts); + } + self.payment_store.insert_or_update(payment)?; + } else { + let conflicting_txids = + Some(conflicts.iter().map(|(_, txid)| *txid).collect()); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + conflicting_txids, + ); + self.payment_store.insert_or_update(payment)?; + } + }, + WalletEvent::TxDropped { txid, tx } => { + let payment_id = self + .find_payment_by_txid(txid) + .unwrap_or_else(|| PaymentId(txid.to_byte_array())); + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + None, + ); + self.payment_store.insert_or_update(payment)?; }, - bdk_chain::ChainPosition::Unconfirmed { .. } => { - (PaymentStatus::Pending, ConfirmationStatus::Unconfirmed) + _ => { + continue; }, }; - - let payment = self.create_payment_from_tx( - locked_wallet, - txid, - id, - &wtx.tx_node.tx, - payment_status, - confirmation_status, - ); - - self.payment_store.insert_or_update(payment)?; } Ok(()) @@ -779,7 +899,7 @@ impl Wallet { fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, - confirmation_status: ConfirmationStatus, + confirmation_status: ConfirmationStatus, conflicting_txids: Option>, ) -> PaymentDetails { // TODO: It would be great to introduce additional variants for // `ChannelFunding` and `ChannelClosing`. For the former, we could just @@ -793,27 +913,74 @@ impl Wallet { // here to determine the `PaymentKind`, but that's not really satisfactory, so // we're punting on it until we can come up with a better solution. - let kind = crate::payment::PaymentKind::Onchain { txid, status: confirmation_status }; + let existing_payment = self.payment_store.get(&payment_id); + let final_conflicting_txids = if let Some(provided_conflicts) = conflicting_txids { + provided_conflicts + } else if let Some(payment) = &existing_payment { + if let PaymentKind::Onchain { conflicting_txids: existing_conflicts, .. } = + &payment.kind + { + existing_conflicts.clone() + } else { + Vec::new() + } + } else { + Vec::new() + }; + + let kind = crate::payment::PaymentKind::Onchain { + txid, + status: confirmation_status, + conflicting_txids: final_conflicting_txids, + }; let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); let (sent, received) = locked_wallet.sent_and_received(tx); + let fee_sat = fee.to_sat(); + let (direction, amount_msat) = if sent > received { - let direction = PaymentDirection::Outbound; - let amount_msat = Some( - sent.to_sat().saturating_sub(fee.to_sat()).saturating_sub(received.to_sat()) * 1000, - ); - (direction, amount_msat) + ( + PaymentDirection::Outbound, + Some( + (sent.to_sat().saturating_sub(fee_sat).saturating_sub(received.to_sat())) + * 1000, + ), + ) } else { - let direction = PaymentDirection::Inbound; - let amount_msat = Some( - received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee.to_sat())) * 1000, - ); - (direction, amount_msat) + ( + PaymentDirection::Inbound, + Some( + received.to_sat().saturating_sub(sent.to_sat().saturating_sub(fee_sat)) * 1000, + ), + ) }; - let fee_paid_msat = Some(fee.to_sat() * 1000); + PaymentDetails::new( + payment_id, + kind, + amount_msat, + Some(fee_sat * 1000), + direction, + payment_status, + ) + } - PaymentDetails::new(payment_id, kind, amount_msat, fee_paid_msat, direction, payment_status) + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { + let direct_payment_id = PaymentId(target_txid.to_byte_array()); + if self.payment_store.get(&direct_payment_id).is_some() { + return Some(direct_payment_id); + } + + self.payment_store + .list_filter(|p| { + if let PaymentKind::Onchain { txid, conflicting_txids, .. } = &p.kind { + *txid == target_txid || conflicting_txids.contains(&target_txid) + } else { + false + } + }) + .first() + .map(|p| p.id) } } @@ -843,9 +1010,9 @@ impl Listen for Wallet { ); } - match locked_wallet.apply_block(block, height) { - Ok(()) => { - if let Err(e) = self.update_payment_store(&mut *locked_wallet) { + match locked_wallet.apply_block_events(block, height) { + Ok(events) => { + if let Err(e) = self.update_payment_store(&mut *locked_wallet, events) { log_error!(self.logger, "Failed to update payment store: {}", e); return; } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 4e94dd044..09afe1e61 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -455,7 +455,7 @@ async fn onchain_send_receive() { let payment_a = node_a.payment(&payment_id).unwrap(); match payment_a.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, @@ -464,7 +464,7 @@ async fn onchain_send_receive() { let payment_b = node_a.payment(&payment_id).unwrap(); match payment_b.kind { - PaymentKind::Onchain { txid: _txid, status } => { + PaymentKind::Onchain { txid: _txid, status, .. } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, From 4a17b6ff532d8a9b5919902857aba6f1d2a77e1f Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Tue, 9 Dec 2025 17:52:29 +0100 Subject: [PATCH 4/5] Add `contains_key` method to DataStore --- src/data_store.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/data_store.rs b/src/data_store.rs index d295ece51..ff09d9902 100644 --- a/src/data_store.rs +++ b/src/data_store.rs @@ -167,6 +167,10 @@ where })?; Ok(()) } + + pub(crate) fn contains_key(&self, id: &SO::Id) -> bool { + self.objects.lock().unwrap().contains_key(id) + } } #[cfg(test)] From a677335052feb16947f639755f56c0eb0257290c Mon Sep 17 00:00:00 2001 From: Camillarhi Date: Wed, 5 Nov 2025 23:56:52 +0100 Subject: [PATCH 5/5] Add RBF replacement tracking with new persisted lookup table Introduce a new lookup `ReplacedTransactionStore` that maps old/replaced transaction IDs to their current replacement transaction IDs, enabling reliable tracking of replaced transactions throughout the replacement chain. Key changes: - Add persisted storage for RBF replacement relationships - Link transactions in replacement trees using payment IDs - Remove entire replacement chains from persistence when any transaction in the tree is confirmed --- bindings/ldk_node.udl | 2 +- src/builder.rs | 26 ++++- src/io/mod.rs | 4 + src/io/utils.rs | 78 +++++++++++++ src/payment/mod.rs | 2 + src/payment/pending_payment_store.rs | 96 ++++++++++++++++ src/payment/store.rs | 23 +--- src/types.rs | 4 +- src/wallet/mod.rs | 166 +++++++++++++-------------- tests/integration_tests_rust.rs | 4 +- 10 files changed, 293 insertions(+), 112 deletions(-) create mode 100644 src/payment/pending_payment_store.rs diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index a323bbc7d..c881dbe09 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -450,7 +450,7 @@ interface ClosureReason { [Enum] interface PaymentKind { - Onchain(Txid txid, ConfirmationStatus status, sequence conflicting_txids); + Onchain(Txid txid, ConfirmationStatus status); Bolt11(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret); Bolt11Jit(PaymentHash hash, PaymentPreimage? preimage, PaymentSecret? secret, u64? counterparty_skimmed_fee_msat, LSPFeeLimits lsp_fee_limits); Bolt12Offer(PaymentHash? hash, PaymentPreimage? preimage, PaymentSecret? secret, OfferId offer_id, UntrustedString? payer_note, u64? quantity); diff --git a/src/builder.rs b/src/builder.rs index 510d86bdd..9965a0110 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -55,12 +55,14 @@ use crate::gossip::GossipSource; use crate::io::sqlite_store::SqliteStore; use crate::io::utils::{ read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph, - read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer, - write_node_metrics, + read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments, + read_scorer, write_node_metrics, }; use crate::io::vss_store::VssStoreBuilder; use crate::io::{ self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, }; use crate::liquidity::{ LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder, @@ -73,7 +75,8 @@ use crate::runtime::Runtime; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1235,6 +1238,22 @@ fn build_with_store_internal( }, }; + let pending_payment_store = match runtime + .block_on(async { read_pending_payments(&*kv_store, Arc::clone(&logger)).await }) + { + Ok(pending_payments) => Arc::new(PendingPaymentStore::new( + pending_payments, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read pending payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; + let wallet = Arc::new(Wallet::new( bdk_wallet, wallet_persister, @@ -1243,6 +1262,7 @@ fn build_with_store_internal( Arc::clone(&payment_store), Arc::clone(&config), Arc::clone(&logger), + Arc::clone(&pending_payment_store), )); // Initialize the KeysManager diff --git a/src/io/mod.rs b/src/io/mod.rs index 7afd5bd40..e080d39f7 100644 --- a/src/io/mod.rs +++ b/src/io/mod.rs @@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer"; /// /// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices"; + +/// The pending payment information will be persisted under this prefix. +pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments"; +pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; diff --git a/src/io/utils.rs b/src/io/utils.rs index 4ddc03b07..d2f70377b 100644 --- a/src/io/utils.rs +++ b/src/io/utils.rs @@ -46,6 +46,7 @@ use crate::io::{ NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE, }; use crate::logger::{log_error, LdkLogger, Logger}; +use crate::payment::PendingPaymentDetails; use crate::peer_store::PeerStore; use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper}; use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper}; @@ -626,6 +627,83 @@ pub(crate) fn read_bdk_wallet_change_set( Ok(Some(change_set)) } +/// Read previously persisted pending payments information from the store. +pub(crate) async fn read_pending_payments( + kv_store: &DynStore, logger: L, +) -> Result, std::io::Error> +where + L::Target: LdkLogger, +{ + let mut res = Vec::new(); + + let mut stored_keys = KVStore::list( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + ) + .await?; + + const BATCH_SIZE: usize = 50; + + let mut set = tokio::task::JoinSet::new(); + + // Fill JoinSet with tasks if possible + while set.len() < BATCH_SIZE && !stored_keys.is_empty() { + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + } + + while let Some(read_res) = set.join_next().await { + // Exit early if we get an IO error. + let reader = read_res + .map_err(|e| { + log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); + set.abort_all(); + e + })? + .map_err(|e| { + log_error!(logger, "Failed to read PendingPaymentDetails: {}", e); + set.abort_all(); + e + })?; + + // Refill set for every finished future, if we still have something to do. + if let Some(next_key) = stored_keys.pop() { + let fut = KVStore::read( + &*kv_store, + PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, + PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE, + &next_key, + ); + set.spawn(fut); + debug_assert!(set.len() <= BATCH_SIZE); + } + + // Handle result. + let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| { + log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e); + std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Failed to deserialize PendingPaymentDetails", + ) + })?; + res.push(pending_payment); + } + + debug_assert!(set.is_empty()); + debug_assert!(stored_keys.is_empty()); + + Ok(res) +} + #[cfg(test)] mod tests { use super::read_or_generate_seed_file; diff --git a/src/payment/mod.rs b/src/payment/mod.rs index c82f35c8f..42b5aff3b 100644 --- a/src/payment/mod.rs +++ b/src/payment/mod.rs @@ -11,6 +11,7 @@ pub(crate) mod asynchronous; mod bolt11; mod bolt12; mod onchain; +pub(crate) mod pending_payment_store; mod spontaneous; pub(crate) mod store; mod unified; @@ -18,6 +19,7 @@ mod unified; pub use bolt11::Bolt11Payment; pub use bolt12::Bolt12Payment; pub use onchain::OnchainPayment; +pub use pending_payment_store::PendingPaymentDetails; pub use spontaneous::SpontaneousPayment; pub use store::{ ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, diff --git a/src/payment/pending_payment_store.rs b/src/payment/pending_payment_store.rs new file mode 100644 index 000000000..9daf5cb81 --- /dev/null +++ b/src/payment/pending_payment_store.rs @@ -0,0 +1,96 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use bitcoin::Txid; +use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId}; + +use crate::{ + data_store::{StorableObject, StorableObjectUpdate}, + payment::{store::PaymentDetailsUpdate, PaymentDetails}, +}; + +/// Represents a pending payment +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct PendingPaymentDetails { + /// The full payment details + pub details: PaymentDetails, + /// Cached timestamp for efficient cleanup queries + pub created_at: u64, + /// Transaction IDs that have replaced or conflict with this payment. + pub conflicting_txids: Vec, +} + +impl PendingPaymentDetails { + pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec) -> Self { + Self { created_at: details.latest_update_timestamp, details, conflicting_txids } + } + + /// Convert to finalized payment for the main payment store + pub fn into_payment_details(self) -> PaymentDetails { + self.details + } +} + +impl_writeable_tlv_based!(PendingPaymentDetails, { + (0, details, required), + (2, created_at, required), + (4, conflicting_txids, optional_vec), +}); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct PendingPaymentDetailsUpdate { + pub id: PaymentId, + pub payment_update: Option, + pub conflicting_txids: Option>, +} + +impl StorableObject for PendingPaymentDetails { + type Id = PaymentId; + type Update = PendingPaymentDetailsUpdate; + + fn id(&self) -> Self::Id { + self.details.id + } + + fn update(&mut self, update: &Self::Update) -> bool { + let mut updated = false; + + // Update the underlying payment details if present + if let Some(payment_update) = &update.payment_update { + updated |= self.details.update(payment_update); + } + + if let Some(new_conflicting_txids) = &update.conflicting_txids { + if &self.conflicting_txids != new_conflicting_txids { + self.conflicting_txids = new_conflicting_txids.clone(); + updated = true; + } + } + + updated + } + + fn to_update(&self) -> Self::Update { + self.into() + } +} + +impl StorableObjectUpdate for PendingPaymentDetailsUpdate { + fn id(&self) -> ::Id { + self.id + } +} + +impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate { + fn from(value: &PendingPaymentDetails) -> Self { + Self { + id: value.id(), + payment_update: Some(value.details.to_update()), + conflicting_txids: Some(value.conflicting_txids.clone()), + } + } +} diff --git a/src/payment/store.rs b/src/payment/store.rs index f4ceab4bb..15e94190c 100644 --- a/src/payment/store.rs +++ b/src/payment/store.rs @@ -291,15 +291,6 @@ impl StorableObject for PaymentDetails { } } - if let Some(conflicting_txids_opt) = &update.conflicting_txids { - match self.kind { - PaymentKind::Onchain { ref mut conflicting_txids, .. } => { - update_if_necessary!(*conflicting_txids, conflicting_txids_opt.to_vec()); - }, - _ => {}, - } - } - if updated { self.latest_update_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -360,8 +351,6 @@ pub enum PaymentKind { txid: Txid, /// The confirmation status of this payment. status: ConfirmationStatus, - /// Transaction IDs that have replaced or conflict with this payment. - conflicting_txids: Vec, }, /// A [BOLT 11] payment. /// @@ -459,7 +448,6 @@ pub enum PaymentKind { impl_writeable_tlv_based_enum!(PaymentKind, (0, Onchain) => { (0, txid, required), - (1, conflicting_txids, optional_vec), (2, status, required), }, (2, Bolt11) => { @@ -552,7 +540,6 @@ pub(crate) struct PaymentDetailsUpdate { pub direction: Option, pub status: Option, pub confirmation_status: Option, - pub conflicting_txids: Option>, } impl PaymentDetailsUpdate { @@ -568,7 +555,6 @@ impl PaymentDetailsUpdate { direction: None, status: None, confirmation_status: None, - conflicting_txids: None, } } } @@ -584,11 +570,9 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { _ => (None, None, None), }; - let (confirmation_status, conflicting_txids) = match &value.kind { - PaymentKind::Onchain { status, conflicting_txids, .. } => { - (Some(*status), conflicting_txids.clone()) - }, - _ => (None, Vec::new()), + let confirmation_status = match value.kind { + PaymentKind::Onchain { status, .. } => Some(status), + _ => None, }; let counterparty_skimmed_fee_msat = match value.kind { @@ -609,7 +593,6 @@ impl From<&PaymentDetails> for PaymentDetailsUpdate { direction: Some(value.direction), status: Some(value.status), confirmation_status, - conflicting_txids: Some(conflicting_txids), } } } diff --git a/src/types.rs b/src/types.rs index 2b7d3829a..14e0cc93e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,7 +38,7 @@ use crate::fee_estimator::OnchainFeeEstimator; use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; -use crate::payment::PaymentDetails; +use crate::payment::{PaymentDetails, PendingPaymentDetails}; /// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the /// same time. @@ -609,3 +609,5 @@ impl From<&(u64, Vec)> for CustomTlvRecord { CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() } } } + +pub(crate) type PendingPaymentStore = DataStore>; diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index d7ea83545..05c743bd9 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -50,8 +50,10 @@ use crate::config::Config; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator, OnchainFeeEstimator}; use crate::logger::{log_debug, log_error, log_info, log_trace, LdkLogger, Logger}; use crate::payment::store::ConfirmationStatus; -use crate::payment::{PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus}; -use crate::types::{Broadcaster, PaymentStore}; +use crate::payment::{ + PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus, PendingPaymentDetails, +}; +use crate::types::{Broadcaster, PaymentStore, PendingPaymentStore}; use crate::Error; pub(crate) enum OnchainSendAmount { @@ -72,6 +74,7 @@ pub(crate) struct Wallet { payment_store: Arc, config: Arc, logger: Arc, + pending_payment_store: Arc, } impl Wallet { @@ -79,11 +82,20 @@ impl Wallet { wallet: bdk_wallet::PersistedWallet, wallet_persister: KVStoreWalletPersister, broadcaster: Arc, fee_estimator: Arc, payment_store: Arc, - config: Arc, logger: Arc, + config: Arc, logger: Arc, pending_payment_store: Arc, ) -> Self { let inner = Mutex::new(wallet); let persister = Mutex::new(wallet_persister); - Self { inner, persister, broadcaster, fee_estimator, payment_store, config, logger } + Self { + inner, + persister, + broadcaster, + fee_estimator, + payment_store, + config, + logger, + pending_payment_store, + } } pub(crate) fn get_full_scan_request(&self) -> FullScanRequest { @@ -174,6 +186,15 @@ impl Wallet { return Ok(()); } + // Sort events to ensure proper sequencing for data consistency: + // 1. TXReplaced (0) before TxUnconfirmed (1) - Critical for RBF handling + // When a transaction is replaced via RBF, both events fire. Processing + // TXReplaced first stores the replaced transaction, allowing TxUnconfirmed + // to detect and skip duplicate payment record creation. + // 2. TxConfirmed (2) before ChainTipChanged (3) - Ensures height accuracy + // ChainTipChanged updates block height. Processing TxConfirmed first ensures + // it references the correct height for confirmation depth calculations. + // 3. Other events follow in deterministic order for predictable processing if events.len() > 1 { events.sort_by_key(|e| match e { WalletEvent::TxReplaced { .. } => 0, @@ -214,18 +235,22 @@ impl Wallet { &tx, payment_status, confirmation_status, - None, ); + + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); + self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; }, WalletEvent::ChainTipChanged { new_tip, .. } => { // Get all payments that are Pending with Confirmed status - let pending_payments: Vec = - self.payment_store.list_filter(|p| { - p.status == PaymentStatus::Pending + let pending_payments: Vec = + self.pending_payment_store.list_filter(|p| { + p.details.status == PaymentStatus::Pending && matches!( - p.kind, - crate::payment::PaymentKind::Onchain { + p.details.kind, + PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { .. }, .. } @@ -233,14 +258,16 @@ impl Wallet { }); for mut payment in pending_payments { - if let crate::payment::PaymentKind::Onchain { + if let PaymentKind::Onchain { status: ConfirmationStatus::Confirmed { height, .. }, .. - } = payment.kind + } = payment.details.kind { + let payment_id = payment.details.id; if new_tip.height >= height + ANTI_REORG_DELAY - 1 { - payment.status = PaymentStatus::Succeeded; - self.payment_store.insert_or_update(payment)?; + payment.details.status = PaymentStatus::Succeeded; + self.payment_store.insert_or_update(payment.details)?; + self.pending_payment_store.remove(&payment_id)?; } } } @@ -257,52 +284,33 @@ impl Wallet { &tx, PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, - None, ); + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; }, - WalletEvent::TxReplaced { txid, tx, conflicts } => { + WalletEvent::TxReplaced { txid, conflicts, tx, .. } => { let payment_id = self .find_payment_by_txid(txid) .unwrap_or_else(|| PaymentId(txid.to_byte_array())); - if let Some(mut payment) = self.payment_store.get(&payment_id) { - if let PaymentKind::Onchain { - ref mut conflicting_txids, - txid: current_txid, - .. - } = payment.kind - { - let existing_set: std::collections::HashSet<_> = - conflicting_txids.iter().collect(); - - let new_conflicts: Vec<_> = conflicts - .iter() - .map(|(_, conflict_txid)| *conflict_txid) - .filter(|conflict_txid| { - *conflict_txid != current_txid - && !existing_set.contains(conflict_txid) - }) - .collect(); - - conflicting_txids.extend(new_conflicts); - } - self.payment_store.insert_or_update(payment)?; - } else { - let conflicting_txids = - Some(conflicts.iter().map(|(_, txid)| *txid).collect()); - - let payment = self.create_payment_from_tx( - locked_wallet, - txid, - payment_id, - &tx, - PaymentStatus::Pending, - ConfirmationStatus::Unconfirmed, - conflicting_txids, - ); - self.payment_store.insert_or_update(payment)?; - } + // Collect all conflict txids + let conflict_txids: Vec = + conflicts.iter().map(|(_, conflict_txid)| *conflict_txid).collect(); + + let payment = self.create_payment_from_tx( + locked_wallet, + txid, + payment_id, + &tx, + PaymentStatus::Pending, + ConfirmationStatus::Unconfirmed, + ); + let pending_payment_details = self + .create_pending_payment_from_tx(payment.clone(), conflict_txids.clone()); + + self.pending_payment_store.insert_or_update(pending_payment_details)?; }, WalletEvent::TxDropped { txid, tx } => { let payment_id = self @@ -315,9 +323,11 @@ impl Wallet { &tx, PaymentStatus::Pending, ConfirmationStatus::Unconfirmed, - None, ); + let pending_payment = + self.create_pending_payment_from_tx(payment.clone(), Vec::new()); self.payment_store.insert_or_update(payment)?; + self.pending_payment_store.insert_or_update(pending_payment)?; }, _ => { continue; @@ -899,7 +909,7 @@ impl Wallet { fn create_payment_from_tx( &self, locked_wallet: &PersistedWallet, txid: Txid, payment_id: PaymentId, tx: &Transaction, payment_status: PaymentStatus, - confirmation_status: ConfirmationStatus, conflicting_txids: Option>, + confirmation_status: ConfirmationStatus, ) -> PaymentDetails { // TODO: It would be great to introduce additional variants for // `ChannelFunding` and `ChannelClosing`. For the former, we could just @@ -913,26 +923,7 @@ impl Wallet { // here to determine the `PaymentKind`, but that's not really satisfactory, so // we're punting on it until we can come up with a better solution. - let existing_payment = self.payment_store.get(&payment_id); - let final_conflicting_txids = if let Some(provided_conflicts) = conflicting_txids { - provided_conflicts - } else if let Some(payment) = &existing_payment { - if let PaymentKind::Onchain { conflicting_txids: existing_conflicts, .. } = - &payment.kind - { - existing_conflicts.clone() - } else { - Vec::new() - } - } else { - Vec::new() - }; - - let kind = crate::payment::PaymentKind::Onchain { - txid, - status: confirmation_status, - conflicting_txids: final_conflicting_txids, - }; + let kind = PaymentKind::Onchain { txid, status: confirmation_status }; let fee = locked_wallet.calculate_fee(tx).unwrap_or(Amount::ZERO); let (sent, received) = locked_wallet.sent_and_received(tx); @@ -965,22 +956,27 @@ impl Wallet { ) } + fn create_pending_payment_from_tx( + &self, payment: PaymentDetails, conflicting_txids: Vec, + ) -> PendingPaymentDetails { + PendingPaymentDetails::new(payment, conflicting_txids) + } + fn find_payment_by_txid(&self, target_txid: Txid) -> Option { let direct_payment_id = PaymentId(target_txid.to_byte_array()); - if self.payment_store.get(&direct_payment_id).is_some() { + if self.pending_payment_store.contains_key(&direct_payment_id) { return Some(direct_payment_id); } - self.payment_store - .list_filter(|p| { - if let PaymentKind::Onchain { txid, conflicting_txids, .. } = &p.kind { - *txid == target_txid || conflicting_txids.contains(&target_txid) - } else { - false - } - }) + if let Some(replaced_details) = self + .pending_payment_store + .list_filter(|p| p.conflicting_txids.contains(&target_txid)) .first() - .map(|p| p.id) + { + return Some(replaced_details.details.id); + } + + None } } diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 09afe1e61..4e94dd044 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -455,7 +455,7 @@ async fn onchain_send_receive() { let payment_a = node_a.payment(&payment_id).unwrap(); match payment_a.kind { - PaymentKind::Onchain { txid: _txid, status, .. } => { + PaymentKind::Onchain { txid: _txid, status } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); }, @@ -464,7 +464,7 @@ async fn onchain_send_receive() { let payment_b = node_a.payment(&payment_id).unwrap(); match payment_b.kind { - PaymentKind::Onchain { txid: _txid, status, .. } => { + PaymentKind::Onchain { txid: _txid, status } => { assert_eq!(_txid, txid); assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); },