Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import (
const LoggerName = "node"

type Bee struct {
logger log.Logger
p2pService io.Closer
p2pHalter p2p.Halter
ctxCancel context.CancelFunc
Expand Down Expand Up @@ -261,6 +262,7 @@ func NewBee(
})

b = &Bee{
logger: logger,
ctxCancel: ctxCancel,
errorLogWriter: sink,
tracerCloser: tracerCloser,
Expand Down Expand Up @@ -1351,12 +1353,18 @@ func (b *Bee) Shutdown() error {
}
// tryClose is a convenient closure which decrease
// repetitive io.Closer tryClose procedure.
tryClose := func(c io.Closer, errMsg string) {
tryClose := func(c io.Closer, component string) {
if c == nil {
return
}

start := time.Now()
b.logger.Debug("starting shutdown", "component", component)
defer func() {
b.logger.Debug("finished shutdown", "component", component, "elapsed", time.Since(start))
}()
if err := c.Close(); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err))
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err))
}
}

Expand Down Expand Up @@ -1431,9 +1439,10 @@ func (b *Bee) Shutdown() error {
tryClose(b.topologyCloser, "topology driver")
tryClose(b.storageIncetivesCloser, "storage incentives agent")
tryClose(b.stabilizationDetector, "stabilization detector")
// close localstore before StateStore to avoid ErrClosed / incomplete flush.
tryClose(b.localstoreCloser, "localstore")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.stamperStoreCloser, "stamperstore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.resolverCloser, "resolver service")

return mErr
Expand Down
9 changes: 8 additions & 1 deletion pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
s.metrics.ConnectBreakerCount.Inc()
return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil())
}
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
if !errors.Is(err, context.Canceled) {
select {
case <-s.halt:
s.logger.Debug("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
default:
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
}
}
connectErr = err
continue
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
}

// slow down future requests
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
waitDur, err := s.limiter.Wait(ctx, p.Address.ByteString(), max(1, len(chs)))
if err != nil {
return fmt.Errorf("rate limiter: %w", err)
}
Expand Down
Loading