From 884451365615660ad42c544510f440fd48d08bca Mon Sep 17 00:00:00 2001 From: Maciek Kisiel Date: Fri, 20 Mar 2026 15:57:05 +0000 Subject: [PATCH] mcp: re-enable race test after fixing data races --- .github/workflows/test.yml | 6 ++---- mcp/server.go | 8 ++++++++ mcp/streamable.go | 9 +++++++-- mcp/streamable_client_test.go | 16 ++++++++-------- mcp/streamable_test.go | 11 +++++++++-- 5 files changed, 34 insertions(+), 16 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index a6824b85..4c07f0fd 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -18,7 +18,7 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "^1.25" + go-version: "^1.26" - name: Check formatting run: | unformatted=$(gofmt -l .) @@ -52,8 +52,6 @@ jobs: run: go test -v ./... race-test: - # Temporarily disable until fixes are prepared. - if: false runs-on: ubuntu-latest steps: - name: Check out code @@ -61,6 +59,6 @@ jobs: - name: Set up Go uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0 with: - go-version: "1.25" + go-version: "1.26" - name: Test with -race run: go test -v -race ./... diff --git a/mcp/server.go b/mcp/server.go index 00f88c0e..c13955c5 100644 --- a/mcp/server.go +++ b/mcp/server.go @@ -628,6 +628,14 @@ func (s *Server) changeAndNotify(notification string, change func() bool) { s.mu.Lock() defer s.mu.Unlock() if change() && s.shouldSendListChangedNotification(notification) { + if len(s.sessions) == 0 { + if t := s.pendingNotifications[notification]; t != nil { + t.Stop() + s.pendingNotifications[notification] = nil + } + return + } + // Reset the outstanding delayed call, if any. if t := s.pendingNotifications[notification]; t == nil { s.pendingNotifications[notification] = time.AfterFunc(notificationDelay, func() { s.notifySessions(notification) }) diff --git a/mcp/streamable.go b/mcp/streamable.go index 16bca070..7f09b40b 100644 --- a/mcp/streamable.go +++ b/mcp/streamable.go @@ -26,6 +26,7 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/modelcontextprotocol/go-sdk/auth" @@ -1515,9 +1516,13 @@ var ( // reconnectInitialDelay is the base delay for the first reconnect attempt. // // Mutable for testing. - reconnectInitialDelay = 1 * time.Second + reconnectInitialDelay atomic.Int64 ) +func init() { + reconnectInitialDelay.Store(int64(1 * time.Second)) +} + // Connect implements the [Transport] interface. // // The resulting [Connection] writes messages via POST requests to the @@ -2196,7 +2201,7 @@ func calculateReconnectDelay(attempt int) time.Duration { return 0 } // Calculate the exponential backoff using the grow factor. - backoffDuration := time.Duration(float64(reconnectInitialDelay) * math.Pow(reconnectGrowFactor, float64(attempt-1))) + backoffDuration := time.Duration(float64(reconnectInitialDelay.Load()) * math.Pow(reconnectGrowFactor, float64(attempt-1))) // Cap the backoffDuration at maxDelay. backoffDuration = min(backoffDuration, reconnectMaxDelay) diff --git a/mcp/streamable_client_test.go b/mcp/streamable_client_test.go index d189ca41..e805aa5e 100644 --- a/mcp/streamable_client_test.go +++ b/mcp/streamable_client_test.go @@ -446,10 +446,10 @@ func TestStreamableClientResumption_Cancelled(t *testing.T) { // // TODO(#680): experiment with instead using synctest. const tick = 10 * time.Millisecond - defer func(delay time.Duration) { - reconnectInitialDelay = delay - }(reconnectInitialDelay) - reconnectInitialDelay = 2 * tick + defer func(delay int64) { + reconnectInitialDelay.Store(delay) + }(reconnectInitialDelay.Load()) + reconnectInitialDelay.Store(int64(2 * tick)) // The setup: terminate a request stream and make the resumed request hang // indefinitely. CallTool should still exit when its context is canceled. @@ -702,10 +702,10 @@ func TestStreamableClientTransientErrors(t *testing.T) { func TestStreamableClientRetryWithoutProgress(t *testing.T) { // Speed up reconnection delays for testing. const tick = 10 * time.Millisecond - defer func(delay time.Duration) { - reconnectInitialDelay = delay - }(reconnectInitialDelay) - reconnectInitialDelay = tick + defer func(delay int64) { + reconnectInitialDelay.Store(delay) + }(reconnectInitialDelay.Load()) + reconnectInitialDelay.Store(int64(tick)) // Use the fakeStreamableServer pattern like other tests to avoid race conditions. ctx := context.Background() diff --git a/mcp/streamable_test.go b/mcp/streamable_test.go index f688a908..d1dc482a 100644 --- a/mcp/streamable_test.go +++ b/mcp/streamable_test.go @@ -626,7 +626,10 @@ func TestServerTransportCleanup(t *testing.T) { handler := NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, nil) handler.onTransportDeletion = func(sessionID string) { - chans[sessionID] <- struct{}{} + mu.Lock() + ch := chans[sessionID] + mu.Unlock() + ch <- struct{}{} } httpServer := httptest.NewServer(mustNotPanic(t, handler)) @@ -658,7 +661,11 @@ func TestServerTransportCleanup(t *testing.T) { t.Cleanup(func() { _ = clientSession.Close() }) } - for _, ch := range chans { + mu.Lock() + channels := slices.Collect(maps.Values(chans)) + mu.Unlock() + + for _, ch := range channels { select { case <-ctx.Done(): t.Errorf("did not capture transport deletion event from all session in 10 seconds")