diff --git a/external/photon b/external/photon
index 01cae51ca5..cfc1a7572d 160000
--- a/external/photon
+++ b/external/photon
@@ -1 +1 @@
-Subproject commit 01cae51ca58935eb46571094ce7abd0ea252c344
+Subproject commit cfc1a7572d485b2e21677e45436fb6cdc727b919
diff --git a/forester-utils/src/rpc_pool.rs b/forester-utils/src/rpc_pool.rs
index f5a8d07625..4cd4702ac0 100644
--- a/forester-utils/src/rpc_pool.rs
+++ b/forester-utils/src/rpc_pool.rs
@@ -182,7 +182,7 @@ impl bb8::ManageConnection for SolanaConnectionManager
                 commitment_config: Some(self.commitment),
                 fetch_active_tree: false,
             };
-            R::new(fallback_config).await.map_err(|second_err| {
+            let conn = R::new(fallback_config).await.map_err(|second_err| {
                 error!(
                     "Both RPC endpoints failed: first={}, second={}",
                     first_err, second_err
@@ -191,7 +191,16 @@ impl bb8::ManageConnection for SolanaConnectionManager
                     "first: {}, second: {}",
                     first_err, second_err
                 ))
-            })
+            })?;
+            // If we were in fallback mode but the fallback URL failed
+            // while the primary succeeded, recover to primary mode so
+            // has_broken() won't immediately reject this connection.
+            if self.health_state.is_fallback_active()
+                && conn.get_url() == self.health_state.primary_url
+            {
+                self.health_state.recover_primary();
+            }
+            Ok(conn)
         } else {
             Err(PoolError::ClientCreation(first_err.to_string()))
         }
diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs
index 77a30bdc61..7bdd3b1622 100644
--- a/forester/src/epoch_manager.rs
+++ b/forester/src/epoch_manager.rs
@@ -93,6 +93,57 @@
 type AddressBatchProcessorMap = Arc>>)>>;
 type ProcessorInitLockMap = Arc>>>;
 
+/// Coordinates re-finalization across parallel `process_queue` tasks when new
+/// foresters register mid-epoch. Only one task performs the on-chain
+/// `finalize_registration` tx; others wait for it to complete.
+#[derive(Debug)]
+pub(crate) struct RegistrationTracker {
+    cached_registered_weight: AtomicU64,
+    refinalize_in_progress: AtomicBool,
+    refinalized: tokio::sync::Notify,
+}
+
+impl RegistrationTracker {
+    fn new(weight: u64) -> Self {
+        Self {
+            cached_registered_weight: AtomicU64::new(weight),
+            refinalize_in_progress: AtomicBool::new(false),
+            refinalized: tokio::sync::Notify::new(),
+        }
+    }
+
+    fn cached_weight(&self) -> u64 {
+        self.cached_registered_weight.load(Ordering::Acquire)
+    }
+
+    /// Returns `true` if this caller won the race to perform re-finalization.
+    fn try_claim_refinalize(&self) -> bool {
+        self.refinalize_in_progress
+            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    }
+
+    /// Called by the winner after the on-chain tx succeeds.
+    fn complete_refinalize(&self, new_weight: u64) {
+        self.cached_registered_weight
+            .store(new_weight, Ordering::Release);
+        self.refinalize_in_progress.store(false, Ordering::Release);
+        self.refinalized.notify_waiters();
+    }
+
+    /// Called by non-winners to block until re-finalization is done.
+    async fn wait_for_refinalize(&self) {
+        if !self.refinalize_in_progress.load(Ordering::Acquire) {
+            return;
+        }
+        let fut = self.refinalized.notified();
+        if !self.refinalize_in_progress.load(Ordering::Acquire) {
+            return;
+        }
+        fut.await;
+    }
+}
+
 /// Timing for a single circuit type (circuit inputs + proof generation)
 #[derive(Copy, Clone, Debug, Default)]
 pub struct CircuitMetrics {
@@ -225,6 +276,8 @@ pub struct EpochManager {
     address_lookup_tables: Arc>,
     heartbeat: Arc,
     run_id: Arc,
+    /// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch
+    registration_trackers: Arc<DashMap<u64, Arc<RegistrationTracker>>>,
 }
 
 impl Clone for EpochManager {
@@ -255,6 +308,7 @@
             address_lookup_tables: self.address_lookup_tables.clone(),
             heartbeat: self.heartbeat.clone(),
             run_id: self.run_id.clone(),
+            registration_trackers: self.registration_trackers.clone(),
         }
     }
 }
@@ -305,6 +359,7 @@
             address_lookup_tables,
             heartbeat,
             run_id: Arc::::from(run_id),
+            registration_trackers: Arc::new(DashMap::new()),
         })
     }
 
@@ -595,6 +650,16 @@
         epoch_info.trees.push(tree_schedule.clone());
 
         let self_clone = Arc::new(self.clone());
+        let tracker = self
+            .registration_trackers
+            .entry(current_epoch)
+            .or_insert_with(|| {
+                Arc::new(RegistrationTracker::new(
+                    epoch_info.epoch_pda.registered_weight,
+                ))
+            })
+            .value()
+            .clone();
 
         info!(
             event = "new_tree_processing_task_spawned",
@@ -608,8 +673,9 @@
             if let Err(e) = self_clone
                 .process_queue(
                     &epoch_info.epoch,
-                    &epoch_info.forester_epoch_pda,
+                    epoch_info.forester_epoch_pda.clone(),
                     tree_schedule,
+                    tracker,
                 )
                 .await
             {
@@ -978,31 +1044,6 @@
            .await
            {
                Ok(info) => info,
-               Err(ForesterError::Registration(
-                   RegistrationError::RegistrationPhaseEnded {
-                       epoch: failed_epoch,
-                       current_slot,
-                       registration_end,
-                   },
-               )) => {
-                   let next_epoch = failed_epoch + 1;
-                   let next_phases = get_epoch_phases(&self.protocol_config, next_epoch);
-                   let slots_to_wait =
-                       next_phases.registration.start.saturating_sub(current_slot);
-
-                   info!(
-                       event = "registration_window_missed",
-                       run_id = %self.run_id,
-                       failed_epoch,
-                       registration_end_slot = registration_end,
-                       current_slot,
-                       next_epoch,
-                       next_registration_start_slot = next_phases.registration.start,
-                       slots_to_wait,
-                       "Too late to register for requested epoch; next epoch will be used"
-                   );
-                   return Ok(());
-               }
                Err(e) => return Err(e.into()),
            }
        }
@@ -1044,6 +1085,19 @@
        // TODO: implement
        // self.claim(&registration_info).await?;
 
+       // Clean up per-epoch state now that this epoch is complete.
+       // In-flight tasks still hold their own Arc clones, so removal is safe.
+       self.registration_trackers.remove(&epoch);
+       self.processing_epochs.remove(&epoch);
+       self.processed_items_per_epoch_count
+           .lock()
+           .await
+           .remove(&epoch);
+       self.processing_metrics_per_epoch
+           .lock()
+           .await
+           .remove(&epoch);
+
        info!(
            event = "process_epoch_completed",
            run_id = %self.run_id,
@@ -1476,7 +1530,16 @@
        );
 
        let self_arc = Arc::new(self.clone());
-       let epoch_info_arc = Arc::new(epoch_info.clone());
+       let registration_tracker = self
+           .registration_trackers
+           .entry(epoch_info.epoch.epoch)
+           .or_insert_with(|| {
+               Arc::new(RegistrationTracker::new(
+                   epoch_info.epoch_pda.registered_weight,
+               ))
+           })
+           .value()
+           .clone();
 
        let mut handles: Vec>> =
            Vec::with_capacity(trees_to_process.len());
@@ -1491,15 +1554,13 @@
            self.heartbeat.add_tree_tasks_spawned(1);
 
            let self_clone = self_arc.clone();
-           let epoch_info_clone = epoch_info_arc.clone();
+           let epoch_clone = epoch_info.epoch.clone();
+           let forester_epoch_pda = epoch_info.forester_epoch_pda.clone();
+           let tracker = registration_tracker.clone();
 
            let handle = tokio::spawn(async move {
                self_clone
-                   .process_queue(
-                       &epoch_info_clone.epoch,
-                       &epoch_info_clone.forester_epoch_pda,
-                       tree,
-                   )
+                   .process_queue(&epoch_clone, forester_epoch_pda, tree, tracker)
                    .await
            });
 
@@ -1559,15 +1620,16 @@
 
    #[instrument(
        level = "debug",
-       skip(self, epoch_info, epoch_pda, tree_schedule),
+       skip(self, epoch_info, forester_epoch_pda, tree_schedule, registration_tracker),
        fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch, tree = %tree_schedule.tree_accounts.merkle_tree)
    )]
-   pub async fn process_queue(
+   pub(crate) async fn process_queue(
        &self,
        epoch_info: &Epoch,
-       epoch_pda: &ForesterEpochPda,
+       mut forester_epoch_pda: ForesterEpochPda,
        mut tree_schedule: TreeForesterSchedule,
+       registration_tracker: Arc<RegistrationTracker>,
    ) -> Result<()> {
        self.heartbeat.increment_queue_started();
 
        let mut current_slot = self.slot_tracker.estimated_current_slot();
@@ -1588,6 +1650,9 @@
            "Processing queue for tree"
        );
 
+       let mut last_weight_check = Instant::now();
+       const WEIGHT_CHECK_INTERVAL: Duration = Duration::from_secs(30);
+
        'outer_slot_loop: while current_slot < epoch_info.phases.active.end {
            let next_slot_to_process = tree_schedule
                .slots
@@ -1600,7 +1665,7 @@
                    TreeType::StateV1 | TreeType::AddressV1 | TreeType::Unknown => {
                        self.process_light_slot(
                            epoch_info,
-                           epoch_pda,
+                           &forester_epoch_pda,
                            &tree_schedule.tree_accounts,
                            &light_slot_details,
                        )
@@ -1612,7 +1677,7 @@
                            .unwrap_or(light_slot_details.end_solana_slot);
                        self.process_light_slot_v2(
                            epoch_info,
-                           epoch_pda,
+                           &forester_epoch_pda,
                            &tree_schedule.tree_accounts,
                            &light_slot_details,
                            consecutive_end,
@@ -1639,6 +1704,27 @@
                    }
                }
                tree_schedule.slots[slot_idx] = None;
+
+               // Periodically check if new foresters registered and re-finalize
+               if last_weight_check.elapsed() >= WEIGHT_CHECK_INTERVAL {
+                   last_weight_check = Instant::now();
+                   if let Err(e) = self
+                       .maybe_refinalize(
+                           epoch_info,
+                           &mut forester_epoch_pda,
+                           &mut tree_schedule,
+                           &registration_tracker,
+                       )
+                       .await
+                   {
+                       warn!(
+                           event = "refinalize_check_failed",
+                           run_id = %self.run_id,
+                           error = ?e,
+                           "Failed to check/perform re-finalization"
+                       );
+                   }
+               }
            } else {
                debug!(
                    event = "process_queue_no_eligible_slots",
@@ -1662,6 +1748,124 @@
        }
        Ok(())
    }
+   /// Check if `EpochPda.registered_weight` changed on-chain. If so,
+   /// one task sends a `finalize_registration` tx while others wait,
+   /// then all tasks refresh their `ForesterEpochPda` and recompute schedules.
+   async fn maybe_refinalize(
+       &self,
+       epoch_info: &Epoch,
+       forester_epoch_pda: &mut ForesterEpochPda,
+       tree_schedule: &mut TreeForesterSchedule,
+       registration_tracker: &RegistrationTracker,
+   ) -> Result<()> {
+       let mut rpc = self.rpc_pool.get_connection().await?;
+       let epoch_pda_address = get_epoch_pda_address(epoch_info.epoch);
+       let on_chain_epoch_pda: EpochPda = rpc
+           .get_anchor_account::<EpochPda>(&epoch_pda_address)
+           .await?
+           .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
+
+       let on_chain_weight = on_chain_epoch_pda.registered_weight;
+       let cached_weight = registration_tracker.cached_weight();
+
+       if on_chain_weight == cached_weight {
+           return Ok(());
+       }
+
+       info!(
+           event = "registered_weight_changed",
+           run_id = %self.run_id,
+           epoch = epoch_info.epoch,
+           old_weight = cached_weight,
+           new_weight = on_chain_weight,
+           "Detected new forester registration, re-finalizing"
+       );
+
+       if registration_tracker.try_claim_refinalize() {
+           // This task sends the finalize_registration tx
+           let ix = create_finalize_registration_instruction(
+               &self.config.payer_keypair.pubkey(),
+               &self.config.derivation_pubkey,
+               epoch_info.epoch,
+           );
+           match rpc
+               .create_and_send_transaction(
+                   &[ix],
+                   &self.config.payer_keypair.pubkey(),
+                   &[&self.config.payer_keypair],
+               )
+               .await
+           {
+               Ok(_) => {
+                   // Re-fetch EpochPda after finalize to get authoritative
+                   // post-finalize weight (another forester may have registered
+                   // between our initial read and the finalize tx).
+                   let post_finalize_pda: EpochPda = rpc
+                       .get_anchor_account::<EpochPda>(&epoch_pda_address)
+                       .await?
+                       .ok_or_else(|| {
+                           anyhow!("EpochPda not found for epoch {}", epoch_info.epoch)
+                       })?;
+                   let post_finalize_weight = post_finalize_pda.registered_weight;
+                   info!(
+                       event = "refinalize_registration_success",
+                       run_id = %self.run_id,
+                       epoch = epoch_info.epoch,
+                       new_weight = post_finalize_weight,
+                       "Re-finalized registration on-chain"
+                   );
+                   registration_tracker.complete_refinalize(post_finalize_weight);
+               }
+               Err(e) => {
+                   // Release the claim so a future check can retry
+                   registration_tracker.complete_refinalize(cached_weight);
+                   return Err(e.into());
+               }
+           }
+       } else {
+           // Another task is already re-finalizing; wait for it
+           registration_tracker.wait_for_refinalize().await;
+       }
+
+       // All tasks: re-fetch both PDAs to get post-finalize on-chain state
+       // and recompute schedule.
+       let refreshed_epoch_pda: EpochPda = rpc
+           .get_anchor_account::<EpochPda>(&epoch_pda_address)
+           .await?
+           .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
+       let updated_pda: ForesterEpochPda = rpc
+           .get_anchor_account::<ForesterEpochPda>(&epoch_info.forester_epoch_pda)
+           .await?
+           .ok_or_else(|| {
+               anyhow!(
+                   "ForesterEpochPda not found at {} after re-finalization",
+                   epoch_info.forester_epoch_pda
+               )
+           })?;
+
+       let current_slot = self.slot_tracker.estimated_current_slot();
+       let new_schedule = TreeForesterSchedule::new_with_schedule(
+           &tree_schedule.tree_accounts,
+           current_slot,
+           &updated_pda,
+           &refreshed_epoch_pda,
+       )?;
+
+       *forester_epoch_pda = updated_pda;
+       *tree_schedule = new_schedule;
+
+       info!(
+           event = "schedule_recomputed_after_refinalize",
+           run_id = %self.run_id,
+           epoch = epoch_info.epoch,
+           tree = %tree_schedule.tree_accounts.merkle_tree,
+           new_eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(),
+           "Recomputed schedule after re-finalization"
+       );
+
+       Ok(())
+   }
+
    #[instrument(
        level = "debug",
        skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details),
diff --git a/forester/src/errors.rs b/forester/src/errors.rs
index f88c6e1f21..a83205f1d7 100644
--- a/forester/src/errors.rs
+++ b/forester/src/errors.rs
@@ -74,13 +74,6 @@ pub enum RegistrationError {
        registration_start: u64,
    },
 
-   #[error("Too late to register for epoch {epoch}. Current slot: {current_slot}, Registration end: {registration_end}")]
-   RegistrationPhaseEnded {
-       epoch: u64,
-       current_slot: u64,
-       registration_end: u64,
-   },
-
    #[error("Cannot finalize registration for epoch {epoch}. Current slot: {current_slot}, active phase ended: {active_phase_end_slot}")]
    FinalizeRegistrationPhaseEnded {
        epoch: u64,
diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs
index d133fb1af9..80c4539075 100644
--- a/forester/src/forester_status.rs
+++ b/forester/src/forester_status.rs
@@ -162,20 +162,21 @@ pub async fn get_forester_status_with_options(
        .config
        .get_current_active_epoch_progress(slot);
    let active_phase_length = protocol_config_pda.config.active_phase_length;
-   let registration_phase_length = protocol_config_pda.config.registration_phase_length;
    let active_epoch_progress_percentage =
        active_epoch_progress as f64 / active_phase_length as f64 * 100f64;
 
    let hours_until_next_epoch =
        active_phase_length.saturating_sub(active_epoch_progress) * 460 / 1000 / 3600;
 
-   // Determine if registration is currently open
+   // Registration is relaxed: foresters can register at any time during the epoch,
+   // so registration is always open for the latest register epoch.
    let registration_is_open = protocol_config_pda
        .config
        .is_registration_phase(slot)
        .is_ok();
 
    // If registration is closed, show the next epoch as the registration target
+   let registration_phase_length = protocol_config_pda.config.registration_phase_length;
    let current_registration_epoch = if registration_is_open {
        latest_register_epoch
    } else {
diff --git a/forester/src/processor/v1/send_transaction.rs b/forester/src/processor/v1/send_transaction.rs
index 9fb015e09e..f30d295943 100644
--- a/forester/src/processor/v1/send_transaction.rs
+++ b/forester/src/processor/v1/send_transaction.rs
@@ -124,7 +124,7 @@ pub async fn send_batched_transactions
            match rpc.get_latest_blockhash().await {
                Ok((new_hash, new_height)) => {
                    recent_blockhash = new_hash;
-                   last_valid_block_height = new_height + 150;
+                   last_valid_block_height = new_height;
                    last_blockhash_refresh = Instant::now();
                    debug!(tree = %tree_accounts.merkle_tree, "Refreshed blockhash");
                }
@@ -242,7 +242,7 @@ async fn prepare_batch_prerequisites(
            error!(tree = %tree_id_str, "Failed to get latest blockhash: {:?}", e);
            ForesterError::Rpc(e)
        })?;
-       (r_blockhash.0, r_blockhash.1 + 150)
+       (r_blockhash.0, r_blockhash.1)
    };
 
    let priority_fee = if config.build_transaction_batch_config.enable_priority_fees {
diff --git a/forester/src/queue_helpers.rs b/forester/src/queue_helpers.rs
index ba80afc166..f4a7dac704 100644
--- a/forester/src/queue_helpers.rs
+++ b/forester/src/queue_helpers.rs
@@ -381,10 +381,12 @@ pub async fn print_state_v2_input_queue_info(
    for detail in batch_details {
        println!(" {}", detail);
    }
-   println!(
-       " Total pending NULLIFY operations: {}",
+   let pending_ops = if parsed.zkp_batch_size > 0 {
        total_unprocessed / parsed.zkp_batch_size
-   );
+   } else {
+       0
+   };
+   println!(" Total pending NULLIFY operations: {}", pending_ops);
 
    Ok(QueueLengthAndCapacity {
        length: total_unprocessed,
diff --git a/program-tests/registry-test/tests/test_late_registration_refinalize.rs b/program-tests/registry-test/tests/test_late_registration_refinalize.rs
new file mode 100644
index 0000000000..775bec8cd4
--- /dev/null
+++ b/program-tests/registry-test/tests/test_late_registration_refinalize.rs
@@ -0,0 +1,196 @@
+use forester_utils::forester_epoch::Epoch;
+use light_program_test::{
+    program_test::{LightProgramTest, TestRpc},
+    ProgramTestConfig,
+};
+use light_registry::{
+    protocol_config::state::{ProtocolConfig, ProtocolConfigPda},
+    sdk::create_finalize_registration_instruction,
+    utils::get_epoch_pda_address,
+    EpochPda, ForesterConfig, ForesterEpochPda,
+};
+use light_test_utils::{register_test_forester, Rpc};
+use serial_test::serial;
+use solana_sdk::{signature::Keypair, signer::Signer};
+
+/// Test that a forester registering after finalization can be included
+/// via re-finalization. Two foresters:
+/// - Forester A registers early, finalizes at active phase start
+/// - Forester B registers during active phase (late)
+/// - Both re-finalize → total_epoch_weight reflects both
+#[serial]
+#[tokio::test]
+async fn test_late_registration_refinalize() {
+    let config = ProgramTestConfig {
+        protocol_config: ProtocolConfig::default(),
+        with_prover: false,
+        with_forester: false,
+        ..Default::default()
+    };
+    let mut rpc = LightProgramTest::new(config).await.unwrap();
+    rpc.indexer = None;
+    let env = rpc.test_accounts.clone();
+
+    let protocol_config = rpc
+        .get_anchor_account::<ProtocolConfigPda>(&env.protocol.governance_authority_pda)
+        .await
+        .unwrap()
+        .unwrap()
+        .config;
+
+    // --- Create and register forester A ---
+    let forester_a = Keypair::new();
+    rpc.airdrop_lamports(&forester_a.pubkey(), 10_000_000_000)
+        .await
+        .unwrap();
+    register_test_forester(
+        &mut rpc,
+        &env.protocol.governance_authority,
+        &forester_a.pubkey(),
+        ForesterConfig { fee: 1 },
+    )
+    .await
+    .unwrap();
+
+    // Register forester A for epoch 0 (during registration phase).
+    let epoch_a = Epoch::register(
+        &mut rpc,
+        &protocol_config,
+        &forester_a,
+        &forester_a.pubkey(),
+        Some(0),
+    )
+    .await
+    .unwrap()
+    .expect("Forester A should register for epoch 0");
+
+    // Verify epoch weight = 1 (only A).
+    let epoch_pda_pubkey = get_epoch_pda_address(0);
+    let epoch_pda: EpochPda = rpc
+        .get_anchor_account(&epoch_pda_pubkey)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(epoch_pda.registered_weight, 1, "Only forester A registered");
+
+    // --- Warp to active phase and finalize A ---
+    rpc.warp_to_slot(protocol_config.registration_phase_length + protocol_config.genesis_slot)
+        .unwrap();
+
+    let ix_a =
+        create_finalize_registration_instruction(&forester_a.pubkey(), &forester_a.pubkey(), 0);
+    rpc.create_and_send_transaction(&[ix_a], &forester_a.pubkey(), &[&forester_a])
+        .await
+        .unwrap();
+
+    // Verify A's total_epoch_weight = 1 (snapshot of registered_weight when only A was registered).
+    let forester_a_pda: ForesterEpochPda = rpc
+        .get_anchor_account(&epoch_a.forester_epoch_pda)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(
+        forester_a_pda.total_epoch_weight,
+        Some(1),
+        "After first finalize, total_epoch_weight should be 1"
+    );
+    assert_eq!(forester_a_pda.finalize_counter, 1);
+
+    // --- Register forester B late (during active phase) ---
+    let forester_b = Keypair::new();
+    rpc.airdrop_lamports(&forester_b.pubkey(), 10_000_000_000)
+        .await
+        .unwrap();
+    register_test_forester(
+        &mut rpc,
+        &env.protocol.governance_authority,
+        &forester_b.pubkey(),
+        ForesterConfig { fee: 1 },
+    )
+    .await
+    .unwrap();
+
+    // Warp a few more slots into active phase.
+    rpc.warp_to_slot(protocol_config.registration_phase_length + protocol_config.genesis_slot + 50)
+        .unwrap();
+
+    let epoch_b = Epoch::register(
+        &mut rpc,
+        &protocol_config,
+        &forester_b,
+        &forester_b.pubkey(),
+        Some(0),
+    )
+    .await
+    .unwrap()
+    .expect("Forester B should register for epoch 0 during active phase");
+
+    // Verify epoch weight = 2 (both A and B).
+    let epoch_pda: EpochPda = rpc
+        .get_anchor_account(&epoch_pda_pubkey)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(epoch_pda.registered_weight, 2, "Both foresters registered");
+
+    // B has not finalized yet.
+    let forester_b_pda: ForesterEpochPda = rpc
+        .get_anchor_account(&epoch_b.forester_epoch_pda)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(
+        forester_b_pda.total_epoch_weight, None,
+        "Forester B has not finalized yet"
+    );
+
+    // --- Re-finalize both foresters ---
+    // Forester A re-finalizes (updates snapshot to include B).
+    let ix_a2 =
+        create_finalize_registration_instruction(&forester_a.pubkey(), &forester_a.pubkey(), 0);
+    rpc.create_and_send_transaction(&[ix_a2], &forester_a.pubkey(), &[&forester_a])
+        .await
+        .unwrap();
+
+    let forester_a_pda: ForesterEpochPda = rpc
+        .get_anchor_account(&epoch_a.forester_epoch_pda)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(
+        forester_a_pda.total_epoch_weight,
+        Some(2),
+        "After re-finalize, A's total_epoch_weight should be 2"
+    );
+    assert_eq!(forester_a_pda.finalize_counter, 2, "A finalized twice");
+
+    // Forester B finalizes for the first time.
+    let ix_b =
+        create_finalize_registration_instruction(&forester_b.pubkey(), &forester_b.pubkey(), 0);
+    rpc.create_and_send_transaction(&[ix_b], &forester_b.pubkey(), &[&forester_b])
+        .await
+        .unwrap();
+
+    let forester_b_pda: ForesterEpochPda = rpc
+        .get_anchor_account(&epoch_b.forester_epoch_pda)
+        .await
+        .unwrap()
+        .unwrap();
+    assert_eq!(
+        forester_b_pda.total_epoch_weight,
+        Some(2),
+        "B's total_epoch_weight should also be 2"
+    );
+    assert_eq!(forester_b_pda.finalize_counter, 1);
+
+    // Both foresters now agree on total_epoch_weight = 2.
+    assert_eq!(
+        forester_a_pda.total_epoch_weight, forester_b_pda.total_epoch_weight,
+        "Both foresters should have the same total_epoch_weight"
+    );
+
+    // Verify both foresters have correct indices:
+    // A registered first → index 0, B registered second → index 1.
+    assert_eq!(forester_a_pda.forester_index, 0);
+    assert_eq!(forester_b_pda.forester_index, 1);
+}
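
Note on the coordination pattern introduced in epoch_manager.rs: `RegistrationTracker` is a claim-and-notify gate. Each `process_queue` task compares its cached weight against the on-chain `EpochPda.registered_weight`; exactly one task wins the `compare_exchange` and sends the `finalize_registration` transaction, while the others park on a `tokio::sync::Notify` until the winner publishes the new weight. The following is a minimal, self-contained sketch of that pattern only — the task count, the simulated on-chain weight, and all names are invented for illustration, and the actual re-finalization is replaced by a sleep; it is not forester code.

use std::sync::{
    atomic::{AtomicBool, AtomicU64, Ordering},
    Arc,
};
use tokio::sync::Notify;

struct Tracker {
    cached_weight: AtomicU64,
    in_progress: AtomicBool,
    done: Notify,
}

#[tokio::main]
async fn main() {
    let tracker = Arc::new(Tracker {
        cached_weight: AtomicU64::new(1),
        in_progress: AtomicBool::new(false),
        done: Notify::new(),
    });
    let on_chain_weight = 2u64; // pretend a new forester registered mid-epoch

    let mut handles = Vec::new();
    for task_id in 0..4u32 {
        let t = tracker.clone();
        handles.push(tokio::spawn(async move {
            if on_chain_weight == t.cached_weight.load(Ordering::Acquire) {
                return; // nothing changed, keep processing the queue
            }
            if t.in_progress
                .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // Winner: perform the (simulated) re-finalization, publish the
                // new weight, then wake every waiting task.
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                t.cached_weight.store(on_chain_weight, Ordering::Release);
                t.in_progress.store(false, Ordering::Release);
                t.done.notify_waiters();
                println!("task {} re-finalized", task_id);
            } else {
                // Loser: register interest first, then re-check the flag, so a
                // notify_waiters() issued between the two steps is not missed.
                let notified = t.done.notified();
                if t.in_progress.load(Ordering::Acquire) {
                    notified.await;
                }
                println!("task {} observed the re-finalization", task_id);
            }
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
    assert_eq!(tracker.cached_weight.load(Ordering::Acquire), 2);
}

The loser path mirrors `wait_for_refinalize`: creating the `notified()` future before the second flag check is what prevents a lost wakeup when the winner finishes in between.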
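
On the send_transaction.rs change: Solana's getLatestBlockhash RPC already reports the last block height at which the returned blockhash is valid, so adding a further 150 on top double-counts the roughly 150-block validity window and can make an already-expired blockhash look usable. Assuming the pool's `get_latest_blockhash` passes that value through unchanged, the expiry check it feeds reduces to a plain comparison; the helper below is an illustrative sketch, not a function from the codebase.

/// `last_valid_block_height` comes straight from getLatestBlockhash; it already
/// includes the validity window, so no extra margin is added on top of it.
fn blockhash_expired(current_block_height: u64, last_valid_block_height: u64) -> bool {
    current_block_height > last_valid_block_height
}

fn main() {
    assert!(!blockhash_expired(1_150, 1_150)); // still usable at the boundary
    assert!(blockhash_expired(1_151, 1_150)); // one block later it has expired
}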