267 changes: 229 additions & 38 deletions forester/src/epoch_manager.rs
@@ -93,6 +93,57 @@ type AddressBatchProcessorMap<R> =
Arc<DashMap<Pubkey, (u64, Arc<Mutex<QueueProcessor<R, AddressTreeStrategy>>>)>>;
type ProcessorInitLockMap = Arc<DashMap<Pubkey, Arc<Mutex<()>>>>;

/// Coordinates re-finalization across parallel `process_queue` tasks when new
/// foresters register mid-epoch. Only one task performs the on-chain
/// `finalize_registration` tx; others wait for it to complete.
#[derive(Debug)]
pub(crate) struct RegistrationTracker {
cached_registered_weight: AtomicU64,
refinalize_in_progress: AtomicBool,
refinalized: tokio::sync::Notify,
}

impl RegistrationTracker {
fn new(weight: u64) -> Self {
Self {
cached_registered_weight: AtomicU64::new(weight),
refinalize_in_progress: AtomicBool::new(false),
refinalized: tokio::sync::Notify::new(),
}
}

fn cached_weight(&self) -> u64 {
self.cached_registered_weight.load(Ordering::Acquire)
}

/// Returns `true` if this caller won the race to perform re-finalization.
fn try_claim_refinalize(&self) -> bool {
self.refinalize_in_progress
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
}

/// Called by the winner after the on-chain tx succeeds.
fn complete_refinalize(&self, new_weight: u64) {
self.cached_registered_weight
.store(new_weight, Ordering::Release);
self.refinalize_in_progress.store(false, Ordering::Release);
self.refinalized.notify_waiters();
}

/// Called by non-winners to block until re-finalization is done.
async fn wait_for_refinalize(&self) {
if !self.refinalize_in_progress.load(Ordering::Acquire) {
return;
}
let fut = self.refinalized.notified();
if !self.refinalize_in_progress.load(Ordering::Acquire) {
return;
}
fut.await;
}
}
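A minimal usage sketch of the tracker above (not part of the diff; it would only compile from inside this module, since the methods are private): several tasks race on try_claim_refinalize, the single winner stands in for the finalize_registration transaction with a sleep and then publishes the new weight, and the non-winners block in wait_for_refinalize before reading the refreshed cached weight.

// Sketch only: simulates three parallel process_queue tasks reacting to a
// registered-weight change; the sleep stands in for the on-chain tx.
async fn refinalize_demo() {
    let tracker = std::sync::Arc::new(RegistrationTracker::new(100));
    let mut handles = Vec::new();
    for task_id in 0..3 {
        let tracker = tracker.clone();
        handles.push(tokio::spawn(async move {
            if tracker.try_claim_refinalize() {
                // Winner: perform the (simulated) re-finalization, then publish.
                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                tracker.complete_refinalize(150);
                println!("task {task_id}: re-finalized");
            } else {
                // Non-winners: block until the winner stores the new weight.
                tracker.wait_for_refinalize().await;
                println!("task {task_id}: weight now {}", tracker.cached_weight());
            }
        }));
    }
    for handle in handles {
        handle.await.expect("task panicked");
    }
}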

/// Timing for a single circuit type (circuit inputs + proof generation)
#[derive(Copy, Clone, Debug, Default)]
pub struct CircuitMetrics {
@@ -225,6 +276,8 @@ pub struct EpochManager<R: Rpc + Indexer> {
address_lookup_tables: Arc<Vec<AddressLookupTableAccount>>,
heartbeat: Arc<ServiceHeartbeat>,
run_id: Arc<str>,
/// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch
registration_trackers: Arc<DashMap<u64, Arc<RegistrationTracker>>>,
Comment thread (Contributor):

registration_trackers entries are inserted per epoch but never removed. The same applies to processing_epochs (and the processed_items_per_epoch_count / processing_metrics_per_epoch HashMaps). Over long-running deployments this means unbounded memory growth.

Consider cleaning up after the epoch completes, e.g. at the end of process_epoch:

self.registration_trackers.remove(&epoch);
self.processing_epochs.remove(&epoch);
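
A small sketch of the cleanup this comment suggests (names are taken from the comment; the value types are simplified stand-ins, and treating processing_epochs as a DashMap keyed by epoch is an assumption):

// Sketch only: per-epoch maps keyed by epoch number, dropped once the epoch
// has fully completed so entries do not accumulate across epochs.
struct PerEpochState {
    registration_trackers: dashmap::DashMap<u64, std::sync::Arc<RegistrationTracker>>,
    processing_epochs: dashmap::DashMap<u64, ()>,
}

impl PerEpochState {
    // Call at the end of process_epoch; DashMap::remove is a no-op when the
    // epoch was never inserted, so it is safe to call unconditionally.
    fn cleanup_epoch(&self, epoch: u64) {
        self.registration_trackers.remove(&epoch);
        self.processing_epochs.remove(&epoch);
    }
}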

}

impl<R: Rpc + Indexer> Clone for EpochManager<R> {
@@ -255,6 +308,7 @@ impl<R: Rpc + Indexer> Clone for EpochManager<R> {
address_lookup_tables: self.address_lookup_tables.clone(),
heartbeat: self.heartbeat.clone(),
run_id: self.run_id.clone(),
registration_trackers: self.registration_trackers.clone(),
}
}
}
@@ -305,6 +359,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
address_lookup_tables,
heartbeat,
run_id: Arc::<str>::from(run_id),
registration_trackers: Arc::new(DashMap::new()),
})
}

@@ -595,6 +650,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
epoch_info.trees.push(tree_schedule.clone());

let self_clone = Arc::new(self.clone());
let tracker = self
.registration_trackers
.entry(current_epoch)
.or_insert_with(|| {
Arc::new(RegistrationTracker::new(
epoch_info.epoch_pda.registered_weight,
))
})
.value()
.clone();

info!(
event = "new_tree_processing_task_spawned",
@@ -608,8 +673,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
if let Err(e) = self_clone
.process_queue(
&epoch_info.epoch,
&epoch_info.forester_epoch_pda,
epoch_info.forester_epoch_pda.clone(),
tree_schedule,
tracker,
)
.await
{
@@ -978,31 +1044,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
.await
{
Ok(info) => info,
Err(ForesterError::Registration(
RegistrationError::RegistrationPhaseEnded {
epoch: failed_epoch,
current_slot,
registration_end,
},
)) => {
let next_epoch = failed_epoch + 1;
let next_phases = get_epoch_phases(&self.protocol_config, next_epoch);
let slots_to_wait =
next_phases.registration.start.saturating_sub(current_slot);

info!(
event = "registration_window_missed",
run_id = %self.run_id,
failed_epoch,
registration_end_slot = registration_end,
current_slot,
next_epoch,
next_registration_start_slot = next_phases.registration.start,
slots_to_wait,
"Too late to register for requested epoch; next epoch will be used"
);
return Ok(());
}
Err(e) => return Err(e.into()),
}
}
@@ -1476,7 +1517,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {
);

let self_arc = Arc::new(self.clone());
let epoch_info_arc = Arc::new(epoch_info.clone());
let registration_tracker = self
.registration_trackers
.entry(epoch_info.epoch.epoch)
.or_insert_with(|| {
Arc::new(RegistrationTracker::new(
epoch_info.epoch_pda.registered_weight,
))
})
.value()
.clone();

let mut handles: Vec<JoinHandle<Result<()>>> = Vec::with_capacity(trees_to_process.len());

@@ -1491,15 +1541,13 @@ impl<R: Rpc + Indexer> EpochManager<R> {
self.heartbeat.add_tree_tasks_spawned(1);

let self_clone = self_arc.clone();
let epoch_info_clone = epoch_info_arc.clone();
let epoch_clone = epoch_info.epoch.clone();
let forester_epoch_pda = epoch_info.forester_epoch_pda.clone();
let tracker = registration_tracker.clone();

let handle = tokio::spawn(async move {
self_clone
.process_queue(
&epoch_info_clone.epoch,
&epoch_info_clone.forester_epoch_pda,
tree,
)
.process_queue(&epoch_clone, forester_epoch_pda, tree, tracker)
.await
});

@@ -1559,15 +1607,16 @@ impl<R: Rpc + Indexer> EpochManager<R> {

#[instrument(
level = "debug",
skip(self, epoch_info, epoch_pda, tree_schedule),
skip(self, epoch_info, forester_epoch_pda, tree_schedule, registration_tracker),
fields(forester = %self.config.payer_keypair.pubkey(), epoch = epoch_info.epoch,
tree = %tree_schedule.tree_accounts.merkle_tree)
)]
pub async fn process_queue(
pub(crate) async fn process_queue(
&self,
epoch_info: &Epoch,
epoch_pda: &ForesterEpochPda,
mut forester_epoch_pda: ForesterEpochPda,
mut tree_schedule: TreeForesterSchedule,
registration_tracker: Arc<RegistrationTracker>,
) -> Result<()> {
self.heartbeat.increment_queue_started();
let mut current_slot = self.slot_tracker.estimated_current_slot();
@@ -1588,6 +1637,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
"Processing queue for tree"
);

let mut last_weight_check = Instant::now();
const WEIGHT_CHECK_INTERVAL: Duration = Duration::from_secs(30);

'outer_slot_loop: while current_slot < epoch_info.phases.active.end {
let next_slot_to_process = tree_schedule
.slots
@@ -1600,7 +1652,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
TreeType::StateV1 | TreeType::AddressV1 | TreeType::Unknown => {
self.process_light_slot(
epoch_info,
epoch_pda,
&forester_epoch_pda,
&tree_schedule.tree_accounts,
&light_slot_details,
)
@@ -1612,7 +1664,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
.unwrap_or(light_slot_details.end_solana_slot);
self.process_light_slot_v2(
epoch_info,
epoch_pda,
&forester_epoch_pda,
&tree_schedule.tree_accounts,
&light_slot_details,
consecutive_end,
@@ -1639,6 +1691,27 @@ impl<R: Rpc + Indexer> EpochManager<R> {
}
}
tree_schedule.slots[slot_idx] = None;

// Periodically check if new foresters registered and re-finalize
if last_weight_check.elapsed() >= WEIGHT_CHECK_INTERVAL {
last_weight_check = Instant::now();
if let Err(e) = self
.maybe_refinalize(
epoch_info,
&mut forester_epoch_pda,
&mut tree_schedule,
&registration_tracker,
)
.await
{
warn!(
event = "refinalize_check_failed",
run_id = %self.run_id,
error = ?e,
"Failed to check/perform re-finalization"
);
}
}
} else {
debug!(
event = "process_queue_no_eligible_slots",
@@ -1662,6 +1735,124 @@ impl<R: Rpc + Indexer> EpochManager<R> {
Ok(())
}

/// Check if `EpochPda.registered_weight` changed on-chain. If so,
/// one task sends a `finalize_registration` tx while others wait,
/// then all tasks refresh their `ForesterEpochPda` and recompute schedules.
async fn maybe_refinalize(
&self,
epoch_info: &Epoch,
forester_epoch_pda: &mut ForesterEpochPda,
tree_schedule: &mut TreeForesterSchedule,
registration_tracker: &RegistrationTracker,
) -> Result<()> {
let mut rpc = self.rpc_pool.get_connection().await?;
let epoch_pda_address = get_epoch_pda_address(epoch_info.epoch);
let on_chain_epoch_pda: EpochPda = rpc
.get_anchor_account::<EpochPda>(&epoch_pda_address)
.await?
.ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;

let on_chain_weight = on_chain_epoch_pda.registered_weight;
let cached_weight = registration_tracker.cached_weight();

if on_chain_weight == cached_weight {
return Ok(());
}

info!(
event = "registered_weight_changed",
run_id = %self.run_id,
epoch = epoch_info.epoch,
old_weight = cached_weight,
new_weight = on_chain_weight,
"Detected new forester registration, re-finalizing"
);

if registration_tracker.try_claim_refinalize() {
// This task sends the finalize_registration tx
let ix = create_finalize_registration_instruction(
&self.config.payer_keypair.pubkey(),
&self.config.derivation_pubkey,
epoch_info.epoch,
);
match rpc
.create_and_send_transaction(
&[ix],
&self.config.payer_keypair.pubkey(),
&[&self.config.payer_keypair],
)
.await
{
Ok(_) => {
// Re-fetch EpochPda after finalize to get authoritative
// post-finalize weight (another forester may have registered
// between our initial read and the finalize tx).
let post_finalize_pda: EpochPda = rpc
.get_anchor_account::<EpochPda>(&epoch_pda_address)
.await?
.ok_or_else(|| {
anyhow!("EpochPda not found for epoch {}", epoch_info.epoch)
})?;
let post_finalize_weight = post_finalize_pda.registered_weight;
info!(
event = "refinalize_registration_success",
run_id = %self.run_id,
epoch = epoch_info.epoch,
new_weight = post_finalize_weight,
"Re-finalized registration on-chain"
);
registration_tracker.complete_refinalize(post_finalize_weight);
}
Err(e) => {
// Release the claim so a future check can retry
registration_tracker.complete_refinalize(cached_weight);
return Err(e.into());
}
}
} else {
// Another task is already re-finalizing; wait for it
registration_tracker.wait_for_refinalize().await;
}

// All tasks: re-fetch both PDAs to get post-finalize on-chain state
// and recompute schedule.
let refreshed_epoch_pda: EpochPda = rpc
.get_anchor_account::<EpochPda>(&epoch_pda_address)
.await?
.ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
let updated_pda: ForesterEpochPda = rpc
.get_anchor_account::<ForesterEpochPda>(&epoch_info.forester_epoch_pda)
.await?
.ok_or_else(|| {
anyhow!(
"ForesterEpochPda not found at {} after re-finalization",
epoch_info.forester_epoch_pda
)
})?;

let current_slot = self.slot_tracker.estimated_current_slot();
let new_schedule = TreeForesterSchedule::new_with_schedule(
&tree_schedule.tree_accounts,
current_slot,
&updated_pda,
&refreshed_epoch_pda,
)?;

*forester_epoch_pda = updated_pda;
*tree_schedule = new_schedule;

info!(
event = "schedule_recomputed_after_refinalize",
run_id = %self.run_id,
epoch = epoch_info.epoch,
tree = %tree_schedule.tree_accounts.merkle_tree,
new_eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(),
"Recomputed schedule after re-finalization"
);

Ok(())
}

#[instrument(
level = "debug",
skip(self, epoch_info, epoch_pda, tree_accounts, forester_slot_details),
7 changes: 0 additions & 7 deletions forester/src/errors.rs
@@ -74,13 +74,6 @@ pub enum RegistrationError {
registration_start: u64,
},

#[error("Too late to register for epoch {epoch}. Current slot: {current_slot}, Registration end: {registration_end}")]
RegistrationPhaseEnded {
epoch: u64,
current_slot: u64,
registration_end: u64,
},

#[error("Cannot finalize registration for epoch {epoch}. Current slot: {current_slot}, active phase ended: {active_phase_end_slot}")]
FinalizeRegistrationPhaseEnded {
epoch: u64,