diff --git a/pkg/node/node.go b/pkg/node/node.go index 04d17fa9c18..ee89dc2fa1a 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -90,6 +90,7 @@ import ( const LoggerName = "node" type Bee struct { + logger log.Logger p2pService io.Closer p2pHalter p2p.Halter ctxCancel context.CancelFunc @@ -261,6 +262,7 @@ func NewBee( }) b = &Bee{ + logger: logger, ctxCancel: ctxCancel, errorLogWriter: sink, tracerCloser: tracerCloser, @@ -1351,12 +1353,18 @@ func (b *Bee) Shutdown() error { } // tryClose is a convenient closure which decrease // repetitive io.Closer tryClose procedure. - tryClose := func(c io.Closer, errMsg string) { + tryClose := func(c io.Closer, component string) { if c == nil { return } + + start := time.Now() + b.logger.Debug("starting shutdown", "component", component) + defer func() { + b.logger.Debug("finished shutdown", "component", component, "elapsed", time.Since(start)) + }() if err := c.Close(); err != nil { - mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err)) + mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err)) } } @@ -1431,9 +1439,10 @@ func (b *Bee) Shutdown() error { tryClose(b.topologyCloser, "topology driver") tryClose(b.storageIncetivesCloser, "storage incentives agent") tryClose(b.stabilizationDetector, "stabilization detector") + // close localstore before StateStore to avoid ErrClosed / incomplete flush. + tryClose(b.localstoreCloser, "localstore") tryClose(b.stateStoreCloser, "statestore") tryClose(b.stamperStoreCloser, "stamperstore") - tryClose(b.localstoreCloser, "localstore") tryClose(b.resolverCloser, "resolver service") return mErr diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 9b696cd26e2..316cbd6171f 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -1019,7 +1019,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b s.metrics.ConnectBreakerCount.Inc() return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil()) } - s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + if !errors.Is(err, context.Canceled) { + select { + case <-s.halt: + s.logger.Debug("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + default: + s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + } + } connectErr = err continue } diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 0a1c0da6f60..27e6d176998 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -193,7 +193,7 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // slow down future requests - waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs))) + waitDur, err := s.limiter.Wait(ctx, p.Address.ByteString(), max(1, len(chs))) if err != nil { return fmt.Errorf("rate limiter: %w", err) }