Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions apis/config/v1beta2/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ type MultiKueue struct {
// ClusterProfile defines configuration for using the ClusterProfile API.
// +optional
ClusterProfile *ClusterProfile `json:"clusterProfile,omitempty"`

// RequireAllAdmissionChecksReady determines whether all admission checks must be
// in the Ready state for a remote workload to be considered when selecting the
// remote cluster that reserved quota first. Defaults to false.
// +optional
RequireAllAdmissionChecksReady *bool `json:"requireAllAdmissionChecksReady,omitempty"`
}

// MultiKueueExternalFramework defines a framework that is not built-in.
Expand Down
1 change: 1 addition & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,7 @@ func setupControllers(ctx context.Context, mgr ctrl.Manager, cCache *schdcache.C
multikueue.WithDispatcherName(ptr.Deref(cfg.MultiKueue.DispatcherName, configapi.MultiKueueDispatcherModeAllAtOnce)),
multikueue.WithClusterProfiles(cfg.MultiKueue.ClusterProfile),
multikueue.WithRoleTracker(roleTracker),
multikueue.WithRequireAllAdmissionChecksReady(ptr.Deref(cfg.MultiKueue.RequireAllAdmissionChecksReady, false)),
); err != nil {
return fmt.Errorf("could not setup MultiKueue controller: %w", err)
}
Expand Down
40 changes: 25 additions & 15 deletions pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,15 @@ const (
)

// SetupOptions holds the configurable parameters for the MultiKueue
// controllers, populated via the SetupOption functional options.
//
// NOTE(review): the original span interleaved the pre- and post-change field
// sets of the diff, declaring every field twice; only the post-change set is
// kept here.
type SetupOptions struct {
	gcInterval                     time.Duration                             // interval between remote garbage-collection passes
	origin                         string                                    // origin label identifying objects managed by this manager
	workerLostTimeout              time.Duration                             // grace period before a lost worker cluster is acted upon
	eventsBatchPeriod              time.Duration                             // batching window applied to reconcile events
	adapters                       map[string]jobframework.MultiKueueAdapter // job-framework adapters, keyed by framework name
	dispatcherName                 string                                    // dispatcher mode; defaults to AllAtOnce
	clusterProfileConfig           *configapi.ClusterProfile                 // optional ClusterProfile API configuration
	roleTracker                    *roletracker.RoleTracker                  // tracks manager/worker roles
	requireAllAdmissionChecksReady bool                                      // when true, only remotes with all admission checks Ready are quota-reservation candidates
}

type SetupOption func(o *SetupOptions)
Expand Down Expand Up @@ -108,14 +109,22 @@ func WithRoleTracker(tracker *roletracker.RoleTracker) SetupOption {
}
}

// WithRequireAllAdmissionChecksReady returns a SetupOption controlling whether
// every admission check on a remote workload must be Ready before that remote
// is considered when selecting the cluster that reserved quota first.
func WithRequireAllAdmissionChecksReady(ready bool) SetupOption {
	return func(opts *SetupOptions) {
		opts.requireAllAdmissionChecksReady = ready
	}
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaultWorkerLostTimeout,
eventsBatchPeriod: constants.UpdatesBatchPeriod,
adapters: make(map[string]jobframework.MultiKueueAdapter),
dispatcherName: configapi.MultiKueueDispatcherModeAllAtOnce,
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaultWorkerLostTimeout,
eventsBatchPeriod: constants.UpdatesBatchPeriod,
adapters: make(map[string]jobframework.MultiKueueAdapter),
dispatcherName: configapi.MultiKueueDispatcherModeAllAtOnce,
requireAllAdmissionChecksReady: false,
}

for _, o := range opts {
Expand Down Expand Up @@ -161,6 +170,7 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, mgr.GetEventRecorderFor(constants.WorkloadControllerName),
options.workerLostTimeout, options.eventsBatchPeriod, options.adapters, options.dispatcherName, options.roleTracker)
options.workerLostTimeout, options.eventsBatchPeriod, options.adapters, options.dispatcherName, options.roleTracker,
options.requireAllAdmissionChecksReady)
return wlRec.setupWithManager(mgr)
}
85 changes: 56 additions & 29 deletions pkg/controller/admissionchecks/multikueue/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,19 @@ var (
)

// wlReconciler reconciles local workloads with their copies on MultiKueue
// worker clusters.
//
// NOTE(review): the original span interleaved the pre- and post-change field
// sets of the diff, declaring every field twice; only the post-change set is
// kept here.
type wlReconciler struct {
	client            client.Client
	helper            *admissioncheck.MultiKueueStoreHelper
	clusters          *clustersReconciler
	origin            string
	workerLostTimeout time.Duration
	deletedWlCache    *utilmaps.SyncMap[string, *kueue.Workload]
	eventsBatchPeriod time.Duration
	adapters          map[string]jobframework.MultiKueueAdapter
	recorder          record.EventRecorder
	clock             clock.Clock
	dispatcherName    string
	roleTracker       *roletracker.RoleTracker
	// requireAllAdmissionChecksReady restricts quota-reservation candidates to
	// remotes whose admission checks are all Ready (see reconcileGroup step 6).
	requireAllAdmissionChecksReady bool
}

var _ reconcile.Reconciler = (*wlReconciler)(nil)
Expand Down Expand Up @@ -107,12 +108,12 @@ func (g *wlGroup) IsElasticWorkload() bool {

// bestMatchByCondition returns condition if there is a workload with a specified condition type,
// the string identifies the remote cluster.
func (g *wlGroup) bestMatchByCondition(conditionType string) (*metav1.Condition, string) {
func bestMatchByCondition(remotes map[string]*kueue.Workload, conditionType string) (*metav1.Condition, string) {
var (
bestMatchCond *metav1.Condition
bestMatchRemote string
)
for remote, wl := range g.remotes {
for remote, wl := range remotes {
if wl != nil {
cond := apimeta.FindStatusCondition(wl.Status.Conditions, conditionType)
if cond != nil && cond.Status == metav1.ConditionTrue && (bestMatchCond == nil || cond.LastTransitionTime.Before(&bestMatchCond.LastTransitionTime)) {
Expand All @@ -124,6 +125,26 @@ func (g *wlGroup) bestMatchByCondition(conditionType string) (*metav1.Condition,
return bestMatchCond, bestMatchRemote
}

// filterByAdmissionChecks returns the subset of the group's remote workloads
// whose admission checks are all in the Ready state. Nil remotes are skipped;
// a remote with no admission checks at all is considered eligible.
func (g *wlGroup) filterByAdmissionChecks() map[string]*kueue.Workload {
	ready := make(map[string]*kueue.Workload)
remotes:
	for name, remoteWl := range g.remotes {
		if remoteWl == nil {
			continue
		}
		for _, check := range remoteWl.Status.AdmissionChecks {
			if check.State != kueue.CheckStateReady {
				continue remotes
			}
		}
		ready[name] = remoteWl
	}
	return ready
}

func (g *wlGroup) RemoveRemoteObjects(ctx context.Context, cluster string) error {
remWl := g.remotes[cluster]
if remWl == nil {
Expand Down Expand Up @@ -331,7 +352,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

// 3. Finish the local workload when the remote workload is finished.
if remoteFinishedCond, remote := group.bestMatchByCondition(kueue.WorkloadFinished); remoteFinishedCond != nil {
if remoteFinishedCond, remote := bestMatchByCondition(group.remotes, kueue.WorkloadFinished); remoteFinishedCond != nil {
// NOTE: we can have a race condition setting the wl status here, and it being updated by the job controller,
// it should not be problematic, but the "From remote xxxx:" could be lost ....

Expand All @@ -350,7 +371,7 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

// 4. Handle workload evicted on manager cluster
remoteEvictCond, evictedRemote := group.bestMatchByCondition(kueue.WorkloadEvicted)
remoteEvictCond, evictedRemote := bestMatchByCondition(group.remotes, kueue.WorkloadEvicted)
if remoteEvictCond != nil && remoteEvictCond.Reason == workload.ReasonWithCause(kueue.WorkloadDeactivated, kueue.WorkloadEvictedOnManagerCluster) {
remoteCl := group.remoteClients[evictedRemote].client
remoteWl := group.remotes[evictedRemote]
Expand Down Expand Up @@ -390,7 +411,11 @@ func (w *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco
}

// 6. Get the first reserving
if remoteQuotaReservedCond, reservingRemote := group.bestMatchByCondition(kueue.WorkloadQuotaReserved); remoteQuotaReservedCond != nil {
eligibleRemotes := group.remotes
if w.requireAllAdmissionChecksReady {
eligibleRemotes = group.filterByAdmissionChecks()
}
if remoteQuotaReservedCond, reservingRemote := bestMatchByCondition(eligibleRemotes, kueue.WorkloadQuotaReserved); remoteQuotaReservedCond != nil {
// remove the non-reserving worker workloads
for rem, remWl := range group.remotes {
if remWl != nil && rem != reservingRemote {
Expand Down Expand Up @@ -522,21 +547,23 @@ func (w *wlReconciler) Generic(_ event.GenericEvent) bool {
func newWlReconciler(c client.Client, helper *admissioncheck.MultiKueueStoreHelper, cRec *clustersReconciler, origin string,
recorder record.EventRecorder, workerLostTimeout, eventsBatchPeriod time.Duration,
adapters map[string]jobframework.MultiKueueAdapter, dispatcherName string, roleTracker *roletracker.RoleTracker,
requireAllAdmissionChecksReady bool,
options ...Option,
) *wlReconciler {
r := &wlReconciler{
client: c,
helper: helper,
clusters: cRec,
origin: origin,
workerLostTimeout: workerLostTimeout,
deletedWlCache: utilmaps.NewSyncMap[string, *kueue.Workload](0),
eventsBatchPeriod: eventsBatchPeriod,
adapters: adapters,
recorder: recorder,
clock: realClock,
dispatcherName: dispatcherName,
roleTracker: roleTracker,
client: c,
helper: helper,
clusters: cRec,
origin: origin,
workerLostTimeout: workerLostTimeout,
deletedWlCache: utilmaps.NewSyncMap[string, *kueue.Workload](0),
eventsBatchPeriod: eventsBatchPeriod,
adapters: adapters,
recorder: recorder,
clock: realClock,
dispatcherName: dispatcherName,
roleTracker: roleTracker,
requireAllAdmissionChecksReady: requireAllAdmissionChecksReady,
}
for _, option := range options {
option(r)
Expand Down
Loading