From ee54846191692b722833ef625d7706f14b21df63 Mon Sep 17 00:00:00 2001 From: acmore Date: Tue, 14 Apr 2026 07:34:07 +0800 Subject: [PATCH 1/3] feat(exec): colorize and align pod name prefixes in multi-pod output Use bracketed prefixes [pod-name] instead of pod-name: for clearer visual separation. Pad all prefixes to the same width so output columns align. When stdout is a terminal, each pod gets a distinct ANSI color from a rotating palette for easy scanning. Made-with: Cursor --- internal/cli/pdsh.go | 20 +++++++++----- internal/cli/pdsh_fanout_test.go | 14 +++++----- internal/cli/pdsh_test.go | 8 +++--- internal/cli/pdsh_writer.go | 45 +++++++++++++++++++++++++++++-- internal/cli/pdsh_writer_test.go | 46 +++++++++++++++++++++++++++----- scripts/e2e_kind_pytorchjob.sh | 4 +-- 6 files changed, 109 insertions(+), 28 deletions(-) diff --git a/internal/cli/pdsh.go b/internal/cli/pdsh.go index c03b980..8c57673 100644 --- a/internal/cli/pdsh.go +++ b/internal/cli/pdsh.go @@ -215,6 +215,8 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri podNames[i] = p.Name } shortNames := shortPodNames(podNames) + colorEnabled := isInteractiveWriter(stdout) + displayPrefixes := formatPodPrefixes(shortNames, colorEnabled) var writeMu sync.Mutex noPrefixOut := &lockedWriter{w: stdout} @@ -225,7 +227,7 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri var wg sync.WaitGroup for i, pod := range pods { wg.Add(1) - go func(pod kube.PodSummary, shortName string) { + go func(pod kube.PodSummary, shortName, displayPrefix string) { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() @@ -242,8 +244,8 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri podStdout = noPrefixOut podStderr = noPrefixErr } else { - pw := newPrefixedWriter(shortName, stdout, &writeMu) - pe := newPrefixedWriter(shortName, stderr, &writeMu) + pw := newPrefixedWriter(displayPrefix, stdout, &writeMu) + pe := newPrefixedWriter(displayPrefix, stderr, &writeMu) defer pw.Flush() defer pe.Flush() podStdout = pw @@ -264,7 +266,7 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri err := connect.RunOnContainer(execCtx, client, namespace, pod.Name, container, command, false, nil, podStdout, podStderr) results <- podExecResult{pod: pod.Name, err: err} - }(pod, shortNames[i]) + }(pod, shortNames[i], displayPrefixes[i]) } go func() { @@ -305,6 +307,8 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str podNames[i] = p.Name } shortNames := shortPodNames(podNames) + colorEnabled := isInteractiveWriter(out) + displayPrefixes := formatPodPrefixes(shortNames, colorEnabled) command := detachCommand(cmdStr) results := make(chan podExecResult, len(pods)) @@ -327,19 +331,21 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str close(results) }() + displayMap := make(map[string]string, len(podNames)) nameMap := make(map[string]string, len(podNames)) for i, n := range podNames { + displayMap[n] = displayPrefixes[i] nameMap[n] = shortNames[i] } var failCount int for r := range results { - short := nameMap[r.pod] + prefix := displayMap[r.pod] if r.err != nil { - fmt.Fprintf(out, "%s: error: %v\n", short, r.err) + fmt.Fprintf(out, "%s error: %v\n", prefix, r.err) failCount++ } else { - fmt.Fprintf(out, "%s: detached\n", short) + fmt.Fprintf(out, "%s detached\n", prefix) } } if failCount > 0 { diff --git a/internal/cli/pdsh_fanout_test.go b/internal/cli/pdsh_fanout_test.go index e8fea75..97023ab 100644 --- a/internal/cli/pdsh_fanout_test.go +++ b/internal/cli/pdsh_fanout_test.go @@ -223,10 +223,10 @@ func TestMultiExecPipelineAllPodsWithPrefixedOutput(t *testing.T) { // Step 4: verify prefixed output output := stdout.String() - if !strings.Contains(output, "worker-0: gpu0: NVIDIA A100") { + if !strings.Contains(output, "[worker-0] gpu0: NVIDIA A100") { t.Fatalf("expected prefixed output for worker-0, got %q", output) } - if !strings.Contains(output, "worker-1: gpu1: NVIDIA H100") { + if !strings.Contains(output, "[worker-1] gpu1: NVIDIA H100") { t.Fatalf("expected prefixed output for worker-1, got %q", output) } } @@ -327,13 +327,13 @@ func TestMultiExecPipelinePartialFailureWithDetach(t *testing.T) { } output := stdout.String() - if !strings.Contains(output, "worker-0: detached") { + if !strings.Contains(output, "worker-0] detached") { t.Fatalf("expected worker-0 detached, got %q", output) } - if !strings.Contains(output, "worker-1: error: container not ready") { + if !strings.Contains(output, "worker-1] error: container not ready") { t.Fatalf("expected worker-1 error, got %q", output) } - if !strings.Contains(output, "worker-2: detached") { + if !strings.Contains(output, "worker-2] detached") { t.Fatalf("expected worker-2 detached, got %q", output) } } @@ -361,7 +361,7 @@ func TestMultiExecPipelineNoPrefixModeMultiplePods(t *testing.T) { output := stdout.String() // In no-prefix mode, output should NOT contain the pod name prefix. - if strings.Contains(output, "worker-0:") || strings.Contains(output, "worker-1:") { + if strings.Contains(output, "[worker-0]") || strings.Contains(output, "[worker-1]") { t.Fatalf("expected no prefix in output, got %q", output) } if !strings.Contains(output, "alpha") || !strings.Contains(output, "bravo") { @@ -526,7 +526,7 @@ func TestMultiExecFullPipelineRoleFilterFanoutLogDir(t *testing.T) { // Verify prefixed output present. output := stdout.String() - if !strings.Contains(output, "worker-0:") || !strings.Contains(output, "worker-1:") || !strings.Contains(output, "worker-3:") { + if !strings.Contains(output, "[worker-0]") || !strings.Contains(output, "[worker-1]") || !strings.Contains(output, "[worker-3]") { t.Fatalf("expected prefixed output for workers 0,1,3, got %q", output) } } diff --git a/internal/cli/pdsh_test.go b/internal/cli/pdsh_test.go index f98a287..b62e7ec 100644 --- a/internal/cli/pdsh_test.go +++ b/internal/cli/pdsh_test.go @@ -172,7 +172,7 @@ func TestRunDetachExec(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if !strings.Contains(stdout.String(), "worker-0: detached") || !strings.Contains(stdout.String(), "worker-1: detached") { + if !strings.Contains(stdout.String(), "worker-0] detached") || !strings.Contains(stdout.String(), "worker-1] detached") { t.Fatalf("expected detach confirmation, got %q", stdout.String()) } } @@ -193,10 +193,10 @@ func TestRunDetachExecWithError(t *testing.T) { if err == nil { t.Fatal("expected error for partial failure") } - if !strings.Contains(stdout.String(), "worker-0: detached") { + if !strings.Contains(stdout.String(), "worker-0] detached") { t.Fatalf("expected detach for worker-0, got %q", stdout.String()) } - if !strings.Contains(stdout.String(), "worker-1: error:") { + if !strings.Contains(stdout.String(), "worker-1] error:") { t.Fatalf("expected error for worker-1, got %q", stdout.String()) } } @@ -218,7 +218,7 @@ func TestRunMultiExecSuccess(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if !strings.Contains(stdout.String(), "worker-0: ok") || !strings.Contains(stdout.String(), "worker-1: ok") { + if !strings.Contains(stdout.String(), "worker-0] ok") || !strings.Contains(stdout.String(), "worker-1] ok") { t.Fatalf("expected prefixed output, got %q", stdout.String()) } } diff --git a/internal/cli/pdsh_writer.go b/internal/cli/pdsh_writer.go index ce6e0af..c3b4e20 100644 --- a/internal/cli/pdsh_writer.go +++ b/internal/cli/pdsh_writer.go @@ -57,6 +57,47 @@ func shortPodNames(names []string) []string { return out } +func maxPrefixWidth(names []string) int { + w := 0 + for _, n := range names { + if len(n) > w { + w = len(n) + } + } + return w +} + +var podPrefixColors = []string{ + "\033[36m", // cyan + "\033[33m", // yellow + "\033[32m", // green + "\033[35m", // magenta + "\033[34m", // blue + "\033[91m", // bright red + "\033[96m", // bright cyan + "\033[93m", // bright yellow +} + +func podPrefixColor(index int) string { + return podPrefixColors[index%len(podPrefixColors)] +} + +const prefixReset = "\033[0m" + +func formatPodPrefixes(shortNames []string, color bool) []string { + width := maxPrefixWidth(shortNames) + out := make([]string, len(shortNames)) + for i, name := range shortNames { + padded := fmt.Sprintf("%-*s", width, name) + if color { + out[i] = podPrefixColor(i) + "[" + padded + "]" + prefixReset + } else { + out[i] = "[" + padded + "]" + } + } + return out +} + // prefixedWriter is a thread-safe io.Writer that buffers input and emits // complete lines prefixed with the pod short name. Partial lines are held // until a newline arrives or Flush is called. @@ -81,7 +122,7 @@ func (w *prefixedWriter) Write(p []byte) (int, error) { break } w.mu.Lock() - fmt.Fprintf(w.dest, "%s: %s", w.prefix, line) + fmt.Fprintf(w.dest, "%s %s", w.prefix, line) w.mu.Unlock() } return len(p), nil @@ -93,7 +134,7 @@ func (w *prefixedWriter) Flush() { return } w.mu.Lock() - fmt.Fprintf(w.dest, "%s: %s\n", w.prefix, w.buf.String()) + fmt.Fprintf(w.dest, "%s %s\n", w.prefix, w.buf.String()) w.mu.Unlock() w.buf.Reset() } diff --git a/internal/cli/pdsh_writer_test.go b/internal/cli/pdsh_writer_test.go index 8ae90be..688e79d 100644 --- a/internal/cli/pdsh_writer_test.go +++ b/internal/cli/pdsh_writer_test.go @@ -3,6 +3,7 @@ package cli import ( "bytes" "fmt" + "strings" "sync" "testing" ) @@ -59,14 +60,47 @@ func TestShortPodNames(t *testing.T) { } } +func TestMaxPrefixWidth(t *testing.T) { + if got := maxPrefixWidth([]string{"ab", "abcde", "abc"}); got != 5 { + t.Fatalf("expected 5, got %d", got) + } + if got := maxPrefixWidth(nil); got != 0 { + t.Fatalf("expected 0 for nil, got %d", got) + } +} + +func TestFormatPodPrefixesNoColor(t *testing.T) { + names := []string{"worker-0", "worker-10", "master-0"} + got := formatPodPrefixes(names, false) + // Brackets + padded to 9 chars (len("worker-10")) + brackets = 11. + if got[0] != "[worker-0 ]" || got[1] != "[worker-10]" || got[2] != "[master-0 ]" { + t.Fatalf("unexpected prefixes: %v", got) + } +} + +func TestFormatPodPrefixesWithColor(t *testing.T) { + names := []string{"a", "bb"} + got := formatPodPrefixes(names, true) + // Each should contain ANSI color code and reset. + for i, p := range got { + if !strings.Contains(p, "\033[") || !strings.Contains(p, prefixReset) { + t.Fatalf("index %d: expected color codes in %q", i, p) + } + } + // Different pods should get different colors. + if got[0] == got[1] { + t.Fatalf("expected different colors for different pods") + } +} + func TestPrefixedWriterCompleteLine(t *testing.T) { var buf bytes.Buffer var mu sync.Mutex w := newPrefixedWriter("w0", &buf, &mu) fmt.Fprint(w, "hello world\n") w.Flush() - if got := buf.String(); got != "w0: hello world\n" { - t.Fatalf("expected %q, got %q", "w0: hello world\n", got) + if got := buf.String(); got != "w0 hello world\n" { + t.Fatalf("expected %q, got %q", "w0 hello world\n", got) } } @@ -79,8 +113,8 @@ func TestPrefixedWriterPartialLines(t *testing.T) { t.Fatalf("expected no output before newline, got %q", buf.String()) } fmt.Fprint(w, "lo\n") - if got := buf.String(); got != "w0: hello\n" { - t.Fatalf("expected %q, got %q", "w0: hello\n", got) + if got := buf.String(); got != "w0 hello\n" { + t.Fatalf("expected %q, got %q", "w0 hello\n", got) } } @@ -89,7 +123,7 @@ func TestPrefixedWriterMultipleLines(t *testing.T) { var mu sync.Mutex w := newPrefixedWriter("n1", &buf, &mu) fmt.Fprint(w, "line1\nline2\n") - if got := buf.String(); got != "n1: line1\nn1: line2\n" { + if got := buf.String(); got != "n1 line1\nn1 line2\n" { t.Fatalf("expected prefixed lines, got %q", got) } } @@ -100,7 +134,7 @@ func TestPrefixedWriterFlushPartial(t *testing.T) { w := newPrefixedWriter("p", &buf, &mu) fmt.Fprint(w, "no newline") w.Flush() - if got := buf.String(); got != "p: no newline\n" { + if got := buf.String(); got != "p no newline\n" { t.Fatalf("expected flushed partial line, got %q", got) } } diff --git a/scripts/e2e_kind_pytorchjob.sh b/scripts/e2e_kind_pytorchjob.sh index 91d7868..d42bf9e 100755 --- a/scripts/e2e_kind_pytorchjob.sh +++ b/scripts/e2e_kind_pytorchjob.sh @@ -384,8 +384,8 @@ echo "exec --exclude verified" echo "Testing exec --no-prefix suppresses pod name prefix" EXEC_NOPREFIX_OUTPUT=$("$OKDEV_BIN" --config "$CFG_PATH" --session "$SESSION_NAME" exec --no-prefix --no-tty -- sh -lc 'echo raw-output') echo "$EXEC_NOPREFIX_OUTPUT" -# In no-prefix mode, lines should be raw (no "podname: " prefix). -if echo "$EXEC_NOPREFIX_OUTPUT" | grep -qE '^[a-z0-9-]+: raw-output$'; then +# In no-prefix mode, lines should be raw (no "[podname]" prefix). +if echo "$EXEC_NOPREFIX_OUTPUT" | grep -qE '^\[.*\] raw-output$'; then echo "ERROR: expected no-prefix mode but found prefixed output" >&2 echo "$EXEC_NOPREFIX_OUTPUT" >&2 exit 1 From 01b9c7025e45c3ab0686cf36b968e40ca269474b Mon Sep 17 00:00:00 2001 From: acmore Date: Tue, 14 Apr 2026 08:18:07 +0800 Subject: [PATCH 2/3] feat(exec): error when not all pods are running, add --ready-only flag Multi-pod exec previously silently skipped pods not yet in Running phase, causing incomplete execution on LWS workloads where followers start after the leader. Now errors by default with pod status details and offers --ready-only to opt into executing on only the ready subset. Co-Authored-By: Claude Opus 4.6 --- internal/cli/pdsh.go | 42 ++++++++++++++++++++++++++++++--------- internal/cli/pdsh_test.go | 20 +++++++++++++++++++ 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/internal/cli/pdsh.go b/internal/cli/pdsh.go index 8c57673..ded1576 100644 --- a/internal/cli/pdsh.go +++ b/internal/cli/pdsh.go @@ -29,6 +29,7 @@ func newExecCmd(opts *Options) *cobra.Command { var logDir string var noPrefix bool var fanout int + var readyOnly bool cmd := &cobra.Command{ Use: "exec [session]", @@ -65,7 +66,7 @@ func newExecCmd(opts *Options) *cobra.Command { } if multiPod || detach { - return runMultiPodExec(cmd, cc, commandArgs, podNames, role, labels, exclude, container, detach, timeout, logDir, noPrefix, fanout) + return runMultiPodExec(cmd, cc, commandArgs, podNames, role, labels, exclude, container, detach, timeout, logDir, noPrefix, fanout, readyOnly) } // Single-pod mode (existing behavior) @@ -101,6 +102,7 @@ func newExecCmd(opts *Options) *cobra.Command { cmd.Flags().StringVar(&logDir, "log-dir", "", "Write per-pod logs to directory") cmd.Flags().BoolVar(&noPrefix, "no-prefix", false, "Suppress pod name prefix in output") cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod executions") + cmd.Flags().BoolVar(&readyOnly, "ready-only", false, "Run only on pods that are already running (skip readiness check)") return cmd } @@ -147,30 +149,52 @@ func validateMultiPodFlags(podNames []string, role string, labels []string, excl return nil } -func runMultiPodExec(cmd *cobra.Command, cc *commandContext, commandArgs []string, podNames []string, role string, labels []string, exclude []string, container string, detach bool, timeout time.Duration, logDir string, noPrefix bool, fanout int) error { +func runMultiPodExec(cmd *cobra.Command, cc *commandContext, commandArgs []string, podNames []string, role string, labels []string, exclude []string, container string, detach bool, timeout time.Duration, logDir string, noPrefix bool, fanout int, readyOnly bool) error { ctx := cmd.Context() labelSel := selectorForSessionRun(cc.sessionName) sessionPods, err := cc.kube.ListPods(ctx, cc.namespace, false, labelSel) if err != nil { return fmt.Errorf("list session pods: %w", err) } - pods := filterRunningPods(sessionPods) + // Apply user-specified filters before the readiness check so the + // running-vs-total comparison only considers targeted pods. + allPods := sessionPods switch { case len(podNames) > 0: - pods = filterPodsByName(pods, podNames) + allPods = filterPodsByName(allPods, podNames) case role != "": - pods = filterPodsByRole(pods, role) + allPods = filterPodsByRole(allPods, role) case len(labels) > 0: - pods = filterPodsByLabels(pods, labels) + allPods = filterPodsByLabels(allPods, labels) } - if len(exclude) > 0 { - pods = excludePods(pods, exclude) + allPods = excludePods(allPods, exclude) + } + + pods := filterRunningPods(allPods) + + if len(allPods) == 0 { + return fmt.Errorf("no pods match the specified filters in session %q", cc.sessionName) } if len(pods) == 0 { - return fmt.Errorf("no running pods match the specified filters in session %q", cc.sessionName) + return fmt.Errorf("no running pods in session %q (0/%d pods ready)", cc.sessionName, len(allPods)) + } + + if len(pods) < len(allPods) && !readyOnly { + notReady := make([]string, 0, len(allPods)-len(pods)) + runningSet := make(map[string]bool, len(pods)) + for _, p := range pods { + runningSet[p.Name] = true + } + for _, p := range allPods { + if !runningSet[p.Name] { + notReady = append(notReady, fmt.Sprintf("%s (%s)", p.Name, p.Phase)) + } + } + return fmt.Errorf("%d/%d pods are not running: %s\nUse --ready-only to run on the %d ready pods", + len(notReady), len(allPods), strings.Join(notReady, ", "), len(pods)) } targetContainer := container diff --git a/internal/cli/pdsh_test.go b/internal/cli/pdsh_test.go index b62e7ec..22d6733 100644 --- a/internal/cli/pdsh_test.go +++ b/internal/cli/pdsh_test.go @@ -373,3 +373,23 @@ func TestValidateMultiPodFlags(t *testing.T) { }) } } + +func TestFilterRunningPodsReadinessCheck(t *testing.T) { + allPods := []kube.PodSummary{ + {Name: "leader-0", Phase: "Running"}, + {Name: "worker-0", Phase: "Pending"}, + {Name: "worker-1", Phase: "ContainerCreating"}, + } + running := filterRunningPods(allPods) + + // Without --ready-only, should detect the gap. + if len(running) == len(allPods) { + t.Fatal("expected fewer running pods than total") + } + if len(running) != 1 { + t.Fatalf("expected 1 running pod, got %d", len(running)) + } + if running[0].Name != "leader-0" { + t.Fatalf("expected leader-0, got %s", running[0].Name) + } +} From 5432ea3df81b526830b9803acf8e02d8376c2848 Mon Sep 17 00:00:00 2001 From: acmore Date: Tue, 14 Apr 2026 08:23:16 +0800 Subject: [PATCH 3/3] fix(up): wait for all pods to be ready before completing okdev up waitForCandidatePodReady previously returned as soon as the single candidate pod was ready, leaving follower pods (e.g. LWS workers) still starting. Now continues waiting for every discovered pod to reach Ready before returning. Co-Authored-By: Claude Opus 4.6 --- internal/workload/helpers.go | 60 +++++++++++++++++++++++++++++-- internal/workload/helpers_test.go | 57 ++++++++++++++++++++++++++++- internal/workload/job_test.go | 2 +- 3 files changed, 114 insertions(+), 5 deletions(-) diff --git a/internal/workload/helpers.go b/internal/workload/helpers.go index bc8dedd..72b637e 100644 --- a/internal/workload/helpers.go +++ b/internal/workload/helpers.go @@ -139,8 +139,9 @@ func isReadyString(s string) bool { type candidateSelector func(ctx context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) // waitForCandidatePodReady polls for a candidate pod and waits for it to -// become ready. Used by multi-pod runtimes where pod discovery may take -// time after the workload is applied. +// become ready, then waits for all remaining pods to become ready as well. +// Used by multi-pod runtimes where pod discovery may take time after the +// workload is applied. func waitForCandidatePodReady( ctx context.Context, k WaitClient, @@ -168,7 +169,7 @@ func waitForCandidatePodReady( } waitErr := k.WaitReadyWithProgress(ctx, namespace, target.PodName, waitTimeout, onProgress) if waitErr == nil { - return nil + return waitForRemainingPods(ctx, k, namespace, selectCandidate, deadline, onProgress) } if shouldRetryCandidateWait(waitErr) { continue @@ -190,6 +191,59 @@ func waitForCandidatePodReady( return fmt.Errorf("%s", timeoutMessage) } +// waitForRemainingPods waits for all non-ready pods discovered by +// selectCandidate to become ready before the deadline. It polls the pod +// list and calls WaitReadyWithProgress for each pod that is not yet ready. +func waitForRemainingPods( + ctx context.Context, + k WaitClient, + namespace string, + selectCandidate candidateSelector, + deadline time.Time, + onProgress func(kube.PodReadinessProgress), +) error { + for { + remaining := time.Until(deadline) + if remaining <= 0 { + return fmt.Errorf("timed out waiting for all pods to become ready") + } + + _, pods, err := selectCandidate(ctx, k, namespace) + if err != nil { + return err + } + + var pending []kube.PodSummary + for _, p := range pods { + if !isReadyString(strings.TrimSpace(p.Ready)) { + pending = append(pending, p) + } + } + if len(pending) == 0 { + return nil + } + + progress := summarizePodsAsProgress(pods) + if onProgress != nil { + progress.Reason = fmt.Sprintf("waiting for %d/%d pods", len(pending), len(pods)) + onProgress(progress) + } + + // Wait for the first non-ready pod; once it's ready we re-check all. + waitTimeout := time.Until(deadline) + if waitTimeout <= 0 { + return fmt.Errorf("timed out waiting for all pods to become ready") + } + waitErr := k.WaitReadyWithProgress(ctx, namespace, pending[0].Name, waitTimeout, onProgress) + if waitErr != nil { + if shouldRetryCandidateWait(waitErr) { + continue + } + return waitErr + } + } +} + func shouldRetryCandidateWait(err error) bool { if err == nil { return false diff --git a/internal/workload/helpers_test.go b/internal/workload/helpers_test.go index eb6ed10..ad28bb2 100644 --- a/internal/workload/helpers_test.go +++ b/internal/workload/helpers_test.go @@ -35,7 +35,7 @@ func (s staticPodLister) ListPods(_ context.Context, _ string, _ bool, _ string) func TestWaitForCandidatePodReadyRetriesOnDeletedCandidate(t *testing.T) { client := &retryWaitClient{ - podLister: staticPodLister{pods: []kube.PodSummary{{Name: "target-1"}}}, + podLister: staticPodLister{pods: []kube.PodSummary{{Name: "target-1", Phase: "Running", Ready: "1/1"}}}, errs: []error{errors.New("pod/target-1 was deleted while waiting for readiness")}, } selectCandidate := func(ctx context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) { @@ -54,6 +54,61 @@ func TestWaitForCandidatePodReadyRetriesOnDeletedCandidate(t *testing.T) { } } +func TestWaitForCandidatePodReadyWaitsForAllPods(t *testing.T) { + // Simulate 3 pods: target-0 becomes ready immediately, target-1 and target-2 + // need one more WaitReadyWithProgress call each. + readyCalls := map[string]int{} + client := &allPodsWaitClient{ + lister: staticPodLister{pods: []kube.PodSummary{ + {Name: "target-0", Phase: "Running", Ready: "1/1"}, + {Name: "target-1", Phase: "Pending", Ready: "0/1"}, + {Name: "target-2", Phase: "Pending", Ready: "0/1"}, + }}, + readyCalls: readyCalls, + markReady: func(name string, lister *staticPodLister) { + for i := range lister.pods { + if lister.pods[i].Name == name { + lister.pods[i].Phase = "Running" + lister.pods[i].Ready = "1/1" + } + } + }, + } + selectCandidate := func(ctx context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) { + pods, err := k.ListPods(ctx, namespace, false, "") + if err != nil { + return TargetRef{}, nil, err + } + return TargetRef{PodName: "target-0"}, pods, nil + } + + err := waitForCandidatePodReady(context.Background(), client, "default", selectCandidate, 5*time.Second, nil, "timed out") + if err != nil { + t.Fatalf("waitForCandidatePodReady: %v", err) + } + // target-0 waited on first, then target-1 and target-2 in the remaining loop. + if readyCalls["target-0"] < 1 || readyCalls["target-1"] < 1 || readyCalls["target-2"] < 1 { + t.Fatalf("expected all pods to be waited on, got calls: %v", readyCalls) + } +} + +// allPodsWaitClient marks a pod as ready after WaitReadyWithProgress is called for it. +type allPodsWaitClient struct { + lister staticPodLister + readyCalls map[string]int + markReady func(string, *staticPodLister) +} + +func (c *allPodsWaitClient) ListPods(ctx context.Context, namespace string, allNamespaces bool, labelSelector string) ([]kube.PodSummary, error) { + return c.lister.ListPods(ctx, namespace, allNamespaces, labelSelector) +} + +func (c *allPodsWaitClient) WaitReadyWithProgress(_ context.Context, _ string, pod string, _ time.Duration, _ func(kube.PodReadinessProgress)) error { + c.readyCalls[pod]++ + c.markReady(pod, &c.lister) + return nil +} + func TestResolveManifestPathPrefersFolderConfigDir(t *testing.T) { tmp := t.TempDir() configPath := filepath.Join(tmp, ".okdev", "okdev.yaml") diff --git a/internal/workload/job_test.go b/internal/workload/job_test.go index b0c2ead..309f5e0 100644 --- a/internal/workload/job_test.go +++ b/internal/workload/job_test.go @@ -75,7 +75,7 @@ spec: { Name: "trainer-older", Phase: "Running", - Ready: "0/1", + Ready: "1/1", CreatedAt: time.Now().Add(-2 * time.Minute), Labels: map[string]string{"okdev.io/attachable": "true"}, },