Skip to content

Commit 8761b85

Browse files
committed
Added txids to 'BlockEntry' and optimized block parsing thread usage.
1 parent 335b567 commit 8761b85

2 files changed

Lines changed: 31 additions & 20 deletions (across the 2 changed files)

File tree

src/new_index/fetch.rs

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use std::thread;
1616

1717
use electrs_macros::trace;
1818

19-
use crate::chain::{Block, BlockHash};
19+
use crate::chain::{Block, BlockHash, Txid};
2020
use crate::daemon::Daemon;
2121
use crate::errors::*;
2222
use crate::util::{spawn_thread, HeaderEntry, SyncChannel};
@@ -45,6 +45,8 @@ pub struct BlockEntry {
4545
pub block: Block,
4646
pub entry: HeaderEntry,
4747
pub size: u32,
48+
/// Pre-computed txids, must always correspond 1:1 with block.txdata
49+
pub txids: Vec<Txid>,
4850
}
4951

5052
type SizedBlock = (Block, u32);
@@ -106,10 +108,14 @@ fn bitcoind_fetcher(
106108
let block_entries: Vec<BlockEntry> = blocks
107109
.into_iter()
108110
.zip(entries)
109-
.map(|(block, entry)| BlockEntry {
110-
entry: entry.clone(), // TODO: remove this clone()
111-
size: block.total_size() as u32,
112-
block,
111+
.map(|(block, entry)| {
112+
let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect();
113+
BlockEntry {
114+
entry: entry.clone(), // TODO: remove this clone()
115+
size: block.total_size() as u32,
116+
txids,
117+
block,
118+
}
113119
})
114120
.collect();
115121
assert_eq!(block_entries.len(), entries.len());
@@ -156,7 +162,10 @@ fn blkfiles_fetcher(
156162
let blockhash = block.block_hash();
157163
entry_map
158164
.remove(&blockhash)
159-
.map(|entry| BlockEntry { block, entry, size })
165+
.map(|entry| {
166+
let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect();
167+
BlockEntry { block, entry, size, txids }
168+
})
160169
.or_else(|| {
161170
trace!("skipping block {}", blockhash);
162171
None
@@ -224,9 +233,14 @@ fn blkfiles_parser(blobs: Fetcher<Vec<u8>>, magic: u32) -> Fetcher<Vec<SizedBloc
224233
Fetcher::from(
225234
chan.into_receiver(),
226235
spawn_thread("blkfiles_parser", move || {
236+
let pool = rayon::ThreadPoolBuilder::new()
237+
.num_threads(0) // CPU-bound
238+
.thread_name(|i| format!("parse-blocks-{}", i))
239+
.build()
240+
.unwrap();
227241
blobs.map(|blob| {
228242
trace!("parsing {} bytes", blob.len());
229-
let blocks = parse_blocks(blob, magic).expect("failed to parse blk*.dat file");
243+
let blocks = parse_blocks(&pool, blob, magic).expect("failed to parse blk*.dat file");
230244
sender
231245
.send(blocks)
232246
.expect("failed to send blocks from blk*.dat file");
@@ -236,7 +250,7 @@ fn blkfiles_parser(blobs: Fetcher<Vec<u8>>, magic: u32) -> Fetcher<Vec<SizedBloc
236250
}
237251

238252
#[trace]
239-
fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<SizedBlock>> {
253+
fn parse_blocks(pool: &rayon::ThreadPool, blob: Vec<u8>, magic: u32) -> Result<Vec<SizedBlock>> {
240254
let mut cursor = Cursor::new(&blob);
241255
let mut slices = vec![];
242256
let max_pos = blob.len() as u64;
@@ -273,11 +287,6 @@ fn parse_blocks(blob: Vec<u8>, magic: u32) -> Result<Vec<SizedBlock>> {
273287
cursor.set_position(end as u64);
274288
}
275289

276-
let pool = rayon::ThreadPoolBuilder::new()
277-
.num_threads(0) // CPU-bound
278-
.thread_name(|i| format!("parse-blocks-{}", i))
279-
.build()
280-
.unwrap();
281290
Ok(pool.install(|| {
282291
slices
283292
.into_par_iter()

src/new_index/schema.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1193,13 +1193,12 @@ fn add_blocks(block_entries: &[BlockEntry], iconfig: &IndexerConfig) -> Vec<DBRo
11931193
.map(|b| {
11941194
let mut rows = vec![];
11951195
let blockhash = full_hash(&b.entry.hash()[..]);
1196-
let txids: Vec<Txid> = b.block.txdata.iter().map(|tx| tx.compute_txid()).collect();
1197-
for (tx, txid) in b.block.txdata.iter().zip(txids.iter()) {
1196+
for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) {
11981197
add_transaction(*txid, tx, &mut rows, iconfig);
11991198
}
12001199

12011200
if !iconfig.light_mode {
1202-
rows.push(BlockRow::new_txids(blockhash, &txids).into_row());
1201+
rows.push(BlockRow::new_txids(blockhash, &b.txids).into_row());
12031202
rows.push(BlockRow::new_meta(blockhash, &BlockMeta::from(b)).into_row());
12041203
}
12051204

@@ -1290,9 +1289,10 @@ fn index_blocks(
12901289
.par_iter() // serialization is CPU-intensive
12911290
.map(|b| {
12921291
let mut rows = vec![];
1293-
for tx in &b.block.txdata {
1294-
let height = b.entry.height() as u32;
1295-
index_transaction(tx, height, previous_txos_map, &mut rows, iconfig);
1292+
let height = b.entry.height() as u32;
1293+
for (tx, txid) in b.block.txdata.iter().zip(b.txids.iter()) {
1294+
let txid_hash = full_hash(&txid[..]);
1295+
index_transaction(tx, txid_hash, height, previous_txos_map, &mut rows, iconfig);
12961296
}
12971297
rows.push(BlockRow::new_done(full_hash(&b.entry.hash()[..])).into_row()); // mark block as "indexed"
12981298
rows
@@ -1304,12 +1304,12 @@ fn index_blocks(
13041304
// TODO: return an iterator?
13051305
fn index_transaction(
13061306
tx: &Transaction,
1307+
txid: FullHash,
13071308
confirmed_height: u32,
13081309
previous_txos_map: &HashMap<OutPoint, TxOut>,
13091310
rows: &mut Vec<DBRow>,
13101311
iconfig: &IndexerConfig,
13111312
) {
1312-
let txid = full_hash(&tx.compute_txid()[..]);
13131313

13141314
// persist tx confirmation row:
13151315
// C{txid} → "{block_height}"
@@ -1911,7 +1911,9 @@ pub mod bench {
19111911
let height = 702861;
19121912
let hash = block.block_hash();
19131913
let header = block.header.clone();
1914+
let txids = block.txdata.iter().map(|tx| tx.compute_txid()).collect();
19141915
let block_entry = BlockEntry {
1916+
txids,
19151917
block,
19161918
entry: HeaderEntry::new(height, hash, header),
19171919
size: 0u32, // wrong but not needed for benching

0 commit comments

Comments (0)