From 82628332bef96af124c94f3c47931b202a4e8c94 Mon Sep 17 00:00:00 2001
From: Tony Chen
Date: Thu, 7 May 2026 12:47:48 +0800
Subject: [PATCH] log OCC conflicts and block execution time

---
 giga/deps/tasks/scheduler.go           | 43 +++++++++++++++++++++-----
 sei-cosmos/baseapp/abci.go             | 18 +++++++++++
 sei-cosmos/baseapp/baseapp.go          |  4 +++
 sei-cosmos/store/multiversion/store.go | 19 +++++++++---
 sei-cosmos/tasks/scheduler.go          | 43 +++++++++++++++++++++-----
 5 files changed, 109 insertions(+), 18 deletions(-)

diff --git a/giga/deps/tasks/scheduler.go b/giga/deps/tasks/scheduler.go
index fc1e925a60..21c3f39452 100644
--- a/giga/deps/tasks/scheduler.go
+++ b/giga/deps/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
 import (
 	"context"
 	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -113,8 +115,10 @@ type scheduler struct {
 	executeCh         chan func()
 	validateCh        chan func()
 	metrics           *schedulerMetrics
-	synchronous       bool // true if maxIncarnation exceeds threshold
-	maxIncarnation    int  // current highest incarnation
+	synchronous       bool           // true if maxIncarnation exceeds threshold
+	maxIncarnation    int            // current highest incarnation
+	conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
+	conflictKeyMu     sync.Mutex
 }
 
 // NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
 	s.executeCh <- work
 }
 
-func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
+func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
 	var conflicts []int
+	conflictKeys := make([]string, 0, len(s.multiVersionStores))
 	uniq := make(map[int]struct{})
 	valid := true
-	for _, mv := range s.multiVersionStores {
-		ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
+	for storeKey, mv := range s.multiVersionStores {
+		ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
 		for _, c := range mvConflicts {
 			if _, ok := uniq[c]; !ok {
 				conflicts = append(conflicts, c)
 				uniq[c] = struct{}{}
 			}
 		}
+		for _, k := range mvKeys {
+			conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
+		}
 		// any non-ok value makes valid false
 		valid = valid && ok
 	}
 	sort.Ints(conflicts)
-	return valid, conflicts
+	return valid, conflicts, conflictKeys
 }
 
 func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -284,6 +292,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 	tasks, tasksMap := toTasks(reqs)
 	s.allTasks = tasks
 	s.allTasksMap = tasksMap
+	s.conflictKeyCounts = make(map[string]int)
 	s.executeCh = make(chan func(), len(tasks))
 	s.validateCh = make(chan func(), len(tasks))
 	defer s.emitMetrics()
@@ -338,6 +347,21 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 	}
 	s.metrics.maxIncarnation = s.maxIncarnation
 
+	if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
+		encoded := make(map[string]int, len(s.conflictKeyCounts))
+		for k, v := range s.conflictKeyCounts {
+			storeName, rawKey, _ := strings.Cut(k, "/")
+			var encodedKey string
+			if rawKey == "globalAccountNumber" {
+				encodedKey = storeName + "/" + rawKey
+			} else {
+				encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
+			}
+			encoded[encodedKey] = v
+		}
+		logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
+	}
+
 	logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)
 
 	return s.collectResponses(tasks), nil
@@ -354,9 +378,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
 		// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
 		// since we choose to fail fast and mark the subsequent tasks as invalid as well.
 		// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
-		if valid, conflicts := s.findConflicts(task); !valid {
+		if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
 			s.invalidateTask(task)
 			task.AppendDependencies(conflicts)
+			s.conflictKeyMu.Lock()
+			for _, k := range conflictKeys {
+				s.conflictKeyCounts[k]++
+			}
+			s.conflictKeyMu.Unlock()
 
 			// if the conflicts are now validated, then rerun this task
 			if dependenciesValidated(s.allTasksMap, task.Dependencies) {
diff --git a/sei-cosmos/baseapp/abci.go b/sei-cosmos/baseapp/abci.go
index ca40308baa..63153ded46 100644
--- a/sei-cosmos/baseapp/abci.go
+++ b/sei-cosmos/baseapp/abci.go
@@ -246,6 +246,7 @@ func (app *BaseApp) SetDeliverStateToCommit() {
 // height.
 func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "commit")
+	commitStart := time.Now()
 	app.commitLock.Lock()
 	defer app.commitLock.Unlock()
 
@@ -300,6 +301,18 @@ func (app *BaseApp) Commit(ctx context.Context) (res *abci.ResponseCommit, err e
 	}
 	app.SnapshotIfApplicable(uint64(header.Height)) //nolint:gosec // bounds checked above
 
+	commitMs := time.Since(commitStart).Milliseconds()
+	ppMs := app.execProcessProposalMs
+	fbMs := app.execFinalizeBlockMs
+	logger.Info("execution block time",
+		"height", header.Height,
+		"block_txs", app.execBlockTxCount,
+		"process_proposal_ms", ppMs,
+		"finalize_block_ms", fbMs,
+		"commit_ms", commitMs,
+		"total_execution_ms", ppMs+fbMs+commitMs,
+	)
+
 	return &abci.ResponseCommit{
 		RetainHeight: retainHeight,
 	}, nil
@@ -922,6 +935,8 @@ func splitPath(requestPath string) (path []string)
 // ABCI++
 func (app *BaseApp) ProcessProposal(ctx context.Context, req *abci.RequestProcessProposal) (resp *abci.ResponseProcessProposal, err error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "process_proposal")
+	ppStart := time.Now()
+	defer func() { app.execProcessProposalMs = time.Since(ppStart).Milliseconds() }()
 	if app.ChainID != req.Header.ChainID {
 		return nil, fmt.Errorf("unexpected ChainID, got %q, want %q", req.Header.ChainID, app.ChainID)
 	}
@@ -983,6 +998,9 @@
 
 func (app *BaseApp) FinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
 	defer telemetry.MeasureSince(time.Now(), "abci", "finalize_block")
+	fbStart := time.Now()
+	app.execBlockTxCount = len(req.Txs)
+	defer func() { app.execFinalizeBlockMs = time.Since(fbStart).Milliseconds() }()
 
 	if app.cms.TracingEnabled() {
 		app.cms.SetTracingContext(sdk.TraceContext(
diff --git a/sei-cosmos/baseapp/baseapp.go b/sei-cosmos/baseapp/baseapp.go
index c72a37cb8e..cde44b4307 100644
--- a/sei-cosmos/baseapp/baseapp.go
+++ b/sei-cosmos/baseapp/baseapp.go
@@ -168,6 +168,10 @@ type BaseApp struct {
 	occEnabled bool
 
 	deliverTxHooks []DeliverTxHook
+
+	execProcessProposalMs int64
+	execFinalizeBlockMs   int64
+	execBlockTxCount      int
 }
 
 type appStore struct {
diff --git a/sei-cosmos/store/multiversion/store.go b/sei-cosmos/store/multiversion/store.go
index 86aafa94f6..4a1c74b2c0 100644
--- a/sei-cosmos/store/multiversion/store.go
+++ b/sei-cosmos/store/multiversion/store.go
@@ -28,6 +28,7 @@ type MultiVersionStore interface {
 	GetIterateset(index int) Iterateset
 	ClearIterateset(index int)
 	ValidateTransactionState(index int) (bool, []int)
+	ValidateTransactionStateWithKeys(index int) (bool, []int, []string)
 }
 
 type WriteSet map[string][]byte
@@ -331,13 +332,14 @@ func (s *Store) checkIteratorAtIndex(index int) bool {
 	return valid
 }
 
-func (s *Store) checkReadsetAtIndex(index int) (bool, []int) {
+func (s *Store) checkReadsetAtIndex(index int) (bool, []int, []string) {
 	conflictSet := make(map[int]struct{})
+	var conflictKeys []string
 	valid := true
 
 	readSetAny, found := s.txReadSets.Load(index)
 	if !found {
-		return true, []int{}
+		return true, []int{}, nil
 	}
 	readset := readSetAny.(ReadSet)
 	// iterate over readset and check if the value is the same as the latest value relateive to txIndex in the multiversion store
@@ -354,6 +356,7 @@
 			parentVal := s.parentStore.Get([]byte(key))
 			if !bytes.Equal(parentVal, value) {
 				valid = false
+				conflictKeys = append(conflictKeys, key)
 			}
 		} else {
 			// if estimate, mark as conflict index - but don't invalidate
@@ -365,10 +368,12 @@
 					// TODO: would we want to return early?
 					conflictSet[latestValue.Index()] = struct{}{}
 					valid = false
+					conflictKeys = append(conflictKeys, key)
 				}
 			} else if !bytes.Equal(latestValue.Value(), value) {
 				conflictSet[latestValue.Index()] = struct{}{}
 				valid = false
+				conflictKeys = append(conflictKeys, key)
 			}
 		}
 	}
@@ -380,7 +385,7 @@
 
 	sort.Ints(conflictIndices)
 
-	return valid, conflictIndices
+	return valid, conflictIndices, conflictKeys
 }
 
 // TODO: do we want to return bool + []int where bool indicates whether it was valid and then []int indicates only ones for which we need to wait due to estimates? - yes i think so?
@@ -390,11 +395,17 @@
 func (s *Store) ValidateTransactionState(index int) (bool, []int) {
 	// TODO: can we parallelize for all iterators?
 	iteratorValid := s.checkIteratorAtIndex(index)
-	readsetValid, conflictIndices := s.checkReadsetAtIndex(index)
+	readsetValid, conflictIndices, _ := s.checkReadsetAtIndex(index)
 	return iteratorValid && readsetValid, conflictIndices
 }
 
+func (s *Store) ValidateTransactionStateWithKeys(index int) (bool, []int, []string) {
+	iteratorValid := s.checkIteratorAtIndex(index)
+	readsetValid, conflictIndices, conflictKeys := s.checkReadsetAtIndex(index)
+	return iteratorValid && readsetValid, conflictIndices, conflictKeys
+}
+
 func (s *Store) WriteLatestToStore() {
 	// sort the keys
 	keys := []string{}
diff --git a/sei-cosmos/tasks/scheduler.go b/sei-cosmos/tasks/scheduler.go
index e9757a6cc7..473739737b 100644
--- a/sei-cosmos/tasks/scheduler.go
+++ b/sei-cosmos/tasks/scheduler.go
@@ -3,8 +3,10 @@ package tasks
 import (
 	"context"
 	"crypto/sha256"
+	"encoding/hex"
 	"fmt"
 	"sort"
+	"strings"
 	"sync"
 	"time"
 
@@ -113,8 +115,10 @@ type scheduler struct {
 	executeCh         chan func()
 	validateCh        chan func()
 	metrics           *schedulerMetrics
-	synchronous       bool // true if maxIncarnation exceeds threshold
-	maxIncarnation    int  // current highest incarnation
+	synchronous       bool           // true if maxIncarnation exceeds threshold
+	maxIncarnation    int            // current highest incarnation
+	conflictKeyCounts map[string]int // per-key conflict counts accumulated over the block
+	conflictKeyMu     sync.Mutex
 }
 
 // NewScheduler creates a new scheduler
@@ -166,23 +170,27 @@ func (s *scheduler) DoExecute(work func()) {
 	s.executeCh <- work
 }
 
-func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int) {
+func (s *scheduler) findConflicts(task *deliverTxTask) (bool, []int, []string) {
 	var conflicts []int
+	conflictKeys := make([]string, 0, len(s.multiVersionStores))
 	uniq := make(map[int]struct{})
 	valid := true
-	for _, mv := range s.multiVersionStores {
-		ok, mvConflicts := mv.ValidateTransactionState(task.AbsoluteIndex)
+	for storeKey, mv := range s.multiVersionStores {
+		ok, mvConflicts, mvKeys := mv.ValidateTransactionStateWithKeys(task.AbsoluteIndex)
 		for _, c := range mvConflicts {
 			if _, ok := uniq[c]; !ok {
 				conflicts = append(conflicts, c)
 				uniq[c] = struct{}{}
 			}
 		}
+		for _, k := range mvKeys {
+			conflictKeys = append(conflictKeys, storeKey.Name()+"/"+k)
+		}
 		// any non-ok value makes valid false
 		valid = valid && ok
 	}
 	sort.Ints(conflicts)
-	return valid, conflicts
+	return valid, conflicts, conflictKeys
 }
 
 func toTasks(reqs []*sdk.DeliverTxEntry) ([]*deliverTxTask, map[int]*deliverTxTask) {
@@ -270,6 +278,7 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 	tasks, tasksMap := toTasks(reqs)
 	s.allTasks = tasks
 	s.allTasksMap = tasksMap
+	s.conflictKeyCounts = make(map[string]int)
 	s.executeCh = make(chan func(), len(tasks))
 	s.validateCh = make(chan func(), len(tasks))
 	defer s.emitMetrics()
@@ -324,6 +333,21 @@ func (s *scheduler) ProcessAll(ctx sdk.Context, reqs []*sdk.DeliverTxEntry) ([]t
 	}
 	s.metrics.maxIncarnation = s.maxIncarnation
 
+	if s.metrics.retries > 0 && len(s.conflictKeyCounts) > 0 {
+		encoded := make(map[string]int, len(s.conflictKeyCounts))
+		for k, v := range s.conflictKeyCounts {
+			storeName, rawKey, _ := strings.Cut(k, "/")
+			var encodedKey string
+			if rawKey == "globalAccountNumber" {
+				encodedKey = storeName + "/" + rawKey
+			} else {
+				encodedKey = storeName + "/" + hex.EncodeToString([]byte(rawKey))
+			}
+			encoded[encodedKey] = v
+		}
+		logger.Info("occ scheduler key conflicts", "height", ctx.BlockHeight(), "counts", encoded)
+	}
+
 	logger.Info("occ scheduler", "height", ctx.BlockHeight(), "txs", len(tasks), "latency_ms", time.Since(startTime).Milliseconds(), "retries", s.metrics.retries, "maxIncarnation", s.maxIncarnation, "iterations", iterations, "sync", s.synchronous, "workers", s.workers)
 
 	return s.collectResponses(tasks), nil
@@ -340,9 +364,14 @@ func (s *scheduler) shouldRerun(task *deliverTxTask) bool {
 		// With the current scheduler, we won't actually get to this step if a previous task has already been determined to be invalid,
 		// since we choose to fail fast and mark the subsequent tasks as invalid as well.
 		// TODO: in a future async scheduler that no longer exhaustively validates in order, we may need to carefully handle the `valid=true` with conflicts case
-		if valid, conflicts := s.findConflicts(task); !valid {
+		if valid, conflicts, conflictKeys := s.findConflicts(task); !valid {
 			s.invalidateTask(task)
 			task.AppendDependencies(conflicts)
+			s.conflictKeyMu.Lock()
+			for _, k := range conflictKeys {
+				s.conflictKeyCounts[k]++
+			}
+			s.conflictKeyMu.Unlock()
 
 			// if the conflicts are now validated, then rerun this task
 			if dependenciesValidated(s.allTasksMap, task.Dependencies) {