Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "^1.25"
go-version: "^1.26"
- name: Check formatting
run: |
unformatted=$(gofmt -l .)
Expand Down Expand Up @@ -52,15 +52,13 @@ jobs:
run: go test -v ./...

race-test:
# Temporarily disable until fixes are prepared.
if: false
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Set up Go
uses: actions/setup-go@4b73464bb391d4059bd26b0524d20df3927bd417 # v6.3.0
with:
go-version: "1.25"
go-version: "1.26"
- name: Test with -race
run: go test -v -race ./...
8 changes: 8 additions & 0 deletions mcp/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,14 @@ func (s *Server) changeAndNotify(notification string, change func() bool) {
s.mu.Lock()
defer s.mu.Unlock()
if change() && s.shouldSendListChangedNotification(notification) {
if len(s.sessions) == 0 {
if t := s.pendingNotifications[notification]; t != nil {
t.Stop()
s.pendingNotifications[notification] = nil
}
return
}

// Reset the outstanding delayed call, if any.
if t := s.pendingNotifications[notification]; t == nil {
s.pendingNotifications[notification] = time.AfterFunc(notificationDelay, func() { s.notifySessions(notification) })
Expand Down
9 changes: 7 additions & 2 deletions mcp/streamable.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/modelcontextprotocol/go-sdk/auth"
Expand Down Expand Up @@ -1515,9 +1516,13 @@ var (
// reconnectInitialDelay is the base delay for the first reconnect attempt.
//
// Mutable for testing.
reconnectInitialDelay = 1 * time.Second
reconnectInitialDelay atomic.Int64
)

func init() {
reconnectInitialDelay.Store(int64(1 * time.Second))
}

// Connect implements the [Transport] interface.
//
// The resulting [Connection] writes messages via POST requests to the
Expand Down Expand Up @@ -2196,7 +2201,7 @@ func calculateReconnectDelay(attempt int) time.Duration {
return 0
}
// Calculate the exponential backoff using the grow factor.
backoffDuration := time.Duration(float64(reconnectInitialDelay) * math.Pow(reconnectGrowFactor, float64(attempt-1)))
backoffDuration := time.Duration(float64(reconnectInitialDelay.Load()) * math.Pow(reconnectGrowFactor, float64(attempt-1)))
// Cap the backoffDuration at maxDelay.
backoffDuration = min(backoffDuration, reconnectMaxDelay)

Expand Down
16 changes: 8 additions & 8 deletions mcp/streamable_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,10 +446,10 @@ func TestStreamableClientResumption_Cancelled(t *testing.T) {
//
// TODO(#680): experiment with instead using synctest.
const tick = 10 * time.Millisecond
defer func(delay time.Duration) {
reconnectInitialDelay = delay
}(reconnectInitialDelay)
reconnectInitialDelay = 2 * tick
defer func(delay int64) {
reconnectInitialDelay.Store(delay)
}(reconnectInitialDelay.Load())
reconnectInitialDelay.Store(int64(2 * tick))

// The setup: terminate a request stream and make the resumed request hang
// indefinitely. CallTool should still exit when its context is canceled.
Expand Down Expand Up @@ -702,10 +702,10 @@ func TestStreamableClientTransientErrors(t *testing.T) {
func TestStreamableClientRetryWithoutProgress(t *testing.T) {
// Speed up reconnection delays for testing.
const tick = 10 * time.Millisecond
defer func(delay time.Duration) {
reconnectInitialDelay = delay
}(reconnectInitialDelay)
reconnectInitialDelay = tick
defer func(delay int64) {
reconnectInitialDelay.Store(delay)
}(reconnectInitialDelay.Load())
reconnectInitialDelay.Store(int64(tick))

// Use the fakeStreamableServer pattern like other tests to avoid race conditions.
ctx := context.Background()
Expand Down
11 changes: 9 additions & 2 deletions mcp/streamable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,10 @@ func TestServerTransportCleanup(t *testing.T) {

handler := NewStreamableHTTPHandler(func(*http.Request) *Server { return server }, nil)
handler.onTransportDeletion = func(sessionID string) {
chans[sessionID] <- struct{}{}
mu.Lock()
ch := chans[sessionID]
mu.Unlock()
ch <- struct{}{}
}

httpServer := httptest.NewServer(mustNotPanic(t, handler))
Expand Down Expand Up @@ -658,7 +661,11 @@ func TestServerTransportCleanup(t *testing.T) {
t.Cleanup(func() { _ = clientSession.Close() })
}

for _, ch := range chans {
mu.Lock()
channels := slices.Collect(maps.Values(chans))
mu.Unlock()

for _, ch := range channels {
select {
case <-ctx.Done():
t.Errorf("did not capture transport deletion event from all session in 10 seconds")
Expand Down
Loading