From 7cd18d299ca9e5730eec6a6489d16b7c7612a8b0 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 10 Mar 2026 14:39:33 +0900 Subject: [PATCH 1/5] logs: wait for logging binary before reading log file When a container has exited, `nerdctl logs` now calls WaitForLogger() before reading the JSON log file. This ensures the logging binary (a separate process) has finished writing all log entries to disk. Previously, WaitForLogger was only called in the follow (-f) code path. For non-follow reads like `nerdctl logs --since 60s`, the log file was read immediately without waiting, causing flaky test failures when the logging binary hadn't finished processing the final container output. This fixes TestLogs/since_60s and TestLogs/until_60s which failed intermittently because the log file was empty or incomplete at read time. Co-Authored-By: Claude Opus 4.6 --- pkg/cmd/container/logs.go | 66 ++++++++++++++++++++++----------------- 1 file changed, 38 insertions(+), 28 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index c2ede0f0b35..7824f994e47 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -71,36 +71,46 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti } follow := options.Follow - if follow { - task, err := found.Container.Task(ctx, nil) + running := false + task, err := found.Container.Task(ctx, nil) + if err != nil { + if !errdefs.IsNotFound(err) { + return err + } + } else { + status, err := task.Status(ctx) if err != nil { - if !errdefs.IsNotFound(err) { - return err - } - follow = false - } else { - status, err := task.Status(ctx) - if err != nil { - return err - } - if status.Status != containerd.Running { - follow = false - } else { - waitCh, err := task.Wait(ctx) - if err != nil { - return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) - } + return err + } + running = status.Status == containerd.Running + } - // Setup goroutine to send stop event if container task finishes: - go func() { - <-waitCh - // Wait for logger to process remaining logs after container exit - if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { - log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") - } - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") - stopChannel <- os.Interrupt - }() + if follow && running { + waitCh, err := task.Wait(ctx) + if err != nil { + return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) + } + + // Setup goroutine to send stop event if container task finishes: + go func() { + <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } + log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + stopChannel <- os.Interrupt + }() + } else { + follow = false + if !running { + // Container is not running. Wait for the logging binary + // to finish writing all log entries before reading the + // log file. Without this, we may read an incomplete log + // file because the logging binary (a separate process) + // may still be processing the final container output. + if err := logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Warn("failed to wait for logger") } } } From 715be7f9703b277bffa3e991cd9de301a9e088b7 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 10 Mar 2026 19:57:53 +0900 Subject: [PATCH 2/5] logging: fix data race in logging binary that loses container output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The logging binary had a goroutine that closed internal pipe writers immediately upon container exit (via gRPC notification). This raced with copyStream, which was still draining data from the external pipe. When the gRPC notification arrived before copyStream finished, the pipe writer was closed mid-copy, causing remaining data to be lost. Fix by making copyStream responsible for closing the pipe writers after all data has been copied. The container-exit goroutine (and its getContainerWait infrastructure) is removed — pipe closure now happens naturally when the parent process exits or the shim closes the pipe. Fixes #4782 Co-Authored-By: Claude Opus 4.6 --- pkg/logging/logging.go | 65 ++++--------------------------------- pkg/logging/logging_test.go | 14 +------- 2 files changed, 7 insertions(+), 72 deletions(-) diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..55aca7cc557 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -26,14 +26,12 @@ import ( "os" "path/filepath" "sort" - "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/muesli/cancelreader" - containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" @@ -165,49 +163,7 @@ func WaitForLogger(dataStore, ns, id string) error { }) } -func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { - client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace)) - if err != nil { - return nil, err - } - con, err := client.LoadContainer(ctx, config.ID) - if err != nil { - return nil, err - } - - task, err := con.Task(ctx, nil) - if err == nil { - return task.Wait(ctx) - } - if !errdefs.IsNotFound(err) { - return nil, err - } - - // If task was not found, it's possible that the container runtime is still being created. - // Retry every 100ms. - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for { - select { - case <-ctx.Done(): - return nil, errors.New("timed out waiting for container task to start") - case <-ticker.C: - task, err = con.Task(ctx, nil) - if err != nil { - if errdefs.IsNotFound(err) { - continue - } - return nil, err - } - return task.Wait(ctx) - } - } -} - -type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) - -func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { return err } @@ -236,6 +192,10 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres if err != nil { log.G(ctx).Errorf("failed to copy stream: %s", err) } + // Close the pipe writer after all data has been copied. + // This signals EOF to the downstream processLogFunc reader, + // ensuring all data is drained before the pipe is closed. + writer.Close() } go copyStream(stdoutR, pipeStdoutW) go copyStream(stderrR, pipeStderrW) @@ -269,18 +229,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres defer wg.Done() driver.Process(stdout, stderr) }() - go func() { - // close pipeStdoutW and pipeStderrW upon container exit - defer pipeStdoutW.Close() - defer pipeStderrW.Close() - - exitCh, err := getContainerWait(ctx, address, config) - if err != nil { - log.G(ctx).Errorf("failed to get container task wait channel: %v", err) - return - } - <-exitCh - }() wg.Wait() return driver.PostProcess() } @@ -313,8 +261,7 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err := ready(); err != nil { return err } - // getContainerWait is extracted as parameter to allow mocking in tests. - return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config) + return loggingProcessAdapter(ctx, driver, dataStore, config) }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index da0d535b074..d1b97b7557f 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -23,9 +23,7 @@ import ( "math/rand" "strings" "testing" - "time" - containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" ) @@ -79,21 +77,11 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { - exitChan := make(chan containerd.ExitStatus, 1) - time.Sleep(50 * time.Millisecond) - exitChan <- containerd.ExitStatus{} - return exitChan, nil - } - - err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config) + err := loggingProcessAdapter(ctx, driver, "testDataStore", config) if err != nil { t.Fatal(err) } - // let bufio read the buffer - time.Sleep(50 * time.Millisecond) - // Verify that the driver methods were called if !driver.processed { t.Fatal("process should be processed") From ce05a86f7a4b9a785d00098c2c8bc5f5188c4701 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 10 Mar 2026 20:17:36 +0900 Subject: [PATCH 3/5] Revert "logging: fix data race in logging binary that loses container output" This reverts commit 715be7f9703b277bffa3e991cd9de301a9e088b7. --- pkg/logging/logging.go | 65 +++++++++++++++++++++++++++++++++---- pkg/logging/logging_test.go | 14 +++++++- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 55aca7cc557..91a3231ee3a 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -26,12 +26,14 @@ import ( "os" "path/filepath" "sort" + "strings" "sync" "time" "github.com/fsnotify/fsnotify" "github.com/muesli/cancelreader" + containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" "github.com/containerd/errdefs" "github.com/containerd/log" @@ -163,7 +165,49 @@ func WaitForLogger(dataStore, ns, id string) error { }) } -func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error { +func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + client, err := containerd.New(strings.TrimPrefix(address, "unix://"), containerd.WithDefaultNamespace(config.Namespace)) + if err != nil { + return nil, err + } + con, err := client.LoadContainer(ctx, config.ID) + if err != nil { + return nil, err + } + + task, err := con.Task(ctx, nil) + if err == nil { + return task.Wait(ctx) + } + if !errdefs.IsNotFound(err) { + return nil, err + } + + // If task was not found, it's possible that the container runtime is still being created. + // Retry every 100ms. + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return nil, errors.New("timed out waiting for container task to start") + case <-ticker.C: + task, err = con.Task(ctx, nil) + if err != nil { + if errdefs.IsNotFound(err) { + continue + } + return nil, err + } + return task.Wait(ctx) + } + } +} + +type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) + +func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error { if err := driver.PreProcess(ctx, dataStore, config); err != nil { return err } @@ -192,10 +236,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, if err != nil { log.G(ctx).Errorf("failed to copy stream: %s", err) } - // Close the pipe writer after all data has been copied. - // This signals EOF to the downstream processLogFunc reader, - // ensuring all data is drained before the pipe is closed. - writer.Close() } go copyStream(stdoutR, pipeStdoutW) go copyStream(stderrR, pipeStderrW) @@ -229,6 +269,18 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, defer wg.Done() driver.Process(stdout, stderr) }() + go func() { + // close pipeStdoutW and pipeStderrW upon container exit + defer pipeStdoutW.Close() + defer pipeStderrW.Close() + + exitCh, err := getContainerWait(ctx, address, config) + if err != nil { + log.G(ctx).Errorf("failed to get container task wait channel: %v", err) + return + } + <-exitCh + }() wg.Wait() return driver.PostProcess() } @@ -261,7 +313,8 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) { if err := ready(); err != nil { return err } - return loggingProcessAdapter(ctx, driver, dataStore, config) + // getContainerWait is extracted as parameter to allow mocking in tests. + return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config) }) } else if !errors.Is(err, os.ErrNotExist) { // the file does not exist if the container was created with nerdctl < 0.20 diff --git a/pkg/logging/logging_test.go b/pkg/logging/logging_test.go index d1b97b7557f..da0d535b074 100644 --- a/pkg/logging/logging_test.go +++ b/pkg/logging/logging_test.go @@ -23,7 +23,9 @@ import ( "math/rand" "strings" "testing" + "time" + containerd "github.com/containerd/containerd/v2/client" "github.com/containerd/containerd/v2/core/runtime/v2/logging" ) @@ -77,11 +79,21 @@ func TestLoggingProcessAdapter(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := loggingProcessAdapter(ctx, driver, "testDataStore", config) + var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) { + exitChan := make(chan containerd.ExitStatus, 1) + time.Sleep(50 * time.Millisecond) + exitChan <- containerd.ExitStatus{} + return exitChan, nil + } + + err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config) if err != nil { t.Fatal(err) } + // let bufio read the buffer + time.Sleep(50 * time.Millisecond) + // Verify that the driver methods were called if !driver.processed { t.Fatal("process should be processed") From e610741d298164ec33278247fef10017177febed Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 10 Mar 2026 20:17:36 +0900 Subject: [PATCH 4/5] Revert "logs: wait for logging binary before reading log file" This reverts commit 7cd18d299ca9e5730eec6a6489d16b7c7612a8b0. --- pkg/cmd/container/logs.go | 66 +++++++++++++++++---------------------- 1 file changed, 28 insertions(+), 38 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index 7824f994e47..c2ede0f0b35 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -71,46 +71,36 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti } follow := options.Follow - running := false - task, err := found.Container.Task(ctx, nil) - if err != nil { - if !errdefs.IsNotFound(err) { - return err - } - } else { - status, err := task.Status(ctx) - if err != nil { - return err - } - running = status.Status == containerd.Running - } - - if follow && running { - waitCh, err := task.Wait(ctx) + if follow { + task, err := found.Container.Task(ctx, nil) if err != nil { - return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) - } - - // Setup goroutine to send stop event if container task finishes: - go func() { - <-waitCh - // Wait for logger to process remaining logs after container exit - if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { - log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + if !errdefs.IsNotFound(err) { + return err + } + follow = false + } else { + status, err := task.Status(ctx) + if err != nil { + return err } - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") - stopChannel <- os.Interrupt - }() - } else { - follow = false - if !running { - // Container is not running. Wait for the logging binary - // to finish writing all log entries before reading the - // log file. Without this, we may read an incomplete log - // file because the logging binary (a separate process) - // may still be processing the final container output. - if err := logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { - log.G(ctx).WithError(err).Warn("failed to wait for logger") + if status.Status != containerd.Running { + follow = false + } else { + waitCh, err := task.Wait(ctx) + if err != nil { + return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) + } + + // Setup goroutine to send stop event if container task finishes: + go func() { + <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } + log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + stopChannel <- os.Interrupt + }() } } } From 1dbcb6be101073159ead5286d3aa3eaf24a74912 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 10 Mar 2026 20:19:16 +0900 Subject: [PATCH 5/5] logging: fix data race losing container output, wait for logger before reading logs Two fixes for flaky TestLogs/since_60s and TestLogs/until_60s: 1. In loggingProcessAdapter, the container-exit goroutine closed internal pipe writers immediately on container exit. This raced with copyStream goroutines still draining data from external pipes, causing output to be lost. Fix: after container exit, cancel the external readers to unblock copyStream, then wait for copyStream to finish before closing the pipe writers. This ensures all buffered data is processed. 2. In Logs(), call WaitForLogger() before reading log files when the container is not running, ensuring the logging binary has finished writing all entries. Fixes #4782 Co-Authored-By: Claude Opus 4.6 --- pkg/cmd/container/logs.go | 66 ++++++++++++++++++++++----------------- pkg/logging/logging.go | 10 ++++++ 2 files changed, 48 insertions(+), 28 deletions(-) diff --git a/pkg/cmd/container/logs.go b/pkg/cmd/container/logs.go index c2ede0f0b35..7824f994e47 100644 --- a/pkg/cmd/container/logs.go +++ b/pkg/cmd/container/logs.go @@ -71,36 +71,46 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti } follow := options.Follow - if follow { - task, err := found.Container.Task(ctx, nil) + running := false + task, err := found.Container.Task(ctx, nil) + if err != nil { + if !errdefs.IsNotFound(err) { + return err + } + } else { + status, err := task.Status(ctx) if err != nil { - if !errdefs.IsNotFound(err) { - return err - } - follow = false - } else { - status, err := task.Status(ctx) - if err != nil { - return err - } - if status.Status != containerd.Running { - follow = false - } else { - waitCh, err := task.Wait(ctx) - if err != nil { - return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) - } + return err + } + running = status.Status == containerd.Running + } - // Setup goroutine to send stop event if container task finishes: - go func() { - <-waitCh - // Wait for logger to process remaining logs after container exit - if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { - log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") - } - log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") - stopChannel <- os.Interrupt - }() + if follow && running { + waitCh, err := task.Wait(ctx) + if err != nil { + return fmt.Errorf("failed to get wait channel for task %#v: %w", task, err) + } + + // Setup goroutine to send stop event if container task finishes: + go func() { + <-waitCh + // Wait for logger to process remaining logs after container exit + if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Error("failed to wait for logger shutdown") + } + log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer") + stopChannel <- os.Interrupt + }() + } else { + follow = false + if !running { + // Container is not running. Wait for the logging binary + // to finish writing all log entries before reading the + // log file. Without this, we may read an incomplete log + // file because the logging binary (a separate process) + // may still be processing the final container output. + if err := logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil { + log.G(ctx).WithError(err).Warn("failed to wait for logger") } } } diff --git a/pkg/logging/logging.go b/pkg/logging/logging.go index 91a3231ee3a..690b24ba668 100644 --- a/pkg/logging/logging.go +++ b/pkg/logging/logging.go @@ -229,7 +229,10 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres // initialize goroutines to copy stdout and stderr streams to a closable pipe pipeStdoutR, pipeStdoutW := io.Pipe() pipeStderrR, pipeStderrW := io.Pipe() + var copyWg sync.WaitGroup + copyWg.Add(2) copyStream := func(reader io.Reader, writer *io.PipeWriter) { + defer copyWg.Done() // copy using a buffer of size 32K buf := make([]byte, 32<<10) _, err := io.CopyBuffer(writer, reader, buf) @@ -280,6 +283,13 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, addres return } <-exitCh + // After the container exits, cancel external readers to unblock + // copyStream (the pipe write-end may still be held open by the + // parent process), then wait for copyStream to finish draining + // any remaining data before closing the internal pipe writers. + stdoutR.Cancel() + stderrR.Cancel() + copyWg.Wait() }() wg.Wait() return driver.PostProcess()