Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions dune/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package dune
import (
"fmt"
"io"
"os"
"time"

"github.com/duneanalytics/duneapi-client-go/models"
Expand Down Expand Up @@ -70,16 +69,20 @@ func (e *execution) GetResultsCSV() (io.Reader, error) {
}

func (e *execution) WaitGetResults(pollInterval time.Duration, maxRetries int) (*models.ResultsResponse, error) {
errCount := 0
errAttempts := 0
for {
resultsResp, err := e.client.QueryResultsV2(e.ID, models.ResultOptions{})
if err != nil {
if maxRetries != 0 && errCount > maxRetries {
errAttempts++
if maxRetries != 0 && errAttempts >= maxRetries {
return nil, fmt.Errorf("%w. %s", ErrorRetriesExhausted, err.Error())
}
fmt.Fprintln(os.Stderr, "failed to retrieve results. Retrying...\n", err)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove the print here?

errCount += 1
} else if resultsResp.IsExecutionFinished {
sleep := nextBackoff(errAttempts, defaultRetryPolicy)
time.Sleep(sleep)
continue
}
errAttempts = 0
if resultsResp.IsExecutionFinished {
return resultsResp, nil
}
time.Sleep(pollInterval)
Expand Down
150 changes: 138 additions & 12 deletions dune/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"time"
)

var ErrorReqUnsuccessful = errors.New("request was not successful")
Expand All @@ -13,6 +16,88 @@ type ErrorResponse struct {
Error string `json:"error"`
}

// RateLimit holds the values of Dune's X-RateLimit-* response headers.
type RateLimit struct {
	Limit     int   // X-RateLimit-Limit: request quota for the current window
	Remaining int   // X-RateLimit-Remaining: requests left in the window
	Reset     int64 // X-RateLimit-Reset: raw header value; presumably a Unix timestamp — TODO confirm against API docs
}

// APIError describes a non-200 response received from the Dune API.
type APIError struct {
	StatusCode  int           // HTTP status code of the failed response
	StatusText  string        // HTTP status line text (resp.Status)
	BodySnippet string        // decoded API error message, or up to 1024 bytes of the raw body
	RateLimit   *RateLimit    // parsed X-RateLimit-* headers; nil when none were present
	RetryAfter  time.Duration // parsed Retry-After header (delta-seconds form); 0 when absent
}

// Error renders the APIError as "http <code> <status>", followed by
// ": <body snippet>" when a snippet was captured.
func (e *APIError) Error() string {
	msg := fmt.Sprintf("http %d %s", e.StatusCode, e.StatusText)
	if e.BodySnippet == "" {
		return msg
	}
	return msg + ": " + e.BodySnippet
}

// RetryPolicy configures how failed HTTP requests are retried.
type RetryPolicy struct {
	MaxAttempts          int           // total attempts before giving up
	InitialBackoff       time.Duration // backoff before the second attempt
	MaxBackoff           time.Duration // ceiling for the exponential part of the backoff
	Jitter               time.Duration // constant extra delay added on top of each backoff
	RetryableStatusCodes []int         // HTTP status codes that trigger a retry
}

var defaultRetryPolicy = RetryPolicy{
MaxAttempts: 3,
InitialBackoff: 500 * time.Millisecond,
MaxBackoff: 5 * time.Second,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that rate limits apply on a minute basis, I think it makes sense to bump these a bit higher, as it's very possible that you'll still be rate limited after waiting only a couple of seconds

Jitter: 100 * time.Millisecond,
RetryableStatusCodes: []int{429, 500, 502, 503, 504},
}

func parseRateLimitHeaders(h http.Header) *RateLimit {
limStr := h.Get("X-RateLimit-Limit")
remStr := h.Get("X-RateLimit-Remaining")
resetStr := h.Get("X-RateLimit-Reset")

var lim, rem int
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to shorten variable names :)

Suggested change
var lim, rem int
var limit, remaining int

var reset int64

if limStr != "" {
if v, err := strconv.Atoi(limStr); err == nil {
lim = v
}
}
if remStr != "" {
if v, err := strconv.Atoi(remStr); err == nil {
rem = v
}
}
if resetStr != "" {
if v, err := strconv.ParseInt(resetStr, 10, 64); err == nil {
reset = v
}
}

if lim == 0 && rem == 0 && reset == 0 {
return nil
}
return &RateLimit{Limit: lim, Remaining: rem, Reset: reset}
}

func nextBackoff(attempt int, p RetryPolicy) time.Duration {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And would you mind moving the retry and backoff code into a separate file please (e.g. dune/retries.go) 🙏

Suggested change
func nextBackoff(attempt int, p RetryPolicy) time.Duration {
func (p RetryPolicy) nextBackoff(attempt int) time.Duration {

b := p.InitialBackoff
for i := 1; i < attempt; i++ {
b *= 2
if b > p.MaxBackoff {
b = p.MaxBackoff
break
}
}
if p.Jitter > 0 {
b += p.Jitter
}
return b
}

func decodeBody(resp *http.Response, dest interface{}) error {
defer resp.Body.Close()
err := json.NewDecoder(resp.Body).Decode(dest)
Expand All @@ -24,20 +109,61 @@ func decodeBody(resp *http.Response, dest interface{}) error {

func httpRequest(apiKey string, req *http.Request) (*http.Response, error) {
req.Header.Add("X-DUNE-API-KEY", apiKey)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send request: %w", err)
}
p := defaultRetryPolicy
attempt := 1
for {
resp, err := http.DefaultClient.Do(req)
if err != nil {
if attempt >= p.MaxAttempts {
return nil, fmt.Errorf("failed to send request: %w", err)
}
time.Sleep(nextBackoff(attempt, p))
attempt++
continue
}

if resp.StatusCode == 200 {
return resp, nil
}

if resp.StatusCode != 200 {
defer resp.Body.Close()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Defer in loop accumulates unclosed response bodies

The defer resp.Body.Close() statement is inside the retry loop. Each time the loop iterates with a non-200 status code, a new defer is added without the previous response body being closed immediately. This keeps multiple response bodies and their associated resources (TCP connections, file descriptors) open until the function returns, which could exhaust connection pool slots or cause "too many open files" errors during extended retry sequences.

Fix in Cursor Fix in Web

var errorResponse ErrorResponse
err := json.NewDecoder(resp.Body).Decode(&errorResponse)
if err != nil {
return nil, fmt.Errorf("failed to read error response body: %w", err)
snippetBytes, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
var er ErrorResponse
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var er ErrorResponse
var errorResp ErrorResponse

_ = json.Unmarshal(snippetBytes, &er)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unmarshal could fail, since the body is capped at 1024 characters, so you'll need to handle the error here in that case

msg := string(snippetBytes)
if er.Error != "" {
msg = er.Error
}
return resp, fmt.Errorf("%w [%d]: %s", ErrorReqUnsuccessful, resp.StatusCode, errorResponse.Error)
rl := parseRateLimitHeaders(resp.Header)
retryAfter := time.Duration(0)
if ra := resp.Header.Get("Retry-After"); ra != "" {
if secs, err := strconv.Atoi(ra); err == nil {
retryAfter = time.Duration(secs) * time.Second
}
}
apiErr := &APIError{
StatusCode: resp.StatusCode,
StatusText: resp.Status,
BodySnippet: msg,
RateLimit: rl,
RetryAfter: retryAfter,
}
retryable := false
for _, code := range p.RetryableStatusCodes {
if resp.StatusCode == code {
retryable = true
break
}
}
if retryable && attempt < p.MaxAttempts {
sleep := nextBackoff(attempt, p)
if apiErr.RetryAfter > 0 && apiErr.RetryAfter > sleep {
sleep = apiErr.RetryAfter
}
time.Sleep(sleep)
attempt++
continue
}
return nil, fmt.Errorf("%w: %v", ErrorReqUnsuccessful, apiErr)
}

return resp, nil
}