diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index c2ede0f0b35..7824f994e47 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -71,36 +71,46 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti } follow := options.Follow - if follow { - task, err := found.Container.Task(ctx, nil) + running := false + task, err := found.Container.Task(ctx, nil) + if err != nil { + if !errdefs.IsNotFound(err) { + return err + } + } else { + status, err := task.Status(ctx) if err != nil { - if !errdefs.IsNotFound(err) { - return err - } - follow = false - } else { - status, err := task.Status(ctx) - if err != nil { - return err - } - if status.Status != containerd.Running { - follow = false - } else { - waitCh, err := task.Wait(ctx) - if err != nil { - return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) - } + return err + } + running = status.Status == containerd.Running + } - // Setup goroutine to send stop event if container task finishes: - go func() { - <-waitCh - // Wait for logger to process remaining logs after container exit - if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { - log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") - } - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") - stopChannel <- os.Interrupt - }() + if follow && running { + waitCh, err := task.Wait(ctx) + if err != nil { + return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) + } + + // Setup goroutine to send stop event if container task finishes: + go func() { + <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } + log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + stopChannel <- os.Interrupt + }() + } else { + follow = false + if !running { + // Container is not running. Wait for the logging binary + // to finish writing all log entries before reading the + // log file. Without this, we may read an incomplete log + // file because the logging binary (a separate process) + // may still be processing the final container output. + if err := logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Warn("failed to wait for logger") } } } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..690b24ba668 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -229,7 +229,10 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres // initialize goroutines to copy stdout and stderr streams to a closable pipe pipeStdoutR, pipeStdoutW := io.Pipe() pipeStderrR, pipeStderrW := io.Pipe() + var copyWg sync.WaitGroup + copyWg.Add(2) copyStream := func(reader io.Reader, writer *io.PipeWriter) { + defer copyWg.Done() // copy using a buffer of size 32K buf := make([]byte, 32<<10) _, err := io.CopyBuffer(writer, reader, buf) @@ -280,6 +283,13 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres return } <-exitCh + // After the container exits, cancel external readers to unblock + // copyStream (the pipe write-end may still be held open by the + // parent process), then wait for copyStream to finish draining + // any remaining data before closing the internal pipe writers. + stdoutR.Cancel() + stderrR.Cancel() + copyWg.Wait() }() wg.Wait() return driver.PostProcess()