Skip to content

Commit a23c25c

Browse files
committed
move update limiter to listener
1 parent d229046 commit a23c25c

2 files changed

Lines changed: 60 additions & 56 deletions

File tree

pkg/database/execution_crawler.go

Lines changed: 0 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -399,58 +399,6 @@ func (db *DB) InsertBlocks(
399399
return nil
400400
}
401401

402-
type UpdateLimiter struct {
403-
m map[string]time.Time
404-
ttl time.Duration
405-
lock sync.Mutex
406-
}
407-
408-
func NewUpdateLimiter(ttl time.Duration) *UpdateLimiter {
409-
limiter := &UpdateLimiter{
410-
m: map[string]time.Time{},
411-
ttl: ttl,
412-
lock: sync.Mutex{},
413-
}
414-
415-
go limiter.runCleaner()
416-
417-
return limiter
418-
}
419-
420-
func (l *UpdateLimiter) runCleaner() {
421-
for {
422-
time.Sleep(time.Minute)
423-
424-
l.lock.Lock()
425-
426-
for key, ts := range l.m {
427-
if time.Since(ts) > l.ttl {
428-
delete(l.m, key)
429-
}
430-
}
431-
432-
l.lock.Unlock()
433-
}
434-
}
435-
436-
func (l *UpdateLimiter) IsLimited(node common.NodeJSON) bool {
437-
l.lock.Lock()
438-
defer l.lock.Unlock()
439-
440-
id := node.IDString()
441-
442-
t, found := l.m[id]
443-
if found && time.Since(t) < l.ttl {
444-
return true
445-
}
446-
447-
l.m[id] = time.Now()
448-
449-
return false
450-
}
451-
452-
var limiter = NewUpdateLimiter(3 * time.Hour)
453-
454402
func (db *DB) UpsertCrawledNode(ctx context.Context, tx pgx.Tx, node common.NodeJSON) error {
455403
defer metrics.NodeUpdateInc(node.Direction.String(), node.Error)
456404

@@ -459,10 +407,6 @@ func (db *DB) UpsertCrawledNode(ctx context.Context, tx pgx.Tx, node common.Node
459407
return nil
460408
}
461409

462-
if limiter.IsLimited(node) {
463-
return nil
464-
}
465-
466410
if !node.EthNode {
467411
err := db.UpdateNotEthNode(ctx, tx, node)
468412
if err != nil {

pkg/execution/listener/listener.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,15 @@ package listener
33
import (
44
"context"
55
"crypto/ecdsa"
6+
"encoding/hex"
67
"fmt"
78
"net"
89
"sync"
910
"time"
1011

1112
"log/slog"
1213

14+
ethp2p "github.com/ethereum/go-ethereum/p2p"
1315
"github.com/ethereum/go-ethereum/p2p/enode"
1416
"github.com/ethereum/go-ethereum/p2p/netutil"
1517
"github.com/ethereum/node-crawler/pkg/common"
@@ -138,6 +140,58 @@ func (l *Listener) startListener(ctx context.Context, nodeKey *ecdsa.PrivateKey,
138140
}()
139141
}
140142

143+
type UpdateLimiter struct {
144+
m map[string]time.Time
145+
ttl time.Duration
146+
lock sync.Mutex
147+
}
148+
149+
func NewUpdateLimiter(ttl time.Duration) *UpdateLimiter {
150+
limiter := &UpdateLimiter{
151+
m: map[string]time.Time{},
152+
ttl: ttl,
153+
lock: sync.Mutex{},
154+
}
155+
156+
go limiter.runCleaner()
157+
158+
return limiter
159+
}
160+
161+
func (l *UpdateLimiter) runCleaner() {
162+
for {
163+
time.Sleep(time.Minute)
164+
165+
l.lock.Lock()
166+
167+
for key, ts := range l.m {
168+
if time.Since(ts) > l.ttl {
169+
delete(l.m, key)
170+
}
171+
}
172+
173+
l.lock.Unlock()
174+
}
175+
}
176+
177+
func (l *UpdateLimiter) IsLimited(pubkey *ecdsa.PublicKey) bool {
178+
l.lock.Lock()
179+
defer l.lock.Unlock()
180+
181+
id := hex.EncodeToString(common.PubkeyBytes(pubkey))
182+
183+
t, found := l.m[id]
184+
if found && time.Since(t) < l.ttl {
185+
return true
186+
}
187+
188+
l.m[id] = time.Now()
189+
190+
return false
191+
}
192+
193+
var limiter = NewUpdateLimiter(3 * time.Hour)
194+
141195
func (l *Listener) crawlPeer(ctx context.Context, nodeKey *ecdsa.PrivateKey, fd net.Conn) {
142196
pubKey, conn, err := p2p.Accept(nodeKey, fd)
143197
if err != nil {
@@ -150,6 +204,12 @@ func (l *Listener) crawlPeer(ctx context.Context, nodeKey *ecdsa.PrivateKey, fd
150204
}
151205
defer conn.Close()
152206

207+
if limiter.IsLimited(pubKey) {
208+
_ = conn.Write(p2p.Disconnect{Reason: ethp2p.DiscTooManyPeers})
209+
210+
return
211+
}
212+
153213
err = l.db.WithTxAsync(
154214
ctx,
155215
database.TxOptionsDeferrable,

0 commit comments

Comments
 (0)