feat: refinalization for late forester registrations (#2326)
sergeytimoshin merged 4 commits into main
Conversation
📝 Walkthrough

Adds a per-epoch RegistrationTracker and integrates it into EpochManager and process_queue to coordinate on-chain re-finalization across parallel tasks, plus related adjustments: a removed registration error variant, refined registration timing, a guarded division by zero, and tightened transaction block-height handling.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Task1 as Task 1 (process_queue)
    participant Task2 as Task 2 (process_queue)
    participant RT as RegistrationTracker
    participant EM as EpochManager
    participant Chain as On-chain State

    Task1->>RT: cached_weight() (periodic)
    Task2->>RT: cached_weight() (periodic)
    Chain-->>EM: on-chain registered_weight changed (observed)
    Task1->>RT: try_claim_refinalize()
    Note over RT: only one caller succeeds
    RT-->>Task1: claim=true
    RT-->>Task2: claim=false
    Task1->>EM: maybe_refinalize()
    EM->>Chain: submit finalize_registration tx
    EM->>Chain: refresh ForesterEpochPda / EpochPda
    EM-->>Task1: updated PDA + new weight
    Task1->>RT: complete_refinalize(new_weight)
    RT-->>Task1: notify waiters
    Task2->>RT: wait_for_refinalize()
    RT-->>Task2: notified
    Task1->>Task1: recompute TreeForesterSchedule
    Task2->>Task2: recompute TreeForesterSchedule
```
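The "only one caller succeeds" step in the diagram is a classic compare-exchange claim. A minimal sketch of how such a claim can be implemented with an atomic flag — names here are illustrative, not the PR's actual fields:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-in for the claim half of RegistrationTracker.
struct ClaimFlag {
    in_progress: AtomicBool,
}

impl ClaimFlag {
    fn new() -> Self {
        Self {
            in_progress: AtomicBool::new(false),
        }
    }

    /// Returns true for exactly one of any set of concurrent callers:
    /// the compare-exchange only succeeds while the flag is still false.
    fn try_claim(&self) -> bool {
        self.in_progress
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }
}
```

The loser tasks then wait for the winner to publish the result instead of racing to submit duplicate finalize transactions.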
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~50 minutes
🚥 Pre-merge checks: ✅ 3 passed
c6e7286 to 7581a32
Actionable comments posted: 3
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@forester/src/epoch_manager.rs`:
- Around line 1798-1803: The schedule is being recomputed with a stale
epoch_pda: TreeForesterSchedule::new_with_schedule(...) consumes
epoch_pda.registered_weight but you pass the old epoch_pda variable instead of
the freshly fetched on-chain EpochPda obtained earlier after weight changes;
update the call to pass the freshly fetched EpochPda (the variable used when
reloading the epoch from chain after detecting weight changes) in place of the
old epoch_pda parameter so the schedule uses the current registered_weight.
- Around line 133-136: wait_for_refinalize can miss a notify if notify_waiters()
runs before notified().await registers; to fix, check the refinalize_in_progress
flag (the same atomic/boolean used by complete_refinalize/notify_waiters) before
creating the notify future and then again immediately after creating it: if
refinalize_in_progress is false return immediately, otherwise create let fut =
self.refinalized.notified(); re-check refinalize_in_progress and if it became
false drop fut and return, otherwise await fut; this ensures you don't block
when the notify already happened. Use the existing symbols wait_for_refinalize,
refinalized, refinalize_in_progress, and complete_refinalize/notify_waiters to
locate and implement the checks.
In `@forester/src/forester_status.rs`:
- Around line 171-176: registration_is_open currently uses
protocol_config_pda.config.is_registration_phase(slot) which enforces the old
phase-gated behavior; instead make openness reflect the relaxed-registration
rule by returning true for the latest registration epoch. Replace the
is_registration_phase(...) check used to compute registration_is_open with logic
that checks whether the slot/epoch corresponds to the config's latest
registration epoch (e.g., compare the epoch derived from slot to
protocol_config_pda.config.latest_register_epoch or equivalent), or otherwise
unconditionally mark registration open for that latest register epoch; update
any uses of registration_is_open accordingly. Ensure you reference and modify
the protocol_config_pda.config.is_registration_phase call and the
registration_is_open binding.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 03c7ed79-0c40-4515-9488-81fd323b7668
⛔ Files ignored due to path filters (2)
- external/photon is excluded by none and included by none
- program-tests/registry-test/tests/test_late_registration_refinalize.rs is excluded by none and included by none
📒 Files selected for processing (4)
- forester/src/epoch_manager.rs
- forester/src/errors.rs
- forester/src/forester_status.rs
- forester/src/processor/v1/send_transaction.rs
💤 Files with no reviewable changes (1)
- forester/src/errors.rs
Actionable comments posted: 1
♻️ Duplicate comments (1)
forester/src/epoch_manager.rs (1)
1731-1737: ⚠️ Potential issue | 🔴 Critical

Refresh EpochPda after finalize before updating cache/schedule. Line 1731 snapshots EpochPda before the finalize tx. If registration changes again before tx execution, Line 1775 caches an old weight and Line 1800 computes the schedule from a stale epoch weight. That can trigger an unnecessary extra finalize call and consume finalize_counter budget.

Proposed fix (re-read epoch PDA after finalize/wait):

```diff
-let on_chain_epoch_pda: EpochPda = rpc
+let observed_epoch_pda: EpochPda = rpc
     .get_anchor_account::<EpochPda>(&epoch_pda_address)
     .await?
     .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
-let on_chain_weight = on_chain_epoch_pda.registered_weight;
+let on_chain_weight = observed_epoch_pda.registered_weight;
 ...
 if registration_tracker.try_claim_refinalize() {
     ...
     match rpc.create_and_send_transaction(...).await {
         Ok(_) => {
+            let refreshed_epoch_pda: EpochPda = rpc
+                .get_anchor_account::<EpochPda>(&epoch_pda_address)
+                .await?
+                .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
             registration_tracker.complete_refinalize(
-                on_chain_weight
+                refreshed_epoch_pda.registered_weight
             );
         }
         Err(e) => {
             registration_tracker.complete_refinalize(cached_weight);
             return Err(e.into());
         }
     }
 } else {
     registration_tracker.wait_for_refinalize().await;
 }
+let refreshed_epoch_pda: EpochPda = rpc
+    .get_anchor_account::<EpochPda>(&epoch_pda_address)
+    .await?
+    .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
 let new_schedule = TreeForesterSchedule::new_with_schedule(
     &tree_schedule.tree_accounts,
     current_slot,
     &updated_pda,
-    &on_chain_epoch_pda,
+    &refreshed_epoch_pda,
 )?;
```

Also applies to: 1767-1776, 1799-1805
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@forester/src/epoch_manager.rs` around lines 1731 - 1737, The code reads EpochPda into on_chain_epoch_pda before submitting the finalize transaction, which can become stale; after the finalize/wait completes, re-fetch the EpochPda (using rpc.get_anchor_account::<EpochPda>(&epoch_pda_address).await?...ok_or_else(...)) and replace uses of the earlier on_chain_epoch_pda (and on_chain_weight) when updating registration_tracker.cached_weight() and when computing the new schedule so you always compare/update against the post-finalize on-chain state (apply this re-read in the spots around where on_chain_epoch_pda is first read and later where cached_weight and schedule are computed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@forester/src/epoch_manager.rs`:
- Around line 648-650: Create and use a single epoch-scoped RegistrationTracker
(Arc<RegistrationTracker>) instead of creating multiple instances: instantiate
one Arc<RegistrationTracker> from epoch_info.epoch_pda.registered_weight and
store/declare it in the surrounding epoch scope, then clone and pass that same
Arc into all tree-processing tasks (the late new-tree processing code that
currently creates a tracker at the site using RegistrationTracker::new and the
active-work batch code that creates another tracker around the active-work
loop). Replace the separate new() calls at the noted sites (including the
instances around lines creating trackers in late new-tree processing and the
ones in the active-work batch) with clones of the single epoch-scoped Arc so all
tasks coordinate on the same RegistrationTracker and avoid duplicate
finalizations.
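The single-shared-tracker pattern the comment asks for amounts to one Arc created at epoch scope and cloned into every tree-processing task. A sketch under simplified stand-in types (RegistrationTracker here is not the PR's struct, and threads stand in for the async tasks):

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Minimal stand-in for the real tracker; only the shared-weight
/// cache is modeled here.
struct RegistrationTracker {
    cached_weight: AtomicU64,
}

impl RegistrationTracker {
    fn new(weight: u64) -> Self {
        Self {
            cached_weight: AtomicU64::new(weight),
        }
    }

    fn cached_weight(&self) -> u64 {
        self.cached_weight.load(Ordering::Acquire)
    }
}

/// One tracker per epoch, cloned into every task: all tasks observe
/// and coordinate on the same instance, so only one can ever win the
/// refinalize claim.
fn spawn_tasks(registered_weight: u64, task_count: usize) -> Vec<std::thread::JoinHandle<u64>> {
    let tracker = Arc::new(RegistrationTracker::new(registered_weight));
    (0..task_count)
        .map(|_| {
            let t = Arc::clone(&tracker); // same instance everywhere
            std::thread::spawn(move || t.cached_weight())
        })
        .collect()
}
```

The fix is purely about where the constructor runs: calling RegistrationTracker::new inside each task creates disjoint trackers whose claims cannot exclude each other.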
---
Duplicate comments:
In `@forester/src/epoch_manager.rs`:
- Around line 1731-1737: The code reads EpochPda into on_chain_epoch_pda before
submitting the finalize transaction, which can become stale; after the
finalize/wait completes, re-fetch the EpochPda (using
rpc.get_anchor_account::<EpochPda>(&epoch_pda_address).await?...ok_or_else(...))
and replace uses of the earlier on_chain_epoch_pda (and on_chain_weight) when
updating registration_tracker.cached_weight() and when computing the new
schedule so you always compare/update against the post-finalize on-chain state
(apply this re-read in the spots around where on_chain_epoch_pda is first read
and later where cached_weight and schedule are computed).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: c4f91126-ebf5-41b3-ae3e-63b9266ed1f1
📒 Files selected for processing (1)
forester/src/epoch_manager.rs
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@forester/src/epoch_manager.rs`:
- Around line 1817-1843: Guard the schedule recompute so it only runs when the
on-chain finalization is confirmed: after fetching refreshed_epoch_pda and
updated_pda, check a definitive finalization condition on the PDAs (e.g., verify
updated_pda.total_epoch_weight == refreshed_epoch_pda.registered_weight or
another finalized flag) and only call TreeForesterSchedule::new_with_schedule
and overwrite forester_epoch_pda and tree_schedule when that condition is true;
otherwise skip recompute and keep existing forester_epoch_pda/tree_schedule (or
return a controlled error) to avoid rebuilding with mismatched weights.
- Around line 1520-1529: registration_trackers entries are never pruned, causing
unbounded growth; add eviction when you insert/refresh a tracker in the same
code path where you call registration_trackers.entry(epoch_info.epoch.epoch)
(referencing registration_trackers, RegistrationTracker, and
epoch_info.epoch.epoch). Implement a simple retention policy (e.g. define a
MAX_TRACKER_AGE or MAX_TRACKERS constant) and before/after inserting call
self.registration_trackers.retain(|k, _| *k >=
current_epoch.saturating_sub(MAX_TRACKER_AGE)) or remove oldest keys until size
<= MAX_TRACKERS; keep using Arc::new(...) and .clone() for values so
concurrency/ownership is preserved. Ensure the eviction logic runs cheaply and
deterministically in the same method to avoid leaving stale epoch entries.
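A sketch of the suggested retention policy, using a plain HashMap in place of DashMap (DashMap::retain has the same shape); MAX_TRACKER_AGE, track_epoch, and the unit-struct tracker are illustrative names, not the PR's code:

```rust
use std::collections::HashMap;
use std::sync::Arc;

/// Illustrative retention window: keep the current epoch and the two before it.
const MAX_TRACKER_AGE: u64 = 2;

struct RegistrationTracker; // stand-in for the real tracker

/// Inserts (or reuses) the tracker for `current_epoch`, then evicts
/// entries older than the retention window so the map stays bounded.
fn track_epoch(trackers: &mut HashMap<u64, Arc<RegistrationTracker>>, current_epoch: u64) {
    trackers
        .entry(current_epoch)
        .or_insert_with(|| Arc::new(RegistrationTracker));
    trackers.retain(|epoch, _| *epoch >= current_epoch.saturating_sub(MAX_TRACKER_AGE));
}
```

Because epochs are monotonically increasing, running the retain on every insert keeps the map at MAX_TRACKER_AGE + 1 entries without a separate sweeper task.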
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 8b44426d-5e66-4cd6-867e-fd7b3dab2469
📒 Files selected for processing (1)
forester/src/epoch_manager.rs
ananas-block
left a comment
There was a problem hiding this comment.
Code Review Findings
1. registration_trackers DashMap entries never removed (see inline comment)
registration_trackers and processing_epochs grow by one entry per epoch but entries are never removed. Over long-running deployments this is unbounded memory growth.
Consider removing entries after the epoch's report-work phase completes.
2. Pre-existing (not in this PR): connect() + has_broken() connection churn in rpc_pool.rs
When fallback mode is active and the fallback URL is unreachable but the primary is healthy, connect() falls back to the primary as the second attempt. However, has_broken() sees conn.get_url() == primary_url while is_fallback_active() is true, so bb8 immediately drops the connection. This loops until the recovery probe fires (default 30s).
Suggested fix: when connect() successfully connects to the primary URL as the fallback-of-fallback, call self.health_state.recover_primary() before returning.
3. Pre-existing (not in this PR): unguarded division by zero in queue_helpers.rs
print_state_v2_input_queue_info has total_unprocessed / parsed.zkp_batch_size without checking for zero. The sibling parse_state_v2_queue_info and parse_address_v2_queue_info functions already guard with if parsed.zkp_batch_size > 0.
```rust
heartbeat: Arc<ServiceHeartbeat>,
run_id: Arc<str>,
/// Per-epoch registration trackers to coordinate re-finalization when new foresters register mid-epoch
registration_trackers: Arc<DashMap<u64, Arc<RegistrationTracker>>>,
```
registration_trackers entries are inserted per epoch but never removed. Same applies to processing_epochs (and processed_items_per_epoch_count / processing_metrics_per_epoch HashMaps). Over long-running deployments this is unbounded memory growth.
Consider cleaning up after the epoch completes, e.g. at the end of process_epoch:
```rust
self.registration_trackers.remove(&epoch);
self.processing_epochs.remove(&epoch);
```
♻️ Duplicate comments (2)
forester/src/epoch_manager.rs (2)
1819-1852: ⚠️ Potential issue | 🟠 Major

Don't recompute schedule until finalization state is confirmed consistent on-chain. After the winner failure path (Line 1819–Line 1823), waiters still continue into recomputation (Line 1832+). That can rebuild slots from EpochPda.registered_weight while ForesterEpochPda.total_epoch_weight is still old, causing inconsistent eligibility decisions.

Suggested guard before recompute:

```diff
 let refreshed_epoch_pda: EpochPda = rpc
     .get_anchor_account::<EpochPda>(&epoch_pda_address)
     .await?
     .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;
 let updated_pda: ForesterEpochPda = rpc
     .get_anchor_account::<ForesterEpochPda>(&epoch_info.forester_epoch_pda)
     .await?
     .ok_or_else(|| {
         anyhow!(
             "ForesterEpochPda not found at {} after re-finalization",
             epoch_info.forester_epoch_pda
         )
     })?;
+
+if updated_pda.total_epoch_weight != Some(refreshed_epoch_pda.registered_weight) {
+    return Err(anyhow!(
+        "FinalizeRegistration not reflected on-chain yet for epoch {}",
+        epoch_info.epoch
+    ));
+}
```
Verify each finding against the current code and only fix it if needed. In `@forester/src/epoch_manager.rs` around lines 1819 - 1852, After waiters wake from registration_tracker.wait_for_refinalize(), do not call TreeForesterSchedule::new_with_schedule immediately; instead poll/refetch the two PDAs (refreshed_epoch_pda and updated_pda) until the on-chain finalization state is consistent (e.g., updated_pda.total_epoch_weight matches refreshed_epoch_pda.registered_weight or another explicit finalized flag) or a timeout/error occurs. Update the code around the existing rpc.get_anchor_account calls and current_slot/new_schedule creation to loop/retry fetching EpochPda and ForesterEpochPda and only construct TreeForesterSchedule::new_with_schedule once the consistency check passes, otherwise return an error after timeout.
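The poll-until-consistent loop this prompt describes might look like the sketch below. The fetch closures stand in for the two rpc.get_anchor_account calls, and all names are illustrative; a blocking sleep is used here where the async code would use tokio::time::sleep:

```rust
use std::time::{Duration, Instant};

/// Retries the two reads until the forester PDA reflects the finalized
/// epoch weight, or the deadline passes. Returns the agreed weight.
fn wait_until_consistent(
    mut fetch_epoch_weight: impl FnMut() -> u64,
    mut fetch_total_epoch_weight: impl FnMut() -> Option<u64>,
    timeout: Duration,
) -> Result<u64, String> {
    let deadline = Instant::now() + timeout;
    loop {
        let epoch_weight = fetch_epoch_weight();
        if fetch_total_epoch_weight() == Some(epoch_weight) {
            return Ok(epoch_weight);
        }
        if Instant::now() >= deadline {
            return Err("finalization not reflected on-chain before timeout".into());
        }
        std::thread::sleep(Duration::from_millis(10));
    }
}
```

Only once this returns Ok would the caller construct TreeForesterSchedule::new_with_schedule, so the schedule is never rebuilt from mismatched weights.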
1088-1099: ⚠️ Potential issue | 🟠 Major

Cleanup only runs on the success path; error exits still leak per-epoch state. Line 1088 cleanup is skipped on earlier Err returns (for example around Line 1059/Line 1063/Line 1072), so processing_epochs/registration_trackers and metric maps can still grow unbounded under recurring failures.

Suggested fix (always-run cleanup path):

```diff
-// Perform work
-if self.sync_slot().await? < phases.active.end {
-    self.perform_active_work(&registration_info).await?;
-}
-// Wait for report work phase
-if self.sync_slot().await? < phases.report_work.start {
-    self.wait_for_report_work_phase(&registration_info).await?;
-}
-
-// Always send metrics report to channel for monitoring/testing
-self.send_work_report(&registration_info).await?;
-
-if self.sync_slot().await? < phases.report_work.end {
-    self.report_work_onchain(&registration_info).await?;
-} else {
-    ...
-}
-
-self.registration_trackers.remove(&epoch);
-self.processing_epochs.remove(&epoch);
-self.processed_items_per_epoch_count.lock().await.remove(&epoch);
-self.processing_metrics_per_epoch.lock().await.remove(&epoch);
-Ok(())
+let result: Result<()> = async {
+    if self.sync_slot().await? < phases.active.end {
+        self.perform_active_work(&registration_info).await?;
+    }
+    if self.sync_slot().await? < phases.report_work.start {
+        self.wait_for_report_work_phase(&registration_info).await?;
+    }
+    self.send_work_report(&registration_info).await?;
+    if self.sync_slot().await? < phases.report_work.end {
+        self.report_work_onchain(&registration_info).await?;
+    }
+    Ok(())
+}
+.await;
+
+self.registration_trackers.remove(&epoch);
+self.processing_epochs.remove(&epoch);
+self.processed_items_per_epoch_count.lock().await.remove(&epoch);
+self.processing_metrics_per_epoch.lock().await.remove(&epoch);
+result
```
Verify each finding against the current code and only fix it if needed. In `@forester/src/epoch_manager.rs` around lines 1088 - 1099, The per-epoch cleanup (removing keys from registration_trackers, processing_epochs, processed_items_per_epoch_count, and processing_metrics_per_epoch) currently only runs on the success path and is skipped by earlier Err returns; refactor the function to guarantee cleanup always runs (e.g., move the removal logic into a single finally/cleanup section executed on all exits or wrap the main body in a scope with a Drop-style guard) so that registration_trackers.remove(&epoch), processing_epochs.remove(&epoch), processed_items_per_epoch_count.lock().await.remove(&epoch) and processing_metrics_per_epoch.lock().await.remove(&epoch) are executed even when the function returns Err. Ensure any locks are awaited and released correctly in the new cleanup path and adjust returns to occur after cleanup or rethrow the error after cleanup.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@forester/src/epoch_manager.rs`:
- Around line 1819-1852: After waiters wake from
registration_tracker.wait_for_refinalize(), do not call
TreeForesterSchedule::new_with_schedule immediately; instead poll/refetch the
two PDAs (refreshed_epoch_pda and updated_pda) until the on-chain finalization
state is consistent (e.g., updated_pda.total_epoch_weight matches
refreshed_epoch_pda.registered_weight or another explicit finalized flag) or a
timeout/error occurs. Update the code around the existing rpc.get_anchor_account
calls and current_slot/new_schedule creation to loop/retry fetching EpochPda and
ForesterEpochPda and only construct TreeForesterSchedule::new_with_schedule once
the consistency check passes, otherwise return an error after timeout.
- Around line 1088-1099: The per-epoch cleanup (removing keys from
registration_trackers, processing_epochs, processed_items_per_epoch_count, and
processing_metrics_per_epoch) currently only runs on the success path and is
skipped by earlier Err returns; refactor the function to guarantee cleanup
always runs (e.g., move the removal logic into a single finally/cleanup section
executed on all exits or wrap the main body in a scope with a Drop-style guard)
so that registration_trackers.remove(&epoch), processing_epochs.remove(&epoch),
processed_items_per_epoch_count.lock().await.remove(&epoch) and
processing_metrics_per_epoch.lock().await.remove(&epoch) are executed even when
the function returns Err. Ensure any locks are awaited and released correctly in
the new cleanup path and adjust returns to occur after cleanup or rethrow the
error after cleanup.
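For the synchronous containers (a DashMap removal does not await), the always-run cleanup can also be expressed as a Drop guard; the tokio-mutex maps still need the async-block wrapper suggested in the review, since Drop cannot await. A simplified sketch with stand-in types (a Mutex-wrapped HashSet in place of the real maps):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

/// Removes the epoch's entry on every exit path, including early `?`
/// returns and panics.
struct EpochCleanup {
    epoch: u64,
    trackers: Arc<Mutex<HashSet<u64>>>,
}

impl Drop for EpochCleanup {
    fn drop(&mut self) {
        self.trackers.lock().unwrap().remove(&self.epoch);
    }
}

fn process_epoch(epoch: u64, trackers: Arc<Mutex<HashSet<u64>>>) -> Result<(), String> {
    trackers.lock().unwrap().insert(epoch);
    let _cleanup = EpochCleanup {
        epoch,
        trackers: trackers.clone(),
    };
    // ... epoch work; the guard fires even on this error return ...
    Err("simulated failure".into())
}
```

The guard approach keeps each early return untouched, at the cost of needing sync (or lock-free) maps in the Drop impl.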
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: b6579c51-f17b-4c70-bc9e-91145faa5b07
⛔ Files ignored due to path filters (1)
- forester-utils/src/rpc_pool.rs is excluded by none and included by none
📒 Files selected for processing (2)
- forester/src/epoch_manager.rs
- forester/src/queue_helpers.rs