feat: refinalization for late forester registrations #2326
Changes from 3 commits
7581a32
4c326d3
7b31574
0ac3361
@@ -93,6 +93,57 @@ type AddressBatchProcessorMap<R> =
    Arc<DashMap<Pubkey, (u64, Arc<Mutex<QueueProcessor<R, AddressTreeStrategy>>>)>>;
type ProcessorInitLockMap = Arc<DashMap<Pubkey, Arc<Mutex<()>>>>;

/// Coordinates re-finalization across parallel `process_queue` tasks when new
/// foresters register mid-epoch. Only one task performs the on-chain
/// `finalize_registration` tx; others wait for it to complete.
#[derive(Debug)]
pub(crate) struct RegistrationTracker {
    cached_registered_weight: AtomicU64,
    refinalize_in_progress: AtomicBool,
    refinalized: tokio::sync::Notify,
}

impl RegistrationTracker {
    fn new(weight: u64) -> Self {
        Self {
            cached_registered_weight: AtomicU64::new(weight),
            refinalize_in_progress: AtomicBool::new(false),
            refinalized: tokio::sync::Notify::new(),
        }
    }

    fn cached_weight(&self) -> u64 {
        self.cached_registered_weight.load(Ordering::Acquire)
    }

    /// Returns `true` if this caller won the race to perform re-finalization.
    fn try_claim_refinalize(&self) -> bool {
        self.refinalize_in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Called by the winner after the on-chain tx succeeds.
    fn complete_refinalize(&self, new_weight: u64) {
        self.cached_registered_weight
            .store(new_weight, Ordering::Release);
        self.refinalize_in_progress.store(false, Ordering::Release);
        self.refinalized.notify_waiters();
    }

    /// Called by non-winners to block until re-finalization is done.
    async fn wait_for_refinalize(&self) {
        if !self.refinalize_in_progress.load(Ordering::Acquire) {
            return;
        }
        let fut = self.refinalized.notified();
        if !self.refinalize_in_progress.load(Ordering::Acquire) {
            return;
        }
        fut.await;
    }
}

/// Timing for a single circuit type (circuit inputs + proof generation)
#[derive(Copy, Clone, Debug, Default)]
pub struct CircuitMetrics {
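One subtlety with `tokio::sync::Notify` worth noting here: `notify_waiters` only wakes `Notified` futures that have already been polled or enabled, so a waiter could in principle miss the wake-up if the winner calls `complete_refinalize` after the second flag check in `wait_for_refinalize` but before `fut` is first polled. Tokio's documented workaround is to pin the future and call `enable()` before the final check. A minimal standalone sketch of that variant (not the code in this diff; field names mirror the struct above):

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;

/// Sketch of a missed-notification-safe wait, assuming the same flag/Notify pair.
async fn wait_for_refinalize(in_progress: &AtomicBool, refinalized: &Notify) {
    if !in_progress.load(Ordering::Acquire) {
        return;
    }
    let notified = refinalized.notified();
    tokio::pin!(notified);
    // Register this waiter before re-checking the flag so a `notify_waiters()`
    // issued after the check below is guaranteed to be observed.
    notified.as_mut().enable();
    if !in_progress.load(Ordering::Acquire) {
        return;
    }
    notified.await;
}
```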
@@ -225,6 +276,8 @@ pub struct EpochManager<R: Rpc + Indexer> {
    address_lookup_tables: Arc<Vec<AddressLookupTableAccount>>,
    heartbeat: Arc<ServiceHeartbeat>,
    run_id: Arc<str>,
    /// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch.
    registration_trackers: Arc<DashMap<u64, Arc<RegistrationTracker>>>,
Contributor comment:
Consider cleaning up after the epoch completes, e.g.:
    self.registration_trackers.remove(&epoch);
    self.processing_epochs.remove(&epoch);
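A minimal sketch of what that cleanup could look like (hypothetical method, not in this diff; `processing_epochs` is the existing per-epoch map the comment refers to):

```rust
impl<R: Rpc + Indexer> EpochManager<R> {
    /// Hypothetical cleanup hook: drop per-epoch coordination state once the
    /// epoch has finished processing, so these maps don't grow across epochs.
    fn cleanup_epoch_state(&self, epoch: u64) {
        self.registration_trackers.remove(&epoch);
        self.processing_epochs.remove(&epoch);
    }
}
```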
}

impl<R: Rpc + Indexer> Clone for EpochManager<R> {
@@ -255,6 +308,7 @@ impl<R: Rpc + Indexer> Clone for EpochManager<R> {
            address_lookup_tables: self.address_lookup_tables.clone(),
            heartbeat: self.heartbeat.clone(),
            run_id: self.run_id.clone(),
            registration_trackers: self.registration_trackers.clone(),
        }
    }
}
@@ -305,6 +359,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
            address_lookup_tables,
            heartbeat,
            run_id: Arc::<str>::from(run_id),
            registration_trackers: Arc::new(DashMap::new()),
        })
    }
@@ -595,6 +650,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
        epoch_info.trees.push(tree_schedule.clone());

        let self_clone = Arc::new(self.clone());
        let tracker = self
            .registration_trackers
            .entry(current_epoch)
            .or_insert_with(|| {
                Arc::new(RegistrationTracker::new(
                    epoch_info.epoch_pda.registered_weight,
                ))
            })
            .value()
            .clone();

        info!(
            event = "new_tree_processing_task_spawned",
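For readers unfamiliar with `dashmap`, the lookup above is a get-or-create: `entry().or_insert_with()` returns a guard that holds the shard lock, which is why the `Arc` is cloned out immediately rather than held across the spawned task. A standalone sketch of the same shape (hypothetical free function, not in this diff):

```rust
use std::sync::Arc;
use dashmap::DashMap;

/// Hypothetical helper mirroring the lookup above: fetch the per-epoch
/// tracker, creating it from the current registered weight on first access.
fn tracker_for_epoch(
    trackers: &DashMap<u64, Arc<RegistrationTracker>>,
    epoch: u64,
    registered_weight: u64,
) -> Arc<RegistrationTracker> {
    trackers
        .entry(epoch)
        .or_insert_with(|| Arc::new(RegistrationTracker::new(registered_weight)))
        .value()
        .clone()
}
```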
@@ -608,8 +673,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
            if let Err(e) = self_clone
                .process_queue(
                    &epoch_info.epoch,
-                   &epoch_info.forester_epoch_pda,
+                   epoch_info.forester_epoch_pda.clone(),
                    tree_schedule,
+                   tracker,
                )
                .await
            {
@@ -978,31 +1044,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
                .await
            {
                Ok(info) => info,
-               Err(ForesterError::Registration(
-                   RegistrationError::RegistrationPhaseEnded {
-                       epoch: failed_epoch,
-                       current_slot,
-                       registration_end,
-                   },
-               )) => {
-                   let next_epoch = failed_epoch + 1;
-                   let next_phases = get_epoch_phases(&self.protocol_config, next_epoch);
-                   let slots_to_wait =
-                       next_phases.registration.start.saturating_sub(current_slot);
-
-                   info!(
-                       event = "registration_window_missed",
-                       run_id = %self.run_id,
-                       failed_epoch,
-                       registration_end_slot = registration_end,
-                       current_slot,
-                       next_epoch,
-                       next_registration_start_slot = next_phases.registration.start,
-                       slots_to_wait,
-                       "Too late to register for requested epoch; next epoch will be used"
-                   );
-                   return Ok(());
-               }
                Err(e) => return Err(e.into()),
            }
        }
@@ -1476,7 +1517,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
        );

        let self_arc = Arc::new(self.clone());
        let epoch_info_arc = Arc::new(epoch_info.clone());
        let registration_tracker = self
            .registration_trackers
            .entry(epoch_info.epoch.epoch)
            .or_insert_with(|| {
                Arc::new(RegistrationTracker::new(
                    epoch_info.epoch_pda.registered_weight,
                ))
            })
            .value()
            .clone();

        let mut handles: Vec<JoinHandle<Result<()>>> = Vec::with_capacity(trees_to_process.len());
@@ -1491,15 +1541,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
            self.heartbeat.add_tree_tasks_spawned(1);

            let self_clone = self_arc.clone();
            let epoch_info_clone = epoch_info_arc.clone();
            let epoch_clone = epoch_info.epoch.clone();
            let forester_epoch_pda = epoch_info.forester_epoch_pda.clone();
            let tracker = registration_tracker.clone();

            let handle = tokio::spawn(async move {
                self_clone
-                   .process_queue(
-                       &epoch_info_clone.epoch,
-                       &epoch_info_clone.forester_epoch_pda,
-                       tree,
-                   )
+                   .process_queue(&epoch_clone, forester_epoch_pda, tree, tracker)
                    .await
            });
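Context for the switch from borrowed to owned arguments here: `tokio::spawn` requires a `'static` future, so everything the spawned task touches must be owned (or `Arc`-shared) rather than borrowed from the surrounding scope. A minimal sketch of the shape, with hypothetical types and names rather than this codebase's API:

```rust
use std::sync::Arc;

#[derive(Clone)]
struct EpochData {
    epoch: u64,
}

async fn process(data: EpochData, shared: Arc<str>) {
    // Use the owned `data` and the cheaply cloned `shared` handle.
    let _ = (data.epoch, shared.len());
}

#[tokio::main]
async fn main() {
    let data = EpochData { epoch: 42 };
    let shared: Arc<str> = Arc::from("run-1");

    // Clone per task *before* `spawn`; the async block then owns its captures,
    // satisfying the `'static` bound on `tokio::spawn`.
    let task_data = data.clone();
    let task_shared = shared.clone();
    let handle = tokio::spawn(async move { process(task_data, task_shared).await });
    handle.await.unwrap();
}
```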
@@ -1559,15 +1607,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {

    #[instrument(
        level = "debug",
-       skip(self, epoch_info, epoch_pda, tree_schedule),
+       skip(self, epoch_info, forester_epoch_pda, tree_schedule, registration_tracker),
        fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch,
            tree = %tree_schedule.tree_accounts.merkle_tree)
    )]
-   pub async fn process_queue(
+   pub(crate) async fn process_queue(
        &self,
        epoch_info: &Epoch,
-       epoch_pda: &ForesterEpochPda,
+       mut forester_epoch_pda: ForesterEpochPda,
        mut tree_schedule: TreeForesterSchedule,
+       registration_tracker: Arc<RegistrationTracker>,
    ) -> Result<()> {
        self.heartbeat.increment_queue_started();
        let mut current_slot = self.slot_tracker.estimated_current_slot();
@@ -1588,6 +1637,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
            "Processing queue for tree"
        );

        let mut last_weight_check = Instant::now();
        const WEIGHT_CHECK_INTERVAL: Duration = Duration::from_secs(30);

        'outer_slot_loop: while current_slot < epoch_info.phases.active.end {
            let next_slot_to_process = tree_schedule
                .slots
@@ -1600,7 +1652,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
                    TreeType::StateV1 | TreeType::AddressV1 | TreeType::Unknown => {
                        self.process_light_slot(
                            epoch_info,
-                           epoch_pda,
+                           &forester_epoch_pda,
                            &tree_schedule.tree_accounts,
                            &light_slot_details,
                        )
@@ -1612,7 +1664,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
                            .unwrap_or(light_slot_details.end_solana_slot);
                        self.process_light_slot_v2(
                            epoch_info,
-                           epoch_pda,
+                           &forester_epoch_pda,
                            &tree_schedule.tree_accounts,
                            &light_slot_details,
                            consecutive_end,
@@ -1639,6 +1691,27 @@ impl<R: Rpc + Indexer> EpochManager<R> {
                    }
                }
                tree_schedule.slots[slot_idx] = None;

                // Periodically check if new foresters registered and re-finalize.
                if last_weight_check.elapsed() >= WEIGHT_CHECK_INTERVAL {
                    last_weight_check = Instant::now();
                    if let Err(e) = self
                        .maybe_refinalize(
                            epoch_info,
                            &mut forester_epoch_pda,
                            &mut tree_schedule,
                            &registration_tracker,
                        )
                        .await
                    {
                        warn!(
                            event = "refinalize_check_failed",
                            run_id = %self.run_id,
                            error = ?e,
                            "Failed to check/perform re-finalization"
                        );
                    }
                }
            } else {
                debug!(
                    event = "process_queue_no_eligible_slots",
@@ -1662,6 +1735,124 @@ impl<R: Rpc + Indexer> EpochManager<R> {
        Ok(())
    }

    /// Check if `EpochPda.registered_weight` changed on-chain. If so,
    /// one task sends a `finalize_registration` tx while others wait,
    /// then all tasks refresh their `ForesterEpochPda` and recompute schedules.
    async fn maybe_refinalize(
        &self,
        epoch_info: &Epoch,
        forester_epoch_pda: &mut ForesterEpochPda,
        tree_schedule: &mut TreeForesterSchedule,
        registration_tracker: &RegistrationTracker,
    ) -> Result<()> {
        let mut rpc = self.rpc_pool.get_connection().await?;
        let epoch_pda_address = get_epoch_pda_address(epoch_info.epoch);
        let on_chain_epoch_pda: EpochPda = rpc
            .get_anchor_account::<EpochPda>(&epoch_pda_address)
            .await?
            .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;

        let on_chain_weight = on_chain_epoch_pda.registered_weight;
        let cached_weight = registration_tracker.cached_weight();

        if on_chain_weight == cached_weight {
            return Ok(());
        }

        info!(
            event = "registered_weight_changed",
            run_id = %self.run_id,
            epoch = epoch_info.epoch,
            old_weight = cached_weight,
            new_weight = on_chain_weight,
            "Detected new forester registration, re-finalizing"
        );

        if registration_tracker.try_claim_refinalize() {
            // This task sends the finalize_registration tx.
            let ix = create_finalize_registration_instruction(
                &self.config.payer_keypair.pubkey(),
                &self.config.derivation_pubkey,
                epoch_info.epoch,
            );
            match rpc
                .create_and_send_transaction(
                    &[ix],
                    &self.config.payer_keypair.pubkey(),
                    &[&self.config.payer_keypair],
                )
                .await
            {
                Ok(_) => {
                    // Re-fetch EpochPda after finalize to get the authoritative
                    // post-finalize weight (another forester may have registered
                    // between our initial read and the finalize tx).
                    let post_finalize_pda: EpochPda = rpc
                        .get_anchor_account::<EpochPda>(&epoch_pda_address)
                        .await?
                        .ok_or_else(|| {
                            anyhow!("EpochPda not found for epoch {}", epoch_info.epoch)
                        })?;
                    let post_finalize_weight = post_finalize_pda.registered_weight;
                    info!(
                        event = "refinalize_registration_success",
                        run_id = %self.run_id,
                        epoch = epoch_info.epoch,
                        new_weight = post_finalize_weight,
                        "Re-finalized registration on-chain"
                    );
                    registration_tracker.complete_refinalize(post_finalize_weight);
                }
                Err(e) => {
                    // Release the claim so a future check can retry.
                    registration_tracker.complete_refinalize(cached_weight);
                    return Err(e.into());
                }
            }
        } else {
            // Another task is already re-finalizing; wait for it.
            registration_tracker.wait_for_refinalize().await;
        }

        // All tasks: re-fetch both PDAs to get post-finalize on-chain state
        // and recompute the schedule.
        let refreshed_epoch_pda: EpochPda = rpc
            .get_anchor_account::<EpochPda>(&epoch_pda_address)
            .await?
            .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
        let updated_pda: ForesterEpochPda = rpc
            .get_anchor_account::<ForesterEpochPda>(&epoch_info.forester_epoch_pda)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "ForesterEpochPda not found at {} after re-finalization",
                    epoch_info.forester_epoch_pda
                )
            })?;

        let current_slot = self.slot_tracker.estimated_current_slot();
        let new_schedule = TreeForesterSchedule::new_with_schedule(
            &tree_schedule.tree_accounts,
            current_slot,
            &updated_pda,
            &refreshed_epoch_pda,
        )?;

        *forester_epoch_pda = updated_pda;
        *tree_schedule = new_schedule;

        info!(
            event = "schedule_recomputed_after_refinalize",
            run_id = %self.run_id,
            epoch = epoch_info.epoch,
            tree = %tree_schedule.tree_accounts.merkle_tree,
            new_eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(),
            "Recomputed schedule after re-finalization"
        );

        Ok(())
    }
    #[instrument(
        level = "debug",
        skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details),
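To illustrate the coordination contract of `RegistrationTracker` (exactly one winner per weight change, waiters released after `complete_refinalize`), here is a hedged, self-contained test sketch; it exercises only the tracker itself, not the RPC or on-chain logic, and is not part of this diff:

```rust
use std::sync::Arc;

#[tokio::test]
async fn only_one_task_claims_refinalize() {
    let tracker = Arc::new(RegistrationTracker::new(100));

    // Two callers attempt the claim; exactly one should win.
    let t1 = tracker.clone();
    let t2 = tracker.clone();
    let (a, b) = tokio::join!(
        async move { t1.try_claim_refinalize() },
        async move { t2.try_claim_refinalize() },
    );
    assert!(a ^ b, "exactly one claimant expected");

    // The winner publishes the new weight and releases any waiters.
    tracker.complete_refinalize(150);
    assert_eq!(tracker.cached_weight(), 150);

    // With no re-finalization in progress, waiting returns immediately.
    tracker.wait_for_refinalize().await;
}
```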