Skip to content

Commit 5ff6106

Browse files
ferhatelmasLinkinStars
authored andcommitted
fix: address comments and add a test
Signed-off-by: ferhat elmas <elmas.ferhat@gmail.com>
1 parent 26d868b commit 5ff6106

5 files changed

Lines changed: 59 additions & 7 deletions

File tree

internal/base/queue/queue.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,17 @@ import (
2626
"github.com/segmentfault/pacman/log"
2727
)
2828

29+
type Service[T any] interface {
30+
// Send enqueues a message to be processed asynchronously.
31+
Send(ctx context.Context, msg T)
32+
33+
// RegisterHandler sets the handler function for processing messages.
34+
RegisterHandler(handler func(ctx context.Context, msg T) error)
35+
36+
// Close gracefully shuts down the queue, waiting for pending messages to be processed.
37+
Close()
38+
}
39+
2940
// Queue is a generic message queue service that processes messages asynchronously.
3041
// It is thread-safe and supports graceful shutdown.
3142
type Queue[T any] struct {
@@ -51,10 +62,9 @@ func New[T any](name string, bufferSize int) *Queue[T] {
5162
// It will block if the queue is full.
5263
func (q *Queue[T]) Send(ctx context.Context, msg T) {
5364
q.mu.RLock()
54-
closed := q.closed
55-
q.mu.RUnlock()
65+
defer q.mu.RUnlock()
5666

57-
if closed {
67+
if q.closed {
5868
log.Warnf("[%s] queue is closed, dropping message", q.name)
5969
return
6070
}

internal/base/queue/queue_test.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package queue
2121

2222
import (
2323
"context"
24+
"fmt"
2425
"sync"
2526
"sync/atomic"
2627
"testing"
@@ -209,3 +210,44 @@ func TestQueue_ConcurrentRegisterHandler(t *testing.T) {
209210
}
210211
wg.Wait()
211212
}
213+
214+
// TestQueue_SendCloseRace is a regression test for the race condition between
215+
// Send and Close. Without proper synchronization, concurrent Send and Close
216+
// calls could cause a "send on closed channel" panic.
217+
// Run with: go test -race -run TestQueue_SendCloseRace
218+
func TestQueue_SendCloseRace(t *testing.T) {
219+
for i := range 100 {
220+
t.Run(fmt.Sprintf("iteration_%d", i), func(t *testing.T) {
221+
// Use large buffer to avoid blocking on channel send while holding RLock
222+
q := New[*testMessage]("test-race", 1000)
223+
q.RegisterHandler(func(ctx context.Context, msg *testMessage) error {
224+
return nil
225+
})
226+
227+
var wg sync.WaitGroup
228+
229+
// Use cancellable context so senders can exit when Close is called
230+
ctx, cancel := context.WithCancel(context.Background())
231+
232+
// Start multiple senders
233+
for j := range 10 {
234+
wg.Add(1)
235+
go func(id int) {
236+
defer wg.Done()
237+
for k := range 100 {
238+
q.Send(ctx, &testMessage{ID: id*1000 + k})
239+
}
240+
}(j)
241+
}
242+
243+
// Close while senders are still running
244+
go func() {
245+
time.Sleep(time.Microsecond * 10)
246+
cancel() // Cancel context to unblock any waiting senders
247+
q.Close()
248+
}()
249+
250+
wg.Wait()
251+
})
252+
}
253+
}

internal/service/activityqueue/activity_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/apache/answer/internal/schema"
2525
)
2626

27-
type Service = *queue.Queue[*schema.ActivityMsg]
27+
type Service queue.Service[*schema.ActivityMsg]
2828

2929
func NewService() Service {
3030
return queue.New[*schema.ActivityMsg]("activity", 128)

internal/service/eventqueue/event_queue.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
"github.com/apache/answer/internal/schema"
2525
)
2626

27-
type Service = *queue.Queue[*schema.EventMsg]
27+
type Service queue.Service[*schema.EventMsg]
2828

2929
func NewService() Service {
3030
return queue.New[*schema.EventMsg]("event", 128)

internal/service/noticequeue/notice_queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,13 @@ import (
2424
"github.com/apache/answer/internal/schema"
2525
)
2626

27-
type Service = *queue.Queue[*schema.NotificationMsg]
27+
type Service queue.Service[*schema.NotificationMsg]
2828

2929
func NewService() Service {
3030
return queue.New[*schema.NotificationMsg]("notification", 128)
3131
}
3232

33-
type ExternalService = *queue.Queue[*schema.ExternalNotificationMsg]
33+
type ExternalService queue.Service[*schema.ExternalNotificationMsg]
3434

3535
func NewExternalService() ExternalService {
3636
return queue.New[*schema.ExternalNotificationMsg]("external_notification", 128)

0 commit comments

Comments
 (0)