@@ -20,100 +20,96 @@ import (
2020 "github.com/zeta-chain/node/zetaclient/zetacore"
2121)
2222
23- // MaxSignaturesPerTicker is the maximum number of signatures to process on a ticker
24- const MaxSignaturesPerTicker = 100
23+ // maxSignaturesPerTicker is the maximum number of signatures to process on a ticker.
24+ const maxSignaturesPerTicker = 100
2525
26- // ObserveInbound observes the Solana chain for inbounds and post votes to zetacore.
26+ // ObserveInbound observes the Solana chain for inbounds and posts votes to zetacore.
2727func (ob * Observer ) ObserveInbound (ctx context.Context ) error {
28- chainID := ob .Chain ().ChainId
29- pageLimit := repo .DefaultPageLimit
28+ logger := ob .Logger ().Inbound
3029
31- // scan from gateway 1st signature if last scanned tx is absent in the database
32- // the 1st gateway signature is typically the program initialization
30+ // Scan from gateway's 1st signature if last scanned transaction is absent in the database.
31+ // The 1st gateway signature is typically the program initialization.
3332 if ob .LastTxScanned () == "" {
34- lastSig , err := ob .solanaRepo .GetFirstSignatureForAddress (ctx , ob . gatewayID , pageLimit )
33+ sig , err := ob .solanaRepo .GetFirstSignature (ctx )
3534 if err != nil {
36- format := "error GetFirstSignatureForAddress for chain %d address %s"
37- return errors .Wrapf (err , format , chainID , ob .gatewayID )
35+ return err
3836 }
39- ob .WithLastTxScanned (lastSig .String ())
37+ ob .WithLastTxScanned (sig .String ())
4038 }
4139
42- // query last finalized slot
43- lastSlot , errSlot := ob .solanaClient .GetSlot (ctx , rpc .CommitmentFinalized )
40+ // Get last finalized slot.
41+ lastSlot , errSlot := ob .solanaRepo .GetSlot (ctx , rpc .CommitmentFinalized )
4442 if errSlot != nil {
45- ob . Logger (). Inbound . Err (errSlot ).Msg ("unable to get last slot" )
43+ logger . Err (errSlot ).Msg ("failed to get last finalized slot" )
4644 }
4745
48- // get all signatures for the gateway address since last scanned signature
46+ // Get all signatures for the gateway address since the last scanned signature.
4947 lastSig := solana .MustSignatureFromBase58 (ob .LastTxScanned ())
50- signatures , err := ob .solanaRepo .GetSignaturesForAddressUntil (ctx , ob . gatewayID , lastSig , pageLimit )
48+ sigs , err := ob .solanaRepo .GetSignaturesSince (ctx , lastSig )
5149 if err != nil {
52- ob . Logger (). Inbound . Err (err ).Msg ( "error calling GetSignaturesForAddressUntil" )
50+ logger . Err (err ).Send ( )
5351 return err
5452 }
5553
56- // update metrics if no new signatures found
57- if len ( signatures ) == 0 {
54+ if len ( sigs ) == 0 {
55+ // Update metrics if there are no new signatures.
5856 if errSlot == nil {
5957 ob .WithLastBlockScanned (lastSlot )
6058 }
6159 } else {
62- ob . Logger (). Inbound . Info ().Int ("signatures" , len (signatures )).Msg ("got inbound signatures" )
60+ logger . Info ().Int ("signatures" , len (sigs )).Msg ("got inbound signatures" )
6361 }
6462
65- // loop signature from oldest to latest to filter inbound events
66- for i := len (signatures ) - 1 ; i >= 0 ; i -- {
67- sig := signatures [i ]
68- sigString := sig .Signature .String ()
63+ // Iterate over the signatures from oldest to latest to filter inbound events.
64+ for i := len (sigs ) - 1 ; i >= 0 ; i -- {
65+ sig := sigs [i ]
6966
70- // process successfully signature only
67+ // Process only successfull transactions.
7168 if sig .Err == nil {
72- txResult , err := ob .solanaRepo .GetTransaction (ctx , sig .Signature )
73- switch {
74- case errors .Is (err , repo .ErrUnsupportedTxVersion ):
75- ob .Logger ().Inbound .Warn ().
69+ txResult , err := ob .solanaRepo .GetTransaction (ctx , sig .Signature , rpc .CommitmentFinalized )
70+ if errors .Is (err , repo .ErrUnsupportedTxVersion ) {
71+ logger .Warn ().
7672 Stringer ("tx_signature" , sig .Signature ).
77- Msg ("observe inbound: skip unsupported transaction" )
78- // just save the sig to last scanned txs
79- case err != nil :
80- // we have to re-scan this signature on next ticker
81- return errors .Wrapf (err , "error GetTransaction for sig %s" , sigString )
82- default :
83- // filter the events
84- events , err := FilterInboundEvents (txResult , ob .gatewayID , ob .Chain ().ChainId , ob .Logger ().Inbound )
73+ Msg ("skipping unsupported transaction" )
74+ } else if err != nil {
75+ return err
76+ } else {
77+ events , err := FilterInboundEvents (txResult , ob .gatewayID , ob .Chain ().ChainId , logger )
8578 if err != nil {
8679 // Log the error but continue processing other transactions
87- ob . Logger (). Inbound .Error ().
80+ logger .Error ().
8881 Err (err ).
89- Str ("tx_signature" , sigString ).
82+ Stringer ("tx_signature" , sig . Signature ).
9083 Msg ("observe inbound: error filtering events, skipping" )
9184 continue
9285 }
9386
94- // vote on the events
95- if err := ob .VoteInboundEvents (ctx , events , false , false ); err != nil {
96- // return error to retry this transaction
97- return errors .Wrapf (err , "error voting on events for transaction %s, will retry" , sigString )
87+ err = ob .VoteInboundEvents (ctx , events , false , false )
88+ if err != nil {
89+ return errors .Wrapf (err ,
90+ "error voting on events for transaction %s, will retry" ,
91+ sig .Signature .String (),
92+ )
9893 }
9994 }
10095 }
10196
10297 // signature scanned; save last scanned signature to both memory and db, ignore db error
103- if err = ob .SaveLastTxScanned (sigString , sig .Slot ); err != nil {
104- ob .Logger ().Inbound .Error ().
98+ err = ob .SaveLastTxScanned (sig .Signature .String (), sig .Slot )
99+ if err != nil {
100+ logger .Error ().
105101 Err (err ).
106- Str ("tx_signature" , sigString ).
107- Msg ("observe inbound: error saving last sig " )
102+ Stringer ("tx_signature" , sig . Signature ).
103+ Msg ("error saving last signature " )
108104 }
109105
110- ob . Logger (). Inbound .Info ().
111- Str ("tx_signature" , sigString ).
106+ logger .Info ().
107+ Stringer ("tx_signature" , sig . Signature ).
112108 Uint64 ("tx_slot" , sig .Slot ).
113- Msg ("observe inbound: last scanned sig " )
109+ Msg ("last scanned signature " )
114110
115- // take a rest if max signatures per ticker is reached
116- if len (signatures )- i >= MaxSignaturesPerTicker {
111+ // Take a rest if the maximum number of signatures per ticker has been reached.
112+ if len (sigs )- i >= maxSignaturesPerTicker {
117113 break
118114 }
119115 }
@@ -132,7 +128,8 @@ func (ob *Observer) VoteInboundEvents(
132128 msg := ob .BuildInboundVoteMsgFromEvent (event )
133129 if msg != nil {
134130 if fromTracker {
135- metrics .InboundObservationsTrackerTotal .WithLabelValues (ob .Chain ().Name , strconv .FormatBool (isInternalTracker )).
131+ metrics .InboundObservationsTrackerTotal .
132+ WithLabelValues (ob .Chain ().Name , strconv .FormatBool (isInternalTracker )).
136133 Inc ()
137134 } else {
138135 metrics .InboundObservationsBlockScanTotal .WithLabelValues (ob .Chain ().Name ).Inc ()
@@ -164,6 +161,7 @@ func FilterInboundEvents(
164161 senderChainID int64 ,
165162 logger zerolog.Logger ,
166163) ([]* clienttypes.InboundEvent , error ) {
164+
167165 if txResult .Meta .Err != nil {
168166 return nil , errors .Errorf ("transaction failed with error: %v" , txResult .Meta .Err )
169167 }
0 commit comments