From de7ec8fb3cdd1b44aeb0c9a0051aa6385e0aae55 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 24 Feb 2026 16:20:50 -0800 Subject: [PATCH 01/10] core: Only log monitor send errors, not successes The surrounding code already logs enough on success --- core/src/amp_subgraph/monitor.rs | 21 +++++++++++---------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/core/src/amp_subgraph/monitor.rs b/core/src/amp_subgraph/monitor.rs index 2bdd32ab504..9236e560e25 100644 --- a/core/src/amp_subgraph/monitor.rs +++ b/core/src/amp_subgraph/monitor.rs @@ -22,7 +22,11 @@ use graph::{ cheap_clone::CheapClone, components::store::DeploymentLocator, log::factory::LoggerFactory, }; use slog::{debug, error, info, warn, Logger}; -use tokio::{sync::mpsc, task::JoinHandle, time::timeout}; +use tokio::{ + sync::mpsc::{self, error::SendError}, + task::JoinHandle, + time::timeout, +}; use tokio_util::sync::CancellationToken; /// Represents the maximum amount of time a subgraph instance is allowed to run @@ -137,7 +141,7 @@ impl Monitor { .new(slog::o!("method" => "start")); info!(logger, "Starting subgraph"); - handle_send_result( + log_send_error( &logger, self.command_tx.send(Command::Start { id: self.subgraph_instance_id.fetch_add(1, SeqCst), @@ -164,7 +168,7 @@ impl Monitor { .new(slog::o!("method" => "stop")); info!(logger, "Stopping subgraph"); - handle_send_result(&logger, self.command_tx.send(Command::Stop { deployment })); + log_send_error(&logger, self.command_tx.send(Command::Stop { deployment })); } /// Processes commands sent through the command channel. @@ -390,7 +394,7 @@ impl Monitor { if let Some(pending_start_command) = pending_start_commands.remove(&deployment) { debug!(logger, "Resending a pending start command"); - handle_send_result(&logger, command_tx.send(pending_start_command)); + log_send_error(&logger, command_tx.send(pending_start_command)); } } @@ -476,7 +480,7 @@ impl Monitor { } debug!(logger, "Sending clear command"); - handle_send_result(&logger, command_tx.send(Command::Clear { id, deployment })); + log_send_error(&logger, command_tx.send(Command::Clear { id, deployment })); } }); @@ -546,13 +550,10 @@ impl fmt::Debug for Command { } } -fn handle_send_result( - logger: &Logger, - result: Result<(), tokio::sync::mpsc::error::SendError>, -) { +fn log_send_error(logger: &Logger, result: Result<(), SendError>) { match result { Ok(()) => { - debug!(logger, "Command was sent successfully"); + // No need to log anything } // This should only happen if the parent cancel token of the subgraph monitor was cancelled From 2e735ae629ab05907d6a520eb55e0ff9720fcfa5 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Tue, 24 Feb 2026 16:30:03 -0800 Subject: [PATCH 02/10] core: Remove some more logs The log in stream_aggregator can become very spammy with a small block range The log in monitor isn't really needed since surrounding code already logs enough information --- core/src/amp_subgraph/monitor.rs | 4 ---- graph/src/amp/stream_aggregator/mod.rs | 5 ----- 2 files changed, 9 deletions(-) diff --git a/core/src/amp_subgraph/monitor.rs b/core/src/amp_subgraph/monitor.rs index 9236e560e25..93cf30252a6 100644 --- a/core/src/amp_subgraph/monitor.rs +++ b/core/src/amp_subgraph/monitor.rs @@ -188,10 +188,6 @@ impl Monitor { loop { tokio::select! { Some(command) = command_rx.recv() => { - debug!(logger, "Processing a new command"; - "command" => ?command - ); - match &command { Command::Start { .. } => { Self::process_start_command( diff --git a/graph/src/amp/stream_aggregator/mod.rs b/graph/src/amp/stream_aggregator/mod.rs index e2f0892252f..1ed94bd2177 100644 --- a/graph/src/amp/stream_aggregator/mod.rs +++ b/graph/src/amp/stream_aggregator/mod.rs @@ -101,11 +101,6 @@ impl StreamAggregator { let num_streams = named_streams.len(); - info!(logger, "Initializing stream aggregator"; - "num_streams" => num_streams, - "max_buffer_size" => max_buffer_size - ); - Self { named_streams, buffer: Buffer::new(num_streams, max_buffer_size), From fa272ef6f3543ea56d933217070236b5cd96c983 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Feb 2026 11:11:55 -0800 Subject: [PATCH 03/10] core: Extract helpers from new_data_stream loop body Refactor the loop body in new_data_stream into focused helper functions to improve readability and prepare for lazy stream chaining: - build_query_streams: builds SQL queries and fires them via the client - build_data_stream: creates StreamAggregator with metrics and reorg check - Refactor next_block_ranges/next_block_range to take specific fields instead of &Context - Simplify min_start_block/max_end_block as one-liner aggregates No behavior change. --- core/src/amp_subgraph/runner/context.rs | 8 - core/src/amp_subgraph/runner/data_stream.rs | 192 ++++++++++++-------- 2 files changed, 115 insertions(+), 85 deletions(-) diff --git a/core/src/amp_subgraph/runner/context.rs b/core/src/amp_subgraph/runner/context.rs index 32e96148acf..e559c009726 100644 --- a/core/src/amp_subgraph/runner/context.rs +++ b/core/src/amp_subgraph/runner/context.rs @@ -77,14 +77,6 @@ impl Context { .map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat())) } - pub(super) fn total_queries(&self) -> usize { - self.manifest - .data_sources - .iter() - .map(|data_source| data_source.transformer.tables.len()) - .sum() - } - pub(super) fn min_start_block(&self) -> BlockNumber { self.manifest .data_sources diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index ad6d6d471f8..a868a701190 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -8,11 +8,14 @@ use futures::{ }; use graph::{ amp::{ + client::ResponseBatch, + error::IsDeterministic, manifest::DataSource, stream_aggregator::{RecordBatchGroups, StreamAggregator}, Client, }, cheap_clone::CheapClone, + prelude::StopwatchMetrics, }; use slog::{debug, warn}; @@ -29,7 +32,6 @@ where { let logger = cx.logger.new(slog::o!("process" => "new_data_stream")); - let total_queries = cx.total_queries(); let mut total_queries_to_execute = 0; let mut data_streams = Vec::new(); let mut latest_queried_block = cx.latest_synced_block(); @@ -43,82 +45,36 @@ where ); loop { - let next_block_ranges = next_block_ranges(cx, latest_queried_block, latest_block); + let block_ranges = next_block_ranges( + &cx.manifest.data_sources, + cx.max_block_range, + latest_queried_block, + latest_block, + ); - if next_block_ranges.is_empty() { + if block_ranges.is_empty() { if data_streams.is_empty() { warn!(logger, "There are no unprocessed block ranges"); } break; } - let mut query_streams = Vec::with_capacity(total_queries); - let mut query_streams_table_ptr = Vec::with_capacity(total_queries); - let mut min_start_block = BlockNumber::MAX; - - for (i, data_source) in cx.manifest.data_sources.iter().enumerate() { - let Some(block_range) = next_block_ranges.get(&i) else { - continue; - }; - - if *block_range.start() < min_start_block { - min_start_block = *block_range.start(); - } - - if *block_range.end() > max_end_block { - max_end_block = *block_range.end(); - } - - for (j, table) in data_source.transformer.tables.iter().enumerate() { - let query = table.query.build_with_block_range(block_range); - let stream = cx.client.query(&cx.logger, query, None); - let stream_name = format!("{}.{}", data_source.name, table.name); - - query_streams.push((stream_name, stream)); - query_streams_table_ptr.push((i, j)); - } - } + let min_start_block = block_ranges.values().map(|r| *r.start()).min().unwrap(); + max_end_block = + max_end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap()); - let query_streams_table_ptr: Arc<[TablePtr]> = query_streams_table_ptr.into(); + let (query_streams, table_ptrs) = + build_query_streams(&*cx.client, &logger, &cx.manifest.data_sources, &block_ranges); total_queries_to_execute += query_streams.len(); - let mut min_start_block_checked = false; - let mut load_first_record_batch_group_section = Some( - cx.metrics - .stopwatch - .start_section("load_first_record_batch_group"), - ); - - data_streams.push( - StreamAggregator::new(&cx.logger, query_streams, cx.max_buffer_size) - .map_ok(move |response| (response, query_streams_table_ptr.cheap_clone())) - .map_err(Error::from) - .map(move |result| { - if load_first_record_batch_group_section.is_some() { - let _section = load_first_record_batch_group_section.take(); - } - - match result { - Ok(response) => { - if !min_start_block_checked { - if let Some(((first_block, _), _)) = response.0.first_key_value() { - if *first_block < min_start_block { - return Err(Error::NonDeterministic(anyhow!( - "chain reorg" - ))); - } - } - - min_start_block_checked = true; - } - - Ok(response) - } - Err(e) => Err(e), - } - }) - .boxed(), - ); + data_streams.push(build_data_stream( + &logger, + query_streams, + table_ptrs, + cx.max_buffer_size, + &cx.metrics.stopwatch, + min_start_block, + )); if max_end_block >= latest_block { break; @@ -142,19 +98,101 @@ where merged_data_stream } -fn next_block_ranges( - cx: &Context, +fn build_query_streams( + client: &AC, + logger: &slog::Logger, + data_sources: &[DataSource], + block_ranges: &HashMap>, +) -> ( + Vec<(String, BoxStream<'static, Result>)>, + Arc<[TablePtr]>, +) { + let total_queries: usize = data_sources + .iter() + .map(|ds| ds.transformer.tables.len()) + .sum(); + + let mut query_streams = Vec::with_capacity(total_queries); + let mut table_ptrs = Vec::with_capacity(total_queries); + + for (i, data_source) in data_sources.iter().enumerate() { + let Some(block_range) = block_ranges.get(&i) else { + continue; + }; + + for (j, table) in data_source.transformer.tables.iter().enumerate() { + let query = table.query.build_with_block_range(block_range); + let stream = client.query(logger, query, None); + let stream_name = format!("{}.{}", data_source.name, table.name); + + query_streams.push((stream_name, stream)); + table_ptrs.push((i, j)); + } + } + + (query_streams, table_ptrs.into()) +} + +fn build_data_stream( + logger: &slog::Logger, + query_streams: Vec<(String, BoxStream<'static, Result>)>, + table_ptrs: Arc<[TablePtr]>, + max_buffer_size: usize, + stopwatch: &StopwatchMetrics, + min_start_block: BlockNumber, +) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> +where + E: std::error::Error + IsDeterministic + Send + Sync + 'static, +{ + let mut min_start_block_checked = false; + let mut load_first_record_batch_group_section = + Some(stopwatch.start_section("load_first_record_batch_group")); + + StreamAggregator::new(logger, query_streams, max_buffer_size) + .map_ok(move |response| (response, table_ptrs.cheap_clone())) + .map_err(Error::from) + .map(move |result| { + if load_first_record_batch_group_section.is_some() { + let _section = load_first_record_batch_group_section.take(); + } + + match result { + Ok(response) => { + if !min_start_block_checked { + if let Some(((first_block, _), _)) = response.0.first_key_value() { + if *first_block < min_start_block { + return Err(Error::NonDeterministic(anyhow!("chain reorg"))); + } + } + + min_start_block_checked = true; + } + + Ok(response) + } + Err(e) => Err(e), + } + }) + .boxed() +} + +fn next_block_ranges( + data_sources: &[DataSource], + max_block_range: usize, latest_queried_block: Option, latest_block: BlockNumber, ) -> HashMap> { - let block_ranges = cx - .manifest - .data_sources + let block_ranges = data_sources .iter() .enumerate() .filter_map(|(i, data_source)| { - next_block_range(cx, data_source, latest_queried_block, latest_block) - .map(|block_range| (i, block_range)) + next_block_range( + max_block_range, + data_source, + latest_queried_block, + latest_block, + ) + .map(|block_range| (i, block_range)) }) .collect::>(); @@ -172,8 +210,8 @@ fn next_block_ranges( .collect() } -fn next_block_range( - cx: &Context, +fn next_block_range( + max_block_range: usize, data_source: &DataSource, latest_queried_block: Option, latest_block: BlockNumber, @@ -190,7 +228,7 @@ fn next_block_range( }; let end_block = [ - start_block.saturating_add(cx.max_block_range as BlockNumber), + start_block.saturating_add(max_block_range as BlockNumber), data_source.source.end_block, latest_block, ] From ccc1835b00166858a6194494686c20f5cfec1b25 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Wed, 25 Feb 2026 11:13:14 -0800 Subject: [PATCH 04/10] core: Lazily produce data streams with stream::unfold Replace the eager loop that builds all StreamAggregator instances upfront and chains them together with stream::unfold + flatten. Each block-range iteration's stream is now produced on-demand only when the previous one is exhausted, avoiding a large upfront chain structure when there are many block ranges to cover. --- core/src/amp_subgraph/runner/data_stream.rs | 111 +++++++++----------- core/src/amp_subgraph/runner/mod.rs | 4 +- 2 files changed, 54 insertions(+), 61 deletions(-) diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index a868a701190..322697cf49a 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -3,7 +3,7 @@ use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use alloy::primitives::BlockNumber; use anyhow::anyhow; use futures::{ - stream::{empty, BoxStream}, + stream::{self, BoxStream}, StreamExt, TryStreamExt, }; use graph::{ @@ -28,74 +28,67 @@ pub(super) fn new_data_stream( latest_block: BlockNumber, ) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> where - AC: Client, + AC: Client + Send + Sync + 'static, { let logger = cx.logger.new(slog::o!("process" => "new_data_stream")); - - let mut total_queries_to_execute = 0; - let mut data_streams = Vec::new(); - let mut latest_queried_block = cx.latest_synced_block(); - let mut max_end_block = BlockNumber::MIN; + let client = cx.client.cheap_clone(); + let manifest = cx.manifest.clone(); + let max_buffer_size = cx.max_buffer_size; + let max_block_range = cx.max_block_range; + let stopwatch = cx.metrics.stopwatch.cheap_clone(); debug!(logger, "Creating data stream"; - "from_block" => latest_queried_block.unwrap_or(BlockNumber::MIN), + "from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN), "to_block" => latest_block, - "min_start_block" => cx.min_start_block(), - "max_block_range" => cx.max_block_range, + "start_block" => cx.min_start_block(), + "max_block_range" => max_block_range, ); - loop { - let block_ranges = next_block_ranges( - &cx.manifest.data_sources, - cx.max_block_range, - latest_queried_block, - latest_block, - ); - - if block_ranges.is_empty() { - if data_streams.is_empty() { - warn!(logger, "There are no unprocessed block ranges"); - } - break; - } - - let min_start_block = block_ranges.values().map(|r| *r.start()).min().unwrap(); - max_end_block = - max_end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap()); - - let (query_streams, table_ptrs) = - build_query_streams(&*cx.client, &logger, &cx.manifest.data_sources, &block_ranges); - total_queries_to_execute += query_streams.len(); - - data_streams.push(build_data_stream( - &logger, - query_streams, - table_ptrs, - cx.max_buffer_size, - &cx.metrics.stopwatch, - min_start_block, - )); - - if max_end_block >= latest_block { - break; - } - - latest_queried_block = Some(max_end_block); - } + // State: (latest_queried_block, max_end_block, is_first) + let initial_state = (cx.latest_synced_block(), BlockNumber::MIN, true); - debug!(logger, "Created aggregated data streams"; - "total_data_streams" => data_streams.len(), - "total_queries_to_execute" => total_queries_to_execute - ); - - let mut iter = data_streams.into_iter(); - let mut merged_data_stream = iter.next().unwrap_or_else(|| empty().boxed()); + stream::unfold( + initial_state, + move |(latest_queried_block, mut end_block, is_first)| { + let block_ranges = next_block_ranges( + &manifest.data_sources, + max_block_range, + latest_queried_block, + latest_block, + ); - for data_stream in iter { - merged_data_stream = merged_data_stream.chain(data_stream).boxed(); - } + if block_ranges.is_empty() { + if is_first { + warn!(logger, "There are no unprocessed block ranges"); + } + return futures::future::ready(None); + } - merged_data_stream + let start_block = block_ranges.values().map(|r| *r.start()).min().unwrap(); + end_block = end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap()); + + let (query_streams, table_ptrs) = + build_query_streams(&*client, &logger, &manifest.data_sources, &block_ranges); + + let data_stream = build_data_stream( + &logger, + query_streams, + table_ptrs, + max_buffer_size, + &stopwatch, + start_block, + ); + + debug!(logger, "Created a new data stream"; + "latest_queried_block" => latest_queried_block, + "start_block" => start_block, + "end_block" => end_block, + ); + futures::future::ready(Some((data_stream, (Some(end_block), end_block, false)))) + }, + ) + .flatten() + .boxed() } fn build_query_streams( diff --git a/core/src/amp_subgraph/runner/mod.rs b/core/src/amp_subgraph/runner/mod.rs index adfb88aa2c5..8278f9bd1ee 100644 --- a/core/src/amp_subgraph/runner/mod.rs +++ b/core/src/amp_subgraph/runner/mod.rs @@ -67,7 +67,7 @@ where async fn run_indexing(cx: &mut Context) -> Result<(), Error> where - AC: Client, + AC: Client + Send + Sync + 'static, { cx.metrics.deployment_status.starting(); @@ -152,7 +152,7 @@ where async fn run_indexing_with_retries(cx: &mut Context) -> Result<()> where - AC: Client, + AC: Client + Send + Sync + 'static, { loop { match run_indexing(cx).await { From 6ed2660fcfc06bc1a55687c14d6cf8c2a62a4b5a Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 10:25:12 -0800 Subject: [PATCH 05/10] graph: Fix lost waker bug in StreamAggregator polling When a stream returns Poll::Ready but the aggregator can't produce output yet (empty batch or data buffered without completed groups), the waker is consumed and never re-registered. This causes the aggregator to stop being polled, deadlocking the subgraph. Fix by calling cx.waker().wake_by_ref() before returning Pending whenever any stream returned Ready during the polling cycle. --- graph/src/amp/stream_aggregator/mod.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/graph/src/amp/stream_aggregator/mod.rs b/graph/src/amp/stream_aggregator/mod.rs index 1ed94bd2177..099d443a8e8 100644 --- a/graph/src/amp/stream_aggregator/mod.rs +++ b/graph/src/amp/stream_aggregator/mod.rs @@ -115,6 +115,7 @@ impl StreamAggregator { cx: &mut task::Context<'_>, ) -> Poll>> { let mut made_progress = false; + let mut needs_repoll = false; for (stream_index, (stream_name, stream)) in self.named_streams.iter_mut().enumerate() { let logger = self.logger.new(slog::o!( @@ -152,6 +153,7 @@ impl StreamAggregator { match buffer_result { Ok(()) => { made_progress = true; + needs_repoll = true; debug!(logger, "Buffered record batch"; "buffer_size" => self.buffer.size(stream_index), @@ -167,6 +169,7 @@ impl StreamAggregator { } Poll::Ready(Some(Ok(_empty_record_batch))) => { debug!(logger, "Received an empty record batch"); + needs_repoll = true; } Poll::Ready(Some(Err(e))) => { self.is_failed = true; @@ -209,6 +212,16 @@ impl StreamAggregator { return Poll::Ready(None); } + // When any stream returned `Poll::Ready` but we couldn't produce + // output (e.g. empty batch, or data buffered but no completed + // groups yet), the waker was consumed by that stream's poll call + // and won't be re-registered until we poll it again. Schedule an + // immediate re-poll so those streams get polled again and their + // wakers are properly re-registered. + if needs_repoll { + cx.waker().wake_by_ref(); + } + Poll::Pending } } From 86190360e1273e87f41d96f9cc78a01d51ac077e Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 10:39:36 -0800 Subject: [PATCH 06/10] core, graph: Rename GRAPH_AMP_MAX_BUFFER_SIZE to GRAPH_AMP_BUFFER_SIZE --- core/src/amp_subgraph/runner/context.rs | 4 ++-- core/src/amp_subgraph/runner/data_stream.rs | 8 ++++---- docs/amp-powered-subgraphs.md | 2 +- graph/src/amp/stream_aggregator/mod.rs | 4 ++-- graph/src/amp/stream_aggregator/record_batch/buffer.rs | 10 +++++----- graph/src/env/amp.rs | 10 +++++----- graph/src/env/mod.rs | 4 ++-- 7 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/src/amp_subgraph/runner/context.rs b/core/src/amp_subgraph/runner/context.rs index e559c009726..13afa82c588 100644 --- a/core/src/amp_subgraph/runner/context.rs +++ b/core/src/amp_subgraph/runner/context.rs @@ -18,7 +18,7 @@ pub(in super::super) struct Context { pub(super) logger: Logger, pub(super) client: Arc, pub(super) store: Arc, - pub(super) max_buffer_size: usize, + pub(super) buffer_size: usize, pub(super) max_block_range: usize, pub(super) backoff: ExponentialBackoff, pub(super) deployment: DeploymentHash, @@ -45,7 +45,7 @@ impl Context { logger, client, store, - max_buffer_size: env.max_buffer_size, + buffer_size: env.buffer_size, max_block_range: env.max_block_range, backoff, deployment, diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index 322697cf49a..5e50b595dfc 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -33,7 +33,7 @@ where let logger = cx.logger.new(slog::o!("process" => "new_data_stream")); let client = cx.client.cheap_clone(); let manifest = cx.manifest.clone(); - let max_buffer_size = cx.max_buffer_size; + let buffer_size = cx.buffer_size; let max_block_range = cx.max_block_range; let stopwatch = cx.metrics.stopwatch.cheap_clone(); @@ -74,7 +74,7 @@ where &logger, query_streams, table_ptrs, - max_buffer_size, + buffer_size, &stopwatch, start_block, ); @@ -130,7 +130,7 @@ fn build_data_stream( logger: &slog::Logger, query_streams: Vec<(String, BoxStream<'static, Result>)>, table_ptrs: Arc<[TablePtr]>, - max_buffer_size: usize, + buffer_size: usize, stopwatch: &StopwatchMetrics, min_start_block: BlockNumber, ) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> @@ -141,7 +141,7 @@ where let mut load_first_record_batch_group_section = Some(stopwatch.start_section("load_first_record_batch_group")); - StreamAggregator::new(logger, query_streams, max_buffer_size) + StreamAggregator::new(logger, query_streams, buffer_size) .map_ok(move |response| (response, table_ptrs.cheap_clone())) .map_err(Error::from) .map(move |result| { diff --git a/docs/amp-powered-subgraphs.md b/docs/amp-powered-subgraphs.md index a93335b835d..666adaf8f17 100644 --- a/docs/amp-powered-subgraphs.md +++ b/docs/amp-powered-subgraphs.md @@ -563,7 +563,7 @@ Amp-powered subgraphs feature introduces the following new ENV variables: - `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` – The address of the Amp Flight gRPC service. _Defaults to `None`, which disables support for Amp-powered subgraphs._ - `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` – Token used to authenticate Amp Flight gRPC service requests. _Defaults to `None`, which disables authentication._ -- `GRAPH_AMP_MAX_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._ +- `GRAPH_AMP_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._ - `GRAPH_AMP_MAX_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `2,000,000`._ - `GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS` – Minimum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `1` second._ - `GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS` – Maximum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `600` seconds._ diff --git a/graph/src/amp/stream_aggregator/mod.rs b/graph/src/amp/stream_aggregator/mod.rs index 099d443a8e8..990d721ec8f 100644 --- a/graph/src/amp/stream_aggregator/mod.rs +++ b/graph/src/amp/stream_aggregator/mod.rs @@ -60,7 +60,7 @@ impl StreamAggregator { pub fn new( logger: &Logger, named_streams: impl IntoIterator>)>, - max_buffer_size: usize, + buffer_size: usize, ) -> Self where E: std::error::Error + IsDeterministic + Send + Sync + 'static, @@ -103,7 +103,7 @@ impl StreamAggregator { Self { named_streams, - buffer: Buffer::new(num_streams, max_buffer_size), + buffer: Buffer::new(num_streams, buffer_size), logger, is_finalized: false, is_failed: false, diff --git a/graph/src/amp/stream_aggregator/record_batch/buffer.rs b/graph/src/amp/stream_aggregator/record_batch/buffer.rs index 4b45680636c..cc59390f3b5 100644 --- a/graph/src/amp/stream_aggregator/record_batch/buffer.rs +++ b/graph/src/amp/stream_aggregator/record_batch/buffer.rs @@ -11,21 +11,21 @@ use super::{Aggregator, RecordBatchGroup, RecordBatchGroups, StreamRecordBatch}; pub(in super::super) struct Buffer { aggregators: Vec, num_streams: usize, - max_buffer_size: usize, + buffer_size: usize, } impl Buffer { /// Creates a new buffer that can handle exactly `num_streams` number of streams. /// /// Creates a new associated `Aggregator` for each stream. - /// The `max_buffer_size` specifies how many record batches for each stream can be buffered at most. - pub(in super::super) fn new(num_streams: usize, max_buffer_size: usize) -> Self { + /// The `buffer_size` specifies how many record batches for each stream can be buffered at most. + pub(in super::super) fn new(num_streams: usize, buffer_size: usize) -> Self { let aggregators = (0..num_streams).map(|_| Aggregator::new()).collect(); Self { aggregators, num_streams, - max_buffer_size, + buffer_size, } } @@ -130,7 +130,7 @@ impl Buffer { /// Panics if the `stream_index` is greater than the initialized number of streams. pub(in super::super) fn has_capacity(&self, stream_index: usize) -> bool { assert!(stream_index < self.num_streams); - self.aggregators[stream_index].len() < self.max_buffer_size + self.aggregators[stream_index].len() < self.buffer_size } /// Returns `true` if the stream `stream_index` is not allowed to make progress and diff --git a/graph/src/env/amp.rs b/graph/src/env/amp.rs index a6a02b194c3..91aab4ee8d8 100644 --- a/graph/src/env/amp.rs +++ b/graph/src/env/amp.rs @@ -7,7 +7,7 @@ pub struct AmpEnv { /// This is the maximum number of record batches that can be output by a single block. /// /// Defaults to `1,000`. - pub max_buffer_size: usize, + pub buffer_size: usize, /// Maximum number of blocks to request per stream for each SQL query. /// Limiting this value reduces load on the Amp server when processing heavy queries. @@ -32,22 +32,22 @@ pub struct AmpEnv { } impl AmpEnv { - const DEFAULT_MAX_BUFFER_SIZE: usize = 1_000; + const DEFAULT_BUFFER_SIZE: usize = 1_000; const DEFAULT_MAX_BLOCK_RANGE: usize = 2_000_000; const DEFAULT_QUERY_RETRY_MIN_DELAY: Duration = Duration::from_secs(1); const DEFAULT_QUERY_RETRY_MAX_DELAY: Duration = Duration::from_secs(600); pub(super) fn new(raw_env: &super::Inner) -> Self { Self { - max_buffer_size: raw_env - .amp_max_buffer_size + buffer_size: raw_env + .amp_buffer_size .and_then(|value| { if value == 0 { return None; } Some(value) }) - .unwrap_or(Self::DEFAULT_MAX_BUFFER_SIZE), + .unwrap_or(Self::DEFAULT_BUFFER_SIZE), max_block_range: raw_env .amp_max_block_range .map(|mut value| { diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 23a6eaff579..e2d0543ce94 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -608,8 +608,8 @@ struct Inner { )] disable_deployment_hash_validation: EnvVarBoolean, - #[envconfig(from = "GRAPH_AMP_MAX_BUFFER_SIZE")] - amp_max_buffer_size: Option, + #[envconfig(from = "GRAPH_AMP_BUFFER_SIZE")] + amp_buffer_size: Option, #[envconfig(from = "GRAPH_AMP_MAX_BLOCK_RANGE")] amp_max_block_range: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS")] From 9b4d918e0c8db0b8d668b8f54f41109242b74e19 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 10:43:35 -0800 Subject: [PATCH 07/10] core: Rename methods to get start and end block for amp subgraphs --- core/src/amp_subgraph/runner/context.rs | 4 ++-- core/src/amp_subgraph/runner/data_stream.rs | 14 +++++++------- core/src/amp_subgraph/runner/mod.rs | 6 +++--- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/core/src/amp_subgraph/runner/context.rs b/core/src/amp_subgraph/runner/context.rs index 13afa82c588..cf7a1770192 100644 --- a/core/src/amp_subgraph/runner/context.rs +++ b/core/src/amp_subgraph/runner/context.rs @@ -77,7 +77,7 @@ impl Context { .map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat())) } - pub(super) fn min_start_block(&self) -> BlockNumber { + pub(super) fn start_block(&self) -> BlockNumber { self.manifest .data_sources .iter() @@ -86,7 +86,7 @@ impl Context { .unwrap() } - pub(super) fn max_end_block(&self) -> BlockNumber { + pub(super) fn end_block(&self) -> BlockNumber { self.manifest .data_sources .iter() diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index 5e50b595dfc..6c789eb713d 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -40,11 +40,11 @@ where debug!(logger, "Creating data stream"; "from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN), "to_block" => latest_block, - "start_block" => cx.min_start_block(), + "start_block" => cx.start_block(), "max_block_range" => max_block_range, ); - // State: (latest_queried_block, max_end_block, is_first) + // State: (latest_queried_block, end_block, is_first) let initial_state = (cx.latest_synced_block(), BlockNumber::MIN, true); stream::unfold( @@ -132,12 +132,12 @@ fn build_data_stream( table_ptrs: Arc<[TablePtr]>, buffer_size: usize, stopwatch: &StopwatchMetrics, - min_start_block: BlockNumber, + start_block: BlockNumber, ) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> where E: std::error::Error + IsDeterministic + Send + Sync + 'static, { - let mut min_start_block_checked = false; + let mut start_block_checked = false; let mut load_first_record_batch_group_section = Some(stopwatch.start_section("load_first_record_batch_group")); @@ -151,14 +151,14 @@ where match result { Ok(response) => { - if !min_start_block_checked { + if !start_block_checked { if let Some(((first_block, _), _)) = response.0.first_key_value() { - if *first_block < min_start_block { + if *first_block < start_block { return Err(Error::NonDeterministic(anyhow!("chain reorg"))); } } - min_start_block_checked = true; + start_block_checked = true; } Ok(response) diff --git a/core/src/amp_subgraph/runner/mod.rs b/core/src/amp_subgraph/runner/mod.rs index 8278f9bd1ee..64b10c01f9f 100644 --- a/core/src/amp_subgraph/runner/mod.rs +++ b/core/src/amp_subgraph/runner/mod.rs @@ -101,7 +101,7 @@ where cx.metrics .deployment_target - .update(latest_block.min(cx.max_end_block())); + .update(latest_block.min(cx.end_block())); let mut deployment_is_failed = cx.store.health().await?.is_failed(); let mut entity_cache = EntityCache::new(cx.store.cheap_clone()); @@ -131,12 +131,12 @@ where // source's endBlock. This handles the case where endBlock has no entity // data — the persisted block pointer never advances to endBlock, but the // server's latest block confirms all queries have been served. - if latest_block >= cx.max_end_block() { + if latest_block >= cx.end_block() { cx.metrics.deployment_synced.record(true); debug!(cx.logger, "Indexing completed; endBlock reached via server latest block"; "latest_block" => latest_block, - "max_end_block" => cx.max_end_block() + "end_block" => cx.end_block() ); return Ok(()); } From e4dfe524f8f4cebc5516b129ae0417823c95b994 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 10:48:02 -0800 Subject: [PATCH 08/10] core, graph: Rename GRAPH_AMP_MAX_BLOCK_RANGE, lower default Rename GRAPH_AMP_MAX_BLOCK_RANGE to just GRAPH_AMP_BLOCK_RANGE Lower the default to 100k blocks from 2M blocks --- core/src/amp_subgraph/runner/context.rs | 4 ++-- core/src/amp_subgraph/runner/data_stream.rs | 21 ++++++++------------- docs/amp-powered-subgraphs.md | 2 +- graph/src/env/amp.rs | 12 ++++++------ graph/src/env/mod.rs | 4 ++-- 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/core/src/amp_subgraph/runner/context.rs b/core/src/amp_subgraph/runner/context.rs index cf7a1770192..8ea0bb9e1f1 100644 --- a/core/src/amp_subgraph/runner/context.rs +++ b/core/src/amp_subgraph/runner/context.rs @@ -19,7 +19,7 @@ pub(in super::super) struct Context { pub(super) client: Arc, pub(super) store: Arc, pub(super) buffer_size: usize, - pub(super) max_block_range: usize, + pub(super) block_range: usize, pub(super) backoff: ExponentialBackoff, pub(super) deployment: DeploymentHash, pub(super) manifest: Manifest, @@ -46,7 +46,7 @@ impl Context { client, store, buffer_size: env.buffer_size, - max_block_range: env.max_block_range, + block_range: env.block_range, backoff, deployment, manifest, diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index 6c789eb713d..1d7d9a11c4e 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -34,14 +34,14 @@ where let client = cx.client.cheap_clone(); let manifest = cx.manifest.clone(); let buffer_size = cx.buffer_size; - let max_block_range = cx.max_block_range; + let block_range = cx.block_range; let stopwatch = cx.metrics.stopwatch.cheap_clone(); debug!(logger, "Creating data stream"; "from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN), "to_block" => latest_block, "start_block" => cx.start_block(), - "max_block_range" => max_block_range, + "block_range" => block_range, ); // State: (latest_queried_block, end_block, is_first) @@ -52,7 +52,7 @@ where move |(latest_queried_block, mut end_block, is_first)| { let block_ranges = next_block_ranges( &manifest.data_sources, - max_block_range, + block_range, latest_queried_block, latest_block, ); @@ -171,7 +171,7 @@ where fn next_block_ranges( data_sources: &[DataSource], - max_block_range: usize, + block_range: usize, latest_queried_block: Option, latest_block: BlockNumber, ) -> HashMap> { @@ -179,13 +179,8 @@ fn next_block_ranges( .iter() .enumerate() .filter_map(|(i, data_source)| { - next_block_range( - max_block_range, - data_source, - latest_queried_block, - latest_block, - ) - .map(|block_range| (i, block_range)) + next_block_range(block_range, data_source, latest_queried_block, latest_block) + .map(|block_range| (i, block_range)) }) .collect::>(); @@ -204,7 +199,7 @@ fn next_block_ranges( } fn next_block_range( - max_block_range: usize, + block_range: usize, data_source: &DataSource, latest_queried_block: Option, latest_block: BlockNumber, @@ -221,7 +216,7 @@ fn next_block_range( }; let end_block = [ - start_block.saturating_add(max_block_range as BlockNumber), + start_block.saturating_add(block_range as BlockNumber), data_source.source.end_block, latest_block, ] diff --git a/docs/amp-powered-subgraphs.md b/docs/amp-powered-subgraphs.md index 666adaf8f17..3848c725f7d 100644 --- a/docs/amp-powered-subgraphs.md +++ b/docs/amp-powered-subgraphs.md @@ -564,7 +564,7 @@ Amp-powered subgraphs feature introduces the following new ENV variables: - `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` – The address of the Amp Flight gRPC service. _Defaults to `None`, which disables support for Amp-powered subgraphs._ - `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` – Token used to authenticate Amp Flight gRPC service requests. _Defaults to `None`, which disables authentication._ - `GRAPH_AMP_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._ -- `GRAPH_AMP_MAX_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `2,000,000`._ +- `GRAPH_AMP_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `100,000`._ - `GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS` – Minimum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `1` second._ - `GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS` – Maximum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `600` seconds._ diff --git a/graph/src/env/amp.rs b/graph/src/env/amp.rs index 91aab4ee8d8..93cf5868f2b 100644 --- a/graph/src/env/amp.rs +++ b/graph/src/env/amp.rs @@ -12,8 +12,8 @@ pub struct AmpEnv { /// Maximum number of blocks to request per stream for each SQL query. /// Limiting this value reduces load on the Amp server when processing heavy queries. /// - /// Defaults to `2,000,000`. - pub max_block_range: usize, + /// Defaults to `100,000`. + pub block_range: usize, /// Minimum time to wait before retrying a failed SQL query to the Amp server. /// @@ -33,7 +33,7 @@ pub struct AmpEnv { impl AmpEnv { const DEFAULT_BUFFER_SIZE: usize = 1_000; - const DEFAULT_MAX_BLOCK_RANGE: usize = 2_000_000; + const DEFAULT_BLOCK_RANGE: usize = 100_000; const DEFAULT_QUERY_RETRY_MIN_DELAY: Duration = Duration::from_secs(1); const DEFAULT_QUERY_RETRY_MAX_DELAY: Duration = Duration::from_secs(600); @@ -48,15 +48,15 @@ impl AmpEnv { Some(value) }) .unwrap_or(Self::DEFAULT_BUFFER_SIZE), - max_block_range: raw_env - .amp_max_block_range + block_range: raw_env + .amp_block_range .map(|mut value| { if value == 0 { value = usize::MAX; } value }) - .unwrap_or(Self::DEFAULT_MAX_BLOCK_RANGE), + .unwrap_or(Self::DEFAULT_BLOCK_RANGE), query_retry_min_delay: raw_env .amp_query_retry_min_delay_seconds .map(Duration::from_secs) diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index e2d0543ce94..536a383e0da 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -610,8 +610,8 @@ struct Inner { #[envconfig(from = "GRAPH_AMP_BUFFER_SIZE")] amp_buffer_size: Option, - #[envconfig(from = "GRAPH_AMP_MAX_BLOCK_RANGE")] - amp_max_block_range: Option, + #[envconfig(from = "GRAPH_AMP_BLOCK_RANGE")] + amp_block_range: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS")] amp_query_retry_min_delay_seconds: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS")] From 8fa65fd07b6fec961b6bea51eca1b14fb8bcf230 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 13:03:59 -0800 Subject: [PATCH 09/10] graph: Exclude VID from change detection in merge_remove_null_fields Entity::merge_remove_null_fields iterates via the raw Object iterator which, unlike every other Entity API, does not filter the VID field. Since EntityCache::set stamps a fresh VID on every save, the VID always differs from the stored entity's VID, causing merge_remove_null_fields to report a change even when the user-visible data is identical. This produces a spurious Overwrite modification (and therefore a new row in the database) each time an unchanged entity is re-saved. Skip the VID when computing the changed flag while still merging it into the entity so downstream code (e.g. store insert queries) sees the latest value. --- graph/src/data/store/mod.rs | 37 +++++++++++++++++++++++++++++++++++-- 1 file changed, 35 insertions(+), 2 deletions(-) diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index 9bcbf52f08f..5f9dbc52abd 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -1017,16 +1017,20 @@ impl Entity { pub fn merge_remove_null_fields(&mut self, update: Entity) -> Result { let mut changed = false; for (key, value) in update.0.into_iter() { + // A change in VID, which changes on every save, does not count + // as a change to the entity's data; this avoids spurious + // `Overwrite` modifications. + let is_vid = key.as_str() == VID_FIELD; match value { Value::Null => { - if self.0.remove(&key).is_some() { + if self.0.remove(&key).is_some() && !is_vid { changed = true; } } _ => { let different = self.0.get(key.as_str()).is_none_or(|old| *old != value); self.0.insert(&key, value)?; - if different { + if different && !is_vid { changed = true; } } @@ -1388,3 +1392,32 @@ fn entity_hidden_vid() { _ = entity2.set_vid(7i64); assert_eq!(entity2.vid(), 7i64); } + +#[test] +fn merge_remove_null_fields_ignores_vid() { + use crate::schema::InputSchema; + let schema = InputSchema::raw("type Thing @entity {id: ID!, name: String!}", "test"); + + let mut current = entity! { schema => id: "1", name: "alice", vid: 3i64 }; + let update = entity! { schema => id: "1", name: "alice", vid: 99i64 }; + + // Merging an identical entity with a different VID must report no change. + let changed = current.merge_remove_null_fields(update).unwrap(); + assert!(!changed, "VID-only difference must not count as a change"); + // The VID must still be updated to the new value. + assert_eq!(current.vid(), 99i64); +} + +#[test] +fn merge_remove_null_fields_detects_real_change() { + use crate::schema::InputSchema; + let schema = InputSchema::raw("type Thing @entity {id: ID!, name: String!}", "test"); + + let mut current = entity! { schema => id: "1", name: "alice", vid: 3i64 }; + let update = entity! { schema => id: "1", name: "bob", vid: 99i64 }; + + let changed = current.merge_remove_null_fields(update).unwrap(); + assert!(changed, "data change must be detected"); + assert_eq!(current.get("name"), Some(&Value::String("bob".to_string()))); + assert_eq!(current.vid(), 99i64); +} From a82072086038b610bf278c377fa8d64ca15a6edd Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 26 Feb 2026 16:46:39 -0800 Subject: [PATCH 10/10] all: Measure loading time in EntityCache.as_modifications This adds a new section "as_modifications_load" that shows us how much time we spend reading from the database --- .../amp_subgraph/runner/data_processing.rs | 2 +- core/src/subgraph/runner/mod.rs | 4 +- graph/src/components/store/entity_cache.rs | 13 ++++-- runtime/test/src/test.rs | 41 ++++++++++--------- runtime/test/src/test/abi.rs | 2 +- store/test-store/src/store.rs | 22 ++++++---- store/test-store/tests/graph/entity_cache.rs | 18 ++++---- .../test-store/tests/postgres/aggregation.rs | 5 ++- 8 files changed, 62 insertions(+), 45 deletions(-) diff --git a/core/src/amp_subgraph/runner/data_processing.rs b/core/src/amp_subgraph/runner/data_processing.rs index 8c403de2b7f..033d4423726 100644 --- a/core/src/amp_subgraph/runner/data_processing.rs +++ b/core/src/amp_subgraph/runner/data_processing.rs @@ -139,7 +139,7 @@ async fn process_record_batch_group( entity_lfu_cache, evict_stats: _, } = entity_cache - .as_modifications(block_number.compat()) + .as_modifications(block_number.compat(), &cx.metrics.stopwatch) .await .map_err(Error::from) .map_err(|e| e.context("failed to extract entity modifications from the state"))?; diff --git a/core/src/subgraph/runner/mod.rs b/core/src/subgraph/runner/mod.rs index aea1ef6aaf1..a925c652e6c 100644 --- a/core/src/subgraph/runner/mod.rs +++ b/core/src/subgraph/runner/mod.rs @@ -686,7 +686,7 @@ where entity_lfu_cache: cache, evict_stats, } = entity_cache - .as_modifications(block_ptr.number) + .as_modifications(block_ptr.number, &self.metrics.host.stopwatch) .await .classify()?; section.end(); @@ -1571,7 +1571,7 @@ where mods.extend( block_state .entity_cache - .as_modifications(block.number()) + .as_modifications(block.number(), &self.metrics.subgraph.stopwatch) .await? .modifications, ); diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 7353ad17709..8e6a16bc7a1 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::components::store::{self as s, Entity, EntityOperation}; use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator}; -use crate::prelude::{CacheWeight, ENV_VARS}; +use crate::prelude::{CacheWeight, StopwatchMetrics, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; use crate::util::lfu_cache::{EvictStats, LfuCache}; @@ -474,6 +474,7 @@ impl EntityCache { pub async fn as_modifications( mut self, block: BlockNumber, + stopwatch: &StopwatchMetrics, ) -> Result { assert!(!self.in_handler); @@ -491,10 +492,14 @@ impl EntityCache { // is wrong and the store already has a version of the entity from a // previous block, the attempt to insert will trigger a constraint // violation in the database, ensuring correctness - let missing = missing.filter(|key| !key.entity_type.is_immutable()); + { + let _section = stopwatch.start_section("as_modifications_load"); - for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? { - self.current.insert(entity_key, Some(Arc::new(entity))); + let missing = missing.filter(|key| !key.entity_type.is_immutable()); + + for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? { + self.current.insert(entity_key, Some(Arc::new(entity))); + } } let mut mods = Vec::new(); diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index c656a2703f1..7eadb653c80 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -20,7 +20,7 @@ use graph_runtime_wasm::{ use semver::Version; use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; -use test_store::{LOGGER, STORE}; +use test_store::{LOGGER, STOPWATCH, STORE, SUBGRAPH_STORE}; use wasmtime::{AsContext, AsContextMut}; use crate::common::{mock_context, mock_data_source}; @@ -44,20 +44,20 @@ fn subgraph_id_with_api_version(subgraph_id: &str, api_version: Version) -> Stri ) } -async fn test_valid_module_and_store( +async fn test_module_and_deployment( subgraph_id: &str, data_source: DataSource, api_version: Version, -) -> (WasmInstance, Arc, DeploymentLocator) { - test_valid_module_and_store_with_timeout(subgraph_id, data_source, api_version, None).await +) -> (WasmInstance, DeploymentLocator) { + test_module_and_deployment_with_timeout(subgraph_id, data_source, api_version, None).await } -async fn test_valid_module_and_store_with_timeout( +async fn test_module_and_deployment_with_timeout( subgraph_id: &str, data_source: DataSource, api_version: Version, timeout: Option, -) -> (WasmInstance, Arc, DeploymentLocator) { +) -> (WasmInstance, DeploymentLocator) { let logger = Logger::root(slog::Discard, o!()); let subgraph_id_with_api_version = subgraph_id_with_api_version(subgraph_id, api_version.clone()); @@ -80,7 +80,7 @@ async fn test_valid_module_and_store_with_timeout( }", ) .await; - let stopwatch_metrics = StopwatchMetrics::new( + let stopwatch = StopwatchMetrics::new( logger.clone(), deployment_id.clone(), "test", @@ -93,7 +93,7 @@ async fn test_valid_module_and_store_with_timeout( let host_metrics = Arc::new(HostMetrics::new( metrics_registry, deployment_id.as_str(), - stopwatch_metrics, + stopwatch.cheap_clone(), gas_metrics, )); @@ -115,7 +115,7 @@ async fn test_valid_module_and_store_with_timeout( .await .unwrap(); - (module, store.subgraph_store(), deployment) + (module, deployment) } pub async fn test_module( @@ -123,7 +123,7 @@ pub async fn test_module( data_source: DataSource, api_version: Version, ) -> WasmInstance { - test_valid_module_and_store(subgraph_id, data_source, api_version) + test_module_and_deployment(subgraph_id, data_source, api_version) .await .0 } @@ -135,9 +135,7 @@ pub async fn test_module_latest(subgraph_id: &str, wasm_file: &str) -> WasmInsta &wasm_file_path(wasm_file, API_VERSION_0_0_5), version.clone(), ); - test_valid_module_and_store(subgraph_id, ds, version) - .await - .0 + test_module_and_deployment(subgraph_id, ds, version).await.0 } pub trait SyncWasmTy: wasmtime::WasmTy + Sync {} @@ -529,7 +527,7 @@ async fn run_ipfs_map( .to_owned() }; - let (mut instance, _, _) = test_valid_module_and_store( + let mut instance = test_module( subgraph_id, mock_data_source( &wasm_file_path("ipfs_map.wasm", api_version.clone()), @@ -557,7 +555,7 @@ async fn run_ipfs_map( .take_ctx() .take_state() .entity_cache - .as_modifications(0) + .as_modifications(0, &STOPWATCH) .await? .modifications; @@ -1008,7 +1006,8 @@ async fn ens_name_by_hash_v0_0_5() { } async fn test_entity_store(api_version: Version) { - let (mut instance, store, deployment) = test_valid_module_and_store( + let store = SUBGRAPH_STORE.clone(); + let (mut instance, deployment) = test_module_and_deployment( "entityStore", mock_data_source( &wasm_file_path("store.wasm", api_version.clone()), @@ -1073,7 +1072,11 @@ async fn test_entity_store(api_version: Version) { &mut ctx.ctx.state.entity_cache, EntityCache::new(Arc::new(writable.clone())), ); - let mut mods = cache.as_modifications(0).await.unwrap().modifications; + let mut mods = cache + .as_modifications(0, &STOPWATCH) + .await + .unwrap() + .modifications; assert_eq!(1, mods.len()); match mods.pop().unwrap() { EntityModification::Overwrite { data, .. } => { @@ -1093,7 +1096,7 @@ async fn test_entity_store(api_version: Version) { .take_ctx() .take_state() .entity_cache - .as_modifications(0) + .as_modifications(0, &STOPWATCH) .await .unwrap() .modifications; @@ -1626,7 +1629,7 @@ async fn generate_id() { let entity_cache = host.ctx.state.entity_cache; let mods = entity_cache - .as_modifications(12) + .as_modifications(12, &STOPWATCH) .await .unwrap() .modifications; diff --git a/runtime/test/src/test/abi.rs b/runtime/test/src/test/abi.rs index 8b81b014027..b93ed2d9cfa 100644 --- a/runtime/test/src/test/abi.rs +++ b/runtime/test/src/test/abi.rs @@ -7,7 +7,7 @@ use super::*; async fn test_unbounded_loop(api_version: Version) { // Set handler timeout to 3 seconds. - let mut instance = test_valid_module_and_store_with_timeout( + let mut instance = test_module_and_deployment_with_timeout( "unboundedLoop", mock_data_source( &wasm_file_path("non_terminating.wasm", api_version.clone()), diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 5f2cc52949b..b65080f3cf4 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -68,6 +68,13 @@ lazy_static! { pub static ref NODE_ID: NodeId = NodeId::new("test").unwrap(); pub static ref SUBGRAPH_STORE: Arc = STORE.subgraph_store(); static ref BLOCK_STORE: DieselBlockStore = STORE.block_store(); + pub static ref STOPWATCH: StopwatchMetrics = StopwatchMetrics::new( + Logger::root(slog::Discard, o!()), + DeploymentHash::new("test").unwrap(), + "test", + METRICS_REGISTRY.clone(), + "dummy".to_string(), + ); pub static ref GENESIS_PTR: BlockPtr = ( B256::from(hex!( "bd34884280958002c51d3f7b5f853e6febeba33de0f40d15b0363006533c924f" @@ -362,13 +369,6 @@ pub async fn transact_entities_and_dynamic_data_sources( Arc::new(manifest_idx_and_name), ))?; - let mut entity_cache = EntityCache::new(Arc::new(store.clone())); - entity_cache.append(ops); - let mods = entity_cache - .as_modifications(block_ptr_to.number) - .await - .expect("failed to convert to modifications") - .modifications; let metrics_registry = Arc::new(MetricsRegistry::mock()); let stopwatch_metrics = StopwatchMetrics::new( Logger::root(slog::Discard, o!()), @@ -377,6 +377,14 @@ pub async fn transact_entities_and_dynamic_data_sources( metrics_registry.clone(), store.shard().to_string(), ); + + let mut entity_cache = EntityCache::new(Arc::new(store.clone())); + entity_cache.append(ops); + let mods = entity_cache + .as_modifications(block_ptr_to.number, &stopwatch_metrics) + .await + .expect("failed to convert to modifications") + .modifications; let block_time = BlockTime::for_test(&block_ptr_to); store .transact_block_operations( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 7e10827548b..9b9f2c4c574 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -195,7 +195,7 @@ fn sort_by_entity_key(mut mods: Vec) -> Vec data.vid(), diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index de2ef0e07a7..7cc300b04d8 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -26,7 +26,8 @@ use graph::{ }; use graph_store_postgres::Store as DieselStore; use test_store::{ - create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER, METRICS_REGISTRY, + create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER, + METRICS_REGISTRY, STOPWATCH, }; const SCHEMA: &str = r#" @@ -110,7 +111,7 @@ pub async fn insert_entities( let mut entity_cache = EntityCache::new(Arc::new(store.clone())); entity_cache.append(ops); let mods = entity_cache - .as_modifications(block_ptr_to.number) + .as_modifications(block_ptr_to.number, &STOPWATCH) .await .expect("failed to convert to modifications") .modifications;