From 2ce50c3df3311d716cba97475e1a0d97091fe201 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 13:48:04 +0000 Subject: [PATCH 01/12] Update BestBlock to store ANTI_REORG_DELAY * 2 recent block hashes On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in a `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of, a `Cache` were never built. Instead, here, we start taking a different approach. To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In conjunction with 403dc1a48bb71ae794f6883ae0b760aad44cda39 (which allows us to disconnect blocks without having the stored header), this should allow us to replay chain state after a reorg even if we no longer have access to the top few blocks of the old chain tip. While we only really need to store `ANTI_REORG_DELAY` blocks (as we generally assume that any deeper reorg won't happen and thus we don't guarantee we handle it correctly), it's nice to store a few more to be able to handle more than a six-block reorg. 
While other parts of the codebase may not be entirely robust against such a reorg if the transactions confirmed change out from under us, its entirely possible (and, indeed, common) for reorgs to contain nearly identical transactions. --- lightning/src/chain/channelmonitor.rs | 22 +++-- lightning/src/chain/mod.rs | 121 +++++++++++++++++++++++++- lightning/src/ln/channelmanager.rs | 21 ++++- lightning/src/util/ser.rs | 27 ++++++ lightning/src/util/sweep.rs | 7 +- 5 files changed, 186 insertions(+), 12 deletions(-) diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8e7b6035523..8202669f6a2 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1755,6 +1755,7 @@ pub(crate) fn write_chanmon_internal( (34, channel_monitor.alternative_funding_confirmed, option), (35, channel_monitor.is_manual_broadcast, required), (37, channel_monitor.funding_seen_onchain, required), + (39, channel_monitor.best_block.previous_blocks, required), }); Ok(()) @@ -5390,9 +5391,6 @@ impl ChannelMonitorImpl { &mut self, header: &Header, txdata: &TransactionData, height: u32, broadcaster: B, fee_estimator: F, logger: &WithContext, ) -> Vec { - let block_hash = header.block_hash(); - self.best_block = BestBlock::new(block_hash, height); - let bounded_fee_estimator = LowerBoundedFeeEstimator::new(fee_estimator); self.transactions_confirmed(header, txdata, height, broadcaster, &bounded_fee_estimator, logger) } @@ -5408,7 +5406,11 @@ impl ChannelMonitorImpl { ) -> Vec { let block_hash = header.block_hash(); - if height > self.best_block.height { + if height == self.best_block.height + 1 { + self.best_block.advance(block_hash); + log_trace!(logger, "Connecting new block {} at height {}", block_hash, height); + self.block_confirmed(height, block_hash, vec![], vec![], vec![], &broadcaster, &fee_estimator, logger) + } else if height > self.best_block.height { self.best_block = BestBlock::new(block_hash, height); 
log_trace!(logger, "Connecting new block {} at height {}", block_hash, height); self.block_confirmed(height, block_hash, vec![], vec![], vec![], &broadcaster, &fee_estimator, logger) @@ -5682,7 +5684,9 @@ impl ChannelMonitorImpl { } } - if height > self.best_block.height { + if height == self.best_block.height + 1 { + self.best_block.advance(block_hash); + } else if height > self.best_block.height { self.best_block = BestBlock::new(block_hash, height); } @@ -6644,7 +6648,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } } - let best_block = BestBlock::new(Readable::read(reader)?, Readable::read(reader)?); + let mut best_block = BestBlock::new(Readable::read(reader)?, Readable::read(reader)?); let waiting_threshold_conf_len: u64 = Readable::read(reader)?; let mut onchain_events_awaiting_threshold_conf = Vec::with_capacity(cmp::min(waiting_threshold_conf_len as usize, MAX_ALLOC_SIZE / 128)); @@ -6694,6 +6698,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP let mut alternative_funding_confirmed = None; let mut is_manual_broadcast = RequiredWrapper(None); let mut funding_seen_onchain = RequiredWrapper(None); + let mut best_block_previous_blocks = None; read_tlv_fields!(reader, { (1, funding_spend_confirmed, option), (3, htlcs_resolved_on_chain, optional_vec), @@ -6716,7 +6721,12 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP (34, alternative_funding_confirmed, option), (35, is_manual_broadcast, (default_value, false)), (37, funding_seen_onchain, (default_value, true)), + (39, best_block_previous_blocks, option), }); + if let Some(previous_blocks) = best_block_previous_blocks { + best_block.previous_blocks = previous_blocks; + } + // Note that `payment_preimages_with_info` was added (and is always written) in LDK 0.1, so // we can use it to determine if this monitor was last written by LDK 0.1 or later. 
let written_by_0_1_or_later = payment_preimages_with_info.is_some(); diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index bc47f1b1db6..2937b768178 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -18,7 +18,9 @@ use bitcoin::network::Network; use bitcoin::script::{Script, ScriptBuf}; use bitcoin::secp256k1::PublicKey; -use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent}; +use crate::chain::channelmonitor::{ + ChannelMonitor, ChannelMonitorUpdate, MonitorEvent, ANTI_REORG_DELAY, +}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::ln::types::ChannelId; use crate::sign::ecdsa::EcdsaChannelSigner; @@ -43,13 +45,20 @@ pub struct BestBlock { pub block_hash: BlockHash, /// The height at which the block was confirmed. pub height: u32, + /// Previous blocks immediately before [`Self::block_hash`], in reverse chronological order. + /// + /// These ensure we can find the fork point of a reorg if our block source no longer has the + /// previous best tip after a restart. + pub previous_blocks: [Option; ANTI_REORG_DELAY as usize * 2], } impl BestBlock { /// Constructs a `BestBlock` that represents the genesis block at height 0 of the given /// network. pub fn from_network(network: Network) -> Self { - BestBlock { block_hash: genesis_block(network).header.block_hash(), height: 0 } + let block_hash = genesis_block(network).header.block_hash(); + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height: 0, previous_blocks } } /// Returns a `BestBlock` as identified by the given block hash and height. @@ -57,13 +66,77 @@ impl BestBlock { /// This is not exported to bindings users directly as the bindings auto-generate an /// equivalent `new`. 
pub fn new(block_hash: BlockHash, height: u32) -> Self { - BestBlock { block_hash, height } + let previous_blocks = [None; ANTI_REORG_DELAY as usize * 2]; + BestBlock { block_hash, height, previous_blocks } + } + + /// Advances to a new block at height [`Self::height`] + 1. + pub fn advance(&mut self, new_hash: BlockHash) { + // Shift all block hashes to the right (making room for the old tip at index 0) + for i in (1..self.previous_blocks.len()).rev() { + self.previous_blocks[i] = self.previous_blocks[i - 1]; + } + + // The old tip becomes the new index 0 (tip-1) + self.previous_blocks[0] = Some(self.block_hash); + + // Update to the new tip + self.block_hash = new_hash; + self.height += 1; + } + + /// Returns the block hash at the given height, if available in our history. + pub fn get_hash_at_height(&self, height: u32) -> Option { + if height > self.height { + return None; + } + if height == self.height { + return Some(self.block_hash); + } + + // offset = 1 means we want tip-1, which is block_hashes[0] + // offset = 2 means we want tip-2, which is block_hashes[1], etc. + let offset = self.height.saturating_sub(height) as usize; + if offset >= 1 && offset <= self.previous_blocks.len() { + self.previous_blocks[offset - 1] + } else { + None + } + } + + /// Find the most recent common ancestor between two BestBlocks by searching their block hash + /// histories. + /// + /// Returns the common block hash and height, or None if no common block is found in the + /// available histories. 
+ pub fn find_common_ancestor(&self, other: &BestBlock) -> Option<(BlockHash, u32)> { + // First check if either tip matches + if self.block_hash == other.block_hash && self.height == other.height { + return Some((self.block_hash, self.height)); + } + + // Check all heights covered by self's history + let min_height = self.height.saturating_sub(self.previous_blocks.len() as u32); + for check_height in (min_height..=self.height).rev() { + if let Some(self_hash) = self.get_hash_at_height(check_height) { + if let Some(other_hash) = other.get_hash_at_height(check_height) { + if self_hash == other_hash { + return Some((self_hash, check_height)); + } + } + } + } + None } } impl_writeable_tlv_based!(BestBlock, { (0, block_hash, required), + // Note that any change to the previous_blocks array length will change the serialization + // format and thus it is specified without constants here. + (1, previous_blocks_read, (legacy, [Option; 6 * 2], |_| Ok(()), |us: &BestBlock| Some(us.previous_blocks))), (2, height, required), + (unused, previous_blocks, (static_value, previous_blocks_read.unwrap_or([None; 6 * 2]))), }); /// The `Listen` trait is used to notify when blocks have been connected or disconnected from the @@ -495,3 +568,45 @@ impl ClaimId { ClaimId(Sha256::from_engine(engine).to_byte_array()) } } + +#[cfg(test)] +mod tests { + use super::*; + use bitcoin::hashes::Hash; + + #[test] + fn test_best_block() { + let hash1 = BlockHash::from_slice(&[1; 32]).unwrap(); + let mut chain_a = BestBlock::new(hash1, 100); + let mut chain_b = BestBlock::new(hash1, 100); + + // Test get_hash_at_height on initial block + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), None); + assert_eq!(chain_a.get_hash_at_height(99), None); + + // Test find_common_ancestor with identical blocks + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + let hash2 = BlockHash::from_slice(&[2; 32]).unwrap(); + 
chain_a.advance(hash2); + assert_eq!(chain_a.height, 101); + assert_eq!(chain_a.block_hash, hash2); + assert_eq!(chain_a.previous_blocks[0], Some(hash1)); + assert_eq!(chain_a.get_hash_at_height(101), Some(hash2)); + assert_eq!(chain_a.get_hash_at_height(100), Some(hash1)); + + // Test find_common_ancestor with different heights + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with diverged chains but the same height + let hash_b3 = BlockHash::from_slice(&[33; 32]).unwrap(); + chain_b.advance(hash_b3); + assert_eq!(chain_a.find_common_ancestor(&chain_b), Some((hash1, 100))); + + // Test find_common_ancestor with no common history + let hash_other = BlockHash::from_slice(&[99; 32]).unwrap(); + let chain_c = BestBlock::new(hash_other, 200); + assert_eq!(chain_a.find_common_ancestor(&chain_c), None); + } +} diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 70617b20894..eeeb34ac0eb 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -15832,7 +15832,14 @@ impl< let _persistence_guard = PersistenceNotifierGuard::optionally_notify_skipping_background_events( self, || -> NotifyOption { NotifyOption::DoPersist }); - *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); + { + let mut best_block = self.best_block.write().unwrap(); + if height == best_block.height + 1 { + best_block.advance(block_hash); + } else { + *best_block = BestBlock::new(block_hash, height); + } + } let mut min_anchor_feerate = None; let mut min_non_anchor_feerate = None; @@ -18160,6 +18167,7 @@ impl< (17, in_flight_monitor_updates, option), (19, peer_storage_dir, optional_vec), (21, WithoutLength(&self.flow.writeable_async_receive_offer_cache()), required), + (23, self.best_block.read().unwrap().previous_blocks, required), }); // Remove the SpliceFailed and DiscardFunding events added earlier. 
@@ -18247,6 +18255,7 @@ pub(super) struct ChannelManagerData { in_flight_monitor_updates: HashMap<(PublicKey, ChannelId), Vec>, peer_storage_dir: Vec<(PublicKey, Vec)>, async_receive_offer_cache: AsyncReceiveOfferCache, + best_block_previous_blocks: Option<[Option; ANTI_REORG_DELAY as usize * 2]>, // Marked `_legacy` because in versions > 0.2 we are taking steps to remove the requirement of // regularly persisting the `ChannelManager` and instead rebuild the set of HTLC forwards from // `Channel{Monitor}` data. @@ -18438,6 +18447,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> let mut inbound_payment_id_secret = None; let mut peer_storage_dir: Option)>> = None; let mut async_receive_offer_cache: AsyncReceiveOfferCache = AsyncReceiveOfferCache::new(); + let mut best_block_previous_blocks = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), (2, pending_intercepted_htlcs_legacy, option), @@ -18456,6 +18466,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> (17, in_flight_monitor_updates, option), (19, peer_storage_dir, optional_vec), (21, async_receive_offer_cache, (default_value, async_receive_offer_cache)), + (23, best_block_previous_blocks, option), }); // Merge legacy pending_outbound_payments fields into a single HashMap. 
@@ -18573,6 +18584,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> in_flight_monitor_updates: in_flight_monitor_updates.unwrap_or_default(), peer_storage_dir: peer_storage_dir.unwrap_or_default(), async_receive_offer_cache, + best_block_previous_blocks, version, }) } @@ -18875,6 +18887,7 @@ impl< mut in_flight_monitor_updates, peer_storage_dir, async_receive_offer_cache, + best_block_previous_blocks, version: _version, } = data; @@ -20064,7 +20077,11 @@ impl< } } - let best_block = BestBlock::new(best_block_hash, best_block_height); + let mut best_block = BestBlock::new(best_block_hash, best_block_height); + if let Some(previous_blocks) = best_block_previous_blocks { + best_block.previous_blocks = previous_blocks; + } + let flow = OffersMessageFlow::new( chain_hash, best_block, diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 5d4b6b2df23..5b158a54ab9 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -1476,6 +1476,33 @@ impl Readable for BlockHash { } } +impl Writeable for [Option; 12] { + fn write(&self, w: &mut W) -> Result<(), io::Error> { + for hash_opt in self { + match hash_opt { + Some(hash) => hash.write(w)?, + None => ([0u8; 32]).write(w)?, + } + } + Ok(()) + } +} + +impl Readable for [Option; 12] { + fn read(r: &mut R) -> Result { + use bitcoin::hashes::Hash; + + let mut res = [None; 12]; + for hash_opt in res.iter_mut() { + let buf: [u8; 32] = Readable::read(r)?; + if buf != [0; 32] { + *hash_opt = Some(BlockHash::from_slice(&buf[..]).unwrap()); + } + } + Ok(res) + } +} + impl Writeable for ChainHash { fn write(&self, w: &mut W) -> Result<(), io::Error> { w.write_all(self.as_bytes()) diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index b70eb274085..5a46e05dd49 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -734,7 +734,12 @@ where fn best_block_updated_internal( &self, sweeper_state: &mut SweeperState, header: &Header, height: u32, ) { - 
sweeper_state.best_block = BestBlock::new(header.block_hash(), height); + let block_hash = header.block_hash(); + if height == sweeper_state.best_block.height + 1 { + sweeper_state.best_block.advance(block_hash); + } else { + sweeper_state.best_block = BestBlock::new(block_hash, height); + } self.prune_confirmed_outputs(sweeper_state); sweeper_state.dirty = true; From daacbd8afb19d9b5d1bc832d605c01897d4b4dcb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 14:08:16 +0000 Subject: [PATCH 02/12] Return `BestBlock` when deserializing chain-synced structs The deserialization of `ChannelMonitor`, `ChannelManager`, and `OutputSweeper` is implemented for a `(BlockHash, ...)` pair rather than on the object itself. This ensures developers are pushed to think about initial chain sync after deserialization and provides the latest chain sync state conviniently at deserialization-time. In the previous commit we started storing additional recent block hashes in `BestBlock` for use during initial sync to ensure we can handle reorgs while offline if the chain source loses the reorged-out blocks. Here, we move the deserialization routines to be on a `(BestBlock, ...)` pair instead of `(BlockHash, ...)`, providing access to those recent block hashes at deserialization-time. 
--- fuzz/src/chanmon_consistency.rs | 8 ++-- fuzz/src/chanmon_deser.rs | 8 ++-- lightning-block-sync/src/init.rs | 17 ++++---- lightning/src/chain/channelmonitor.rs | 16 ++++---- lightning/src/ln/chanmon_update_fail_tests.rs | 4 +- lightning/src/ln/channelmanager.rs | 24 ++++++------ lightning/src/ln/functional_test_utils.rs | 8 ++-- lightning/src/ln/functional_tests.rs | 7 ++-- lightning/src/ln/reload_tests.rs | 11 +++--- lightning/src/util/persist.rs | 39 ++++++++++--------- lightning/src/util/test_utils.rs | 9 +++-- 11 files changed, 74 insertions(+), 77 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 45e9a68cb63..7c7e700aee0 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -29,7 +29,7 @@ use bitcoin::transaction::{Transaction, TxOut}; use bitcoin::FeeRate; use bitcoin::block::Header; -use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hash_types::Txid; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256d::Hash as Sha256dHash; use bitcoin::hashes::Hash as TraitImport; @@ -331,7 +331,7 @@ impl chain::Watch for TestChainMonitor { .map(|(_, data)| data) .unwrap_or(&map_entry.persisted_monitor); let deserialized_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut &latest_monitor_data[..], (&*self.keys, &*self.keys), ) @@ -1002,7 +1002,7 @@ pub fn do_test( // Use a different value of `use_old_mons` if we have another monitor (only for node B) // by shifting `use_old_mons` one in base-3. 
use_old_mons /= 3; - let mon = <(BlockHash, ChannelMonitor)>::read( + let mon = <(BestBlock, ChannelMonitor)>::read( &mut &serialized_mon[..], (&**keys, &**keys), ) @@ -1037,7 +1037,7 @@ pub fn do_test( }; let manager = - <(BlockHash, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager"); + <(BestBlock, ChanMan)>::read(&mut &ser[..], read_args).expect("Failed to read manager"); let res = (manager.1, chain_monitor.clone()); for (channel_id, mon) in monitors.drain() { assert_eq!( diff --git a/fuzz/src/chanmon_deser.rs b/fuzz/src/chanmon_deser.rs index 4a4e79c83c1..be9ffe8f026 100644 --- a/fuzz/src/chanmon_deser.rs +++ b/fuzz/src/chanmon_deser.rs @@ -1,9 +1,7 @@ // This file is auto-generated by gen_target.sh based on msg_target_template.txt // To modify it, modify msg_target_template.txt and run gen_target.sh instead. -use bitcoin::hash_types::BlockHash; - -use lightning::chain::channelmonitor; +use lightning::chain::{channelmonitor, BestBlock}; use lightning::util::ser::{ReadableArgs, Writeable, Writer}; use lightning::util::test_channel_signer::TestChannelSigner; use lightning::util::test_utils::OnlyReadsKeysInterface; @@ -23,14 +21,14 @@ impl Writer for VecWriter { #[inline] pub fn do_test(data: &[u8], _out: Out) { if let Ok((latest_block_hash, monitor)) = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut Cursor::new(data), (&OnlyReadsKeysInterface {}, &OnlyReadsKeysInterface {}), ) { let mut w = VecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let deserialized_copy = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut Cursor::new(&w.0), (&OnlyReadsKeysInterface {}, &OnlyReadsKeysInterface {}), ) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index a870f8ca88c..61f44c6139e 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -40,11 +40,10 @@ 
where /// switching to [`SpvClient`]. For example: /// /// ``` -/// use bitcoin::hash_types::BlockHash; /// use bitcoin::network::Network; /// /// use lightning::chain; -/// use lightning::chain::Watch; +/// use lightning::chain::{BestBlock, Watch}; /// use lightning::chain::chainmonitor; /// use lightning::chain::chainmonitor::ChainMonitor; /// use lightning::chain::channelmonitor::ChannelMonitor; @@ -89,14 +88,14 @@ where /// logger: &L, /// persister: &P, /// ) { -/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// // Read a serialized channel monitor paired with the best block when it was persisted. /// let serialized_monitor = "..."; -/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// let (monitor_best_block, mut monitor) = <(BestBlock, ChannelMonitor)>::read( /// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); /// -/// // Read the channel manager paired with the block hash when it was persisted. +/// // Read the channel manager paired with the best block when it was persisted. 
/// let serialized_manager = "..."; -/// let (manager_block_hash, mut manager) = { +/// let (manager_best_block, mut manager) = { /// let read_args = ChannelManagerReadArgs::new( /// entropy_source, /// node_signer, @@ -110,7 +109,7 @@ where /// config, /// vec![&mut monitor], /// ); -/// <(BlockHash, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( +/// <(BestBlock, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &MR, &L>)>::read( /// &mut Cursor::new(&serialized_manager), read_args).unwrap() /// }; /// @@ -118,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs index 8202669f6a2..d09504f7aad 100644 --- a/lightning/src/chain/channelmonitor.rs +++ b/lightning/src/chain/channelmonitor.rs @@ -1058,7 +1058,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// You MUST ensure that no ChannelMonitors for a given channel anywhere contain out-of-date /// information and are actively monitoring the chain. /// -/// Like the [`ChannelManager`], deserialization is implemented for `(BlockHash, ChannelMonitor)`, +/// Like the [`ChannelManager`], deserialization is implemented for `(BestBlock, ChannelMonitor)`, /// providing you with the last block hash which was connected before shutting down. You must begin /// syncing the chain from that point, disconnecting and connecting blocks as required to get to /// the best chain on startup. 
Note that all [`ChannelMonitor`]s passed to a [`ChainMonitor`] must @@ -1066,7 +1066,7 @@ impl Readable for IrrevocablyResolvedHTLC { /// initialization. /// /// For those loading potentially-ancient [`ChannelMonitor`]s, deserialization is also implemented -/// for `Option<(BlockHash, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] +/// for `Option<(BestBlock, ChannelMonitor)>`. LDK can no longer deserialize a [`ChannelMonitor`] /// that was first created in LDK prior to 0.0.110 and last updated prior to LDK 0.0.119. In such /// cases, the `Option<(..)>` deserialization option may return `Ok(None)` rather than failing to /// deserialize, allowing you to differentiate between the two cases. @@ -6473,7 +6473,7 @@ where const MAX_ALLOC_SIZE: usize = 64 * 1024; impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for (BlockHash, ChannelMonitor) + for (BestBlock, ChannelMonitor) { fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { match >::read(reader, args) { @@ -6485,7 +6485,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP } impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP)> - for Option<(BlockHash, ChannelMonitor)> + for Option<(BestBlock, ChannelMonitor)> { #[rustfmt::skip] fn read(reader: &mut R, args: (&'a ES, &'b SP)) -> Result { @@ -6919,7 +6919,7 @@ impl<'a, 'b, ES: EntropySource, SP: SignerProvider> ReadableArgs<(&'a ES, &'b SP To continue, run a v0.1 release, send/route a payment over the channel or close it."); } } - Ok(Some((best_block.block_hash, monitor))) + Ok(Some((best_block, monitor))) } } @@ -6991,7 +6991,7 @@ pub(super) fn dummy_monitor( #[cfg(test)] mod tests { use bitcoin::amount::Amount; - use bitcoin::hash_types::{BlockHash, Txid}; + use bitcoin::hash_types::Txid; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::hex::FromHex; @@ -7017,7 +7017,7 @@ mod tests { 
weight_revoked_received_htlc, WEIGHT_REVOKED_OUTPUT, }; use crate::chain::transaction::OutPoint; - use crate::chain::Confirm; + use crate::chain::{BestBlock, Confirm}; use crate::io; use crate::ln::chan_utils::{self, HTLCOutputInCommitment, HolderCommitmentTransaction}; use crate::ln::channel_keys::{ @@ -7083,7 +7083,7 @@ mod tests { nodes[1].chain_monitor.chain_monitor.transactions_confirmed(&new_header, &[(0, broadcast_tx)], conf_height); - let (_, pre_update_monitor) = <(BlockHash, ChannelMonitor<_>)>::read( + let (_, pre_update_monitor) = <(BestBlock, ChannelMonitor<_>)>::read( &mut io::Cursor::new(&get_monitor!(nodes[1], channel.2).encode()), (&nodes[1].keys_manager.backing, &nodes[1].keys_manager.backing)).unwrap(); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 623d028560f..03a65e432ba 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -16,7 +16,7 @@ use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::chainmonitor::ChainMonitor; use crate::chain::channelmonitor::{ChannelMonitor, MonitorEvent, ANTI_REORG_DELAY}; use crate::chain::transaction::OutPoint; -use crate::chain::{ChannelMonitorUpdateStatus, Listen, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Listen, Watch}; use crate::events::{ClosureReason, Event, HTLCHandlingFailureType, PaymentPurpose}; use crate::ln::channel::AnnouncementSigsState; use crate::ln::channelmanager::{PaymentId, RAACommitmentOrder}; @@ -90,7 +90,7 @@ fn test_monitor_and_persister_update_fail() { let chain_mon = { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan.2).unwrap(); - let (_, new_monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, new_monitor) = <(BestBlock, ChannelMonitor)>::read( &mut &monitor.encode()[..], (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/channelmanager.rs 
b/lightning/src/ln/channelmanager.rs index eeeb34ac0eb..f2fab09f3f0 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -2048,7 +2048,6 @@ impl< /// detailed in the [`ChannelManagerReadArgs`] documentation. /// /// ``` -/// use bitcoin::BlockHash; /// use bitcoin::network::Network; /// use lightning::chain::BestBlock; /// # use lightning::chain::channelmonitor::ChannelMonitor; @@ -2097,8 +2096,8 @@ impl< /// entropy_source, node_signer, signer_provider, fee_estimator, chain_monitor, tx_broadcaster, /// router, message_router, logger, config, channel_monitors.iter().collect(), /// ); -/// let (block_hash, channel_manager) = -/// <(BlockHash, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; +/// let (best_block, channel_manager) = +/// <(BestBlock, ChannelManager<_, _, _, _, _, _, _, _, _>)>::read(&mut reader, args)?; /// /// // Update the ChannelManager and ChannelMonitors with the latest chain data /// // ... @@ -2665,7 +2664,7 @@ impl< /// [`read`], those channels will be force-closed based on the `ChannelMonitor` state and no funds /// will be lost (modulo on-chain transaction fees). /// -/// Note that the deserializer is only implemented for `(`[`BlockHash`]`, `[`ChannelManager`]`)`, which +/// Note that the deserializer is only implemented for `(`[`BestBlock`]`, `[`ChannelManager`]`)`, which /// tells you the last block hash which was connected. You should get the best block tip before using the manager. /// See [`chain::Listen`] and [`chain::Confirm`] for more details. 
/// @@ -2732,7 +2731,6 @@ impl< /// [`peer_disconnected`]: msgs::BaseMessageHandler::peer_disconnected /// [`funding_created`]: msgs::FundingCreated /// [`funding_transaction_generated`]: Self::funding_transaction_generated -/// [`BlockHash`]: bitcoin::hash_types::BlockHash /// [`update_channel`]: chain::Watch::update_channel /// [`ChannelUpdate`]: msgs::ChannelUpdate /// [`read`]: ReadableArgs::read @@ -18596,7 +18594,7 @@ impl<'a, ES: EntropySource, SP: SignerProvider, L: Logger> /// is: /// 1) Deserialize all stored [`ChannelMonitor`]s. /// 2) Deserialize the [`ChannelManager`] by filling in this struct and calling: -/// `<(BlockHash, ChannelManager)>::read(reader, args)` +/// `<(BestBlock, ChannelManager)>::read(reader, args)` /// This may result in closing some channels if the [`ChannelMonitor`] is newer than the stored /// [`ChannelManager`] state to ensure no loss of funds. Thus, transactions may be broadcasted. /// 3) If you are not fetching full blocks, register all relevant [`ChannelMonitor`] outpoints the @@ -18797,14 +18795,14 @@ impl< MR: MessageRouter, L: Logger + Clone, > ReadableArgs> - for (BlockHash, Arc>) + for (BestBlock, Arc>) { fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, ) -> Result { - let (blockhash, chan_manager) = - <(BlockHash, ChannelManager)>::read(reader, args)?; - Ok((blockhash, Arc::new(chan_manager))) + let (best_block, chan_manager) = + <(BestBlock, ChannelManager)>::read(reader, args)?; + Ok((best_block, Arc::new(chan_manager))) } } @@ -18820,7 +18818,7 @@ impl< MR: MessageRouter, L: Logger + Clone, > ReadableArgs> - for (BlockHash, ChannelManager) + for (BestBlock, ChannelManager) { fn read( reader: &mut Reader, args: ChannelManagerReadArgs<'a, M, T, ES, NS, SP, F, R, MR, L>, @@ -18864,7 +18862,7 @@ impl< pub(super) fn from_channel_manager_data( data: ChannelManagerData, mut args: ChannelManagerReadArgs<'_, M, T, ES, NS, SP, F, R, MR, L>, - ) -> Result<(BlockHash, Self), 
DecodeError> { + ) -> Result<(BestBlock, Self), DecodeError> { let ChannelManagerData { chain_hash, best_block_height, @@ -20493,7 +20491,7 @@ impl< //TODO: Broadcast channel update for closed channels, but only after we've made a //connection or two. - Ok((best_block_hash, channel_manager)) + Ok((best_block, channel_manager)) } } diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 596b2420ca2..89ec48ff7ce 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -851,7 +851,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mon = self.chain_monitor.chain_monitor.get_monitor(channel_id).unwrap(); mon.write(&mut w).unwrap(); let (_, deserialized_monitor) = - <(BlockHash, ChannelMonitor)>::read( + <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -880,7 +880,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let mut w = test_utils::TestVecWriter(Vec::new()); self.node.write(&mut w).unwrap(); <( - BlockHash, + BestBlock, ChannelManager< &test_utils::TestChainMonitor, &test_utils::TestBroadcaster, @@ -1319,7 +1319,7 @@ pub fn _reload_node<'a, 'b, 'c>( let mut monitors_read = Vec::with_capacity(monitors_encoded.len()); for encoded in monitors_encoded { let mut monitor_read = &encoded[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read( + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read( &mut monitor_read, (node.keys_manager, node.keys_manager), ) @@ -1334,7 +1334,7 @@ pub fn _reload_node<'a, 'b, 'c>( for monitor in monitors_read.iter() { assert!(channel_monitors.insert(monitor.channel_id(), monitor).is_none()); } - <(BlockHash, TestChannelManager<'b, 'c>)>::read( + <(BestBlock, TestChannelManager<'b, 'c>)>::read( &mut node_read, ChannelManagerReadArgs { config, diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index c98cfa53b86..7fb39486bf4 100644 --- 
a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -19,6 +19,7 @@ use crate::chain::channelmonitor::{ LATENCY_GRACE_PERIOD_BLOCKS, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch}; use crate::events::{ ClosureReason, Event, HTLCHandlingFailureType, PathFailure, PaymentFailureReason, @@ -7379,7 +7380,7 @@ pub fn test_update_err_monitor_lockdown() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7487,7 +7488,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) @@ -7537,7 +7538,7 @@ pub fn test_concurrent_monitor_claim() { let new_monitor = { let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(chan_1.2).unwrap(); let new_monitor = - <(BlockHash, channelmonitor::ChannelMonitor)>::read( + <(BestBlock, channelmonitor::ChannelMonitor)>::read( &mut io::Cursor::new(&monitor.encode()), (nodes[0].keys_manager, nodes[0].keys_manager), ) diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index bb730f8fba8..80a314adccc 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -11,7 +11,7 @@ //! Functional tests which test for correct behavior across node restarts. 
-use crate::chain::{ChannelMonitorUpdateStatus, Watch}; +use crate::chain::{BestBlock, ChannelMonitorUpdateStatus, Watch}; use crate::chain::chaininterface::LowerBoundedFeeEstimator; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateStep}; use crate::routing::router::{PaymentParameters, RouteParameters}; @@ -30,7 +30,6 @@ use crate::util::ser::{Writeable, ReadableArgs}; use crate::util::config::{HTLCInterceptionFlags, UserConfig}; use bitcoin::hashes::Hash; -use bitcoin::hash_types::BlockHash; use types::payment::{PaymentHash, PaymentPreimage}; use crate::prelude::*; @@ -412,7 +411,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_stale_monitors = Vec::new(); for serialized in node_0_stale_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_stale_monitors.push(monitor); } @@ -420,14 +419,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut node_0_monitors = Vec::new(); for serialized in node_0_monitors_serialized.iter() { let mut read = &serialized[..]; - let (_, monitor) = <(BlockHash, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); + let (_, monitor) = <(BestBlock, ChannelMonitor)>::read(&mut read, (keys_manager, keys_manager)).unwrap(); assert!(read.is_empty()); node_0_monitors.push(monitor); } let mut nodes_0_read = &nodes_0_serialized[..]; if let Err(msgs::DecodeError::DangerousValue) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut 
nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, @@ -446,7 +445,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let mut nodes_0_read = &nodes_0_serialized[..]; let (_, nodes_0_deserialized_tmp) = - <(BlockHash, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { + <(BestBlock, ChannelManager<&test_utils::TestChainMonitor, &test_utils::TestBroadcaster, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestKeysInterface, &test_utils::TestFeeEstimator, &test_utils::TestRouter, &test_utils::TestMessageRouter, &test_utils::TestLogger>)>::read(&mut nodes_0_read, ChannelManagerReadArgs { config: UserConfig::default(), entropy_source: keys_manager, node_signer: keys_manager, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index f27ccc1cbac..7df63aa5ac9 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -14,7 +14,7 @@ use alloc::sync::Arc; use bitcoin::hashes::hex::FromHex; -use bitcoin::{BlockHash, Txid}; +use bitcoin::Txid; use core::convert::Infallible; use core::fmt; @@ -33,6 +33,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::Persist; use crate::chain::channelmonitor::{ChannelMonitor, 
ChannelMonitorUpdate}; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; @@ -653,7 +654,7 @@ impl Persist( kv_store: K, entropy_source: ES, signer_provider: SP, -) -> Result)>, io::Error> +) -> Result)>, io::Error> where K::Target: KVStoreSync, { @@ -663,7 +664,7 @@ where CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, )? { - match )>>::read( + match )>>::read( &mut io::Cursor::new(kv_store.read( CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE, @@ -671,7 +672,7 @@ where )?), (&entropy_source, &signer_provider), ) { - Ok(Some((block_hash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { let monitor_name = MonitorName::from_str(&stored_key)?; if channel_monitor.persistence_key() != monitor_name { return Err(io::Error::new( @@ -680,7 +681,7 @@ where )); } - res.push((block_hash, channel_monitor)); + res.push((best_block, channel_monitor)); }, Ok(None) => {}, Err(_) => { @@ -856,7 +857,7 @@ where /// Reads all stored channel monitors, along with any stored updates for them. pub fn read_all_channel_monitors_with_updates( &self, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { poll_sync_future(self.0.read_all_channel_monitors_with_updates()) } @@ -877,7 +878,7 @@ where /// function to accomplish this. Take care to limit the number of parallel readers. pub fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { poll_sync_future(self.0.read_channel_monitor_with_updates(monitor_key)) } @@ -1044,7 +1045,7 @@ impl< /// deserialization as well. 
pub async fn read_all_channel_monitors_with_updates( &self, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let monitor_list = self.0.kv_store.list(primary, secondary).await?; @@ -1075,7 +1076,7 @@ impl< /// `Arc` that can live for `'static` and be sent and accessed across threads. pub async fn read_all_channel_monitors_with_updates_parallel( self: &Arc, - ) -> Result)>, io::Error> + ) -> Result)>, io::Error> where K: MaybeSend + MaybeSync + 'static, L: MaybeSend + MaybeSync + 'static, @@ -1125,7 +1126,7 @@ impl< /// function to accomplish this. Take care to limit the number of parallel readers. pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { self.0.read_channel_monitor_with_updates(monitor_key).await } @@ -1236,7 +1237,7 @@ impl< { pub async fn read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result<(BlockHash, ChannelMonitor), io::Error> { + ) -> Result<(BestBlock, ChannelMonitor), io::Error> { match self.maybe_read_channel_monitor_with_updates(monitor_key).await? { Some(res) => Ok(res), None => Err(io::Error::new( @@ -1253,14 +1254,14 @@ impl< async fn maybe_read_channel_monitor_with_updates( &self, monitor_key: &str, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let monitor_name = MonitorName::from_str(monitor_key)?; let read_future = pin!(self.maybe_read_monitor(&monitor_name, monitor_key)); let list_future = pin!(self .kv_store .list(CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE, monitor_key)); let (read_res, list_res) = TwoFutureJoiner::new(read_future, list_future).await; - let (block_hash, monitor) = match read_res? { + let (best_block, monitor) = match read_res? 
{ Some(res) => res, None => return Ok(None), }; @@ -1291,13 +1292,13 @@ impl< io::Error::new(io::ErrorKind::Other, "Monitor update failed") })?; } - Ok(Some((block_hash, monitor))) + Ok(Some((best_block, monitor))) } /// Read a channel monitor. async fn maybe_read_monitor( &self, monitor_name: &MonitorName, monitor_key: &str, - ) -> Result)>, io::Error> { + ) -> Result)>, io::Error> { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let monitor_bytes = self.kv_store.read(primary, secondary, monitor_key).await?; @@ -1306,12 +1307,12 @@ impl< if monitor_cursor.get_ref().starts_with(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL) { monitor_cursor.set_position(MONITOR_UPDATING_PERSISTER_PREPEND_SENTINEL.len() as u64); } - match )>>::read( + match )>>::read( &mut monitor_cursor, (&self.entropy_source, &self.signer_provider), ) { Ok(None) => Ok(None), - Ok(Some((blockhash, channel_monitor))) => { + Ok(Some((best_block, channel_monitor))) => { if channel_monitor.persistence_key() != *monitor_name { log_error!( self.logger, @@ -1323,7 +1324,7 @@ impl< "ChannelMonitor was stored under the wrong key", )) } else { - Ok(Some((blockhash, channel_monitor))) + Ok(Some((best_block, channel_monitor))) } }, Err(e) => { @@ -1502,7 +1503,7 @@ impl< async fn archive_persisted_channel(&self, monitor_name: MonitorName) { let monitor_key = monitor_name.to_string(); let monitor = match self.read_channel_monitor_with_updates(&monitor_key).await { - Ok((_block_hash, monitor)) => monitor, + Ok((_best_block, monitor)) => monitor, Err(_) => return, }; let primary = ARCHIVED_CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index d31c16ccbf0..aa77f515cd9 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -20,6 +20,7 @@ use crate::chain::channelmonitor::{ ChannelMonitor, ChannelMonitorUpdate, 
ChannelMonitorUpdateStep, MonitorEvent, }; use crate::chain::transaction::OutPoint; +use crate::chain::BestBlock; use crate::chain::WatchedOutput; #[cfg(any(test, feature = "_externalize_tests"))] use crate::ln::chan_utils::CommitmentTransaction; @@ -66,7 +67,7 @@ use bitcoin::amount::Amount; use bitcoin::block::Block; use bitcoin::constants::genesis_block; use bitcoin::constants::ChainHash; -use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hash_types::Txid; use bitcoin::hashes::{hex::FromHex, Hash}; use bitcoin::network::Network; use bitcoin::script::{Builder, Script, ScriptBuf}; @@ -607,7 +608,7 @@ impl<'a> TestChainMonitor<'a> { // underlying `ChainMonitor`. let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -644,7 +645,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { // monitor to a serialized copy and get he same one back. 
let mut w = TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) @@ -700,7 +701,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let monitor = self.chain_monitor.get_monitor(channel_id).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); - let new_monitor = <(BlockHash, ChannelMonitor)>::read( + let new_monitor = <(BestBlock, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), (self.keys_manager, self.keys_manager), ) From ff3f4089bfa0c2e7bcd1c2487159e436f1f437b5 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 16:01:55 +0000 Subject: [PATCH 03/12] Replace `Cache::block_disconnected` with `blocks_disconnected` In 403dc1a48bb71ae794f6883ae0b760aad44cda39 we converted the `Listen` disconnect semantics to only pass the fork point, rather than each block being disconnected. We did not, however, update the semantics of `lightning-block-sync`'s `Cache` to reduce patch size. Here we go ahead and do so, dropping `ChainDifference::disconnected_blocks` as well, as it's no longer needed.
--- lightning-block-sync/src/init.rs | 8 +++---- lightning-block-sync/src/lib.rs | 37 +++++++++++++------------------- 2 files changed, 19 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 61f44c6139e..07575c6f523 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -175,7 +175,9 @@ where let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; let difference = chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - chain_notifier.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != old_header { + chain_notifier.disconnect_blocks(difference.common_ancestor); + } (difference.common_ancestor, difference.connected_blocks) }; @@ -215,9 +217,7 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { unreachable!() } - fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { - None - } + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} } /// Wrapper for supporting dynamically sized chain listeners. diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 02593047658..3b9b137f21f 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -202,9 +202,11 @@ pub trait Cache { /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); - /// Called when a block has been disconnected from the best chain. Once disconnected, a block's - /// header is no longer needed and thus can be removed. - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option; + /// Called when blocks have been disconnected from the best chain. Only the fork point + /// (best common ancestor) is provided. + /// + /// Once disconnected, a block's header is no longer needed and thus can be removed. 
+ fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } /// Unbounded cache of block headers keyed by block hash. @@ -219,8 +221,8 @@ impl Cache for UnboundedCache { self.insert(block_hash, block_header); } - fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { - self.remove(block_hash) + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); } } @@ -315,9 +317,6 @@ struct ChainDifference { /// If there are any disconnected blocks, this is where the chain forked. common_ancestor: ValidatedBlockHeader, - /// Blocks that were disconnected from the chain since the last poll. - disconnected_blocks: Vec, - /// Blocks that were connected to the chain since the last poll. connected_blocks: Vec, } @@ -341,7 +340,9 @@ where .find_difference(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; - self.disconnect_blocks(difference.disconnected_blocks); + if difference.common_ancestor != *old_header { + self.disconnect_blocks(difference.common_ancestor); + } self.connect_blocks(difference.common_ancestor, difference.connected_blocks, chain_poller) .await } @@ -354,7 +355,6 @@ where &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { - let mut disconnected_blocks = Vec::new(); let mut connected_blocks = Vec::new(); let mut current = current_header; let mut previous = *prev_header; @@ -369,7 +369,6 @@ where let current_height = current.height; let previous_height = previous.height; if current_height <= previous_height { - disconnected_blocks.push(previous); previous = self.look_up_previous_header(chain_poller, &previous).await?; } if current_height >= previous_height { @@ -379,7 +378,7 @@ where } let common_ancestor = current; - Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + Ok(ChainDifference { common_ancestor, connected_blocks }) } 
/// Returns the previous header for the given header, either by looking it up in the cache or @@ -394,16 +393,10 @@ where } /// Notifies the chain listeners of disconnected blocks. - fn disconnect_blocks(&mut self, disconnected_blocks: Vec) { - for header in disconnected_blocks.iter() { - if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) { - assert_eq!(cached_header, *header); - } - } - if let Some(block) = disconnected_blocks.last() { - let fork_point = BestBlock::new(block.header.prev_blockhash, block.height - 1); - self.chain_listener.blocks_disconnected(fork_point); - } + fn disconnect_blocks(&mut self, fork_point: ValidatedBlockHeader) { + self.header_cache.blocks_disconnected(&fork_point); + let best_block = BestBlock::new(fork_point.block_hash, fork_point.height); + self.chain_listener.blocks_disconnected(best_block); } /// Notifies the chain listeners of connected blocks. From 1a9ecc699fe4497bd56ace8a804b1507b2818d79 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 12 Oct 2025 15:49:11 +0000 Subject: [PATCH 04/12] Pass a `BestBlock` to `init::synchronize_listeners` On restart, LDK expects the chain to be replayed starting from where it was when objects were last serialized. This is fine in the normal case, but if there was a reorg and the node which we were syncing from either resynced or was changed, the last block that we were synced as of might no longer be available. As a result, it becomes impossible to figure out where the fork point is, and thus to replay the chain. Luckily, changing the block source during a reorg isn't exactly common, but we shouldn't end up with a bricked node. To address this, `lightning-block-sync` allows the user to pass in `Cache` which can be used to cache recent blocks and thus allow for reorg handling in this case. However, serialization for, and a reasonable default implementation of a `Cache` was never built. Instead, here, we start taking a different approach. 
To avoid developers having to persist yet another object, we move `BestBlock` to storing some number of recent block hashes. This allows us to find the fork point with just the serialized state. In a previous commit, we moved deserialization of various structs to return the `BestBlock` rather than a `BlockHash`. Here we move to actually using it, taking a `BestBlock` in place of `BlockHash` to `init::synchronize_listeners` and walking the `previous_blocks` list to find the fork point rather than relying on the `Cache`. --- lightning-block-sync/src/init.rs | 51 ++++++++++---------------- lightning-block-sync/src/lib.rs | 45 ++++++++++++++++++++++- lightning-block-sync/src/poll.rs | 13 +++++++ lightning-block-sync/src/test_utils.rs | 17 +++++++++ 4 files changed, 93 insertions(+), 33 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 07575c6f523..4fdd3efabd8 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -117,8 +117,8 @@ where /// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ -/// (monitor_best_block.block_hash, &monitor_listener as &dyn chain::Listen), -/// (manager_best_block.block_hash, &manager as &dyn chain::Listen), +/// (monitor_best_block, &monitor_listener as &dyn chain::Listen), +/// (manager_best_block, &manager as &dyn chain::Listen), /// ]; /// let chain_tip = init::synchronize_listeners( /// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); @@ -143,39 +143,28 @@ pub async fn synchronize_listeners< L: chain::Listen + ?Sized, >( block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BlockHash, &L)>, + mut chain_listeners: Vec<(BestBlock, &L)>, ) -> BlockSourceResult where B::Target: BlockSource, { let best_header = validate_best_block_header(&*block_source).await?; - // Fetch the header for the block hash paired 
with each listener. - let mut chain_listeners_with_old_headers = Vec::new(); - for (old_block_hash, chain_listener) in chain_listeners.drain(..) { - let old_header = match header_cache.look_up(&old_block_hash) { - Some(header) => *header, - None => { - block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? - }, - }; - chain_listeners_with_old_headers.push((old_header, chain_listener)) - } - // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; - let difference = - chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; - if difference.common_ancestor != old_header { + let difference = chain_notifier + .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) + .await?; + if difference.common_ancestor.block_hash != old_best_block.block_hash { chain_notifier.disconnect_blocks(difference.common_ancestor); } (difference.common_ancestor, difference.connected_blocks) @@ -281,9 +270,9 @@ mod tests { let listener_3 = MockChainListener::new().expect_block_connected(*chain.at_height(4)); let listeners = vec![ - (chain.at_height(1).block_hash, &listener_1 as &dyn chain::Listen), - (chain.at_height(2).block_hash, &listener_2 as &dyn chain::Listen), - (chain.at_height(3).block_hash, &listener_3 as &dyn chain::Listen), + 
(chain.best_block_at_height(1), &listener_1 as &dyn chain::Listen), + (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), + (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; let mut cache = chain.header_cache(0..=4); match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { @@ -313,9 +302,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -350,9 +339,9 @@ mod tests { .expect_block_connected(*main_chain.at_height(4)); let listeners = vec![ - (fork_chain_1.tip().block_hash, &listener_1 as &dyn chain::Listen), - (fork_chain_2.tip().block_hash, &listener_2 as &dyn chain::Listen), - (fork_chain_3.tip().block_hash, &listener_3 as &dyn chain::Listen), + (fork_chain_1.best_block(), &listener_1 as &dyn chain::Listen), + (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), + (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; let mut cache = fork_chain_1.header_cache(2..=4); cache.extend(fork_chain_2.header_cache(3..=4)); @@ -368,18 +357,18 @@ mod tests { let main_chain = Blockchain::default().with_height(2); let fork_chain = main_chain.fork_at_height(1); let new_tip = main_chain.tip(); - let old_tip = fork_chain.tip(); + let old_best_block = fork_chain.best_block(); let listener = MockChainListener::new() .expect_blocks_disconnected(*fork_chain.at_height(1)) .expect_block_connected(*new_tip); - let listeners = vec![(old_tip.block_hash, &listener 
as &dyn chain::Listen)]; + let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; let mut cache = fork_chain.header_cache(2..=2); match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { Ok(_) => { assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_tip.block_hash)); + assert!(cache.contains_key(&old_best_block.block_hash)); }, Err(e) => panic!("Unexpected error: {:?}", e), } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 3b9b137f21f..ba583c2737e 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -337,7 +337,7 @@ where chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { let difference = self - .find_difference(new_header, old_header, chain_poller) + .find_difference_from_header(new_header, old_header, chain_poller) .await .map_err(|e| (e, None))?; if difference.common_ancestor != *old_header { @@ -347,11 +347,52 @@ where .await } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_best_block` as its tip. + /// + /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` + /// field as fallback if needed, then finds the common ancestor. + async fn find_difference_from_best_block( + &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + chain_poller: &mut P, + ) -> BlockSourceResult { + // Try to resolve the header for the previous best block. First try the block_hash, + // then fall back to previous_blocks if that fails. 
+ let cur_tip = core::iter::once((0, &prev_best_block.block_hash)); + let prev_tips = + prev_best_block.previous_blocks.iter().enumerate().filter_map(|(idx, hash_opt)| { + if let Some(block_hash) = hash_opt { + Some((idx as u32 + 1, block_hash)) + } else { + None + } + }); + let mut found_header = None; + for (height_diff, block_hash) in cur_tip.chain(prev_tips) { + if let Some(header) = self.header_cache.look_up(block_hash) { + found_header = Some(*header); + break; + } + let height = prev_best_block.height.checked_sub(height_diff).ok_or( + BlockSourceError::persistent("BestBlock had more previous_blocks than its height"), + )?; + if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { + found_header = Some(header); + break; + } + } + let found_header = found_header.ok_or_else(|| { + BlockSourceError::persistent("could not resolve any block from BestBlock") + })?; + + self.find_difference_from_header(current_header, &found_header, chain_poller).await + } + /// Returns the changes needed to produce the chain with `current_header` as its tip from the /// chain with `prev_header` as its tip. /// /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. - async fn find_difference( + async fn find_difference_from_header( &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, chain_poller: &mut P, ) -> BlockSourceResult { diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 13e0403c3b6..fd8c546c56f 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -31,6 +31,11 @@ pub trait Poll { fn fetch_block<'a>( &'a self, header: &'a ValidatedBlockHeader, ) -> impl Future> + Send + 'a; + + /// Returns the header for a given hash and optional height hint. 
+ fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a; } /// A chain tip relative to another chain tip in terms of block hash and chainwork. @@ -258,6 +263,14 @@ impl + Sized + Send + Sync, T: BlockSource + ?Sized> Poll ) -> impl Future> + Send + 'a { async move { self.block_source.get_block(&header.block_hash).await?.validate(header.block_hash) } } + + fn get_header<'a>( + &'a self, block_hash: &'a BlockHash, height_hint: Option, + ) -> impl Future> + Send + 'a { + Box::pin(async move { + self.block_source.get_header(block_hash, height_hint).await?.validate(*block_hash) + }) + } } #[cfg(test)] diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 40788e4d08c..3d7870afb1e 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -104,6 +104,18 @@ impl Blockchain { block_header.validate(block_hash).unwrap() } + pub fn best_block_at_height(&self, height: usize) -> BestBlock { + let mut previous_blocks = [None; 12]; + for (i, height) in (0..height).rev().take(12).enumerate() { + previous_blocks[i] = Some(self.blocks[height].block_hash()); + } + BestBlock { + height: height as u32, + block_hash: self.blocks[height].block_hash(), + previous_blocks, + } + } + fn at_height_unvalidated(&self, height: usize) -> BlockHeaderData { assert!(!self.blocks.is_empty()); assert!(height < self.blocks.len()); @@ -123,6 +135,11 @@ impl Blockchain { self.at_height(self.blocks.len() - 1) } + pub fn best_block(&self) -> BestBlock { + assert!(!self.blocks.is_empty()); + self.best_block_at_height(self.blocks.len() - 1) + } + pub fn disconnect_tip(&mut self) -> Option { self.blocks.pop() } From 9c130357208fb1861eb9600c367aeb58ae29d94f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:20 +0000 Subject: [PATCH 05/12] Make the `Cache` trait priv, just use `UnboundedCache` publicly In the previous commit, we moved to relying on 
`BestBlock::previous_blocks` to find the fork point in `lightning-block-sync`'s `init::synchronize_listeners`. Here we now drop the `Cache` parameter as we no longer rely on it. Because we now have no reason to want a persistent `Cache`, we remove the trait from the public interface. However, to keep disconnections reliable we return the `UnboundedCache` we built up during initial sync from `init::synchronize_listeners` which we expect developers to pass to `SpvClient::new`. --- lightning-block-sync/src/init.rs | 94 +++++++------------------------- lightning-block-sync/src/lib.rs | 61 ++++++++++++--------- 2 files changed, 56 insertions(+), 99 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 4fdd3efabd8..5e1c344e3f5 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,7 +2,7 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier}; +use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; use bitcoin::block::Header; use bitcoin::hash_types::BlockHash; @@ -32,9 +32,9 @@ where /// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each /// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. /// -/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of -/// failure, each listener may be left at a different block hash than the one it was originally -/// paired with. +/// Upon success, the returned header and header cache can be used to initialize [`SpvClient`]. In +/// the case of failure, each listener may be left at a different block hash than the one it was +/// originally paired with. /// /// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before /// switching to [`SpvClient`]. 
For example: @@ -114,14 +114,13 @@ where /// }; /// /// // Synchronize any channel monitors and the channel manager to be on the best block. -/// let mut cache = UnboundedCache::new(); /// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); /// let listeners = vec![ /// (monitor_best_block, &monitor_listener as &dyn chain::Listen), /// (manager_best_block, &manager as &dyn chain::Listen), /// ]; -/// let chain_tip = init::synchronize_listeners( -/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// let (chain_cache, chain_tip) = init::synchronize_listeners( +/// block_source, Network::Bitcoin, listeners).await.unwrap(); /// /// // Allow the chain monitor to watch any channels. /// let monitor = monitor_listener.0; @@ -130,21 +129,16 @@ where /// // Create an SPV client to notify the chain monitor and channel manager of block events. /// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); /// let mut chain_listener = (chain_monitor, &manager); -/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, chain_cache, &chain_listener); /// } /// ``` /// /// [`SpvClient`]: crate::SpvClient /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -pub async fn synchronize_listeners< - B: Deref + Sized + Send + Sync, - C: Cache, - L: chain::Listen + ?Sized, ->( - block_source: B, network: Network, header_cache: &mut C, - mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult +pub async fn synchronize_listeners( + block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, +) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -155,12 +149,13 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut 
most_connected_blocks = Vec::new(); + let mut header_cache = UnboundedCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration. - let header_cache = &mut ReadOnlyCache(header_cache); let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = + ChainNotifier { header_cache: &mut header_cache, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -181,32 +176,14 @@ where // Connect new blocks for all listeners at once to avoid re-fetching blocks. if let Some(common_ancestor) = most_common_ancestor { let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; chain_notifier .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) .await .map_err(|(e, _)| e)?; } - Ok(best_header) -} - -/// A wrapper to make a cache read-only. -/// -/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one -/// listener. -struct ReadOnlyCache<'a, C: Cache>(&'a mut C); - -impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { - fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.0.look_up(block_hash) - } - - fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { - unreachable!() - } - - fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) {} + Ok((header_cache, best_header)) } /// Wrapper for supporting dynamically sized chain listeners. 
@@ -274,9 +251,8 @@ mod tests { (chain.best_block_at_height(2), &listener_2 as &dyn chain::Listen), (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; - let mut cache = chain.header_cache(0..=4); - match synchronize_listeners(&chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, chain.tip()), + match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -306,11 +282,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -343,33 +316,8 @@ mod tests { (fork_chain_2.best_block(), &listener_2 as &dyn chain::Listen), (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; - let mut cache = fork_chain_1.header_cache(2..=4); - cache.extend(fork_chain_2.header_cache(3..=4)); - cache.extend(fork_chain_3.header_cache(4..=4)); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(header) => assert_eq!(header, main_chain.tip()), - Err(e) => panic!("Unexpected error: {:?}", e), - } - } - - #[tokio::test] - async fn cache_connected_and_keep_disconnected_blocks() { - let main_chain = Blockchain::default().with_height(2); - let fork_chain = main_chain.fork_at_height(1); - let new_tip = main_chain.tip(); - let old_best_block = fork_chain.best_block(); - - let listener = 
MockChainListener::new() - .expect_blocks_disconnected(*fork_chain.at_height(1)) - .expect_block_connected(*new_tip); - - let listeners = vec![(old_best_block, &listener as &dyn chain::Listen)]; - let mut cache = fork_chain.header_cache(2..=2); - match synchronize_listeners(&main_chain, Network::Bitcoin, &mut cache, listeners).await { - Ok(_) => { - assert!(cache.contains_key(&new_tip.block_hash)); - assert!(cache.contains_key(&old_best_block.block_hash)); - }, + match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { + Ok((_, header)) => assert_eq!(header, main_chain.tip()), Err(e) => panic!("Unexpected error: {:?}", e), } } diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index ba583c2737e..e94096ccc58 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -170,18 +170,13 @@ pub enum BlockData { /// sources for the best chain tip. During this process it detects any chain forks, determines which /// constitutes the best chain, and updates the listener accordingly with any blocks that were /// connected or disconnected since the last poll. -/// -/// Block headers for the best chain are maintained in the parameterized cache, allowing for a -/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. -/// Hence, there is a trade-off between a lower memory footprint and potentially increased network -/// I/O as headers are re-fetched during fork detection. -pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref> +pub struct SpvClient where L::Target: chain::Listen, { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier<'a, C, L>, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -194,7 +189,7 @@ where /// Implementations may define how long to retain headers such that it's unlikely they will ever be /// needed to disconnect a block. 
In cases where block sources provide access to headers on stale /// forks reliably, caches may be entirely unnecessary. -pub trait Cache { +pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; @@ -226,7 +221,21 @@ impl Cache for UnboundedCache { } } -impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L> +impl Cache for &mut UnboundedCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.insert(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { + self.retain(|_, block_info| block_info.height < fork_point.height); + } +} + +impl SpvClient where L::Target: chain::Listen, { @@ -241,7 +250,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: &'a mut C, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -295,15 +304,15 @@ where /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: lightning::chain::Listen -pub struct ChainNotifier<'a, C: Cache, L: Deref> +pub(crate) struct ChainNotifier where L::Target: chain::Listen, { /// Cache for looking up headers before fetching from a block source. - header_cache: &'a mut C, + pub(crate) header_cache: C, /// Listener that will be notified of connected or disconnected blocks. 
- chain_listener: L, + pub(crate) chain_listener: L, } /// Changes made to the chain between subsequent polls that transformed it from having one chain tip @@ -321,7 +330,7 @@ struct ChainDifference { connected_blocks: Vec, } -impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +impl ChainNotifier where L::Target: chain::Listen, { @@ -481,9 +490,9 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -500,9 +509,9 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(common_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -520,9 +529,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -540,9 +549,9 @@ mod spv_client_tests { let old_tip = 
chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -560,9 +569,9 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { @@ -581,9 +590,9 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let mut cache = UnboundedCache::new(); + let cache = UnboundedCache::new(); let mut listener = NullChainListener {}; - let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener); + let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { Err(e) => panic!("Unexpected error: {:?}", e), Ok((chain_tip, blocks_connected)) => { From 0ac3f4386e57d049a67d19a734e1df438aa6fd88 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 00:37:58 +0000 Subject: [PATCH 06/12] Make `UnboundedCache` bounded In the previous commit we moved to hard-coding `UnboundedCache` in the `lightning-block-sync` interface. 
This is great, except that it's an unbounded cache that can use arbitrary amounts of memory (though never really all that much - it's just headers that come in while we're running). Here we simply limit the size, and while we're at it give it a more generic `HeaderCache` name. --- lightning-block-sync/src/init.rs | 7 ++-- lightning-block-sync/src/lib.rs | 52 +++++++++++++++++--------- lightning-block-sync/src/test_utils.rs | 9 +++-- 3 files changed, 42 insertions(+), 26 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index 5e1c344e3f5..ddb90d6d97f 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -2,10 +2,9 @@ //! from disk. use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier, UnboundedCache}; +use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; -use bitcoin::hash_types::BlockHash; use bitcoin::network::Network; use lightning::chain; @@ -138,7 +137,7 @@ where /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor pub async fn synchronize_listeners( block_source: B, network: Network, mut chain_listeners: Vec<(BestBlock, &L)>, -) -> BlockSourceResult<(UnboundedCache, ValidatedBlockHeader)> +) -> BlockSourceResult<(HeaderCache, ValidatedBlockHeader)> where B::Target: BlockSource, { @@ -149,7 +148,7 @@ where let mut chain_listeners_at_height = Vec::new(); let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); - let mut header_cache = UnboundedCache::new(); + let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { // Disconnect any stale blocks, but keep them in the cache for the next iteration.
let (common_ancestor, connected_blocks) = { diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index e94096ccc58..c9cffa272d1 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -176,7 +176,7 @@ where { chain_tip: ValidatedBlockHeader, chain_poller: P, - chain_notifier: ChainNotifier, + chain_notifier: ChainNotifier, } /// The `Cache` trait defines behavior for managing a block header cache, where block headers are @@ -204,34 +204,50 @@ pub(crate) trait Cache { fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader); } -/// Unbounded cache of block headers keyed by block hash. -pub type UnboundedCache = std::collections::HashMap; +/// The maximum number of [`ValidatedBlockHeader`]s stored in a [`HeaderCache`]. +pub const HEADER_CACHE_LIMIT: u32 = 6 * 24 * 7; -impl Cache for UnboundedCache { +/// Bounded cache of block headers keyed by block hash. +/// +/// Retains only the latest [`HEADER_CACHE_LIMIT`] block headers based on height. +pub struct HeaderCache(std::collections::HashMap); + +impl HeaderCache { + /// Creates a new empty header cache. + pub fn new() -> Self { + Self(std::collections::HashMap::new()) + } +} + +impl Cache for HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + self.0.insert(block_hash, block_header); + + // Remove headers older than a week. 
+ let cutoff_height = block_header.height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } -impl Cache for &mut UnboundedCache { +impl Cache for &mut HeaderCache { fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { - self.get(block_hash) + self.0.get(block_hash) } fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { - self.insert(block_hash, block_header); + (*self).block_connected(block_hash, block_header); } fn blocks_disconnected(&mut self, fork_point: &ValidatedBlockHeader) { - self.retain(|_, block_info| block_info.height < fork_point.height); + self.0.retain(|_, block_info| block_info.height <= fork_point.height); } } @@ -250,7 +266,7 @@ where /// /// [`poll_best_tip`]: SpvClient::poll_best_tip pub fn new( - chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: UnboundedCache, + chain_tip: ValidatedBlockHeader, chain_poller: P, header_cache: HeaderCache, chain_listener: L, ) -> Self { let chain_notifier = ChainNotifier { header_cache, chain_listener }; @@ -490,7 +506,7 @@ mod spv_client_tests { let best_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -509,7 +525,7 @@ mod spv_client_tests { let common_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(common_tip, poller, cache, &mut 
listener); match client.poll_best_tip().await { @@ -529,7 +545,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -549,7 +565,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -569,7 +585,7 @@ mod spv_client_tests { let old_tip = chain.at_height(1); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(old_tip, poller, cache, &mut listener); match client.poll_best_tip().await { @@ -590,7 +606,7 @@ mod spv_client_tests { let worse_tip = chain.tip(); let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); - let cache = UnboundedCache::new(); + let cache = HeaderCache::new(); let mut listener = NullChainListener {}; let mut client = SpvClient::new(best_tip, poller, cache, &mut listener); match client.poll_best_tip().await { diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 3d7870afb1e..89cb3e81d60 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,6 +1,7 @@ use crate::poll::{Validate, ValidatedBlockHeader}; use crate::{ - BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, UnboundedCache, + BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult, Cache, + 
HeaderCache, }; use bitcoin::block::{Block, Header, Version}; @@ -144,12 +145,12 @@ impl Blockchain { self.blocks.pop() } - pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> UnboundedCache { - let mut cache = UnboundedCache::new(); + pub fn header_cache(&self, heights: std::ops::RangeInclusive) -> HeaderCache { + let mut cache = HeaderCache::new(); for i in heights { let value = self.at_height(i); let key = value.header.block_hash(); - assert!(cache.insert(key, value).is_none()); + cache.block_connected(key, value); } cache } From 046c792ae25082654218e4c676710de13b67b7c2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 01:10:30 +0000 Subject: [PATCH 07/12] Consolidate all the pub aync utils to `native_async` --- fuzz/src/chanmon_consistency.rs | 2 +- fuzz/src/full_stack.rs | 4 +-- lightning-background-processor/src/lib.rs | 4 +-- lightning/src/chain/chainmonitor.rs | 3 +-- lightning/src/ln/funding.rs | 2 +- lightning/src/sign/mod.rs | 2 +- lightning/src/util/async_poll.rs | 28 -------------------- lightning/src/util/mod.rs | 2 +- lightning/src/util/native_async.rs | 31 ++++++++++++++++++++++- lightning/src/util/persist.rs | 4 +-- lightning/src/util/test_utils.rs | 2 +- lightning/src/util/wallet_utils.rs | 3 ++- 12 files changed, 44 insertions(+), 43 deletions(-) diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 7c7e700aee0..2cf450efad4 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -74,11 +74,11 @@ use lightning::sign::{ SignerProvider, }; use lightning::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; -use lightning::util::async_poll::{MaybeSend, MaybeSync}; use lightning::util::config::UserConfig; use lightning::util::errors::APIError; use lightning::util::hash_tables::*; use lightning::util::logger::Logger; +use lightning::util::native_async::{MaybeSend, MaybeSync}; use lightning::util::ser::{LengthReadable, ReadableArgs, Writeable, Writer}; 
use lightning::util::test_channel_signer::{EnforcementState, SignerOp, TestChannelSigner}; use lightning::util::test_utils::TestWalletSource; diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 47aebf41ac9..94bc2246c8e 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -63,10 +63,10 @@ use lightning::sign::{ SignerProvider, }; use lightning::types::payment::{PaymentHash, PaymentPreimage, PaymentSecret}; -use lightning::util::async_poll::{MaybeSend, MaybeSync}; use lightning::util::config::{ChannelConfig, UserConfig}; use lightning::util::hash_tables::*; use lightning::util::logger::Logger; +use lightning::util::native_async::{MaybeSend, MaybeSync}; use lightning::util::ser::{Readable, Writeable}; use lightning::util::test_channel_signer::{EnforcementState, TestChannelSigner}; use lightning::util::test_utils::TestWalletSource; @@ -1956,8 +1956,8 @@ pub fn write_fst_seeds(path: &str) { #[cfg(test)] mod tests { - use lightning::util::async_poll::{MaybeSend, MaybeSync}; use lightning::util::logger::{Logger, Record}; + use lightning::util::native_async::{MaybeSend, MaybeSync}; use std::collections::HashMap; use std::sync::{Arc, Mutex}; diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 594681d9782..94e7a94b042 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -55,9 +55,9 @@ use lightning::routing::utxo::UtxoLookup; #[cfg(not(c_bindings))] use lightning::sign::EntropySource; use lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender}; -#[cfg(not(c_bindings))] -use lightning::util::async_poll::MaybeSend; use lightning::util::logger::Logger; +#[cfg(not(c_bindings))] +use lightning::util::native_async::MaybeSend; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, 
CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 396ee277067..89f02df51ef 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -51,10 +51,9 @@ use crate::sign::ecdsa::EcdsaChannelSigner; use crate::sign::{EntropySource, PeerStorageKey, SignerProvider}; use crate::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard}; use crate::types::features::{InitFeatures, NodeFeatures}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; use crate::util::errors::APIError; use crate::util::logger::{Logger, WithContext}; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; diff --git a/lightning/src/ln/funding.rs b/lightning/src/ln/funding.rs index c81024ca080..353d43cfe8e 100644 --- a/lightning/src/ln/funding.rs +++ b/lightning/src/ln/funding.rs @@ -22,7 +22,7 @@ use crate::ln::msgs; use crate::ln::types::ChannelId; use crate::ln::LN_MAX_MSG_LEN; use crate::prelude::*; -use crate::util::async_poll::MaybeSend; +use crate::util::native_async::MaybeSend; use crate::util::wallet_utils::{ CoinSelection, CoinSelectionSource, CoinSelectionSourceSync, Input, }; diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index 84bfbb902ea..10a3c155374 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -58,7 +58,7 @@ use crate::ln::script::ShutdownScript; use crate::offers::invoice::UnsignedBolt12Invoice; use crate::types::features::ChannelTypeFeatures; use crate::types::payment::PaymentPreimage; -use crate::util::async_poll::MaybeSend; +use crate::util::native_async::MaybeSend; use crate::util::ser::{ReadableArgs, Writeable}; use crate::util::transaction_utils; diff --git a/lightning/src/util/async_poll.rs 
b/lightning/src/util/async_poll.rs index 57df5b26cb0..23ca1aad603 100644 --- a/lightning/src/util/async_poll.rs +++ b/lightning/src/util/async_poll.rs @@ -164,31 +164,3 @@ const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } - -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Sync as MaybeSync; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Sync` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -pub trait MaybeSync {} -#[cfg(not(feature = "std"))] -impl MaybeSync for T where T: ?Sized {} - -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. -#[cfg(feature = "std")] -pub use core::marker::Send as MaybeSend; - -#[cfg(not(feature = "std"))] -/// Marker trait to optionally implement `Send` under std. -/// -/// This is not exported to bindings users as async is only supported in Rust. 
-pub trait MaybeSend {} -#[cfg(not(feature = "std"))] -impl MaybeSend for T where T: ?Sized {} diff --git a/lightning/src/util/mod.rs b/lightning/src/util/mod.rs index 75434fdabab..4f3e930caf4 100644 --- a/lightning/src/util/mod.rs +++ b/lightning/src/util/mod.rs @@ -20,7 +20,7 @@ pub mod mut_global; pub mod anchor_channel_reserves; -pub mod async_poll; +pub(crate) mod async_poll; #[cfg(fuzzing)] pub mod base32; #[cfg(not(fuzzing))] diff --git a/lightning/src/util/native_async.rs b/lightning/src/util/native_async.rs index 0c380f2b1d1..31b07c2f3b5 100644 --- a/lightning/src/util/native_async.rs +++ b/lightning/src/util/native_async.rs @@ -9,8 +9,9 @@ #[cfg(all(test, feature = "std"))] use crate::sync::{Arc, Mutex}; -use crate::util::async_poll::{MaybeSend, MaybeSync}; +#[cfg(test)] +use alloc::boxed::Box; #[cfg(all(test, not(feature = "std")))] use alloc::rc::Rc; @@ -53,6 +54,34 @@ trait MaybeSendableFuture: Future + MaybeSend + 'static {} #[cfg(test)] impl + MaybeSend + 'static> MaybeSendableFuture for F {} +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Sync as MaybeSync; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Sync` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +pub trait MaybeSync {} +#[cfg(not(feature = "std"))] +impl MaybeSync for T where T: ?Sized {} + +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. +#[cfg(feature = "std")] +pub use core::marker::Send as MaybeSend; + +#[cfg(not(feature = "std"))] +/// Marker trait to optionally implement `Send` under std. +/// +/// This is not exported to bindings users as async is only supported in Rust. 
+pub trait MaybeSend {} +#[cfg(not(feature = "std"))] +impl MaybeSend for T where T: ?Sized {} + /// A simple [`FutureSpawner`] which holds [`Future`]s until they are manually polled via /// [`Self::poll_futures`]. #[cfg(all(test, feature = "std"))] diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 7df63aa5ac9..68359636f6b 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -38,10 +38,10 @@ use crate::ln::types::ChannelId; use crate::sign::{ecdsa::EcdsaChannelSigner, EntropySource, SignerProvider}; use crate::sync::Mutex; use crate::util::async_poll::{ - dummy_waker, MaybeSend, MaybeSync, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, + dummy_waker, MultiResultFuturePoller, ResultFuture, TwoFutureJoiner, }; use crate::util::logger::Logger; -use crate::util::native_async::FutureSpawner; +use crate::util::native_async::{FutureSpawner, MaybeSend, MaybeSync}; use crate::util::ser::{Readable, ReadableArgs, Writeable}; use crate::util::wakers::Notifier; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index aa77f515cd9..060ab8b3989 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -49,7 +49,6 @@ use crate::sign::{self, ReceiveAuthKey}; use crate::sign::{ChannelSigner, PeerStorageKey}; use crate::sync::RwLock; use crate::types::features::{ChannelFeatures, InitFeatures, NodeFeatures}; -use crate::util::async_poll::MaybeSend; use crate::util::config::UserConfig; use crate::util::dyn_signer::{ DynKeysInterface, DynKeysInterfaceTrait, DynPhantomKeysInterface, DynSigner, @@ -57,6 +56,7 @@ use crate::util::dyn_signer::{ use crate::util::logger::{Logger, Record}; #[cfg(feature = "std")] use crate::util::mut_global::MutGlobal; +use crate::util::native_async::MaybeSend; use crate::util::persist::{KVStore, KVStoreSync, MonitorName}; use crate::util::ser::{Readable, ReadableArgs, Writeable, Writer}; use 
crate::util::test_channel_signer::{EnforcementState, TestChannelSigner}; diff --git a/lightning/src/util/wallet_utils.rs b/lightning/src/util/wallet_utils.rs index b82437c03e8..be8d9475098 100644 --- a/lightning/src/util/wallet_utils.rs +++ b/lightning/src/util/wallet_utils.rs @@ -24,9 +24,10 @@ use crate::ln::chan_utils::{ use crate::prelude::*; use crate::sign::{P2TR_KEY_PATH_WITNESS_WEIGHT, P2WPKH_WITNESS_WEIGHT}; use crate::sync::Mutex; -use crate::util::async_poll::{dummy_waker, MaybeSend, MaybeSync}; +use crate::util::async_poll::dummy_waker; use crate::util::hash_tables::{new_hash_map, HashMap}; use crate::util::logger::Logger; +use crate::util::native_async::{MaybeSend, MaybeSync}; use bitcoin::amount::Amount; use bitcoin::consensus::Encodable; From 436480fc432c0ccb379d895ec0c0be46428ddc42 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 18 Oct 2025 01:36:01 +0000 Subject: [PATCH 08/12] Add `async_poll.rs` to `lightning-block-sync` In the next commit we'll fetch blocks during initial connection in parallel, which requires a multi-future poller. 
Here we add a symlink to the existing `lightning` `async_poll.rs` file, making it available in `lightning-block-sync` --- lightning-block-sync/src/async_poll.rs | 1 + lightning-block-sync/src/lib.rs | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) create mode 120000 lightning-block-sync/src/async_poll.rs diff --git a/lightning-block-sync/src/async_poll.rs b/lightning-block-sync/src/async_poll.rs new file mode 120000 index 00000000000..eb85cdac697 --- /dev/null +++ b/lightning-block-sync/src/async_poll.rs @@ -0,0 +1 @@ +../../lightning/src/util/async_poll.rs \ No newline at end of file diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index c9cffa272d1..fb51f69bbec 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -16,9 +16,11 @@ #![deny(rustdoc::broken_intra_doc_links)] #![deny(rustdoc::private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] #![cfg_attr(docsrs, feature(doc_cfg))] +extern crate alloc; +extern crate core; + #[cfg(any(feature = "rest-client", feature = "rpc-client"))] pub mod http; @@ -42,6 +44,9 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; +#[allow(unused)] +mod async_poll; + use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::block::{Block, Header}; From 1eabb66b3ae87516e9c51ce4493099be370d8fe4 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 7 Dec 2025 23:30:57 +0000 Subject: [PATCH 09/12] Fetch blocks from source in parallel during initial sync In `init::synchronize_listeners` we may end up spending a decent chunk of our time just fetching block data. Here we parallelize that step across up to 36 blocks at a time. On my node with bitcoind on localhost, the impact of this is somewhat muted by block deserialization being the bulk of the work, however a networked bitcoind would likely change that. 
Even still, fetching a batch of 36 blocks in parallel happens on my node in ~615 ms vs ~815ms in serial. --- lightning-block-sync/src/init.rs | 91 ++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 39 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index ddb90d6d97f..b1d93bc0462 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -1,8 +1,9 @@ //! Utilities to assist in the initial sync required to initialize or reload Rust-Lightning objects //! from disk. -use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader}; -use crate::{BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; +use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; +use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -146,7 +147,6 @@ where // Find differences and disconnect blocks for each listener individually. let mut chain_poller = ChainPoller::new(block_source, network); let mut chain_listeners_at_height = Vec::new(); - let mut most_common_ancestor = None; let mut most_connected_blocks = Vec::new(); let mut header_cache = HeaderCache::new(); for (old_best_block, chain_listener) in chain_listeners.drain(..) { @@ -167,19 +167,59 @@ where // Keep track of the most common ancestor and all blocks connected across all listeners. chain_listeners_at_height.push((common_ancestor.height, chain_listener)); if connected_blocks.len() > most_connected_blocks.len() { - most_common_ancestor = Some(common_ancestor); most_connected_blocks = connected_blocks; } } - // Connect new blocks for all listeners at once to avoid re-fetching blocks. 
- if let Some(common_ancestor) = most_common_ancestor { - let chain_listener = &ChainListenerSet(chain_listeners_at_height); - let mut chain_notifier = ChainNotifier { header_cache: &mut header_cache, chain_listener }; - chain_notifier - .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) - .await - .map_err(|(e, _)| e)?; + while !most_connected_blocks.is_empty() { + #[cfg(not(test))] + const MAX_BLOCKS_AT_ONCE: usize = 6 * 6; // Six hours of blocks, 144MiB encoded + #[cfg(test)] + const MAX_BLOCKS_AT_ONCE: usize = 2; + + let mut fetch_block_futures = + Vec::with_capacity(core::cmp::min(MAX_BLOCKS_AT_ONCE, most_connected_blocks.len())); + for header in most_connected_blocks.iter().rev().take(MAX_BLOCKS_AT_ONCE) { + let fetch_future = chain_poller.fetch_block(header); + fetch_block_futures + .push(ResultFuture::Pending(Box::pin(async move { (header, fetch_future.await) }))); + } + let results = MultiResultFuturePoller::new(fetch_block_futures).await.into_iter(); + + const NO_BLOCK: Option<(u32, crate::poll::ValidatedBlock)> = None; + let mut fetched_blocks = [NO_BLOCK; MAX_BLOCKS_AT_ONCE]; + for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { + *result = Some((header.height, block_res?)); + } + debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); + // TODO: When our MSRV is 1.82, use is_sorted_by_key + debug_assert!(fetched_blocks.windows(2).all(|blocks| { + if let (Some(a), Some(b)) = (&blocks[0], &blocks[1]) { + a.0 < b.0 + } else { + // Any non-None blocks have to come before any None entries + blocks[1].is_none() + } + })); + + for (listener_height, listener) in chain_listeners_at_height.iter() { + // Connect blocks for this listener. 
+ for (height, block_data) in fetched_blocks.iter().flatten() { + if *height > *listener_height { + match &**block_data { + BlockData::FullBlock(block) => { + listener.block_connected(&block, *height); + }, + BlockData::HeaderOnly(header_data) => { + listener.filtered_block_connected(&header_data, &[], *height); + }, + } + } + } + } + + most_connected_blocks + .truncate(most_connected_blocks.len().saturating_sub(MAX_BLOCKS_AT_ONCE)); } Ok((header_cache, best_header)) @@ -200,33 +240,6 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } -/// A set of dynamically sized chain listeners, each paired with a starting block height. -struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); - -impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { - fn block_connected(&self, block: &bitcoin::Block, height: u32) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.block_connected(block, height); - } - } - } - - fn filtered_block_connected( - &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, - ) { - for (starting_height, chain_listener) in self.0.iter() { - if height > *starting_height { - chain_listener.filtered_block_connected(header, txdata, height); - } - } - } - - fn blocks_disconnected(&self, _fork_point: BestBlock) { - unreachable!() - } -} - #[cfg(test)] mod tests { use super::*; From 7068a69f7fe54d85c5ff40ebcc8672b1efe9899e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 12:18:01 +0000 Subject: [PATCH 10/12] Silence "elided lifetime has a name" warnings in no-std locking --- lightning/src/sync/nostd_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 12070741918..18055d1ebe4 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -61,7 +61,7 @@ impl<'a, 
T: 'a> LockTestExt<'a> for Mutex { } type ExclLock = MutexGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> MutexGuard<'a, T> { self.lock().unwrap() } } @@ -132,7 +132,7 @@ impl<'a, T: 'a> LockTestExt<'a> for RwLock { } type ExclLock = RwLockWriteGuard<'a, T>; #[inline] - fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard { + fn unsafe_well_ordered_double_lock_self(&'a self) -> RwLockWriteGuard<'a, T> { self.write().unwrap() } } From b0074368f81ebfc59a61fac127b79bfda2352264 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 8 Dec 2025 14:15:47 +0000 Subject: [PATCH 11/12] Use the header cache across listeners during initial disconnect In `lightning-block-sync::init::synchronize_listeners`, we may have many listeners we want to do a chain diff on. When doing so, we should make sure we utilize our header cache, rather than querying our chain source for every header we need for each listener. Here we do so, inserting into the cache as we do chain diffs. On my node with a bitcoind on localhost, this brings the calculate-differences step of `init::synchronize_listeners` from ~500ms to under 150ms. --- lightning-block-sync/src/lib.rs | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index fb51f69bbec..34ff0ae24fd 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -198,6 +198,10 @@ pub(crate) trait Cache { /// Retrieves the block header keyed by the given block hash. fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader>; + /// Inserts the given block header during a find_difference operation, implying it might not be
+ fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); + /// Called when a block has been connected to the best chain to ensure it is available to be /// disconnected later if needed. fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader); @@ -229,6 +233,15 @@ impl Cache for HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.0.insert(block_hash, block_header); + + // Remove headers older than our newest header minus a week. + let best_height = self.0.iter().map(|(_, header)| header.height).max().unwrap_or(0); + let cutoff_height = best_height.saturating_sub(HEADER_CACHE_LIMIT); + self.0.retain(|_, header| header.height >= cutoff_height); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { self.0.insert(block_hash, block_header); @@ -247,6 +260,10 @@ impl Cache for &mut HeaderCache { self.0.get(block_hash) } + fn insert_during_diff(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + (*self).insert_during_diff(block_hash, block_header); + } + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { (*self).block_connected(block_hash, block_header); } @@ -383,7 +400,7 @@ where /// First resolves `prev_best_block` to a `ValidatedBlockHeader` using the `previous_blocks` /// field as fallback if needed, then finds the common ancestor. async fn find_difference_from_best_block( - &self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, + &mut self, current_header: ValidatedBlockHeader, prev_best_block: BestBlock, chain_poller: &mut P, ) -> BlockSourceResult { // Try to resolve the header for the previous best block. 
First try the block_hash, @@ -408,6 +425,7 @@ )?; if let Ok(header) = chain_poller.get_header(block_hash, Some(height)).await { found_header = Some(header); + self.header_cache.insert_during_diff(*block_hash, header); break; } } From d37321e640f35de5fb4f38d3b9f57b23cebe60c1 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 27 Jan 2026 17:22:06 +0000 Subject: [PATCH 12/12] Include recent blocks in the `synchronize_listeners`-returned cache When `synchronize_listeners` runs, it returns a cache of the headers it needed when doing chain difference-finding. This allows us to ensure that when we start running normally we have all the recent headers in case we need them to reorg. Sadly, in some cases it was returning a mostly-empty cache. Because it was only being filled during block difference reconciliation, it would only get a block around each listener's fork point. Worse, because we were calling `disconnect_blocks` with the cache, the cache would assume we were reorging against the main chain and drop blocks we actually want. Instead, we avoid dropping blocks on `disconnect_blocks` calls and ensure we always add connected blocks to the cache. --- lightning-block-sync/src/init.rs | 68 +++++++++++++++++++++++++++++--- 1 file changed, 62 insertions(+), 6 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index b1d93bc0462..5bb1b4292f7 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -3,7 +3,7 @@ use crate::async_poll::{MultiResultFuturePoller, ResultFuture}; use crate::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; -use crate::{BlockData, BlockSource, BlockSourceResult, ChainNotifier, HeaderCache}; +use crate::{BlockData, BlockSource, BlockSourceResult, Cache, ChainNotifier, HeaderCache}; use bitcoin::block::Header; use bitcoin::network::Network; @@ -153,8 +153,9 @@ where // Disconnect any stale blocks, but keep them in the cache for the next iteration.
let (common_ancestor, connected_blocks) = { let chain_listener = &DynamicChainListener(chain_listener); + let mut cache_wrapper = HeaderCacheNoDisconnect(&mut header_cache); let mut chain_notifier = - ChainNotifier { header_cache: &mut header_cache, chain_listener }; + ChainNotifier { header_cache: &mut cache_wrapper, chain_listener }; let difference = chain_notifier .find_difference_from_best_block(best_header, old_best_block, &mut chain_poller) .await?; @@ -189,7 +190,9 @@ where const NO_BLOCK: Option<(u32, crate::poll::ValidatedBlock)> = None; let mut fetched_blocks = [NO_BLOCK; MAX_BLOCKS_AT_ONCE]; for ((header, block_res), result) in results.into_iter().zip(fetched_blocks.iter_mut()) { - *result = Some((header.height, block_res?)); + let block = block_res?; + header_cache.block_connected(header.block_hash, *header); + *result = Some((header.height, block)); } debug_assert!(fetched_blocks.iter().take(most_connected_blocks.len()).all(|r| r.is_some())); // TODO: When our MSRV is 1.82, use is_sorted_by_key @@ -240,10 +243,40 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L } } +/// Wrapper around HeaderCache that ignores `blocks_disconnected` calls, retaining disconnected +/// blocks in the cache. This is useful during initial sync to keep headers available across +/// multiple listeners. 
+struct HeaderCacheNoDisconnect<'a>(&'a mut HeaderCache); + +impl<'a> crate::Cache for &mut HeaderCacheNoDisconnect<'a> { + fn look_up( + &self, block_hash: &bitcoin::hash_types::BlockHash, + ) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn insert_during_diff( + &mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader, + ) { + self.0.insert_during_diff(block_hash, block_header); + } + + fn block_connected( + &mut self, block_hash: bitcoin::hash_types::BlockHash, block_header: ValidatedBlockHeader, + ) { + self.0.block_connected(block_hash, block_header); + } + + fn blocks_disconnected(&mut self, _fork_point: &ValidatedBlockHeader) { + // Intentionally ignore disconnections to retain blocks in cache + } +} + #[cfg(test)] mod tests { use super::*; use crate::test_utils::{Blockchain, MockChainListener}; + use crate::Cache; #[tokio::test] async fn sync_from_same_chain() { @@ -264,7 +297,13 @@ mod tests { (chain.best_block_at_height(3), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, chain.tip()); + assert!(cache.look_up(&chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&chain.at_height(4).block_hash).is_some()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -295,7 +334,15 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + 
assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_2.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_3.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } } @@ -329,7 +376,16 @@ mod tests { (fork_chain_3.best_block(), &listener_3 as &dyn chain::Listen), ]; match synchronize_listeners(&main_chain, Network::Bitcoin, listeners).await { - Ok((_, header)) => assert_eq!(header, main_chain.tip()), + Ok((cache, header)) => { + assert_eq!(header, main_chain.tip()); + assert!(cache.look_up(&main_chain.at_height(1).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(2).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(3).block_hash).is_some()); + assert!(cache.look_up(&main_chain.at_height(4).block_hash).is_some()); + assert!(cache.look_up(&fork_chain_1.at_height(2).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(3).block_hash).is_none()); + assert!(cache.look_up(&fork_chain_1.at_height(4).block_hash).is_none()); + }, Err(e) => panic!("Unexpected error: {:?}", e), } }