diff --git a/integration/systemd_test.go b/integration/systemd_test.go index 4c552db0..22b8393b 100644 --- a/integration/systemd_test.go +++ b/integration/systemd_test.go @@ -3,8 +3,10 @@ package integration import ( "bytes" "context" + "io" "os" "strings" + "syscall" "testing" "time" @@ -162,6 +164,66 @@ func TestSystemdMode(t *testing.T) { t.Logf("Agent logs (last 5 lines):\n%s", output) }) + // Regression: serial log must survive copytruncate without leaving a + // sparse NUL hole. Pre-fix, Cloud Hypervisor held a non-O_APPEND fd on + // app.log, so post-truncate writes landed at the stale offset and the + // file became multi-GB sparse with NUL bytes from byte 0. After the + // fix, hypeman owns the writer fd with O_APPEND. + t.Run("SerialLogSurvivesCopytruncate", func(t *testing.T) { + if inst.HypervisorType != hypervisor.TypeCloudHypervisor { + t.Skipf("regression test is CH-specific; instance uses %s", inst.HypervisorType) + } + appLog := p.InstanceAppLog(inst.Id) + + require.Eventually(t, func() bool { + st, err := os.Stat(appLog) + return err == nil && st.Size() > 1024 + }, 30*time.Second, 200*time.Millisecond, "expected serial output to accumulate before rotation") + + src, err := os.Open(appLog) + require.NoError(t, err) + dst, err := os.Create(appLog + ".1") + require.NoError(t, err) + _, err = io.Copy(dst, src) + _ = src.Close() + _ = dst.Close() + require.NoError(t, err) + require.NoError(t, os.Truncate(appLog, 0)) + + // Drive more serial output post-truncate. 
+ _, _, err = execInInstance(ctx, inst, "sh", "-c", "for i in 1 2 3; do echo post-rotate-marker-$i > /dev/kmsg; done") + require.NoError(t, err) + + require.Eventually(t, func() bool { + st, err := os.Stat(appLog) + return err == nil && st.Size() > 0 + }, 10*time.Second, 200*time.Millisecond) + + st, err := os.Stat(appLog) + require.NoError(t, err) + sys := st.Sys().(*syscall.Stat_t) + allocated := int64(sys.Blocks) * 512 + apparent := st.Size() + // Allow 4096 bytes of slack on top of allocated; st_blocks counts 512B units. + assert.LessOrEqualf(t, apparent, allocated+4096, + "post-rotation app.log is sparse: apparent=%d allocated=%d (sparse_bytes=%d)", + apparent, allocated, apparent-allocated) + + head := make([]byte, 64) + f, err := os.Open(appLog) + require.NoError(t, err) + n, _ := f.Read(head) + _ = f.Close() + nulCount := 0 + for _, b := range head[:n] { + if b == 0 { + nulCount++ + } + } + assert.Lessf(t, nulCount, n/2, + "post-rotation app.log starts with too many NULs (%d/%d) — likely a sparse hole", nulCount, n) + }) + t.Log("All systemd mode tests passed!") } diff --git a/lib/hypervisor/cloudhypervisor/cloudhypervisor.go b/lib/hypervisor/cloudhypervisor/cloudhypervisor.go index 95e0792a..d9c8c30c 100644 --- a/lib/hypervisor/cloudhypervisor/cloudhypervisor.go +++ b/lib/hypervisor/cloudhypervisor/cloudhypervisor.go @@ -15,6 +15,11 @@ import ( type CloudHypervisor struct { client *vmm.VMM socketPath string + // serial is set on the Starter path (StartVM/RestoreVM) so Shutdown + // can stop the reader explicitly. When the client is constructed via + // the reconnect factory (New), there is no reader to own — the + // goroutine from the original process exited with that process. + serial *serialReader } var balloonTargetCache hypervisor.BalloonTargetCache @@ -73,10 +78,13 @@ func (c *CloudHypervisor) DeleteVM(ctx context.Context) error { // Shutdown stops the VMM process gracefully. 
func (c *CloudHypervisor) Shutdown(ctx context.Context) error { resp, err := c.client.ShutdownVMMWithResponse(ctx) + // Stop the serial reader regardless of API outcome — once Shutdown + // has been requested the VM is going away and the reader has no + // further work. + c.serial.Close() if err != nil { return fmt.Errorf("shutdown vmm: %w", err) } - // ShutdownVMM may return various codes, 204 is success if resp.StatusCode() != 204 { return fmt.Errorf("shutdown vmm failed with status %d", resp.StatusCode()) } diff --git a/lib/hypervisor/cloudhypervisor/config.go b/lib/hypervisor/cloudhypervisor/config.go index d728036b..0c5a0ae5 100644 --- a/lib/hypervisor/cloudhypervisor/config.go +++ b/lib/hypervisor/cloudhypervisor/config.go @@ -1,10 +1,30 @@ package cloudhypervisor import ( + "path/filepath" + "github.com/kernel/hypeman/lib/hypervisor" "github.com/kernel/hypeman/lib/vmm" ) +// serialSocketPath returns the unix socket path that Cloud Hypervisor +// binds for serial output. The socket lives at the instance directory +// level, next to ch.sock and vsock.sock — not under logs/ — so the +// total path stays under the 108-byte sun_path limit on Linux (104 on +// macOS) when long test temp prefixes are involved. +// +// We route serial through a hypeman-owned socket reader (see serial.go) +// rather than letting CH open the file directly, because CH's File-mode +// serial opens without O_APPEND. Combined with copytruncate-style log +// rotation that leaves CH's fd offset stale, the next write lands past +// EOF and creates a sparse hole of NUL bytes from byte 0 onward. +func serialSocketPath(logPath string) string { + if logPath == "" { + return "" + } + return filepath.Join(filepath.Dir(filepath.Dir(logPath)), "serial.sock") +} + // ToVMConfig converts hypervisor.VMConfig to Cloud Hypervisor's vmm.VmConfig. 
func ToVMConfig(cfg hypervisor.VMConfig) vmm.VmConfig { // Payload configuration (kernel + initramfs) @@ -66,10 +86,12 @@ func ToVMConfig(cfg hypervisor.VMConfig) vmm.VmConfig { disks = append(disks, disk) } - // Serial console configuration + // Serial console configuration. We route serial through a unix socket + // that CH binds and hypeman dials (see startSerialReader) instead of + // letting CH write to the file directly — see serialSocketPath for rationale. serial := vmm.ConsoleConfig{ - Mode: vmm.ConsoleConfigMode("File"), - File: ptr(cfg.SerialLogPath), + Mode: vmm.ConsoleConfigModeSocket, + Socket: ptr(serialSocketPath(cfg.SerialLogPath)), } // Console off (we use serial) diff --git a/lib/hypervisor/cloudhypervisor/fork_snapshot.go b/lib/hypervisor/cloudhypervisor/fork_snapshot.go index 831c600e..7db73ced 100644 --- a/lib/hypervisor/cloudhypervisor/fork_snapshot.go +++ b/lib/hypervisor/cloudhypervisor/fork_snapshot.go @@ -88,7 +88,37 @@ func updateSerialConfig(config map[string]any, logPath string) { if !ok || serial == nil { return } - serial["file"] = logPath + // Forks always use the socket-based serial reader (see config.go), so + // rewrite to the new shape regardless of the source snapshot's mode. + // This also migrates legacy File-mode snapshots to Socket on fork. + delete(serial, "file") + serial["mode"] = "Socket" + serial["socket"] = serialSocketPath(logPath) +} + +// rewriteSerialConfigForRestore migrates the on-disk snapshot config so +// CH uses socket-mode serial on restore. Pre-fix snapshots embed +// serial.mode=File, which keeps the original copytruncate sparse-hole +// bug alive after restore. New snapshots are already mode=Socket; the +// rewrite is idempotent for them, but the file is always re-serialized. 
+func rewriteSerialConfigForRestore(configPath, logPath string) error { + data, err := os.ReadFile(configPath) + if err != nil { + return fmt.Errorf("read snapshot config: %w", err) + } + var config map[string]any + if err := json.Unmarshal(data, &config); err != nil { + return fmt.Errorf("unmarshal snapshot config: %w", err) + } + updateSerialConfig(config, logPath) + updated, err := json.MarshalIndent(config, "", " ") + if err != nil { + return fmt.Errorf("marshal snapshot config: %w", err) + } + if err := os.WriteFile(configPath, updated, 0644); err != nil { + return fmt.Errorf("write snapshot config: %w", err) + } + return nil } func updateNetworkConfig(config map[string]any, netCfg *hypervisor.ForkNetworkConfig) { diff --git a/lib/hypervisor/cloudhypervisor/fork_snapshot_test.go b/lib/hypervisor/cloudhypervisor/fork_snapshot_test.go index b25b1034..4f024b99 100644 --- a/lib/hypervisor/cloudhypervisor/fork_snapshot_test.go +++ b/lib/hypervisor/cloudhypervisor/fork_snapshot_test.go @@ -17,7 +17,7 @@ func TestRewriteSnapshotConfigForFork(t *testing.T) { orig := map[string]any{ "disks": []any{map[string]any{"path": "/src/guests/a/overlay.raw"}}, - "serial": map[string]any{"file": "/src/guests/a/logs/app.log"}, + "serial": map[string]any{"mode": "Socket", "socket": "/src/guests/a/serial.sock"}, "vsock": map[string]any{"cid": float64(100), "socket": "/src/guests/a/vsock.sock"}, "metadata": map[string]any{ "note": "keep-/src/guests/a-as-substring", @@ -59,7 +59,10 @@ func TestRewriteSnapshotConfigForFork(t *testing.T) { assert.Equal(t, "/dst/guests/b/overlay.raw", disk0["path"]) serial := updated["serial"].(map[string]any) - assert.Equal(t, "/dst/guests/b/logs/app.log", serial["file"]) + assert.Equal(t, "Socket", serial["mode"]) + assert.Equal(t, "/dst/guests/b/serial.sock", serial["socket"]) + _, hasFile := serial["file"] + assert.False(t, hasFile, "fork rewrite should drop legacy serial.file") vsock := updated["vsock"].(map[string]any) assert.Equal(t, float64(100), 
vsock["cid"]) @@ -74,3 +77,62 @@ func TestRewriteSnapshotConfigForFork(t *testing.T) { metadata := updated["metadata"].(map[string]any) assert.Equal(t, "keep-/src/guests/a-as-substring", metadata["note"]) } + +func TestRewriteSerialConfigForRestore(t *testing.T) { + t.Run("FileToSocket", func(t *testing.T) { + path := writeSnapshotConfig(t, map[string]any{ + "serial": map[string]any{"mode": "File", "file": "/old/logs/app.log"}, + }) + require.NoError(t, rewriteSerialConfigForRestore(path, "/inst/logs/app.log")) + + serial := readSerialConfig(t, path) + assert.Equal(t, "Socket", serial["mode"]) + assert.Equal(t, "/inst/serial.sock", serial["socket"]) + _, hasFile := serial["file"] + assert.False(t, hasFile, "legacy file field must be removed") + }) + + t.Run("AlreadySocketIsIdempotent", func(t *testing.T) { + path := writeSnapshotConfig(t, map[string]any{ + "serial": map[string]any{"mode": "Socket", "socket": "/inst/serial.sock"}, + }) + require.NoError(t, rewriteSerialConfigForRestore(path, "/inst/logs/app.log")) + + serial := readSerialConfig(t, path) + assert.Equal(t, "Socket", serial["mode"]) + assert.Equal(t, "/inst/serial.sock", serial["socket"]) + }) + + t.Run("NoSerialBlockIsNoOp", func(t *testing.T) { + path := writeSnapshotConfig(t, map[string]any{ + "disks": []any{map[string]any{"path": "/x"}}, + }) + require.NoError(t, rewriteSerialConfigForRestore(path, "/inst/logs/app.log")) + + var cfg map[string]any + data, err := os.ReadFile(path) + require.NoError(t, err) + require.NoError(t, json.Unmarshal(data, &cfg)) + _, hasSerial := cfg["serial"] + assert.False(t, hasSerial, "should not synthesize a serial block") + }) +} + +func writeSnapshotConfig(t *testing.T, cfg map[string]any) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "config.json") + data, err := json.Marshal(cfg) + require.NoError(t, err) + require.NoError(t, os.WriteFile(path, data, 0644)) + return path +} + +func readSerialConfig(t *testing.T, path string) map[string]any 
{ + t.Helper() + data, err := os.ReadFile(path) + require.NoError(t, err) + var cfg map[string]any + require.NoError(t, json.Unmarshal(data, &cfg)) + return cfg["serial"].(map[string]any) +} diff --git a/lib/hypervisor/cloudhypervisor/process.go b/lib/hypervisor/cloudhypervisor/process.go index 0618de43..bd609c4f 100644 --- a/lib/hypervisor/cloudhypervisor/process.go +++ b/lib/hypervisor/cloudhypervisor/process.go @@ -69,17 +69,26 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s return 0, nil, fmt.Errorf("unsupported cloud-hypervisor version: %s", version) } + // 0. Start the serial reader before CH: it clears any stale socket file + // and then dials with retry until CH binds the serial socket. + sr, err := startSerialReader(ctx, serialSocketPath(config.SerialLogPath), config.SerialLogPath) + if err != nil { + return 0, nil, fmt.Errorf("start serial reader: %w", err) + } + // 1. Start the Cloud Hypervisor process processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor) pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath) hypervisor.FinishTraceSpan(processSpan, err) if err != nil { + sr.Close() return 0, nil, fmt.Errorf("start process: %w", err) } // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { syscall.Kill(pid, syscall.SIGKILL) + sr.Close() }) defer cu.Clean() @@ -112,7 +121,10 @@ func (s *Starter) StartVM(ctx context.Context, p *paths.Paths, version string, s return 0, nil, fmt.Errorf("boot vm failed with status %d: %s", bootResp.StatusCode(), string(bootResp.Body)) } - // Success - release cleanup to prevent killing the process + // Success - release cleanup to prevent killing the process. Hand + // ownership of the serial reader to the client so Shutdown can + // stop it. 
+ hv.serial = sr cu.Release() return pid, hv, nil } @@ -129,12 +141,29 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, return 0, nil, fmt.Errorf("unsupported cloud-hypervisor version: %s", version) } + // 0. Start the serial reader before CH. The serial log path lives at + // a fixed offset from the CH API socket within the instance directory. + logPath := filepath.Join(filepath.Dir(socketPath), "logs", "app.log") + sr, err := startSerialReader(ctx, serialSocketPath(logPath), logPath) + if err != nil { + return 0, nil, fmt.Errorf("start serial reader: %w", err) + } + + // Migrate legacy serial.mode=File snapshots to Socket so CH binds the + // reader's socket on restore. New snapshots are already Socket; the + // rewrite is idempotent. + if err := rewriteSerialConfigForRestore(filepath.Join(snapshotPath, "config.json"), logPath); err != nil { + sr.Close() + return 0, nil, fmt.Errorf("rewrite snapshot serial config: %w", err) + } + // 1. Start the Cloud Hypervisor process processStartTime := time.Now() processCtx, processSpan := hypervisor.StartProcessSpan(ctx, hypervisor.TypeCloudHypervisor) pid, err := vmm.StartProcess(processCtx, p, chVersion, socketPath) hypervisor.FinishTraceSpan(processSpan, err) if err != nil { + sr.Close() return 0, nil, fmt.Errorf("start process: %w", err) } log.DebugContext(ctx, "CH process started", "pid", pid, "duration_ms", time.Since(processStartTime).Milliseconds()) @@ -142,6 +171,7 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, // Setup cleanup to kill the process if subsequent steps fail cu := cleanup.Make(func() { syscall.Kill(pid, syscall.SIGKILL) + sr.Close() }) defer cu.Clean() @@ -167,7 +197,10 @@ func (s *Starter) RestoreVM(ctx context.Context, p *paths.Paths, version string, } log.DebugContext(ctx, "CH restore API complete", "duration_ms", time.Since(restoreAPIStart).Milliseconds()) - // Success - release cleanup to prevent killing the process + // 
Success - release cleanup to prevent killing the process. Hand + // ownership of the serial reader to the client so Shutdown can + // stop it. + hv.serial = sr cu.Release() log.DebugContext(ctx, "CH restore complete", "pid", pid, "total_duration_ms", time.Since(startTime).Milliseconds()) return pid, hv, nil diff --git a/lib/hypervisor/cloudhypervisor/serial.go b/lib/hypervisor/cloudhypervisor/serial.go new file mode 100644 index 00000000..9fa3c7e2 --- /dev/null +++ b/lib/hypervisor/cloudhypervisor/serial.go @@ -0,0 +1,162 @@ +package cloudhypervisor + +import ( + "context" + "errors" + "fmt" + "io" + "log/slog" + "net" + "os" + "path/filepath" + "sync" + "syscall" + "time" + + "github.com/kernel/hypeman/lib/logger" +) + +// serialReader connects to the per-instance unix socket that Cloud +// Hypervisor binds for serial output (mode=Socket), and copies bytes +// from that connection into the serial log file with O_APPEND. Owning +// the writer fd lets copytruncate log rotation work safely: O_APPEND +// atomically seeks to EOF on every write, so writes after a truncate +// land at byte 0 instead of the writer's stale offset. +// +// CH is the server here — it calls UnixListener::bind(socket) during +// VM creation. Hypeman is the client and dials with retry, since the +// reader is started before CH has had a chance to bind. +type serialReader struct { + socketPath string + logPath string + logFile *os.File + + cancel context.CancelFunc + done chan struct{} + + mu sync.Mutex + conn net.Conn +} + +// startSerialReader removes any stale socket left over from a prior +// run, opens the log file with O_APPEND, and spawns a goroutine that +// dials the socket once CH binds it and pipes serial output into the +// log. Callers must call Close on failure paths and on shutdown. 
+func startSerialReader(ctx context.Context, socketPath, logPath string) (*serialReader, error) { + if socketPath == "" || logPath == "" { + return nil, errors.New("serial: socket and log path are required") + } + + if err := os.MkdirAll(filepath.Dir(logPath), 0755); err != nil { + return nil, fmt.Errorf("serial: create log dir: %w", err) + } + + // CH refuses to start if the socket path already exists (it calls + // bind() on it). Wipe any leftover from a prior run. + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + return nil, fmt.Errorf("serial: remove stale socket: %w", err) + } + + f, err := os.OpenFile(logPath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0644) + if err != nil { + return nil, fmt.Errorf("serial: open log: %w", err) + } + + runCtx, cancel := context.WithCancel(context.Background()) + sr := &serialReader{ + socketPath: socketPath, + logPath: logPath, + logFile: f, + cancel: cancel, + done: make(chan struct{}), + } + go sr.run(runCtx, logger.FromContext(ctx)) + return sr, nil +} + +func (s *serialReader) run(ctx context.Context, log *slog.Logger) { + defer close(s.done) + defer s.logFile.Close() + // Drop the socket file when the goroutine exits so it doesn't persist + // on disk after the VM is gone, regardless of whether Close was + // called explicitly. CH normally unlinks the path itself on Drop, + // but this is belt-and-suspenders for the leak-on-success case. + defer func() { _ = os.Remove(s.socketPath) }() + + conn, err := dialUnixWithRetry(ctx, s.socketPath, 60*time.Second) + if err != nil { + // Caller closed us before CH bound the socket, or CH never + // did. Either way, nothing more to do. + return + } + // Re-check ctx under the lock before publishing the conn. If Close + // fired between dial returning and us taking the lock, it would have + // observed s.conn == nil and given up — without this check we'd + // publish the conn afterward and io.Copy would block forever. 
+ s.mu.Lock() + if ctx.Err() != nil { + s.mu.Unlock() + _ = conn.Close() + return + } + s.conn = conn + s.mu.Unlock() + + if _, err := io.Copy(s.logFile, conn); err != nil && + !errors.Is(err, io.EOF) && + !errors.Is(err, net.ErrClosed) && + !errors.Is(err, syscall.EPIPE) { + log.WarnContext(ctx, "serial: copy ended with error", + "path", s.logPath, "err", err) + } + _ = conn.Close() +} + +// dialUnixWithRetry polls for the CH listener to come up. The socket +// path is created by CH inside vm.create, which happens after the +// reader is started, so we must wait. Returns the first successful +// connection or the last dial error after timeout. +func dialUnixWithRetry(ctx context.Context, path string, timeout time.Duration) (net.Conn, error) { + deadline := time.Now().Add(timeout) + var dialer net.Dialer + var lastErr error + for { + if err := ctx.Err(); err != nil { + if lastErr != nil { + return nil, lastErr + } + return nil, err + } + conn, err := dialer.DialContext(ctx, "unix", path) + if err == nil { + return conn, nil + } + lastErr = err + if time.Now().After(deadline) { + return nil, fmt.Errorf("serial: dial %s: %w", path, lastErr) + } + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(50 * time.Millisecond): + } + } +} + +// Close stops the reader. Safe to call multiple times. 
+func (s *serialReader) Close() { + if s == nil { + return + } + s.cancel() + s.mu.Lock() + if s.conn != nil { + _ = s.conn.Close() + } + s.mu.Unlock() + select { + case <-s.done: + case <-time.After(2 * time.Second): + } + _ = os.Remove(s.socketPath) +} diff --git a/lib/hypervisor/cloudhypervisor/serial_test.go b/lib/hypervisor/cloudhypervisor/serial_test.go new file mode 100644 index 00000000..466595c4 --- /dev/null +++ b/lib/hypervisor/cloudhypervisor/serial_test.go @@ -0,0 +1,132 @@ +package cloudhypervisor + +import ( + "context" + "io" + "net" + "os" + "path/filepath" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// shortTempDir returns a temp dir with a short prefix so unix socket +// paths stay under the platform sun_path limit (108 bytes on Linux, +// 104 on macOS). t.TempDir() under /var/folders on macOS overflows. +func shortTempDir(t *testing.T) string { + t.Helper() + dir, err := os.MkdirTemp("/tmp", "ch") + require.NoError(t, err) + t.Cleanup(func() { _ = os.RemoveAll(dir) }) + return dir +} + +func TestSerialReader_CopiesBytesToLog(t *testing.T) { + tmp := shortTempDir(t) + logPath := filepath.Join(tmp, "logs", "app.log") + sockPath := serialSocketPath(logPath) + + // Reader starts first (mirrors production order: hypeman starts the + // reader, then CH binds the socket during vm.create). The reader + // dials with retry until the listener comes up. 
+ sr, err := startSerialReader(context.Background(), sockPath, logPath) + require.NoError(t, err) + t.Cleanup(sr.Close) + + ln, err := net.Listen("unix", sockPath) + require.NoError(t, err) + t.Cleanup(func() { _ = ln.Close() }) + + conn, err := ln.Accept() + require.NoError(t, err) + + payload := []byte("hello from cloud hypervisor\n") + _, err = conn.Write(payload) + require.NoError(t, err) + require.NoError(t, conn.Close()) + + require.Eventually(t, func() bool { + data, err := os.ReadFile(logPath) + return err == nil && len(data) == len(payload) + }, 2*time.Second, 10*time.Millisecond, "expected reader to flush bytes to log") + + data, err := os.ReadFile(logPath) + require.NoError(t, err) + assert.Equal(t, payload, data) +} + +// TestSerialReader_NoSparseHoleAfterCopytruncate is the regression test +// for the bug that motivated this fix: copytruncate against a file whose +// writer holds a non-O_APPEND fd creates a sparse hole of NUL bytes from +// byte 0 to the writer's stale offset. Hypeman now owns the writer fd +// with O_APPEND, so post-truncate writes correctly resume at byte 0. +func TestSerialReader_NoSparseHoleAfterCopytruncate(t *testing.T) { + tmp := shortTempDir(t) + logPath := filepath.Join(tmp, "logs", "app.log") + sockPath := serialSocketPath(logPath) + + sr, err := startSerialReader(context.Background(), sockPath, logPath) + require.NoError(t, err) + t.Cleanup(sr.Close) + + ln, err := net.Listen("unix", sockPath) + require.NoError(t, err) + t.Cleanup(func() { _ = ln.Close() }) + + conn, err := ln.Accept() + require.NoError(t, err) + t.Cleanup(func() { _ = conn.Close() }) + + pre := []byte("pre-rotate line\n") + _, err = conn.Write(pre) + require.NoError(t, err) + + require.Eventually(t, func() bool { + st, err := os.Stat(logPath) + return err == nil && st.Size() == int64(len(pre)) + }, 2*time.Second, 10*time.Millisecond) + + // Mimic rotateLogIfNeeded: copy then truncate the file out from under + // the writer. 
+ require.NoError(t, copyFile(logPath, logPath+".1")) + require.NoError(t, os.Truncate(logPath, 0)) + + post := []byte("post-rotate line\n") + _, err = conn.Write(post) + require.NoError(t, err) + + require.Eventually(t, func() bool { + st, err := os.Stat(logPath) + return err == nil && st.Size() == int64(len(post)) + }, 2*time.Second, 10*time.Millisecond) + + st, err := os.Stat(logPath) + require.NoError(t, err) + sys := st.Sys().(*syscall.Stat_t) + allocated := int64(sys.Blocks) * 512 + apparent := st.Size() + assert.LessOrEqual(t, apparent, allocated, "post-truncate file is sparse: apparent=%d allocated=%d", apparent, allocated) + + data, err := os.ReadFile(logPath) + require.NoError(t, err) + assert.Equal(t, post, data, "post-truncate writes should land at byte 0, not at the writer's stale offset") +} + +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + _, err = io.Copy(out, in) + return err +}