Skip to content

Commit 099dd42

Browse files
committed
parallelize vss reads
1 parent e4e864b commit 099dd42

1 file changed

Lines changed: 70 additions & 45 deletions

File tree

src/builder.rs

Lines changed: 70 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1256,10 +1256,22 @@ fn build_with_store_internal(
12561256
}
12571257

12581258
// Initialize the status fields.
1259+
// PARALLEL GROUP 1: read_node_metrics + read_payments
12591260
let step_start = Instant::now();
1260-
let node_metrics = match runtime
1261-
.block_on(async { read_node_metrics(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1262-
{
1261+
let (node_metrics_result, payments_result) = runtime.block_on(async {
1262+
let kv_store_1 = Arc::clone(&kv_store);
1263+
let logger_1 = Arc::clone(&logger);
1264+
let kv_store_2 = Arc::clone(&kv_store);
1265+
let logger_2 = Arc::clone(&logger);
1266+
1267+
tokio::join!(
1268+
read_node_metrics(kv_store_1, logger_1),
1269+
read_payments(kv_store_2, logger_2)
1270+
)
1271+
});
1272+
eprintln!("TIMING: [ldk-node] PARALLEL read_node_metrics + read_payments took {}ms", step_start.elapsed().as_millis());
1273+
1274+
let node_metrics = match node_metrics_result {
12631275
Ok(metrics) => Arc::new(RwLock::new(metrics)),
12641276
Err(e) => {
12651277
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1270,15 +1282,11 @@ fn build_with_store_internal(
12701282
}
12711283
},
12721284
};
1273-
eprintln!("TIMING: [ldk-node] read_node_metrics() took {}ms", step_start.elapsed().as_millis());
12741285

12751286
let tx_broadcaster = Arc::new(TransactionBroadcaster::new(Arc::clone(&logger)));
12761287
let fee_estimator = Arc::new(OnchainFeeEstimator::new());
12771288

1278-
let step_start = Instant::now();
1279-
let payment_store = match runtime
1280-
.block_on(async { read_payments(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1281-
{
1289+
let payment_store = match payments_result {
12821290
Ok(payments) => Arc::new(PaymentStore::new(
12831291
payments,
12841292
PAYMENT_INFO_PERSISTENCE_PRIMARY_NAMESPACE.to_string(),
@@ -1291,7 +1299,6 @@ fn build_with_store_internal(
12911299
return Err(BuildError::ReadFailed);
12921300
},
12931301
};
1294-
eprintln!("TIMING: [ldk-node] read_payments() took {}ms", step_start.elapsed().as_millis());
12951302

12961303
let step_start = Instant::now();
12971304
let (chain_source, chain_tip_opt) = match chain_data_source_config {
@@ -1511,11 +1518,22 @@ fn build_with_store_internal(
15111518
peer_storage_key,
15121519
));
15131520

1514-
// Initialize the network graph, scorer, and router
1521+
// PARALLEL GROUP 2: read_network_graph + read_event_queue
15151522
let step_start = Instant::now();
1516-
let network_graph = match runtime
1517-
.block_on(async { read_network_graph(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1518-
{
1523+
let (network_graph_result, event_queue_result) = runtime.block_on(async {
1524+
let kv_store_1 = Arc::clone(&kv_store);
1525+
let logger_1 = Arc::clone(&logger);
1526+
let kv_store_2 = Arc::clone(&kv_store);
1527+
let logger_2 = Arc::clone(&logger);
1528+
1529+
tokio::join!(
1530+
read_network_graph(kv_store_1, logger_1),
1531+
read_event_queue(kv_store_2, logger_2)
1532+
)
1533+
});
1534+
eprintln!("TIMING: [ldk-node] PARALLEL read_network_graph + read_event_queue took {}ms", step_start.elapsed().as_millis());
1535+
1536+
let network_graph = match network_graph_result {
15191537
Ok(graph) => Arc::new(graph),
15201538
Err(e) => {
15211539
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1526,7 +1544,18 @@ fn build_with_store_internal(
15261544
}
15271545
},
15281546
};
1529-
eprintln!("TIMING: [ldk-node] read_network_graph() took {}ms", step_start.elapsed().as_millis());
1547+
1548+
let event_queue = match event_queue_result {
1549+
Ok(event_queue) => Arc::new(event_queue),
1550+
Err(e) => {
1551+
if e.kind() == std::io::ErrorKind::NotFound {
1552+
Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger)))
1553+
} else {
1554+
log_error!(logger, "Failed to read event queue from store: {}", e);
1555+
return Err(BuildError::ReadFailed);
1556+
}
1557+
},
1558+
};
15301559

15311560
let step_start = Instant::now();
15321561
let local_scorer = match runtime.block_on(async {
@@ -1757,21 +1786,7 @@ fn build_with_store_internal(
17571786
};
17581787
eprintln!("TIMING: [ldk-node] gossip_source setup took {}ms", step_start.elapsed().as_millis());
17591788

1760-
let step_start = Instant::now();
1761-
let event_queue = match runtime
1762-
.block_on(async { read_event_queue(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1763-
{
1764-
Ok(event_queue) => Arc::new(event_queue),
1765-
Err(e) => {
1766-
if e.kind() == std::io::ErrorKind::NotFound {
1767-
Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger)))
1768-
} else {
1769-
log_error!(logger, "Failed to read event queue from store: {}", e);
1770-
return Err(BuildError::ReadFailed);
1771-
}
1772-
},
1773-
};
1774-
eprintln!("TIMING: [ldk-node] read_event_queue() took {}ms", step_start.elapsed().as_millis());
1789+
// event_queue was already read in PARALLEL GROUP 2 above
17751790

17761791
let step_start = Instant::now();
17771792
let (liquidity_source, custom_message_handler) =
@@ -1885,18 +1900,33 @@ fn build_with_store_internal(
18851900
let connection_manager =
18861901
Arc::new(ConnectionManager::new(Arc::clone(&peer_manager), Arc::clone(&logger)));
18871902

1903+
// PARALLEL GROUP 3: read_output_sweeper + read_peer_info
18881904
let step_start = Instant::now();
1889-
let output_sweeper = match runtime.block_on(async {
1890-
read_output_sweeper(
1891-
Arc::clone(&tx_broadcaster),
1892-
Arc::clone(&fee_estimator),
1893-
Arc::clone(&chain_source),
1894-
Arc::clone(&keys_manager),
1895-
Arc::clone(&kv_store),
1896-
Arc::clone(&logger),
1905+
let (output_sweeper_result, peer_store_result) = runtime.block_on(async {
1906+
let tx_broadcaster_clone = Arc::clone(&tx_broadcaster);
1907+
let fee_estimator_clone = Arc::clone(&fee_estimator);
1908+
let chain_source_clone = Arc::clone(&chain_source);
1909+
let keys_manager_clone = Arc::clone(&keys_manager);
1910+
let kv_store_1 = Arc::clone(&kv_store);
1911+
let logger_1 = Arc::clone(&logger);
1912+
let kv_store_2 = Arc::clone(&kv_store);
1913+
let logger_2 = Arc::clone(&logger);
1914+
1915+
tokio::join!(
1916+
read_output_sweeper(
1917+
tx_broadcaster_clone,
1918+
fee_estimator_clone,
1919+
chain_source_clone,
1920+
keys_manager_clone,
1921+
kv_store_1,
1922+
logger_1,
1923+
),
1924+
read_peer_info(kv_store_2, logger_2)
18971925
)
1898-
.await
1899-
}) {
1926+
});
1927+
eprintln!("TIMING: [ldk-node] PARALLEL read_output_sweeper + read_peer_info took {}ms", step_start.elapsed().as_millis());
1928+
1929+
let output_sweeper = match output_sweeper_result {
19001930
Ok(output_sweeper) => Arc::new(output_sweeper),
19011931
Err(e) => {
19021932
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1916,12 +1946,8 @@ fn build_with_store_internal(
19161946
}
19171947
},
19181948
};
1919-
eprintln!("TIMING: [ldk-node] read_output_sweeper() took {}ms", step_start.elapsed().as_millis());
19201949

1921-
let step_start = Instant::now();
1922-
let peer_store = match runtime
1923-
.block_on(async { read_peer_info(Arc::clone(&kv_store), Arc::clone(&logger)).await })
1924-
{
1950+
let peer_store = match peer_store_result {
19251951
Ok(peer_store) => Arc::new(peer_store),
19261952
Err(e) => {
19271953
if e.kind() == std::io::ErrorKind::NotFound {
@@ -1932,7 +1958,6 @@ fn build_with_store_internal(
19321958
}
19331959
},
19341960
};
1935-
eprintln!("TIMING: [ldk-node] read_peer_info() took {}ms", step_start.elapsed().as_millis());
19361961

19371962
let om_mailbox = if let Some(AsyncPaymentsRole::Server) = async_payments_role {
19381963
Some(Arc::new(OnionMessageMailbox::new()))

0 commit comments

Comments
 (0)