@@ -15,7 +15,8 @@ use sea_orm::QueryTrait;
1515use sea_orm:: Set ;
1616use sea_orm:: TransactionTrait ;
1717
18- use self :: parser:: state_update:: StateUpdate ;
18+ use self :: parser:: state_update:: { StateUpdate , SequenceGapError } ;
19+ use self :: rewind_controller:: { RewindController , determine_rewind_slot} ;
1920use self :: persist:: persist_state_update;
2021use self :: persist:: MAX_SQL_INSERTS ;
2122use self :: typedefs:: block_info:: BlockInfo ;
@@ -27,20 +28,43 @@ pub mod fetchers;
2728pub mod indexer;
2829pub mod parser;
2930pub mod persist;
31+ pub mod rewind_controller;
3032pub mod typedefs;
3133
32- fn derive_block_state_update ( block : & BlockInfo ) -> Result < StateUpdate , IngesterError > {
34+ fn derive_block_state_update (
35+ block : & BlockInfo ,
36+ rewind_controller : Option < & RewindController > ,
37+ ) -> Result < StateUpdate , IngesterError > {
3338 let mut state_updates: Vec < StateUpdate > = Vec :: new ( ) ;
3439 for transaction in & block. transactions {
3540 state_updates. push ( parse_transaction ( transaction, block. metadata . slot ) ?) ;
3641 }
37- Ok ( StateUpdate :: merge_updates ( state_updates) )
42+
43+ match StateUpdate :: merge_updates_with_slot ( state_updates, Some ( block. metadata . slot ) ) {
44+ Ok ( merged_update) => Ok ( merged_update) ,
45+ Err ( SequenceGapError :: GapDetected ( gaps) ) => {
46+ if let Some ( controller) = rewind_controller {
47+ let rewind_slot = determine_rewind_slot ( & gaps) ;
48+ let reason = format ! (
49+ "Sequence gaps detected in block {}: {} gaps found" ,
50+ block. metadata. slot,
51+ gaps. len( )
52+ ) ;
53+ controller. request_rewind ( rewind_slot, reason) ?;
54+ }
55+ Err ( IngesterError :: SequenceGapDetected ( gaps) )
56+ }
57+ }
3858}
3959
40- pub async fn index_block ( db : & DatabaseConnection , block : & BlockInfo ) -> Result < ( ) , IngesterError > {
60+ pub async fn index_block (
61+ db : & DatabaseConnection ,
62+ block : & BlockInfo ,
63+ rewind_controller : Option < & RewindController > ,
64+ ) -> Result < ( ) , IngesterError > {
4165 let txn = db. begin ( ) . await ?;
4266 index_block_metadatas ( & txn, vec ! [ & block. metadata] ) . await ?;
43- persist_state_update ( & txn, derive_block_state_update ( block) ?) . await ?;
67+ persist_state_update ( & txn, derive_block_state_update ( block, rewind_controller ) ?) . await ?;
4468 txn. commit ( ) . await ?;
4569 Ok ( ( ) )
4670}
@@ -81,16 +105,37 @@ async fn index_block_metadatas(
81105pub async fn index_block_batch (
82106 db : & DatabaseConnection ,
83107 block_batch : & Vec < BlockInfo > ,
108+ rewind_controller : Option < & RewindController > ,
84109) -> Result < ( ) , IngesterError > {
85110 let blocks_len = block_batch. len ( ) ;
86111 let tx = db. begin ( ) . await ?;
87112 let block_metadatas: Vec < & BlockMetadata > = block_batch. iter ( ) . map ( |b| & b. metadata ) . collect ( ) ;
88113 index_block_metadatas ( & tx, block_metadatas) . await ?;
89114 let mut state_updates = Vec :: new ( ) ;
90115 for block in block_batch {
91- state_updates. push ( derive_block_state_update ( block) ?) ;
116+ state_updates. push ( derive_block_state_update ( block, rewind_controller ) ?) ;
92117 }
93- persist:: persist_state_update ( & tx, StateUpdate :: merge_updates ( state_updates) ) . await ?;
118+
119+ let merged_state_update = match StateUpdate :: merge_updates_with_slot (
120+ state_updates,
121+ Some ( block_batch. last ( ) . unwrap ( ) . metadata . slot )
122+ ) {
123+ Ok ( merged) => merged,
124+ Err ( SequenceGapError :: GapDetected ( gaps) ) => {
125+ if let Some ( controller) = rewind_controller {
126+ let rewind_slot = determine_rewind_slot ( & gaps) ;
127+ let reason = format ! (
128+ "Sequence gaps detected in batch ending at slot {}: {} gaps found" ,
129+ block_batch. last( ) . unwrap( ) . metadata. slot,
130+ gaps. len( )
131+ ) ;
132+ controller. request_rewind ( rewind_slot, reason) ?;
133+ }
134+ return Err ( IngesterError :: SequenceGapDetected ( gaps) ) ;
135+ }
136+ } ;
137+
138+ persist:: persist_state_update ( & tx, merged_state_update) . await ?;
94139 metric ! {
95140 statsd_count!( "blocks_indexed" , blocks_len as i64 ) ;
96141 }
@@ -101,10 +146,16 @@ pub async fn index_block_batch(
101146pub async fn index_block_batch_with_infinite_retries (
102147 db : & DatabaseConnection ,
103148 block_batch : Vec < BlockInfo > ,
149+ rewind_controller : Option < & RewindController > ,
104150) {
105151 loop {
106- match index_block_batch ( db, & block_batch) . await {
152+ match index_block_batch ( db, & block_batch, rewind_controller ) . await {
107153 Ok ( ( ) ) => return ,
154+ Err ( IngesterError :: SequenceGapDetected ( _) ) => {
155+ // For sequence gaps, we don't retry - we let the rewind mechanism handle it
156+ log:: error!( "Sequence gap detected in batch, stopping processing to allow rewind" ) ;
157+ return ;
158+ }
108159 Err ( e) => {
109160 let start_block = block_batch. first ( ) . unwrap ( ) . metadata . slot ;
110161 let end_block = block_batch. last ( ) . unwrap ( ) . metadata . slot ;
0 commit comments