Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 36 additions & 3 deletions orange-sdk/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::logging::Logger;
use crate::store::{self, PaymentId};

use crate::dyn_store::DynStore;
use ldk_node::bitcoin::hashes::Hash;
use ldk_node::bitcoin::secp256k1::PublicKey;
use ldk_node::bitcoin::{OutPoint, Txid};
use ldk_node::lightning::events::{ClosureReason, PaymentFailureReason};
Expand All @@ -17,6 +18,7 @@ use ldk_node::{CustomTlvRecord, UserChannelId};
use std::collections::VecDeque;
use std::sync::Arc;
use std::task::{Poll, Waker};
use std::time::SystemTime;
use tokio::sync::{Mutex, watch};

/// The event queue will be persisted under this key.
Expand Down Expand Up @@ -324,7 +326,7 @@ pub(crate) struct LdkEventHandler {
pub(crate) tx_metadata: store::TxMetadataStore,
pub(crate) payment_receipt_sender: watch::Sender<()>,
pub(crate) channel_pending_sender: watch::Sender<u128>,
pub(crate) splice_pending_sender: watch::Sender<u128>,
pub(crate) splice_pending_inbox: Arc<crate::lightning_wallet::SplicePendingInbox>,
pub(crate) logger: Arc<Logger>,
}

Expand Down Expand Up @@ -410,8 +412,12 @@ impl LdkEventHandler {
"Unexpected PaymentClaimable event received. This is likely due to a bug in the LDK Node implementation."
);
},
ldk_node::Event::ChannelPending { .. } => {
ldk_node::Event::ChannelPending { funding_txo, .. } => {
log_debug!(self.logger, "Received ChannelPending event");
// The funding tx is already in `ldk_node.list_payments()`; populate our
// metadata before any concurrent `list_transactions` call observes the
// outbound payment.
self.reserve_rebalance_slot_for_funding_tx(funding_txo.txid).await;
},
ldk_node::Event::ChannelReady {
channel_id,
Expand Down Expand Up @@ -467,7 +473,11 @@ impl LdkEventHandler {
new_funding_txo,
} => {
log_debug!(self.logger, "Received SplicePending event {event:?}");
let _ = self.splice_pending_sender.send(user_channel_id.0);
// Reserve the metadata slot before delivering so any task waking on the
// inbox (the rebalancer's `OnChainRebalanceInitiated` for splice-in,
// `pay_lightning` for splice-out) sees an entry to upsert.
self.reserve_rebalance_slot_for_funding_tx(new_funding_txo.txid).await;
self.splice_pending_inbox.deliver(user_channel_id.0, new_funding_txo);

if let Err(e) = self
.event_queue
Expand All @@ -492,4 +502,27 @@ impl LdkEventHandler {
log_error!(self.logger, "Failed to handle event: {e:?}");
}
}

/// Reserve a `PendingRebalance` metadata slot for a freshly broadcast channel or splice
/// funding tx. The matching outbound on-chain payment is already visible in
/// `ldk_node.list_payments()` by the time we're called, so without this entry
/// `list_transactions` would trip its `debug_assert_ne!`. `PendingRebalance` is used as the
/// placeholder because `list_transactions` already skips it.
async fn reserve_rebalance_slot_for_funding_tx(&self, txid: Txid) {
let payment_id = PaymentId::SelfCustodial(txid.to_byte_array());
if self.tx_metadata.read().get(&payment_id).is_some() {
return;
}
self.tx_metadata
.upsert(
payment_id,
store::TxMetadata {
ty: store::TxType::PendingRebalance {},
time: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default(),
},
)
.await;
}
}
11 changes: 7 additions & 4 deletions orange-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1459,8 +1459,14 @@ impl Wallet {
/// Stops the wallet, which will stop the underlying LDK node and any background tasks.
/// This will ensure that any critical tasks have completed before stopping.
pub async fn stop(&self) {
// wait for the balance mutex to ensure no other tasks are running
log_info!(self.inner.logger, "Stopping...");
// Abort the background rebalance loop first. If a rebalance is parked in
// `await_splice_pending` waiting for an event that's never going to arrive
// (e.g. the LSP went away mid-splice), it's still holding the rebalancer's
// `balance_mutex` — `rebalancer.stop().await` would deadlock waiting for it.
// Dropping the task releases the mutex.
self.inner.runtime.abort_cancellable_background_tasks();

log_info!(self.inner.logger, "Stopping rebalancer...");
let _ = self.inner.rebalancer.stop().await;

Expand All @@ -1470,9 +1476,6 @@ impl Wallet {
log_debug!(self.inner.logger, "Stopping ln wallet...");
self.inner.ln_wallet.stop();

// Cancel cancellable background tasks
self.inner.runtime.abort_cancellable_background_tasks();

// Wait until non-cancellable background tasks (mod LDK's background processor) are done.
self.inner.runtime.wait_on_background_tasks();
}
Expand Down
139 changes: 63 additions & 76 deletions orange-sdk/src/lightning_wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ use graduated_rebalancer::{LightningBalance, ReceivedLightningPayment};
use std::collections::HashMap;
use std::fmt::Debug;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::time::SystemTime;
use tokio::sync::watch;
use tokio::sync::{Notify, watch};

#[derive(Debug, Clone, Copy)]
pub(crate) struct LightningWalletBalance {
Expand All @@ -46,11 +46,27 @@ pub(crate) struct LightningWalletImpl {
store: Arc<dyn DynStore>,
payment_receipt_flag: watch::Receiver<()>,
channel_pending_receipt_flag: watch::Receiver<u128>,
splice_pending_receipt_flag: watch::Receiver<u128>,
splice_pending_inbox: Arc<SplicePendingInbox>,
lsp_node_id: PublicKey,
lsp_socket_addr: SocketAddress,
}

/// One pending `SplicePending` event per `user_channel_id`, consumed by
/// `await_splice_pending`. A queue rather than a `watch` so each consumer takes its own event:
/// `watch` would let a second splice on the same channel observe the previous splice's stale
/// outpoint instead of waiting for the new `SplicePending`.
pub(crate) struct SplicePendingInbox {
pub(crate) pending: Mutex<HashMap<u128, OutPoint>>,
pub(crate) notify: Notify,
}

impl SplicePendingInbox {
pub(crate) fn deliver(&self, channel_id: u128, funding_txo: OutPoint) {
self.pending.lock().unwrap().insert(channel_id, funding_txo);
self.notify.notify_waiters();
}
}

pub(crate) struct LightningWallet {
pub(crate) inner: Arc<LightningWalletImpl>,
}
Expand Down Expand Up @@ -176,14 +192,17 @@ impl LightningWallet {
Arc::new(builder.build_with_store(node_entropy, LdkNodeStore(Arc::clone(&store)))?);
let (payment_receipt_sender, payment_receipt_flag) = watch::channel(());
let (channel_pending_sender, channel_pending_receipt_flag) = watch::channel(0);
let (splice_pending_sender, splice_pending_receipt_flag) = watch::channel(0);
let splice_pending_inbox = Arc::new(SplicePendingInbox {
pending: Mutex::new(HashMap::new()),
notify: Notify::new(),
});
let ev_handler = Arc::new(LdkEventHandler {
event_queue,
ldk_node: Arc::clone(&ldk_node),
tx_metadata,
payment_receipt_sender,
channel_pending_sender,
splice_pending_sender,
splice_pending_inbox: Arc::clone(&splice_pending_inbox),
logger: Arc::clone(&logger),
});
let inner = Arc::new(LightningWalletImpl {
Expand All @@ -192,7 +211,7 @@ impl LightningWallet {
store,
payment_receipt_flag,
channel_pending_receipt_flag,
splice_pending_receipt_flag,
splice_pending_inbox,
lsp_node_id,
lsp_socket_addr,
});
Expand Down Expand Up @@ -222,10 +241,19 @@ impl LightningWallet {
flag.wait_for(|t| t == &channel_id).await.expect("channel pending not received");
}

pub(crate) async fn await_splice_pending(&self, channel_id: u128) {
let mut flag = self.inner.splice_pending_receipt_flag.clone();
flag.mark_unchanged();
flag.wait_for(|t| t == &channel_id).await.expect("splice pending not received");
pub(crate) async fn await_splice_pending(&self, channel_id: u128) -> OutPoint {
let inbox = &self.inner.splice_pending_inbox;
loop {
// Register interest BEFORE checking the queue so a `notify_waiters` racing with the
// check still wakes us up.
let notified = inbox.notify.notified();
tokio::pin!(notified);
notified.as_mut().enable();
if let Some(txo) = inbox.pending.lock().unwrap().remove(&channel_id) {
return txo;
}
notified.await;
}
}

pub(crate) fn get_on_chain_address(&self) -> Result<Address, NodeError> {
Expand Down Expand Up @@ -343,53 +371,28 @@ impl LightningWallet {
amount_sats,
)?;

loop {
let funding_txo =
self.await_splice_pending(chan.user_channel_id.0).await;
let channels = self.inner.ldk_node.list_channels();
let new_chan = channels
.iter()
.find(|c| c.user_channel_id == chan.user_channel_id);
match new_chan {
Some(c) => {
if c.funding_txo
.is_some_and(|f| f != chan.funding_txo.unwrap())
{
let funding_txo = c.funding_txo.unwrap();

let id = PaymentId(funding_txo.txid.to_byte_array());
let details = PaymentDetails {
id,
kind: PaymentKind::Onchain {
txid: funding_txo.txid,
status: ConfirmationStatus::Unconfirmed, // todo how do we update this?
},
amount_msat: Some(amount_sats * 1_000),
fee_paid_msat: Some(69), // todo get real fee
direction: PaymentDirection::Outbound,
status: PaymentStatus::Succeeded,
latest_update_timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

store::write_splice_out(
self.inner.store.as_ref(),
&details,
)
.await;
return Ok(id);
}
},
None => {
log_error!(
self.inner.logger,
"Channel disappeared while awaiting splice out"
);
return Err(NodeError::WalletOperationFailed);
},
}
}

let id = PaymentId(funding_txo.txid.to_byte_array());
let details = PaymentDetails {
id,
kind: PaymentKind::Onchain {
txid: funding_txo.txid,
status: ConfirmationStatus::Unconfirmed, // todo how do we update this?
},
amount_msat: Some(amount_sats * 1_000),
fee_paid_msat: Some(69), // todo get real fee
direction: PaymentDirection::Outbound,
status: PaymentStatus::Succeeded,
latest_update_timestamp: SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_secs(),
};

store::write_splice_out(self.inner.store.as_ref(), &details).await;
Ok(id)
},
}
}
Expand Down Expand Up @@ -587,26 +590,10 @@ impl graduated_rebalancer::LightningWallet for LightningWallet {
fn await_splice_pending(
&self, channel_id: u128,
) -> Pin<Box<dyn Future<Output = OutPoint> + Send + '_>> {
Box::pin(async move {
// todo since we can't see if we have any active splices, we just await the next splice pending event
// this is kinda race-y hopefully we can fix
self.await_splice_pending(channel_id).await;
loop {
let channels = self.inner.ldk_node.list_channels();
let chan = channels
.into_iter()
.find(|c| c.user_channel_id.0 == channel_id && c.funding_txo.is_some());
match chan {
Some(c) => {
return c.funding_txo.expect("channel has no funding txo");
},
None => {
self.await_splice_pending(channel_id).await;
// Wait for the next channel pending event
},
}
}
})
// `ChannelDetails.funding_txo` from `list_channels` still reports the old funding
// outpoint between `SplicePending` and `SpliceLocked`, so we return the new outpoint
// from the event itself rather than reading it back from the channel.
Box::pin(async move { self.await_splice_pending(channel_id).await })
}
}

Expand Down
13 changes: 7 additions & 6 deletions orange-sdk/src/rebalancer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -361,17 +361,18 @@ impl graduated_rebalancer::EventHandler for OrangeRebalanceEventHandler {
let chan_txid = channel_outpoint.txid;
let triggering_txid = Txid::from_byte_array(trigger_id);
let trigger_id = PaymentId::SelfCustodial(triggering_txid.to_byte_array());
self.tx_metadata
.set_tx_caused_rebalance(&trigger_id)
.await
.expect("Failed to write metadata for onchain rebalance transaction");
let metadata = TxMetadata {
ty: TxType::OnchainToLightning { channel_txid: chan_txid, triggering_txid },
time: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(),
};
self.tx_metadata
.insert(PaymentId::SelfCustodial(chan_txid.to_byte_array()), metadata)
.await;
.set_tx_caused_rebalance_with_splice(
&trigger_id,
PaymentId::SelfCustodial(chan_txid.to_byte_array()),
metadata,
)
.await
.expect("Failed to write metadata for onchain rebalance transaction");
},
}
})
Expand Down
52 changes: 52 additions & 0 deletions orange-sdk/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,58 @@ impl TxMetadataStore {
Ok(())
}

/// Atomically marks `trigger_id` as a rebalance trigger and writes `splice_metadata` for
/// `splice_id`. The in-memory updates happen under a single write lock so that a concurrent
/// `list_transactions` either sees both changes or neither — without this, the rebalancer
/// briefly exposes a state where the trigger has been promoted to
/// `PaymentTriggeringTransferLightning` but the matching `OnchainToLightning` splice entry
/// hasn't landed yet, which makes the `InternalTransfer` validation in `list_transactions`
/// trip on a missing `send_fee`.
pub async fn set_tx_caused_rebalance_with_splice(
&self, trigger_id: &PaymentId, splice_id: PaymentId, splice_metadata: TxMetadata,
) -> Result<(), ()> {
let (trigger_entry, splice_entry) = {
let mut tx_metadata = self.tx_metadata.write().unwrap();
let trigger = match tx_metadata.get_mut(trigger_id) {
Some(metadata) => {
if let TxType::Payment { ty } = &mut metadata.ty {
metadata.ty = TxType::PaymentTriggeringTransferLightning { ty: *ty };
(trigger_id.to_string(), metadata.encode())
} else {
eprintln!("payment_id {trigger_id} is not a payment, cannot set rebalance");
return Err(());
}
},
None => {
eprintln!("doesn't exist in metadata store: {trigger_id}");
return Err(());
},
};
tx_metadata.insert(splice_id, splice_metadata);
let splice = (splice_id.to_string(), splice_metadata.encode());
(trigger, splice)
};
let (trigger_res, splice_res) = tokio::join!(
KVStore::write(
self.store.as_ref(),
STORE_PRIMARY_KEY,
STORE_SECONDARY_KEY,
&trigger_entry.0,
trigger_entry.1,
),
KVStore::write(
self.store.as_ref(),
STORE_PRIMARY_KEY,
STORE_SECONDARY_KEY,
&splice_entry.0,
splice_entry.1,
),
);
trigger_res.expect("We do not allow writes to fail");
splice_res.expect("We do not allow writes to fail");
Ok(())
}

/// Sets the preimage for an outgoing lightning payment. If the payment already has a preimage,
/// this is a no-op and returns Ok(()). If the payment_id does not exist in the store, or if the payment
/// is not an outgoing lightning payment, returns Err(()).
Expand Down