From 817208ddfc1b4b8922397ccb04669b4669c170d5 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Mon, 11 May 2026 12:29:41 -0500 Subject: [PATCH] Reserve metadata slot at SplicePending/ChannelPending ldk-node lists the splice or channel funding tx as an outbound on-chain payment the moment it broadcasts, but our tx_metadata entry isn't inserted until the rebalancer's OnChainRebalanceInitiated handler runs. A concurrent list_transactions call sees an outbound payment with no metadata and trips a debug_assert. Reserve a PendingRebalance placeholder at SplicePending and ChannelPending, which list_transactions already skips. Combine the trigger promotion and splice upsert in OnChainRebalanceInitiated into a single write-lock acquisition so the trigger and splice metadata are never visible to a reader in a half-updated state. Co-Authored-By: Claude Opus 4.7 (1M context) --- orange-sdk/src/event.rs | 39 +++++++- orange-sdk/src/lib.rs | 11 ++- orange-sdk/src/lightning_wallet.rs | 139 +++++++++++++---------------- orange-sdk/src/rebalancer.rs | 13 +-- orange-sdk/src/store.rs | 52 +++++++++++ 5 files changed, 165 insertions(+), 89 deletions(-) diff --git a/orange-sdk/src/event.rs b/orange-sdk/src/event.rs index 4c3a66e..e36127c 100644 --- a/orange-sdk/src/event.rs +++ b/orange-sdk/src/event.rs @@ -2,6 +2,7 @@ use crate::logging::Logger; use crate::store::{self, PaymentId}; use crate::dyn_store::DynStore; +use ldk_node::bitcoin::hashes::Hash; use ldk_node::bitcoin::secp256k1::PublicKey; use ldk_node::bitcoin::{OutPoint, Txid}; use ldk_node::lightning::events::{ClosureReason, PaymentFailureReason}; @@ -17,6 +18,7 @@ use ldk_node::{CustomTlvRecord, UserChannelId}; use std::collections::VecDeque; use std::sync::Arc; use std::task::{Poll, Waker}; +use std::time::SystemTime; use tokio::sync::{Mutex, watch}; /// The event queue will be persisted under this key. @@ -324,7 +326,7 @@ pub(crate) struct LdkEventHandler { pub(crate) tx_metadata: store::TxMetadataStore, pub(crate) payment_receipt_sender: watch::Sender<()>, pub(crate) channel_pending_sender: watch::Sender, - pub(crate) splice_pending_sender: watch::Sender, + pub(crate) splice_pending_inbox: Arc, pub(crate) logger: Arc, } @@ -410,8 +412,12 @@ impl LdkEventHandler { "Unexpected PaymentClaimable event received. This is likely due to a bug in the LDK Node implementation." ); }, - ldk_node::Event::ChannelPending { .. } => { + ldk_node::Event::ChannelPending { funding_txo, .. } => { log_debug!(self.logger, "Received ChannelPending event"); + // The funding tx is already in `ldk_node.list_payments()`; populate our + // metadata before any concurrent `list_transactions` call observes the + // outbound payment. + self.reserve_rebalance_slot_for_funding_tx(funding_txo.txid).await; }, ldk_node::Event::ChannelReady { channel_id, @@ -467,7 +473,11 @@ impl LdkEventHandler { new_funding_txo, } => { log_debug!(self.logger, "Received SplicePending event {event:?}"); - let _ = self.splice_pending_sender.send(user_channel_id.0); + // Reserve the metadata slot before delivering so any task waking on the + // inbox (the rebalancer's `OnChainRebalanceInitiated` for splice-in, + // `pay_lightning` for splice-out) sees an entry to upsert. + self.reserve_rebalance_slot_for_funding_tx(new_funding_txo.txid).await; + self.splice_pending_inbox.deliver(user_channel_id.0, new_funding_txo); if let Err(e) = self .event_queue @@ -492,4 +502,27 @@ impl LdkEventHandler { log_error!(self.logger, "Failed to handle event: {e:?}"); } } + + /// Reserve a `PendingRebalance` metadata slot for a freshly broadcast channel or splice + /// funding tx. The matching outbound on-chain payment is already visible in + /// `ldk_node.list_payments()` by the time we're called, so without this entry + /// `list_transactions` would trip its `debug_assert_ne!`. `PendingRebalance` is used as the + /// placeholder because `list_transactions` already skips it. + async fn reserve_rebalance_slot_for_funding_tx(&self, txid: Txid) { + let payment_id = PaymentId::SelfCustodial(txid.to_byte_array()); + if self.tx_metadata.read().get(&payment_id).is_some() { + return; + } + self.tx_metadata + .upsert( + payment_id, + store::TxMetadata { + ty: store::TxType::PendingRebalance {}, + time: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default(), + }, + ) + .await; + } } diff --git a/orange-sdk/src/lib.rs b/orange-sdk/src/lib.rs index 607f932..5538779 100644 --- a/orange-sdk/src/lib.rs +++ b/orange-sdk/src/lib.rs @@ -1459,8 +1459,14 @@ impl Wallet { /// Stops the wallet, which will stop the underlying LDK node and any background tasks. /// This will ensure that any critical tasks have completed before stopping. pub async fn stop(&self) { - // wait for the balance mutex to ensure no other tasks are running log_info!(self.inner.logger, "Stopping..."); + // Abort the background rebalance loop first. If a rebalance is parked in + // `await_splice_pending` waiting for an event that's never going to arrive + // (e.g. the LSP went away mid-splice), it's still holding the rebalancer's + // `balance_mutex` — `rebalancer.stop().await` would deadlock waiting for it. + // Dropping the task releases the mutex. + self.inner.runtime.abort_cancellable_background_tasks(); + log_info!(self.inner.logger, "Stopping rebalancer..."); let _ = self.inner.rebalancer.stop().await; @@ -1470,9 +1476,6 @@ impl Wallet { log_debug!(self.inner.logger, "Stopping ln wallet..."); self.inner.ln_wallet.stop(); - // Cancel cancellable background tasks - self.inner.runtime.abort_cancellable_background_tasks(); - // Wait until non-cancellable background tasks (mod LDK's background processor) are done. self.inner.runtime.wait_on_background_tasks(); } diff --git a/orange-sdk/src/lightning_wallet.rs b/orange-sdk/src/lightning_wallet.rs index b910856..37961f6 100644 --- a/orange-sdk/src/lightning_wallet.rs +++ b/orange-sdk/src/lightning_wallet.rs @@ -30,9 +30,9 @@ use graduated_rebalancer::{LightningBalance, ReceivedLightningPayment}; use std::collections::HashMap; use std::fmt::Debug; use std::pin::Pin; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::SystemTime; -use tokio::sync::watch; +use tokio::sync::{Notify, watch}; #[derive(Debug, Clone, Copy)] pub(crate) struct LightningWalletBalance { @@ -46,11 +46,27 @@ pub(crate) struct LightningWalletImpl { store: Arc, payment_receipt_flag: watch::Receiver<()>, channel_pending_receipt_flag: watch::Receiver, - splice_pending_receipt_flag: watch::Receiver, + splice_pending_inbox: Arc, lsp_node_id: PublicKey, lsp_socket_addr: SocketAddress, } +/// One pending `SplicePending` event per `user_channel_id`, consumed by +/// `await_splice_pending`. A queue rather than a `watch` so each consumer takes its own event: +/// `watch` would let a second splice on the same channel observe the previous splice's stale +/// outpoint instead of waiting for the new `SplicePending`. +pub(crate) struct SplicePendingInbox { + pub(crate) pending: Mutex>, + pub(crate) notify: Notify, +} + +impl SplicePendingInbox { + pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) { + self.pending.lock().unwrap().insert(channel_id, funding_txo); + self.notify.notify_waiters(); + } +} + pub(crate) struct LightningWallet { pub(crate) inner: Arc, } @@ -176,14 +192,17 @@ impl LightningWallet { Arc::new(builder.build_with_store(node_entropy, LdkNodeStore(Arc::clone(&store)))?); let (payment_receipt_sender, payment_receipt_flag) = watch::channel(()); let (channel_pending_sender, channel_pending_receipt_flag) = watch::channel(0); - let (splice_pending_sender, splice_pending_receipt_flag) = watch::channel(0); + let splice_pending_inbox = Arc::new(SplicePendingInbox { + pending: Mutex::new(HashMap::new()), + notify: Notify::new(), + }); let ev_handler = Arc::new(LdkEventHandler { event_queue, ldk_node: Arc::clone(&ldk_node), tx_metadata, payment_receipt_sender, channel_pending_sender, - splice_pending_sender, + splice_pending_inbox: Arc::clone(&splice_pending_inbox), logger: Arc::clone(&logger), }); let inner = Arc::new(LightningWalletImpl { @@ -192,7 +211,7 @@ impl LightningWallet { store, payment_receipt_flag, channel_pending_receipt_flag, - splice_pending_receipt_flag, + splice_pending_inbox, lsp_node_id, lsp_socket_addr, }); @@ -222,10 +241,19 @@ impl LightningWallet { flag.wait_for(|t| t == &channel_id).await.expect("channel pending not received"); } - pub(crate) async fn await_splice_pending(&self, channel_id: u128) { - let mut flag = self.inner.splice_pending_receipt_flag.clone(); - flag.mark_unchanged(); - flag.wait_for(|t| t == &channel_id).await.expect("splice pending not received"); + pub(crate) async fn await_splice_pending(&self, channel_id: u128) -> OutPoint { + let inbox = &self.inner.splice_pending_inbox; + loop { + // Register interest BEFORE checking the queue so a `notify_waiters` racing with the + // check still wakes us up. + let notified = inbox.notify.notified(); + tokio::pin!(notified); + notified.as_mut().enable(); + if let Some(txo) = inbox.pending.lock().unwrap().remove(&channel_id) { + return txo; + } + notified.await; + } } pub(crate) fn get_on_chain_address(&self) -> Result { @@ -343,53 +371,28 @@ impl LightningWallet { amount_sats, )?; - loop { + let funding_txo = self.await_splice_pending(chan.user_channel_id.0).await; - let channels = self.inner.ldk_node.list_channels(); - let new_chan = channels - .iter() - .find(|c| c.user_channel_id == chan.user_channel_id); - match new_chan { - Some(c) => { - if c.funding_txo - .is_some_and(|f| f != chan.funding_txo.unwrap()) - { - let funding_txo = c.funding_txo.unwrap(); - - let id = PaymentId(funding_txo.txid.to_byte_array()); - let details = PaymentDetails { - id, - kind: PaymentKind::Onchain { - txid: funding_txo.txid, - status: ConfirmationStatus::Unconfirmed, // todo how do we update this? - }, - amount_msat: Some(amount_sats * 1_000), - fee_paid_msat: Some(69), // todo get real fee - direction: PaymentDirection::Outbound, - status: PaymentStatus::Succeeded, - latest_update_timestamp: SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(), - }; - - store::write_splice_out( - self.inner.store.as_ref(), - &details, - ) - .await; - return Ok(id); - } - }, - None => { - log_error!( - self.inner.logger, - "Channel disappeared while awaiting splice out" - ); - return Err(NodeError::WalletOperationFailed); - }, - } - } + + let id = PaymentId(funding_txo.txid.to_byte_array()); + let details = PaymentDetails { + id, + kind: PaymentKind::Onchain { + txid: funding_txo.txid, + status: ConfirmationStatus::Unconfirmed, // todo how do we update this? + }, + amount_msat: Some(amount_sats * 1_000), + fee_paid_msat: Some(69), // todo get real fee + direction: PaymentDirection::Outbound, + status: PaymentStatus::Succeeded, + latest_update_timestamp: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(), + }; + + store::write_splice_out(self.inner.store.as_ref(), &details).await; + Ok(id) }, } } @@ -587,26 +590,10 @@ impl graduated_rebalancer::LightningWallet for LightningWallet { fn await_splice_pending( &self, channel_id: u128, ) -> Pin + Send + '_>> { - Box::pin(async move { - // todo since we can't see if we have any active splices, we just await the next splice pending event - // this is kinda race-y hopefully we can fix - self.await_splice_pending(channel_id).await; - loop { - let channels = self.inner.ldk_node.list_channels(); - let chan = channels - .into_iter() - .find(|c| c.user_channel_id.0 == channel_id && c.funding_txo.is_some()); - match chan { - Some(c) => { - return c.funding_txo.expect("channel has no funding txo"); - }, - None => { - self.await_splice_pending(channel_id).await; - // Wait for the next channel pending event - }, - } - } - }) + // `ChannelDetails.funding_txo` from `list_channels` still reports the old funding + // outpoint between `SplicePending` and `SpliceLocked`, so we return the new outpoint + // from the event itself rather than reading it back from the channel. + Box::pin(async move { self.await_splice_pending(channel_id).await }) } } diff --git a/orange-sdk/src/rebalancer.rs b/orange-sdk/src/rebalancer.rs index 5b459b3..f3f142f 100644 --- a/orange-sdk/src/rebalancer.rs +++ b/orange-sdk/src/rebalancer.rs @@ -361,17 +361,18 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler { let chan_txid = channel_outpoint.txid; let triggering_txid = Txid::from_byte_array(trigger_id); let trigger_id = PaymentId::SelfCustodial(triggering_txid.to_byte_array()); - self.tx_metadata - .set_tx_caused_rebalance(&trigger_id) - .await - .expect("Failed to write metadata for onchain rebalance transaction"); let metadata = TxMetadata { ty: TxType::OnchainToLightning { channel_txid: chan_txid, triggering_txid }, time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(), }; self.tx_metadata - .insert(PaymentId::SelfCustodial(chan_txid.to_byte_array()), metadata) - .await; + .set_tx_caused_rebalance_with_splice( + &trigger_id, + PaymentId::SelfCustodial(chan_txid.to_byte_array()), + metadata, + ) + .await + .expect("Failed to write metadata for onchain rebalance transaction"); }, } }) diff --git a/orange-sdk/src/store.rs b/orange-sdk/src/store.rs index 58ce72e..6a86735 100644 --- a/orange-sdk/src/store.rs +++ b/orange-sdk/src/store.rs @@ -368,6 +368,58 @@ impl TxMetadataStore { Ok(()) } + /// Atomically marks `trigger_id` as a rebalance trigger and writes `splice_metadata` for + /// `splice_id`. The in-memory updates happen under a single write lock so that a concurrent + /// `list_transactions` either sees both changes or neither — without this, the rebalancer + /// briefly exposes a state where the trigger has been promoted to + /// `PaymentTriggeringTransferLightning` but the matching `OnchainToLightning` splice entry + /// hasn't landed yet, which makes the `InternalTransfer` validation in `list_transactions` + /// trip on a missing `send_fee`. + pub async fn set_tx_caused_rebalance_with_splice( + &self, trigger_id: &PaymentId, splice_id: PaymentId, splice_metadata: TxMetadata, + ) -> Result<(), ()> { + let (trigger_entry, splice_entry) = { + let mut tx_metadata = self.tx_metadata.write().unwrap(); + let trigger = match tx_metadata.get_mut(trigger_id) { + Some(metadata) => { + if let TxType::Payment { ty } = &mut metadata.ty { + metadata.ty = TxType::PaymentTriggeringTransferLightning { ty: *ty }; + (trigger_id.to_string(), metadata.encode()) + } else { + eprintln!("payment_id {trigger_id} is not a payment, cannot set rebalance"); + return Err(()); + } + }, + None => { + eprintln!("doesn't exist in metadata store: {trigger_id}"); + return Err(()); + }, + }; + tx_metadata.insert(splice_id, splice_metadata); + let splice = (splice_id.to_string(), splice_metadata.encode()); + (trigger, splice) + }; + let (trigger_res, splice_res) = tokio::join!( + KVStore::write( + self.store.as_ref(), + STORE_PRIMARY_KEY, + STORE_SECONDARY_KEY, + &trigger_entry.0, + trigger_entry.1, + ), + KVStore::write( + self.store.as_ref(), + STORE_PRIMARY_KEY, + STORE_SECONDARY_KEY, + &splice_entry.0, + splice_entry.1, + ), + ); + trigger_res.expect("We do not allow writes to fail"); + splice_res.expect("We do not allow writes to fail"); + Ok(()) + } + /// Sets the preimage for an outgoing lightning payment. If the payment already has a preimage, /// this is a no-op and returns Ok(()). If the payment_id does not exist in the store, or if the payment /// is not an outgoing lightning payment, returns Err(()).