-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrunner.go
More file actions
82 lines (68 loc) · 2.83 KB
/
runner.go
File metadata and controls
82 lines (68 loc) · 2.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package parallel
import (
"container/list"
"sync"
)
// runner owns a single RunFunc together with the writer pipeline that carries its
// output, from construction through to close.
type runner struct {
	rFunc  RunFunc // Started as a goroutine by Run()
	outTag []byte  // Prefix handed to the stdout tagger; empty means no tagging
	errTag []byte  // Prefix handed to the stderr tagger; empty means no tagging

	sync.RWMutex // Guards every field declared after this point

	stdout   writer // Immutable "head" of the stdout pipeline, supplied to Run()
	stderr   writer // Immutable "head" of the stderr pipeline, supplied to Run()
	queue    *queue // Kept so that the queue can be flush()ed
	canClose bool   // Whether Wait() has read this runner from the completed channel
}
// newRunner returns a skeletal runner that holds rFunc and the two tags converted to
// byte slices. No pipeline is attached: stdout, stderr and queue stay at their zero
// values until one of the build*Pipeline methods fills them in.
func newRunner(outTag, errTag string, rFunc RunFunc) *runner {
	rnr := new(runner)
	rnr.rFunc = rFunc
	rnr.outTag = []byte(outTag)
	rnr.errTag = []byte(errTag)

	return rnr
}
// buildQueuePipeline assembles the Queue Pipeline for this runner. Reading from the
// RunFunc side the stages are: head, queue, optional tagger, tail and finally
// Group.stdout/Group.stderr. Each stage wraps the one after it, so construction runs
// back to front, starting at the tail. A Queue Pipeline starts out in background mode.
func (rnr *runner) buildQueuePipeline(grp *Group) {
	// Both tails wrap a Group writer and are handed the same Group output mutex.
	var outW writer = newTail(grp.stdout, &grp.outputMu)
	var errW writer = newTail(grp.stderr, &grp.outputMu)

	// A tagger is inserted only for a stream whose tag is non-empty.
	if len(rnr.outTag) != 0 {
		outW = newTagger(outW, rnr.outTag)
	}
	if len(rnr.errTag) != 0 {
		errW = newTagger(errW, rnr.errTag)
	}

	// newQueue returns a pair of writers sharing one output buffer, used both for
	// sequencing and for background storage. The first of the pair is remembered on
	// the runner so it can be switched to foreground later on.
	rnr.queue, errW = newQueue(grp.orderStderr, grp.limitMemory, outW, errW)
	outW = rnr.queue

	// Cap each stream with its head and publish the finished pipelines.
	rnr.stdout, rnr.stderr = newHead(outW), newHead(errW)
}
// buildPassthruPipeline assembles the Passthru Pipeline: a head wrapped directly around
// a tail which in turn wraps Group.stdout/Group.stderr. Every stateful writer is left
// out, but the tails still receive the Group output mutex so the Group io.Writers stay
// protected against concurrent use. That makes it not quite a fully transparent
// passthru, merely the closest available while still guarding the Group outputs.
// rnr.queue is not touched by this method.
func (rnr *runner) buildPassthruPipeline(grp *Group) {
	outTail := newTail(grp.stdout, &grp.outputMu)
	rnr.stdout = newHead(outTail)

	errTail := newTail(grp.stderr, &grp.outputMu)
	rnr.stderr = newHead(errTail)
}
// switchToForeground is invoked once the runner may write straight through to the Group
// io.Writers. The transition itself is delegated to the queue writer, which releases its
// backlog of pending writes and unblocks any callers that were blocked.
//
// NOTE(review): within this file rnr.queue is only assigned by buildQueuePipeline, so
// callers presumably never invoke this on a passthru runner - confirm against Group.
func (rnr *runner) switchToForeground() {
	q := rnr.queue
	q.foreground()
}
// run invokes the RunFunc with the runner's stdout and stderr writers and, once it
// returns, reports completion to [Group.Wait] by sending e on the completed channel.
// It is called from the RunFunc goroutine, so a send that blocks on the completion
// channel stalls nothing else.
func (rnr *runner) run(e *list.Element, completed chan *list.Element) {
	out, errOut := rnr.stdout, rnr.stderr
	rnr.rFunc(out, errOut)

	completed <- e
}
// close flushes all pending output by closing the stdout pipeline first and the stderr
// pipeline second. Both writers must already have been set by one of the build*Pipeline
// methods, since calling close on a nil writer interface panics.
func (rnr *runner) close() {
	out := rnr.stdout
	out.close()

	errOut := rnr.stderr
	errOut.close()
}