Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ type Service struct {

metrics metrics

wsWg sync.WaitGroup // wait for all websockets to close on exit
quit chan struct{}
wsWg sync.WaitGroup // wait for all websockets to close on exit
closeOnce sync.Once
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this needed? how come that a quit signal can happen twice?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that this is the protection of the double closed channel, which is panicing.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that is clear. the question is why would the channel be closed twice in the first place? it suggests a problem in the calling code no?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose that it is just a common protection, as there is no way to restrict how many times an exported method is called. I did not notice that the Close method is called twice anywhere, but I suppose that a layer of safety is a good thing here.

quit chan struct{}

overlay *swarm.Address
publicKey ecdsa.PublicKey
Expand Down Expand Up @@ -398,7 +399,9 @@ func (s *Service) SetIsWarmingUp(v bool) {
// Close hangs up running websockets on shutdown.
func (s *Service) Close() error {
s.logger.Info("api shutting down")
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})

done := make(chan struct{})
go func() {
Expand Down
6 changes: 4 additions & 2 deletions pkg/blocker/blocker.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type Blocker struct {
logger log.Logger
wakeupCh chan struct{}
quit chan struct{}
closeOnce sync.Once
closeWg sync.WaitGroup
blocklistCallback func(swarm.Address)
}
Expand Down Expand Up @@ -154,9 +155,10 @@ func (b *Blocker) PruneUnseen(seen []swarm.Address) {
}

// Close will exit the worker loop.
// must be called only once.
func (b *Blocker) Close() error {
close(b.quit)
b.closeOnce.Do(func() {
close(b.quit)
})
b.closeWg.Wait()
return nil
}
5 changes: 4 additions & 1 deletion pkg/gsoc/gsoc.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Listener interface {
type listener struct {
handlers map[string][]*Handler
handlersMu sync.Mutex
closeOnce sync.Once
quit chan struct{}
logger log.Logger
}
Expand Down Expand Up @@ -86,7 +87,9 @@ func (p *listener) getHandlers(address swarm.Address) []*Handler {
}

func (l *listener) Close() error {
close(l.quit)
l.closeOnce.Do(func() {
close(l.quit)
})
l.handlersMu.Lock()
defer l.handlersMu.Unlock()

Expand Down
5 changes: 4 additions & 1 deletion pkg/hive/hive.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type Service struct {
metrics metrics
inLimiter *ratelimit.Limiter
outLimiter *ratelimit.Limiter
closeOnce sync.Once
quit chan struct{}
wg sync.WaitGroup
peersChan chan pb.Peers
Expand Down Expand Up @@ -144,7 +145,9 @@ func (s *Service) SetAddPeersHandler(h func(addr ...swarm.Address)) {
}

func (s *Service) Close() error {
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})

stopped := make(chan struct{})
go func() {
Expand Down
12 changes: 9 additions & 3 deletions pkg/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import (
const LoggerName = "node"

type Bee struct {
logger log.Logger
p2pService io.Closer
p2pHalter p2p.Halter
ctxCancel context.CancelFunc
Expand Down Expand Up @@ -260,6 +261,7 @@ func NewBee(
})

b = &Bee{
logger: logger,
ctxCancel: ctxCancel,
errorLogWriter: sink,
tracerCloser: tracerCloser,
Expand Down Expand Up @@ -1350,12 +1352,15 @@ func (b *Bee) Shutdown() error {
}
// tryClose is a convenient closure which decrease
// repetitive io.Closer tryClose procedure.
tryClose := func(c io.Closer, errMsg string) {
tryClose := func(c io.Closer, component string) {
if c == nil {
return
}

b.logger.Debug("starting shutdown", "component", component)
defer b.logger.Debug("finished shutdown", "component", component)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it is worth to add to it the time-span it took to shut down the component

if err := c.Close(); err != nil {
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", errMsg, err))
mErr = multierror.Append(mErr, fmt.Errorf("%s: %w", component, err))
}
}

Expand Down Expand Up @@ -1429,9 +1434,10 @@ func (b *Bee) Shutdown() error {
tryClose(b.tracerCloser, "tracer")
tryClose(b.topologyCloser, "topology driver")
tryClose(b.storageIncetivesCloser, "storage incentives agent")
// close localstore before StateStore to avoid ErrClosed / incomplete flush.
tryClose(b.localstoreCloser, "localstore")
tryClose(b.stateStoreCloser, "statestore")
tryClose(b.stamperStoreCloser, "stamperstore")
tryClose(b.localstoreCloser, "localstore")
tryClose(b.resolverCloser, "resolver service")

return mErr
Expand Down
17 changes: 7 additions & 10 deletions pkg/p2p/libp2p/internal/reacher/reacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ type reacher struct {
peerHeap peerHeap // min-heap ordered by retryAfter
peerIndex map[string]*peer // lookup by overlay for O(1) access

newPeer chan struct{}
quit chan struct{}
newPeer chan struct{}
quit chan struct{}
closeOnce sync.Once
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


pinger p2p.Pinger
notifier p2p.ReachableNotifier
Expand Down Expand Up @@ -290,15 +291,11 @@ func (r *reacher) jitter(d time.Duration) time.Duration {
return time.Duration(float64(d) * j)
}

// Close stops the worker. Must be called once.
// Close stops the worker.
func (r *reacher) Close() error {
select {
case <-r.quit:
return nil
default:
}

close(r.quit)
r.closeOnce.Do(func() {
close(r.quit)
})
r.wg.Wait()
return nil
}
9 changes: 8 additions & 1 deletion pkg/p2p/libp2p/libp2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -1019,7 +1019,14 @@ func (s *Service) Connect(ctx context.Context, addrs []ma.Multiaddr) (address *b
s.metrics.ConnectBreakerCount.Inc()
return nil, p2p.NewConnectionBackoffError(err, s.connectionBreaker.ClosedUntil())
}
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
if !errors.Is(err, context.Canceled) {
select {
case <-s.halt:
s.logger.Debug("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
default:
s.logger.Warning("libp2p connect", "peer_id", peerID, "underlay", info.Addrs, "error", err)
}
}
connectErr = err
continue
}
Expand Down
5 changes: 4 additions & 1 deletion pkg/postage/listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type listener struct {

postageStampContractAddress common.Address
postageStampContractABI abi.ABI
closeOnce sync.Once
quit chan struct{}
wg sync.WaitGroup
metrics metrics
Expand Down Expand Up @@ -374,7 +375,9 @@ func (l *listener) Listen(ctx context.Context, from uint64, updater postage.Even
}

func (l *listener) Close() error {
close(l.quit)
l.closeOnce.Do(func() {
close(l.quit)
})

done := make(chan struct{})
go func() {
Expand Down
5 changes: 4 additions & 1 deletion pkg/pss/pss.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type pss struct {
handlersMu sync.Mutex
metrics metrics
logger log.Logger
closeOnce sync.Once
quit chan struct{}
}

Expand All @@ -70,7 +71,9 @@ func New(key *ecdsa.PrivateKey, logger log.Logger) Interface {
}

func (ps *pss) Close() error {
close(ps.quit)
ps.closeOnce.Do(func() {
close(ps.quit)
})
ps.handlersMu.Lock()
defer ps.handlersMu.Unlock()

Expand Down
8 changes: 6 additions & 2 deletions pkg/pullsync/pullsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -66,6 +67,7 @@ type Syncer struct {
metrics metrics
logger log.Logger
store storer.Reserve
closeOnce sync.Once
quit chan struct{}
unwrap func(swarm.Chunk)
gsocHandler func(*soc.SOC)
Expand Down Expand Up @@ -193,7 +195,7 @@ func (s *Syncer) handler(streamCtx context.Context, p p2p.Peer, stream p2p.Strea
}

// slow down future requests
waitDur, err := s.limiter.Wait(streamCtx, p.Address.ByteString(), max(1, len(chs)))
waitDur, err := s.limiter.Wait(ctx, p.Address.ByteString(), max(1, len(chs)))
if err != nil {
return fmt.Errorf("rate limiter: %w", err)
}
Expand Down Expand Up @@ -568,7 +570,9 @@ func (s *Syncer) disconnect(peer p2p.Peer) error {

func (s *Syncer) Close() error {
s.logger.Info("pull syncer shutting down")
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})
cc := make(chan struct{})
go func() {
defer close(cc)
Expand Down
5 changes: 4 additions & 1 deletion pkg/pusher/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Service struct {
batchExist postage.BatchExist
logger log.Logger
metrics metrics
closeOnce sync.Once
quit chan struct{}
chunksWorkerQuitC chan struct{}
inflight *inflight
Expand Down Expand Up @@ -363,7 +364,9 @@ func (s *Service) AddFeed(c <-chan *Op) {

func (s *Service) Close() error {
s.logger.Info("pusher shutting down")
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})

// Wait for chunks worker to finish
select {
Expand Down
5 changes: 4 additions & 1 deletion pkg/salud/salud.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type peerStatus interface {

type service struct {
wg sync.WaitGroup
closeOnce sync.Once
quit chan struct{}
logger log.Logger
topology topologyDriver
Expand Down Expand Up @@ -116,7 +117,9 @@ func (s *service) worker(startupStabilizer stabilization.Subscriber, mode string
}

func (s *service) Close() error {
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})
s.wg.Wait()
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/settlement/swap/priceoracle/priceoracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"errors"
"io"
"math/big"
"sync"
"time"

"github.com/ethereum/go-ethereum/accounts/abi"
Expand All @@ -33,6 +34,7 @@ type service struct {
exchangeRate *big.Int
deduction *big.Int
timeDivisor int64
closeOnce sync.Once
quitC chan struct{}
}

Expand Down Expand Up @@ -148,6 +150,8 @@ func (s *service) CurrentRates() (exchangeRate, deduction *big.Int, err error) {
}

func (s *service) Close() error {
close(s.quitC)
s.closeOnce.Do(func() {
close(s.quitC)
})
return nil
}
7 changes: 5 additions & 2 deletions pkg/sharky/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ type Store struct {
writes chan write // shared write operations channel
shards []*shard // shards
wg *sync.WaitGroup // count started operations
quit chan struct{} // quit channel
closeOnce sync.Once
quit chan struct{} // quit channel
metrics metrics
}

Expand Down Expand Up @@ -65,7 +66,9 @@ func New(basedir fs.FS, shardCnt int, maxDataSize int) (*Store, error) {

// Close closes each shard and return incidental errors from each shard
func (s *Store) Close() error {
close(s.quit)
s.closeOnce.Do(func() {
close(s.quit)
})
err := new(multierror.Error)
for _, sh := range s.shards {
err = multierror.Append(err, sh.close())
Expand Down
12 changes: 8 additions & 4 deletions pkg/shed/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package shed

import (
"errors"
"sync"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/iterator"
Expand Down Expand Up @@ -51,9 +52,10 @@ type Options struct {
// It provides a schema functionality to store fields and indexes
// information about naming and types.
type DB struct {
ldb *leveldb.DB
metrics metrics
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
ldb *leveldb.DB
metrics metrics
closeOnce sync.Once
quit chan struct{} // Quit channel to stop the metrics collection before closing the database
}

// NewDB constructs a new DB and validates the schema
Expand Down Expand Up @@ -193,6 +195,8 @@ func (db *DB) Compact(start, end []byte) error {

// Close closes LevelDB database.
func (db *DB) Close() (err error) {
close(db.quit)
db.closeOnce.Do(func() {
close(db.quit)
})
return db.ldb.Close()
}
9 changes: 6 additions & 3 deletions pkg/skippeers/skippeers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ const maxDuration time.Duration = math.MaxInt64
type List struct {
mtx sync.Mutex

durC chan time.Duration
quit chan struct{}
closeOnce sync.Once
durC chan time.Duration
quit chan struct{}
// key is chunk address, value is map of peer address to expiration
skip map[string]map[string]int64

Expand Down Expand Up @@ -133,7 +134,9 @@ func (l *List) pruneChunk(ch string, now int64) int {
}

func (l *List) Close() error {
close(l.quit)
l.closeOnce.Do(func() {
close(l.quit)
})
l.wg.Wait()

l.mtx.Lock()
Expand Down
Loading
Loading