Skip to content

Commit 02f845e

Browse files
Ahmad Rizqi Meydiarso
authored and committed
fix retry worker, revamp test.sh
1 parent c9629b9 commit 02f845e

6 files changed

Lines changed: 483 additions & 221 deletions

File tree

internal/scheduler/dispatcher.go

Lines changed: 113 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -68,26 +68,7 @@ func (d *Dispatcher) SetHTTPClient(client HTTPClient) {
6868
d.client = client
6969
}
7070

71-
// retryWithBackoff executes a function with retries and backoff
72-
func (d *Dispatcher) retryWithBackoff(ctx context.Context, operation func() error) error {
73-
var lastErr error
74-
for attempt := 0; attempt <= d.maxRetries; attempt++ {
75-
err := operation()
76-
if err == nil {
77-
return nil
78-
}
79-
80-
lastErr = err
81-
if attempt == d.maxRetries {
82-
return lastErr
83-
}
84-
85-
time.Sleep(d.retryDelay)
86-
}
87-
return lastErr
88-
}
89-
90-
// DispatchWebhook sends a webhook request and handles retries using a Schedule object
71+
// DispatchWebhook sends a webhook request and handles a single attempt (no in-process retry)
9172
func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule) error {
9273
if strings.HasPrefix(sched.Webhook, "q:") {
9374
if d.clientNotifier == nil {
@@ -106,29 +87,24 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
10687
if err != nil {
10788
return fmt.Errorf("error marshaling payload: %w", err)
10889
}
109-
var dispatchErr error
110-
dispatchErr = d.retryWithBackoff(ctx, func() error {
111-
return d.clientNotifier.DispatchToClient(clientID, jsonPayload)
112-
})
113-
// Log occurrence as before (reuse code)
114-
// ... (copy occurrence logging from HTTP path, set status based on dispatchErr) ...
90+
err = d.clientNotifier.DispatchToClient(clientID, jsonPayload)
11591
finalStatus := models.OccurrenceStatusCompleted
116-
if dispatchErr != nil {
92+
if err != nil {
11793
finalStatus = models.OccurrenceStatusFailed
11894
}
11995
logOccurrence := &models.Occurrence{
12096
OccurrenceID: sched.OccurrenceID,
12197
EventID: sched.EventID,
12298
ScheduledAt: sched.ScheduledAt,
12399
Status: finalStatus,
124-
AttemptCount: 1, // or track attempts if needed
100+
AttemptCount: sched.AttemptCount,
125101
Timestamp: time.Now(),
126102
StatusCode: 0,
127103
ResponseBody: "",
128-
ErrorMessage: fmt.Sprintf("client hook dispatch: %v", dispatchErr),
104+
ErrorMessage: fmt.Sprintf("client hook dispatch: %v", err),
129105
}
130106
_ = d.occurrenceRepo.Create(ctx, logOccurrence)
131-
return dispatchErr
107+
return err
132108
}
133109
// Prepare webhook payload with rich information
134110
payload := map[string]interface{}{
@@ -139,102 +115,74 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
139115
"scheduled_at": sched.ScheduledAt,
140116
"metadata": sched.Metadata,
141117
}
142-
143-
// Convert payload to JSON
144118
jsonPayload, err := json.Marshal(payload)
145119
if err != nil {
146120
return fmt.Errorf("error marshaling payload: %w", err)
147121
}
148-
149-
// Create base request (will be cloned for each attempt)
150122
baseReq, err := http.NewRequestWithContext(ctx, "POST", sched.Webhook, nil)
151123
if err != nil {
152124
return fmt.Errorf("error creating request: %w", err)
153125
}
154-
155-
// Set headers
156126
baseReq.Header.Set("Content-Type", "application/json")
157127
baseReq.Header.Set("Content-Length", fmt.Sprintf("%d", len(jsonPayload)))
158-
159-
// Sign request if HMAC is enabled (not supported if secret is not present)
160128
if d.hmacService != nil {
161-
// No HMACSecret in Schedule, so skip or use default
162129
secret := ""
163130
signature := d.hmacService.SignPayload(jsonPayload, secret)
164131
baseReq.Header.Set("X-Qhronos-Signature", signature)
165132
}
166-
167-
// Track attempts
168-
attemptCount := 0
169-
var lastAttempt time.Time
133+
// Only one attempt per call
134+
attemptCount := sched.AttemptCount
135+
if attemptCount == 0 {
136+
attemptCount = 1
137+
} else {
138+
attemptCount++
139+
}
140+
req := baseReq.Clone(ctx)
141+
req.Body = ioutil.NopCloser(bytes.NewBuffer(jsonPayload))
170142
var finalStatus models.OccurrenceStatus
171143
var statusCode int
172144
var responseBody string
173145
var errorMessage string
174-
175-
// Execute webhook request with retries
176-
dispatchErr := d.retryWithBackoff(ctx, func() error {
177-
attemptCount++
178-
lastAttempt = time.Now()
179-
// Clone the base request and set the body for this attempt
180-
req := baseReq.Clone(ctx)
181-
req.Body = ioutil.NopCloser(bytes.NewBuffer(jsonPayload))
182-
183-
resp, err := d.client.Do(req)
184-
if err != nil {
185-
finalStatus = models.OccurrenceStatusFailed
186-
statusCode = 0
187-
responseBody = ""
188-
errorMessage = err.Error()
189-
return fmt.Errorf("webhook request failed: %w", err)
190-
}
191-
192-
// Handle nil response
193-
if resp == nil {
194-
finalStatus = models.OccurrenceStatusFailed
195-
statusCode = 0
196-
responseBody = ""
197-
errorMessage = "empty response from server"
198-
return fmt.Errorf("empty response from server")
199-
}
200-
201-
// Ensure response body is closed
146+
resp, err := d.client.Do(req)
147+
if err != nil {
148+
finalStatus = models.OccurrenceStatusFailed
149+
statusCode = 0
150+
responseBody = ""
151+
errorMessage = err.Error()
152+
} else if resp == nil {
153+
finalStatus = models.OccurrenceStatusFailed
154+
statusCode = 0
155+
responseBody = ""
156+
errorMessage = "empty response from server"
157+
} else {
202158
defer func() {
203159
if resp.Body != nil {
204160
resp.Body.Close()
205161
}
206162
}()
207-
208-
// Check response status
209163
statusCode = resp.StatusCode
210164
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
211165
finalStatus = models.OccurrenceStatusCompleted
212166
responseBody = ""
213167
errorMessage = ""
214-
return nil
168+
} else {
169+
finalStatus = models.OccurrenceStatusFailed
170+
responseBody = ""
171+
errorMessage = fmt.Sprintf("received non-2xx status code: %d", resp.StatusCode)
215172
}
216-
217-
// Non-2xx status code
218-
finalStatus = models.OccurrenceStatusFailed
219-
responseBody = ""
220-
errorMessage = fmt.Sprintf("received non-2xx status code: %d", resp.StatusCode)
221-
return fmt.Errorf("received non-2xx status code: %d", resp.StatusCode)
222-
})
223-
224-
// Log the result to Postgres as a new occurrence record (append-only, for history)
173+
}
225174
logOccurrence := &models.Occurrence{
226175
OccurrenceID: sched.OccurrenceID,
227176
EventID: sched.EventID,
228177
ScheduledAt: sched.ScheduledAt,
229178
Status: finalStatus,
230179
AttemptCount: attemptCount,
231-
Timestamp: lastAttempt,
180+
Timestamp: time.Now(),
232181
StatusCode: statusCode,
233182
ResponseBody: responseBody,
234183
ErrorMessage: errorMessage,
235184
}
236-
_ = d.occurrenceRepo.Create(ctx, logOccurrence) // Ignore error to avoid blocking delivery
237-
185+
_ = d.occurrenceRepo.Create(ctx, logOccurrence)
238186
// Auto-inactivate one-time events after dispatch (success or max retries)
239187
event, err := d.eventRepo.GetByID(ctx, sched.EventID)
240188
if event == nil {
@@ -244,39 +192,82 @@ func (d *Dispatcher) DispatchWebhook(ctx context.Context, sched *models.Schedule
244192
event.Status = "inactive"
245193
_ = d.eventRepo.Update(ctx, event)
246194
}
247-
248-
return dispatchErr
195+
if finalStatus == models.OccurrenceStatusCompleted {
196+
return nil
197+
}
198+
return fmt.Errorf("dispatch failed: %s", errorMessage)
249199
}
250200

251-
const dispatchProcessingKey = "dispatch:processing"
201+
const retryQueueKey = "retry:queue"
252202

253203
// Run starts a pool of dispatcher workers that process the dispatch queue
254204
func (d *Dispatcher) Run(ctx context.Context, scheduler *Scheduler, workerCount int) error {
255205
d.logger.Info("Starting dispatcher worker pool", zap.Int("worker_count", workerCount))
206+
207+
// Lua script for atomic move from retry queue to dispatch queue
208+
retryLua := `
209+
local due = redis.call('ZRANGEBYSCORE', KEYS[1], '-inf', ARGV[1])
210+
for i, v in ipairs(due) do
211+
redis.call('RPUSH', KEYS[2], v)
212+
redis.call('ZREM', KEYS[1], v)
213+
end
214+
return due
215+
`
216+
217+
// Start retry poller
218+
pollerCtx, pollerCancel := context.WithCancel(context.Background())
219+
go func() {
220+
ticker := time.NewTicker(1 * time.Second)
221+
defer ticker.Stop()
222+
for {
223+
select {
224+
case <-pollerCtx.Done():
225+
return
226+
case <-ticker.C:
227+
now := fmt.Sprintf("%f", float64(time.Now().Unix()))
228+
// Use Lua script for atomic move
229+
res, err := scheduler.redis.Eval(ctx, retryLua, []string{retryQueueKey, dispatchQueueKey}, now).Result()
230+
if err != nil {
231+
d.logger.Error("[RETRY POLLER] Lua script failed", zap.Error(err))
232+
continue
233+
}
234+
if arr, ok := res.([]interface{}); ok && len(arr) > 0 {
235+
d.logger.Debug("[RETRY POLLER] Moved items from retry queue to dispatch queue", zap.Int("count", len(arr)))
236+
}
237+
}
238+
}
239+
}()
240+
256241
workerFn := func(workerID int) {
257242
for {
258243
select {
259244
case <-ctx.Done():
260245
return
261246
default:
262-
d.logger.Debug("[DISPATCHER] Worker waiting for item", zap.Int("worker_id", workerID))
263-
// Atomically move from queue to processing
264-
data, err := scheduler.redis.BRPopLPush(ctx, dispatchQueueKey, dispatchProcessingKey, 5*time.Second).Result()
247+
itemStart := time.Now()
248+
d.logger.Debug("[DISPATCHER] Worker waiting for item", zap.Int("worker_id", workerID), zap.Time("ts", itemStart))
249+
// Pop from dispatch queue (no processing queue)
250+
popStart := time.Now()
251+
data, err := scheduler.redis.BRPop(ctx, 5*time.Second, dispatchQueueKey).Result()
252+
popEnd := time.Now()
253+
d.logger.Debug("[DISPATCHER] BRPop duration", zap.Int("worker_id", workerID), zap.Duration("duration", popEnd.Sub(popStart)), zap.Error(err))
265254
if err == redis.Nil {
266-
d.logger.Debug("[DISPATCHER] No item found, continuing", zap.Int("worker_id", workerID))
255+
d.logger.Debug("[DISPATCHER] No item found, continuing", zap.Int("worker_id", workerID), zap.Time("ts", time.Now()))
267256
continue // No item, keep waiting
268257
} else if err != nil {
269-
d.logger.Error("Worker failed to BRPOPLPUSH", zap.Int("worker_id", workerID), zap.Error(err))
258+
d.logger.Error("Worker failed to BRPOP", zap.Int("worker_id", workerID), zap.Error(err))
270259
continue
271260
}
272-
d.logger.Debug("[DISPATCHER] Worker got item from queue", zap.Int("worker_id", workerID), zap.String("data", data))
261+
// BRPop returns [queue, value]
262+
item := data[1]
263+
unmarshalStart := time.Now()
273264
var sched models.Schedule
274-
if err := json.Unmarshal([]byte(data), &sched); err != nil {
275-
d.logger.Error("Worker failed to unmarshal schedule", zap.Int("worker_id", workerID), zap.Error(err), zap.String("data", data))
276-
// Remove the bad item from processing
277-
_ = scheduler.redis.LRem(ctx, dispatchProcessingKey, 1, data).Err()
265+
if err := json.Unmarshal([]byte(item), &sched); err != nil {
266+
d.logger.Error("Worker failed to unmarshal schedule", zap.Int("worker_id", workerID), zap.Error(err), zap.String("data", item))
278267
continue
279268
}
269+
unmarshalEnd := time.Now()
270+
d.logger.Debug("[DISPATCHER] Unmarshal duration", zap.Int("worker_id", workerID), zap.Duration("duration", unmarshalEnd.Sub(unmarshalStart)))
280271
d.logger.Debug("[DISPATCHER] Worker unmarshalled schedule", zap.Int("worker_id", workerID), zap.Any("schedule", sched))
281272
d.logger.Debug("[DISPATCHER] Worker dispatching webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String()))
282273

@@ -287,55 +278,47 @@ func (d *Dispatcher) Run(ctx context.Context, scheduler *Scheduler, workerCount
287278
sched.AttemptCount++
288279
}
289280

290-
// Attempt dispatch
281+
dispatchStart := time.Now()
291282
err = d.DispatchWebhook(ctx, &sched)
283+
dispatchEnd := time.Now()
284+
d.logger.Debug("[DISPATCHER] DispatchWebhook duration", zap.Int("worker_id", workerID), zap.Duration("duration", dispatchEnd.Sub(dispatchStart)), zap.Error(err))
285+
292286
if err != nil {
293287
d.logger.Error("Worker failed to dispatch webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String()), zap.Error(err), zap.Int("attempt_count", sched.AttemptCount))
294-
// Always log failed occurrence (already done in DispatchWebhook)
295288
if sched.AttemptCount >= d.maxRetries {
296-
// Remove from processing queue, do not push to dead letter queue
297-
queueBefore, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
298-
d.logger.Debug("[DISPATCHER] Max retries exceeded, removing from processing queue", zap.Int("worker_id", workerID), zap.Any("queue", queueBefore), zap.String("removing", data))
299-
// Debug: print the data string and all items in the queue for comparison
300-
d.logger.Debug("[DEBUG] LRem target data", zap.String("data", data))
301-
for idx, item := range queueBefore {
302-
d.logger.Debug("[DEBUG] Queue item", zap.Int("index", idx), zap.String("item", item))
303-
}
304-
// Use non-cancellable context for removal
305-
_ = scheduler.redis.LRem(context.Background(), dispatchProcessingKey, 1, data).Err()
306-
queueAfter, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
307-
d.logger.Debug("[DISPATCHER] Processing queue after LRem (max retries)", zap.Int("worker_id", workerID), zap.Any("queue", queueAfter))
289+
d.logger.Debug("[DISPATCHER] Max retries exceeded, dropping item", zap.Int("worker_id", workerID), zap.Any("schedule", sched))
290+
// No further action needed, item is dropped
308291
} else {
309-
// Debug: log queue state before LSet
310-
queueBeforeLSet, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
311-
d.logger.Debug("[DISPATCHER] Processing queue before LSet (increment attempt count)", zap.Int("worker_id", workerID), zap.Any("queue", queueBeforeLSet))
312-
// Update the item in the processing queue with incremented attempt count
292+
zaddStart := time.Now()
293+
nextRetry := time.Now().Add(d.retryDelay).Unix()
313294
updatedData, _ := json.Marshal(sched)
314-
lsetErr := scheduler.redis.LSet(ctx, dispatchProcessingKey, 0, updatedData).Err() // LSet index 0: most recent item
315-
if lsetErr != nil {
316-
d.logger.Error("[DISPATCHER] LSet failed when incrementing attempt count", zap.Int("worker_id", workerID), zap.Error(lsetErr))
295+
d.logger.Debug("[DISPATCHER] Before ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddStart))
296+
err := scheduler.redis.ZAdd(ctx, retryQueueKey, redis.Z{
297+
Score: float64(nextRetry),
298+
Member: updatedData,
299+
}).Err()
300+
zaddEnd := time.Now()
301+
d.logger.Debug("[DISPATCHER] After ZAdd to retry queue", zap.Int("worker_id", workerID), zap.Time("ts", zaddEnd), zap.Duration("duration", zaddEnd.Sub(zaddStart)), zap.Error(err))
302+
if err != nil {
303+
d.logger.Error("[DISPATCHER] Failed to add to retry queue", zap.Int("worker_id", workerID), zap.Error(err))
317304
}
318-
// Debug: log queue state after LSet
319-
queueAfterLSet, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
320-
d.logger.Debug("[DISPATCHER] Processing queue after LSet (increment attempt count)", zap.Int("worker_id", workerID), zap.Any("queue", queueAfterLSet))
305+
d.logger.Debug("[DISPATCHER] Item moved to retry queue", zap.Int("worker_id", workerID), zap.Any("schedule", sched), zap.Int64("next_retry", nextRetry))
321306
}
307+
itemEnd := time.Now()
308+
d.logger.Debug("[DISPATCHER] Total time for item", zap.Int("worker_id", workerID), zap.Duration("duration", itemEnd.Sub(itemStart)))
322309
continue
323310
}
324311
d.logger.Debug("[DISPATCHER] Worker successfully dispatched webhook", zap.Int("worker_id", workerID), zap.String("occurrence_id", sched.OccurrenceID.String()))
325-
// On success, remove from processing queue using the exact value popped (raw JSON string)
326-
queueBefore, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
327-
d.logger.Debug("[DISPATCHER] Processing queue before LRem", zap.Int("worker_id", workerID), zap.Any("queue", queueBefore), zap.String("removing", data))
328-
// Use non-cancellable context for removal
329-
_ = scheduler.redis.LRem(context.Background(), dispatchProcessingKey, 1, data).Err()
330-
queueAfter, _ := scheduler.redis.LRange(ctx, dispatchProcessingKey, 0, -1).Result()
331-
d.logger.Debug("[DISPATCHER] Processing queue after LRem", zap.Int("worker_id", workerID), zap.Any("queue", queueAfter))
312+
itemEnd := time.Now()
313+
d.logger.Debug("[DISPATCHER] Total time for item", zap.Int("worker_id", workerID), zap.Duration("duration", itemEnd.Sub(itemStart)))
332314
}
333315
}
334316
}
335317
for i := 0; i < workerCount; i++ {
336318
go workerFn(i)
337319
}
338320
<-ctx.Done()
321+
pollerCancel()
339322
d.logger.Info("Dispatcher worker pool shutting down")
340323
return ctx.Err()
341324
}

0 commit comments

Comments (0)