Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning",
bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] }
bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]}
bdk_electrum = { version = "0.23.0", default-features = false, features = ["use-rustls-ring"]}
bdk_wallet = { version = "2.2.0", default-features = false, features = ["std", "keys-bip39"]}
bdk_wallet = { version = "2.3.0", default-features = false, features = ["std", "keys-bip39"]}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please make the BDK bump a dedicated commit.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this has been updated


reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] }
rustls = { version = "0.23", default-features = false }
Expand Down
26 changes: 23 additions & 3 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ use crate::gossip::GossipSource;
use crate::io::sqlite_store::SqliteStore;
use crate::io::utils::{
read_event_queue, read_external_pathfinding_scores_from_cache, read_network_graph,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_scorer,
write_node_metrics,
read_node_metrics, read_output_sweeper, read_payments, read_peer_info, read_pending_payments,
read_scorer, write_node_metrics,
};
use crate::io::vss_store::VssStoreBuilder;
use crate::io::{
self, PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE, PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
};
use crate::liquidity::{
LSPS1ClientConfig, LSPS2ClientConfig, LSPS2ServiceConfig, LiquiditySourceBuilder,
Expand All @@ -73,7 +75,8 @@ use crate::runtime::Runtime;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager,
MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore,
MessageRouter, OnionMessenger, PaymentStore, PeerManager, PendingPaymentStore, Persister,
SyncAndAsyncKVStore,
};
use crate::wallet::persist::KVStoreWalletPersister;
use crate::wallet::Wallet;
Expand Down Expand Up @@ -1235,6 +1238,22 @@ fn build_with_store_internal(
},
};

let pending_payment_store = match runtime
.block_on(async { read_pending_payments(&*kv_store, Arc::clone(&logger)).await })
{
Ok(pending_payments) => Arc::new(PendingPaymentStore::new(
pending_payments,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(),
Arc::clone(&kv_store),
Arc::clone(&logger),
)),
Err(e) => {
log_error!(logger, "Failed to read pending payment data from store: {}", e);
return Err(BuildError::ReadFailed);
},
};

let wallet = Arc::new(Wallet::new(
bdk_wallet,
wallet_persister,
Expand All @@ -1243,6 +1262,7 @@ fn build_with_store_internal(
Arc::clone(&payment_store),
Arc::clone(&config),
Arc::clone(&logger),
Arc::clone(&pending_payment_store),
));

// Initialize the KeysManager
Expand Down
4 changes: 4 additions & 0 deletions src/data_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ where
})?;
Ok(())
}

pub(crate) fn contains_key(&self, id: &SO::Id) -> bool {
self.objects.lock().unwrap().contains_key(id)
}
}

#[cfg(test)]
Expand Down
4 changes: 4 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,7 @@ pub(crate) const BDK_WALLET_INDEXER_KEY: &str = "indexer";
///
/// [`StaticInvoice`]: lightning::offers::static_invoice::StaticInvoice
pub(crate) const STATIC_INVOICE_STORE_PRIMARY_NAMESPACE: &str = "static_invoices";

/// The pending payment information will be persisted under this prefix.
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE: &str = "pending_payments";
pub(crate) const PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE: &str = "";
78 changes: 78 additions & 0 deletions src/io/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::io::{
NODE_METRICS_KEY, NODE_METRICS_PRIMARY_NAMESPACE, NODE_METRICS_SECONDARY_NAMESPACE,
};
use crate::logger::{log_error, LdkLogger, Logger};
use crate::payment::PendingPaymentDetails;
use crate::peer_store::PeerStore;
use crate::types::{Broadcaster, DynStore, KeysManager, Sweeper};
use crate::wallet::ser::{ChangeSetDeserWrapper, ChangeSetSerWrapper};
Expand Down Expand Up @@ -626,6 +627,83 @@ pub(crate) fn read_bdk_wallet_change_set(
Ok(Some(change_set))
}

/// Read previously persisted pending payments information from the store.
pub(crate) async fn read_pending_payments<L: Deref>(
kv_store: &DynStore, logger: L,
) -> Result<Vec<PendingPaymentDetails>, std::io::Error>
where
L::Target: LdkLogger,
{
let mut res = Vec::new();

let mut stored_keys = KVStore::list(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
)
.await?;

const BATCH_SIZE: usize = 50;

let mut set = tokio::task::JoinSet::new();

// Fill JoinSet with tasks if possible
while set.len() < BATCH_SIZE && !stored_keys.is_empty() {
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}
}

while let Some(read_res) = set.join_next().await {
// Exit early if we get an IO error.
let reader = read_res
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?
.map_err(|e| {
log_error!(logger, "Failed to read PendingPaymentDetails: {}", e);
set.abort_all();
e
})?;

// Refill set for every finished future, if we still have something to do.
if let Some(next_key) = stored_keys.pop() {
let fut = KVStore::read(
&*kv_store,
PENDING_PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE,
PENDING_PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE,
&next_key,
);
set.spawn(fut);
debug_assert!(set.len() <= BATCH_SIZE);
}

// Handle result.
let pending_payment = PendingPaymentDetails::read(&mut &*reader).map_err(|e| {
log_error!(logger, "Failed to deserialize PendingPaymentDetails: {}", e);
std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Failed to deserialize PendingPaymentDetails",
)
})?;
res.push(pending_payment);
}

debug_assert!(set.is_empty());
debug_assert!(stored_keys.is_empty());

Ok(res)
}

#[cfg(test)]
mod tests {
use super::read_or_generate_seed_file;
Expand Down
2 changes: 2 additions & 0 deletions src/payment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ pub(crate) mod asynchronous;
mod bolt11;
mod bolt12;
mod onchain;
pub(crate) mod pending_payment_store;
mod spontaneous;
pub(crate) mod store;
mod unified;

pub use bolt11::Bolt11Payment;
pub use bolt12::Bolt12Payment;
pub use onchain::OnchainPayment;
pub use pending_payment_store::PendingPaymentDetails;
pub use spontaneous::SpontaneousPayment;
pub use store::{
ConfirmationStatus, LSPFeeLimits, PaymentDetails, PaymentDirection, PaymentKind, PaymentStatus,
Expand Down
96 changes: 96 additions & 0 deletions src/payment/pending_payment_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

use bitcoin::Txid;
use lightning::{impl_writeable_tlv_based, ln::channelmanager::PaymentId};

use crate::{
data_store::{StorableObject, StorableObjectUpdate},
payment::{store::PaymentDetailsUpdate, PaymentDetails},
};

/// Represents a pending payment
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingPaymentDetails {
/// The full payment details
pub details: PaymentDetails,
/// Cached timestamp for efficient cleanup queries
pub created_at: u64,
/// Transaction IDs that have replaced or conflict with this payment.
pub conflicting_txids: Vec<Txid>,
}

impl PendingPaymentDetails {
pub(crate) fn new(details: PaymentDetails, conflicting_txids: Vec<Txid>) -> Self {
Self { created_at: details.latest_update_timestamp, details, conflicting_txids }
}

/// Convert to finalized payment for the main payment store
pub fn into_payment_details(self) -> PaymentDetails {
self.details
}
}

impl_writeable_tlv_based!(PendingPaymentDetails, {
(0, details, required),
(2, created_at, required),
(4, conflicting_txids, optional_vec),
});

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct PendingPaymentDetailsUpdate {
pub id: PaymentId,
pub payment_update: Option<PaymentDetailsUpdate>,
pub conflicting_txids: Option<Vec<Txid>>,
}

impl StorableObject for PendingPaymentDetails {
type Id = PaymentId;
type Update = PendingPaymentDetailsUpdate;

fn id(&self) -> Self::Id {
self.details.id
}

fn update(&mut self, update: &Self::Update) -> bool {
let mut updated = false;

// Update the underlying payment details if present
if let Some(payment_update) = &update.payment_update {
updated |= self.details.update(payment_update);
}

if let Some(new_conflicting_txids) = &update.conflicting_txids {
if &self.conflicting_txids != new_conflicting_txids {
self.conflicting_txids = new_conflicting_txids.clone();
updated = true;
}
}

updated
}

fn to_update(&self) -> Self::Update {
self.into()
}
}

impl StorableObjectUpdate<PendingPaymentDetails> for PendingPaymentDetailsUpdate {
fn id(&self) -> <PendingPaymentDetails as StorableObject>::Id {
self.id
}
}

impl From<&PendingPaymentDetails> for PendingPaymentDetailsUpdate {
fn from(value: &PendingPaymentDetails) -> Self {
Self {
id: value.id(),
payment_update: Some(value.details.to_update()),
conflicting_txids: Some(value.conflicting_txids.clone()),
}
}
}
4 changes: 3 additions & 1 deletion src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::fee_estimator::OnchainFeeEstimator;
use crate::gossip::RuntimeSpawner;
use crate::logger::Logger;
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::PaymentDetails;
use crate::payment::{PaymentDetails, PendingPaymentDetails};

/// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the
/// same time.
Expand Down Expand Up @@ -609,3 +609,5 @@ impl From<&(u64, Vec<u8>)> for CustomTlvRecord {
CustomTlvRecord { type_num: tlv.0, value: tlv.1.clone() }
}
}

pub(crate) type PendingPaymentStore = DataStore<PendingPaymentDetails, Arc<Logger>>;
Loading
Loading