diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 6e723066..beb23f88 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "errors" "fmt" "io" "log/slog" @@ -24,12 +25,22 @@ const ( serviceName = "executor" ) +// Run starts the command, waits for it to complete, and returns the error. +// The child PID is registered in the global process registry while the process +// is running so that a PID-1 zombie reaper does not steal it. func Run(cmd *exec.Cmd) error { // TODO context: hook name, hook phase, hook binding // TODO observability log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir)) - return cmd.Run() + if err := cmd.Start(); err != nil { + return err + } + + registerPID(cmd.Process.Pid) + defer unregisterPID(cmd.Process.Pid) + + return cmd.Wait() } // StderrError is returned by RunAndLogLines when a command fails and produces @@ -113,7 +124,36 @@ func (e *Executor) Output() ([]byte, error) { e.logger.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")), slog.String(pkg.LogKeyDir, e.cmd.Dir)) - return e.cmd.Output() + + // Reproduce cmd.Output() but interleave PID registration so that the + // PID-1 zombie reaper skips this process. + if e.cmd.Stdout != nil { + return nil, errors.New("exec: Stdout already set") + } + var stdout bytes.Buffer + e.cmd.Stdout = &stdout + + captureErr := e.cmd.Stderr == nil + var stderrBuf bytes.Buffer + if captureErr { + e.cmd.Stderr = &stderrBuf + } + + if err := e.cmd.Start(); err != nil { + return nil, err + } + + registerPID(e.cmd.Process.Pid) + defer unregisterPID(e.cmd.Process.Pid) + + err := e.cmd.Wait() + if err != nil && captureErr { + if ee, ok := err.(*exec.ExitError); ok { + ee.Stderr = stderrBuf.Bytes() + } + } + + return stdout.Bytes(), err } type CmdUsage struct { @@ -154,7 +194,14 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri e.cmd.Stdout = plo e.cmd.Stderr = io.MultiWriter(ple, stdErr) - err := e.cmd.Run() + if err := e.cmd.Start(); err != nil { + return nil, fmt.Errorf("cmd start: %w", err) + } + + registerPID(e.cmd.Process.Pid) + defer unregisterPID(e.cmd.Process.Pid) + + err := e.cmd.Wait() if err != nil { if len(stdErr.Bytes()) > 0 { return nil, &StderrError{Message: stdErr.String()} diff --git a/pkg/executor/executor_test.go b/pkg/executor/executor_test.go index ccfc52fb..ba92c134 100644 --- a/pkg/executor/executor_test.go +++ b/pkg/executor/executor_test.go @@ -3,7 +3,6 @@ package executor import ( "bytes" "context" - json "github.com/flant/shell-operator/pkg/utils/json" "fmt" "io" "math/rand/v2" @@ -16,6 +15,8 @@ import ( "github.com/deckhouse/deckhouse/pkg/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + json "github.com/flant/shell-operator/pkg/utils/json" ) func TestRunAndLogLines(t *testing.T) { @@ -250,3 +251,155 @@ func randStringRunes(n int) string { } return string(b) } + +// newTestRegistry creates a fresh processRegistry for tests and swaps the +// global singleton, returning a cleanup function that restores it. +func newTestRegistry(t *testing.T) *processRegistry { + t.Helper() + + r := &processRegistry{activePIDs: make(map[int32]struct{})} + orig := registry + registry = r + t.Cleanup(func() { registry = orig }) + + return r +} + +func TestProcessRegistry_Basic(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int32]struct{})} + + // Initially empty + assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID") + assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID") + + // Register and check + r.register(42) + assert.True(t, r.IsActive(42), "IsActive should return true for registered PID") + assert.False(t, r.IsActive(43), "IsActive should return false for different PID") + + // Unregister and check + r.unregister(42) + assert.False(t, r.IsActive(42), "IsActive should return false after unregister") +} + +func TestProcessRegistry_DoubleUnregister(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int32]struct{})} + + r.register(100) + r.unregister(100) + r.unregister(100) // should not panic + + assert.False(t, r.IsActive(100)) +} + +func TestProcessRegistry_Concurrent(t *testing.T) { + r := &processRegistry{activePIDs: make(map[int32]struct{})} + const goroutines = 100 + const pidsPerGoroutine = 100 + + done := make(chan struct{}) + + // Concurrently register PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.register(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be registered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.True(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } + + // Concurrently unregister PIDs + for i := range goroutines { + go func() { + defer func() { done <- struct{}{} }() + for j := 0; j < pidsPerGoroutine; j++ { + r.unregister(i*pidsPerGoroutine + j) + } + }() + } + + for range goroutines { + <-done + } + + // All PIDs should be unregistered + for i := range goroutines { + for j := 0; j < pidsPerGoroutine; j++ { + assert.False(t, r.IsActive(i*pidsPerGoroutine+j)) + } + } +} + +func TestTracker_IsActive(t *testing.T) { + r := newTestRegistry(t) + tracker := Tracker() + + // PID not registered + assert.False(t, tracker.IsActive(42)) + + // Register via internal helper (same path as executor methods) + r.register(42) + assert.True(t, tracker.IsActive(42)) + + r.unregister(42) + assert.False(t, tracker.IsActive(42)) +} + +func TestGlobalRegistry_Output_RegistersPID(t *testing.T) { + newTestRegistry(t) + + ex := NewExecutor("", "echo", []string{"hello"}, []string{}) + + output, err := ex.Output() + assert.NoError(t, err) + assert.Contains(t, string(output), "hello") + + // PID should be unregistered after Output returns. +} + +func TestGlobalRegistry_Output_FailedStart(t *testing.T) { + newTestRegistry(t) + + // Command that doesn't exist — Start() should fail. + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}) + _, err := ex.Output() + assert.Error(t, err) +} + +func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) { + newTestRegistry(t) + + logger := log.NewLogger() + logger.SetLevel(log.LevelInfo) + + ex := NewExecutor("", "echo", []string{"test-output"}, []string{}). + WithLogger(logger) + + usage, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.NoError(t, err) + assert.NotNil(t, usage) +} + +func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) { + newTestRegistry(t) + + logger := log.NewLogger() + + ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}). + WithLogger(logger) + + _, err := ex.RunAndLogLines(context.Background(), map[string]string{}) + assert.Error(t, err) +} diff --git a/pkg/executor/registry.go b/pkg/executor/registry.go new file mode 100644 index 00000000..4fa5b4d7 --- /dev/null +++ b/pkg/executor/registry.go @@ -0,0 +1,73 @@ +package executor + +import "sync" + +// ProcessTracker is a read-only view into the process registry. +// It is intended for consumers (such as a PID-1 zombie reaper) that need +// to check whether a PID is managed by the executor but must not modify +// the registry. +type ProcessTracker interface { + // IsActive reports whether pid is currently tracked as a running process. + IsActive(pid int) bool +} + +// processRegistry tracks PIDs of processes started by the executor so that +// a PID-1 zombie reaper can skip them (their parent already calls Wait). +// This prevents the reaper from stealing a child that cmd.Wait expects to reap. +// +// The struct is intentionally unexported — all external access goes through +// the ProcessTracker interface (read-only) or the package-level helpers +// registerPID / unregisterPID (write, executor-internal). +type processRegistry struct { + mu sync.RWMutex + activePIDs map[int32]struct{} +} + +// register adds pid to the set of active PIDs. +func (r *processRegistry) register(pid int) { + r.mu.Lock() + defer r.mu.Unlock() + + r.activePIDs[int32(pid)] = struct{}{} +} + +// unregister removes pid from the set of active PIDs. +func (r *processRegistry) unregister(pid int) { + r.mu.Lock() + defer r.mu.Unlock() + + delete(r.activePIDs, int32(pid)) +} + +// IsActive reports whether pid is currently tracked as an active process. +func (r *processRegistry) IsActive(pid int) bool { + r.mu.RLock() + defer r.mu.RUnlock() + + _, ok := r.activePIDs[int32(pid)] + + return ok +} + +// registry is the singleton process registry. +// It is not exported — external packages obtain a ProcessTracker via Tracker(). +var registry = &processRegistry{ + activePIDs: make(map[int32]struct{}), +} + +// Tracker returns a read-only view of the global process registry. +// The zombie reaper should call this once and use the returned ProcessTracker +// to check whether a PID is managed by the executor. +func Tracker() ProcessTracker { + return registry +} + +// registerPID and unregisterPID are package-internal helpers used by Run, +// Output, and RunAndLogLines to track child PIDs. +func registerPID(pid int) { + registry.register(pid) +} + +func unregisterPID(pid int) { + registry.unregister(pid) +}