-
Notifications
You must be signed in to change notification settings - Fork 32
Expand file tree
/
Copy pathRegulator.swift
More file actions
122 lines (107 loc) · 3.32 KB
/
Regulator.swift
File metadata and controls
122 lines (107 loc) · 3.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
//
// Regulator.swift
//
//
// Created by Thibault Wittemberg on 08/09/2022.
//
/// The outcome of one regulated step over a base sequence.
///
/// A value is either the result of asking the base for its next element
/// (success or failure) or the signal that the base has no more elements.
enum RegulatedElement<Element>: @unchecked Sendable {
  /// The base produced an element, or failed with an error while producing it.
  case element(result: Result<Element, Error>)

  /// The base finished: it will not produce any further element.
  case termination
}
/// Iterates over a base `AsyncSequence` one element at a time, on demand.
///
/// `iterate()` waits before every call to the base iterator's `next()` until
/// `requestNextRegulatedElement()` has been called, then forwards the outcome
/// (element, failure or termination) to `onNextRegulatedElement`.
/// All accesses to the shared state go through `ManagedCriticalState.withCriticalRegion`.
final class Regulator<Base: AsyncSequence>: @unchecked Sendable {
  /// Where the iteration loop currently stands.
  enum State {
    /// No element is requested and the loop is not parked.
    case idle
    /// The loop is parked on this continuation, waiting for a request.
    /// The continuation is resumed with `true` to exit the loop, `false` to fetch the next element.
    case suspended(UnsafeContinuation<Bool, Never>)
    /// An element has been requested; the loop may call `next()` on the base.
    case active
    /// The base ended, failed, or the iteration was cancelled; the loop must exit.
    case finished
  }

  let base: Base
  let state: ManagedCriticalState<State>
  let onNextRegulatedElement: @Sendable (RegulatedElement<Base.Element>) -> Void

  /// Creates a regulator around `base`.
  ///
  /// - Parameters:
  ///   - base: The sequence to iterate over.
  ///   - onNextRegulatedElement: Called from `iterate()` with every element, failure or termination
  ///     obtained from the base.
  init(
    _ base: Base,
    onNextRegulatedElement: @Sendable @escaping (RegulatedElement<Base.Element>) -> Void
  ) {
    self.base = base
    self.state = ManagedCriticalState(.idle)
    self.onNextRegulatedElement = onNextRegulatedElement
  }

  /// Marks the regulator as finished and, if the loop is parked, resumes it with the order to exit.
  ///
  /// Used as the cancellation handler of `iterate()`.
  func unsuspendAndExitOnCancel() {
    let continuation = state.withCriticalRegion { state -> UnsafeContinuation<Bool, Never>? in
      switch state {
      case .suspended(let continuation):
        state = .finished
        return continuation
      case .idle, .active, .finished:
        state = .finished
        return nil
      }
    }

    // Resume outside of the critical region.
    continuation?.resume(returning: true)
  }

  /// Runs the regulated iteration loop until the base ends or fails, or the task is cancelled.
  ///
  /// Each pass waits for a request (unless one is already pending), asks the base for its next
  /// element and hands the outcome to `onNextRegulatedElement`. An error thrown by the base is
  /// reported as `.element(result: .failure(error))` and ends the loop.
  func iterate() async {
    await withTaskCancellationHandler {
      var mutableBase = base.makeAsyncIterator()

      do {
        baseLoop: while true {
          let shouldExit = await withUnsafeContinuation { (continuation: UnsafeContinuation<Bool, Never>) in
            let decision = self.state.withCriticalRegion { state -> (UnsafeContinuation<Bool, Never>?, Bool) in
              switch state {
              case .idle:
                // Nothing requested yet: park until a request or a cancellation resumes us.
                state = .suspended(continuation)
                return (nil, false)
              case .suspended:
                assertionFailure("Inconsistent state, the base is already suspended")
                // Exit through the continuation created for THIS suspension. The stored one is left
                // untouched so that it is still resumed exactly once by its rightful owner
                // (a request or the cancellation handler), and this caller does not hang.
                return (continuation, true)
              case .active:
                // A request arrived before we parked: carry on immediately.
                return (continuation, false)
              case .finished:
                return (continuation, true)
              }
            }

            decision.0?.resume(returning: decision.1)
          }

          if shouldExit {
            // end the loop ... no more values from this base
            break baseLoop
          }

          let element = try await mutableBase.next()

          let regulatedElement = self.state.withCriticalRegion { state -> RegulatedElement<Base.Element> in
            switch element {
            case .some(let element):
              // The cancellation handler may have run while `next()` was in flight. `.finished`
              // must then be preserved: going back to `.idle` would park the loop again with
              // nobody left to resume it. The element is still delivered; the loop exits on the
              // next pass.
              if case .finished = state {
                return .element(result: .success(element))
              }
              state = .idle
              return .element(result: .success(element))
            case .none:
              state = .finished
              return .termination
            }
          }

          self.onNextRegulatedElement(regulatedElement)
        }
      } catch {
        self.state.withCriticalRegion { state in
          state = .finished
        }
        self.onNextRegulatedElement(.element(result: .failure(error)))
      }
    } onCancel: {
      self.unsuspendAndExitOnCancel()
    }
  }

  /// Requests the next element from the base.
  ///
  /// Resumes the loop if it is parked; if the loop is idle, records the request so that the loop
  /// does not park on its next pass. Does nothing when a request is already pending or when the
  /// regulator is finished.
  @Sendable
  func requestNextRegulatedElement() {
    let continuation = self.state.withCriticalRegion { state -> UnsafeContinuation<Bool, Never>? in
      switch state {
      case .suspended(let continuation):
        state = .active
        return continuation
      case .idle:
        state = .active
        return nil
      case .active, .finished:
        return nil
      }
    }

    // Resume outside of the critical region; `false` means "do not exit, fetch the next element".
    continuation?.resume(returning: false)
  }
}