Skip to content

Commit c5f2a06

Browse files
authored
Merge pull request #2096 from simon-agent-go-expert/fix/retryable-429-500
fix: make HTTP 429 retryable when no fallback model, respect Retry-After header
2 parents 13054f4 + e529f3d commit c5f2a06

15 files changed

Lines changed: 978 additions & 46 deletions

File tree

pkg/model/provider/anthropic/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func isContextLengthError(err error) bool {
7070
func (a *streamAdapter) Recv() (chat.MessageStreamResponse, error) {
7171
ok, err := a.next()
7272
if !ok {
73-
return chat.MessageStreamResponse{}, err
73+
return chat.MessageStreamResponse{}, wrapAnthropicError(err)
7474
}
7575

7676
event := a.stream.Current()

pkg/model/provider/anthropic/beta_adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (c *Client) newBetaStreamAdapter(stream *ssestream.Stream[anthropic.BetaRaw
3434
func (a *betaStreamAdapter) Recv() (chat.MessageStreamResponse, error) {
3535
ok, err := a.next()
3636
if !ok {
37-
return chat.MessageStreamResponse{}, err
37+
return chat.MessageStreamResponse{}, wrapAnthropicError(err)
3838
}
3939

4040
event := a.stream.Current()
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package anthropic
2+
3+
import (
4+
"errors"
5+
6+
"github.com/anthropics/anthropic-sdk-go"
7+
8+
"github.com/docker/docker-agent/pkg/modelerrors"
9+
)
10+
11+
// wrapAnthropicError wraps an Anthropic SDK error in a *modelerrors.StatusError
12+
// to carry HTTP status code and Retry-After metadata for the retry loop.
13+
// Non-Anthropic errors (e.g. io.EOF, network errors) pass through unchanged.
14+
func wrapAnthropicError(err error) error {
15+
if err == nil {
16+
return nil
17+
}
18+
apiErr, ok := errors.AsType[*anthropic.Error](err)
19+
if !ok {
20+
return err
21+
}
22+
return modelerrors.WrapHTTPError(apiErr.StatusCode, apiErr.Response, err)
23+
}
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package anthropic
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
"github.com/anthropics/anthropic-sdk-go"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/docker/docker-agent/pkg/modelerrors"
16+
)
17+
18+
// makeTestAnthropicError creates an *anthropic.Error with the given status code and
19+
// optional Retry-After header value for testing.
20+
func makeTestAnthropicError(statusCode int, retryAfterValue string) *anthropic.Error {
21+
header := http.Header{}
22+
if retryAfterValue != "" {
23+
header.Set("Retry-After", retryAfterValue)
24+
}
25+
resp := httptest.NewRecorder().Result()
26+
resp.StatusCode = statusCode
27+
resp.Header = header
28+
// anthropic.Error.Error() dereferences Request, so we must provide a non-nil one.
29+
req, _ := http.NewRequest(http.MethodPost, "https://api.anthropic.com/v1/messages", http.NoBody)
30+
return &anthropic.Error{
31+
StatusCode: statusCode,
32+
Response: resp,
33+
Request: req,
34+
}
35+
}
36+
37+
func TestWrapAnthropicError(t *testing.T) {
38+
t.Parallel()
39+
40+
t.Run("nil returns nil", func(t *testing.T) {
41+
t.Parallel()
42+
assert.NoError(t, wrapAnthropicError(nil))
43+
})
44+
45+
t.Run("non-anthropic error passes through unchanged", func(t *testing.T) {
46+
t.Parallel()
47+
orig := errors.New("some network error")
48+
result := wrapAnthropicError(orig)
49+
assert.Equal(t, orig, result)
50+
var se *modelerrors.StatusError
51+
assert.NotErrorAs(t, result, &se)
52+
})
53+
54+
t.Run("429 without Retry-After wraps with zero RetryAfter", func(t *testing.T) {
55+
t.Parallel()
56+
apiErr := makeTestAnthropicError(429, "")
57+
result := wrapAnthropicError(apiErr)
58+
var se *modelerrors.StatusError
59+
require.ErrorAs(t, result, &se)
60+
assert.Equal(t, 429, se.StatusCode)
61+
assert.Equal(t, time.Duration(0), se.RetryAfter)
62+
// Original error still accessible
63+
assert.ErrorIs(t, result, apiErr)
64+
})
65+
66+
t.Run("429 with Retry-After header sets RetryAfter", func(t *testing.T) {
67+
t.Parallel()
68+
apiErr := makeTestAnthropicError(429, "20")
69+
result := wrapAnthropicError(apiErr)
70+
var se *modelerrors.StatusError
71+
require.ErrorAs(t, result, &se)
72+
assert.Equal(t, 429, se.StatusCode)
73+
assert.Equal(t, 20*time.Second, se.RetryAfter)
74+
})
75+
76+
t.Run("500 wraps with correct status code", func(t *testing.T) {
77+
t.Parallel()
78+
apiErr := makeTestAnthropicError(500, "")
79+
result := wrapAnthropicError(apiErr)
80+
var se *modelerrors.StatusError
81+
require.ErrorAs(t, result, &se)
82+
assert.Equal(t, 500, se.StatusCode)
83+
assert.Equal(t, time.Duration(0), se.RetryAfter)
84+
})
85+
86+
t.Run("wrapped error is classified correctly by ClassifyModelError", func(t *testing.T) {
87+
t.Parallel()
88+
apiErr := makeTestAnthropicError(429, "15")
89+
result := wrapAnthropicError(apiErr)
90+
retryable, rateLimited, retryAfter := modelerrors.ClassifyModelError(result)
91+
assert.False(t, retryable)
92+
assert.True(t, rateLimited)
93+
assert.Equal(t, 15*time.Second, retryAfter)
94+
})
95+
96+
t.Run("wrapped in fmt.Errorf still classified correctly", func(t *testing.T) {
97+
t.Parallel()
98+
apiErr := makeTestAnthropicError(429, "5")
99+
wrapped := fmt.Errorf("stream error: %w", wrapAnthropicError(apiErr))
100+
retryable, rateLimited, retryAfter := modelerrors.ClassifyModelError(wrapped)
101+
assert.False(t, retryable)
102+
assert.True(t, rateLimited)
103+
assert.Equal(t, 5*time.Second, retryAfter)
104+
})
105+
}

pkg/model/provider/gemini/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (g *StreamAdapter) Recv() (chat.MessageStreamResponse, error) {
143143
}
144144

145145
if res.err != nil {
146-
return chat.MessageStreamResponse{}, res.err
146+
return chat.MessageStreamResponse{}, wrapGeminiError(res.err)
147147
}
148148

149149
// Build response

pkg/model/provider/gemini/wrap.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package gemini
2+
3+
import (
4+
"errors"
5+
6+
"google.golang.org/genai"
7+
8+
"github.com/docker/docker-agent/pkg/modelerrors"
9+
)
10+
11+
// wrapGeminiError wraps a Gemini SDK error in a *modelerrors.StatusError
12+
// to carry HTTP status code metadata for the retry loop.
13+
// Gemini's *genai.APIError does not expose *http.Response, so no Retry-After
14+
// header extraction is possible; the RetryAfter field will be zero.
15+
// Non-Gemini errors (e.g. io.EOF, network errors) pass through unchanged.
16+
func wrapGeminiError(err error) error {
17+
if err == nil {
18+
return nil
19+
}
20+
apiErr, ok := errors.AsType[*genai.APIError](err)
21+
if !ok {
22+
return err
23+
}
24+
// Pass nil for resp — Gemini doesn't expose *http.Response.
25+
return modelerrors.WrapHTTPError(apiErr.Code, nil, err)
26+
}

pkg/model/provider/oaistream/adapter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func (a *StreamAdapter) Recv() (chat.MessageStreamResponse, error) {
3535
if !a.stream.Next() {
3636
err := a.stream.Err()
3737
if err != nil {
38-
return chat.MessageStreamResponse{}, err
38+
return chat.MessageStreamResponse{}, WrapOpenAIError(err)
3939
}
4040
return chat.MessageStreamResponse{}, io.EOF
4141
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package oaistream
2+
3+
import (
4+
"errors"
5+
6+
openaisdk "github.com/openai/openai-go/v3"
7+
8+
"github.com/docker/docker-agent/pkg/modelerrors"
9+
)
10+
11+
// WrapOpenAIError wraps an OpenAI SDK error in a *modelerrors.StatusError
12+
// to carry HTTP status code and Retry-After metadata for the retry loop.
13+
// Non-OpenAI errors (e.g. io.EOF, network errors) pass through unchanged.
14+
// Exported so openai/response_stream.go can reuse it without duplication.
15+
func WrapOpenAIError(err error) error {
16+
if err == nil {
17+
return nil
18+
}
19+
apiErr, ok := errors.AsType[*openaisdk.Error](err)
20+
if !ok {
21+
return err
22+
}
23+
return modelerrors.WrapHTTPError(apiErr.StatusCode, apiErr.Response, err)
24+
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package oaistream
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net/http"
7+
"net/http/httptest"
8+
"testing"
9+
"time"
10+
11+
openaisdk "github.com/openai/openai-go/v3"
12+
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
14+
15+
"github.com/docker/docker-agent/pkg/modelerrors"
16+
)
17+
18+
// makeTestOpenAIError creates an *openai.Error with the given status code and
19+
// optional Retry-After header value for testing.
20+
func makeTestOpenAIError(statusCode int, retryAfterValue string) *openaisdk.Error {
21+
header := http.Header{}
22+
if retryAfterValue != "" {
23+
header.Set("Retry-After", retryAfterValue)
24+
}
25+
resp := httptest.NewRecorder().Result()
26+
resp.StatusCode = statusCode
27+
resp.Header = header
28+
// openai.Error.Error() dereferences Request, so we must provide a non-nil one.
29+
req, _ := http.NewRequest(http.MethodPost, "https://api.openai.com/v1/chat/completions", http.NoBody)
30+
return &openaisdk.Error{
31+
StatusCode: statusCode,
32+
Response: resp,
33+
Request: req,
34+
}
35+
}
36+
37+
func TestWrapOpenAIError(t *testing.T) {
38+
t.Parallel()
39+
40+
t.Run("nil returns nil", func(t *testing.T) {
41+
t.Parallel()
42+
assert.NoError(t, WrapOpenAIError(nil))
43+
})
44+
45+
t.Run("non-openai error passes through unchanged", func(t *testing.T) {
46+
t.Parallel()
47+
orig := errors.New("some network error")
48+
result := WrapOpenAIError(orig)
49+
assert.Equal(t, orig, result)
50+
var se *modelerrors.StatusError
51+
assert.NotErrorAs(t, result, &se)
52+
})
53+
54+
t.Run("429 without Retry-After wraps with zero RetryAfter", func(t *testing.T) {
55+
t.Parallel()
56+
apiErr := makeTestOpenAIError(429, "")
57+
result := WrapOpenAIError(apiErr)
58+
var se *modelerrors.StatusError
59+
require.ErrorAs(t, result, &se)
60+
assert.Equal(t, 429, se.StatusCode)
61+
assert.Equal(t, time.Duration(0), se.RetryAfter)
62+
// Original error still accessible
63+
assert.ErrorIs(t, result, apiErr)
64+
})
65+
66+
t.Run("429 with Retry-After header sets RetryAfter", func(t *testing.T) {
67+
t.Parallel()
68+
apiErr := makeTestOpenAIError(429, "30")
69+
result := WrapOpenAIError(apiErr)
70+
var se *modelerrors.StatusError
71+
require.ErrorAs(t, result, &se)
72+
assert.Equal(t, 429, se.StatusCode)
73+
assert.Equal(t, 30*time.Second, se.RetryAfter)
74+
})
75+
76+
t.Run("500 wraps with correct status code", func(t *testing.T) {
77+
t.Parallel()
78+
apiErr := makeTestOpenAIError(500, "")
79+
result := WrapOpenAIError(apiErr)
80+
var se *modelerrors.StatusError
81+
require.ErrorAs(t, result, &se)
82+
assert.Equal(t, 500, se.StatusCode)
83+
})
84+
85+
t.Run("wrapped error is classified correctly by ClassifyModelError", func(t *testing.T) {
86+
t.Parallel()
87+
apiErr := makeTestOpenAIError(429, "10")
88+
result := WrapOpenAIError(apiErr)
89+
retryable, rateLimited, retryAfter := modelerrors.ClassifyModelError(result)
90+
assert.False(t, retryable)
91+
assert.True(t, rateLimited)
92+
assert.Equal(t, 10*time.Second, retryAfter)
93+
})
94+
95+
t.Run("wrapped in fmt.Errorf still classified correctly", func(t *testing.T) {
96+
t.Parallel()
97+
apiErr := makeTestOpenAIError(500, "")
98+
wrapped := fmt.Errorf("stream error: %w", WrapOpenAIError(apiErr))
99+
retryable, rateLimited, _ := modelerrors.ClassifyModelError(wrapped)
100+
assert.True(t, retryable)
101+
assert.False(t, rateLimited)
102+
})
103+
}

pkg/model/provider/openai/response_stream.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/openai/openai-go/v3/responses"
1010

1111
"github.com/docker/docker-agent/pkg/chat"
12+
"github.com/docker/docker-agent/pkg/model/provider/oaistream"
1213
"github.com/docker/docker-agent/pkg/tools"
1314
)
1415

@@ -33,7 +34,7 @@ func newResponseStreamAdapter(stream *ssestream.Stream[responses.ResponseStreamE
3334
func (a *ResponseStreamAdapter) Recv() (chat.MessageStreamResponse, error) {
3435
if !a.stream.Next() {
3536
if err := a.stream.Err(); err != nil {
36-
return chat.MessageStreamResponse{}, err
37+
return chat.MessageStreamResponse{}, oaistream.WrapOpenAIError(err)
3738
}
3839
return chat.MessageStreamResponse{}, io.EOF
3940
}

0 commit comments

Comments
 (0)