From b7f36e0e4fbdb18513b48e5986bd4000a758dca0 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Mon, 2 Mar 2026 17:26:02 -0500 Subject: [PATCH 01/14] feat(sensor): time first seen by sensor --- p2p/database/datastore.go | 104 +++++++++++++++++++++++++++----------- 1 file changed, 75 insertions(+), 29 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index d1629b9c2..e3e364562 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -57,36 +57,38 @@ type DatastoreEvent struct { // DatastoreHeader stores the data in manner that can be easily written without // loss of precision. type DatastoreHeader struct { - ParentHash *datastore.Key - UncleHash string `datastore:",noindex"` - Coinbase string `datastore:",noindex"` - Root string `datastore:",noindex"` - TxHash string `datastore:",noindex"` - ReceiptHash string `datastore:",noindex"` - Bloom []byte `datastore:",noindex"` - Difficulty string `datastore:",noindex"` - Number string - GasLimit string `datastore:",noindex"` - GasUsed string - Time time.Time - Extra []byte `datastore:",noindex"` - MixDigest string `datastore:",noindex"` - Nonce string `datastore:",noindex"` - BaseFee string `datastore:",noindex"` - TimeFirstSeen time.Time - TTL time.Time - IsParent bool - SensorFirstSeen string + ParentHash *datastore.Key + UncleHash string `datastore:",noindex"` + Coinbase string `datastore:",noindex"` + Root string `datastore:",noindex"` + TxHash string `datastore:",noindex"` + ReceiptHash string `datastore:",noindex"` + Bloom []byte `datastore:",noindex"` + Difficulty string `datastore:",noindex"` + Number string + GasLimit string `datastore:",noindex"` + GasUsed string + Time time.Time + Extra []byte `datastore:",noindex"` + MixDigest string `datastore:",noindex"` + Nonce string `datastore:",noindex"` + BaseFee string `datastore:",noindex"` + TimeFirstSeen time.Time + TTL time.Time + IsParent bool + SensorFirstSeen string + TimeFirstSeenBySensor map[string]time.Time `datastore:",noindex"` } // DatastoreBlock represents a block stored in datastore. type DatastoreBlock struct { *DatastoreHeader - TotalDifficulty string `datastore:",noindex"` - Transactions []*datastore.Key `datastore:",noindex"` - Uncles []*datastore.Key `datastore:",noindex"` - TimeFirstSeenHash time.Time - SensorFirstSeenHash string + TotalDifficulty string `datastore:",noindex"` + Transactions []*datastore.Key `datastore:",noindex"` + Uncles []*datastore.Key `datastore:",noindex"` + TimeFirstSeenHash time.Time + SensorFirstSeenHash string + TimeFirstSeenHashBySensor map[string]time.Time `datastore:",noindex"` } // DatastoreTransaction represents a transaction stored in datastore. Data is @@ -249,17 +251,24 @@ func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Has var block DatastoreBlock err := tx.Get(key, &block) + newSensorMap := map[string]time.Time{d.sensorID: tfsh} + // If block doesn't exist, create partial entry with just hash timing if err != nil { block.TimeFirstSeenHash = tfsh block.SensorFirstSeenHash = d.sensorID + block.TimeFirstSeenHashBySensor = newSensorMap _, err = tx.Put(key, &block) return err } - // If timestamp already set and not earlier, no update needed + // Always merge per-sensor map with current sensor's timestamp + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) + + // If global timestamp already set and not earlier, just update per-sensor map if !block.TimeFirstSeenHash.IsZero() && !tfsh.Before(block.TimeFirstSeenHash) { - return nil + _, err = tx.Put(key, &block) + return err } // Update with earlier timestamp @@ -363,6 +372,19 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool { return err == nil && block.DatastoreHeader != nil } +// mergeSensorTimes merges per-sensor timestamps, keeping the earliest timestamp for each sensor. +func mergeSensorTimes(existing, new map[string]time.Time) map[string]time.Time { + if existing == nil { + existing = make(map[string]time.Time) + } + for sensorID, ts := range new { + if existingTs, ok := existing[sensorID]; !ok || ts.Before(existingTs) { + existing[sensorID] = ts + } + } + return existing +} + // newDatastoreHeader creates a DatastoreHeader from a types.Header. Some // values are converted into strings to prevent a loss of precision. func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader { @@ -387,6 +409,9 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa TTL: tfs.Add(d.ttl), IsParent: isParent, SensorFirstSeen: d.sensorID, + TimeFirstSeenBySensor: map[string]time.Time{ + d.sensorID: tfs, + }, } } @@ -400,11 +425,21 @@ func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBloc header.SensorFirstSeen = block.DatastoreHeader.SensorFirstSeen } + // Merge per-sensor header timing maps + if block.DatastoreHeader != nil { + header.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, header.TimeFirstSeenBySensor) + } + // Set hash timing if it doesn't exist or if new timestamp is earlier if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { block.TimeFirstSeenHash = tfs block.SensorFirstSeenHash = d.sensorID } + + // Merge per-sensor hash timing maps and add current sensor + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, map[string]time.Time{ + d.sensorID: tfs, + }) } // newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some @@ -457,6 +492,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. _ = tx.Get(key, &dsBlock) shouldWrite := false + newSensorMap := map[string]time.Time{d.sensorID: tfs} if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.DatastoreHeader.TimeFirstSeen) { shouldWrite = true @@ -468,6 +504,11 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. d.writeFirstSeen(header, &dsBlock, tfs) dsBlock.DatastoreHeader = header + } else { + // Even if not replacing header, still update per-sensor maps + dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) + dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorMap) + shouldWrite = true } if len(dsBlock.TotalDifficulty) == 0 { @@ -560,9 +601,14 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, var block DatastoreBlock err := tx.Get(key, &block) - // If block header already exists and new timestamp is not earlier, don't overwrite + newSensorMap := map[string]time.Time{d.sensorID: tfs} + + // If block header already exists and new timestamp is not earlier, just update per-sensor maps if err == nil && block.DatastoreHeader != nil && !tfs.Before(block.DatastoreHeader.TimeFirstSeen) { - return nil + block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) + _, err = tx.Put(key, &block) + return err } // Create new header with current timing From d018c73abb86ff1ea544e4cf56777efa8e0d0357 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 4 Mar 2026 19:07:58 -0500 Subject: [PATCH 02/14] chore: improve mergeSensorTimes --- p2p/database/datastore.go | 87 +++++++++++++++++++-------------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 301f05446..4cc48b8d7 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -57,38 +57,38 @@ type DatastoreEvent struct { // DatastoreHeader stores the data in manner that can be easily written without // loss of precision. type DatastoreHeader struct { - ParentHash *datastore.Key - UncleHash string `datastore:",noindex"` - Coinbase string `datastore:",noindex"` - Root string `datastore:",noindex"` - TxHash string `datastore:",noindex"` - ReceiptHash string `datastore:",noindex"` - Bloom []byte `datastore:",noindex"` - Difficulty string `datastore:",noindex"` - Number string - GasLimit string `datastore:",noindex"` - GasUsed string - Time time.Time - Extra []byte `datastore:",noindex"` - MixDigest string `datastore:",noindex"` - Nonce string `datastore:",noindex"` - BaseFee string `datastore:",noindex"` - TimeFirstSeen time.Time - TTL time.Time - IsParent bool - SensorFirstSeen string - TimeFirstSeenBySensor map[string]time.Time `datastore:",noindex"` + ParentHash *datastore.Key + UncleHash string `datastore:",noindex"` + Coinbase string `datastore:",noindex"` + Root string `datastore:",noindex"` + TxHash string `datastore:",noindex"` + ReceiptHash string `datastore:",noindex"` + Bloom []byte `datastore:",noindex"` + Difficulty string `datastore:",noindex"` + Number string + GasLimit string `datastore:",noindex"` + GasUsed string + Time time.Time + Extra []byte `datastore:",noindex"` + MixDigest string `datastore:",noindex"` + Nonce string `datastore:",noindex"` + BaseFee string `datastore:",noindex"` + TimeFirstSeen time.Time + TTL time.Time + IsParent bool + SensorFirstSeen string + TimeFirstSeenBySensor map[string]time.Time `datastore:",noindex"` } // DatastoreBlock represents a block stored in datastore. type DatastoreBlock struct { *DatastoreHeader - TotalDifficulty string `datastore:",noindex"` - Transactions []*datastore.Key `datastore:",noindex"` - Uncles []*datastore.Key `datastore:",noindex"` - TimeFirstSeenHash time.Time - SensorFirstSeenHash string - TimeFirstSeenHashBySensor map[string]time.Time `datastore:",noindex"` + TotalDifficulty string `datastore:",noindex"` + Transactions []*datastore.Key `datastore:",noindex"` + Uncles []*datastore.Key `datastore:",noindex"` + TimeFirstSeenHash time.Time + SensorFirstSeenHash string + TimeFirstSeenHashBySensor map[string]time.Time `datastore:",noindex"` } // DatastoreTransaction represents a transaction stored in datastore. Data is @@ -372,19 +372,6 @@ func (d *Datastore) HasBlock(ctx context.Context, hash common.Hash) bool { return err == nil && block.DatastoreHeader != nil } -// mergeSensorTimes merges per-sensor timestamps, keeping the earliest timestamp for each sensor. -func mergeSensorTimes(existing, new map[string]time.Time) map[string]time.Time { - if existing == nil { - existing = make(map[string]time.Time) - } - for sensorID, ts := range new { - if existingTs, ok := existing[sensorID]; !ok || ts.Before(existingTs) { - existing[sensorID] = ts - } - } - return existing -} - // newDatastoreHeader creates a DatastoreHeader from a types.Header. Some // values are converted into strings to prevent a loss of precision. func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isParent bool) *DatastoreHeader { @@ -427,7 +414,7 @@ func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBloc // Merge per-sensor header timing maps if block.DatastoreHeader != nil { - header.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, header.TimeFirstSeenBySensor) + header.TimeFirstSeenBySensor = mergeSensorTimes(header.TimeFirstSeenBySensor, block.DatastoreHeader.TimeFirstSeenBySensor) } // Set hash timing if it doesn't exist or if new timestamp is earlier @@ -437,9 +424,7 @@ func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBloc } // Merge per-sensor hash timing maps and add current sensor - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, map[string]time.Time{ - d.sensorID: tfs, - }) + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, map[string]time.Time{d.sensorID: tfs}) } // newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some @@ -688,6 +673,20 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact } } +// mergeSensorTimes merges the provided maps, keeping the earliest timestamp +// for each key. Nil maps are safely ignored. +func mergeSensorTimes(maps ...map[string]time.Time) map[string]time.Time { + result := make(map[string]time.Time) + for _, m := range maps { + for key, ts := range m { + if existing, ok := result[key]; !ok || ts.Before(existing) { + result[key] = ts + } + } + } + return result +} + func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) { query := datastore.NewQuery(BlockEventsKind).Order("-Time") iter := d.client.Run(ctx, query) From 7501bdbb385a59cee8e4e3f97da4e1be9efc1b76 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 4 Mar 2026 19:39:52 -0500 Subject: [PATCH 03/14] fix: simplify logic --- p2p/database/datastore.go | 101 ++++++++++++++++---------------------- p2p/protocol.go | 35 +++++++------ 2 files changed, 64 insertions(+), 72 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 4cc48b8d7..dfb3fbb53 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -402,31 +402,6 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa } } -// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps. -func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) { - // Preserve earlier header timing if it exists - if block.DatastoreHeader != nil && - !block.TimeFirstSeen.IsZero() && - block.TimeFirstSeen.Before(tfs) { - header.TimeFirstSeen = block.TimeFirstSeen - header.SensorFirstSeen = block.SensorFirstSeen - } - - // Merge per-sensor header timing maps - if block.DatastoreHeader != nil { - header.TimeFirstSeenBySensor = mergeSensorTimes(header.TimeFirstSeenBySensor, block.DatastoreHeader.TimeFirstSeenBySensor) - } - - // Set hash timing if it doesn't exist or if new timestamp is earlier - if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { - block.TimeFirstSeenHash = tfs - block.SensorFirstSeenHash = d.sensorID - } - - // Merge per-sensor hash timing maps and add current sensor - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, map[string]time.Time{d.sensorID: tfs}) -} - // newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some // values are converted into strings to prevent a loss of precision. func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time) *DatastoreTransaction { @@ -476,33 +451,38 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // are nil we will just set them. _ = tx.Get(key, &dsBlock) - shouldWrite := false newSensorMap := map[string]time.Time{d.sensorID: tfs} if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) { - shouldWrite = true - // Create new header with current timing header := d.newDatastoreHeader(block.Header(), tfs, false) - // Preserve earlier timestamps from any earlier announcement - d.writeFirstSeen(header, &dsBlock, tfs) + // Preserve earlier global timestamps from any earlier announcement + if dsBlock.DatastoreHeader != nil && + !dsBlock.TimeFirstSeen.IsZero() && + dsBlock.TimeFirstSeen.Before(tfs) { + header.TimeFirstSeen = dsBlock.TimeFirstSeen + header.SensorFirstSeen = dsBlock.SensorFirstSeen + } + + // Set hash timing if it doesn't exist or if new timestamp is earlier + if dsBlock.TimeFirstSeenHash.IsZero() || tfs.Before(dsBlock.TimeFirstSeenHash) { + dsBlock.TimeFirstSeenHash = tfs + dsBlock.SensorFirstSeenHash = d.sensorID + } dsBlock.DatastoreHeader = header - } else { - // Even if not replacing header, still update per-sensor maps - dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) - dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorMap) - shouldWrite = true } + // Always merge per-sensor maps + dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) + dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorMap) + if len(dsBlock.TotalDifficulty) == 0 { - shouldWrite = true dsBlock.TotalDifficulty = td.String() } if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { - shouldWrite = true if d.shouldWriteTransactions { d.writeTransactions(ctx, block.Transactions(), tfs) } @@ -514,7 +494,6 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } if dsBlock.Uncles == nil && len(block.Uncles()) > 0 { - shouldWrite = true dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) for _, uncle := range block.Uncles() { d.writeBlockHeader(ctx, uncle, tfs, false) @@ -522,12 +501,8 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } } - if shouldWrite { - _, err := tx.Put(key, &dsBlock) - return err - } - - return nil + _, err := tx.Put(key, &dsBlock) + return err }, datastore.MaxAttempts(MaxAttempts)) if err != nil { @@ -584,26 +559,36 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { var block DatastoreBlock - err := tx.Get(key, &block) + _ = tx.Get(key, &block) newSensorMap := map[string]time.Time{d.sensorID: tfs} - // If block header already exists and new timestamp is not earlier, just update per-sensor maps - if err == nil && block.DatastoreHeader != nil && !tfs.Before(block.DatastoreHeader.TimeFirstSeen) { - block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) - _, err = tx.Put(key, &block) - return err - } + // Create or replace header if it doesn't exist or if timestamp is earlier + if block.DatastoreHeader == nil || tfs.Before(block.DatastoreHeader.TimeFirstSeen) { + newHeader := d.newDatastoreHeader(header, tfs, isParent) - // Create new header with current timing - newHeader := d.newDatastoreHeader(header, tfs, isParent) + // Preserve earlier global timestamps + if block.DatastoreHeader != nil && + !block.TimeFirstSeen.IsZero() && + block.TimeFirstSeen.Before(tfs) { + newHeader.TimeFirstSeen = block.TimeFirstSeen + newHeader.SensorFirstSeen = block.SensorFirstSeen + } - // Preserve earlier timestamps from any earlier announcement or full block - d.writeFirstSeen(newHeader, &block, tfs) + // Set hash timing if it doesn't exist or if new timestamp is earlier + if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { + block.TimeFirstSeenHash = tfs + block.SensorFirstSeenHash = d.sensorID + } - block.DatastoreHeader = newHeader - _, err = tx.Put(key, &block) + block.DatastoreHeader = newHeader + } + + // Always merge per-sensor maps + block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) + + _, err := tx.Put(key, &block) return err }, datastore.MaxAttempts(MaxAttempts)) diff --git a/p2p/protocol.go b/p2p/protocol.go index 7721d0373..ecd599311 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -20,8 +20,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/p2p/database" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" ) const ( @@ -1009,24 +1009,31 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - // Check if we already have the full block in the cache - if cache, ok := c.conns.Blocks().Peek(hash); ok && cache.TD != nil { + // Atomically check and add to cache to prevent duplicate writes from + // concurrent peers receiving the same block. + update := false + c.conns.Blocks().Update(hash, func(existing BlockCache) BlockCache { + if existing.TD != nil { + return existing // Already have full block + } + update = true + return BlockCache{ + Header: packet.Block.Header(), + Body: ð.BlockBody{ + Transactions: packet.Block.Transactions(), + Uncles: packet.Block.Uncles(), + Withdrawals: packet.Block.Withdrawals(), + }, + TD: packet.TD, + } + }) + + if !update { return nil } c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs) - // Update cache to store the full block - c.conns.Blocks().Add(hash, BlockCache{ - Header: packet.Block.Header(), - Body: ð.BlockBody{ - Transactions: packet.Block.Transactions(), - Uncles: packet.Block.Uncles(), - Withdrawals: packet.Block.Withdrawals(), - }, - TD: packet.TD, - }) - // Broadcast block or block hash to other peers asynchronously go c.conns.BroadcastBlock(packet.Block, packet.TD) go c.conns.BroadcastBlockHashes( From 69bd29e12ff8b8a9dc5a2bddfa6ad9d08016f515 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Thu, 5 Mar 2026 11:27:23 -0500 Subject: [PATCH 04/14] fix: use slice instead of map --- p2p/database/datastore.go | 74 ++++++++++++++++++++++++--------------- 1 file changed, 45 insertions(+), 29 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index dfb3fbb53..9f3c50f90 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -54,6 +54,15 @@ type DatastoreEvent struct { TTL time.Time } +// SensorTime records when a specific sensor first saw something. +type SensorTime struct { + SensorID string + Time time.Time +} + +// SensorTimes is a slice of SensorTime entries. +type SensorTimes = []SensorTime + // DatastoreHeader stores the data in manner that can be easily written without // loss of precision. type DatastoreHeader struct { @@ -77,7 +86,7 @@ type DatastoreHeader struct { TTL time.Time IsParent bool SensorFirstSeen string - TimeFirstSeenBySensor map[string]time.Time `datastore:",noindex"` + TimeFirstSeenBySensor SensorTimes `datastore:",noindex"` } // DatastoreBlock represents a block stored in datastore. @@ -88,7 +97,7 @@ type DatastoreBlock struct { Uncles []*datastore.Key `datastore:",noindex"` TimeFirstSeenHash time.Time SensorFirstSeenHash string - TimeFirstSeenHashBySensor map[string]time.Time `datastore:",noindex"` + TimeFirstSeenHashBySensor SensorTimes `datastore:",noindex"` } // DatastoreTransaction represents a transaction stored in datastore. Data is @@ -251,19 +260,19 @@ func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Has var block DatastoreBlock err := tx.Get(key, &block) - newSensorMap := map[string]time.Time{d.sensorID: tfsh} + newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfsh}} // If block doesn't exist, create partial entry with just hash timing if err != nil { block.TimeFirstSeenHash = tfsh block.SensorFirstSeenHash = d.sensorID - block.TimeFirstSeenHashBySensor = newSensorMap + block.TimeFirstSeenHashBySensor = newSensorTime _, err = tx.Put(key, &block) return err } - // Always merge per-sensor map with current sensor's timestamp - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) + // Always merge per-sensor times with current sensor's timestamp + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorTime) // If global timestamp already set and not earlier, just update per-sensor map if !block.TimeFirstSeenHash.IsZero() && !tfsh.Before(block.TimeFirstSeenHash) { @@ -392,13 +401,11 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa MixDigest: header.MixDigest.String(), Nonce: fmt.Sprint(header.Nonce.Uint64()), BaseFee: header.BaseFee.String(), - TimeFirstSeen: tfs, - TTL: tfs.Add(d.ttl), - IsParent: isParent, - SensorFirstSeen: d.sensorID, - TimeFirstSeenBySensor: map[string]time.Time{ - d.sensorID: tfs, - }, + TimeFirstSeen: tfs, + TTL: tfs.Add(d.ttl), + IsParent: isParent, + SensorFirstSeen: d.sensorID, + TimeFirstSeenBySensor: SensorTimes{{SensorID: d.sensorID, Time: tfs}}, } } @@ -451,7 +458,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // are nil we will just set them. _ = tx.Get(key, &dsBlock) - newSensorMap := map[string]time.Time{d.sensorID: tfs} + newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfs}} if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) { // Create new header with current timing @@ -474,9 +481,9 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. dsBlock.DatastoreHeader = header } - // Always merge per-sensor maps - dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) - dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorMap) + // Always merge per-sensor times + dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorTime) + dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorTime) if len(dsBlock.TotalDifficulty) == 0 { dsBlock.TotalDifficulty = td.String() @@ -561,7 +568,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, var block DatastoreBlock _ = tx.Get(key, &block) - newSensorMap := map[string]time.Time{d.sensorID: tfs} + newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfs}} // Create or replace header if it doesn't exist or if timestamp is earlier if block.DatastoreHeader == nil || tfs.Before(block.DatastoreHeader.TimeFirstSeen) { @@ -584,9 +591,9 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, block.DatastoreHeader = newHeader } - // Always merge per-sensor maps - block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorMap) - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorMap) + // Always merge per-sensor times + block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorTime) + block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorTime) _, err := tx.Put(key, &block) return err @@ -658,17 +665,26 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact } } -// mergeSensorTimes merges the provided maps, keeping the earliest timestamp -// for each key. Nil maps are safely ignored. -func mergeSensorTimes(maps ...map[string]time.Time) map[string]time.Time { - result := make(map[string]time.Time) - for _, m := range maps { - for key, ts := range m { - if existing, ok := result[key]; !ok || ts.Before(existing) { - result[key] = ts +// mergeSensorTimes merges the provided slices, keeping the earliest timestamp +// for each sensor. Nil slices are safely ignored. +func mergeSensorTimes(slices ...SensorTimes) SensorTimes { + seen := make(map[string]int) // sensorID -> index in result + var result SensorTimes + + for _, slice := range slices { + for _, st := range slice { + if idx, ok := seen[st.SensorID]; ok { + // Keep earlier timestamp + if st.Time.Before(result[idx].Time) { + result[idx].Time = st.Time + } + } else { + seen[st.SensorID] = len(result) + result = append(result, st) } } } + return result } From 1082c9d02c6989e8886f460bd06bba895479e104 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 10 Mar 2026 22:11:08 -0400 Subject: [PATCH 05/14] feat: --write-first-block-event flag --- cmd/p2p/sensor/sensor.go | 5 ++ p2p/database/database.go | 3 +- p2p/database/datastore.go | 98 +++++++++++---------------------------- p2p/database/json.go | 7 ++- p2p/database/nodb.go | 7 ++- p2p/protocol.go | 2 +- 6 files changed, 48 insertions(+), 74 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 5daa5f397..b1bd8d791 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -50,6 +50,7 @@ type ( MaxDatabaseConcurrency int ShouldWriteBlocks bool ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool @@ -438,6 +439,7 @@ func newDatabase(ctx context.Context) (database.Database, error) { MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, + ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, ShouldWritePeers: inputSensorParams.ShouldWritePeers, @@ -476,6 +478,9 @@ func init() { will result in less chance of missing data but can significantly increase memory usage)`) f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") + f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, + "write one block event per block on first-seen (alternative to --write-block-events)") + SensorCmd.MarkFlagsMutuallyExclusive("write-block-events", "write-first-block-event") f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true, `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, diff --git a/p2p/database/database.go b/p2p/database/database.go index b6fc60d03..dc1cd72d6 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -30,7 +30,7 @@ type Database interface { // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time if the block doesn't exist yet. - WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time) + WriteBlockHashFirstSeen(context.Context, *enode.Node, common.Hash, time.Time) // WriteBlockBody will write the block bodies if ShouldWriteBlocks returns // true. @@ -51,6 +51,7 @@ type Database interface { MaxConcurrentWrites() int ShouldWriteBlocks() bool ShouldWriteBlockEvents() bool + ShouldWriteFirstBlockEvent() bool ShouldWriteTransactions() bool ShouldWriteTransactionEvents() bool ShouldWritePeers() bool diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 9f3c50f90..298c62825 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -35,6 +35,7 @@ type Datastore struct { maxConcurrency int shouldWriteBlocks bool shouldWriteBlockEvents bool + shouldWriteFirstBlockEvent bool shouldWriteTransactions bool shouldWriteTransactionEvents bool shouldWritePeers bool @@ -54,15 +55,6 @@ type DatastoreEvent struct { TTL time.Time } -// SensorTime records when a specific sensor first saw something. -type SensorTime struct { - SensorID string - Time time.Time -} - -// SensorTimes is a slice of SensorTime entries. -type SensorTimes = []SensorTime - // DatastoreHeader stores the data in manner that can be easily written without // loss of precision. type DatastoreHeader struct { @@ -82,22 +74,20 @@ type DatastoreHeader struct { MixDigest string `datastore:",noindex"` Nonce string `datastore:",noindex"` BaseFee string `datastore:",noindex"` - TimeFirstSeen time.Time - TTL time.Time - IsParent bool - SensorFirstSeen string - TimeFirstSeenBySensor SensorTimes `datastore:",noindex"` + TimeFirstSeen time.Time + TTL time.Time + IsParent bool + SensorFirstSeen string } // DatastoreBlock represents a block stored in datastore. type DatastoreBlock struct { *DatastoreHeader - TotalDifficulty string `datastore:",noindex"` - Transactions []*datastore.Key `datastore:",noindex"` - Uncles []*datastore.Key `datastore:",noindex"` - TimeFirstSeenHash time.Time - SensorFirstSeenHash string - TimeFirstSeenHashBySensor SensorTimes `datastore:",noindex"` + TotalDifficulty string `datastore:",noindex"` + Transactions []*datastore.Key `datastore:",noindex"` + Uncles []*datastore.Key `datastore:",noindex"` + TimeFirstSeenHash time.Time + SensorFirstSeenHash string } // DatastoreTransaction represents a transaction stored in datastore. Data is @@ -138,6 +128,7 @@ type DatastoreOptions struct { MaxConcurrency int ShouldWriteBlocks bool ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool @@ -159,6 +150,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { maxConcurrency: opts.MaxConcurrency, shouldWriteBlocks: opts.ShouldWriteBlocks, shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, + shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, shouldWriteTransactions: opts.ShouldWriteTransactions, shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, shouldWritePeers: opts.ShouldWritePeers, @@ -242,42 +234,40 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time if the block doesn't exist yet. If it exists, updates the // TimeFirstSeenHash if the new time is earlier. -func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { if d.client == nil || !d.ShouldWriteBlocks() { return } d.runAsync(func() { - d.writeBlockHashFirstSeen(ctx, hash, tfsh) + d.writeBlockHashFirstSeen(ctx, peer, hash, tfsh) }) } // writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time. -func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { + // Write block event if flag enabled (cache check in protocol.go already verified first-seen) + if d.shouldWriteFirstBlockEvent && peer != nil { + d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh) + } + key := datastore.NameKey(BlocksKind, hash.Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { var block DatastoreBlock err := tx.Get(key, &block) - newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfsh}} - // If block doesn't exist, create partial entry with just hash timing if err != nil { block.TimeFirstSeenHash = tfsh block.SensorFirstSeenHash = d.sensorID - block.TimeFirstSeenHashBySensor = newSensorTime _, err = tx.Put(key, &block) return err } - // Always merge per-sensor times with current sensor's timestamp - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorTime) - - // If global timestamp already set and not earlier, just update per-sensor map + // If timestamp already set and not earlier, no update needed if !block.TimeFirstSeenHash.IsZero() && !tfsh.Before(block.TimeFirstSeenHash) { - _, err = tx.Put(key, &block) - return err + return nil } // Update with earlier timestamp @@ -401,11 +391,10 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa MixDigest: header.MixDigest.String(), Nonce: fmt.Sprint(header.Nonce.Uint64()), BaseFee: header.BaseFee.String(), - TimeFirstSeen: tfs, - TTL: tfs.Add(d.ttl), - IsParent: isParent, - SensorFirstSeen: d.sensorID, - TimeFirstSeenBySensor: SensorTimes{{SensorID: d.sensorID, Time: tfs}}, + TimeFirstSeen: tfs, + TTL: tfs.Add(d.ttl), + IsParent: isParent, + SensorFirstSeen: d.sensorID, } } @@ -458,8 +447,6 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // are nil we will just set them. _ = tx.Get(key, &dsBlock) - newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfs}} - if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) { // Create new header with current timing header := d.newDatastoreHeader(block.Header(), tfs, false) @@ -481,10 +468,6 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. dsBlock.DatastoreHeader = header } - // Always merge per-sensor times - dsBlock.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(dsBlock.DatastoreHeader.TimeFirstSeenBySensor, newSensorTime) - dsBlock.TimeFirstSeenHashBySensor = mergeSensorTimes(dsBlock.TimeFirstSeenHashBySensor, newSensorTime) - if len(dsBlock.TotalDifficulty) == 0 { dsBlock.TotalDifficulty = td.String() } @@ -568,8 +551,6 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, var block DatastoreBlock _ = tx.Get(key, &block) - newSensorTime := SensorTimes{{SensorID: d.sensorID, Time: tfs}} - // Create or replace header if it doesn't exist or if timestamp is earlier if block.DatastoreHeader == nil || tfs.Before(block.DatastoreHeader.TimeFirstSeen) { newHeader := d.newDatastoreHeader(header, tfs, isParent) @@ -591,10 +572,6 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, block.DatastoreHeader = newHeader } - // Always merge per-sensor times - block.DatastoreHeader.TimeFirstSeenBySensor = mergeSensorTimes(block.DatastoreHeader.TimeFirstSeenBySensor, newSensorTime) - block.TimeFirstSeenHashBySensor = mergeSensorTimes(block.TimeFirstSeenHashBySensor, newSensorTime) - _, err := tx.Put(key, &block) return err }, datastore.MaxAttempts(MaxAttempts)) @@ -665,27 +642,8 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact } } -// mergeSensorTimes merges the provided slices, keeping the earliest timestamp -// for each sensor. Nil slices are safely ignored. -func mergeSensorTimes(slices ...SensorTimes) SensorTimes { - seen := make(map[string]int) // sensorID -> index in result - var result SensorTimes - - for _, slice := range slices { - for _, st := range slice { - if idx, ok := seen[st.SensorID]; ok { - // Keep earlier timestamp - if st.Time.Before(result[idx].Time) { - result[idx].Time = st.Time - } - } else { - seen[st.SensorID] = len(result) - result = append(result, st) - } - } - } - - return result +func (d *Datastore) ShouldWriteFirstBlockEvent() bool { + return d.shouldWriteFirstBlockEvent } func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) { diff --git a/p2p/database/json.go b/p2p/database/json.go index 8d5c0e143..a62902413 100644 --- a/p2p/database/json.go +++ b/p2p/database/json.go @@ -238,7 +238,7 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time. For JSON output, this writes a separate record type. -func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { if !j.ShouldWriteBlocks() { return } @@ -374,6 +374,11 @@ func (j *JSONDatabase) ShouldWriteBlockEvents() bool { return j.shouldWriteBlockEvents } +// ShouldWriteFirstBlockEvent returns false for JSON database. +func (j *JSONDatabase) ShouldWriteFirstBlockEvent() bool { + return false +} + // ShouldWriteTransactions returns the configured value. func (j *JSONDatabase) ShouldWriteTransactions() bool { return j.shouldWriteTransactions diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go index b6223ba2a..1bc003980 100644 --- a/p2p/database/nodb.go +++ b/p2p/database/nodb.go @@ -34,7 +34,7 @@ func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes [] } // WriteBlockHashFirstSeen does nothing. -func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { } // WriteBlockBody does nothing. @@ -69,6 +69,11 @@ func (n *nodb) ShouldWriteBlockEvents() bool { return false } +// ShouldWriteFirstBlockEvent returns false. +func (n *nodb) ShouldWriteFirstBlockEvent() bool { + return false +} + // ShouldWriteTransactions returns false. func (n *nodb) ShouldWriteTransactions() bool { return false diff --git a/p2p/protocol.go b/p2p/protocol.go index ecd599311..1ee9f60d7 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -539,7 +539,7 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { } // Write hash first seen time immediately for new blocks - c.db.WriteBlockHashFirstSeen(ctx, hash, tfs) + c.db.WriteBlockHashFirstSeen(ctx, c.node, hash, tfs) // Request only the parts we don't have if err := c.getBlockData(hash, cache, false); err != nil { From 85dd55f5885890b9407c5c1d81bb10f439033e1c Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 10 Mar 2026 22:31:27 -0400 Subject: [PATCH 06/14] chore: go fmt --- cmd/p2p/sensor/api.go | 1 - cmd/p2p/sensor/sensor.go | 2 +- cmd/root.go | 2 +- loadtest/gasmanager/wave.go | 8 ++++---- p2p/database/datastore.go | 32 ++++++++++++++--------------- p2p/datastructures/bloomset_test.go | 1 - p2p/types.go | 1 - 7 files changed, 22 insertions(+), 25 deletions(-) diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index fb68eb208..7d1062322 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -123,4 +123,3 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { log.Error().Err(err).Msg("Failed to start API handler") } } - diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index b1bd8d791..102639efd 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -29,10 +29,10 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/flag" "github.com/0xPolygon/polygon-cli/p2p" "github.com/0xPolygon/polygon-cli/p2p/database" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/rpctypes" ) diff --git a/cmd/root.go b/cmd/root.go index b6515cc6f..493c024de 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,12 +29,12 @@ import ( "github.com/0xPolygon/polygon-cli/cmd/p2p" "github.com/0xPolygon/polygon-cli/cmd/parsebatchl2data" "github.com/0xPolygon/polygon-cli/cmd/parseethwallet" + "github.com/0xPolygon/polygon-cli/cmd/plot" "github.com/0xPolygon/polygon-cli/cmd/publish" "github.com/0xPolygon/polygon-cli/cmd/report" "github.com/0xPolygon/polygon-cli/cmd/retest" "github.com/0xPolygon/polygon-cli/cmd/rpcfuzz" "github.com/0xPolygon/polygon-cli/cmd/signer" - "github.com/0xPolygon/polygon-cli/cmd/plot" "github.com/0xPolygon/polygon-cli/cmd/ulxly" "github.com/0xPolygon/polygon-cli/cmd/version" "github.com/0xPolygon/polygon-cli/cmd/wallet" diff --git a/loadtest/gasmanager/wave.go b/loadtest/gasmanager/wave.go index 9cee2db36..6d4bfe441 100644 --- a/loadtest/gasmanager/wave.go +++ b/loadtest/gasmanager/wave.go @@ -70,11 +70,11 @@ func (w *wave) MoveNext() { } } -func (w *wave) Y() float64 { return w.points[w.x] } -func (w *wave) X() float64 { return w.x } -func (w *wave) Period() uint64 { return w.config.Period } +func (w *wave) Y() float64 { return w.points[w.x] } +func (w *wave) X() float64 { return w.x } +func (w *wave) Period() uint64 { return w.config.Period } func (w *wave) Amplitude() uint64 { return w.config.Amplitude } -func (w *wave) Target() uint64 { return w.config.Target } +func (w *wave) Target() uint64 { return w.config.Target } // Wave computation functions diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 298c62825..58e2924fc 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -58,22 +58,22 @@ type DatastoreEvent struct { // DatastoreHeader stores the data in manner that can be easily written without // loss of precision. type DatastoreHeader struct { - ParentHash *datastore.Key - UncleHash string `datastore:",noindex"` - Coinbase string `datastore:",noindex"` - Root string `datastore:",noindex"` - TxHash string `datastore:",noindex"` - ReceiptHash string `datastore:",noindex"` - Bloom []byte `datastore:",noindex"` - Difficulty string `datastore:",noindex"` - Number string - GasLimit string `datastore:",noindex"` - GasUsed string - Time time.Time - Extra []byte `datastore:",noindex"` - MixDigest string `datastore:",noindex"` - Nonce string `datastore:",noindex"` - BaseFee string `datastore:",noindex"` + ParentHash *datastore.Key + UncleHash string `datastore:",noindex"` + Coinbase string `datastore:",noindex"` + Root string `datastore:",noindex"` + TxHash string `datastore:",noindex"` + ReceiptHash string `datastore:",noindex"` + Bloom []byte `datastore:",noindex"` + Difficulty string `datastore:",noindex"` + Number string + GasLimit string `datastore:",noindex"` + GasUsed string + Time time.Time + Extra []byte `datastore:",noindex"` + MixDigest string `datastore:",noindex"` + Nonce string `datastore:",noindex"` + BaseFee string `datastore:",noindex"` TimeFirstSeen time.Time TTL time.Time IsParent bool diff --git a/p2p/datastructures/bloomset_test.go b/p2p/datastructures/bloomset_test.go index 246b29add..f76fa51cc 100644 --- a/p2p/datastructures/bloomset_test.go +++ b/p2p/datastructures/bloomset_test.go @@ -200,4 +200,3 @@ func BenchmarkBloomSetFilterNotContained(b *testing.B) { bloom.FilterNotContained(batch) } } - diff --git a/p2p/types.go b/p2p/types.go index 56b25f16e..f1972e6eb 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -448,4 +448,3 @@ type rawPooledTransactionsPacket struct { RequestId uint64 Txs []rlp.RawValue } - From 010b67c56949f59c456436a79cb062fef1939033 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 10 Mar 2026 22:58:53 -0400 Subject: [PATCH 07/14] fix: remove redundant branch in writeFirstSeen and update shouldWrite to modified --- p2p/database/datastore.go | 77 +++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 58e2924fc..a4800b524 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -398,6 +398,14 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa } } +// writeFirstSeen sets hash timing if it doesn't exist or if new timestamp is earlier. +func (d *Datastore) writeFirstSeen(block *DatastoreBlock, tfs time.Time) { + if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { + block.TimeFirstSeenHash = tfs + block.SensorFirstSeenHash = d.sensorID + } +} + // newDatastoreTransaction creates a DatastoreTransaction from a types.Transaction. Some // values are converted into strings to prevent a loss of precision. func (d *Datastore) newDatastoreTransaction(tx *types.Transaction, tfs time.Time) *DatastoreTransaction { @@ -447,32 +455,27 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // are nil we will just set them. _ = tx.Get(key, &dsBlock) + modified := false + if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) { + modified = true + // Create new header with current timing header := d.newDatastoreHeader(block.Header(), tfs, false) - // Preserve earlier global timestamps from any earlier announcement - if dsBlock.DatastoreHeader != nil && - !dsBlock.TimeFirstSeen.IsZero() && - dsBlock.TimeFirstSeen.Before(tfs) { - header.TimeFirstSeen = dsBlock.TimeFirstSeen - header.SensorFirstSeen = dsBlock.SensorFirstSeen - } - // Set hash timing if it doesn't exist or if new timestamp is earlier - if dsBlock.TimeFirstSeenHash.IsZero() || tfs.Before(dsBlock.TimeFirstSeenHash) { - dsBlock.TimeFirstSeenHash = tfs - dsBlock.SensorFirstSeenHash = d.sensorID - } + d.writeFirstSeen(&dsBlock, tfs) dsBlock.DatastoreHeader = header } if len(dsBlock.TotalDifficulty) == 0 { + modified = true dsBlock.TotalDifficulty = td.String() } if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { + modified = true if d.shouldWriteTransactions { d.writeTransactions(ctx, block.Transactions(), tfs) } @@ -484,6 +487,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } if dsBlock.Uncles == nil && len(block.Uncles()) > 0 { + modified = true dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) for _, uncle := range block.Uncles() { d.writeBlockHeader(ctx, uncle, tfs, false) @@ -491,8 +495,12 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } } - _, err := tx.Put(key, &dsBlock) - return err + if modified { + _, err := tx.Put(key, &dsBlock) + return err + } + + return nil }, datastore.MaxAttempts(MaxAttempts)) if err != nil { @@ -549,30 +557,21 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { var block DatastoreBlock - _ = tx.Get(key, &block) - - // Create or replace header if it doesn't exist or if timestamp is earlier - if block.DatastoreHeader == nil || tfs.Before(block.DatastoreHeader.TimeFirstSeen) { - newHeader := d.newDatastoreHeader(header, tfs, isParent) - - // Preserve earlier global timestamps - if block.DatastoreHeader != nil && - !block.TimeFirstSeen.IsZero() && - block.TimeFirstSeen.Before(tfs) { - newHeader.TimeFirstSeen = block.TimeFirstSeen - newHeader.SensorFirstSeen = block.SensorFirstSeen - } - - // Set hash timing if it doesn't exist or if new timestamp is earlier - if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { - block.TimeFirstSeenHash = tfs - block.SensorFirstSeenHash = d.sensorID - } + err := tx.Get(key, &block) - block.DatastoreHeader = newHeader + // If block header already exists and new timestamp is not earlier, don't overwrite + if err == nil && block.DatastoreHeader != nil && !tfs.Before(block.TimeFirstSeen) { + return nil } - _, err := tx.Put(key, &block) + // Create new header with current timing + newHeader := d.newDatastoreHeader(header, tfs, isParent) + + // Set hash timing if it doesn't exist or if new timestamp is earlier + d.writeFirstSeen(&block, tfs) + + block.DatastoreHeader = newHeader + _, err = tx.Put(key, &block) return err }, datastore.MaxAttempts(MaxAttempts)) @@ -590,10 +589,10 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body") } - shouldWrite := false + modified := false if block.Transactions == nil && len(body.Transactions) > 0 { - shouldWrite = true + modified = true if d.shouldWriteTransactions { d.writeTransactions(ctx, body.Transactions, tfs) } @@ -605,7 +604,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has } if block.Uncles == nil && len(body.Uncles) > 0 { - shouldWrite = true + modified = true block.Uncles = make([]*datastore.Key, 0, len(body.Uncles)) for _, uncle := range body.Uncles { d.writeBlockHeader(ctx, uncle, tfs, false) @@ -613,7 +612,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has } } - if shouldWrite { + if modified { _, err := tx.Put(key, &block) return err } From 3d2c2b1380b44a24c41b4c6643d588855812fa0c Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 10 Mar 2026 23:17:29 -0400 Subject: [PATCH 08/14] fix: flag logic --- p2p/database/datastore.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index a4800b524..62fcdf262 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -235,7 +235,7 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash // first seen time if the block doesn't exist yet. If it exists, updates the // TimeFirstSeenHash if the new time is earlier. func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { - if d.client == nil || !d.ShouldWriteBlocks() { + if d.client == nil || (!d.ShouldWriteBlocks() && !d.shouldWriteFirstBlockEvent) { return } @@ -251,6 +251,10 @@ func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Nod d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh) } + if !d.shouldWriteBlocks { + return + } + key := datastore.NameKey(BlocksKind, hash.Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { From 7f2bf0166bd5b54c917f45db3a83c4ad9a1ee809 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Tue, 10 Mar 2026 23:52:11 -0400 Subject: [PATCH 09/14] fix: mutual exclusive flag --- cmd/p2p/sensor/sensor.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 102639efd..593e726d3 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -479,8 +479,7 @@ will result in less chance of missing data but can significantly increase memory f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, - "write one block event per block on first-seen (alternative to --write-block-events)") - SensorCmd.MarkFlagsMutuallyExclusive("write-block-events", "write-first-block-event") + "write one block event per block on first-seen (use with --write-block-events=false to reduce write load)") f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true, `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, From b9a32e231e57ea68952736ad0de83cf153473475 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 11 Mar 2026 00:01:29 -0400 Subject: [PATCH 10/14] docs: make gen --- doc/polycli_p2p_sensor.md | 1 + 1 file changed, 1 insertion(+) diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 0561950e4..bd5bfbae0 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -142,6 +142,7 @@ polycli p2p sensor amoy-nodes.json \ --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) + --write-first-block-event write one block event per block on first-seen (use with --write-block-events=false to reduce write load) --write-peers write peers to database (default true) --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) From f94e9afc3ef13aebad4baed96f5f0f81f5922a2f Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 11 Mar 2026 00:42:58 -0400 Subject: [PATCH 11/14] fix: missing if check bug --- p2p/database/datastore.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 62fcdf262..7374d0344 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -175,7 +175,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ return } - if d.ShouldWriteBlockEvents() { + if d.ShouldWriteBlockEvents() || d.shouldWriteFirstBlockEvent { d.runAsync(func() { d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs) }) From 8c331b6b9062919c39abc732eccc7b6647d76b37 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Wed, 11 Mar 2026 08:02:26 -0400 Subject: [PATCH 12/14] feat: add sensor ID to API --- cmd/p2p/sensor/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index 7d1062322..c9802742b 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -47,6 +47,7 @@ func newBlockInfo(header *types.Header) *blockInfo { // apiData represents all sensor information including node info and peer data. type apiData struct { + SensorID string `json:"sensor_id"` ENR string `json:"enr"` URL string `json:"enode"` PeerCount int `json:"peer_count"` @@ -105,6 +106,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { } data := apiData{ + SensorID: inputSensorParams.SensorID, ENR: server.NodeInfo().ENR, URL: server.Self().URLv4(), PeerCount: len(peers), From 339a47660cb1988097a5466addba4cbd44038b4b Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Thu, 12 Mar 2026 05:40:19 -0400 Subject: [PATCH 13/14] feat: fix race conditions --- cmd/p2p/sensor/sensor.go | 2 +- doc/polycli_p2p_sensor.md | 2 +- p2p/database/datastore.go | 7 ++++--- p2p/datastructures/lru.go | 11 +++++++---- p2p/protocol.go | 32 ++++++++++++++++++++++---------- 5 files changed, 35 insertions(+), 19 deletions(-) diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 593e726d3..f329e7b31 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -479,7 +479,7 @@ will result in less chance of missing data but can significantly increase memory f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, - "write one block event per block on first-seen (use with --write-block-events=false to reduce write load)") + "write one block event on first-seen only (requires --write-block-events=false)") f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true, `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index bd5bfbae0..57b64fbf3 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -142,7 +142,7 @@ polycli p2p sensor amoy-nodes.json \ --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) - --write-first-block-event write one block event per block on first-seen (use with --write-block-events=false to reduce write load) + --write-first-block-event write one block event on first-seen only (requires --write-block-events=false) --write-peers write peers to database (default true) --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 7374d0344..18093c819 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -175,7 +175,7 @@ func (d *Datastore) WriteBlock(ctx context.Context, peer *enode.Node, block *typ return } - if d.ShouldWriteBlockEvents() || d.shouldWriteFirstBlockEvent { + if d.ShouldWriteBlockEvents() { d.runAsync(func() { d.writeEvent(peer, BlockEventsKind, block.Hash(), BlocksKind, tfs) }) @@ -246,8 +246,9 @@ func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Nod // writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time. func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { - // Write block event if flag enabled (cache check in protocol.go already verified first-seen) - if d.shouldWriteFirstBlockEvent && peer != nil { + // Write block event if flag enabled and block events are disabled (mutually exclusive). + // Cache check in protocol.go already verified first-seen. + if d.shouldWriteFirstBlockEvent && !d.ShouldWriteBlockEvents() && peer != nil { d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh) } diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go index f39edd1ca..2fd2e0eec 100644 --- a/p2p/datastructures/lru.go +++ b/p2p/datastructures/lru.go @@ -191,9 +191,10 @@ func (c *LRU[K, V]) PeekManyWithKeys(keys []K) ([]K, []V) { // Update atomically updates a value in the cache using the provided update function. // The update function receives the current value (or zero value if not found) and -// returns the new value to store. This is thread-safe and prevents race conditions -// in get-modify-add patterns. -func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { +// returns the new value to store. Returns true if the key already existed (and was +// not expired), false if a new entry was created. This is thread-safe and prevents +// race conditions in get-modify-add patterns. +func (c *LRU[K, V]) Update(key K, updateFn func(V) V) (existed bool) { c.mu.Lock() defer c.mu.Unlock() @@ -213,7 +214,7 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { c.list.MoveToFront(elem) e.value = updateFn(currentVal) e.expiresAt = expiresAt - return + return true } // Entry expired, remove it c.list.Remove(elem) @@ -239,6 +240,8 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { delete(c.items, e.key) } } + + return false } // Remove removes a key from the cache and returns the value if it existed. diff --git a/p2p/protocol.go b/p2p/protocol.go index 1ee9f60d7..d8fab248c 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -532,8 +532,15 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { // Mark as known from this peer c.addKnownBlock(hash) - // Check what parts of the block we already have - cache, ok := c.conns.Blocks().Get(hash) + // Atomically check and add to cache to prevent duplicate writes from + // concurrent peers receiving the same block hash. + ok := c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + if cache != (BlockCache{}) { + return cache + } + return BlockCache{} + }) + if ok { continue } @@ -542,11 +549,10 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { c.db.WriteBlockHashFirstSeen(ctx, c.node, hash, tfs) // Request only the parts we don't have - if err := c.getBlockData(hash, cache, false); err != nil { + if err := c.getBlockData(hash, BlockCache{}, false); err != nil { return err } - c.conns.Blocks().Add(hash, BlockCache{}) uniqueHashes = append(uniqueHashes, hash) uniqueNumbers = append(uniqueNumbers, entry.Number) } @@ -1011,12 +1017,13 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { // Atomically check and add to cache to prevent duplicate writes from // concurrent peers receiving the same block. - update := false - c.conns.Blocks().Update(hash, func(existing BlockCache) BlockCache { - if existing.TD != nil { - return existing // Already have full block + var exists bool + ok := c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + if cache.TD != nil { + exists = true + return cache } - update = true + return BlockCache{ Header: packet.Block.Header(), Body: ð.BlockBody{ @@ -1028,10 +1035,15 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { } }) - if !update { + if exists { return nil } + // Write first-seen event for blocks arriving directly (not announced via hash first) + if !ok { + c.db.WriteBlockHashFirstSeen(ctx, c.node, hash, tfs) + } + c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs) // Broadcast block or block hash to other peers asynchronously From 1b9a9f40249ba63c2d9cf9718120644322a6e007 Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Thu, 12 Mar 2026 06:17:11 -0400 Subject: [PATCH 14/14] fix: comment --- p2p/database/datastore.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 18093c819..6b0753b61 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -468,7 +468,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // Create new header with current timing header := d.newDatastoreHeader(block.Header(), tfs, false) - // Set hash timing if it doesn't exist or if new timestamp is earlier + // Preserve earliest first-seen timestamp d.writeFirstSeen(&dsBlock, tfs) dsBlock.DatastoreHeader = header @@ -572,7 +572,7 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, // Create new header with current timing newHeader := d.newDatastoreHeader(header, tfs, isParent) - // Set hash timing if it doesn't exist or if new timestamp is earlier + // Preserve earliest first-seen timestamp d.writeFirstSeen(&block, tfs) block.DatastoreHeader = newHeader