Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/cache/queue/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,16 +815,22 @@ func (m *Manager) DeleteSecondPassWithoutLock(wl *kueue.Workload) {
// QueueSecondPassIfNeeded queues for the second pass of scheduling with exponential
// delay.
func (m *Manager) QueueSecondPassIfNeeded(ctx context.Context, w *kueue.Workload, iteration int) bool {
log := ctrl.LoggerFrom(ctx)
Copy link
Copy Markdown
Contributor

@mimowo mimowo Jan 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add some unit test for that changes in, probably in TestQueueSecondPassIfNeeded?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I’ll add unit coverage for this case and update the PR.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log := ctrl.LoggerFrom(ctx)
log := ctrl.LoggerFrom(ctx)
wlKey := workload.Key(w)

wlKey is the same during this function.

if workload.NeedsSecondPass(w) {
iteration++
delay := m.secondPassQueue.nextDelay(iteration)
log := ctrl.LoggerFrom(ctx)
log.V(3).Info("Workload pre-queued for second pass (with backoff)", "workload", workload.Key(w), "delay", delay)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.V(3).Info("Workload pre-queued for second pass (with backoff)", "workload", workload.Key(w), "delay", delay)
log.V(3).Info("Workload pre-queued for second pass (with backoff)", "workload", wlKey, "delay", delay)

m.secondPassQueue.prequeue(w)
m.clock.AfterFunc(delay, func() {
m.queueSecondPass(ctx, w, iteration)
})
return true
} else if iteration > 0 {
// Remove the workload from the second-pass queue only after at least one
// retry iteration, to avoid canceling the initial backoff window.
// See #8357.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// See #8357.
// See https://github.com/kubernetes-sigs/kueue/issues/8357.

Could you use a link for visibility?

log.V(3).Info("Workload removed from second pass queue", "workload", workload.Key(w))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
log.V(3).Info("Workload removed from second pass queue", "workload", workload.Key(w))
log.V(3).Info("Workload removed from second pass queue", "workload", wlKey)

m.secondPassQueue.deleteByKey(workload.Key(w))
}
return false
}
Expand Down
47 changes: 33 additions & 14 deletions pkg/cache/queue/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,6 +1386,7 @@ func TestQueueSecondPassIfNeeded(t *testing.T) {
RequiredTopologyRequest(corev1.LabelHostname).
Request(corev1.ResourceCPU, "1").
Obj())

baseWorkloadNeedingSecondPass := baseWorkloadBuilder.Clone().
ReserveQuotaAt(
utiltestingapi.MakeAdmission("tas-main").
Expand All @@ -1395,22 +1396,26 @@ func TestQueueSecondPassIfNeeded(t *testing.T) {
DelayedTopologyRequest(kueue.DelayedTopologyRequestStatePending).
Obj(),
).
Obj(), now,
Obj(),
now,
).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "prov-check",
State: kueue.CheckStateReady,
})

baseWorkloadNotNeedingSecondPass := baseWorkloadBuilder.Clone()

cases := map[string]struct {
workloads []*kueue.Workload
deleted sets.Set[workload.Reference]
update func(*kueue.Workload)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
update func(*kueue.Workload)
updateWorkload *kueue.Workload

Such dynamic modification while UT should be avoided as much as possible.
Those anonymous functions bring us surprises, maintenance PoV in the long-running future.

Copy link
Copy Markdown
Contributor

@mimowo mimowo Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, current implementation sets status.admission=nil to make the workload not needing the second pass, but I'm not sure we have a helper for that. We can alternatively Evict the workload for example (eg. using EvictedAt), which would also be more realistic case, because workload is first evicted, and only then the admission is removed.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened the potentially related issue #8633

passTime time.Duration
wantReady sets.Set[workload.Reference]
}{
"single queued workload checked immediately": {
workloads: []*kueue.Workload{baseWorkloadNeedingSecondPass.Obj()},
workloads: []*kueue.Workload{
baseWorkloadNeedingSecondPass.Obj(),
},
},
"single queued workload checked after 1s": {
workloads: []*kueue.Workload{
Expand All @@ -1420,43 +1425,57 @@ func TestQueueSecondPassIfNeeded(t *testing.T) {
passTime: time.Second,
wantReady: sets.New(workload.Key(baseWorkloadNeedingSecondPass.Obj())),
},
"single queued workload deleted in the meanwhile": {
"workload stops needing second pass after being queued": {
workloads: []*kueue.Workload{
baseWorkloadNeedingSecondPass.DeepCopy(),
baseWorkloadNotNeedingSecondPass.DeepCopy(),
},
deleted: sets.New(workload.Key(baseWorkloadNeedingSecondPass.Obj())),
passTime: time.Second,
update: func(wl *kueue.Workload) {
wl.Status.Admission = nil
},
passTime: time.Second,
wantReady: nil,
},
"two queued workloads, one deleted in the meanwhile": {
"two queued workloads, one updated to no longer need second pass": {
workloads: []*kueue.Workload{
baseWorkloadNeedingSecondPass.Clone().Name("first").Obj(),
baseWorkloadNeedingSecondPass.Clone().Name("second").Obj(),
},
deleted: sets.New(workload.Key(baseWorkloadNeedingSecondPass.Obj())),
update: func(wl *kueue.Workload) {
wl.Status.Admission = nil
},
passTime: time.Second,
wantReady: sets.New(workload.NewReference("default", "second")),
},
}

for name, tc := range cases {
t.Run(name, func(t *testing.T) {
ctx, _ := utiltesting.ContextWithLog(t)

fakeClock := testingclock.NewFakeClock(now)
opts := []Option{
manager := NewManager(
utiltesting.NewFakeClient(),
nil,
WithClock(fakeClock),
}
manager := NewManager(utiltesting.NewFakeClient(), nil, opts...)
)

for _, wl := range tc.workloads {
manager.QueueSecondPassIfNeeded(ctx, wl, 0)
}
for _, wl := range tc.deleted.UnsortedList() {
manager.secondPassQueue.deleteByKey(wl)

if tc.update != nil {
wl := tc.workloads[0]
tc.update(wl)
manager.QueueSecondPassIfNeeded(ctx, wl, 1)
}
Comment on lines +1466 to 1470
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if tc.update != nil {
wl := tc.workloads[0]
tc.update(wl)
manager.QueueSecondPassIfNeeded(ctx, wl, 1)
}
if tc.updateWorkload != nil {
manager.QueueSecondPassIfNeeded(ctx, tc.updateWorkload, 1)
}


fakeClock.Step(tc.passTime)

gotReady := sets.New[workload.Reference]()
for _, wlInfo := range manager.secondPassQueue.takeAllReady() {
gotReady.Insert(workload.Key(wlInfo.Obj))
}

if diff := cmp.Diff(tc.wantReady, gotReady); diff != "" {
t.Errorf("Unexpected ready workloads returned (-want,+got):\n%s", diff)
}
Expand Down