Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion apis/kueue/v1beta2/clusterqueue_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,14 @@ type ClusterQueueSpec struct {
// - BestEffortFIFO: workloads are ordered by creation time,
// however older workloads that can't be admitted will not block
// admitting newer workloads that fit existing quota.
// - StrictFIFOPerFlavor: workloads are ordered strictly by creation time.
// Older workloads that can't be admitted will block admitting newer workloads
// only if they compete for the same resource flavor(s). Workloads that can be
// satisfied using different flavors may be admitted out of order.
//
// +optional
// +kubebuilder:default=BestEffortFIFO
// +kubebuilder:validation:Enum=StrictFIFO;BestEffortFIFO
// +kubebuilder:validation:Enum=StrictFIFO;BestEffortFIFO;StrictFIFOPerFlavor
QueueingStrategy QueueingStrategy `json:"queueingStrategy,omitempty"`

// namespaceSelector defines which namespaces are allowed to submit workloads to
Expand Down Expand Up @@ -180,6 +184,12 @@ const (
// however older workloads that can't be admitted will not block
// admitting newer workloads that fit existing quota.
BestEffortFIFO QueueingStrategy = "BestEffortFIFO"

// StrictFIFOPerFlavor means that workloads of the same priority are ordered strictly by creation time.
// Older workloads that can't be admitted will block admitting newer workloads
// only if they compete for the same resource flavor(s).
// Workloads that can be satisfied using different flavors may be admitted out of order.
StrictFIFOPerFlavor QueueingStrategy = "StrictFIFOPerFlavor"
)

// +kubebuilder:validation:XValidation:rule="self.flavors.all(x, size(x.resources) == size(self.coveredResources))", message="flavors must have the same number of resources as the coveredResources"
Expand Down
5 changes: 5 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1151,9 +1151,14 @@ spec:
- BestEffortFIFO: workloads are ordered by creation time,
however older workloads that can't be admitted will not block
admitting newer workloads that fit existing quota.
- StrictFIFOPerFlavor: workloads are ordered strictly by creation time.
Older workloads that can't be admitted will block admitting newer workloads
only if they compete for the same resource flavor(s). Workloads that can be
satisfied using different flavors may be admitted out of order.
enum:
- StrictFIFO
- BestEffortFIFO
- StrictFIFOPerFlavor
type: string
resourceGroups:
description: |-
Expand Down
5 changes: 5 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_clusterqueues.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1147,9 +1147,14 @@ spec:
- BestEffortFIFO: workloads are ordered by creation time,
however older workloads that can't be admitted will not block
admitting newer workloads that fit existing quota.
- StrictFIFOPerFlavor: workloads are ordered strictly by creation time.
Older workloads that can't be admitted will block admitting newer workloads
only if they compete for the same resource flavor(s). Workloads that can be
satisfied using different flavors may be admitted out of order.
enum:
- StrictFIFO
- BestEffortFIFO
- StrictFIFOPerFlavor
type: string
resourceGroups:
description: |-
Expand Down
134 changes: 124 additions & 10 deletions pkg/cache/queue/cluster_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ type ClusterQueue struct {
localQueuesInClusterQueue map[utilqueue.LocalQueueReference]bool

sw *stickyWorkload

// inadmissibleBlockedFlavors tracks which resource flavors each inadmissible workload
// attempted to use. This is used by StrictFIFOPerFlavor to determine if a workload
// should be blocked by older inadmissible workloads.
// Key: workload reference, Value: set of flavor references
inadmissibleBlockedFlavors map[workload.Reference]map[kueue.ResourceFlavorReference]struct{}
}

func (c *ClusterQueue) GetName() kueue.ClusterQueueReference {
Expand Down Expand Up @@ -187,15 +193,16 @@ func newClusterQueueImpl(ctx context.Context, client client.Client, wo workload.
sw := stickyWorkload{}
lessFunc := queueOrderingFunc(ctx, client, wo, options.fsResWeights, options.enableAdmissionFs, options.afsEntryPenalties, options.afsConsumedResources, &sw)
return &ClusterQueue{
heap: *heap.New(workloadKey, lessFunc),
inadmissibleWorkloads: make(inadmissibleWorkloads),
queueInadmissibleCycle: -1,
lessFunc: lessFunc,
rwm: sync.RWMutex{},
clock: clock,
afsEntryPenalties: options.afsEntryPenalties,
localQueuesInClusterQueue: make(map[utilqueue.LocalQueueReference]bool),
sw: &sw,
heap: *heap.New(workloadKey, lessFunc),
inadmissibleWorkloads: make(inadmissibleWorkloads),
queueInadmissibleCycle: -1,
lessFunc: lessFunc,
rwm: sync.RWMutex{},
clock: clock,
afsEntryPenalties: options.afsEntryPenalties,
localQueuesInClusterQueue: make(map[utilqueue.LocalQueueReference]bool),
sw: &sw,
inadmissibleBlockedFlavors: make(map[workload.Reference]map[kueue.ResourceFlavorReference]struct{}),
}
}

Expand Down Expand Up @@ -297,6 +304,8 @@ func (c *ClusterQueue) delete(log logr.Logger, w *kueue.Workload) {
c.inadmissibleWorkloads.delete(key)
c.heap.Delete(key)
c.forgetInflightByKey(key)
// Clean up blocked flavors tracking for StrictFIFOPerFlavor
delete(c.inadmissibleBlockedFlavors, key)
if c.sw.matches(key) {
if logV := log.V(5); logV.Enabled() {
logV.Info("Clearing sticky workload due to deletion", "clusterQueue", c.name, "workload", key)
Expand Down Expand Up @@ -334,6 +343,8 @@ func (c *ClusterQueue) requeueIfNotPresent(wInfo *workload.Info, immediate bool)
if inadmissibleWl != nil {
wInfo = inadmissibleWl
c.inadmissibleWorkloads.delete(key)
// Clean up blocked flavors tracking when moving to heap
delete(c.inadmissibleBlockedFlavors, key)
}
return c.heap.PushIfNotPresent(wInfo)
}
Expand Down Expand Up @@ -372,10 +383,26 @@ func (c *ClusterQueue) QueueInadmissibleWorkloads(ctx context.Context, client cl
c.inadmissibleWorkloads.forEach(func(key workload.Reference, wInfo *workload.Info) bool {
ns := corev1.Namespace{}
err := client.Get(ctx, types.NamespacedName{Name: wInfo.Obj.Namespace}, &ns)
if err != nil || !c.namespaceSelector.Matches(labels.Set(ns.Labels)) || !c.backoffWaitingTimeExpired(wInfo) {
shouldStayInadmissible := err != nil || !c.namespaceSelector.Matches(labels.Set(ns.Labels)) || !c.backoffWaitingTimeExpired(wInfo)

// For StrictFIFOPerFlavor, also check if blocked by other inadmissible workloads
if !shouldStayInadmissible && c.queueingStrategy == kueue.StrictFIFOPerFlavor {
if c.isBlockedByInadmissibleWorkloads(wInfo) {
shouldStayInadmissible = true
if logV := ctrl.LoggerFrom(ctx).V(3); logV.Enabled() {
logV.Info("Workload stays inadmissible due to flavor blocking",
"workload", key,
"attemptedFlavors", wInfo.AttemptedFlavors)
}
}
}

if shouldStayInadmissible {
inadmissibleWorkloads[key] = wInfo
} else {
moved = c.heap.PushIfNotPresent(wInfo) || moved
// Clean up blocked flavors tracking when moving to heap
delete(c.inadmissibleBlockedFlavors, key)
}
return true
})
Expand Down Expand Up @@ -562,6 +589,55 @@ func (c *ClusterQueue) Active() bool {
return c.active
}

// extractAttemptedFlavors returns the AttemptedFlavors set stored on wInfo.
// The returned map is the stored value itself (not a copy) and is nil when
// that field is unset.
func extractAttemptedFlavors(wInfo *workload.Info) map[kueue.ResourceFlavorReference]struct{} {
	attempted := wInfo.AttemptedFlavors
	return attempted
}

// hasFlavorConflict reports whether the two flavor sets share at least one
// resource flavor. An empty or nil set never conflicts with anything.
func hasFlavorConflict(flavors1, flavors2 map[kueue.ResourceFlavorReference]struct{}) bool {
	if len(flavors1) > 0 && len(flavors2) > 0 {
		// Walk the first set and probe the second for a common member.
		for candidate := range flavors1 {
			if _, shared := flavors2[candidate]; shared {
				return true
			}
		}
	}
	return false
}

// isBlockedByInadmissibleWorkloads reports whether wInfo has to wait behind an
// older inadmissible workload that attempted at least one of the same resource
// flavors. It is only used for StrictFIFOPerFlavor.
//
// Only workloads that are older than wInfo can block it. Age is compared by
// creation timestamp, with the workload key as a tie-breaker so that any two
// workloads are ordered one way only. Without this restriction, two
// inadmissible workloads with overlapping flavors would each see the other as
// a blocker and neither could ever be moved back to the heap.
//
// NOTE(review): the heap may order workloads by a different timestamp or by
// priority; confirm that creation time is the intended notion of "older" here.
func (c *ClusterQueue) isBlockedByInadmissibleWorkloads(wInfo *workload.Info) bool {
	wFlavors := extractAttemptedFlavors(wInfo)
	if len(wFlavors) == 0 {
		// No flavor information available, so nothing can conflict; let it proceed.
		return false
	}

	currentKey := workload.Key(wInfo.Obj)
	currentCreated := &wInfo.Obj.CreationTimestamp

	for otherKey, other := range c.inadmissibleWorkloads {
		if otherKey == currentKey {
			continue // Never block a workload on itself.
		}
		if other == nil || other.Obj == nil {
			continue // Age cannot be determined, so it cannot be treated as older.
		}

		// Only strictly older workloads may block; the key breaks timestamp ties.
		otherCreated := &other.Obj.CreationTimestamp
		older := otherCreated.Before(currentCreated) ||
			(otherCreated.Equal(currentCreated) && otherKey < currentKey)
		if !older {
			continue
		}

		if blockedFlavors, found := c.inadmissibleBlockedFlavors[otherKey]; found && hasFlavorConflict(wFlavors, blockedFlavors) {
			return true
		}
	}
	return false
}

// RequeueIfNotPresent inserts a workload that was not
// admitted back into the ClusterQueue. If the boolean is true,
// the workloads should be put back in the queue immediately,
Expand All @@ -586,6 +662,44 @@ func (c *ClusterQueue) RequeueIfNotPresent(ctx context.Context, wInfo *workload.
if c.queueingStrategy == kueue.StrictFIFO {
return c.requeueIfNotPresent(wInfo, reason != RequeueReasonNamespaceMismatch)
}

if c.queueingStrategy == kueue.StrictFIFOPerFlavor {
log := ctrl.LoggerFrom(ctx)
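// For StrictFIFOPerFlavor, workloads are parked in the inadmissible list so
// that they can block newer workloads competing for the same flavor(s).
// Only a namespace mismatch requests an immediate requeue to the heap.
// NOTE(review): this is the inverse of the StrictFIFO condition above, which
// requeues immediately for every reason except namespace mismatch; confirm
// that the inversion is intended.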
shouldRequeueToHeap := reason == RequeueReasonNamespaceMismatch

// Check if this workload would be blocked by older inadmissible workloads
isBlocked := !shouldRequeueToHeap && c.isBlockedByInadmissibleWorkloads(wInfo)

if logV := log.V(3); logV.Enabled() {
flavors := extractAttemptedFlavors(wInfo)
logV.Info("StrictFIFOPerFlavor requeue decision",
"workload", workload.Key(wInfo.Obj),
"reason", reason,
"isBlocked", isBlocked,
"shouldRequeueToHeap", shouldRequeueToHeap,
"attemptedFlavors", flavors,
"inadmissibleCount", len(c.inadmissibleWorkloads))
}

inserted := c.requeueIfNotPresent(wInfo, shouldRequeueToHeap)

// Track flavor usage for workloads in inadmissible list
if inserted && !shouldRequeueToHeap {
wKey := workload.Key(wInfo.Obj)
if flavors := extractAttemptedFlavors(wInfo); len(flavors) > 0 {
c.inadmissibleBlockedFlavors[wKey] = flavors
if logV := log.V(3); logV.Enabled() {
logV.Info("Tracking blocked flavors in inadmissible", "workload", wKey, "flavors", flavors, "isBlocked", isBlocked)
}
}
}
return inserted
}

// BestEffortFIFO
return c.requeueIfNotPresent(
wInfo,
reason == RequeueReasonFailedAfterNomination ||
Expand Down
Loading