Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions chain/ethereum/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
}
}

async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
match &*self.chain_client {
ChainClient::Firehose(endpoints) => {
let endpoint = endpoints.endpoint().await?;
Expand All @@ -1034,15 +1034,34 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
"Failed to fetch block {} from firehose",
ptr.number
))?;
Ok(block.hash() == ptr.hash)
if block.hash() == ptr.hash {
Ok(None)
} else {
Ok(Some(block.parent_ptr().ok_or_else(|| {
anyhow!(
"canonical block at {} has no parent; cannot determine revert target",
ptr.number
)
})?))
}
}
ChainClient::Rpc(adapter) => {
let adapter = adapter
.cheapest()
.await
.ok_or_else(|| anyhow!("unable to get adapter for is_on_main_chain"))?;

adapter.is_on_main_chain(&self.logger, ptr).await
let canonical = adapter
.next_existing_ptr_to_number(&self.logger, ptr.number)
.await?;
if canonical == ptr {
Ok(None)
} else {
let parent = adapter
.next_existing_ptr_to_number(&self.logger, ptr.number - 1)
.await?;
Ok(Some(parent))
}
}
}
}
Expand Down
23 changes: 0 additions & 23 deletions chain/ethereum/src/ethereum_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -884,29 +884,6 @@ impl EthereumAdapter {
.map(|b| BlockPtr::from((b.header.hash, b.header.number)))
}

/// Check if `block_ptr` refers to a block that is on the main chain, according to the Ethereum
/// node.
///
/// Careful: don't use this function without considering race conditions.
/// Chain reorgs could happen at any time, and could affect the answer received.
/// Generally, it is only safe to use this function with blocks that have received enough
/// confirmations to guarantee no further reorgs, **and** where the Ethereum node is aware of
/// those confirmations.
/// If the Ethereum node is far behind in processing blocks, even old blocks can be subject to
/// reorgs.
pub(crate) async fn is_on_main_chain(
&self,
logger: &Logger,
block_ptr: BlockPtr,
) -> Result<bool, Error> {
// TODO: This considers null blocks, but we could instead bail if we encounter one as a
// small optimization.
let canonical_block = self
.next_existing_ptr_to_number(logger, block_ptr.number)
.await?;
Ok(canonical_block == block_ptr)
}

pub(crate) fn logs_in_block_range(
&self,
logger: &Logger,
Expand Down
16 changes: 5 additions & 11 deletions chain/ethereum/src/polling_block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,20 +272,14 @@ impl PollingBlockStreamContext {
// This allows us to ask the node: does subgraph_ptr point to a block that was
// permanently accepted into the main chain, or does it point to a block that was
// uncled?
let is_on_main_chain = match &subgraph_ptr {
let canonical_parent = match &subgraph_ptr {
Some(ptr) => ctx.adapter.is_on_main_chain(ptr.clone()).await?,
None => true,
None => None,
};
if !is_on_main_chain {
if let Some(canonical_parent) = canonical_parent {
// The subgraph ptr points to a block that was uncled.
// We need to revert this block.
//
// Note: We can safely unwrap the subgraph ptr here, because
// if it was `None`, `is_on_main_chain` would be true.
let from = subgraph_ptr.unwrap();
let parent = self.parent_ptr(&from, "is_on_main_chain").await?;

return Ok(ReconciliationStep::Revert(parent));
// Revert to the canonical parent provided by is_on_main_chain.
return Ok(ReconciliationStep::Revert(canonical_parent));
}

// The subgraph ptr points to a block on the main chain.
Expand Down
2 changes: 1 addition & 1 deletion chain/near/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ impl TriggersAdapterTrait<Chain> for TriggersAdapter {
Ok(BlockWithTriggers::new(block, trigger_data, logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
panic!("Should never be called since not used by FirehoseBlockStream")
}

Expand Down
4 changes: 2 additions & 2 deletions gnd/src/commands/test/noop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ impl<C: Blockchain> TriggersAdapter<C> for NoopTriggersAdapter<C> {
Ok(BlockWithTriggers::new(block, Vec::new(), &logger))
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
Ok(true)
async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
Ok(None)
}

async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error> {
Expand Down
9 changes: 5 additions & 4 deletions graph/src/blockchain/block_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,7 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
.await
}

pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
self.adapter.is_on_main_chain(ptr).await
}

Expand Down Expand Up @@ -610,9 +610,10 @@ pub trait TriggersAdapter<C: Blockchain>: Send + Sync {
filter: &C::TriggerFilter,
) -> Result<BlockWithTriggers<C>, Error>;

/// Return `true` if the block with the given hash and number is on the
/// main chain, i.e., the chain going back from the current chain head.
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error>;
/// Check whether the block is on the main chain. Returns `None` if it
/// is, or `Some(revert_to)` with the canonical parent pointer to revert
/// to if the block has been reorged out.
async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<Option<BlockPtr>, Error>;

/// Get pointer to parent of `block`. This is called when reverting `block`.
async fn parent_ptr(&self, block: &BlockPtr) -> Result<Option<BlockPtr>, Error>;
Expand Down
2 changes: 1 addition & 1 deletion graph/src/blockchain/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ impl TriggersAdapter<MockBlockchain> for MockTriggersAdapter {
todo!()
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
todo!()
}

Expand Down
2 changes: 2 additions & 0 deletions runtime/test/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#![cfg(test)]
// Deep async nesting in tests exceeds the default limit (128) on newer rustc versions.
#![recursion_limit = "256"]
pub mod common;
mod test;

Expand Down
5 changes: 4 additions & 1 deletion store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,10 @@ pub async fn revert_block_ptr(
match affected_rows {
1 => Ok(()),
0 => Err(StoreError::Unknown(anyhow!(
"No rows affected. This could be due to an attempt to revert beyond earliest_block + reorg_threshold",
"No rows affected. The revert target (block {}) may be beyond earliest_block + \
reorg_threshold, or the revert target may be a stale block from a previous reorg \
that no longer exists on the canonical chain",
ptr.number,
))),
_ => Err(StoreError::Unknown(anyhow!(
"Expected to update 1 row, but {} rows were affected",
Expand Down
2 changes: 1 addition & 1 deletion tests/src/fixture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1080,7 +1080,7 @@ impl<C: Blockchain> TriggersAdapter<C> for MockTriggersAdapter<C> {
(self.triggers_in_block)(block)
}

async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<bool, Error> {
async fn is_on_main_chain(&self, _ptr: BlockPtr) -> Result<Option<BlockPtr>, Error> {
todo!()
}

Expand Down