
Commit 3f641ac

feat: Add concurrency saturation detector
This introduces the `concurrencydetector` package, a new saturation mechanism based on real-time in-flight request tracking. Unlike the legacy `utilizationdetector`, which relies on polling proxy metrics (queue depth, KV cache) and suffers from scrape lag and complex tuning, this detector maintains atomic counters updated via request lifecycle hooks.

Key features:

- Real-time `IsSaturated` signal to drive Flow Control backpressure.
- `Filter` implementation to prevent scheduling to overloaded pods, solving "hot spot" issues where one pod is saturated while others idle.
- Configurable `Headroom` to allow controlled bursting for affinity.

Note: Wiring and configuration enablement are deferred to a follow-up PR.
1 parent: cd2e1bc

3 files changed: 642 additions & 0 deletions

Lines changed: 57 additions & 0 deletions
@@ -0,0 +1,57 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package concurrencydetector

// Config holds the configuration for the Concurrency Detector.
type Config struct {
    // MaxConcurrency defines the saturation threshold for a backend.
    //
    // This limit serves as the "ideal" capacity for a backend. When the number of active requests on a replica reaches
    // this value, that specific backend is considered "full" for the purpose of global saturation detection. If all
    // available replicas are full (>= MaxConcurrency), the Detector signals saturation to the Flow Controller, which will
    // trigger backpressure.
    //
    // Defaults to 100 if unset.
    MaxConcurrency int64 `json:"maxConcurrency"`

    // Headroom defines the allowed burst capacity above MaxConcurrency for specific pod scheduling, expressed as a
    // fraction in [0.0, 1.0].
    //
    // While IsSaturated uses MaxConcurrency as the "ideal" capacity limit per pod to determine if the pool is full
    // (rejecting admission if ALL pods are >= MaxConcurrency), the Filter logic uses (MaxConcurrency * (1 + Headroom))
    // to determine if a specific pod is capable of accepting more work.
    //
    // Example: MaxConcurrency=100 (per pod), Headroom=0.2.
    //   - Global Saturation: Triggered when all pods have >= 100 active requests.
    //     This stops new requests from entering the scheduling loop, enforcing pool-average concurrency.
    //   - Pod Filter Limit: 120 active requests.
    //     The scheduler can still assign a request to a pod with 110 requests to satisfy affinity rules, as long as it
    //     stays under 120.
    //
    // This allows the system to maintain a target average load (100) while giving the scheduler flexibility to handle
    // hot-spots or affinity (up to 120).
    //
    // Defaults to 0.0 (no burst allowed).
    Headroom float64 `json:"headroom"`
}

const (
    // DefaultMaxConcurrency is the safe baseline for many LLM serving engines.
    DefaultMaxConcurrency = 100
    // DefaultHeadroom is the default burst allowance (0%).
    DefaultHeadroom = 0.0
)
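For illustration, here is how the json tags above decode raw plugin params into Config. This is a minimal sketch, not part of the commit: the test name, literal values, and in-package placement are all assumed, and the actual wiring is deferred to the follow-up PR (the registration factory appears in the detector file below).

package concurrencydetector

import (
    "encoding/json"
    "testing"
)

// Sketch (not part of the commit): raw plugin params decode via the json tags on Config.
func TestConfigDecoding(t *testing.T) {
    raw := json.RawMessage(`{"maxConcurrency": 200, "headroom": 0.2}`)
    var cfg Config
    if err := json.Unmarshal(raw, &cfg); err != nil {
        t.Fatalf("failed to unmarshal params: %v", err)
    }
    if cfg.MaxConcurrency != 200 || cfg.Headroom != 0.2 {
        t.Fatalf("unexpected config: %+v", cfg)
    }
}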
Lines changed: 262 additions & 0 deletions
@@ -0,0 +1,262 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package concurrencydetector implements a real-time saturation detection and scheduling filter mechanism based on
// active in-flight request accounting.
//
// # Role in Flow Control (The Gatekeeper)
//
// The Detector implements the SaturationDetector interface to act as a "Circuit Breaker".
// It signals saturation when every available candidate pod has reached the configured MaxConcurrency limit.
// This indicates that the backend pool has no remaining capacity for new work, triggering the Flow Controller to queue
// incoming requests.
//
// # Role in Scheduling (The Traffic Shaper)
//
// The Detector implements the Filter interface to protect individual pods.
// It removes pods from candidate lists if they exceed the specific safety limit:
//
//	Limit = MaxConcurrency * (1 + Headroom)
//
// This two-tier approach allows the Flow Controller to manage average pool load, while the Scheduler retains the
// flexibility to burst slightly above ideal targets (the "Headroom") to satisfy affinity or scoring objectives.
//
// # Consistency & Drift Warning
//
// The Detector relies on a strict symmetry between PreRequest (increment) and ResponseComplete (decrement) calls.
// It assumes the EPP framework guarantees that every PreRequest is eventually paired with a ResponseComplete.
//
// If the application panics, crashes, or if the framework fails to invoke the completion hook for a request, the
// internal counters for a pod will drift upwards. This can lead to a "false saturated" state where the detector
// believes a pod is full when it is actually empty.
//
// Currently, the only mechanism to reset a drifted counter is the DeletePod signal (when a backend is removed from the
// pool). Future iterations may require a reconciliation loop or a TTL-based cleanup to recover from persistent drift.
package concurrencydetector
import (
    "context"
    "encoding/json"
    "fmt"
    "sync"
    "sync/atomic"

    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/requestcontrol"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/framework"
    "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types"
)

const ConcurrencyDetectorType = "concurrency-detector"

func init() {
    plugins.Register(ConcurrencyDetectorType, func(_ string, params json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) {
        var cfg Config
        if len(params) > 0 {
            if err := json.Unmarshal(params, &cfg); err != nil {
                return nil, fmt.Errorf("failed to unmarshal concurrency detector config: %w", err)
            }
        }
        return NewDetector(cfg), nil
    })
}

var (
    _ requestcontrol.PreRequest       = &Detector{}
    _ requestcontrol.ResponseComplete = &Detector{}
    _ framework.Filter                = &Detector{}
)

// Detector implements a saturation detector and scheduling filter based on active request concurrency.
type Detector struct {
    tracker *concurrencyTracker
    config  Config
}

// NewDetector creates a new instance of the Concurrency Detector.
func NewDetector(config Config) *Detector {
    // TODO: Replace with more robust validation and defaulting logic once Saturation Detector becomes an official
    // extension point.
    if config.MaxConcurrency <= 0 {
        config.MaxConcurrency = DefaultMaxConcurrency
    }
    if config.Headroom < 0 {
        config.Headroom = DefaultHeadroom
    }

    return &Detector{
        tracker: newConcurrencyTracker(),
        config:  config,
    }
}
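A quick sketch of the defaulting behavior above (not part of the commit; assumed to live in the same package since config is unexported, with fmt imported, and the example name is hypothetical):

// Sketch: zero or negative inputs fall back to the package defaults.
func Example_newDetectorDefaults() {
    d := NewDetector(Config{Headroom: -0.5})
    fmt.Println(d.config.MaxConcurrency, d.config.Headroom)
    // Output: 100 0
}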
// TypedName returns the type and name tuple of this plugin instance.
func (d *Detector) TypedName() plugins.TypedName {
    return plugins.TypedName{
        Type: ConcurrencyDetectorType,
        Name: ConcurrencyDetectorType,
    }
}

// IsSaturated acts as the global circuit breaker.
//
// It iterates through the provided list of candidate pods. If it finds at least one pod where the current in-flight
// requests are below the MaxConcurrency threshold, it returns false (not saturated), allowing the Flow Controller to
// admit the request.
//
// If all candidate pods are at or above the MaxConcurrency limit, it returns true, signaling the Flow Controller to
// halt dispatch and queue incoming requests.
func (d *Detector) IsSaturated(ctx context.Context, candidatePods []metrics.PodMetrics) bool {
    if len(candidatePods) == 0 {
        return true
    }

    for _, pod := range candidatePods {
        if pod.GetMetadata() == nil {
            continue
        }

        podID := pod.GetMetadata().NamespacedName.String()
        inflight := d.tracker.get(podID)
        if inflight < d.config.MaxConcurrency {
            return false
        }
    }
    return true
}

// Filter blocks traffic to specific pods that are physically saturated or exceeding their safety limits.
//
// It applies a relaxed limit (MaxConcurrency * (1 + Headroom)) to allow for scheduling flexibility and burst tolerance.
func (d *Detector) Filter(
    _ context.Context,
    _ *types.CycleState,
    _ *types.LLMRequest,
    pods []types.Pod,
) []types.Pod {
    limit := int64(float64(d.config.MaxConcurrency) * (1.0 + d.config.Headroom))

    // Pre-allocate assuming most pods will pass the filter to minimize allocations.
    filtered := make([]types.Pod, 0, len(pods))

    for _, pod := range pods {
        podID := pod.GetPod().NamespacedName.String()
        if d.tracker.get(podID) <= limit {
            filtered = append(filtered, pod)
        }
    }
    return filtered
}

// PreRequest increments the atomic in-flight counter for the target pod.
// We assume the scheduling result is valid based on the Director's contract.
func (d *Detector) PreRequest(_ context.Context, _ *types.LLMRequest, result *types.SchedulingResult) {
    d.tracker.inc(result.ProfileResults[result.PrimaryProfileName].TargetPods[0].GetPod().NamespacedName.String())
}

// ResponseComplete decrements the atomic in-flight counter for the target pod.
func (d *Detector) ResponseComplete(
    _ context.Context,
    _ *types.LLMRequest,
    _ *requestcontrol.Response,
    targetPod *backend.Pod,
) {
    d.tracker.dec(targetPod.NamespacedName.String())
}

// DeletePod removes a pod from the concurrency tracker to prevent memory leaks.
// This should be called by the controller when a backend is removed from the pool.
func (d *Detector) DeletePod(podID string) {
    d.tracker.delete(podID)
}

// concurrencyTracker manages thread-safe counters for inflight requests.
// It is optimized for a read-heavy workload.
type concurrencyTracker struct {
    mu sync.RWMutex
    // counts stores the inflight count per pod ID.
    // We use *atomic.Int64 to allow safe concurrent updates without holding the map lock.
    counts map[string]*atomic.Int64
}

func newConcurrencyTracker() *concurrencyTracker {
    return &concurrencyTracker{
        counts: make(map[string]*atomic.Int64),
    }
}

// get returns the current inflight count for the given pod.
// It returns 0 if the pod is not tracked.
func (ct *concurrencyTracker) get(podID string) int64 {
    ct.mu.RLock()
    counter, exists := ct.counts[podID]
    ct.mu.RUnlock()

    if !exists {
        return 0
    }
    return counter.Load()
}

// inc increments the inflight count for the given pod.
// It creates the counter if it does not exist.
func (ct *concurrencyTracker) inc(podID string) {
    // Fast path: Try with read lock first.
    ct.mu.RLock()
    counter, exists := ct.counts[podID]
    ct.mu.RUnlock()

    if exists {
        counter.Add(1)
        return
    }

    // Slow path: Create counter with write lock.
    ct.mu.Lock()
    defer ct.mu.Unlock()

    // Double-check existence to handle race conditions.
    if counter, exists = ct.counts[podID]; exists {
        counter.Add(1)
        return
    }

    counter = &atomic.Int64{}
    counter.Store(1)
    ct.counts[podID] = counter
}

// dec decrements the inflight count for the given pod.
func (ct *concurrencyTracker) dec(podID string) {
    ct.mu.RLock()
    counter, exists := ct.counts[podID]
    ct.mu.RUnlock()

    if exists {
        counter.Add(-1)
    }
    // If it doesn't exist, we silently ignore.
    // This can happen if a pod was deleted/garbage collected while a request was inflight.
}

// delete removes the counter for the given pod.
func (ct *concurrencyTracker) delete(podID string) {
    ct.mu.Lock()
    defer ct.mu.Unlock()
    delete(ct.counts, podID)
}
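Putting the pieces together, the lifecycle hooks drive the tracker through a simple increment/read/decrement/delete cycle. A standalone sketch (not part of the commit; in-package, with fmt imported, and the pod ID is hypothetical):

// Sketch: inc/get/dec/delete mirror PreRequest, the IsSaturated/Filter reads,
// ResponseComplete, and DeletePod respectively.
func Example_trackerLifecycle() {
    ct := newConcurrencyTracker()
    ct.inc("default/pod-a")              // PreRequest: request dispatched to the pod
    fmt.Println(ct.get("default/pod-a")) // read path used by IsSaturated and Filter
    ct.dec("default/pod-a")              // ResponseComplete: request finished
    ct.delete("default/pod-a")           // DeletePod: backend removed from the pool
    fmt.Println(ct.get("default/pod-a")) // untracked pods report 0
    // Output:
    // 1
    // 0
}

The RWMutex-plus-atomic layout keeps this path cheap for the common case: get and the usual inc/dec only take the read lock, while the write lock is needed only when a counter is first created or deleted.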
