Skip to content

Commit 84ecbaf

Browse files
committed
perf(fiber-da): per-item concurrent Uploads on Submit
Fan out one goroutine per item in fiber DA Submit, calling fiber.Upload concurrently with the caller's ctx. Settlement throughput now scales linearly with the batch size: previously ev-node's submitter could only have one Upload in flight per stream (header + data, mutex-locked in submitter.go), and each Submit further serialized the batch into one big flatten-encoded blob. With fan-out, a Submit of N items becomes N concurrent Uploads, and Fibre's ~1.5 s per-Upload latency amortizes across N. The result-aggregation honors submitToDA's "prefix of successes" contract: SubmittedCount = N means items [0..N) succeeded and the caller will retry [N..end). Reporting interleaved successes would double-submit blobs and waste escrow; matching prefix semantics keeps the retry contract intact even when individual Uploads fail out-of-order. Pair changes in submitting/da_submitter.go: - limitBatchBySize gains a maxItems cap (was total-bytes-only). Each item is still bounded by maxItemBytes (chain ceiling), but the total batch is now bounded by item count, letting multiple full-size items flow through one Submit. - retryPolicy adds MaxItems with a sensible non-fiber default of 1 (preserves legacy single-item-per-Submit semantics for backends that flatten a batch into one blob). - For the fiber backend, MaxItems is bumped to 16 — covers a 5 min run at 1 b/s production with 4–8 pending blocks while leaving headroom for memory pressure under MaxBlobSize-sized items. Wire-format follow-up (see TODO in fiber_client.go::Submit): the retrieve path in this file still uses splitBlobs which assumes the old single-prefixed-blob format. Per-item Uploads now produce raw blobs with their own BlobIDs; retrieve needs an update to read each BlobID separately. The bench's aggregator-only setup never invokes retrieve so this is unblocked for measurement but blocks merging to production until addressed.
1 parent ef84e01 commit 84ecbaf

2 files changed

Lines changed: 125 additions & 47 deletions

File tree

block/internal/da/fiber_client.go

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"sync"
89
"time"
910

1011
"github.com/rs/zerolog"
@@ -87,62 +88,96 @@ func (c *fiberDAClient) Submit(ctx context.Context, data [][]byte, _ float64, na
8788
}
8889
}
8990

90-
// Single-item fast path: avoid the MaxBlobSize-sized allocation +
91-
// memcpy that flattenBlobs would do just to wrap one item in the
92-
// 8-byte count/length prefix. With per-item caps already saturating
93-
// MaxBlobBytes for data blobs, this is the steady-state path.
91+
// Per-item concurrent Upload. Fibre's per-Upload latency is
92+
// dominated by validator signature aggregation (~1.5 s on a
93+
// healthy network) and does not scale up linearly under multiple
94+
// in-flight Uploads, so settlement throughput scales with the
95+
// number of concurrent items submitted in a single batch. Each
96+
// item gets its own goroutine, its own Upload call, and its own
97+
// BlobID in the result; the previous flatten step was both
98+
// memory-wasteful (a MaxBlobSize-sized memcpy on every Submit)
99+
// and inherently serial (one Upload per Submit).
94100
//
95-
// TODO: wire-format compat — splitBlobs always expects the prefix,
96-
// so any retriever (full node syncer, light client) downloading a
97-
// blob written via this fast path will fail to decode. Address
98-
// alongside the concurrent-uploads change by switching to a
99-
// per-item Upload model where flatten is no longer needed.
100-
var blob []byte
101-
if len(data) == 1 {
102-
blob = data[0]
103-
} else {
104-
blob = flattenBlobs(data)
101+
// TODO: wire-format compat — old splitBlobs assumed all items in
102+
// a Submit were written as a single prefixed blob. With per-item
103+
// Uploads, retrievers must treat each BlobID separately. The
104+
// retrieve path in this file still uses splitBlobs and will need
105+
// a follow-up to read the new per-item blobs as raw payloads.
106+
nsID := namespace[len(namespace)-10:]
107+
type uploadResult struct {
108+
idx int
109+
id []byte
110+
err error
111+
}
112+
results := make([]uploadResult, len(data))
113+
var wg sync.WaitGroup
114+
for i := range data {
115+
wg.Add(1)
116+
go func(i int) {
117+
defer wg.Done()
118+
res, err := c.fiber.Upload(ctx, nsID, data[i])
119+
if err != nil {
120+
results[i] = uploadResult{idx: i, err: err}
121+
return
122+
}
123+
id := make([]byte, len(res.BlobID))
124+
copy(id, res.BlobID)
125+
results[i] = uploadResult{idx: i, id: id}
126+
}(i)
127+
}
128+
wg.Wait()
129+
130+
// Walk results in submission order. submitToDA's retry logic
131+
// expects "prefix of successes": SubmittedCount=N means items
132+
// [0..N) succeeded and the caller will re-submit items [N..end)
133+
// on the next attempt. Reporting interleaved successes would
134+
// double-submit blobs and waste escrow; matching prefix
135+
// semantics keeps the contract intact even when individual
136+
// Uploads fail out-of-order.
137+
ids := make([][]byte, 0, len(data))
138+
var firstErr error
139+
for _, r := range results {
140+
if r.err != nil {
141+
firstErr = r.err
142+
break
143+
}
144+
ids = append(ids, r.id)
105145
}
106146

107-
// Honor the caller's context so Upload returns promptly on
108-
// shutdown / parent cancellation. The previous context.Background()
109-
// kept Uploads alive past node shutdown and contributed to the
110-
// "payment promise already processed" warnings we saw in early
111-
// runs (a stale Upload would settle after the node had stopped
112-
// tracking it).
113-
result, err := c.fiber.Upload(ctx, namespace[len(namespace)-10:], blob)
114-
if err != nil {
147+
if len(ids) == 0 && firstErr != nil {
115148
code := datypes.StatusError
116149
switch {
117-
case errors.Is(err, context.Canceled):
150+
case errors.Is(firstErr, context.Canceled):
118151
code = datypes.StatusContextCanceled
119-
case errors.Is(err, context.DeadlineExceeded):
152+
case errors.Is(firstErr, context.DeadlineExceeded):
120153
code = datypes.StatusContextDeadline
121154
}
122-
123-
c.logger.Error().Err(err).Msg("fiber upload failed")
124-
155+
c.logger.Error().Err(firstErr).Msg("fiber upload failed")
125156
return datypes.ResultSubmit{
126157
BaseResult: datypes.BaseResult{
127-
Code: code,
128-
Message: fmt.Sprintf("fiber upload failed for blob: %v", err),
129-
// On error nothing settled — the previous len(data)-1
130-
// reported all-but-one as submitted on full failure,
131-
// which lied to the caller's retry/postSubmit logic.
158+
Code: code,
159+
Message: fmt.Sprintf("fiber upload failed for blob: %v", firstErr),
132160
SubmittedCount: 0,
133161
BlobSize: blobSize,
134162
Timestamp: time.Now(),
135163
},
136164
}
137165
}
138166

139-
c.logger.Debug().Int("num_ids", len(data)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
167+
if firstErr != nil {
168+
c.logger.Warn().Err(firstErr).
169+
Int("submitted", len(ids)).
170+
Int("total", len(data)).
171+
Msg("fiber upload partial success — caller will retry the remainder")
172+
}
173+
174+
c.logger.Debug().Int("num_ids", len(ids)).Uint64("height", 0 /* TODO */).Msg("fiber DA submission successful")
140175

141176
return datypes.ResultSubmit{
142177
BaseResult: datypes.BaseResult{
143178
Code: datypes.StatusSuccess,
144-
IDs: [][]byte{result.BlobID},
145-
SubmittedCount: uint64(len(data)),
179+
IDs: ids,
180+
SubmittedCount: uint64(len(ids)),
146181
Height: 0, /* TODO */
147182
BlobSize: blobSize,
148183
Timestamp: time.Now(),

block/internal/submitting/da_submitter.go

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,28 @@ type retryPolicy struct {
4343
MinBackoff time.Duration
4444
MaxBackoff time.Duration
4545
MaxBlobBytes uint64
46+
// MaxItems caps the number of items packed into a single Submit
47+
// call. DA clients that fan out per-item Uploads (fiber) benefit
48+
// linearly from larger batches — settlement throughput scales
49+
// with concurrency until per-Upload latency dominates. Default
50+
// 1 preserves legacy single-item-per-Submit semantics for
51+
// backends that flatten a batch into one blob (JSON-RPC blob
52+
// client). The fiber path overrides this from config.
53+
MaxItems int
4654
}
4755

56+
// defaultBatchItems is the conservative default for non-fiber backends
57+
// that historically expected one item per Submit call. The fiber path
58+
// raises this via config because it can fan out per-item Uploads.
59+
const defaultBatchItems = 1
60+
61+
// fiberDefaultBatchItems is the upper bound on items packed into a
62+
// single fiber Submit. Each item gets its own concurrent Upload, so
63+
// this caps the per-batch goroutine fan-out. 16 covers a 5 min run at
64+
// 1 b/s production with 4–8 pending blocks while leaving headroom for
65+
// memory pressure; tunable via config when the cleanup TODO lands.
66+
const fiberDefaultBatchItems = 16
67+
4868
func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy {
4969
return retryPolicy{
5070
MaxAttempts: maxAttempts,
@@ -57,6 +77,7 @@ func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy
5777
// the duplication is what made packed-block-larger-than-cap
5878
// failures non-obvious. See common/consts.go.
5979
MaxBlobBytes: common.DefaultMaxBlobSize,
80+
MaxItems: defaultBatchItems,
6081
}
6182
}
6283

@@ -578,12 +599,19 @@ func submitToDA[T any](
578599
}
579600

580601
pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration)
602+
// Fiber's DA client fans out per-item Uploads concurrently, so
603+
// packing more items per Submit lifts settlement throughput. For
604+
// non-fiber backends the default of 1 preserves the legacy
605+
// flatten-one-blob behavior.
606+
if s.config.DA.IsFiberEnabled() {
607+
pol.MaxItems = fiberDefaultBatchItems
608+
}
581609

582610
rs := retryState{Attempt: 0, Backoff: 0}
583611

584612
// Limit this submission to a single size-capped batch
585613
if len(marshaled) > 0 {
586-
batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes)
614+
batchItems, batchMarshaled, err := limitBatchBySize(items, marshaled, pol.MaxBlobBytes, pol.MaxItems)
587615
if err != nil {
588616
s.logger.Error().
589617
Str("itemType", itemType).
@@ -694,27 +722,42 @@ func submitToDA[T any](
694722
return fmt.Errorf("failed to submit all %s(s) to DA layer after %d attempts", itemType, rs.Attempt)
695723
}
696724

697-
// limitBatchBySize returns a prefix of items whose total marshaled size does not exceed maxBytes.
698-
// If the first item exceeds maxBytes, it returns ErrOversizedItem which is unrecoverable.
699-
func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes uint64) ([]T, [][]byte, error) {
700-
total := uint64(0)
725+
// limitBatchBySize returns a prefix of items whose per-item marshaled size
726+
// fits within maxItemBytes. The total batch size is bounded by item count
727+
// (maxItems), not by total bytes — DA clients that can fan out per-item
728+
// Uploads (e.g. the fiber DA client) settle each item in its own
729+
// concurrent Upload call, so packing more items per batch lifts the
730+
// effective settlement throughput. DA clients that flatten a batch into
731+
// a single blob still get one item per call when maxItems == 1.
732+
//
733+
// If the first item exceeds maxItemBytes, returns ErrOversizedItem
734+
// (unrecoverable). If no items fit at all (empty inputs), returns a
735+
// distinct error so the caller can distinguish "nothing to send".
736+
//
737+
// TODO(throughput-cleanup): see common/consts.go — maxItemBytes is the
738+
// per-item chain ceiling, separate from the raw-tx budget driving
739+
// FilterTxs. Once that split lands, the duplicate-cap-everywhere
740+
// problem these fixes work around goes away.
741+
func limitBatchBySize[T any](items []T, marshaled [][]byte, maxItemBytes uint64, maxItems int) ([]T, [][]byte, error) {
742+
if maxItems <= 0 {
743+
maxItems = 1
744+
}
701745
count := 0
702746
for i := range items {
747+
if count >= maxItems {
748+
break
749+
}
703750
sz := uint64(len(marshaled[i]))
704-
if sz > maxBytes {
751+
if sz > maxItemBytes {
705752
if i == 0 {
706-
return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxBytes)
753+
return nil, nil, fmt.Errorf("%w: item size %d exceeds max %d", common.ErrOversizedItem, sz, maxItemBytes)
707754
}
708755
break
709756
}
710-
if total+sz > maxBytes {
711-
break
712-
}
713-
total += sz
714757
count++
715758
}
716759
if count == 0 {
717-
return nil, nil, fmt.Errorf("no items fit within %d bytes", maxBytes)
760+
return nil, nil, fmt.Errorf("no items fit within %d bytes", maxItemBytes)
718761
}
719762
return items[:count], marshaled[:count], nil
720763
}

0 commit comments

Comments
 (0)