Skip to content

Commit 1389b63

Browse files
JaewonHur authored and saehejkang committed
Fix race condition on SandboxService.waiters (#1289)
This PR fixes #1277. `SandboxService.waiters` had a consistency issue (not exactly a race). The `SandboxService.wait` XPC can be executed on an arbitrary `id`, and it will hang forever if no other handler resumes it. Without knowing this internal detail, a high-level entity can run into this issue and deadlock. This PR simplifies the mental model: **`SandboxService.waiters[id]: ExitWaiter(continuations, exitCode)` can only be in three states: i) non-existing, ii) existing with a nil `exitCode`, and iii) existing with a concrete `exitCode`.** **If it is non-existing, no handler has been registered to resume it later. If it is existing with a nil `exitCode`, it is guaranteed that the registered `continuations` will be resumed later with a concrete `exitCode`. Finally, if it already has a concrete `exitCode`, a handler has been registered and has already resumed it (with that `exitCode`).** Thus, `SandboxService.wait` should return immediately if `waiters[id]` is non-existing or existing with a concrete `exitCode` (as no handler will resume it later). It should only block when `waiters[id]` is existing with a nil `exitCode`, as it is then guaranteed to be resumed later. By doing so, we can guarantee there is no deadlock at all. To that end, this PR does the following: 1. Introduce the `ExitWaiter` class to update `continuations` and `exitCode` all together atomically. Initially, the `state` variable saved the `exitCode`, but it could not be tied to `continuations` as they were protected by different primitives (i.e., lock and actor). 2. Gather the `waiters`-related operations into a single actor method, guaranteeing they are performed atomically under actor protection---i.e., we actually don't need a Mutex here. 3. Ensure initialized `waiters` are released (i.e., resumed) later (under any possible circumstances). 4. Move `process.wait` after `process.start` in `io.handleProcess` to run `SandboxService.wait` only after `waiters[id]` is initialized. 
By doing the fourth step, we can guarantee `SandboxService.wait` can meet only one of the two following `ExitWaiter` states: i) existing with a nil `exitCode`, or ii) existing with a concrete `exitCode` (in case the process exited too early). In both cases, `exitCode` is preserved and returned. ## Type of Change - [X] Bug fix - [ ] New feature - [ ] Breaking change - [ ] Documentation update ## Motivation and Context [Why is this change needed?] ## Testing - [X] Tested locally - [ ] Added/updated tests - [ ] Added/updated docs
1 parent 01a05eb commit 1389b63

3 files changed

Lines changed: 93 additions & 93 deletions

File tree

Sources/Services/ContainerAPIService/Client/ProcessIO.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ public struct ProcessIO: Sendable {
149149
public func handleProcess(process: ClientProcess, log: Logger) async throws -> Int32 {
150150
let signals = AsyncSignalHandler.create(notify: Self.signalSet)
151151
return try await withThrowingTaskGroup(of: Int32?.self, returning: Int32.self) { group in
152+
try await process.start()
153+
try closeAfterStart()
154+
152155
let waitAdded = group.addTaskUnlessCancelled {
153156
let code = try await process.wait()
154157
try await wait()
@@ -160,9 +163,6 @@ public struct ProcessIO: Sendable {
160163
return -1
161164
}
162165

163-
try await process.start()
164-
try closeAfterStart()
165-
166166
if let current = console {
167167
let size = try current.size
168168
// It's supremely possible the process could've exited already. We shouldn't treat

Sources/Services/ContainerSandboxService/Server/SandboxService.swift

Lines changed: 75 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public actor SandboxService {
4444
private var container: ContainerInfo?
4545
private let monitor: ExitMonitor
4646
private let eventLoopGroup: any EventLoopGroup
47-
private let waiters: Mutex<[String: [CheckedContinuation<ExitStatus, Never>]?]> = Mutex([:])
47+
private var waiters: [String: ExitWaiter] = [:]
4848
private let lock: AsyncLock = AsyncLock()
4949
private let log: Logging.Logger
5050
private var state: State = .created
@@ -54,6 +54,27 @@ public actor SandboxService {
5454
private static let sshAuthSocketGuestPath = "/run/host-services/ssh-auth.sock"
5555
private static let sshAuthSocketEnvVar = "SSH_AUTH_SOCK"
5656

57+
class ExitWaiter {
58+
public var exitCode: Int32? = nil
59+
public var continuations: [CheckedContinuation<ExitStatus, Never>] = []
60+
61+
public func register(_ cc: CheckedContinuation<ExitStatus, Never>) {
62+
continuations.append(cc)
63+
}
64+
65+
public func doExit(code: Int32) {
66+
for cc in continuations {
67+
cc.resume(returning: ExitStatus(exitCode: code))
68+
}
69+
70+
exitCode = code
71+
}
72+
73+
public func exited() -> Bool {
74+
exitCode != nil
75+
}
76+
}
77+
5778
private static func sshAuthSocketHostUrl(config: ContainerConfiguration) -> URL? {
5879
if config.ssh, let sshSocket = Foundation.ProcessInfo.processInfo.environment[Self.sshAuthSocketEnvVar] {
5980
return URL(fileURLWithPath: sshSocket)
@@ -225,6 +246,8 @@ public actor SandboxService {
225246

226247
do {
227248
try await container.create()
249+
250+
try await self.initializeWaiters(for: id)
228251
try await self.monitor.registerProcess(id: config.id, onExit: self.onContainerExit)
229252
if !container.interfaces.isEmpty {
230253
try await self.startSocketForwarders(attachment: attachments[0], publishedPorts: config.publishedPorts)
@@ -233,7 +256,7 @@ public actor SandboxService {
233256
} catch {
234257
do {
235258
try await self.cleanUpContainer(containerInfo: ctrInfo)
236-
await self.setState(.stopped(nil))
259+
await self.setState(.stopped)
237260
} catch {
238261
self.log.error("failed to clean up container", metadata: ["error": "\(error)"])
239262
}
@@ -320,7 +343,7 @@ public actor SandboxService {
320343

321344
return try await self.lock.withLock { _ in
322345
switch await self.state {
323-
case .created, .stopped(_), .stopping:
346+
case .created, .stopped, .stopping:
324347
await self.setState(.shuttingDown)
325348

326349
default:
@@ -360,27 +383,27 @@ public actor SandboxService {
360383

361384
try await self.addNewProcess(id, config, stdio)
362385

363-
try await self.monitor.registerProcess(
364-
id: id,
365-
onExit: { id, exitStatus in
366-
guard let process = await self.processes[id]?.process else {
367-
throw ContainerizationError(
368-
.invalidState,
369-
message: "ProcessInfo missing for process \(id)"
370-
)
371-
}
372-
self.waiters.withLock {
373-
if let waiters = $0[id], let waiters {
374-
for cc in waiters {
375-
cc.resume(returning: exitStatus)
376-
}
386+
try await self.initializeWaiters(for: id)
387+
do {
388+
try await self.monitor.registerProcess(
389+
id: id,
390+
onExit: { id, exitStatus in
391+
await self.releaseWaiters(for: id, status: exitStatus)
392+
393+
guard let process = await self.processes[id]?.process else {
394+
throw ContainerizationError(
395+
.invalidState,
396+
message: "ProcessInfo missing for process \(id)"
397+
)
377398
}
399+
try await process.delete()
400+
try await self.setProcessState(id: id, state: .stopped)
378401
}
379-
await self.removeWaiters(for: id)
380-
try await process.delete()
381-
try await self.setProcessState(id: id, state: .stopped(exitStatus.exitCode))
382-
}
383-
)
402+
)
403+
} catch {
404+
await self.releaseWaiters(for: id, status: ExitStatus(exitCode: -1))
405+
throw error
406+
}
384407

385408
return message.reply()
386409
default:
@@ -410,7 +433,7 @@ public actor SandboxService {
410433
var cs: ContainerSnapshot?
411434

412435
switch state {
413-
case .created, .stopped(_), .booted, .shuttingDown:
436+
case .created, .stopped, .booted, .shuttingDown:
414437
status = .stopped
415438
case .stopping:
416439
status = .stopping
@@ -464,14 +487,14 @@ public actor SandboxService {
464487
)
465488

466489
do {
467-
if case .stopped(_) = await self.state {
490+
if case .stopped = await self.state {
468491
return message.reply()
469492
}
470493
try await self.cleanUpContainer(containerInfo: ctr, exitStatus: exitStatus)
471494
} catch {
472495
self.log.error("failed to clean up container", metadata: ["error": "\(error)"])
473496
}
474-
await self.setState(.stopped(exitStatus.exitCode))
497+
await self.setState(.stopped)
475498
default:
476499
break
477500
}
@@ -596,39 +619,11 @@ public actor SandboxService {
596619
throw ContainerizationError(.invalidArgument, message: "missing id in wait xpc message")
597620
}
598621

599-
let cachedCode: Int32? = try await self.lock.withLock { _ in
600-
let ctrInfo = try await self.getContainer()
601-
let ctr = ctrInfo.container
602-
if id == ctr.id {
603-
switch await self.state {
604-
case .stopped(let code):
605-
return code
606-
default:
607-
break
608-
}
609-
} else {
610-
guard let processInfo = await self.processes[id] else {
611-
throw ContainerizationError(.notFound, message: "process with id \(id)")
612-
}
613-
switch processInfo.state {
614-
case .stopped(let code):
615-
return code
616-
default:
617-
break
618-
}
619-
}
620-
return nil
621-
}
622-
if let cachedCode {
623-
let reply = message.reply()
624-
reply.set(key: SandboxKeys.exitCode.rawValue, value: Int64(cachedCode))
625-
return reply
626-
}
627-
628622
let exitStatus = await withCheckedContinuation { cc in
629623
// Is this safe since we are in an actor? :(
630-
if !self.addWaiter(id: id, cont: cc) {
631-
cc.resume(returning: ExitStatus(exitCode: -1))
624+
let (added, exitCode) = self.addWaiter(id: id, cont: cc)
625+
if !added {
626+
cc.resume(returning: ExitStatus(exitCode: exitCode ?? -1))
632627
}
633628
}
634629
let reply = message.reply()
@@ -703,7 +698,7 @@ public actor SandboxService {
703698
try await self.monitor.track(id: id, waitingOn: waitFunc)
704699
} catch {
705700
try? await self.cleanUpContainer(containerInfo: info)
706-
self.setState(.stopped(nil))
701+
self.setState(.stopped)
707702
throw error
708703
}
709704
}
@@ -828,7 +823,7 @@ public actor SandboxService {
828823
let ctrInfo = try await getContainer()
829824

830825
switch await self.state {
831-
case .stopped(_), .stopping:
826+
case .stopped, .stopping:
832827
return
833828
default:
834829
break
@@ -839,7 +834,7 @@ public actor SandboxService {
839834
} catch {
840835
self.log.error("failed to clean up container", metadata: ["error": "\(error)"])
841836
}
842-
await setState(.stopped(exitStatus.exitCode))
837+
await setState(.stopped)
843838
}
844839
}
845840

@@ -1077,14 +1072,7 @@ public actor SandboxService {
10771072
await self.stopSocketForwarders()
10781073

10791074
let status = exitStatus ?? ExitStatus(exitCode: 255)
1080-
self.waiters.withLock {
1081-
if let waiters = $0[id], let waiters {
1082-
for cc in waiters {
1083-
cc.resume(returning: status)
1084-
}
1085-
}
1086-
}
1087-
self.invalidateWaiters(for: id)
1075+
self.releaseWaiters(for: id, status: status)
10881076
}
10891077
}
10901078

@@ -1304,34 +1292,31 @@ extension FileHandle: @retroactive ReaderStream, @retroactive Writer {
13041292
// MARK: State handler and bundle creation helpers
13051293

13061294
extension SandboxService {
1307-
private func addWaiter(id: String, cont: CheckedContinuation<ExitStatus, Never>) -> Bool {
1308-
self.waiters.withLock { waiters in
1309-
var current: [CheckedContinuation<ExitStatus, Never>]
1310-
switch waiters[id] {
1311-
case .none:
1312-
current = []
1313-
case .some(.some(let value)):
1314-
current = value
1315-
case .some(.none):
1316-
return false
1317-
}
1318-
1319-
current.append(cont)
1320-
waiters[id] = current
1321-
return true
1295+
private func initializeWaiters(for id: String) throws {
1296+
guard waiters[id] == nil else {
1297+
throw ContainerizationError(.invalidState, message: "waiter for \(id) already initialized")
13221298
}
1299+
waiters[id] = ExitWaiter()
13231300
}
13241301

1325-
private func removeWaiters(for id: String) {
1326-
self.waiters.withLock { waiters in
1327-
waiters[id] = []
1302+
private func addWaiter(id: String, cont: CheckedContinuation<ExitStatus, Never>) -> (Bool, Int32?) {
1303+
guard let current = waiters[id] else {
1304+
// No waiter initialized at all
1305+
return (false, nil)
13281306
}
1329-
}
13301307

1331-
private func invalidateWaiters(for id: String) {
1332-
self.waiters.withLock { waiters in
1333-
waiters[id] = .some(nil)
1308+
if current.exited() {
1309+
// Waiter initialized but already exited
1310+
return (false, current.exitCode)
13341311
}
1312+
1313+
// Waiter initialized and not exited. Guaranteed to exit later.
1314+
current.register(cont)
1315+
return (true, nil)
1316+
}
1317+
1318+
private func releaseWaiters(for id: String, status: ExitStatus) {
1319+
waiters[id]?.doExit(code: status.exitCode)
13351320
}
13361321

13371322
private func setUnderlyingProcess(_ id: String, _ process: LinuxProcess) throws {
@@ -1387,7 +1372,7 @@ extension SandboxService {
13871372
/// At the beginning of stop() .running will be transitioned to .stopping.
13881373
case stopping
13891374
/// Once a stop is successful, .stopping will transition to .stopped.
1390-
case stopped(Int32?)
1375+
case stopped
13911376
/// .shuttingDown will be the last state the sandbox service will ever be in. Shortly
13921377
/// afterwards the process will exit.
13931378
case shuttingDown

Tests/CLITests/Subcommands/Run/TestCLIRunLifecycle.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,4 +123,19 @@ class TestCLIRunLifecycle: CLITest {
123123
}
124124
try? doRemove(name: name)
125125
}
126+
127+
@Test func testExecInvalidExcutable() async throws {
128+
let name = getTestName()
129+
try doLongRun(name: name)
130+
defer {
131+
try? doStop(name: name)
132+
}
133+
134+
#expect(throws: CLIError.self, "executing invalid executable must throw error, not hang") {
135+
try doExec(
136+
name: name,
137+
cmd: ["foobarbaz"]
138+
)
139+
}
140+
}
126141
}

0 commit comments

Comments
 (0)