From 7bed9a28ae3e1d5fafc963fce329efa5eebc5a0d Mon Sep 17 00:00:00 2001 From: sbackend Date: Mon, 23 Mar 2026 13:38:49 +0100 Subject: [PATCH 1/5] fix: speed up node shutdown Fixed order of closing components. Removed unnecessary logging. Made Close() methods safer. Fixed context in pull sync component. --- pkg/api/api.go | 9 ++++++--- pkg/blocker/blocker.go | 6 ++++-- pkg/gsoc/gsoc.go | 5 ++++- pkg/hive/hive.go | 5 ++++- pkg/node/node.go | 12 +++++++++--- pkg/p2p/libp2p/internal/reacher/reacher.go | 17 +++++++---------- pkg/p2p/libp2p/libp2p.go | 9 ++++++++- pkg/postage/listener/listener.go | 5 ++++- pkg/pss/pss.go | 5 ++++- pkg/pullsync/pullsync.go | 8 ++++++-- pkg/pusher/pusher.go | 5 ++++- pkg/salud/salud.go | 5 ++++- pkg/settlement/swap/priceoracle/priceoracle.go | 6 +++++- pkg/sharky/store.go | 7 +++++-- pkg/shed/db.go | 12 ++++++++---- pkg/skippeers/skippeers.go | 9 ++++++--- pkg/storageincentives/agent.go | 5 ++++- pkg/storer/storer.go | 5 ++++- pkg/topology/kademlia/kademlia.go | 5 ++++- 19 files changed, 100 insertions(+), 40 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 2b713bc76d4..29de7e658dd 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -174,8 +174,9 @@ type Service struct { metrics metrics - wsWg sync.WaitGroup // wait for all websockets to close on exit - quit chan struct{} + wsWg sync.WaitGroup // wait for all websockets to close on exit + closeOnce sync.Once + quit chan struct{} overlay *swarm.Address publicKey ecdsa.PublicKey @@ -398,7 +399,9 @@ func (s *Service) SetIsWarmingUp(v bool) { // Close hangs up running websockets on shutdown. func (s *Service) Close() error { s.logger.Info("api shutting down") - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) done := make(chan struct{}) go func() { diff --git a/pkg/blocker/blocker.go b/pkg/blocker/blocker.go index 2d3deb10bf1..d15bcef1a1b 100644 --- a/pkg/blocker/blocker.go +++ b/pkg/blocker/blocker.go @@ -37,6 +37,7 @@ type Blocker struct { logger log.Logger wakeupCh chan struct{} quit chan struct{} + closeOnce sync.Once closeWg sync.WaitGroup blocklistCallback func(swarm.Address) } @@ -154,9 +155,10 @@ func (b *Blocker) PruneUnseen(seen []swarm.Address) { } // Close will exit the worker loop. -// must be called only once. func (b *Blocker) Close() error { - close(b.quit) + b.closeOnce.Do(func() { + close(b.quit) + }) b.closeWg.Wait() return nil } diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go index 464681258e2..ca778bf9635 100644 --- a/pkg/gsoc/gsoc.go +++ b/pkg/gsoc/gsoc.go @@ -25,6 +25,7 @@ type Listener interface { type listener struct { handlers map[string][]*Handler handlersMu sync.Mutex + closeOnce sync.Once quit chan struct{} logger log.Logger } @@ -86,7 +87,9 @@ func (p *listener) getHandlers(address swarm.Address) []*Handler { } func (l *listener) Close() error { - close(l.quit) + l.closeOnce.Do(func() { + close(l.quit) + }) l.handlersMu.Lock() defer l.handlersMu.Unlock() diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 8f8572bcb34..0dc53e406a9 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -58,6 +58,7 @@ type Service struct { metrics metrics inLimiter *ratelimit.Limiter outLimiter *ratelimit.Limiter + closeOnce sync.Once quit chan struct{} wg sync.WaitGroup peersChan chan pb.Peers @@ -144,7 +145,9 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) { } func (s *Service) Close() error { - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) stopped := make(chan struct{}) go func() { diff --git a/pkg/node/node.go b/pkg/node/node.go index 50ed1a44b82..a0cee2a4e67 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -90,6 +90,7 @@ import ( const LoggerName = "node" type Bee struct { + logger log.Logger p2pService io.Closer p2pHalter p2p.Halter ctxCancel context.CancelFunc @@ -260,6 +261,7 @@ func NewBee( }) b = &Bee{ + logger: logger, ctxCancel: ctxCancel, errorLogWriter: sink, tracerCloser: tracerCloser, @@ -1350,12 +1352,15 @@ func (b *Bee) Shutdown() error { } // tryClose is a convenient closure which decrease // repetitive io.Closer tryClose procedure. - tryClose := func(c io.Closer, errMsg string) { + tryClose := func(c io.Closer, component string) { if c == nil { return } + + b.logger.Debug("starting shutdown", "component", component) + defer b.logger.Debug("finished shutdown", "component", component) if err := c.Close(); err != nil { - mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err)) + mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err)) } } @@ -1429,9 +1434,10 @@ func (b *Bee) Shutdown() error { tryClose(b.tracerCloser, "tracer") tryClose(b.topologyCloser, "topology driver") tryClose(b.storageIncetivesCloser, "storage incentives agent") + // close localstore before StateStore to avoid ErrClosed / incomplete flush. + tryClose(b.localstoreCloser, "localstore") tryClose(b.stateStoreCloser, "statestore") tryClose(b.stamperStoreCloser, "stamperstore") - tryClose(b.localstoreCloser, "localstore") tryClose(b.resolverCloser, "resolver service") return mErr diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index 966b5978411..1a97b1dec08 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -43,8 +43,9 @@ type reacher struct { peerHeap peerHeap // min-heap ordered by retryAfter peerIndex map[string]*peer // lookup by overlay for O(1) access - newPeer chan struct{} - quit chan struct{} + newPeer chan struct{} + quit chan struct{} + closeOnce sync.Once pinger p2p.Pinger notifier p2p.ReachableNotifier @@ -290,15 +291,11 @@ func (r *reacher) jitter(d time.Duration) time.Duration { return time.Duration(float64(d) * j) } -// Close stops the worker. Must be called once. +// Close stops the worker. func (r *reacher) Close() error { - select { - case <-r.quit: - return nil - default: - } - - close(r.quit) + r.closeOnce.Do(func() { + close(r.quit) + }) r.wg.Wait() return nil } diff --git a/pkg/p2p/libp2p/libp2p.go b/pkg/p2p/libp2p/libp2p.go index 9b696cd26e2..316cbd6171f 100644 --- a/pkg/p2p/libp2p/libp2p.go +++ b/pkg/p2p/libp2p/libp2p.go @@ -1019,7 +1019,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b s.metrics.ConnectBreakerCount.Inc() return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil()) } - s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + if !errors.Is(err, context.Canceled) { + select { + case <-s.halt: + s.logger.Debug("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + default: + s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err) + } + } connectErr = err continue } diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index 53feb3a5299..df7811417d7 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -57,6 +57,7 @@ type listener struct { postageStampContractAddress common.Address postageStampContractABI abi.ABI + closeOnce sync.Once quit chan struct{} wg sync.WaitGroup metrics metrics @@ -374,7 +375,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even } func (l *listener) Close() error { - close(l.quit) + l.closeOnce.Do(func() { + close(l.quit) + }) done := make(chan struct{}) go func() { diff --git a/pkg/pss/pss.go b/pkg/pss/pss.go index 28319e9a615..0387e30efaf 100644 --- a/pkg/pss/pss.go +++ b/pkg/pss/pss.go @@ -55,6 +55,7 @@ type pss struct { handlersMu sync.Mutex metrics metrics logger log.Logger + closeOnce sync.Once quit chan struct{} } @@ -70,7 +71,9 @@ func New(key *ecdsa.PrivateKey, logger log.Logger) Interface { } func (ps *pss) Close() error { - close(ps.quit) + ps.closeOnce.Do(func() { + close(ps.quit) + }) ps.handlersMu.Lock() defer ps.handlersMu.Unlock() diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 0a1c0da6f60..304c1a36cc1 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -13,6 +13,7 @@ import ( "fmt" "io" "math" + "sync" "sync/atomic" "time" @@ -66,6 +67,7 @@ type Syncer struct { metrics metrics logger log.Logger store storer.Reserve + closeOnce sync.Once quit chan struct{} unwrap func(swarm.Chunk) gsocHandler func(*soc.SOC) @@ -193,7 +195,7 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea } // slow down future requests - waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs))) + waitDur, err := s.limiter.Wait(ctx, p.Address.ByteString(), max(1, len(chs))) if err != nil { return fmt.Errorf("rate limiter: %w", err) } @@ -568,7 +570,9 @@ func (s *Syncer) disconnect(peer p2p.Peer) error { func (s *Syncer) Close() error { s.logger.Info("pull syncer shutting down") - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) cc := make(chan struct{}) go func() { defer close(cc) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index 9c4073f1fcc..b5997ff8fc5 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -55,6 +55,7 @@ type Service struct { batchExist postage.BatchExist logger log.Logger metrics metrics + closeOnce sync.Once quit chan struct{} chunksWorkerQuitC chan struct{} inflight *inflight @@ -363,7 +364,9 @@ func (s *Service) AddFeed(c <-chan *Op) { func (s *Service) Close() error { s.logger.Info("pusher shutting down") - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) // Wait for chunks worker to finish select { diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index 1cf4ffa8373..3d75e9257a9 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -44,6 +44,7 @@ type peerStatus interface { type service struct { wg sync.WaitGroup + closeOnce sync.Once quit chan struct{} logger log.Logger topology topologyDriver @@ -116,7 +117,9 @@ func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string } func (s *service) Close() error { - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) s.wg.Wait() return nil } diff --git a/pkg/settlement/swap/priceoracle/priceoracle.go b/pkg/settlement/swap/priceoracle/priceoracle.go index 1b60b7c89ec..8e2cf248e37 100644 --- a/pkg/settlement/swap/priceoracle/priceoracle.go +++ b/pkg/settlement/swap/priceoracle/priceoracle.go @@ -9,6 +9,7 @@ import ( "errors" "io" "math/big" + "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi" @@ -33,6 +34,7 @@ type service struct { exchangeRate *big.Int deduction *big.Int timeDivisor int64 + closeOnce sync.Once quitC chan struct{} } @@ -148,6 +150,8 @@ func (s *service) CurrentRates() (exchangeRate, deduction *big.Int, err error) { } func (s *service) Close() error { - close(s.quitC) + s.closeOnce.Do(func() { + close(s.quitC) + }) return nil } diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index d8947f9d042..ac100afdb32 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -32,7 +32,8 @@ type Store struct { writes chan write // shared write operations channel shards []*shard // shards wg *sync.WaitGroup // count started operations - quit chan struct{} // quit channel + closeOnce sync.Once + quit chan struct{} // quit channel metrics metrics } @@ -65,7 +66,9 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { // Close closes each shard and return incidental errors from each shard func (s *Store) Close() error { - close(s.quit) + s.closeOnce.Do(func() { + close(s.quit) + }) err := new(multierror.Error) for _, sh := range s.shards { err = multierror.Append(err, sh.close()) diff --git a/pkg/shed/db.go b/pkg/shed/db.go index 1106897d008..dc3348d9105 100644 --- a/pkg/shed/db.go +++ b/pkg/shed/db.go @@ -24,6 +24,7 @@ package shed import ( "errors" + "sync" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -51,9 +52,10 @@ type Options struct { // It provides a schema functionality to store fields and indexes // information about naming and types. type DB struct { - ldb *leveldb.DB - metrics metrics - quit chan struct{} // Quit channel to stop the metrics collection before closing the database + ldb *leveldb.DB + metrics metrics + closeOnce sync.Once + quit chan struct{} // Quit channel to stop the metrics collection before closing the database } // NewDB constructs a new DB and validates the schema @@ -193,6 +195,8 @@ func (db *DB) Compact(start, end []byte) error { // Close closes LevelDB database. func (db *DB) Close() (err error) { - close(db.quit) + db.closeOnce.Do(func() { + close(db.quit) + }) return db.ldb.Close() } diff --git a/pkg/skippeers/skippeers.go b/pkg/skippeers/skippeers.go index 20bbe3ba8f7..05787794d6d 100644 --- a/pkg/skippeers/skippeers.go +++ b/pkg/skippeers/skippeers.go @@ -17,8 +17,9 @@ const maxDuration time.Duration = math.MaxInt64 type List struct { mtx sync.Mutex - durC chan time.Duration - quit chan struct{} + closeOnce sync.Once + durC chan time.Duration + quit chan struct{} // key is chunk address, value is map of peer address to expiration skip map[string]map[string]int64 @@ -133,7 +134,9 @@ func (l *List) pruneChunk(ch string, now int64) int { } func (l *List) Close() error { - close(l.quit) + l.closeOnce.Do(func() { + close(l.quit) + }) l.wg.Wait() l.mtx.Lock() diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 5142e97836e..7c84036ef6b 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -65,6 +65,7 @@ type Agent struct { store storer.Reserve fullSyncedFunc func() bool overlay swarm.Address + closeOnce sync.Once quit chan struct{} wg sync.WaitGroup state *RedistributionState @@ -547,7 +548,9 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err } func (a *Agent) Close() error { - close(a.quit) + a.closeOnce.Do(func() { + close(a.quit) + }) stopped := make(chan struct{}) go func() { diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index d4a41c73491..38fb6cddec4 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -436,6 +436,7 @@ type DB struct { cacheLimiter cacheLimiter dbCloser io.Closer subscriptionsWG sync.WaitGroup + closeOnce sync.Once events *events.Subscriber directUploadLimiter chan struct{} @@ -627,7 +628,9 @@ func (db *DB) StatusMetrics() []prometheus.Collector { } func (db *DB) Close() error { - close(db.quit) + db.closeOnce.Do(func() { + close(db.quit) + }) bgReserveWorkersClosed := make(chan struct{}) go func() { diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index 6dae0c69fa5..a2ca1edcacf 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -193,6 +193,7 @@ type Kad struct { collector *im.Collector quit chan struct{} // quit channel halt chan struct{} // halt channel + closeOnce sync.Once done chan struct{} // signal that `manage` has quit wg sync.WaitGroup waitNext *waitnext.WaitNext @@ -1548,7 +1549,9 @@ func (k *Kad) Halt() { // Close shuts down kademlia. func (k *Kad) Close() error { k.logger.Info("kademlia shutting down") - close(k.quit) + k.closeOnce.Do(func() { + close(k.quit) + }) cc := make(chan struct{}) k.bgBroadcastCancel() From 4deb8ab48c9eb665e1daca7685d4addb436a5295 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 31 Mar 2026 14:57:24 +0200 Subject: [PATCH 2/5] fix: remove close.Once after discussion --- pkg/api/api.go | 9 ++------- pkg/blocker/blocker.go | 6 ++---- pkg/gsoc/gsoc.go | 5 +---- pkg/hive/hive.go | 5 +---- pkg/p2p/libp2p/internal/reacher/reacher.go | 17 ++++++++++------- pkg/postage/listener/listener.go | 5 +---- pkg/pss/pss.go | 5 +---- pkg/pullsync/pullsync.go | 6 +----- pkg/pusher/pusher.go | 5 +---- pkg/salud/salud.go | 5 +---- pkg/settlement/swap/priceoracle/priceoracle.go | 6 +----- pkg/sharky/store.go | 7 ++----- pkg/shed/db.go | 12 ++++-------- pkg/skippeers/skippeers.go | 9 +++------ pkg/storageincentives/agent.go | 5 +---- pkg/storer/storer.go | 5 +---- pkg/topology/kademlia/kademlia.go | 5 +---- 17 files changed, 34 insertions(+), 83 deletions(-) diff --git a/pkg/api/api.go b/pkg/api/api.go index 29de7e658dd..dc2abacd4e7 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -174,9 +174,8 @@ type Service struct { metrics metrics - wsWg sync.WaitGroup // wait for all websockets to close on exit - closeOnce sync.Once - quit chan struct{} + wsWg sync.WaitGroup // wait for all websockets to close on exit + quit chan struct{} overlay *swarm.Address publicKey ecdsa.PublicKey @@ -399,10 +398,6 @@ func (s *Service) SetIsWarmingUp(v bool) { // Close hangs up running websockets on shutdown. func (s *Service) Close() error { s.logger.Info("api shutting down") - s.closeOnce.Do(func() { - close(s.quit) - }) - done := make(chan struct{}) go func() { defer close(done) diff --git a/pkg/blocker/blocker.go b/pkg/blocker/blocker.go index d15bcef1a1b..2d3deb10bf1 100644 --- a/pkg/blocker/blocker.go +++ b/pkg/blocker/blocker.go @@ -37,7 +37,6 @@ type Blocker struct { logger log.Logger wakeupCh chan struct{} quit chan struct{} - closeOnce sync.Once closeWg sync.WaitGroup blocklistCallback func(swarm.Address) } @@ -155,10 +154,9 @@ func (b *Blocker) PruneUnseen(seen []swarm.Address) { } // Close will exit the worker loop. +// must be called only once. func (b *Blocker) Close() error { - b.closeOnce.Do(func() { - close(b.quit) - }) + close(b.quit) b.closeWg.Wait() return nil } diff --git a/pkg/gsoc/gsoc.go b/pkg/gsoc/gsoc.go index ca778bf9635..464681258e2 100644 --- a/pkg/gsoc/gsoc.go +++ b/pkg/gsoc/gsoc.go @@ -25,7 +25,6 @@ type Listener interface { type listener struct { handlers map[string][]*Handler handlersMu sync.Mutex - closeOnce sync.Once quit chan struct{} logger log.Logger } @@ -87,9 +86,7 @@ func (p *listener) getHandlers(address swarm.Address) []*Handler { } func (l *listener) Close() error { - l.closeOnce.Do(func() { - close(l.quit) - }) + close(l.quit) l.handlersMu.Lock() defer l.handlersMu.Unlock() diff --git a/pkg/hive/hive.go b/pkg/hive/hive.go index 0dc53e406a9..8f8572bcb34 100644 --- a/pkg/hive/hive.go +++ b/pkg/hive/hive.go @@ -58,7 +58,6 @@ type Service struct { metrics metrics inLimiter *ratelimit.Limiter outLimiter *ratelimit.Limiter - closeOnce sync.Once quit chan struct{} wg sync.WaitGroup peersChan chan pb.Peers @@ -145,9 +144,7 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) { } func (s *Service) Close() error { - s.closeOnce.Do(func() { - close(s.quit) - }) + close(s.quit) stopped := make(chan struct{}) go func() { diff --git a/pkg/p2p/libp2p/internal/reacher/reacher.go b/pkg/p2p/libp2p/internal/reacher/reacher.go index 1a97b1dec08..966b5978411 100644 --- a/pkg/p2p/libp2p/internal/reacher/reacher.go +++ b/pkg/p2p/libp2p/internal/reacher/reacher.go @@ -43,9 +43,8 @@ type reacher struct { peerHeap peerHeap // min-heap ordered by retryAfter peerIndex map[string]*peer // lookup by overlay for O(1) access - newPeer chan struct{} - quit chan struct{} - closeOnce sync.Once + newPeer chan struct{} + quit chan struct{} pinger p2p.Pinger notifier p2p.ReachableNotifier @@ -291,11 +290,15 @@ func (r *reacher) jitter(d time.Duration) time.Duration { return time.Duration(float64(d) * j) } -// Close stops the worker. +// Close stops the worker. Must be called once. func (r *reacher) Close() error { - r.closeOnce.Do(func() { - close(r.quit) - }) + select { + case <-r.quit: + return nil + default: + } + + close(r.quit) r.wg.Wait() return nil } diff --git a/pkg/postage/listener/listener.go b/pkg/postage/listener/listener.go index df7811417d7..53feb3a5299 100644 --- a/pkg/postage/listener/listener.go +++ b/pkg/postage/listener/listener.go @@ -57,7 +57,6 @@ type listener struct { postageStampContractAddress common.Address postageStampContractABI abi.ABI - closeOnce sync.Once quit chan struct{} wg sync.WaitGroup metrics metrics @@ -375,9 +374,7 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even } func (l *listener) Close() error { - l.closeOnce.Do(func() { - close(l.quit) - }) + close(l.quit) done := make(chan struct{}) go func() { diff --git a/pkg/pss/pss.go b/pkg/pss/pss.go index 0387e30efaf..28319e9a615 100644 --- a/pkg/pss/pss.go +++ b/pkg/pss/pss.go @@ -55,7 +55,6 @@ type pss struct { handlersMu sync.Mutex metrics metrics logger log.Logger - closeOnce sync.Once quit chan struct{} } @@ -71,9 +70,7 @@ func New(key *ecdsa.PrivateKey, logger log.Logger) Interface { } func (ps *pss) Close() error { - ps.closeOnce.Do(func() { - close(ps.quit) - }) + close(ps.quit) ps.handlersMu.Lock() defer ps.handlersMu.Unlock() diff --git a/pkg/pullsync/pullsync.go b/pkg/pullsync/pullsync.go index 304c1a36cc1..27e6d176998 100644 --- a/pkg/pullsync/pullsync.go +++ b/pkg/pullsync/pullsync.go @@ -13,7 +13,6 @@ import ( "fmt" "io" "math" - "sync" "sync/atomic" "time" @@ -67,7 +66,6 @@ type Syncer struct { metrics metrics logger log.Logger store storer.Reserve - closeOnce sync.Once quit chan struct{} unwrap func(swarm.Chunk) gsocHandler func(*soc.SOC) @@ -570,9 +568,7 @@ func (s *Syncer) disconnect(peer p2p.Peer) error { func (s *Syncer) Close() error { s.logger.Info("pull syncer shutting down") - s.closeOnce.Do(func() { - close(s.quit) - }) + close(s.quit) cc := make(chan struct{}) go func() { defer close(cc) diff --git a/pkg/pusher/pusher.go b/pkg/pusher/pusher.go index b5997ff8fc5..9c4073f1fcc 100644 --- a/pkg/pusher/pusher.go +++ b/pkg/pusher/pusher.go @@ -55,7 +55,6 @@ type Service struct { batchExist postage.BatchExist logger log.Logger metrics metrics - closeOnce sync.Once quit chan struct{} chunksWorkerQuitC chan struct{} inflight *inflight @@ -364,9 +363,7 @@ func (s *Service) AddFeed(c <-chan *Op) { func (s *Service) Close() error { s.logger.Info("pusher shutting down") - s.closeOnce.Do(func() { - close(s.quit) - }) + close(s.quit) // Wait for chunks worker to finish select { diff --git a/pkg/salud/salud.go b/pkg/salud/salud.go index 3d75e9257a9..1cf4ffa8373 100644 --- a/pkg/salud/salud.go +++ b/pkg/salud/salud.go @@ -44,7 +44,6 @@ type peerStatus interface { type service struct { wg sync.WaitGroup - closeOnce sync.Once quit chan struct{} logger log.Logger topology topologyDriver @@ -117,9 +116,7 @@ func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string } func (s *service) Close() error { - s.closeOnce.Do(func() { - close(s.quit) - }) + close(s.quit) s.wg.Wait() return nil } diff --git a/pkg/settlement/swap/priceoracle/priceoracle.go b/pkg/settlement/swap/priceoracle/priceoracle.go index 8e2cf248e37..1b60b7c89ec 100644 --- a/pkg/settlement/swap/priceoracle/priceoracle.go +++ b/pkg/settlement/swap/priceoracle/priceoracle.go @@ -9,7 +9,6 @@ import ( "errors" "io" "math/big" - "sync" "time" "github.com/ethereum/go-ethereum/accounts/abi" @@ -34,7 +33,6 @@ type service struct { exchangeRate *big.Int deduction *big.Int timeDivisor int64 - closeOnce sync.Once quitC chan struct{} } @@ -150,8 +148,6 @@ func (s *service) CurrentRates() (exchangeRate, deduction *big.Int, err error) { } func (s *service) Close() error { - s.closeOnce.Do(func() { - close(s.quitC) - }) + close(s.quitC) return nil } diff --git a/pkg/sharky/store.go b/pkg/sharky/store.go index ac100afdb32..d8947f9d042 100644 --- a/pkg/sharky/store.go +++ b/pkg/sharky/store.go @@ -32,8 +32,7 @@ type Store struct { writes chan write // shared write operations channel shards []*shard // shards wg *sync.WaitGroup // count started operations - closeOnce sync.Once - quit chan struct{} // quit channel + quit chan struct{} // quit channel metrics metrics } @@ -66,9 +65,7 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) { // Close closes each shard and return incidental errors from each shard func (s *Store) Close() error { - s.closeOnce.Do(func() { - close(s.quit) - }) + close(s.quit) err := new(multierror.Error) for _, sh := range s.shards { err = multierror.Append(err, sh.close()) diff --git a/pkg/shed/db.go b/pkg/shed/db.go index dc3348d9105..1106897d008 100644 --- a/pkg/shed/db.go +++ b/pkg/shed/db.go @@ -24,7 +24,6 @@ package shed import ( "errors" - "sync" "github.com/syndtr/goleveldb/leveldb" "github.com/syndtr/goleveldb/leveldb/iterator" @@ -52,10 +51,9 @@ type Options struct { // It provides a schema functionality to store fields and indexes // information about naming and types. type DB struct { - ldb *leveldb.DB - metrics metrics - closeOnce sync.Once - quit chan struct{} // Quit channel to stop the metrics collection before closing the database + ldb *leveldb.DB + metrics metrics + quit chan struct{} // Quit channel to stop the metrics collection before closing the database } // NewDB constructs a new DB and validates the schema @@ -195,8 +193,6 @@ func (db *DB) Compact(start, end []byte) error { // Close closes LevelDB database. func (db *DB) Close() (err error) { - db.closeOnce.Do(func() { - close(db.quit) - }) + close(db.quit) return db.ldb.Close() } diff --git a/pkg/skippeers/skippeers.go b/pkg/skippeers/skippeers.go index 05787794d6d..20bbe3ba8f7 100644 --- a/pkg/skippeers/skippeers.go +++ b/pkg/skippeers/skippeers.go @@ -17,9 +17,8 @@ const maxDuration time.Duration = math.MaxInt64 type List struct { mtx sync.Mutex - closeOnce sync.Once - durC chan time.Duration - quit chan struct{} + durC chan time.Duration + quit chan struct{} // key is chunk address, value is map of peer address to expiration skip map[string]map[string]int64 @@ -134,9 +133,7 @@ func (l *List) pruneChunk(ch string, now int64) int { } func (l *List) Close() error { - l.closeOnce.Do(func() { - close(l.quit) - }) + close(l.quit) l.wg.Wait() l.mtx.Lock() diff --git a/pkg/storageincentives/agent.go b/pkg/storageincentives/agent.go index 7c84036ef6b..5142e97836e 100644 --- a/pkg/storageincentives/agent.go +++ b/pkg/storageincentives/agent.go @@ -65,7 +65,6 @@ type Agent struct { store storer.Reserve fullSyncedFunc func() bool overlay swarm.Address - closeOnce sync.Once quit chan struct{} wg sync.WaitGroup state *RedistributionState @@ -548,9 +547,7 @@ func (a *Agent) commit(ctx context.Context, sample SampleData, round uint64) err } func (a *Agent) Close() error { - a.closeOnce.Do(func() { - close(a.quit) - }) + close(a.quit) stopped := make(chan struct{}) go func() { diff --git a/pkg/storer/storer.go b/pkg/storer/storer.go index 38fb6cddec4..d4a41c73491 100644 --- a/pkg/storer/storer.go +++ b/pkg/storer/storer.go @@ -436,7 +436,6 @@ type DB struct { cacheLimiter cacheLimiter dbCloser io.Closer subscriptionsWG sync.WaitGroup - closeOnce sync.Once events *events.Subscriber directUploadLimiter chan struct{} @@ -628,9 +627,7 @@ func (db *DB) StatusMetrics() []prometheus.Collector { } func (db *DB) Close() error { - db.closeOnce.Do(func() { - close(db.quit) - }) + close(db.quit) bgReserveWorkersClosed := make(chan struct{}) go func() { diff --git a/pkg/topology/kademlia/kademlia.go b/pkg/topology/kademlia/kademlia.go index a2ca1edcacf..6dae0c69fa5 100644 --- a/pkg/topology/kademlia/kademlia.go +++ b/pkg/topology/kademlia/kademlia.go @@ -193,7 +193,6 @@ type Kad struct { collector *im.Collector quit chan struct{} // quit channel halt chan struct{} // halt channel - closeOnce sync.Once done chan struct{} // signal that `manage` has quit wg sync.WaitGroup waitNext *waitnext.WaitNext @@ -1549,9 +1548,7 @@ func (k *Kad) Halt() { // Close shuts down kademlia. func (k *Kad) Close() error { k.logger.Info("kademlia shutting down") - k.closeOnce.Do(func() { - close(k.quit) - }) + close(k.quit) cc := make(chan struct{}) k.bgBroadcastCancel() From bae9b0e6a86eb17d1f0c712754e6c6a9cc228731 Mon Sep 17 00:00:00 2001 From: sbackend Date: Tue, 31 Mar 2026 14:58:33 +0200 Subject: [PATCH 3/5] fix: closing api --- pkg/api/api.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/api/api.go b/pkg/api/api.go index dc2abacd4e7..2b713bc76d4 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -398,6 +398,8 @@ func (s *Service) SetIsWarmingUp(v bool) { // Close hangs up running websockets on shutdown. func (s *Service) Close() error { s.logger.Info("api shutting down") + close(s.quit) + done := make(chan struct{}) go func() { defer close(done) From 30e6c3cb6753ea316e67ecc1e53533fb8d23e0d8 Mon Sep 17 00:00:00 2001 From: sbackend Date: Wed, 1 Apr 2026 14:32:18 +0200 Subject: [PATCH 4/5] fix: add time span --- pkg/node/node.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index a0cee2a4e67..5db484e43e4 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -1357,8 +1357,9 @@ func (b *Bee) Shutdown() error { return } + start := time.Now() b.logger.Debug("starting shutdown", "component", component) - defer b.logger.Debug("finished shutdown", "component", component) + defer b.logger.Debug("finished shutdown", "component", component, "elapsed", time.Since(start)) if err := c.Close(); err != nil { mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err)) } From d2eeb63b833f56e907c175ebd90bf272c7946074 Mon Sep 17 00:00:00 2001 From: sbackend Date: Thu, 2 Apr 2026 09:27:29 +0200 Subject: [PATCH 5/5] fix: make linter happy --- pkg/node/node.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/node/node.go b/pkg/node/node.go index 50c3fa66b03..ee89dc2fa1a 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -1360,7 +1360,9 @@ func (b *Bee) Shutdown() error { start := time.Now() b.logger.Debug("starting shutdown", "component", component) - defer b.logger.Debug("finished shutdown", "component", component, "elapsed", time.Since(start)) + defer func() { + b.logger.Debug("finished shutdown", "component", component, "elapsed", time.Since(start)) + }() if err := c.Close(); err != nil { mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err)) }