Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 27 additions & 31 deletions core/src/amp_subgraph/runner/data_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,22 +11,22 @@ use graph::{
},
blockchain::block_stream::FirehoseCursor,
cheap_clone::CheapClone,
components::store::{EntityCache, ModificationsAndCache},
components::store::{EntityCache, EntityLfuCache, ModificationsAndCache, SeqGenerator},
};
use slog::{debug, trace};

use super::{data_stream::TablePtr, Compat, Context, Error};

pub(super) async fn process_record_batch_groups<AC>(
cx: &mut Context<AC>,
mut entity_cache: EntityCache,
mut entity_lfu_cache: EntityLfuCache,
record_batch_groups: RecordBatchGroups,
stream_table_ptr: Arc<[TablePtr]>,
latest_block: BlockNumber,
) -> Result<EntityCache, Error> {
) -> Result<EntityLfuCache, Error> {
if record_batch_groups.is_empty() {
debug!(cx.logger, "Received no record batch groups");
return Ok(entity_cache);
return Ok(entity_lfu_cache);
}

let from_block = record_batch_groups
Expand All @@ -50,9 +50,9 @@ pub(super) async fn process_record_batch_groups<AC>(
"record_batches_count" => record_batch_group.record_batches.len()
);

entity_cache = process_record_batch_group(
entity_lfu_cache = process_record_batch_group(
cx,
entity_cache,
entity_lfu_cache,
block_number,
block_hash,
record_batch_group,
Expand All @@ -79,18 +79,18 @@ pub(super) async fn process_record_batch_groups<AC>(
"to_block" => to_block
);

Ok(entity_cache)
Ok(entity_lfu_cache)
}

async fn process_record_batch_group<AC>(
cx: &mut Context<AC>,
mut entity_cache: EntityCache,
entity_lfu_cache: EntityLfuCache,
block_number: BlockNumber,
block_hash: BlockHash,
record_batch_group: RecordBatchGroup,
stream_table_ptr: &[TablePtr],
latest_block: BlockNumber,
) -> Result<EntityCache, Error> {
) -> Result<EntityLfuCache, Error> {
let _section = cx
.metrics
.stopwatch
Expand All @@ -100,9 +100,15 @@ async fn process_record_batch_group<AC>(

if record_batches.is_empty() {
debug!(cx.logger, "Record batch group is empty");
return Ok(entity_cache);
return Ok(entity_lfu_cache);
}

let mut entity_cache = EntityCache::with_current(
cx.store.cheap_clone(),
entity_lfu_cache,
SeqGenerator::new(block_number.compat()),
);

let block_timestamp = if cx.manifest.schema.has_aggregations() {
decode_block_timestamp(&record_batches)
.map_err(|e| e.context("failed to decode block timestamp"))?
Expand All @@ -121,7 +127,6 @@ async fn process_record_batch_group<AC>(
process_record_batch(
cx,
&mut entity_cache,
block_number,
record_batch,
stream_table_ptr[stream_index],
)
Expand Down Expand Up @@ -169,16 +174,12 @@ async fn process_record_batch_group<AC>(
cx.metrics.deployment_synced.record(true);
}

Ok(EntityCache::with_current(
cx.store.cheap_clone(),
entity_lfu_cache,
))
Ok(entity_lfu_cache)
}

async fn process_record_batch<AC>(
cx: &mut Context<AC>,
entity_cache: &mut EntityCache,
block_number: BlockNumber,
record_batch: RecordBatch,
(i, j): TablePtr,
) -> Result<(), Error> {
Expand Down Expand Up @@ -209,13 +210,11 @@ async fn process_record_batch<AC>(
let key = match key {
Some(key) => key,
None => {
let entity_id = entity_cache
.generate_id(id_type, block_number.compat())
.map_err(|e| {
Error::Deterministic(e.context(format!(
"failed to generate a new id for an entity of type '{entity_name}'"
)))
})?;
let entity_id = entity_cache.seq_gen.id(id_type).map_err(|e| {
Error::Deterministic(e.context(format!(
"failed to generate a new id for an entity of type '{entity_name}'"
)))
})?;

entity_data.push(("id".into(), entity_id.clone().into()));
entity_type.key(entity_id)
Expand All @@ -229,14 +228,11 @@ async fn process_record_batch<AC>(
)))
})?;

entity_cache
.set(key, entity, block_number.compat(), None)
.await
.map_err(|e| {
Error::Deterministic(e.context(format!(
"failed to store a new entity of type '{entity_name}' with id '{entity_id}'"
)))
})?;
entity_cache.set(key, entity, None).await.map_err(|e| {
Error::Deterministic(e.context(format!(
"failed to store a new entity of type '{entity_name}' with id '{entity_id}'"
)))
})?;
}

Ok(())
Expand Down
10 changes: 5 additions & 5 deletions core/src/amp_subgraph/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use std::time::{Duration, Instant};
use anyhow::Result;
use futures::StreamExt;
use graph::{
amp::Client, cheap_clone::CheapClone, components::store::EntityCache,
data::subgraph::schema::SubgraphError,
amp::Client, cheap_clone::CheapClone, components::store::EntityLfuCache,
data::subgraph::schema::SubgraphError, util::lfu_cache::LfuCache,
};
use slog::{debug, error, warn};
use tokio_util::sync::CancellationToken;
Expand Down Expand Up @@ -104,15 +104,15 @@ where
.update(latest_block.min(cx.end_block()));

let mut deployment_is_failed = cx.store.health().await?.is_failed();
let mut entity_cache = EntityCache::new(cx.store.cheap_clone());
let mut entity_lfu_cache: EntityLfuCache = LfuCache::new();
let mut stream = new_data_stream(cx, latest_block);

while let Some(result) = stream.next().await {
let (record_batch_groups, stream_table_ptr) = result?;

entity_cache = process_record_batch_groups(
entity_lfu_cache = process_record_batch_groups(
cx,
entity_cache,
entity_lfu_cache,
record_batch_groups,
stream_table_ptr,
latest_block,
Expand Down
32 changes: 14 additions & 18 deletions core/src/subgraph/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ use graph::blockchain::{
Block, BlockTime, Blockchain, DataSource as _, SubgraphFilter, Trigger, TriggerFilter as _,
TriggerFilterWrapper,
};
use graph::components::store::{EmptyStore, GetScope, ReadStore, StoredDynamicDataSource};
use graph::components::store::{
EmptyStore, GetScope, ReadStore, SeqGenerator, StoredDynamicDataSource,
};
use graph::components::subgraph::InstanceDSTemplate;
use graph::components::trigger_processor::RunnableTriggers;
use graph::components::{
Expand Down Expand Up @@ -1023,9 +1025,9 @@ where
.ready_offchain_events()
.non_deterministic()?;

let onchain_vid_seq = block_state.entity_cache.vid_seq;
let vid_gen = block_state.vid_gen();
let (offchain_mods, processed_offchain_data_sources, persisted_off_chain_data_sources) =
self.handle_offchain_triggers(offchain_events, block, onchain_vid_seq)
self.handle_offchain_triggers(offchain_events, block, vid_gen)
.await
.non_deterministic()?;

Expand Down Expand Up @@ -1070,9 +1072,11 @@ where
// Causality region for onchain triggers.
let causality_region = PoICausalityRegion::from_network(&self.inputs.network);

let vid_gen = SeqGenerator::new(block_ptr.number);
let mut block_state = BlockState::new(
self.inputs.store.clone(),
std::mem::take(&mut self.state.entity_lfu_cache),
vid_gen,
);

let _section = self
Expand Down Expand Up @@ -1475,7 +1479,7 @@ where
&mut self,
triggers: Vec<offchain::TriggerData>,
block: &Arc<C::Block>,
mut next_vid_seq: u32,
vid_gen: SeqGenerator,
) -> Result<
(
Vec<EntityModification>,
Expand All @@ -1492,12 +1496,11 @@ where
// Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to
// get causality region isolation.
let schema = ReadStore::input_schema(&self.inputs.store);
let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new());

// Continue the vid sequence from the previous trigger (or from
// onchain processing) so that each offchain trigger does not
// reset to RESERVED_VIDS and produce duplicate VIDs.
block_state.entity_cache.vid_seq = next_vid_seq;
let mut block_state = BlockState::new(
EmptyStore::new(schema),
LfuCache::new(),
vid_gen.cheap_clone(),
);

// PoI ignores offchain events.
// See also: poi-ignores-offchain
Expand Down Expand Up @@ -1564,10 +1567,6 @@ where
return Err(anyhow!("{}", err));
}

// Carry forward the vid sequence so the next iteration doesn't
// reset to RESERVED_VIDS and produce duplicate VIDs.
next_vid_seq = block_state.entity_cache.vid_seq;

mods.extend(
block_state
.entity_cache
Expand Down Expand Up @@ -1695,7 +1694,6 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
Expand All @@ -1710,12 +1708,11 @@ async fn update_proof_of_indexing(
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi, block, None).await
entity_cache.set(key, poi, None).await
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
Expand Down Expand Up @@ -1752,7 +1749,6 @@ async fn update_proof_of_indexing(
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)
.await?;
}
Expand Down
Loading