Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 96 additions & 24 deletions Sources/CodexBarCore/Host/Process/SubprocessRunner.swift
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,66 @@ public struct SubprocessResult: Sendable {
public enum SubprocessRunner {
private static let log = CodexBarLog.logger(LogCategories.subprocess)

/// One-way boolean latch shared between concurrent tasks (e.g. timeout task → caller).
///
/// Every read and write of the underlying value happens while holding `guardLock`,
/// which is what justifies the `@unchecked Sendable` conformance. The flag starts
/// `false` and can only ever be raised; there is no way to clear it.
private final class KillFlag: @unchecked Sendable {
    private let guardLock = NSLock()
    private var raised = false

    /// Raises the flag. Calling this more than once has no additional effect.
    func set() {
        self.guardLock.lock()
        defer { self.guardLock.unlock() }
        self.raised = true
    }

    /// `true` once `set()` has been called.
    var isSet: Bool {
        self.guardLock.lock()
        defer { self.guardLock.unlock() }
        return self.raised
    }
}

// MARK: - Helpers to move blocking calls off the cooperative thread pool

/// Drains `fileHandle` to end-of-file without occupying a Swift cooperative-pool thread.
///
/// The blocking `readDataToEndOfFile()` call is dispatched onto a global GCD queue and
/// its result is delivered back to the awaiting caller through a checked continuation.
///
/// - Parameter fileHandle: The handle to read until EOF.
/// - Returns: Everything read from the handle.
private static func readDataOffPool(_ fileHandle: FileHandle) async -> Data {
    await withCheckedContinuation { (continuation: CheckedContinuation<Data, Never>) in
        DispatchQueue.global().async {
            continuation.resume(returning: fileHandle.readDataToEndOfFile())
        }
    }
}

/// Waits for `process` to exit without occupying a Swift cooperative-pool thread.
///
/// The blocking `waitUntilExit()` call runs on a global GCD queue; once it returns, the
/// process's termination status is delivered through a checked continuation.
///
/// - Parameter process: The launched process to wait on.
/// - Returns: `process.terminationStatus`, read after `waitUntilExit()` returns.
private static func waitForExitOffPool(_ process: Process) async -> Int32 {
    await withCheckedContinuation { (continuation: CheckedContinuation<Int32, Never>) in
        DispatchQueue.global().async {
            process.waitUntilExit()
            let status = process.terminationStatus
            continuation.resume(returning: status)
        }
    }
}

/// Stops `process` (and, when known, its process group), escalating from SIGTERM to SIGKILL.
///
/// Sequence:
/// 1. If the process is no longer running, do nothing and return `false`.
/// 2. Call `terminate()` on the process and send SIGTERM to the process group.
/// 3. Poll `isRunning` every 50 ms for up to 0.4 s. This blocks the calling thread.
/// 4. If it is still running after the grace window, send SIGKILL to the group and the pid.
///
/// - Parameters:
///   - process: The process to stop.
///   - processGroup: Process-group id to signal alongside the process, or `nil` to signal
///     only the process itself.
/// - Returns: `true` if the process was still running on entry (termination was attempted),
///   `false` if it had already exited.
@discardableResult
private static func terminateProcess(_ process: Process, processGroup: pid_t?) -> Bool {
    if !process.isRunning {
        return false
    }

    // Polite phase: ask the process and its group to exit.
    process.terminate()
    if let group = processGroup {
        kill(-group, SIGTERM)
    }

    // Grace window: give the process a moment to shut down on its own.
    let graceEnds = Date().addingTimeInterval(0.4)
    while process.isRunning && Date() < graceEnds {
        usleep(50000)
    }

    // Exited within the grace window; no escalation needed.
    guard process.isRunning else {
        return true
    }

    // Forceful phase: the process ignored SIGTERM.
    if let group = processGroup {
        kill(-group, SIGKILL)
    }
    kill(process.processIdentifier, SIGKILL)
    return true
}

// MARK: - Public API

public static func run(
binary: String,
arguments: [String],
Expand Down Expand Up @@ -66,10 +126,10 @@ public enum SubprocessRunner {
process.standardInput = nil

let stdoutTask = Task<Data, Never> {
stdoutPipe.fileHandleForReading.readDataToEndOfFile()
await self.readDataOffPool(stdoutPipe.fileHandleForReading)
}
let stderrTask = Task<Data, Never> {
stderrPipe.fileHandleForReading.readDataToEndOfFile()
await self.readDataOffPool(stderrPipe.fileHandleForReading)
}

do {
Expand All @@ -82,29 +142,55 @@ public enum SubprocessRunner {
throw SubprocessRunnerError.launchFailed(error.localizedDescription)
}

var processGroup: pid_t?
let pid = process.processIdentifier
if setpgid(pid, pid) == 0 {
processGroup = pid
}
let processGroup: pid_t? = setpgid(pid, pid) == 0 ? pid : nil

let exitCodeTask = Task<Int32, Never> {
process.waitUntilExit()
return process.terminationStatus
await self.waitForExitOffPool(process)
}

let killedByTimeout = KillFlag()

do {
let exitCode = try await withThrowingTaskGroup(of: Int32.self) { group in
group.addTask { await exitCodeTask.value }
group.addTask {
try await Task.sleep(for: .seconds(timeout))
// Kill the process BEFORE throwing so the exit-code task can complete
// and withThrowingTaskGroup can exit promptly. Only throw if we
// actually killed the process; if it already exited, let the exit
// code win the race naturally.
guard self.terminateProcess(process, processGroup: processGroup) else {
return await exitCodeTask.value
}
killedByTimeout.set()
throw SubprocessRunnerError.timedOut(label)
Comment on lines 158 to 167
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Avoid throwing timeout after the process already exited

After waitUntilExit() was moved behind DispatchQueue.global() in SubprocessRunner.run, there is now an extra scheduling hop before exitCodeTask can finish. For commands that complete right around the deadline, this timeout task can wake first, terminateProcess will no-op because process.isRunning is already false, and we still throw .timedOut. That turns successful near-threshold commands into false failures/fallbacks; the timeout branch should only throw if the process was still running or if it actually performed the kill.

Useful? React with 👍 / 👎.

}
let code = try await group.next()!
group.cancelAll()
return code
}

// Race guard: our timeout task killed the process, but the exit code
// arrived at group.next() before the .timedOut throw. Use the explicit
// flag instead of wall-clock heuristics to avoid misclassifying processes
// that crash or are killed externally.
if killedByTimeout.isSet {
let duration = Date().timeIntervalSince(start)
self.log.warning(
"Subprocess timed out (race)",
metadata: [
"label": label,
"binary": binaryName,
"duration_ms": "\(Int(duration * 1000))",
])
stdoutTask.cancel()
stderrTask.cancel()
stdoutPipe.fileHandleForReading.closeFile()
stderrPipe.fileHandleForReading.closeFile()
throw SubprocessRunnerError.timedOut(label)
}

let stdoutData = await stdoutTask.value
let stderrData = await stderrTask.value
let stdout = String(data: stdoutData, encoding: .utf8) ?? ""
Expand Down Expand Up @@ -142,22 +228,8 @@ public enum SubprocessRunner {
"binary": binaryName,
"duration_ms": "\(Int(duration * 1000))",
])
if process.isRunning {
process.terminate()
if let pgid = processGroup {
kill(-pgid, SIGTERM)
}
let killDeadline = Date().addingTimeInterval(0.4)
while process.isRunning, Date() < killDeadline {
usleep(50000)
}
if process.isRunning {
if let pgid = processGroup {
kill(-pgid, SIGKILL)
}
kill(process.processIdentifier, SIGKILL)
}
}
// Safety net: ensure the process is dead (may already be killed by timeout task).
self.terminateProcess(process, processGroup: processGroup)
exitCodeTask.cancel()
stdoutTask.cancel()
stderrTask.cancel()
Expand Down
118 changes: 118 additions & 0 deletions Tests/CodexBarTests/SubprocessRunnerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,122 @@ struct SubprocessRunnerTests {
#expect(result.stdout.count >= 1_000_000)
#expect(result.stderr.isEmpty)
}

/// Regression test for #474: running a subprocess that outlives its timeout must fail with
/// `.timedOut` (carrying the caller's label) rather than block until the process exits.
///
/// An earlier version of this test was deleted (commit 3961770) because `waitUntilExit()`
/// blocked the cooperative thread pool and starved the timeout task. With the blocking
/// calls moved to `DispatchQueue.global()`, the test is reliable again.
@Test
func throwsTimedOutWhenProcessHangs() async throws {
    let startedAt = Date()

    do {
        _ = try await SubprocessRunner.run(
            binary: "/bin/sleep",
            arguments: ["5"],
            environment: ProcessInfo.processInfo.environment,
            timeout: 1,
            label: "hung-process-test")
        Issue.record("Expected SubprocessRunnerError.timedOut but no error was thrown")
    } catch let error as SubprocessRunnerError {
        if case let .timedOut(label) = error {
            #expect(label == "hung-process-test")
        } else {
            Issue.record("Expected .timedOut, got \(error)")
            return
        }
    } catch {
        Issue.record("Expected SubprocessRunnerError.timedOut, got unexpected error: \(error)")
    }

    // The child sleeps for 5s; finishing well before that proves the timeout fired.
    // The bound is generous to tolerate slow CI machines.
    let elapsed = Date().timeIntervalSince(startedAt)
    #expect(elapsed < 3, "Timeout should fire in ~1s, not wait for process to exit naturally")
}

/// Launches several hung subprocesses at once and checks that each one times out on its
/// own, showing that one blocked subprocess does not starve the timeout mechanism of the
/// others. This is the core scenario behind the original permanent-refresh-stall bug.
@Test
func concurrentHungProcessesAllTimeOut() async {
    let count = 8
    let startedAt = Date()

    await withTaskGroup(of: Void.self) { group in
        for index in 0..<count {
            group.addTask {
                let label = "concurrent-hung-\(index)"
                do {
                    _ = try await SubprocessRunner.run(
                        binary: "/bin/sleep",
                        arguments: ["5"],
                        environment: ProcessInfo.processInfo.environment,
                        timeout: 2,
                        label: label)
                    Issue.record("Expected .timedOut for \(label)")
                } catch let error as SubprocessRunnerError {
                    // `.timedOut` is the expected outcome; anything else is a failure.
                    if case .timedOut = error { return }
                    Issue.record("Expected .timedOut for \(label), got \(error)")
                } catch {
                    Issue.record("Unexpected error for \(label): \(error)")
                }
            }
        }
    }

    // The timeouts run in parallel, so the total should be ~2s rather than the 5s sleep.
    // The 4s bound leaves headroom for slow CI.
    let elapsed = Date().timeIntervalSince(startedAt)
    #expect(
        elapsed < 4,
        "All \(count) concurrent timeouts should fire in ~2s, took \(elapsed)s")
}

/// Stresses the timeout race guard: a very short timeout makes the exit-code task and the
/// timeout task race tightly, exercising the KillFlag synchronization path. Every one of
/// the sequential iterations must end in `.timedOut`.
@Test
func timeoutRaceStress() async {
    for iteration in 0..<20 {
        do {
            _ = try await SubprocessRunner.run(
                binary: "/bin/sleep",
                arguments: ["1"],
                environment: ProcessInfo.processInfo.environment,
                timeout: 0.1,
                label: "race-stress-\(iteration)")
            Issue.record("Expected .timedOut for iteration \(iteration)")
        } catch let error as SubprocessRunnerError {
            // `.timedOut` is the expected outcome; move on to the next iteration.
            if case .timedOut = error { continue }
            Issue.record("Expected .timedOut, got \(error) at iteration \(iteration)")
        } catch {
            Issue.record("Unexpected error at iteration \(iteration): \(error)")
        }
    }
}

/// Runs 20 short-lived subprocesses concurrently and checks that every call returns a
/// result, i.e. that concurrent `SubprocessRunner.run` calls do not starve one another.
/// Any error thrown by a call propagates and fails the test.
@Test
func concurrentCallsDoNotStarve() async throws {
    try await withThrowingTaskGroup(of: SubprocessResult.self) { group in
        for index in 0..<20 {
            group.addTask {
                try await SubprocessRunner.run(
                    binary: "/bin/sleep",
                    arguments: ["0.2"],
                    environment: ProcessInfo.processInfo.environment,
                    timeout: 10,
                    label: "concurrent-\(index)")
            }
        }

        // Drain the group, counting each successfully completed call.
        var count = 0
        while try await group.next() != nil {
            count += 1
        }
        #expect(count == 20, "All 20 concurrent calls should complete")
    }
}
}