Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ const (
defaultInitialShardCount int = 1
// defaultFlowGCTimeout is the default duration of inactivity after which an idle flow is garbage collected.
// This also serves as the interval for the periodic garbage collection scan.
// TODO:(https://github.com/kubernetes-sigs/gateway-api-inference-extension/issues/1982) revert to 5m once this GC
// race condition is properly resolved.
defaultFlowGCTimeout time.Duration = 1 * time.Hour
defaultFlowGCTimeout time.Duration = 5 * time.Minute
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch -- this fix was just merged today, so 5 * time.Minute is safe again.

// defaultPriorityBandGCTimeout is the default duration of inactivity after which a dynamically provisioned
// priority band is garbage collected. Set to 2x flow GC timeout to ensure flows are cleaned up first.
defaultPriorityBandGCTimeout time.Duration = 2 * defaultFlowGCTimeout
// defaultEventChannelBufferSize is the default size of the buffered channel for control plane events.
defaultEventChannelBufferSize int = 4096
)
Expand Down Expand Up @@ -133,6 +134,12 @@ type Config struct {
// Optional: Defaults to `defaultFlowGCTimeout` (5 minutes).
FlowGCTimeout time.Duration

// PriorityBandGCTimeout defines the duration of inactivity after which a dynamically provisioned priority band
// is garbage collected. A band is considered idle when it has no flows and no buffered requests across all shards.
// Must be >= FlowGCTimeout to ensure flows are collected before bands.
// Optional: Defaults to `defaultPriorityBandGCTimeout` (10 minutes).
PriorityBandGCTimeout time.Duration

// EventChannelBufferSize defines the size of the buffered channel used for internal control plane events.
// A larger buffer can absorb larger bursts of events (e.g., from many queues becoming non-empty simultaneously)
// without blocking the data path, but consumes more memory.
Expand Down Expand Up @@ -218,6 +225,20 @@ func WithFlowGCTimeout(d time.Duration) ConfigOption {
}
}

// WithPriorityBandGCTimeout sets the idle priority band garbage collection timeout.
//
// The option rejects non-positive durations immediately. The cross-field rule that the band timeout must be
// >= FlowGCTimeout is intentionally NOT checked here: options are applied one at a time, so at this point
// FlowGCTimeout may still hold its default (or a value that a later WithFlowGCTimeout will overwrite), which
// would make the outcome depend on the order in which options are passed. Config.validate performs that
// comparison (and returns "priorityBandGCTimeout must be >= flowGCTimeout") and is expected to run once all
// options have been applied.
func WithPriorityBandGCTimeout(d time.Duration) ConfigOption {
	return func(b *configBuilder) error {
		if d <= 0 {
			return errors.New("priorityBandGCTimeout must be positive")
		}
		b.config.PriorityBandGCTimeout = d
		return nil
	}
}

// WithPriorityBand adds a priority band configuration.
// If a band with the same Priority already exists, it returns an error.
func WithPriorityBand(band *PriorityBandConfig) ConfigOption {
Expand Down Expand Up @@ -329,6 +350,7 @@ func NewConfig(handle plugins.Handle, opts ...ConfigOption) (*Config, error) {
MaxBytes: 0, // no limit enforced
InitialShardCount: defaultInitialShardCount,
FlowGCTimeout: defaultFlowGCTimeout,
PriorityBandGCTimeout: defaultPriorityBandGCTimeout,
EventChannelBufferSize: defaultEventChannelBufferSize,
PriorityBands: make(map[int]*PriorityBandConfig),
},
Expand Down Expand Up @@ -447,6 +469,12 @@ func (c *Config) validate(checker capabilityChecker) error {
if c.FlowGCTimeout <= 0 {
return errors.New("flowGCTimeout must be positive")
}
if c.PriorityBandGCTimeout <= 0 {
return errors.New("priorityBandGCTimeout must be positive")
}
if c.PriorityBandGCTimeout < c.FlowGCTimeout {
return errors.New("priorityBandGCTimeout must be >= flowGCTimeout")
}
if c.EventChannelBufferSize <= 0 {
return errors.New("eventChannelBufferSize must be greater than 0")
}
Expand Down
40 changes: 40 additions & 0 deletions pkg/epp/flowcontrol/registry/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func TestNewConfig(t *testing.T) {
assertion: func(t *testing.T, cfg *Config) {
assert.Equal(t, defaultInitialShardCount, cfg.InitialShardCount, "InitialShardCount should be defaulted")
assert.Equal(t, defaultFlowGCTimeout, cfg.FlowGCTimeout, "FlowGCTimeout should be defaulted")
assert.Equal(t, defaultPriorityBandGCTimeout, cfg.PriorityBandGCTimeout, "PriorityBandGCTimeout should be defaulted")
assert.Equal(t, defaultEventChannelBufferSize, cfg.EventChannelBufferSize,
"EventChannelBufferSize should be defaulted")

Expand All @@ -111,13 +112,15 @@ func TestNewConfig(t *testing.T) {
WithInitialShardCount(10),
WithMaxBytes(5000),
WithFlowGCTimeout(1 * time.Hour),
WithPriorityBandGCTimeout(2 * time.Hour),
WithPriorityBand(mustBand(t, 1, "High")),
},
handle: newTestPluginsHandle(t),
assertion: func(t *testing.T, cfg *Config) {
assert.Equal(t, 10, cfg.InitialShardCount)
assert.Equal(t, uint64(5000), cfg.MaxBytes)
assert.Equal(t, 1*time.Hour, cfg.FlowGCTimeout)
assert.Equal(t, 2*time.Hour, cfg.PriorityBandGCTimeout)
},
},
{
Expand Down Expand Up @@ -187,6 +190,43 @@ func TestNewConfig(t *testing.T) {
handle: newTestPluginsHandle(t),
expectErr: true,
},
{
name: "ShouldError_WhenPriorityBandGCTimeoutIsNegative",
opts: []ConfigOption{WithPriorityBandGCTimeout(-1 * time.Second)},
handle: newTestPluginsHandle(t),
expectErr: true,
},
{
name: "ShouldError_WhenPriorityBandGCTimeoutLessThanFlowGCTimeout",
opts: []ConfigOption{
WithFlowGCTimeout(10 * time.Minute),
WithPriorityBandGCTimeout(5 * time.Minute), // Less than flow timeout
},
handle: newTestPluginsHandle(t),
expectErr: true,
},
{
name: "ShouldSucceed_WhenPriorityBandGCTimeoutEqualToFlowGCTimeout",
opts: []ConfigOption{
WithFlowGCTimeout(10 * time.Minute),
WithPriorityBandGCTimeout(10 * time.Minute), // Equal is OK
},
handle: newTestPluginsHandle(t),
assertion: func(t *testing.T, cfg *Config) {
assert.Equal(t, 10*time.Minute, cfg.PriorityBandGCTimeout)
},
},
{
name: "ShouldSucceed_WhenPriorityBandGCTimeoutGreaterThanFlowGCTimeout",
opts: []ConfigOption{
WithFlowGCTimeout(5 * time.Minute),
WithPriorityBandGCTimeout(15 * time.Minute),
},
handle: newTestPluginsHandle(t),
assertion: func(t *testing.T, cfg *Config) {
assert.Equal(t, 15*time.Minute, cfg.PriorityBandGCTimeout)
},
},

// --- Validation Errors (Bands) ---
{
Expand Down
Loading