From d2ef4b34d0b5da226cbc2f06f0b92d094baf7622 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Fri, 6 Mar 2026 11:15:02 -0800 Subject: [PATCH] core: Remove error return from IndexingContext.revert_data_sources The unnecessary return makes callers look like they could fail --- core/src/subgraph/context/mod.rs | 9 +++--- core/src/subgraph/runner/mod.rs | 50 +++++++++++++------------------- 2 files changed, 24 insertions(+), 35 deletions(-) diff --git a/core/src/subgraph/context/mod.rs b/core/src/subgraph/context/mod.rs index fa11dff8cf6..07af703d7d7 100644 --- a/core/src/subgraph/context/mod.rs +++ b/core/src/subgraph/context/mod.rs @@ -114,12 +114,12 @@ impl> IndexingContext { /// `process_trigger`. /// /// File data sources that have been marked not done during this process will get re-queued - pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) -> Result<(), Error> { + pub fn revert_data_sources(&mut self, reverted_block: BlockNumber) { let removed = self.instance.revert_data_sources(reverted_block); removed .into_iter() - .try_for_each(|source| self.offchain_monitor.add_source(source)) + .for_each(|source| self.offchain_monitor.add_source(source)) } pub fn add_dynamic_data_source( @@ -136,7 +136,7 @@ impl> IndexingContext { if let Some((source, is_processed)) = offchain_fields { // monitor data source only if it has not yet been processed. if !is_processed { - self.offchain_monitor.add_source(source)?; + self.offchain_monitor.add_source(source); } } } @@ -212,7 +212,7 @@ impl OffchainMonitor { } } - fn add_source(&mut self, source: offchain::Source) -> Result<(), Error> { + fn add_source(&mut self, source: offchain::Source) { match source { offchain::Source::Ipfs(path) => self.ipfs_monitor.monitor(IpfsRequest { ctx: IpfsContext::new(&self.deployment_hash, &self.logger), @@ -220,7 +220,6 @@ impl OffchainMonitor { }), offchain::Source::Arweave(base64) => self.arweave_monitor.monitor(base64), }; - Ok(()) } pub fn ready_offchain_events(&mut self) -> Result, Error> { diff --git a/core/src/subgraph/runner/mod.rs b/core/src/subgraph/runner/mod.rs index a925c652e6c..370b914b9ab 100644 --- a/core/src/subgraph/runner/mod.rs +++ b/core/src/subgraph/runner/mod.rs @@ -124,14 +124,13 @@ where /// be removed. The same thing also applies to the block cache. /// This function must be called before continuing to process in order to avoid /// duplicated host insertion and POI issues with dirty entity changes. - fn revert_state_to(&mut self, block_number: BlockNumber) -> Result<(), Error> { + fn revert_state_to(&mut self, block_number: BlockNumber) { self.state.entity_lfu_cache = LfuCache::new(); // 1. Revert all hosts(created by DDS) at a block higher than `block_number`. // 2. Unmark any offchain data sources that were marked done on the blocks being removed. // When no offchain datasources are present, 2. should be a noop. - self.ctx.revert_data_sources(block_number + 1)?; - Ok(()) + self.ctx.revert_data_sources(block_number + 1); } #[cfg(debug_assertions)] @@ -420,7 +419,7 @@ where let store = self.inputs.store.cheap_clone(); if let Some(store) = store.restart().await? { let last_good_block = store.block_ptr().map(|ptr| ptr.number).unwrap_or(0); - self.revert_state_to(last_good_block)?; + self.revert_state_to(last_good_block); self.inputs = Arc::new(self.inputs.with_store(store)); } } @@ -522,10 +521,7 @@ where block_stream, to_ptr, cursor, - } => { - self.handle_revert_state(block_stream, to_ptr, cursor) - .await? - } + } => self.handle_revert_state(block_stream, to_ptr, cursor).await, RunnerState::Stopped { reason } => { return self.finalize(reason).await; @@ -610,20 +606,20 @@ where block_stream: Cancelable>>, revert_to_ptr: BlockPtr, cursor: FirehoseCursor, - ) -> Result, SubgraphRunnerError> { + ) -> RunnerState { let stopwatch = &self.metrics.stream.stopwatch; let _section = stopwatch.start_section(HANDLE_REVERT_SECTION_NAME); - let action = self.handle_revert(revert_to_ptr, cursor).await?; + let action = self.handle_revert(revert_to_ptr, cursor).await; match action { - Action::Continue => Ok(RunnerState::AwaitingBlock { block_stream }), - Action::Restart => Ok(RunnerState::Restarting { + Action::Continue => RunnerState::AwaitingBlock { block_stream }, + Action::Restart => RunnerState::Restarting { reason: RestartReason::DataSourceExpired, - }), - Action::Stop => Ok(RunnerState::Stopped { + }, + Action::Stop => RunnerState::Stopped { reason: StopReason::Canceled, - }), + }, } } @@ -1355,7 +1351,7 @@ where .block_ptr() .map(|ptr| ptr.number) .unwrap_or(0); - self.revert_state_to(last_good_block)?; + self.revert_state_to(last_good_block); Ok(Action::Restart) } @@ -1369,7 +1365,7 @@ where .block_ptr() .map(|ptr| ptr.number) .unwrap_or(0); - self.revert_state_to(last_good_block)?; + self.revert_state_to(last_good_block); let message = format!("{:#}", e).replace('\n', "\t"); let err = anyhow!("{}, code: {}", message, LogCode::SubgraphSyncingFailure); @@ -1388,7 +1384,7 @@ where .block_ptr() .map(|ptr| ptr.number) .unwrap_or(0); - self.revert_state_to(last_good_block)?; + self.revert_state_to(last_good_block); let message = format!("{:#}", e).replace('\n', "\t"); @@ -1612,11 +1608,7 @@ where C: Blockchain, T: RuntimeHostBuilder, { - async fn handle_revert( - &mut self, - revert_to_ptr: BlockPtr, - cursor: FirehoseCursor, - ) -> Result { + async fn handle_revert(&mut self, revert_to_ptr: BlockPtr, cursor: FirehoseCursor) -> Action { // Current deployment head in the database / WritableAgent Mutex cache. // // Safe unwrap because in a Revert event we're sure the subgraph has @@ -1624,7 +1616,7 @@ where let subgraph_ptr = self.inputs.store.block_ptr().unwrap(); if revert_to_ptr.number >= subgraph_ptr.number { info!(&self.logger, "Block to revert is higher than subgraph pointer, nothing to do"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr); - return Ok(Action::Continue); + return Action::Continue; } info!(&self.logger, "Reverting block to get back to main chain"; "subgraph_ptr" => &subgraph_ptr, "revert_to_ptr" => &revert_to_ptr); @@ -1638,7 +1630,7 @@ where error!(&self.logger, "Could not revert block. Retrying"; "error" => %e); // Exit inner block stream consumption loop and go up to loop that restarts subgraph - return Ok(Action::Restart); + return Action::Restart; } self.metrics @@ -1650,17 +1642,15 @@ where .deployment_head .set(subgraph_ptr.number as f64); - self.revert_state_to(revert_to_ptr.number)?; + self.revert_state_to(revert_to_ptr.number); let needs_restart: bool = self.needs_restart(revert_to_ptr, subgraph_ptr); - let action = if needs_restart { + if needs_restart { Action::Restart } else { Action::Continue - }; - - Ok(action) + } } /// Determines if the subgraph needs to be restarted.