Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ jobs:
- name: Run unit tests
run: just test-unit

# CI job that runs the cache-proxy test suite via `just test-cache-proxy` on an
# ARM Ubuntu 24.04 runner. It declares no `needs:`, so it is not gated on any
# other job in this workflow.
cache-proxy-tests:
runs-on: ubuntu-24.04-arm
steps:
# Third-party actions are pinned to a full commit SHA; the trailing comment
# records the release tag that SHA corresponds to.
- name: Checkout code
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Set up just
uses: extractions/setup-just@dd310ad5a97d8e7b41793f8ef055398d51ad4de6 # v2.0.0

# The Go toolchain version is taken from go.mod rather than hard-coded here.
- name: Set up Go
uses: actions/setup-go@f111f3307d8850f501ac008e886eec1fd1932a34 # v5.3.0
with:
go-version-file: go.mod

- name: Run cache-proxy tests
run: just test-cache-proxy

integration-tests:
needs: unit-tests
runs-on: ubuntu-24.04-arm
Expand Down
21 changes: 21 additions & 0 deletions cmd/cache-proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,30 @@ available, and forwards cache misses to origin object storage.
| `PEER_ADDR` | `:8081` | Peer cache API listener. |
| `HEALTH_ADDR` | `:8082` | Health and Prometheus metrics listener. |
| `CACHE_HOST_SUFFIXES` | empty | Empty means all `GET` hosts are cacheable. Otherwise, cache only hosts containing one of the comma-separated suffixes. |
| `ORIGIN_MAX_IN_FLIGHT` | `0` | Maximum concurrent cacheable origin fills per proxy pod. `0` disables local origin backpressure. Configure a positive value after observing production origin-fill and retry metrics. Distinct cache misses queue when the limit is saturated. Same-key misses are singleflight-deduplicated and share one slot. |
| `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` / `DUCKGRES_TRACE_ENDPOINT` | empty | OTLP/HTTP trace endpoint. Unset → tracing is a no-op. |
| `OTEL_EXPORTER_OTLP_TRACES_PATH` | empty | Overrides the OTLP path (e.g. VictoriaTraces' `/insert/opentelemetry/v1/traces`). Mirrors the main duckgres binary. |

## Origin Backpressure

Cacheable origin fills can be bounded locally, per cache-proxy pod, with
`ORIGIN_MAX_IN_FLIGHT`. The default is `0`, which disables local origin
backpressure; this lets operators enable the limiter after observing production
metrics from the origin fetch and retry dashboards. When enabled, the limiter is
applied after local-cache lookup, peer-cache lookup, and singleflight
deduplication, so hits, peer hits, non-GET requests, `CONNECT` tunnels, and
duplicate same-key waiters do not consume separate origin slots.

When all origin slots are in use, a distinct cache miss waits until a slot is
available or the request context is canceled. If the wait is canceled before an
origin request starts, the proxy returns `503 Service Unavailable` with
`Retry-After: 1`; nothing is written to the cache.

Prometheus exposes `cache_proxy_origin_fetches_queued` for current waiters and
`cache_proxy_origin_fetch_queue_wait_seconds{outcome}` for saturated-limit wait
duration, where `outcome` is one of `admitted`, `timeout`, `canceled`, or
`error`. `cache_proxy_origin_fetches_in_flight` continues to track only active
origin fills.

## Tracing

When a trace endpoint is set the proxy exports OpenTelemetry spans under
Expand Down
22 changes: 22 additions & 0 deletions cmd/cache-proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ func main() {
peerAddr := envOrDefault("PEER_ADDR", ":8081")
healthAddr := envOrDefault("HEALTH_ADDR", ":8082")
peerService := os.Getenv("PEER_SERVICE") // headless K8s service for peer discovery
originMaxInFlight, err := nonNegativeIntEnvOrDefault("ORIGIN_MAX_IN_FLIGHT", defaultOriginMaxInFlight)
if err != nil {
slog.Error("Invalid ORIGIN_MAX_IN_FLIGHT.", "error", err)
os.Exit(1)
}

// Comma-separated Host substrings we should cache. Anything else is tunneled
// or forwarded without caching. Empty means "cache everything" (legacy).
Expand All @@ -116,6 +121,7 @@ func main() {
"peer_listen", peerAddr,
"health", healthAddr,
"peer_service", peerService,
"origin_max_in_flight", originMaxInFlight,
"cache_host_suffixes", cacheHostSuffixes,
)

Expand All @@ -134,6 +140,7 @@ func main() {
}

proxy := NewCacheProxy(store, peers, cacheHostSuffixes)
proxy.setOriginMaxInFlight(originMaxInFlight)

// Forward HTTP proxy (DuckDB httpfs traffic). ServeMux can't match absolute
// URLs in forward-proxy requests, so use the handler directly.
Expand Down Expand Up @@ -193,3 +200,18 @@ func envOrDefault(key, def string) string {
}
return def
}

// nonNegativeIntEnvOrDefault reads the environment variable key as a base-10
// integer that must be zero or greater.
//
// An unset or empty variable yields def with a nil error. A value that
// strconv.Atoi rejects, or that is negative, yields 0 and an error naming the
// variable and the offending value. Parse failures wrap the underlying strconv
// error, so the real cause (invalid syntax vs. value out of range) shows up in
// the message and stays reachable through errors.Is / errors.As.
func nonNegativeIntEnvOrDefault(key string, def int) (int, error) {
	raw := os.Getenv(key)
	if raw == "" {
		return def, nil
	}
	value, err := strconv.Atoi(raw)
	if err != nil {
		// Keep the cause: "99999999999999999999" is an integer, just not one
		// that fits in int, and the bare message would hide that.
		return 0, fmt.Errorf("%s=%q is not an integer: %w", key, raw, err)
	}
	if value < 0 {
		return 0, fmt.Errorf("%s=%d must be non-negative", key, value)
	}
	return value, nil
}
74 changes: 73 additions & 1 deletion cmd/cache-proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type CacheProxy struct {
originRetryMaxAttempts int
originRetryInitialBackoff time.Duration
originRetryMaxBackoff time.Duration
originMaxInFlight int
originSlots chan struct{}

// cacheHostSuffixes are the Host substrings that identify DuckLake bucket
// traffic worth caching. Requests whose Host doesn't contain any of these
Expand All @@ -71,6 +73,7 @@ const (
defaultOriginRetryMaxAttempts = 4
defaultOriginRetryInitialBackoff = 100 * time.Millisecond
defaultOriginRetryMaxBackoff = 1 * time.Second
defaultOriginMaxInFlight = 0
)

type singleFlight struct {
Expand All @@ -84,6 +87,18 @@ type call struct {
err error
}

// originBackpressureWaitError wraps the error that ended a wait for an origin
// concurrency slot. It is a distinct type so callers can pick it out of an
// error chain with errors.As.
type originBackpressureWaitError struct {
	err error // cause of the abandoned wait
}

// Error renders the wrapped cause behind a fixed backpressure prefix.
func (w *originBackpressureWaitError) Error() string {
	const prefix = "origin fetch backpressure wait canceled: "
	return fmt.Sprint(prefix, w.err)
}

// Unwrap exposes the wrapped cause to errors.Is and errors.As.
func (w *originBackpressureWaitError) Unwrap() error {
	return w.err
}

func (sf *singleFlight) Do(key string, fn func() (fetchResult, error)) (fetchResult, error) {
sf.mu.Lock()
if sf.m == nil {
Expand All @@ -110,7 +125,7 @@ func (sf *singleFlight) Do(key string, fn func() (fetchResult, error)) (fetchRes
}

func NewCacheProxy(store *DiskCache, peers *PeerManager, cacheHostSuffixes []string) *CacheProxy {
return &CacheProxy{
proxy := &CacheProxy{
store: store,
peers: peers,
client: &http.Client{Timeout: defaultOriginTimeout},
Expand All @@ -120,6 +135,50 @@ func NewCacheProxy(store *DiskCache, peers *PeerManager, cacheHostSuffixes []str
originRetryMaxBackoff: defaultOriginRetryMaxBackoff,
cacheHostSuffixes: cacheHostSuffixes,
}
proxy.setOriginMaxInFlight(defaultOriginMaxInFlight)
return proxy
}

// setOriginMaxInFlight records max as the origin concurrency limit and
// installs a matching semaphore channel. A positive max creates a fresh
// buffered channel with that capacity; zero or a negative value leaves the
// channel nil, which acquireOriginSlot treats as "no limit".
func (p *CacheProxy) setOriginMaxInFlight(max int) {
	var slots chan struct{}
	if max > 0 {
		slots = make(chan struct{}, max)
	}
	p.originMaxInFlight = max
	p.originSlots = slots
}

// acquireOriginSlot reserves one origin-fetch concurrency slot and returns a
// function that gives it back.
//
// When no limit is configured (p.originSlots is nil) it returns a no-op
// release immediately. Otherwise it first tries to take a slot without
// blocking; only if the limit is saturated does it count itself in the
// originFetchQueued gauge and block until a slot frees up or ctx is done.
// The queue-wait histogram is observed only on that saturated path, labelled
// "admitted" on success or with originQueueWaitOutcome(ctx.Err()) otherwise.
//
// On success the returned release func must be called exactly once. If ctx
// ends first, the release func is nil and the error is an
// *originBackpressureWaitError wrapping ctx.Err().
func (p *CacheProxy) acquireOriginSlot(ctx context.Context) (func(), error) {
	// Snapshot the semaphore once so release drains the same channel the slot
	// was taken from. Re-reading p.originSlots at release time would block
	// forever (nil channel) or steal from a different semaphore if
	// setOriginMaxInFlight replaced the field while the fetch was in flight.
	slots := p.originSlots
	if slots == nil {
		return func() {}, nil
	}
	release := func() { <-slots }

	// Fast path: a free slot means no queueing and no wait-time observation.
	select {
	case slots <- struct{}{}:
		return release, nil
	default:
	}

	// Slow path: the limit is saturated, so this caller is now a queued waiter.
	start := time.Now()
	originFetchQueued.Inc()
	defer originFetchQueued.Dec()
	select {
	case slots <- struct{}{}:
		originFetchQueueWaitSeconds.WithLabelValues("admitted").Observe(time.Since(start).Seconds())
		return release, nil
	case <-ctx.Done():
		// Read the cause once so the metric label and the returned error agree.
		err := ctx.Err()
		originFetchQueueWaitSeconds.WithLabelValues(originQueueWaitOutcome(err)).Observe(time.Since(start).Seconds())
		return nil, &originBackpressureWaitError{err: err}
	}
}

// originQueueWaitOutcome maps the error that ended a slot wait to a metric
// label: "timeout" when the chain contains context.DeadlineExceeded,
// "canceled" when it contains context.Canceled, and "error" for anything else
// (including nil). The deadline check is made first.
func originQueueWaitOutcome(err error) string {
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(err, context.Canceled):
		return "canceled"
	default:
		return "error"
	}
}

// shouldCache returns true if the request targets a host we want to cache.
Expand Down Expand Up @@ -310,6 +369,14 @@ func (p *CacheProxy) HandleProxy(w http.ResponseWriter, r *http.Request) {
res, err := p.fetchDedup(cacheKey, r, rangeHeader)
if err != nil {
span.SetStatus(codes.Error, err.Error())
var backpressureErr *originBackpressureWaitError
if errors.As(err, &backpressureErr) {
slog.Warn("Origin fetch backpressure wait canceled.",
"url", r.URL.String(), "range", rangeHeader, "error", err)
w.Header().Set("Retry-After", "1")
http.Error(w, err.Error(), http.StatusServiceUnavailable)
return
}
// An origin that responded with a non-2xx (e.g. S3 returning a 400 with
// <Code>ExpiredToken</Code> in an XML envelope) is forwarded back to
// DuckDB verbatim — same status code, same body, same headers minus
Expand Down Expand Up @@ -372,6 +439,11 @@ func (p *CacheProxy) fetchDedup(cacheKey string, r *http.Request, rangeHeader st
return fetchResult{size: n, source: "peer"}, nil
}
}
releaseOriginSlot, err := p.acquireOriginSlot(r.Context())
if err != nil {
return fetchResult{}, err
}
defer releaseOriginSlot()
originFetchInFlight.Inc()
defer originFetchInFlight.Dec()
size, ct, err := p.fetchOrigin(cacheKey, r)
Expand Down
9 changes: 9 additions & 0 deletions cmd/cache-proxy/proxy_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,13 @@ var (
Name: "cache_proxy_origin_fetches_in_flight",
Help: "Current number of origin fetches filling the local cache",
})
originFetchQueued = promauto.NewGauge(prometheus.GaugeOpts{
Name: "cache_proxy_origin_fetches_queued",
Help: "Current number of cacheable origin fetch leaders waiting for an origin concurrency slot",
})
originFetchQueueWaitSeconds = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cache_proxy_origin_fetch_queue_wait_seconds",
Help: "Seconds cacheable origin fetch leaders spend waiting for an origin concurrency slot after the limit is saturated",
Buckets: prometheus.DefBuckets,
}, []string{"outcome"})
)
Loading
Loading