@@ -93,6 +93,49 @@ type AddressBatchProcessorMap<R> =
9393 Arc < DashMap < Pubkey , ( u64 , Arc < Mutex < QueueProcessor < R , AddressTreeStrategy > > > ) > > ;
9494type ProcessorInitLockMap = Arc < DashMap < Pubkey , Arc < Mutex < ( ) > > > > ;
9595
/// Coordinates re-finalization across parallel `process_queue` tasks when new
/// foresters register mid-epoch. Only one task performs the on-chain
/// `finalize_registration` tx; others wait for it to complete.
pub(crate) struct RegistrationTracker {
    // Last `EpochPda.registered_weight` this forester has observed and acted
    // on; compared against the on-chain value to detect new registrations.
    cached_registered_weight: AtomicU64,
    // True while one task holds the re-finalization critical section
    // (claimed via compare-exchange in `try_claim_refinalize`).
    refinalize_in_progress: AtomicBool,
    // Signalled by the winning task once re-finalization finishes so the
    // waiting tasks can refresh their schedules.
    refinalized: tokio::sync::Notify,
}
104+
105+ impl RegistrationTracker {
106+ fn new ( weight : u64 ) -> Self {
107+ Self {
108+ cached_registered_weight : AtomicU64 :: new ( weight) ,
109+ refinalize_in_progress : AtomicBool :: new ( false ) ,
110+ refinalized : tokio:: sync:: Notify :: new ( ) ,
111+ }
112+ }
113+
114+ fn cached_weight ( & self ) -> u64 {
115+ self . cached_registered_weight . load ( Ordering :: Acquire )
116+ }
117+
118+ /// Returns `true` if this caller won the race to perform re-finalization.
119+ fn try_claim_refinalize ( & self ) -> bool {
120+ self . refinalize_in_progress
121+ . compare_exchange ( false , true , Ordering :: AcqRel , Ordering :: Acquire )
122+ . is_ok ( )
123+ }
124+
125+ /// Called by the winner after the on-chain tx succeeds.
126+ fn complete_refinalize ( & self , new_weight : u64 ) {
127+ self . cached_registered_weight
128+ . store ( new_weight, Ordering :: Release ) ;
129+ self . refinalize_in_progress . store ( false , Ordering :: Release ) ;
130+ self . refinalized . notify_waiters ( ) ;
131+ }
132+
133+ /// Called by non-winners to block until re-finalization is done.
134+ async fn wait_for_refinalize ( & self ) {
135+ self . refinalized . notified ( ) . await ;
136+ }
137+ }
138+
96139/// Timing for a single circuit type (circuit inputs + proof generation)
97140#[ derive( Copy , Clone , Debug , Default ) ]
98141pub struct CircuitMetrics {
@@ -595,6 +638,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
595638 epoch_info. trees . push ( tree_schedule. clone ( ) ) ;
596639
597640 let self_clone = Arc :: new ( self . clone ( ) ) ;
641+ let tracker = Arc :: new ( RegistrationTracker :: new (
642+ epoch_info. epoch_pda . registered_weight ,
643+ ) ) ;
598644
599645 info ! (
600646 event = "new_tree_processing_task_spawned" ,
@@ -608,8 +654,10 @@ impl<R: Rpc + Indexer> EpochManager<R> {
608654 if let Err ( e) = self_clone
609655 . process_queue (
610656 & epoch_info. epoch ,
611- & epoch_info. forester_epoch_pda ,
657+ epoch_info. forester_epoch_pda . clone ( ) ,
612658 tree_schedule,
659+ epoch_info. epoch_pda . clone ( ) ,
660+ tracker,
613661 )
614662 . await
615663 {
@@ -978,31 +1026,6 @@ impl<R: Rpc + Indexer> EpochManager<R> {
9781026 . await
9791027 {
9801028 Ok ( info) => info,
981- Err ( ForesterError :: Registration (
982- RegistrationError :: RegistrationPhaseEnded {
983- epoch : failed_epoch,
984- current_slot,
985- registration_end,
986- } ,
987- ) ) => {
988- let next_epoch = failed_epoch + 1 ;
989- let next_phases = get_epoch_phases ( & self . protocol_config , next_epoch) ;
990- let slots_to_wait =
991- next_phases. registration . start . saturating_sub ( current_slot) ;
992-
993- info ! (
994- event = "registration_window_missed" ,
995- run_id = %self . run_id,
996- failed_epoch,
997- registration_end_slot = registration_end,
998- current_slot,
999- next_epoch,
1000- next_registration_start_slot = next_phases. registration. start,
1001- slots_to_wait,
1002- "Too late to register for requested epoch; next epoch will be used"
1003- ) ;
1004- return Ok ( ( ) ) ;
1005- }
10061029 Err ( e) => return Err ( e. into ( ) ) ,
10071030 }
10081031 }
@@ -1476,7 +1499,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
14761499 ) ;
14771500
14781501 let self_arc = Arc :: new ( self . clone ( ) ) ;
1479- let epoch_info_arc = Arc :: new ( epoch_info. clone ( ) ) ;
1502+ let registration_tracker = Arc :: new ( RegistrationTracker :: new (
1503+ epoch_info. epoch_pda . registered_weight ,
1504+ ) ) ;
14801505
14811506 let mut handles: Vec < JoinHandle < Result < ( ) > > > = Vec :: with_capacity ( trees_to_process. len ( ) ) ;
14821507
@@ -1491,15 +1516,14 @@ impl<R: Rpc + Indexer> EpochManager<R> {
14911516 self . heartbeat . add_tree_tasks_spawned ( 1 ) ;
14921517
14931518 let self_clone = self_arc. clone ( ) ;
1494- let epoch_info_clone = epoch_info_arc. clone ( ) ;
1519+ let epoch_clone = epoch_info. epoch . clone ( ) ;
1520+ let forester_epoch_pda = epoch_info. forester_epoch_pda . clone ( ) ;
1521+ let epoch_pda = epoch_info. epoch_pda . clone ( ) ;
1522+ let tracker = registration_tracker. clone ( ) ;
14951523
14961524 let handle = tokio:: spawn ( async move {
14971525 self_clone
1498- . process_queue (
1499- & epoch_info_clone. epoch ,
1500- & epoch_info_clone. forester_epoch_pda ,
1501- tree,
1502- )
1526+ . process_queue ( & epoch_clone, forester_epoch_pda, tree, epoch_pda, tracker)
15031527 . await
15041528 } ) ;
15051529
@@ -1559,15 +1583,17 @@ impl<R: Rpc + Indexer> EpochManager<R> {
15591583
15601584 #[ instrument(
15611585 level = "debug" ,
1562- skip( self , epoch_info, epoch_pda , tree_schedule) ,
1586+ skip( self , epoch_info, forester_epoch_pda , tree_schedule, epoch_pda , registration_tracker ) ,
15631587 fields( forester = %self . config. payer_keypair. pubkey( ) , epoch = epoch_info. epoch,
15641588 tree = %tree_schedule. tree_accounts. merkle_tree)
15651589 ) ]
1566- pub async fn process_queue (
1590+ pub ( crate ) async fn process_queue (
15671591 & self ,
15681592 epoch_info : & Epoch ,
1569- epoch_pda : & ForesterEpochPda ,
1593+ mut forester_epoch_pda : ForesterEpochPda ,
15701594 mut tree_schedule : TreeForesterSchedule ,
1595+ epoch_pda : EpochPda ,
1596+ registration_tracker : Arc < RegistrationTracker > ,
15711597 ) -> Result < ( ) > {
15721598 self . heartbeat . increment_queue_started ( ) ;
15731599 let mut current_slot = self . slot_tracker . estimated_current_slot ( ) ;
@@ -1588,6 +1614,9 @@ impl<R: Rpc + Indexer> EpochManager<R> {
15881614 "Processing queue for tree"
15891615 ) ;
15901616
1617+ let mut last_weight_check = Instant :: now ( ) ;
1618+ const WEIGHT_CHECK_INTERVAL : Duration = Duration :: from_secs ( 30 ) ;
1619+
15911620 ' outer_slot_loop: while current_slot < epoch_info. phases . active . end {
15921621 let next_slot_to_process = tree_schedule
15931622 . slots
@@ -1600,7 +1629,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
16001629 TreeType :: StateV1 | TreeType :: AddressV1 | TreeType :: Unknown => {
16011630 self . process_light_slot (
16021631 epoch_info,
1603- epoch_pda ,
1632+ & forester_epoch_pda ,
16041633 & tree_schedule. tree_accounts ,
16051634 & light_slot_details,
16061635 )
@@ -1612,7 +1641,7 @@ impl<R: Rpc + Indexer> EpochManager<R> {
16121641 . unwrap_or ( light_slot_details. end_solana_slot ) ;
16131642 self . process_light_slot_v2 (
16141643 epoch_info,
1615- epoch_pda ,
1644+ & forester_epoch_pda ,
16161645 & tree_schedule. tree_accounts ,
16171646 & light_slot_details,
16181647 consecutive_end,
@@ -1639,6 +1668,28 @@ impl<R: Rpc + Indexer> EpochManager<R> {
16391668 }
16401669 }
16411670 tree_schedule. slots [ slot_idx] = None ;
1671+
1672+ // Periodically check if new foresters registered and re-finalize
1673+ if last_weight_check. elapsed ( ) >= WEIGHT_CHECK_INTERVAL {
1674+ last_weight_check = Instant :: now ( ) ;
1675+ if let Err ( e) = self
1676+ . maybe_refinalize (
1677+ epoch_info,
1678+ & mut forester_epoch_pda,
1679+ & mut tree_schedule,
1680+ & epoch_pda,
1681+ & registration_tracker,
1682+ )
1683+ . await
1684+ {
1685+ warn ! (
1686+ event = "refinalize_check_failed" ,
1687+ run_id = %self . run_id,
1688+ error = ?e,
1689+ "Failed to check/perform re-finalization"
1690+ ) ;
1691+ }
1692+ }
16421693 } else {
16431694 debug ! (
16441695 event = "process_queue_no_eligible_slots" ,
@@ -1662,6 +1713,110 @@ impl<R: Rpc + Indexer> EpochManager<R> {
16621713 Ok ( ( ) )
16631714 }
16641715
    /// Check if `EpochPda.registered_weight` changed on-chain. If so,
    /// one task sends a `finalize_registration` tx while others wait,
    /// then all tasks refresh their `ForesterEpochPda` and recompute schedules.
    ///
    /// Called periodically from `process_queue` (one task per tree); the
    /// shared `RegistrationTracker` ensures only one of those tasks actually
    /// sends the transaction.
    ///
    /// # Errors
    /// Returns an error if the RPC lookups fail, if either PDA account is
    /// missing, if the finalize tx fails (winner only), or if the schedule
    /// cannot be recomputed.
    async fn maybe_refinalize(
        &self,
        epoch_info: &Epoch,
        forester_epoch_pda: &mut ForesterEpochPda,
        tree_schedule: &mut TreeForesterSchedule,
        epoch_pda: &EpochPda,
        registration_tracker: &RegistrationTracker,
    ) -> Result<()> {
        // NOTE(review): this connection is held across the whole function,
        // including the on-chain tx (winner) or the wait (losers) plus the
        // post-wait refresh — confirm the pool is sized for one long-held
        // connection per tree task.
        let mut rpc = self.rpc_pool.get_connection().await?;
        let epoch_pda_address = get_epoch_pda_address(epoch_info.epoch);
        let on_chain_epoch_pda: EpochPda = rpc
            .get_anchor_account::<EpochPda>(&epoch_pda_address)
            .await?
            .ok_or_else(|| anyhow!("EpochPda not found for epoch {}", epoch_info.epoch))?;

        let on_chain_weight = on_chain_epoch_pda.registered_weight;
        let cached_weight = registration_tracker.cached_weight();

        // Fast path: no new forester registered since the last check.
        if on_chain_weight == cached_weight {
            return Ok(());
        }

        info!(
            event = "registered_weight_changed",
            run_id = %self.run_id,
            epoch = epoch_info.epoch,
            old_weight = cached_weight,
            new_weight = on_chain_weight,
            "Detected new forester registration, re-finalizing"
        );

        if registration_tracker.try_claim_refinalize() {
            // This task sends the finalize_registration tx
            let ix = create_finalize_registration_instruction(
                &self.config.payer_keypair.pubkey(),
                &self.config.derivation_pubkey,
                epoch_info.epoch,
            );
            match rpc
                .create_and_send_transaction(
                    &[ix],
                    &self.config.payer_keypair.pubkey(),
                    &[&self.config.payer_keypair],
                )
                .await
            {
                Ok(_) => {
                    info!(
                        event = "refinalize_registration_success",
                        run_id = %self.run_id,
                        epoch = epoch_info.epoch,
                        new_weight = on_chain_weight,
                        "Re-finalized registration on-chain"
                    );
                    registration_tracker.complete_refinalize(on_chain_weight);
                }
                Err(e) => {
                    // Release the claim so a future check can retry.
                    // NOTE(review): this also notifies waiters, which then
                    // proceed to refresh their PDA/schedule below even though
                    // the tx failed — presumably a harmless no-op refresh,
                    // but confirm that is intended.
                    registration_tracker.complete_refinalize(cached_weight);
                    return Err(e.into());
                }
            }
        } else {
            // Another task is already re-finalizing; wait for it.
            // NOTE(review): Notify::notify_waiters only wakes already-
            // registered waiters — if the winner completes between the failed
            // claim above and this await, the wakeup can be lost and this task
            // may block until the next notification. Verify that
            // wait_for_refinalize guards against that race.
            registration_tracker.wait_for_refinalize().await;
        }

        // All tasks: refresh ForesterEpochPda and recompute schedule.
        // NOTE(review): `epoch_info.forester_epoch_pda` is used as the account
        // address here — presumably the PDA's Pubkey; confirm.
        let updated_pda: ForesterEpochPda = rpc
            .get_anchor_account::<ForesterEpochPda>(&epoch_info.forester_epoch_pda)
            .await?
            .ok_or_else(|| {
                anyhow!(
                    "ForesterEpochPda not found at {} after re-finalization",
                    epoch_info.forester_epoch_pda
                )
            })?;

        let current_slot = self.slot_tracker.estimated_current_slot();
        let new_schedule = TreeForesterSchedule::new_with_schedule(
            &tree_schedule.tree_accounts,
            current_slot,
            &updated_pda,
            epoch_pda,
        )?;

        // Swap in the refreshed state only after both lookups succeeded, so a
        // failure above leaves the caller's schedule untouched.
        *forester_epoch_pda = updated_pda;
        *tree_schedule = new_schedule;

        info!(
            event = "schedule_recomputed_after_refinalize",
            run_id = %self.run_id,
            epoch = epoch_info.epoch,
            tree = %tree_schedule.tree_accounts.merkle_tree,
            new_eligible_slots = tree_schedule.slots.iter().filter(|s| s.is_some()).count(),
            "Recomputed schedule after re-finalization"
        );

        Ok(())
    }
1819+
16651820 #[ instrument(
16661821 level = "debug" ,
16671822 skip( self , epoch_info, epoch_pda, tree_accounts, forester_slot_details) ,
0 commit comments