Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 128 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
# Apex — Celestia Namespace Indexer

Lightweight indexer that watches Celestia namespaces, stores blobs/headers in SQLite, and exposes them via JSON-RPC, gRPC, and REST health endpoints. Includes Prometheus observability, a CLI client, and multi-stage Docker build.

## Build Commands

```bash
just build # compile to bin/apex
just test # go test -race ./...
just lint # golangci-lint run
just fmt # gofumpt -w .
just check # tidy + lint + test + build (CI equivalent)
just run # build and run
just clean # remove bin/
just tidy # go mod tidy
```

## Architecture

### Data Flow

```
Celestia Node → Fetcher → Sync Coordinator → Store (SQLite)
→ Notifier → Subscribers
API (JSON-RPC + gRPC + Health)
```
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

The sync coordinator runs in two phases: **backfill** (historical blocks in batches) then **streaming** (live via header subscription). Height observers publish events to the notifier which fans out to API subscribers.

### File Structure

```
cmd/apex/
main.go CLI entrypoint, server wiring, graceful shutdown
client.go Thin HTTP JSON-RPC client for CLI commands
status.go `apex status` command (health endpoint)
blob_cmd.go `apex blob get|list` commands
config_cmd.go `apex config validate|show` commands

config/
config.go Config structs (DataSource, Storage, RPC, Sync, Metrics, Log)
load.go YAML loading, validation, env var override, template generation

pkg/types/
types.go Domain types: Namespace, Blob, Header, SyncState, SyncStatus

pkg/store/
store.go Store interface (PutBlobs, GetBlobs, PutHeader, GetHeader, sync state)
sqlite.go SQLite implementation with metrics instrumentation
migrations/ SQL migration files

pkg/fetch/
fetcher.go DataFetcher + ProofForwarder interfaces
celestia_node.go Celestia node-api client (headers, blobs, subscriptions, proofs)
celestia_app.go Celestia-app gRPC client (headers, blobs, polling subscription)

pkg/sync/
coordinator.go Sync lifecycle: initialize → backfill → stream, tracks heights
backfill.go Concurrent batch backfill with configurable batch size/concurrency
subscription.go Header subscription manager for live streaming

pkg/api/
service.go API service layer (blob/header queries, proof forwarding, subscriptions)
notifier.go Event fan-out to subscribers with bounded buffers
health.go /health and /health/ready HTTP endpoints, HealthStatus JSON
jsonrpc/ JSON-RPC server (go-jsonrpc), blob/header/subscription handlers
grpc/ gRPC server, protobuf service implementations
gen/apex/v1/ Generated protobuf Go code
gen/cosmos/base/tendermint/v1beta1/ Generated Cosmos CometBFT service client

pkg/metrics/
metrics.go Recorder interface (nil-safe), nopRecorder, PromRecorder (Prometheus)
server.go HTTP server for /metrics endpoint

proto/apex/v1/ Protobuf definitions (blob, header, types)
proto/cosmos/base/tendermint/v1beta1/ Minimal Cosmos SDK CometBFT service proto

Dockerfile Multi-stage build (golang builder + distroless runtime)
```

### Key Interfaces

- **`store.Store`** — persistence (SQLite impl, instrumented with metrics)
- **`fetch.DataFetcher`** — block data retrieval (Celestia node JSON-RPC or celestia-app gRPC)
- **`fetch.ProofForwarder`** — proof/inclusion forwarding to upstream node
- **`metrics.Recorder`** — nil-safe metrics abstraction (Prometheus or no-op)
- **`api.StatusProvider`** — sync status for health endpoints (implemented by coordinator)

### Ports (defaults)

| Port | Protocol | Purpose |
|-------|----------|------------------|
| :8080 | HTTP | JSON-RPC + health|
| :9090 | TCP | gRPC |
| :9091 | HTTP | Prometheus /metrics |

### Config

YAML with strict unknown-field rejection. Auth token via `APEX_AUTH_TOKEN` env var only (not in config file). See `config/config.go` for all fields and `DefaultConfig()` for defaults.

## Conventions

- Go 1.25+ (`go.mod` specifies 1.25.0)
- SQLite via `modernc.org/sqlite` (CGo-free)
- Config: YAML (`gopkg.in/yaml.v3`), strict unknown-field rejection
- Logging: `rs/zerolog`
- CLI: `spf13/cobra`
- Metrics: `prometheus/client_golang` behind nil-safe `Recorder` interface
- JSON-RPC: `filecoin-project/go-jsonrpc`
- gRPC: `google.golang.org/grpc` + `google.golang.org/protobuf`
- Protobuf codegen: `buf` (`buf.yaml` + `buf.gen.yaml`)
- Linter: golangci-lint v2 (.golangci.yml v2 format), gocyclo max 15
- Formatter: gofumpt
- Build runner: just (justfile)

## Dependencies

- Only add deps that are strictly necessary
- Prefer stdlib where reasonable
- No CGo dependencies (cross-compilation constraint)

## Testing

- All tests use `-race`
- Table-driven tests preferred
- Test files alongside source (`_test.go`)
- No test frameworks beyond stdlib `testing`
112 changes: 90 additions & 22 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
"github.com/evstack/apex/pkg/types"
)
Expand Down Expand Up @@ -113,6 +114,7 @@ func startCmd() *cobra.Command {
startLog := log.Info().
Str("version", version).
Str("datasource_type", cfg.DataSource.Type).
Bool("submission_enabled", cfg.Submission.Enabled).
Int("namespaces", len(cfg.DataSource.Namespaces))
if cfg.DataSource.Type == dataSourceTypeApp {
startLog = startLog.Str("app_grpc_addr", cfg.DataSource.CelestiaAppGRPCAddr)
Expand Down Expand Up @@ -275,33 +277,18 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
}
defer dataFetcher.Close() //nolint:errcheck

// Set up API layer.
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)
svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger)

// Build and run the sync coordinator with observer hook.
coordOpts := []syncer.Option{
syncer.WithStartHeight(cfg.Sync.StartHeight),
syncer.WithBatchSize(cfg.Sync.BatchSize),
syncer.WithConcurrency(cfg.Sync.Concurrency),
syncer.WithLogger(log.Logger),
syncer.WithMetrics(rec),
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
}),
svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
if err != nil {
return err
}
defer closeSubmitter()

backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
if err != nil {
return err
}
if closeBackfill != nil {
defer closeBackfill()
}
if backfillOpt != nil {
coordOpts = append(coordOpts, backfillOpt)
}
defer closeBackfill()

coord := syncer.New(db, dataFetcher, coordOpts...)

Expand Down Expand Up @@ -360,6 +347,87 @@ func runIndexer(ctx context.Context, cfg *config.Config) error {
return nil
}

func openBlobSubmitter(cfg *config.Config) (*submit.DirectSubmitter, error) {
if !cfg.Submission.Enabled {
return nil, nil
}

appClient, err := submit.NewGRPCAppClient(cfg.Submission.CelestiaAppGRPCAddr)
if err != nil {
return nil, fmt.Errorf("create submission app client: %w", err)
}

signer, err := submit.LoadSigner(cfg.Submission.SignerPrivateKey)
if err != nil {
_ = appClient.Close()
return nil, fmt.Errorf("load submission signer: %w", err)
}

blobSubmitter, err := submit.NewDirectSubmitter(appClient, signer, submit.DirectConfig{
ChainID: cfg.Submission.ChainID,
GasPrice: cfg.Submission.GasPrice,
MaxGasPrice: cfg.Submission.MaxGasPrice,
ConfirmationTimeout: time.Duration(cfg.Submission.ConfirmationTimeout) * time.Second,
})
if err != nil {
_ = appClient.Close()
return nil, fmt.Errorf("configure submission backend: %w", err)
}

return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
}

notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

svcOpts := make([]api.ServiceOption, 0, 1)
if blobSubmitter != nil {
svcOpts = append(svcOpts, api.WithBlobSubmitter(blobSubmitter))
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
coordOpts := []syncer.Option{
syncer.WithStartHeight(cfg.Sync.StartHeight),
syncer.WithBatchSize(cfg.Sync.BatchSize),
syncer.WithConcurrency(cfg.Sync.Concurrency),
syncer.WithLogger(log.Logger),
syncer.WithMetrics(rec),
syncer.WithObserver(func(h uint64, hdr *types.Header, blobs []types.Blob) {
notifier.Publish(api.HeightEvent{Height: h, Header: hdr, Blobs: blobs})
}),
}

backfillOpt, closeBackfill, err := maybeBackfillSourceOption(cfg, log.Logger)
if err != nil {
return nil, nil, err
}
if closeBackfill == nil {
closeBackfill = func() {}
}
if backfillOpt != nil {
coordOpts = append(coordOpts, backfillOpt)
}

return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
stopped := make(chan struct{})
go func() {
Expand Down
61 changes: 61 additions & 0 deletions cmd/apex/main_test.go
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

those tests feel a bit useless.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yea will remove this, adding e2e tests

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"testing"

"github.com/evstack/apex/config"
)

func TestOpenBlobSubmitterDisabled(t *testing.T) {
t.Parallel()

cfg := config.DefaultConfig()
cfg.Submission.Enabled = false

submitter, err := openBlobSubmitter(&cfg)
if err != nil {
t.Fatalf("openBlobSubmitter: %v", err)
}
if submitter != nil {
t.Fatalf("expected nil submitter when submission disabled, got %T", submitter)
}
}

func TestOpenBlobSubmitterCreatesSubmitter(t *testing.T) {
t.Parallel()

cfg := submissionConfigWithAddr("127.0.0.1:65535")
submitter, err := openBlobSubmitter(cfg)
if err != nil {
t.Fatalf("openBlobSubmitter: %v", err)
}
if submitter == nil {
t.Fatal("expected submitter but got nil")
}
if err := submitter.Close(); err != nil {
t.Fatalf("closing submitter: %v", err)
}
}

func TestOpenBlobSubmitterRejectsBadSignerKey(t *testing.T) {
t.Parallel()

cfg := submissionConfigWithAddr("127.0.0.1:65535")
cfg.Submission.SignerPrivateKey = "zzzz"

_, err := openBlobSubmitter(cfg)
if err == nil {
t.Fatal("expected error when signer key is invalid")
}
}

func submissionConfigWithAddr(addr string) *config.Config {
cfg := config.DefaultConfig()
cfg.Submission.Enabled = true
cfg.Submission.CelestiaAppGRPCAddr = addr
cfg.Submission.ChainID = "mocha-4"
cfg.Submission.GasPrice = 0.001
cfg.Submission.ConfirmationTimeout = 30
cfg.Submission.SignerPrivateKey = "0102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f20"
return &cfg
}
22 changes: 22 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Config struct {
Metrics MetricsConfig `yaml:"metrics"`
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -92,6 +93,18 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
CelestiaAppGRPCAddr string `yaml:"app_grpc_addr"`
ChainID string `yaml:"chain_id"`
SignerKey string `yaml:"signer_key"`
SignerPrivateKey string `yaml:"-"`
GasPrice float64 `yaml:"gas_price"`
MaxGasPrice float64 `yaml:"max_gas_price"`
ConfirmationTimeout int `yaml:"confirmation_timeout"` // seconds
}

// DefaultConfig returns a Config with sensible defaults.
func DefaultConfig() Config {
return Config{
Expand Down Expand Up @@ -120,6 +133,15 @@ func DefaultConfig() Config {
BufferSize: 64,
MaxSubscribers: 1024,
},
Submission: SubmissionConfig{
Enabled: false,
CelestiaAppGRPCAddr: "",
ChainID: "",
SignerKey: "",
GasPrice: 0,
MaxGasPrice: 0,
ConfirmationTimeout: 30,
},
Metrics: MetricsConfig{
Enabled: true,
ListenAddr: ":9091",
Expand Down
Loading
Loading