Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions pkg/epp/flowcontrol/contracts/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,6 @@ type ShardStats struct {
type PriorityBandStats struct {
// Priority is the numerical priority level this struct describes.
Priority int
// PriorityName is a human-readable name for the priority band (e.g., "Critical", "Sheddable").
// The registry configuration requires this field, so it is guaranteed to be non-empty.
PriorityName string
// CapacityBytes is the configured maximum total byte size for this priority band.
// When viewed via `AggregateStats`, this is the global limit. When viewed via `ShardStats`, this is the partitioned
// value for that specific shard.
Expand Down
22 changes: 7 additions & 15 deletions pkg/epp/flowcontrol/controller/internal/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,19 +230,11 @@ func (sp *ShardProcessor) enqueue(item *FlowItem) {
return
}

band, err := sp.shard.PriorityBandAccessor(key.Priority)
if err != nil {
finalErr := fmt.Errorf("configuration error: failed to get priority band for priority %d: %w", key.Priority, err)
sp.logger.Error(finalErr, "Rejecting item.", "flowKey", key, "reqID", req.ID())
item.FinalizeWithOutcome(types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, finalErr))
return
}

// --- Capacity Check ---
// This check is safe because it is performed by the single-writer Run goroutine.
if !sp.hasCapacity(key.Priority, req.ByteSize()) {
sp.logger.V(logutil.DEBUG).Info("Rejecting request, queue at capacity",
"flowKey", key, "reqID", req.ID(), "priorityName", band.PriorityName(), "reqByteSize", req.ByteSize())
"flowKey", key, "reqID", req.ID(), "priority", key.Priority, "reqByteSize", req.ByteSize())
item.FinalizeWithOutcome(types.QueueOutcomeRejectedCapacity, fmt.Errorf("%w: %w",
types.ErrRejected, types.ErrQueueAtCapacity))
return
Expand All @@ -253,12 +245,12 @@ func (sp *ShardProcessor) enqueue(item *FlowItem) {
if err := managedQ.Add(item); err != nil {
finalErr := fmt.Errorf("failed to add item to queue for flow key %s: %w", key, err)
sp.logger.Error(finalErr, "Rejecting item post-admission.",
"flowKey", key, "reqID", req.ID(), "priorityName", band.PriorityName())
"flowKey", key, "reqID", req.ID(), "priority", key.Priority)
item.FinalizeWithOutcome(types.QueueOutcomeRejectedOther, fmt.Errorf("%w: %w", types.ErrRejected, finalErr))
return
}
sp.logger.V(logutil.TRACE).Info("Item enqueued.",
"flowKey", key, "reqID", req.ID(), "priorityName", band.PriorityName())
"flowKey", key, "reqID", req.ID(), "priority", key.Priority)
}

// hasCapacity checks if the shard and the specific priority band have enough capacity.
Expand Down Expand Up @@ -301,7 +293,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
item, err := sp.selectItem(ctx, originalBand)
if err != nil {
sp.logger.Error(err, "Failed to select item, skipping priority band for this cycle",
"priority", priority, "priorityName", originalBand.PriorityName())
"priority", priority)
continue // Continue to the next band to maximize work conservation.
}
if item == nil {
Expand All @@ -313,7 +305,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
candidates := sp.podLocator.Locate(ctx, req.GetMetadata())
if sp.saturationDetector.IsSaturated(ctx, candidates) {
sp.logger.V(logutil.DEBUG).Info("Policy's chosen item is saturated; enforcing HoL blocking.",
"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
"flowKey", req.FlowKey(), "reqID", req.ID(), "priority", priority)
// Stop the dispatch cycle entirely to respect strict policy decision and prevent priority inversion where
// lower-priority work might exacerbate the saturation affecting high-priority work.
return false
Expand All @@ -322,7 +314,7 @@ func (sp *ShardProcessor) dispatchCycle(ctx context.Context) bool {
// --- Dispatch ---
if err := sp.dispatchItem(item); err != nil {
sp.logger.Error(err, "Failed to dispatch item, skipping priority band for this cycle",
"flowKey", req.FlowKey(), "reqID", req.ID(), "priorityName", originalBand.PriorityName())
"flowKey", req.FlowKey(), "reqID", req.ID(), "priority", priority)
continue // Continue to the next band to maximize work conservation.
}
return true
Expand Down Expand Up @@ -483,7 +475,7 @@ func (sp *ShardProcessor) processAllQueuesConcurrently(
for _, priority := range sp.shard.AllOrderedPriorityLevels() {
band, err := sp.shard.PriorityBandAccessor(priority)
if err != nil {
logger.Error(err, "Failed to get PriorityBandAccessor", "priority", priority)
logger.Error(err, "Failed to get PriorityBandAccessor, skipping band", "priority", priority)
continue
}
band.IterateQueues(func(queue framework.FlowQueueAccessor) bool {
Expand Down
14 changes: 1 addition & 13 deletions pkg/epp/flowcontrol/controller/internal/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,19 +592,7 @@ func TestShardProcessor(t *testing.T) {
assert.ErrorIs(t, item.FinalState().Err, testErr, "The underlying error should be preserved")
},
},
{
name: "should reject item on registry priority band lookup failure",
setupHarness: func(h *testHarness) {
h.addQueue(testFlow)
h.PriorityBandAccessorFunc = func(int) (framework.PriorityBandAccessor, error) { return nil, testErr }
},
assert: func(t *testing.T, h *testHarness, item *FlowItem) {
assert.Equal(t, types.QueueOutcomeRejectedOther, item.FinalState().Outcome,
"Outcome should be RejectedOther")
require.Error(t, item.FinalState().Err, "An error should be returned")
assert.ErrorIs(t, item.FinalState().Err, testErr, "The underlying error should be preserved")
},
},

{
name: "should reject item on queue add failure",
setupHarness: func(h *testHarness) {
Expand Down
3 changes: 0 additions & 3 deletions pkg/epp/flowcontrol/framework/accessors.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,6 @@ type PriorityBandAccessor interface {
// Priority returns the numerical priority level of this group.
Priority() int

// PriorityName returns the human-readable name of this priority band.
PriorityName() string

// FlowKeys returns the list of identities for every flow currently active within this group.
// The caller can use the ID field from each key to look up a specific queue via the Queue(id) method.
// The order of keys is not guaranteed unless specified by the implementation.
Expand Down
6 changes: 2 additions & 4 deletions pkg/epp/flowcontrol/framework/mocks/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,14 @@ var _ framework.FlowQueueAccessor = &MockFlowQueueAccessor{}
// This avoids collision with the interface method of the same name.
type MockPriorityBandAccessor struct {
PriorityV int
PriorityNameV string
PolicyStateV any
FlowKeysFunc func() []types.FlowKey
QueueFunc func(flowID string) framework.FlowQueueAccessor
IterateQueuesFunc func(callback func(flow framework.FlowQueueAccessor) (keepIterating bool))
}

func (m *MockPriorityBandAccessor) Priority() int { return m.PriorityV }
func (m *MockPriorityBandAccessor) PriorityName() string { return m.PriorityNameV }
func (m *MockPriorityBandAccessor) PolicyState() any { return m.PolicyStateV }
func (m *MockPriorityBandAccessor) Priority() int { return m.PriorityV }
func (m *MockPriorityBandAccessor) PolicyState() any { return m.PolicyStateV }

func (m *MockPriorityBandAccessor) FlowKeys() []types.FlowKey {
if m.FlowKeysFunc != nil {
Expand Down
28 changes: 4 additions & 24 deletions pkg/epp/flowcontrol/registry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"fmt"
"time"

"k8s.io/apimachinery/pkg/util/sets"

"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/contracts"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework"
"sigs.k8s.io/gateway-api-inference-extension/pkg/epp/flowcontrol/framework/plugins/interflow"
Expand Down Expand Up @@ -158,11 +156,6 @@ type PriorityBandConfig struct {
// Required.
Priority int

// PriorityName is a human-readable name for this priority band (e.g., "Critical", "Standard").
// It must be unique across all priority bands in the configuration.
// Required.
PriorityName string

// IntraFlowDispatchPolicy specifies the default name of the policy used to select a request from within a single
// flow's queue in this band.
// Optional: Defaults to defaultIntraFlowDispatchPolicy ("FCFS").
Expand Down Expand Up @@ -348,11 +341,11 @@ func NewConfig(handle fwkplugin.Handle, opts ...ConfigOption) (*Config, error) {
builder := &configBuilder{
config: &Config{
MaxBytes: 0, // no limit enforced
PriorityBands: make(map[int]*PriorityBandConfig),
InitialShardCount: defaultInitialShardCount,
FlowGCTimeout: defaultFlowGCTimeout,
PriorityBandGCTimeout: defaultPriorityBandGCTimeout,
EventChannelBufferSize: defaultEventChannelBufferSize,
PriorityBands: make(map[int]*PriorityBandConfig),
},
checker: &runtimeCapabilityChecker{},
}
Expand All @@ -366,7 +359,7 @@ func NewConfig(handle fwkplugin.Handle, opts ...ConfigOption) (*Config, error) {
// Initialize DefaultPriorityBand if missing.
// This ensures we always have a template for dynamic provisioning.
if builder.config.DefaultPriorityBand == nil {
template, err := NewPriorityBandConfig(handle, 0, "Dynamic-Default")
template, err := NewPriorityBandConfig(handle, 0)
if err != nil {
return nil, fmt.Errorf("failed to create default priority band: %w", err)
}
Expand Down Expand Up @@ -395,12 +388,10 @@ func NewConfig(handle fwkplugin.Handle, opts ...ConfigOption) (*Config, error) {
func NewPriorityBandConfig(
handle fwkplugin.Handle,
priority int,
name string,
opts ...PriorityBandConfigOption,
) (*PriorityBandConfig, error) {
pb := &PriorityBandConfig{
Priority: priority,
PriorityName: name,
Priority: priority,
}

if err := pb.applyDefaults(handle); err != nil {
Expand Down Expand Up @@ -440,9 +431,6 @@ func (p *PriorityBandConfig) applyDefaults(handle fwkplugin.Handle) error {

// validate checks the integrity of a single band's configuration.
func (p *PriorityBandConfig) validate(checker capabilityChecker) error {
if p.PriorityName == "" {
return fmt.Errorf("PriorityName is required for priority band %d", p.Priority)
}
if p.IntraFlowDispatchPolicy == "" {
return fmt.Errorf("IntraFlowDispatchPolicy required for priority band %d", p.Priority)
}
Expand All @@ -454,8 +442,7 @@ func (p *PriorityBandConfig) validate(checker capabilityChecker) error {
}
if checker != nil {
if err := checker.CheckCompatibility(p.IntraFlowDispatchPolicy, p.Queue); err != nil {
return fmt.Errorf("priority band %d (%s) configuration error: %w",
p.Priority, p.PriorityName, err)
return fmt.Errorf("priority band %d configuration error: %w", p.Priority, err)
}
}
return nil
Expand Down Expand Up @@ -488,13 +475,7 @@ func (c *Config) validate(checker capabilityChecker) error {
}

// Validate statically configured bands.
names := sets.New[string]()
for _, band := range c.PriorityBands {
if names.Has(band.PriorityName) {
return fmt.Errorf("duplicate priority name %q found", band.PriorityName)
}
names.Insert(band.PriorityName)

if err := band.validate(checker); err != nil {
return err
}
Expand Down Expand Up @@ -522,7 +503,6 @@ func (c *Config) partition(shardIndex, totalShards int) *ShardConfig {
for _, template := range c.PriorityBands {
shardBand := &PriorityBandConfig{
Priority: template.Priority,
PriorityName: template.PriorityName,
IntraFlowDispatchPolicy: template.IntraFlowDispatchPolicy,
FairnessPolicy: template.FairnessPolicy,
Queue: template.Queue,
Expand Down
Loading