Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 73 additions & 29 deletions orange-sdk/src/lightning_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use ldk_node::{NodeError, UserChannelId};

use graduated_rebalancer::{LightningBalance, ReceivedLightningPayment};

use std::collections::HashMap;
use std::collections::{HashMap, VecDeque};
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
Expand All @@ -51,22 +51,6 @@ pub(crate) struct LightningWalletImpl {
lsp_socket_addr: SocketAddress,
}

/// One pending `SplicePending` event per `user_channel_id`, consumed by
/// `await_splice_pending`. A queue rather than a `watch` so each consumer takes its own event:
/// `watch` would let a second splice on the same channel observe the previous splice's stale
/// outpoint instead of waiting for the new `SplicePending`.
pub(crate) struct SplicePendingInbox {
pub(crate) pending: Mutex<HashMap<u128, OutPoint>>,
pub(crate) notify: Notify,
}

impl SplicePendingInbox {
pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) {
self.pending.lock().unwrap().insert(channel_id, funding_txo);
self.notify.notify_waiters();
}
}

pub(crate) struct LightningWallet {
pub(crate) inner: Arc<LightningWalletImpl>,
}
Expand Down Expand Up @@ -242,18 +226,7 @@ impl LightningWallet {
}

pub(crate) async fn await_splice_pending(&self, channel_id: u128) -> OutPoint {
let inbox = &self.inner.splice_pending_inbox;
loop {
// Register interest BEFORE checking the queue so a `notify_waiters` racing with the
// check still wakes us up.
let notified = inbox.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(txo) = inbox.pending.lock().unwrap().remove(&channel_id) {
return txo;
}
notified.await;
}
self.inner.splice_pending_inbox.wait_for(channel_id).await
}

pub(crate) fn get_on_chain_address(&self) -> Result<Address, NodeError> {
Expand Down Expand Up @@ -607,3 +580,74 @@ impl From<&PaymentDetails> for PaymentType {
}
}
}

/// Pending `SplicePending` events keyed by `user_channel_id`, consumed by
/// `await_splice_pending`. A per-channel queue rather than a `watch` so each consumer takes its own
/// event: `watch` would let a second splice on the same channel observe the previous splice's stale
/// outpoint instead of waiting for the new `SplicePending`.
pub(crate) struct SplicePendingInbox {
pub(crate) pending: Mutex<HashMap<u128, VecDeque<OutPoint>>>,
pub(crate) notify: Notify,
}

impl SplicePendingInbox {
pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) {
self.pending.lock().unwrap().entry(channel_id).or_default().push_back(funding_txo);
self.notify.notify_waiters();
}

pub(crate) async fn wait_for(&self, channel_id: u128) -> OutPoint {
loop {
// Register interest BEFORE checking the queue so a `notify_waiters` racing with the
// check still wakes us up.
let notified = self.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(txo) = {
let mut pending = self.pending.lock().unwrap();
if let Some(queue) = pending.get_mut(&channel_id) {
let txo = queue.pop_front();
if queue.is_empty() {
pending.remove(&channel_id);
}
txo
} else {
None
}
} {
return txo;
}
notified.await;
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use ldk_node::bitcoin::Txid;

fn dummy_outpoint(seed: u8) -> OutPoint {
let bytes = [seed; 32];
OutPoint { txid: Txid::from_byte_array(bytes), vout: seed as u32 }
}

#[test]
fn splice_pending_inbox_preserves_distinct_events_for_same_channel() {
let inbox =
SplicePendingInbox { pending: Mutex::new(HashMap::new()), notify: Notify::new() };
let channel_id = 7;
let first = dummy_outpoint(0xaa);
let second = dummy_outpoint(0xbb);

inbox.deliver(channel_id, first);
inbox.deliver(channel_id, second);

let mut pending = inbox.pending.lock().unwrap();
let queue = pending.get_mut(&channel_id).expect("channel should have pending events");

assert_eq!(queue.pop_front(), Some(first));
assert_eq!(queue.pop_front(), Some(second));
assert!(queue.is_empty());
}
}
116 changes: 116 additions & 0 deletions orange-sdk/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,122 @@ async fn test_receive_to_onchain_with_channel() {
.await;
}

#[tokio::test(flavor = "multi_thread")]
#[test_log::test]
async fn test_concurrent_splice_in_and_out_preserve_pending_events() {
test_utils::run_test(|params| async move {
let wallet = Arc::clone(&params.wallet);
let lsp = Arc::clone(&params.lsp);
let bitcoind = Arc::clone(&params.bitcoind);
let third_party = Arc::clone(&params.third_party);
let electrsd = Arc::clone(&params.electrsd);

open_channel_from_lsp(&wallet, Arc::clone(&third_party)).await;

generate_blocks(&bitcoind, &electrsd, 6).await;
test_utils::wait_for_condition("wallet sync after channel open", || async {
wallet.channels().iter().any(|a| a.confirmations.is_some_and(|c| c > 0) && a.is_usable)
})
.await;

let recv_amt = Amount::from_sats(300_000).unwrap();
let uri = wallet.get_single_use_receive_uri(Some(recv_amt)).await.unwrap();
let sent_txid = third_party
.onchain_payment()
.send_to_address(&uri.address.unwrap(), recv_amt.sats().unwrap(), None)
.unwrap();

wait_for_tx(&electrsd.client, sent_txid).await;
generate_blocks(&bitcoind, &electrsd, 6).await;
wallet.sync_ln_wallet().unwrap();

test_utils::wait_for_condition("pending balance to update", || async {
wallet.get_balance().await.unwrap().pending_balance == recv_amt
})
.await;

let event = wait_next_event(&wallet).await;
match event {
Event::OnchainPaymentReceived { txid, amount_sat, status, .. } => {
assert_eq!(txid, sent_txid);
assert_eq!(amount_sat, recv_amt.sats().unwrap());
assert!(matches!(status, ConfirmationStatus::Confirmed { .. }));
},
ev => panic!("Expected OnchainPaymentReceived event, got {ev:?}"),
}

let first_splice = tokio::time::timeout(Duration::from_secs(60), wallet.next_event_async())
.await
.expect("timed out waiting for splice-in event");
wallet.event_handled().unwrap();
let first_splice = match first_splice {
Event::SplicePending {
user_channel_id, counterparty_node_id, new_funding_txo, ..
} => {
assert_eq!(counterparty_node_id, lsp.node_id());
(user_channel_id, new_funding_txo)
},
ev => panic!("Expected first SplicePending event, got {ev:?}"),
};

generate_blocks(&bitcoind, &electrsd, 6).await;
wallet.sync_ln_wallet().unwrap();

let addr = third_party.onchain_payment().new_address().unwrap();
let send_amount = Amount::from_sats(10_000).unwrap();
let pay_wallet = Arc::clone(&wallet);
let pay_task = tokio::spawn(async move {
let instr =
pay_wallet.parse_payment_instructions(addr.to_string().as_str()).await.unwrap();
let info = PaymentInfo::build(instr, Some(send_amount)).unwrap();
pay_wallet.pay(&info).await
});

let second_splice = tokio::time::timeout(Duration::from_secs(60), async {
loop {
let event = wallet.next_event_async().await;
wallet.event_handled().unwrap();
if let Event::SplicePending {
user_channel_id,
counterparty_node_id,
new_funding_txo,
..
} = event
{
assert_eq!(counterparty_node_id, lsp.node_id());
break (user_channel_id, new_funding_txo);
}
}
})
.await
.expect("timed out waiting for splice-out event");

assert_eq!(
first_splice.0, second_splice.0,
"both splices should target the same LSP channel"
);
assert_ne!(
first_splice.1, second_splice.1,
"splices should have distinct funding outpoints"
);

tokio::time::timeout(Duration::from_secs(60), pay_task)
.await
.expect("splice-out payment hung waiting for its SplicePending")
.expect("payment task panicked")
.expect("splice-out payment failed");

test_utils::wait_for_condition("splice-in rebalance metadata", || async {
wallet.list_transactions().await.unwrap().iter().any(|tx| {
tx.payment_type == PaymentType::IncomingOnChain { txid: Some(sent_txid) }
&& tx.fee.is_some_and(|fee| fee > Amount::ZERO)
})
})
.await;
})
.await;
}

async fn run_test_pay_lightning_from_self_custody(amountless: bool) {
test_utils::run_test(move |params| async move {
let wallet = Arc::clone(&params.wallet);
Expand Down