-
Notifications
You must be signed in to change notification settings - Fork 149
Expand file tree
/
Copy pathstart_stop.go
More file actions
349 lines (305 loc) · 11.2 KB
/
start_stop.go
File metadata and controls
349 lines (305 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
package startstop
import (
"context"
"errors"
"sync"
)
// ErrStop is the cause handed to a context.CancelCauseFunc when a service's
// context is canceled because the service is stopping. Checking the context's
// cause against it makes it possible to differentiate a controlled stop from
// any other context cancellation.
var ErrStop = errors.New("service stopped")
// Service is a generalized interface for a service that starts and stops,
// usually one backed by embedding BaseStartStop.
type Service interface {
// Start starts a service. Services are responsible for backgrounding
// themselves, so this function should be invoked synchronously. Services
// may return an error if they have trouble starting up, so the caller
// should wait and respond to the error if necessary.
Start(ctx context.Context) error
// Started returns a channel that's closed when a service finishes starting,
// or if it failed to start and was stopped instead. It can be used in
// conjunction with WaitAllStarted to verify startup of a constellation of
// services.
Started() <-chan struct{}
// Stop stops a service. Services are responsible for making sure their stop
// is complete before returning so a caller can wait on this invocation
// synchronously and be guaranteed the service is fully stopped. Services
// are expected to be able to tolerate (1) being stopped without having been
// started, and (2) being double-stopped.
Stop()
}
// serviceWithStopped is a Service that can also return a Stopped channel. It's
// kept as a separate, unexported interface for the time being because it's not
// yet clear that Stopped strictly needs to be part of startstop's public
// Service contract.
type serviceWithStopped interface {
Service
// Stopped returns a channel that can be waited on for the service to be
// stopped. This function is only safe to invoke after successfully waiting on a
// service's Start, and a reference to it must be taken _before_ invoking Stop.
Stopped() <-chan struct{}
}
// BaseStartStop is a helper that can be embedded on a queue maintenance service
// and which provides the basic necessities to safely implement the Service
// interface in a way that's not racy and can tolerate a number of edge cases.
// It's packaged separately so that it doesn't leak its internal variables into
// services that use it.
//
// Services should implement their own Start function which invokes StartInit
// first thing, returns if told not to start, spawns a goroutine with their main
// run block otherwise, and makes sure to defer the stopped function returned by
// StartInit within that goroutine.
//
// A Stop implementation is provided automatically and it's not necessary to
// override it.
//
// The zero value is ready to use. Because it contains a sync.Mutex, a
// BaseStartStop must not be copied after first use.
type BaseStartStop struct {
	// cancelFunc cancels the context handed out by the most recent successful
	// StartInit.
	cancelFunc context.CancelCauseFunc

	// isRunning is true from a successful StartInit until the service is
	// reset by a finalized stop or by StartFailed.
	isRunning bool

	// mu guards the fields of this struct. Note that StopInit keeps it locked
	// until the returned finalizeStop function is invoked.
	mu sync.Mutex

	// started is closed when the service reports that it started (or when it
	// stops without ever having reported that). Allocated lazily by Started or
	// StartInit.
	started chan struct{}

	// stopped is closed when the service reports that it fully stopped.
	// Allocated lazily by Stopped or StartInit.
	stopped chan struct{}
}

// StartInit should be invoked at the beginning of a service's Start function.
// It returns a context for the service to use (cancelled with cause ErrStop
// when the service is stopped), a boolean indicating whether the service should
// start (false if the service is already running, in which case the input
// context is returned unchanged and both functions are nil), a started function
// to invoke once startup is complete, and a stopped function to invoke once the
// service has fully stopped. Services should defer stopped in their main run
// loop.
//
//	func (s *Service) Start(ctx context.Context) error {
//		ctx, shouldStart, started, stopped := s.StartInit(ctx)
//		if !shouldStart {
//			return nil
//		}
//
//		go func() {
//			defer stopped()
//
//			started()
//
//			<-ctx.Done()
//
//			...
//		}()
//
//		return nil
//	}
//
// started may be invoked any number of times; only the first call has an
// effect. stopped also closes the started channel if that hasn't happened yet,
// but must be invoked exactly once per run because a second call would close an
// already closed channel.
//
// Be careful to also handle startup errors with StartFailed, otherwise a
// service that failed to start once will never be able to start up again.
//
//	ctx, shouldStart, started, stopped := s.StartInit(ctx)
//	if !shouldStart {
//		return nil
//	}
//
//	if err := possibleStartUpError(); err != nil {
//		s.StartFailed(stopped)
//		return err
//	}
//
//	...
func (s *BaseStartStop) StartInit(ctx context.Context) (context.Context, bool, func(), func()) {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.isRunning {
		return ctx, false, nil, nil
	}

	s.isRunning = true

	// Only allocate a started or stopped channels when not preallocated by
	// Started or Stopped.
	if s.started == nil {
		s.started = make(chan struct{})
	}
	if s.stopped == nil {
		s.stopped = make(chan struct{})
	}

	ctx, s.cancelFunc = context.WithCancelCause(ctx)

	// Bind the returned functions to this run's channels rather than to the
	// struct fields. The fields are reset to nil by a finalized stop or by
	// StartFailed and reallocated by the next run, so reading them lazily
	// could panic on a nil channel or close channels belonging to a later run.
	startedChan, stoppedChan := s.started, s.stopped

	closeStartedOnce := sync.OnceFunc(func() { close(startedChan) })

	return ctx, true, closeStartedOnce, func() {
		// Also close the started channel (in case it wasn't already), just in
		// case `started()` was never invoked and someone is waiting on it.
		closeStartedOnce()

		close(stoppedChan)
	}
}

// StartFailed should be called when a service fails to start after StartInit.
// It cancels the service's context with cause ErrStop, invokes stopped (the
// stopped function returned by StartInit, which closes the started and stopped
// channels), and resets internal state so Start can be called again.
//
// This should not be used when a Stop is already in progress (ErrStop), because
// Stop will handle cleanup via finalizeStop.
func (s *BaseStartStop) StartFailed(stopped func()) {
	if s.cancelFunc != nil {
		s.cancelFunc(ErrStop)
	}

	// Signal waiters before taking the lock: a concurrent Stop holds the lock
	// while it waits on the stopped channel.
	stopped()

	s.mu.Lock()
	defer s.mu.Unlock()

	// Already reset (e.g. by a Stop that finalized in the meantime).
	if !s.isRunning {
		return
	}

	s.isRunning = false
	s.started = nil
	s.stopped = nil
}

// Started returns a channel that's closed when a service finishes starting, or
// if it failed to start and was stopped instead. It can be used in conjunction
// with WaitAllStarted to verify startup of a constellation of services.
func (s *BaseStartStop) Started() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()

	// If the call to Started is before the service was actually started,
	// preallocate the started channel so that regardless of whether the wait
	// started before or after the service started, it will still do the right
	// thing.
	if s.started == nil {
		s.started = make(chan struct{})
	}

	return s.started
}

// Stop is an automatically provided implementation for the maintenance Service
// interface's Stop. It cancels the service's context with cause ErrStop, blocks
// until the service signals that it has stopped, and then resets state so the
// service can be started again. It returns immediately if the service isn't
// running.
func (s *BaseStartStop) Stop() {
	shouldStop, stopped, finalizeStop := s.StopInit()
	if !shouldStop {
		return
	}

	<-stopped
	finalizeStop(true)
}

// StopInit provides a way to build a more customized Stop implementation. It
// should be avoided unless there's an exceptional reason not to because Stop
// should be fine in the vast majority of situations.
//
// It returns a boolean indicating whether the service should do any additional
// work to stop (false is returned if the service isn't running), a stopped
// channel to wait on for full stop, and a finalizeStop function that should be
// deferred in the stop function to ensure that locks are cleaned up and the
// struct is reset after stopping.
//
//	func (s *Service) Stop() {
//		shouldStop, stopped, finalizeStop := s.StopInit()
//		if !shouldStop {
//			return
//		}
//
//		defer finalizeStop(true)
//
//		<-stopped
//
//		...
//	}
//
// When true is returned, the service's context has been cancelled with cause
// ErrStop and the internal mutex stays locked until finalizeStop is invoked,
// so finalizeStop must be called exactly once. When false is returned, the
// stopped channel is nil and finalizeStop is a no-op.
//
// finalizeStop takes a boolean which indicates whether the service should
// indeed be considered stopped. This should usually be true, but callers can
// pass false to cancel the stop action, keeping the service from starting
// again, and potentially allowing the service to try another stop.
func (s *BaseStartStop) StopInit() (bool, <-chan struct{}, func(didStop bool)) {
	s.mu.Lock()

	// Tolerate being told to stop without having been started.
	if !s.isRunning {
		s.mu.Unlock()
		return false, nil, func(didStop bool) {}
	}

	s.cancelFunc(ErrStop)

	// The lock is intentionally still held on return; finalizeStop releases it.
	return true, s.stopped, func(didStop bool) {
		defer s.mu.Unlock()

		if didStop {
			s.isRunning = false
			s.started = nil
			s.stopped = nil
		}
	}
}

// Stopped returns a channel that can be waited on for the service to be
// stopped. This function may be used to return a stopped channel before a
// service is started or while it's running, but a reference to it must be taken
// _before_ invoking Stop.
func (s *BaseStartStop) Stopped() <-chan struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()

	// If the call to Stopped is before the service was actually started,
	// preallocate the stopped channel so that regardless of whether the wait
	// started before or after the service started, it will still do the right
	// thing.
	if s.stopped == nil {
		s.stopped = make(chan struct{})
	}

	return s.stopped
}

// StoppedUnsafe returns a channel that can be waited on for the service to be
// stopped.
//
// Unlike Stopped, this returns the struct's internal channel directly without
// preallocation and without taking a lock on the mutex (making it safe to call
// while StopInit is ongoing). The result is nil if no stopped channel has been
// allocated yet. Most users of BaseStartStop shouldn't use this variant and it
// basically exists for river.Client so it can provide slightly different stop
// channel semantics compared to BaseStartStop's.
func (s *BaseStartStop) StoppedUnsafe() <-chan struct{} { return s.stopped }
// startStopFunc is the Service implementation handed out by StartStopFunc. It
// pairs an embedded BaseStartStop with a caller-supplied start function.
type startStopFunc struct {
	BaseStartStop

	// startFunc receives the values produced by StartInit on every Start call.
	startFunc func(ctx context.Context, shouldStart bool, started, stopped func()) error
}

// StartStopFunc produces a `startstop.Service` from a function. It's useful for
// very small services that don't necessarily need a whole struct defined for
// them.
//
// startFunc is invoked on each Start with the results of StartInit, including
// when shouldStart is false, so it's responsible for returning early in that
// case.
func StartStopFunc(startFunc func(ctx context.Context, shouldStart bool, started, stopped func()) error) *startStopFunc {
	svc := new(startStopFunc)
	svc.startFunc = startFunc
	return svc
}

// Start runs StartInit and passes everything it returns on to the wrapped start
// function, returning that function's error.
func (s *startStopFunc) Start(ctx context.Context) error {
	runCtx, shouldStart, started, stopped := s.StartInit(ctx)
	return s.startFunc(runCtx, shouldStart, started, stopped)
}
// StartAll starts the given services one after another, in order. On the first
// Start that returns an error, every service that came before it (and therefore
// started successfully) is stopped via StopAllParallel, and that error is
// returned. Services after the failing one are never started. Returns nil when
// all services start.
func StartAll(ctx context.Context, services ...Service) error {
	for numStarted := 0; numStarted < len(services); numStarted++ {
		err := services[numStarted].Start(ctx)
		if err == nil {
			continue
		}

		// Roll back only what was started before the failure.
		StopAllParallel(services[:numStarted]...)
		return err
	}

	return nil
}
// StopAllParallel invokes Stop on every given service concurrently, one
// goroutine per service, and returns only after all of those Stop calls have
// returned.
func StopAllParallel(services ...Service) {
	var wg sync.WaitGroup

	for _, service := range services {
		wg.Add(1)

		go func(svc Service) {
			defer wg.Done()
			svc.Stop()
		}(service)
	}

	wg.Wait()
}
// WaitAllStarted blocks until every given service has started (or has stopped
// in a degenerate start scenario, like if context is cancelled while starting
// up).
//
// It's the blocking form of WaitAllStartedC: it simply receives from the
// channel that function returns. Unlike StopAllParallel, no parallelism is
// needed because the services have already backgrounded themselves, and the
// wait is bounded by the slowest service either way.
func WaitAllStarted(services ...Service) {
	allStarted := WaitAllStartedC(services...)
	<-allStarted
}
// WaitAllStartedC returns a channel that's closed once every given service has
// started (or has stopped in a degenerate start scenario, like if context is
// cancelled while starting up).
//
// Returning a channel lets a caller add a timeout branch with `select` if
// they'd like. For the most part this shouldn't be needed though, as long as
// each service individually is confirmed to be able to start and stop itself in
// a healthy way. (i.e. Never dies for any reason before managing to call
// `started()` or `stopped()`).
//
// Unlike StopAllParallel, WaitAllStartedC doesn't bother with parallelism: a
// single goroutine waits on each service's Started channel in turn, because the
// services have already backgrounded themselves, and we have to wait until the
// slowest service has started anyway.
func WaitAllStartedC(services ...Service) <-chan struct{} {
	done := make(chan struct{})

	go func() {
		defer close(done)

		for i := range services {
			<-services[i].Started()
		}
	}()

	return done
}