forked from francescopepe/formigo
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumers.go
More file actions
261 lines (211 loc) · 6.71 KB
/
consumers.go
File metadata and controls
261 lines (211 loc) · 6.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
package formigo
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/Pod-Point/go-queue-worker/internal/messages"
)
// BatchResponse is returned by a BatchHandler to report per-message failures.
// Messages whose ID appears in FailedMessagesId are NOT pushed for deletion
// (see buildMessagesToDeleteFromBatchResponse), so the queue can redeliver them.
type BatchResponse struct {
	// FailedMessagesId holds the IDs (as returned by messages.Message.Id())
	// of the messages that failed processing.
	FailedMessagesId []interface{}
}
// MessageHandler processes a single message. Returning a non-nil error
// prevents the message from being deleted (it is reported instead).
type MessageHandler = func(ctx context.Context, msg Message) error

// BatchHandler processes a batch of messages, reporting per-message failures
// through the returned BatchResponse. A non-nil error is reported to the
// worker's controller.
type BatchHandler = func(ctx context.Context, msgs []Message) (BatchResponse, error)
// errBufferCtxExpired means that the buffered messages didn't get passed to
// the handler within the first message's timeout.
// This is generally due to:
// - Visibility timeout of the messages too small
// - Buffer timeout too high
// - Consumer too slow
var errBufferCtxExpired = errors.New("buffer context expired, buffer will Reset")
// Consumer is satisfied by both messageConsumer and batchConsumer (see the
// interface guards at the bottom of the file).
//
// consume reads messages from messageCh with at most `concurrency` handler
// invocations in flight, reports failures via ctrl, and pushes successfully
// processed messages to deleteCh. It returns only once messageCh is closed
// and fully drained.
type Consumer interface {
	consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message)
}
// makeAvailableConsumers builds a semaphore as a buffered channel pre-filled
// with one token per consumer slot. Receiving a token claims a slot; sending
// it back releases the slot.
func makeAvailableConsumers(concurrency int) chan struct{} {
	tokens := make(chan struct{}, concurrency)
	for remaining := concurrency; remaining > 0; remaining-- {
		tokens <- struct{}{}
	}
	return tokens
}
// wrapHandler catches any panic error and returns the error that generated it.
// It prevents the worker from crashing in case of an unexpected error.
func wrapHandler(handler func() error) (err error) {
defer func() {
if r := recover(); r != nil {
// Set error before returning
err = fmt.Errorf("panic error: %s", r)
}
}()
err = handler()
return err
}
// messageConsumer defines a message handler that consumes only one message at a
// time.
// It can be useful when the workload is specific per message, for example for sending
// an email.
type messageConsumer struct {
	// handler is the user-supplied function invoked once per message.
	handler MessageHandler
}
// processMessage runs the user handler on a single message, converting any
// panic into an error via wrapHandler.
func (c *messageConsumer) processMessage(msg messages.Message) error {
	// The message context must be cancelled no matter how handling ends,
	// to release the resources associated with it.
	defer msg.CancelCtx()

	invoke := func() error {
		return c.handler(msg.Ctx, msg)
	}
	return wrapHandler(invoke)
}
// Consumes and deletes a single message, it stops only when the `messageCh` gets closed
// and doesn't have any messages in it.
//
// Concurrency model: the buffered `consumers` channel acts as a semaphore
// capping in-flight handler goroutines at `concurrency`; the WaitGroup makes
// sure every spawned goroutine finishes before this function returns.
func (c *messageConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
	consumers := makeAvailableConsumers(concurrency)

	var wg sync.WaitGroup
	for msg := range messageCh {
		<-consumers // Use an available consumer
		wg.Add(1)
		// The message is passed as an argument so each goroutine operates on
		// its own copy of the loop variable.
		go func(message messages.Message) {
			defer func() {
				wg.Done()
				consumers <- struct{}{} // Release consumer
			}()

			err := c.processMessage(message)
			if err != nil {
				// A failed message is reported and NOT pushed for deletion,
				// so the queue can redeliver it.
				ctrl.reportError(fmt.Errorf("failed to process message: %w", err))
				return
			}

			// Push message for deletion
			deleteCh <- message
		}(msg)
	}

	// Wait for all in-flight handlers before returning.
	wg.Wait()
}
// NewMessageConsumer builds a consumer that hands messages to config.Handler
// one at a time.
func NewMessageConsumer(config MessageConsumerConfiguration) *messageConsumer {
	consumer := &messageConsumer{handler: config.Handler}
	return consumer
}
// batchConsumer allows to process multiple messages at a time. This can be useful
// for batch updates or use cases with high throughput.
type batchConsumer struct {
	// handler is the user-supplied function invoked once per buffered batch.
	handler BatchHandler
	// bufferConfig controls the batch size and the maximum wait before a
	// partial batch is flushed (defaults applied in NewBatchConsumer).
	bufferConfig BatchConsumerBufferConfiguration
}
// It processes the messages and push them downstream for deletion.
//
// NOTE(review): ctx is not the first parameter, contrary to Go convention.
// Reordering would require updating the call site in consume(), so it is
// only flagged here.
func (c *batchConsumer) processMessages(ctrl *controller, deleteCh chan<- messages.Message, ctx context.Context, msgs []messages.Message) {
	// Convert a handler panic into a reported error instead of crashing the
	// worker. If the handler panics, nothing below runs and no message in the
	// batch is deleted.
	defer func() {
		if r := recover(); r != nil {
			ctrl.reportError(fmt.Errorf("panic error: %s", r))
		}
	}()

	// Convert slice to the abstraction
	converted := make([]Message, 0, len(msgs))
	for _, msg := range msgs {
		converted = append(converted, msg)
	}

	resp, err := c.handler(ctx, converted)
	if err != nil {
		ctrl.reportError(fmt.Errorf("failed to process batch: %w", err))
	}
	// NOTE(review): execution deliberately continues after a handler error.
	// If the handler returns a zero-value BatchResponse together with the
	// error, FailedMessagesId is empty and EVERY message in the batch is
	// deleted below. Confirm handlers are expected to populate
	// FailedMessagesId even when also returning an error.

	toDelete := c.buildMessagesToDeleteFromBatchResponse(msgs, resp)

	// Push messages for deletion
	for _, msg := range toDelete {
		deleteCh <- msg
	}
}
// Consumes and deletes a number of messages in the interval [1, N] based on configuration
// provided in the BufferConfiguration.
// It stops only when the messageCh gets closed and doesn't have any messages in it.
//
// The loop has two phases per iteration: (1) fill the buffer until it is
// full, its flush timeout fires, or its context expires; (2) claim a
// consumer slot and dispatch the buffered batch to a goroutine.
func (c *batchConsumer) consume(concurrency int, ctrl *controller, messageCh <-chan messages.Message, deleteCh chan<- messages.Message) {
	consumers := makeAvailableConsumers(concurrency)

	// Create buffer
	buffer := messages.NewBufferWithContextTimeout(messages.BufferWithContextTimeoutConfiguration{
		Size:          c.bufferConfig.Size,
		BufferTimeout: c.bufferConfig.Timeout,
	})
	defer buffer.Reset()

	var wg sync.WaitGroup
	// The anonymous function exists so `return` can exit the for-loop while
	// still reaching wg.Wait() below.
	func() {
		for {
			// Phase 1: accumulate messages.
			select {
			case msg, open := <-messageCh:
				if !open {
					// Message channel closed. This is the stop signal.
					// Note: We can't return if the buffer contains messages to process.
					// We MUST consume all the messages on the buffer
					if buffer.IsEmpty() {
						return // Buffer empty, we can stop
					}
					break // Buffer contains messages, break the select
				}
				// Add message to the buffer
				buffer.Add(msg)

				// If the buffer is not full, continue
				if !buffer.IsFull() {
					continue
				}
			case <-buffer.CtxExpired():
				// The buffered messages can no longer be processed in time;
				// drop them (they will be redelivered) and report.
				ctrl.reportError(errBufferCtxExpired)

				// Reset the buffer.
				buffer.Reset()
				continue
			case <-buffer.Expired():
				// Timeout expired, process the buffer
			}

			// Phase 2: claim a consumer slot. While waiting, the batch's
			// context may still expire, in which case the batch is dropped
			// and reported instead of being dispatched.
			select {
			case <-consumers: // Use an available consumer
			case <-buffer.CtxExpired():
				ctrl.reportError(errBufferCtxExpired)

				// Reset the buffer.
				buffer.Reset()
				continue
			}

			ctx, cancelCtx := buffer.PullContext()
			wg.Add(1)
			// Batch contents, context, and cancel func are passed as
			// arguments because the buffer is reset right after dispatch.
			go func(ctx context.Context, ctxCancelFunc context.CancelFunc, msgs []messages.Message) {
				defer func() {
					wg.Done()
					consumers <- struct{}{} // Release consumer
					ctxCancelFunc()         // Cancel context
				}()

				// Process the messages
				c.processMessages(ctrl, deleteCh, ctx, msgs)
			}(ctx, cancelCtx, buffer.Messages())

			// Reset buffer
			buffer.Reset()
		}
	}()

	// Wait for all in-flight batches before returning.
	wg.Wait()
}
// buildMessagesToDeleteFromBatchResponse returns the subset of msgs that can
// be deleted: every message whose ID is NOT listed in resp.FailedMessagesId.
func (c *batchConsumer) buildMessagesToDeleteFromBatchResponse(msgs []messages.Message, resp BatchResponse) []messages.Message {
	// Fast path: nothing failed, the whole batch is deletable.
	if len(resp.FailedMessagesId) == 0 {
		return msgs
	}

	// Index the failed IDs for O(1) membership checks.
	failed := make(map[interface{}]struct{}, len(resp.FailedMessagesId))
	for _, id := range resp.FailedMessagesId {
		failed[id] = struct{}{}
	}

	// Keep only the messages not marked as failed.
	deletable := make([]messages.Message, 0, len(msgs))
	for _, msg := range msgs {
		if _, isFailed := failed[msg.Id()]; isFailed {
			continue
		}
		deletable = append(deletable, msg)
	}
	return deletable
}
// NewBatchConsumer builds a consumer that hands batches of messages to
// config.Handler. Unset buffer settings fall back to a size of 10 and a
// one-second flush timeout.
func NewBatchConsumer(config BatchConsumerConfiguration) *batchConsumer {
	const (
		defaultBufferSize    = 10
		defaultBufferTimeout = time.Second
	)
	if config.BufferConfig.Size == 0 {
		config.BufferConfig.Size = defaultBufferSize
	}
	if config.BufferConfig.Timeout == 0 {
		config.BufferConfig.Timeout = defaultBufferTimeout
	}
	return &batchConsumer{
		handler:      config.Handler,
		bufferConfig: config.BufferConfig,
	}
}
// Interface guards: compile-time checks that both consumer implementations
// satisfy the Consumer interface.
var (
	_ Consumer = (*messageConsumer)(nil)
	_ Consumer = (*batchConsumer)(nil)
)