metrics(flatkv): add FlatKV observability metrics and logs #3366
blindchaser wants to merge 4 commits into main
Codecov Report
@@            Coverage Diff             @@
##             main    #3366      +/-   ##
==========================================
+ Coverage   59.07%   59.20%   +0.12%
==========================================
  Files        2099     2098       -1
  Lines      172988   172801     -187
==========================================
+ Hits       102195   102307     +112
+ Misses      61922    61625     -297
+ Partials     8871     8869       -2
    otelMetrics.SnapshotWriteLatency.Record(s.ctx, secondsSince(start),
        metric.WithAttributes(successAttr(err)))
    if err == nil {
        otelMetrics.CurrentSnapshotHeight.Record(s.ctx, s.committedVersion)
    } else if !errors.Is(err, errReadOnly) {
        logger.Error("FlatKV snapshot failed",
            "version", s.committedVersion,
            "elapsed", time.Since(start),
            "err", err)
    }
Consider encapsulating this logic into a helper function inside flatkv/metrics.go. This is already fairly complex and long code, and so separating the metrics+logging stuff into helper functions might be worth it. (This is an optional suggestion, feel free to push back or ignore if you disagree.)
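One way the suggested helper could look is sketched below. It is not the PR's code: `finishOp` and `opSink` are hypothetical names, the two function fields stand in for the OpenTelemetry histogram and the logger so that the sketch runs with the standard library alone, and `errReadOnly` is redeclared locally to mirror the sentinel used in the diff.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errReadOnly stands in for the PR's sentinel of the same name (assumption).
var errReadOnly = errors.New("flatkv: read-only store")

// opSink is a hypothetical seam over the latency histogram and the logger,
// so the helper can be exercised without either dependency.
type opSink struct {
	recordLatency func(seconds float64, success bool)
	logError      func(msg string, keyvals ...any)
}

// finishOp records latency tagged with success and logs the failure unless
// it is the read-only guard error. It reports whether the operation
// succeeded so the caller can decide whether to update gauges.
func finishOp(sink opSink, op string, start time.Time, err error, keyvals ...any) bool {
	elapsed := time.Since(start)
	sink.recordLatency(elapsed.Seconds(), err == nil)
	if err != nil && !errors.Is(err, errReadOnly) {
		// Copy the caller's fields before appending the common ones.
		fields := append(append([]any{}, keyvals...), "elapsed", elapsed, "err", err)
		sink.logError("FlatKV "+op+" failed", fields...)
	}
	return err == nil
}

// demo runs the helper against a success, a real failure, and a wrapped
// read-only error, and returns how many latencies and which logs it saw.
func demo() (records int, logged []string) {
	sink := opSink{
		recordLatency: func(float64, bool) { records++ },
		logError:      func(msg string, _ ...any) { logged = append(logged, msg) },
	}
	start := time.Now()
	finishOp(sink, "snapshot", start, nil, "version", int64(7))
	finishOp(sink, "snapshot", start, errors.New("disk full"), "version", int64(7))
	finishOp(sink, "snapshot", start, fmt.Errorf("guard: %w", errReadOnly))
	return records, logged
}

func main() {
	records, logged := demo()
	fmt.Println(records, logged)
}
```

In this sketch every call records a latency, but only the genuine failure is logged. A call site would shrink to a single deferred `finishOp` call followed by a gauge update on success.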
    defer func() {
        otelMetrics.RollbackLatency.Record(s.ctx, secondsSince(start),
            metric.WithAttributes(successAttr(err)))
        if err == nil {
            otelMetrics.CurrentVersion.Record(s.ctx, s.committedVersion)
        } else if !errors.Is(err, errReadOnly) {
            logger.Error("FlatKV Rollback failed",
                "targetVersion", targetVersion,
                "elapsed", time.Since(start),
                "err", err)
        }
    }()
Similar to above, consider making this a helper function. Optional.
    defer func() {
        otelMetrics.OpenLatency.Record(s.ctx, secondsSince(start),
            metric.WithAttributes(attribute.Bool("read_only", readOnly), successAttr(retErr)))
        if retErr == nil {
            version := s.committedVersion
            if opened != nil {
                version = opened.Version()
            }
            if !readOnly {
                otelMetrics.CurrentVersion.Record(s.ctx, s.committedVersion)
            }
            logger.Info("FlatKV LoadVersion complete",
                "targetVersion", targetVersion,
                "readOnly", readOnly,
                "version", version,
                "elapsed", time.Since(start))
        } else if !errors.Is(retErr, errReadOnly) {
            logger.Error("FlatKV LoadVersion failed",
                "targetVersion", targetVersion,
                "readOnly", readOnly,
                "elapsed", time.Since(start),
                "err", retErr)
        }
    }()
Optional suggestion: make this a helper function.
    func (s *CommitStore) ApplyChangeSets(changeSets []*proto.NamedChangeSet) (err error) {
        start := time.Now()
        defer func() {
            otelMetrics.ApplyChangesetsLatency.Record(s.ctx, secondsSince(start),
                metric.WithAttributes(successAttr(err)))
            if err != nil && !errors.Is(err, errReadOnly) {
                logger.Error("FlatKV ApplyChangeSets failed",
                    "changesets", len(changeSets),
                    "elapsed", time.Since(start),
                    "err", err)
            }
        }()
Optional: make this a helper function.
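The deferred blocks in this diff depend on the function declaring a named result such as `(err error)`: a deferred closure sees the value that the `return` statement assigned only when it reads the named result. The following standalone sketch illustrates that Go behavior. The function names and the `observed` slice are hypothetical and exist only for the demonstration.

```go
package main

import (
	"errors"
	"fmt"
)

// observed collects what each deferred closure saw (hypothetical test seam).
var observed []string

// withNamedReturn mirrors the shape used in the diff: the deferred closure
// reads the named result err after the return statement has assigned it.
func withNamedReturn(fail bool) (err error) {
	defer func() {
		observed = append(observed, fmt.Sprintf("named: success=%t", err == nil))
	}()
	if fail {
		return errors.New("apply failed")
	}
	return nil
}

// withLocalErr shows the pitfall: the closure captures a local variable that
// the return statement never assigns, so the failure goes unobserved.
func withLocalErr(fail bool) error {
	var err error
	defer func() {
		observed = append(observed, fmt.Sprintf("local: success=%t", err == nil))
	}()
	if fail {
		return errors.New("apply failed")
	}
	return err
}

func main() {
	_ = withNamedReturn(true)
	_ = withLocalErr(true)
	fmt.Println(observed)
}
```

Any helper extracted from these deferred blocks would therefore need to receive the error by value at defer-execution time, for example by being called inside a closure, rather than as a direct `defer helper(err)` argument, which is evaluated when the `defer` statement runs.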
    defer func() {
        otelMetrics.CatchupLatency.Record(s.ctx, secondsSince(start),
            metric.WithAttributes(successAttr(err)))
        if replayed > 0 {
            otelMetrics.CatchupReplayNumBlocks.Add(s.ctx, int64(replayed))
            otelMetrics.CurrentVersion.Record(s.ctx, s.committedVersion)
        }
        if err != nil {
            logger.Error("FlatKV catchup failed",
                "targetVersion", targetVersion,
                "startOffset", startOff,
                "endOffset", endOff,
                "replayed", replayed,
                "elapsed", time.Since(start),
                "err", err)
        }
    }()
Optional: make this a helper function.
Summary
This PR adds a FlatKV observability layer around the existing persistence pipeline. It centralizes OpenTelemetry latency histograms, counters, and gauges while preserving the WAL -> per-DB batch -> metadata commit ordering and crash recovery model.
- sei-db/state_db/sc/flatkv/metrics.go: defines the FlatKV meter, shared instruments, bool success attributes, db attributes, duration helpers, pending-write gauges, version/snapshot gauges, and KV pair counters.
- sei-db/state_db/sc/flatkv/store.go: propagates the top-level EnablePebbleMetrics flag into each nested PebbleDB config, records writer LoadVersion latency/current version, and initializes snapshot-height gauges on open. Readonly clones do not overwrite writer version gauges.
- sei-db/state_db/sc/flatkv/store_apply.go: records ApplyChangeSets latency, applied KV counts, pending-write gauges, and debug fields that distinguish raw changes from merged writes. LtHash updates remain after classification and old-value reads.
- sei-db/state_db/sc/flatkv/store_write.go: records commit latency, per-DB batch commit latency, flush latency, pending-write resets, and current version. Commit error paths return the attempted version so deferred failure logs report the version being committed.
- sei-db/state_db/sc/flatkv/store_catchup.go: records catchup latency, replayed block counts, current version, and progress/failure logs while retaining WAL replay and global metadata commit behavior.
- sei-db/state_db/sc/flatkv/snapshot.go: records snapshot write latency, prune latency, current snapshot height, prune attempts, and rollback latency while preserving atomic snapshot swap and rollback ordering. Readonly guard failures are not logged as operational errors.
- sei-db/state_db/sc/flatkv/importer.go: records import latency, worker flush latency, imported KV counts, import stats, and post-import version/snapshot gauges while preserving per-DB worker ownership and final snapshot durability.
- sei-db/state_db/sc/flatkv/store_test.go: asserts InitializeDataDirectories enforces the top-level Pebble metrics setting across all nested DB configs.
Test plan
- sei-db/state_db/sc/flatkv/store_test.go: TestInitializeDataDirectoriesPropagatesPebbleMetrics covers propagation of EnablePebbleMetrics=false to account, code, storage, legacy, and metadata PebbleDB configs.
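The propagation that this test covers can be pictured with the minimal sketch below. Only the `EnablePebbleMetrics` field name and the five database names come from the description above. The `Config` and `PebbleConfig` types and the `propagatePebbleMetrics` and `nestedFlags` functions are hypothetical stand-ins, not the actual FlatKV types or the logic inside InitializeDataDirectories.

```go
package main

import "fmt"

// PebbleConfig is a hypothetical stand-in for a nested PebbleDB config.
type PebbleConfig struct {
	EnablePebbleMetrics bool
}

// Config is a hypothetical stand-in for the top-level FlatKV config.
type Config struct {
	EnablePebbleMetrics                      bool
	Account, Code, Storage, Legacy, Metadata PebbleConfig
}

// propagatePebbleMetrics copies the top-level flag into every nested DB
// config, overriding whatever each nested config held before.
func propagatePebbleMetrics(cfg *Config) {
	nested := []*PebbleConfig{
		&cfg.Account, &cfg.Code, &cfg.Storage, &cfg.Legacy, &cfg.Metadata,
	}
	for _, db := range nested {
		db.EnablePebbleMetrics = cfg.EnablePebbleMetrics
	}
}

// nestedFlags returns the nested settings in a fixed order for inspection.
func nestedFlags(cfg Config) []bool {
	return []bool{
		cfg.Account.EnablePebbleMetrics,
		cfg.Code.EnablePebbleMetrics,
		cfg.Storage.EnablePebbleMetrics,
		cfg.Legacy.EnablePebbleMetrics,
		cfg.Metadata.EnablePebbleMetrics,
	}
}

func main() {
	// Nested configs start enabled; the top-level flag is false.
	cfg := Config{EnablePebbleMetrics: false}
	cfg.Account.EnablePebbleMetrics = true
	cfg.Metadata.EnablePebbleMetrics = true

	propagatePebbleMetrics(&cfg)
	fmt.Println(nestedFlags(cfg))
}
```

A test in the spirit of the one listed above would start from nested configs that disagree with the top-level flag, run the propagation, and assert that every nested setting now matches it.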