Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions src/utils/block_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Host chain block watcher that subscribes to new blocks and tracks the
//! current host block number.
//! Block watcher that subscribes to new blocks and tracks the current block
//! number for any chain.

use alloy::{
network::Ethereum,
Expand All @@ -12,33 +12,33 @@ use tokio::{
};
use tracing::{debug, error, trace};

/// Host chain block watcher that subscribes to new blocks and broadcasts
/// updates via a watch channel.
/// Block watcher that subscribes to new blocks and broadcasts updates via a
/// watch channel.
#[derive(Debug)]
pub struct BlockWatcher {
/// Watch channel responsible for broadcasting block number updates.
block_number: watch::Sender<u64>,

/// Host chain provider.
host_provider: RootProvider<Ethereum>,
/// Provider for the chain being watched.
provider: RootProvider<Ethereum>,
}

impl BlockWatcher {
/// Creates a new [`BlockWatcher`] with the given provider and initial
/// block number.
pub fn new(host_provider: RootProvider<Ethereum>, initial: u64) -> Self {
pub fn new(provider: RootProvider<Ethereum>, initial: u64) -> Self {
Self {
block_number: watch::channel(initial).0,
host_provider,
provider,
}
}

/// Creates a new [`BlockWatcher`], fetching the current block number first.
pub async fn with_current_block(
host_provider: RootProvider<Ethereum>,
provider: RootProvider<Ethereum>,
) -> Result<Self, TransportError> {
let block_number = host_provider.get_block_number().await?;
Ok(Self::new(host_provider, block_number))
let block_number = provider.get_block_number().await?;
Ok(Self::new(provider, block_number))
}

/// Subscribe to block number updates.
Expand All @@ -52,22 +52,22 @@ impl BlockWatcher {
}

async fn task_future(self) {
let mut sub = match self.host_provider.subscribe_blocks().await {
let mut sub = match self.provider.subscribe_blocks().await {
Ok(sub) => sub,
Err(error) => {
error!(%error);
return;
}
};

debug!("subscribed to host chain blocks");
debug!("subscribed to blocks");

loop {
match sub.recv().await {
Ok(header) => {
let block_number = header.number;
self.block_number.send_replace(block_number);
trace!(block_number, "updated host block number");
trace!(block_number, "updated block number");
}
Err(RecvError::Lagged(missed)) => {
debug!(%missed, "block subscription lagged");
Expand Down