Skip to content

Commit 478bfab

Browse files
committed
fix: speed up node shutdown.
Fixed order of closing components. Removed unnecessary logging. Made Close() methods safer. Fixed context in pull sync component.
1 parent 5231811 commit 478bfab

19 files changed

Lines changed: 100 additions & 40 deletions

File tree

pkg/api/api.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -174,8 +174,9 @@ type Service struct {
174174

175175
metrics metrics
176176

177-
wsWg sync.WaitGroup // wait for all websockets to close on exit
178-
quit chan struct{}
177+
wsWg sync.WaitGroup // wait for all websockets to close on exit
178+
closeOnce sync.Once
179+
quit chan struct{}
179180

180181
overlay *swarm.Address
181182
publicKey ecdsa.PublicKey
@@ -398,7 +399,9 @@ func (s *Service) SetIsWarmingUp(v bool) {
398399
// Close hangs up running websockets on shutdown.
399400
func (s *Service) Close() error {
400401
s.logger.Info("api shutting down")
401-
close(s.quit)
402+
s.closeOnce.Do(func() {
403+
close(s.quit)
404+
})
402405

403406
done := make(chan struct{})
404407
go func() {

pkg/blocker/blocker.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ type Blocker struct {
3737
logger log.Logger
3838
wakeupCh chan struct{}
3939
quit chan struct{}
40+
closeOnce sync.Once
4041
closeWg sync.WaitGroup
4142
blocklistCallback func(swarm.Address)
4243
}
@@ -154,9 +155,10 @@ func (b *Blocker) PruneUnseen(seen []swarm.Address) {
154155
}
155156

156157
// Close will exit the worker loop.
157-
// must be called only once.
158158
func (b *Blocker) Close() error {
159-
close(b.quit)
159+
b.closeOnce.Do(func() {
160+
close(b.quit)
161+
})
160162
b.closeWg.Wait()
161163
return nil
162164
}

pkg/gsoc/gsoc.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type Listener interface {
2525
type listener struct {
2626
handlers map[string][]*Handler
2727
handlersMu sync.Mutex
28+
closeOnce sync.Once
2829
quit chan struct{}
2930
logger log.Logger
3031
}
@@ -86,7 +87,9 @@ func (p *listener) getHandlers(address swarm.Address) []*Handler {
8687
}
8788

8889
func (l *listener) Close() error {
89-
close(l.quit)
90+
l.closeOnce.Do(func() {
91+
close(l.quit)
92+
})
9093
l.handlersMu.Lock()
9194
defer l.handlersMu.Unlock()
9295

pkg/hive/hive.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type Service struct {
5858
metrics metrics
5959
inLimiter *ratelimit.Limiter
6060
outLimiter *ratelimit.Limiter
61+
closeOnce sync.Once
6162
quit chan struct{}
6263
wg sync.WaitGroup
6364
peersChan chan pb.Peers
@@ -144,7 +145,9 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
144145
}
145146

146147
func (s *Service) Close() error {
147-
close(s.quit)
148+
s.closeOnce.Do(func() {
149+
close(s.quit)
150+
})
148151

149152
stopped := make(chan struct{})
150153
go func() {

pkg/node/node.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,7 @@ import (
9090
const LoggerName = "node"
9191

9292
type Bee struct {
93+
logger log.Logger
9394
p2pService io.Closer
9495
p2pHalter p2p.Halter
9596
ctxCancel context.CancelFunc
@@ -260,6 +261,7 @@ func NewBee(
260261
})
261262

262263
b = &Bee{
264+
logger: logger,
263265
ctxCancel: ctxCancel,
264266
errorLogWriter: sink,
265267
tracerCloser: tracerCloser,
@@ -1350,12 +1352,15 @@ func (b *Bee) Shutdown() error {
13501352
}
13511353
// tryClose is a convenient closure which decrease
13521354
// repetitive io.Closer tryClose procedure.
1353-
tryClose := func(c io.Closer, errMsg string) {
1355+
tryClose := func(c io.Closer, component string) {
13541356
if c == nil {
13551357
return
13561358
}
1359+
1360+
b.logger.Debug("starting shutdown", "component", component)
1361+
defer b.logger.Debug("finished shutdown", "component", component)
13571362
if err := c.Close(); err != nil {
1358-
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err))
1363+
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err))
13591364
}
13601365
}
13611366

@@ -1429,9 +1434,10 @@ func (b *Bee) Shutdown() error {
14291434
tryClose(b.tracerCloser, "tracer")
14301435
tryClose(b.topologyCloser, "topology driver")
14311436
tryClose(b.storageIncetivesCloser, "storage incentives agent")
1437+
// close localstore before StateStore to avoid ErrClosed / incomplete flush.
1438+
tryClose(b.localstoreCloser, "localstore")
14321439
tryClose(b.stateStoreCloser, "statestore")
14331440
tryClose(b.stamperStoreCloser, "stamperstore")
1434-
tryClose(b.localstoreCloser, "localstore")
14351441
tryClose(b.resolverCloser, "resolver service")
14361442

14371443
return mErr

pkg/p2p/libp2p/internal/reacher/reacher.go

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,9 @@ type reacher struct {
4343
peerHeap peerHeap // min-heap ordered by retryAfter
4444
peerIndex map[string]*peer // lookup by overlay for O(1) access
4545

46-
newPeer chan struct{}
47-
quit chan struct{}
46+
newPeer chan struct{}
47+
quit chan struct{}
48+
closeOnce sync.Once
4849

4950
pinger p2p.Pinger
5051
notifier p2p.ReachableNotifier
@@ -290,15 +291,11 @@ func (r *reacher) jitter(d time.Duration) time.Duration {
290291
return time.Duration(float64(d) * j)
291292
}
292293

293-
// Close stops the worker. Must be called once.
294+
// Close stops the worker.
294295
func (r *reacher) Close() error {
295-
select {
296-
case <-r.quit:
297-
return nil
298-
default:
299-
}
300-
301-
close(r.quit)
296+
r.closeOnce.Do(func() {
297+
close(r.quit)
298+
})
302299
r.wg.Wait()
303300
return nil
304301
}

pkg/p2p/libp2p/libp2p.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1019,7 +1019,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
10191019
s.metrics.ConnectBreakerCount.Inc()
10201020
return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil())
10211021
}
1022-
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
1022+
if !errors.Is(err, context.Canceled) {
1023+
select {
1024+
case <-s.halt:
1025+
s.logger.Debug("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
1026+
default:
1027+
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
1028+
}
1029+
}
10231030
connectErr = err
10241031
continue
10251032
}

pkg/postage/listener/listener.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type listener struct {
5757

5858
postageStampContractAddress common.Address
5959
postageStampContractABI abi.ABI
60+
closeOnce sync.Once
6061
quit chan struct{}
6162
wg sync.WaitGroup
6263
metrics metrics
@@ -374,7 +375,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
374375
}
375376

376377
func (l *listener) Close() error {
377-
close(l.quit)
378+
l.closeOnce.Do(func() {
379+
close(l.quit)
380+
})
378381

379382
done := make(chan struct{})
380383
go func() {

pkg/pss/pss.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type pss struct {
5555
handlersMu sync.Mutex
5656
metrics metrics
5757
logger log.Logger
58+
closeOnce sync.Once
5859
quit chan struct{}
5960
}
6061

@@ -70,7 +71,9 @@ func New(key *ecdsa.PrivateKey, logger log.Logger) Interface {
7071
}
7172

7273
func (ps *pss) Close() error {
73-
close(ps.quit)
74+
ps.closeOnce.Do(func() {
75+
close(ps.quit)
76+
})
7477
ps.handlersMu.Lock()
7578
defer ps.handlersMu.Unlock()
7679

pkg/pullsync/pullsync.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"fmt"
1414
"io"
1515
"math"
16+
"sync"
1617
"sync/atomic"
1718
"time"
1819

@@ -66,6 +67,7 @@ type Syncer struct {
6667
metrics metrics
6768
logger log.Logger
6869
store storer.Reserve
70+
closeOnce sync.Once
6971
quit chan struct{}
7072
unwrap func(swarm.Chunk)
7173
gsocHandler func(*soc.SOC)
@@ -193,7 +195,7 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
193195
}
194196

195197
// slow down future requests
196-
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
198+
waitDur, err := s.limiter.Wait(ctx, p.Address.ByteString(), max(1, len(chs)))
197199
if err != nil {
198200
return fmt.Errorf("rate limiter: %w", err)
199201
}
@@ -568,7 +570,9 @@ func (s *Syncer) disconnect(peer p2p.Peer) error {
568570

569571
func (s *Syncer) Close() error {
570572
s.logger.Info("pull syncer shutting down")
571-
close(s.quit)
573+
s.closeOnce.Do(func() {
574+
close(s.quit)
575+
})
572576
cc := make(chan struct{})
573577
go func() {
574578
defer close(cc)

0 commit comments

Comments
 (0)