-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhelpers.go
More file actions
145 lines (133 loc) · 3.85 KB
/
helpers.go
File metadata and controls
145 lines (133 loc) · 3.85 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package workers
import (
"context"
"errors"
"math/rand/v2"
"time"
)
// EveryInterval returns a CycleFunc that runs fn repeatedly, waiting d
// between runs. It is the fixed-period form of the interval loop: no jitter
// is applied and the first run is not given a separate initial delay.
//
// The returned function stops once ctx is cancelled. An error returned by fn
// ends the loop and is handed back to the caller; whether the worker is then
// restarted is decided by the supervisor (see [Worker.WithRestart]).
func EveryInterval(d time.Duration, fn CycleFunc) CycleFunc {
	const (
		noJitterPercent = 0
		noInitialDelay  = time.Duration(0)
	)
	return everyIntervalWithJitter(d, noJitterPercent, noInitialDelay, fn)
}
// everyIntervalWithJitter builds a CycleFunc that invokes fn on a repeating
// timer whose period can be randomized and whose first firing can be offset.
//
// base is raised to at least 1ms. When jitterPercent > 0, every wait is
// base - spread + rand(2*spread+1), where spread = base * jitterPercent / 100
// in integer nanoseconds, and the result is floored at 1ms. The arithmetic is
// plain int64 math with no overflow guard. When jitterPercent <= 0 every wait
// is exactly base. When initialDelay > 0 it is used for the first tick only,
// in place of the computed wait.
//
// A single [time.Timer] is re-armed with Reset after each call to fn (instead
// of a time.Ticker) so that each wait can differ; the next wait begins once
// fn has returned.
//
// The returned CycleFunc returns ctx.Err() when ctx is cancelled while it is
// waiting, and returns fn's error when fn fails. Errors matching ErrSkipTick
// are the exception: they are ignored and the loop carries on.
func everyIntervalWithJitter(base time.Duration, jitterPercent int, initialDelay time.Duration, fn CycleFunc) CycleFunc {
	base = max(base, time.Millisecond)

	// nextWait yields the delay to use before the upcoming tick.
	nextWait := func() time.Duration {
		if jitterPercent > 0 {
			nanos := int64(base)
			spread := nanos * int64(jitterPercent) / 100
			offset := rand.Int64N(2*spread + 1)
			return time.Duration(max(nanos-spread+offset, int64(time.Millisecond)))
		}
		return base
	}

	return func(ctx context.Context, info *WorkerInfo) error {
		wait := initialDelay
		if wait <= 0 {
			wait = nextWait()
		}

		timer := time.NewTimer(wait)
		defer timer.Stop()

		for {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-timer.C:
			}

			// The timer fired: run one cycle, then arm the next wait.
			if err := fn(ctx, info); err != nil && !errors.Is(err, ErrSkipTick) {
				return err
			}
			timer.Reset(nextWait())
		}
	}
}
// ChannelWorker returns a CycleFunc that drains ch, handing each received
// item to fn one at a time.
//
// The loop ends in one of three ways:
//   - ctx is cancelled while waiting for an item: ctx.Err() is returned;
//   - ch is closed: ErrDoNotRestart is returned;
//   - fn returns a non-nil error: that error is returned unchanged.
func ChannelWorker[T any](ch <-chan T, fn func(ctx context.Context, info *WorkerInfo, item T) error) CycleFunc {
	return func(ctx context.Context, info *WorkerInfo) error {
		for {
			var (
				next T
				open bool
			)
			select {
			case <-ctx.Done():
				return ctx.Err()
			case next, open = <-ch:
			}

			if !open {
				// The channel was closed, so no further work can arrive.
				return ErrDoNotRestart
			}
			if err := fn(ctx, info, next); err != nil {
				return err
			}
		}
	}
}
// BatchChannelWorker collects items from ch into batches and calls fn when
// either the batch reaches maxSize or maxDelay has elapsed since the first
// item of the current batch was received — whichever comes first.
//
// maxSize is raised to at least 1. The delay timer is only armed when the
// first item of a batch arrives, so an idle worker never calls fn with an
// empty batch.
//
// The returned CycleFunc ends in one of these ways:
//   - ctx is cancelled: any partial batch is flushed, then ctx.Err() is
//     returned. The flush passes the already-cancelled ctx to fn.
//   - ch is closed: any partial batch is flushed, then ErrDoNotRestart is
//     returned.
//   - fn returns a non-nil error: that error is returned immediately.
//
// The slice passed to fn shares its backing array with later batches, so fn
// must not retain it after returning; copy the items if they are needed later.
func BatchChannelWorker[T any](ch <-chan T, maxSize int, maxDelay time.Duration, fn func(ctx context.Context, info *WorkerInfo, batch []T) error) CycleFunc {
	maxSize = max(maxSize, 1)
	return func(ctx context.Context, info *WorkerInfo) error {
		batch := make([]T, 0, maxSize)

		timer := time.NewTimer(maxDelay)
		defer timer.Stop()

		// disarm stops the timer and discards a tick that may already be
		// waiting in timer.C. Under pre-Go 1.23 timer semantics a timer that
		// expired before Stop leaves its tick buffered; without the drain, a
		// later Reset could be followed at once by that stale expiry and
		// flush a batch early.
		disarm := func() {
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
		}
		disarm() // the deadline only starts with the first item of a batch

		// flush hands the current batch to fn (if non-empty) and empties it,
		// keeping the backing array for reuse.
		flush := func() error {
			if len(batch) == 0 {
				return nil
			}
			err := fn(ctx, info, batch)
			batch = batch[:0]
			return err
		}

		for {
			select {
			case <-ctx.Done():
				if err := flush(); err != nil {
					return err
				}
				return ctx.Err()

			case item, ok := <-ch:
				if !ok {
					// Channel closed — flush remaining and stop permanently.
					if err := flush(); err != nil {
						return err
					}
					return ErrDoNotRestart
				}
				if len(batch) == 0 {
					// First item of a new batch starts the delay window.
					timer.Reset(maxDelay)
				}
				batch = append(batch, item)
				if len(batch) >= maxSize {
					// Size limit reached first: cancel the pending deadline.
					disarm()
					if err := flush(); err != nil {
						return err
					}
				}

			case <-timer.C:
				// Delay elapsed first: send whatever has been collected.
				if err := flush(); err != nil {
					return err
				}
			}
		}
	}
}