From 35cb55d02d09c8a6cadcd043cd3b3ef6b8e4c1bc Mon Sep 17 00:00:00 2001 From: Bill Guowei Yang Date: Wed, 17 Jun 2026 15:56:20 -0400 Subject: [PATCH] Add cache proxy origin backpressure --- .github/workflows/ci.yml | 16 ++ cmd/cache-proxy/README.md | 21 +++ cmd/cache-proxy/main.go | 22 +++ cmd/cache-proxy/proxy.go | 74 +++++++- cmd/cache-proxy/proxy_metrics.go | 9 + cmd/cache-proxy/proxy_test.go | 286 ++++++++++++++++++++++++++++++- justfile | 7 +- 7 files changed, 428 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9f894817..608d1fa6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -90,6 +90,22 @@ jobs: - name: Run unit tests run: just test-unit + cache-proxy-tests: + runs-on: ubuntu-24.04-arm + steps: + - name: Checkout code + uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Set up just + uses: extractions/setup-just@dd310ad5a97d8e7b41793f8ef055398d51ad4de6 # v2.0.0 + + - name: Set up Go + uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0 + with: + go-version-file: go.mod + + - name: Run cache-proxy tests + run: just test-cache-proxy + integration-tests: needs: unit-tests runs-on: ubuntu-24.04-arm diff --git a/cmd/cache-proxy/README.md b/cmd/cache-proxy/README.md index f36b06b2..d0d0660b 100644 --- a/cmd/cache-proxy/README.md +++ b/cmd/cache-proxy/README.md @@ -14,9 +14,30 @@ available, and forwards cache misses to origin object storage. | `PEER_ADDR` | `:8081` | Peer cache API listener. | | `HEALTH_ADDR` | `:8082` | Health and Prometheus metrics listener. | | `CACHE_HOST_SUFFIXES` | empty | Empty means all `GET` hosts are cacheable. Otherwise, cache only hosts containing one of the comma-separated suffixes. | +| `ORIGIN_MAX_IN_FLIGHT` | `0` | Maximum concurrent cacheable origin fills per proxy pod. `0` disables local origin backpressure. Configure a positive value after observing production origin-fill and retry metrics. Distinct cache misses queue when the limit is saturated. Same-key misses are singleflight-deduplicated and share one slot. | | `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` / `DUCKGRES_TRACE_ENDPOINT` | empty | OTLP/HTTP trace endpoint. Unset → tracing is a no-op. | | `OTEL_EXPORTER_OTLP_TRACES_PATH` | empty | Overrides the OTLP path (e.g. VictoriaTraces' `/insert/opentelemetry/v1/traces`). Mirrors the main duckgres binary. | +## Origin Backpressure + +Cacheable origin fills can be locally bounded by `ORIGIN_MAX_IN_FLIGHT` per +cache-proxy pod. The default is `0`, which disables local origin backpressure so +the limiter can be enabled after observing production metrics from the origin +fetch and retry dashboards. When enabled, the limiter is applied after +local-cache lookup, peer-cache lookup, and singleflight deduplication, so hits, +peer hits, non-GET requests, `CONNECT` tunnels, and duplicate same-key waiters +do not consume separate origin slots. + +When all origin slots are in use, a distinct cache miss waits until a slot is +available or the request context is canceled. If the wait is canceled before an +origin request starts, the proxy returns `503 Service Unavailable` with +`Retry-After: 1`; nothing is written to the cache. + +Prometheus exposes `cache_proxy_origin_fetches_queued` for current waiters and +`cache_proxy_origin_fetch_queue_wait_seconds{outcome}` for saturated-limit wait +duration. `cache_proxy_origin_fetches_in_flight` continues to track only active +origin fills. + ## Tracing When a trace endpoint is set the proxy exports OpenTelemetry spans under diff --git a/cmd/cache-proxy/main.go b/cmd/cache-proxy/main.go index 30f2d0d8..6aafad98 100644 --- a/cmd/cache-proxy/main.go +++ b/cmd/cache-proxy/main.go @@ -97,6 +97,11 @@ func main() { peerAddr := envOrDefault("PEER_ADDR", ":8081") healthAddr := envOrDefault("HEALTH_ADDR", ":8082") peerService := os.Getenv("PEER_SERVICE") // headless K8s service for peer discovery + originMaxInFlight, err := nonNegativeIntEnvOrDefault("ORIGIN_MAX_IN_FLIGHT", defaultOriginMaxInFlight) + if err != nil { + slog.Error("Invalid ORIGIN_MAX_IN_FLIGHT.", "error", err) + os.Exit(1) + } // Comma-separated Host substrings we should cache. Anything else is tunneled // or forwarded without caching. Empty means "cache everything" (legacy). @@ -116,6 +121,7 @@ func main() { "peer_listen", peerAddr, "health", healthAddr, "peer_service", peerService, + "origin_max_in_flight", originMaxInFlight, "cache_host_suffixes", cacheHostSuffixes, ) @@ -134,6 +140,7 @@ func main() { } proxy := NewCacheProxy(store, peers, cacheHostSuffixes) + proxy.setOriginMaxInFlight(originMaxInFlight) // Forward HTTP proxy (DuckDB httpfs traffic). ServeMux can't match absolute // URLs in forward-proxy requests, so use the handler directly. @@ -193,3 +200,18 @@ func envOrDefault(key, def string) string { } return def } + +func nonNegativeIntEnvOrDefault(key string, def int) (int, error) { + raw := os.Getenv(key) + if raw == "" { + return def, nil + } + value, err := strconv.Atoi(raw) + if err != nil { + return 0, fmt.Errorf("%s=%q is not an integer", key, raw) + } + if value < 0 { + return 0, fmt.Errorf("%s=%d must be non-negative", key, value) + } + return value, nil +} diff --git a/cmd/cache-proxy/proxy.go b/cmd/cache-proxy/proxy.go index 3c229963..fd26fb51 100644 --- a/cmd/cache-proxy/proxy.go +++ b/cmd/cache-proxy/proxy.go @@ -49,6 +49,8 @@ type CacheProxy struct { originRetryMaxAttempts int originRetryInitialBackoff time.Duration originRetryMaxBackoff time.Duration + originMaxInFlight int + originSlots chan struct{} // cacheHostSuffixes are the Host substrings that identify DuckLake bucket // traffic worth caching. Requests whose Host doesn't contain any of these @@ -71,6 +73,7 @@ const ( defaultOriginRetryMaxAttempts = 4 defaultOriginRetryInitialBackoff = 100 * time.Millisecond defaultOriginRetryMaxBackoff = 1 * time.Second + defaultOriginMaxInFlight = 0 ) type singleFlight struct { @@ -84,6 +87,18 @@ type call struct { err error } +type originBackpressureWaitError struct { + err error +} + +func (e *originBackpressureWaitError) Error() string { + return fmt.Sprintf("origin fetch backpressure wait canceled: %v", e.err) +} + +func (e *originBackpressureWaitError) Unwrap() error { + return e.err +} + func (sf *singleFlight) Do(key string, fn func() (fetchResult, error)) (fetchResult, error) { sf.mu.Lock() if sf.m == nil { @@ -110,7 +125,7 @@ func (sf *singleFlight) Do(key string, fn func() (fetchResult, error)) (fetchRes } func NewCacheProxy(store *DiskCache, peers *PeerManager, cacheHostSuffixes []string) *CacheProxy { - return &CacheProxy{ + proxy := &CacheProxy{ store: store, peers: peers, client: &http.Client{Timeout: defaultOriginTimeout}, @@ -120,6 +135,50 @@ func NewCacheProxy(store *DiskCache, peers *PeerManager, cacheHostSuffixes []str originRetryMaxBackoff: defaultOriginRetryMaxBackoff, cacheHostSuffixes: cacheHostSuffixes, } + proxy.setOriginMaxInFlight(defaultOriginMaxInFlight) + return proxy +} + +func (p *CacheProxy) setOriginMaxInFlight(max int) { + p.originMaxInFlight = max + if max <= 0 { + p.originSlots = nil + return + } + p.originSlots = make(chan struct{}, max) +} + +func (p *CacheProxy) acquireOriginSlot(ctx context.Context) (func(), error) { + if p.originSlots == nil { + return func() {}, nil + } + select { + case p.originSlots <- struct{}{}: + return func() { <-p.originSlots }, nil + default: + } + + start := time.Now() + originFetchQueued.Inc() + defer originFetchQueued.Dec() + select { + case p.originSlots <- struct{}{}: + originFetchQueueWaitSeconds.WithLabelValues("admitted").Observe(time.Since(start).Seconds()) + return func() { <-p.originSlots }, nil + case <-ctx.Done(): + originFetchQueueWaitSeconds.WithLabelValues(originQueueWaitOutcome(ctx.Err())).Observe(time.Since(start).Seconds()) + return nil, &originBackpressureWaitError{err: ctx.Err()} + } +} + +func originQueueWaitOutcome(err error) string { + if errors.Is(err, context.DeadlineExceeded) { + return "timeout" + } + if errors.Is(err, context.Canceled) { + return "canceled" + } + return "error" } // shouldCache returns true if the request targets a host we want to cache. @@ -310,6 +369,14 @@ func (p *CacheProxy) HandleProxy(w http.ResponseWriter, r *http.Request) { res, err := p.fetchDedup(cacheKey, r, rangeHeader) if err != nil { span.SetStatus(codes.Error, err.Error()) + var backpressureErr *originBackpressureWaitError + if errors.As(err, &backpressureErr) { + slog.Warn("Origin fetch backpressure wait canceled.", + "url", r.URL.String(), "range", rangeHeader, "error", err) + w.Header().Set("Retry-After", "1") + http.Error(w, err.Error(), http.StatusServiceUnavailable) + return + } // An origin that responded with a non-2xx (e.g. S3 returning a 400 with // ExpiredToken in an XML envelope) is forwarded back to // DuckDB verbatim — same status code, same body, same headers minus @@ -372,6 +439,11 @@ func (p *CacheProxy) fetchDedup(cacheKey string, r *http.Request, rangeHeader st return fetchResult{size: n, source: "peer"}, nil } } + releaseOriginSlot, err := p.acquireOriginSlot(r.Context()) + if err != nil { + return fetchResult{}, err + } + defer releaseOriginSlot() originFetchInFlight.Inc() defer originFetchInFlight.Dec() size, ct, err := p.fetchOrigin(cacheKey, r) diff --git a/cmd/cache-proxy/proxy_metrics.go b/cmd/cache-proxy/proxy_metrics.go index d520751b..d15852da 100644 --- a/cmd/cache-proxy/proxy_metrics.go +++ b/cmd/cache-proxy/proxy_metrics.go @@ -18,4 +18,13 @@ var ( Name: "cache_proxy_origin_fetches_in_flight", Help: "Current number of origin fetches filling the local cache", }) + originFetchQueued = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cache_proxy_origin_fetches_queued", + Help: "Current number of cacheable origin fetch leaders waiting for an origin concurrency slot", + }) + originFetchQueueWaitSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Name: "cache_proxy_origin_fetch_queue_wait_seconds", + Help: "Seconds cacheable origin fetch leaders spend waiting for an origin concurrency slot after the limit is saturated", + Buckets: prometheus.DefBuckets, + }, []string{"outcome"}) ) diff --git a/cmd/cache-proxy/proxy_test.go b/cmd/cache-proxy/proxy_test.go index 4d2790d4..f354e522 100644 --- a/cmd/cache-proxy/proxy_test.go +++ b/cmd/cache-proxy/proxy_test.go @@ -15,6 +15,8 @@ import ( "sync/atomic" "testing" "time" + + "github.com/prometheus/client_golang/prometheus" ) type timeoutNetError struct{} @@ -73,6 +75,18 @@ func waitForRecorder(t *testing.T, ch <-chan *httptest.ResponseRecorder, msg str } } +func waitForGaugeValue(t *testing.T, g prometheus.Gauge, want float64, msg string) { + t.Helper() + deadline := time.Now().Add(2 * time.Second) + for time.Now().Before(deadline) { + if got := gaugeValue(t, g); got == want { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("%s: got %v, want %v", msg, gaugeValue(t, g), want) +} + // captureSlog redirects slog.Default to a buffer for the duration of a test // and returns the buffer + a restore function. Used by the forward-uncached // logging tests to assert presence of the request/response log lines that @@ -94,6 +108,16 @@ func newTestProxy(t *testing.T) *CacheProxy { return proxy } +func TestNewCacheProxyOriginBackpressureDefaultsDisabled(t *testing.T) { + proxy := newTestProxy(t) + if proxy.originMaxInFlight != 0 { + t.Fatalf("originMaxInFlight = %d, want 0 so production rollout is metrics-first", proxy.originMaxInFlight) + } + if proxy.originSlots != nil { + t.Fatal("originSlots is non-nil, want nil when default backpressure is disabled") + } +} + // newTestServer returns an httptest origin plus a proxy that rewrites inbound // forward-proxy-style requests to target that origin. DuckDB sends absolute-form // URIs (scheme + host + path); we give the test the origin's URL so the proxy's @@ -155,6 +179,52 @@ func TestHandleProxyGETMissThenHit(t *testing.T) { } } +func TestNonNegativeIntEnvOrDefault(t *testing.T) { + const key = "DUCKGRES_TEST_NON_NEGATIVE_INT" + t.Run("default", func(t *testing.T) { + t.Setenv(key, "") + got, err := nonNegativeIntEnvOrDefault(key, 64) + if err != nil { + t.Fatalf("nonNegativeIntEnvOrDefault returned error: %v", err) + } + if got != 64 { + t.Fatalf("value = %d, want 64", got) + } + }) + t.Run("custom", func(t *testing.T) { + t.Setenv(key, "17") + got, err := nonNegativeIntEnvOrDefault(key, 64) + if err != nil { + t.Fatalf("nonNegativeIntEnvOrDefault returned error: %v", err) + } + if got != 17 { + t.Fatalf("value = %d, want 17", got) + } + }) + t.Run("zero disables", func(t *testing.T) { + t.Setenv(key, "0") + got, err := nonNegativeIntEnvOrDefault(key, 64) + if err != nil { + t.Fatalf("nonNegativeIntEnvOrDefault returned error: %v", err) + } + if got != 0 { + t.Fatalf("value = %d, want 0", got) + } + }) + t.Run("invalid", func(t *testing.T) { + t.Setenv(key, "nope") + if _, err := nonNegativeIntEnvOrDefault(key, 64); err == nil { + t.Fatal("nonNegativeIntEnvOrDefault returned nil error, want parse error") + } + }) + t.Run("negative", func(t *testing.T) { + t.Setenv(key, "-1") + if _, err := nonNegativeIntEnvOrDefault(key, 64); err == nil { + t.Fatal("nonNegativeIntEnvOrDefault returned nil error, want validation error") + } + }) +} + func TestHandleProxyHEADForwardedUncached(t *testing.T) { proxy := newTestProxy(t) @@ -541,8 +611,214 @@ func TestOriginFetchMetricLabels(t *testing.T) { } } -func TestHandleProxyMetricsOnlyDoesNotRejectConcurrentOriginMisses(t *testing.T) { +func TestHandleProxyOriginBackpressureQueuesDistinctMisses(t *testing.T) { + proxy := newTestProxy(t) + proxy.setOriginMaxInFlight(1) + + queuedBefore := gaugeValue(t, originFetchQueued) + firstStarted := make(chan struct{}) + secondStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + var releaseOnce sync.Once + release := func() { + releaseOnce.Do(func() { close(releaseFirst) }) + } + defer release() + var originCalls int32 + var activeOriginCalls int32 + var maxActiveOriginCalls int32 + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + current := atomic.AddInt32(&activeOriginCalls, 1) + defer atomic.AddInt32(&activeOriginCalls, -1) + for { + maxActive := atomic.LoadInt32(&maxActiveOriginCalls) + if current <= maxActive || atomic.CompareAndSwapInt32(&maxActiveOriginCalls, maxActive, current) { + break + } + } + switch atomic.AddInt32(&originCalls, 1) { + case 1: + close(firstStarted) + <-releaseFirst + case 2: + close(secondStarted) + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + results := make(chan *httptest.ResponseRecorder, 2) + go func() { + results <- doForwardProxyRequest(proxy, "GET", originURL+"/bucket/queued-first.parquet", http.Header{"Range": []string{"bytes=0-1"}}) + }() + waitForSignal(t, firstStarted, "timed out waiting for first origin fetch") + cachedURL := originURL + "/bucket/already-cached.parquet" + cachedRange := "bytes=0-5" + if err := proxy.store.Put(CacheKey(cachedURL, cachedRange), []byte("cached")); err != nil { + t.Fatalf("seed cached object: %v", err) + } + cachedRec := doForwardProxyRequest(proxy, "GET", cachedURL, http.Header{"Range": []string{cachedRange}}) + if cachedRec.Code != http.StatusPartialContent { + t.Fatalf("cache hit while origin slot is full: status = %d, want 206", cachedRec.Code) + } + if body := cachedRec.Body.String(); body != "cached" { + t.Fatalf("cache hit while origin slot is full: body = %q, want cached", body) + } + go func() { + results <- doForwardProxyRequest(proxy, "GET", originURL+"/bucket/queued-second.parquet", http.Header{"Range": []string{"bytes=2-3"}}) + }() + waitForGaugeValue(t, originFetchQueued, queuedBefore+1, "queued origin fetches while first request holds the only slot") + select { + case <-secondStarted: + t.Fatal("second origin fetch started before the first fetch released its origin slot") + case <-time.After(100 * time.Millisecond): + } + + release() + waitForSignal(t, secondStarted, "timed out waiting for queued origin fetch to start") + for i := 0; i < 2; i++ { + rec := waitForRecorder(t, results, "timed out waiting for queued proxy response") + if rec.Code != http.StatusPartialContent { + t.Fatalf("response %d status = %d, want 206", i+1, rec.Code) + } + } + if got := atomic.LoadInt32(&originCalls); got != 2 { + t.Fatalf("origin calls = %d, want 2", got) + } + if got := atomic.LoadInt32(&maxActiveOriginCalls); got > 1 { + t.Fatalf("simultaneous origin calls = %d, want at most 1", got) + } + if got := gaugeValue(t, originFetchQueued); got != queuedBefore { + t.Fatalf("queued origin fetches after requests = %v, want %v", got, queuedBefore) + } +} + +func TestHandleProxyOriginBackpressureCanceledWhileQueued(t *testing.T) { + proxy := newTestProxy(t) + proxy.setOriginMaxInFlight(1) + + queuedBefore := gaugeValue(t, originFetchQueued) + canceledBefore := counterValue(t, originFetchesTotal.WithLabelValues("canceled")) + firstStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + var releaseOnce sync.Once + release := func() { + releaseOnce.Do(func() { close(releaseFirst) }) + } + defer release() + var originCalls int32 + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&originCalls, 1) == 1 { + close(firstStarted) + <-releaseFirst + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + + firstDone := make(chan *httptest.ResponseRecorder, 1) + go func() { + firstDone <- doForwardProxyRequest(proxy, "GET", originURL+"/bucket/cancel-queued-first.parquet", http.Header{"Range": []string{"bytes=0-1"}}) + }() + waitForSignal(t, firstStarted, "timed out waiting for first origin fetch") + + ctx, cancel := context.WithCancel(context.Background()) + req := httptest.NewRequest("GET", originURL+"/bucket/cancel-queued-second.parquet", nil).WithContext(ctx) + req.Host = req.URL.Host + req.Header.Set("Range", "bytes=2-3") + secondRec := httptest.NewRecorder() + secondDone := make(chan struct{}) + go func() { + defer close(secondDone) + proxy.HandleProxy(secondRec, req) + }() + waitForGaugeValue(t, originFetchQueued, queuedBefore+1, "queued origin fetches before cancellation") + cancel() + waitForSignal(t, secondDone, "timed out waiting for queued request cancellation") + if secondRec.Code != http.StatusServiceUnavailable { + t.Fatalf("queued cancellation status = %d, want 503", secondRec.Code) + } + if got := secondRec.Header().Get("Retry-After"); got != "1" { + t.Fatalf("Retry-After = %q, want 1", got) + } + if got := counterValue(t, originFetchesTotal.WithLabelValues("canceled")); got != canceledBefore { + t.Fatalf("origin canceled metric = %v, want unchanged %v because no origin fetch started", got, canceledBefore) + } + if got := atomic.LoadInt32(&originCalls); got != 1 { + t.Fatalf("origin calls before releasing first request = %d, want 1", got) + } + if got := gaugeValue(t, originFetchQueued); got != queuedBefore { + t.Fatalf("queued origin fetches after cancellation = %v, want %v", got, queuedBefore) + } + + release() + rec := waitForRecorder(t, firstDone, "timed out waiting for first proxy response") + if rec.Code != http.StatusPartialContent { + t.Fatalf("first response status = %d, want 206", rec.Code) + } +} + +func TestHandleProxyOriginBackpressureSingleFlightWaitersDoNotConsumeSlots(t *testing.T) { + proxy := newTestProxy(t) + proxy.setOriginMaxInFlight(1) + + queuedBefore := gaugeValue(t, originFetchQueued) + firstStarted := make(chan struct{}) + releaseOrigin := make(chan struct{}) + var releaseOnce sync.Once + release := func() { + releaseOnce.Do(func() { close(releaseOrigin) }) + } + defer release() + var originCalls int32 + _, originURL := newTestServer(t, func(w http.ResponseWriter, r *http.Request) { + if atomic.AddInt32(&originCalls, 1) == 1 { + close(firstStarted) + <-releaseOrigin + } + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("same-key-body")) + }) + + const requests = 5 + results := make(chan *httptest.ResponseRecorder, requests) + url := originURL + "/bucket/same-key.parquet" + headers := http.Header{"Range": []string{"bytes=0-12"}} + go func() { + results <- doForwardProxyRequest(proxy, "GET", url, headers) + }() + waitForSignal(t, firstStarted, "timed out waiting for first same-key origin fetch") + for i := 1; i < requests; i++ { + go func() { + results <- doForwardProxyRequest(proxy, "GET", url, headers) + }() + } + time.Sleep(100 * time.Millisecond) + if got := atomic.LoadInt32(&originCalls); got != 1 { + t.Fatalf("origin calls while same-key waiters are blocked = %d, want 1", got) + } + if got := gaugeValue(t, originFetchQueued); got != queuedBefore { + t.Fatalf("queued origin fetches for same-key waiters = %v, want %v", got, queuedBefore) + } + + release() + for i := 0; i < requests; i++ { + rec := waitForRecorder(t, results, "timed out waiting for same-key proxy response") + if rec.Code != http.StatusPartialContent { + t.Fatalf("response %d status = %d, want 206", i+1, rec.Code) + } + if body := rec.Body.String(); body != "same-key-body" { + t.Fatalf("response %d body = %q, want same-key-body", i+1, body) + } + } + if got := atomic.LoadInt32(&originCalls); got != 1 { + t.Fatalf("origin calls after same-key waiters completed = %d, want 1", got) + } +} + +func TestHandleProxyOriginBackpressureCanBeDisabled(t *testing.T) { proxy := newTestProxy(t) + proxy.setOriginMaxInFlight(0) const requests = 65 var originCalls int32 @@ -586,11 +862,11 @@ func TestHandleProxyMetricsOnlyDoesNotRejectConcurrentOriginMisses(t *testing.T) } if got := atomic.LoadInt32(&originCalls); got != requests { release() - t.Fatalf("origin calls before release = %d, want %d; metrics-only PR must not limit origin concurrency", got, requests) + t.Fatalf("origin calls before release = %d, want %d; disabled backpressure must not limit origin concurrency", got, requests) } if got := atomic.LoadInt32(&maxActiveOriginCalls); got != requests { release() - t.Fatalf("simultaneous origin calls before release = %d, want %d; metrics-only PR must not queue origin concurrency", got, requests) + t.Fatalf("simultaneous origin calls before release = %d, want %d; disabled backpressure must not queue origin concurrency", got, requests) } release() @@ -602,10 +878,10 @@ func TestHandleProxyMetricsOnlyDoesNotRejectConcurrentOriginMisses(t *testing.T) } } if len(failed) > 0 { - t.Fatalf("metrics-only PR must not add local rejection behavior: %s", strings.Join(failed, ", ")) + t.Fatalf("disabled backpressure must not add local rejection behavior: %s", strings.Join(failed, ", ")) } if got := atomic.LoadInt32(&originCalls); got != requests { - t.Fatalf("origin calls = %d, want %d; metrics-only PR must not limit origin concurrency", got, requests) + t.Fatalf("origin calls = %d, want %d; disabled backpressure must not limit origin concurrency", got, requests) } } diff --git a/justfile b/justfile index 21cb3e1f..af531eab 100644 --- a/justfile +++ b/justfile @@ -262,6 +262,11 @@ test: test-unit: go test -v -p 1 . ./configresolve/... ./duckdbservice/... ./server/... ./transpiler/... ./internal/... +# Run cache-proxy tests +[group('test')] +test-cache-proxy: + go test -v ./cmd/cache-proxy/... + # Run integration tests [group('test')] test-integration: @@ -355,7 +360,7 @@ lint: # Run what CI runs locally (excluding kind-backed K8s integration) [group('test')] -ci: lint test-unit test-integration test-controlplane test-configstore-integration test-controlplane-k8s +ci: lint test-unit test-cache-proxy test-integration test-controlplane test-configstore-integration test-controlplane-k8s # === Metrics ===