Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/p2p/sensor/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func newBlockInfo(header *types.Header) *blockInfo {

// apiData represents all sensor information including node info and peer data.
type apiData struct {
SensorID string `json:"sensor_id"`
ENR string `json:"enr"`
URL string `json:"enode"`
PeerCount int `json:"peer_count"`
Expand Down Expand Up @@ -105,6 +106,7 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
}

data := apiData{
SensorID: inputSensorParams.SensorID,
ENR: server.NodeInfo().ENR,
URL: server.Self().URLv4(),
PeerCount: len(peers),
Expand All @@ -123,4 +125,3 @@ func handleAPI(server *ethp2p.Server, conns *p2p.Conns) {
log.Error().Err(err).Msg("Failed to start API handler")
}
}

6 changes: 5 additions & 1 deletion cmd/p2p/sensor/sensor.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ import (
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"

ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
"github.com/0xPolygon/polygon-cli/flag"
"github.com/0xPolygon/polygon-cli/p2p"
"github.com/0xPolygon/polygon-cli/p2p/database"
ds "github.com/0xPolygon/polygon-cli/p2p/datastructures"
"github.com/0xPolygon/polygon-cli/rpctypes"
)

Expand All @@ -50,6 +50,7 @@ type (
MaxDatabaseConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
Expand Down Expand Up @@ -438,6 +439,7 @@ func newDatabase(ctx context.Context) (database.Database, error) {
MaxConcurrency: inputSensorParams.MaxDatabaseConcurrency,
ShouldWriteBlocks: inputSensorParams.ShouldWriteBlocks,
ShouldWriteBlockEvents: inputSensorParams.ShouldWriteBlockEvents,
ShouldWriteFirstBlockEvent: inputSensorParams.ShouldWriteFirstBlockEvent,
ShouldWriteTransactions: inputSensorParams.ShouldWriteTransactions,
ShouldWriteTransactionEvents: inputSensorParams.ShouldWriteTransactionEvents,
ShouldWritePeers: inputSensorParams.ShouldWritePeers,
Expand Down Expand Up @@ -476,6 +478,8 @@ func init() {
will result in less chance of missing data but can significantly increase memory usage)`)
f.BoolVarP(&inputSensorParams.ShouldWriteBlocks, "write-blocks", "B", true, "write blocks to database")
f.BoolVar(&inputSensorParams.ShouldWriteBlockEvents, "write-block-events", true, "write block events to database")
f.BoolVar(&inputSensorParams.ShouldWriteFirstBlockEvent, "write-first-block-event", false,
"write one block event on first-seen only (requires --write-block-events=false)")
f.BoolVarP(&inputSensorParams.ShouldWriteTransactions, "write-txs", "t", true,
`write transactions to database (this option can significantly increase CPU and memory usage)`)
f.BoolVar(&inputSensorParams.ShouldWriteTransactionEvents, "write-tx-events", true,
Expand Down
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ import (
"github.com/0xPolygon/polygon-cli/cmd/p2p"
"github.com/0xPolygon/polygon-cli/cmd/parsebatchl2data"
"github.com/0xPolygon/polygon-cli/cmd/parseethwallet"
"github.com/0xPolygon/polygon-cli/cmd/plot"
"github.com/0xPolygon/polygon-cli/cmd/publish"
"github.com/0xPolygon/polygon-cli/cmd/report"
"github.com/0xPolygon/polygon-cli/cmd/retest"
"github.com/0xPolygon/polygon-cli/cmd/rpcfuzz"
"github.com/0xPolygon/polygon-cli/cmd/signer"
"github.com/0xPolygon/polygon-cli/cmd/plot"
"github.com/0xPolygon/polygon-cli/cmd/ulxly"
"github.com/0xPolygon/polygon-cli/cmd/version"
"github.com/0xPolygon/polygon-cli/cmd/wallet"
Expand Down
1 change: 1 addition & 0 deletions doc/polycli_p2p_sensor.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ polycli p2p sensor amoy-nodes.json \
--txs-cache-ttl duration time to live for transaction cache entries (0 for no expiration) (default 10m0s)
--write-block-events write block events to database (default true)
-B, --write-blocks write blocks to database (default true)
--write-first-block-event write one block event on first-seen only (requires --write-block-events=false)
--write-peers write peers to database (default true)
--write-tx-events write transaction events to database (this option can significantly increase CPU and memory usage) (default true)
-t, --write-txs write transactions to database (this option can significantly increase CPU and memory usage) (default true)
Expand Down
8 changes: 4 additions & 4 deletions loadtest/gasmanager/wave.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ func (w *wave) MoveNext() {
}
}

func (w *wave) Y() float64 { return w.points[w.x] }
func (w *wave) X() float64 { return w.x }
func (w *wave) Period() uint64 { return w.config.Period }
func (w *wave) Y() float64 { return w.points[w.x] }
func (w *wave) X() float64 { return w.x }
func (w *wave) Period() uint64 { return w.config.Period }
func (w *wave) Amplitude() uint64 { return w.config.Amplitude }
func (w *wave) Target() uint64 { return w.config.Target }
func (w *wave) Target() uint64 { return w.config.Target }

// Wave computation functions

Expand Down
3 changes: 2 additions & 1 deletion p2p/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type Database interface {

// WriteBlockHashFirstSeen writes a partial block entry with just the hash
// first seen time if the block doesn't exist yet.
WriteBlockHashFirstSeen(context.Context, common.Hash, time.Time)
WriteBlockHashFirstSeen(context.Context, *enode.Node, common.Hash, time.Time)

// WriteBlockBody will write the block bodies if ShouldWriteBlocks returns
// true.
Expand All @@ -51,6 +51,7 @@ type Database interface {
MaxConcurrentWrites() int
ShouldWriteBlocks() bool
ShouldWriteBlockEvents() bool
ShouldWriteFirstBlockEvent() bool
ShouldWriteTransactions() bool
ShouldWriteTransactionEvents() bool
ShouldWritePeers() bool
Expand Down
66 changes: 37 additions & 29 deletions p2p/database/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type Datastore struct {
maxConcurrency int
shouldWriteBlocks bool
shouldWriteBlockEvents bool
shouldWriteFirstBlockEvent bool
shouldWriteTransactions bool
shouldWriteTransactionEvents bool
shouldWritePeers bool
Expand Down Expand Up @@ -127,6 +128,7 @@ type DatastoreOptions struct {
MaxConcurrency int
ShouldWriteBlocks bool
ShouldWriteBlockEvents bool
ShouldWriteFirstBlockEvent bool
ShouldWriteTransactions bool
ShouldWriteTransactionEvents bool
ShouldWritePeers bool
Expand All @@ -148,6 +150,7 @@ func NewDatastore(ctx context.Context, opts DatastoreOptions) Database {
maxConcurrency: opts.MaxConcurrency,
shouldWriteBlocks: opts.ShouldWriteBlocks,
shouldWriteBlockEvents: opts.ShouldWriteBlockEvents,
shouldWriteFirstBlockEvent: opts.ShouldWriteFirstBlockEvent,
shouldWriteTransactions: opts.ShouldWriteTransactions,
shouldWriteTransactionEvents: opts.ShouldWriteTransactionEvents,
shouldWritePeers: opts.ShouldWritePeers,
Expand Down Expand Up @@ -231,18 +234,28 @@ func (d *Datastore) WriteBlockHashes(ctx context.Context, peer *enode.Node, hash
// WriteBlockHashFirstSeen writes a partial block entry with just the hash
// first seen time if the block doesn't exist yet. If it exists, updates the
// TimeFirstSeenHash if the new time is earlier.
func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
if d.client == nil || !d.ShouldWriteBlocks() {
func (d *Datastore) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
if d.client == nil || (!d.ShouldWriteBlocks() && !d.shouldWriteFirstBlockEvent) {
return
}

d.runAsync(func() {
d.writeBlockHashFirstSeen(ctx, hash, tfsh)
d.writeBlockHashFirstSeen(ctx, peer, hash, tfsh)
})
}

// writeBlockHashFirstSeen performs the actual transaction to write or update the block hash first seen time.
func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
func (d *Datastore) writeBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
// Write block event if flag enabled and block events are disabled (mutually exclusive).
// Cache check in protocol.go already verified first-seen.
if d.shouldWriteFirstBlockEvent && !d.ShouldWriteBlockEvents() && peer != nil {
d.writeEvent(peer, BlockEventsKind, hash, BlocksKind, tfsh)
}

if !d.shouldWriteBlocks {
return
}

key := datastore.NameKey(BlocksKind, hash.Hex(), nil)

_, err := d.client.RunInTransaction(ctx, func(tx *datastore.Transaction) error {
Expand Down Expand Up @@ -390,17 +403,8 @@ func (d *Datastore) newDatastoreHeader(header *types.Header, tfs time.Time, isPa
}
}

// writeFirstSeen updates timing fields on a header and block, preserving earlier timestamps.
func (d *Datastore) writeFirstSeen(header *DatastoreHeader, block *DatastoreBlock, tfs time.Time) {
// Preserve earlier header timing if it exists
if block.DatastoreHeader != nil &&
!block.TimeFirstSeen.IsZero() &&
block.TimeFirstSeen.Before(tfs) {
header.TimeFirstSeen = block.TimeFirstSeen
header.SensorFirstSeen = block.SensorFirstSeen
}

// Set hash timing if it doesn't exist or if new timestamp is earlier
// writeFirstSeen sets hash timing if it doesn't exist or if new timestamp is earlier.
func (d *Datastore) writeFirstSeen(block *DatastoreBlock, tfs time.Time) {
if block.TimeFirstSeenHash.IsZero() || tfs.Before(block.TimeFirstSeenHash) {
block.TimeFirstSeenHash = tfs
block.SensorFirstSeenHash = d.sensorID
Expand Down Expand Up @@ -456,27 +460,27 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
// are nil we will just set them.
_ = tx.Get(key, &dsBlock)

shouldWrite := false
modified := false

if dsBlock.DatastoreHeader == nil || tfs.Before(dsBlock.TimeFirstSeen) {
shouldWrite = true
modified = true

// Create new header with current timing
header := d.newDatastoreHeader(block.Header(), tfs, false)

// Preserve earlier timestamps from any earlier announcement
d.writeFirstSeen(header, &dsBlock, tfs)
// Preserve earliest first-seen timestamp
d.writeFirstSeen(&dsBlock, tfs)

dsBlock.DatastoreHeader = header
}

if len(dsBlock.TotalDifficulty) == 0 {
shouldWrite = true
modified = true
dsBlock.TotalDifficulty = td.String()
}

if dsBlock.Transactions == nil && len(block.Transactions()) > 0 {
shouldWrite = true
modified = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, block.Transactions(), tfs)
}
Expand All @@ -488,15 +492,15 @@ func (d *Datastore) writeBlock(ctx context.Context, block *types.Block, td *big.
}

if dsBlock.Uncles == nil && len(block.Uncles()) > 0 {
shouldWrite = true
modified = true
dsBlock.Uncles = make([]*datastore.Key, 0, len(block.Uncles()))
for _, uncle := range block.Uncles() {
d.writeBlockHeader(ctx, uncle, tfs, false)
dsBlock.Uncles = append(dsBlock.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}

if shouldWrite {
if modified {
_, err := tx.Put(key, &dsBlock)
return err
}
Expand Down Expand Up @@ -568,8 +572,8 @@ func (d *Datastore) writeBlockHeader(ctx context.Context, header *types.Header,
// Create new header with current timing
newHeader := d.newDatastoreHeader(header, tfs, isParent)

// Preserve earlier timestamps from any earlier announcement or full block
d.writeFirstSeen(newHeader, &block, tfs)
// Preserve earliest first-seen timestamp
d.writeFirstSeen(&block, tfs)

block.DatastoreHeader = newHeader
_, err = tx.Put(key, &block)
Expand All @@ -590,10 +594,10 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
log.Debug().Err(err).Str("hash", hash.Hex()).Msg("Failed to fetch block when writing block body")
}

shouldWrite := false
modified := false

if block.Transactions == nil && len(body.Transactions) > 0 {
shouldWrite = true
modified = true
if d.shouldWriteTransactions {
d.writeTransactions(ctx, body.Transactions, tfs)
}
Expand All @@ -605,15 +609,15 @@ func (d *Datastore) writeBlockBody(ctx context.Context, body *eth.BlockBody, has
}

if block.Uncles == nil && len(body.Uncles) > 0 {
shouldWrite = true
modified = true
block.Uncles = make([]*datastore.Key, 0, len(body.Uncles))
for _, uncle := range body.Uncles {
d.writeBlockHeader(ctx, uncle, tfs, false)
block.Uncles = append(block.Uncles, datastore.NameKey(BlocksKind, uncle.Hash().Hex(), nil))
}
}

if shouldWrite {
if modified {
_, err := tx.Put(key, &block)
return err
}
Expand Down Expand Up @@ -642,6 +646,10 @@ func (d *Datastore) writeTransactions(ctx context.Context, txs []*types.Transact
}
}

func (d *Datastore) ShouldWriteFirstBlockEvent() bool {
return d.shouldWriteFirstBlockEvent
}

func (d *Datastore) NodeList(ctx context.Context, limit int) ([]string, error) {
query := datastore.NewQuery(BlockEventsKind).Order("-Time")
iter := d.client.Run(ctx, query)
Expand Down
7 changes: 6 additions & 1 deletion p2p/database/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (j *JSONDatabase) WriteBlockHashes(ctx context.Context, peer *enode.Node, h

// WriteBlockHashFirstSeen writes a partial block entry with just the hash
// first seen time. For JSON output, this writes a separate record type.
func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
func (j *JSONDatabase) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
if !j.ShouldWriteBlocks() {
return
}
Expand Down Expand Up @@ -374,6 +374,11 @@ func (j *JSONDatabase) ShouldWriteBlockEvents() bool {
return j.shouldWriteBlockEvents
}

// ShouldWriteFirstBlockEvent returns false for JSON database.
func (j *JSONDatabase) ShouldWriteFirstBlockEvent() bool {
return false
}

// ShouldWriteTransactions returns the configured value.
func (j *JSONDatabase) ShouldWriteTransactions() bool {
return j.shouldWriteTransactions
Expand Down
7 changes: 6 additions & 1 deletion p2p/database/nodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (n *nodb) WriteBlockHashes(ctx context.Context, peer *enode.Node, hashes []
}

// WriteBlockHashFirstSeen does nothing.
func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, hash common.Hash, tfsh time.Time) {
func (n *nodb) WriteBlockHashFirstSeen(ctx context.Context, peer *enode.Node, hash common.Hash, tfsh time.Time) {
}

// WriteBlockBody does nothing.
Expand Down Expand Up @@ -69,6 +69,11 @@ func (n *nodb) ShouldWriteBlockEvents() bool {
return false
}

// ShouldWriteFirstBlockEvent returns false.
func (n *nodb) ShouldWriteFirstBlockEvent() bool {
return false
}

// ShouldWriteTransactions returns false.
func (n *nodb) ShouldWriteTransactions() bool {
return false
Expand Down
1 change: 0 additions & 1 deletion p2p/datastructures/bloomset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,4 +200,3 @@ func BenchmarkBloomSetFilterNotContained(b *testing.B) {
bloom.FilterNotContained(batch)
}
}

11 changes: 7 additions & 4 deletions p2p/datastructures/lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,10 @@ func (c *LRU[K, V]) PeekManyWithKeys(keys []K) ([]K, []V) {

// Update atomically updates a value in the cache using the provided update function.
// The update function receives the current value (or zero value if not found) and
// returns the new value to store. This is thread-safe and prevents race conditions
// in get-modify-add patterns.
func (c *LRU[K, V]) Update(key K, updateFn func(V) V) {
// returns the new value to store. Returns true if the key already existed (and was
// not expired), false if a new entry was created. This is thread-safe and prevents
// race conditions in get-modify-add patterns.
func (c *LRU[K, V]) Update(key K, updateFn func(V) V) (existed bool) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -213,7 +214,7 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) {
c.list.MoveToFront(elem)
e.value = updateFn(currentVal)
e.expiresAt = expiresAt
return
return true
}
// Entry expired, remove it
c.list.Remove(elem)
Expand All @@ -239,6 +240,8 @@ func (c *LRU[K, V]) Update(key K, updateFn func(V) V) {
delete(c.items, e.key)
}
}

return false
}

// Remove removes a key from the cache and returns the value if it existed.
Expand Down
Loading
Loading