Skip to content

Commit 5e04043

Browse files
Fixed an issue where AsyncThrowingBackpressureStream would not react to cancellation properly
1 parent 17e0fb9 commit 5e04043

2 files changed

Lines changed: 309 additions & 21 deletions

File tree

Sources/CodableDatastore/Persistence/Disk Persistence/AsyncThrowingBackpressureStream.swift

Lines changed: 74 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,50 +8,98 @@
88

99
import Foundation
1010

11+
/// A stream that limits reads based on the speed results are consumed at.
12+
///
13+
/// A backpressure stream is consumed within a _reading task_, usually in a `for try await` loop. Separately, it is fed within an internal _writing task_ inherited during initialization. These may share the same parent task depending on the use case.
14+
///
15+
/// Writes to the stream can be made one at a time until they are consumed by the reading task, usually via an iterator. If a write is not consumed, the writing task is suspended until the reading task is ready to consume the event. To this effect, a backpressure stream may hold onto at most a single pending event while waiting for a read to take place. Similarly, if a read happens before a write is ready, the reading task will be suspended, while the write will be processed immediately allowing a follow-up write to be made.
16+
///
17+
/// The reading task may be cancelled at any time, immediately ending the loop, and propagaing the cancellation to the writing child task, stopping any more values from being provided to the stream.
1118
struct AsyncThrowingBackpressureStream<Element: Sendable>: Sendable {
1219
fileprivate actor StateMachine {
13-
var pendingEvents: [(CheckedContinuation<Void, Error>, Result<Element?, Error>)] = []
14-
var eventsReadyContinuation: CheckedContinuation<Element?, Error>?
20+
var pendingWriteEvents: [(CheckedContinuation<Void, Error>, Result<Element?, Error>)] = []
21+
var pendingReadContinuation: CheckedContinuation<Element?, Error>?
1522
var wasCancelled = false
1623

1724
func provide(_ result: Result<Element?, Error>) async throws {
18-
guard !wasCancelled else { throw CancellationError() }
25+
/// If reads were cancelled, propagate the cancellation to the provider without saving the result.
26+
if wasCancelled { throw CancellationError() }
1927

28+
/// Enqueue the provided result and continue the task once it is ready to be consumed.
2029
try await withCheckedThrowingContinuation { continuation in
21-
precondition(pendingEvents.isEmpty, "More than one event has bee queued on the stream.")
22-
if let eventsReadyContinuation {
23-
self.eventsReadyContinuation = nil
24-
eventsReadyContinuation.resume(with: result)
30+
/// Ideally, no more than one pending event should be queued up, as a second event means backpressure isn't working.
31+
precondition(pendingWriteEvents.isEmpty, "More than one event has been queued on the stream.")
32+
33+
/// If a read is currently pending, signal that a new result has been provided.
34+
if let pendingReadContinuation {
35+
self.pendingReadContinuation = nil
36+
pendingReadContinuation.resume(with: result)
2537
continuation.resume()
2638
} else {
27-
pendingEvents.append((continuation, result))
39+
/// If we aren't ready for events, queue the event and suspend the task until events are ready. This will stop more values from being provided (ie. the backpressure at work).
40+
pendingWriteEvents.append((continuation, result))
2841
}
2942
}
3043
}
3144

45+
/// Cancel any reads by immediately signalling that no events are available to any pending read.
46+
private func cancelPendingRead() {
47+
wasCancelled = true
48+
if let pendingReadContinuation {
49+
self.pendingReadContinuation = nil
50+
pendingReadContinuation.resume(throwing: CancellationError())
51+
}
52+
}
53+
54+
/// Consume the next value on the read task.
55+
///
56+
/// There are two scenarios to consider here:
57+
/// - A read happens before a write.
58+
/// - A write happens before a read.
59+
///
60+
/// In the first case, a continuation is saved and the read task is suspended. In the second case, a read is popped off the from of the pending write events and returned immediately.
3261
func consumeNext() async throws -> Element? {
3362
if Task.isCancelled {
3463
wasCancelled = true
3564
}
3665

37-
return try await withCheckedThrowingContinuation { continuation in
38-
guard !pendingEvents.isEmpty else {
39-
eventsReadyContinuation = continuation
40-
return
41-
}
42-
let (providerContinuation, result) = pendingEvents.removeFirst()
43-
continuation.resume(with: result)
44-
if wasCancelled {
45-
providerContinuation.resume(throwing: CancellationError())
46-
} else {
47-
providerContinuation.resume()
66+
return try await withTaskCancellationHandler {
67+
try await withCheckedThrowingContinuation { continuation in
68+
guard !pendingWriteEvents.isEmpty else {
69+
/// If there are no pending events, suspend the reading task until one is signaled.
70+
guard !wasCancelled else {
71+
/// If the task was cancelled, stop here without waiting for the signal — the provider will be cancelled as soon as they try to provide their first value.
72+
continuation.resume(throwing: CancellationError())
73+
return
74+
}
75+
pendingReadContinuation = continuation
76+
return
77+
}
78+
79+
/// Otherwise, pop the first entry off the stack and return it.
80+
let (providerContinuation, result) = pendingWriteEvents.removeFirst()
81+
82+
/// Return the reading task with the result we have from the write queue.
83+
continuation.resume(with: result)
84+
85+
/// Determine if the provider should continue providing values or if it should be stopped here.
86+
if wasCancelled {
87+
providerContinuation.resume(throwing: CancellationError())
88+
} else {
89+
providerContinuation.resume()
90+
}
4891
}
92+
} onCancel: {
93+
Task { await cancelPendingRead() }
4994
}
5095
}
5196

5297
deinit {
53-
if let eventsReadyContinuation {
54-
eventsReadyContinuation.resume(throwing: CancellationError())
98+
if let pendingReadContinuation {
99+
pendingReadContinuation.resume(throwing: CancellationError())
100+
}
101+
for (providerContinuation, _) in pendingWriteEvents {
102+
providerContinuation.resume(throwing: CancellationError())
55103
}
56104
}
57105
}
@@ -102,6 +150,11 @@ extension AsyncThrowingBackpressureStream: AsyncInstances {
102150
func next() async throws -> Element? {
103151
try await stateMachine.consumeNext()
104152
}
153+
154+
/// Used only for testing.
155+
internal var wasCancelled: Bool {
156+
get async { await stateMachine.wasCancelled }
157+
}
105158
}
106159

107160
func makeAsyncIterator() -> AsyncIterator {
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
//
2+
// AsyncThrowingBackpressureStreamTests.swift
3+
// CodableDatastore
4+
//
5+
// Created by Dimitri Bouniol on 2026-01-23.
6+
// Copyright © 2023-26 Mochi Development, Inc. All rights reserved.
7+
//
8+
9+
import XCTest
10+
@testable import CodableDatastore
11+
12+
final class AsyncThrowingBackpressureStreamTests: XCTestCase {
13+
func testStreamForwardsResults() async throws {
14+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
15+
try await continuation.yield(0)
16+
try await continuation.yield(1)
17+
try await continuation.yield(2)
18+
try await continuation.yield(3)
19+
try await continuation.yield(4)
20+
}
21+
22+
let results = try await stream.collectInstances(upTo: .infinity)
23+
24+
XCTAssertEqual(results, [0, 1, 2, 3, 4])
25+
}
26+
27+
func testReadTaskSuspendsWriteTask() async throws {
28+
let (writeContinuations, readProvider) = AsyncStream.makeStream(of: (Int, CheckedContinuation<Void, Never>).self)
29+
30+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
31+
try await continuation.yield(0)
32+
await withCheckedContinuation { continuation in
33+
readProvider.yield((0, continuation))
34+
}
35+
try await continuation.yield(1)
36+
await withCheckedContinuation { continuation in
37+
readProvider.yield((1, continuation))
38+
}
39+
try await continuation.yield(2)
40+
await withCheckedContinuation { continuation in
41+
readProvider.yield((2, continuation))
42+
}
43+
try await continuation.yield(3)
44+
await withCheckedContinuation { continuation in
45+
readProvider.yield((3, continuation))
46+
}
47+
try await continuation.yield(4)
48+
await withCheckedContinuation { continuation in
49+
readProvider.yield((4, continuation))
50+
}
51+
}
52+
53+
let iterator = stream.makeAsyncIterator()
54+
var consumer = writeContinuations.makeAsyncIterator()
55+
56+
var result = try await iterator.next()
57+
XCTAssertEqual(result, 0)
58+
var accumulatedResult = await consumer.next()!
59+
accumulatedResult.1.resume()
60+
XCTAssertEqual(accumulatedResult.0, 0)
61+
62+
result = try await iterator.next()
63+
XCTAssertEqual(result, 1)
64+
accumulatedResult = await consumer.next()!
65+
accumulatedResult.1.resume()
66+
XCTAssertEqual(accumulatedResult.0, 1)
67+
68+
result = try await iterator.next()
69+
XCTAssertEqual(result, 2)
70+
accumulatedResult = await consumer.next()!
71+
accumulatedResult.1.resume()
72+
XCTAssertEqual(accumulatedResult.0, 2)
73+
74+
result = try await iterator.next()
75+
XCTAssertEqual(result, 3)
76+
accumulatedResult = await consumer.next()!
77+
accumulatedResult.1.resume()
78+
XCTAssertEqual(accumulatedResult.0, 3)
79+
80+
result = try await iterator.next()
81+
XCTAssertEqual(result, 4)
82+
accumulatedResult = await consumer.next()!
83+
accumulatedResult.1.resume()
84+
XCTAssertEqual(accumulatedResult.0, 4)
85+
86+
result = try await iterator.next()
87+
XCTAssertEqual(result, nil)
88+
}
89+
90+
func testWriteTaskNeverProgressesWhenReadsDoNotHappen() async throws {
91+
let (writeContinuations, readProvider) = AsyncStream.makeStream(of: (Int, CheckedContinuation<Void, Never>).self)
92+
93+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
94+
try await continuation.yield(0)
95+
await withCheckedContinuation { continuation in
96+
readProvider.yield((0, continuation))
97+
}
98+
try await continuation.yield(1)
99+
XCTFail()
100+
}
101+
102+
let iterator = stream.makeAsyncIterator()
103+
var consumer = writeContinuations.makeAsyncIterator()
104+
105+
let result = try await iterator.next()
106+
XCTAssertEqual(result, 0)
107+
let accumulatedResult = await consumer.next()!
108+
accumulatedResult.1.resume()
109+
XCTAssertEqual(accumulatedResult.0, 0)
110+
111+
try await Task.sleep(for: .seconds(1))
112+
}
113+
114+
func testWriteTaskNeverProgressesWhenReadsAreCancelled() async throws {
115+
let expectation = expectation(description: "Writes were cancelled")
116+
117+
let task = Task {
118+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
119+
try await continuation.yield(0)
120+
do {
121+
try await continuation.yield(1)
122+
XCTFail()
123+
} catch {
124+
XCTAssertEqual(error is CancellationError, true)
125+
expectation.fulfill()
126+
}
127+
}
128+
129+
let iterator = stream.makeAsyncIterator()
130+
let result = try await iterator.next()
131+
XCTAssertEqual(result, 0)
132+
133+
withUnsafeCurrentTask { task in
134+
task?.cancel()
135+
}
136+
137+
do {
138+
/// Perform two reads, because we can't control if the write happens before this happens (in which case the first read will succeed) or if it happens after (in which the first read will fail). Either way, the second read will always fail and return nil.
139+
_ = try await iterator.next()
140+
_ = try await iterator.next()
141+
XCTFail()
142+
} catch {
143+
XCTAssertEqual(error is CancellationError, true)
144+
}
145+
}
146+
147+
try? await task.value
148+
149+
await fulfillment(of: [expectation], timeout: 10)
150+
}
151+
152+
func testReadingNotSuspendedWhenCancelledBeforeWrite() async throws {
153+
let (writeContinuations, readProvider) = AsyncStream.makeStream(of: CheckedContinuation<Void, Never>.self)
154+
155+
let expectation = expectation(description: "Writes were cancelled")
156+
157+
let task = Task {
158+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
159+
try await continuation.yield(0)
160+
await withCheckedContinuation { continuation in
161+
readProvider.yield(continuation)
162+
}
163+
do {
164+
try await continuation.yield(1)
165+
XCTFail()
166+
} catch {
167+
XCTAssertEqual(error is CancellationError, true)
168+
expectation.fulfill()
169+
}
170+
}
171+
172+
let iterator = stream.makeAsyncIterator()
173+
let result = try await iterator.next()
174+
XCTAssertEqual(result, 0)
175+
176+
withUnsafeCurrentTask { task in
177+
task?.cancel()
178+
}
179+
180+
do {
181+
/// This read is guaranteed to happen before the write, which is blocked below. It should _never_ stall until the write is made.
182+
_ = try await iterator.next()
183+
XCTFail()
184+
} catch {
185+
XCTAssertEqual(error is CancellationError, true)
186+
await writeContinuations.first(where: { _ in true })?.resume()
187+
}
188+
}
189+
190+
try? await task.value
191+
192+
await fulfillment(of: [expectation], timeout: 10)
193+
}
194+
195+
func testWritingUnsuspendsWhenReadsCancelledButNeverMade() async throws {
196+
let (writeContinuations, readProvider) = AsyncStream.makeStream(of: CheckedContinuation<Void, Never>.self)
197+
198+
let expectation = expectation(description: "Writes were cancelled")
199+
200+
let task = Task {
201+
let stream = AsyncThrowingBackpressureStream<Int> { continuation in
202+
try await continuation.yield(0)
203+
await withCheckedContinuation { continuation in
204+
readProvider.yield(continuation)
205+
}
206+
do {
207+
try await continuation.yield(1)
208+
XCTFail()
209+
} catch {
210+
XCTAssertEqual(error is CancellationError, true)
211+
expectation.fulfill()
212+
}
213+
}
214+
215+
let iterator = stream.makeAsyncIterator()
216+
let result = try await iterator.next()
217+
XCTAssertEqual(result, 0)
218+
219+
withUnsafeCurrentTask { task in
220+
task?.cancel()
221+
}
222+
223+
/// Let the write happen stritly after cancellation
224+
await writeContinuations.first(where: { _ in true })?.resume()
225+
226+
/// The stream can't be marked as cancelled if another read never happens.
227+
let wasCancelled = await iterator.wasCancelled
228+
XCTAssertEqual(wasCancelled, false)
229+
}
230+
231+
try? await task.value
232+
233+
await fulfillment(of: [expectation], timeout: 10)
234+
}
235+
}

0 commit comments

Comments
 (0)