diff --git a/cmd/p2p/sensor/api.go b/cmd/p2p/sensor/api.go index fb68eb20..c9802742 100644 --- a/cmd/p2p/sensor/api.go +++ b/cmd/p2p/sensor/api.go @@ -47,6 +47,7 @@ func newBlockInfo(header *types.Header) *blockInfo { // apiData represents all sensor information including node info and peer data. type apiData struct { + SensorID string `json:"sensor_id"` ENR string `json:"enr"` URL string `json:"enode"` PeerCount int `json:"peer_count"` @@ -105,6 +106,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { } data := apiData{ + SensorID: inputSensorParams.SensorID, ENR: server.NodeInfo().ENR, URL: server.Self().URLv4(), PeerCount: len(peers), @@ -123,4 +125,3 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) { log.Error().Err(err).Msg("Failed to start API handler") } } - diff --git a/cmd/p2p/sensor/sensor.go b/cmd/p2p/sensor/sensor.go index 5daa5f39..f329e7b3 100644 --- a/cmd/p2p/sensor/sensor.go +++ b/cmd/p2p/sensor/sensor.go @@ -29,10 +29,10 @@ import ( "github.com/rs/zerolog/log" "github.com/spf13/cobra" - ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/flag" "github.com/0xPolygon/polygon-cli/p2p" "github.com/0xPolygon/polygon-cli/p2p/database" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/rpctypes" ) @@ -50,6 +50,7 @@ type ( MaxDatabaseConcurrency int ShouldWriteBlocks bool ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool @@ -438,6 +439,7 @@ func newDatabase(ctx context.Context) (database.Database, error) { MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency, ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks, ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents, + ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent, ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions, ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents, ShouldWritePeers: inputSensorParams.ShouldWritePeers, @@ -476,6 +478,8 @@ func init() { will result in less chance of missing data but can significantly increase memory usage)`) f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database") f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database") + f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false, + "write one block event on first-seen only (requires --write-block-events=false)") f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true, `write transactions to database (this option can significantly increase CPU and memory usage)`) f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true, diff --git a/cmd/root.go b/cmd/root.go index b6515cc6..493c024d 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -29,12 +29,12 @@ import ( "github.com/0xPolygon/polygon-cli/cmd/p2p" "github.com/0xPolygon/polygon-cli/cmd/parsebatchl2data" "github.com/0xPolygon/polygon-cli/cmd/parseethwallet" + "github.com/0xPolygon/polygon-cli/cmd/plot" "github.com/0xPolygon/polygon-cli/cmd/publish" "github.com/0xPolygon/polygon-cli/cmd/report" "github.com/0xPolygon/polygon-cli/cmd/retest" "github.com/0xPolygon/polygon-cli/cmd/rpcfuzz" "github.com/0xPolygon/polygon-cli/cmd/signer" - "github.com/0xPolygon/polygon-cli/cmd/plot" "github.com/0xPolygon/polygon-cli/cmd/ulxly" "github.com/0xPolygon/polygon-cli/cmd/version" "github.com/0xPolygon/polygon-cli/cmd/wallet" diff --git a/doc/polycli_p2p_sensor.md b/doc/polycli_p2p_sensor.md index 0561950e..57b64fbf 100644 --- a/doc/polycli_p2p_sensor.md +++ b/doc/polycli_p2p_sensor.md @@ -142,6 +142,7 @@ polycli p2p sensor amoy-nodes.json \ --txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s) --write-block-events write block events to database (default true) -B, --write-blocks write blocks to database (default true) + --write-first-block-event write one block event on first-seen only (requires --write-block-events=false) --write-peers write peers to database (default true) --write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true) -t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true) diff --git a/loadtest/gasmanager/wave.go b/loadtest/gasmanager/wave.go index 9cee2db3..6d4bfe44 100644 --- a/loadtest/gasmanager/wave.go +++ b/loadtest/gasmanager/wave.go @@ -70,11 +70,11 @@ func (w *wave) MoveNext() { } } -func (w *wave) Y() float64 { return w.points[w.x] } -func (w *wave) X() float64 { return w.x } -func (w *wave) Period() uint64 { return w.config.Period } +func (w *wave) Y() float64 { return w.points[w.x] } +func (w *wave) X() float64 { return w.x } +func (w *wave) Period() uint64 { return w.config.Period } func (w *wave) Amplitude() uint64 { return w.config.Amplitude } -func (w *wave) Target() uint64 { return w.config.Target } +func (w *wave) Target() uint64 { return w.config.Target } // Wave computation functions diff --git a/p2p/database/database.go b/p2p/database/database.go index b6fc60d0..dc1cd72d 100644 --- a/p2p/database/database.go +++ b/p2p/database/database.go @@ -30,7 +30,7 @@ type Database interface { // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time if the block doesn't exist yet. - WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time) + WriteBlockHashFirstSeen(context.Context, *enode.Node, common.Hash, time.Time) // WriteBlockBody will write the block bodies if ShouldWriteBlocks returns // true. @@ -51,6 +51,7 @@ type Database interface { MaxConcurrentWrites() int ShouldWriteBlocks() bool ShouldWriteBlockEvents() bool + ShouldWriteFirstBlockEvent() bool ShouldWriteTransactions() bool ShouldWriteTransactionEvents() bool ShouldWritePeers() bool diff --git a/p2p/database/datastore.go b/p2p/database/datastore.go index 363d888e..6b0753b6 100644 --- a/p2p/database/datastore.go +++ b/p2p/database/datastore.go @@ -35,6 +35,7 @@ type Datastore struct { maxConcurrency int shouldWriteBlocks bool shouldWriteBlockEvents bool + shouldWriteFirstBlockEvent bool shouldWriteTransactions bool shouldWriteTransactionEvents bool shouldWritePeers bool @@ -127,6 +128,7 @@ type DatastoreOptions struct { MaxConcurrency int ShouldWriteBlocks bool ShouldWriteBlockEvents bool + ShouldWriteFirstBlockEvent bool ShouldWriteTransactions bool ShouldWriteTransactionEvents bool ShouldWritePeers bool @@ -148,6 +150,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database { maxConcurrency: opts.MaxConcurrency, shouldWriteBlocks: opts.ShouldWriteBlocks, shouldWriteBlockEvents: opts.ShouldWriteBlockEvents, + shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent, shouldWriteTransactions: opts.ShouldWriteTransactions, shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents, shouldWritePeers: opts.ShouldWritePeers, @@ -231,18 +234,28 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time if the block doesn't exist yet. If it exists, updates the // TimeFirstSeenHash if the new time is earlier. -func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { - if d.client == nil || !d.ShouldWriteBlocks() { +func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { + if d.client == nil || (!d.ShouldWriteBlocks() && !d.shouldWriteFirstBlockEvent) { return } d.runAsync(func() { - d.writeBlockHashFirstSeen(ctx, hash, tfsh) + d.writeBlockHashFirstSeen(ctx, peer, hash, tfsh) }) } // writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time. -func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { + // Write block event if flag enabled and block events are disabled (mutually exclusive). + // Cache check in protocol.go already verified first-seen. + if d.shouldWriteFirstBlockEvent && !d.ShouldWriteBlockEvents() && peer != nil { + d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh) + } + + if !d.shouldWriteBlocks { + return + } + key := datastore.NameKey(BlocksKind, hash.Hex(), nil) _, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error { @@ -390,17 +403,8 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa } } -// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps. -func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) { - // Preserve earlier header timing if it exists - if block.DatastoreHeader != nil && - !block.TimeFirstSeen.IsZero() && - block.TimeFirstSeen.Before(tfs) { - header.TimeFirstSeen = block.TimeFirstSeen - header.SensorFirstSeen = block.SensorFirstSeen - } - - // Set hash timing if it doesn't exist or if new timestamp is earlier +// writeFirstSeen sets hash timing if it doesn't exist or if new timestamp is earlier. +func (d *Datastore) writeFirstSeen(block *DatastoreBlock, tfs time.Time) { if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) { block.TimeFirstSeenHash = tfs block.SensorFirstSeenHash = d.sensorID @@ -456,27 +460,27 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. // are nil we will just set them. _ = tx.Get(key, &dsBlock) - shouldWrite := false + modified := false if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) { - shouldWrite = true + modified = true // Create new header with current timing header := d.newDatastoreHeader(block.Header(), tfs, false) - // Preserve earlier timestamps from any earlier announcement - d.writeFirstSeen(header, &dsBlock, tfs) + // Preserve earliest first-seen timestamp + d.writeFirstSeen(&dsBlock, tfs) dsBlock.DatastoreHeader = header } if len(dsBlock.TotalDifficulty) == 0 { - shouldWrite = true + modified = true dsBlock.TotalDifficulty = td.String() } if dsBlock.Transactions == nil && len(block.Transactions()) > 0 { - shouldWrite = true + modified = true if d.shouldWriteTransactions { d.writeTransactions(ctx, block.Transactions(), tfs) } @@ -488,7 +492,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } if dsBlock.Uncles == nil && len(block.Uncles()) > 0 { - shouldWrite = true + modified = true dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles())) for _, uncle := range block.Uncles() { d.writeBlockHeader(ctx, uncle, tfs, false) @@ -496,7 +500,7 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big. } } - if shouldWrite { + if modified { _, err := tx.Put(key, &dsBlock) return err } @@ -568,8 +572,8 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header, // Create new header with current timing newHeader := d.newDatastoreHeader(header, tfs, isParent) - // Preserve earlier timestamps from any earlier announcement or full block - d.writeFirstSeen(newHeader, &block, tfs) + // Preserve earliest first-seen timestamp + d.writeFirstSeen(&block, tfs) block.DatastoreHeader = newHeader _, err = tx.Put(key, &block) @@ -590,10 +594,10 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body") } - shouldWrite := false + modified := false if block.Transactions == nil && len(body.Transactions) > 0 { - shouldWrite = true + modified = true if d.shouldWriteTransactions { d.writeTransactions(ctx, body.Transactions, tfs) } @@ -605,7 +609,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has } if block.Uncles == nil && len(body.Uncles) > 0 { - shouldWrite = true + modified = true block.Uncles = make([]*datastore.Key, 0, len(body.Uncles)) for _, uncle := range body.Uncles { d.writeBlockHeader(ctx, uncle, tfs, false) @@ -613,7 +617,7 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has } } - if shouldWrite { + if modified { _, err := tx.Put(key, &block) return err } @@ -642,6 +646,10 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact } } +func (d *Datastore) ShouldWriteFirstBlockEvent() bool { + return d.shouldWriteFirstBlockEvent +} + func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) { query := datastore.NewQuery(BlockEventsKind).Order("-Time") iter := d.client.Run(ctx, query) diff --git a/p2p/database/json.go b/p2p/database/json.go index 8d5c0e14..a6290241 100644 --- a/p2p/database/json.go +++ b/p2p/database/json.go @@ -238,7 +238,7 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h // WriteBlockHashFirstSeen writes a partial block entry with just the hash // first seen time. For JSON output, this writes a separate record type. -func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { if !j.ShouldWriteBlocks() { return } @@ -374,6 +374,11 @@ func (j *JSONDatabase) ShouldWriteBlockEvents() bool { return j.shouldWriteBlockEvents } +// ShouldWriteFirstBlockEvent returns false for JSON database. +func (j *JSONDatabase) ShouldWriteFirstBlockEvent() bool { + return false +} + // ShouldWriteTransactions returns the configured value. func (j *JSONDatabase) ShouldWriteTransactions() bool { return j.shouldWriteTransactions diff --git a/p2p/database/nodb.go b/p2p/database/nodb.go index b6223ba2..1bc00398 100644 --- a/p2p/database/nodb.go +++ b/p2p/database/nodb.go @@ -34,7 +34,7 @@ func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes [] } // WriteBlockHashFirstSeen does nothing. -func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) { +func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) { } // WriteBlockBody does nothing. @@ -69,6 +69,11 @@ func (n *nodb) ShouldWriteBlockEvents() bool { return false } +// ShouldWriteFirstBlockEvent returns false. +func (n *nodb) ShouldWriteFirstBlockEvent() bool { + return false +} + // ShouldWriteTransactions returns false. func (n *nodb) ShouldWriteTransactions() bool { return false diff --git a/p2p/datastructures/bloomset_test.go b/p2p/datastructures/bloomset_test.go index 246b29ad..f76fa51c 100644 --- a/p2p/datastructures/bloomset_test.go +++ b/p2p/datastructures/bloomset_test.go @@ -200,4 +200,3 @@ func BenchmarkBloomSetFilterNotContained(b *testing.B) { bloom.FilterNotContained(batch) } } - diff --git a/p2p/datastructures/lru.go b/p2p/datastructures/lru.go index f39edd1c..2fd2e0ee 100644 --- a/p2p/datastructures/lru.go +++ b/p2p/datastructures/lru.go @@ -191,9 +191,10 @@ func (c *LRU[K, V]) PeekManyWithKeys(keys []K) ([]K, []V) { // Update atomically updates a value in the cache using the provided update function. // The update function receives the current value (or zero value if not found) and -// returns the new value to store. This is thread-safe and prevents race conditions -// in get-modify-add patterns. -func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { +// returns the new value to store. Returns true if the key already existed (and was +// not expired), false if a new entry was created. This is thread-safe and prevents +// race conditions in get-modify-add patterns. +func (c *LRU[K, V]) Update(key K, updateFn func(V) V) (existed bool) { c.mu.Lock() defer c.mu.Unlock() @@ -213,7 +214,7 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { c.list.MoveToFront(elem) e.value = updateFn(currentVal) e.expiresAt = expiresAt - return + return true } // Entry expired, remove it c.list.Remove(elem) @@ -239,6 +240,8 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) { delete(c.items, e.key) } } + + return false } // Remove removes a key from the cache and returns the value if it existed. diff --git a/p2p/protocol.go b/p2p/protocol.go index 7721d037..d8fab248 100644 --- a/p2p/protocol.go +++ b/p2p/protocol.go @@ -20,8 +20,8 @@ import ( "github.com/rs/zerolog" "github.com/rs/zerolog/log" - ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" "github.com/0xPolygon/polygon-cli/p2p/database" + ds "github.com/0xPolygon/polygon-cli/p2p/datastructures" ) const ( @@ -532,21 +532,27 @@ func (c *conn) handleNewBlockHashes(ctx context.Context, msg ethp2p.Msg) error { // Mark as known from this peer c.addKnownBlock(hash) - // Check what parts of the block we already have - cache, ok := c.conns.Blocks().Get(hash) + // Atomically check and add to cache to prevent duplicate writes from + // concurrent peers receiving the same block hash. + ok := c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + if cache != (BlockCache{}) { + return cache + } + return BlockCache{} + }) + if ok { continue } // Write hash first seen time immediately for new blocks - c.db.WriteBlockHashFirstSeen(ctx, hash, tfs) + c.db.WriteBlockHashFirstSeen(ctx, c.node, hash, tfs) // Request only the parts we don't have - if err := c.getBlockData(hash, cache, false); err != nil { + if err := c.getBlockData(hash, BlockCache{}, false); err != nil { return err } - c.conns.Blocks().Add(hash, BlockCache{}) uniqueHashes = append(uniqueHashes, hash) uniqueNumbers = append(uniqueNumbers, entry.Number) } @@ -1009,23 +1015,36 @@ func (c *conn) handleNewBlock(ctx context.Context, msg ethp2p.Msg) error { return err } - // Check if we already have the full block in the cache - if cache, ok := c.conns.Blocks().Peek(hash); ok && cache.TD != nil { + // Atomically check and add to cache to prevent duplicate writes from + // concurrent peers receiving the same block. + var exists bool + ok := c.conns.Blocks().Update(hash, func(cache BlockCache) BlockCache { + if cache.TD != nil { + exists = true + return cache + } + + return BlockCache{ + Header: packet.Block.Header(), + Body: ð.BlockBody{ + Transactions: packet.Block.Transactions(), + Uncles: packet.Block.Uncles(), + Withdrawals: packet.Block.Withdrawals(), + }, + TD: packet.TD, + } + }) + + if exists { return nil } - c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs) + // Write first-seen event for blocks arriving directly (not announced via hash first) + if !ok { + c.db.WriteBlockHashFirstSeen(ctx, c.node, hash, tfs) + } - // Update cache to store the full block - c.conns.Blocks().Add(hash, BlockCache{ - Header: packet.Block.Header(), - Body: ð.BlockBody{ - Transactions: packet.Block.Transactions(), - Uncles: packet.Block.Uncles(), - Withdrawals: packet.Block.Withdrawals(), - }, - TD: packet.TD, - }) + c.db.WriteBlock(ctx, c.node, packet.Block, packet.TD, tfs) // Broadcast block or block hash to other peers asynchronously go c.conns.BroadcastBlock(packet.Block, packet.TD) diff --git a/p2p/types.go b/p2p/types.go index 56b25f16..f1972e6e 100644 --- a/p2p/types.go +++ b/p2p/types.go @@ -448,4 +448,3 @@ type rawPooledTransactionsPacket struct { RequestId uint64 Txs []rlp.RawValue } -