Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0e18b1f
Enable transitioning of node to using MEL when previously ran with In…
ganeshvanahalli Apr 2, 2026
701d16d
add changelog
ganeshvanahalli Apr 2, 2026
76ce27d
Merge branch 'replace-readerandtracker-with-messageextractor' into no…
ganeshvanahalli Apr 2, 2026
20ee29c
add system test to verify transition works
ganeshvanahalli Apr 3, 2026
e876e96
Merge branch 'replace-readerandtracker-with-messageextractor' into no…
ganeshvanahalli Apr 6, 2026
081546a
Merge branch 'replace-readerandtracker-with-messageextractor' into no…
ganeshvanahalli Apr 6, 2026
c6ca5f3
fix lint error
ganeshvanahalli Apr 6, 2026
102d632
Merge branch 'replace-readerandtracker-with-messageextractor' into no…
ganeshvanahalli Apr 7, 2026
d03fbca
merge upstream and resolve conflicts
ganeshvanahalli Apr 9, 2026
ac6f44c
add unit tests
ganeshvanahalli Apr 9, 2026
1e8152f
Merge branch 'replace-readerandtracker-with-messageextractor' into no…
ganeshvanahalli Apr 10, 2026
b784949
fix lint errors
ganeshvanahalli Apr 10, 2026
7fc981e
Merge branch 'master' into node-transition-to-mel
ganeshvanahalli Apr 13, 2026
50557e9
fix deadlock in execution engine
ganeshvanahalli Apr 14, 2026
5751070
address PR comments
ganeshvanahalli Apr 14, 2026
49dcf2a
Merge branch 'master' into node-transition-to-mel
ganeshvanahalli Apr 14, 2026
cfbcb70
remove duplicate helper method declaration in commmon_test.go
ganeshvanahalli Apr 14, 2026
d16f4d7
Merge branch 'master' into node-transition-to-mel
eljobe Apr 14, 2026
762d6b2
improve TestMELMigrationFromLegacyNode
ganeshvanahalli Apr 14, 2026
a4430e7
bug fix
ganeshvanahalli Apr 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions arbnode/db/schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ var (
SequencerBatchCountKey []byte = []byte("_sequencerBatchCount") // contains the current sequencer message count
DbSchemaVersion []byte = []byte("_schemaVersion") // contains a uint64 representing the database schema version
HeadMelStateBlockNumKey []byte = []byte("_headMelStateBlockNum") // contains the latest computed MEL state's parent chain block number
InitialMelStateBlockNumKey []byte = []byte("_initialMelStateBlockNum") // contains the initial MEL state's parent chain block number (legacy/MEL boundary)
)

const CurrentDbSchemaVersion uint64 = 2
2 changes: 1 addition & 1 deletion arbnode/mel/extraction/messages_in_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func messagesFromBatchSegments(
continue // We ignore being able to parse an advance segment.
}
return nil, fmt.Errorf(
"error parsing segment %d: %w", idx, err,
"error parsing segment %d, delayedSeen: %d delayedRead: %d, err: %w", idx, melState.DelayedMessagesSeen, melState.DelayedMessagesRead, err,
)
}
timestamp = newTimestamp
Expand Down
65 changes: 63 additions & 2 deletions arbnode/mel/runner/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package melrunner
import (
"fmt"

"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"
Expand All @@ -19,10 +20,64 @@ import (
// against the outbox accumulator.
type Database struct {
db ethdb.KeyValueStore

// Cached boundary counts from the initial MEL state (set during legacy migration).
// Indices below these thresholds are read from legacy (pre-MEL) schema keys.
hasInitialState bool
initialDelayedCount uint64
initialBatchCount uint64
}

func NewDatabase(db ethdb.KeyValueStore) *Database {
return &Database{db}
func NewDatabase(db ethdb.KeyValueStore) (*Database, error) {
d := &Database{db: db}
if err := d.loadInitialBoundary(); err != nil {
return nil, fmt.Errorf("failed to load initial MEL state boundary: %w", err)
}
return d, nil
}

// loadInitialBoundary loads the initial MEL state boundary counts from the DB.
// If InitialMelStateBlockNumKey exists, it reads the initial state to cache
// the delayed message and batch count thresholds for legacy key dispatch.
// Returns nil if the key does not exist (fresh node with no legacy boundary).
func (d *Database) loadInitialBoundary() error {
blockNum, err := read.Value[uint64](d.db, schema.InitialMelStateBlockNumKey)
if err != nil {
if rawdb.IsDbErrNotFound(err) {
d.hasInitialState = false
return nil
}
return fmt.Errorf("error reading InitialMelStateBlockNumKey: %w", err)
}
state, err := d.State(blockNum)
if err != nil {
return fmt.Errorf("error reading initial MEL state at block %d: %w", blockNum, err)
}
d.hasInitialState = true
d.initialDelayedCount = state.DelayedMessagesSeen
d.initialBatchCount = state.BatchCount
return nil
}

func (d *Database) SaveInitialMelState(initialState *mel.State) error {
dbBatch := d.db.NewBatch()
encoded, err := rlp.EncodeToBytes(initialState.ParentChainBlockNumber)
if err != nil {
return err
}
if err := dbBatch.Put(schema.InitialMelStateBlockNumKey, encoded); err != nil {
return err
}
if err := d.setMelState(dbBatch, initialState.ParentChainBlockNumber, *initialState); err != nil {
return err
}
if err := d.setHeadMelStateBlockNum(dbBatch, initialState.ParentChainBlockNumber); err != nil {
return err
}
d.hasInitialState = true
d.initialDelayedCount = initialState.DelayedMessagesSeen
d.initialBatchCount = initialState.BatchCount
return dbBatch.Write()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The in-memory fields are set before dbBatch.Write(). If the batch write fails (disk full, I/O error, etc.), the Database struct will have hasInitialState = true with the boundary values cached, but the DB will not actually contain the initial MEL state or the InitialMelStateBlockNumKey. This means fetchBatchMetadata and FetchDelayedMessage will dispatch to legacy keys based on stale thresholds for the remainder of the process lifetime.

In practice the caller returns an error and the node fails to start, so this is unlikely to cause data corruption — but it's a correctness bug in the function's contract and straightforward to fix:

if err := dbBatch.Write(); err != nil {
    return err
}
d.hasInitialState = true
d.initialDelayedCount = initialState.DelayedMessagesSeen
d.initialBatchCount = initialState.BatchCount
return nil

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed it

}

func (d *Database) GetHeadMelState() (*mel.State, error) {
Expand Down Expand Up @@ -103,6 +158,9 @@ func (d *Database) SaveBatchMetas(state *mel.State, batchMetas []*mel.BatchMetad
}

func (d *Database) fetchBatchMetadata(seqNum uint64) (*mel.BatchMetadata, error) {
if d.hasInitialState && seqNum < d.initialBatchCount {
return legacyFetchBatchMetadata(d.db, seqNum)
}
batchMetadata, err := read.Value[mel.BatchMetadata](d.db, read.Key(schema.MelSequencerBatchMetaPrefix, seqNum))
if err != nil {
return nil, err
Expand Down Expand Up @@ -162,6 +220,9 @@ func (d *Database) ReadDelayedMessage(state *mel.State, index uint64) (*mel.Dela
}

func (d *Database) FetchDelayedMessage(index uint64) (*mel.DelayedInboxMessage, error) {
if d.hasInitialState && index < d.initialDelayedCount {
return legacyFetchDelayedMessage(d.db, index)
}
delayed, err := read.Value[mel.DelayedInboxMessage](d.db, read.Key(schema.MelDelayedMessagePrefix, index))
if err != nil {
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions arbnode/mel/runner/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ func TestMelDatabase(t *testing.T) {

// Create database
consensusDB := rawdb.NewMemoryDatabase()
melDB := NewDatabase(consensusDB)
melDB, err := NewDatabase(consensusDB)
require.NoError(t, err)

headMelState := &mel.State{
ParentChainBlockNumber: 2,
Expand Down Expand Up @@ -68,7 +69,8 @@ func TestMelDatabaseReadAndWriteDelayedMessages(t *testing.T) {
// Init
// Create database
consensusDB := rawdb.NewMemoryDatabase()
melDB := NewDatabase(consensusDB)
melDB, err := NewDatabase(consensusDB)
require.NoError(t, err)

delayedRequestId := common.BigToHash(common.Big1)
delayedMsg := &mel.DelayedInboxMessage{
Expand Down Expand Up @@ -102,10 +104,10 @@ func TestMelDelayedMessagesAccumulation(t *testing.T) {

// Create database
consensusDB := rawdb.NewMemoryDatabase()
melDB := NewDatabase(consensusDB)
melDB, err := NewDatabase(consensusDB)
require.NoError(t, err)

// Add genesis melState
var err error
genesis := &mel.State{
ParentChainBlockNumber: 1,
}
Expand Down
251 changes: 251 additions & 0 deletions arbnode/mel/runner/legacy_db_reads.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
// Copyright 2026, Offchain Labs, Inc.
// For license information, see https://github.com/OffchainLabs/nitro/blob/master/LICENSE.md
package melrunner

import (
"bytes"
"encoding/binary"
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/rlp"

"github.com/offchainlabs/nitro/arbnode/db/read"
"github.com/offchainlabs/nitro/arbnode/db/schema"
"github.com/offchainlabs/nitro/arbnode/mel"
"github.com/offchainlabs/nitro/arbos/arbostypes"
)

// legacyFetchDelayedMessage reads a delayed message stored under pre-MEL schema keys.
// It tries RlpDelayedMessagePrefix ("e") first, then falls back to LegacyDelayedMessagePrefix ("d").
// Returns a fully populated DelayedInboxMessage including BeforeInboxAcc (obtained from the
// previous message's AfterInboxAcc, or zero hash for index 0). BlockHash is left as zero.
func legacyFetchDelayedMessage(db ethdb.KeyValueStore, index uint64) (*mel.DelayedInboxMessage, error) {
msg, parentChainBlockNumber, err := legacyGetDelayedMessageAndParentChainBlockNumber(db, index)
if err != nil {
return nil, err
}
// BeforeInboxAcc is the AfterInboxAcc of the previous message (zero hash for index 0)
var beforeInboxAcc common.Hash
if index > 0 {
beforeInboxAcc, err = legacyGetDelayedAcc(db, index-1)
if err != nil {
return nil, fmt.Errorf("failed to get BeforeInboxAcc for delayed message %d: %w", index, err)
}
}
return &mel.DelayedInboxMessage{
BeforeInboxAcc: beforeInboxAcc,
Message: msg,
ParentChainBlockNumber: parentChainBlockNumber,
}, nil
}

// legacyGetDelayedMessageAndParentChainBlockNumber reads a delayed message and its parent chain
// block number from pre-MEL schema keys. Mirrors InboxTracker.getRawDelayedMessageAccumulatorAndParentChainBlockNumber.
func legacyGetDelayedMessageAndParentChainBlockNumber(db ethdb.KeyValueStore, index uint64) (*arbostypes.L1IncomingMessage, uint64, error) {
msg, _, err := legacyGetDelayedMessageFromRlpPrefix(db, index)
if err != nil {
// Fall back to legacy "d" prefix
msg, _, err = legacyGetDelayedMessageFromLegacyPrefix(db, index)
Comment on lines +49 to +55
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When legacyGetDelayedMessageFromRlpPrefix fails for a reason other than "not found" (e.g., corrupted data causing an RLP decode error, truncated entry), the code unconditionally falls through to the legacy "d" prefix instead of surfacing the corruption error. This could silently mask data corruption.

Notably, the same function already checks !rawdb.IsDbErrNotFound(err) for the parent chain block number fallback on line 61, so the asymmetry here looks like an oversight.

Suggested fix — only fall back on not-found:

msg, _, err := legacyGetDelayedMessageFromRlpPrefix(db, index)
if err != nil {
    if !rawdb.IsDbErrNotFound(err) {
        return nil, 0, fmt.Errorf("error reading RLP delayed message at index %d: %%w", index, err)
    }
    // Key genuinely absent — fall back to legacy "d" prefix
    msg, _, err = legacyGetDelayedMessageFromLegacyPrefix(db, index)
    ...
}

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed it

if err != nil {
return nil, 0, fmt.Errorf("delayed message at index %d not found under either prefix: %w", index, err)
}
// Legacy "d" prefix does not store parent chain block number separately
return msg, msg.Header.BlockNumber, nil
}
parentChainBlockNumber, err := legacyGetParentChainBlockNumber(db, index)
if err != nil {
if !rawdb.IsDbErrNotFound(err) {
return nil, 0, err
}
return msg, msg.Header.BlockNumber, nil
}
return msg, parentChainBlockNumber, nil
}

// legacyGetDelayedMessageFromRlpPrefix reads from RlpDelayedMessagePrefix ("e").
// Format: [32-byte AfterInboxAcc | RLP(L1IncomingMessage)]
// Returns the decoded message and the AfterInboxAcc stored alongside it.
func legacyGetDelayedMessageFromRlpPrefix(db ethdb.KeyValueStore, index uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
key := read.Key(schema.RlpDelayedMessagePrefix, index)
data, err := db.Get(key)
if err != nil {
return nil, common.Hash{}, err
}
if len(data) < 32 {
return nil, common.Hash{}, errors.New("delayed message RLP entry missing accumulator")
}
var acc common.Hash
copy(acc[:], data[:32])
var msg *arbostypes.L1IncomingMessage
if err := rlp.DecodeBytes(data[32:], &msg); err != nil {
return nil, common.Hash{}, fmt.Errorf("error decoding RLP delayed message at index %d: %w", index, err)
}
return msg, acc, nil
}

// legacyGetDelayedMessageFromLegacyPrefix reads from LegacyDelayedMessagePrefix ("d").
// Format: [32-byte AfterInboxAcc | L1-serialized message]
// Returns the decoded message and the AfterInboxAcc stored alongside it.
func legacyGetDelayedMessageFromLegacyPrefix(db ethdb.KeyValueStore, index uint64) (*arbostypes.L1IncomingMessage, common.Hash, error) {
key := read.Key(schema.LegacyDelayedMessagePrefix, index)
data, err := db.Get(key)
if err != nil {
return nil, common.Hash{}, err
}
if len(data) < 32 {
return nil, common.Hash{}, errors.New("delayed message legacy entry missing accumulator")
}
var acc common.Hash
copy(acc[:], data[:32])
msg, err := arbostypes.ParseIncomingL1Message(bytes.NewReader(data[32:]), nil)
if err != nil {
return nil, common.Hash{}, fmt.Errorf("error parsing legacy delayed message at index %d: %w", index, err)
}
return msg, acc, nil
}

// legacyGetParentChainBlockNumber reads the parent chain block number stored under
// ParentChainBlockNumberPrefix ("p") for a given delayed message index.
func legacyGetParentChainBlockNumber(db ethdb.KeyValueStore, index uint64) (uint64, error) {
key := read.Key(schema.ParentChainBlockNumberPrefix, index)
data, err := db.Get(key)
if err != nil {
return 0, err
}
if len(data) < 8 {
return 0, fmt.Errorf("parent chain block number data too short for index %d", index)
}
return binary.BigEndian.Uint64(data), nil
}

// legacyFetchBatchMetadata reads batch metadata stored under pre-MEL SequencerBatchMetaPrefix ("s").
func legacyFetchBatchMetadata(db ethdb.KeyValueStore, seqNum uint64) (*mel.BatchMetadata, error) {
batchMetadata, err := read.BatchMetadata(db, seqNum)
if err != nil {
return nil, err
}
return &batchMetadata, nil
}

// legacyGetDelayedAcc reads the delayed message accumulator (AfterInboxAcc) from pre-MEL keys.
// Tries RlpDelayedMessagePrefix ("e") first, then LegacyDelayedMessagePrefix ("d").
func legacyGetDelayedAcc(db ethdb.KeyValueStore, seqNum uint64) (common.Hash, error) {
key := read.Key(schema.RlpDelayedMessagePrefix, seqNum)
has, err := db.Has(key)
if err != nil {
return common.Hash{}, err
}
if !has {
key = read.Key(schema.LegacyDelayedMessagePrefix, seqNum)
has, err = db.Has(key)
if err != nil {
return common.Hash{}, err
}
if !has {
return common.Hash{}, fmt.Errorf("delayed accumulator not found for index %d", seqNum)
}
}
data, err := db.Get(key)
if err != nil {
return common.Hash{}, err
}
if len(data) < 32 {
return common.Hash{}, errors.New("delayed message entry missing accumulator")
}
var hash common.Hash
copy(hash[:], data[:32])
return hash, nil
}

// legacyFindBatchCountAtBlock finds the number of batches posted at or before
// the given parent chain block number by scanning backwards from totalBatchCount.
func legacyFindBatchCountAtBlock(db ethdb.KeyValueStore, totalBatchCount uint64, blockNum uint64) (uint64, error) {
for i := totalBatchCount; i > 0; i-- {
meta, err := legacyFetchBatchMetadata(db, i-1)
if err != nil {
return 0, fmt.Errorf("failed to read batch metadata %d: %w", i-1, err)
}
if meta.ParentChainBlock <= blockNum {
return i, nil
}
}
return 0, nil
}

// CreateInitialMELStateFromLegacyDB constructs an initial MEL state from pre-MEL
// database entries (legacy inbox reader/tracker data). The caller must provide:
// - startBlockNum: the parent chain block to anchor the initial state (should be finalized)
// - delayedSeenAtBlock: the on-chain delayed message count at startBlockNum (from bridge contract)
//
// Batch count, msg count, and delayed read are derived from legacy batch metadata.
// The MEL inbox accumulator is reconstructed for any unread delayed messages.
func CreateInitialMELStateFromLegacyDB(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a really nice function! Great job on putting it together

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks!

db ethdb.KeyValueStore,
sequencerInbox common.Address,
bridgeAddr common.Address,
parentChainId uint64,
fetchBlock func(blockNum uint64) (hash, parentHash common.Hash, err error),
startBlockNum uint64,
delayedSeenAtBlock uint64,
) (*mel.State, error) {
totalBatchCount, err := read.Value[uint64](db, schema.SequencerBatchCountKey)
if err != nil {
return nil, fmt.Errorf("failed to read legacy batch count: %w", err)
}

// Find batch count at or before the start block
batchCount, err := legacyFindBatchCountAtBlock(db, totalBatchCount, startBlockNum)
if err != nil {
return nil, fmt.Errorf("failed to find batch count at block %d: %w", startBlockNum, err)
}

// Determine msgCount and delayedRead from the last batch at/before start block
var msgCount uint64
var delayedRead uint64
if batchCount > 0 {
batchMeta, err := legacyFetchBatchMetadata(db, batchCount-1)
if err != nil {
return nil, fmt.Errorf("failed to read batch metadata %d: %w", batchCount-1, err)
}
msgCount = uint64(batchMeta.MessageCount)
delayedRead = batchMeta.DelayedMessageCount
}

blockHash, parentHash, err := fetchBlock(startBlockNum)
if err != nil {
return nil, fmt.Errorf("failed to fetch block %d: %w", startBlockNum, err)
}

state := &mel.State{
Version: 0,
BatchPostingTargetAddress: sequencerInbox,
DelayedMessagePostingTargetAddress: bridgeAddr,
ParentChainId: parentChainId,
ParentChainBlockNumber: startBlockNum,
ParentChainBlockHash: blockHash,
ParentChainPreviousBlockHash: parentHash,
DelayedMessagesSeen: delayedRead, // will be incremented during accumulation
DelayedMessagesRead: delayedRead,
MsgCount: msgCount,
BatchCount: batchCount,
}

// Reconstruct MEL inbox accumulator for unread delayed messages
// (messages seen but not yet consumed by a batch at the start block)
for i := delayedRead; i < delayedSeenAtBlock; i++ {
delayedMsg, err := legacyFetchDelayedMessage(db, i)
if err != nil {
return nil, fmt.Errorf("failed to fetch legacy delayed message %d: %w", i, err)
}
if err := state.AccumulateDelayedMessage(delayedMsg); err != nil {
return nil, fmt.Errorf("failed to accumulate delayed message %d: %w", i, err)
}
state.DelayedMessagesSeen++
}

return state, nil
}
Loading
Loading