Skip to content

Commit 659778b

Browse files
committed
feat: singleflight to deduplicate GET requests
1 parent 6f8acba commit 659778b

7 files changed

Lines changed: 227 additions & 34 deletions

File tree

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ go get github.com/aiven/go-client-codegen
2121
| `AIVEN_USER_AGENT` | `string` | User Agent |
2222
| `AIVEN_DEBUG` | `bool` | Debug Output Flag (stderr) |
2323

24+
See all configuration options in [`client.go`](client.go).
25+
2426
#### Via Constructor Options
2527

2628
```go

client.go

Lines changed: 51 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"time"
1717

1818
"golang.org/x/exp/slices"
19+
"golang.org/x/sync/singleflight"
1920

2021
"github.com/hashicorp/go-multierror"
2122
"github.com/hashicorp/go-retryablehttp"
@@ -105,24 +106,28 @@ func NewClient(opts ...Option) (Client, error) {
105106
// Default values: 26 seconds
106107
// Changed values: 67 seconds
107108
type aivenClient struct {
108-
Host string `envconfig:"AIVEN_WEB_URL" default:"https://api.aiven.io"`
109-
UserAgent string `envconfig:"AIVEN_USER_AGENT" default:"aiven-go-client/v3"`
110-
Token string `envconfig:"AIVEN_TOKEN"`
111-
Debug bool `envconfig:"AIVEN_DEBUG"`
112-
RetryMax int `envconfig:"AIVEN_CLIENT_RETRY_MAX" default:"6"`
113-
RetryWaitMin time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MIN" default:"2s"`
114-
RetryWaitMax time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MAX" default:"15s"`
115-
logger zerolog.Logger
116-
doer Doer
109+
Host string `envconfig:"AIVEN_WEB_URL" default:"https://api.aiven.io"`
110+
UserAgent string `envconfig:"AIVEN_USER_AGENT" default:"aiven-go-client/v3"`
111+
Token string `envconfig:"AIVEN_TOKEN"`
112+
Debug bool `envconfig:"AIVEN_DEBUG"`
113+
RetryMax int `envconfig:"AIVEN_CLIENT_RETRY_MAX" default:"6"`
114+
RetryWaitMin time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MIN" default:"2s"`
115+
RetryWaitMax time.Duration `envconfig:"AIVEN_CLIENT_RETRY_WAIT_MAX" default:"15s"`
116+
EnableSingleFlight bool `envconfig:"AIVEN_CLIENT_ENABLE_SINGLE_FLIGHT" default:"true"`
117+
logger zerolog.Logger
118+
doer Doer
119+
singleflight singleflight.Group
117120
}
118121

119122
// OperationIDKey is the key used to store the operation ID in the context.
120123
type OperationIDKey struct{}
121124

122-
func (d *aivenClient) Do(ctx context.Context, operationID, method, path string, in any, query ...[2]string) ([]byte, error) {
125+
func (d *aivenClient) Do(ctx context.Context, operationID, method, path string, in any, query ...[2]string) (_ []byte, err error) {
123126
ctx = context.WithValue(ctx, OperationIDKey{}, operationID)
124-
var rsp *http.Response
125-
var err error
127+
queryString := fmtQuery(operationID, query...)
128+
129+
var statusCode int
130+
var shared bool
126131

127132
if d.Debug {
128133
start := time.Now()
@@ -133,54 +138,75 @@ func (d *aivenClient) Do(ctx context.Context, operationID, method, path string,
133138
if err != nil {
134139
event = d.logger.Error().Err(err)
135140
} else {
136-
event = d.logger.Info().Str("status", rsp.Status)
141+
event = d.logger.Info()
137142
}
138143

139144
event.Ctx(ctx).
140145
Stringer("duration", end).
141146
Str("operationID", operationID).
142147
Str("method", method).
143148
Str("path", path).
144-
Str("query", fmtQuery(operationID, query...)).
149+
Str("query", queryString).
150+
Int("status_code", statusCode).
151+
Bool("shared", shared).
145152
Send()
146153
}()
147154
}
148155

149-
rsp, err = d.do(ctx, operationID, method, path, in, query...)
156+
var body []byte
157+
if d.EnableSingleFlight && (method == http.MethodGet || method == http.MethodHead || method == http.MethodOptions || method == http.MethodTrace) {
158+
type result struct {
159+
statusCode int
160+
body []byte
161+
}
162+
key := strings.Join([]string{method, d.Host, path, queryString}, "|")
163+
v, serr, sh := d.singleflight.Do(key, func() (any, error) {
164+
statusCode, body, err := d.do(ctx, method, path, in, queryString)
165+
return result{statusCode: statusCode, body: body}, err
166+
})
167+
res := v.(result)
168+
statusCode, body, err, shared = res.statusCode, res.body, serr, sh
169+
} else {
170+
statusCode, body, err = d.do(ctx, method, path, in, queryString)
171+
}
150172
if err != nil {
151173
return nil, err
152174
}
153175

154-
defer func() {
155-
err = multierror.Append(rsp.Body.Close()).ErrorOrNil()
156-
}()
157-
158-
return fromResponse(operationID, rsp)
176+
return fromBytes(operationID, statusCode, body)
159177
}
160178

161-
func (d *aivenClient) do(ctx context.Context, operationID, method, path string, in any, query ...[2]string) (*http.Response, error) {
179+
func (d *aivenClient) do(ctx context.Context, method, path string, in any, queryString string) (int, []byte, error) {
162180
var body io.Reader
163181

164182
if !(in == nil || isEmpty(in)) {
165183
b, err := json.Marshal(in)
166184
if err != nil {
167-
return nil, err
185+
return 0, nil, err
168186
}
169187

170188
body = bytes.NewBuffer(b)
171189
}
172190

173191
req, err := http.NewRequestWithContext(ctx, method, d.Host+path, body)
174192
if err != nil {
175-
return nil, err
193+
return 0, nil, err
176194
}
177195

178196
req.Header.Set("Content-Type", "application/json")
179197
req.Header.Set("User-Agent", d.UserAgent)
180198
req.Header.Set("Authorization", "aivenv1 "+d.Token)
181-
req.URL.RawQuery = fmtQuery(operationID, query...)
199+
req.URL.RawQuery = queryString
182200

183-
return d.doer.Do(req)
201+
rsp, err := d.doer.Do(req)
202+
if err != nil {
203+
return 0, nil, err
204+
}
205+
defer func() {
206+
err = multierror.Append(rsp.Body.Close()).ErrorOrNil()
207+
}()
208+
respBody, err := io.ReadAll(rsp.Body)
209+
return rsp.StatusCode, respBody, err
184210
}
185211

186212
func isEmpty(a any) bool {

client_test.go

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"net/http/httptest"
99
"os"
1010
"strings"
11+
"sync"
1112
"sync/atomic"
1213
"testing"
1314
"time"
@@ -299,3 +300,151 @@ func TestServiceIntegrationEndpointGet(t *testing.T) {
299300
// All calls are received
300301
assert.EqualValues(t, 1, callCount)
301302
}
303+
304+
func TestSingleFlightDeduplication(t *testing.T) {
305+
var callCount int64
306+
307+
mux := http.NewServeMux()
308+
mux.HandleFunc(
309+
"/v1/project/test-project/service",
310+
func(w http.ResponseWriter, _ *http.Request) {
311+
w.Header().Set("Content-Type", "application/json")
312+
w.WriteHeader(http.StatusOK)
313+
_, err := w.Write([]byte(`{"services": []}`))
314+
assert.NoError(t, err)
315+
atomic.AddInt64(&callCount, 1)
316+
},
317+
)
318+
319+
server := httptest.NewServer(mux)
320+
defer server.Close()
321+
322+
c, err := NewClient(TokenOpt("token"), HostOpt(server.URL), UserAgentOpt("unit-test"))
323+
require.NotNil(t, c)
324+
require.NoError(t, err)
325+
326+
ctx := t.Context()
327+
328+
var wg sync.WaitGroup
329+
errs := make(chan error, 100)
330+
for range 100 {
331+
wg.Go(func() {
332+
_, err := c.ServiceList(ctx, "test-project")
333+
errs <- err
334+
})
335+
}
336+
wg.Wait()
337+
close(errs)
338+
339+
for err := range errs {
340+
require.NoError(t, err)
341+
}
342+
343+
// We expect less than 10 calls to the API because of the single flight deduplication.
344+
assert.Less(t, callCount, int64(10))
345+
}
346+
347+
// TestSingleFlightDifferentProjects tests that singleflight doesn't merge different paths.
348+
func TestSingleFlightDifferentProjects(t *testing.T) {
349+
var callCount int64
350+
351+
mux := http.NewServeMux()
352+
mux.HandleFunc(
353+
"/v1/project/project-a/service",
354+
func(w http.ResponseWriter, _ *http.Request) {
355+
w.Header().Set("Content-Type", "application/json")
356+
w.WriteHeader(http.StatusOK)
357+
_, err := w.Write([]byte(`{"services": []}`))
358+
assert.NoError(t, err)
359+
atomic.AddInt64(&callCount, 1)
360+
},
361+
)
362+
mux.HandleFunc(
363+
"/v1/project/project-b/service",
364+
func(w http.ResponseWriter, _ *http.Request) {
365+
w.Header().Set("Content-Type", "application/json")
366+
w.WriteHeader(http.StatusOK)
367+
_, err := w.Write([]byte(`{"services": []}`))
368+
assert.NoError(t, err)
369+
atomic.AddInt64(&callCount, 1)
370+
},
371+
)
372+
373+
server := httptest.NewServer(mux)
374+
defer server.Close()
375+
376+
c, err := NewClient(TokenOpt("token"), HostOpt(server.URL), UserAgentOpt("unit-test"))
377+
require.NotNil(t, c)
378+
require.NoError(t, err)
379+
380+
ctx := t.Context()
381+
382+
var wg sync.WaitGroup
383+
errs := make(chan error, 2)
384+
wg.Go(func() {
385+
_, err := c.ServiceList(ctx, "project-a")
386+
errs <- err
387+
})
388+
wg.Go(func() {
389+
_, err := c.ServiceList(ctx, "project-b")
390+
errs <- err
391+
})
392+
wg.Wait()
393+
close(errs)
394+
395+
for err := range errs {
396+
require.NoError(t, err)
397+
}
398+
399+
// We expect exactly 2 calls to the API because different project names should not be deduplicated.
400+
assert.Equal(t, int64(2), callCount)
401+
}
402+
403+
func TestSingleFlightMethodIsolation(t *testing.T) {
404+
var callCount int64
405+
406+
mux := http.NewServeMux()
407+
mux.HandleFunc(
408+
"/v1/test",
409+
func(w http.ResponseWriter, _ *http.Request) {
410+
w.Header().Set("Content-Type", "application/json")
411+
w.WriteHeader(http.StatusOK)
412+
_, err := w.Write([]byte(`{}`))
413+
assert.NoError(t, err)
414+
atomic.AddInt64(&callCount, 1)
415+
},
416+
)
417+
418+
server := httptest.NewServer(mux)
419+
defer server.Close()
420+
421+
d := &aivenClient{
422+
Host: server.URL,
423+
UserAgent: "unit-test",
424+
Token: "token",
425+
EnableSingleFlight: true,
426+
doer: server.Client(),
427+
}
428+
429+
ctx := t.Context()
430+
431+
var wg sync.WaitGroup
432+
errs := make(chan error, 2)
433+
wg.Go(func() {
434+
_, err := d.Do(ctx, "op-get", http.MethodGet, "/v1/test", nil)
435+
errs <- err
436+
})
437+
wg.Go(func() {
438+
_, err := d.Do(ctx, "op-head", http.MethodHead, "/v1/test", nil)
439+
errs <- err
440+
})
441+
wg.Wait()
442+
close(errs)
443+
444+
for err := range errs {
445+
require.NoError(t, err)
446+
}
447+
448+
// We expect exactly 2 calls to the API because different methods should not be deduplicated.
449+
assert.Equal(t, int64(2), callCount)
450+
}

error.go

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,19 +55,14 @@ func IsAlreadyExists(err error) bool {
5555
return errors.As(err, &e) && strings.Contains(e.Message, "already exists") && e.Status == http.StatusConflict
5656
}
5757

58-
func fromResponse(operationID string, rsp *http.Response) ([]byte, error) {
59-
e := Error{OperationID: operationID, Status: rsp.StatusCode}
60-
b, err := io.ReadAll(rsp.Body)
61-
if err != nil {
62-
e.Message = fmt.Sprintf("body read error: %s", err)
63-
return nil, e
64-
}
58+
func fromBytes(operationID string, statusCode int, b []byte) ([]byte, error) {
59+
e := Error{OperationID: operationID, Status: statusCode}
6560

66-
if rsp.StatusCode < 200 || rsp.StatusCode >= 300 {
61+
if statusCode < 200 || statusCode >= 300 {
6762
// According to the documentation,
6863
// failed responses must have "errors" and "message" fields
6964
// https://api.aiven.io/doc/#section/Responses/Failed-responses
70-
err = json.Unmarshal(b, &e)
65+
err := json.Unmarshal(b, &e)
7166
if err != nil {
7267
// 1. The body might contain sensitive data
7368
// 2. It might fail the unmarshalling into Error and still be a valid json
@@ -84,3 +79,14 @@ func fromResponse(operationID string, rsp *http.Response) ([]byte, error) {
8479

8580
return b, nil
8681
}
82+
83+
func fromResponse(operationID string, rsp *http.Response) ([]byte, error) {
84+
b, err := io.ReadAll(rsp.Body)
85+
if err != nil {
86+
e := Error{OperationID: operationID, Status: rsp.StatusCode}
87+
e.Message = fmt.Sprintf("body read error: %s", err)
88+
return nil, e
89+
}
90+
91+
return fromBytes(operationID, rsp.StatusCode, b)
92+
}

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ require (
2424
github.com/pmezard/go-difflib v1.0.0 // indirect
2525
github.com/samber/lo v1.52.0 // indirect
2626
github.com/stretchr/objx v0.5.2 // indirect
27+
golang.org/x/sync v0.19.0 // indirect
2728
golang.org/x/sys v0.21.0 // indirect
2829
golang.org/x/text v0.22.0 // indirect
2930
)

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu
4343
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
4444
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3 h1:hNQpMuAJe5CtcUqCXaWga3FHu+kQvCqcsoVaQgSV60o=
4545
golang.org/x/exp v0.0.0-20240112132812-db7319d0e0e3/go.mod h1:idGWGoKP1toJGkd5/ig9ZLuPcZBC3ewk7SzmH0uou08=
46+
golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4=
47+
golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
4648
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4749
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
4850
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

option.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,3 +62,10 @@ func RetryWaitMaxOpt(retryWaitMax time.Duration) Option {
6262
d.RetryWaitMax = retryWaitMax
6363
}
6464
}
65+
66+
// EnableSingleFlightOpt enables singleflight for deduplicating concurrent identical requests
67+
func EnableSingleFlightOpt(enableSingleFlight bool) Option {
68+
return func(d *aivenClient) {
69+
d.EnableSingleFlight = enableSingleFlight
70+
}
71+
}

0 commit comments

Comments
 (0)