diff --git a/apps/evm/cmd/run.go b/apps/evm/cmd/run.go index a88e548d21..7829c2e1b0 100644 --- a/apps/evm/cmd/run.go +++ b/apps/evm/cmd/run.go @@ -131,7 +131,10 @@ var RunCmd = &cobra.Command{ }() } - return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}) + // nil fiberClient: the EVM app doesn't wire Fibre DA. See + // tools/celestia-node-fiber for the adapter; testapp/cmd/run.go + // has the same TODO note for matching context. + return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil) }, } diff --git a/apps/grpc/cmd/run.go b/apps/grpc/cmd/run.go index 22ca71f587..586a89eddc 100644 --- a/apps/grpc/cmd/run.go +++ b/apps/grpc/cmd/run.go @@ -86,8 +86,10 @@ The execution client must implement the Evolve execution gRPC interface.`, return err } - // Start the node - return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}) + // Start the node. nil fiberClient: the gRPC app doesn't wire + // Fibre DA. See tools/celestia-node-fiber for the adapter; + // testapp/cmd/run.go has the same TODO note for context. + return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil) }, } diff --git a/apps/testapp/cmd/run.go b/apps/testapp/cmd/run.go index 75e5a49019..05698f1e60 100644 --- a/apps/testapp/cmd/run.go +++ b/apps/testapp/cmd/run.go @@ -97,7 +97,12 @@ var RunCmd = &cobra.Command{ return err } - return cmd.StartNode(logger, command, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}) + // nil fiberClient: testapp doesn't yet wire Fibre DA. To enable + // fiber support here, build a *cnfiber.Adapter from + // nodeConfig.DA.Fiber and pass it as the last argument. The + // adapter wiring lives in tools/celestia-node-fiber; see the + // fiber-bench tool's run.go for a working caller. + return cmd.StartNode(logger, command, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil) }, } diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index e907002600..9e293c775a 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -25,8 +25,24 @@ const ( // DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking. DataDAIncludedPrefix = "cache/data-da-included/" - // DefaultTxCacheRetention is the default time to keep transaction hashes in cache. - DefaultTxCacheRetention = 24 * time.Hour + // DefaultTxCacheRetention is how long tx hashes stay in the + // seen-tx cache before CleanupOldTxs evicts them. + // + // HACK(fiber-throughput): dropped from 24h to 30s while we chase + // throughput, but the previous default was itself wrong: 24h is + // retention × tps in memory, so any rollup with meaningful TPS + // would OOM (we hit ~16 GB in under a minute at ~1.5M tx/s). + // What this should be properly: + // - Bounded by entry count, not wall time. The dedup window + // should be "the last N txs we saw", LRU-evicted, so cache + // memory is fixed regardless of throughput. + // - Or expressed in DA blocks: "drop hashes once their txs + // would have been retried out of the mempool", which is a + // property of mempool TTL × DA block time, not 24 hours. + // - 30s is a fine measurement default and a reasonable upper + // bound for pretty much any rollup; pick the right number + // when the cache structure itself is reworked. + DefaultTxCacheRetention = 30 * time.Second ) // CacheManager provides thread-safe cache operations for tracking seen blocks diff --git a/block/internal/common/consts.go b/block/internal/common/consts.go index 840b2faa97..8e1e679fc3 100644 --- a/block/internal/common/consts.go +++ b/block/internal/common/consts.go @@ -2,19 +2,54 @@ package common import "strconv" -// defaultMaxBlobSizeStr holds the string representation of the default blob -// size limit. Override at link time via: +// defaultMaxBlobSizeStr holds the string representation of the default +// blob size limit. Anchored to Fibre's actual cap: protocol MaxBlobSize +// (1 << 27 = 128 MiB) minus the 5-byte Fibre blob header (1 byte +// version + 4 byte data size). See celestia-app/v9/fibre/blob.go +// (blobHeaderLen) and fibre/protocol_params.go (MaxBlobSize). // -// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120" -var defaultMaxBlobSizeStr = "5242880" // 5 MB +// HACK(fiber-throughput): this default is correct for fiber-enabled +// deployments but WRONG for the legacy JSON-RPC blob client path — +// the bridge / chain rejects blobs above its own (much smaller) cap, +// so a non-fiber node started against this default would fail to +// submit. The right shape is per-backend: fiber's cap is one number, +// blob-RPC's cap is another, and DefaultMaxBlobSize shouldn't be a +// single global. Restructure into config when the throughput-cleanup +// TODO lands; until then, non-fiber callers should override via +// ldflag or local config. +// +// MUST be a string literal: Go's `-ldflags "-X ..."` only takes effect +// on variables initialized to a string constant, NOT a function call. +// A previous version used strconv.FormatUint here, which compiled but +// silently ignored ldflag overrides. +// +// Override at link time via: +// +// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=33554432" +var defaultMaxBlobSizeStr = "134217723" // 1 << 27 - 5 = 128 MiB - 5 B // DefaultMaxBlobSize is the max blob size limit used for blob submission. +// +// TODO(throughput-cleanup): this single value is currently plugged in +// at two semantically different limits and the conflation has caused +// real bugs (a packed block marshals larger than its raw-tx total, so +// using MaxBlobSize as both input cap and output cap let blocks blow +// past the DA cap). Split into two: +// +// MaxBlobSize — chain-side ceiling on a marshaled DA blob +// MaxBlockTxBytes() — derived raw-tx budget = MaxBlobSize - per-block +// marshal overhead. Used by RetrieveBatch / +// FilterTxs. +// +// Once that derivation exists, drop the ad-hoc 2% reservation in +// executing/executor.go::RetrieveBatch and the duplicate cap in +// submitting/da_submitter.go::defaultRetryPolicy. var DefaultMaxBlobSize uint64 func init() { v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64) if err != nil || v == 0 { - DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback + DefaultMaxBlobSize = 134217723 return } DefaultMaxBlobSize = v diff --git a/block/internal/da/fiber_client.go b/block/internal/da/fiber_client.go index 22413a181c..db92f06792 100644 --- a/block/internal/da/fiber_client.go +++ b/block/internal/da/fiber_client.go @@ -5,6 +5,7 @@ import ( "encoding/binary" "errors" "fmt" + "sync" "time" "github.com/rs/zerolog" @@ -87,38 +88,96 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na } } - flat := flattenBlobs(data) + // Per-item concurrent Upload. Fibre's per-Upload latency is + // dominated by validator signature aggregation (~1.5 s on a + // healthy network) and does not scale up linearly under multiple + // in-flight Uploads, so settlement throughput scales with the + // number of concurrent items submitted in a single batch. Each + // item gets its own goroutine, its own Upload call, and its own + // BlobID in the result; the previous flatten step was both + // memory-wasteful (a MaxBlobSize-sized memcpy on every Submit) + // and inherently serial (one Upload per Submit). + // + // TODO: wire-format compat — old splitBlobs assumed all items in + // a Submit were written as a single prefixed blob. With per-item + // Uploads, retrievers must treat each BlobID separately. The + // retrieve path in this file still uses splitBlobs and will need + // a follow-up to read the new per-item blobs as raw payloads. + nsID := namespace[len(namespace)-10:] + type uploadResult struct { + idx int + id []byte + err error + } + results := make([]uploadResult, len(data)) + var wg sync.WaitGroup + for i := range data { + wg.Add(1) + go func(i int) { + defer wg.Done() + res, err := c.fiber.Upload(ctx, nsID, data[i]) + if err != nil { + results[i] = uploadResult{idx: i, err: err} + return + } + id := make([]byte, len(res.BlobID)) + copy(id, res.BlobID) + results[i] = uploadResult{idx: i, id: id} + }(i) + } + wg.Wait() + + // Walk results in submission order. submitToDA's retry logic + // expects "prefix of successes": SubmittedCount=N means items + // [0..N) succeeded and the caller will re-submit items [N..end) + // on the next attempt. Reporting interleaved successes would + // double-submit blobs and waste escrow; matching prefix + // semantics keeps the contract intact even when individual + // Uploads fail out-of-order. + ids := make([][]byte, 0, len(data)) + var firstErr error + for _, r := range results { + if r.err != nil { + firstErr = r.err + break + } + ids = append(ids, r.id) + } - result, err := c.fiber.Upload(context.Background(), namespace[len(namespace)-10:], flat) - if err != nil { + if len(ids) == 0 && firstErr != nil { code := datypes.StatusError switch { - case errors.Is(err, context.Canceled): + case errors.Is(firstErr, context.Canceled): code = datypes.StatusContextCanceled - case errors.Is(err, context.DeadlineExceeded): + case errors.Is(firstErr, context.DeadlineExceeded): code = datypes.StatusContextDeadline } - - c.logger.Error().Err(err).Msg("fiber upload failed") - + c.logger.Error().Err(firstErr).Msg("fiber upload failed") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: code, - Message: fmt.Sprintf("fiber upload failed for blob: %v", err), - SubmittedCount: uint64(len(data) - 1), + Message: fmt.Sprintf("fiber upload failed for blob: %v", firstErr), + SubmittedCount: 0, BlobSize: blobSize, Timestamp: time.Now(), }, } } - c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") + if firstErr != nil { + c.logger.Warn().Err(firstErr). + Int("submitted", len(ids)). + Int("total", len(data)). + Msg("fiber upload partial success — caller will retry the remainder") + } + + c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful") return datypes.ResultSubmit{ BaseResult: datypes.BaseResult{ Code: datypes.StatusSuccess, - IDs: [][]byte{result.BlobID}, - SubmittedCount: uint64(len(data)), + IDs: ids, + SubmittedCount: uint64(len(ids)), Height: 0, /* TODO */ BlobSize: blobSize, Timestamp: time.Now(), @@ -329,29 +388,19 @@ func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []d func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz } func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz } -func flattenBlobs(blobs [][]byte) []byte { - if len(blobs) == 0 { - return nil - } - - var total int - for _, b := range blobs { - total += 4 + len(b) - } - total += 4 - - buf := make([]byte, total) - binary.BigEndian.PutUint32(buf, uint32(len(blobs))) - off := 4 - for _, b := range blobs { - binary.BigEndian.PutUint32(buf[off:], uint32(len(b))) - off += 4 - copy(buf[off:], b) - off += len(b) - } - return buf -} - +// splitBlobs decodes the legacy "count + per-item length" framing that +// the previous Submit path used to pack multiple blobs into a single +// Upload. The per-item-concurrent Submit path no longer writes that +// framing — each item is uploaded raw — so any blob written by this +// branch's Submit will fail to decode here. +// +// Callers (Retrieve / RetrieveBlobs / Get / Subscribe) therefore only +// work for blobs written by the OLD code path, OR for the multi-item +// header batches that still use it. Pair the format change with a +// matching update to the read path before any node on this branch +// tries to sync from another node on this branch. +// +// Tracked alongside the wire-format TODO on Submit (above). func splitBlobs(data []byte) ([][]byte, error) { if len(data) == 0 { return nil, nil diff --git a/block/internal/executing/executor.go b/block/internal/executing/executor.go index 32ca4e596b..69746841ad 100644 --- a/block/internal/executing/executor.go +++ b/block/internal/executing/executor.go @@ -663,11 +663,35 @@ func (e *Executor) ProduceBlock(ctx context.Context) error { return nil } +// blockMarshalOverhead reserves a fraction of MaxBlobSize for the proto +// framing + Metadata overhead added when types.Data is marshaled into a +// DA blob. Empirically the per-tx proto length-prefix runs ~3 bytes, +// which is roughly 1.5% at 200 B txs and stays in that range across +// realistic tx sizes; 2% gives margin for fixed Metadata fields without +// leaving meaningful capacity unused. Reserving here (vs. inside +// FilterTxs) keeps the executor’s view of MaxBytes equal to the raw-tx +// budget and prevents a fully packed batch from blowing past the +// submitter’s MaxBlobSize check. +// +// TODO(throughput-cleanup): this is the workaround half of a deeper +// issue — common.DefaultMaxBlobSize is used as both the raw-tx +// budget AND the marshaled-blob ceiling. The right fix is to derive +// a MaxBlockTxBytes() value once (= MaxBlobSize - overhead) and have +// RetrieveBatch / FilterTxs / da_submitter.limitBatchBySize all +// reference the appropriate value rather than each enforcing the +// same number with their own ad-hoc adjustments. See +// common/consts.go for the umbrella TODO. +const blockMarshalOverheadPct = 2 + // RetrieveBatch gets the next batch of transactions from the sequencer. func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) { + maxTxBytes := common.DefaultMaxBlobSize + if reserve := maxTxBytes * blockMarshalOverheadPct / 100; reserve < maxTxBytes { + maxTxBytes -= reserve + } req := coresequencer.GetNextBatchRequest{ Id: []byte(e.genesis.ChainID), - MaxBytes: common.DefaultMaxBlobSize, + MaxBytes: maxTxBytes, LastBatchData: [][]byte{}, // Can be populated if needed for sequencer context } diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index d35dbfff3e..7cd9e27450 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -21,7 +21,21 @@ import ( const ( // MaxBackoffInterval is the maximum backoff interval for retries MaxBackoffInterval = 30 * time.Second - CleanupInterval = 1 * time.Hour + + // CleanupInterval is how often the reaper sweeps expired hashes + // out of the seen-tx cache. + // + // HACK(fiber-throughput): dropped from 1h to 5s. The original + // 1h was effectively coupled to the previous 24h retention — + // sweeping every hour against a 24h window means a cache entry + // can outlive its retention by 1h, which is fine when retention + // is a day but completely breaks at 30s retention (entries + // would survive 12× past expiry). Whatever the right retention + // turns out to be (see DefaultTxCacheRetention's note in + // cache/manager.go), this value should be a small fraction of + // it — not a fixed time. Better to derive: e.g. + // retention/10 with a sane min/max. + CleanupInterval = 5 * time.Second ) // Reaper is responsible for periodically retrieving transactions from the executor, diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 83f56d9cb5..bb30eb6461 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -43,14 +43,48 @@ type retryPolicy struct { MinBackoff time.Duration MaxBackoff time.Duration MaxBlobBytes uint64 + // MaxItems caps the number of items packed into a single Submit + // call. DA clients that fan out per-item Uploads (fiber) benefit + // linearly from larger batches — settlement throughput scales + // with concurrency until per-Upload latency dominates. Default + // 1 preserves legacy single-item-per-Submit semantics for + // backends that flatten a batch into one blob (JSON-RPC blob + // client). The fiber path overrides this from config. + MaxItems int } +// defaultBatchItems is the conservative default for non-fiber backends +// that historically expected one item per Submit call. The fiber path +// raises this via config because it can fan out per-item Uploads. +const defaultBatchItems = 1 + +// fiberDefaultBatchItems is the upper bound on items packed into a +// single fiber Submit. Each item gets its own concurrent Upload, so +// this caps the per-batch goroutine fan-out. +// +// HACK(fiber-throughput): hardcoded at 16. The right value depends on +// memory budget × per-item Upload size × Fibre validator-side +// throughput, none of which the submitter can know at compile time. +// Should be a config knob (FiberDAConfig.UploadConcurrency was +// scaffolded for exactly this earlier — wire it through here when the +// concurrent-uploads change graduates from prototype). 16 is a +// pragmatic measurement default that gives meaningful concurrency +// without overwhelming celestia-node's per-FSP rate. +const fiberDefaultBatchItems = 16 + func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy { return retryPolicy{ - MaxAttempts: maxAttempts, - MinBackoff: initialBackoff, - MaxBackoff: maxDuration, + MaxAttempts: maxAttempts, + MinBackoff: initialBackoff, + MaxBackoff: maxDuration, + // TODO(throughput-cleanup): same value is used by + // executing/executor.go::RetrieveBatch as the raw-tx budget + // (with a 2% reservation) and again here as the marshaled + // blob ceiling. They are semantically different limits; + // the duplication is what made packed-block-larger-than-cap + // failures non-obvious. See common/consts.go. MaxBlobBytes: common.DefaultMaxBlobSize, + MaxItems: defaultBatchItems, } } @@ -572,12 +606,19 @@ func submitToDA[T any]( } pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration) + // Fiber's DA client fans out per-item Uploads concurrently, so + // packing more items per Submit lifts settlement throughput. For + // non-fiber backends the default of 1 preserves the legacy + // flatten-one-blob behavior. + if s.config.DA.IsFiberEnabled() { + pol.MaxItems = fiberDefaultBatchItems + } rs := retryState{Attempt: 0, Backoff: 0} // Limit this submission to a single size-capped batch if len(marshaled) > 0 { - batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes) + batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes, pol.MaxItems) if err != nil { s.logger.Error(). Str("itemType", itemType). @@ -688,27 +729,42 @@ func submitToDA[T any]( return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt) } -// limitBatchBySize returns a prefix of items whose total marshaled size does not exceed maxBytes. -// If the first item exceeds maxBytes, it returns ErrOversizedItem which is unrecoverable. -func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes uint64) ([]T, [][]byte, error) { - total := uint64(0) +// limitBatchBySize returns a prefix of items whose per-item marshaled size +// fits within maxItemBytes. The total batch size is bounded by item count +// (maxItems), not by total bytes — DA clients that can fan out per-item +// Uploads (e.g. the fiber DA client) settle each item in its own +// concurrent Upload call, so packing more items per batch lifts the +// effective settlement throughput. DA clients that flatten a batch into +// a single blob still get one item per call when maxItems == 1. +// +// If the first item exceeds maxItemBytes, returns ErrOversizedItem +// (unrecoverable). If no items fit at all (empty inputs), returns a +// distinct error so the caller can distinguish "nothing to send". +// +// TODO(throughput-cleanup): see common/consts.go — maxItemBytes is the +// per-item chain ceiling, separate from the raw-tx budget driving +// FilterTxs. Once that split lands, the duplicate-cap-everywhere +// problem these fixes work around goes away. +func limitBatchBySize[T any](items []T, marshaled [][]byte, maxItemBytes uint64, maxItems int) ([]T, [][]byte, error) { + if maxItems <= 0 { + maxItems = 1 + } count := 0 for i := range items { + if count >= maxItems { + break + } sz := uint64(len(marshaled[i])) - if sz > maxBytes { + if sz > maxItemBytes { if i == 0 { - return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxBytes) + return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxItemBytes) } break } - if total+sz > maxBytes { - break - } - total += sz count++ } if count == 0 { - return nil, nil, fmt.Errorf("no items fit within %d bytes", maxBytes) + return nil, nil, fmt.Errorf("no items fit within %d bytes", maxItemBytes) } return items[:count], marshaled[:count], nil } diff --git a/pkg/store/kv.go b/pkg/store/kv.go index 3ee23bc2c1..5cea24ba12 100644 --- a/pkg/store/kv.go +++ b/pkg/store/kv.go @@ -8,6 +8,7 @@ import ( ds "github.com/ipfs/go-datastore" ktds "github.com/ipfs/go-datastore/keytransform" dsq "github.com/ipfs/go-datastore/query" + dssync "github.com/ipfs/go-datastore/sync" badger4 "github.com/ipfs/go-ds-badger4" ) @@ -15,7 +16,40 @@ import ( const EvPrefix = "0" // NewDefaultKVStore creates instance of default key-value store. -func NewDefaultKVStore(rootDir, dbPath, dbName string) (ds.Batching, error) { +// +// HACK(fiber-throughput): swapped to a pure in-memory map for the +// Fibre throughput investigation. The real issue this surfaces is +// architectural, not a Badger bug: block.executing.Executor.ProduceBlock +// calls store.batch.Commit() synchronously inside the producer, so +// the storage engine's write rate is a hard ceiling on block +// production. With 128 MiB blocks × ~1 b/s the on-disk path drives +// ~150 MB/s of value-log writes plus heavy compaction; the producer +// blocks on Badger long before the DA submitter is the bottleneck. +// +// Don't revert this in place — fix the underlying design instead. +// Options worth weighing: +// - Move the block save off the producer hot path (async commit +// with a bounded queue). Block durability is not required to +// advance state, only to recover after restart. +// - For Fibre-only rollups specifically: skip local persistence +// entirely. Fibre IS the storage; a node can re-sync from the +// chain on restart. This removes the question. +// - If we keep persisting, pick a write-optimised backend that +// handles 100s of MB/s of large-value writes without compaction +// stalls. Badger v4 with these tunings still hit a .vlog +// rotation race under sustained load. +// +// NewDefaultKVStoreOnDisk preserved below as the literal Badger +// constructor for any caller that explicitly wants disk-backed +// state today; the production wiring should switch to one of the +// three options above before this directory is dropped. +func NewDefaultKVStore(_, _, _ string) (ds.Batching, error) { + return dssync.MutexWrap(ds.NewMapDatastore()), nil +} + +// NewDefaultKVStoreOnDisk is the original Badger-backed constructor, +// preserved for the duration of the throughput-cleanup window. +func NewDefaultKVStoreOnDisk(rootDir, dbPath, dbName string) (ds.Batching, error) { path := filepath.Join(rootify(rootDir, dbPath), dbName) return badger4.NewDatastore(path, BadgerOptions()) } diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/README.md b/tools/celestia-node-fiber/cmd/fiber-bench/README.md new file mode 100644 index 0000000000..d99419f6e8 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/README.md @@ -0,0 +1,187 @@ +# fiber-bench + +Single-sequencer ev-node throughput bench against a remote Fibre network. + +## What it is + +A self-contained binary that spins up an ev-node aggregator wired to a +remote Fibre network (no bridge node, no P2P, no syncer, no +state-machine cost) and pumps transactions into its mempool as fast as +the configured backpressure allows. + +The intent is a fail-fast baseline so we can isolate ev-node's +batching + DA-submit pipeline from everything else when chasing the +1k tps regression. + +### What's stripped out (and why) + +| Stripped | Why | +|----------------|------------------------------------------------------------| +| Bridge node | Upload only needs consensus gRPC + FSPs. | +| Syncer | Aggregator-only single-node setup. | +| P2P outbound | ev-node already disables it when `da.fiber.enabled=true`. | +| Forced incl. | Solo sequencer. | +| Real state machine | Constant state root — measure ev-node, not state cost. | +| HTTP tx ingress | Direct `InjectTx`. Removes HTTP from the hot path. | + +## Layout + +``` +tools/celestia-node-fiber/cmd/fiber-bench/ + main.go cobra root + keys.go cosmos keyring management (test backend) + escrow.go Fibre escrow deposit/query + run.go the bench + executor.go in-mem core.Executor with constant state root + loader.go internal tx pump + stats.go periodic stats line + final baseline summary + fibre.go bridge-bypass cnfiber.Adapter constructor + run-bench.sh convenience wrapper +``` + +## Quick start + +```sh +cd tools/celestia-node-fiber + +# 1. Build — the `fibre` build tag is REQUIRED so celestia-app's +# x/fibre messages (MsgPayForFibre, MsgDepositToEscrow) are registered +# in the codec. Without it the async PFF settlement fails with +# "unable to resolve type URL /celestia.fibre.v1.MsgPayForFibre". +go build -tags fibre -o bin/fiber-bench ./cmd/fiber-bench/ + +# 2. Create the bench key (cosmos keyring, test backend = unencrypted on disk) +./bin/fiber-bench keys add bench +# prints: address: celestia1... +# mnemonic: ... + +# 3. Top up the printed address with utia on the chain (out of band). + +# 4. Deposit into the Fibre escrow +./bin/fiber-bench escrow deposit \ + --consensus-grpc 139.59.229.101:9091 \ + --key-name bench \ + --amount 50000000 # 50 TIA + +# 5. Sanity check +./bin/fiber-bench escrow query \ + --consensus-grpc 139.59.229.101:9091 \ + --key-name bench + +# 6. Run the bench +./bin/fiber-bench run \ + --evnode.da.fiber.consensus_address 139.59.229.101:9091 \ + --evnode.da.fiber.consensus_chain_id \ + --evnode.da.fiber.key_name bench \ + --duration 2m \ + --workers 32 \ + --tx-size 200 \ + --evnode.node.block_time 1s \ + --evnode.da.batching_strategy immediate +``` + +The bench reuses canonical ev-node flags (`--evnode.*`) registered by +`pkg/config.AddFlags` rather than defining bench-specific aliases. See +`fiber-bench run --help` for the full list — anything you'd configure on +testapp/evm/grpc apps works here too. + +Or use the convenience wrapper: + +```sh +CONSENSUS_GRPC=139.59.229.101:9091 \ +CHAIN_ID=talis-slab-diag \ + ./cmd/fiber-bench/run-bench.sh 2m 32 +``` + +## What the run prints + +A header, then one line per `--stats-interval` (default 1s): + +``` +elapsed injected inj/s exec/s blocks/s committed_h txs/blk blob_bytes pending drops +------------------------------------------------------------------------------------------------------ +1s 1452609 1452212 0 0.00 0 0.0 0 0 293116 +2s 1544094 91444 0 0.00 0 0.0 0 0 1007270 +``` + +Columns: + +- `injected` — total txs the load generator has called `InjectTx` for +- `inj/s` — injection rate over the last interval +- `exec/s` — txs included in produced blocks (rate) +- `blocks/s` — block production rate +- `committed_h` — last block height confirmed by DA (0 until first + Upload settles) +- `txs/blk` — running average over all blocks +- `blob_bytes` — last block's data size in bytes +- `pending` — `evnode_da_submitter_pending_blobs` gauge +- `drops` — txs the load generator could not enqueue because the + in-mem mempool channel was full (this is the backpressure signal) + +At the end: + +``` +============================================================ + BASELINE SUMMARY +============================================================ +Duration: 2m0s +Injected: XXX (avg N tx/s, peak N tx/s) +Dropped (mempool full): XXX +Mempool high-water: XXX +Blocks produced: XXX (committed_h=YYY) +Txs executed: XXX (avg N tx/s, peak N tx/s, T tx/blk) +============================================================ +``` + +## Knobs worth flipping while debugging + +Bench-local flags: + +| Flag | Default | Why | +|-------------------------|--------------|---------------------------------------------------| +| `--workers` | `32` | Tx-injection concurrency | +| `--tx-size` | `200` | Bytes per tx (matches user-reported regression) | +| `--mempool-size` | `1_000_000` | Bench's bounded backpressure boundary | +| `--keep-home` | `false` | Resume from prior state (defaults to wipe) | +| `--duration` | `1m` | How long to run (0 = until SIGINT) | +| `--stats-interval` | `1s` | Stats line cadence | +| `--keyring-dir` | `~/.fiber-bench/keyring` | Cosmos keyring (Fibre payment promises) | +| `--signer-passphrase` | `fiber-bench-passphrase` | ev-node block-signing key passphrase | + +Canonical ev-node flags worth flipping (full list: `run --help`): + +| Flag | Bench default | Why | +|-------------------------------------|---------------|--------------------------------------| +| `--evnode.node.block_time` | `1s` | Drop to `100ms` to expose per-block overhead | +| `--evnode.da.batching_strategy` | `immediate` | Try `time` / `size` / `adaptive` | +| `--evnode.node.scrape_interval` | `100ms` | How often the mempool drain runs | +| `--evnode.node.max_pending_headers_and_data` | `0` | Cap pending DA blobs to test backpressure | +| `--evnode.log.level` | `info` | `debug` to see ev-node block production logs | + +## ev-node Prometheus + +When `--prometheus=true` (default), ev-node exposes metrics at +`http://127.0.0.1:26660/metrics`. The bench scrapes a handful of them +for its stats line, but you can hit the endpoint directly for the full +picture: `evnode_block_production_duration_seconds`, +`evnode_da_submitter_failures_total`, etc. + +## Operational notes + +- **Test-backend keyring**: keys live unencrypted on disk under + `~/.fiber-bench/keyring`. Fine for a bench account funded with a + small amount of utia. Don't use for anything else. +- **The bench wipes its ev-node home (`~/.fiber-bench/node`) on every + run** unless `--keep-home` is passed. Block-signing key, store, and + any in-flight pending blocks all reset. The cosmos keyring is + separate and is preserved. +- **Bridge bypass**: the bench builds the `cnfiber.Adapter` via + `cnfiber.FromModules` with a stub Blob module that errors on every + call. The aggregator-only setup never invokes Listen/Subscribe, so + this is safe; if the assumption breaks, you'll see a clear + `fiber-bench: blob module not supported` error rather than a nil + panic. +- **Chain ID** is what the consensus node reports; the bench logs it + on startup. Pass the same value via `--chain-id` for config + validation; mismatch is logged but tx submission proceeds against + the chain's actual ID. diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/escrow.go b/tools/celestia-node-fiber/cmd/fiber-bench/escrow.go new file mode 100644 index 0000000000..89e55e25c9 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/escrow.go @@ -0,0 +1,165 @@ +package main + +import ( + "context" + "fmt" + "time" + + sdkmath "cosmossdk.io/math" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/celestiaorg/celestia-app/v9/app" + "github.com/celestiaorg/celestia-app/v9/app/encoding" + "github.com/celestiaorg/celestia-app/v9/pkg/appconsts" + "github.com/celestiaorg/celestia-app/v9/pkg/user" + fibretypes "github.com/celestiaorg/celestia-app/v9/x/fibre/types" +) + +// escrowCmd groups Fibre-escrow operations. Uploads consume utia from +// the signer's escrow account; without a funded escrow, every Upload on +// the bench will fail at the chain. +func escrowCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "escrow", + Short: "Manage Fibre escrow for the bench account", + } + cmd.AddCommand(escrowDepositCmd(), escrowQueryCmd()) + return cmd +} + +func escrowDepositCmd() *cobra.Command { + var ( + consensusGRPC string + keyringDir string + keyName string + amountUtia int64 + gasLimit uint64 + feeUtia uint64 + ) + cmd := &cobra.Command{ + Use: "deposit", + Short: "Deposit utia into the bench account's Fibre escrow", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 60*time.Second) + defer cancel() + + kr, err := openKeyring(keyringDir) + if err != nil { + return fmt.Errorf("open keyring: %w", err) + } + rec, err := kr.Key(keyName) + if err != nil { + return fmt.Errorf("key %q not found in %s: %w", keyName, keyringDir, err) + } + addr, err := rec.GetAddress() + if err != nil { + return fmt.Errorf("get address: %w", err) + } + + conn, err := grpc.NewClient(consensusGRPC, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return fmt.Errorf("dial grpc: %w", err) + } + defer conn.Close() + + ecfg := encoding.MakeConfig(app.ModuleEncodingRegisters...) + tc, err := user.SetupTxClient(ctx, kr, conn, ecfg, user.WithDefaultAccount(keyName)) + if err != nil { + return fmt.Errorf("setup tx client: %w", err) + } + + amount := sdk.NewCoin(appconsts.BondDenom, sdkmath.NewInt(amountUtia)) + msg := &fibretypes.MsgDepositToEscrow{ + Signer: addr.String(), + Amount: amount, + } + fmt.Printf("submitting MsgDepositToEscrow: signer=%s amount=%s\n", addr.String(), amount.String()) + resp, err := tc.SubmitTx(ctx, []sdk.Msg{msg}, user.SetGasLimit(gasLimit), user.SetFee(feeUtia)) + if err != nil { + return fmt.Errorf("submit tx: %w", err) + } + if resp.Code != 0 { + return fmt.Errorf("deposit tx failed: code=%d codespace=%s", resp.Code, resp.Codespace) + } + fmt.Printf("deposit included: height=%d txhash=%s\n", resp.Height, resp.TxHash) + + // Sanity: read the escrow back so the operator sees the + // new balance immediately. + qc := fibretypes.NewQueryClient(conn) + res, err := qc.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{Signer: addr.String()}) + if err != nil { + fmt.Printf("(could not query escrow back: %v)\n", err) + return nil + } + if !res.Found { + fmt.Println("(escrow not found after deposit — chain may need another block)") + return nil + } + fmt.Printf("escrow balance: %s\n", res.EscrowAccount.Balance.String()) + return nil + }, + } + cmd.Flags().StringVar(&consensusGRPC, "consensus-grpc", "", "celestia-app gRPC address (host:port). Required.") + cmd.Flags().StringVar(&keyringDir, "keyring-dir", defaultKeyringDir(), "directory holding the bench keyring") + cmd.Flags().StringVar(&keyName, "key-name", "default", "key in the keyring to deposit from") + cmd.Flags().Int64Var(&amountUtia, "amount", 50_000_000, "amount in utia to deposit (default 50 TIA)") + cmd.Flags().Uint64Var(&gasLimit, "gas-limit", 200_000, "tx gas limit") + cmd.Flags().Uint64Var(&feeUtia, "fee", 5_000, "fee in utia") + _ = cobra.MarkFlagRequired(cmd.Flags(), "consensus-grpc") + return cmd +} + +func escrowQueryCmd() *cobra.Command { + var ( + consensusGRPC string + keyringDir string + keyName string + ) + cmd := &cobra.Command{ + Use: "query", + Short: "Print the current Fibre escrow balance for the bench account", + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithTimeout(cmd.Context(), 30*time.Second) + defer cancel() + + kr, err := openKeyring(keyringDir) + if err != nil { + return err + } + rec, err := kr.Key(keyName) + if err != nil { + return fmt.Errorf("key %q not found: %w", keyName, err) + } + addr, err := rec.GetAddress() + if err != nil { + return err + } + + conn, err := grpc.NewClient(consensusGRPC, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + return err + } + defer conn.Close() + + qc := fibretypes.NewQueryClient(conn) + res, err := qc.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{Signer: addr.String()}) + if err != nil { + return err + } + if !res.Found { + fmt.Printf("address: %s\nescrow: not found (deposit first)\n", addr.String()) + return nil + } + fmt.Printf("address: %s\nescrow: %s\n", addr.String(), res.EscrowAccount.Balance.String()) + return nil + }, + } + cmd.Flags().StringVar(&consensusGRPC, "consensus-grpc", "", "celestia-app gRPC address. Required.") + cmd.Flags().StringVar(&keyringDir, "keyring-dir", defaultKeyringDir(), "keyring directory") + cmd.Flags().StringVar(&keyName, "key-name", "default", "key name") + _ = cobra.MarkFlagRequired(cmd.Flags(), "consensus-grpc") + return cmd +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/executor.go b/tools/celestia-node-fiber/cmd/fiber-bench/executor.go new file mode 100644 index 0000000000..088d21ffef --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/executor.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "sync/atomic" + "time" + + coreexecution "github.com/evstack/ev-node/core/execution" +) + +// inMemExecutor is a minimal core.Executor that: +// - accepts injected txs via a buffered channel (the "mempool") +// - drains them in GetTxs (non-blocking) +// - "executes" by counting (no state machine) +// - returns a constant state root, so we don't pay O(N) state-root cost on +// every block (which would dominate the measurement and tell us nothing +// about ev-node's batching/submitting performance). +// +// Use FilterTxs's size cap to enforce the configured per-block byte budget. +type inMemExecutor struct { + txCh chan []byte + + injected atomic.Uint64 + dropped atomic.Uint64 + blocksProduced atomic.Uint64 + totalExecutedTxs atomic.Uint64 + + // mempoolHigh tracks the maximum mempool depth observed (snapshot). + mempoolHigh atomic.Int64 + + // constStateRoot is what every block reports as its post-state. The + // measurement target is ev-node, not state computation. + constStateRoot []byte +} + +func newInMemExecutor(mempoolSize int) *inMemExecutor { + return &inMemExecutor{ + txCh: make(chan []byte, mempoolSize), + constStateRoot: []byte("fiber-bench-const-state-root"), + } +} + +// InjectTx is the bench's "mempool entry". Backpressures via channel +// capacity: full → drop and increment counter so the operator sees it. +func (e *inMemExecutor) InjectTx(tx []byte) bool { + select { + case e.txCh <- tx: + e.injected.Add(1) + // Loose mempool-depth high-water; not a hot-path concern. + if d := int64(len(e.txCh)); d > e.mempoolHigh.Load() { + e.mempoolHigh.Store(d) + } + return true + default: + e.dropped.Add(1) + return false + } +} + +func (e *inMemExecutor) MempoolDepth() int { return len(e.txCh) } + +func (e *inMemExecutor) Stats() (injected, dropped, blocks, txs uint64, mempoolHigh int64) { + return e.injected.Load(), + e.dropped.Load(), + e.blocksProduced.Load(), + e.totalExecutedTxs.Load(), + e.mempoolHigh.Load() +} + +// InitChain is called once at genesis. +func (e *inMemExecutor) InitChain(_ context.Context, _ time.Time, _ uint64, _ string) ([]byte, error) { + return e.constStateRoot, nil +} + +// GetTxs drains the mempool channel. Non-blocking — returns whatever is +// currently buffered. ev-node's reaper polls this on its own cadence. +func (e *inMemExecutor) GetTxs(ctx context.Context) ([][]byte, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + } + + n := len(e.txCh) + if n == 0 { + return nil, nil + } + txs := make([][]byte, 0, n) + for i := 0; i < n; i++ { + select { + case tx := <-e.txCh: + txs = append(txs, tx) + default: + return txs, nil + } + } + return txs, nil +} + +// ExecuteTxs is intentionally a no-op state transition: count txs, return +// a constant root. The whole point of this executor is to take state +// computation out of the measurement. +func (e *inMemExecutor) ExecuteTxs(_ context.Context, txs [][]byte, _ uint64, _ time.Time, _ []byte) ([]byte, error) { + e.blocksProduced.Add(1) + e.totalExecutedTxs.Add(uint64(len(txs))) + return e.constStateRoot, nil +} + +func (e *inMemExecutor) SetFinal(_ context.Context, _ uint64) error { return nil } +func (e *inMemExecutor) Rollback(_ context.Context, _ uint64) error { return nil } + +func (e *inMemExecutor) GetExecutionInfo(_ context.Context) (coreexecution.ExecutionInfo, error) { + // MaxGas=0 means "no gas-based filter"; the size cap (FilterTxs) is what + // bounds per-block bytes. + return coreexecution.ExecutionInfo{MaxGas: 0}, nil +} + +// FilterTxs enforces the configured per-block byte budget. Mirrors the +// existing testapp KV executor's behavior: oversized txs are dropped, the +// rest fill until the budget is hit and overflow is postponed for the +// next block. We don't validate tx content — txs from the load generator +// are well-formed by construction. +// +// We honor maxBytes as-is. Per-block proto/Metadata overhead is the +// responsibility of the block-size cap (now anchored to Fibre's actual +// MaxPayload in block/internal/common/consts.go), not the executor. +func (e *inMemExecutor) FilterTxs(_ context.Context, txs [][]byte, maxBytes, _ uint64, _ bool) ([]coreexecution.FilterStatus, error) { + out := make([]coreexecution.FilterStatus, len(txs)) + var used uint64 + limitReached := false + for i, tx := range txs { + size := uint64(len(tx)) + if size == 0 { + out[i] = coreexecution.FilterRemove + continue + } + if maxBytes > 0 && size > maxBytes { + out[i] = coreexecution.FilterRemove + continue + } + if limitReached { + out[i] = coreexecution.FilterPostpone + continue + } + if maxBytes > 0 && used+size > maxBytes { + limitReached = true + out[i] = coreexecution.FilterPostpone + continue + } + used += size + out[i] = coreexecution.FilterOK + } + return out, nil +} + +var _ coreexecution.Executor = (*inMemExecutor)(nil) diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/fibre.go b/tools/celestia-node-fiber/cmd/fiber-bench/fibre.go new file mode 100644 index 0000000000..3a366d0824 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/fibre.go @@ -0,0 +1,139 @@ +package main + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/cosmos/cosmos-sdk/crypto/keyring" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + appfibre "github.com/celestiaorg/celestia-app/v9/fibre" + libshare "github.com/celestiaorg/go-square/v4/share" + + "github.com/celestiaorg/celestia-node/blob" + "github.com/celestiaorg/celestia-node/fibre" + blobapi "github.com/celestiaorg/celestia-node/nodebuilder/blob" + nodebuilderfibre "github.com/celestiaorg/celestia-node/nodebuilder/fibre" + "github.com/celestiaorg/celestia-node/state/txclient" + + "github.com/evstack/ev-node/block" + cnfiber "github.com/evstack/ev-node/tools/celestia-node-fiber" +) + +// buildFibreAdapter constructs a celestia-node-fiber Adapter that talks +// directly to consensus gRPC + FSPs — no bridge node hop. We do this by +// rebuilding only the submit-side wiring of celestia-node's api/client +// (which is otherwise eager about dialing BridgeDAAddr in NewReadClient). +// +// The returned adapter only supports Upload (and Download via FSPs). +// Listen would invoke a stub blob.Subscribe that returns an error; +// ev-node's aggregator-only setup never calls it (no syncer, no based +// sequencer), so this is fine. +// +// The returned closer releases the gRPC connection and stops the +// underlying app-level fibre client. +func buildFibreAdapter( + ctx context.Context, + consensusGRPC string, + keyName string, + kr keyring.Keyring, +) (block.FiberClient, func() error, error) { + if consensusGRPC == "" { + return nil, nil, errors.New("consensus gRPC address is required") + } + if keyName == "" { + return nil, nil, errors.New("key name is required") + } + + conn, err := grpc.NewClient( + consensusGRPC, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, nil, fmt.Errorf("dial consensus grpc %q: %w", consensusGRPC, err) + } + + tc, err := txclient.NewTxClient(kr, keyName, conn) + if err != nil { + _ = conn.Close() + return nil, nil, fmt.Errorf("new tx client: %w", err) + } + if err := tc.Start(ctx); err != nil { + _ = conn.Close() + return nil, nil, fmt.Errorf("start tx client: %w", err) + } + + appCfg := appfibre.DefaultClientConfig() + appCfg.DefaultKeyName = keyName + appCfg.StateAddress = conn.Target() + appClient, err := appfibre.NewClient(kr, appCfg) + if err != nil { + _ = tc.Stop(ctx) + _ = conn.Close() + return nil, nil, fmt.Errorf("new app fibre client: %w", err) + } + if err := appClient.Start(ctx); err != nil { + _ = tc.Stop(ctx) + _ = conn.Close() + return nil, nil, fmt.Errorf("start app fibre client: %w", err) + } + + accClient := fibre.NewAccountClient(tc, conn) + svc := fibre.NewService(appClient, tc, accClient) + module := nodebuilderfibre.NewModule(svc) + + adapter := cnfiber.FromModules(module, noBridgeBlob{}, 0) + + closer := func() error { + stopCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + var errs error + if err := appClient.Stop(stopCtx); err != nil { + errs = errors.Join(errs, err) + } + if err := tc.Stop(stopCtx); err != nil { + errs = errors.Join(errs, err) + } + if err := conn.Close(); err != nil { + errs = errors.Join(errs, err) + } + return errs + } + + return adapter, closer, nil +} + +// noBridgeBlob errors on every call. The only path that would invoke it +// is Listen→Subscribe, which our aggregator-only single-sequencer node +// never reaches. A clear error here surfaces an assumption break instead +// of a nil panic. +type noBridgeBlob struct{} + +var _ blobapi.Module = noBridgeBlob{} + +var errNoBridge = errors.New("fiber-bench: blob module not supported (running without a bridge node)") + +func (noBridgeBlob) Submit(context.Context, []*blob.Blob, *blob.SubmitOptions) (uint64, error) { + return 0, errNoBridge +} +func (noBridgeBlob) Get(context.Context, uint64, libshare.Namespace, blob.Commitment) (*blob.Blob, error) { + return nil, errNoBridge +} +func (noBridgeBlob) GetAll(context.Context, uint64, []libshare.Namespace) ([]*blob.Blob, error) { + return nil, errNoBridge +} +func (noBridgeBlob) GetProof(context.Context, uint64, libshare.Namespace, blob.Commitment) (*blob.Proof, error) { + return nil, errNoBridge +} +func (noBridgeBlob) Included(context.Context, uint64, libshare.Namespace, *blob.Proof, blob.Commitment) (bool, error) { + return false, errNoBridge +} +func (noBridgeBlob) GetCommitmentProof(context.Context, uint64, libshare.Namespace, []byte) (*blob.CommitmentProof, error) { + return nil, errNoBridge +} +func (noBridgeBlob) Subscribe(context.Context, libshare.Namespace, uint64) (<-chan *blob.SubscriptionResponse, error) { + return nil, errNoBridge +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/instrumented.go b/tools/celestia-node-fiber/cmd/fiber-bench/instrumented.go new file mode 100644 index 0000000000..3acdfea794 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/instrumented.go @@ -0,0 +1,138 @@ +package main + +import ( + "context" + "sort" + "sync" + "sync/atomic" + "time" + + "github.com/evstack/ev-node/block" +) + +// instrumentedAdapter wraps a block.FiberClient and records latency +// per Upload (and per Download) call. The bench's stats printer +// reads percentiles from here so we can answer "is the bottleneck +// ev-node's submitter serialization, or actual Fibre Upload time?". +// +// We keep the last N samples in a ring buffer rather than an +// unbounded slice so a long run does not grow memory; N is sized for +// a 30-minute run at peak block rate. +type instrumentedAdapter struct { + inner block.FiberClient + + uploadCount atomic.Uint64 + uploadFailures atomic.Uint64 + uploadBytesSent atomic.Uint64 + + mu sync.Mutex + samples []time.Duration // ring buffer of recent durations + idx int // next slot to write + full bool // ring buffer has wrapped at least once +} + +const uploadSampleCapacity = 4096 + +func newInstrumentedAdapter(inner block.FiberClient) *instrumentedAdapter { + return &instrumentedAdapter{ + inner: inner, + samples: make([]time.Duration, uploadSampleCapacity), + } +} + +func (a *instrumentedAdapter) Upload(ctx context.Context, namespace []byte, data []byte) (block.FiberUploadResult, error) { + start := time.Now() + res, err := a.inner.Upload(ctx, namespace, data) + elapsed := time.Since(start) + + a.uploadCount.Add(1) + if err != nil { + a.uploadFailures.Add(1) + } else { + a.uploadBytesSent.Add(uint64(len(data))) + } + + a.mu.Lock() + a.samples[a.idx] = elapsed + a.idx = (a.idx + 1) % len(a.samples) + if a.idx == 0 { + a.full = true + } + a.mu.Unlock() + + return res, err +} + +func (a *instrumentedAdapter) Download(ctx context.Context, blobID block.FiberBlobID) ([]byte, error) { + return a.inner.Download(ctx, blobID) +} + +func (a *instrumentedAdapter) Listen(ctx context.Context, namespace []byte, fromHeight uint64) (<-chan block.FiberBlobEvent, error) { + return a.inner.Listen(ctx, namespace, fromHeight) +} + +// uploadStats returns snapshot p50, p99, mean of recent Upload +// durations plus cumulative counters. Returns zero durations when +// no samples have been recorded yet. +type uploadStats struct { + Count uint64 + Failures uint64 + BytesOK uint64 + P50 time.Duration + P99 time.Duration + Mean time.Duration + Max time.Duration +} + +func (a *instrumentedAdapter) uploadStats() uploadStats { + a.mu.Lock() + var n int + if a.full { + n = len(a.samples) + } else { + n = a.idx + } + if n == 0 { + a.mu.Unlock() + return uploadStats{ + Count: a.uploadCount.Load(), + Failures: a.uploadFailures.Load(), + BytesOK: a.uploadBytesSent.Load(), + } + } + // Copy under lock so we can sort outside it. + cp := make([]time.Duration, n) + copy(cp, a.samples[:n]) + a.mu.Unlock() + + sort.Slice(cp, func(i, j int) bool { return cp[i] < cp[j] }) + + var sum time.Duration + for _, d := range cp { + sum += d + } + + pct := func(p float64) time.Duration { + idx := int(float64(n-1) * p) + if idx < 0 { + idx = 0 + } + if idx >= n { + idx = n - 1 + } + return cp[idx] + } + + return uploadStats{ + Count: a.uploadCount.Load(), + Failures: a.uploadFailures.Load(), + BytesOK: a.uploadBytesSent.Load(), + P50: pct(0.50), + P99: pct(0.99), + Mean: sum / time.Duration(n), + Max: cp[n-1], + } +} + +// Compile-time guard: must satisfy the same interface ev-node consumes. +var _ block.FiberClient = (*instrumentedAdapter)(nil) diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/keys.go b/tools/celestia-node-fiber/cmd/fiber-bench/keys.go new file mode 100644 index 0000000000..7b1e83ce93 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/keys.go @@ -0,0 +1,153 @@ +package main + +import ( + "fmt" + + "github.com/cosmos/cosmos-sdk/client" + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/crypto/hd" + "github.com/cosmos/cosmos-sdk/crypto/keyring" + cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/spf13/cobra" +) + +// openKeyring opens (or creates if missing) a test-backend keyring at +// keyringDir. The "test" backend is unencrypted on disk — fine for a +// bench account, not fine for anything mainnet. +func openKeyring(keyringDir string) (keyring.Keyring, error) { + interfaceRegistry := types.NewInterfaceRegistry() + cryptocodec.RegisterInterfaces(interfaceRegistry) + cdc := codec.NewProtoCodec(interfaceRegistry) + return keyring.New( + "fiber-bench", + keyring.BackendTest, + keyringDir, + nil, + cdc, + ) +} + +func keysCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "keys", + Short: "Manage the cosmos keyring used to sign Fibre payment promises", + } + cmd.AddCommand(keysAddCmd(), keysShowCmd(), keysListCmd()) + return cmd +} + +func keysAddCmd() *cobra.Command { + var keyringDir string + cmd := &cobra.Command{ + Use: "add ", + Short: "Create a new key in the bench keyring (test backend, unencrypted)", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + name := args[0] + kr, err := openKeyring(keyringDir) + if err != nil { + return fmt.Errorf("open keyring: %w", err) + } + + if rec, _ := kr.Key(name); rec != nil { + return fmt.Errorf("key %q already exists in keyring %s", name, keyringDir) + } + + rec, mnemonic, err := kr.NewMnemonic( + name, + keyring.English, + sdk.FullFundraiserPath, + keyring.DefaultBIP39Passphrase, + hd.Secp256k1, + ) + if err != nil { + return fmt.Errorf("create key: %w", err) + } + + addr, err := rec.GetAddress() + if err != nil { + return fmt.Errorf("get address: %w", err) + } + + fmt.Printf("name: %s\n", name) + fmt.Printf("address: %s\n", addr.String()) + fmt.Printf("keyring: %s (backend=test)\n", keyringDir) + fmt.Printf("\nmnemonic (back this up — printed once, never stored elsewhere):\n%s\n", mnemonic) + fmt.Printf("\nNext steps:\n") + fmt.Printf(" 1. Top up the address above with utia on the chain.\n") + fmt.Printf(" 2. Deposit into the Fibre escrow with celestia-appd or your tooling, e.g.\n") + fmt.Printf(" celestia-appd tx fibre deposit-escrow --from %s --keyring-backend test --keyring-dir %s --chain-id --node tcp://\n", name, keyringDir) + return nil + }, + } + cmd.Flags().StringVar(&keyringDir, "keyring-dir", defaultKeyringDir(), "directory to store keyring files (test backend)") + return cmd +} + +func keysShowCmd() *cobra.Command { + var keyringDir string + cmd := &cobra.Command{ + Use: "show ", + Short: "Print the address of an existing key", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + kr, err := openKeyring(keyringDir) + if err != nil { + return fmt.Errorf("open keyring: %w", err) + } + rec, err := kr.Key(args[0]) + if err != nil { + return fmt.Errorf("get key: %w", err) + } + addr, err := rec.GetAddress() + if err != nil { + return fmt.Errorf("get address: %w", err) + } + fmt.Printf("name: %s\n", args[0]) + fmt.Printf("address: %s\n", addr.String()) + fmt.Printf("keyring: %s (backend=test)\n", keyringDir) + return nil + }, + } + cmd.Flags().StringVar(&keyringDir, "keyring-dir", defaultKeyringDir(), "directory holding keyring files (test backend)") + return cmd +} + +func keysListCmd() *cobra.Command { + var keyringDir string + cmd := &cobra.Command{ + Use: "list", + Short: "List all keys in the bench keyring", + RunE: func(cmd *cobra.Command, args []string) error { + kr, err := openKeyring(keyringDir) + if err != nil { + return fmt.Errorf("open keyring: %w", err) + } + records, err := kr.List() + if err != nil { + return fmt.Errorf("list keys: %w", err) + } + if len(records) == 0 { + fmt.Printf("(empty — keyring at %s)\n", keyringDir) + return nil + } + for _, rec := range records { + addr, err := rec.GetAddress() + if err != nil { + return err + } + fmt.Printf("%-20s %s\n", rec.Name, addr.String()) + } + return nil + }, + } + cmd.Flags().StringVar(&keyringDir, "keyring-dir", defaultKeyringDir(), "directory holding keyring files (test backend)") + return cmd +} + +// silenceUnusedClient keeps the SDK client package referenced even if a +// future refactor stops using it directly — convenient when wiring a +// proper send/escrow command. +var _ = client.Context{} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/loader.go b/tools/celestia-node-fiber/cmd/fiber-bench/loader.go new file mode 100644 index 0000000000..4f527e8dc8 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/loader.go @@ -0,0 +1,89 @@ +package main + +import ( + "context" + "encoding/binary" + "sync" + "sync/atomic" + "time" +) + +// loaderBackoff is what each worker waits when InjectTx returns false +// because the mempool channel is full. A real sleep (rather than +// runtime.Gosched) caps the per-worker drop rate so allocation +// pressure scales with actual drain throughput; without it, full- +// mempool workers spin a tight allocate-then-drop loop at ~200k +// iter/s/worker — millions of short-lived slices per second across the +// pool, which drove the OOM kills we hit early in the investigation. +// 100 µs caps each worker at ~10k drops/s when the mempool is +// permanently full. +const loaderBackoff = 100 * time.Microsecond + +// loader pumps fixed-size payloads into the in-mem executor as fast as it +// can. Backpressure comes from the executor's bounded mempool channel: +// when full, InjectTx returns false and we count it as dropped. +// +// Each payload is `txSize` bytes: a tx-id (uint64) prefix + zero filler. +// Non-deterministic content isn't important — ev-node hashes them for +// the seen-tx cache, so any unique-per-tx prefix is enough to avoid +// dedup hits. +type loader struct { + exec *inMemExecutor + workers int + txSize int + + // counter monotonically increments per generated tx so the + // SHA-256-based seen cache never falsely dedups. + counter atomic.Uint64 +} + +func newLoader(exec *inMemExecutor, workers, txSize int) *loader { + if workers < 1 { + workers = 1 + } + if txSize < 8 { + txSize = 8 + } + return &loader{ + exec: exec, + workers: workers, + txSize: txSize, + } +} + +// run blocks until ctx is done. Each worker spins on InjectTx — when +// full, it briefly yields. We don't sleep-back-off because the entire +// point of the bench is to keep the executor's mempool pressed against +// its bound. +func (l *loader) run(ctx context.Context) { + var wg sync.WaitGroup + for i := 0; i < l.workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + buf := make([]byte, l.txSize) + for { + if ctx.Err() != nil { + return + } + id := l.counter.Add(1) + binary.BigEndian.PutUint64(buf, id) + // Copy on each Inject — the executor's mempool is a + // channel of []byte, and the consumer keeps a + // reference. Reusing the same buffer would corrupt + // in-flight items. + tx := make([]byte, l.txSize) + copy(tx, buf) + if !l.exec.InjectTx(tx) { + // Mempool full — back off briefly and retry. + select { + case <-ctx.Done(): + return + case <-time.After(loaderBackoff): + } + } + } + }() + } + wg.Wait() +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/main.go b/tools/celestia-node-fiber/cmd/fiber-bench/main.go new file mode 100644 index 0000000000..1671e4a784 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/main.go @@ -0,0 +1,60 @@ +// Package main is the fiber-bench tool: a single-sequencer ev-node wired +// to a remote Fibre network for throughput measurement. +// +// It deliberately runs in the simplest possible configuration: +// +// - Solo sequencer (no based / no forced inclusion) +// - Aggregator-only (no syncer, no P2P) +// - In-memory executor with constant state root (no state computation +// cost in the measurement) +// - Bridge-bypass Fibre adapter (Upload directly via consensus gRPC + FSPs) +// +// The intent is a fail-fast baseline so we can isolate ev-node's batching +// + DA-submit pipeline from everything else. +package main + +import ( + "fmt" + "os" + + "github.com/spf13/cobra" + + // Pull celestia-app params for its init() that sets the global SDK + // bech32 prefix to "celestia" — must run before any keyring operation + // that prints addresses. + _ "github.com/celestiaorg/celestia-app/v9/app/params" + + rollconf "github.com/evstack/ev-node/pkg/config" +) + +// AppName names the binary. The home dir intentionally lives one level +// deeper at ~/.fiber-bench/node so the bench's --keep-home=false default +// (which os.RemoveAll's cfg.RootDir) cannot wipe the cosmos keyring at +// ~/.fiber-bench/keyring. +const ( + AppName = "fiber-bench" + defaultHomeAppName = AppName + "/node" +) + +func main() { + root := &cobra.Command{ + Use: AppName, + Short: "Single-sequencer ev-node throughput bench against a remote Fibre network", + } + + // Register --home, --evnode.log.level, --evnode.log.format, + // --evnode.log.trace on the root so every subcommand inherits them + // (matches apps/testapp). + rollconf.AddGlobalFlags(root, defaultHomeAppName) + + root.AddCommand( + keysCmd(), + escrowCmd(), + runCmd(), + ) + + if err := root.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/run-bench.sh b/tools/celestia-node-fiber/cmd/fiber-bench/run-bench.sh new file mode 100755 index 0000000000..43ea31d0be --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/run-bench.sh @@ -0,0 +1,74 @@ +#!/usr/bin/env bash +# run-bench.sh — convenience wrapper around `fiber-bench` for the +# common case: build the binary if missing, ensure a key exists, +# print the address, then start a run. +# +# Usage: +# CONSENSUS_GRPC=139.59.229.101:9091 \ +# CHAIN_ID=talis-evnode \ +# ./run-bench.sh [duration] [workers] +# +# All optional flags pass through via FIBER_BENCH_ARGS. +set -euo pipefail + +cd "$(dirname "$0")/../../.." + +CONSENSUS_GRPC="${CONSENSUS_GRPC:-}" +CHAIN_ID="${CHAIN_ID:-}" +KEYRING_DIR="${KEYRING_DIR:-$HOME/.fiber-bench/keyring}" +KEY_NAME="${KEY_NAME:-bench}" +DURATION="${1:-${DURATION:-2m}}" +WORKERS="${2:-${WORKERS:-32}}" +TX_SIZE="${TX_SIZE:-200}" +BLOCK_TIME="${BLOCK_TIME:-1s}" +BATCHING="${BATCHING:-immediate}" +HOME_DIR="${HOME_DIR:-$HOME/.fiber-bench/node}" + +if [[ -z "$CONSENSUS_GRPC" || -z "$CHAIN_ID" ]]; then + echo "ERROR: CONSENSUS_GRPC and CHAIN_ID must be set" >&2 + echo " example: CONSENSUS_GRPC=host:9091 CHAIN_ID=talis-evnode $0" >&2 + exit 1 +fi + +BIN="$(pwd)/bin/fiber-bench" +mkdir -p "$(dirname "$BIN")" + +if [[ ! -x "$BIN" || -n "${REBUILD:-}" ]]; then + echo "==> building fiber-bench (-tags fibre)" + go build -tags fibre -o "$BIN" ./cmd/fiber-bench/ +fi + +# Create the bench key if missing — idempotent: `keys add` errors if the +# key exists, so we only run it on a fresh keyring. +if ! "$BIN" keys show "$KEY_NAME" --keyring-dir "$KEYRING_DIR" >/dev/null 2>&1; then + echo "==> creating bench key '$KEY_NAME' at $KEYRING_DIR" + "$BIN" keys add "$KEY_NAME" --keyring-dir "$KEYRING_DIR" + echo + echo "Top up the address above and run:" + echo " $BIN escrow deposit --consensus-grpc $CONSENSUS_GRPC \\" + echo " --keyring-dir $KEYRING_DIR --key-name $KEY_NAME --amount 50000000" + echo + echo "Then re-run this script." + exit 0 +fi + +echo "==> bench account:" +"$BIN" keys show "$KEY_NAME" --keyring-dir "$KEYRING_DIR" + +echo "==> escrow:" +"$BIN" escrow query --consensus-grpc "$CONSENSUS_GRPC" \ + --keyring-dir "$KEYRING_DIR" --key-name "$KEY_NAME" || true + +echo "==> starting bench: duration=$DURATION workers=$WORKERS tx_size=$TX_SIZE block_time=$BLOCK_TIME batching=$BATCHING" +exec "$BIN" run \ + --evnode.da.fiber.consensus_address "$CONSENSUS_GRPC" \ + --evnode.da.fiber.consensus_chain_id "$CHAIN_ID" \ + --evnode.da.fiber.key_name "$KEY_NAME" \ + --keyring-dir "$KEYRING_DIR" \ + --home "$HOME_DIR" \ + --duration "$DURATION" \ + --workers "$WORKERS" \ + --tx-size "$TX_SIZE" \ + --evnode.node.block_time "$BLOCK_TIME" \ + --evnode.da.batching_strategy "$BATCHING" \ + ${FIBER_BENCH_ARGS:-} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/run.go b/tools/celestia-node-fiber/cmd/fiber-bench/run.go new file mode 100644 index 0000000000..0666790c26 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/run.go @@ -0,0 +1,306 @@ +package main + +import ( + "context" + "errors" + "fmt" + "os" + "os/signal" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/spf13/cobra" + + "github.com/evstack/ev-node/node" + rollcmd "github.com/evstack/ev-node/pkg/cmd" + rollconf "github.com/evstack/ev-node/pkg/config" + "github.com/evstack/ev-node/pkg/genesis" + "github.com/evstack/ev-node/pkg/p2p/key" + "github.com/evstack/ev-node/pkg/sequencers/solo" + "github.com/evstack/ev-node/pkg/signer/file" + "github.com/evstack/ev-node/pkg/store" +) + +// Bench-local flag names. The rest come from rollconf.AddFlags +// (--evnode.da.fiber.consensus_address, --evnode.da.batching_strategy, …) +// and rollconf.AddGlobalFlags (--home, --log.level, …). +const ( + flagKeyringDir = "keyring-dir" + flagKeepHome = "keep-home" + flagDuration = "duration" + flagWorkers = "workers" + flagTxSize = "tx-size" + flagMempoolSize = "mempool-size" + flagStatsInterval = "stats-interval" + flagSignerPassphrase = "signer-passphrase" +) + +func runCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "run", + Short: "Run the bench: start a single-sequencer ev-node against a Fibre network and pump load", + RunE: runBench, + } + + // Canonical ev-node flags: --evnode.node.*, --evnode.da.*, + // --evnode.da.fiber.*, --evnode.signer.*, --evnode.instrumentation.*, + // --evnode.p2p.*, --evnode.signer.passphrase_file, etc. The bench + // applies opinionated defaults post-parse for the ones a thoughtful + // operator would otherwise have to flip every run (see runBench). + rollconf.AddFlags(cmd) + + flags := cmd.Flags() + flags.String(flagKeyringDir, defaultKeyringDir(), "directory holding the bench cosmos keyring (test backend) used to sign Fibre payment promises") + flags.Bool(flagKeepHome, false, "do not wipe the ev-node home before starting (resumes prior state)") + flags.Duration(flagDuration, 60*time.Second, "how long to run the bench before stopping (0 = until SIGINT)") + flags.Int(flagWorkers, 32, "number of concurrent tx-injection goroutines") + flags.Int(flagTxSize, 200, "size of each generated tx in bytes") + flags.Int(flagMempoolSize, 1_000_000, "size of the in-mem executor's mempool channel (backpressure boundary)") + flags.Duration(flagStatsInterval, time.Second, "how often to print a stats line") + flags.String(flagSignerPassphrase, "fiber-bench-passphrase", "passphrase for the ev-node file signer (block-signing key, NOT the cosmos one). Written to a temp file consumed by --evnode.signer.passphrase_file.") + + // Fibre consensus address/chain ID don't have empty defaults + // (DefaultConfig points at 127.0.0.1:9090 / mocha-4), but those + // values are sentinels — running the bench against them is never + // what the operator wants. Force them through. + _ = cobra.MarkFlagRequired(flags, rollconf.FlagDAFiberConsensusAddress) + _ = cobra.MarkFlagRequired(flags, rollconf.FlagDAFiberConsensusChainID) + + return cmd +} + +func runBench(cobraCmd *cobra.Command, _ []string) error { + cfg, err := rollcmd.ParseConfig(cobraCmd) + if err != nil { + return err + } + applyBenchDefaults(cobraCmd, &cfg) + + // Re-validate after the bench's overrides — ParseConfig already ran + // once on parse, but we mutated Aggregator/Fiber/etc. afterwards. + if err := cfg.Validate(); err != nil { + return fmt.Errorf("config invalid after bench overrides: %w", err) + } + + logger := rollcmd.SetupLogger(cfg.Log) + + keyringDir, _ := cobraCmd.Flags().GetString(flagKeyringDir) + keepHome, _ := cobraCmd.Flags().GetBool(flagKeepHome) + duration, _ := cobraCmd.Flags().GetDuration(flagDuration) + workers, _ := cobraCmd.Flags().GetInt(flagWorkers) + txSize, _ := cobraCmd.Flags().GetInt(flagTxSize) + mempoolSize, _ := cobraCmd.Flags().GetInt(flagMempoolSize) + statsInterval, _ := cobraCmd.Flags().GetDuration(flagStatsInterval) + signerPassphrase, _ := cobraCmd.Flags().GetString(flagSignerPassphrase) + + if !keepHome { + _ = os.RemoveAll(cfg.RootDir) + } + if err := os.MkdirAll(cfg.RootDir, 0o755); err != nil { + return fmt.Errorf("create home %s: %w", cfg.RootDir, err) + } + + // 1) Cosmos keyring + bridge-bypass Fibre adapter — the two genuinely + // fiber-bench-specific pieces. Neither lives in the production wiring + // path. + kr, err := openKeyring(keyringDir) + if err != nil { + return fmt.Errorf("open keyring at %s: %w", keyringDir, err) + } + rec, err := kr.Key(cfg.DA.Fiber.KeyName) + if err != nil { + return fmt.Errorf("key %q not found in keyring %s — run `fiber-bench keys add %s` first: %w", + cfg.DA.Fiber.KeyName, keyringDir, cfg.DA.Fiber.KeyName, err) + } + addr, err := rec.GetAddress() + if err != nil { + return fmt.Errorf("derive key address: %w", err) + } + logger.Info().Str("address", addr.String()).Str("key", cfg.DA.Fiber.KeyName).Msg("loaded fibre signing key") + + logger.Info().Str("grpc", cfg.DA.Fiber.ConsensusAddress).Msg("dialing consensus gRPC") + innerFiberClient, fiberClose, err := buildFibreAdapter(cobraCmd.Context(), cfg.DA.Fiber.ConsensusAddress, cfg.DA.Fiber.KeyName, kr) + if err != nil { + return fmt.Errorf("build fibre adapter: %w", err) + } + defer func() { + if err := fiberClose(); err != nil { + logger.Warn().Err(err).Msg("fibre adapter close") + } + }() + // Wrap in a latency-recording proxy so the stats printer can show + // per-Upload p50/p99. + fiberClient := newInstrumentedAdapter(innerFiberClient) + + // 2) ev-node block-signing key. Created in cfg.Signer.SignerPath if + // missing. cmd.StartNode reads the passphrase from the path stored + // in --evnode.signer.passphrase_file; we write a temp file from + // --signer-passphrase and inject the flag value so the canonical + // signer-loading path works without us asking the operator to manage + // a passphrase file by hand. + signerDir := cfg.Signer.SignerPath + if signerDir == "" { + signerDir = filepath.Join(cfg.RootDir, "config") + } + if !filepath.IsAbs(signerDir) { + signerDir = filepath.Join(cfg.RootDir, signerDir) + } + cfg.Signer.SignerPath = signerDir + if err := os.MkdirAll(signerDir, 0o750); err != nil { + return fmt.Errorf("create signer dir: %w", err) + } + signerFile := filepath.Join(signerDir, "signer.json") + if _, statErr := os.Stat(signerFile); os.IsNotExist(statErr) { + s, err := file.CreateFileSystemSigner(signerDir, []byte(signerPassphrase)) + if err != nil { + return fmt.Errorf("create file signer: %w", err) + } + if _, err := s.GetAddress(); err != nil { + return fmt.Errorf("signer address: %w", err) + } + } + passphraseFile := filepath.Join(cfg.RootDir, "passphrase.txt") + if err := os.WriteFile(passphraseFile, []byte(signerPassphrase), 0o600); err != nil { + return fmt.Errorf("write passphrase file: %w", err) + } + if err := cobraCmd.Flags().Set(rollconf.FlagSignerPassphraseFile, passphraseFile); err != nil { + return fmt.Errorf("set passphrase flag: %w", err) + } + + // Reload the signer to derive the genesis proposer address. + loaded, err := file.LoadFileSystemSigner(signerDir, []byte(signerPassphrase)) + if err != nil { + return fmt.Errorf("load file signer: %w", err) + } + signerAddr, err := loaded.GetAddress() + if err != nil { + return fmt.Errorf("signer address: %w", err) + } + + // 3) Genesis. Single proposer = our signer. + gen := genesis.NewGenesis(cfg.DA.Fiber.ConsensusChainID, 1, time.Now().UTC(), signerAddr) + if err := gen.Validate(); err != nil { + return fmt.Errorf("invalid genesis: %w", err) + } + + // 4) Datastore + node-key + executor + sequencer. The first three + // look identical to what testapp/cmd/run.go does; the executor + // is the bench-specific in-memory variant (constant state root, + // see executor.go for rationale) and the sequencer is solo (no + // based-sequencer / no forced inclusion machinery). + ds, err := store.NewDefaultKVStore(cfg.RootDir, cfg.DBPath, "fiber-bench") + if err != nil { + return fmt.Errorf("open datastore: %w", err) + } + // Match canonical layout: node_key.json under /config/, the + // same dir testapp/cmd/run.go reads it from. + nodeKey, err := key.LoadOrGenNodeKey(filepath.Dir(cfg.ConfigPath())) + if err != nil { + return fmt.Errorf("node key: %w", err) + } + exec := newInMemExecutor(mempoolSize) + seq := solo.NewSoloSequencer(logger, []byte(gen.ChainID), exec) + + // 5) Spawn loader + stats printer BEFORE cmd.StartNode (which + // blocks). They run for the lifetime of the bench. cmd.StartNode + // owns its own signal-handling goroutine; we send SIGINT to + // ourselves when the duration timer expires so it can exit + // through its normal shutdown path. + bgCtx, bgCancel := signal.NotifyContext(cobraCmd.Context(), os.Interrupt, syscall.SIGTERM) + defer bgCancel() + + var loaderWg sync.WaitGroup + loaderWg.Add(1) + go func() { + defer loaderWg.Done() + newLoader(exec, workers, txSize).run(bgCtx) + }() + + printer := newStatsPrinter(exec, cfg.Instrumentation.PrometheusListenAddr, txSize, fiberClient) + printer.start(bgCtx, statsInterval) + + logger.Info(). + Dur("duration", duration). + Int("workers", workers). + Int("tx_size", txSize). + Int("mempool", mempoolSize). + Dur("block_time", cfg.Node.BlockTime.Duration). + Str("batching", cfg.DA.BatchingStrategy). + Msg("bench started") + + if duration > 0 { + go func() { + select { + case <-time.After(duration): + logger.Info().Msg("duration elapsed, sending SIGINT to trigger shutdown") + _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT) + case <-bgCtx.Done(): + } + }() + } + + // 6) The actual node — let cmd.StartNode do all the wiring (signer + // load, DA client, p2p, node.NewNode, run loop with shutdown). Same + // call testapp/evm/grpc apps make. + startErr := rollcmd.StartNode( + logger, cobraCmd, exec, seq, nodeKey, ds, cfg, gen, + node.NodeOptions{}, fiberClient, + ) + + bgCancel() + loaderWg.Wait() + printer.printFinalSummary() + + if startErr != nil && !errors.Is(startErr, context.Canceled) { + return startErr + } + return nil +} + +// applyBenchDefaults overrides config fields that the bench needs forced +// (Aggregator, Fiber.Enabled) and the canonical defaults that are wrong +// for a throughput bench (DA block time, batching strategy, scrape +// interval, namespaces). Anything the operator passed on the command line +// is left untouched — we only override where the flag value still equals +// its canonical default. +func applyBenchDefaults(cmd *cobra.Command, cfg *rollconf.Config) { + // Forced for the bench: aggregator-only, Fibre DA, no P2P. + cfg.Node.Aggregator = true + cfg.Node.BasedSequencer = false + cfg.DA.Fiber.Enabled = true + if cfg.DA.Fiber.BridgeAddress == "" { + // FiberDAConfig.Validate requires a ws:// or wss:// address. + // Bench never dials it (see fibre.go: noBridgeBlob). + cfg.DA.Fiber.BridgeAddress = "ws://127.0.0.1:0" + } + cfg.P2P.ListenAddress = "/ip4/127.0.0.1/tcp/0" + cfg.P2P.DisableConnectionGater = true + cfg.RPC.Address = "127.0.0.1:0" + cfg.Signer.SignerType = "file" + cfg.Instrumentation.Pprof = false + // The stats printer scrapes /metrics every tick — keep Prometheus on + // even if the operator didn't pass --evnode.instrumentation.prometheus. + cfg.Instrumentation.Prometheus = true + + // Operator-overridable bench defaults — applied only if the canonical + // flag wasn't passed on the command line. + overrideIfUnchanged := func(name string, set func()) { + if !cmd.Flags().Changed(name) { + set() + } + } + overrideIfUnchanged(rollconf.FlagDABlockTime, func() { + cfg.DA.BlockTime = rollconf.DurationWrapper{Duration: time.Second} + }) + overrideIfUnchanged(rollconf.FlagDABatchingStrategy, func() { cfg.DA.BatchingStrategy = "immediate" }) + overrideIfUnchanged(rollconf.FlagScrapeInterval, func() { + cfg.Node.ScrapeInterval = rollconf.DurationWrapper{Duration: 100 * time.Millisecond} + }) + overrideIfUnchanged(rollconf.FlagDARequestTimeout, func() { + cfg.DA.RequestTimeout = rollconf.DurationWrapper{Duration: 60 * time.Second} + }) + overrideIfUnchanged(rollconf.FlagDANamespace, func() { cfg.DA.Namespace = "fb-bench-h" }) + overrideIfUnchanged(rollconf.FlagDADataNamespace, func() { cfg.DA.DataNamespace = "fb-bench-d" }) +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/stats.go b/tools/celestia-node-fiber/cmd/fiber-bench/stats.go new file mode 100644 index 0000000000..f45fb91c06 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/stats.go @@ -0,0 +1,380 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "io" + "net/http" + "strconv" + "strings" + "sync" + "time" +) + +// statsPrinter periodically prints a one-line summary combining counters +// from the in-mem executor and selected Prometheus metrics scraped from +// ev-node's instrumentation endpoint. +// +// Why scrape Prometheus instead of reaching into ev-node? Because the +// metrics ev-node already exports give us the answers we want +// (committed height, txs-per-block, pending blobs, block-production +// duration histogram) and scraping is zero source diff. It also makes +// the same numbers available to a real Prometheus once we move past the +// fail-fast baseline. +type statsPrinter struct { + exec *inMemExecutor + promURL string + httpClient *http.Client + txSize int + adapter *instrumentedAdapter + + mu sync.Mutex + startedAt time.Time + lastTick time.Time + lastInject uint64 + lastTxs float64 + lastBlocks float64 + lastDaInc float64 + peakInjRPS float64 + peakTxRPS float64 + peakDaRPS float64 + + // lastSnapshot caches the last successful Prometheus scrape so + // the final summary still has values after the node has shut + // down (its /metrics endpoint goes away with it). + lastSnapshot map[string]float64 +} + +func newStatsPrinter(exec *inMemExecutor, promListenAddr string, txSize int, adapter *instrumentedAdapter) *statsPrinter { + url := "" + if promListenAddr != "" { + // PrometheusListenAddr can be ":26660" or "127.0.0.1:26660"; + // normalize to a fetchable URL. + host := promListenAddr + if strings.HasPrefix(host, ":") { + host = "127.0.0.1" + host + } + url = "http://" + host + "/metrics" + } + return &statsPrinter{ + exec: exec, + promURL: url, + httpClient: &http.Client{Timeout: 500 * time.Millisecond}, + txSize: txSize, + adapter: adapter, + } +} + +// start prints a header then ticks every interval until ctx is done. +func (p *statsPrinter) start(ctx context.Context, interval time.Duration) { + if interval <= 0 { + interval = time.Second + } + now := time.Now() + p.mu.Lock() + p.startedAt = now + p.lastTick = now + p.mu.Unlock() + + fmt.Println() + // Each rate column shows " / " so tps and bandwidth + // land side by side without doubling the column count. The blob + // size at the latest block stays as an absolute (blob_KB) since + // it's a level, not a rate. + fmt.Printf("%-9s %-15s %-15s %-15s %-7s %-9s %-7s %-8s %-7s %-10s %s\n", + "elapsed", "inj tps/MBs", "exec tps/MBs", "da tps/MBs", + "prod_h", "da_inc_h", "txs/blk", "blob_KB", "pending", "drops", "upload latency") + fmt.Println(strings.Repeat("-", 140)) + + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + p.tick() + } + } + }() +} + +func (p *statsPrinter) tick() { + now := time.Now() + + injected, dropped, blocks, txs, _ := p.exec.Stats() + mempool := p.exec.MempoolDepth() + + prom := p.scrapePrometheus() + if len(prom) > 0 { + p.mu.Lock() + p.lastSnapshot = prom + p.mu.Unlock() + } + // ev-node prefixes its metrics with the namespace from the metrics + // provider — for the aggregator path this is "evnode_sequencer". + producedHeight := prom["evnode_sequencer_height"] + daInclusionHeight := prom["evnode_sequencer_da_inclusion_height"] + totalTxs := prom["evnode_sequencer_total_txs"] + if totalTxs == 0 { + totalTxs = float64(txs) + } + blockBytes := prom["evnode_sequencer_block_size_bytes"] + pending := prom["evnode_sequencer_da_submitter_pending_blobs"] + blocksGauge := float64(blocks) + if producedHeight > blocksGauge { + blocksGauge = producedHeight + } + txsPerBlock := txsPerBlockMetric(blocksGauge, totalTxs) + + p.mu.Lock() + dt := now.Sub(p.lastTick).Seconds() + if dt < 0.001 { + p.mu.Unlock() + return + } + injRPS := float64(injected-p.lastInject) / dt + txRPS := (totalTxs - p.lastTxs) / dt + daSettledRPS := (daInclusionHeight - p.lastDaInc) * txsPerBlock / dt + if injRPS > p.peakInjRPS { + p.peakInjRPS = injRPS + } + if txRPS > p.peakTxRPS { + p.peakTxRPS = txRPS + } + if daSettledRPS > p.peakDaRPS { + p.peakDaRPS = daSettledRPS + } + elapsed := now.Sub(p.startedAt).Truncate(time.Millisecond) + p.lastTick = now + p.lastInject = injected + p.lastTxs = totalTxs + p.lastBlocks = blocksGauge + p.lastDaInc = daInclusionHeight + p.mu.Unlock() + + txSizeBytes := float64(p.txSize) + + upStats := p.adapter.uploadStats() + + fmt.Printf("%-9s %-15s %-15s %-15s %-7.0f %-9.0f %-7.0f %-8.0f %-7.0f %-10d %s\n", + elapsed.String(), + formatRate(injRPS, txSizeBytes), + formatRate(txRPS, txSizeBytes), + formatRate(daSettledRPS, txSizeBytes), + producedHeight, daInclusionHeight, txsPerBlock, blockBytes/1024, pending, dropped, + formatUploadLatency(upStats), + ) + + _ = mempool // currently we report drops, not depth — the mempool is large enough that depth isn't the meaningful signal +} + +// formatUploadLatency renders Upload latency stats as a compact suffix +// for the live table. Returns "-" if no samples yet. +func formatUploadLatency(s uploadStats) string { + if s.Count == 0 { + return "upload[-]" + } + failPart := "" + if s.Failures > 0 { + failPart = fmt.Sprintf(",fails=%d", s.Failures) + } + return fmt.Sprintf("upload[n=%d p50=%v p99=%v%s]", + s.Count, s.P50.Truncate(time.Millisecond), s.P99.Truncate(time.Millisecond), failPart) +} + +// formatRate renders " / " compactly, rounding to whole MB/s +// since sub-MB/s precision isn't useful at our throughput levels and a +// short string keeps the table aligned. +func formatRate(rps, txSizeBytes float64) string { + mbps := rps * txSizeBytes / (1024 * 1024) + switch { + case rps >= 1_000_000: + return fmt.Sprintf("%.1fM/%.0fMB", rps/1_000_000, mbps) + case rps >= 1_000: + return fmt.Sprintf("%.0fk/%.0fMB", rps/1_000, mbps) + default: + return fmt.Sprintf("%.0f/%.1fMB", rps, mbps) + } +} + +// txsPerBlockMetric computes the running mean tx/blk over all produced +// blocks. Only meaningful once at least one block has been produced; +// returns 0 otherwise. +func txsPerBlockMetric(blocks, totalTxs float64) float64 { + if blocks <= 0 { + return 0 + } + return totalTxs / blocks +} + +// scrapePrometheus pulls the ev-node /metrics endpoint and parses just +// the gauges/counters we care about. Best effort: returns empty map on +// any error so the bench keeps running even if metrics aren't ready yet. +func (p *statsPrinter) scrapePrometheus() map[string]float64 { + out := map[string]float64{} + if p.promURL == "" { + return out + } + resp, err := p.httpClient.Get(p.promURL) + if err != nil { + return out + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + _, _ = io.Copy(io.Discard, resp.Body) + return out + } + + wanted := map[string]struct{}{ + "evnode_sequencer_height": {}, + "evnode_sequencer_latest_block_height": {}, + "evnode_sequencer_da_inclusion_height": {}, + "evnode_sequencer_total_txs": {}, + "evnode_sequencer_num_txs": {}, + "evnode_sequencer_block_size_bytes": {}, + "evnode_sequencer_da_submitter_pending_blobs": {}, + } + + scanner := bufio.NewScanner(resp.Body) + for scanner.Scan() { + line := scanner.Text() + if line == "" || strings.HasPrefix(line, "#") { + continue + } + // "metric_name{labels...} value [timestamp]" — strip labels and + // trailing timestamp; we don't use them. + nameEnd := strings.IndexAny(line, "{ ") + if nameEnd < 0 { + continue + } + name := line[:nameEnd] + if _, ok := wanted[name]; !ok { + continue + } + // Skip past labels if present. + rest := line[nameEnd:] + if rest[0] == '{' { + closeIdx := strings.Index(rest, "}") + if closeIdx < 0 { + continue + } + rest = rest[closeIdx+1:] + } + rest = strings.TrimSpace(rest) + valEnd := strings.IndexByte(rest, ' ') + valStr := rest + if valEnd >= 0 { + valStr = rest[:valEnd] + } + v, err := strconv.ParseFloat(valStr, 64) + if err != nil { + continue + } + out[name] = v + } + return out +} + +func (p *statsPrinter) printFinalSummary() { + injected, dropped, blocks, txs, mempoolHigh := p.exec.Stats() + // Prefer a fresh scrape, but fall back to the last live snapshot: + // the node's /metrics endpoint goes away as it shuts down, so a + // post-stop scrape returns an empty map and the summary would + // otherwise print zeros. + prom := p.scrapePrometheus() + p.mu.Lock() + if len(prom) == 0 && p.lastSnapshot != nil { + prom = p.lastSnapshot + } + p.mu.Unlock() + producedHeight := uint64(prom["evnode_sequencer_height"]) + daInclusionHeight := uint64(prom["evnode_sequencer_da_inclusion_height"]) + totalTxs := uint64(prom["evnode_sequencer_total_txs"]) + if totalTxs == 0 { + totalTxs = txs + } + + p.mu.Lock() + elapsed := time.Since(p.startedAt) + peakInj := p.peakInjRPS + peakTx := p.peakTxRPS + p.mu.Unlock() + + avgInj := 0.0 + if elapsed.Seconds() > 0 { + avgInj = float64(injected) / elapsed.Seconds() + } + avgTx := 0.0 + if elapsed.Seconds() > 0 { + avgTx = float64(totalTxs) / elapsed.Seconds() + } + txsPerBlock := 0.0 + if blocks > 0 { + txsPerBlock = float64(totalTxs) / float64(blocks) + } + txSize := float64(p.txSize) + mb := func(rps float64) float64 { return rps * txSize / (1024 * 1024) } + + p.mu.Lock() + peakDa := p.peakDaRPS + p.mu.Unlock() + + var avgDaSettled float64 + if daInclusionHeight > 0 && elapsed.Seconds() > 0 { + avgDaSettled = float64(daInclusionHeight) * txsPerBlock / elapsed.Seconds() + } + + fmt.Println() + fmt.Println(strings.Repeat("=", 70)) + fmt.Println(" BASELINE SUMMARY") + fmt.Println(strings.Repeat("=", 70)) + fmt.Printf("Duration: %s\n", elapsed.Truncate(time.Millisecond)) + fmt.Printf("Tx size: %d B\n", p.txSize) + fmt.Println() + fmt.Printf("Injection: avg %.0f tx/s (%.1f MB/s), peak %.0f tx/s (%.0f MB/s)\n", + avgInj, mb(avgInj), peakInj, mb(peakInj)) + fmt.Printf("Block production: avg %.0f tx/s (%.2f MB/s), peak %.0f tx/s (%.1f MB/s)\n", + avgTx, mb(avgTx), peakTx, mb(peakTx)) + fmt.Printf("DA-settled: avg %.0f tx/s (%.2f MB/s), peak %.0f tx/s (%.1f MB/s)\n", + avgDaSettled, mb(avgDaSettled), peakDa, mb(peakDa)) + fmt.Println() + fmt.Printf("Blocks produced: %d (prod_h=%d)\n", blocks, producedHeight) + fmt.Printf("DA-included height: %d (lag = %d blocks behind production)\n", + daInclusionHeight, producedHeight-daInclusionHeight) + fmt.Printf("Txs into blocks: %d (%.1f tx/blk)\n", totalTxs, txsPerBlock) + fmt.Printf("Dropped (mempool full): %d\n", dropped) + fmt.Printf("Mempool high-water: %d\n", mempoolHigh) + + upStats := p.adapter.uploadStats() + if upStats.Count > 0 { + fmt.Println() + fmt.Println("Fibre Upload latency (per call observed at the adapter):") + fmt.Printf(" count: %d (failures: %d)\n", upStats.Count, upStats.Failures) + fmt.Printf(" mean: %s\n", upStats.Mean.Truncate(time.Millisecond)) + fmt.Printf(" p50: %s\n", upStats.P50.Truncate(time.Millisecond)) + fmt.Printf(" p99: %s\n", upStats.P99.Truncate(time.Millisecond)) + fmt.Printf(" max: %s\n", upStats.Max.Truncate(time.Millisecond)) + // ev-node's submitter runs ONE header-Upload goroutine and + // ONE data-Upload goroutine concurrently (each TryLock'd via + // its own mutex in submitter.go). A block settles only when + // BOTH its header and data Uploads have returned, and each + // stream submits at most 1 Upload per mean_latency seconds — + // so the per-stream cap is 1/mean blocks/s, and the block + // settlement cap (min of the two) equals it. We print this + // so the operator can compare it to the observed da_inc_h + // rate and tell apart "Fibre Upload is slow" from "ev-node + // is leaving capacity on the table". + if upStats.Mean > 0 { + capBlocksPerSec := 1.0 / upStats.Mean.Seconds() + fmt.Printf(" implied cap (1/mean per stream): %.2f blocks/s ≈ %.0f tx/s (%.2f MB/s)\n", + capBlocksPerSec, + capBlocksPerSec*txsPerBlock, + capBlocksPerSec*txsPerBlock*txSize/(1024*1024), + ) + } + } + fmt.Println(strings.Repeat("=", 70)) +} diff --git a/tools/celestia-node-fiber/cmd/fiber-bench/util.go b/tools/celestia-node-fiber/cmd/fiber-bench/util.go new file mode 100644 index 0000000000..666d00f041 --- /dev/null +++ b/tools/celestia-node-fiber/cmd/fiber-bench/util.go @@ -0,0 +1,17 @@ +package main + +import ( + "os" + "path/filepath" +) + +// defaultKeyringDir is where we put the bench's cosmos keyring by default. +// Sibling of the ev-node home (~/.fiber-bench/node) so --keep-home=false +// runs cannot wipe it. +func defaultKeyringDir() string { + home, err := os.UserHomeDir() + if err != nil { + return ".fiber-bench-keyring" + } + return filepath.Join(home, ".fiber-bench", "keyring") +}