diff --git a/sei-tendermint/internal/autobahn/avail/inner.go b/sei-tendermint/internal/autobahn/avail/inner.go index 167c02e486..cfb9cebfee 100644 --- a/sei-tendermint/internal/autobahn/avail/inner.go +++ b/sei-tendermint/internal/autobahn/avail/inner.go @@ -9,6 +9,12 @@ import ( "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +// TODO: when dynamic committee changes are supported, newly joined members +// must be added to blocks, votes, nextBlockToPersist, and persistedBlockStart. +// Currently all four are initialized once in newInner from c.Lanes().All(). +// BlockPersister already handles lazy lane WAL creation, but DeleteBefore +// removes lanes not in laneFirsts, so the new member must also appear in +// inner.blocks before the next persist cycle. type inner struct { latestAppQC utils.Option[*types.AppQC] latestCommitQC utils.AtomicSend[utils.Option[*types.CommitQC]] diff --git a/sei-tendermint/internal/autobahn/avail/state.go b/sei-tendermint/internal/autobahn/avail/state.go index aa070096d8..a21b9ad0fe 100644 --- a/sei-tendermint/internal/autobahn/avail/state.go +++ b/sei-tendermint/internal/autobahn/avail/state.go @@ -157,20 +157,31 @@ func NewState(key types.SecretKey, data *data.State, stateDir utils.Option[strin return nil, err } - // Delete files below the prune anchor that were filtered out by - // loadPersistedState. Also reset the CommitQC persister's cursor to - // match the post-prune range. + // Truncate WAL entries below the prune anchor that were filtered out by + // loadPersistedState. Must include all current committee members: + // DeleteBefore removes lane WALs not present in the map. 
laneFirsts := make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) for lane, q := range inner.blocks { laneFirsts[lane] = q.first } if err := pers.blocks.DeleteBefore(laneFirsts); err != nil { - return nil, fmt.Errorf("prune stale block files: %w", err) + return nil, fmt.Errorf("prune stale block WAL entries: %w", err) } if err := pers.commitQCs.DeleteBefore(inner.commitQCs.first); err != nil { - return nil, fmt.Errorf("prune stale commitQC files: %w", err) + return nil, fmt.Errorf("prune stale commitQC WAL entries: %w", err) + } + // After a WAL reset (anchor advanced past all entries), write back the + // anchor's CommitQC that was lost when the WAL was cleared. In the + // normal case this is a no-op (idx < cp.next, already on disk). + // CommitQCs loaded from the WAL are guaranteed to still be on disk + // because DeleteBefore only removes entries below the anchor. + if ls, ok := loaded.Get(); ok { + if anchor, ok := ls.pruneAnchor.Get(); ok { + if err := pers.commitQCs.PersistCommitQC(anchor.CommitQC); err != nil { + return nil, fmt.Errorf("re-persist anchor commitqc: %w", err) + } + } } - pers.commitQCs.ResetNext(inner.commitQCs.next) return &State{ key: key, @@ -632,10 +643,14 @@ func (s *State) Run(ctx context.Context) error { // runPersist is the main loop for the persist goroutine. // Write order: // 1. Prune anchor (AppQC + CommitQC pair) — the crash-recovery watermark. -// 2. CommitQCs in order, then publish LastCommitQC immediately +// 2. Prune old blocks and CommitQCs (safe because the anchor is already durable). +// 3. CommitQCs in order, then publish LastCommitQC immediately // so consensus can advance without waiting for block writes. -// 3. Blocks per lane in order, markBlockPersisted after each. -// 4. Prune old blocks and CommitQCs. +// 4. Blocks per lane in order, markBlockPersisted after each. 
+// +// Pruning (step 2) must happen before writes (steps 3-4) because the WAL +// requires contiguous indices — if the anchor advanced past all persisted +// entries, DeleteBefore resets the WAL so new writes start clean. // // The prune anchor is a pruning watermark: on restart we resume from it. // @@ -654,16 +669,32 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error { } // 1. Persist prune anchor first — establishes the crash-recovery watermark. + // CommitQC WAL pruning is co-located here so the truncation point + // is derived directly from the anchor, making the safety invariant + // explicit: we only truncate entries the on-disk anchor covers. if anchor, ok := batch.pruneAnchor.Get(); ok { if err := pers.pruneAnchor.Persist(PruneAnchorConv.Encode(anchor)); err != nil { return fmt.Errorf("persist prune anchor: %w", err) } s.advancePersistedBlockStart(anchor.CommitQC) lastPersistedAppQCNext = anchor.CommitQC.Proposal().Index() + 1 + + if err := pers.commitQCs.DeleteBefore(anchor.CommitQC.Proposal().Index()); err != nil { + return fmt.Errorf("commitqc deleteBefore: %w", err) + } + } + + // 2. Prune block WAL entries. Must happen before writes because the + // WAL requires contiguous indices — if the anchor advanced past + // all persisted entries, DeleteBefore resets the WAL so new writes + // start clean. Runs every cycle (not just on anchor change) so + // that stale lane retention timeouts are evaluated promptly. + if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil { + return fmt.Errorf("block deleteBefore: %w", err) } - // 2. Persist new CommitQCs, then publish immediately so consensus - // can advance without waiting for block writes or pruning. + // 3. Persist new CommitQCs, then publish immediately so consensus + // can advance without waiting for block writes. 
for _, qc := range batch.commitQCs { if err := pers.commitQCs.PersistCommitQC(qc); err != nil { return fmt.Errorf("persist commitqc %d: %w", qc.Index(), err) @@ -673,7 +704,7 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error { s.markCommitQCsPersisted(batch.commitQCs[len(batch.commitQCs)-1]) } - // 3. Persist blocks (mark each individually for vote latency). + // 4. Persist blocks (mark each individually for vote latency). for _, proposal := range batch.blocks { h := proposal.Msg().Block().Header() if err := pers.blocks.PersistBlock(proposal); err != nil { @@ -681,24 +712,15 @@ func (s *State) runPersist(ctx context.Context, pers persisters) error { } s.markBlockPersisted(h.Lane(), h.BlockNumber()+1) } - - // 4. Prune old data. - if err := pers.blocks.DeleteBefore(batch.laneFirsts); err != nil { - return fmt.Errorf("block deleteBefore: %w", err) - } - if err := pers.commitQCs.DeleteBefore(batch.commitQCFirst); err != nil { - return fmt.Errorf("commitqc deleteBefore: %w", err) - } } } // persistBatch holds the data collected under lock for one persist iteration. type persistBatch struct { - blocks []*types.Signed[*types.LaneProposal] - commitQCs []*types.CommitQC - pruneAnchor utils.Option[*PruneAnchor] - laneFirsts map[types.LaneID]types.BlockNumber - commitQCFirst types.RoadIndex + blocks []*types.Signed[*types.LaneProposal] + commitQCs []*types.CommitQC + pruneAnchor utils.Option[*PruneAnchor] + laneFirsts map[types.LaneID]types.BlockNumber } // advancePersistedBlockStart updates the per-lane block admission watermark @@ -759,6 +781,8 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext }); err != nil { return b, err } + // Must include all current committee members: DeleteBefore removes + // lane WALs not present in the map. 
b.laneFirsts = make(map[types.LaneID]types.BlockNumber, len(inner.blocks)) for lane, q := range inner.blocks { start := max(inner.nextBlockToPersist[lane], q.first) @@ -768,7 +792,6 @@ func (s *State) collectPersistBatch(ctx context.Context, lastPersistedAppQCNext b.laneFirsts[lane] = q.first } commitQCNext = max(commitQCNext, inner.commitQCs.first) - b.commitQCFirst = inner.commitQCs.first for n := commitQCNext; n < inner.commitQCs.next; n++ { b.commitQCs = append(b.commitQCs, inner.commitQCs.q[n]) } diff --git a/sei-tendermint/internal/autobahn/avail/state_test.go b/sei-tendermint/internal/autobahn/avail/state_test.go index d0e95a5bc0..f4acffa0be 100644 --- a/sei-tendermint/internal/autobahn/avail/state_test.go +++ b/sei-tendermint/internal/autobahn/avail/state_test.go @@ -629,7 +629,6 @@ func TestNewStateWithPersistence(t *testing.T) { t.Run("non-contiguous commitQC files return error", func(t *testing.T) { dir := t.TempDir() - ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) // Build 6 sequential CommitQCs (indices 0-5). allQCs := make([]*types.CommitQC, 6) @@ -649,19 +648,63 @@ func TestNewStateWithPersistence(t *testing.T) { CommitQc: types.CommitQCConv.Encode(allQCs[0]), })) - // Persist QCs 0, 1, 2 contiguously, then skip to 5 (simulating - // corruption or manual tampering). Since the anchor is persisted - // first, a gap should never occur normally — treat it as an error. + // Persist QCs 0, 1, 2 contiguously, then try to skip to 5. + // PersistCommitQC enforces strict sequential order, so the gap + // is caught at write time rather than at load time. 
cp, _, err := persist.NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) for i := 0; i < 3; i++ { require.NoError(t, cp.PersistCommitQC(allQCs[i])) } - require.NoError(t, cp.PersistCommitQC(allQCs[5])) - - _, err = NewState(keys[0], ds, utils.Some(dir)) + err = cp.PersistCommitQC(allQCs[5]) require.Error(t, err) - require.Contains(t, err.Error(), "non-contiguous") + require.Contains(t, err.Error(), "out of sequence") + require.NoError(t, cp.Close()) + }) + + t.Run("anchor past all persisted commitQCs truncates WAL", func(t *testing.T) { + dir := t.TempDir() + ds := data.NewState(&data.Config{Committee: committee}, utils.None[data.BlockStore]()) + + // Build a chain of 10 CommitQCs (indices 0-9). + qcs := make([]*types.CommitQC, 10) + prev := utils.None[*types.CommitQC]() + for i := range qcs { + qcs[i] = makeCommitQC(rng, committee, keys, prev, nil, utils.None[*types.AppQC]()) + prev = utils.Some(qcs[i]) + } + + // Persist only indices 0-4 to the CommitQC WAL. + cp, _, err := persist.NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + for i := 0; i < 5; i++ { + require.NoError(t, cp.PersistCommitQC(qcs[i])) + } + require.NoError(t, cp.Close()) + + // Persist a prune anchor at index 9 — well past the persisted range. + appProposal := types.NewAppProposal(50, 9, types.GenAppHash(rng)) + appQC := types.NewAppQC(makeAppVotes(keys, appProposal)) + prunePers, _, err := persist.NewPersister[*pb.PersistedAvailPruneAnchor](utils.Some(dir), innerFile) + require.NoError(t, err) + require.NoError(t, prunePers.Persist(&pb.PersistedAvailPruneAnchor{ + AppQc: types.AppQCConv.Encode(appQC), + CommitQc: types.CommitQCConv.Encode(qcs[9]), + })) + + // NewState should succeed: DeleteBefore truncates the stale WAL, + // then the re-persist step writes the anchor's CommitQC back. 
+ state, err := NewState(keys[0], ds, utils.Some(dir)) + require.NoError(t, err) + + require.Equal(t, types.RoadIndex(9), state.FirstCommitQC()) + latest, ok := state.LastCommitQC().Load().Get() + require.True(t, ok) + require.NoError(t, utils.TestDiff(qcs[9], latest)) + + got, ok := state.LastAppQC().Get() + require.True(t, ok) + require.Equal(t, types.RoadIndex(9), got.Proposal().RoadIndex()) }) t.Run("corrupt AppQC data returns error", func(t *testing.T) { diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go index 93f233ee7d..edeeb252de 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks.go @@ -1,29 +1,13 @@ -// TODO: Block file persistence is a temporary solution that will be replaced by -// a WAL (Write-Ahead Log) library before launch. CommitQC file persistence -// (commitqcs.go) shares the same migration plan. With a WAL, atomic appends -// eliminate several complexities in both files: -// - Corrupt file handling (WAL handles its own integrity). -// - Per-file naming, parsing, and directory scanning. -// - Orphaned file cleanup (WAL truncation replaces DeleteBefore). -// - Gap handling in newInner (WAL replay is always contiguous). -// -// What survives: the BlockPersister abstraction (PersistBlock/DeleteBefore). - +// TODO: add Prometheus metrics for blocks written, truncated, and stale lanes removed. 
package persist import ( "encoding/hex" + "errors" "fmt" - "maps" "os" "path/filepath" - "slices" - "strconv" - "strings" - - "log/slog" - - "google.golang.org/protobuf/proto" + "time" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" @@ -32,187 +16,240 @@ import ( var logger = seilog.NewLogger("tendermint", "internal", "autobahn", "consensus", "persist") +const blocksDir = "blocks" + +// defaultStaleRetention is how long a lane WAL is kept after its last write +// before being deleted when the lane is no longer in the committee. This gives +// catching-up peers time to fetch blocks from the stale lane. +const defaultStaleRetention = 30 * time.Minute + // LoadedBlock is a block loaded from disk during state restoration. type LoadedBlock struct { Number types.BlockNumber Proposal *types.Signed[*types.LaneProposal] } -// BlockPersister manages individual block files in a blocks/ subdirectory. -// Each block is stored as _.pb. -// The caller is responsible for driving persistence (typically a goroutine that -// watches in-memory block state and calls PersistBlock / DeleteBefore). -// When noop is true, all disk I/O is skipped. +// laneWAL wraps an indexedWAL with a per-lane block number cursor. +// Block numbers within a lane are contiguous, so the first block number +// is derived: nextBlockNum - Count(). +type laneWAL struct { + *indexedWAL[*types.Signed[*types.LaneProposal]] + nextBlockNum types.BlockNumber +} + +func (lw *laneWAL) firstBlockNum() utils.Option[types.BlockNumber] { + if lw.Count() == 0 { + return utils.None[types.BlockNumber]() + } + return utils.Some(lw.nextBlockNum - types.BlockNumber(lw.Count())) +} + +// BlockPersister manages block persistence using one WAL per lane. +// Each lane gets its own WAL in a subdirectory named by hex-encoded lane ID, +// so truncation is independent per lane. 
A single shared WAL would be simpler +// but a lane whose blocks are never included in a committed block (e.g. the +// validator is removed from the committee) would prevent truncation of all +// other lanes' entries that follow it. +// When dir is None, all disk I/O is skipped (no-op mode). type BlockPersister struct { - dir string // full path to the blocks/ subdirectory; empty when noop - noop bool + dir utils.Option[string] + lanes map[types.LaneID]*laneWAL + staleRetention time.Duration } -// newNoOpBlockPersister returns a BlockPersister that skips all disk I/O. -// Used when persistence is disabled. -func newNoOpBlockPersister() *BlockPersister { - return &BlockPersister{noop: true} +func laneDir(lane types.LaneID) string { + return hex.EncodeToString(lane.Bytes()) } -// NewBlockPersister creates the blocks/ subdirectory if it doesn't exist and -// returns a block persister. Loads all persisted blocks from disk as sorted -// slices per lane. Corrupt files are skipped; the caller (newInner) returns -// an error if the resulting slices are non-contiguous. -// When stateDir is None, returns a no-op persister that skips all disk I/O. +func newLaneWAL(dir string) (*laneWAL, error) { + iw, err := openIndexedWAL(dir, types.SignedMsgConv[*types.LaneProposal]()) + if err != nil { + return nil, err + } + return &laneWAL{indexedWAL: iw}, nil +} + +// NewBlockPersister opens (or creates) per-lane WALs in subdirectories of +// blocks/ and replays all persisted entries. Returns the persister and loaded +// blocks grouped by lane (sorted by block number). Corrupt tail entries are +// auto-truncated by the WAL library. +// When stateDir is None, returns a no-op persister. 
func NewBlockPersister(stateDir utils.Option[string]) (*BlockPersister, map[types.LaneID][]LoadedBlock, error) { sd, ok := stateDir.Get() if !ok { - return newNoOpBlockPersister(), nil, nil + return &BlockPersister{lanes: map[types.LaneID]*laneWAL{}, staleRetention: defaultStaleRetention}, nil, nil } - dir := filepath.Join(sd, "blocks") + dir := filepath.Join(sd, blocksDir) if err := os.MkdirAll(dir, 0700); err != nil { return nil, nil, fmt.Errorf("create blocks dir %s: %w", dir, err) } - bp := &BlockPersister{ - dir: dir, - } - blocks, err := bp.loadAll() - if err != nil { - return nil, nil, err - } - return bp, blocks, nil -} - -func blockFilename(lane types.LaneID, n types.BlockNumber) string { - return hex.EncodeToString(lane.Bytes()) + "_" + strconv.FormatUint(uint64(n), 10) + ".pb" -} + bp := &BlockPersister{dir: utils.Some(dir), lanes: map[types.LaneID]*laneWAL{}, staleRetention: defaultStaleRetention} -func parseBlockFilename(name string) (types.LaneID, types.BlockNumber, error) { - name = strings.TrimSuffix(name, ".pb") - parts := strings.SplitN(name, "_", 2) - if len(parts) != 2 { - return types.PublicKey{}, 0, fmt.Errorf("bad block filename %q", name) - } - keyBytes, err := hex.DecodeString(parts[0]) + entries, err := os.ReadDir(dir) if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad lane hex in %q: %w", name, err) + return nil, nil, fmt.Errorf("read blocks dir %s: %w", dir, err) } - lane, err := types.PublicKeyFromBytes(keyBytes) - if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad lane key in %q: %w", name, err) - } - n, err := strconv.ParseUint(parts[1], 10, 64) - if err != nil { - return types.PublicKey{}, 0, fmt.Errorf("bad block number in %q: %w", name, err) + + allBlocks := map[types.LaneID][]LoadedBlock{} + for _, e := range entries { + if !e.IsDir() { + continue + } + laneBytes, err := hex.DecodeString(e.Name()) + if err != nil { + logger.Warn("skipping unexpected entry in blocks dir", "name", e.Name()) + continue + } + 
lane, err := types.PublicKeyFromBytes(laneBytes) + if err != nil { + logger.Warn("skipping lane dir with invalid key", "name", e.Name(), "err", err) + continue + } + lanePath := filepath.Join(dir, e.Name()) + lw, err := newLaneWAL(lanePath) + if err != nil { + _ = bp.close() + return nil, nil, fmt.Errorf("open lane WAL in %s: %w", lanePath, err) + } + loaded, err := lw.loadAll() + if err != nil { + _ = lw.Close() + _ = bp.close() + return nil, nil, fmt.Errorf("load lane WAL in %s: %w", lanePath, err) + } + bp.lanes[lane] = lw + if len(loaded) > 0 { + allBlocks[lane] = loaded + } } - return lane, types.BlockNumber(n), nil + + return bp, allBlocks, nil } -// PersistBlock writes a signed lane proposal to its own file. +// PersistBlock writes a signed lane proposal to the per-lane WAL. +// Creates the lane WAL lazily if this is the first block for the lane. func (bp *BlockPersister) PersistBlock(proposal *types.Signed[*types.LaneProposal]) error { - if bp.noop { - return nil + dir, ok := bp.dir.Get() + if !ok { + return nil // no-op persister (persistence disabled) } h := proposal.Msg().Block().Header() - pb := types.SignedMsgConv[*types.LaneProposal]().Encode(proposal) - data, err := proto.Marshal(pb) - if err != nil { - return fmt.Errorf("marshal block %s/%d: %w", h.Lane(), h.BlockNumber(), err) + lane := h.Lane() + lw, ok := bp.lanes[lane] + if !ok { + newLW, err := newLaneWAL(filepath.Join(dir, laneDir(lane))) + if err != nil { + return fmt.Errorf("create lane WAL for %s: %w", lane, err) + } + lw = newLW + bp.lanes[lane] = lw } - path := filepath.Join(bp.dir, blockFilename(h.Lane(), h.BlockNumber())) - return writeAndSync(path, data) + if lw.Count() > 0 && h.BlockNumber() != lw.nextBlockNum { + return fmt.Errorf("block %s/%d out of sequence (next=%d)", lane, h.BlockNumber(), lw.nextBlockNum) + } + if err := lw.Write(proposal); err != nil { + return fmt.Errorf("persist block %s/%d: %w", lane, h.BlockNumber(), err) + } + lw.nextBlockNum = h.BlockNumber() + 1 + return 
nil } -// DeleteBefore removes persisted block files that are no longer needed. -// For lanes in laneFirsts, deletes files with block number below the map value. -// For lanes NOT in laneFirsts (orphaned from a previous committee/epoch), -// deletes all files — old blocks are not reusable after a committee change. -// An empty/nil laneFirsts is a no-op (no committee info available to judge orphans). -// Returns an error if the directory cannot be read; individual file removal -// failures are logged but do not cause an error. +// DeleteBefore removes persisted blocks per lane by truncating each lane's +// WAL independently. For each lane in the map, blocks below the given +// BlockNumber are removed. Lanes NOT in the map are considered stale +// (validator no longer in committee): their WALs are closed and directories +// removed after staleRetention has elapsed since the last write, giving +// catching-up peers time to fetch the data. func (bp *BlockPersister) DeleteBefore(laneFirsts map[types.LaneID]types.BlockNumber) error { - if bp.noop || len(laneFirsts) == 0 { - return nil + dir, ok := bp.dir.Get() + if !ok { + return nil // no-op persister (persistence disabled) } - entries, err := os.ReadDir(bp.dir) - if err != nil { - return fmt.Errorf("list blocks dir for cleanup: %w", err) + if len(laneFirsts) == 0 { + panic("DeleteBefore called with empty laneFirsts (empty committee)") } - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue + for lane, first := range laneFirsts { + lw, ok := bp.lanes[lane] + if !ok { + continue // no WAL yet; PersistBlock will create one lazily } - lane, fileN, err := parseBlockFilename(entry.Name()) - if err != nil { + firstBN, ok := lw.firstBlockNum().Get() + if !ok || first <= firstBN { continue } - first, ok := laneFirsts[lane] - if ok && fileN >= first { + if first >= lw.nextBlockNum { + // Anchor advanced past all persisted blocks for this lane. 
+ if err := lw.TruncateAll(); err != nil { + return fmt.Errorf("truncate all lane %s WAL: %w", lane, err) + } continue } - path := filepath.Join(bp.dir, entry.Name()) - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - logger.Warn("failed to delete block file", "path", path, "err", err) + // Truncate entries below 'first'. The verify callback checks that + // the entry at the new front has the expected block number, catching + // any block-number-to-WAL-index mapping corruption. + walIdx := lw.firstIdx + uint64(first-firstBN) + if err := lw.TruncateBefore(walIdx, func(entry *types.Signed[*types.LaneProposal]) error { + if got := entry.Msg().Block().Header().BlockNumber(); got != first { + return fmt.Errorf("block at WAL index %d has number %d, expected %d (index mapping broken)", walIdx, got, first) + } + return nil + }); err != nil { + return fmt.Errorf("truncate lane %s WAL before block %d: %w", lane, first, err) } } - return nil -} - -// loadAll loads all persisted blocks from the blocks/ directory. -// Returns sorted slices per lane. Corrupt files are skipped; the caller -// (newInner) returns an error on gaps or parent-hash mismatches. 
-func (bp *BlockPersister) loadAll() (map[types.LaneID][]LoadedBlock, error) { - entries, err := os.ReadDir(bp.dir) - if err != nil { - return nil, fmt.Errorf("read blocks dir %s: %w", bp.dir, err) - } - - raw := map[types.LaneID]map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue - } - lane, n, err := parseBlockFilename(entry.Name()) - if err != nil { - logger.Warn("skipping unrecognized block file", "file", entry.Name(), "err", err) - continue - } - proposal, err := loadBlockFile(filepath.Join(bp.dir, entry.Name())) - if err != nil { - logger.Warn("skipping corrupt block file", "file", entry.Name(), "err", err) + for lane, lw := range bp.lanes { + if _, ok := laneFirsts[lane]; ok { continue } - h := proposal.Msg().Block().Header() - if h.Lane() != lane || h.BlockNumber() != n { - logger.Warn("skipping block file with mismatched header", - "file", entry.Name(), - "headerLane", h.Lane(), - slog.Uint64("headerNum", uint64(h.BlockNumber())), - "filenameLane", lane, - slog.Uint64("filenameNum", uint64(n)), - ) + if time.Since(lw.LastWriteTime()) < bp.staleRetention { + // Retention window has not elapsed yet: keep the WAL so catching-up peers can still fetch its blocks. 
continue } - if raw[lane] == nil { - raw[lane] = map[types.BlockNumber]*types.Signed[*types.LaneProposal]{} - } - raw[lane][n] = proposal - logger.Info("loaded persisted block", "lane", lane.String(), slog.Uint64("block", uint64(n))) + logger.Info("removing stale lane WAL", "lane", lane, "lastWrite", lw.LastWriteTime()) + _ = lw.Close() + _ = os.RemoveAll(filepath.Join(dir, laneDir(lane))) + delete(bp.lanes, lane) } + return nil +} - result := map[types.LaneID][]LoadedBlock{} - for lane, bs := range raw { - sorted := slices.Sorted(maps.Keys(bs)) - blocks := make([]LoadedBlock, 0, len(sorted)) - for _, n := range sorted { - blocks = append(blocks, LoadedBlock{Number: n, Proposal: bs[n]}) +// close shuts down all per-lane WALs. Internal: only used by tests and +// NewBlockPersister (error cleanup). Production code does not close WALs +// at shutdown — the OS reclaims resources on process exit. +func (bp *BlockPersister) close() error { + if _, ok := bp.dir.Get(); !ok { + return nil // no-op persister (persistence disabled) + } + var errs []error + for _, lw := range bp.lanes { + if err := lw.Close(); err != nil { + errs = append(errs, err) } - result[lane] = blocks } - return result, nil + return errors.Join(errs...) } -func loadBlockFile(path string) (*types.Signed[*types.LaneProposal], error) { - data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled +// loadAll reads all entries from the lane WAL and returns the loaded blocks. +// Also restores nextBlockNum from the last entry. 
+func (lw *laneWAL) loadAll() ([]LoadedBlock, error) { + entries, err := lw.ReadAll() if err != nil { return nil, err } - conv := types.SignedMsgConv[*types.LaneProposal]() - return conv.Unmarshal(data) + loaded := make([]LoadedBlock, 0, len(entries)) + for i, proposal := range entries { + h := proposal.Msg().Block().Header() + if i > 0 && h.BlockNumber() != lw.nextBlockNum { + return nil, fmt.Errorf("gap in lane %s: block %d follows %d", h.Lane(), h.BlockNumber(), lw.nextBlockNum-1) + } + lw.nextBlockNum = h.BlockNumber() + 1 + loaded = append(loaded, LoadedBlock{Number: h.BlockNumber(), Proposal: proposal}) + } + if len(loaded) > 0 { + first, last := loaded[0].Number, loaded[len(loaded)-1].Number + logger.Debug("loaded persisted blocks", "lane", entries[0].Msg().Block().Header().Lane().String(), + "first", first, "last", last, "count", len(loaded)) + } + return loaded, nil } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go index e248ca2bdf..829ae8f289 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/blocks_test.go @@ -1,7 +1,6 @@ package persist import ( - "encoding/hex" "os" "path/filepath" "testing" @@ -23,10 +22,10 @@ func TestNewBlockPersisterEmptyDir(t *testing.T) { require.NoError(t, err) require.NotNil(t, bp) require.Equal(t, 0, len(blocks)) - // blocks/ subdirectory should exist - fi, err := os.Stat(filepath.Join(dir, "blocks")) + fi, err := os.Stat(filepath.Join(dir, blocksDir)) require.NoError(t, err) require.True(t, fi.IsDir()) + require.NoError(t, bp.close()) } func TestPersistBlockAndLoad(t *testing.T) { @@ -42,6 +41,7 @@ func TestPersistBlockAndLoad(t *testing.T) { b1 := testSignedProposal(rng, key, 1) require.NoError(t, bp.PersistBlock(b0)) require.NoError(t, bp.PersistBlock(b1)) + require.NoError(t, bp.close()) bp2, blocks, err := NewBlockPersister(utils.Some(dir)) 
require.NoError(t, err) @@ -52,6 +52,7 @@ func TestPersistBlockAndLoad(t *testing.T) { require.Equal(t, types.BlockNumber(1), blocks[lane][1].Number) require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) require.NoError(t, utils.TestDiff(b1, blocks[lane][1].Proposal)) + require.NoError(t, bp2.close()) } func TestPersistBlockMultipleLanes(t *testing.T) { @@ -69,6 +70,7 @@ func TestPersistBlockMultipleLanes(t *testing.T) { b2 := testSignedProposal(rng, key2, 0) require.NoError(t, bp.PersistBlock(b1)) require.NoError(t, bp.PersistBlock(b2)) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) @@ -79,7 +81,7 @@ func TestPersistBlockMultipleLanes(t *testing.T) { require.NoError(t, utils.TestDiff(b2, blocks[lane2][0].Proposal)) } -func TestLoadSkipsCorruptBlockFile(t *testing.T) { +func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() @@ -88,46 +90,78 @@ func TestLoadSkipsCorruptBlockFile(t *testing.T) { bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Write a good block - b0 := testSignedProposal(rng, key, 0) - require.NoError(t, bp.PersistBlock(b0)) + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + } - // Write a corrupt file with a valid filename - corruptName := blockFilename(lane, 1) - require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3})) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane]), "should only load the valid block") - require.NoError(t, utils.TestDiff(b0, blocks[lane][0].Proposal)) + require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") + require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) + 
require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) } -func TestLoadCorruptMidSequenceCreatesGap(t *testing.T) { +func TestDeleteBeforeAndRestart(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() - key := types.GenSecretKey(rng) - lane := key.Public() + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + key3 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() + lane3 := key3.Public() // never persisted — exercises the "no WAL yet" path bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks 0, 2 (valid) and corrupt block 1. - // After skipping corrupt-1, raw has {0, 2} → returned with gap. - b0 := testSignedProposal(rng, key, 0) - b2 := testSignedProposal(rng, key, 2) - require.NoError(t, bp.PersistBlock(b0)) - require.NoError(t, bp.PersistBlock(b2)) - corruptName := blockFilename(lane, 1) - require.NoError(t, os.WriteFile(filepath.Join(dir, "blocks", corruptName), []byte("corrupt"), 0600)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) + } - _, blocks, err := NewBlockPersister(utils.Some(dir)) + // lane1: truncate old blocks, lane2: delete nothing (first=0), lane3: empty (no WAL). + require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 0, lane3: 0})) + require.NoError(t, bp.close()) + + // Restart — verify varied lane states load correctly. 
+ bp2, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(blocks[lane]), "corrupt skipped; both valid blocks returned") - require.Equal(t, types.BlockNumber(0), blocks[lane][0].Number) - require.Equal(t, types.BlockNumber(2), blocks[lane][1].Number) + require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") + require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) + require.Equal(t, 3, len(blocks[lane2]), "lane2 should have all 3 blocks") + require.Equal(t, 0, len(blocks[lane3]), "lane3 never had blocks") + + // Persist more after restart, then restart again to verify continuity. + require.NoError(t, bp2.PersistBlock(testSignedProposal(rng, key1, 3))) + require.NoError(t, bp2.PersistBlock(testSignedProposal(rng, key2, 3))) + require.NoError(t, bp2.close()) + + _, blocks2, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 2, len(blocks2[lane1]), "lane1 should have blocks 2,3") + require.Equal(t, types.BlockNumber(3), blocks2[lane1][1].Number) + require.Equal(t, 4, len(blocks2[lane2]), "lane2 should have blocks 0..3") + require.Equal(t, types.BlockNumber(3), blocks2[lane2][3].Number) } -func TestLoadReturnsAllWithGap(t *testing.T) { +func TestNoOpBlockPersister(t *testing.T) { + bp, blocks, err := NewBlockPersister(utils.None[string]()) + require.NoError(t, err) + require.NotNil(t, bp) + require.Equal(t, 0, len(blocks)) + + rng := utils.TestRng() + key := types.GenSecretKey(rng) + lane := key.Public() + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 0})) + require.NoError(t, bp.close()) +} + +func TestDeleteBeforeThenPersistMore(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() @@ -136,89 +170,114 @@ func TestLoadReturnsAllWithGap(t *testing.T) { bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks 3, 4, 6, 7 (gap at 
5). All four returned sorted. - for _, n := range []types.BlockNumber{3, 4, 6, 7} { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, n))) + // Persist 0..4, delete before 3, then persist 5. + for i := types.BlockNumber(0); i < 5; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) } + require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3})) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 5))) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 4, len(blocks[lane]), "should return all valid files including after gap") + require.Equal(t, 3, len(blocks[lane]), "should have blocks 3, 4, 5") require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) - require.Equal(t, types.BlockNumber(6), blocks[lane][2].Number) - require.Equal(t, types.BlockNumber(7), blocks[lane][3].Number) + require.Equal(t, types.BlockNumber(5), blocks[lane][2].Number) } -func TestLoadSkipsMismatchedHeader(t *testing.T) { +func TestDeleteBeforePastAllBlocks(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() - key1 := types.GenSecretKey(rng) - key2 := types.GenSecretKey(rng) - lane1 := key1.Public() - lane2 := key2.Public() + key := types.GenSecretKey(rng) + lane := key.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Write block for lane1 but save it under lane2's filename - b := testSignedProposal(rng, key1, 5) - require.NoError(t, bp.PersistBlock(b)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + } + + // Anchor advanced past everything (nextBlockNum is 3, first=10). 
+ require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 10})) - // Rename the file to use lane2 in the filename - oldPath := filepath.Join(dir, "blocks", blockFilename(lane1, 5)) - newPath := filepath.Join(dir, "blocks", blockFilename(lane2, 5)) - require.NoError(t, os.Rename(oldPath, newPath)) + // Lane WAL is now empty; new writes starting from 10 should work. + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 10))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 11))) + require.NoError(t, bp.close()) - // Reload — should skip the mismatched file + // Reopen — should see only the post-TruncateAll blocks. _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 0, len(blocks), "mismatched header should be skipped") + require.Equal(t, 2, len(blocks[lane])) + require.Equal(t, types.BlockNumber(10), blocks[lane][0].Number) + require.Equal(t, types.BlockNumber(11), blocks[lane][1].Number) } -func TestLoadSkipsUnrecognizedFilename(t *testing.T) { +func TestDeleteBeforeRemovesStaleLanes(t *testing.T) { + rng := utils.TestRng() dir := t.TempDir() + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - _ = bp - // Write files with bad names - blocksDir := filepath.Join(dir, "blocks") - require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "notablock.pb"), []byte("data"), 0600)) - require.NoError(t, os.WriteFile(filepath.Join(blocksDir, "readme.txt"), []byte("hi"), 0600)) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) + } + + // Disable retention so stale lanes are removed immediately. + bp.staleRetention = 0 + + // Only lane1 in laneFirsts — lane2 is stale and past retention, should be removed. 
+ require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 0})) + require.NoError(t, bp.close()) - // Reload — should skip both _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 0, len(blocks)) + require.Equal(t, 3, len(blocks[lane1]), "lane1: all blocks intact") + require.Equal(t, 0, len(blocks[lane2]), "lane2: removed as stale") + + entries, _ := os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 1, len(entries), "only lane1 directory remains") } -func TestDeleteBeforeRemovesOldKeepsNew(t *testing.T) { +func TestDeleteBeforeRetainsRecentStaleLanes(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() - key := types.GenSecretKey(rng) - lane := key.Public() + key1 := types.GenSecretKey(rng) + key2 := types.GenSecretKey(rng) + lane1 := key1.Public() + lane2 := key2.Public() bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks 0..4 - for i := types.BlockNumber(0); i < 5; i++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, i))) + for i := types.BlockNumber(0); i < 3; i++ { + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) } - // Delete blocks before 3 - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane: 3})) + // lane2 was just written to (lastWriteTime is recent). + // DeleteBefore should NOT delete it even though it's not in laneFirsts. 
+ require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 0})) + require.NoError(t, bp.close()) _, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(blocks[lane]), "should have blocks 3 and 4") - require.Equal(t, types.BlockNumber(3), blocks[lane][0].Number) - require.Equal(t, types.BlockNumber(4), blocks[lane][1].Number) + require.Equal(t, 3, len(blocks[lane1]), "lane1: all blocks intact") + require.Equal(t, 3, len(blocks[lane2]), "lane2: retained because write was recent") + + entries, _ := os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 2, len(entries), "both lane directories remain") } -func TestDeleteBeforeMultipleLanes(t *testing.T) { +func TestDeleteBeforeRetainsEmptyStaleLaneAtStartup(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() @@ -226,86 +285,141 @@ func TestDeleteBeforeMultipleLanes(t *testing.T) { key2 := types.GenSecretKey(rng) lane1 := key1.Public() lane2 := key2.Public() + + // Create lane directories. lane2 is empty (e.g. just joined then left committee). + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(filepath.Join(bd, laneDir(lane1)), 0700)) + require.NoError(t, os.MkdirAll(filepath.Join(bd, laneDir(lane2)), 0700)) + + // Open — both lanes discovered. lane2 has never been written to. bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Lane1: blocks 0,1,2; Lane2: blocks 0,1,2 - for i := types.BlockNumber(0); i < 3; i++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, i))) - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, i))) - } - - // Delete lane1 < 2, lane2 < 1 - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 2, lane2: 1})) + // lane2 is not in laneFirsts but was just opened — retention should keep it. 
+ require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 0})) - _, blocks, err := NewBlockPersister(utils.Some(dir)) - require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane1]), "lane1 should have block 2") - require.Equal(t, types.BlockNumber(2), blocks[lane1][0].Number) - require.Equal(t, 2, len(blocks[lane2]), "lane2 should have blocks 1,2") - require.Equal(t, types.BlockNumber(1), blocks[lane2][0].Number) - require.Equal(t, types.BlockNumber(2), blocks[lane2][1].Number) + entries, _ := os.ReadDir(bd) + require.Equal(t, 2, len(entries), "empty stale lane retained at startup") + require.NoError(t, bp.close()) } -func TestDeleteBeforeEmptyMap(t *testing.T) { +func TestEmptyLaneWALSurvivesReopen(t *testing.T) { rng := utils.TestRng() dir := t.TempDir() key := types.GenSecretKey(rng) lane := key.Public() - bp, _, err := NewBlockPersister(utils.Some(dir)) + + // Simulate a crash after lazy lane directory creation but before any write: + // create the lane subdirectory so NewBlockPersister discovers it on open. + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(filepath.Join(bd, laneDir(lane)), 0700)) + + // Reopen — empty lane WAL should be loaded and usable. + bp, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) + require.Equal(t, 0, len(blocks[lane]), "no blocks loaded") + // Persist a new block into the lane without needing lazy creation. require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + require.NoError(t, bp.close()) - // Empty map — should not delete anything - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{})) + // Reopen — should see the new block. 
+ _, blocks2, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 1, len(blocks2[lane])) + require.Equal(t, types.BlockNumber(0), blocks2[lane][0].Number) +} - _, blocks, err := NewBlockPersister(utils.Some(dir)) +func TestNewBlockPersisterSkipsNonHexDir(t *testing.T) { + dir := t.TempDir() + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(bd, 0700)) + + // Create a non-hex directory and a regular file — both should be skipped. + require.NoError(t, os.Mkdir(filepath.Join(bd, "not-valid-hex"), 0700)) + require.NoError(t, os.WriteFile(filepath.Join(bd, "stray-file.txt"), []byte("hi"), 0600)) + + bp, blocks, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(blocks[lane])) + require.Equal(t, 0, len(blocks)) + require.NoError(t, bp.close()) } -func TestDeleteBeforeRemovesOrphanedLanes(t *testing.T) { - rng := utils.TestRng() +func TestNewBlockPersisterSkipsInvalidKeyDir(t *testing.T) { dir := t.TempDir() + bd := filepath.Join(dir, blocksDir) + require.NoError(t, os.MkdirAll(bd, 0700)) - key1 := types.GenSecretKey(rng) - lane1 := key1.Public() - key2 := types.GenSecretKey(rng) - lane2 := key2.Public() + // Valid hex but too short to be a valid ed25519 public key. + require.NoError(t, os.Mkdir(filepath.Join(bd, "abcd"), 0700)) + + bp, blocks, err := NewBlockPersister(utils.Some(dir)) + require.NoError(t, err) + require.Equal(t, 0, len(blocks)) + require.NoError(t, bp.close()) +} +func TestPersistBlockOutOfSequence(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + + key := types.GenSecretKey(rng) bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - // Persist blocks on both lanes. 
- for n := types.BlockNumber(0); n < 3; n++ { - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key1, n))) - require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key2, n))) - } + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) - // Only lane1 is in the current committee; lane2 is orphaned. - require.NoError(t, bp.DeleteBefore(map[types.LaneID]types.BlockNumber{lane1: 1})) + // Gap: skip block 1, try block 2. + err = bp.PersistBlock(testSignedProposal(rng, key, 2)) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") - _, blocks, err := NewBlockPersister(utils.Some(dir)) - require.NoError(t, err) + // Duplicate: try block 0 again. + err = bp.PersistBlock(testSignedProposal(rng, key, 0)) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") - // lane1: block 0 deleted, blocks 1-2 kept. - require.Equal(t, 2, len(blocks[lane1])) - require.Equal(t, types.BlockNumber(1), blocks[lane1][0].Number) + require.NoError(t, bp.close()) +} - // lane2: all blocks deleted (orphaned lane). - require.Equal(t, 0, len(blocks[lane2])) +func TestLoadAllDetectsBlockGap(t *testing.T) { + rng := utils.TestRng() + dir := t.TempDir() + key := types.GenSecretKey(rng) + lane := key.Public() + + // Write directly to a lane WAL, bypassing PersistBlock's contiguity check + // to simulate on-disk corruption (block 0 then block 2, skipping 1). 
+ ld := filepath.Join(dir, blocksDir, laneDir(lane)) + require.NoError(t, os.MkdirAll(ld, 0700)) + lw, err := newLaneWAL(ld) + require.NoError(t, err) + require.NoError(t, lw.Write(testSignedProposal(rng, key, 0))) + require.NoError(t, lw.Write(testSignedProposal(rng, key, 2))) + require.NoError(t, lw.Close()) + + _, _, err = NewBlockPersister(utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "gap") } -func TestBlockFilenameRoundTrip(t *testing.T) { +func TestLazyLaneCreation(t *testing.T) { rng := utils.TestRng() - lane := types.GenSecretKey(rng).Public() - n := types.BlockNumber(42) + dir := t.TempDir() - name := blockFilename(lane, n) - parsedLane, parsedN, err := parseBlockFilename(name) + bp, _, err := NewBlockPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, hex.EncodeToString(lane.Bytes()), hex.EncodeToString(parsedLane.Bytes())) - require.Equal(t, n, parsedN) + + // No lanes exist yet. + entries, _ := os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 0, len(entries)) + + // First persist for a lane creates its directory and WAL. + key := types.GenSecretKey(rng) + require.NoError(t, bp.PersistBlock(testSignedProposal(rng, key, 0))) + + entries, _ = os.ReadDir(filepath.Join(dir, blocksDir)) + require.Equal(t, 1, len(entries), "should have 1 lane directory") + require.NoError(t, bp.close()) } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go index 4ade5e712c..bf4fdd094c 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs.go @@ -1,67 +1,51 @@ -// TODO: CommitQC file persistence is a temporary solution that will be replaced -// by the same WAL (Write-Ahead Log) library as block persistence (see blocks.go). 
-// With a WAL, atomic appends eliminate corrupt file handling, per-file -// naming/parsing, directory scanning, and DeleteBefore cleanup -// (WAL replay is always contiguous). - +// TODO: add Prometheus metrics for commitQCs written and truncated. package persist import ( "fmt" - "maps" - "os" "path/filepath" - "slices" - "strconv" - "strings" - - "log/slog" "github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types" "github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils" ) +const commitqcsDir = "commitqcs" + // LoadedCommitQC is a CommitQC loaded from disk during state restoration. type LoadedCommitQC struct { Index types.RoadIndex QC *types.CommitQC } -// CommitQCPersister manages individual CommitQC files in a commitqcs/ subdirectory. -// Each CommitQC is stored as .pb. -// The caller is responsible for driving persistence (typically a goroutine that -// watches in-memory state and calls PersistCommitQC / DeleteBefore). -// When noop is true, all disk I/O is skipped but cursor tracking still works. +// CommitQCPersister manages CommitQC persistence using a WAL. +// Entries are appended in order; each entry is self-describing (the serialized +// CommitQC contains its RoadIndex). The WAL index is append order, not +// RoadIndex — the indexedWAL tracks first/next indices to enable truncation. +// When iw is None, all disk I/O is skipped but cursor tracking still works. type CommitQCPersister struct { - dir string // full path to the commitqcs/ subdirectory; empty when noop - noop bool + iw utils.Option[*indexedWAL[*types.CommitQC]] next types.RoadIndex } -// newNoOpCommitQCPersister returns a CommitQCPersister that skips all disk I/O -// but still tracks the next index. Used when persistence is disabled. -func newNoOpCommitQCPersister() *CommitQCPersister { - return &CommitQCPersister{noop: true} -} - -// NewCommitQCPersister creates the commitqcs/ subdirectory if it doesn't exist -// and returns a persister. 
Loads all persisted CommitQCs from disk as a sorted -// slice. Corrupt files are skipped; the caller (newInner) returns an error if -// the resulting slice is non-contiguous. -// When stateDir is None, returns a no-op persister that skips all disk I/O. +// NewCommitQCPersister opens (or creates) a WAL in the commitqcs/ subdirectory +// and replays all persisted entries. Returns the persister and a sorted slice of +// loaded CommitQCs. Corrupt tail entries are auto-truncated by the WAL library. +// When stateDir is None, returns a no-op persister. func NewCommitQCPersister(stateDir utils.Option[string]) (*CommitQCPersister, []LoadedCommitQC, error) { sd, ok := stateDir.Get() if !ok { - return newNoOpCommitQCPersister(), nil, nil + return &CommitQCPersister{}, nil, nil } - dir := filepath.Join(sd, "commitqcs") - if err := os.MkdirAll(dir, 0700); err != nil { - return nil, nil, fmt.Errorf("create commitqcs dir %s: %w", dir, err) + dir := filepath.Join(sd, commitqcsDir) + iw, err := openIndexedWAL(dir, types.CommitQCConv) + if err != nil { + return nil, nil, fmt.Errorf("open commitqc WAL in %s: %w", dir, err) } - cp := &CommitQCPersister{dir: dir} + cp := &CommitQCPersister{iw: utils.Some(iw)} loaded, err := cp.loadAll() if err != nil { + _ = iw.Close() return nil, nil, err } if len(loaded) > 0 { @@ -76,127 +60,91 @@ func (cp *CommitQCPersister) LoadNext() types.RoadIndex { return cp.next } -// ResetNext overrides the next-to-persist cursor. Called after newInner -// applies prune(), which may advance commitQCs.next beyond the raw loader's -// cursor. Without this, PersistCommitQC would reject valid new QCs as -// "already persisted". 
-func (cp *CommitQCPersister) ResetNext(idx types.RoadIndex) { - cp.next = idx -} - -func commitQCFilename(idx types.RoadIndex) string { - return strconv.FormatUint(uint64(idx), 10) + ".pb" -} - -func parseCommitQCFilename(name string) (types.RoadIndex, error) { - name = strings.TrimSuffix(name, ".pb") - n, err := strconv.ParseUint(name, 10, 64) - if err != nil { - return 0, fmt.Errorf("bad commitqc filename %q: %w", name, err) - } - return types.RoadIndex(n), nil -} - -// PersistCommitQC writes a CommitQC to its own file. -// The caller must persist CommitQCs in order; idx < cp.next is a bug. +// PersistCommitQC writes a CommitQC to the WAL. +// Entries must be persisted in sequential order. Duplicates (idx < next) are +// silently ignored — this makes startup idempotent after DeleteBefore truncates +// the WAL. Gaps (idx > next) return an error because they break the linear +// RoadIndex-to-WAL-index mapping that DeleteBefore relies on. func (cp *CommitQCPersister) PersistCommitQC(qc *types.CommitQC) error { idx := qc.Index() if idx < cp.next { - return fmt.Errorf("commitqc %d already persisted (next=%d)", idx, cp.next) + return nil + } + if idx > cp.next { + return fmt.Errorf("commitqc %d out of sequence (next=%d)", idx, cp.next) } - if !cp.noop { - data := types.CommitQCConv.Marshal(qc) - path := filepath.Join(cp.dir, commitQCFilename(idx)) - if err := writeAndSync(path, data); err != nil { + if iw, ok := cp.iw.Get(); ok { + if err := iw.Write(qc); err != nil { return fmt.Errorf("persist commitqc %d: %w", idx, err) } - } + } // else: no-op persister (persistence disabled); cursor still advances. cp.next = idx + 1 return nil } -// DeleteBefore removes persisted CommitQC files with road index below idx. -// Returns an error if the directory cannot be read; individual file removal -// failures are logged but do not cause an error. +// DeleteBefore removes persisted CommitQCs with road index below idx +// by truncating the front of the WAL. 
When idx is at or past every +// persisted entry, the WAL is truncated and the write cursor is advanced +// to idx so subsequent PersistCommitQC calls start from the right place. +// The mapping from RoadIndex to WAL index is linear: entries are written +// sequentially, so WAL index = firstIdx + (roadIndex - firstRoadIndex). func (cp *CommitQCPersister) DeleteBefore(idx types.RoadIndex) error { - if cp.noop || idx == 0 { + iw, ok := cp.iw.Get() + if !ok || idx == 0 { + return nil // no-op persister (persistence disabled), or nothing to prune + } + if idx >= cp.next { + // Anchor advanced past all persisted entries; advance the cursor + // so the next PersistCommitQC starts at idx. + cp.next = idx + if iw.Count() == 0 { + return nil // already empty (e.g. crash after a previous TruncateAll) + } + return iw.TruncateAll() + } + if iw.Count() == 0 { return nil } - entries, err := os.ReadDir(cp.dir) - if err != nil { - return fmt.Errorf("list commitqcs dir for cleanup: %w", err) + firstRoadIndex := cp.next - types.RoadIndex(iw.Count()) + if idx <= firstRoadIndex { + return nil } - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue - } - fileIdx, err := parseCommitQCFilename(entry.Name()) - if err != nil { - continue - } - if fileIdx >= idx { - continue - } - path := filepath.Join(cp.dir, entry.Name()) - if err := os.Remove(path); err != nil && !os.IsNotExist(err) { - logger.Warn("failed to delete commitqc file", "path", path, "err", err) + walIdx := iw.firstIdx + uint64(idx-firstRoadIndex) + if err := iw.TruncateBefore(walIdx, func(entry *types.CommitQC) error { + if entry.Index() != idx { + return fmt.Errorf("commitqc at WAL index %d has road index %d, expected %d (index mapping broken)", walIdx, entry.Index(), idx) } + return nil + }); err != nil { + return fmt.Errorf("truncate commitqc WAL before %d: %w", walIdx, err) } return nil } -// loadAll loads all persisted CommitQCs from the commitqcs/ directory. 
-// Returns a sorted slice of all valid files. Corrupt or mismatched files -// are skipped; the caller (newInner) returns an error on gaps. -func (cp *CommitQCPersister) loadAll() ([]LoadedCommitQC, error) { - entries, err := os.ReadDir(cp.dir) - if err != nil { - return nil, fmt.Errorf("read commitqcs dir %s: %w", cp.dir, err) +// close shuts down the WAL. Internal: only used by tests and NewCommitQCPersister +// (error cleanup). Production code does not close WALs at shutdown. +func (cp *CommitQCPersister) close() error { + if iw, ok := cp.iw.Get(); ok { + return iw.Close() } - - raw := map[types.RoadIndex]*types.CommitQC{} - for _, entry := range entries { - if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".pb") { - continue - } - idx, err := parseCommitQCFilename(entry.Name()) - if err != nil { - logger.Warn("skipping unrecognized commitqc file", "file", entry.Name(), "err", err) - continue - } - qc, err := loadCommitQCFile(filepath.Join(cp.dir, entry.Name())) - if err != nil { - logger.Warn("skipping corrupt commitqc file", "file", entry.Name(), "err", err) - continue - } - if qc.Index() != idx { - logger.Warn("skipping commitqc file with mismatched index", - "file", entry.Name(), - slog.Uint64("headerIdx", uint64(qc.Index())), - slog.Uint64("filenameIdx", uint64(idx)), - ) - continue - } - raw[idx] = qc - logger.Info("loaded persisted commitqc", slog.Uint64("roadIndex", uint64(idx))) - } - - if len(raw) == 0 { - return nil, nil - } - - sorted := slices.Sorted(maps.Keys(raw)) - result := make([]LoadedCommitQC, 0, len(sorted)) - for _, idx := range sorted { - result = append(result, LoadedCommitQC{Index: idx, QC: raw[idx]}) - } - return result, nil + return nil // no-op persister (persistence disabled) } -func loadCommitQCFile(path string) (*types.CommitQC, error) { - data, err := os.ReadFile(path) //nolint:gosec // path is constructed from operator-configured stateDir + hardcoded filename; not user-controlled +func (cp *CommitQCPersister) loadAll() 
([]LoadedCommitQC, error) { + iw, ok := cp.iw.Get() + if !ok { + return nil, nil // no-op persister (persistence disabled) + } + entries, err := iw.ReadAll() if err != nil { return nil, err } - return types.CommitQCConv.Unmarshal(data) + loaded := make([]LoadedCommitQC, 0, len(entries)) + for i, qc := range entries { + if i > 0 && qc.Index() != loaded[i-1].Index+1 { + return nil, fmt.Errorf("gap in commitQCs: index %d follows %d", qc.Index(), loaded[i-1].Index) + } + loaded = append(loaded, LoadedCommitQC{Index: qc.Index(), QC: qc}) + } + return loaded, nil } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go index dccaa341fc..e874a78bfb 100644 --- a/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go +++ b/sei-tendermint/internal/autobahn/consensus/persist/commitqcs_test.go @@ -68,9 +68,10 @@ func TestNewCommitQCPersisterEmptyDir(t *testing.T) { require.Equal(t, 0, len(loaded)) require.Equal(t, types.RoadIndex(0), cp.LoadNext()) - fi, err := os.Stat(filepath.Join(dir, "commitqcs")) + fi, err := os.Stat(filepath.Join(dir, commitqcsDir)) require.NoError(t, err) require.True(t, fi.IsDir()) + require.NoError(t, cp.close()) } func TestPersistCommitQCAndLoad(t *testing.T) { @@ -87,6 +88,7 @@ func TestPersistCommitQCAndLoad(t *testing.T) { require.NoError(t, cp.PersistCommitQC(qc)) } require.Equal(t, types.RoadIndex(3), cp.LoadNext()) + require.NoError(t, cp.close()) cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) @@ -97,52 +99,52 @@ func TestPersistCommitQCAndLoad(t *testing.T) { require.NoError(t, utils.TestDiff(qcs[i], lqc.QC)) } require.Equal(t, types.RoadIndex(3), cp2.LoadNext()) + require.NoError(t, cp2.close()) } -func TestLoadSkipsCorruptCommitQCFile(t *testing.T) { +func TestCommitQCDeleteBeforeRemovesOldKeepsNew(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := 
t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 1) + qcs := makeSequentialCommitQCs(rng, committee, keys, 5) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.NoError(t, cp.PersistCommitQC(qcs[0])) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } - // Write a corrupt file for index 1 - corruptPath := filepath.Join(dir, "commitqcs", commitQCFilename(1)) - require.NoError(t, os.WriteFile(corruptPath, []byte("corrupt"), 0600)) + require.NoError(t, cp.DeleteBefore(3)) + require.NoError(t, cp.close()) _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 1, len(loaded), "should only load valid commitqc") - require.NoError(t, utils.TestDiff(qcs[0], loaded[0].QC)) + require.Equal(t, 2, len(loaded), "should have indices 3 and 4") + require.Equal(t, types.RoadIndex(3), loaded[0].Index) + require.Equal(t, types.RoadIndex(4), loaded[1].Index) } -func TestLoadCommitQCReturnsAllWithGap(t *testing.T) { +func TestCommitQCDeleteBeforeZero(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 4) + qcs := makeSequentialCommitQCs(rng, committee, keys, 2) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) + for _, qc := range qcs { + require.NoError(t, cp.PersistCommitQC(qc)) + } - // Persist 0, 1, skip 2, persist 3 → gap at 2, all three returned sorted. 
- require.NoError(t, cp.PersistCommitQC(qcs[0])) - require.NoError(t, cp.PersistCommitQC(qcs[1])) - require.NoError(t, cp.PersistCommitQC(qcs[3])) + require.NoError(t, cp.DeleteBefore(0)) + require.NoError(t, cp.close()) - cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) + _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 3, len(loaded), "should return all valid files including after gap") - require.Equal(t, types.RoadIndex(0), loaded[0].Index) - require.Equal(t, types.RoadIndex(1), loaded[1].Index) - require.Equal(t, types.RoadIndex(3), loaded[2].Index) - require.Equal(t, types.RoadIndex(4), cp2.LoadNext(), "next should be max index + 1") + require.Equal(t, 2, len(loaded)) } -func TestLoadCommitQCCorruptMidSequenceCreatesGap(t *testing.T) { +func TestCommitQCPersistDuplicateIsNoOp(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() @@ -151,99 +153,163 @@ func TestLoadCommitQCCorruptMidSequenceCreatesGap(t *testing.T) { cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - // Persist 0, corrupt 1, persist 2 → corrupt skipped, returns [0, 2] with gap. require.NoError(t, cp.PersistCommitQC(qcs[0])) - require.NoError(t, cp.PersistCommitQC(qcs[2])) - corruptPath := filepath.Join(dir, "commitqcs", commitQCFilename(1)) - require.NoError(t, os.WriteFile(corruptPath, []byte("corrupt"), 0600)) - - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) - require.NoError(t, err) - require.Equal(t, 2, len(loaded), "corrupt skipped; both valid files returned") - require.Equal(t, types.RoadIndex(0), loaded[0].Index) - require.Equal(t, types.RoadIndex(2), loaded[1].Index) + require.NoError(t, cp.PersistCommitQC(qcs[1])) + // Persisting qcs[0] again is a no-op (idx < next). 
+ require.NoError(t, cp.PersistCommitQC(qcs[0])) + require.Equal(t, types.RoadIndex(2), cp.LoadNext()) + require.NoError(t, cp.close()) } -func TestLoadCommitQCSkipsMismatchedIndex(t *testing.T) { +func TestCommitQCPersistGapRejected(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 2) + qcs := makeSequentialCommitQCs(rng, committee, keys, 5) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - // Persist qc[0] (index 0) but save it under filename for index 5 require.NoError(t, cp.PersistCommitQC(qcs[0])) - oldPath := filepath.Join(dir, "commitqcs", commitQCFilename(0)) - newPath := filepath.Join(dir, "commitqcs", commitQCFilename(5)) - require.NoError(t, os.Rename(oldPath, newPath)) - - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) - require.NoError(t, err) - require.Equal(t, 0, len(loaded), "mismatched index should be skipped") + require.NoError(t, cp.PersistCommitQC(qcs[1])) + // Skip qcs[2], try to persist qcs[3] — should fail because idx(3) != next(2). + err = cp.PersistCommitQC(qcs[3]) + require.Error(t, err) + require.Contains(t, err.Error(), "out of sequence") + require.NoError(t, cp.close()) } -func TestLoadCommitQCSkipsUnrecognizedFilename(t *testing.T) { +func TestLoadAllDetectsCommitQCGap(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - cp, _, err := NewCommitQCPersister(utils.Some(dir)) + + // Build 3 sequential CommitQCs (indices 0, 1, 2). + qcs := makeSequentialCommitQCs(rng, committee, keys, 3) + + // Write directly to the WAL, bypassing PersistCommitQC's contiguity + // check to simulate on-disk corruption (index 0 then index 2, skipping 1). 
+ walDir := filepath.Join(dir, commitqcsDir) + require.NoError(t, os.MkdirAll(walDir, 0700)) + iw, err := openIndexedWAL(walDir, types.CommitQCConv) require.NoError(t, err) - _ = cp + require.NoError(t, iw.Write(qcs[0])) + require.NoError(t, iw.Write(qcs[2])) + require.NoError(t, iw.Close()) - qcDir := filepath.Join(dir, "commitqcs") - require.NoError(t, os.WriteFile(filepath.Join(qcDir, "notaqc.pb"), []byte("data"), 0600)) - require.NoError(t, os.WriteFile(filepath.Join(qcDir, "readme.txt"), []byte("hi"), 0600)) + _, _, err = NewCommitQCPersister(utils.Some(dir)) + require.Error(t, err) + require.Contains(t, err.Error(), "gap") +} - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) +func TestNoOpCommitQCPersister(t *testing.T) { + cp, loaded, err := NewCommitQCPersister(utils.None[string]()) require.NoError(t, err) + require.NotNil(t, cp) require.Equal(t, 0, len(loaded)) + + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + qcs := makeSequentialCommitQCs(rng, committee, keys, 1) + require.NoError(t, cp.PersistCommitQC(qcs[0])) + require.Equal(t, types.RoadIndex(1), cp.LoadNext()) + require.NoError(t, cp.DeleteBefore(0)) + require.NoError(t, cp.close()) } -func TestCommitQCDeleteBeforeRemovesOldKeepsNew(t *testing.T) { +func TestCommitQCDeleteBeforePastAll(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 5) + qcs := makeSequentialCommitQCs(rng, committee, keys, 3) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) for _, qc := range qcs { require.NoError(t, cp.PersistCommitQC(qc)) } - - require.NoError(t, cp.DeleteBefore(3)) - + // cp.next is 3; prune past everything. DeleteBefore advances the cursor + // to 10 and truncates the WAL. + require.NoError(t, cp.DeleteBefore(10)) + require.Equal(t, types.RoadIndex(10), cp.LoadNext()) + + // New writes starting from 10 should work. 
+ moreQCs := makeSequentialCommitQCs(rng, committee, keys, 12) + require.NoError(t, cp.PersistCommitQC(moreQCs[10])) + require.NoError(t, cp.PersistCommitQC(moreQCs[11])) + require.NoError(t, cp.close()) + + // Reopen — should see only the post-TruncateAll entries. _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, 2, len(loaded), "should have indices 3 and 4") - require.Equal(t, types.RoadIndex(3), loaded[0].Index) - require.Equal(t, types.RoadIndex(4), loaded[1].Index) + require.Equal(t, 2, len(loaded)) + require.Equal(t, types.RoadIndex(10), loaded[0].Index) + require.Equal(t, types.RoadIndex(11), loaded[1].Index) } -func TestCommitQCDeleteBeforeZero(t *testing.T) { +// TestCommitQCDeleteBeforePastAllCrashRecovery simulates a crash between WAL +// TruncateAll and new write: on restart the WAL is empty but the anchor is far ahead. +// DeleteBefore must still advance the cursor so PersistCommitQC succeeds. +func TestCommitQCDeleteBeforePastAllCrashRecovery(t *testing.T) { rng := utils.TestRng() committee, keys := types.GenCommittee(rng, 4) dir := t.TempDir() - qcs := makeSequentialCommitQCs(rng, committee, keys, 2) + qcs := makeSequentialCommitQCs(rng, committee, keys, 3) cp, _, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) for _, qc := range qcs { require.NoError(t, cp.PersistCommitQC(qc)) } - // idx=0 should be a no-op - require.NoError(t, cp.DeleteBefore(0)) + // DeleteBefore truncates the WAL (past all), then "crash" before writing. + require.NoError(t, cp.DeleteBefore(10)) + require.NoError(t, cp.close()) // simulate crash — no new QCs written - _, loaded, err := NewCommitQCPersister(utils.Some(dir)) + // Restart: WAL is empty, cp.next will be 0. + cp2, loaded, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + require.Empty(t, loaded) + require.Equal(t, types.RoadIndex(0), cp2.LoadNext()) + + // Second DeleteBefore on the empty WAL must advance the cursor. 
+ require.NoError(t, cp2.DeleteBefore(10)) + require.Equal(t, types.RoadIndex(10), cp2.LoadNext()) + + // Writing from index 10 should now succeed. + moreQCs := makeSequentialCommitQCs(rng, committee, keys, 12) + require.NoError(t, cp2.PersistCommitQC(moreQCs[10])) + require.NoError(t, cp2.PersistCommitQC(moreQCs[11])) + require.NoError(t, cp2.close()) + + _, loaded, err = NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) require.Equal(t, 2, len(loaded)) + require.Equal(t, types.RoadIndex(10), loaded[0].Index) + require.Equal(t, types.RoadIndex(11), loaded[1].Index) } -func TestCommitQCFilenameRoundTrip(t *testing.T) { - idx := types.RoadIndex(42) - name := commitQCFilename(idx) - parsed, err := parseCommitQCFilename(name) +func TestCommitQCDeleteBeforeThenPersistMore(t *testing.T) { + rng := utils.TestRng() + committee, keys := types.GenCommittee(rng, 4) + dir := t.TempDir() + + qcs := makeSequentialCommitQCs(rng, committee, keys, 6) + cp, _, err := NewCommitQCPersister(utils.Some(dir)) + require.NoError(t, err) + + // Persist 0..4, delete before 3, then persist 5. 
+ for i := 0; i < 5; i++ { + require.NoError(t, cp.PersistCommitQC(qcs[i])) + } + require.NoError(t, cp.DeleteBefore(3)) + require.NoError(t, cp.PersistCommitQC(qcs[5])) + require.NoError(t, cp.close()) + + _, loaded, err := NewCommitQCPersister(utils.Some(dir)) require.NoError(t, err) - require.Equal(t, idx, parsed) + require.Equal(t, 3, len(loaded), "should have indices 3, 4, 5") + require.Equal(t, types.RoadIndex(3), loaded[0].Index) + require.Equal(t, types.RoadIndex(4), loaded[1].Index) + require.Equal(t, types.RoadIndex(5), loaded[2].Index) } diff --git a/sei-tendermint/internal/autobahn/consensus/persist/wal.go b/sei-tendermint/internal/autobahn/consensus/persist/wal.go new file mode 100644 index 0000000000..d2fd08adf7 --- /dev/null +++ b/sei-tendermint/internal/autobahn/consensus/persist/wal.go @@ -0,0 +1,150 @@ +package persist + +import ( + "context" + "fmt" + "os" + "time" + + dbwal "github.com/sei-protocol/sei-chain/sei-db/wal" +) + +// codec is the marshal/unmarshal pair needed to store T in a WAL. +// protoutils.Conv[T, P] satisfies this interface automatically. +type codec[T any] interface { + Marshal(T) []byte + Unmarshal([]byte) (T, error) +} + +// indexedWAL wraps a WAL with monotonic index tracking and typed entries. +// Callers map domain-specific indices (BlockNumber, RoadIndex) to WAL +// indices via Count() and firstIdx. Not safe for concurrent use. +type indexedWAL[T any] struct { + wal *dbwal.WAL[T] + firstIdx uint64 // WAL index of the oldest entry; == nextIdx when empty + nextIdx uint64 // WAL index that the next Write will be assigned + lastWriteTime time.Time // time of last Write(); initialized to open time +} + +// openIndexedWAL creates (or opens) a WAL in dir with synchronous, unbatched +// writes. Fsync is disabled because the prune anchor (persisted via A/B files +// with fsync) is the crash-recovery watermark — on power loss we restart from +// the anchor and re-sync any lost WAL entries from peers. 
+// Initializes index tracking from the WAL's stored offsets so the caller can
+// immediately Write, ReadAll, or TruncateBefore.
+//
+// Every failure is returned wrapped with the step that failed. If the offsets
+// cannot be read after the WAL was opened, a best-effort Close is attempted
+// before returning so the handle is not left open.
+func openIndexedWAL[T any](dir string, codec codec[T]) (*indexedWAL[T], error) {
+	if err := os.MkdirAll(dir, 0700); err != nil {
+		return nil, fmt.Errorf("create dir %s: %w", dir, err)
+	}
+	w, err := dbwal.NewWAL(
+		context.Background(),
+		// codec.Marshal has no error return, so the marshal hook always reports nil.
+		func(entry T) ([]byte, error) { return codec.Marshal(entry), nil },
+		codec.Unmarshal,
+		dir,
+		dbwal.Config{
+			WriteBufferSize: 0, // synchronous writes
+			WriteBatchSize:  1, // no batching
+			AllowEmpty:      true,
+		},
+	)
+	if err != nil {
+		// Wrapped like the other failure paths in this function so the
+		// caller can tell which directory failed to open.
+		return nil, fmt.Errorf("open WAL in %s: %w", dir, err)
+	}
+	iw := &indexedWAL[T]{wal: w}
+	first, err := w.FirstOffset()
+	if err != nil {
+		// Best-effort cleanup: the offset error is the one reported, a
+		// secondary Close error would only obscure it.
+		_ = w.Close()
+		return nil, fmt.Errorf("first offset: %w", err)
+	}
+	last, err := w.LastOffset()
+	if err != nil {
+		// Best-effort cleanup, same reasoning as above.
+		_ = w.Close()
+		return nil, fmt.Errorf("last offset: %w", err)
+	}
+	// With AllowEmpty, an empty log reports first > last (e.g. first=1, last=0
+	// for a brand-new log, or first=N+1, last=N after TruncateAll). A non-empty
+	// log always has first <= last with first >= 1. In both cases, setting
+	// firstIdx = first and nextIdx = last + 1 yields Count() == 0 when empty.
+	iw.firstIdx = first
+	iw.nextIdx = last + 1
+	iw.lastWriteTime = time.Now()
+	return iw, nil
+}
+
+// Write appends entry to the WAL, advancing nextIdx.
+// On success it also refreshes lastWriteTime. On failure neither field is
+// changed and the underlying error is returned wrapped with the WAL index
+// the entry would have been assigned.
+func (w *indexedWAL[T]) Write(entry T) error {
+	if err := w.wal.Write(entry); err != nil {
+		return fmt.Errorf("write WAL entry at index %d: %w", w.nextIdx, err)
+	}
+	w.nextIdx++
+	w.lastWriteTime = time.Now()
+	return nil
+}
+
+// TruncateBefore reads the entry at walIdx, passes it to verify, and — if
+// verify returns nil — removes all entries before walIdx. The verify callback
+// lets callers assert that the WAL index maps to the expected domain object
+// before a destructive operation.
+func (w *indexedWAL[T]) TruncateBefore(walIdx uint64, verify func(T) error) error {
+	entry, err := w.wal.ReadAt(walIdx)
+	if err != nil {
+		return fmt.Errorf("read at WAL index %d: %w", walIdx, err)
+	}
+	// The verify error is passed through untouched: it is the caller's own
+	// error, and nothing has been modified yet at this point.
+	if err := verify(entry); err != nil {
+		return err
+	}
+	if err := w.wal.TruncateBefore(walIdx); err != nil {
+		return fmt.Errorf("truncate WAL before index %d: %w", walIdx, err)
+	}
+	// Advance the bookkeeping only after the underlying truncation succeeded.
+	w.firstIdx = walIdx
+	return nil
+}
+
+// ReadAll returns all entries in the WAL. Returns nil if empty.
+// An error is returned when the replay itself fails, or when it yields a
+// different number of entries than Count() reports.
+func (w *indexedWAL[T]) ReadAll() ([]T, error) {
+	want := w.Count()
+	if want == 0 {
+		return nil, nil
+	}
+	entries := make([]T, 0, want)
+	// nextIdx is the index the next Write would get, so the last stored
+	// entry sits at nextIdx-1.
+	err := w.wal.Replay(w.firstIdx, w.nextIdx-1, func(_ uint64, entry T) error {
+		entries = append(entries, entry)
+		return nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("replay WAL indices %d through %d: %w", w.firstIdx, w.nextIdx-1, err)
+	}
+	if got := uint64(len(entries)); got != want {
+		return nil, fmt.Errorf("WAL replay returned %d entries, expected %d (possible silent data loss)", got, want)
+	}
+	return entries, nil
+}
+
+// Count returns the number of entries in the WAL.
+// Empty when firstIdx == nextIdx (both after fresh open and after TruncateAll).
+func (w *indexedWAL[T]) Count() uint64 {
+	return w.nextIdx - w.firstIdx
+}
+
+// TruncateAll removes all entries from the WAL, leaving it empty for new writes.
+// The underlying index counter is preserved (next Write continues from where
+// it left off); firstIdx is advanced to nextIdx so Count() == 0.
+// Used when all entries are stale (e.g. the prune anchor advanced past
+// everything persisted).
+func (w *indexedWAL[T]) TruncateAll() error {
+	if err := w.wal.TruncateAll(); err != nil {
+		return fmt.Errorf("truncate all WAL entries: %w", err)
+	}
+	w.firstIdx = w.nextIdx
+	return nil
+}
+
+// LastWriteTime returns the time of the last successful Write(), or the time
+// the WAL was opened if no writes have occurred since open.
+func (w *indexedWAL[T]) LastWriteTime() time.Time {
+	return w.lastWriteTime
+}
+
+// Close shuts down the underlying WAL.
+func (w *indexedWAL[T]) Close() error {
+	return w.wal.Close()
+}
diff --git a/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go b/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go
new file mode 100644
index 0000000000..0e6d4c3c01
--- /dev/null
+++ b/sei-tendermint/internal/autobahn/consensus/persist/wal_test.go
@@ -0,0 +1,315 @@
+package persist
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/require"
+)
+
+// acceptAny is a verify callback for TruncateBefore that accepts every entry.
+// Tests use it when only the truncation itself is under test.
+func acceptAny(string) error { return nil }
+
+// stringCodec is a trivial codec for testing indexedWAL with strings.
+// Marshal and Unmarshal are plain string/[]byte conversions; Unmarshal never
+// returns an error.
+type stringCodec struct{}
+
+func (stringCodec) Marshal(s string) []byte            { return []byte(s) }
+func (stringCodec) Unmarshal(b []byte) (string, error) { return string(b), nil }
+
+// TestIndexedWAL_EmptyStart checks that a WAL opened in a fresh directory
+// reports Count() == 0, has firstIdx == nextIdx, and returns no entries from
+// ReadAll.
+func TestIndexedWAL_EmptyStart(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.Equal(t, uint64(0), iw.Count())
+	require.Equal(t, iw.firstIdx, iw.nextIdx) // empty: firstIdx == nextIdx
+
+	entries, err := iw.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 0, len(entries))
+
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_WriteAndReadAll writes three entries and checks Count, the
+// index bounds (firstIdx 1, nextIdx 4), and that ReadAll returns the entries
+// in write order.
+func TestIndexedWAL_WriteAndReadAll(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.NoError(t, iw.Write("c"))
+
+	require.Equal(t, uint64(3), iw.Count())
+	require.Equal(t, uint64(1), iw.firstIdx)
+	require.Equal(t, uint64(4), iw.nextIdx)
+
+	entries, err := iw.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 3, len(entries))
+	require.Equal(t, "a", entries[0])
+	require.Equal(t, "b", entries[1])
+	require.Equal(t, "c", entries[2])
+
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_ReopenWithData closes a WAL holding two entries and checks
+// that reopening the same directory restores Count, firstIdx, nextIdx and the
+// entries themselves.
+func TestIndexedWAL_ReopenWithData(t *testing.T) {
+	dir := t.TempDir()
+
+	// Write some entries and close.
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.NoError(t, iw.Write("x"))
+	require.NoError(t, iw.Write("y"))
+	require.NoError(t, iw.Close())
+
+	// Reopen — should recover firstIdx, nextIdx, and entries.
+	iw2, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.Equal(t, uint64(2), iw2.Count())
+	require.Equal(t, uint64(1), iw2.firstIdx)
+	require.Equal(t, uint64(3), iw2.nextIdx)
+
+	entries, err := iw2.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 2, len(entries))
+	require.Equal(t, "x", entries[0])
+	require.Equal(t, "y", entries[1])
+
+	require.NoError(t, iw2.Close())
+}
+
+// TestIndexedWAL_ReopenAfterTruncate truncates the first three of five
+// entries, then reopens the directory and checks that only the last two
+// survive, with firstIdx 4 and nextIdx 6.
+func TestIndexedWAL_ReopenAfterTruncate(t *testing.T) {
+	dir := t.TempDir()
+
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	for _, s := range []string{"a", "b", "c", "d", "e"} {
+		require.NoError(t, iw.Write(s))
+	}
+	// Truncate first 3 entries (indices 1,2,3); keep 4,5.
+	require.NoError(t, iw.TruncateBefore(4, acceptAny))
+	require.Equal(t, uint64(2), iw.Count())
+	require.NoError(t, iw.Close())
+
+	// Reopen — should see only the surviving entries.
+	iw2, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.Equal(t, uint64(2), iw2.Count())
+	require.Equal(t, uint64(4), iw2.firstIdx)
+	require.Equal(t, uint64(6), iw2.nextIdx)
+
+	entries, err := iw2.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 2, len(entries))
+	require.Equal(t, "d", entries[0])
+	require.Equal(t, "e", entries[1])
+
+	require.NoError(t, iw2.Close())
+}
+
+// TestIndexedWAL_TruncateAllButLast truncates before the last of three
+// entries and checks that exactly that entry remains, both on the open WAL
+// and after reopening it.
+func TestIndexedWAL_TruncateAllButLast(t *testing.T) {
+	dir := t.TempDir()
+
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.NoError(t, iw.Write("c"))
+
+	// TruncateBefore keeps the entry at the given index; remove all but last.
+	require.NoError(t, iw.TruncateBefore(3, acceptAny))
+	require.Equal(t, uint64(1), iw.Count())
+	require.Equal(t, uint64(3), iw.firstIdx)
+	require.NoError(t, iw.Close())
+
+	// Reopen — should see one entry.
+	iw2, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.Equal(t, uint64(1), iw2.Count())
+
+	entries, err := iw2.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 1, len(entries))
+	require.Equal(t, "c", entries[0])
+
+	require.NoError(t, iw2.Close())
+}
+
+// TestIndexedWAL_TruncateBeforeVerifiesEntry checks that the verify callback
+// is handed the entry stored at the requested index, and that an error
+// returned by verify is surfaced to the caller without advancing firstIdx.
+func TestIndexedWAL_TruncateBeforeVerifiesEntry(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.NoError(t, iw.Write("c"))
+
+	// Verify callback receives the correct entry.
+	var got string
+	require.NoError(t, iw.TruncateBefore(2, func(s string) error {
+		got = s
+		return nil
+	}))
+	require.Equal(t, "b", got)
+	require.Equal(t, uint64(2), iw.firstIdx)
+
+	// Verify callback can reject the truncation.
+	err = iw.TruncateBefore(3, func(s string) error {
+		return fmt.Errorf("rejected: %s", s)
+	})
+	require.Error(t, err)
+	require.Contains(t, err.Error(), "rejected: c")
+	// firstIdx should NOT have advanced since verify rejected.
+	require.Equal(t, uint64(2), iw.firstIdx)
+
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_TruncateAll checks that TruncateAll empties the WAL while
+// keeping nextIdx, that subsequent writes continue from that index, and that
+// the resulting state is what a reopen observes.
+func TestIndexedWAL_TruncateAll(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.NoError(t, iw.Write("c"))
+	require.Equal(t, uint64(3), iw.Count())
+	require.Equal(t, uint64(4), iw.nextIdx)
+
+	require.NoError(t, iw.TruncateAll())
+	require.Equal(t, uint64(0), iw.Count())
+	require.Equal(t, uint64(4), iw.firstIdx) // advanced to nextIdx
+	require.Equal(t, uint64(4), iw.nextIdx)  // index counter preserved
+
+	// Can write fresh entries after TruncateAll; indices continue.
+	require.NoError(t, iw.Write("x"))
+	require.Equal(t, uint64(1), iw.Count())
+	require.Equal(t, uint64(4), iw.firstIdx)
+	require.Equal(t, uint64(5), iw.nextIdx)
+
+	entries, err := iw.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 1, len(entries))
+	require.Equal(t, "x", entries[0])
+
+	require.NoError(t, iw.Close())
+
+	// Reopen — should see only the post-TruncateAll entry.
+	iw2, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.Equal(t, uint64(1), iw2.Count())
+	require.Equal(t, uint64(4), iw2.firstIdx)
+	require.Equal(t, uint64(5), iw2.nextIdx)
+	entries, err = iw2.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 1, len(entries))
+	require.Equal(t, "x", entries[0])
+	require.NoError(t, iw2.Close())
+}
+
+// TestIndexedWAL_ReadAllDetectsStaleNextIdx checks that ReadAll returns an
+// error when nextIdx claims one more entry than was actually written.
+func TestIndexedWAL_ReadAllDetectsStaleNextIdx(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.Equal(t, uint64(2), iw.Count())
+
+	// Simulate stale internal state: advance nextIdx so Count() reports more
+	// entries than the WAL actually contains. ReadAll must return an error
+	// (either from Replay failing to read the missing entry, or from the
+	// post-replay count check).
+	iw.nextIdx++
+
+	_, err = iw.ReadAll()
+	require.Error(t, err)
+
+	// Undo the tampering before closing.
+	iw.nextIdx--
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_WriteAfterTruncate checks that entries written after a
+// TruncateBefore follow the surviving entry, and that ReadAll returns all
+// three in order with firstIdx 3 and nextIdx 6.
+func TestIndexedWAL_WriteAfterTruncate(t *testing.T) {
+	dir := t.TempDir()
+
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Write("b"))
+	require.NoError(t, iw.Write("c"))
+
+	// Truncate "a" and "b".
+	require.NoError(t, iw.TruncateBefore(3, acceptAny))
+	require.Equal(t, uint64(1), iw.Count())
+
+	// Write more after truncation.
+	require.NoError(t, iw.Write("d"))
+	require.NoError(t, iw.Write("e"))
+	require.Equal(t, uint64(3), iw.Count())
+	require.Equal(t, uint64(3), iw.firstIdx)
+	require.Equal(t, uint64(6), iw.nextIdx)
+
+	entries, err := iw.ReadAll()
+	require.NoError(t, err)
+	require.Equal(t, 3, len(entries))
+	require.Equal(t, "c", entries[0])
+	require.Equal(t, "d", entries[1])
+	require.Equal(t, "e", entries[2])
+
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_LastWriteTimeSetOnOpen checks that a freshly opened, empty
+// WAL reports a non-zero LastWriteTime lying between timestamps taken just
+// before and just after the open.
+func TestIndexedWAL_LastWriteTimeSetOnOpen(t *testing.T) {
+	before := time.Now()
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	after := time.Now()
+
+	lwt := iw.LastWriteTime()
+	require.False(t, lwt.IsZero(), "empty WAL should have non-zero LastWriteTime from open")
+	require.False(t, lwt.Before(before))
+	require.False(t, lwt.After(after))
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_LastWriteTimeAdvancesOnWrite checks that after a Write,
+// LastWriteTime lies between timestamps taken just before and just after
+// that Write.
+func TestIndexedWAL_LastWriteTimeAdvancesOnWrite(t *testing.T) {
+	dir := t.TempDir()
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+
+	before := time.Now()
+	require.NoError(t, iw.Write("a"))
+	after := time.Now()
+
+	lwt := iw.LastWriteTime()
+	require.False(t, lwt.IsZero())
+	require.False(t, lwt.Before(before))
+	require.False(t, lwt.After(after))
+
+	require.NoError(t, iw.Close())
+}
+
+// TestIndexedWAL_LastWriteTimeNonZeroOnReopenWithData checks that reopening
+// a WAL that already holds an entry yields a non-zero LastWriteTime lying
+// between timestamps taken just before and just after the reopen.
+func TestIndexedWAL_LastWriteTimeNonZeroOnReopenWithData(t *testing.T) {
+	dir := t.TempDir()
+
+	iw, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	require.NoError(t, iw.Write("a"))
+	require.NoError(t, iw.Close())
+
+	before := time.Now()
+	iw2, err := openIndexedWAL(dir, stringCodec{})
+	require.NoError(t, err)
+	after := time.Now()
+
+	lwt := iw2.LastWriteTime()
+	require.False(t, lwt.IsZero(), "reopened WAL with data should have non-zero LastWriteTime")
+	require.False(t, lwt.Before(before))
+	require.False(t, lwt.After(after))
+
+	require.NoError(t, iw2.Close())
+}