diff --git a/internal/cli/pdsh.go b/internal/cli/pdsh.go index c03b980..ded1576 100644 --- a/internal/cli/pdsh.go +++ b/internal/cli/pdsh.go @@ -29,6 +29,7 @@ func newExecCmd(opts *Options) *cobra.Command { var logDir string var noPrefix bool var fanout int + var readyOnly bool cmd := &cobra.Command{ Use: "exec [session]", @@ -65,7 +66,7 @@ func newExecCmd(opts *Options) *cobra.Command { } if multiPod || detach { - return runMultiPodExec(cmd, cc, commandArgs, podNames, role, labels, exclude, container, detach, timeout, logDir, noPrefix, fanout) + return runMultiPodExec(cmd, cc, commandArgs, podNames, role, labels, exclude, container, detach, timeout, logDir, noPrefix, fanout, readyOnly) } // Single-pod mode (existing behavior) @@ -101,6 +102,7 @@ func newExecCmd(opts *Options) *cobra.Command { cmd.Flags().StringVar(&logDir, "log-dir", "", "Write per-pod logs to directory") cmd.Flags().BoolVar(&noPrefix, "no-prefix", false, "Suppress pod name prefix in output") cmd.Flags().IntVar(&fanout, "fanout", pdshDefaultFanout, "Maximum concurrent pod executions") + cmd.Flags().BoolVar(&readyOnly, "ready-only", false, "Run only on pods that are already running (skip readiness check)") return cmd } @@ -147,30 +149,52 @@ func validateMultiPodFlags(podNames []string, role string, labels []string, excl return nil } -func runMultiPodExec(cmd *cobra.Command, cc *commandContext, commandArgs []string, podNames []string, role string, labels []string, exclude []string, container string, detach bool, timeout time.Duration, logDir string, noPrefix bool, fanout int) error { +func runMultiPodExec(cmd *cobra.Command, cc *commandContext, commandArgs []string, podNames []string, role string, labels []string, exclude []string, container string, detach bool, timeout time.Duration, logDir string, noPrefix bool, fanout int, readyOnly bool) error { ctx := cmd.Context() labelSel := selectorForSessionRun(cc.sessionName) sessionPods, err := cc.kube.ListPods(ctx, cc.namespace, false, labelSel) if err 
!= nil { return fmt.Errorf("list session pods: %w", err) } - pods := filterRunningPods(sessionPods) + // Apply user-specified filters before the readiness check so the + // running-vs-total comparison only considers targeted pods. + allPods := sessionPods switch { case len(podNames) > 0: - pods = filterPodsByName(pods, podNames) + allPods = filterPodsByName(allPods, podNames) case role != "": - pods = filterPodsByRole(pods, role) + allPods = filterPodsByRole(allPods, role) case len(labels) > 0: - pods = filterPodsByLabels(pods, labels) + allPods = filterPodsByLabels(allPods, labels) } - if len(exclude) > 0 { - pods = excludePods(pods, exclude) + if len(exclude) > 0 { + allPods = excludePods(allPods, exclude) + } + + pods := filterRunningPods(allPods) + if len(allPods) == 0 { + return fmt.Errorf("no pods match the specified filters in session %q", cc.sessionName) } if len(pods) == 0 { - return fmt.Errorf("no running pods match the specified filters in session %q", cc.sessionName) + return fmt.Errorf("no running pods in session %q (0/%d pods ready)", cc.sessionName, len(allPods)) + } + + if len(pods) < len(allPods) && !readyOnly { + notReady := make([]string, 0, len(allPods)-len(pods)) + runningSet := make(map[string]bool, len(pods)) + for _, p := range pods { + runningSet[p.Name] = true + } + for _, p := range allPods { + if !runningSet[p.Name] { + notReady = append(notReady, fmt.Sprintf("%s (%s)", p.Name, p.Phase)) + } + } + return fmt.Errorf("%d/%d pods are not running: %s\nUse --ready-only to run on the %d ready pods", + len(notReady), len(allPods), strings.Join(notReady, ", "), len(pods)) } targetContainer := container @@ -215,6 +239,8 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri podNames[i] = p.Name } shortNames := shortPodNames(podNames) + colorEnabled := isInteractiveWriter(stdout) + displayPrefixes := formatPodPrefixes(shortNames, colorEnabled) var writeMu sync.Mutex noPrefixOut := &lockedWriter{w: stdout} @@ -225,7 +251,7 @@ func
runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri var wg sync.WaitGroup for i, pod := range pods { wg.Add(1) - go func(pod kube.PodSummary, shortName string) { + go func(pod kube.PodSummary, shortName, displayPrefix string) { defer wg.Done() sem <- struct{}{} defer func() { <-sem }() @@ -242,8 +268,8 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri podStdout = noPrefixOut podStderr = noPrefixErr } else { - pw := newPrefixedWriter(shortName, stdout, &writeMu) - pe := newPrefixedWriter(shortName, stderr, &writeMu) + pw := newPrefixedWriter(displayPrefix, stdout, &writeMu) + pe := newPrefixedWriter(displayPrefix, stderr, &writeMu) defer pw.Flush() defer pe.Flush() podStdout = pw @@ -264,7 +290,7 @@ func runMultiExec(ctx context.Context, client connect.ExecClient, namespace stri err := connect.RunOnContainer(execCtx, client, namespace, pod.Name, container, command, false, nil, podStdout, podStderr) results <- podExecResult{pod: pod.Name, err: err} - }(pod, shortNames[i]) + }(pod, shortNames[i], displayPrefixes[i]) } go func() { @@ -305,6 +331,8 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str podNames[i] = p.Name } shortNames := shortPodNames(podNames) + colorEnabled := isInteractiveWriter(out) + displayPrefixes := formatPodPrefixes(shortNames, colorEnabled) command := detachCommand(cmdStr) results := make(chan podExecResult, len(pods)) @@ -327,19 +355,21 @@ func runDetachExec(ctx context.Context, client connect.ExecClient, namespace str close(results) }() + displayMap := make(map[string]string, len(podNames)) nameMap := make(map[string]string, len(podNames)) for i, n := range podNames { + displayMap[n] = displayPrefixes[i] nameMap[n] = shortNames[i] } var failCount int for r := range results { - short := nameMap[r.pod] + prefix := displayMap[r.pod] if r.err != nil { - fmt.Fprintf(out, "%s: error: %v\n", short, r.err) + fmt.Fprintf(out, "%s error: %v\n", prefix, r.err) 
failCount++ } else { - fmt.Fprintf(out, "%s: detached\n", short) + fmt.Fprintf(out, "%s detached\n", prefix) } } if failCount > 0 { diff --git a/internal/cli/pdsh_fanout_test.go b/internal/cli/pdsh_fanout_test.go index e8fea75..97023ab 100644 --- a/internal/cli/pdsh_fanout_test.go +++ b/internal/cli/pdsh_fanout_test.go @@ -223,10 +223,10 @@ func TestMultiExecPipelineAllPodsWithPrefixedOutput(t *testing.T) { // Step 4: verify prefixed output output := stdout.String() - if !strings.Contains(output, "worker-0: gpu0: NVIDIA A100") { + if !strings.Contains(output, "[worker-0] gpu0: NVIDIA A100") { t.Fatalf("expected prefixed output for worker-0, got %q", output) } - if !strings.Contains(output, "worker-1: gpu1: NVIDIA H100") { + if !strings.Contains(output, "[worker-1] gpu1: NVIDIA H100") { t.Fatalf("expected prefixed output for worker-1, got %q", output) } } @@ -327,13 +327,13 @@ func TestMultiExecPipelinePartialFailureWithDetach(t *testing.T) { } output := stdout.String() - if !strings.Contains(output, "worker-0: detached") { + if !strings.Contains(output, "worker-0] detached") { t.Fatalf("expected worker-0 detached, got %q", output) } - if !strings.Contains(output, "worker-1: error: container not ready") { + if !strings.Contains(output, "worker-1] error: container not ready") { t.Fatalf("expected worker-1 error, got %q", output) } - if !strings.Contains(output, "worker-2: detached") { + if !strings.Contains(output, "worker-2] detached") { t.Fatalf("expected worker-2 detached, got %q", output) } } @@ -361,7 +361,7 @@ func TestMultiExecPipelineNoPrefixModeMultiplePods(t *testing.T) { output := stdout.String() // In no-prefix mode, output should NOT contain the pod name prefix. 
- if strings.Contains(output, "worker-0:") || strings.Contains(output, "worker-1:") { + if strings.Contains(output, "[worker-0]") || strings.Contains(output, "[worker-1]") { t.Fatalf("expected no prefix in output, got %q", output) } if !strings.Contains(output, "alpha") || !strings.Contains(output, "bravo") { @@ -526,7 +526,7 @@ func TestMultiExecFullPipelineRoleFilterFanoutLogDir(t *testing.T) { // Verify prefixed output present. output := stdout.String() - if !strings.Contains(output, "worker-0:") || !strings.Contains(output, "worker-1:") || !strings.Contains(output, "worker-3:") { + if !strings.Contains(output, "[worker-0]") || !strings.Contains(output, "[worker-1]") || !strings.Contains(output, "[worker-3]") { t.Fatalf("expected prefixed output for workers 0,1,3, got %q", output) } } diff --git a/internal/cli/pdsh_test.go b/internal/cli/pdsh_test.go index f98a287..22d6733 100644 --- a/internal/cli/pdsh_test.go +++ b/internal/cli/pdsh_test.go @@ -172,7 +172,7 @@ func TestRunDetachExec(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } - if !strings.Contains(stdout.String(), "worker-0: detached") || !strings.Contains(stdout.String(), "worker-1: detached") { + if !strings.Contains(stdout.String(), "worker-0] detached") || !strings.Contains(stdout.String(), "worker-1] detached") { t.Fatalf("expected detach confirmation, got %q", stdout.String()) } } @@ -193,10 +193,10 @@ func TestRunDetachExecWithError(t *testing.T) { if err == nil { t.Fatal("expected error for partial failure") } - if !strings.Contains(stdout.String(), "worker-0: detached") { + if !strings.Contains(stdout.String(), "worker-0] detached") { t.Fatalf("expected detach for worker-0, got %q", stdout.String()) } - if !strings.Contains(stdout.String(), "worker-1: error:") { + if !strings.Contains(stdout.String(), "worker-1] error:") { t.Fatalf("expected error for worker-1, got %q", stdout.String()) } } @@ -218,7 +218,7 @@ func TestRunMultiExecSuccess(t *testing.T) { if err != nil { 
t.Fatalf("unexpected error: %v", err) } - if !strings.Contains(stdout.String(), "worker-0: ok") || !strings.Contains(stdout.String(), "worker-1: ok") { + if !strings.Contains(stdout.String(), "worker-0] ok") || !strings.Contains(stdout.String(), "worker-1] ok") { t.Fatalf("expected prefixed output, got %q", stdout.String()) } } @@ -373,3 +373,23 @@ func TestValidateMultiPodFlags(t *testing.T) { }) } } + +func TestFilterRunningPodsReadinessCheck(t *testing.T) { + allPods := []kube.PodSummary{ + {Name: "leader-0", Phase: "Running"}, + {Name: "worker-0", Phase: "Pending"}, + {Name: "worker-1", Phase: "ContainerCreating"}, + } + running := filterRunningPods(allPods) + + // Without --ready-only, should detect the gap. + if len(running) == len(allPods) { + t.Fatal("expected fewer running pods than total") + } + if len(running) != 1 { + t.Fatalf("expected 1 running pod, got %d", len(running)) + } + if running[0].Name != "leader-0" { + t.Fatalf("expected leader-0, got %s", running[0].Name) + } +} diff --git a/internal/cli/pdsh_writer.go b/internal/cli/pdsh_writer.go index ce6e0af..c3b4e20 100644 --- a/internal/cli/pdsh_writer.go +++ b/internal/cli/pdsh_writer.go @@ -57,6 +57,47 @@ func shortPodNames(names []string) []string { return out } +func maxPrefixWidth(names []string) int { + w := 0 + for _, n := range names { + if len(n) > w { + w = len(n) + } + } + return w +} + +var podPrefixColors = []string{ + "\033[36m", // cyan + "\033[33m", // yellow + "\033[32m", // green + "\033[35m", // magenta + "\033[34m", // blue + "\033[91m", // bright red + "\033[96m", // bright cyan + "\033[93m", // bright yellow +} + +func podPrefixColor(index int) string { + return podPrefixColors[index%len(podPrefixColors)] +} + +const prefixReset = "\033[0m" + +func formatPodPrefixes(shortNames []string, color bool) []string { + width := maxPrefixWidth(shortNames) + out := make([]string, len(shortNames)) + for i, name := range shortNames { + padded := fmt.Sprintf("%-*s", width, name) + if color 
{ + out[i] = podPrefixColor(i) + "[" + padded + "]" + prefixReset + } else { + out[i] = "[" + padded + "]" + } + } + return out +} + // prefixedWriter is a thread-safe io.Writer that buffers input and emits // complete lines prefixed with the pod short name. Partial lines are held // until a newline arrives or Flush is called. @@ -81,7 +122,7 @@ func (w *prefixedWriter) Write(p []byte) (int, error) { break } w.mu.Lock() - fmt.Fprintf(w.dest, "%s: %s", w.prefix, line) + fmt.Fprintf(w.dest, "%s %s", w.prefix, line) w.mu.Unlock() } return len(p), nil @@ -93,7 +134,7 @@ func (w *prefixedWriter) Flush() { return } w.mu.Lock() - fmt.Fprintf(w.dest, "%s: %s\n", w.prefix, w.buf.String()) + fmt.Fprintf(w.dest, "%s %s\n", w.prefix, w.buf.String()) w.mu.Unlock() w.buf.Reset() } diff --git a/internal/cli/pdsh_writer_test.go b/internal/cli/pdsh_writer_test.go index 8ae90be..688e79d 100644 --- a/internal/cli/pdsh_writer_test.go +++ b/internal/cli/pdsh_writer_test.go @@ -3,6 +3,7 @@ package cli import ( "bytes" "fmt" + "strings" "sync" "testing" ) @@ -59,14 +60,47 @@ func TestShortPodNames(t *testing.T) { } } +func TestMaxPrefixWidth(t *testing.T) { + if got := maxPrefixWidth([]string{"ab", "abcde", "abc"}); got != 5 { + t.Fatalf("expected 5, got %d", got) + } + if got := maxPrefixWidth(nil); got != 0 { + t.Fatalf("expected 0 for nil, got %d", got) + } +} + +func TestFormatPodPrefixesNoColor(t *testing.T) { + names := []string{"worker-0", "worker-10", "master-0"} + got := formatPodPrefixes(names, false) + // Brackets + padded to 9 chars (len("worker-10")) + brackets = 11. + if got[0] != "[worker-0 ]" || got[1] != "[worker-10]" || got[2] != "[master-0 ]" { + t.Fatalf("unexpected prefixes: %v", got) + } +} + +func TestFormatPodPrefixesWithColor(t *testing.T) { + names := []string{"a", "bb"} + got := formatPodPrefixes(names, true) + // Each should contain ANSI color code and reset. 
+ for i, p := range got { + if !strings.Contains(p, "\033[") || !strings.Contains(p, prefixReset) { + t.Fatalf("index %d: expected color codes in %q", i, p) + } + } + // Different pods should get different colors. + if got[0] == got[1] { + t.Fatalf("expected different colors for different pods") + } +} + func TestPrefixedWriterCompleteLine(t *testing.T) { var buf bytes.Buffer var mu sync.Mutex w := newPrefixedWriter("w0", &buf, &mu) fmt.Fprint(w, "hello world\n") w.Flush() - if got := buf.String(); got != "w0: hello world\n" { - t.Fatalf("expected %q, got %q", "w0: hello world\n", got) + if got := buf.String(); got != "w0 hello world\n" { + t.Fatalf("expected %q, got %q", "w0 hello world\n", got) } } @@ -79,8 +113,8 @@ func TestPrefixedWriterPartialLines(t *testing.T) { t.Fatalf("expected no output before newline, got %q", buf.String()) } fmt.Fprint(w, "lo\n") - if got := buf.String(); got != "w0: hello\n" { - t.Fatalf("expected %q, got %q", "w0: hello\n", got) + if got := buf.String(); got != "w0 hello\n" { + t.Fatalf("expected %q, got %q", "w0 hello\n", got) } } @@ -89,7 +123,7 @@ func TestPrefixedWriterMultipleLines(t *testing.T) { var mu sync.Mutex w := newPrefixedWriter("n1", &buf, &mu) fmt.Fprint(w, "line1\nline2\n") - if got := buf.String(); got != "n1: line1\nn1: line2\n" { + if got := buf.String(); got != "n1 line1\nn1 line2\n" { t.Fatalf("expected prefixed lines, got %q", got) } } @@ -100,7 +134,7 @@ func TestPrefixedWriterFlushPartial(t *testing.T) { w := newPrefixedWriter("p", &buf, &mu) fmt.Fprint(w, "no newline") w.Flush() - if got := buf.String(); got != "p: no newline\n" { + if got := buf.String(); got != "p no newline\n" { t.Fatalf("expected flushed partial line, got %q", got) } } diff --git a/internal/workload/helpers.go b/internal/workload/helpers.go index bc8dedd..72b637e 100644 --- a/internal/workload/helpers.go +++ b/internal/workload/helpers.go @@ -139,8 +139,9 @@ func isReadyString(s string) bool { type candidateSelector func(ctx 
context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) // waitForCandidatePodReady polls for a candidate pod and waits for it to -// become ready. Used by multi-pod runtimes where pod discovery may take -// time after the workload is applied. +// become ready, then waits for all remaining pods to become ready as well. +// Used by multi-pod runtimes where pod discovery may take time after the +// workload is applied. func waitForCandidatePodReady( ctx context.Context, k WaitClient, @@ -168,7 +169,7 @@ func waitForCandidatePodReady( } waitErr := k.WaitReadyWithProgress(ctx, namespace, target.PodName, waitTimeout, onProgress) if waitErr == nil { - return nil + return waitForRemainingPods(ctx, k, namespace, selectCandidate, deadline, onProgress) } if shouldRetryCandidateWait(waitErr) { continue @@ -190,6 +191,59 @@ func waitForCandidatePodReady( return fmt.Errorf("%s", timeoutMessage) } +// waitForRemainingPods waits for all non-ready pods discovered by +// selectCandidate to become ready before the deadline. It polls the pod +// list and calls WaitReadyWithProgress for each pod that is not yet ready. 
+func waitForRemainingPods( + ctx context.Context, + k WaitClient, + namespace string, + selectCandidate candidateSelector, + deadline time.Time, + onProgress func(kube.PodReadinessProgress), +) error { + for { + remaining := time.Until(deadline) + if remaining <= 0 { + return fmt.Errorf("timed out waiting for all pods to become ready") + } + + _, pods, err := selectCandidate(ctx, k, namespace) + if err != nil { + return err + } + + var pending []kube.PodSummary + for _, p := range pods { + if !isReadyString(strings.TrimSpace(p.Ready)) { + pending = append(pending, p) + } + } + if len(pending) == 0 { + return nil + } + + progress := summarizePodsAsProgress(pods) + if onProgress != nil { + progress.Reason = fmt.Sprintf("waiting for %d/%d pods", len(pending), len(pods)) + onProgress(progress) + } + + // Wait for the first non-ready pod; once it's ready we re-check all. + waitTimeout := time.Until(deadline) + if waitTimeout <= 0 { + return fmt.Errorf("timed out waiting for all pods to become ready") + } + waitErr := k.WaitReadyWithProgress(ctx, namespace, pending[0].Name, waitTimeout, onProgress) + if waitErr != nil { + if shouldRetryCandidateWait(waitErr) { + continue + } + return waitErr + } + } +} + func shouldRetryCandidateWait(err error) bool { if err == nil { return false diff --git a/internal/workload/helpers_test.go b/internal/workload/helpers_test.go index eb6ed10..ad28bb2 100644 --- a/internal/workload/helpers_test.go +++ b/internal/workload/helpers_test.go @@ -35,7 +35,7 @@ func (s staticPodLister) ListPods(_ context.Context, _ string, _ bool, _ string) func TestWaitForCandidatePodReadyRetriesOnDeletedCandidate(t *testing.T) { client := &retryWaitClient{ - podLister: staticPodLister{pods: []kube.PodSummary{{Name: "target-1"}}}, + podLister: staticPodLister{pods: []kube.PodSummary{{Name: "target-1", Phase: "Running", Ready: "1/1"}}}, errs: []error{errors.New("pod/target-1 was deleted while waiting for readiness")}, } selectCandidate := func(ctx 
context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) { @@ -54,6 +54,61 @@ func TestWaitForCandidatePodReadyRetriesOnDeletedCandidate(t *testing.T) { } } +func TestWaitForCandidatePodReadyWaitsForAllPods(t *testing.T) { + // Simulate 3 pods: target-0 becomes ready immediately, target-1 and target-2 + // need one more WaitReadyWithProgress call each. + readyCalls := map[string]int{} + client := &allPodsWaitClient{ + lister: staticPodLister{pods: []kube.PodSummary{ + {Name: "target-0", Phase: "Running", Ready: "1/1"}, + {Name: "target-1", Phase: "Pending", Ready: "0/1"}, + {Name: "target-2", Phase: "Pending", Ready: "0/1"}, + }}, + readyCalls: readyCalls, + markReady: func(name string, lister *staticPodLister) { + for i := range lister.pods { + if lister.pods[i].Name == name { + lister.pods[i].Phase = "Running" + lister.pods[i].Ready = "1/1" + } + } + }, + } + selectCandidate := func(ctx context.Context, k podLister, namespace string) (TargetRef, []kube.PodSummary, error) { + pods, err := k.ListPods(ctx, namespace, false, "") + if err != nil { + return TargetRef{}, nil, err + } + return TargetRef{PodName: "target-0"}, pods, nil + } + + err := waitForCandidatePodReady(context.Background(), client, "default", selectCandidate, 5*time.Second, nil, "timed out") + if err != nil { + t.Fatalf("waitForCandidatePodReady: %v", err) + } + // target-0 waited on first, then target-1 and target-2 in the remaining loop. + if readyCalls["target-0"] < 1 || readyCalls["target-1"] < 1 || readyCalls["target-2"] < 1 { + t.Fatalf("expected all pods to be waited on, got calls: %v", readyCalls) + } +} + +// allPodsWaitClient marks a pod as ready after WaitReadyWithProgress is called for it. 
+type allPodsWaitClient struct { + lister staticPodLister + readyCalls map[string]int + markReady func(string, *staticPodLister) +} + +func (c *allPodsWaitClient) ListPods(ctx context.Context, namespace string, allNamespaces bool, labelSelector string) ([]kube.PodSummary, error) { + return c.lister.ListPods(ctx, namespace, allNamespaces, labelSelector) +} + +func (c *allPodsWaitClient) WaitReadyWithProgress(_ context.Context, _ string, pod string, _ time.Duration, _ func(kube.PodReadinessProgress)) error { + c.readyCalls[pod]++ + c.markReady(pod, &c.lister) + return nil +} + func TestResolveManifestPathPrefersFolderConfigDir(t *testing.T) { tmp := t.TempDir() configPath := filepath.Join(tmp, ".okdev", "okdev.yaml") diff --git a/internal/workload/job_test.go b/internal/workload/job_test.go index b0c2ead..309f5e0 100644 --- a/internal/workload/job_test.go +++ b/internal/workload/job_test.go @@ -75,7 +75,7 @@ spec: { Name: "trainer-older", Phase: "Running", - Ready: "0/1", + Ready: "1/1", CreatedAt: time.Now().Add(-2 * time.Minute), Labels: map[string]string{"okdev.io/attachable": "true"}, }, diff --git a/scripts/e2e_kind_pytorchjob.sh b/scripts/e2e_kind_pytorchjob.sh index 91d7868..d42bf9e 100755 --- a/scripts/e2e_kind_pytorchjob.sh +++ b/scripts/e2e_kind_pytorchjob.sh @@ -384,8 +384,8 @@ echo "exec --exclude verified" echo "Testing exec --no-prefix suppresses pod name prefix" EXEC_NOPREFIX_OUTPUT=$("$OKDEV_BIN" --config "$CFG_PATH" --session "$SESSION_NAME" exec --no-prefix --no-tty -- sh -lc 'echo raw-output') echo "$EXEC_NOPREFIX_OUTPUT" -# In no-prefix mode, lines should be raw (no "podname: " prefix). -if echo "$EXEC_NOPREFIX_OUTPUT" | grep -qE '^[a-z0-9-]+: raw-output$'; then +# In no-prefix mode, lines should be raw (no "[podname]" prefix). +if echo "$EXEC_NOPREFIX_OUTPUT" | grep -qE '^\[.*\] raw-output$'; then echo "ERROR: expected no-prefix mode but found prefixed output" >&2 echo "$EXEC_NOPREFIX_OUTPUT" >&2 exit 1