Skip to content

Commit 9af0f90

Browse files
authored
feat(sequencer): catchup from base (#3057)
* feat(sequencer): catchup from base * catch up fixes * improvements * update catchup test * code cleanups * imp * only produce 1 base block per da epoch (unless more needed) * fixes * updates * cleanup comments * updates * fix failover with local-da change
1 parent 805f927 commit 9af0f90

34 files changed

Lines changed: 2768 additions & 1410 deletions

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1111

1212
### Added
1313

14+
- Add disaster recovery for sequencer
15+
- Catch up possible DA-only blocks when restarting. [#3057](https://github.com/evstack/ev-node/pull/3057)
16+
- Verify DA and P2P state on restart (prevent double-signing). [#3061](https://github.com/evstack/ev-node/pull/3061)
1417
- Node pruning support. [#2984](https://github.com/evstack/ev-node/pull/2984)
1518
- Two different sort of pruning implemented:
1619
_Classic pruning_ (`all`): prunes given `HEAD-n` blocks from the databases, including store metadatas.
@@ -21,6 +24,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
2124
### Changes
2225

2326
- Store pending blocks separately from executed blocks key. [#3073](https://github.com/evstack/ev-node/pull/3073)
27+
- Fixes issues with force inclusion verification on sync nodes. [#3057](https://github.com/evstack/ev-node/pull/3057)
28+
- Add flag to `local-da` to produce empty DA blocks (closer to the real system). [#3057](https://github.com/evstack/ev-node/pull/3057)
2429

2530
## v1.0.0-rc.4
2631

apps/evm/server/force_inclusion_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ func (m *mockDA) HasForcedInclusionNamespace() bool {
7474
return true
7575
}
7676

77+
func (m *mockDA) GetLatestDAHeight(_ context.Context) (uint64, error) {
78+
return 0, nil
79+
}
80+
7781
func TestForceInclusionServer_handleSendRawTransaction_Success(t *testing.T) {
7882
testHeight := uint64(100)
7983

block/internal/da/async_block_retriever.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (f *asyncBlockRetriever) GetCachedBlock(ctx context.Context, daHeight uint6
157157

158158
block := &BlockData{
159159
Height: pbBlock.Height,
160-
Timestamp: time.Unix(pbBlock.Timestamp, 0).UTC(),
160+
Timestamp: time.Unix(0, pbBlock.Timestamp).UTC(),
161161
Blobs: pbBlock.Blobs,
162162
}
163163

@@ -261,7 +261,7 @@ func (f *asyncBlockRetriever) fetchAndCacheBlock(height uint64) {
261261
// Serialize and cache the block
262262
pbBlock := &pb.BlockData{
263263
Height: block.Height,
264-
Timestamp: block.Timestamp.Unix(),
264+
Timestamp: block.Timestamp.UnixNano(),
265265
Blobs: block.Blobs,
266266
}
267267
data, err := proto.Marshal(pbBlock)

block/internal/da/async_block_retriever_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ func TestAsyncBlockRetriever_StopGracefully(t *testing.T) {
201201
func TestBlockData_Serialization(t *testing.T) {
202202
block := &BlockData{
203203
Height: 100,
204-
Timestamp: time.Unix(12345, 0).UTC(),
204+
Timestamp: time.Unix(12345, 123456789).UTC(),
205205
Blobs: [][]byte{
206206
[]byte("blob1"),
207207
[]byte("blob2"),
@@ -212,7 +212,7 @@ func TestBlockData_Serialization(t *testing.T) {
212212
// Serialize using protobuf
213213
pbBlock := &pb.BlockData{
214214
Height: block.Height,
215-
Timestamp: block.Timestamp.Unix(),
215+
Timestamp: block.Timestamp.UnixNano(),
216216
Blobs: block.Blobs,
217217
}
218218
data, err := proto.Marshal(pbBlock)
@@ -226,11 +226,11 @@ func TestBlockData_Serialization(t *testing.T) {
226226

227227
decoded := &BlockData{
228228
Height: decodedPb.Height,
229-
Timestamp: time.Unix(decodedPb.Timestamp, 0).UTC(),
229+
Timestamp: time.Unix(0, decodedPb.Timestamp).UTC(),
230230
Blobs: decodedPb.Blobs,
231231
}
232232

233-
assert.Equal(t, block.Timestamp.Unix(), decoded.Timestamp.Unix())
233+
assert.Equal(t, block.Timestamp.UnixNano(), decoded.Timestamp.UnixNano())
234234
assert.Equal(t, block.Height, decoded.Height)
235235
assert.Equal(t, len(block.Blobs), len(decoded.Blobs))
236236
for i := range block.Blobs {
@@ -248,7 +248,7 @@ func TestBlockData_SerializationEmpty(t *testing.T) {
248248
// Serialize using protobuf
249249
pbBlock := &pb.BlockData{
250250
Height: block.Height,
251-
Timestamp: block.Timestamp.Unix(),
251+
Timestamp: block.Timestamp.UnixNano(),
252252
Blobs: block.Blobs,
253253
}
254254
data, err := proto.Marshal(pbBlock)
@@ -261,7 +261,7 @@ func TestBlockData_SerializationEmpty(t *testing.T) {
261261

262262
decoded := &BlockData{
263263
Height: decodedPb.Height,
264-
Timestamp: time.Unix(decodedPb.Timestamp, 0).UTC(),
264+
Timestamp: time.Unix(0, decodedPb.Timestamp).UTC(),
265265
Blobs: decodedPb.Blobs,
266266
}
267267

block/internal/da/client.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,23 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
299299
}
300300
}
301301

302+
// GetLatestDAHeight returns the latest height available on the DA layer by
303+
// querying the network head.
304+
func (c *client) GetLatestDAHeight(ctx context.Context) (uint64, error) {
305+
headCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
306+
defer cancel()
307+
308+
header, err := c.headerAPI.NetworkHead(headCtx)
309+
if err != nil {
310+
return 0, fmt.Errorf("failed to get DA network head: %w", err)
311+
}
312+
if header == nil {
313+
return 0, fmt.Errorf("DA network head returned nil header")
314+
}
315+
316+
return header.Height, nil
317+
}
318+
302319
// RetrieveForcedInclusion retrieves blobs from the forced inclusion namespace at the specified height.
303320
func (c *client) RetrieveForcedInclusion(ctx context.Context, height uint64) datypes.ResultRetrieve {
304321
if !c.hasForcedNamespace {

block/internal/da/forced_inclusion_retriever.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
163163

164164
if result.Code == datypes.StatusNotFound {
165165
r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height")
166+
syncFetchedBlocks[h] = &BlockData{
167+
Timestamp: result.Timestamp,
168+
}
166169
continue
167170
}
168171

@@ -229,6 +232,7 @@ func (r *forcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
229232
Msg("Failed to retrieve DA epoch.. retrying next iteration")
230233

231234
return &ForcedInclusionEvent{
235+
Timestamp: event.Timestamp,
232236
StartDaHeight: daHeight,
233237
EndDaHeight: daHeight,
234238
Txs: [][]byte{},

block/internal/da/interface.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Client interface {
1717
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
1818
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
1919

20+
// GetLatestDAHeight returns the latest height available on the DA layer.
21+
GetLatestDAHeight(ctx context.Context) (uint64, error)
22+
2023
// Namespace accessors.
2124
GetHeaderNamespace() []byte
2225
GetDataNamespace() []byte

block/internal/da/tracing.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,20 @@ func (t *tracedClient) Validate(ctx context.Context, ids []datypes.ID, proofs []
123123
return res, nil
124124
}
125125

126+
func (t *tracedClient) GetLatestDAHeight(ctx context.Context) (uint64, error) {
127+
ctx, span := t.tracer.Start(ctx, "DA.GetLatestDAHeight")
128+
defer span.End()
129+
130+
height, err := t.inner.GetLatestDAHeight(ctx)
131+
if err != nil {
132+
span.RecordError(err)
133+
span.SetStatus(codes.Error, err.Error())
134+
return 0, err
135+
}
136+
span.SetAttributes(attribute.Int64("da.latest_height", int64(height)))
137+
return height, nil
138+
}
139+
126140
func (t *tracedClient) GetHeaderNamespace() []byte { return t.inner.GetHeaderNamespace() }
127141
func (t *tracedClient) GetDataNamespace() []byte { return t.inner.GetDataNamespace() }
128142
func (t *tracedClient) GetForcedInclusionNamespace() []byte {

block/internal/da/tracing_test.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,10 +54,11 @@ func (m *mockFullClient) Validate(ctx context.Context, ids []datypes.ID, proofs
5454
}
5555
return nil, nil
5656
}
57-
func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} }
58-
func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} }
59-
func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} }
60-
func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true }
57+
func (m *mockFullClient) GetLatestDAHeight(_ context.Context) (uint64, error) { return 0, nil }
58+
func (m *mockFullClient) GetHeaderNamespace() []byte { return []byte{0x01} }
59+
func (m *mockFullClient) GetDataNamespace() []byte { return []byte{0x02} }
60+
func (m *mockFullClient) GetForcedInclusionNamespace() []byte { return []byte{0x03} }
61+
func (m *mockFullClient) HasForcedInclusionNamespace() bool { return true }
6162

6263
// setup a tracer provider + span recorder
6364
func setupDATrace(t *testing.T, inner FullClient) (FullClient, *tracetest.SpanRecorder) {

block/internal/executing/executor.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,14 @@ func (e *Executor) initializeState() error {
337337
return fmt.Errorf("failed to sync execution layer: %w", err)
338338
}
339339

340+
// For based sequencer, advance safe/finalized since it comes from DA.
341+
if e.config.Node.BasedSequencer && syncTargetHeight > 0 {
342+
if err := e.exec.SetFinal(e.ctx, syncTargetHeight); err != nil {
343+
e.sendCriticalError(fmt.Errorf("failed to set final height in based sequencer mode: %w", err))
344+
return fmt.Errorf("failed to set final height in based sequencer mode: %w", err)
345+
}
346+
}
347+
340348
// Double-check state against Raft after replay
341349
if e.raftNode != nil {
342350
raftState := e.raftNode.GetState()
@@ -627,6 +635,14 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
627635
Int("txs", len(data.Txs)).
628636
Msg("produced block")
629637

638+
// For based sequencer, advance safe/finalized since it comes from DA.
639+
if e.config.Node.BasedSequencer {
640+
if err := e.exec.SetFinal(e.ctx, newHeight); err != nil {
641+
e.sendCriticalError(fmt.Errorf("failed to set final height in based sequencer mode: %w", err))
642+
return fmt.Errorf("failed to set final height in based sequencer mode: %w", err)
643+
}
644+
}
645+
630646
return nil
631647
}
632648

0 commit comments

Comments
 (0)