forked from apple/swift-openapi-runtime
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathServerSentEventsEncoding.swift
More file actions
190 lines (161 loc) · 7.19 KB
/
ServerSentEventsEncoding.swift
File metadata and controls
190 lines (161 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftOpenAPIGenerator open source project
//
// Copyright (c) 2023 Apple Inc. and the SwiftOpenAPIGenerator project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftOpenAPIGenerator project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if canImport(Darwin)
public import class Foundation.JSONEncoder
#else
@preconcurrency public import class Foundation.JSONEncoder
#endif
/// An asynchronous sequence that turns upstream `ServerSentEvent` values into
/// their serialized byte representation.
public struct ServerSentEventsSerializationSequence<Upstream: AsyncSequence & Sendable>: Sendable
where Upstream.Element == ServerSentEvent {

    /// The sequence of events that gets serialized.
    private let upstream: Upstream

    /// Wraps the provided sequence of events.
    /// - Parameter upstream: The sequence of events to serialize.
    public init(upstream: Upstream) {
        self.upstream = upstream
    }
}
extension ServerSentEventsSerializationSequence: AsyncSequence {

    /// Each element is one chunk of serialized bytes.
    public typealias Element = ArraySlice<UInt8>

    /// The iterator of `ServerSentEventsSerializationSequence`.
    ///
    /// It pulls events from the upstream iterator and hands them to a state
    /// machine, which produces the bytes to emit.
    public struct Iterator<UpstreamIterator: AsyncIteratorProtocol>: AsyncIteratorProtocol
    where UpstreamIterator.Element == ServerSentEvent {

        /// The upstream iterator of events.
        var upstream: UpstreamIterator

        /// The state machine that decides what the iterator does next.
        var stateMachine: StateMachine = .init()

        /// Asynchronously produces the next chunk of serialized bytes.
        /// - Returns: The serialized bytes of the next upstream event, or `nil`
        ///   once the upstream iterator has finished.
        /// - Throws: Any error thrown by the upstream iterator.
        public mutating func next() async throws -> ArraySlice<UInt8>? {
            // Ask the state machine first, so that a finished iterator never
            // touches upstream again.
            switch stateMachine.next() {
            case .returnNil:
                return nil
            case .needsMore:
                break
            }
            let upstreamEvent = try await upstream.next()
            switch stateMachine.receivedValue(upstreamEvent) {
            case .returnBytes(let serialized):
                return serialized
            case .returnNil:
                return nil
            }
        }
    }

    /// Creates the asynchronous iterator that produces elements of this
    /// asynchronous sequence.
    /// - Returns: An iterator wrapping a fresh iterator of the upstream sequence.
    public func makeAsyncIterator() -> Iterator<Upstream.AsyncIterator> {
        let upstreamIterator = upstream.makeAsyncIterator()
        return Iterator(upstream: upstreamIterator)
    }
}
extension AsyncSequence {

    /// Wraps this sequence of `ServerSentEvent` values in a sequence that
    /// serializes them; the data field is passed through as-is.
    /// - Returns: A sequence that provides the serialized Server-sent Events.
    public func asEncodedServerSentEvents() -> ServerSentEventsSerializationSequence<Self>
    where Element == ServerSentEvent {
        ServerSentEventsSerializationSequence(upstream: self)
    }

    /// Wraps this sequence of events carrying an `Encodable` payload in a
    /// sequence that serializes them, encoding each payload as JSON text
    /// for the data field.
    /// - Parameter encoder: The JSON encoder to use. The default encoder sorts
    ///   keys and does not escape slashes.
    /// - Returns: A sequence that provides the serialized Server-sent Events.
    public func asEncodedServerSentEventsWithJSONData<JSONDataType: Encodable>(
        encoder: JSONEncoder = {
            let defaultEncoder = JSONEncoder()
            defaultEncoder.outputFormatting = [.sortedKeys, .withoutEscapingSlashes]
            return defaultEncoder
        }()
    ) -> ServerSentEventsSerializationSequence<AsyncThrowingMapSequence<Self, ServerSentEvent>>
    where Element == ServerSentEventWithJSONData<JSONDataType> {
        // Convert every JSON-carrying event into a plain event first; an
        // encoding failure surfaces as an error thrown by the mapped sequence.
        let plainEvents: AsyncThrowingMapSequence<Self, ServerSentEvent> = map { jsonEvent -> ServerSentEvent in
            var encodedData: String? = nil
            if let payload = jsonEvent.data {
                encodedData = String(decoding: try encoder.encode(payload), as: UTF8.self)
            }
            return ServerSentEvent(
                id: jsonEvent.id,
                event: jsonEvent.event,
                data: encodedData,
                retry: jsonEvent.retry
            )
        }
        return ServerSentEventsSerializationSequence(upstream: plainEvents)
    }
}
extension ServerSentEventsSerializationSequence.Iterator {

    /// A state machine that drives the Server-sent Events serializer.
    struct StateMachine {

        /// The states the serializer can be in.
        enum State {
            /// Upstream events are still being accepted and serialized.
            case running
            /// Upstream reported its end; this state is terminal.
            case finished
        }

        /// The state the machine is currently in; it starts out as `.running`.
        private(set) var state: State = .running

        /// Creates a state machine in the `.running` state.
        init() {}

        /// What the iterator should do after calling `next`.
        enum NextAction {
            /// Return nil to the caller, there are no more bytes.
            case returnNil
            /// Fetch another event from upstream and pass it to `receivedValue`.
            case needsMore
        }

        /// Decides whether the iterator should fetch another upstream event.
        /// - Returns: `.needsMore` while running, `.returnNil` once finished.
        mutating func next() -> NextAction {
            switch state {
            case .finished:
                return .returnNil
            case .running:
                return .needsMore
            }
        }

        /// What the iterator should do after calling `receivedValue`.
        enum ReceivedValueAction {
            /// Return nil to the caller, there are no more bytes.
            case returnNil
            /// Emit the provided bytes.
            case returnBytes(ArraySlice<UInt8>)
        }

        /// Ingests the result of asking upstream for the next event.
        ///
        /// Must only be called while the machine is running; calling it after
        /// the machine has finished is a precondition failure.
        /// - Parameter value: A new event. If `nil`, then the source of events is finished.
        /// - Returns: The serialized bytes of the event, or `.returnNil` when
        ///   upstream has finished.
        mutating func receivedValue(_ value: ServerSentEvent?) -> ReceivedValueAction {
            switch state {
            case .finished:
                preconditionFailure("Invalid state")
            case .running:
                break
            }
            guard let event = value else {
                // Upstream is exhausted; remember that so `next` stops asking.
                state = .finished
                return .returnNil
            }

            var serialized: [UInt8] = []

            // Writes one line: name, `ASCII.colon`, `ASCII.space`, content, `ASCII.lf`.
            func appendField(_ fieldName: String, _ content: some StringProtocol) {
                serialized.append(contentsOf: fieldName.utf8)
                serialized.append(contentsOf: [ASCII.colon, ASCII.space])
                serialized.append(contentsOf: content.utf8)
                serialized.append(ASCII.lf)
            }

            if let identifier = event.id {
                appendField("id", identifier)
            }
            if let eventName = event.event {
                appendField("event", eventName)
            }
            if let retryValue = event.retry {
                appendField("retry", String(retryValue))
            }
            if let payload = event.data {
                // Normalize line endings: CRLF first, then any remaining lone CR,
                // both become LF.
                let withoutCRLF = payload.replacingOccurrences(of: "\r\n", with: "\n")
                let normalized = withoutCRLF.replacingOccurrences(of: "\r", with: "\n")
                // Every line of the payload, including empty ones, becomes its
                // own "data" field.
                let payloadLines = normalized.split(separator: "\n", omittingEmptySubsequences: false)
                for payloadLine in payloadLines {
                    appendField("data", payloadLine)
                }
            }

            // An extra `ASCII.lf` ends the event.
            serialized.append(ASCII.lf)
            return .returnBytes(ArraySlice(serialized))
        }
    }
}