diff --git a/orange-sdk/src/lightning_wallet.rs b/orange-sdk/src/lightning_wallet.rs index 21939c7..3e2b2ac 100644 --- a/orange-sdk/src/lightning_wallet.rs +++ b/orange-sdk/src/lightning_wallet.rs @@ -27,7 +27,7 @@ use ldk_node::{NodeError, UserChannelId}; use graduated_rebalancer::{LightningBalance, ReceivedLightningPayment}; -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt::Debug; use std::pin::Pin; use std::sync::{Arc, Mutex}; @@ -51,22 +51,6 @@ pub(crate) struct LightningWalletImpl { lsp_socket_addr: SocketAddress, } -/// One pending `SplicePending` event per `user_channel_id`, consumed by -/// `await_splice_pending`. A queue rather than a `watch` so each consumer takes its own event: -/// `watch` would let a second splice on the same channel observe the previous splice's stale -/// outpoint instead of waiting for the new `SplicePending`. -pub(crate) struct SplicePendingInbox { - pub(crate) pending: Mutex>, - pub(crate) notify: Notify, -} - -impl SplicePendingInbox { - pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) { - self.pending.lock().unwrap().insert(channel_id, funding_txo); - self.notify.notify_waiters(); - } -} - pub(crate) struct LightningWallet { pub(crate) inner: Arc, } @@ -242,18 +226,7 @@ impl LightningWallet { } pub(crate) async fn await_splice_pending(&self, channel_id: u128) -> OutPoint { - let inbox = &self.inner.splice_pending_inbox; - loop { - // Register interest BEFORE checking the queue so a `notify_waiters` racing with the - // check still wakes us up. - let notified = inbox.notify.notified(); - tokio::pin!(notified); - notified.as_mut().enable(); - if let Some(txo) = inbox.pending.lock().unwrap().remove(&channel_id) { - return txo; - } - notified.await; - } + self.inner.splice_pending_inbox.wait_for(channel_id).await } pub(crate) fn get_on_chain_address(&self) -> Result { @@ -607,3 +580,74 @@ impl From<&PaymentDetails> for PaymentType { } } } + +/// Pending `SplicePending` events keyed by `user_channel_id`, consumed by +/// `await_splice_pending`. A per-channel queue rather than a `watch` so each consumer takes its own +/// event: `watch` would let a second splice on the same channel observe the previous splice's stale +/// outpoint instead of waiting for the new `SplicePending`. +pub(crate) struct SplicePendingInbox { + pub(crate) pending: Mutex>>, + pub(crate) notify: Notify, +} + +impl SplicePendingInbox { + pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) { + self.pending.lock().unwrap().entry(channel_id).or_default().push_back(funding_txo); + self.notify.notify_waiters(); + } + + pub(crate) async fn wait_for(&self, channel_id: u128) -> OutPoint { + loop { + // Register interest BEFORE checking the queue so a `notify_waiters` racing with the + // check still wakes us up. + let notified = self.notify.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + if let Some(txo) = { + let mut pending = self.pending.lock().unwrap(); + if let Some(queue) = pending.get_mut(&channel_id) { + let txo = queue.pop_front(); + if queue.is_empty() { + pending.remove(&channel_id); + } + txo + } else { + None + } + } { + return txo; + } + notified.await; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ldk_node::bitcoin::Txid; + + fn dummy_outpoint(seed: u8) -> OutPoint { + let bytes = [seed; 32]; + OutPoint { txid: Txid::from_byte_array(bytes), vout: seed as u32 } + } + + #[test] + fn splice_pending_inbox_preserves_distinct_events_for_same_channel() { + let inbox = + SplicePendingInbox { pending: Mutex::new(HashMap::new()), notify: Notify::new() }; + let channel_id = 7; + let first = dummy_outpoint(0xaa); + let second = dummy_outpoint(0xbb); + + inbox.deliver(channel_id, first); + inbox.deliver(channel_id, second); + + let mut pending = inbox.pending.lock().unwrap(); + let queue = pending.get_mut(&channel_id).expect("channel should have pending events"); + + assert_eq!(queue.pop_front(), Some(first)); + assert_eq!(queue.pop_front(), Some(second)); + assert!(queue.is_empty()); + } +} diff --git a/orange-sdk/tests/integration_tests.rs b/orange-sdk/tests/integration_tests.rs index a2dac0e..c9ae5a2 100644 --- a/orange-sdk/tests/integration_tests.rs +++ b/orange-sdk/tests/integration_tests.rs @@ -599,6 +599,122 @@ async fn test_receive_to_onchain_with_channel() { .await; } +#[tokio::test(flavor = "multi_thread")] +#[test_log::test] +async fn test_concurrent_splice_in_and_out_preserve_pending_events() { + test_utils::run_test(|params| async move { + let wallet = Arc::clone(¶ms.wallet); + let lsp = Arc::clone(¶ms.lsp); + let bitcoind = Arc::clone(¶ms.bitcoind); + let third_party = Arc::clone(¶ms.third_party); + let electrsd = Arc::clone(¶ms.electrsd); + + open_channel_from_lsp(&wallet, Arc::clone(&third_party)).await; + + generate_blocks(&bitcoind, &electrsd, 6).await; + test_utils::wait_for_condition("wallet sync after channel open", || async { + wallet.channels().iter().any(|a| a.confirmations.is_some_and(|c| c > 0) && a.is_usable) + }) + .await; + + let recv_amt = Amount::from_sats(300_000).unwrap(); + let uri = wallet.get_single_use_receive_uri(Some(recv_amt)).await.unwrap(); + let sent_txid = third_party + .onchain_payment() + .send_to_address(&uri.address.unwrap(), recv_amt.sats().unwrap(), None) + .unwrap(); + + wait_for_tx(&electrsd.client, sent_txid).await; + generate_blocks(&bitcoind, &electrsd, 6).await; + wallet.sync_ln_wallet().unwrap(); + + test_utils::wait_for_condition("pending balance to update", || async { + wallet.get_balance().await.unwrap().pending_balance == recv_amt + }) + .await; + + let event = wait_next_event(&wallet).await; + match event { + Event::OnchainPaymentReceived { txid, amount_sat, status, .. } => { + assert_eq!(txid, sent_txid); + assert_eq!(amount_sat, recv_amt.sats().unwrap()); + assert!(matches!(status, ConfirmationStatus::Confirmed { .. })); + }, + ev => panic!("Expected OnchainPaymentReceived event, got {ev:?}"), + } + + let first_splice = tokio::time::timeout(Duration::from_secs(60), wallet.next_event_async()) + .await + .expect("timed out waiting for splice-in event"); + wallet.event_handled().unwrap(); + let first_splice = match first_splice { + Event::SplicePending { + user_channel_id, counterparty_node_id, new_funding_txo, .. + } => { + assert_eq!(counterparty_node_id, lsp.node_id()); + (user_channel_id, new_funding_txo) + }, + ev => panic!("Expected first SplicePending event, got {ev:?}"), + }; + + generate_blocks(&bitcoind, &electrsd, 6).await; + wallet.sync_ln_wallet().unwrap(); + + let addr = third_party.onchain_payment().new_address().unwrap(); + let send_amount = Amount::from_sats(10_000).unwrap(); + let pay_wallet = Arc::clone(&wallet); + let pay_task = tokio::spawn(async move { + let instr = + pay_wallet.parse_payment_instructions(addr.to_string().as_str()).await.unwrap(); + let info = PaymentInfo::build(instr, Some(send_amount)).unwrap(); + pay_wallet.pay(&info).await + }); + + let second_splice = tokio::time::timeout(Duration::from_secs(60), async { + loop { + let event = wallet.next_event_async().await; + wallet.event_handled().unwrap(); + if let Event::SplicePending { + user_channel_id, + counterparty_node_id, + new_funding_txo, + .. + } = event + { + assert_eq!(counterparty_node_id, lsp.node_id()); + break (user_channel_id, new_funding_txo); + } + } + }) + .await + .expect("timed out waiting for splice-out event"); + + assert_eq!( + first_splice.0, second_splice.0, + "both splices should target the same LSP channel" + ); + assert_ne!( + first_splice.1, second_splice.1, + "splices should have distinct funding outpoints" + ); + + tokio::time::timeout(Duration::from_secs(60), pay_task) + .await + .expect("splice-out payment hung waiting for its SplicePending") + .expect("payment task panicked") + .expect("splice-out payment failed"); + + test_utils::wait_for_condition("splice-in rebalance metadata", || async { + wallet.list_transactions().await.unwrap().iter().any(|tx| { + tx.payment_type == PaymentType::IncomingOnChain { txid: Some(sent_txid) } + && tx.fee.is_some_and(|fee| fee > Amount::ZERO) + }) + }) + .await; + }) + .await; +} + async fn run_test_pay_lightning_from_self_custody(amountless: bool) { test_utils::run_test(move |params| async move { let wallet = Arc::clone(¶ms.wallet);