Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/controller/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ func SetupControllers(mgr ctrl.Manager, qManager *qcache.Manager, cc *schdcache.
if err := acRec.SetupWithManager(mgr, cfg); err != nil {
return "AdmissionCheck", err
}
wpcRec := NewWorkloadPriorityClassReconciler(mgr.GetClient())
if err := wpcRec.SetupWithManager(mgr, cfg); err != nil {
return "WorkloadPriorityClass", err
}
qRec := NewLocalQueueReconciler(mgr.GetClient(), qManager, cc,
WithAdmissionFairSharingConfig(cfg.AdmissionFairSharing))
if err := qRec.SetupWithManager(mgr, cfg); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pkg/controller/core/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
WorkloadRuntimeClassKey = "spec.runtimeClass"
OwnerReferenceUID = "metadata.ownerReferences.uid"
WorkloadAdmissionCheckKey = "status.admissionChecks"
WorkloadPriorityClassKey = "spec.priorityClassRef"

// OwnerReferenceGroupKindFmt defines the format string used to construct a field path
// for indexing or matching against a specific owner Group and Kind in a Kubernetes object's metadata.
Expand Down Expand Up @@ -177,6 +178,18 @@ func IndexWorkloadAdmissionCheck(obj client.Object) []string {
})
}

// IndexWorkloadPriorityClass is the index function registered under
// WorkloadPriorityClassKey. It returns the name held in
// spec.priorityClassRef of a Workload, provided the reference's kind and
// group are those of a WorkloadPriorityClass. It returns nil when obj is not
// a *kueue.Workload, when no reference is set, or when the reference points
// at a different kind or group.
func IndexWorkloadPriorityClass(obj client.Object) []string {
	wl, isWorkload := obj.(*kueue.Workload)
	if !isWorkload {
		return nil
	}
	ref := wl.Spec.PriorityClassRef
	if ref == nil {
		return nil
	}
	// Only references to a WorkloadPriorityClass are indexed.
	if ref.Kind == kueue.WorkloadPriorityClassKind && ref.Group == kueue.WorkloadPriorityClassGroup {
		return []string{ref.Name}
	}
	return nil
}

// Setup sets the index with the given fields for core apis.
func Setup(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadQueueKey, IndexWorkloadQueue); err != nil {
Expand All @@ -194,6 +207,9 @@ func Setup(ctx context.Context, indexer client.FieldIndexer) error {
if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadAdmissionCheckKey, IndexWorkloadAdmissionCheck); err != nil {
return fmt.Errorf("setting index on admissionCheck for Workload: %w", err)
}
if err := indexer.IndexField(ctx, &kueue.Workload{}, WorkloadPriorityClassKey, IndexWorkloadPriorityClass); err != nil {
return fmt.Errorf("setting index on priorityClass for Workload: %w", err)
}
if err := indexer.IndexField(ctx, &kueue.LocalQueue{}, QueueClusterQueueKey, IndexQueueClusterQueue); err != nil {
return fmt.Errorf("setting index on clusterQueue for localQueue: %w", err)
}
Expand Down
19 changes: 14 additions & 5 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"sigs.k8s.io/kueue/pkg/util/admissioncheck"
afs "sigs.k8s.io/kueue/pkg/util/admissionfairsharing"
clientutil "sigs.k8s.io/kueue/pkg/util/client"
"sigs.k8s.io/kueue/pkg/util/priority"
qutil "sigs.k8s.io/kueue/pkg/util/queue"
"sigs.k8s.io/kueue/pkg/util/resource"
utilslices "sigs.k8s.io/kueue/pkg/util/slices"
Expand Down Expand Up @@ -947,7 +948,7 @@ func (r *WorkloadReconciler) Update(e event.TypedUpdateEvent[*kueue.Workload]) b
})
case prevStatus == workload.StatusAdmitted && status == workload.StatusAdmitted && !equality.Semantic.DeepEqual(e.ObjectOld.Status.ReclaimablePods, e.ObjectNew.Status.ReclaimablePods),
features.Enabled(features.ElasticJobsViaWorkloadSlices) && workloadslicing.ScaledDown(workload.ExtractPodSetCountsFromWorkload(e.ObjectOld), workload.ExtractPodSetCountsFromWorkload(e.ObjectNew)),
workloadPriorityClassChanged(e.ObjectOld, e.ObjectNew):
workloadPriorityChanged(e.ObjectOld, e.ObjectNew):
// trigger the move of associated inadmissibleWorkloads, if there are any.
r.queues.QueueAssociatedInadmissibleWorkloadsAfter(ctx, e.ObjectNew, func() {
// Update the workload from cache while holding the queues lock
Expand All @@ -969,10 +970,18 @@ func (r *WorkloadReconciler) Update(e event.TypedUpdateEvent[*kueue.Workload]) b
return true
}

func workloadPriorityClassChanged(old, new *kueue.Workload) bool {
return workload.IsWorkloadPriorityClass(old) && workload.IsWorkloadPriorityClass(new) &&
workload.PriorityClassName(old) != "" && workload.PriorityClassName(new) != "" &&
workload.PriorityClassName(old) != workload.PriorityClassName(new)
// workloadPriorityChanged reports whether an update altered the priority of
// a Workload that uses a WorkloadPriorityClass both before and after the
// update. A change is either a switch between two non-empty priority class
// names or a different value returned by priority.Priority.
func workloadPriorityChanged(old, new *kueue.Workload) bool {
	// Updates to Pod Priority are not supported, so both versions must
	// reference a WorkloadPriorityClass.
	if !(workload.IsWorkloadPriorityClass(old) && workload.IsWorkloadPriorityClass(new)) {
		return false
	}

	// The workload was pointed at a different priority class.
	oldName, newName := workload.PriorityClassName(old), workload.PriorityClassName(new)
	if oldName != "" && newName != "" && oldName != newName {
		return true
	}

	// Same class reference, so compare the priority values themselves
	// (covers WorkloadPriorityClass value updates).
	return priority.Priority(old) != priority.Priority(new)
}

func (r *WorkloadReconciler) Generic(e event.TypedGenericEvent[*kueue.Workload]) bool {
Expand Down
159 changes: 159 additions & 0 deletions pkg/controller/core/workloadpriorityclass_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package core

import (
"context"
"errors"

"github.com/go-logr/logr"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"

config "sigs.k8s.io/kueue/apis/config/v1beta2"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta2"
"sigs.k8s.io/kueue/pkg/controller/core/indexer"
)

// WorkloadPriorityClassReconciler reconciles a WorkloadPriorityClass object.
// Its Reconcile method writes the class value into spec.priority of the
// Workloads listed through the WorkloadPriorityClassKey index, and its
// Create/Update/Delete/Generic methods act as the event predicate for the
// watch set up in SetupWithManager.
type WorkloadPriorityClassReconciler struct {
// log is the base logger used by the event predicate methods.
log logr.Logger
// client is used to get WorkloadPriorityClasses and to list and update Workloads.
client client.Client
}

// Compile-time checks that WorkloadPriorityClassReconciler satisfies both the
// reconciler interface and the typed predicate interface for
// WorkloadPriorityClass events.
var (
	_ reconcile.Reconciler                                   = (*WorkloadPriorityClassReconciler)(nil)
	_ predicate.TypedPredicate[*kueue.WorkloadPriorityClass] = (*WorkloadPriorityClassReconciler)(nil)
)

// NewWorkloadPriorityClassReconciler builds a WorkloadPriorityClassReconciler
// that uses the given client and a logger named
// "workloadpriorityclass-reconciler".
func NewWorkloadPriorityClassReconciler(client client.Client) *WorkloadPriorityClassReconciler {
	rec := new(WorkloadPriorityClassReconciler)
	rec.log = ctrl.Log.WithName("workloadpriorityclass-reconciler")
	rec.client = client
	return rec
}

// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloadpriorityclasses,verbs=get;list;watch
// +kubebuilder:rbac:groups=kueue.x-k8s.io,resources=workloads,verbs=get;list;update

// Reconcile copies the value of the requested WorkloadPriorityClass into
// spec.priority of every Workload found through the WorkloadPriorityClassKey
// index for that class name.
//
// A WorkloadPriorityClass that no longer exists is ignored. Workloads whose
// priority already equals the class value are left untouched. Update failures
// other than NotFound are logged, collected, and returned joined together so
// the request is retried.
func (r *WorkloadPriorityClassReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	wpc := &kueue.WorkloadPriorityClass{}
	if err := r.client.Get(ctx, req.NamespacedName, wpc); err != nil {
		// Not-found errors are dropped: there is nothing to propagate.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	log := ctrl.LoggerFrom(ctx).WithValues("workloadPriorityClass", klog.KObj(wpc))
	log.V(2).Info("Reconcile WorkloadPriorityClass")

	// Select the workloads referencing this class by name via the field index.
	wlList := &kueue.WorkloadList{}
	byPriorityClass := client.MatchingFields{indexer.WorkloadPriorityClassKey: wpc.Name}
	if err := r.client.List(ctx, wlList, byPriorityClass); err != nil {
		log.Error(err, "Failed to list workloads for WorkloadPriorityClass")
		return ctrl.Result{}, err
	}
	if len(wlList.Items) == 0 {
		log.V(2).Info("No workloads using this WorkloadPriorityClass")
		return ctrl.Result{}, nil
	}

	var failures []error
	for idx := range wlList.Items {
		wl := &wlList.Items[idx]
		wlLog := log.WithValues("workload", klog.KObj(wl))

		// Nothing to write when the stored priority already matches.
		if current := wl.Spec.Priority; current != nil && *current == wpc.Value {
			wlLog.V(3).Info("Workload priority already up to date")
			continue
		}

		wl.Spec.Priority = ptr.To(wpc.Value)
		err := r.client.Update(ctx, wl)
		switch {
		case err == nil:
			wlLog.V(2).Info("Updated workload priority", "newPriority", wpc.Value)
		case apierrors.IsNotFound(err):
			// NotFound on update is skipped without being reported.
		default:
			wlLog.Error(err, "Failed to update workload priority")
			failures = append(failures, err)
		}
	}
	return ctrl.Result{}, errors.Join(failures...)
}

// Create is the predicate for WorkloadPriorityClass create events. It always
// returns true, which covers a WorkloadPriorityClass that was re-created with
// a different priority while Workloads still reference it.
func (r *WorkloadPriorityClassReconciler) Create(e event.TypedCreateEvent[*kueue.WorkloadPriorityClass]) bool {
	r.log.
		WithValues("workloadPriorityClass", klog.KObj(e.Object)).
		V(2).
		Info("WorkloadPriorityClass create event")
	return true
}

// Delete is the predicate for WorkloadPriorityClass delete events. It always
// returns false.
func (r *WorkloadPriorityClassReconciler) Delete(_ event.TypedDeleteEvent[*kueue.WorkloadPriorityClass]) bool {
	return false
}

// Update is the predicate for WorkloadPriorityClass update events. It returns
// true only when the Value field differs between the old and new objects.
func (r *WorkloadPriorityClassReconciler) Update(e event.TypedUpdateEvent[*kueue.WorkloadPriorityClass]) bool {
	log := r.log.WithValues("workloadPriorityClass", klog.KObj(e.ObjectNew))
	log.V(2).Info("WorkloadPriorityClass update event")

	oldValue, newValue := e.ObjectOld.Value, e.ObjectNew.Value
	changed := oldValue != newValue
	if changed {
		log.V(2).Info("Priority value changed, triggering reconciliation", "oldValue", oldValue, "newValue", newValue)
	} else {
		// Only a change of the priority value warrants a reconcile.
		log.V(3).Info("Priority value unchanged, skipping reconciliation")
	}
	return changed
}

// Generic is the predicate for WorkloadPriorityClass generic events. It always
// returns false.
func (r *WorkloadPriorityClassReconciler) Generic(_ event.TypedGenericEvent[*kueue.WorkloadPriorityClass]) bool {
	return false
}

// SetupWithManager registers the controller with the Manager. It watches
// WorkloadPriorityClass objects, enqueues a request for each object whose
// event passes the reconciler's own predicate methods, and takes the
// concurrency limit from the manager's GroupKindConcurrency entry for
// WorkloadPriorityClass.
func (r *WorkloadPriorityClassReconciler) SetupWithManager(mgr ctrl.Manager, cfg *config.Configuration) error {
	groupKind := kueue.GroupVersion.WithKind("WorkloadPriorityClass").GroupKind().String()
	opts := controller.Options{
		NeedLeaderElection:      ptr.To(false),
		MaxConcurrentReconciles: mgr.GetControllerOptions().GroupKindConcurrency[groupKind],
	}

	return builder.TypedControllerManagedBy[reconcile.Request](mgr).
		Named("workloadpriorityclass_controller").
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&kueue.WorkloadPriorityClass{},
			&handler.TypedEnqueueRequestForObject[*kueue.WorkloadPriorityClass]{},
			r,
		)).
		WithOptions(opts).
		Complete(WithLeadingManager(mgr, r, &kueue.WorkloadPriorityClass{}, cfg))
}
Loading