-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathremote_block_source.rs
More file actions
229 lines (206 loc) · 9.13 KB
/
remote_block_source.rs
File metadata and controls
229 lines (206 loc) · 9.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
//! Remote block source add-on for importing blocks from a remote L2 node
//! and building new blocks on top.
use crate::args::RemoteBlockSourceArgs;
use alloy_primitives::Signature;
use alloy_provider::{Provider, ProviderBuilder, RootProvider};
use alloy_rpc_client::RpcClient;
use alloy_transport::layers::RetryBackoffLayer;
use futures::StreamExt;
use reth_network_api::{FullNetwork, PeerId};
use reth_provider::BlockReader;
use reth_scroll_node::ScrollNetworkPrimitives;
use reth_tasks::shutdown::Shutdown;
use reth_tokio_util::EventStream;
use rollup_node_chain_orchestrator::{ChainOrchestratorEvent, ChainOrchestratorHandle};
use scroll_alloy_network::Scroll;
use scroll_network::NewBlockWithPeer;
use tokio::time::{interval, Duration};
/// Remote block source add-on that imports blocks from a trusted remote L2 node
/// and triggers block building on top of each imported block.
///
/// An instance is created with `new`, which connects to the remote node and determines the
/// highest block shared with the local chain, and is then driven by `run_until_shutdown`,
/// which polls the remote on a fixed interval.
#[derive(Debug)]
pub struct RemoteBlockSourceAddOn<N>
where
N: FullNetwork<Primitives = ScrollNetworkPrimitives>,
{
/// Configuration for the remote block source.
///
/// The add-on reads `url` (remote endpoint, required), `poll_interval_ms` (polling period)
/// and `build` (whether to build a block on top of each valid import).
config: RemoteBlockSourceArgs,
/// Handle to the chain orchestrator for sending commands.
///
/// Used to query status, import remote blocks and request block building.
orchestrator_handle: ChainOrchestratorHandle<N>,
/// An event stream for listening to chain orchestrator events, used to wait for block build
/// completion.
///
/// The stream is obtained once at construction time; only `BlockSequenced` events are acted
/// upon, every other event read from it is discarded.
events: EventStream<ChainOrchestratorEvent>,
/// A provider for the remote node, used to fetch blocks and block information.
///
/// Constructed over HTTP with a retry/backoff layer.
remote: RootProvider<Scroll>,
/// Tracks the last block number we imported from remote.
/// This is different from local head because we build blocks on top of imports.
///
/// Initialised to the highest block whose hash matches on both the local chain and the
/// remote node (0 if none is found above genesis), then advanced by one after every
/// successful import.
last_imported_block: u64,
}
impl<N> RemoteBlockSourceAddOn<N>
where
N: FullNetwork<Primitives = ScrollNetworkPrimitives> + Send + Sync + 'static,
{
/// Builds the add-on: connects to the remote node, subscribes to chain orchestrator events
/// and locates the highest block shared by the local chain and the remote.
///
/// The common block is found by walking downwards from `min(local_head, remote_head)` and
/// comparing the locally stored block hash with the remote block hash at each height. Block
/// 0 is accepted without comparison.
///
/// # Errors
///
/// Returns an error if `config.url` is unset, if the orchestrator event listener or status
/// cannot be obtained, if a remote RPC request fails, or if the local block hash lookup
/// fails.
pub async fn new(
    config: RemoteBlockSourceArgs,
    handle: ChainOrchestratorHandle<N>,
    provider: impl BlockReader,
) -> eyre::Result<Self> {
    // Without an endpoint there is nothing to follow, so refuse to start.
    let url = match config.url.clone() {
        Some(url) => url,
        None => {
            tracing::error!(target: "scroll::remote_source", "URL required when remote-source is enabled");
            return Err(eyre::eyre!("URL required when remote-source is enabled"));
        }
    };

    // Remote provider: HTTP transport wrapped in a retry/backoff layer.
    let client = RpcClient::builder().layer(RetryBackoffLayer::new(10, 100, 330)).http(url);
    let remote = ProviderBuilder::<_, _, Scroll>::default().connect_client(client);

    // Subscribe to orchestrator events; they are later used to detect finished block builds.
    let events = handle.get_event_listener().await.map_err(|e| {
        tracing::error!(target: "scroll::remote_source", ?e, "Failed to get event listener");
        eyre::eyre!(e)
    })?;

    let local_head = handle.status().await?.l2.fcs.head_block_info().number;
    let remote_head = remote.get_block_number().await?;

    // Walk down from the lower of the two heads until both sides agree on a block hash.
    let mut candidate = local_head.min(remote_head);
    let last_imported_block = loop {
        if candidate == 0 {
            // Genesis is always a common block (same chain spec assumed).
            break 0;
        }

        let local_hash = provider.block_hash(candidate)?;
        let remote_block = remote.get_block_by_number(candidate.into()).await?;
        let hashes_agree = matches!(
            (local_hash, remote_block),
            (Some(lh), Some(rb)) if lh == rb.header.hash
        );
        if hashes_agree {
            break candidate;
        }

        // `candidate` is non-zero here, so stepping down cannot underflow.
        candidate -= 1;
    };

    tracing::info!(
        target: "scroll::remote_source",
        last_imported_block,
        local_head,
        remote_head,
        "Determined highest common block with remote"
    );

    Ok(Self { config, orchestrator_handle: handle, events, remote, last_imported_block })
}
/// Runs the remote block source until shutdown.
///
/// On every tick of the polling interval (`config.poll_interval_ms`) one
/// `follow_and_build` pass is executed. Errors from a pass are logged and the loop keeps
/// running; the next tick retries. The shutdown signal is given priority both while waiting
/// for a tick and while a pass is in flight.
///
/// Always returns `Ok(())` once the shutdown signal fires.
pub async fn run_until_shutdown(mut self, mut shutdown: Shutdown) -> eyre::Result<()> {
    // `tokio::time::interval` panics on a zero period, so clamp a misconfigured
    // `poll_interval_ms = 0` to the smallest non-zero period instead of crashing the task.
    let period = Duration::from_millis(self.config.poll_interval_ms).max(Duration::from_millis(1));
    let mut poll_interval = interval(period);
    // A catch-up pass can outlast many periods. The default `Burst` behaviour would then
    // replay every missed tick back-to-back, hammering the remote with redundant polls.
    poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    loop {
        tokio::select! {
            biased;
            _guard = &mut shutdown => break,
            _ = poll_interval.tick() => {
                // Keep observing shutdown while the pass runs: a pass may import many blocks
                // or wait on the event stream, and must not delay process shutdown.
                // Dropping the in-flight pass leaves `last_imported_block` at the last
                // completed import, which is irrelevant once we are shutting down.
                tokio::select! {
                    biased;
                    _guard = &mut shutdown => break,
                    result = self.follow_and_build() => {
                        if let Err(e) = result {
                            tracing::error!(target: "scroll::remote_source", ?e, "Sync error");
                        }
                    }
                }
            }
        }
    }

    Ok(())
}
/// Follows the remote node and builds blocks on top of imported blocks.
async fn follow_and_build(&mut self) -> eyre::Result<()> {
loop {
// Get remote head
let remote_block = self
.remote
.get_block_by_number(alloy_eips::BlockNumberOrTag::Latest)
.full()
.await?
.ok_or_else(|| eyre::eyre!("Remote block not found"))?;
let remote_head = remote_block.header.number;
// Compare against last imported block
if remote_head <= self.last_imported_block {
tracing::trace!(target: "scroll::remote_source",
last_imported = self.last_imported_block,
remote_head,
"Already synced with remote");
return Ok(());
}
let blocks_behind = remote_head - self.last_imported_block;
tracing::info!(target: "scroll::remote_source",
last_imported = self.last_imported_block,
remote_head,
blocks_behind,
"Catching up");
// Fetch and import the next block from remote
let next_block_num = self.last_imported_block + 1;
let block = self
.remote
.get_block_by_number(next_block_num.into())
.full()
.await?
.ok_or_else(|| eyre::eyre!("Block {} not found", next_block_num))?
.into_consensus()
.map_transactions(|tx| tx.inner.into_inner());
// Create NewBlockWithPeer with dummy peer_id and signature (trusted source)
let block_with_peer = NewBlockWithPeer {
peer_id: PeerId::default(),
block,
signature: Signature::new(Default::default(), Default::default(), false),
};
// Import the block (this will cause a reorg if we had a locally built block at this
// height)
let chain_import = match self.orchestrator_handle.import_block(block_with_peer).await {
Ok(Ok(chain_import)) => {
self.last_imported_block = next_block_num;
chain_import
}
Ok(Err(e)) => {
return Err(eyre::eyre!("Import block failed: {}", e));
}
Err(e) => {
return Err(eyre::eyre!("chain orchestrator command channel error: {}", e));
}
};
if !chain_import.result.is_valid() {
tracing::info!(target: "scroll::remote_source",
result = ?chain_import.result,
"Imported block is not valid according to forkchoice, skipping build");
continue;
}
if !self.config.build {
tracing::debug!(target: "scroll::remote_source", "Imported block is valid, but build is disabled, skipping build");
continue;
}
if !self.orchestrator_handle.status().await?.is_synced() {
tracing::debug!(target: "scroll::remote_source", "Imported block is valid, but orchestrator is not synced, skipping build");
continue;
}
// Trigger block building on top of the imported block
self.orchestrator_handle.build_block();
// Wait for BlockSequenced event
tracing::debug!(target: "scroll::remote_source", "Waiting for block to be built...");
loop {
match self.events.next().await {
Some(ChainOrchestratorEvent::BlockSequenced(block)) => {
tracing::info!(target: "scroll::remote_source",
block_number = block.header.number,
block_hash = ?block.hash_slow(),
"Block built successfully, proceeding to next");
break;
}
Some(_) => {
// Ignore other events, keep waiting
}
None => {
return Err(eyre::eyre!("Event stream ended unexpectedly"));
}
}
}
// Loop continues to process next block
}
}
}