From 95c7097f5b698863dbf1001e5426e075519266ae Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 13 Jan 2026 14:51:27 +0000 Subject: [PATCH 1/4] Move `RuntimeSpawner` to `runtime.rs` In a few commits as we upgrade LDK we'll use `RuntimeSpawner` outside of gossip, making it make much more sense to have it in `runtime.rs` instead. --- src/gossip.rs | 20 +------------------- src/runtime.rs | 18 ++++++++++++++++++ src/types.rs | 2 +- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/gossip.rs b/src/gossip.rs index 2b524d9ae..f42b4602c 100644 --- a/src/gossip.rs +++ b/src/gossip.rs @@ -5,18 +5,16 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. -use std::future::Future; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use lightning::util::native_async::FutureSpawner; use lightning_block_sync::gossip::GossipVerifier; use crate::chain::ChainSource; use crate::config::RGS_SYNC_TIMEOUT_SECS; use crate::logger::{log_trace, LdkLogger, Logger}; -use crate::runtime::Runtime; +use crate::runtime::{Runtime, RuntimeSpawner}; use crate::types::{GossipSync, Graph, P2PGossipSync, RapidGossipSync}; use crate::Error; @@ -114,19 +112,3 @@ impl GossipSource { } } } - -pub(crate) struct RuntimeSpawner { - runtime: Arc, -} - -impl RuntimeSpawner { - pub(crate) fn new(runtime: Arc) -> Self { - Self { runtime } - } -} - -impl FutureSpawner for RuntimeSpawner { - fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn_cancellable_background_task(future); - } -} diff --git a/src/runtime.rs b/src/runtime.rs index 1e9883ae4..d9d39e84b 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -9,6 +9,8 @@ use std::future::Future; use std::sync::{Arc, Mutex}; use std::time::Duration; +use lightning::util::native_async::FutureSpawner; + use tokio::task::{JoinHandle, JoinSet}; use crate::config::{ @@ -219,3 +221,19 @@ enum RuntimeMode { Owned(tokio::runtime::Runtime), Handle(tokio::runtime::Handle), } + +pub(crate) struct RuntimeSpawner { + runtime: Arc, +} + +impl RuntimeSpawner { + pub(crate) fn new(runtime: Arc) -> Self { + Self { runtime } + } +} + +impl FutureSpawner for RuntimeSpawner { + fn spawn + Send + 'static>(&self, future: T) { + self.runtime.spawn_cancellable_background_task(future); + } +} diff --git a/src/types.rs b/src/types.rs index 2b7d3829a..96b9a9866 100644 --- a/src/types.rs +++ b/src/types.rs @@ -35,10 +35,10 @@ use crate::chain::ChainSource; use crate::config::ChannelConfig; use crate::data_store::DataStore; use crate::fee_estimator::OnchainFeeEstimator; -use crate::gossip::RuntimeSpawner; use crate::logger::Logger; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::PaymentDetails; +use crate::runtime::RuntimeSpawner; /// A supertrait that requires that a type implements both [`KVStore`] and [`KVStoreSync`] at the /// same time. From 68e92ffba7dd9dad3ba70d79c167a324e83e6fdf Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 13 Jan 2026 14:57:51 +0000 Subject: [PATCH 2/4] Upgrade to latest LDK (which spawns futures with a return value) --- Cargo.toml | 26 +++++++++++++------------- src/runtime.rs | 14 ++++++++++++-- 2 files changed, 25 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 09ae4b03a..e0ace123f 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,17 +39,17 @@ default = [] #lightning-liquidity = { version = "0.2.0", features = ["std"] } #lightning-macros = { version = "0.2.0" } -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["tokio"] } -lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } -lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["rest-client", "rpc-client", "tokio"] } -lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } -lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std"] } -lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891" } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-types = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-invoice = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-net-tokio = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-persister = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["tokio"] } +lightning-background-processor = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-rapid-gossip-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } +lightning-block-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["rest-client", "rpc-client", "tokio"] } +lightning-transaction-sync = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["esplora-async-https", "time", "electrum-rustls-ring"] } +lightning-liquidity = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std"] } +lightning-macros = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256" } bdk_chain = { version = "0.23.0", default-features = false, features = ["std"] } bdk_esplora = { version = "0.22.0", default-features = false, features = ["async-https-rustls", "tokio"]} @@ -78,13 +78,13 @@ log = { version = "0.4.22", default-features = false, features = ["std"]} vss-client = { package = "vss-client-ng", version = "0.4" } prost = { version = "0.11.6", default-features = false} #bitcoin-payment-instructions = { version = "0.6" } -bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "fdca6c62f2fe2c53427d3e51e322a49aa7323ee2" } +bitcoin-payment-instructions = { git = "https://github.com/tnull/bitcoin-payment-instructions", rev = "ce9ff5281ae9bb05526981f6f9df8f8d929c7c44" } [target.'cfg(windows)'.dependencies] winapi = { version = "0.3", features = ["winbase"] } [dev-dependencies] -lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "5236dba053a3f4f01cf0c32ce42b609a93738891", features = ["std", "_test_utils"] } +lightning = { git = "https://github.com/lightningdevkit/rust-lightning", rev = "c9f022bcccb33964604159e6bdb4722020b4d256", features = ["std", "_test_utils"] } proptest = "1.0.0" regex = "1.5.6" criterion = { version = "0.7.0", features = ["async_tokio"] } diff --git a/src/runtime.rs b/src/runtime.rs index d9d39e84b..f43cbb9f0 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -233,7 +233,17 @@ impl RuntimeSpawner { } impl FutureSpawner for RuntimeSpawner { - fn spawn + Send + 'static>(&self, future: T) { - self.runtime.spawn_cancellable_background_task(future); + type E = tokio::sync::oneshot::error::RecvError; + type SpawnedFutureResult = tokio::sync::oneshot::Receiver; + fn spawn + Send + 'static>( + &self, future: F, + ) -> Self::SpawnedFutureResult { + let (result, output) = tokio::sync::oneshot::channel(); + self.runtime.spawn_cancellable_background_task(async move { + // We don't care if the send works or not, if the receiver is dropped its not our + // problem. + let _ = result.send(future.await); + }); + output } } From b0f4a5e45fb7293a139b93ac654aa16a797fb890 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 19:25:44 +0000 Subject: [PATCH 3/4] Switch to the new highly-parallel `ChannelMonitor` reader Upstream LDK added the ability to read `ChannelMonitor`s from storage in parallel, which we switch to here. --- src/builder.rs | 24 +++++++++++++++++++----- src/types.rs | 14 +++++++++++++- 2 files changed, 32 insertions(+), 6 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 510d86bdd..6348c26ba 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -69,11 +69,12 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger}; use crate::message_handler::NodeCustomMessageHandler; use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox; use crate::peer_store::PeerStore; -use crate::runtime::Runtime; +use crate::runtime::{Runtime, RuntimeSpawner}; use crate::tx_broadcaster::TransactionBroadcaster; use crate::types::{ - ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, KeysManager, - MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, SyncAndAsyncKVStore, + AsyncPersister, ChainMonitor, ChannelManager, DynStore, DynStoreWrapper, GossipSync, Graph, + KeysManager, MessageRouter, OnionMessenger, PaymentStore, PeerManager, Persister, + SyncAndAsyncKVStore, }; use crate::wallet::persist::KVStoreWalletPersister; use crate::wallet::Wallet; @@ -1261,8 +1262,9 @@ fn build_with_store_internal( )); let peer_storage_key = keys_manager.get_peer_storage_key(); - let persister = Arc::new(Persister::new( + let monitor_reader = Arc::new(AsyncPersister::new( Arc::clone(&kv_store), + RuntimeSpawner::new(Arc::clone(&runtime)), Arc::clone(&logger), PERSISTER_MAX_PENDING_UPDATES, Arc::clone(&keys_manager), @@ -1272,7 +1274,9 @@ fn build_with_store_internal( )); // Read ChannelMonitor state from store - let channel_monitors = match persister.read_all_channel_monitors_with_updates() { + let monitor_read_result = + runtime.block_on(monitor_reader.read_all_channel_monitors_with_updates_parallel()); + let channel_monitors = match monitor_read_result { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1284,6 +1288,16 @@ fn build_with_store_internal( }, }; + let persister = Arc::new(Persister::new( + Arc::clone(&kv_store), + Arc::clone(&logger), + PERSISTER_MAX_PENDING_UPDATES, + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + )); + // Initialize the ChainMonitor let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( Some(Arc::clone(&chain_source)), diff --git a/src/types.rs b/src/types.rs index 96b9a9866..614efd90e 100644 --- a/src/types.rs +++ b/src/types.rs @@ -23,7 +23,9 @@ use lightning::routing::gossip; use lightning::routing::router::DefaultRouter; use lightning::routing::scoring::{CombinedScorer, ProbabilisticScoringFeeParameters}; use lightning::sign::InMemorySigner; -use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister}; +use lightning::util::persist::{ + KVStore, KVStoreSync, MonitorUpdatingPersister, MonitorUpdatingPersisterAsync, +}; use lightning::util::ser::{Readable, Writeable, Writer}; use lightning::util::sweep::OutputSweeper; use lightning_block_sync::gossip::GossipVerifier; @@ -185,6 +187,16 @@ impl DynStoreTrait for DynStoreWrapper } } +pub(crate) type AsyncPersister = MonitorUpdatingPersisterAsync< + Arc, + RuntimeSpawner, + Arc, + Arc, + Arc, + Arc, + Arc, +>; + pub type Persister = MonitorUpdatingPersister< Arc, Arc, From 78842ad3ac12c4d5be0ff46361b411cad7199088 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 8 Jan 2026 19:51:14 +0000 Subject: [PATCH 4/4] Parallelize store reads in init Since I was editing the init logic anyway I couldn't resist going ahead and parallelizing various read calls. Since we added support for an async `KVStore` in LDK 0.2/ldk-node 0.7, we can now practically do initialization reads in parallel. Thus, rather than making a long series of read calls in `build`, we use `tokio::join` to reduce the number of round-trips to our backing store, which should be a very large win for initialization cost on those using remote storage (e.g. VSS). Sadly we can't trivially do all our reads in one go, we need the payment history to initialize the BDK wallet, which is used in the `Walet` object which is referenced in our `KeysManager`. Thus we first read the payment store and node metrics before moving on. Then, we need a reference to the `NetworkGraph` when we build the scorer. While we could/eventually should move to reading the *bytes* for the scorer while reading the graph and only building the scorer later, that's a larger refactor we leave for later. In the end, we end up with: * 1 round-trip to load the payment history and node metrics, * 2 round-trips to load ChannelMonitors and NetworkGraph (where there's an internal extra round-trip after listing the monitor updates for a monitor), * 1 round-trip to validate bitcoind RPC/REST access for those using bitcoind as a chain source, * 1 round-trip to load various smaller LDK and ldk-node objects, * and 1 additional round-trip to drop the rgs snapshot timestamp for nodes using P2P network gossip syncing for a total of 4 round-trips in the common case and 6 for nodes using less common chain source and gossip sync sources. We then have additional round-trips to our storage and chain source during node start, but those are in many cases already async. --- src/builder.rs | 135 +++++++++++++++++++++++++++++-------------------- 1 file changed, 80 insertions(+), 55 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 6348c26ba..2046956e0 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -33,7 +33,7 @@ use lightning::routing::scoring::{ }; use lightning::sign::{EntropySource, NodeSigner}; use lightning::util::persist::{ - KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + KVStore, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::ReadableArgs; @@ -1052,10 +1052,20 @@ fn build_with_store_internal( } } + let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); + let fee_estimator = Arc::new(OnchainFeeEstimator::new()); + + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (payment_store_res, node_metris_res) = runtime.block_on(async move { + tokio::join!( + read_payments(&*kv_store_ref, Arc::clone(&logger_ref)), + read_node_metrics(&*kv_store_ref, Arc::clone(&logger_ref)), + ) + }); + // Initialize the status fields. - let node_metrics = match runtime - .block_on(async { read_node_metrics(&*kv_store, Arc::clone(&logger)).await }) - { + let node_metrics = match node_metris_res { Ok(metrics) => Arc::new(RwLock::new(metrics)), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1066,23 +1076,20 @@ fn build_with_store_internal( } }, }; - let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger))); - let fee_estimator = Arc::new(OnchainFeeEstimator::new()); - let payment_store = - match runtime.block_on(async { read_payments(&*kv_store, Arc::clone(&logger)).await }) { - Ok(payments) => Arc::new(PaymentStore::new( - payments, - PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), - PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), - Arc::clone(&kv_store), - Arc::clone(&logger), - )), - Err(e) => { - log_error!(logger, "Failed to read payment data from store: {}", e); - return Err(BuildError::ReadFailed); - }, - }; + let payment_store = match payment_store_res { + Ok(payments) => Arc::new(PaymentStore::new( + payments, + PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(), + PAYMENT_INFO_PERSISTENCE_SECONDARY_NAMESPACE.to_string(), + Arc::clone(&kv_store), + Arc::clone(&logger), + )), + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); + return Err(BuildError::ReadFailed); + }, + }; let (chain_source, chain_tip_opt) = match chain_data_source_config { Some(ChainDataSourceConfig::Esplora { server_url, headers, sync_config }) => { @@ -1273,10 +1280,18 @@ fn build_with_store_internal( Arc::clone(&fee_estimator), )); + // Read ChannelMonitors and the NetworkGraph + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let (monitor_read_res, network_graph_res) = runtime.block_on(async move { + tokio::join!( + monitor_reader.read_all_channel_monitors_with_updates_parallel(), + read_network_graph(&*kv_store_ref, logger_ref), + ) + }); + // Read ChannelMonitor state from store - let monitor_read_result = - runtime.block_on(monitor_reader.read_all_channel_monitors_with_updates_parallel()); - let channel_monitors = match monitor_read_result { + let channel_monitors = match monitor_read_res { Ok(monitors) => monitors, Err(e) => { if e.kind() == lightning::io::ErrorKind::NotFound { @@ -1310,9 +1325,7 @@ fn build_with_store_internal( )); // Initialize the network graph, scorer, and router - let network_graph = match runtime - .block_on(async { read_network_graph(&*kv_store, Arc::clone(&logger)).await }) - { + let network_graph = match network_graph_res { Ok(graph) => Arc::new(graph), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1324,9 +1337,42 @@ fn build_with_store_internal( }, }; - let local_scorer = match runtime.block_on(async { - read_scorer(&*kv_store, Arc::clone(&network_graph), Arc::clone(&logger)).await - }) { + // Read various smaller LDK and ldk-node objects from the store + let kv_store_ref = Arc::clone(&kv_store); + let logger_ref = Arc::clone(&logger); + let network_graph_ref = Arc::clone(&network_graph); + let output_sweeper_future = read_output_sweeper( + Arc::clone(&tx_broadcaster), + Arc::clone(&fee_estimator), + Arc::clone(&chain_source), + Arc::clone(&keys_manager), + Arc::clone(&kv_store_ref), + Arc::clone(&logger_ref), + ); + let ( + scorer_res, + external_scores_res, + channel_manager_bytes_res, + sweeper_bytes_res, + event_queue_res, + peer_info_res, + ) = runtime.block_on(async move { + tokio::join!( + read_scorer(&*kv_store_ref, network_graph_ref, Arc::clone(&logger_ref)), + read_external_pathfinding_scores_from_cache(&*kv_store_ref, Arc::clone(&logger_ref)), + KVStore::read( + &*kv_store_ref, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + CHANNEL_MANAGER_PERSISTENCE_KEY, + ), + output_sweeper_future, + read_event_queue(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + read_peer_info(Arc::clone(&kv_store_ref), Arc::clone(&logger_ref)), + ) + }); + + let local_scorer = match scorer_res { Ok(scorer) => scorer, Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1342,9 +1388,7 @@ fn build_with_store_internal( let scorer = Arc::new(Mutex::new(CombinedScorer::new(local_scorer))); // Restore external pathfinding scores from cache if possible. - match runtime.block_on(async { - read_external_pathfinding_scores_from_cache(&*kv_store, Arc::clone(&logger)).await - }) { + match external_scores_res { Ok(external_scores) => { scorer.lock().unwrap().merge(external_scores, cur_time); log_trace!(logger, "External scores from cache merged successfully"); @@ -1397,12 +1441,7 @@ fn build_with_store_internal( // Initialize the ChannelManager let channel_manager = { - if let Ok(reader) = KVStoreSync::read( - &*kv_store, - CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, - CHANNEL_MANAGER_PERSISTENCE_KEY, - ) { + if let Ok(reader) = channel_manager_bytes_res { let channel_monitor_references = channel_monitors.iter().map(|(_, chanmon)| chanmon).collect(); let read_args = ChannelManagerReadArgs::new( @@ -1627,17 +1666,7 @@ fn build_with_store_internal( let connection_manager = Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger))); - let output_sweeper = match runtime.block_on(async { - read_output_sweeper( - Arc::clone(&tx_broadcaster), - Arc::clone(&fee_estimator), - Arc::clone(&chain_source), - Arc::clone(&keys_manager), - Arc::clone(&kv_store), - Arc::clone(&logger), - ) - .await - }) { + let output_sweeper = match sweeper_bytes_res { Ok(output_sweeper) => Arc::new(output_sweeper), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1658,9 +1687,7 @@ fn build_with_store_internal( }, }; - let event_queue = match runtime - .block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let event_queue = match event_queue_res { Ok(event_queue) => Arc::new(event_queue), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound { @@ -1672,9 +1699,7 @@ fn build_with_store_internal( }, }; - let peer_store = match runtime - .block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await }) - { + let peer_store = match peer_info_res { Ok(peer_store) => Arc::new(peer_store), Err(e) => { if e.kind() == std::io::ErrorKind::NotFound {