From 3590bdc8a641f1730ec7ded0123288e523b40ec4 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 5 May 2026 15:35:54 +0530 Subject: [PATCH 1/2] chain/ethereum: add unit tests for EthereumLogFilter receipt merge Cover all six insertion sites (from_data_sources arms, from_mapping, extend) and assert OR semantics across the three internal collections (contracts_and_events_graph, wildcard_events, events_with_topic_filters). Tests use two assertion helpers (assert_from_data_sources_or_merges and assert_extend_or_merges) parameterized over the match-arm or collection variant. Each helper runs both insertion orders so order-independence is verified per variant. The OR-semantics property test catches any flag-sequence regression. --- chain/ethereum/src/adapter.rs | 267 +++++++++++++++++++++++++++++++++- 1 file changed, 266 insertions(+), 1 deletion(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index e9719aeff02..8d638dd8416 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -1849,7 +1849,7 @@ fn log_filter_require_transacion_receipt_method() { .into_iter() .collect(); - let events_with_topic_filters = HashMap::new(); // TODO(krishna): Test events with topic filters + let events_with_topic_filters = HashMap::new(); let alien_event_signature = b256(8); // those will not be inserted in the graph let alien_contract_address = address(9); @@ -1982,3 +1982,268 @@ fn log_filter_require_transacion_receipt_method() { &empty_vec )); } + +// Tests that `EthereumLogFilter` OR-merges per-handler `receipt` flags across +// every insertion site (`from_data_sources`, `from_mapping`, `extend`). + +#[cfg(test)] +fn receipt_merge_test_addr(n: u64) -> Address { + Address::left_padding_from(&n.to_be_bytes()) +} + +#[cfg(test)] +fn receipt_merge_test_sig(n: u64) -> B256 { + B256::left_padding_from(&n.to_be_bytes()) +} + +#[cfg(test)] +fn receipt_merge_test_mock_abi() -> std::sync::Arc { + std::sync::Arc::new(graph::data_source::common::MappingABI { + name: "mock_abi".to_string(), + contract: abi::JsonAbi::new(), + }) +} + +#[cfg(test)] +fn receipt_merge_test_event_handler( + sig: B256, + topic1: Option>, + topic2: Option>, + topic3: Option>, + receipt: bool, +) -> crate::data_source::MappingEventHandler { + crate::data_source::MappingEventHandler { + event: "Event()".to_string(), + topic0: Some(sig), + topic1, + topic2, + topic3, + handler: "handleEvent".to_string(), + receipt, + calls: graph::data_source::common::CallDecls::default(), + } +} + +#[cfg(test)] +fn receipt_merge_test_mapping( + handlers: Vec, +) -> crate::Mapping { + crate::Mapping { + kind: "ethereum/events".to_string(), + api_version: semver::Version::new(0, 0, 7), + language: "wasm/assemblyscript".to_string(), + entities: vec![], + abis: vec![receipt_merge_test_mock_abi()], + block_handlers: vec![], + call_handlers: vec![], + event_handlers: handlers, + runtime: std::sync::Arc::new(vec![]), + link: graph::prelude::Link { + link: "test".to_string(), + }, + } +} + +#[cfg(test)] +fn receipt_merge_test_data_source( + address: Option
, + handlers: Vec, +) -> crate::data_source::DataSource { + crate::data_source::DataSource { + kind: "ethereum/contract".to_string(), + network: Some("test".to_string()), + name: "Test".to_string(), + manifest_idx: 0, + address, + start_block: 0, + end_block: None, + mapping: receipt_merge_test_mapping(handlers), + context: std::sync::Arc::new(None), + creation_block: None, + contract_abi: receipt_merge_test_mock_abi(), + } +} + +/// Run two data sources with `receipt: true` and `receipt: false` at the +/// same effective filter key through `from_data_sources` and assert the +/// merged filter still requires a transaction receipt. Runs both +/// declaration orders so order-independence is verified per variant. +#[cfg(test)] +fn assert_from_data_sources_or_merges( + label: &str, + address: Option
, + topic1: Option>, + log_topics: &[B256], +) { + let event_sig = receipt_merge_test_sig(100); + let ds_yes = receipt_merge_test_data_source( + address, + vec![receipt_merge_test_event_handler( + event_sig, + topic1.clone(), + None, + None, + true, + )], + ); + let ds_no = receipt_merge_test_data_source( + address, + vec![receipt_merge_test_event_handler( + event_sig, + topic1.clone(), + None, + None, + false, + )], + ); + + for (order, dss) in [("yes,no", [&ds_yes, &ds_no]), ("no,yes", [&ds_no, &ds_yes])] { + let filter = EthereumLogFilter::from_data_sources(dss); + assert!( + filter.requires_transaction_receipt(&event_sig, address.as_ref(), log_topics), + "{label} ({order}): receipt:true must survive a later receipt:false", + ); + } +} + +/// Build two filters via `from_data_sources`, each with one handler at the +/// same effective key but with opposite receipt flags, then merge them via +/// `extend` and assert the merged filter still requires a transaction +/// receipt. Runs both extend directions so order-independence is verified +/// per variant. +#[cfg(test)] +fn assert_extend_or_merges( + label: &str, + address: Option
, + topic1: Option>, + log_topics: &[B256], +) { + let event_sig = receipt_merge_test_sig(105); + let ds_yes = receipt_merge_test_data_source( + address, + vec![receipt_merge_test_event_handler( + event_sig, + topic1.clone(), + None, + None, + true, + )], + ); + let ds_no = receipt_merge_test_data_source( + address, + vec![receipt_merge_test_event_handler( + event_sig, + topic1.clone(), + None, + None, + false, + )], + ); + + for (order, base, ext) in [ + ("yes.extend(no)", &ds_yes, &ds_no), + ("no.extend(yes)", &ds_no, &ds_yes), + ] { + let mut filter = EthereumLogFilter::from_data_sources([base]); + filter.extend(EthereumLogFilter::from_data_sources([ext])); + assert!( + filter.requires_transaction_receipt(&event_sig, address.as_ref(), log_topics), + "{label} ({order}): extend must OR-merge", + ); + } +} + +#[test] +fn from_data_sources_or_merges_at_every_insertion_site() { + let contract = receipt_merge_test_addr(1); + let event_sig = receipt_merge_test_sig(100); + let topic = receipt_merge_test_sig(200); + let with_topic = vec![event_sig, topic]; + + // Each case maps to one arm of the `match ds.address` in `from_data_sources`. + assert_from_data_sources_or_merges("graph edge", Some(contract), None, &[]); + assert_from_data_sources_or_merges( + "addressed topics", + Some(contract), + Some(vec![topic]), + &with_topic, + ); + assert_from_data_sources_or_merges("wildcard", None, None, &[]); + assert_from_data_sources_or_merges("wildcard topics", None, Some(vec![topic]), &with_topic); +} + +#[test] +fn from_mapping_or_merges_via_extend() { + // Two templates handling the same event signature with different receipt + // flags merged via `from_mapping` + `extend`. + let event_sig = receipt_merge_test_sig(104); + + let mapping_yes = receipt_merge_test_mapping(vec![receipt_merge_test_event_handler( + event_sig, None, None, None, true, + )]); + let mapping_no = receipt_merge_test_mapping(vec![receipt_merge_test_event_handler( + event_sig, None, None, None, false, + )]); + + let mut filter = EthereumLogFilter::from_mapping(&mapping_yes); + filter.extend(EthereumLogFilter::from_mapping(&mapping_no)); + + assert!(filter.requires_transaction_receipt(&event_sig, None, &[])); +} + +#[test] +fn extend_or_merges_at_every_collection() { + let contract = receipt_merge_test_addr(5); + let event_sig = receipt_merge_test_sig(105); + let topic = receipt_merge_test_sig(203); + let with_topic = vec![event_sig, topic]; + + assert_extend_or_merges("graph edge", Some(contract), None, &[]); + assert_extend_or_merges( + "addressed topics", + Some(contract), + Some(vec![topic]), + &with_topic, + ); + assert_extend_or_merges("wildcard", None, None, &[]); + assert_extend_or_merges("wildcard topics", None, Some(vec![topic]), &with_topic); +} + +#[test] +fn requires_transaction_receipt_has_or_semantics_across_handlers() { + // `requires_transaction_receipt` must return true iff at least one handler + // at the key declared `receipt: true`, regardless of declaration order. + let event_sig = receipt_merge_test_sig(108); + + for flags in [ + vec![false], + vec![true], + vec![true, false], + vec![false, true], + vec![false, false, true], + vec![true, false, false], + vec![false, true, false], + vec![true, true, false], + vec![false, false, false], + ] { + let dss: Vec<_> = flags + .iter() + .map(|&r| { + receipt_merge_test_data_source( + None, + vec![receipt_merge_test_event_handler( + event_sig, None, None, None, r, + )], + ) + }) + .collect(); + let filter = EthereumLogFilter::from_data_sources(dss.iter()); + + let any_requires_receipt = flags.iter().any(|&r| r); + assert_eq!( + filter.requires_transaction_receipt(&event_sig, None, &[]), + any_requires_receipt, + "flag sequence {flags:?}", + ); + } +} From a82f63dce422c39a7b855e2a5490dacd95afead0 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Tue, 5 May 2026 15:36:28 +0530 Subject: [PATCH 2/2] chain/ethereum: OR-merge receipt requirements in EthereumLogFilter Wrap the three EthereumLogFilter collections in MergeMap and MergeGraph newtypes whose only mutators (or_insert / or_add_edge) OR-combine the bool weight with any existing value at the same key. The inner HashMap and GraphMap are private, so callers cannot bypass the merge by going through HashMap::insert or GraphMap::add_edge. When two handlers collide on the same filter key with differing receipt flags, the merged value is now the logical OR rather than the last write. Handlers that declared receipt: true now reliably receive transaction receipts even when another handler at the same key declared receipt: false. --- chain/ethereum/src/adapter.rs | 233 +++++++++++++++++++++++----------- 1 file changed, 162 insertions(+), 71 deletions(-) diff --git a/chain/ethereum/src/adapter.rs b/chain/ethereum/src/adapter.rs index 8d638dd8416..5204038879f 100644 --- a/chain/ethereum/src/adapter.rs +++ b/chain/ethereum/src/adapter.rs @@ -19,6 +19,7 @@ use prost_types::Any; use std::cmp; use std::collections::{HashMap, HashSet}; use std::fmt; +use std::hash::Hash; use thiserror::Error; use graph::prelude::*; @@ -351,15 +352,113 @@ pub struct EthereumLogFilter { /// Log filters can be represented as a bipartite graph between contracts and events. An edge /// exists between a contract and an event if a data source for the contract has a trigger for /// the event. - /// Edges are of `bool` type and indicates when a trigger requires a transaction receipt. - contracts_and_events_graph: GraphMap, + /// Edge weights are booleans indicating whether the trigger requires a transaction receipt. + contracts_and_events_graph: MergeGraph, /// Event sigs with no associated address, matching on all addresses. - /// Maps to a boolean representing if a trigger requires a transaction receipt. - wildcard_events: HashMap, - /// Events with any of the topic filters set - /// Maps to a boolean representing if a trigger requires a transaction receipt. - events_with_topic_filters: HashMap, + /// Values are booleans indicating whether the trigger requires a transaction receipt. + wildcard_events: MergeMap, + /// Events with any of the topic filters set. + /// Values are booleans indicating whether the trigger requires a transaction receipt. + events_with_topic_filters: MergeMap, +} + +/// `HashMap` wrapper whose values are OR-merged on every write. +/// +/// The only mutator that writes values is [`MergeMap::or_insert`] — the inner +/// `HashMap` is private so callers cannot bypass the merge via +/// `HashMap::insert`. Used by `EthereumLogFilter` to track per-key receipt +/// requirements where any handler asking for a receipt at a given key forces +/// receipt fetching. +#[derive(Clone, Debug)] +struct MergeMap(HashMap); + +impl Default for MergeMap { + fn default() -> Self { + Self(HashMap::new()) + } +} + +impl MergeMap { + fn or_insert(&mut self, k: K, v: bool) { + self.0.entry(k).and_modify(|e| *e |= v).or_insert(v); + } + + fn get(&self, k: &K) -> Option<&bool> { + self.0.get(k) + } + + fn contains_key(&self, k: &K) -> bool { + self.0.contains_key(k) + } + + fn iter(&self) -> impl Iterator + '_ { + self.0.iter() + } + + fn is_empty(&self) -> bool { + self.0.is_empty() + } +} + +impl IntoIterator for MergeMap { + type Item = (K, bool); + type IntoIter = std::collections::hash_map::IntoIter; + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +/// `GraphMap` wrapper that OR-merges edge weights on +/// every write. +/// +/// The only mutator that writes edge weights is [`MergeGraph::or_add_edge`] — +/// the inner `GraphMap` is private so callers cannot bypass the merge via +/// `GraphMap::add_edge`. +#[derive(Clone, Debug)] +struct MergeGraph(GraphMap); + +impl Default for MergeGraph { + fn default() -> Self { + Self(GraphMap::new()) + } +} + +impl MergeGraph { + fn or_add_edge(&mut self, a: LogFilterNode, b: LogFilterNode, v: bool) { + // Short-circuit on `v == true`: a `true` weight always wins over any + // prior weight, so we can skip the edge_weight lookup entirely. + let merged = v || self.0.edge_weight(a, b).copied().unwrap_or(false); + self.0.add_edge(a, b, merged); + } + + fn edge_weight(&self, a: LogFilterNode, b: LogFilterNode) -> Option<&bool> { + self.0.edge_weight(a, b) + } + + fn contains_edge(&self, a: LogFilterNode, b: LogFilterNode) -> bool { + self.0.contains_edge(a, b) + } + + fn edge_count(&self) -> usize { + self.0.edge_count() + } + + fn nodes(&self) -> impl Iterator + '_ { + self.0.nodes() + } + + fn neighbors(&self, n: LogFilterNode) -> impl Iterator + '_ { + self.0.neighbors(n) + } + + fn remove_node(&mut self, n: LogFilterNode) -> bool { + self.0.remove_node(n) + } + + fn all_edges(&self) -> impl Iterator { + self.0.all_edges() + } } impl From for Vec { @@ -458,14 +557,14 @@ impl EthereumLogFilter { let event_sig = event_handler.topic0(); match ds.address { Some(contract) if !event_handler.has_additional_topics() => { - this.contracts_and_events_graph.add_edge( + this.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(contract), LogFilterNode::Event(event_sig), event_handler.receipt, ); } Some(contract) => { - this.events_with_topic_filters.insert( + this.events_with_topic_filters.or_insert( EventSignatureWithTopics::new( Some(contract), event_sig, @@ -479,11 +578,11 @@ impl EthereumLogFilter { None if (!event_handler.has_additional_topics()) => { this.wildcard_events - .insert(event_sig, event_handler.receipt); + .or_insert(event_sig, event_handler.receipt); } None => { - this.events_with_topic_filters.insert( + this.events_with_topic_filters.or_insert( EventSignatureWithTopics::new( ds.address, event_sig, @@ -505,12 +604,18 @@ impl EthereumLogFilter { for event_handler in &mapping.event_handlers { let signature = event_handler.topic0(); this.wildcard_events - .insert(signature, event_handler.receipt); + .or_insert(signature, event_handler.receipt); } this } /// Extends this log filter with another one. + /// + /// The `receipt` flag stored at each filter key is OR-combined across the + /// two filters: a key whose receipt requirement is `true` in either filter + /// stays `true` in the merged result. Overwriting with the incoming value + /// would drop a prior `true` and silently downgrade receipt fetching for + /// any handler that declared `receipt: true` at that key. pub fn extend(&mut self, other: EthereumLogFilter) { if other.is_empty() { return; @@ -523,11 +628,14 @@ impl EthereumLogFilter { events_with_topic_filters, } = other; for (s, t, e) in contracts_and_events_graph.all_edges() { - self.contracts_and_events_graph.add_edge(s, t, *e); + self.contracts_and_events_graph.or_add_edge(s, t, *e); + } + for (k, v) in wildcard_events { + self.wildcard_events.or_insert(k, v); + } + for (k, v) in events_with_topic_filters { + self.events_with_topic_filters.or_insert(k, v); } - self.wildcard_events.extend(wildcard_events); - self.events_with_topic_filters - .extend(events_with_topic_filters); } /// An empty filter is one that never matches. @@ -555,15 +663,15 @@ impl EthereumLogFilter { // Start with the wildcard event filters. filters.extend( self.wildcard_events - .into_keys() - .map(EthGetLogsFilter::from_event), + .into_iter() + .map(|(k, _)| EthGetLogsFilter::from_event(k)), ); // Handle events with topic filters. filters.extend( self.events_with_topic_filters - .into_keys() - .map(EthGetLogsFilter::from_event_with_topics), + .into_iter() + .map(|(k, _)| EthGetLogsFilter::from_event_with_topics(k)), ); // The current algorithm is to repeatedly find the maximum cardinality vertex and turn all @@ -1194,7 +1302,6 @@ mod tests { use base64::prelude::*; use graph::blockchain::TriggerFilter as _; use graph::firehose::{CallToFilter, CombinedFilter, LogFilter, MultiLogFilter}; - use graph::petgraph::graphmap::GraphMap; use graph::prelude::EthereumCall; use graph::prelude::alloy::primitives::{Address, B256, Bytes, U256}; use hex::ToHex; @@ -1284,11 +1391,7 @@ mod tests { fn ethereum_trigger_filter_to_firehose() { let sig = |value: u64| B256::from(U256::from(value)); let mut filter = TriggerFilter { - log: EthereumLogFilter { - contracts_and_events_graph: GraphMap::new(), - wildcard_events: HashMap::new(), - events_with_topic_filters: HashMap::new(), - }, + log: EthereumLogFilter::default(), call: EthereumCallFilter { contract_addresses_function_signatures: HashMap::from_iter(vec![ (address(0), (0, HashSet::from_iter(vec![[0u8; 4]]))), @@ -1337,17 +1440,17 @@ mod tests { }, ]; - filter.log.contracts_and_events_graph.add_edge( + filter.log.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(address(10)), LogFilterNode::Event(sig(100)), false, ); - filter.log.contracts_and_events_graph.add_edge( + filter.log.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(address(10)), LogFilterNode::Event(sig(101)), false, ); - filter.log.contracts_and_events_graph.add_edge( + filter.log.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(address(20)), LogFilterNode::Event(sig(100)), false, @@ -1407,11 +1510,7 @@ mod tests { let address = |value: u64| Address::left_padding_from(&value.to_le_bytes()); let sig = |value: u64| B256::left_padding_from(&value.to_le_bytes()); let mut filter = TriggerFilter { - log: EthereumLogFilter { - contracts_and_events_graph: GraphMap::new(), - wildcard_events: HashMap::new(), - events_with_topic_filters: HashMap::new(), - }, + log: EthereumLogFilter::default(), call: EthereumCallFilter { contract_addresses_function_signatures: HashMap::new(), wildcard_signatures: HashSet::new(), @@ -1423,7 +1522,7 @@ mod tests { }, }; - filter.log.contracts_and_events_graph.add_edge( + filter.log.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(address(10)), LogFilterNode::Event(sig(101)), false, @@ -1759,10 +1858,10 @@ fn complete_log_filter() { let contracts: BTreeSet<_> = (0..j).map(|n| Address::from([n as u8; 20])).collect(); // Construct the complete bipartite graph with i events and j contracts. - let mut contracts_and_events_graph = GraphMap::new(); + let mut filter = EthereumLogFilter::default(); for &contract in &contracts { for &event in &events { - contracts_and_events_graph.add_edge( + filter.contracts_and_events_graph.or_add_edge( LogFilterNode::Contract(contract), LogFilterNode::Event(event), false, @@ -1771,13 +1870,9 @@ fn complete_log_filter() { } // Run `eth_get_logs_filters`, which is what we want to test. - let logs_filters: Vec<_> = EthereumLogFilter { - contracts_and_events_graph, - wildcard_events: HashMap::new(), - events_with_topic_filters: HashMap::new(), - } - .eth_get_logs_filters(ENV_VARS.get_logs_max_contracts) - .collect(); + let logs_filters: Vec<_> = filter + .eth_get_logs_filters(ENV_VARS.get_logs_max_contracts) + .collect(); // Assert that a contract or event is filtered on iff it was present in the graph. assert_eq!( @@ -1842,14 +1937,6 @@ fn log_filter_require_transacion_receipt_method() { let wildcard_event_with_receipt = b256(6); let wildcard_event_without_receipt = b256(7); - let wildcard_events = [ - (wildcard_event_with_receipt, true), - (wildcard_event_without_receipt, false), - ] - .into_iter() - .collect(); - - let events_with_topic_filters = HashMap::new(); let alien_event_signature = b256(8); // those will not be inserted in the graph let alien_contract_address = address(9); @@ -1877,25 +1964,29 @@ fn log_filter_require_transacion_receipt_method() { // event_b -- contract_a [ receipt=false ] // } // ``` - let mut contracts_and_events_graph = GraphMap::new(); - - let event_a_id = contracts_and_events_graph.add_node(event_a_node); - let event_b_id = contracts_and_events_graph.add_node(event_b_node); - let event_c_id = contracts_and_events_graph.add_node(event_c_node); - let contract_a_id = contracts_and_events_graph.add_node(contract_a_node); - let contract_b_id = contracts_and_events_graph.add_node(contract_b_node); - let contract_c_id = contracts_and_events_graph.add_node(contract_c_node); - contracts_and_events_graph.add_edge(event_a_id, contract_a_id, true); - contracts_and_events_graph.add_edge(event_b_id, contract_b_id, true); - contracts_and_events_graph.add_edge(event_a_id, contract_b_id, false); - contracts_and_events_graph.add_edge(event_b_id, contract_a_id, false); - contracts_and_events_graph.add_edge(event_c_id, contract_c_id, true); - - let filter = EthereumLogFilter { - contracts_and_events_graph, - wildcard_events, - events_with_topic_filters, - }; + // TODO(krishna): Test events with topic filters + let mut filter = EthereumLogFilter::default(); + filter + .contracts_and_events_graph + .or_add_edge(event_a_node, contract_a_node, true); + filter + .contracts_and_events_graph + .or_add_edge(event_b_node, contract_b_node, true); + filter + .contracts_and_events_graph + .or_add_edge(event_a_node, contract_b_node, false); + filter + .contracts_and_events_graph + .or_add_edge(event_b_node, contract_a_node, false); + filter + .contracts_and_events_graph + .or_add_edge(event_c_node, contract_c_node, true); + filter + .wildcard_events + .or_insert(wildcard_event_with_receipt, true); + filter + .wildcard_events + .or_insert(wildcard_event_without_receipt, false); let empty_vec: Vec = vec![];