Skip to content

Commit 6686e6a

Browse files
committed
fix: preserve DA data until block acceptance
1 parent ea805f7 commit 6686e6a

5 files changed

Lines changed: 58 additions & 4 deletions

File tree

block/internal/syncing/da_retriever.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,10 @@ type DARetriever interface {
2828
ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
2929
}
3030

31+
type pendingDataCleaner interface {
32+
removePendingData(height uint64)
33+
}
34+
3135
// daRetriever handles DA retrieval operations for syncing
3236
type daRetriever struct {
3337
client da.Client
@@ -213,7 +217,6 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
213217
}
214218
} else {
215219
delete(r.pendingHeaders, height)
216-
delete(r.pendingData, height)
217220
}
218221

219222
// Create height event
@@ -245,6 +248,13 @@ func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight
245248
return events
246249
}
247250

251+
func (r *daRetriever) removePendingData(height uint64) {
252+
r.mu.Lock()
253+
defer r.mu.Unlock()
254+
255+
delete(r.pendingData, height)
256+
}
257+
248258
// tryDecodeHeader attempts to decode a blob as a header
249259
func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader {
250260
header := new(types.SignedHeader)

block/internal/syncing/da_retriever_test.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,9 +307,35 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
307307
assert.Equal(t, uint64(5), event.Data.Height())
308308
assert.Equal(t, uint64(102), event.DaHeight, "DaHeight should be the height where data was processed")
309309

310-
// Verify pending maps are cleared
310+
// Verify the header is consumed, while data remains available until the
311+
// candidate block is accepted by the syncer.
311312
require.NotContains(t, r.pendingHeaders, uint64(5), "header should be removed from pending")
312-
require.NotContains(t, r.pendingData, uint64(5), "data should be removed from pending")
313+
require.Contains(t, r.pendingData, uint64(5), "data should remain pending until accepted")
314+
315+
r.removePendingData(5)
316+
require.NotContains(t, r.pendingData, uint64(5), "accepted data should be removed from pending")
317+
}
318+
319+
func TestDARetriever_ProcessBlobs_KeepsDataForLaterHeaderAfterCandidateEvent(t *testing.T) {
320+
expectedAddr, expectedPub, expectedSigner := buildSyncTestSigner(t)
321+
wrongAddr, wrongPub, wrongSigner := buildSyncTestSigner(t)
322+
gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: expectedAddr}
323+
324+
r := newTestDARetriever(t, nil, config.DefaultConfig(), gen)
325+
326+
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, 2)
327+
wrongHeaderBin, wrongHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, wrongAddr, wrongPub, wrongSigner, nil, &data.Data, nil)
328+
correctHeaderBin, correctHeader := makeSignedHeaderBytes(t, gen.ChainID, 5, expectedAddr, expectedPub, expectedSigner, nil, &data.Data, nil)
329+
330+
events := r.processBlobs(context.Background(), [][]byte{wrongHeaderBin, dataBin}, 100)
331+
require.Len(t, events, 1)
332+
require.Equal(t, wrongHeader.Hash().String(), events[0].Header.Hash().String())
333+
require.Contains(t, r.pendingData, uint64(5), "data should stay available until the candidate block is accepted")
334+
335+
events = r.processBlobs(context.Background(), [][]byte{correctHeaderBin}, 101)
336+
require.Len(t, events, 1)
337+
require.Equal(t, correctHeader.Hash().String(), events[0].Header.Hash().String())
338+
require.Equal(t, data.Data.DACommitment().String(), events[0].Data.DACommitment().String())
313339
}
314340

315341
func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testing.T) {
@@ -355,6 +381,8 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
355381
assert.Equal(t, uint64(5), events2[1].Header.Height())
356382
assert.Equal(t, uint64(5), events2[1].Data.Height())
357383
assert.Equal(t, uint64(203), events2[1].DaHeight)
384+
r.removePendingData(3)
385+
r.removePendingData(5)
358386

359387
// Verify header 4 is still pending (no matching data yet)
360388
require.Contains(t, r.pendingHeaders, uint64(4), "header 4 should still be pending")
@@ -369,6 +397,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
369397
assert.Equal(t, uint64(4), events3[0].Header.Height())
370398
assert.Equal(t, uint64(4), events3[0].Data.Height())
371399
assert.Equal(t, uint64(205), events3[0].DaHeight)
400+
r.removePendingData(4)
372401

373402
// Verify all pending maps are now clear
374403
require.NotContains(t, r.pendingHeaders, uint64(4), "header 4 should be removed from pending")

block/internal/syncing/da_retriever_tracing.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,9 @@ func (t *tracedDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64)
5959
func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
6060
return t.inner.ProcessBlobs(ctx, blobs, daHeight)
6161
}
62+
63+
func (t *tracedDARetriever) removePendingData(height uint64) {
64+
if cleaner, ok := t.inner.(pendingDataCleaner); ok {
65+
cleaner.removePendingData(height)
66+
}
67+
}

block/internal/syncing/syncer.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -827,6 +827,12 @@ func (s *Syncer) trySyncNextBlockWithState(ctx context.Context, event *common.DA
827827
s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight)
828828
}
829829

830+
if event.Source == common.SourceDA {
831+
if cleaner, ok := s.daRetriever.(pendingDataCleaner); ok {
832+
cleaner.removePendingData(nextHeight)
833+
}
834+
}
835+
830836
return nil
831837
}
832838

block/internal/syncing/syncer_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -425,15 +425,18 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) {
425425
lastState := s.getLastState()
426426
data := makeData(gen.ChainID, 1, 0)
427427
_, hdr := makeSignedHeaderBytes(t, gen.ChainID, 1, addr, pub, signer, lastState.AppHash, data, nil)
428+
daRetriever := &daRetriever{pendingData: map[uint64]*types.Data{1: data}}
429+
s.daRetriever = daRetriever
428430

429431
// Expect ExecuteTxs call for height 1
430432
mockExec.EXPECT().ExecuteTxs(mock.Anything, mock.Anything, uint64(1), mock.Anything, lastState.AppHash).
431433
Return([]byte("app1"), nil).Once()
432434

433-
evt := common.DAHeightEvent{Header: hdr, Data: data, DaHeight: 1}
435+
evt := common.DAHeightEvent{Header: hdr, Data: data, Source: common.SourceDA, DaHeight: 1}
434436
s.processHeightEvent(t.Context(), &evt)
435437

436438
requireEmptyChan(t, errChan)
439+
require.NotContains(t, daRetriever.pendingData, uint64(1), "accepted DA data should be removed from the retriever pending data")
437440
h, err := st.Height(t.Context())
438441
require.NoError(t, err)
439442
assert.Equal(t, uint64(1), h)

0 commit comments

Comments
 (0)