Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/epp/flowcontrol/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,14 @@ func (fc *FlowController) EnqueueAndWait(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName())
metrics.AddFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName(), req.ByteSize())
defer metrics.SubFlowControlQueueBytes(
flowKey.ID, priority,
req.InferencePoolName(),
req.ModelName(), req.TargetModelName(), req.ByteSize())

// 1. Create the derived context that governs this request's lifecycle (Parent Cancellation + TTL).
reqCtx, cancel, enqueueTime := fc.createRequestContext(ctx, req)
Expand Down
6 changes: 6 additions & 0 deletions pkg/epp/flowcontrol/controller/internal/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/types"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/metrics"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

Expand Down Expand Up @@ -291,6 +292,11 @@ func (sp *ShardProcessor) hasCapacity(priority int, itemByteSize uint64) bool {
// blocking to respect the policy's decision and prevent priority inversion, where dispatching lower-priority work might
// exacerbate the saturation affecting the high-priority item.
func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
dispatchCycleStart := time.Now()
defer func() {
metrics.RecordFlowControlDispatchCycleDuration(time.Since(dispatchCycleStart))
}()

for _, priority := range sp.shard.AllOrderedPriorityLevels() {
originalBand, err := sp.shard.PriorityBandAccessor(priority)
if err != nil {
Expand Down
49 changes: 49 additions & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,28 @@ var (
append([]string{"fairness_id", "priority", "outcome", "inference_pool"}, ModelLabels...),
)

// flowControlDispatchCycleDuration observes how long a single dispatch cycle
// takes in the flow control shard processor. The metric is named *_seconds,
// so both the bucket bounds and the observed values must be in seconds.
flowControlDispatchCycleDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Subsystem: InferenceExtension,
		Name:      "flow_control_dispatch_cycle_duration_seconds",
		Help:      metricsutil.HelpMsgWithStability("Distribution of the time taken for each dispatch cycle in the EPP flow control layer.", compbasemetrics.ALPHA),
		// Dispatch cycles are expected to complete in the nanosecond-to-microsecond
		// range; the bounds are expressed in seconds to match the metric name.
		Buckets: []float64{
			0.000000005, // 5 ns
			0.000000010, // 10 ns
			0.000000025, // 25 ns
			0.000000050, // 50 ns
			0.000000100, // 100 ns
			0.000000250, // 250 ns
			0.000000500, // 500 ns
			0.000001000, // 1000 ns (1 µs)
		},
	},
	[]string{},
)

flowControlQueueSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Subsystem: InferenceExtension,
Expand All @@ -407,6 +429,15 @@ var (
},
append([]string{"fairness_id", "priority", "inference_pool"}, ModelLabels...),
)

// flowControlQueueBytes tracks the total byte size of requests currently
// managed by the flow control layer, labeled by flow identity, priority,
// pool, and model. Incremented on entry to EnqueueAndWait and decremented
// when the request reaches a final outcome.
flowControlQueueBytes = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Subsystem: InferenceExtension,
		Name:      "flow_control_queue_bytes",
		Help:      metricsutil.HelpMsgWithStability("Current number of bytes associated with requests actively managed by the EPP flow control layer, from the start of the EnqueueAndWait call until a final outcome is reached.", compbasemetrics.ALPHA),
	},
	append([]string{"fairness_id", "priority", "inference_pool"}, ModelLabels...),
)
)

// --- Inference Model Rewrite Metrics ---
Expand Down Expand Up @@ -460,7 +491,9 @@ func Register(customCollectors ...prometheus.Collector) {
metrics.Registry.MustRegister(PrefixCacheHitRatio)
metrics.Registry.MustRegister(PrefixCacheHitLength)
metrics.Registry.MustRegister(flowControlRequestQueueDuration)
metrics.Registry.MustRegister(flowControlDispatchCycleDuration)
metrics.Registry.MustRegister(flowControlQueueSize)
metrics.Registry.MustRegister(flowControlQueueBytes)
metrics.Registry.MustRegister(inferenceModelRewriteDecisionsTotal)
for _, collector := range customCollectors {
metrics.Registry.MustRegister(collector)
Expand Down Expand Up @@ -507,6 +540,7 @@ func Reset() {
PrefixCacheHitLength.Reset()
flowControlRequestQueueDuration.Reset()
flowControlQueueSize.Reset()
flowControlQueueBytes.Reset()
inferenceModelRewriteDecisionsTotal.Reset()
}

Expand Down Expand Up @@ -787,6 +821,11 @@ func RecordFlowControlRequestQueueDuration(
).Observe(duration.Seconds())
}

// RecordFlowControlDispatchCycleDuration records the duration of a dispatch cycle in the Flow Control layer.
// The underlying histogram is named *_seconds, so the duration must be
// observed in seconds, not raw nanoseconds.
func RecordFlowControlDispatchCycleDuration(duration time.Duration) {
	flowControlDispatchCycleDuration.WithLabelValues().Observe(duration.Seconds())
}

// IncFlowControlQueueSize increments the Flow Control queue size gauge.
func IncFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, targetModelName string) {
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Inc()
Expand All @@ -797,6 +836,16 @@ func DecFlowControlQueueSize(fairnessID, priority, inferencePool, modelName, tar
flowControlQueueSize.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName).Dec()
}

// AddFlowControlQueueBytes adds the given request size, in bytes, to the Flow
// Control queue bytes gauge for the identified flow.
func AddFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, numBytes uint64) {
	gauge := flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName)
	gauge.Add(float64(numBytes))
}

// SubFlowControlQueueBytes subtracts the given request size, in bytes, from
// the Flow Control queue bytes gauge for the identified flow.
func SubFlowControlQueueBytes(fairnessID, priority, inferencePool, modelName, targetModelName string, numBytes uint64) {
	gauge := flowControlQueueBytes.WithLabelValues(fairnessID, priority, inferencePool, modelName, targetModelName)
	gauge.Sub(float64(numBytes))
}

// SetTTFTSLOThreshold sets the TTFT SLO threshold for a model.
// This allows dynamic threshold management and makes the threshold visible in metrics.
func SetTTFTSLOThreshold(modelName, targetModelName string, threshold float64) {
Expand Down
83 changes: 83 additions & 0 deletions pkg/epp/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,51 @@ func TestSchedulerE2ELatency(t *testing.T) {
}
}

// TestFlowControlDispatchCycleLengthMetric verifies that recorded dispatch
// cycle durations are exposed with the expected histogram buckets by
// comparing the gathered output against a golden testdata file.
func TestFlowControlDispatchCycleLengthMetric(t *testing.T) {
	Reset()
	scenarios := []struct {
		name      string
		durations []time.Duration
	}{
		{
			name: "multiple dispatch cycle durations",
			durations: []time.Duration{
				200 * time.Nanosecond,  // 0.0000002s
				800 * time.Nanosecond,  // 0.0000008s
				1500 * time.Nanosecond, // 0.0000015s
				3 * time.Nanosecond,    // 0.000000003s
				8 * time.Nanosecond,    // 0.000000008s
				15 * time.Nanosecond,   // 0.000000015s
				30 * time.Nanosecond,   // 0.000000030s
				75 * time.Nanosecond,   // 0.000000075s
				150 * time.Nanosecond,  // 0.00000015s
			},
		},
	}
	for _, scenario := range scenarios {
		t.Run(scenario.name, func(t *testing.T) {
			for _, duration := range scenario.durations {
				RecordFlowControlDispatchCycleDuration(duration)
			}

			// Check the Open error before deferring Close; deferring first would
			// call Close on a nil *os.File when Open fails.
			wantDispatchCycleLatency, err := os.Open("testdata/flow_control_dispatch_cycle_duration_seconds_metric")
			if err != nil {
				t.Fatal(err)
			}
			defer func() {
				if err := wantDispatchCycleLatency.Close(); err != nil {
					t.Error(err)
				}
			}()
			if err := testutil.GatherAndCompare(metrics.Registry, wantDispatchCycleLatency, "inference_extension_flow_control_dispatch_cycle_duration_seconds"); err != nil {
				t.Error(err)
			}
		})
	}
}

// TODO (7028): Research histogram bins using real-world data to ensure they are optimal.

func TestSchedulerAttemptsTotal(t *testing.T) {

scenarios := []struct {
Expand Down Expand Up @@ -973,6 +1018,44 @@ func TestFlowControlQueueSizeMetric(t *testing.T) {
require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0")
}

// TestFlowControlQueueBytesMetric verifies that the queue-bytes gauge tracks
// Add/Sub operations per label set and reads zero for untouched label sets.
func TestFlowControlQueueBytesMetric(t *testing.T) {
	Reset()

	const (
		pool   = "pool-1"
		model  = "llama-2"
		target = "llama-base"
	)

	// Basic Add/Sub round-trip for a single label set.
	AddFlowControlQueueBytes("user-a", "100", pool, model, target, 32)
	val, err := testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-a", "100", pool, model, target))
	require.NoError(t, err, "Failed to get gauge value for user-a/100 after Add")
	require.Equal(t, 32.0, val, "Gauge value should be 32 after Add for user-a/100")

	SubFlowControlQueueBytes("user-a", "100", pool, model, target, 32)
	val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-a", "100", pool, model, target))
	require.NoError(t, err, "Failed to get gauge value for user-a/100 after Sub")
	require.Equal(t, 0.0, val, "Gauge value should be 0 after Sub for user-a/100")

	// Multiple Adds to the same label set accumulate.
	AddFlowControlQueueBytes("user-b", "200", pool, model, target, 32)
	AddFlowControlQueueBytes("user-b", "200", pool, model, target, 16)
	val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-b", "200", pool, model, target))
	require.NoError(t, err, "Failed to get gauge value for user-b/200")
	require.Equal(t, 48.0, val, "Gauge value should be 48 for user-b/200")

	SubFlowControlQueueBytes("user-b", "200", pool, model, target, 48)
	val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-b", "200", pool, model, target))
	require.NoError(t, err, "Failed to get gauge value for user-b/200 after Sub")
	require.Equal(t, 0.0, val, "Gauge value should be 0 for user-b/200 after Sub")

	// A label set that was never touched reads as zero.
	val, err = testutil.GetGaugeMetricValue(flowControlQueueBytes.WithLabelValues("user-c", "100", pool, model, target))
	require.NoError(t, err, "Failed to get gauge value for non-existent user-c/100")
	require.Equal(t, 0.0, val, "Gauge value for non-existent labels should be 0")
}

func TestInferenceModelRewriteDecisionsTotalMetric(t *testing.T) {
Reset()

Expand Down
1 change: 1 addition & 0 deletions site-src/guides/metrics-and-observability.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ These metrics provide insights into the experimental flow control layer within t
|:---|:---|:---|:---|:---|
| inference_extension_flow_control_request_queue_duration_seconds | Distribution | Distribution of the total time requests spend in the flow control layer. This is measured from the moment a request enters the `EnqueueAndWait` function until it reaches a final outcome (e.g., Dispatched, Rejected, Evicted). | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `outcome`=&lt;QueueOutcome&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_extension_flow_control_queue_size | Gauge | The current number of requests being actively managed by the flow control layer. This counts requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_extension_flow_control_queue_bytes | Gauge | The current size in bytes of all requests being actively managed by the flow control layer. This includes requests from the moment they enter the `EnqueueAndWait` function until they reach a final outcome. | `fairness_id`=&lt;flow-id&gt; <br> `priority`=&lt;flow-priority&gt; <br> `inference_pool`=&lt;pool-name&gt; <br> `model_name`=&lt;model-name&gt; <br> `target_model_name`=&lt;target-model-name&gt; | ALPHA |
| inference_extension_flow_control_dispatch_cycle_duration_seconds | Distribution | Distribution of the time taken for each dispatch cycle in the flow control layer's shard processor. | (none) | ALPHA |

## Scrape Metrics & Pprof profiles

Expand Down