From 7210fb282d7ff123b1636a535f14a55db0781e5a Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 7 Jan 2026 10:59:41 -0300 Subject: [PATCH 1/8] Updated BeginShutdown to have a final state and to avoid deadlocks --- struct/traits/lifecycle/lifecycle.go | 15 +++++++- struct/traits/lifecycle/lifecycle_test.go | 44 +++++++++++++++++++++++ 2 files changed, 58 insertions(+), 1 deletion(-) diff --git a/struct/traits/lifecycle/lifecycle.go b/struct/traits/lifecycle/lifecycle.go index 765b7bc..10891cb 100644 --- a/struct/traits/lifecycle/lifecycle.go +++ b/struct/traits/lifecycle/lifecycle.go @@ -49,6 +49,10 @@ func (l *Manager) InitializationComplete() bool { func (l *Manager) BeginShutdown() bool { // If we're currently initializing but not yet running, just change the status. if atomic.CompareAndSwapInt32(&l.status, StatusStarting, StatusInitializationCancelled) { + l.c.L.Lock() + atomic.StoreInt32(&l.status, StatusIdle) + l.c.Broadcast() + l.c.L.Unlock() return true } @@ -56,7 +60,16 @@ func (l *Manager) BeginShutdown() bool { return false } - l.shutdown <- struct{}{} + select { + case l.shutdown <- struct{}{}: + default: + } + + l.c.L.Lock() + atomic.StoreInt32(&l.status, StatusIdle) + l.c.Broadcast() + l.c.L.Unlock() + return true } diff --git a/struct/traits/lifecycle/lifecycle_test.go b/struct/traits/lifecycle/lifecycle_test.go index e0ee14f..88de218 100644 --- a/struct/traits/lifecycle/lifecycle_test.go +++ b/struct/traits/lifecycle/lifecycle_test.go @@ -4,6 +4,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/stretchr/testify/require" ) func TestLifecycleManager(t *testing.T) { @@ -232,3 +234,45 @@ func TestShutdownRequestWhileInitNotComplete(t *testing.T) { t.Error("the goroutine should have not executed further than the InitializationComplete check.") } } + +func TestInitializationCancelledEventuallyBecomesIdle(t *testing.T) { + var m Manager + m.Setup() + + require.True(t, m.BeginInitialization()) + require.True(t, m.BeginShutdown()) // cancela 
init + + done := make(chan struct{}) + go func() { + m.AwaitShutdownComplete() + close(done) + }() + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("initialization cancellation never transitions to Idle") + } +} + +func TestShutdownWithoutWorkerDoesNotHang(t *testing.T) { + var m Manager + m.Setup() + + require.True(t, m.BeginInitialization()) + require.True(t, m.InitializationComplete()) + + require.True(t, m.BeginShutdown()) + + done := make(chan struct{}) + go func() { + m.AwaitShutdownComplete() + close(done) + }() + + select { + case <-done: + case <-time.After(200 * time.Millisecond): + t.Fatal("shutdown hangs forever waiting for ShutdownComplete") + } +} From b6881cb6ebdf7cd28d2776dce6e2bcbcf1df490b Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 7 Jan 2026 11:34:53 -0300 Subject: [PATCH 2/8] Updated test cases --- struct/traits/lifecycle/lifecycle.go | 5 ----- struct/traits/lifecycle/lifecycle_test.go | 22 ---------------------- 2 files changed, 27 deletions(-) diff --git a/struct/traits/lifecycle/lifecycle.go b/struct/traits/lifecycle/lifecycle.go index 10891cb..703536e 100644 --- a/struct/traits/lifecycle/lifecycle.go +++ b/struct/traits/lifecycle/lifecycle.go @@ -65,11 +65,6 @@ func (l *Manager) BeginShutdown() bool { default: } - l.c.L.Lock() - atomic.StoreInt32(&l.status, StatusIdle) - l.c.Broadcast() - l.c.L.Unlock() - return true } diff --git a/struct/traits/lifecycle/lifecycle_test.go b/struct/traits/lifecycle/lifecycle_test.go index 88de218..2b09b02 100644 --- a/struct/traits/lifecycle/lifecycle_test.go +++ b/struct/traits/lifecycle/lifecycle_test.go @@ -254,25 +254,3 @@ func TestInitializationCancelledEventuallyBecomesIdle(t *testing.T) { t.Fatal("initialization cancellation never transitions to Idle") } } - -func TestShutdownWithoutWorkerDoesNotHang(t *testing.T) { - var m Manager - m.Setup() - - require.True(t, m.BeginInitialization()) - require.True(t, m.InitializationComplete()) - - require.True(t, 
m.BeginShutdown()) - - done := make(chan struct{}) - go func() { - m.AwaitShutdownComplete() - close(done) - }() - - select { - case <-done: - case <-time.After(200 * time.Millisecond): - t.Fatal("shutdown hangs forever waiting for ShutdownComplete") - } -} From c3b69f0738305edd48ea96822301578682de490d Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 7 Jan 2026 12:13:19 -0300 Subject: [PATCH 3/8] Updated ci --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b3e82df..7ad738d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,7 +35,7 @@ jobs: - name: SonarQube Scan (Push) if: github.event_name == 'push' - uses: SonarSource/sonarcloud-github-action@v1.5 + uses: SonarSource/sonarqube-scan-action@v6.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} with: @@ -51,7 +51,7 @@ jobs: - name: SonarQube Scan (Pull Request) if: github.event_name == 'pull_request' - uses: SonarSource/sonarcloud-github-action@v1.5 + uses: SonarSource/sonarqube-scan-action@v6.0.0 env: SONAR_TOKEN: ${{ secrets.SONARQUBE_TOKEN }} with: From 0a552d9042128f444338e6f657fe96f7bca398eb Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 7 Jan 2026 14:36:20 -0300 Subject: [PATCH 4/8] Updated sse to fix issue --- sse/sse.go | 53 ++++++++++++++++++++++++++----------------- sse/sse_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 21 deletions(-) diff --git a/sse/sse.go b/sse/sse.go index 5c533de..05f7b27 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -49,28 +49,35 @@ func NewClient(url string, keepAlive int, dialTimeout int, logger logging.Logger return client, nil } -func (l *Client) readEvents(in *bufio.Reader, out chan<- RawEvent) { +func (l *Client) readEvents(ctx context.Context, in *bufio.Reader, out chan<- RawEvent) { eventBuilder := NewEventBuilder() + defer close(out) + defer l.logger.Info("SSE reader goroutine 
exited") + for { - line, err := in.ReadString(endOfLineChar) - l.logger.Debug("Incoming SSE line: ", line) - if err != nil { - if l.lifecycle.IsRunning() { // If it's supposed to be running, log an error - l.logger.Error(err) - } - close(out) + select { + case <-ctx.Done(): return - } - if line != endOfLineStr { - eventBuilder.AddLine(line) - continue + default: + line, err := in.ReadString(endOfLineChar) + l.logger.Debug("Incoming SSE line: ", line) + if err != nil { + if l.lifecycle.IsRunning() { + l.logger.Error(err) + } + return + } + if line != endOfLineStr { + eventBuilder.AddLine(line) + continue + } + + if event := eventBuilder.Build(); event != nil { + out <- event + } + eventBuilder.Reset() } - l.logger.Debug("Building SSE event") - if event := eventBuilder.Build(); event != nil { - out <- event - } - eventBuilder.Reset() } } @@ -116,7 +123,11 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac reader := bufio.NewReader(resp.Body) eventChannel := make(chan RawEvent, 1000) - go l.readEvents(reader, eventChannel) + activeGoroutines.Add(1) + go func() { + defer activeGoroutines.Done() + l.readEvents(ctx, reader, eventChannel) + }() // Create timeout timer in case SSE dont receive notifications or keepalive messages keepAliveTimer := time.NewTimer(l.timeout) @@ -140,10 +151,10 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac continue // don't forward empty/comment events } activeGoroutines.Add(1) - go func() { + go func(ev RawEvent) { defer activeGoroutines.Done() - callback(event) - }() + callback(ev) + }(event) case <-keepAliveTimer.C: // Timeout l.logger.Warning("SSE idle timeout.") l.lifecycle.AbnormalShutdown() diff --git a/sse/sse_test.go b/sse/sse_test.go index df6f2b3..20ca792 100644 --- a/sse/sse_test.go +++ b/sse/sse_test.go @@ -218,6 +218,66 @@ func TestConnectionEOF(t *testing.T) { mockedClient.Shutdown(true) } +type fakeRawEvent struct { + id int +} + +func (f fakeRawEvent) ID() 
string { return fmt.Sprintf("%d", f.id) } +func (f fakeRawEvent) Event() string { return "test" } +func (f fakeRawEvent) Data() string { return "data" } +func (f fakeRawEvent) Retry() int64 { return 0 } +func (f fakeRawEvent) IsError() bool { return false } +func (f fakeRawEvent) IsEmpty() bool { return false } + +func TestProcessEvents_ClosureBug_WithInterface(t *testing.T) { + const n = 200 + + events := make([]RawEvent, n) + for i := 0; i < n; i++ { + events[i] = fakeRawEvent{id: i} + } + + received := make([]string, 0, n) + var mu sync.Mutex + + processEventsBug(events, func(e RawEvent) { + mu.Lock() + received = append(received, e.ID()) + mu.Unlock() + }) + + if len(received) != n { + t.Fatalf("expected %d events, got %d", n, len(received)) + } + + unique := map[string]bool{} + for _, id := range received { + unique[id] = true + } + + if len(unique) != n { + t.Fatalf( + "expected %d unique events, got %d (closure bug exposed)", + n, + len(unique), + ) + } +} + +func processEventsBug(events []RawEvent, callback func(RawEvent)) { + var wg sync.WaitGroup + + for _, event := range events { + wg.Add(1) + go func(ev RawEvent) { + defer wg.Done() + callback(ev) + }(event) + } + + wg.Wait() +} + /* func TestCustom(t *testing.T) { url := `https://streaming.split.io/event-stream` From c44dd2396ead765e4ef16e270c2d988328bc1539 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Wed, 7 Jan 2026 14:38:59 -0300 Subject: [PATCH 5/8] Updated tests --- sse/sse_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sse/sse_test.go b/sse/sse_test.go index 20ca792..0bc7598 100644 --- a/sse/sse_test.go +++ b/sse/sse_test.go @@ -229,7 +229,7 @@ func (f fakeRawEvent) Retry() int64 { return 0 } func (f fakeRawEvent) IsError() bool { return false } func (f fakeRawEvent) IsEmpty() bool { return false } -func TestProcessEvents_ClosureBug_WithInterface(t *testing.T) { +func TestProcessEventsClosureBugWithInterface(t *testing.T) { const n = 200 events := make([]RawEvent, n) 
From 4e1897f52ba66895e7bb0b7acc4791b7deab7d4d Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Thu, 8 Jan 2026 12:27:01 -0300 Subject: [PATCH 6/8] Updated sse logic to ensure the SSE client shuts down --- sse/sse.go | 17 +++++++++++++ sse/sse_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 84 insertions(+), 1 deletion(-) diff --git a/sse/sse.go b/sse/sse.go index 05f7b27..b2b051e 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "io" "net/http" "sync" "time" @@ -25,6 +26,8 @@ type Client struct { client http.Client timeout time.Duration logger logging.LoggerInterface + bodyMu sync.Mutex + body io.ReadCloser } // NewClient creates new SSEClient @@ -92,6 +95,10 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac ctx, cancel := context.WithCancel(context.Background()) defer func() { + l.bodyMu.Lock() + l.body = nil + l.bodyMu.Unlock() + l.logger.Info("SSE streaming exiting") cancel() activeGoroutines.Wait() @@ -111,6 +118,9 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac l.logger.Error("Error performing get: ", req.URL.String(), err.Error()) return &ErrConnectionFailed{wrapped: fmt.Errorf("error issuing request: %w", err)} } + l.bodyMu.Lock() + l.body = resp.Body + l.bodyMu.Unlock() if resp.StatusCode != 200 { l.logger.Error(fmt.Sprintf("GET method: Status Code: %d - %s", resp.StatusCode, resp.Status)) return &ErrConnectionFailed{wrapped: fmt.Errorf("sse request status code: %d", resp.StatusCode)} @@ -170,6 +180,13 @@ func (l *Client) Shutdown(blocking bool) { return } + l.bodyMu.Lock() + if l.body != nil { + _ = l.body.Close() + l.body = nil + } + l.bodyMu.Unlock() + if blocking { l.lifecycle.AwaitShutdownComplete() } diff --git a/sse/sse_test.go b/sse/sse_test.go index 0bc7598..da9dd9f 100644 --- a/sse/sse_test.go +++ b/sse/sse_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/splitio/go-toolkit/v5/logging" +
"github.com/stretchr/testify/require" ) func TestSSEErrorConnecting(t *testing.T) { @@ -278,6 +279,70 @@ func processEventsBug(events []RawEvent, callback func(RawEvent)) { wg.Wait() } +func TestShutdownDoesNotHangWhenSSEIsIdle(t *testing.T) { + // Fake SSE server: accepts connection, sends headers, then blocks forever + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.WriteHeader(http.StatusOK) + + flusher, ok := w.(http.Flusher) + require.True(t, ok) + flusher.Flush() + + // Block until client closes the connection + <-r.Context().Done() + })) + defer server.Close() + + logger := logging.NewLogger(nil) + + client, err := NewClient( + server.URL, + 70, // keepAlive + 0, // dialTimeout + logger, + ) + require.NoError(t, err) + + done := make(chan struct{}) + + // Start streaming + go func() { + _ = client.Do( + map[string]string{"channels": "test"}, + nil, + func(e RawEvent) {}, + ) + close(done) + }() + + // Give the client time to connect and block on read + time.Sleep(100 * time.Millisecond) + + shutdownDone := make(chan struct{}) + + go func() { + client.Shutdown(true) + close(shutdownDone) + }() + + select { + case <-shutdownDone: + // OK + case <-time.After(500 * time.Millisecond): + t.Fatal("Shutdown(true) blocked — SSE reader did not exit") + } + + // Ensure Do() also returns + select { + case <-done: + // OK + case <-time.After(500 * time.Millisecond): + t.Fatal("Do() did not return after shutdown") + } +} + /* func TestCustom(t *testing.T) { url := `https://streaming.split.io/event-stream` @@ -307,7 +372,8 @@ func TestCustom(t *testing.T) { <-ready fmt.Println(1) go func() { - err := client.Do( + err := client.Do +( map[string]string{ "accessToken": accessToken, "v": "1.1", From 672f4f0ffe86644299a4856fb29b526c41bfd1c7 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Thu, 8 Jan 2026 17:22:36 -0300 Subject: 
[PATCH 7/8] Updated sse.go --- sse/sse.go | 49 +++++++++++++++++++++++++++++++------------------ 1 file changed, 31 insertions(+), 18 deletions(-) diff --git a/sse/sse.go b/sse/sse.go index b2b051e..14284ff 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -28,6 +28,7 @@ type Client struct { logger logging.LoggerInterface bodyMu sync.Mutex body io.ReadCloser + cancel context.CancelFunc } // NewClient creates new SSEClient @@ -91,16 +92,23 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac return ErrNotIdle } - activeGoroutines := sync.WaitGroup{} + var activeGoroutines sync.WaitGroup ctx, cancel := context.WithCancel(context.Background()) + + l.bodyMu.Lock() + l.cancel = cancel + l.bodyMu.Unlock() + defer func() { + l.logger.Info("SSE streaming exiting") + + cancel() + l.bodyMu.Lock() - l.body = nil + l.cancel = nil l.bodyMu.Unlock() - l.logger.Info("SSE streaming exiting") - cancel() activeGoroutines.Wait() l.lifecycle.ShutdownComplete() }() @@ -110,22 +118,21 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac return &ErrConnectionFailed{wrapped: fmt.Errorf("error building request: %w", err)} } - l.logger.Debug("[GET] ", req.URL.String()) - l.logger.Debug(fmt.Sprintf("Headers: %v", req.Header)) - resp, err := l.client.Do(req) if err != nil { - l.logger.Error("Error performing get: ", req.URL.String(), err.Error()) return &ErrConnectionFailed{wrapped: fmt.Errorf("error issuing request: %w", err)} } + + if resp.StatusCode != http.StatusOK { + _ = resp.Body.Close() + return &ErrConnectionFailed{ + wrapped: fmt.Errorf("sse request status code: %d", resp.StatusCode), + } + } + l.bodyMu.Lock() l.body = resp.Body l.bodyMu.Unlock() - if resp.StatusCode != 200 { - l.logger.Error(fmt.Sprintf("GET method: Status Code: %d - %s", resp.StatusCode, resp.Status)) - return &ErrConnectionFailed{wrapped: fmt.Errorf("sse request status code: %d", resp.StatusCode)} - } - defer resp.Body.Close() if 
!l.lifecycle.InitializationComplete() { return nil @@ -133,23 +140,24 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac reader := bufio.NewReader(resp.Body) eventChannel := make(chan RawEvent, 1000) + activeGoroutines.Add(1) go func() { defer activeGoroutines.Done() l.readEvents(ctx, reader, eventChannel) }() - // Create timeout timer in case SSE dont receive notifications or keepalive messages keepAliveTimer := time.NewTimer(l.timeout) defer keepAliveTimer.Stop() for { select { case <-l.lifecycle.ShutdownRequested(): - l.logger.Info("Shutting down listener") return nil + case event, ok := <-eventChannel: keepAliveTimer.Reset(l.timeout) + if !ok { if l.lifecycle.IsRunning() { return ErrReadingStream @@ -158,15 +166,16 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac } if event.IsEmpty() { - continue // don't forward empty/comment events + continue } + activeGoroutines.Add(1) go func(ev RawEvent) { defer activeGoroutines.Done() callback(ev) }(event) - case <-keepAliveTimer.C: // Timeout - l.logger.Warning("SSE idle timeout.") + + case <-keepAliveTimer.C: l.lifecycle.AbnormalShutdown() return ErrTimeout } @@ -181,6 +190,10 @@ func (l *Client) Shutdown(blocking bool) { } l.bodyMu.Lock() + if l.cancel != nil { + l.cancel() + l.cancel = nil + } if l.body != nil { _ = l.body.Close() l.body = nil From 5e255b338b2d01acddbec95ef11e1d73ce301333 Mon Sep 17 00:00:00 2001 From: Nadia Mayor Date: Sun, 11 Jan 2026 20:06:31 -0300 Subject: [PATCH 8/8] Updated sse --- sse/sse.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sse/sse.go b/sse/sse.go index 14284ff..74782b2 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -152,6 +152,9 @@ func (l *Client) Do(params map[string]string, headers map[string]string, callbac for { select { + case <-ctx.Done(): + return nil + case <-l.lifecycle.ShutdownRequested(): return nil