From 140cc9f559071fcc351f13526ab17878f2acc807 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 26 Feb 2026 15:57:02 +0400 Subject: [PATCH 1/2] graph, chain/ethereum: Move json_patch and json_block modules to graph crate Move EthereumJsonBlock and JSON patching utilities from chain/ethereum to graph/src/components/ethereum/ so they can be used by the store layer without circular dependencies. This prepares for typed block caching where CachedBlock::from_json() needs access to these utilities. --- chain/ethereum/src/chain.rs | 2 +- chain/ethereum/src/ethereum_adapter.rs | 2 +- chain/ethereum/src/lib.rs | 2 -- chain/ethereum/src/transport.rs | 2 +- .../src/components/ethereum}/json_block.rs | 6 +++--- .../src/components/ethereum}/json_patch.rs | 10 +++++----- graph/src/components/ethereum/mod.rs | 3 +++ 7 files changed, 14 insertions(+), 13 deletions(-) rename {chain/ethereum/src => graph/src/components/ethereum}/json_block.rs (92%) rename {chain/ethereum/src => graph/src/components/ethereum}/json_patch.rs (92%) diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 6cc098651a2..5690353cc88 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -46,7 +46,6 @@ use crate::codec::HeaderOnlyBlock; use crate::data_source::DataSourceTemplate; use crate::data_source::UnresolvedDataSourceTemplate; use crate::ingestor::PollingBlockIngestor; -use crate::json_block::EthereumJsonBlock; use crate::network::EthereumNetworkAdapters; use crate::polling_block_stream::PollingBlockStream; use crate::runtime::runtime_adapter::eth_call_gas; @@ -66,6 +65,7 @@ use graph::blockchain::block_stream::{ BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor, TriggersAdapterWrapper, }; +use graph::components::ethereum::EthereumJsonBlock; /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320]; diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 5d9b78118a0..14d10221efa 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -66,7 +66,6 @@ use crate::adapter::EthereumRpcError; use crate::adapter::ProviderStatus; use crate::call_helper::interpret_eth_call_error; use crate::chain::BlockFinality; -use crate::json_block::EthereumJsonBlock; use crate::trigger::{LogPosition, LogRef}; use crate::Chain; use crate::NodeCapabilities; @@ -80,6 +79,7 @@ use crate::{ trigger::{EthereumBlockTriggerType, EthereumTrigger}, ENV_VARS, }; +use graph::components::ethereum::EthereumJsonBlock; type AlloyProvider = FillProvider< JoinFill< diff --git a/chain/ethereum/src/lib.rs b/chain/ethereum/src/lib.rs index 48bdb7a6a11..b21eb4717ae 100644 --- a/chain/ethereum/src/lib.rs +++ b/chain/ethereum/src/lib.rs @@ -7,8 +7,6 @@ mod data_source; mod env; mod ethereum_adapter; mod ingestor; -mod json_block; -mod json_patch; mod polling_block_stream; pub mod runtime; mod transport; diff --git a/chain/ethereum/src/transport.rs b/chain/ethereum/src/transport.rs index 474e2dbaed3..0929dda9d1c 100644 --- a/chain/ethereum/src/transport.rs +++ b/chain/ethereum/src/transport.rs @@ -1,5 +1,5 @@ -use crate::json_patch; use alloy::transports::{TransportError, TransportErrorKind, TransportFut}; +use graph::components::ethereum::json_patch; use graph::components::network_provider::ProviderName; use graph::endpoint::{ConnectionType, EndpointMetrics, RequestLabels}; use graph::prelude::alloy::rpc::json_rpc::{RequestPacket, ResponsePacket}; diff --git a/chain/ethereum/src/json_block.rs b/graph/src/components/ethereum/json_block.rs similarity index 92% rename from chain/ethereum/src/json_block.rs rename to graph/src/components/ethereum/json_block.rs index 5525e7fb7a8..2d7e39f31d3 100644 --- a/chain/ethereum/src/json_block.rs +++ b/graph/src/components/ethereum/json_block.rs @@ -1,7 +1,7 @@ -use graph::prelude::serde_json::{self as json, Value}; -use graph::prelude::{EthereumBlock, LightEthereumBlock}; +use serde_json::{self as json, Value}; -use crate::json_patch; +use super::json_patch; +use super::types::{EthereumBlock, LightEthereumBlock}; #[derive(Debug)] pub struct EthereumJsonBlock(Value); diff --git a/chain/ethereum/src/json_patch.rs b/graph/src/components/ethereum/json_patch.rs similarity index 92% rename from chain/ethereum/src/json_patch.rs rename to graph/src/components/ethereum/json_patch.rs index d6a46f79ceb..8e2daff4684 100644 --- a/chain/ethereum/src/json_patch.rs +++ b/graph/src/components/ethereum/json_patch.rs @@ -7,9 +7,9 @@ //! //! Also used by `PatchingHttp` for chains that don't support EIP-2718 typed transactions. -use graph::prelude::serde_json::Value; +use serde_json::Value; -pub(crate) fn patch_type_field(obj: &mut Value) -> bool { +pub fn patch_type_field(obj: &mut Value) -> bool { if let Value::Object(map) = obj { if !map.contains_key("type") { map.insert("type".to_string(), Value::String("0x0".to_string())); @@ -19,7 +19,7 @@ pub(crate) fn patch_type_field(obj: &mut Value) -> bool { false } -pub(crate) fn patch_block_transactions(block: &mut Value) -> bool { +pub fn patch_block_transactions(block: &mut Value) -> bool { let Some(txs) = block.get_mut("transactions").and_then(|t| t.as_array_mut()) else { return false; }; @@ -30,7 +30,7 @@ pub(crate) fn patch_block_transactions(block: &mut Value) -> bool { patched } -pub(crate) fn patch_receipts(result: &mut Value) -> bool { +pub fn patch_receipts(result: &mut Value) -> bool { match result { Value::Object(_) => patch_type_field(result), Value::Array(arr) => { @@ -47,7 +47,7 @@ pub(crate) fn patch_receipts(result: &mut Value) -> bool { #[cfg(test)] mod tests { use super::*; - use graph::prelude::serde_json::json; + use serde_json::json; #[test] fn patch_type_field_adds_missing_type() { diff --git a/graph/src/components/ethereum/mod.rs b/graph/src/components/ethereum/mod.rs index 1fe8d2ac0d3..dfb7657a92b 100644 --- a/graph/src/components/ethereum/mod.rs +++ b/graph/src/components/ethereum/mod.rs @@ -1,6 +1,9 @@ +pub mod json_block; +pub mod json_patch; mod network; mod types; +pub use self::json_block::EthereumJsonBlock; pub use self::network::AnyNetworkBare; pub use self::types::{ AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls, From 4ba28d6aac0f22998d5ed501792aeef1c0290422 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 27 Feb 2026 15:56:41 +0400 Subject: [PATCH 2/2] graph: Add CachedBlock enum and typed block cache --- chain/ethereum/src/chain.rs | 86 +---- chain/ethereum/src/ethereum_adapter.rs | 19 +- graph/src/blockchain/mock.rs | 8 +- graph/src/components/ethereum/json_block.rs | 16 +- graph/src/components/ethereum/mod.rs | 4 +- graph/src/components/ethereum/types.rs | 56 ++- graph/src/components/store/traits.rs | 11 +- node/src/manager/commands/check_blocks.rs | 2 +- server/index-node/src/resolver.rs | 4 +- store/postgres/src/chain_store.rs | 325 +++++++++++------- store/test-store/tests/postgres/chain_head.rs | 5 +- 11 files changed, 304 insertions(+), 232 deletions(-) diff --git a/chain/ethereum/src/chain.rs b/chain/ethereum/src/chain.rs index 5690353cc88..14119ddc8fa 100644 --- a/chain/ethereum/src/chain.rs +++ b/chain/ethereum/src/chain.rs @@ -65,8 +65,6 @@ use graph::blockchain::block_stream::{ BlockStream, BlockStreamBuilder, BlockStreamError, BlockStreamMapper, FirehoseCursor, TriggersAdapterWrapper, }; -use graph::components::ethereum::EthereumJsonBlock; - /// Celo Mainnet: 42220, Testnet Alfajores: 44787, Testnet Baklava: 62320 const CELO_CHAIN_IDS: [u64; 3] = [42220, 44787, 62320]; @@ -1076,55 +1074,26 @@ impl TriggersAdapterTrait for TriggersAdapter { .ancestor_block(ptr.clone(), offset, root.clone()) .await?; - // First check if we have the ancestor in cache and can deserialize it. - // The cached JSON can be in one of three formats: - // 1. Full RPC format: {"block": {...}, "transaction_receipts": [...]} - // 2. Shallow/header-only: {"timestamp": "...", "data": null} - only timestamp, no block data - // 3. Legacy direct: block fields at root level {hash, number, transactions, ...} - // We need full format with receipts for ancestor_block (used for trigger processing). + // Use full blocks (with receipts) directly from cache. + // Light blocks (no receipts) need to be fetched from Firehose/RPC. let block_ptr = match cached { - Some((json, ptr)) => { - let json_block = EthereumJsonBlock::new(json); - if json_block.is_shallow() { - trace!( - self.logger, - "Cached block #{} {} is shallow (header-only). Falling back to Firehose/RPC.", - ptr.number, - ptr.hash_hex(), - ); - ptr - } else if json_block.is_legacy_format() { + Some((cached_block, ptr)) => match cached_block.into_full_block() { + Some(block) => { + return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { + ethereum_block: block, + calls: None, + }))); + } + None => { trace!( self.logger, - "Cached block #{} {} is legacy light format. Falling back to Firehose/RPC.", + "Cached block #{} {} is light (no receipts). Falling back to Firehose/RPC.", ptr.number, ptr.hash_hex(), ); ptr - } else { - match json_block.into_full_block() { - Ok(block) => { - return Ok(Some(BlockFinality::NonFinal(EthereumBlockWithCalls { - ethereum_block: block, - calls: None, - }))); - } - Err(e) => { - warn!( - self.logger, - "Failed to deserialize cached ancestor block #{} {} (offset {} from #{}): {}. \ - Falling back to Firehose/RPC.", - ptr.number, - ptr.hash_hex(), - offset, - ptr_for_log.number, - e - ); - ptr - } - } } - } + }, None => { // Cache miss - fall back to walking the chain via parent_ptr() calls. // This provides resilience when the block cache is empty (e.g., after truncation). @@ -1179,35 +1148,10 @@ impl TriggersAdapterTrait for TriggersAdapter { let block = match self.chain_client.as_ref() { ChainClient::Firehose(endpoints) => { let chain_store = self.chain_store.cheap_clone(); - // First try to get the block from the store - // See ancestor_block() for documentation of the 3 cached JSON formats. + // First try to get the block from the store (typed cache) if let Ok(blocks) = chain_store.blocks(vec![block.hash.clone()]).await { - if let Some(cached_json) = blocks.into_iter().next() { - let json_block = EthereumJsonBlock::new(cached_json); - if json_block.is_shallow() { - trace!( - self.logger, - "Cached block #{} {} is shallow. Falling back to Firehose.", - block.number, - block.hash_hex(), - ); - } else { - match json_block.into_light_block() { - Ok(light_block) => { - return Ok(light_block.parent_ptr()); - } - Err(e) => { - warn!( - self.logger, - "Failed to deserialize cached block #{} {}: {}. \ - Falling back to Firehose.", - block.number, - block.hash_hex(), - e - ); - } - } - } + if let Some(cached_block) = blocks.into_iter().next() { + return Ok(cached_block.light_block().parent_ptr()); } } diff --git a/chain/ethereum/src/ethereum_adapter.rs b/chain/ethereum/src/ethereum_adapter.rs index 14d10221efa..78680e9e46f 100644 --- a/chain/ethereum/src/ethereum_adapter.rs +++ b/chain/ethereum/src/ethereum_adapter.rs @@ -79,7 +79,6 @@ use crate::{ trigger::{EthereumBlockTriggerType, EthereumTrigger}, ENV_VARS, }; -use graph::components::ethereum::EthereumJsonBlock; type AlloyProvider = FillProvider< JoinFill< @@ -1641,23 +1640,7 @@ impl EthereumAdapterTrait for EthereumAdapter { .map_err(|e| error!(&logger, "Error accessing block cache {}", e)) .unwrap_or_default() .into_iter() - .filter_map(|value| { - let json_block = EthereumJsonBlock::new(value); - if json_block.is_shallow() { - return None; - } - json_block - .into_light_block() - .map_err(|e| { - warn!( - &logger, - "Failed to deserialize cached block: {}. Block will be re-fetched from RPC.", - e - ); - }) - .ok() - }) - .map(Arc::new) + .map(|cached| Arc::new(cached.into_light_block())) .collect(); let missing_blocks = Vec::from_iter( diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 31980a996f2..701bca62eb6 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -1,6 +1,7 @@ use crate::{ bail, components::{ + ethereum::CachedBlock, link_resolver::LinkResolver, network_provider::ChainName, store::{ @@ -527,7 +528,10 @@ impl ChainStore for MockChainStore { ) -> Result, Error> { unimplemented!() } - async fn blocks(self: Arc, _hashes: Vec) -> Result, Error> { + async fn blocks(self: Arc, _hashes: Vec) -> Result, Error> { + unimplemented!() + } + async fn blocks_as_json(self: Arc, _hashes: Vec) -> Result, Error> { unimplemented!() } async fn ancestor_block( @@ -535,7 +539,7 @@ impl ChainStore for MockChainStore { _block_ptr: BlockPtr, _offset: BlockNumber, _root: Option, - ) -> Result, Error> { + ) -> Result, Error> { unimplemented!() } async fn cleanup_cached_blocks( diff --git a/graph/src/components/ethereum/json_block.rs b/graph/src/components/ethereum/json_block.rs index 2d7e39f31d3..33f7311351a 100644 --- a/graph/src/components/ethereum/json_block.rs +++ b/graph/src/components/ethereum/json_block.rs @@ -1,7 +1,7 @@ use serde_json::{self as json, Value}; use super::json_patch; -use super::types::{EthereumBlock, LightEthereumBlock}; +use super::types::{CachedBlock, EthereumBlock, LightEthereumBlock}; #[derive(Debug)] pub struct EthereumJsonBlock(Value); @@ -49,4 +49,18 @@ impl EthereumJsonBlock { json_patch::patch_block_transactions(&mut inner); json::from_value(inner) } + + /// Tries to deserialize into a `CachedBlock`. Uses `transaction_receipts` + /// presence to decide between full and light block, avoiding a JSON clone. + pub fn try_into_cached_block(self) -> Option { + let has_receipts = self + .0 + .get("transaction_receipts") + .is_some_and(|v| !v.is_null()); + if has_receipts { + self.into_full_block().ok().map(CachedBlock::Full) + } else { + self.into_light_block().ok().map(CachedBlock::Light) + } + } } diff --git a/graph/src/components/ethereum/mod.rs b/graph/src/components/ethereum/mod.rs index dfb7657a92b..256cc7f34a4 100644 --- a/graph/src/components/ethereum/mod.rs +++ b/graph/src/components/ethereum/mod.rs @@ -6,8 +6,8 @@ mod types; pub use self::json_block::EthereumJsonBlock; pub use self::network::AnyNetworkBare; pub use self::types::{ - AnyBlock, AnyTransaction, AnyTransactionReceiptBare, EthereumBlock, EthereumBlockWithCalls, - EthereumCall, LightEthereumBlock, LightEthereumBlockExt, + AnyBlock, AnyTransaction, AnyTransactionReceiptBare, CachedBlock, EthereumBlock, + EthereumBlockWithCalls, EthereumCall, LightEthereumBlock, LightEthereumBlockExt, }; // Re-export Alloy network types for convenience diff --git a/graph/src/components/ethereum/types.rs b/graph/src/components/ethereum/types.rs index 70db118d312..d59a327cea9 100644 --- a/graph/src/components/ethereum/types.rs +++ b/graph/src/components/ethereum/types.rs @@ -17,6 +17,8 @@ use crate::{ prelude::BlockNumber, }; +use super::json_block::EthereumJsonBlock; + pub type AnyTransaction = Transaction; pub type AnyBlock = Block>; /// Like alloy's `AnyTransactionReceipt` but without the `WithOtherFields` wrapper, @@ -24,7 +26,7 @@ pub type AnyBlock = Block>; pub type AnyTransactionReceiptBare = TransactionReceipt>; #[allow(dead_code)] -#[derive(Debug, Deserialize, Serialize)] +#[derive(Clone, Debug, Deserialize, Serialize)] pub struct LightEthereumBlock(AnyBlock); impl Default for LightEthereumBlock { @@ -259,3 +261,55 @@ impl<'a> From<&'a EthereumCall> for BlockPtr { BlockPtr::from((call.block_hash, call.block_number)) } } + +/// Typed cached block for Ethereum. Stores the deserialized block so that +/// repeated reads from the in-memory cache avoid `serde_json::from_value()`. +#[derive(Clone, Debug)] +#[allow(clippy::large_enum_variant)] +pub enum CachedBlock { + Full(EthereumBlock), + Light(LightEthereumBlock), +} + +impl CachedBlock { + pub fn light_block(&self) -> &LightEthereumBlock { + match self { + CachedBlock::Full(block) => &block.block, + CachedBlock::Light(block) => block, + } + } + + pub fn into_light_block(self) -> LightEthereumBlock { + match self { + CachedBlock::Full(block) => block.block.as_ref().clone(), + CachedBlock::Light(block) => block, + } + } + + pub fn into_full_block(self) -> Option { + match self { + CachedBlock::Full(block) => Some(block), + CachedBlock::Light(_) => None, + } + } + + pub fn from_json(value: serde_json::Value) -> Option { + let json_block = EthereumJsonBlock::new(value); + if json_block.is_shallow() { + return None; + } + json_block.try_into_cached_block() + } + + pub fn timestamp(&self) -> Option { + Some(self.light_block().timestamp_u64()) + } + + pub fn parent_ptr(&self) -> Option { + self.light_block().parent_ptr() + } + + pub fn ptr(&self) -> BlockPtr { + self.light_block().block_ptr() + } +} diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 658baa8be3e..fc77f5799a8 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -7,6 +7,7 @@ use async_trait::async_trait; use super::*; use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor}; use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr}; +use crate::components::ethereum::CachedBlock; use crate::components::metrics::stopwatch::StopwatchMetrics; use crate::components::network_provider::ChainName; use crate::components::server::index_node::VersionInfo; @@ -553,8 +554,12 @@ pub trait ChainStore: ChainHeadStore { ancestor_count: BlockNumber, ) -> Result, Error>; - /// Returns the blocks present in the store. - async fn blocks( + /// Returns the blocks present in the store as typed cached blocks. + async fn blocks(self: Arc, hashes: Vec) -> Result, Error>; + + /// Returns blocks as raw JSON. Used by callers that need the original + /// JSON representation (e.g., GraphQL block queries, CLI tools). + async fn blocks_as_json( self: Arc, hashes: Vec, ) -> Result, Error>; @@ -584,7 +589,7 @@ pub trait ChainStore: ChainHeadStore { block_ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error>; + ) -> Result, Error>; /// Remove old blocks from the cache we maintain in the database and /// return a pair containing the number of the oldest block retained diff --git a/node/src/manager/commands/check_blocks.rs b/node/src/manager/commands/check_blocks.rs index f6a4506a2f8..39f8611f1bb 100644 --- a/node/src/manager/commands/check_blocks.rs +++ b/node/src/manager/commands/check_blocks.rs @@ -187,7 +187,7 @@ mod steps { block_hash: B256, chain_store: Arc, ) -> anyhow::Result { - let blocks = chain_store.blocks(vec![block_hash.into()]).await?; + let blocks = chain_store.blocks_as_json(vec![block_hash.into()]).await?; match blocks.len() { 0 => bail!("Failed to locate block with hash {} in store", block_hash), 1 => {} diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index b8385866d33..197b705e513 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -222,7 +222,9 @@ where return Ok(r::Value::Null); }; - let blocks_res = chain_store.blocks(vec![block_hash.cheap_clone()]).await; + let blocks_res = chain_store + .blocks_as_json(vec![block_hash.cheap_clone()]) + .await; Ok(match blocks_res { Ok(blocks) if blocks.is_empty() => { error!( diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index 41bbb6add18..3fb188b8006 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -29,6 +29,7 @@ use std::{ use graph::blockchain::{Block, BlockHash, ChainIdentifier, ExtendedBlockPtr}; use graph::cheap_clone::CheapClone; +use graph::components::ethereum::CachedBlock; use graph::prelude::{ serde_json as json, transaction_receipt::LightTransactionReceipt, BlockNumber, BlockPtr, CachedEthereumCall, ChainStore as ChainStoreTrait, Error, EthereumCallCache, StoreError, @@ -63,6 +64,29 @@ impl JsonBlock { .and_then(|ts| ts.as_str()) .and_then(|ts| ts.parse::().ok()) } + + fn into_cache_block(self) -> CacheBlock { + let data = self.data.and_then(CachedBlock::from_json); + CacheBlock { + ptr: self.ptr, + parent_hash: self.parent_hash, + data, + } + } +} + +/// Typed version of JsonBlock for the in-memory cache. +#[derive(Clone, Debug)] +struct CacheBlock { + ptr: BlockPtr, + parent_hash: BlockHash, + data: Option, +} + +impl CacheBlock { + fn timestamp(&self) -> Option { + self.data.as_ref().and_then(|d| d.timestamp()) + } } /// Tables in the 'public' database schema that store chain-specific data @@ -2149,7 +2173,7 @@ pub struct ChainStore { blocks_by_hash_cache: HerdCache, StoreError>>>, blocks_by_number_cache: HerdCache>, StoreError>>>, - ancestor_cache: HerdCache, StoreError>>>, + ancestor_cache: HerdCache, StoreError>>>, /// Adaptive cache for chain_head_ptr() chain_head_ptr_cache: ChainHeadPtrCache, /// Herd cache to prevent thundering herd on chain_head_ptr() lookups @@ -2519,6 +2543,22 @@ fn json_block_to_block_ptr_ext(json_block: &JsonBlock) -> Result Result { + let hash = block.ptr.hash.clone(); + let number = block.ptr.number; + let parent_hash = block.parent_hash.clone(); + + let timestamp = block + .timestamp() + .ok_or_else(|| anyhow!("Timestamp is missing"))?; + + let ptr = + ExtendedBlockPtr::try_from((hash.as_b256(), number, parent_hash.as_b256(), timestamp)) + .map_err(|e| anyhow!("Failed to convert to ExtendedBlockPtr: {}", e))?; + + Ok(ptr) +} + #[async_trait] impl ChainHeadStore for ChainStore { async fn chain_head_ptr(self: Arc) -> Result, Error> { @@ -2645,8 +2685,8 @@ impl ChainStoreTrait for ChainStore { async fn upsert_block(&self, block: Arc) -> Result<(), Error> { // We should always have the parent block available to us at this point. if let Some(parent_hash) = block.parent_hash() { - let block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok()); - self.recent_blocks_cache.insert_block(block); + let json_block = JsonBlock::new(block.ptr(), parent_hash, block.data().ok()); + self.recent_blocks_cache.insert_json_block(json_block); } let mut conn = self.pool.get_permitted().await?; @@ -2686,126 +2726,142 @@ impl ChainStoreTrait for ChainStore { self: Arc, numbers: Vec, ) -> Result>, Error> { - let result = if ENV_VARS.store.disable_block_cache_for_lookup { + if ENV_VARS.store.disable_block_cache_for_lookup { let values = self.blocks_from_store_by_numbers(numbers).await?; - values - } else { - let cached = self.recent_blocks_cache.get_block_ptrs_by_numbers(&numbers); - - let stored = if cached.len() < numbers.len() { - let missing_numbers = numbers - .iter() - .filter(|num| !cached.iter().any(|(ptr, _)| ptr.block_number() == **num)) - .cloned() - .collect::>(); - - let this = self.clone(); - let missing_clone = missing_numbers.clone(); - let (res, _) = self - .cached_lookup(&self.blocks_by_number_cache, &missing_numbers, async move { - this.blocks_from_store_by_numbers(missing_clone).await - }) - .await; - - match res { - Ok(blocks) => { - for blocks_for_num in blocks.values() { - if blocks.len() == 1 { - self.recent_blocks_cache - .insert_block(blocks_for_num[0].clone()); - } + let ptrs = values + .into_iter() + .map(|(num, blocks)| { + let ptrs = blocks + .into_iter() + .filter_map(|block| json_block_to_block_ptr_ext(&block).ok()) + .collect(); + (num, ptrs) + }) + .collect(); + + return Ok(ptrs); + } + + let cached = self.recent_blocks_cache.get_block_ptrs_by_numbers(&numbers); + + let stored = if cached.len() < numbers.len() { + let missing_numbers = numbers + .iter() + .filter(|num| !cached.iter().any(|(ptr, _)| ptr.block_number() == **num)) + .cloned() + .collect::>(); + + let this = self.clone(); + let missing_clone = missing_numbers.clone(); + let (res, _) = self + .cached_lookup(&self.blocks_by_number_cache, &missing_numbers, async move { + this.blocks_from_store_by_numbers(missing_clone).await + }) + .await; + + match res { + Ok(blocks) => { + for blocks_for_num in blocks.values() { + if blocks_for_num.len() == 1 { + self.recent_blocks_cache + .insert_json_block(blocks_for_num[0].clone()); } - blocks - } - Err(e) => { - return Err(e.into()); } + blocks + } + Err(e) => { + return Err(e.into()); } - } else { - BTreeMap::new() - }; - - let cached_map = cached - .into_iter() - .map(|(ptr, data)| (ptr.block_number(), vec![data])) - .collect::>(); - - let mut result = cached_map; - for (num, blocks) in stored { - result.entry(num).or_insert(blocks); } - - result + } else { + BTreeMap::new() }; - let ptrs = result + let cached_map: BTreeMap> = cached .into_iter() - .map(|(num, blocks)| { - let ptrs = blocks - .into_iter() - .filter_map(|block| json_block_to_block_ptr_ext(&block).ok()) - .collect(); - (num, ptrs) + .filter_map(|(_, block)| { + cache_block_to_block_ptr_ext(&block) + .ok() + .map(|ptr| (block.ptr.number, vec![ptr])) }) .collect(); - Ok(ptrs) + let mut result = cached_map; + for (num, json_blocks) in stored { + let ptrs: Vec<_> = json_blocks + .into_iter() + .filter_map(|block| json_block_to_block_ptr_ext(&block).ok()) + .collect(); + result.entry(num).or_insert(ptrs); + } + + Ok(result) } - async fn blocks(self: Arc, hashes: Vec) -> Result, Error> { + async fn blocks(self: Arc, hashes: Vec) -> Result, Error> { if ENV_VARS.store.disable_block_cache_for_lookup { let values = self .blocks_from_store(hashes) .await? .into_iter() - .filter_map(|block| block.data) + .filter_map(|block| block.data.and_then(CachedBlock::from_json)) .collect(); - Ok(values) - } else { - let cached = self.recent_blocks_cache.get_blocks_by_hash(&hashes); - let stored = if cached.len() < hashes.len() { - let hashes = hashes - .iter() - .filter(|hash| !cached.iter().any(|(ptr, _)| &ptr.hash == *hash)) - .cloned() - .collect::>(); - // We key this off the entire list of hashes, which means - // that concurrent attempts that look up `[h1, h2]` and - // `[h1, h3]` will still run two queries and duplicate the - // lookup of `h1`. Noticing that the two requests should be - // serialized would require a lot more work, and going to - // the database for one block hash, `h3`, is not much faster - // than looking up `[h1, h3]` though it would require less - // IO bandwidth - let this = self.clone(); - let hashes_clone = hashes.clone(); - let (res, _) = self - .cached_lookup(&self.blocks_by_hash_cache, &hashes, async move { - this.blocks_from_store(hashes_clone).await - }) - .await; - - match res { - Ok(blocks) => { - for block in &blocks { - self.recent_blocks_cache.insert_block(block.clone()); - } - blocks - } - Err(e) => { - return Err(e.into()); - } + return Ok(values); + } + + let cached = self.recent_blocks_cache.get_blocks_by_hash(&hashes); + let stored = if cached.len() < hashes.len() { + let hashes = hashes + .iter() + .filter(|hash| !cached.iter().any(|(ptr, _)| &ptr.hash == *hash)) + .cloned() + .collect::>(); + // We key this off the entire list of hashes, which means + // that concurrent attempts that look up `[h1, h2]` and + // `[h1, h3]` will still run two queries and duplicate the + // lookup of `h1`. Noticing that the two requests should be + // serialized would require a lot more work, and going to + // the database for one block hash, `h3`, is not much faster + // than looking up `[h1, h3]` though it would require less + // IO bandwidth + let this = self.clone(); + let hashes_clone = hashes.clone(); + let (res, _) = self + .cached_lookup(&self.blocks_by_hash_cache, &hashes, async move { + this.blocks_from_store(hashes_clone).await + }) + .await; + + match res { + Ok(blocks) => blocks + .into_iter() + .filter_map(|block| self.recent_blocks_cache.insert_json_block(block)) + .collect(), + Err(e) => { + return Err(e.into()); } - } else { - Vec::new() - }; + } + } else { + Vec::new() + }; - let mut result = cached.into_iter().map(|(_, data)| data).collect::>(); - let stored = stored.into_iter().filter_map(|block| block.data); - result.extend(stored); - Ok(result) - } + let mut result: Vec = cached.into_iter().map(|(_, data)| data).collect(); + result.extend(stored); + Ok(result) + } + + async fn blocks_as_json( + self: Arc, + hashes: Vec, + ) -> Result, Error> { + let values = self + .blocks_from_store(hashes) + .await? + .into_iter() + .filter_map(|block| block.data) + .collect(); + Ok(values) } async fn ancestor_block( @@ -2813,7 +2869,7 @@ impl ChainStoreTrait for ChainStore { block_ptr: BlockPtr, offset: BlockNumber, root: Option, - ) -> Result, Error> { + ) -> Result, Error> { ensure!( block_ptr.number >= offset, "block offset {} for block `{}` points to before genesis block", @@ -2842,29 +2898,31 @@ impl ChainStoreTrait for ChainStore { // Cache miss, query the database let mut conn = this.pool.get_permitted().await?; - let result = this + let json_result = this .storage .ancestor_block(&mut conn, block_ptr_clone, offset, root_clone) .await .map_err(StoreError::from)?; - // Insert into cache if we got a result - if let Some((ref data, ref ptr)) = result { - // Extract parent_hash from data["block"]["parentHash"] or - // data["parentHash"] - if let Some(parent_hash) = data - .get("block") - .unwrap_or(data) - .get("parentHash") - .and_then(|h| h.as_str()) - .and_then(|h| h.parse().ok()) - { - let block = JsonBlock::new(ptr.clone(), parent_hash, Some(data.clone())); - this.recent_blocks_cache.insert_block(block); + // Insert into cache and reuse the typed block + match json_result { + Some((data, ptr)) => { + let cached = if let Some(parent_hash) = data + .get("block") + .unwrap_or(&data) + .get("parentHash") + .and_then(|h| h.as_str()) + .and_then(|h| h.parse().ok()) + { + let json_block = JsonBlock::new(ptr.clone(), parent_hash, Some(data)); + this.recent_blocks_cache.insert_json_block(json_block) + } else { + CachedBlock::from_json(data) + }; + Ok(cached.map(|c| (c, ptr))) } + None => Ok(None), } - - Ok(result) }) .await; let result = res?; @@ -3062,20 +3120,20 @@ mod recent_blocks_cache { // entries. If there are multiple writes for the same block number, // the last one wins. Note that because of NEAR, the block numbers // might have gaps. - blocks: BTreeMap, + blocks: BTreeMap, // We only store these many blocks. capacity: usize, } impl Inner { - fn get_block_by_hash(&self, hash: &BlockHash) -> Option<(&BlockPtr, &json::Value)> { + fn get_block_by_hash(&self, hash: &BlockHash) -> Option<(&BlockPtr, &CachedBlock)> { self.blocks .values() .find(|block| &block.ptr.hash == hash) .and_then(|block| block.data.as_ref().map(|data| (&block.ptr, data))) } - fn get_block_by_number(&self, number: BlockNumber) -> Option<&JsonBlock> { + fn get_block_by_number(&self, number: BlockNumber) -> Option<&CacheBlock> { self.blocks.get(&number) } @@ -3083,7 +3141,7 @@ mod recent_blocks_cache { &self, child_ptr: &BlockPtr, offset: BlockNumber, - ) -> Option<(&BlockPtr, Option<&json::Value>)> { + ) -> Option<(&BlockPtr, Option<&CachedBlock>)> { let child = self.blocks.get(&child_ptr.number)?; if &child.ptr != child_ptr { return None; @@ -3104,7 +3162,7 @@ mod recent_blocks_cache { self.blocks.last_key_value().map(|b| &b.1.ptr) } - fn earliest_block(&self) -> Option<&JsonBlock> { + fn earliest_block(&self) -> Option<&CacheBlock> { self.blocks.first_key_value().map(|b| b.1) } @@ -3134,7 +3192,7 @@ mod recent_blocks_cache { .set(self.chain_head().map(|b| b.number).unwrap_or(0) as f64); } - fn insert_block(&mut self, block: JsonBlock) { + fn insert_block(&mut self, block: CacheBlock) { self.blocks.insert(block.ptr.number, block); self.evict_if_necessary(); } @@ -3174,7 +3232,7 @@ mod recent_blocks_cache { &self, child: &BlockPtr, offset: BlockNumber, - ) -> Option<(BlockPtr, Option)> { + ) -> Option<(BlockPtr, Option)> { let block_opt = self .inner .read() @@ -3191,7 +3249,7 @@ mod recent_blocks_cache { block_opt } - pub fn get_blocks_by_hash(&self, hashes: &[BlockHash]) -> Vec<(BlockPtr, json::Value)> { + pub fn get_blocks_by_hash(&self, hashes: &[BlockHash]) -> Vec<(BlockPtr, CachedBlock)> { let inner = self.inner.read(); let blocks: Vec<_> = hashes .iter() @@ -3209,9 +3267,9 @@ mod recent_blocks_cache { pub fn get_block_ptrs_by_numbers( &self, numbers: &[BlockNumber], - ) -> Vec<(BlockPtr, JsonBlock)> { + ) -> Vec<(BlockPtr, CacheBlock)> { let inner = self.inner.read(); - let mut blocks: Vec<(BlockPtr, JsonBlock)> = Vec::new(); + let mut blocks: Vec<(BlockPtr, CacheBlock)> = Vec::new(); for &number in numbers { if let Some(block) = inner.get_block_by_number(number) { @@ -3232,11 +3290,20 @@ mod recent_blocks_cache { /// its associated `data`. Note that for this to work, `child` must be /// in the cache already. The first block in the cache should be /// inserted via [`RecentBlocksCache::set_chain_head`]. - pub(super) fn insert_block(&self, block: JsonBlock) { + pub(super) fn insert_block(&self, block: CacheBlock) { self.inner.write().insert_block(block); self.inner.read().update_write_metrics(); } + /// Deserialize a JsonBlock into a typed CacheBlock and insert it into the cache. + /// Returns the deserialized data so callers can reuse it without re-deserializing. + pub(super) fn insert_json_block(&self, block: JsonBlock) -> Option { + let cache_block = block.into_cache_block(); + let data = cache_block.data.clone(); + self.insert_block(cache_block); + data + } + #[cfg(debug_assertions)] pub fn blocks(&self) -> Vec<(BlockPtr, BlockHash)> { self.inner diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index 6ee612de191..941fe6f9773 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -12,7 +12,7 @@ use std::future::Future; use std::sync::Arc; use graph::cheap_clone::CheapClone; -use graph::prelude::{alloy, serde_json as json, EthereumBlock}; +use graph::prelude::alloy; use graph::prelude::{anyhow::anyhow, anyhow::Error}; use graph::prelude::{BlockNumber, QueryStoreManager, QueryTarget}; use graph::{components::store::BlockStore as _, prelude::DeploymentHash}; @@ -328,8 +328,7 @@ fn check_ancestor( return Err(anyhow!("expected ptr `{}` but got `{}`", exp_ptr, act_ptr)); } - let act_block = json::from_value::(act.0)?; - let act_hash = format!("{:x}", act_block.block.hash()); + let act_hash = format!("{:x}", act.0.light_block().hash()); let exp_hash = &exp.hash; if &act_hash != exp_hash {