Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ Also EnableSolanaAddressLookupTable feature flag should be set.
* [4356](https://github.com/zeta-chain/node/pull/4356) - rename protocol contract imports to `protocol-contracts-evm`
* [4361](https://github.com/zeta-chain/node/pull/4361) - add basic validation on zetaclient config file
* [4391](https://github.com/zeta-chain/node/pull/4391) - change client mode config
* [4422](https://github.com/zeta-chain/node/pull/4422) - finish the Solana repository implementation


### Fixes

Expand Down
4 changes: 2 additions & 2 deletions cmd/zetatool/cctx/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,14 @@ func (c *TrackingDetails) solanaInboundBallotIdentifier(ctx *context.Context) er
if solClient == nil {
return fmt.Errorf("error creating rpc client")
}
solRepo := solrepo.New(solClient)
solRepo := solrepo.New(solClient, solana.PublicKey{} /* unused */)

signature, err := solana.SignatureFromBase58(inboundHash)
if err != nil {
return fmt.Errorf("error parsing signature: %w", err)
}

txResult, err := solRepo.GetTransaction(goCtx, signature)
txResult, err := solRepo.GetTransaction(goCtx, signature, solrpc.CommitmentFinalized)
if err != nil {
return fmt.Errorf("error getting transaction: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/zetatool/cctx/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ func (c *TrackingDetails) checkSolanaOutboundTx(ctx *context.Context) error {
if solClient == nil {
return fmt.Errorf("error creating rpc client")
}
solRepo := solrepo.New(solClient)
solRepo := solrepo.New(solClient, solana.PublicKey{} /* unused */)

for _, hash := range txHashList {
signature := solana.MustSignatureFromBase58(hash)
_, err := solRepo.GetTransaction(goCtx, signature)
_, err := solRepo.GetTransaction(goCtx, signature, solrpc.CommitmentFinalized)
if err != nil {
continue
}
Expand Down
15 changes: 0 additions & 15 deletions zetaclient/chains/solana/client.go

This file was deleted.

24 changes: 24 additions & 0 deletions zetaclient/chains/solana/observer/gas.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package observer

import (
"context"

"github.com/gagliardetto/solana-go/rpc"
)

// PostGasPrice posts gas prices to zetacore.
func (ob *Observer) PostGasPrice(ctx context.Context) error {
priorityFee, err := ob.solanaRepo.GetPriorityFee(ctx)
if err != nil {
return err
}

slot, err := ob.solanaRepo.GetSlot(ctx, rpc.CommitmentConfirmed)
if err != nil {
return err
}

// There is no Ethereum-like gas price in Solana, so we only post the priority fee.
_, err = ob.ZetaRepo().VoteGasPrice(ctx, ob.Logger().Chain, 1, priorityFee, slot)
return err
}
112 changes: 112 additions & 0 deletions zetaclient/chains/solana/observer/gas_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package observer

import (
"context"
"testing"

"github.com/gagliardetto/solana-go/rpc"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/zeta-chain/node/pkg/chains"
"github.com/zeta-chain/node/testutil/sample"
"github.com/zeta-chain/node/zetaclient/chains/base"
"github.com/zeta-chain/node/zetaclient/chains/zrepo"
"github.com/zeta-chain/node/zetaclient/db"
"github.com/zeta-chain/node/zetaclient/mode"
"github.com/zeta-chain/node/zetaclient/testutils/mocks"
)

func createObserver(t *testing.T,
zetacoreClient zrepo.ZetacoreClient,
solanaRepo SolanaRepo,
) *Observer {
chain := chains.SolanaDevnet
chainParams := *sample.ChainParams(chain.ChainId)
chainParams.GatewayAddress = sample.SolanaAddress(t)
zetaRepo := zrepo.New(zetacoreClient, chain, mode.StandardMode)

db, err := db.NewFromSqliteInMemory(true)
require.NoError(t, err)

logger := base.DefaultLogger()

base, err := base.NewObserver(chain, chainParams, zetaRepo, nil, 1000, nil, db, logger)
require.NoError(t, err)

observer, err := New(base, nil, chainParams.GatewayAddress)
require.NoError(t, err)
observer.solanaRepo = solanaRepo

return observer
}

func TestPostGasPrice(t *testing.T) {
anything := mock.Anything

priorityFee := uint64(5)
slot := uint64(100)

t.Run("Ok", func(t *testing.T) {
solanaRepo := mocks.NewSolanaRepo(t)
solanaRepo.On("GetPriorityFee", anything).Return(priorityFee, nil)
solanaRepo.On("GetSlot", anything, rpc.CommitmentConfirmed).Return(slot, nil)

zetacoreClient := mocks.NewZetacoreClient(t)
zetacoreClient.
On("PostVoteGasPrice", anything, anything, anything, priorityFee, slot).
Return(anything, nil)

ob := createObserver(t, zetacoreClient, solanaRepo)
err := ob.PostGasPrice(context.Background())
require.NoError(t, err)
})

t.Run("Error", func(t *testing.T) {
errTest := errors.New("test error")

t.Run("GetPriorityFee", func(t *testing.T) {
solanaRepo := mocks.NewSolanaRepo(t)
solanaRepo.On("GetPriorityFee", anything).Return(uint64(0), errTest)

zetacoreClient := mocks.NewZetacoreClient(t)

ob := createObserver(t, zetacoreClient, solanaRepo)
err := ob.PostGasPrice(context.Background())

require.Error(t, err)
require.ErrorIs(t, err, errTest)
})

t.Run("GetSlot", func(t *testing.T) {
solanaRepo := mocks.NewSolanaRepo(t)
solanaRepo.On("GetPriorityFee", anything).Return(priorityFee, nil)
solanaRepo.On("GetSlot", anything, rpc.CommitmentConfirmed).Return(uint64(0), errTest)

zetacoreClient := mocks.NewZetacoreClient(t)

ob := createObserver(t, zetacoreClient, solanaRepo)
err := ob.PostGasPrice(context.Background())

require.Error(t, err)
require.ErrorIs(t, err, errTest)
})

t.Run("VoteGasPrice", func(t *testing.T) {
solanaRepo := mocks.NewSolanaRepo(t)
solanaRepo.On("GetPriorityFee", anything).Return(priorityFee, nil)
solanaRepo.On("GetSlot", anything, rpc.CommitmentConfirmed).Return(slot, nil)

zetacoreClient := mocks.NewZetacoreClient(t)
zetacoreClient.
On("PostVoteGasPrice", anything, anything, anything, priorityFee, slot).
Return(anything, errTest)

ob := createObserver(t, zetacoreClient, solanaRepo)
err := ob.PostGasPrice(context.Background())

require.Error(t, err)
require.ErrorIs(t, err, errTest)
})
})
}
104 changes: 51 additions & 53 deletions zetaclient/chains/solana/observer/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,100 +20,96 @@
"github.com/zeta-chain/node/zetaclient/zetacore"
)

// MaxSignaturesPerTicker is the maximum number of signatures to process on a ticker
const MaxSignaturesPerTicker = 100
// maxSignaturesPerTicker is the maximum number of signatures to process on a ticker.
const maxSignaturesPerTicker = 100

// ObserveInbound observes the Solana chain for inbounds and post votes to zetacore.
// ObserveInbound observes the Solana chain for inbounds and posts votes to zetacore.
func (ob *Observer) ObserveInbound(ctx context.Context) error {
chainID := ob.Chain().ChainId
pageLimit := repo.DefaultPageLimit
logger := ob.Logger().Inbound

// scan from gateway 1st signature if last scanned tx is absent in the database
// the 1st gateway signature is typically the program initialization
// Scan from gateway's 1st signature if last scanned transaction is absent in the database.
// The 1st gateway signature is typically the program initialization.
if ob.LastTxScanned() == "" {
lastSig, err := ob.solanaRepo.GetFirstSignatureForAddress(ctx, ob.gatewayID, pageLimit)
sig, err := ob.solanaRepo.GetFirstSignature(ctx)
if err != nil {
format := "error GetFirstSignatureForAddress for chain %d address %s"
return errors.Wrapf(err, format, chainID, ob.gatewayID)
return err
}
ob.WithLastTxScanned(lastSig.String())
ob.WithLastTxScanned(sig.String())
}

// query last finalized slot
lastSlot, errSlot := ob.solanaClient.GetSlot(ctx, rpc.CommitmentFinalized)
// Get last finalized slot.
lastSlot, errSlot := ob.solanaRepo.GetSlot(ctx, rpc.CommitmentFinalized)
if errSlot != nil {
ob.Logger().Inbound.Err(errSlot).Msg("unable to get last slot")
logger.Err(errSlot).Msg("failed to get last finalized slot")
}

// get all signatures for the gateway address since last scanned signature
// Get all signatures for the gateway address since the last scanned signature.
lastSig := solana.MustSignatureFromBase58(ob.LastTxScanned())
signatures, err := ob.solanaRepo.GetSignaturesForAddressUntil(ctx, ob.gatewayID, lastSig, pageLimit)
sigs, err := ob.solanaRepo.GetSignaturesSince(ctx, lastSig)
if err != nil {
ob.Logger().Inbound.Err(err).Msg("error calling GetSignaturesForAddressUntil")
logger.Err(err).Send()
return err
}

// update metrics if no new signatures found
if len(signatures) == 0 {
if len(sigs) == 0 {
// Update metrics if there are no new signatures.
if errSlot == nil {
ob.WithLastBlockScanned(lastSlot)
}
} else {
ob.Logger().Inbound.Info().Int("signatures", len(signatures)).Msg("got inbound signatures")
logger.Info().Int("signatures", len(sigs)).Msg("got inbound signatures")
}

// loop signature from oldest to latest to filter inbound events
for i := len(signatures) - 1; i >= 0; i-- {
sig := signatures[i]
sigString := sig.Signature.String()
// Iterate over the signatures from oldest to latest to filter inbound events.
for i := len(sigs) - 1; i >= 0; i-- {
sig := sigs[i]

// process successfully signature only
// Process only successfull transactions.

Check failure on line 67 in zetaclient/chains/solana/observer/inbound.go

View workflow job for this annotation

GitHub Actions / lint

`successfull` is a misspelling of `successful` (misspell)
if sig.Err == nil {
txResult, err := ob.solanaRepo.GetTransaction(ctx, sig.Signature)
switch {
case errors.Is(err, repo.ErrUnsupportedTxVersion):
ob.Logger().Inbound.Warn().
txResult, err := ob.solanaRepo.GetTransaction(ctx, sig.Signature, rpc.CommitmentFinalized)
if errors.Is(err, repo.ErrUnsupportedTxVersion) {
logger.Warn().
Stringer("tx_signature", sig.Signature).
Msg("observe inbound: skip unsupported transaction")
// just save the sig to last scanned txs
case err != nil:
// we have to re-scan this signature on next ticker
return errors.Wrapf(err, "error GetTransaction for sig %s", sigString)
default:
// filter the events
events, err := FilterInboundEvents(txResult, ob.gatewayID, ob.Chain().ChainId, ob.Logger().Inbound)
Msg("skipping unsupported transaction")
} else if err != nil {
return err
} else {
events, err := FilterInboundEvents(txResult, ob.gatewayID, ob.Chain().ChainId, logger)
if err != nil {
// Log the error but continue processing other transactions
ob.Logger().Inbound.Error().
logger.Error().
Err(err).
Str("tx_signature", sigString).
Stringer("tx_signature", sig.Signature).
Msg("observe inbound: error filtering events, skipping")
continue
}

// vote on the events
if err := ob.VoteInboundEvents(ctx, events, false, false); err != nil {
// return error to retry this transaction
return errors.Wrapf(err, "error voting on events for transaction %s, will retry", sigString)
err = ob.VoteInboundEvents(ctx, events, false, false)
if err != nil {
return errors.Wrapf(err,
"error voting on events for transaction %s, will retry",
sig.Signature.String(),
)
}
}
}

// signature scanned; save last scanned signature to both memory and db, ignore db error
if err = ob.SaveLastTxScanned(sigString, sig.Slot); err != nil {
ob.Logger().Inbound.Error().
err = ob.SaveLastTxScanned(sig.Signature.String(), sig.Slot)
if err != nil {
logger.Error().
Err(err).
Str("tx_signature", sigString).
Msg("observe inbound: error saving last sig")
Stringer("tx_signature", sig.Signature).
Msg("error saving last signature")
}

ob.Logger().Inbound.Info().
Str("tx_signature", sigString).
logger.Info().
Stringer("tx_signature", sig.Signature).
Uint64("tx_slot", sig.Slot).
Msg("observe inbound: last scanned sig")
Msg("last scanned signature")

// take a rest if max signatures per ticker is reached
if len(signatures)-i >= MaxSignaturesPerTicker {
// Take a rest if the maximum number of signatures per ticker has been reached.
if len(sigs)-i >= maxSignaturesPerTicker {
break
}
}
Expand All @@ -132,7 +128,8 @@
msg := ob.BuildInboundVoteMsgFromEvent(event)
if msg != nil {
if fromTracker {
metrics.InboundObservationsTrackerTotal.WithLabelValues(ob.Chain().Name, strconv.FormatBool(isInternalTracker)).
metrics.InboundObservationsTrackerTotal.
WithLabelValues(ob.Chain().Name, strconv.FormatBool(isInternalTracker)).
Inc()
} else {
metrics.InboundObservationsBlockScanTotal.WithLabelValues(ob.Chain().Name).Inc()
Expand Down Expand Up @@ -163,7 +160,8 @@
gatewayID solana.PublicKey,
senderChainID int64,
logger zerolog.Logger,
) ([]*clienttypes.InboundEvent, error) {

Check failure on line 163 in zetaclient/chains/solana/observer/inbound.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary leading newline (whitespace)

if txResult.Meta.Err != nil {
return nil, errors.Errorf("transaction failed with error: %v", txResult.Meta.Err)
}
Expand Down
Loading
Loading