-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat: retry on 429 #2244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: retry on 429 #2244
Changes from 5 commits
b8e75fa
8c2f4ff
6147653
ae4b2e9
7fe6a48
b75d4ee
7b11825
8d424be
c4d5a29
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ import ( | |
| "strconv" | ||
| "strings" | ||
| "sync" | ||
| "sync/atomic" | ||
| "time" | ||
|
|
||
| "golang.org/x/exp/maps" | ||
|
|
@@ -1257,6 +1258,9 @@ func (r *Runner) RunEnumeration() { | |
| }(nextStep) | ||
|
|
||
| wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) | ||
| retryCh := make(chan retryJob) | ||
|
|
||
| _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) | ||
|
|
||
| processItem := func(k string) error { | ||
| if r.options.resumeCfg != nil { | ||
|
|
@@ -1279,10 +1283,10 @@ func (r *Runner) RunEnumeration() { | |
| for _, p := range r.options.requestURIs { | ||
| scanopts := r.scanopts.Clone() | ||
| scanopts.RequestURI = p | ||
| r.process(k, wg, r.hp, protocol, scanopts, output) | ||
| r.process(k, wg, r.hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| } else { | ||
| r.process(k, wg, r.hp, protocol, &r.scanopts, output) | ||
| r.process(k, wg, r.hp, protocol, &r.scanopts, output, retryCh) | ||
| } | ||
|
|
||
| return nil | ||
|
|
@@ -1299,9 +1303,10 @@ func (r *Runner) RunEnumeration() { | |
| } | ||
|
|
||
| wg.Wait() | ||
|
|
||
| if r.options.RetryRounds > 0 { | ||
| <-drainedCh | ||
| } | ||
| close(output) | ||
|
|
||
| wgoutput.Wait() | ||
|
|
||
| if r.scanopts.StoreVisionReconClusters { | ||
|
|
@@ -1323,6 +1328,70 @@ func (r *Runner) RunEnumeration() { | |
| } | ||
| } | ||
|
|
||
| type analyzeFunc func(*httpx.HTTPX, string, httpx.Target, string, string, *ScanOptions) Result | ||
|
|
||
| func (r *Runner) retryLoop( | ||
| parent context.Context, | ||
| retryCh chan retryJob, | ||
| output chan<- Result, | ||
| analyze analyzeFunc, | ||
| ) (stop func(), drained <-chan struct{}) { | ||
| var remaining atomic.Int64 | ||
| ctx, cancel := context.WithCancel(parent) | ||
| drainedCh := make(chan struct{}) | ||
|
|
||
| go func() { | ||
| defer close(retryCh) | ||
|
|
||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case job, ok := <-retryCh: | ||
| if !ok { | ||
| return | ||
| } | ||
| if job.attempt == 1 { | ||
| remaining.Add(1) | ||
| } | ||
|
|
||
| go func(j retryJob) { | ||
| if wait := time.Until(j.when); wait > 0 { | ||
| timer := time.NewTimer(wait) | ||
| select { | ||
| case <-ctx.Done(): | ||
| timer.Stop() | ||
| return | ||
| case <-timer.C: | ||
| } | ||
| } | ||
|
|
||
| res := analyze(j.hp, j.protocol, j.target, j.method, j.origInput, j.scanopts) | ||
| output <- res | ||
|
|
||
| if res.StatusCode == http.StatusTooManyRequests && j.attempt < r.options.RetryRounds { | ||
| j.attempt++ | ||
| j.when = time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond) | ||
|
|
||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case retryCh <- j: | ||
| return | ||
| } | ||
| } | ||
|
|
||
| if remaining.Add(-1) == 0 { | ||
| close(drainedCh) | ||
| } | ||
| }(job) | ||
| } | ||
| } | ||
|
Comment on lines
+1650
to
+1673
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Retried successes bypass TLS/CSP expansion.
Also applies to: 1779-1800, 1840-1849 🤖 Prompt for AI Agents |
||
| }() | ||
|
|
||
| return func() { cancel() }, drainedCh | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| func logFilteredErrorPage(fileName, url string) { | ||
| dir := filepath.Dir(fileName) | ||
| if !fileutil.FolderExists(dir) { | ||
|
|
@@ -1380,11 +1449,11 @@ func (r *Runner) GetScanOpts() ScanOptions { | |
| return r.scanopts | ||
| } | ||
|
|
||
| func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result) { | ||
| r.process(t, wg, r.hp, protocol, scanopts, output) | ||
| func (r *Runner) Process(t string, wg *syncutil.AdaptiveWaitGroup, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { | ||
| r.process(t, wg, r.hp, protocol, scanopts, output, retryCh) | ||
| } | ||
|
coderabbitai[bot] marked this conversation as resolved.
Outdated
|
||
|
|
||
| func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result) { | ||
| func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTTPX, protocol string, scanopts *ScanOptions, output chan Result, retryCh chan retryJob) { | ||
| // attempts to set the workpool size to the number of threads | ||
| if r.options.Threads > 0 && wg.Size != r.options.Threads { | ||
| if err := wg.Resize(context.Background(), r.options.Threads); err != nil { | ||
|
|
@@ -1409,15 +1478,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT | |
| defer wg.Done() | ||
| result := r.analyze(hp, protocol, target, method, t, scanopts) | ||
| output <- result | ||
| if result.StatusCode == http.StatusTooManyRequests && | ||
| r.options.RetryRounds > 0 { | ||
| retryCh <- retryJob{ | ||
| hp: hp, | ||
| protocol: protocol, | ||
| target: target, | ||
| method: method, | ||
| origInput: t, | ||
| scanopts: scanopts.Clone(), | ||
| attempt: 1, | ||
| when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), | ||
| } | ||
| } | ||
| if scanopts.TLSProbe && result.TLSData != nil { | ||
| for _, tt := range result.TLSData.SubjectAN { | ||
| if !r.testAndSet(tt) { | ||
| continue | ||
| } | ||
| r.process(tt, wg, hp, protocol, scanopts, output) | ||
| r.process(tt, wg, hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| if r.testAndSet(result.TLSData.SubjectCN) { | ||
| r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) | ||
| r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| } | ||
| if scanopts.CSPProbe && result.CSPData != nil { | ||
|
|
@@ -1428,7 +1510,7 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT | |
| if !r.testAndSet(tt) { | ||
| continue | ||
| } | ||
| r.process(tt, wg, hp, protocol, scanopts, output) | ||
| r.process(tt, wg, hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| } | ||
| }(target, method, prot) | ||
|
|
@@ -1463,15 +1545,28 @@ func (r *Runner) process(t string, wg *syncutil.AdaptiveWaitGroup, hp *httpx.HTT | |
| } | ||
| result := r.analyze(hp, protocol, target, method, t, scanopts) | ||
| output <- result | ||
| if result.StatusCode == http.StatusTooManyRequests && | ||
| r.options.RetryRounds > 0 { | ||
| retryCh <- retryJob{ | ||
| hp: hp, | ||
| protocol: protocol, | ||
| target: target, | ||
| method: method, | ||
| origInput: t, | ||
| scanopts: scanopts.Clone(), | ||
| attempt: 1, | ||
| when: time.Now().Add(time.Duration(r.options.RetryDelay) * time.Millisecond), | ||
| } | ||
| } | ||
| if scanopts.TLSProbe && result.TLSData != nil { | ||
| for _, tt := range result.TLSData.SubjectAN { | ||
| if !r.testAndSet(tt) { | ||
| continue | ||
| } | ||
| r.process(tt, wg, hp, protocol, scanopts, output) | ||
| r.process(tt, wg, hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| if r.testAndSet(result.TLSData.SubjectCN) { | ||
| r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output) | ||
| r.process(result.TLSData.SubjectCN, wg, hp, protocol, scanopts, output, retryCh) | ||
| } | ||
| } | ||
| }(port, target, method, wantedProtocol) | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,16 +1,22 @@ | ||||||||||||||||||||||||||||
| package runner | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| import ( | ||||||||||||||||||||||||||||
| "context" | ||||||||||||||||||||||||||||
| "fmt" | ||||||||||||||||||||||||||||
| "net/http" | ||||||||||||||||||||||||||||
| "net/http/httptest" | ||||||||||||||||||||||||||||
| "os" | ||||||||||||||||||||||||||||
| "strings" | ||||||||||||||||||||||||||||
| "sync" | ||||||||||||||||||||||||||||
| "sync/atomic" | ||||||||||||||||||||||||||||
| "testing" | ||||||||||||||||||||||||||||
| "time" | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| _ "github.com/projectdiscovery/fdmax/autofdmax" | ||||||||||||||||||||||||||||
| "github.com/projectdiscovery/httpx/common/httpx" | ||||||||||||||||||||||||||||
| "github.com/projectdiscovery/mapcidr/asn" | ||||||||||||||||||||||||||||
| stringsutil "github.com/projectdiscovery/utils/strings" | ||||||||||||||||||||||||||||
| syncutil "github.com/projectdiscovery/utils/sync" | ||||||||||||||||||||||||||||
| "github.com/stretchr/testify/require" | ||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
|
@@ -227,10 +233,10 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { | |||||||||||||||||||||||||||
| runner := &Runner{} | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| tests := []struct { | ||||||||||||||||||||||||||||
| name string | ||||||||||||||||||||||||||||
| allow []string | ||||||||||||||||||||||||||||
| deny []string | ||||||||||||||||||||||||||||
| testCases []struct { | ||||||||||||||||||||||||||||
| name string | ||||||||||||||||||||||||||||
| allow []string | ||||||||||||||||||||||||||||
| deny []string | ||||||||||||||||||||||||||||
| testCases []struct { | ||||||||||||||||||||||||||||
| ip string | ||||||||||||||||||||||||||||
| expected bool | ||||||||||||||||||||||||||||
| reason string | ||||||||||||||||||||||||||||
|
|
@@ -312,3 +318,92 @@ func TestCreateNetworkpolicyInstance_AllowDenyFlags(t *testing.T) { | |||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| func TestRunner_Process_And_RetryLoop(t *testing.T) { | ||||||||||||||||||||||||||||
| var hits1, hits2 int32 | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // srv1: returns 429 for the first 3 requests, and 200 on the 4th request | ||||||||||||||||||||||||||||
| srv1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||||||||||||||||||||||||
| if atomic.AddInt32(&hits1, 1) != 4 { | ||||||||||||||||||||||||||||
| w.WriteHeader(http.StatusTooManyRequests) | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| w.WriteHeader(http.StatusOK) | ||||||||||||||||||||||||||||
| })) | ||||||||||||||||||||||||||||
| defer srv1.Close() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // srv2: returns 429 for the first 2 requests, and 200 on the 3rd request | ||||||||||||||||||||||||||||
| srv2 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { | ||||||||||||||||||||||||||||
| if atomic.AddInt32(&hits2, 1) != 3 { | ||||||||||||||||||||||||||||
| w.WriteHeader(http.StatusTooManyRequests) | ||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| w.WriteHeader(http.StatusOK) | ||||||||||||||||||||||||||||
| })) | ||||||||||||||||||||||||||||
| defer srv2.Close() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| r, err := New(&Options{ | ||||||||||||||||||||||||||||
| Threads: 1, | ||||||||||||||||||||||||||||
| RetryRounds: 2, | ||||||||||||||||||||||||||||
| RetryDelay: 5, | ||||||||||||||||||||||||||||
| Timeout: 3, | ||||||||||||||||||||||||||||
| }) | ||||||||||||||||||||||||||||
| require.NoError(t, err) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| output := make(chan Result) | ||||||||||||||||||||||||||||
| retryCh := make(chan retryJob) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 💡 Verification agent 🧩 Analysis chainDon’t ignore the retry loop’s stop handle; avoid a goroutine leak
Apply this diff to wire cancellation correctly and ensure cleanup even on early test failures: - _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze)
+ ctx, cancel := context.WithCancel(context.Background())
+ stopRetry, drainedCh := r.retryLoop(ctx, retryCh, output, r.analyze)
+ t.Cleanup(func() {
+ // Ensure the loop exits and resources are released
+ stopRetry()
+ cancel()
+ })If you want to double-check the signature and the semantics of the first return value: 🏁 Script executed: #!/bin/bash
# Inspect retryLoop signature/returns to confirm cleanup expectations
rg -nP -C2 'func\s+\(r\s+\*Runner\)\s+retryLoop\(' runnerLength of output: 342 🏁 Script executed: #!/bin/bash
# Retrieve the full signature (including return values) of retryLoop to confirm the first return value’s type and semantics
rg -nP -C10 'func\s+\(r\s+\*Runner\)\s+retryLoop\(' runner/runner.goLength of output: 691 🏁 Script executed: #!/bin/bash
# Extract the full body of retryLoop to confirm how stop() and drainedCh operate
sed -n '1333,1400p' runner/runner.goLength of output: 1518 🏁 Script executed: #!/bin/bash
# Locate usages of retryLoop in the test file
rg -n -C5 'retryLoop' runner/runner_test.go
# Show full context around lines 350–380 to inspect how stop and drainedCh are handled
sed -n '340,380p' runner/runner_test.goLength of output: 1276 Wire up the retryLoop stop handle to prevent goroutine leaks The test currently invokes retryLoop with a background context and discards its first return value ( • File: runner/runner_test.go Suggested change: - _, drainedCh := r.retryLoop(context.Background(), retryCh, output, r.analyze)
+ // Create a cancellable context and capture the stop function
+ ctx, cancel := context.WithCancel(context.Background())
+ stopRetry, drainedCh := r.retryLoop(ctx, retryCh, output, r.analyze)
+ // Ensure the loop exits and the context is cleaned up after the test
+ t.Cleanup(func() {
+ stopRetry()
+ cancel()
+ })📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||
| wg, _ := syncutil.New(syncutil.WithSize(r.options.Threads)) | ||||||||||||||||||||||||||||
| so := r.scanopts.Clone() | ||||||||||||||||||||||||||||
| so.Methods = []string{"GET"} | ||||||||||||||||||||||||||||
| so.TLSProbe = false | ||||||||||||||||||||||||||||
| so.CSPProbe = false | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| seed := map[string]string{ | ||||||||||||||||||||||||||||
| "srv1": srv1.URL, | ||||||||||||||||||||||||||||
| "srv2": srv2.URL, | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| var drainWG sync.WaitGroup | ||||||||||||||||||||||||||||
| drainWG.Add(1) | ||||||||||||||||||||||||||||
| var s1n429, s1n200, s2n429, s2n200 int | ||||||||||||||||||||||||||||
| go func() { | ||||||||||||||||||||||||||||
| defer drainWG.Done() | ||||||||||||||||||||||||||||
| for res := range output { | ||||||||||||||||||||||||||||
| switch res.StatusCode { | ||||||||||||||||||||||||||||
| case http.StatusTooManyRequests: | ||||||||||||||||||||||||||||
| if res.URL == srv1.URL { | ||||||||||||||||||||||||||||
| s1n429++ | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| s2n429++ | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| case http.StatusOK: | ||||||||||||||||||||||||||||
| if res.URL == srv1.URL { | ||||||||||||||||||||||||||||
| s1n200++ | ||||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||||
| s2n200++ | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
| }() | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| for _, url := range seed { | ||||||||||||||||||||||||||||
| r.process(url, wg, r.hp, httpx.HTTP, so, output, retryCh) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| wg.Wait() | ||||||||||||||||||||||||||||
| <-drainedCh | ||||||||||||||||||||||||||||
| close(output) | ||||||||||||||||||||||||||||
| drainWG.Wait() | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🛠️ Refactor suggestion Guard against hangs: time-bound the wait for drainedCh If the retry loop logic regresses, Apply this diff: - wg.Wait()
- <-drainedCh
- close(output)
+ wg.Wait()
+ select {
+ case <-drainedCh:
+ // drained successfully
+ case <-time.After(5 * time.Second):
+ t.Fatalf("retry loop did not drain within timeout")
+ }
+ close(output)📝 Committable suggestion
Suggested change
|
||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // Verify expected results | ||||||||||||||||||||||||||||
| // srv1: should have 3x 429 responses and no 200 (never succeeds within retries) | ||||||||||||||||||||||||||||
| require.Equal(t, 3, s1n429) | ||||||||||||||||||||||||||||
| require.Equal(t, 0, s1n200) | ||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||
| // srv2: should have 2x 429 responses and 1x 200 (succeeds on 3rd attempt) | ||||||||||||||||||||||||||||
| require.Equal(t, 2, s2n429) | ||||||||||||||||||||||||||||
| require.Equal(t, 1, s2n200) | ||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tie the retry context to enumeration cancellation.
retryCtxis created fromcontext.Background()only afterwg.Wait(), so queued 429s still run afterInterrupt(), and theRetryTimeoutwindow does not even start until the initial scan has fully drained. Please derive this context from the runner shutdown path instead of starting a fresh background context here.🤖 Prompt for AI Agents