Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
246e17d
test(fiber-bench): single-sequencer ev-node bench against a remote Fi…
Wondertan Apr 27, 2026
08897cd
feat(common): default MaxBlobSize to Fibre's actual cap (128 MiB - 5 B)
Wondertan Apr 27, 2026
6ac59c7
fix(block/executing): reserve proto/metadata overhead in RetrieveBatc…
Wondertan Apr 27, 2026
ecd7f62
fix(reaper,cache): make seen-tx retention link-time tunable to avoid …
Wondertan Apr 27, 2026
e0021d9
fix(fiber-bench/loader): backoff with sleep when mempool is full
Wondertan Apr 27, 2026
dede63e
fix(common): make defaultMaxBlobSizeStr a string literal so -ldflags …
Wondertan Apr 27, 2026
44e977a
docs: TODO(throughput-cleanup) on the DA-blob-vs-raw-tx-budget confla…
Wondertan Apr 27, 2026
ef84e01
perf(fiber-da): skip flatten allocation on single-item Submit; honor ctx
Wondertan Apr 27, 2026
84ecbaf
perf(fiber-da): per-item concurrent Uploads on Submit
Wondertan Apr 27, 2026
35f1e13
perf(fiber-bench): use in-memory KV store, not disk-backed Badger
Wondertan Apr 27, 2026
7ed0bf1
fix(fiber-bench): use ds.MapDatastore, not Badger in-memory
Wondertan Apr 27, 2026
1b518e8
hack(store): swap NewDefaultKVStore to in-memory MapDatastore
Wondertan Apr 27, 2026
b4e03f9
hack(reaper,cache): collapse seen-tx TTL plumbing back to plain consts
Wondertan Apr 27, 2026
bba2aa8
docs: surface follow-up issues left by the throughput hacks
Wondertan Apr 27, 2026
57fa859
refactor(fiber-bench): delegate node wiring to rollcmd.StartNode
Wondertan Apr 27, 2026
a1b2c9d
fix(apps): unblock testapp/evm/grpc compile by passing nil fiberClient
Wondertan Apr 27, 2026
5946b08
refactor(fiber-bench): reuse canonical config flags via rollconf.AddF…
Wondertan Apr 27, 2026
9f4fe4c
refactor(fiber-bench): inline loader backoff, drop yield.go
Wondertan Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion apps/evm/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,10 @@ var RunCmd = &cobra.Command{
}()
}

return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{})
// nil fiberClient: the EVM app doesn't wire Fibre DA. See
// tools/celestia-node-fiber for the adapter; testapp/cmd/run.go
// has the same TODO note for matching context.
return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil)
},
}

Expand Down
6 changes: 4 additions & 2 deletions apps/grpc/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ The execution client must implement the Evolve execution gRPC interface.`,
return err
}

// Start the node
return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{})
// Start the node. nil fiberClient: the gRPC app doesn't wire
// Fibre DA. See tools/celestia-node-fiber for the adapter;
// testapp/cmd/run.go has the same TODO note for context.
return rollcmd.StartNode(logger, cmd, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil)
},
}

Expand Down
7 changes: 6 additions & 1 deletion apps/testapp/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,12 @@ var RunCmd = &cobra.Command{
return err
}

return cmd.StartNode(logger, command, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{})
// nil fiberClient: testapp doesn't yet wire Fibre DA. To enable
// fiber support here, build a *cnfiber.Adapter from
// nodeConfig.DA.Fiber and pass it as the last argument. The
// adapter wiring lives in tools/celestia-node-fiber; see the
// fiber-bench tool's run.go for a working caller.
return cmd.StartNode(logger, command, executor, sequencer, nodeKey, datastore, nodeConfig, genesis, node.NodeOptions{}, nil)
},
}

Expand Down
20 changes: 18 additions & 2 deletions block/internal/cache/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,24 @@ const (
// DataDAIncludedPrefix is the store key prefix for data DA inclusion tracking.
DataDAIncludedPrefix = "cache/data-da-included/"

// DefaultTxCacheRetention is the default time to keep transaction hashes in cache.
DefaultTxCacheRetention = 24 * time.Hour
// DefaultTxCacheRetention is how long tx hashes stay in the
// seen-tx cache before CleanupOldTxs evicts them.
//
// HACK(fiber-throughput): dropped from 24h to 30s while we chase
// throughput, but the previous default was itself wrong: 24h is
// retention × tps in memory, so any rollup with meaningful TPS
// would OOM (we hit ~16 GB in under a minute at ~1.5M tx/s).
// What this should be properly:
// - Bounded by entry count, not wall time. The dedup window
// should be "the last N txs we saw", LRU-evicted, so cache
// memory is fixed regardless of throughput.
// - Or expressed in DA blocks: "drop hashes once their txs
// would have been retried out of the mempool", which is a
// property of mempool TTL × DA block time, not 24 hours.
// - 30s is a fine measurement default and a reasonable upper
// bound for pretty much any rollup; pick the right number
// when the cache structure itself is reworked.
DefaultTxCacheRetention = 30 * time.Second
)

// CacheManager provides thread-safe cache operations for tracking seen blocks
Expand Down
45 changes: 40 additions & 5 deletions block/internal/common/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,54 @@ package common

import "strconv"

// defaultMaxBlobSizeStr holds the string representation of the default blob
// size limit. Override at link time via:
// defaultMaxBlobSizeStr holds the string representation of the default
// blob size limit. Anchored to Fibre's actual cap: protocol MaxBlobSize
// (1 << 27 = 128 MiB) minus the 5-byte Fibre blob header (1 byte
// version + 4 byte data size). See celestia-app/v9/fibre/blob.go
// (blobHeaderLen) and fibre/protocol_params.go (MaxBlobSize).
//
// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=125829120"
var defaultMaxBlobSizeStr = "5242880" // 5 MB
// HACK(fiber-throughput): this default is correct for fiber-enabled
// deployments but WRONG for the legacy JSON-RPC blob client path —
// the bridge / chain rejects blobs above its own (much smaller) cap,
// so a non-fiber node started against this default would fail to
// submit. The right shape is per-backend: fiber's cap is one number,
// blob-RPC's cap is another, and DefaultMaxBlobSize shouldn't be a
// single global. Restructure into config when the throughput-cleanup
// TODO lands; until then, non-fiber callers should override via
// ldflag or local config.
//
// MUST be a string literal: Go's `-ldflags "-X ..."` only takes effect
// on variables initialized to a string constant, NOT a function call.
// A previous version used strconv.FormatUint here, which compiled but
// silently ignored ldflag overrides.
//
// Override at link time via:
//
// go build -ldflags "-X github.com/evstack/ev-node/block/internal/common.defaultMaxBlobSizeStr=33554432"
var defaultMaxBlobSizeStr = "134217723" // 1 << 27 - 5 = 128 MiB - 5 B

// DefaultMaxBlobSize is the max blob size limit used for blob submission.
//
// TODO(throughput-cleanup): this single value is currently plugged in
// at two semantically different limits and the conflation has caused
// real bugs (a packed block marshals larger than its raw-tx total, so
// using MaxBlobSize as both input cap and output cap let blocks blow
// past the DA cap). Split into two:
//
// MaxBlobSize — chain-side ceiling on a marshaled DA blob
// MaxBlockTxBytes() — derived raw-tx budget = MaxBlobSize - per-block
// marshal overhead. Used by RetrieveBatch /
// FilterTxs.
//
// Once that derivation exists, drop the ad-hoc 2% reservation in
// executing/executor.go::RetrieveBatch and the duplicate cap in
// submitting/da_submitter.go::defaultRetryPolicy.
var DefaultMaxBlobSize uint64

func init() {
v, err := strconv.ParseUint(defaultMaxBlobSizeStr, 10, 64)
if err != nil || v == 0 {
DefaultMaxBlobSize = 5 * 1024 * 1024 // 5 MB fallback
DefaultMaxBlobSize = 134217723
return
}
DefaultMaxBlobSize = v
Expand Down
121 changes: 85 additions & 36 deletions block/internal/da/fiber_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"sync"
"time"

"github.com/rs/zerolog"
Expand Down Expand Up @@ -87,38 +88,96 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
}
}

flat := flattenBlobs(data)
// Per-item concurrent Upload. Fibre's per-Upload latency is
// dominated by validator signature aggregation (~1.5 s on a
// healthy network) and does not scale up linearly under multiple
// in-flight Uploads, so settlement throughput scales with the
// number of concurrent items submitted in a single batch. Each
// item gets its own goroutine, its own Upload call, and its own
// BlobID in the result; the previous flatten step was both
// memory-wasteful (a MaxBlobSize-sized memcpy on every Submit)
// and inherently serial (one Upload per Submit).
//
// TODO: wire-format compat — old splitBlobs assumed all items in
// a Submit were written as a single prefixed blob. With per-item
// Uploads, retrievers must treat each BlobID separately. The
// retrieve path in this file still uses splitBlobs and will need
// a follow-up to read the new per-item blobs as raw payloads.
nsID := namespace[len(namespace)-10:]
type uploadResult struct {
idx int
id []byte
err error
}
results := make([]uploadResult, len(data))
var wg sync.WaitGroup
for i := range data {
wg.Add(1)
go func(i int) {
defer wg.Done()
res, err := c.fiber.Upload(ctx, nsID, data[i])
if err != nil {
results[i] = uploadResult{idx: i, err: err}
return
}
id := make([]byte, len(res.BlobID))
copy(id, res.BlobID)
results[i] = uploadResult{idx: i, id: id}
}(i)
}
wg.Wait()

// Walk results in submission order. submitToDA's retry logic
// expects "prefix of successes": SubmittedCount=N means items
// [0..N) succeeded and the caller will re-submit items [N..end)
// on the next attempt. Reporting interleaved successes would
// double-submit blobs and waste escrow; matching prefix
// semantics keeps the contract intact even when individual
// Uploads fail out-of-order.
ids := make([][]byte, 0, len(data))
var firstErr error
for _, r := range results {
if r.err != nil {
firstErr = r.err
break
}
ids = append(ids, r.id)
}

result, err := c.fiber.Upload(context.Background(), namespace[len(namespace)-10:], flat)
if err != nil {
if len(ids) == 0 && firstErr != nil {
code := datypes.StatusError
switch {
case errors.Is(err, context.Canceled):
case errors.Is(firstErr, context.Canceled):
code = datypes.StatusContextCanceled
case errors.Is(err, context.DeadlineExceeded):
case errors.Is(firstErr, context.DeadlineExceeded):
code = datypes.StatusContextDeadline
}

c.logger.Error().Err(err).Msg("fiber upload failed")

c.logger.Error().Err(firstErr).Msg("fiber upload failed")
return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: code,
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
SubmittedCount: uint64(len(data) - 1),
Message: fmt.Sprintf("fiber upload failed for blob: %v", firstErr),
SubmittedCount: 0,
BlobSize: blobSize,
Timestamp: time.Now(),
},
}
}

c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
if firstErr != nil {
c.logger.Warn().Err(firstErr).
Int("submitted", len(ids)).
Int("total", len(data)).
Msg("fiber upload partial success — caller will retry the remainder")
}

c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")

return datypes.ResultSubmit{
BaseResult: datypes.BaseResult{
Code: datypes.StatusSuccess,
IDs: [][]byte{result.BlobID},
SubmittedCount: uint64(len(data)),
IDs: ids,
SubmittedCount: uint64(len(ids)),
Height: 0, /* TODO */
BlobSize: blobSize,
Timestamp: time.Now(),
Expand Down Expand Up @@ -329,29 +388,19 @@ func (c *fiberDAClient) Validate(_ context.Context, ids []datypes.ID, proofs []d
func (c *fiberDAClient) GetHeaderNamespace() []byte { return c.namespaceBz }
func (c *fiberDAClient) GetDataNamespace() []byte { return c.dataNamespaceBz }

func flattenBlobs(blobs [][]byte) []byte {
if len(blobs) == 0 {
return nil
}

var total int
for _, b := range blobs {
total += 4 + len(b)
}
total += 4

buf := make([]byte, total)
binary.BigEndian.PutUint32(buf, uint32(len(blobs)))
off := 4
for _, b := range blobs {
binary.BigEndian.PutUint32(buf[off:], uint32(len(b)))
off += 4
copy(buf[off:], b)
off += len(b)
}
return buf
}

// splitBlobs decodes the legacy "count + per-item length" framing that
// the previous Submit path used to pack multiple blobs into a single
// Upload. The per-item-concurrent Submit path no longer writes that
// framing — each item is uploaded raw — so any blob written by this
// branch's Submit will fail to decode here.
//
// Callers (Retrieve / RetrieveBlobs / Get / Subscribe) therefore only
// work for blobs written by the OLD code path, OR for the multi-item
// header batches that still use it. Pair the format change with a
// matching update to the read path before any node on this branch
// tries to sync from another node on this branch.
//
// Tracked alongside the wire-format TODO on Submit (above).
func splitBlobs(data []byte) ([][]byte, error) {
if len(data) == 0 {
return nil, nil
Expand Down
26 changes: 25 additions & 1 deletion block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -663,11 +663,35 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
return nil
}

// blockMarshalOverhead reserves a fraction of MaxBlobSize for the proto
// framing + Metadata overhead added when types.Data is marshaled into a
// DA blob. Empirically the per-tx proto length-prefix runs ~3 bytes,
// which is roughly 1.5% at 200 B txs and stays in that range across
// realistic tx sizes; 2% gives margin for fixed Metadata fields without
// leaving meaningful capacity unused. Reserving here (vs. inside
// FilterTxs) keeps the executor’s view of MaxBytes equal to the raw-tx
// budget and prevents a fully packed batch from blowing past the
// submitter’s MaxBlobSize check.
//
// TODO(throughput-cleanup): this is the workaround half of a deeper
// issue — common.DefaultMaxBlobSize is used as both the raw-tx
// budget AND the marshaled-blob ceiling. The right fix is to derive
// a MaxBlockTxBytes() value once (= MaxBlobSize - overhead) and have
// RetrieveBatch / FilterTxs / da_submitter.limitBatchBySize all
// reference the appropriate value rather than each enforcing the
// same number with their own ad-hoc adjustments. See
// common/consts.go for the umbrella TODO.
const blockMarshalOverheadPct = 2

// RetrieveBatch gets the next batch of transactions from the sequencer.
func (e *Executor) RetrieveBatch(ctx context.Context) (*BatchData, error) {
maxTxBytes := common.DefaultMaxBlobSize
if reserve := maxTxBytes * blockMarshalOverheadPct / 100; reserve < maxTxBytes {
maxTxBytes -= reserve
}
req := coresequencer.GetNextBatchRequest{
Id: []byte(e.genesis.ChainID),
MaxBytes: common.DefaultMaxBlobSize,
MaxBytes: maxTxBytes,
LastBatchData: [][]byte{}, // Can be populated if needed for sequencer context
}

Expand Down
16 changes: 15 additions & 1 deletion block/internal/reaping/reaper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,21 @@ import (
const (
// MaxBackoffInterval is the maximum backoff interval for retries
MaxBackoffInterval = 30 * time.Second
CleanupInterval = 1 * time.Hour

// CleanupInterval is how often the reaper sweeps expired hashes
// out of the seen-tx cache.
//
// HACK(fiber-throughput): dropped from 1h to 5s. The original
// 1h was effectively coupled to the previous 24h retention —
// sweeping every hour against a 24h window means a cache entry
// can outlive its retention by 1h, which is fine when retention
// is a day but completely breaks at 30s retention (entries
// would survive 12× past expiry). Whatever the right retention
// turns out to be (see DefaultTxCacheRetention's note in
// cache/manager.go), this value should be a small fraction of
// it — not a fixed time. Better to derive: e.g.
// retention/10 with a sane min/max.
CleanupInterval = 5 * time.Second
)

// Reaper is responsible for periodically retrieving transactions from the executor,
Expand Down
Loading
Loading