Skip to content

Commit ceb7700

Browse files
committed
async i/o
1 parent 1c16a2a commit ceb7700

5 files changed

Lines changed: 600 additions & 140 deletions

File tree

Sources/BaseKit/Async/AsyncFileStream.swift

Lines changed: 20 additions & 100 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,14 @@ public extension Data {
44
/// Asynchronously read from the contents of the fileURL. This method
55
/// will throw an error if it's not a file URL.
66
init(asyncContentsOf url: URL) async throws {
7-
let stream = try url.openForReading()
8-
var allData = Data()
9-
let bytes = try await stream.readToEnd()
10-
for await data in bytes {
11-
allData.append(data)
12-
}
13-
self = allData
7+
var stream = try url.openForReading()
8+
self = try await stream.readToEnd()
149
}
1510

1611
/// Asynchronously write the contents of self into the fileURL.
1712
func asyncWrite(to url: URL) async throws {
18-
// This line makes me sad because we're copying the data. I'm not
19-
// currently aware of a way to not copy these bytes.
20-
let dispatchData = withUnsafeBytes { DispatchData(bytes: $0) }
21-
let stream = try url.openForWriting()
22-
try await stream.write(dispatchData)
13+
var stream = try url.openForWriting()
14+
try await stream.write(self)
2315
}
2416
}
2517

@@ -54,113 +46,41 @@ public extension URL {
5446
}
5547

5648
/// Allow async reading or writing to a file.
57-
public struct AsyncFileStream<Mode>: ~Copyable {
58-
private let queue: DispatchQueue
59-
private let fileDescriptor: Int32
60-
private let io: DispatchIO
61-
private var isClosed = false
62-
49+
public struct AsyncFileStream<Mode>: ~Copyable, Sendable {
50+
private let file: AsyncRandomAccessFile
51+
private var offset: off_t = 0
52+
6353
/// `url` has to be a file url, or this will throw
6454
/// `mode` is passed into the POSIX function `open()`
6555
fileprivate init(url: URL, mode: Int32) throws {
66-
guard url.isFileURL else {
67-
throw AsyncFileStreamError.notFileURL
68-
}
69-
// Since we're reading/writing as a stream, keep it a serial queue
70-
let queue = DispatchQueue(label: "AsyncFileStream")
71-
let fileDescriptor = open(url.absoluteURL.path, mode, 0o666)
72-
// Once we start setting properties, we can't throw. So check to see if
73-
// we need to throw now, then set properties
74-
if fileDescriptor == -1 {
75-
throw AsyncFileStreamError.openError(errno)
76-
}
77-
self.queue = queue
78-
self.fileDescriptor = fileDescriptor
79-
io = DispatchIO(
80-
type: .stream,
81-
fileDescriptor: fileDescriptor,
82-
queue: queue,
83-
cleanupHandler: { [fileDescriptor] error in
84-
// Unfortunately, we can't seem to do anything with `error`.
85-
// There are no guarantees when this closure is invoked, so
86-
// the safe thing would be to save the error in an actor
87-
// that the AsyncFileStream holds. That would allow the caller
88-
// to check for it, or the read()/write() methods to check
89-
// for it as well. Howevever, having an actor as a property
90-
// on a non-copyable type appears to uncover a compiler bug.
91-
92-
// Since we opened the file, we need to close it
93-
Darwin.close(fileDescriptor)
94-
}
95-
)
96-
}
97-
98-
deinit {
99-
// Ensure we've closed the file if we're going out of scope
100-
if !isClosed {
101-
io.close()
102-
}
56+
file = try AsyncRandomAccessFile(url: url, mode: mode)
10357
}
104-
58+
10559
/// Close the file. Consuming method
10660
public consuming func close() {
107-
isClosed = true
108-
io.close()
61+
file.close()
10962
}
110-
11163
}
11264

11365
/// Methods available in read mode
11466
public extension AsyncFileStream where Mode == ReadMode {
11567
/// Read the entire contents of the file in one go
116-
func readToEnd() async throws -> AsyncStream<Data> {
117-
try await read(upToCount: .max)
118-
}
119-
120-
func readData(upToCount length: Int) async throws -> Data {
121-
let stream = try await read(upToCount: length)
122-
var allData = Data()
123-
for await data in stream {
124-
allData.append(data)
125-
}
126-
return allData
68+
mutating func readToEnd() async throws -> Data {
69+
try await readData(upToCount: .max)
12770
}
12871

129-
/// Read the next `length` bytes.
130-
func read(upToCount length: Int) async throws -> AsyncStream<Data> {
131-
let (stream, continuation) = AsyncStream.makeStream(of: Data.self)
132-
io.read(offset: 0, length: length, queue: queue) { done, data, error in
133-
if let data {
134-
continuation.yield(Data(data))
135-
}
136-
guard done else {
137-
return // not done yet
138-
}
139-
continuation.finish()
140-
}
141-
return stream
72+
mutating func readData(upToCount length: Int) async throws -> Data {
73+
let dataRead = try await file.read(length, at: offset)
74+
offset += off_t(dataRead.count)
75+
return dataRead
14276
}
14377
}
14478

14579
/// Methods available in write mode
14680
public extension AsyncFileStream where Mode == WriteMode {
14781
/// Write the data out to file async
148-
func write(_ data: DispatchData) async throws {
149-
try await withCheckedThrowingContinuation { continuation in
150-
io.write(
151-
offset: 0,
152-
data: data,
153-
queue: queue
154-
) { done, _, error in
155-
guard done else {
156-
return // not done yet
157-
}
158-
if error != 0 {
159-
continuation.resume(throwing: AsyncFileStreamError.writeError(error))
160-
} else {
161-
continuation.resume(returning: ())
162-
}
163-
}
164-
} as Void
82+
mutating func write(_ data: Data) async throws {
83+
let bytesWritten = try await file.write(data, at: offset)
84+
offset += off_t(bytesWritten)
16585
}
16686
}

0 commit comments

Comments
 (0)