diff --git a/.gitignore b/.gitignore
index 9ba0cfe438..4098f30577 100644
--- a/.gitignore
+++ b/.gitignore
@@ -6,6 +6,7 @@ release/
 .DS_Store
 build/
 cache/
+evm_stress
 *.iml
 
 # Local .terraform directories
diff --git a/app/ante/evm_delivertx.go b/app/ante/evm_delivertx.go
index 5168afa5c2..214d37a74a 100644
--- a/app/ante/evm_delivertx.go
+++ b/app/ante/evm_delivertx.go
@@ -8,6 +8,7 @@ import (
 	"github.com/sei-protocol/sei-chain/sei-cosmos/client"
 	"github.com/sei-protocol/sei-chain/sei-cosmos/crypto/keys/secp256k1"
 	sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types"
+	sdkerrors "github.com/sei-protocol/sei-chain/sei-cosmos/types/errors"
 	upgradekeeper "github.com/sei-protocol/sei-chain/sei-cosmos/x/upgrade/keeper"
 	"github.com/sei-protocol/sei-chain/x/evm/derived"
 	evmkeeper "github.com/sei-protocol/sei-chain/x/evm/keeper"
@@ -46,6 +47,22 @@ func EvmDeliverTxAnte(
 }
 
 func EvmDeliverHandleSignatures(ctx sdk.Context, ek *evmkeeper.Keeper, txData ethtx.TxData, chainID *big.Int, msg *evmtypes.MsgEVMTransaction) (common.Address, derived.SignerVersion, error) {
+	if msg.Derived != nil {
+		if msg.Derived.PubKey == nil {
+			return common.Address{}, 0, sdkerrors.ErrInvalidPubKey
+		}
+		evmAddr := msg.Derived.SenderEVMAddr
+		seiAddr := msg.Derived.SenderSeiAddr
+		version := msg.Derived.Version
+		if err := AssociateAddress(ctx, ek, evmAddr, seiAddr, msg.Derived.PubKey); err != nil {
+			return evmAddr, version, err
+		}
+		if ek.EthReplayConfig.Enabled {
+			ek.PrepareReplayedAddr(ctx, evmAddr)
+		}
+		return evmAddr, version, nil
+	}
+
 	evmAddr, seiAddr, seiPubkey, version, err := CheckAndDecodeSignature(ctx, txData, chainID, ek.EthBlockTestConfig.Enabled)
 	if err != nil {
 		return evmAddr, version, err
diff --git a/app/app.go b/app/app.go
index 6bcef5afab..1623c86069 100644
--- a/app/app.go
+++ b/app/app.go
@@ -1506,6 +1506,35 @@ func (app *App) ProcessTxsSynchronousGiga(ctx sdk.Context, txs [][]byte, typedTx
 	return txResults
 }
 
+func (app *App) shouldProcessSingleRecipientEVMTransfersSynchronously(typedTxs []sdk.Tx) bool {
+	const minSingleRecipientEVMTransfers = 64
+
+	if len(typedTxs) < minSingleRecipientEVMTransfers {
+		return false
+	}
+
+	var recipient common.Address
+	for i, tx := range typedTxs {
+		msg := app.GetEVMMsg(tx)
+		if msg == nil {
+			return false
+		}
+		etx, _ := msg.AsTransaction()
+		if etx == nil || etx.To() == nil || len(etx.Data()) != 0 || etx.Value().Sign() <= 0 {
+			return false
+		}
+		if i == 0 {
+			recipient = *etx.To()
+			continue
+		}
+		if *etx.To() != recipient {
+			return false
+		}
+	}
+
+	return true
+}
+
 // cacheContext returns a new context based off of the provided context with
 // a branched multi-store.
 func (app *App) CacheContext(ctx sdk.Context) (sdk.Context, sdk.CacheMultiStore) {
@@ -1516,14 +1545,19 @@ func (app *App) CacheContext(ctx sdk.Context) (sdk.Context, sdk.CacheMultiStore)
 
 // ExecuteTxsConcurrently calls the appropriate function for processing transacitons
 func (app *App) ExecuteTxsConcurrently(ctx sdk.Context, txs [][]byte, typedTxs []sdk.Tx) ([]*abci.ExecTxResult, sdk.Context) {
+	processSynchronously := app.shouldProcessSingleRecipientEVMTransfersSynchronously(typedTxs)
+
 	// Giga only supports synchronous execution for now
-	if app.GigaExecutorEnabled && app.GigaOCCEnabled {
+	if app.GigaExecutorEnabled && app.GigaOCCEnabled && !processSynchronously {
 		return app.ProcessTXsWithOCCGiga(ctx, txs, typedTxs)
 	} else if app.GigaExecutorEnabled {
 		return app.ProcessTxsSynchronousGiga(ctx, txs, typedTxs), ctx
 	} else if !ctx.IsOCCEnabled() {
 		return app.ProcessTxsSynchronousV2(ctx, txs, typedTxs), ctx
 	}
+	if processSynchronously {
+		return app.ProcessTxsSynchronousV2(ctx, txs, typedTxs), ctx
+	}
 	return app.ProcessTXsWithOCCV2(ctx, txs, typedTxs)
 }
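Note on the heuristic above: a block takes the synchronous fast path only when it holds at least 64 txs and every tx is a bare EVM value transfer (non-nil recipient, empty calldata, positive value) to one shared address, which is exactly the workload where OCC retries would thrash on the recipient's balance key. A minimal standalone sketch of the same rule, using go-ethereum's `types.Transaction` directly so the `GetEVMMsg`/`AsTransaction` unwrapping is elided (`isSingleRecipientTransferBatch` and `minBatch` are illustrative names, not part of the diff):

```go
package main

import (
	"fmt"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
)

const minBatch = 64 // mirrors minSingleRecipientEVMTransfers

// isSingleRecipientTransferBatch applies the same rule as the new helper:
// every tx must be a bare value transfer (non-nil To, no calldata, positive
// value) and all txs must share one recipient.
func isSingleRecipientTransferBatch(txs []*types.Transaction) bool {
	if len(txs) < minBatch {
		return false
	}
	var recipient common.Address
	for i, tx := range txs {
		if tx.To() == nil || len(tx.Data()) != 0 || tx.Value().Sign() <= 0 {
			return false
		}
		if i == 0 {
			recipient = *tx.To()
			continue
		}
		if *tx.To() != recipient {
			return false
		}
	}
	return true
}

func main() {
	to := common.HexToAddress("0x1111111111111111111111111111111111111111")
	txs := make([]*types.Transaction, minBatch)
	for i := range txs {
		txs[i] = types.NewTx(&types.DynamicFeeTx{
			Nonce: uint64(i),
			Gas:   21_000,
			To:    &to,
			Value: big.NewInt(1),
		})
	}
	fmt.Println(isSingleRecipientTransferBatch(txs)) // true
}
```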
diff --git a/giga/deps/tasks/scheduler.go b/giga/deps/tasks/scheduler.go
index fc1e925a60..21c3f39452 100644
--- a/giga/deps/tasks/scheduler.go
+++ b/giga/deps/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
 import (
 	"context"
 	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -113,8 +115,10 @@ type scheduler struct {
 	executeCh      chan func()
 	validateCh     chan func()
 	metrics        *schedulerMetrics
-	synchronous    bool // true if maxIncarnation exceeds threshold
-	maxIncarnation int  // current highest incarnation
+	synchronous       bool           // true if maxIncarnation exceeds threshold
+	maxIncarnation    int            // current highest incarnation
+	conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
+	conflictKeyMu     sync.Mutex
 }
 
 // NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
 	s.executeCh <- work
 }
 
-func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
+func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
 	var conflicts []int
+	conflictKeys := make([]string, 0, len(s.multiVersionStores))
 	uniq := make(map[int]struct{})
 	valid := true
-	for _, mv := range s.multiVersionStores {
-		ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
+	for storeKey, mv := range s.multiVersionStores {
+		ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
 		for _, c := range mvConflicts {
 			if _, ok := uniq[c]; !ok {
 				conflicts = append(conflicts, c)
 				uniq[c] = struct{}{}
 			}
 		}
+		for _, k := range mvKeys {
+			conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
+		}
 		// any non-ok value makes valid false
 		valid = valid && ok
 	}
 	sort.Ints(conflicts)
-	return valid, conflicts
+	return valid, conflicts, conflictKeys
 }
 
 func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -284,6 +292,7 @@
 	tasks, tasksMap := toTasks(reqs)
 	s.allTasks = tasks
 	s.allTasksMap = tasksMap
+	s.conflictKeyCounts = make(map[string]int)
 	s.executeCh = make(chan func(), len(tasks))
 	s.validateCh = make(chan func(), len(tasks))
 	defer s.emitMetrics()
@@ -338,6 +347,21 @@
 	}
 	s.metrics.maxIncarnation = s.maxIncarnation
 
+	if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
+		encoded := make(map[string]int, len(s.conflictKeyCounts))
+		for k, v := range s.conflictKeyCounts {
+			storeName, rawKey, _ := strings.Cut(k, "/")
+			var encodedKey string
+			if rawKey == "globalAccountNumber" {
+				encodedKey = storeName + "/" + rawKey
+			} else {
+				encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
+			}
+			encoded[encodedKey] = v
+		}
+		logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
+	}
+
 	logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)
 
 	return s.collectResponses(tasks), nil
@@ -354,9 +378,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
 		// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
 		// since we choose to fail fast and mark the subsequent tasks as invalid as well.
 		// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
-		if valid, conflicts := s.findConflicts(task); !valid {
+		if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
 			s.invalidateTask(task)
 			task.AppendDependencies(conflicts)
+			s.conflictKeyMu.Lock()
+			for _, k := range conflictKeys {
+				s.conflictKeyCounts[k]++
+			}
+			s.conflictKeyMu.Unlock()
 
 			// if the conflicts are now validated, then rerun this task
 			if dependenciesValidated(s.allTasksMap, task.Dependencies) {
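The conflict accounting added to the scheduler is two small pieces: a mutex-guarded per-key counter that concurrent validation goroutines bump, and a log encoder that hex-escapes raw (usually binary) store keys while leaving the well-known `globalAccountNumber` key human-readable. A self-contained sketch under assumed names (`conflictCounter`, `encodeForLog`):

```go
package main

import (
	"encoding/hex"
	"fmt"
	"strings"
	"sync"
)

// conflictCounter is a mutex-guarded per-key counter, the shape the scheduler
// uses to accumulate conflicts reported by concurrent validation goroutines.
type conflictCounter struct {
	mu     sync.Mutex
	counts map[string]int
}

func (c *conflictCounter) add(keys []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, k := range keys {
		c.counts[k]++
	}
}

// encodeForLog hex-encodes the raw store key (usually binary) but keeps the
// well-known "globalAccountNumber" key readable, matching the diff's logging.
func encodeForLog(counts map[string]int) map[string]int {
	out := make(map[string]int, len(counts))
	for k, v := range counts {
		storeName, rawKey, _ := strings.Cut(k, "/")
		if rawKey == "globalAccountNumber" {
			out[storeName+"/"+rawKey] = v
		} else {
			out[storeName+"/"+hex.EncodeToString([]byte(rawKey))] = v
		}
	}
	return out
}

func main() {
	c := &conflictCounter{counts: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.add([]string{"bank/\x02balances", "acc/globalAccountNumber"})
		}()
	}
	wg.Wait()
	fmt.Println(encodeForLog(c.counts)) // e.g. map[acc/globalAccountNumber:4 bank/0262616c616e636573:4]
}
```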
diff --git a/scripts/evm_stress.sh b/scripts/evm_stress.sh
new file mode 100755
index 0000000000..dd8695c5ba
--- /dev/null
+++ b/scripts/evm_stress.sh
@@ -0,0 +1,125 @@
+#!/usr/bin/env bash
+# Spin up a local seid node, flood it with EVM transfers from N accounts to one
+# recipient, and print only branch-specific logs + block time.
+#
+# Usage: ./scripts/evm_stress.sh
+# Run from the repo root.
+set -euo pipefail
+
+REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+SEID="$HOME/go/bin/seid"
+LOG_FILE="/tmp/seid_stress.log"
+
+# Genesis balance per sender account. 10^12 usei ≈ unlimited for any test run.
+SENDER_GENESIS_FUNDS="1000000000000"
+
+cd "$REPO_ROOT"
+
+cleanup() {
+  echo ""
+  echo "==> shutting down..."
+  [ -n "${SEID_PID:-}" ] && kill "$SEID_PID" 2>/dev/null || true
+  # Kill the entire process group so tail and grep children are also terminated.
+  [ -n "${LOG_PID:-}" ] && kill -- -"$LOG_PID" 2>/dev/null || true
+}
+trap cleanup EXIT INT TERM
+
+# Kill any tail processes from previous runs that are still watching this log
+# file. tail -F re-opens the file by name when it is truncated, so a stale
+# tail would re-read the new run's log and emit duplicate lines.
+pkill -f "tail.*${LOG_FILE}" 2>/dev/null || true
+
+# ---------------------------------------------------------------------------
+# 1. Init chain (no start)
+# ---------------------------------------------------------------------------
+echo "==> initializing chain..."
+NO_RUN=1 ./scripts/initialize_local_chain.sh
+
+# ---------------------------------------------------------------------------
+# 2. Bulk-add sender accounts to genesis via direct JSON patch.
+#    go run -dump-sei-addrs generates all bech32 addresses; Python patches
+#    genesis.json in one pass (same technique as populate_genesis_accounts.py).
+# ---------------------------------------------------------------------------
+cat > /tmp/evm_stress_patch_genesis.py << 'PYEOF'
+import json, sys
+
+genesis_path = sys.argv[1]
+amount = sys.argv[2]
+denom = "usei"
+
+addrs = [l.strip() for l in sys.stdin if l.strip()]
+
+with open(genesis_path) as f:
+    g = json.load(f)
+
+for addr in addrs:
+    g["app_state"]["auth"]["accounts"].append({
+        "@type": "/cosmos.auth.v1beta1.BaseAccount",
+        "address": addr,
+        "pub_key": None,
+        "account_number": "0",
+        "sequence": "0",
+    })
+    g["app_state"]["bank"]["balances"].append({
+        "address": addr,
+        "coins": [{"denom": denom, "amount": amount}],
+    })
+
+with open(genesis_path, "w") as f:
+    json.dump(g, f, separators=(",", ":"))
+
+print(f"Added {len(addrs)} accounts to genesis", file=sys.stderr)
+PYEOF
+
+echo "==> patching genesis with sender accounts..."
+go run "$REPO_ROOT/scripts/evm_stress/main.go" -dump-sei-addrs \
+  | python3 /tmp/evm_stress_patch_genesis.py \
+      "$HOME/.sei/config/genesis.json" "$SENDER_GENESIS_FUNDS"
+echo "==> genesis patched"
+
+# ---------------------------------------------------------------------------
+# 3. Start seid, capturing all output to log file
+# ---------------------------------------------------------------------------
+echo "==> starting seid (logs -> $LOG_FILE)..."
+mkdir -p /tmp/race
+GORACE="log_path=/tmp/race/seid_race" \
+  "$SEID" start --trace --chain-id sei-chain > "$LOG_FILE" 2>&1 &
+SEID_PID=$!
+echo "==> seid PID: $SEID_PID"
+
+# ---------------------------------------------------------------------------
+# 4. Tail log file, printing only branch-specific messages
+#    - "occ scheduler key conflicts" from sei-cosmos/tasks/scheduler.go
+#    - "execution block time" from x/evm/keeper/abci.go
+# ---------------------------------------------------------------------------
+(
+  tail -F "$LOG_FILE" 2>/dev/null \
+    | grep --line-buffered -E \
+        '"occ scheduler key conflicts"|"execution block time"'
+) &
+LOG_PID=$!
+
+# ---------------------------------------------------------------------------
+# 5. Wait for the EVM RPC to accept connections
+# ---------------------------------------------------------------------------
+echo "==> waiting for EVM RPC at http://127.0.0.1:8545..."
+for i in $(seq 1 60); do
+  if curl -sf -X POST http://127.0.0.1:8545 \
+      -H "Content-Type: application/json" \
+      -d '{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}' \
+      > /dev/null 2>&1; then
+    echo "==> EVM RPC ready (after ${i}s)"
+    break
+  fi
+  if [ "$i" -eq 60 ]; then
+    echo "ERROR: EVM RPC not ready after 60s" >&2
+    exit 1
+  fi
+  sleep 1
+done
+
+# ---------------------------------------------------------------------------
+# 6. Run the Go load tester
+# ---------------------------------------------------------------------------
+echo "==> starting EVM transfer stress test (target 500 TPS, 50k unique senders)..."
+go run "$REPO_ROOT/scripts/evm_stress/main.go"
diff --git a/scripts/evm_stress/main.go b/scripts/evm_stress/main.go
new file mode 100644
index 0000000000..aec30cc670
--- /dev/null
+++ b/scripts/evm_stress/main.go
@@ -0,0 +1,154 @@
+package main
+
+import (
+	"context"
+	"crypto/ecdsa"
+	"flag"
+	"fmt"
+	"math/big"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/crypto"
+	"github.com/ethereum/go-ethereum/ethclient"
+
+	seibech32 "github.com/sei-protocol/sei-chain/sei-cosmos/types/bech32"
+)
+
+const (
+	evmRPC     = "http://127.0.0.1:8545"
+	chainID    = 713714 // default EVM chain ID for "sei-chain"
+	targetTPS  = 500
+	numWorkers = 250
+
+	// Total unique sender accounts pre-funded in genesis. Every tx sent by
+	// the stress test has a distinct sender with nonce=0. At targetTPS, the
+	// pool lasts totalAccounts/targetTPS seconds.
+	totalAccounts = 50_000
+)
+
+var (
+	bigChainID  = big.NewInt(chainID)
+	signer      = types.NewLondonSigner(bigChainID)
+	maxFee      = big.NewInt(1_000_000_000_000) // 1000 gwei
+	priorityFee = big.NewInt(1_000_000_000)     // 1 gwei
+	txValue     = big.NewInt(1_000_000_000_001) // 10^12+1 wei: touches both usei balance and wei remainder
+)
+
+// nextKey returns a unique deterministic private key for the given index.
+func nextKey(idx uint64) *ecdsa.PrivateKey {
+	seed := make([]byte, 32)
+	// use upper 8 bytes for the index so seed is never all-zero
+	seed[0] = 0x01
+	for i := 0; i < 8; i++ {
+		seed[1+i] = byte(idx >> (56 - 8*i))
+	}
+	key, err := crypto.ToECDSA(seed)
+	if err != nil {
+		panic(fmt.Sprintf("bad key seed %d: %v", idx, err))
+	}
+	return key
+}
+
+func keyAddr(key *ecdsa.PrivateKey) common.Address {
+	return crypto.PubkeyToAddress(key.PublicKey)
+}
+
+func evmToSei(addr common.Address) string {
+	s, err := seibech32.ConvertAndEncode("sei", addr.Bytes())
+	if err != nil {
+		panic(fmt.Sprintf("bech32 encode: %v", err))
+	}
+	return s
+}
+
+func signTx(tx *types.Transaction, key *ecdsa.PrivateKey) *types.Transaction {
+	signed, err := types.SignTx(tx, signer, key)
+	if err != nil {
+		panic(err)
+	}
+	return signed
+}
+
+func transfer(nonce uint64, to common.Address, key *ecdsa.PrivateKey) *types.Transaction {
+	return signTx(types.NewTx(&types.DynamicFeeTx{
+		ChainID:   bigChainID,
+		Nonce:     nonce,
+		GasTipCap: priorityFee,
+		GasFeeCap: maxFee,
+		Gas:       21_000,
+		To:        &to,
+		Value:     txValue,
+	}), key)
+}
+
+func waitForBalance(ctx context.Context, client *ethclient.Client, addr common.Address) {
+	fmt.Printf("waiting for %s to have balance...\n", addr.Hex())
+	for {
+		bal, err := client.BalanceAt(ctx, addr, nil)
+		if err == nil && bal.Sign() > 0 {
+			fmt.Printf("  %s: %s wei\n", addr.Hex(), bal.String())
+			return
+		}
+		time.Sleep(300 * time.Millisecond)
+	}
+}
+
+func main() {
+	dumpSeiAddrs := flag.Bool("dump-sei-addrs", false, "print sender sei bech32 addresses for genesis funding and exit")
+	flag.Parse()
+
+	// Key 0 = recipient; keys 1..totalAccounts = one-time genesis-funded senders.
+	recipient := keyAddr(nextKey(0))
+
+	if *dumpSeiAddrs {
+		for i := uint64(1); i <= totalAccounts; i++ {
+			fmt.Println(evmToSei(keyAddr(nextKey(i))))
+		}
+		return
+	}
+
+	ctx := context.Background()
+	client, err := ethclient.Dial(evmRPC)
+	if err != nil {
+		panic(fmt.Sprintf("dial %s: %v", evmRPC, err))
+	}
+	defer client.Close()
+
+	fmt.Printf("recipient: %s\n", recipient.Hex())
+
+	// Wait for genesis accounts to have balance — confirms the node is live.
+	waitForBalance(ctx, client, keyAddr(nextKey(1)))
+
+	// Pre-fill the work queue. Each key is used for exactly one tx (nonce=0).
+	funded := make(chan *ecdsa.PrivateKey, totalAccounts)
+	for i := uint64(1); i <= totalAccounts; i++ {
+		funded <- nextKey(i)
+	}
+	close(funded)
+
+	// Shared rate limiter across all workers: one tick per tx slot.
+	ticker := time.NewTicker(time.Second / time.Duration(targetTPS))
+	defer ticker.Stop()
+
+	fmt.Printf("starting %d workers, %d unique senders, target %d TPS\n",
+		numWorkers, totalAccounts, targetTPS)
+
+	var wg sync.WaitGroup
+	for range numWorkers {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for key := range funded {
+				<-ticker.C
+				tx := transfer(0, recipient, key)
+				_ = client.SendTransaction(ctx, tx)
+			}
+		}()
+	}
+
+	wg.Wait()
+	fmt.Printf("all %d accounts exhausted\n", totalAccounts)
+}
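One design choice in the load generator worth noting: all workers block on a single shared ticker, so the cap applies to aggregate throughput (one send per tick across the whole pool) rather than per worker. A self-contained sketch of that pattern, with illustrative numbers:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	const targetTPS = 500
	const numWorkers = 8
	const jobCount = 100

	// A buffered channel stands in for the pool of pre-funded sender keys.
	jobs := make(chan int, jobCount)
	for i := 0; i < jobCount; i++ {
		jobs <- i
	}
	close(jobs)

	// One ticker shared by every worker: ticker.C delivers at most one tick
	// per interval, so total consumption across workers is capped at targetTPS.
	ticker := time.NewTicker(time.Second / targetTPS)
	defer ticker.Stop()

	start := time.Now()
	var wg sync.WaitGroup
	for w := 0; w < numWorkers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range jobs {
				<-ticker.C // blocks until this worker wins the next tick
			}
		}()
	}
	wg.Wait()
	elapsed := time.Since(start)
	fmt.Printf("%d jobs in %v (~%.0f/s)\n", jobCount, elapsed, float64(jobCount)/elapsed.Seconds())
}
```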
diff --git a/sei-cosmos/baseapp/abci.go b/sei-cosmos/baseapp/abci.go
index ca40308baa..63153ded46 100644
--- a/sei-cosmos/baseapp/abci.go
+++ b/sei-cosmos/baseapp/abci.go
@@ -246,6 +246,7 @@ func (app *BaseApp) SetDeliverStateToCommit() {
 // height.
 func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "commit")
+	commitStart := time.Now()
 	app.commitLock.Lock()
 	defer app.commitLock.Unlock()
 
@@ -300,6 +301,18 @@ func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err e
 	}
 	app.SnapshotIfApplicable(uint64(header.Height)) //nolint:gosec // bounds checked above
 
+	commitMs := time.Since(commitStart).Milliseconds()
+	ppMs := app.execProcessProposalMs
+	fbMs := app.execFinalizeBlockMs
+	logger.Info("execution block time",
+		"height", header.Height,
+		"block_txs", app.execBlockTxCount,
+		"process_proposal_ms", ppMs,
+		"finalize_block_ms", fbMs,
+		"commit_ms", commitMs,
+		"total_execution_ms", ppMs+fbMs+commitMs,
+	)
+
 	return &abci.ResponseCommit{
 		RetainHeight: retainHeight,
 	}, nil
@@ -922,6 +935,8 @@ func splitPath(requestPath string) (path []string) {
 // ABCI++
 func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "process_proposal")
+	ppStart := time.Now()
+	defer func() { app.execProcessProposalMs = time.Since(ppStart).Milliseconds() }()
 	if app.ChainID != req.Header.ChainID {
 		return nil, fmt.Errorf("unexpected ChainID, got %q, want %q", req.Header.ChainID, app.ChainID)
 	}
@@ -983,6 +998,9 @@
 
 func (app *BaseApp) FinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "finalize_block")
+	fbStart := time.Now()
+	app.execBlockTxCount = len(req.Txs)
+	defer func() { app.execFinalizeBlockMs = time.Since(fbStart).Milliseconds() }()
 
 	if app.cms.TracingEnabled() {
 		app.cms.SetTracingContext(sdk.TraceContext(
diff --git a/sei-cosmos/baseapp/baseapp.go b/sei-cosmos/baseapp/baseapp.go
index c72a37cb8e..cde44b4307 100644
--- a/sei-cosmos/baseapp/baseapp.go
+++ b/sei-cosmos/baseapp/baseapp.go
@@ -168,6 +168,10 @@ type BaseApp struct {
 	occEnabled bool
 
 	deliverTxHooks []DeliverTxHook
+
+	execProcessProposalMs int64
+	execFinalizeBlockMs   int64
+	execBlockTxCount      int
 }
 
 type appStore struct {
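These timing fields are written by one ABCI call and read by a later one with no locking, which is safe only because ProcessProposal, FinalizeBlock, and Commit for a given height run serially. A stripped-down sketch of the same defer-based phase timing, with a stand-in `app` struct (simulated sleeps replace real work):

```go
package main

import (
	"fmt"
	"time"
)

// app stands in for BaseApp: each ABCI phase records its own latency into a
// field, and Commit sums them into one "execution block time" style line.
type app struct {
	processProposalMs int64
	finalizeBlockMs   int64
	blockTxCount      int
}

func (a *app) processProposal() {
	start := time.Now()
	defer func() { a.processProposalMs = time.Since(start).Milliseconds() }()
	time.Sleep(12 * time.Millisecond) // simulated proposal verification
}

func (a *app) finalizeBlock(txs int) {
	start := time.Now()
	defer func() { a.finalizeBlockMs = time.Since(start).Milliseconds() }()
	a.blockTxCount = txs
	time.Sleep(30 * time.Millisecond) // simulated execution
}

func (a *app) commit() {
	start := time.Now()
	time.Sleep(5 * time.Millisecond) // simulated store commit
	commitMs := time.Since(start).Milliseconds()
	fmt.Println("execution block time",
		"block_txs", a.blockTxCount,
		"process_proposal_ms", a.processProposalMs,
		"finalize_block_ms", a.finalizeBlockMs,
		"commit_ms", commitMs,
		"total_execution_ms", a.processProposalMs+a.finalizeBlockMs+commitMs)
}

func main() {
	a := &app{}
	a.processProposal() // for one height, ABCI calls arrive strictly in this order
	a.finalizeBlock(128)
	a.commit()
}
```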
diff --git a/sei-cosmos/store/multiversion/store.go b/sei-cosmos/store/multiversion/store.go
index 86aafa94f6..4a1c74b2c0 100644
--- a/sei-cosmos/store/multiversion/store.go
+++ b/sei-cosmos/store/multiversion/store.go
@@ -28,6 +28,7 @@ type MultiVersionStore interface {
 	GetIterateset(index int) Iterateset
 	ClearIterateset(index int)
 	ValidateTransactionState(index int) (bool, []int)
+	ValidateTransactionStateWithKeys(index int) (bool, []int, []string)
 }
 
 type WriteSet map[string][]byte
@@ -331,13 +332,14 @@ func (s *Store) checkIteratorAtIndex(index int) bool {
 	return valid
 }
 
-func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
+func (s *Store) checkReadsetAtIndex(index int) (bool, []int, []string) {
 	conflictSet := make(map[int]struct{})
+	var conflictKeys []string
 	valid := true
 
 	readSetAny, found := s.txReadSets.Load(index)
 	if !found {
-		return true, []int{}
+		return true, []int{}, nil
 	}
 	readset := readSetAny.(ReadSet)
 	// iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store
@@ -354,6 +356,7 @@
 			parentVal := s.parentStore.Get([]byte(key))
 			if !bytes.Equal(parentVal, value) {
 				valid = false
+				conflictKeys = append(conflictKeys, key)
 			}
 		} else {
 			// if estimate, mark as conflict index - but don't invalidate
@@ -365,10 +368,12 @@
 				// TODO: would we want to return early?
 				conflictSet[latestValue.Index()] = struct{}{}
 				valid = false
+				conflictKeys = append(conflictKeys, key)
 			}
 		} else if !bytes.Equal(latestValue.Value(), value) {
 			conflictSet[latestValue.Index()] = struct{}{}
 			valid = false
+			conflictKeys = append(conflictKeys, key)
 		}
 	}
 }
@@ -380,7 +385,7 @@
 
 	sort.Ints(conflictIndices)
 
-	return valid, conflictIndices
+	return valid, conflictIndices, conflictKeys
 }
 
 // TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so?
@@ -390,11 +395,17 @@
 func (s *Store) ValidateTransactionState(index int) (bool, []int) {
 	// TODO: can we parallelize for all iterators?
 	iteratorValid := s.checkIteratorAtIndex(index)
-	readsetValid, conflictIndices := s.checkReadsetAtIndex(index)
+	readsetValid, conflictIndices, _ := s.checkReadsetAtIndex(index)
 
 	return iteratorValid && readsetValid, conflictIndices
 }
 
+func (s *Store) ValidateTransactionStateWithKeys(index int) (bool, []int, []string) {
+	iteratorValid := s.checkIteratorAtIndex(index)
+	readsetValid, conflictIndices, conflictKeys := s.checkReadsetAtIndex(index)
+	return iteratorValid && readsetValid, conflictIndices, conflictKeys
+}
+
 func (s *Store) WriteLatestToStore() {
 	// sort the keys
 	keys := []string{}
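Conceptually, readset validation replays every key a transaction read and compares the recorded value with the value now visible at that index; the change above just carries the mismatching keys out alongside the existing conflict indices. A toy version that keeps only the key-collection idea (flat map instead of the multiversioned store, no estimate handling):

```go
package main

import (
	"bytes"
	"fmt"
)

// validateReadset is a toy checkReadsetAtIndex: compare every value a tx
// recorded reading against the value now visible, and report both whether
// the readset still holds and which keys broke it.
func validateReadset(readset, latest map[string][]byte) (bool, []string) {
	valid := true
	var conflictKeys []string
	for key, readValue := range readset {
		if !bytes.Equal(latest[key], readValue) {
			valid = false
			conflictKeys = append(conflictKeys, key)
		}
	}
	return valid, conflictKeys
}

func main() {
	readset := map[string][]byte{"balance/alice": {10}, "balance/bob": {5}}
	latest := map[string][]byte{"balance/alice": {10}, "balance/bob": {7}} // bob changed under us
	valid, keys := validateReadset(readset, latest)
	fmt.Println(valid, keys) // false [balance/bob]
}
```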
diff --git a/sei-cosmos/tasks/scheduler.go b/sei-cosmos/tasks/scheduler.go
index e9757a6cc7..473739737b 100644
--- a/sei-cosmos/tasks/scheduler.go
+++ b/sei-cosmos/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
 import (
 	"context"
 	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -113,8 +115,10 @@ type scheduler struct {
 	executeCh      chan func()
 	validateCh     chan func()
 	metrics        *schedulerMetrics
-	synchronous    bool // true if maxIncarnation exceeds threshold
-	maxIncarnation int  // current highest incarnation
+	synchronous       bool           // true if maxIncarnation exceeds threshold
+	maxIncarnation    int            // current highest incarnation
+	conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
+	conflictKeyMu     sync.Mutex
 }
 
 // NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
 	s.executeCh <- work
 }
 
-func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
+func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
 	var conflicts []int
+	conflictKeys := make([]string, 0, len(s.multiVersionStores))
 	uniq := make(map[int]struct{})
 	valid := true
-	for _, mv := range s.multiVersionStores {
-		ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
+	for storeKey, mv := range s.multiVersionStores {
+		ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
 		for _, c := range mvConflicts {
 			if _, ok := uniq[c]; !ok {
 				conflicts = append(conflicts, c)
 				uniq[c] = struct{}{}
 			}
 		}
+		for _, k := range mvKeys {
+			conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
+		}
 		// any non-ok value makes valid false
 		valid = valid && ok
 	}
 	sort.Ints(conflicts)
-	return valid, conflicts
+	return valid, conflicts, conflictKeys
 }
 
 func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -270,6 +278,7 @@
 	tasks, tasksMap := toTasks(reqs)
 	s.allTasks = tasks
 	s.allTasksMap = tasksMap
+	s.conflictKeyCounts = make(map[string]int)
 	s.executeCh = make(chan func(), len(tasks))
 	s.validateCh = make(chan func(), len(tasks))
 	defer s.emitMetrics()
@@ -324,6 +333,21 @@
 	}
 	s.metrics.maxIncarnation = s.maxIncarnation
 
+	if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
+		encoded := make(map[string]int, len(s.conflictKeyCounts))
+		for k, v := range s.conflictKeyCounts {
+			storeName, rawKey, _ := strings.Cut(k, "/")
+			var encodedKey string
+			if rawKey == "globalAccountNumber" {
+				encodedKey = storeName + "/" + rawKey
+			} else {
+				encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
+			}
+			encoded[encodedKey] = v
+		}
+		logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
+	}
+
 	logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)
 
 	return s.collectResponses(tasks), nil
@@ -340,9 +364,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
 		// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
 		// since we choose to fail fast and mark the subsequent tasks as invalid as well.
 		// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
-		if valid, conflicts := s.findConflicts(task); !valid {
+		if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
 			s.invalidateTask(task)
 			task.AppendDependencies(conflicts)
+			s.conflictKeyMu.Lock()
+			for _, k := range conflictKeys {
+				s.conflictKeyCounts[k]++
+			}
+			s.conflictKeyMu.Unlock()
 
 			// if the conflicts are now validated, then rerun this task
 			if dependenciesValidated(s.allTasksMap, task.Dependencies) {
diff --git a/utils/helpers/associate.go b/utils/helpers/associate.go
index 07095b56de..44f6a77fd8 100644
--- a/utils/helpers/associate.go
+++ b/utils/helpers/associate.go
@@ -32,6 +32,13 @@ func NewAssociationHelper(evmKeeper evmKeeper, bankKeeper bankKeeper, accountKee
 }
 
 func (p AssociationHelper) AssociateAddresses(ctx sdk.Context, seiAddr sdk.AccAddress, evmAddr common.Address, pubkey cryptotypes.PubKey, migrateUseiOnly bool) error {
+	castAddr := sdk.AccAddress(evmAddr[:])
+	if !castAddr.Equals(seiAddr) && p.accountKeeper.GetAccount(ctx, seiAddr) == nil {
+		castAcc := p.accountKeeper.GetAccount(ctx, castAddr)
+		if castAcc != nil && castAcc.GetPubKey() == nil && p.bankKeeper.LockedCoins(ctx, castAddr).IsZero() {
+			p.accountKeeper.SetAccount(ctx, authtypes.NewBaseAccount(seiAddr, pubkey, castAcc.GetAccountNumber(), castAcc.GetSequence()))
+		}
+	}
 	p.evmKeeper.SetAddressMapping(ctx, seiAddr, evmAddr)
 	if acc := p.accountKeeper.GetAccount(ctx, seiAddr); acc.GetPubKey() == nil {
 		if err := acc.SetPubKey(pubkey); err != nil {
@@ -44,6 +51,9 @@
 
 func (p AssociationHelper) MigrateBalance(ctx sdk.Context, evmAddr common.Address, seiAddr sdk.AccAddress, migrateUseiOnly bool) error {
 	castAddr := sdk.AccAddress(evmAddr[:])
+	if castAddr.Equals(seiAddr) {
+		return nil
+	}
 	var castAddrBalances sdk.Coins
 	if migrateUseiOnly {
 		castAddrBalances = sdk.Coins{p.bankKeeper.GetBalance(ctx, castAddr, "usei")}
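For context on the change above: the "cast" address is simply the 20 raw EVM address bytes reinterpreted as a Cosmos account address, so an account funded before association lives at the cast address. The new fast path renames such an account (no pubkey, no locked coins) to the true sei address, keeping its account number and sequence, instead of creating a fresh account and migrating coins; MigrateBalance likewise short-circuits when source and destination coincide. A small sketch of the cast relationship (a standalone illustration, not repo code):

```go
package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types"
)

func main() {
	evmAddr := common.HexToAddress("0x1111111111111111111111111111111111111111")

	// The cast address is the same 20 bytes, bech32-encoded. (Which prefix
	// String() prints depends on the SDK's global config; sei-chain sets "sei".)
	castAddr := sdk.AccAddress(evmAddr[:])
	fmt.Println(castAddr.String())

	// MigrateBalance now short-circuits when destination == cast address:
	// the funds already sit under the right account key.
	seiAddr := sdk.AccAddress(evmAddr[:])
	fmt.Println(castAddr.Equals(seiAddr)) // true
}
```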
diff --git a/utils/helpers/associate_test.go b/utils/helpers/associate_test.go
index f290cdf7ee..b6a024620a 100644
--- a/utils/helpers/associate_test.go
+++ b/utils/helpers/associate_test.go
@@ -3,9 +3,11 @@ package helpers
 import (
 	"testing"
 
+	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/sei-protocol/sei-chain/sei-cosmos/crypto/keys/secp256k1"
 	sdk "github.com/sei-protocol/sei-chain/sei-cosmos/types"
+	authtypes "github.com/sei-protocol/sei-chain/sei-cosmos/x/auth/types"
 	"github.com/stretchr/testify/require"
 )
 
@@ -158,3 +160,114 @@ func TestEdgeCases(t *testing.T) {
 		require.Equal(t, 20, len(evmAddr.Bytes()))
 	})
 }
+
+type mockEVMKeeper struct {
+	mappings map[string]common.Address
+}
+
+func (m *mockEVMKeeper) SetAddressMapping(_ sdk.Context, seiAddress sdk.AccAddress, evmAddress common.Address) {
+	if m.mappings == nil {
+		m.mappings = map[string]common.Address{}
+	}
+	m.mappings[seiAddress.String()] = evmAddress
+}
+
+type mockBankKeeper struct {
+	sendCalls int
+}
+
+func (m *mockBankKeeper) SpendableCoins(sdk.Context, sdk.AccAddress) sdk.Coins {
+	return sdk.NewCoins(sdk.NewCoin("usei", sdk.NewInt(100)))
+}
+
+func (m *mockBankKeeper) SendCoins(sdk.Context, sdk.AccAddress, sdk.AccAddress, sdk.Coins) error {
+	m.sendCalls++
+	return nil
+}
+
+func (m *mockBankKeeper) GetWeiBalance(sdk.Context, sdk.AccAddress) sdk.Int {
+	return sdk.ZeroInt()
+}
+
+func (m *mockBankKeeper) SendCoinsAndWei(sdk.Context, sdk.AccAddress, sdk.AccAddress, sdk.Int, sdk.Int) error {
+	return nil
+}
+
+func (m *mockBankKeeper) LockedCoins(sdk.Context, sdk.AccAddress) sdk.Coins {
+	return nil
+}
+
+func (m *mockBankKeeper) GetBalance(sdk.Context, sdk.AccAddress, string) sdk.Coin {
+	return sdk.NewCoin("usei", sdk.NewInt(100))
+}
+
+type mockAccountKeeper struct {
+	accounts        map[string]authtypes.AccountI
+	newAccountCalls int
+}
+
+func (m *mockAccountKeeper) GetAccount(_ sdk.Context, addr sdk.AccAddress) authtypes.AccountI {
+	return m.accounts[addr.String()]
+}
+
+func (m *mockAccountKeeper) HasAccount(_ sdk.Context, addr sdk.AccAddress) bool {
+	_, ok := m.accounts[addr.String()]
+	return ok
+}
+
+func (m *mockAccountKeeper) SetAccount(_ sdk.Context, acc authtypes.AccountI) {
+	if m.accounts == nil {
+		m.accounts = map[string]authtypes.AccountI{}
+	}
+	m.accounts[acc.GetAddress().String()] = acc
+}
+
+func (m *mockAccountKeeper) RemoveAccount(_ sdk.Context, acc authtypes.AccountI) {
+	delete(m.accounts, acc.GetAddress().String())
+}
+
+func (m *mockAccountKeeper) NewAccountWithAddress(_ sdk.Context, addr sdk.AccAddress) authtypes.AccountI {
+	m.newAccountCalls++
+	return authtypes.NewBaseAccountWithAddress(addr)
+}
+
+func (m *mockAccountKeeper) GetParams(sdk.Context) authtypes.Params {
+	return authtypes.DefaultParams()
+}
+
+func TestAssociateAddressesReusesEmptyCastAccount(t *testing.T) {
+	ctx := sdk.Context{}
+	evmAddr := common.HexToAddress("0x1111111111111111111111111111111111111111")
+	castAddr := sdk.AccAddress(evmAddr[:])
+	seiAddr := sdk.AccAddress(common.HexToAddress("0x2222222222222222222222222222222222222222").Bytes())
+	pubkey := secp256k1.GenPrivKey().PubKey().(*secp256k1.PubKey)
+
+	ak := &mockAccountKeeper{accounts: map[string]authtypes.AccountI{}}
+	ak.SetAccount(ctx, authtypes.NewBaseAccount(castAddr, nil, 42, 7))
+	bk := &mockBankKeeper{}
+	ek := &mockEVMKeeper{}
+
+	helper := NewAssociationHelper(ek, bk, ak)
+	require.NoError(t, helper.AssociateAddresses(ctx, seiAddr, evmAddr, pubkey, false))
+
+	require.Zero(t, ak.newAccountCalls)
+	require.Nil(t, ak.GetAccount(ctx, castAddr))
+	require.Equal(t, evmAddr, ek.mappings[seiAddr.String()])
+	acc := ak.GetAccount(ctx, seiAddr)
+	require.NotNil(t, acc)
+	require.Equal(t, uint64(42), acc.GetAccountNumber())
+	require.Equal(t, uint64(7), acc.GetSequence())
+	require.Equal(t, pubkey.Bytes(), acc.GetPubKey().Bytes())
+	require.Equal(t, 1, bk.sendCalls)
+}
+
+func TestMigrateBalanceSkipsDirectCastAddress(t *testing.T) {
+	ctx := sdk.Context{}
+	evmAddr := common.HexToAddress("0x1111111111111111111111111111111111111111")
+	seiAddr := sdk.AccAddress(evmAddr[:])
+
+	bk := &mockBankKeeper{}
+	helper := NewAssociationHelper(&mockEVMKeeper{}, bk, &mockAccountKeeper{})
+	require.NoError(t, helper.MigrateBalance(ctx, evmAddr, seiAddr, false))
+	require.Zero(t, bk.sendCalls)
+}