diff --git a/core/src/amp_subgraph/monitor.rs b/core/src/amp_subgraph/monitor.rs index 2bdd32ab504..93cf30252a6 100644 --- a/core/src/amp_subgraph/monitor.rs +++ b/core/src/amp_subgraph/monitor.rs @@ -22,7 +22,11 @@ use graph::{ cheap_clone::CheapClone, components::store::DeploymentLocator, log::factory::LoggerFactory, }; use slog::{debug, error, info, warn, Logger}; -use tokio::{sync::mpsc, task::JoinHandle, time::timeout}; +use tokio::{ + sync::mpsc::{self, error::SendError}, + task::JoinHandle, + time::timeout, +}; use tokio_util::sync::CancellationToken; /// Represents the maximum amount of time a subgraph instance is allowed to run @@ -137,7 +141,7 @@ impl Monitor { .new(slog::o!("method" => "start")); info!(logger, "Starting subgraph"); - handle_send_result( + log_send_error( &logger, self.command_tx.send(Command::Start { id: self.subgraph_instance_id.fetch_add(1, SeqCst), @@ -164,7 +168,7 @@ impl Monitor { .new(slog::o!("method" => "stop")); info!(logger, "Stopping subgraph"); - handle_send_result(&logger, self.command_tx.send(Command::Stop { deployment })); + log_send_error(&logger, self.command_tx.send(Command::Stop { deployment })); } /// Processes commands sent through the command channel. @@ -184,10 +188,6 @@ impl Monitor { loop { tokio::select! { Some(command) = command_rx.recv() => { - debug!(logger, "Processing a new command"; - "command" => ?command - ); - match &command { Command::Start { .. } => { Self::process_start_command( @@ -390,7 +390,7 @@ impl Monitor { if let Some(pending_start_command) = pending_start_commands.remove(&deployment) { debug!(logger, "Resending a pending start command"); - handle_send_result(&logger, command_tx.send(pending_start_command)); + log_send_error(&logger, command_tx.send(pending_start_command)); } } @@ -476,7 +476,7 @@ impl Monitor { } debug!(logger, "Sending clear command"); - handle_send_result(&logger, command_tx.send(Command::Clear { id, deployment })); + log_send_error(&logger, command_tx.send(Command::Clear { id, deployment })); } }); @@ -546,13 +546,10 @@ impl fmt::Debug for Command { } } -fn handle_send_result( - logger: &Logger, - result: Result<(), tokio::sync::mpsc::error::SendError>, -) { +fn log_send_error(logger: &Logger, result: Result<(), SendError>) { match result { Ok(()) => { - debug!(logger, "Command was sent successfully"); + // No need to log anything } // This should only happen if the parent cancel token of the subgraph monitor was cancelled diff --git a/core/src/amp_subgraph/runner/context.rs b/core/src/amp_subgraph/runner/context.rs index 32e96148acf..8ea0bb9e1f1 100644 --- a/core/src/amp_subgraph/runner/context.rs +++ b/core/src/amp_subgraph/runner/context.rs @@ -18,8 +18,8 @@ pub(in super::super) struct Context { pub(super) logger: Logger, pub(super) client: Arc, pub(super) store: Arc, - pub(super) max_buffer_size: usize, - pub(super) max_block_range: usize, + pub(super) buffer_size: usize, + pub(super) block_range: usize, pub(super) backoff: ExponentialBackoff, pub(super) deployment: DeploymentHash, pub(super) manifest: Manifest, @@ -45,8 +45,8 @@ impl Context { logger, client, store, - max_buffer_size: env.max_buffer_size, - max_block_range: env.max_block_range, + buffer_size: env.buffer_size, + block_range: env.block_range, backoff, deployment, manifest, @@ -77,15 +77,7 @@ impl Context { .map(|block_ptr| (block_ptr.number.compat(), block_ptr.hash.compat())) } - pub(super) fn total_queries(&self) -> usize { - self.manifest - .data_sources - .iter() - .map(|data_source| data_source.transformer.tables.len()) - .sum() - } - - pub(super) fn min_start_block(&self) -> BlockNumber { + pub(super) fn start_block(&self) -> BlockNumber { self.manifest .data_sources .iter() @@ -94,7 +86,7 @@ impl Context { .unwrap() } - pub(super) fn max_end_block(&self) -> BlockNumber { + pub(super) fn end_block(&self) -> BlockNumber { self.manifest .data_sources .iter() diff --git a/core/src/amp_subgraph/runner/data_processing.rs b/core/src/amp_subgraph/runner/data_processing.rs index 8c403de2b7f..033d4423726 100644 --- a/core/src/amp_subgraph/runner/data_processing.rs +++ b/core/src/amp_subgraph/runner/data_processing.rs @@ -139,7 +139,7 @@ async fn process_record_batch_group( entity_lfu_cache, evict_stats: _, } = entity_cache - .as_modifications(block_number.compat()) + .as_modifications(block_number.compat(), &cx.metrics.stopwatch) .await .map_err(Error::from) .map_err(|e| e.context("failed to extract entity modifications from the state"))?; diff --git a/core/src/amp_subgraph/runner/data_stream.rs b/core/src/amp_subgraph/runner/data_stream.rs index ad6d6d471f8..1d7d9a11c4e 100644 --- a/core/src/amp_subgraph/runner/data_stream.rs +++ b/core/src/amp_subgraph/runner/data_stream.rs @@ -3,16 +3,19 @@ use std::{collections::HashMap, ops::RangeInclusive, sync::Arc}; use alloy::primitives::BlockNumber; use anyhow::anyhow; use futures::{ - stream::{empty, BoxStream}, + stream::{self, BoxStream}, StreamExt, TryStreamExt, }; use graph::{ amp::{ + client::ResponseBatch, + error::IsDeterministic, manifest::DataSource, stream_aggregator::{RecordBatchGroups, StreamAggregator}, Client, }, cheap_clone::CheapClone, + prelude::StopwatchMetrics, }; use slog::{debug, warn}; @@ -25,135 +28,158 @@ pub(super) fn new_data_stream( latest_block: BlockNumber, ) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> where - AC: Client, + AC: Client + Send + Sync + 'static, { let logger = cx.logger.new(slog::o!("process" => "new_data_stream")); - - let total_queries = cx.total_queries(); - let mut total_queries_to_execute = 0; - let mut data_streams = Vec::new(); - let mut latest_queried_block = cx.latest_synced_block(); - let mut max_end_block = BlockNumber::MIN; + let client = cx.client.cheap_clone(); + let manifest = cx.manifest.clone(); + let buffer_size = cx.buffer_size; + let block_range = cx.block_range; + let stopwatch = cx.metrics.stopwatch.cheap_clone(); debug!(logger, "Creating data stream"; - "from_block" => latest_queried_block.unwrap_or(BlockNumber::MIN), + "from_block" => cx.latest_synced_block().unwrap_or(BlockNumber::MIN), "to_block" => latest_block, - "min_start_block" => cx.min_start_block(), - "max_block_range" => cx.max_block_range, + "start_block" => cx.start_block(), + "block_range" => block_range, ); - loop { - let next_block_ranges = next_block_ranges(cx, latest_queried_block, latest_block); - - if next_block_ranges.is_empty() { - if data_streams.is_empty() { - warn!(logger, "There are no unprocessed block ranges"); + // State: (latest_queried_block, end_block, is_first) + let initial_state = (cx.latest_synced_block(), BlockNumber::MIN, true); + + stream::unfold( + initial_state, + move |(latest_queried_block, mut end_block, is_first)| { + let block_ranges = next_block_ranges( + &manifest.data_sources, + block_range, + latest_queried_block, + latest_block, + ); + + if block_ranges.is_empty() { + if is_first { + warn!(logger, "There are no unprocessed block ranges"); + } + return futures::future::ready(None); } - break; - } - let mut query_streams = Vec::with_capacity(total_queries); - let mut query_streams_table_ptr = Vec::with_capacity(total_queries); - let mut min_start_block = BlockNumber::MAX; + let start_block = block_ranges.values().map(|r| *r.start()).min().unwrap(); + end_block = end_block.max(block_ranges.values().map(|r| *r.end()).max().unwrap()); + + let (query_streams, table_ptrs) = + build_query_streams(&*client, &logger, &manifest.data_sources, &block_ranges); + + let data_stream = build_data_stream( + &logger, + query_streams, + table_ptrs, + buffer_size, + &stopwatch, + start_block, + ); + + debug!(logger, "Created a new data stream"; + "latest_queried_block" => latest_queried_block, + "start_block" => start_block, + "end_block" => end_block, + ); + futures::future::ready(Some((data_stream, (Some(end_block), end_block, false)))) + }, + ) + .flatten() + .boxed() +} - for (i, data_source) in cx.manifest.data_sources.iter().enumerate() { - let Some(block_range) = next_block_ranges.get(&i) else { - continue; - }; +fn build_query_streams( + client: &AC, + logger: &slog::Logger, + data_sources: &[DataSource], + block_ranges: &HashMap>, +) -> ( + Vec<(String, BoxStream<'static, Result>)>, + Arc<[TablePtr]>, +) { + let total_queries: usize = data_sources + .iter() + .map(|ds| ds.transformer.tables.len()) + .sum(); - if *block_range.start() < min_start_block { - min_start_block = *block_range.start(); - } + let mut query_streams = Vec::with_capacity(total_queries); + let mut table_ptrs = Vec::with_capacity(total_queries); - if *block_range.end() > max_end_block { - max_end_block = *block_range.end(); - } + for (i, data_source) in data_sources.iter().enumerate() { + let Some(block_range) = block_ranges.get(&i) else { + continue; + }; - for (j, table) in data_source.transformer.tables.iter().enumerate() { - let query = table.query.build_with_block_range(block_range); - let stream = cx.client.query(&cx.logger, query, None); - let stream_name = format!("{}.{}", data_source.name, table.name); + for (j, table) in data_source.transformer.tables.iter().enumerate() { + let query = table.query.build_with_block_range(block_range); + let stream = client.query(logger, query, None); + let stream_name = format!("{}.{}", data_source.name, table.name); - query_streams.push((stream_name, stream)); - query_streams_table_ptr.push((i, j)); - } + query_streams.push((stream_name, stream)); + table_ptrs.push((i, j)); } + } - let query_streams_table_ptr: Arc<[TablePtr]> = query_streams_table_ptr.into(); - total_queries_to_execute += query_streams.len(); - - let mut min_start_block_checked = false; - let mut load_first_record_batch_group_section = Some( - cx.metrics - .stopwatch - .start_section("load_first_record_batch_group"), - ); - - data_streams.push( - StreamAggregator::new(&cx.logger, query_streams, cx.max_buffer_size) - .map_ok(move |response| (response, query_streams_table_ptr.cheap_clone())) - .map_err(Error::from) - .map(move |result| { - if load_first_record_batch_group_section.is_some() { - let _section = load_first_record_batch_group_section.take(); - } + (query_streams, table_ptrs.into()) +} - match result { - Ok(response) => { - if !min_start_block_checked { - if let Some(((first_block, _), _)) = response.0.first_key_value() { - if *first_block < min_start_block { - return Err(Error::NonDeterministic(anyhow!( - "chain reorg" - ))); - } - } - - min_start_block_checked = true; - } +fn build_data_stream( + logger: &slog::Logger, + query_streams: Vec<(String, BoxStream<'static, Result>)>, + table_ptrs: Arc<[TablePtr]>, + buffer_size: usize, + stopwatch: &StopwatchMetrics, + start_block: BlockNumber, +) -> BoxStream<'static, Result<(RecordBatchGroups, Arc<[TablePtr]>), Error>> +where + E: std::error::Error + IsDeterministic + Send + Sync + 'static, +{ + let mut start_block_checked = false; + let mut load_first_record_batch_group_section = + Some(stopwatch.start_section("load_first_record_batch_group")); + + StreamAggregator::new(logger, query_streams, buffer_size) + .map_ok(move |response| (response, table_ptrs.cheap_clone())) + .map_err(Error::from) + .map(move |result| { + if load_first_record_batch_group_section.is_some() { + let _section = load_first_record_batch_group_section.take(); + } - Ok(response) + match result { + Ok(response) => { + if !start_block_checked { + if let Some(((first_block, _), _)) = response.0.first_key_value() { + if *first_block < start_block { + return Err(Error::NonDeterministic(anyhow!("chain reorg"))); + } } - Err(e) => Err(e), - } - }) - .boxed(), - ); - - if max_end_block >= latest_block { - break; - } - - latest_queried_block = Some(max_end_block); - } - - debug!(logger, "Created aggregated data streams"; - "total_data_streams" => data_streams.len(), - "total_queries_to_execute" => total_queries_to_execute - ); - let mut iter = data_streams.into_iter(); - let mut merged_data_stream = iter.next().unwrap_or_else(|| empty().boxed()); - - for data_stream in iter { - merged_data_stream = merged_data_stream.chain(data_stream).boxed(); - } + start_block_checked = true; + } - merged_data_stream + Ok(response) + } + Err(e) => Err(e), + } + }) + .boxed() } -fn next_block_ranges( - cx: &Context, +fn next_block_ranges( + data_sources: &[DataSource], + block_range: usize, latest_queried_block: Option, latest_block: BlockNumber, ) -> HashMap> { - let block_ranges = cx - .manifest - .data_sources + let block_ranges = data_sources .iter() .enumerate() .filter_map(|(i, data_source)| { - next_block_range(cx, data_source, latest_queried_block, latest_block) + next_block_range(block_range, data_source, latest_queried_block, latest_block) .map(|block_range| (i, block_range)) }) .collect::>(); @@ -172,8 +198,8 @@ fn next_block_ranges( .collect() } -fn next_block_range( - cx: &Context, +fn next_block_range( + block_range: usize, data_source: &DataSource, latest_queried_block: Option, latest_block: BlockNumber, @@ -190,7 +216,7 @@ fn next_block_range( }; let end_block = [ - start_block.saturating_add(cx.max_block_range as BlockNumber), + start_block.saturating_add(block_range as BlockNumber), data_source.source.end_block, latest_block, ] diff --git a/core/src/amp_subgraph/runner/mod.rs b/core/src/amp_subgraph/runner/mod.rs index adfb88aa2c5..64b10c01f9f 100644 --- a/core/src/amp_subgraph/runner/mod.rs +++ b/core/src/amp_subgraph/runner/mod.rs @@ -67,7 +67,7 @@ where async fn run_indexing(cx: &mut Context) -> Result<(), Error> where - AC: Client, + AC: Client + Send + Sync + 'static, { cx.metrics.deployment_status.starting(); @@ -101,7 +101,7 @@ where cx.metrics .deployment_target - .update(latest_block.min(cx.max_end_block())); + .update(latest_block.min(cx.end_block())); let mut deployment_is_failed = cx.store.health().await?.is_failed(); let mut entity_cache = EntityCache::new(cx.store.cheap_clone()); @@ -131,12 +131,12 @@ where // source's endBlock. This handles the case where endBlock has no entity // data — the persisted block pointer never advances to endBlock, but the // server's latest block confirms all queries have been served. - if latest_block >= cx.max_end_block() { + if latest_block >= cx.end_block() { cx.metrics.deployment_synced.record(true); debug!(cx.logger, "Indexing completed; endBlock reached via server latest block"; "latest_block" => latest_block, - "max_end_block" => cx.max_end_block() + "end_block" => cx.end_block() ); return Ok(()); } @@ -152,7 +152,7 @@ where async fn run_indexing_with_retries(cx: &mut Context) -> Result<()> where - AC: Client, + AC: Client + Send + Sync + 'static, { loop { match run_indexing(cx).await { diff --git a/core/src/subgraph/runner/mod.rs b/core/src/subgraph/runner/mod.rs index aea1ef6aaf1..a925c652e6c 100644 --- a/core/src/subgraph/runner/mod.rs +++ b/core/src/subgraph/runner/mod.rs @@ -686,7 +686,7 @@ where entity_lfu_cache: cache, evict_stats, } = entity_cache - .as_modifications(block_ptr.number) + .as_modifications(block_ptr.number, &self.metrics.host.stopwatch) .await .classify()?; section.end(); @@ -1571,7 +1571,7 @@ where mods.extend( block_state .entity_cache - .as_modifications(block.number()) + .as_modifications(block.number(), &self.metrics.subgraph.stopwatch) .await? .modifications, ); diff --git a/docs/amp-powered-subgraphs.md b/docs/amp-powered-subgraphs.md index a93335b835d..3848c725f7d 100644 --- a/docs/amp-powered-subgraphs.md +++ b/docs/amp-powered-subgraphs.md @@ -563,8 +563,8 @@ Amp-powered subgraphs feature introduces the following new ENV variables: - `GRAPH_AMP_FLIGHT_SERVICE_ADDRESS` – The address of the Amp Flight gRPC service. _Defaults to `None`, which disables support for Amp-powered subgraphs._ - `GRAPH_AMP_FLIGHT_SERVICE_TOKEN` – Token used to authenticate Amp Flight gRPC service requests. _Defaults to `None`, which disables authentication._ -- `GRAPH_AMP_MAX_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._ -- `GRAPH_AMP_MAX_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `2,000,000`._ +- `GRAPH_AMP_BUFFER_SIZE` – Maximum number of response batches to buffer in memory per stream for each SQL query. _Defaults to `1,000`._ +- `GRAPH_AMP_BLOCK_RANGE` – Maximum number of blocks to request per stream for each SQL query. _Defaults to `100,000`._ - `GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS` – Minimum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `1` second._ - `GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS` – Maximum time to wait before retrying a failed SQL query to the Amp server. _Defaults to `600` seconds._ diff --git a/graph/src/amp/stream_aggregator/mod.rs b/graph/src/amp/stream_aggregator/mod.rs index e2f0892252f..990d721ec8f 100644 --- a/graph/src/amp/stream_aggregator/mod.rs +++ b/graph/src/amp/stream_aggregator/mod.rs @@ -60,7 +60,7 @@ impl StreamAggregator { pub fn new( logger: &Logger, named_streams: impl IntoIterator>)>, - max_buffer_size: usize, + buffer_size: usize, ) -> Self where E: std::error::Error + IsDeterministic + Send + Sync + 'static, @@ -101,14 +101,9 @@ impl StreamAggregator { let num_streams = named_streams.len(); - info!(logger, "Initializing stream aggregator"; - "num_streams" => num_streams, - "max_buffer_size" => max_buffer_size - ); - Self { named_streams, - buffer: Buffer::new(num_streams, max_buffer_size), + buffer: Buffer::new(num_streams, buffer_size), logger, is_finalized: false, is_failed: false, @@ -120,6 +115,7 @@ impl StreamAggregator { cx: &mut task::Context<'_>, ) -> Poll>> { let mut made_progress = false; + let mut needs_repoll = false; for (stream_index, (stream_name, stream)) in self.named_streams.iter_mut().enumerate() { let logger = self.logger.new(slog::o!( @@ -157,6 +153,7 @@ impl StreamAggregator { match buffer_result { Ok(()) => { made_progress = true; + needs_repoll = true; debug!(logger, "Buffered record batch"; "buffer_size" => self.buffer.size(stream_index), @@ -172,6 +169,7 @@ impl StreamAggregator { } Poll::Ready(Some(Ok(_empty_record_batch))) => { debug!(logger, "Received an empty record batch"); + needs_repoll = true; } Poll::Ready(Some(Err(e))) => { self.is_failed = true; @@ -214,6 +212,16 @@ impl StreamAggregator { return Poll::Ready(None); } + // When any stream returned `Poll::Ready` but we couldn't produce + // output (e.g. empty batch, or data buffered but no completed + // groups yet), the waker was consumed by that stream's poll call + // and won't be re-registered until we poll it again. Schedule an + // immediate re-poll so those streams get polled again and their + // wakers are properly re-registered. + if needs_repoll { + cx.waker().wake_by_ref(); + } + Poll::Pending } } diff --git a/graph/src/amp/stream_aggregator/record_batch/buffer.rs b/graph/src/amp/stream_aggregator/record_batch/buffer.rs index 4b45680636c..cc59390f3b5 100644 --- a/graph/src/amp/stream_aggregator/record_batch/buffer.rs +++ b/graph/src/amp/stream_aggregator/record_batch/buffer.rs @@ -11,21 +11,21 @@ use super::{Aggregator, RecordBatchGroup, RecordBatchGroups, StreamRecordBatch}; pub(in super::super) struct Buffer { aggregators: Vec, num_streams: usize, - max_buffer_size: usize, + buffer_size: usize, } impl Buffer { /// Creates a new buffer that can handle exactly `num_streams` number of streams. /// /// Creates a new associated `Aggregator` for each stream. - /// The `max_buffer_size` specifies how many record batches for each stream can be buffered at most. - pub(in super::super) fn new(num_streams: usize, max_buffer_size: usize) -> Self { + /// The `buffer_size` specifies how many record batches for each stream can be buffered at most. + pub(in super::super) fn new(num_streams: usize, buffer_size: usize) -> Self { let aggregators = (0..num_streams).map(|_| Aggregator::new()).collect(); Self { aggregators, num_streams, - max_buffer_size, + buffer_size, } } @@ -130,7 +130,7 @@ impl Buffer { /// Panics if the `stream_index` is greater than the initialized number of streams. pub(in super::super) fn has_capacity(&self, stream_index: usize) -> bool { assert!(stream_index < self.num_streams); - self.aggregators[stream_index].len() < self.max_buffer_size + self.aggregators[stream_index].len() < self.buffer_size } /// Returns `true` if the stream `stream_index` is not allowed to make progress and diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 7353ad17709..8e6a16bc7a1 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -8,7 +8,7 @@ use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::components::store::{self as s, Entity, EntityOperation}; use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator}; -use crate::prelude::{CacheWeight, ENV_VARS}; +use crate::prelude::{CacheWeight, StopwatchMetrics, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; use crate::util::lfu_cache::{EvictStats, LfuCache}; @@ -474,6 +474,7 @@ impl EntityCache { pub async fn as_modifications( mut self, block: BlockNumber, + stopwatch: &StopwatchMetrics, ) -> Result { assert!(!self.in_handler); @@ -491,10 +492,14 @@ impl EntityCache { // is wrong and the store already has a version of the entity from a // previous block, the attempt to insert will trigger a constraint // violation in the database, ensuring correctness - let missing = missing.filter(|key| !key.entity_type.is_immutable()); + { + let _section = stopwatch.start_section("as_modifications_load"); - for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? { - self.current.insert(entity_key, Some(Arc::new(entity))); + let missing = missing.filter(|key| !key.entity_type.is_immutable()); + + for (entity_key, entity) in self.store.get_many(missing.cloned().collect()).await? { + self.current.insert(entity_key, Some(Arc::new(entity))); + } } let mut mods = Vec::new(); diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index 9bcbf52f08f..5f9dbc52abd 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -1017,16 +1017,20 @@ impl Entity { pub fn merge_remove_null_fields(&mut self, update: Entity) -> Result { let mut changed = false; for (key, value) in update.0.into_iter() { + // A change in VID, which changes on every save, does not count + // as a change to the entity's data; this avoids spurious + // `Overwrite` modifications. + let is_vid = key.as_str() == VID_FIELD; match value { Value::Null => { - if self.0.remove(&key).is_some() { + if self.0.remove(&key).is_some() && !is_vid { changed = true; } } _ => { let different = self.0.get(key.as_str()).is_none_or(|old| *old != value); self.0.insert(&key, value)?; - if different { + if different && !is_vid { changed = true; } } @@ -1388,3 +1392,32 @@ fn entity_hidden_vid() { _ = entity2.set_vid(7i64); assert_eq!(entity2.vid(), 7i64); } + +#[test] +fn merge_remove_null_fields_ignores_vid() { + use crate::schema::InputSchema; + let schema = InputSchema::raw("type Thing @entity {id: ID!, name: String!}", "test"); + + let mut current = entity! { schema => id: "1", name: "alice", vid: 3i64 }; + let update = entity! { schema => id: "1", name: "alice", vid: 99i64 }; + + // Merging an identical entity with a different VID must report no change. + let changed = current.merge_remove_null_fields(update).unwrap(); + assert!(!changed, "VID-only difference must not count as a change"); + // The VID must still be updated to the new value. + assert_eq!(current.vid(), 99i64); +} + +#[test] +fn merge_remove_null_fields_detects_real_change() { + use crate::schema::InputSchema; + let schema = InputSchema::raw("type Thing @entity {id: ID!, name: String!}", "test"); + + let mut current = entity! { schema => id: "1", name: "alice", vid: 3i64 }; + let update = entity! { schema => id: "1", name: "bob", vid: 99i64 }; + + let changed = current.merge_remove_null_fields(update).unwrap(); + assert!(changed, "data change must be detected"); + assert_eq!(current.get("name"), Some(&Value::String("bob".to_string()))); + assert_eq!(current.vid(), 99i64); +} diff --git a/graph/src/env/amp.rs b/graph/src/env/amp.rs index a6a02b194c3..93cf5868f2b 100644 --- a/graph/src/env/amp.rs +++ b/graph/src/env/amp.rs @@ -7,13 +7,13 @@ pub struct AmpEnv { /// This is the maximum number of record batches that can be output by a single block. /// /// Defaults to `1,000`. - pub max_buffer_size: usize, + pub buffer_size: usize, /// Maximum number of blocks to request per stream for each SQL query. /// Limiting this value reduces load on the Amp server when processing heavy queries. /// - /// Defaults to `2,000,000`. - pub max_block_range: usize, + /// Defaults to `100,000`. + pub block_range: usize, /// Minimum time to wait before retrying a failed SQL query to the Amp server. /// @@ -32,31 +32,31 @@ pub struct AmpEnv { } impl AmpEnv { - const DEFAULT_MAX_BUFFER_SIZE: usize = 1_000; - const DEFAULT_MAX_BLOCK_RANGE: usize = 2_000_000; + const DEFAULT_BUFFER_SIZE: usize = 1_000; + const DEFAULT_BLOCK_RANGE: usize = 100_000; const DEFAULT_QUERY_RETRY_MIN_DELAY: Duration = Duration::from_secs(1); const DEFAULT_QUERY_RETRY_MAX_DELAY: Duration = Duration::from_secs(600); pub(super) fn new(raw_env: &super::Inner) -> Self { Self { - max_buffer_size: raw_env - .amp_max_buffer_size + buffer_size: raw_env + .amp_buffer_size .and_then(|value| { if value == 0 { return None; } Some(value) }) - .unwrap_or(Self::DEFAULT_MAX_BUFFER_SIZE), - max_block_range: raw_env - .amp_max_block_range + .unwrap_or(Self::DEFAULT_BUFFER_SIZE), + block_range: raw_env + .amp_block_range .map(|mut value| { if value == 0 { value = usize::MAX; } value }) - .unwrap_or(Self::DEFAULT_MAX_BLOCK_RANGE), + .unwrap_or(Self::DEFAULT_BLOCK_RANGE), query_retry_min_delay: raw_env .amp_query_retry_min_delay_seconds .map(Duration::from_secs) diff --git a/graph/src/env/mod.rs b/graph/src/env/mod.rs index 23a6eaff579..536a383e0da 100644 --- a/graph/src/env/mod.rs +++ b/graph/src/env/mod.rs @@ -608,10 +608,10 @@ struct Inner { )] disable_deployment_hash_validation: EnvVarBoolean, - #[envconfig(from = "GRAPH_AMP_MAX_BUFFER_SIZE")] - amp_max_buffer_size: Option, - #[envconfig(from = "GRAPH_AMP_MAX_BLOCK_RANGE")] - amp_max_block_range: Option, + #[envconfig(from = "GRAPH_AMP_BUFFER_SIZE")] + amp_buffer_size: Option, + #[envconfig(from = "GRAPH_AMP_BLOCK_RANGE")] + amp_block_range: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MIN_DELAY_SECONDS")] amp_query_retry_min_delay_seconds: Option, #[envconfig(from = "GRAPH_AMP_QUERY_RETRY_MAX_DELAY_SECONDS")] diff --git a/runtime/test/src/test.rs b/runtime/test/src/test.rs index c656a2703f1..7eadb653c80 100644 --- a/runtime/test/src/test.rs +++ b/runtime/test/src/test.rs @@ -20,7 +20,7 @@ use graph_runtime_wasm::{ use semver::Version; use std::collections::{BTreeMap, HashMap}; use std::str::FromStr; -use test_store::{LOGGER, STORE}; +use test_store::{LOGGER, STOPWATCH, STORE, SUBGRAPH_STORE}; use wasmtime::{AsContext, AsContextMut}; use crate::common::{mock_context, mock_data_source}; @@ -44,20 +44,20 @@ fn subgraph_id_with_api_version(subgraph_id: &str, api_version: Version) -> Stri ) } -async fn test_valid_module_and_store( +async fn test_module_and_deployment( subgraph_id: &str, data_source: DataSource, api_version: Version, -) -> (WasmInstance, Arc, DeploymentLocator) { - test_valid_module_and_store_with_timeout(subgraph_id, data_source, api_version, None).await +) -> (WasmInstance, DeploymentLocator) { + test_module_and_deployment_with_timeout(subgraph_id, data_source, api_version, None).await } -async fn test_valid_module_and_store_with_timeout( +async fn test_module_and_deployment_with_timeout( subgraph_id: &str, data_source: DataSource, api_version: Version, timeout: Option, -) -> (WasmInstance, Arc, DeploymentLocator) { +) -> (WasmInstance, DeploymentLocator) { let logger = Logger::root(slog::Discard, o!()); let subgraph_id_with_api_version = subgraph_id_with_api_version(subgraph_id, api_version.clone()); @@ -80,7 +80,7 @@ async fn test_valid_module_and_store_with_timeout( }", ) .await; - let stopwatch_metrics = StopwatchMetrics::new( + let stopwatch = StopwatchMetrics::new( logger.clone(), deployment_id.clone(), "test", @@ -93,7 +93,7 @@ async fn test_valid_module_and_store_with_timeout( let host_metrics = Arc::new(HostMetrics::new( metrics_registry, deployment_id.as_str(), - stopwatch_metrics, + stopwatch.cheap_clone(), gas_metrics, )); @@ -115,7 +115,7 @@ async fn test_valid_module_and_store_with_timeout( .await .unwrap(); - (module, store.subgraph_store(), deployment) + (module, deployment) } pub async fn test_module( @@ -123,7 +123,7 @@ pub async fn test_module( data_source: DataSource, api_version: Version, ) -> WasmInstance { - test_valid_module_and_store(subgraph_id, data_source, api_version) + test_module_and_deployment(subgraph_id, data_source, api_version) .await .0 } @@ -135,9 +135,7 @@ pub async fn test_module_latest(subgraph_id: &str, wasm_file: &str) -> WasmInsta &wasm_file_path(wasm_file, API_VERSION_0_0_5), version.clone(), ); - test_valid_module_and_store(subgraph_id, ds, version) - .await - .0 + test_module_and_deployment(subgraph_id, ds, version).await.0 } pub trait SyncWasmTy: wasmtime::WasmTy + Sync {} @@ -529,7 +527,7 @@ async fn run_ipfs_map( .to_owned() }; - let (mut instance, _, _) = test_valid_module_and_store( + let mut instance = test_module( subgraph_id, mock_data_source( &wasm_file_path("ipfs_map.wasm", api_version.clone()), @@ -557,7 +555,7 @@ async fn run_ipfs_map( .take_ctx() .take_state() .entity_cache - .as_modifications(0) + .as_modifications(0, &STOPWATCH) .await? .modifications; @@ -1008,7 +1006,8 @@ async fn ens_name_by_hash_v0_0_5() { } async fn test_entity_store(api_version: Version) { - let (mut instance, store, deployment) = test_valid_module_and_store( + let store = SUBGRAPH_STORE.clone(); + let (mut instance, deployment) = test_module_and_deployment( "entityStore", mock_data_source( &wasm_file_path("store.wasm", api_version.clone()), @@ -1073,7 +1072,11 @@ async fn test_entity_store(api_version: Version) { &mut ctx.ctx.state.entity_cache, EntityCache::new(Arc::new(writable.clone())), ); - let mut mods = cache.as_modifications(0).await.unwrap().modifications; + let mut mods = cache + .as_modifications(0, &STOPWATCH) + .await + .unwrap() + .modifications; assert_eq!(1, mods.len()); match mods.pop().unwrap() { EntityModification::Overwrite { data, .. } => { @@ -1093,7 +1096,7 @@ async fn test_entity_store(api_version: Version) { .take_ctx() .take_state() .entity_cache - .as_modifications(0) + .as_modifications(0, &STOPWATCH) .await .unwrap() .modifications; @@ -1626,7 +1629,7 @@ async fn generate_id() { let entity_cache = host.ctx.state.entity_cache; let mods = entity_cache - .as_modifications(12) + .as_modifications(12, &STOPWATCH) .await .unwrap() .modifications; diff --git a/runtime/test/src/test/abi.rs b/runtime/test/src/test/abi.rs index 8b81b014027..b93ed2d9cfa 100644 --- a/runtime/test/src/test/abi.rs +++ b/runtime/test/src/test/abi.rs @@ -7,7 +7,7 @@ use super::*; async fn test_unbounded_loop(api_version: Version) { // Set handler timeout to 3 seconds. - let mut instance = test_valid_module_and_store_with_timeout( + let mut instance = test_module_and_deployment_with_timeout( "unboundedLoop", mock_data_source( &wasm_file_path("non_terminating.wasm", api_version.clone()), diff --git a/store/test-store/src/store.rs b/store/test-store/src/store.rs index 5f2cc52949b..b65080f3cf4 100644 --- a/store/test-store/src/store.rs +++ b/store/test-store/src/store.rs @@ -68,6 +68,13 @@ lazy_static! { pub static ref NODE_ID: NodeId = NodeId::new("test").unwrap(); pub static ref SUBGRAPH_STORE: Arc = STORE.subgraph_store(); static ref BLOCK_STORE: DieselBlockStore = STORE.block_store(); + pub static ref STOPWATCH: StopwatchMetrics = StopwatchMetrics::new( + Logger::root(slog::Discard, o!()), + DeploymentHash::new("test").unwrap(), + "test", + METRICS_REGISTRY.clone(), + "dummy".to_string(), + ); pub static ref GENESIS_PTR: BlockPtr = ( B256::from(hex!( "bd34884280958002c51d3f7b5f853e6febeba33de0f40d15b0363006533c924f" @@ -362,13 +369,6 @@ pub async fn transact_entities_and_dynamic_data_sources( Arc::new(manifest_idx_and_name), ))?; - let mut entity_cache = EntityCache::new(Arc::new(store.clone())); - entity_cache.append(ops); - let mods = entity_cache - .as_modifications(block_ptr_to.number) - .await - .expect("failed to convert to modifications") - .modifications; let metrics_registry = Arc::new(MetricsRegistry::mock()); let stopwatch_metrics = StopwatchMetrics::new( Logger::root(slog::Discard, o!()), @@ -377,6 +377,14 @@ pub async fn transact_entities_and_dynamic_data_sources( metrics_registry.clone(), store.shard().to_string(), ); + + let mut entity_cache = EntityCache::new(Arc::new(store.clone())); + entity_cache.append(ops); + let mods = entity_cache + .as_modifications(block_ptr_to.number, &stopwatch_metrics) + .await + .expect("failed to convert to modifications") + .modifications; let block_time = BlockTime::for_test(&block_ptr_to); store .transact_block_operations( diff --git a/store/test-store/tests/graph/entity_cache.rs b/store/test-store/tests/graph/entity_cache.rs index 7e10827548b..9b9f2c4c574 100644 --- a/store/test-store/tests/graph/entity_cache.rs +++ b/store/test-store/tests/graph/entity_cache.rs @@ -195,7 +195,7 @@ fn sort_by_entity_key(mut mods: Vec) -> Vec data.vid(), diff --git a/store/test-store/tests/postgres/aggregation.rs b/store/test-store/tests/postgres/aggregation.rs index de2ef0e07a7..7cc300b04d8 100644 --- a/store/test-store/tests/postgres/aggregation.rs +++ b/store/test-store/tests/postgres/aggregation.rs @@ -26,7 +26,8 @@ use graph::{ }; use graph_store_postgres::Store as DieselStore; use test_store::{ - create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER, METRICS_REGISTRY, + create_test_subgraph, remove_subgraphs, run_test_sequentially, BLOCKS, LOGGER, + METRICS_REGISTRY, STOPWATCH, }; const SCHEMA: &str = r#" @@ -110,7 +111,7 @@ pub async fn insert_entities( let mut entity_cache = EntityCache::new(Arc::new(store.clone())); entity_cache.append(ops); let mods = entity_cache - .as_modifications(block_ptr_to.number) + .as_modifications(block_ptr_to.number, &STOPWATCH) .await .expect("failed to convert to modifications") .modifications;