Skip to content

Commit f46cffc

Browse files
committed
allow breaking inside for_blocks closures using ControlFlow
1 parent 1b06226 commit f46cffc

5 files changed

Lines changed: 42 additions & 15 deletions

File tree

src/daemon.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use serde_json::{json, Value};
88

99
use std::fs::File;
1010
use std::io::Read;
11+
use std::ops::ControlFlow;
1112
use std::path::Path;
1213

1314
use crate::{
@@ -229,10 +230,10 @@ impl Daemon {
229230
self.p2p.lock().get_new_headers(chain)
230231
}
231232

232-
pub(crate) fn for_blocks<B, F>(&self, blockhashes: B, func: F) -> Result<()>
233+
pub(crate) fn for_blocks<B, F, R>(&self, blockhashes: B, func: F) -> Result<ControlFlow<R>>
233234
where
234235
B: IntoIterator<Item = BlockHash>,
235-
F: FnMut(BlockHash, SerBlock),
236+
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
236237
{
237238
self.p2p.lock().for_blocks(blockhashes, func)
238239
}

src/index.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ impl Index {
210210
index_single_block(blockhash, block, height, &mut batch);
211211
});
212212
self.stats.height.set("tip", height as f64);
213+
ControlFlow::Continue::<()>(())
213214
})?;
214215
let heights: Vec<_> = heights.collect();
215216
assert!(

src/p2p.rs

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crossbeam_channel::{bounded, select, Receiver, Sender};
2222

2323
use std::io::{self, ErrorKind, Write};
2424
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};
25+
use std::ops::ControlFlow;
2526
use std::sync::Arc;
2627
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
2728

@@ -94,21 +95,26 @@ impl Connection {
9495
/// Request and process the specified blocks (in the specified order).
9596
/// See https://en.bitcoin.it/wiki/Protocol_documentation#getblocks for details.
9697
/// Defined as `&mut self` to prevent concurrent invocations (https://github.com/romanz/electrs/pull/526#issuecomment-934685515).
97-
pub(crate) fn for_blocks<B, F>(&mut self, blockhashes: B, mut func: F) -> Result<()>
98+
pub(crate) fn for_blocks<B, F, R>(
99+
&mut self,
100+
blockhashes: B,
101+
mut func: F,
102+
) -> Result<ControlFlow<R>>
98103
where
99104
B: IntoIterator<Item = BlockHash>,
100-
F: FnMut(BlockHash, SerBlock),
105+
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
101106
{
102107
self.blocks_duration.observe_duration("total", || {
103108
let blockhashes: Vec<BlockHash> = blockhashes.into_iter().collect();
104109
if blockhashes.is_empty() {
105-
return Ok(());
110+
return Ok(ControlFlow::Continue(()));
106111
}
107112
self.blocks_duration.observe_duration("request", || {
108113
debug!("loading {} blocks", blockhashes.len());
109114
self.req_send.send(Request::get_blocks(&blockhashes))
110115
})?;
111116

117+
let mut ret = ControlFlow::Continue(());
112118
for hash in blockhashes {
113119
let block = self.blocks_duration.observe_duration("response", || {
114120
let block = self
@@ -124,10 +130,13 @@ impl Connection {
124130
);
125131
Ok(block)
126132
})?;
127-
self.blocks_duration
128-
.observe_duration("process", || func(hash, block));
133+
if ret.is_continue() {
134+
ret = self
135+
.blocks_duration
136+
.observe_duration("process", || func(hash, block));
137+
}
129138
}
130-
Ok(())
139+
Ok(ret)
131140
})
132141
}
133142

src/status.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,10 +308,15 @@ impl ScriptHashStatus {
308308
}
309309

310310
/// Apply func only on the new blocks (fetched from daemon).
311-
fn for_new_blocks<B, F>(&self, blockhashes: B, daemon: &Daemon, func: F) -> Result<()>
311+
fn for_new_blocks<B, F, R>(
312+
&self,
313+
blockhashes: B,
314+
daemon: &Daemon,
315+
func: F,
316+
) -> Result<ControlFlow<R>>
312317
where
313318
B: IntoIterator<Item = BlockHash>,
314-
F: FnMut(BlockHash, SerBlock),
319+
F: FnMut(BlockHash, SerBlock) -> ControlFlow<R>,
315320
{
316321
daemon.for_blocks(
317322
blockhashes
@@ -347,6 +352,7 @@ impl ScriptHashStatus {
347352
.or_insert_with(|| TxEntry::new(filtered_outputs.txid))
348353
.outputs = filtered_outputs.result;
349354
}
355+
ControlFlow::Continue::<()>(())
350356
})?;
351357
let spending_blockhashes: HashSet<BlockHash> = outpoints
352358
.par_iter()
@@ -361,6 +367,7 @@ impl ScriptHashStatus {
361367
.or_insert_with(|| TxEntry::new(filtered_inputs.txid))
362368
.spent = filtered_inputs.result;
363369
}
370+
ControlFlow::Continue::<()>(())
364371
})?;
365372

366373
Ok(result

src/tracker.rs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use bitcoin_slices::{
55
Error::VisitBreak,
66
Visit,
77
};
8+
use std::ops::ControlFlow;
89

910
use crate::{
1011
cache::Cache,
@@ -105,17 +106,25 @@ impl Tracker {
105106
) -> Result<Option<(BlockHash, Transaction)>> {
106107
// Note: there are two blocks with coinbase transactions having same txid (see BIP-30)
107108
let blockhashes = self.index.filter_by_txid(txid);
108-
let mut result = None;
109-
daemon.for_blocks(blockhashes, |blockhash, block| {
109+
let result = daemon.for_blocks(blockhashes, |blockhash, block| {
110110
let mut visitor = FindTransaction::new(txid);
111111
match bsl::Block::visit(&block, &mut visitor) {
112112
Ok(_) | Err(VisitBreak) => (),
113113
Err(e) => panic!("core returned invalid block: {:?}", e),
114114
}
115-
if let Some(tx) = visitor.tx_found() {
116-
result = Some((blockhash, tx));
115+
match visitor.tx_found() {
116+
Some(tx) => ControlFlow::Break((blockhash, tx)),
117+
None => ControlFlow::Continue(()),
117118
}
118119
})?;
119-
Ok(result)
120+
Ok(control_flow_break_value(result))
121+
}
122+
}
123+
124+
/// See unstable ControlFlow::break_value (https://github.com/rust-lang/rust/issues/75744)
125+
fn control_flow_break_value<B, C>(value: ControlFlow<B, C>) -> Option<B> {
126+
match value {
127+
ControlFlow::Continue(..) => None,
128+
ControlFlow::Break(x) => Some(x),
120129
}
121130
}

0 commit comments

Comments
 (0)