Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
4568ec8
persist: replace file-per-entry with WAL-based persistence
wen-coding Mar 9, 2026
9bea6b0
persist: refactor block/commitqc WAL into generic indexedWAL
wen-coding Mar 9, 2026
89a5fca
persist: fix errcheck lint warnings in blocks.go
wen-coding Mar 9, 2026
dddd764
persist: fix gofmt alignment in wal_test.go
wen-coding Mar 9, 2026
a05ca12
persist: skip non-lane directories in blocks dir with warning
wen-coding Mar 10, 2026
adb2b9c
persist: use Option[LaneID] instead of *LaneID in loadAll
wen-coding Mar 10, 2026
d23861e
persist: derive lane ID from directory name, keep empty WALs open
wen-coding Mar 10, 2026
0c0d8e6
persist: add TestEmptyLaneWALSurvivesReopen
wen-coding Mar 10, 2026
2d02be8
persist: decode lane ID once and add tests for skip paths
wen-coding Mar 10, 2026
5331e88
persist: replace Replay with ReadAll, remove unnecessary sort
wen-coding Mar 10, 2026
1dd6adb
persist: add defense-in-depth checks for WAL index mapping
wen-coding Mar 10, 2026
008c30c
fix: update state_test to expect write-time sequence check
wen-coding Mar 10, 2026
3c70d1d
persist: simplify lazy lane WAL creation in PersistBlock
wen-coding Mar 10, 2026
0212146
persist: detect gaps in loadAll for blocks and commitQCs
wen-coding Mar 10, 2026
fff1fc3
persist: demote per-block load log to DEBUG
wen-coding Mar 10, 2026
1e34e34
persist: document why blocks use per-lane WALs
wen-coding Mar 10, 2026
01cce1c
persist: remove noop flag, use Option for dir/iw instead
wen-coding Mar 10, 2026
9169cfa
persist: simplify Close, consolidate loadAll log, drop slog import
wen-coding Mar 10, 2026
2d431fb
persist: extract directory name string literals into constants
wen-coding Mar 10, 2026
3295dab
persist: handle anchor past all persisted entries, remove ResetNext
wen-coding Mar 10, 2026
edc8383
persist: add TODO for WAL Clear/TruncateAll method
wen-coding Mar 10, 2026
a294737
persist: fix crash-recovery bug in CommitQC DeleteBefore with empty WAL
wen-coding Mar 10, 2026
c907f70
persist: simplify startup re-persist to only anchor's CommitQC
wen-coding Mar 10, 2026
422aa8f
persist: review feedback — disable fsync, harden ReadAll, fix Close
wen-coding Mar 11, 2026
40cecc0
Merge branch 'main' into wen/use_wal_for_persistence
wen-coding Mar 11, 2026
cd09aaf
persist: use TruncateAll instead of close-remove-reopen for Reset
wen-coding Mar 11, 2026
1b3e3c1
fix stale "reset" references in test comments to say "TruncateAll"/"t…
wen-coding Mar 11, 2026
b08bb25
Merge branch 'main' into wen/use_wal_for_persistence
wen-coding Mar 12, 2026
39a625e
Merge branch 'main' into wen/use_wal_for_persistence
wen-coding Mar 12, 2026
bd94691
Remove logger arg from NewWAL call after upstream API change
wen-coding Mar 12, 2026
3310313
Add time-based retention for stale lane WAL deletion
wen-coding Mar 13, 2026
963b356
Derive commitQC WAL truncation point from anchor
wen-coding Mar 15, 2026
52963c2
Add no-op persister comments to blocks.go and commitqcs.go
wen-coding Mar 15, 2026
aeeddbf
Unexport Close() on BlockPersister and CommitQCPersister
wen-coding Mar 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sei-tendermint/internal/autobahn/avail/inner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import (
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
)

// TODO: when dynamic committee changes are supported, newly joined members
// must be added to blocks, votes, nextBlockToPersist, and persistedBlockStart.
// Currently all four are initialized once in newInner from c.Lanes().All().
// BlockPersister already handles lazy lane WAL creation, but DeleteBefore
// removes lanes not in laneFirsts, so the new member must also appear in
// inner.blocks before the next persist cycle.
type inner struct {
latestAppQC utils.Option[*types.AppQC]
latestCommitQC utils.AtomicSend[utils.Option[*types.CommitQC]]
Expand Down
75 changes: 49 additions & 26 deletions sei-tendermint/internal/autobahn/avail/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,20 +157,31 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin
return nil, err
}

// Delete files below the prune anchor that were filtered out by
// loadPersistedState. Also reset the CommitQC persister's cursor to
// match the post-prune range.
// Truncate WAL entries below the prune anchor that were filtered out by
// loadPersistedState. Must include all current committee members:
// DeleteBefore removes lane WALs not present in the map.
laneFirsts := make(map[types.LaneID]types.BlockNumber, len(inner.blocks))
for lane, q := range inner.blocks {
laneFirsts[lane] = q.first
}
if err := pers.blocks.DeleteBefore(laneFirsts); err != nil {
return nil, fmt.Errorf("prune stale block files: %w", err)
return nil, fmt.Errorf("prune stale block WAL entries: %w", err)
}
if err := pers.commitQCs.DeleteBefore(inner.commitQCs.first); err != nil {
return nil, fmt.Errorf("prune stale commitQC files: %w", err)
return nil, fmt.Errorf("prune stale commitQC WAL entries: %w", err)
}
// After a WAL reset (anchor advanced past all entries), write back the
// anchor's CommitQC that was lost when the WAL was cleared. In the
// normal case this is a no-op (idx < cp.next, already on disk).
// CommitQCs loaded from the WAL are guaranteed to still be on disk
// because DeleteBefore only removes entries below the anchor.
if ls, ok := loaded.Get(); ok {
if anchor, ok := ls.pruneAnchor.Get(); ok {
if err := pers.commitQCs.PersistCommitQC(anchor.CommitQC); err != nil {
return nil, fmt.Errorf("re-persist anchor commitqc: %w", err)
}
}
}
pers.commitQCs.ResetNext(inner.commitQCs.next)

return &State{
key: key,
Expand Down Expand Up @@ -632,10 +643,14 @@ func (s *State) Run(ctx context.Context) error {
// runPersist is the main loop for the persist goroutine.
// Write order:
// 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark.
// 2. CommitQCs in order, then publish LastCommitQC immediately
// 2. Prune old blocks and CommitQCs (safe because the anchor is already durable).
// 3. CommitQCs in order, then publish LastCommitQC immediately
// so consensus can advance without waiting for block writes.
// 3. Blocks per lane in order, markBlockPersisted after each.
// 4. Prune old blocks and CommitQCs.
// 4. Blocks per lane in order, markBlockPersisted after each.
//
// Pruning (step 2) must happen before writes (steps 3-4) because the WAL
// requires contiguous indices — if the anchor advanced past all persisted
// entries, DeleteBefore resets the WAL so new writes start clean.
//
// The prune anchor is a pruning watermark: on restart we resume from it.
//
Expand All @@ -654,16 +669,32 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error {
}

// 1. Persist prune anchor first — establishes the crash-recovery watermark.
// CommitQC WAL pruning is co-located here so the truncation point
// is derived directly from the anchor, making the safety invariant
// explicit: we only truncate entries the on-disk anchor covers.
if anchor, ok := batch.pruneAnchor.Get(); ok {
if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil {
return fmt.Errorf("persist prune anchor: %w", err)
}
s.advancePersistedBlockStart(anchor.CommitQC)
lastPersistedAppQCNext = anchor.CommitQC.Proposal().Index() + 1

if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index()); err != nil {
return fmt.Errorf("commitqc deleteBefore: %w", err)
}
}

// 2. Prune block WAL entries. Must happen before writes because the
// WAL requires contiguous indices — if the anchor advanced past
// all persisted entries, DeleteBefore resets the WAL so new writes
// start clean. Runs every cycle (not just on anchor change) so
// that stale lane retention timeouts are evaluated promptly.
if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil {
return fmt.Errorf("block deleteBefore: %w", err)
}

// 2. Persist new CommitQCs, then publish immediately so consensus
// can advance without waiting for block writes or pruning.
// 3. Persist new CommitQCs, then publish immediately so consensus
// can advance without waiting for block writes.
for _, qc := range batch.commitQCs {
if err := pers.commitQCs.PersistCommitQC(qc); err != nil {
return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err)
Expand All @@ -673,32 +704,23 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error {
s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1])
}

// 3. Persist blocks (mark each individually for vote latency).
// 4. Persist blocks (mark each individually for vote latency).
for _, proposal := range batch.blocks {
h := proposal.Msg().Block().Header()
if err := pers.blocks.PersistBlock(proposal); err != nil {
return fmt.Errorf("persist block %s/%d: %w", h.Lane(), h.BlockNumber(), err)
}
s.markBlockPersisted(h.Lane(), h.BlockNumber()+1)
}

// 4. Prune old data.
if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil {
return fmt.Errorf("block deleteBefore: %w", err)
}
if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil {
return fmt.Errorf("commitqc deleteBefore: %w", err)
}
}
}

// persistBatch holds the data collected under lock for one persist iteration.
type persistBatch struct {
blocks []*types.Signed[*types.LaneProposal]
commitQCs []*types.CommitQC
pruneAnchor utils.Option[*PruneAnchor]
laneFirsts map[types.LaneID]types.BlockNumber
commitQCFirst types.RoadIndex
blocks []*types.Signed[*types.LaneProposal]
commitQCs []*types.CommitQC
pruneAnchor utils.Option[*PruneAnchor]
laneFirsts map[types.LaneID]types.BlockNumber
}

// advancePersistedBlockStart updates the per-lane block admission watermark
Expand Down Expand Up @@ -759,6 +781,8 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext
}); err != nil {
return b, err
}
// Must include all current committee members: DeleteBefore removes
// lane WALs not present in the map.
b.laneFirsts = make(map[types.LaneID]types.BlockNumber, len(inner.blocks))
for lane, q := range inner.blocks {
start := max(inner.nextBlockToPersist[lane], q.first)
Expand All @@ -768,7 +792,6 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext
b.laneFirsts[lane] = q.first
}
commitQCNext = max(commitQCNext, inner.commitQCs.first)
b.commitQCFirst = inner.commitQCs.first
for n := commitQCNext; n < inner.commitQCs.next; n++ {
b.commitQCs = append(b.commitQCs, inner.commitQCs.q[n])
}
Expand Down
59 changes: 51 additions & 8 deletions sei-tendermint/internal/autobahn/avail/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,6 @@

t.Run("non-contiguous commitQC files return error", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())

// Build 6 sequential CommitQCs (indices 0-5).
allQCs := make([]*types.CommitQC, 6)
Expand All @@ -649,19 +648,63 @@
CommitQc: types.CommitQCConv.Encode(allQCs[0]),
}))

// Persist QCs 0, 1, 2 contiguously, then skip to 5 (simulating
// corruption or manual tampering). Since the anchor is persisted
// first, a gap should never occur normally — treat it as an error.
// Persist QCs 0, 1, 2 contiguously, then try to skip to 5.
// PersistCommitQC enforces strict sequential order, so the gap
// is caught at write time rather than at load time.
cp, _, err := persist.NewCommitQCPersister(utils.Some(dir))
require.NoError(t, err)
for i := 0; i < 3; i++ {
require.NoError(t, cp.PersistCommitQC(allQCs[i]))
}
require.NoError(t, cp.PersistCommitQC(allQCs[5]))

_, err = NewState(keys[0], ds, utils.Some(dir))
err = cp.PersistCommitQC(allQCs[5])
require.Error(t, err)
require.Contains(t, err.Error(), "non-contiguous")
require.Contains(t, err.Error(), "out of sequence")
require.NoError(t, cp.Close())

Check failure on line 662 in sei-tendermint/internal/autobahn/avail/state_test.go

View workflow job for this annotation

GitHub Actions / Coverage

cp.Close undefined (type *persist.CommitQCPersister has no field or method Close, but does have unexported method close)

Check failure on line 662 in sei-tendermint/internal/autobahn/avail/state_test.go

View workflow job for this annotation

GitHub Actions / Race Detection

cp.Close undefined (type *persist.CommitQCPersister has no field or method Close, but does have unexported method close)
})

t.Run("anchor past all persisted commitQCs truncates WAL", func(t *testing.T) {
dir := t.TempDir()
ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]())

// Build a chain of 10 CommitQCs (indices 0-9).
qcs := make([]*types.CommitQC, 10)
prev := utils.None[*types.CommitQC]()
for i := range qcs {
qcs[i] = makeCommitQC(rng, committee, keys, prev, nil, utils.None[*types.AppQC]())
prev = utils.Some(qcs[i])
}

// Persist only indices 0-4 to the CommitQC WAL.
cp, _, err := persist.NewCommitQCPersister(utils.Some(dir))
require.NoError(t, err)
for i := 0; i < 5; i++ {
require.NoError(t, cp.PersistCommitQC(qcs[i]))
}
require.NoError(t, cp.Close())

Check failure on line 683 in sei-tendermint/internal/autobahn/avail/state_test.go

View workflow job for this annotation

GitHub Actions / Coverage

cp.Close undefined (type *persist.CommitQCPersister has no field or method Close, but does have unexported method close)

Check failure on line 683 in sei-tendermint/internal/autobahn/avail/state_test.go

View workflow job for this annotation

GitHub Actions / Race Detection

cp.Close undefined (type *persist.CommitQCPersister has no field or method Close, but does have unexported method close)

// Persist a prune anchor at index 9 — well past the persisted range.
appProposal := types.NewAppProposal(50, 9, types.GenAppHash(rng))
appQC := types.NewAppQC(makeAppVotes(keys, appProposal))
prunePers, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile)
require.NoError(t, err)
require.NoError(t, prunePers.Persist(&pb.PersistedAvailPruneAnchor{
AppQc: types.AppQCConv.Encode(appQC),
CommitQc: types.CommitQCConv.Encode(qcs[9]),
}))

// NewState should succeed: DeleteBefore truncates the stale WAL,
// then the re-persist loop writes the anchor's CommitQC back.
state, err := NewState(keys[0], ds, utils.Some(dir))
require.NoError(t, err)

require.Equal(t, types.RoadIndex(9), state.FirstCommitQC())
latest, ok := state.LastCommitQC().Load().Get()
require.True(t, ok)
require.NoError(t, utils.TestDiff(qcs[9], latest))

got, ok := state.LastAppQC().Get()
require.True(t, ok)
require.Equal(t, types.RoadIndex(9), got.Proposal().RoadIndex())
})

t.Run("corrupt AppQC data returns error", func(t *testing.T) {
Expand Down
Loading
Loading