Skip to content

Commit a51d28b

Browse files
tac0turtleclaude
andauthored
perf: reduce allocations on hot paths (#38)
- Cache Prometheus WithLabelValues at init (metrics.go): eliminates per-height map lookups and slice allocs for sync state, backfill stages, and store operations. - Move transient error needles to package-level var (celestia_node.go): avoids per-retry []string heap allocation. - Cache bearer auth metadata map at construction (celestia_app.go): avoids map+string alloc per gRPC call. - Replace map[string]any with struct in MarshalBlob (service.go): ~200 bytes less per blob marshal. - Preallocate blob slices in GetBlobs, BlobGetAll, gRPC GetAll: reduces append doublings on typical result sets. - Optimize notifier filterEvent: skip allocation when all/no blobs match; use exact-capacity slice for partial matches. - Skip namespace map allocation for header-only subscriptions. - Use slice indexing in httpToWS instead of TrimPrefix. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent c8b5ebc commit a51d28b

7 files changed

Lines changed: 177 additions & 94 deletions

File tree

pkg/api/grpc/blob_service.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ func (s *BlobServiceServer) GetAll(ctx context.Context, req *pb.GetAllRequest) (
7373
nsList[i] = ns
7474
}
7575

76-
var allBlobs []types.Blob
76+
allBlobs := make([]types.Blob, 0, len(nsList)*8)
7777
for _, ns := range nsList {
7878
blobs, err := s.svc.Store().GetBlobs(ctx, ns, req.Height, req.Height, 0, 0)
7979
if err != nil {

pkg/api/notifier.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -77,9 +77,13 @@ func (n *Notifier) Subscribe(namespaces []types.Namespace) (*Subscription, error
7777
}
7878

7979
id := n.nextID.Add(1)
80-
nsSet := make(map[types.Namespace]struct{}, len(namespaces))
81-
for _, ns := range namespaces {
82-
nsSet[ns] = struct{}{}
80+
81+
var nsSet map[types.Namespace]struct{}
82+
if len(namespaces) > 0 {
83+
nsSet = make(map[types.Namespace]struct{}, len(namespaces))
84+
for _, ns := range namespaces {
85+
nsSet[ns] = struct{}{}
86+
}
8387
}
8488

8589
sub := &Subscription{
@@ -161,13 +165,27 @@ func (n *Notifier) Publish(event HeightEvent) {
161165

162166
// filterEvent returns an event with blobs filtered to the subscriber's
163167
// namespace set. If the subscriber watches all namespaces, the event is
164-
// returned as-is.
168+
// returned as-is. Avoids allocation when no blobs match.
165169
func (n *Notifier) filterEvent(event HeightEvent, sub *Subscription) HeightEvent {
166170
if len(sub.namespaces) == 0 {
167171
return event
168172
}
169173

170-
filtered := make([]types.Blob, 0, len(event.Blobs))
174+
// Count matches first to avoid allocating when nothing matches.
175+
count := 0
176+
for i := range event.Blobs {
177+
if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok {
178+
count++
179+
}
180+
}
181+
if count == len(event.Blobs) {
182+
return event // all match — no copy needed
183+
}
184+
if count == 0 {
185+
return HeightEvent{Height: event.Height, Header: event.Header}
186+
}
187+
188+
filtered := make([]types.Blob, 0, count)
171189
for i := range event.Blobs {
172190
if _, ok := sub.namespaces[event.Blobs[i].Namespace]; ok {
173191
filtered = append(filtered, event.Blobs[i])

pkg/api/service.go

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (s *Service) BlobGetByCommitment(ctx context.Context, commitment []byte) (j
7070
// limit=0 means no limit; offset=0 means no offset.
7171
// Pagination is applied to the aggregate result across all namespaces.
7272
func (s *Service) BlobGetAll(ctx context.Context, height uint64, namespaces []types.Namespace, limit, offset int) (json.RawMessage, error) {
73-
var allBlobs []types.Blob
73+
allBlobs := make([]types.Blob, 0, len(namespaces)*8) // preallocate for typical workload
7474
for _, ns := range namespaces {
7575
blobs, err := s.store.GetBlobs(ctx, ns, height, height, 0, 0)
7676
if err != nil {
@@ -179,15 +179,25 @@ func (s *Service) Fetcher() fetch.DataFetcher {
179179
return s.fetcher
180180
}
181181

182+
// blobJSON is a struct-based representation for celestia-node compatible JSON.
183+
// Using a struct avoids the per-call map[string]any allocation that json.Marshal
184+
// requires for maps.
185+
type blobJSON struct {
186+
Namespace []byte `json:"namespace"`
187+
Data []byte `json:"data"`
188+
ShareVersion uint32 `json:"share_version"`
189+
Commitment []byte `json:"commitment"`
190+
Index int `json:"index"`
191+
}
192+
182193
// MarshalBlob converts a stored blob into celestia-node compatible JSON.
183194
func MarshalBlob(b *types.Blob) json.RawMessage {
184-
m := map[string]any{
185-
"namespace": b.Namespace[:],
186-
"data": b.Data,
187-
"share_version": b.ShareVersion,
188-
"commitment": b.Commitment,
189-
"index": b.Index,
190-
}
191-
raw, _ := json.Marshal(m) //nolint:errcheck
195+
raw, _ := json.Marshal(blobJSON{ //nolint:errcheck
196+
Namespace: b.Namespace[:],
197+
Data: b.Data,
198+
ShareVersion: b.ShareVersion,
199+
Commitment: b.Commitment,
200+
Index: b.Index,
201+
})
192202
return raw
193203
}

pkg/fetch/celestia_app.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,15 @@ type CelestiaAppFetcher struct {
2929

3030
// bearerCreds implements grpc.PerRPCCredentials for bearer token auth.
3131
type bearerCreds struct {
32-
token string
32+
metadata map[string]string // cached; avoids allocation per RPC
33+
}
34+
35+
func newBearerCreds(token string) bearerCreds {
36+
return bearerCreds{metadata: map[string]string{"authorization": "Bearer " + token}}
3337
}
3438

3539
func (b bearerCreds) GetRequestMetadata(_ context.Context, _ ...string) (map[string]string, error) {
36-
return map[string]string{"authorization": "Bearer " + b.token}, nil
40+
return b.metadata, nil
3741
}
3842

3943
func (b bearerCreds) RequireTransportSecurity() bool { return false }
@@ -45,7 +49,7 @@ func NewCelestiaAppFetcher(grpcAddr, authToken string, log zerolog.Logger) (*Cel
4549
grpc.WithTransportCredentials(insecure.NewCredentials()),
4650
}
4751
if authToken != "" {
48-
opts = append(opts, grpc.WithPerRPCCredentials(bearerCreds{token: authToken}))
52+
opts = append(opts, grpc.WithPerRPCCredentials(newBearerCreds(authToken)))
4953
}
5054

5155
conn, err := grpc.NewClient(grpcAddr, opts...)

pkg/fetch/celestia_node.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,9 @@ func NewCelestiaNodeFetcher(ctx context.Context, addr, token string, log zerolog
9191
func httpToWS(addr string) string {
9292
switch {
9393
case strings.HasPrefix(addr, "http://"):
94-
return "ws://" + strings.TrimPrefix(addr, "http://")
94+
return "ws://" + addr[len("http://"):]
9595
case strings.HasPrefix(addr, "https://"):
96-
return "wss://" + strings.TrimPrefix(addr, "https://")
96+
return "wss://" + addr[len("https://"):]
9797
default:
9898
return addr
9999
}
@@ -513,25 +513,28 @@ func isTransientRPCError(err error) bool {
513513
return true
514514
}
515515
msg := strings.ToLower(err.Error())
516-
for _, needle := range []string{
517-
"eof",
518-
"connection reset by peer",
519-
"broken pipe",
520-
"i/o timeout",
521-
"timeout",
522-
"temporarily unavailable",
523-
"connection refused",
524-
"503",
525-
"504",
526-
"502",
527-
} {
516+
for _, needle := range transientNeedles {
528517
if strings.Contains(msg, needle) {
529518
return true
530519
}
531520
}
532521
return false
533522
}
534523

524+
// transientNeedles is allocated once at package init to avoid per-call slice allocation.
525+
var transientNeedles = []string{
526+
"eof",
527+
"connection reset by peer",
528+
"broken pipe",
529+
"i/o timeout",
530+
"timeout",
531+
"temporarily unavailable",
532+
"connection refused",
533+
"503",
534+
"504",
535+
"502",
536+
}
537+
535538
// jsonInt64 handles CometBFT's int64 fields encoded as JSON strings.
536539
type jsonInt64 int64
537540

0 commit comments

Comments
 (0)