Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 50 additions & 3 deletions pkg/executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"log/slog"
Expand All @@ -24,12 +25,22 @@ const (
serviceName = "executor"
)

// Run starts the command, waits for it to complete, and returns the error.
// The child PID is registered in the global process registry while the process
// is running so that a PID-1 zombie reaper does not steal it.
func Run(cmd *exec.Cmd) error {
// TODO context: hook name, hook phase, hook binding
// TODO observability
log.Debug("Executing command", slog.String(pkg.LogKeyCommand, strings.Join(cmd.Args, " ")), slog.String(pkg.LogKeyDir, cmd.Dir))

return cmd.Run()
if err := cmd.Start(); err != nil {
return err
}

registerPID(cmd.Process.Pid)
defer unregisterPID(cmd.Process.Pid)

return cmd.Wait()
}

// StderrError is returned by RunAndLogLines when a command fails and produces
Expand Down Expand Up @@ -113,7 +124,36 @@ func (e *Executor) Output() ([]byte, error) {
e.logger.Debug("Executing command",
slog.String(pkg.LogKeyCommand, strings.Join(e.cmd.Args, " ")),
slog.String(pkg.LogKeyDir, e.cmd.Dir))
return e.cmd.Output()

// Reproduce cmd.Output() but interleave PID registration so that the
// PID-1 zombie reaper skips this process.
if e.cmd.Stdout != nil {
return nil, errors.New("exec: Stdout already set")
}
var stdout bytes.Buffer
e.cmd.Stdout = &stdout

captureErr := e.cmd.Stderr == nil
var stderrBuf bytes.Buffer
if captureErr {
e.cmd.Stderr = &stderrBuf
}

if err := e.cmd.Start(); err != nil {
return nil, err
}

registerPID(e.cmd.Process.Pid)
defer unregisterPID(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil && captureErr {
if ee, ok := err.(*exec.ExitError); ok {
ee.Stderr = stderrBuf.Bytes()
}
}

return stdout.Bytes(), err
}

type CmdUsage struct {
Expand Down Expand Up @@ -154,7 +194,14 @@ func (e *Executor) RunAndLogLines(ctx context.Context, logLabels map[string]stri
e.cmd.Stdout = plo
e.cmd.Stderr = io.MultiWriter(ple, stdErr)

err := e.cmd.Run()
if err := e.cmd.Start(); err != nil {
return nil, fmt.Errorf("cmd start: %w", err)
}

registerPID(e.cmd.Process.Pid)
defer unregisterPID(e.cmd.Process.Pid)

err := e.cmd.Wait()
if err != nil {
if len(stdErr.Bytes()) > 0 {
return nil, &StderrError{Message: stdErr.String()}
Expand Down
155 changes: 154 additions & 1 deletion pkg/executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package executor
import (
"bytes"
"context"
json "github.com/flant/shell-operator/pkg/utils/json"
"fmt"
"io"
"math/rand/v2"
Expand All @@ -16,6 +15,8 @@ import (
"github.com/deckhouse/deckhouse/pkg/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

json "github.com/flant/shell-operator/pkg/utils/json"
)

func TestRunAndLogLines(t *testing.T) {
Expand Down Expand Up @@ -250,3 +251,155 @@ func randStringRunes(n int) string {
}
return string(b)
}

// newTestRegistry creates a fresh processRegistry for tests and swaps the
// global singleton, returning a cleanup function that restores it.
func newTestRegistry(t *testing.T) *processRegistry {
t.Helper()

r := &processRegistry{activePIDs: make(map[int32]struct{})}
orig := registry
registry = r
t.Cleanup(func() { registry = orig })

return r
}

func TestProcessRegistry_Basic(t *testing.T) {
r := &processRegistry{activePIDs: make(map[int32]struct{})}

// Initially empty
assert.False(t, r.IsActive(1), "IsActive should return false for unknown PID")
assert.False(t, r.IsActive(12345), "IsActive should return false for unknown PID")

// Register and check
r.register(42)
assert.True(t, r.IsActive(42), "IsActive should return true for registered PID")
assert.False(t, r.IsActive(43), "IsActive should return false for different PID")

// Unregister and check
r.unregister(42)
assert.False(t, r.IsActive(42), "IsActive should return false after unregister")
}

func TestProcessRegistry_DoubleUnregister(t *testing.T) {
r := &processRegistry{activePIDs: make(map[int32]struct{})}

r.register(100)
r.unregister(100)
r.unregister(100) // should not panic

assert.False(t, r.IsActive(100))
}

func TestProcessRegistry_Concurrent(t *testing.T) {
r := &processRegistry{activePIDs: make(map[int32]struct{})}
const goroutines = 100
const pidsPerGoroutine = 100

done := make(chan struct{})

// Concurrently register PIDs
for i := range goroutines {
go func() {
defer func() { done <- struct{}{} }()
for j := 0; j < pidsPerGoroutine; j++ {
r.register(i*pidsPerGoroutine + j)
}
}()
}

for range goroutines {
<-done
}

// All PIDs should be registered
for i := range goroutines {
for j := 0; j < pidsPerGoroutine; j++ {
assert.True(t, r.IsActive(i*pidsPerGoroutine+j))
}
}

// Concurrently unregister PIDs
for i := range goroutines {
go func() {
defer func() { done <- struct{}{} }()
for j := 0; j < pidsPerGoroutine; j++ {
r.unregister(i*pidsPerGoroutine + j)
}
}()
}

for range goroutines {
<-done
}

// All PIDs should be unregistered
for i := range goroutines {
for j := 0; j < pidsPerGoroutine; j++ {
assert.False(t, r.IsActive(i*pidsPerGoroutine+j))
}
}
}

func TestTracker_IsActive(t *testing.T) {
r := newTestRegistry(t)
tracker := Tracker()

// PID not registered
assert.False(t, tracker.IsActive(42))

// Register via internal helper (same path as executor methods)
r.register(42)
assert.True(t, tracker.IsActive(42))

r.unregister(42)
assert.False(t, tracker.IsActive(42))
}

func TestGlobalRegistry_Output_RegistersPID(t *testing.T) {
newTestRegistry(t)

ex := NewExecutor("", "echo", []string{"hello"}, []string{})

output, err := ex.Output()
assert.NoError(t, err)
assert.Contains(t, string(output), "hello")

// PID should be unregistered after Output returns.
}

func TestGlobalRegistry_Output_FailedStart(t *testing.T) {
newTestRegistry(t)

// Command that doesn't exist — Start() should fail.
ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{})
_, err := ex.Output()
assert.Error(t, err)
}

func TestGlobalRegistry_RunAndLogLines_RegistersPID(t *testing.T) {
newTestRegistry(t)

logger := log.NewLogger()
logger.SetLevel(log.LevelInfo)

ex := NewExecutor("", "echo", []string{"test-output"}, []string{}).
WithLogger(logger)

usage, err := ex.RunAndLogLines(context.Background(), map[string]string{})
assert.NoError(t, err)
assert.NotNil(t, usage)
}

func TestGlobalRegistry_RunAndLogLines_FailedStart(t *testing.T) {
newTestRegistry(t)

logger := log.NewLogger()

ex := NewExecutor("", "/nonexistent/binary", []string{}, []string{}).
WithLogger(logger)

_, err := ex.RunAndLogLines(context.Background(), map[string]string{})
assert.Error(t, err)
}
73 changes: 73 additions & 0 deletions pkg/executor/registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package executor

import "sync"

// ProcessTracker is a read-only view into the process registry.
// It is intended for consumers (such as a PID-1 zombie reaper) that need
// to check whether a PID is managed by the executor but must not modify
// the registry.
type ProcessTracker interface {
// IsActive reports whether pid is currently tracked as a running process.
IsActive(pid int) bool
}

// processRegistry tracks PIDs of processes started by the executor so that
// a PID-1 zombie reaper can skip them (their parent already calls Wait).
// This prevents the reaper from stealing a child that cmd.Wait expects to reap.
//
// The struct is intentionally unexported — all external access goes through
// the ProcessTracker interface (read-only) or the package-level helpers
// registerPID / unregisterPID (write, executor-internal).
type processRegistry struct {
mu sync.RWMutex
activePIDs map[int32]struct{}
}

// register adds pid to the set of active PIDs.
func (r *processRegistry) register(pid int) {
r.mu.Lock()
defer r.mu.Unlock()

r.activePIDs[int32(pid)] = struct{}{}
}

// unregister removes pid from the set of active PIDs.
func (r *processRegistry) unregister(pid int) {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.activePIDs, int32(pid))
}

// IsActive reports whether pid is currently tracked as an active process.
func (r *processRegistry) IsActive(pid int) bool {
r.mu.RLock()
defer r.mu.RUnlock()

_, ok := r.activePIDs[int32(pid)]

return ok
}

// registry is the singleton process registry.
// It is not exported — external packages obtain a ProcessTracker via Tracker().
var registry = &processRegistry{
activePIDs: make(map[int32]struct{}),
}

// Tracker returns a read-only view of the global process registry.
// The zombie reaper should call this once and use the returned ProcessTracker
// to check whether a PID is managed by the executor.
func Tracker() ProcessTracker {
return registry
}

// registerPID and unregisterPID are package-internal helpers used by Run,
// Output, and RunAndLogLines to track child PIDs.
func registerPID(pid int) {
registry.register(pid)
}

func unregisterPID(pid int) {
registry.unregister(pid)
}
Loading