Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions core/src/subgraph/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
/// `process_trigger`.
///
/// File data sources that have been marked not done during this process will get re-queued
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) -> Result<(), Error> {
pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) {
let removed = self.instance.revert_data_sources(reverted_block);

removed
.into_iter()
.try_for_each(|source| self.offchain_monitor.add_source(source))
.for_each(|source| self.offchain_monitor.add_source(source))
}

pub fn add_dynamic_data_source(
Expand All @@ -136,7 +136,7 @@ impl<C: Blockchain, T: RuntimeHostBuilder<C>> IndexingContext<C, T> {
if let Some((source, is_processed)) = offchain_fields {
// monitor data source only if it has not yet been processed.
if !is_processed {
self.offchain_monitor.add_source(source)?;
self.offchain_monitor.add_source(source);
}
}
}
Expand Down Expand Up @@ -212,15 +212,14 @@ impl OffchainMonitor {
}
}

fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> {
fn add_source(&mut self, source: offchain::Source) {
match source {
offchain::Source::Ipfs(path) => self.ipfs_monitor.monitor(IpfsRequest {
ctx: IpfsContext::new(&self.deployment_hash, &self.logger),
path,
}),
offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64),
};
Ok(())
}

pub fn ready_offchain_events(&mut self) -> Result<Vec<offchain::TriggerData>, Error> {
Expand Down
50 changes: 20 additions & 30 deletions core/src/subgraph/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,14 +124,13 @@ where
/// be removed. The same thing also applies to the block cache.
/// This function must be called before continuing to process in order to avoid
/// duplicated host insertion and POI issues with dirty entity changes.
fn revert_state_to(&mut self, block_number: BlockNumber) -> Result<(), Error> {
fn revert_state_to(&mut self, block_number: BlockNumber) {
self.state.entity_lfu_cache = LfuCache::new();

// 1. Revert all hosts(created by DDS) at a block higher than `block_number`.
// 2. Unmark any offchain data sources that were marked done on the blocks being removed.
// When no offchain datasources are present, 2. should be a noop.
self.ctx.revert_data_sources(block_number + 1)?;
Ok(())
self.ctx.revert_data_sources(block_number + 1);
}

#[cfg(debug_assertions)]
Expand Down Expand Up @@ -420,7 +419,7 @@ where
let store = self.inputs.store.cheap_clone();
if let Some(store) = store.restart().await? {
let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0);
self.revert_state_to(last_good_block)?;
self.revert_state_to(last_good_block);
self.inputs = Arc::new(self.inputs.with_store(store));
}
}
Expand Down Expand Up @@ -522,10 +521,7 @@ where
block_stream,
to_ptr,
cursor,
} => {
self.handle_revert_state(block_stream, to_ptr, cursor)
.await?
}
} => self.handle_revert_state(block_stream, to_ptr, cursor).await,

RunnerState::Stopped { reason } => {
return self.finalize(reason).await;
Expand Down Expand Up @@ -610,20 +606,20 @@ where
block_stream: Cancelable<Box<dyn BlockStream<C>>>,
revert_to_ptr: BlockPtr,
cursor: FirehoseCursor,
) -> Result<RunnerState<C>, SubgraphRunnerError> {
) -> RunnerState<C> {
let stopwatch = &self.metrics.stream.stopwatch;
let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME);

let action = self.handle_revert(revert_to_ptr, cursor).await?;
let action = self.handle_revert(revert_to_ptr, cursor).await;

match action {
Action::Continue => Ok(RunnerState::AwaitingBlock { block_stream }),
Action::Restart => Ok(RunnerState::Restarting {
Action::Continue => RunnerState::AwaitingBlock { block_stream },
Action::Restart => RunnerState::Restarting {
reason: RestartReason::DataSourceExpired,
}),
Action::Stop => Ok(RunnerState::Stopped {
},
Action::Stop => RunnerState::Stopped {
reason: StopReason::Canceled,
}),
},
}
}

Expand Down Expand Up @@ -1355,7 +1351,7 @@ where
.block_ptr()
.map(|ptr| ptr.number)
.unwrap_or(0);
self.revert_state_to(last_good_block)?;
self.revert_state_to(last_good_block);

Ok(Action::Restart)
}
Expand All @@ -1369,7 +1365,7 @@ where
.block_ptr()
.map(|ptr| ptr.number)
.unwrap_or(0);
self.revert_state_to(last_good_block)?;
self.revert_state_to(last_good_block);

let message = format!("{:#}", e).replace('\n', "\t");
let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure);
Expand All @@ -1388,7 +1384,7 @@ where
.block_ptr()
.map(|ptr| ptr.number)
.unwrap_or(0);
self.revert_state_to(last_good_block)?;
self.revert_state_to(last_good_block);

let message = format!("{:#}", e).replace('\n', "\t");

Expand Down Expand Up @@ -1612,19 +1608,15 @@ where
C: Blockchain,
T: RuntimeHostBuilder<C>,
{
async fn handle_revert(
&mut self,
revert_to_ptr: BlockPtr,
cursor: FirehoseCursor,
) -> Result<Action, Error> {
async fn handle_revert(&mut self, revert_to_ptr: BlockPtr, cursor: FirehoseCursor) -> Action {
// Current deployment head in the database / WritableAgent Mutex cache.
//
// Safe unwrap because in a Revert event we're sure the subgraph has
// advanced at least once.
let subgraph_ptr = self.inputs.store.block_ptr().unwrap();
if revert_to_ptr.number >= subgraph_ptr.number {
info!(&self.logger, "Block to revert is higher than subgraph pointer, nothing to do"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr);
return Ok(Action::Continue);
return Action::Continue;
}

info!(&self.logger, "Reverting block to get back to main chain"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr);
Expand All @@ -1638,7 +1630,7 @@ where
error!(&self.logger, "Could not revert block. Retrying"; "error" => %e);

// Exit inner block stream consumption loop and go up to loop that restarts subgraph
return Ok(Action::Restart);
return Action::Restart;
}

self.metrics
Expand All @@ -1650,17 +1642,15 @@ where
.deployment_head
.set(subgraph_ptr.number as f64);

self.revert_state_to(revert_to_ptr.number)?;
self.revert_state_to(revert_to_ptr.number);

let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr);

let action = if needs_restart {
if needs_restart {
Action::Restart
} else {
Action::Continue
};

Ok(action)
}
}

/// Determines if the subgraph needs to be restarted.
Expand Down