Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion apix/config/v1alpha1/endpointpickerconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,22 @@ type EndpointPickerConfig struct {
// FlowControl configures the Flow Control layer.
// This configuration is only respected if the "flowControl" FeatureGate is enabled.
FlowControl *FlowControlConfig `json:"flowControl,omitempty"`

// +optional
// RequestControl configures the request control stage of the EPP pipeline.
RequestControl *RequestControlConfig `json:"requestControl,omitempty"`
Comment thread
tsj-30 marked this conversation as resolved.
}

func (cfg EndpointPickerConfig) String() string {
return fmt.Sprintf(
"{FeatureGates: %v, Plugins: %v, SchedulingProfiles: %v, Data: %v, SaturationDetector: %v, FlowControl: %v}",
"{FeatureGates: %v, Plugins: %v, SchedulingProfiles: %v, Data: %v, SaturationDetector: %v, FlowControl: %v, RequestControl: %v}",
cfg.FeatureGates,
cfg.Plugins,
cfg.SchedulingProfiles,
cfg.Data,
cfg.SaturationDetector,
cfg.FlowControl,
cfg.RequestControl,
)
}

Expand Down Expand Up @@ -183,6 +188,24 @@ type SaturationDetector struct {
MetricsStalenessThreshold metav1.Duration `json:"metricsStalenessThreshold,omitempty"`
}

// RequestControlConfig configures the request control stage.
type RequestControlConfig struct {
// +optional
// PrepareDataTimeout defines the timeout for PrepareData plugins.
Comment thread
tsj-30 marked this conversation as resolved.
// If omitted, defaults to 400ms.
PrepareDataTimeout *metav1.Duration `json:"prepareDataTimeout,omitempty"`
}

func (rc *RequestControlConfig) String() string {
if rc == nil {
return "{}"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is misleading since there is a default value for the parameters.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I actually disagree. The use of the String() function here is in logging of the configuration.

The configuration gets logged twice during startup. Once before the defaults are applied and again after the defaults are applied. The default value for PrepareDataTimeout should be added during the regular configuration loading process. It would then get displayed when the "complete" configuration is logged.

Copy link
Copy Markdown
Contributor

@ahg-g ahg-g Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we default earlier in the right places, then I agree. My point as implemented it was not reflecting that we are applying a default.

}
if rc.PrepareDataTimeout == nil {
return "{PrepareDataTimeout: default(400ms)}"
}
return "{PrepareDataTimeout: " + rc.PrepareDataTimeout.String() + "}"
}

func (sd *SaturationDetector) String() string {
result := ""
if sd != nil {
Expand Down
24 changes: 24 additions & 0 deletions apix/config/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions cmd/epp/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,10 @@ func (r *Runner) parseConfigurationPhaseTwo(ctx context.Context, rawConfig *conf

r.schedulerConfig = cfg.SchedulerConfig

if cfg.RequestControlConfig != nil && cfg.RequestControlConfig.PrepareDataTimeout > 0 {
r.requestControlConfig.WithPrepareDataTimeout(cfg.RequestControlConfig.PrepareDataTimeout)
}

// Add requestControl plugins
r.requestControlConfig.AddPlugins(handle.GetAllPlugins()...)

Expand Down
67 changes: 67 additions & 0 deletions cmd/epp/runner/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Comment thread
tsj-30 marked this conversation as resolved.
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runner

import (
"context"
"reflect"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

configapi "sigs.k8s.io/gateway-api-inference-extension/apix/config/v1alpha1"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datastore"
)

func TestParseConfigurationPhaseTwoAppliesPrepareDataTimeout(t *testing.T) {
t.Parallel()

r := NewRunner()
r.registerInTreePlugins()

rawConfig := &configapi.EndpointPickerConfig{
RequestControl: &configapi.RequestControlConfig{
PrepareDataTimeout: &metav1.Duration{Duration: 125 * time.Millisecond},
},
}

ctx := context.Background()
epFactory := datalayer.NewEndpointFactory(nil, time.Millisecond)
ds := datastore.NewDatastore(ctx, epFactory, 0)

if _, err := r.parseConfigurationPhaseTwo(ctx, rawConfig, ds); err != nil {
t.Fatalf("parseConfigurationPhaseTwo failed: %v", err)
}

got := readPrepareDataTimeout(t, r.requestControlConfig)
want := 125 * time.Millisecond
if got != want {
t.Fatalf("prepareDataTimeout = %v, want %v", got, want)
}
}

func readPrepareDataTimeout(t *testing.T, cfg any) time.Duration {
t.Helper()

v := reflect.ValueOf(cfg).Elem().FieldByName("prepareDataTimeout")
if v.Kind() != reflect.Int64 {
t.Fatalf("unexpected kind for prepareDataTimeout: %v", v.Kind())
}
return time.Duration(v.Int())
}
8 changes: 8 additions & 0 deletions pkg/epp/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package config

import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/datalayer"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/saturationdetector/framework/plugins/utilizationdetector"
Expand All @@ -29,4 +31,10 @@ type Config struct {
SaturationDetectorConfig *utilizationdetector.Config
DataConfig *datalayer.Config
FlowControlConfig *flowcontrol.Config
RequestControlConfig *RequestControlConfig
}

// RequestControlConfig holds configuration for request control behaviors.
type RequestControlConfig struct {
PrepareDataTimeout time.Duration
}
12 changes: 12 additions & 0 deletions pkg/epp/config/loader/configloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ func InstantiateAndConfigure(
SaturationDetectorConfig: buildSaturationConfig(rawConfig.SaturationDetector),
DataConfig: dataConfig,
FlowControlConfig: flowControlConfig,
RequestControlConfig: buildRequestControlConfig(rawConfig.RequestControl),
}, nil
}

Expand Down Expand Up @@ -245,6 +246,17 @@ func buildSaturationConfig(apiConfig *configapi.SaturationDetector) *utilization
return cfg
}

func buildRequestControlConfig(apiConfig *configapi.RequestControlConfig) *config.RequestControlConfig {
if apiConfig == nil {
return nil
}
cfg := &config.RequestControlConfig{}
if apiConfig.PrepareDataTimeout != nil && apiConfig.PrepareDataTimeout.Duration > 0 {
cfg.PrepareDataTimeout = apiConfig.PrepareDataTimeout.Duration
}
return cfg
}

func buildDataLayerConfig(rawDataConfig *configapi.DataLayerConfig, dataLayerEnabled bool, handle fwkplugin.Handle) (*datalayer.Config, error) {
if dataLayerEnabled && (rawDataConfig == nil || rawDataConfig.Sources == nil) { // enabled but no configuration
return nil, errors.New("the Datalayer has been enabled. You must specify the Data section in the configuration")
Expand Down
14 changes: 14 additions & 0 deletions pkg/epp/config/loader/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package loader

import (
"errors"
"fmt"

"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -32,6 +33,9 @@ func validateConfig(cfg *configapi.EndpointPickerConfig) error {
if err := validateSchedulingProfiles(cfg); err != nil {
return fmt.Errorf("scheduling profile validation failed: %w", err)
}
if err := validateRequestControl(cfg.RequestControl); err != nil {
return fmt.Errorf("request control validation failed: %w", err)
}
return nil
}

Expand Down Expand Up @@ -78,3 +82,13 @@ func validateFeatureGates(gates configapi.FeatureGates) error {

return nil
}

func validateRequestControl(cfg *configapi.RequestControlConfig) error {
if cfg == nil || cfg.PrepareDataTimeout == nil {
return nil
}
if cfg.PrepareDataTimeout.Duration <= 0 {
return errors.New("prepareDataTimeout must be greater than 0")
}
return nil
}
15 changes: 7 additions & 8 deletions pkg/epp/requestcontrol/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,6 @@ import (
requtil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/request"
)

const (
// TODO(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/2081):
// Make this timeout configurable per-plugin or globally via the Director configuration to support plugins with
// varying latency profiles.
prepareDataTimeout = 400 * time.Millisecond
)

// Datastore defines the interface required by the Director.
type Datastore interface {
PoolGet() (*datalayer.EndpointPool, error)
Expand Down Expand Up @@ -352,7 +345,13 @@ func (d *Director) runPrepareDataPlugins(ctx context.Context,
if len(d.requestControlPlugins.prepareDataPlugins) == 0 {
return nil
}
return prepareDataPluginsWithTimeout(prepareDataTimeout, d.requestControlPlugins.prepareDataPlugins, ctx, request, endpoints)
return prepareDataPluginsWithTimeout(
d.requestControlPlugins.prepareDataTimeout,
d.requestControlPlugins.prepareDataPlugins,
ctx,
request,
endpoints,
)
}

func (d *Director) runAdmissionPlugins(ctx context.Context,
Expand Down
12 changes: 12 additions & 0 deletions pkg/epp/requestcontrol/request_control_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@ limitations under the License.
package requestcontrol

import (
"time"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/plugin"
fwk "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/framework/interface/requestcontrol"
)

const defaultPrepareDataTimeout = 400 * time.Millisecond

// NewConfig creates a new Config object and returns its pointer.
func NewConfig() *Config {
return &Config{
Expand All @@ -30,6 +34,7 @@ func NewConfig() *Config {
responseReceivedPlugins: []fwk.ResponseReceived{},
responseStreamingPlugins: []fwk.ResponseStreaming{},
responseCompletePlugins: []fwk.ResponseComplete{},
prepareDataTimeout: defaultPrepareDataTimeout,
}
}

Expand All @@ -41,6 +46,7 @@ type Config struct {
responseReceivedPlugins []fwk.ResponseReceived
responseStreamingPlugins []fwk.ResponseStreaming
responseCompletePlugins []fwk.ResponseComplete
prepareDataTimeout time.Duration
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the default timeout shouldn't be set here. Instead it should be set in pkg/epp/config/loader/defaults.go along with all of the other configuration defaults.

This includes being properly set in the configuration. used when none was specified.

}

// WithPreRequestPlugins sets the given plugins as the PreRequest plugins.
Expand Down Expand Up @@ -77,6 +83,12 @@ func (c *Config) WithPrepareDataPlugins(plugins ...fwk.PrepareDataPlugin) *Confi
return c
}

// WithPrepareDataTimeout sets the timeout for PrepareData plugins.
func (c *Config) WithPrepareDataTimeout(timeout time.Duration) *Config {
c.prepareDataTimeout = timeout
return c
}

// WithAdmissionPlugins sets the given plugins as the AdmitRequest plugins.
func (c *Config) WithAdmissionPlugins(plugins ...fwk.AdmissionPlugin) *Config {
c.admissionPlugins = plugins
Expand Down