-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgo_group.go
More file actions
119 lines (96 loc) · 2.12 KB
/
go_group.go
File metadata and controls
119 lines (96 loc) · 2.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package g
import (
"bytes"
"fmt"
"runtime/debug"
"sync"
)
type GoGroup struct {
// coroutine Start a coroutine.
coroutine func(fc func())
// stack Custom handling panicked debug.Stack(), nil value will not call the receiver.
stack func(stack []byte)
// exit Stop the current coroutine group.
exit func(err error)
// exitErr Read the error of Exit().
exitErr chan error
// doneOnce Make sure to write a shutdown signal to `shutdownWait` only once.
exitOnce *sync.Once
mutex *Mutex
wg *sync.WaitGroup
// listExit Last in first out, when the function list is called, all coroutines are exited.
listExit []func()
// listQuit Last in first out, when the function list is called, not all coroutines have exited.
listQuit []func()
}
func (s *GoGroup) Stack(stack func(stack []byte)) {
s.stack = stack
}
func (s *GoGroup) PushExit(fc func()) {
if fc != nil {
s.mutex.WithLock(func() { s.listExit = append(s.listExit, fc) })
}
}
func (s *GoGroup) CallExit() {
for i := len(s.listExit) - 1; i >= 0; i-- {
s.listExit[i]()
}
}
func (s *GoGroup) PushQuit(fc func()) {
if fc != nil {
s.mutex.WithLock(func() { s.listQuit = append(s.listQuit, fc) })
}
}
func (s *GoGroup) CallQuit() {
for i := len(s.listQuit) - 1; i >= 0; i-- {
s.listQuit[i]()
}
}
func (s *GoGroup) Go(fc func()) {
s.coroutine(fc)
}
func (s *GoGroup) Exit(err error) {
s.exit(err)
}
func (s *GoGroup) ExitErr() <-chan error {
return s.exitErr
}
func (s *GoGroup) Wait() {
s.wg.Wait()
}
func NewGoGroup() *GoGroup {
s := &GoGroup{
exitErr: make(chan error, 1),
exitOnce: &sync.Once{},
mutex: NewMutex(),
wg: &sync.WaitGroup{},
}
s.coroutine = func(fc func()) {
if fc == nil {
return
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
stack := s.stack
if stack != nil {
defer func() {
if msg := recover(); msg != nil {
buf := bytes.NewBuffer(nil)
buf.WriteString(fmt.Sprintf("<<< %v >>>\n", msg))
buf.Write(debug.Stack())
stack(buf.Bytes())
}
}()
}
fc()
}()
}
s.exit = func(err error) {
s.exitOnce.Do(func() {
s.exitErr <- err
close(s.exitErr)
})
}
return s
}