Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
xcuserdata/
DerivedData/
.swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata
Package.resolved
16 changes: 0 additions & 16 deletions Package.resolved

This file was deleted.

34 changes: 21 additions & 13 deletions Package.swift
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// swift-tools-version:5.5
// swift-tools-version:6.0
// The swift-tools-version declares the minimum version of Swift required to build this package.

import PackageDescription
Expand All @@ -16,23 +16,31 @@ let package = Package(
name: "AsyncExtensions",
targets: ["AsyncExtensions"]),
],
dependencies: [.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3"))],
dependencies: [
.package(url: "https://github.com/apple/swift-collections.git", .upToNextMajor(from: "1.0.3")),
.package(url: "https://github.com/apple/swift-async-algorithms.git", .upToNextMajor(from: "1.0.0")),
.package(url: "https://github.com/OpenCombine/OpenCombine.git", from: "0.14.0"),
.package(url: "https://github.com/apple/swift-atomics.git", .upToNextMajor(from: "1.2.0")),
],
targets: [
.target(
name: "AsyncExtensions",
dependencies: [.product(name: "Collections", package: "swift-collections")],
path: "Sources"
// ,
// swiftSettings: [
// .unsafeFlags([
// "-Xfrontend", "-warn-concurrency",
// "-Xfrontend", "-enable-actor-data-race-checks",
// ])
// ]
dependencies: [
.product(name: "Collections", package: "swift-collections"),
.product(name: "Atomics", package: "swift-atomics")
],
path: "Sources",
swiftSettings: [.swiftLanguageMode(.v5)]
),
.testTarget(
name: "AsyncExtensionsTests",
dependencies: ["AsyncExtensions"],
path: "Tests"),
dependencies: [
"AsyncExtensions",
.product(name: "OpenCombine", package: "OpenCombine", condition: .when(platforms: [.linux])),
.product(name: "AsyncAlgorithms", package: "swift-async-algorithms")
],
path: "Tests",
swiftSettings: [.swiftLanguageMode(.v5)]
),
]
)
8 changes: 1 addition & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

**AsyncExtensions** provides a collection of operators that intends to ease the creation and combination of `AsyncSequences`.

**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms). For now there is an overlap between both libraries, but when **swift-async-algorithms** becomes stable the overlapping operators while be deprecated in **AsyncExtensions**. Nevertheless **AsyncExtensions** will continue to provide the operators that the community needs and are not provided by Apple.
**AsyncExtensions** can be seen as a companion to Apple [swift-async-algorithms](https://github.com/apple/swift-async-algorithms), which provides operators that the community needs and are not provided by Apple.

## Adding AsyncExtensions as a Dependency

Expand Down Expand Up @@ -44,11 +44,6 @@ AsyncStream)
* [AsyncThrowingReplaySubject](./Sources/AsyncSubjects/AsyncThrowingReplaySubject.swift): Throwing subject with a shared output. Maintains and replays a buffered amount of values

### Combiners
* [`zip(_:_:)`](./Sources/Combiners/Zip/AsyncZip2Sequence.swift): Zips two `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:_:_:)`](./Sources/Combiners/Zip/AsyncZip3Sequence.swift): Zips three `AsyncSequence` into an AsyncSequence of tuple of elements
* [`zip(_:)`](./Sources/Combiners/Zip/AsyncZipSequence.swift): Zips any async sequences into an array of elements
* [`merge(_:_:)`](./Sources/Combiners/Merge/AsyncMerge2Sequence.swift): Merges two `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:_:_:)`](./Sources/Combiners/Merge/AsyncMerge3Sequence.swift): Merges three `AsyncSequence` into an AsyncSequence of elements
* [`merge(_:)`](./Sources/Combiners/Merge/AsyncMergeSequence.swift): Merges any `AsyncSequence` into an AsyncSequence of elements
* [`withLatest(_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFromSequence.swift): Combines elements from self with the last known element from an other `AsyncSequence`
* [`withLatest(_:_:)`](./Sources/Combiners/WithLatestFrom/AsyncWithLatestFrom2Sequence.swift): Combines elements from self with the last known elements from two other async sequences
Expand All @@ -58,7 +53,6 @@ AsyncStream)
* [AsyncFailSequence](./Sources/Creators/AsyncFailSequence.swift): Creates an `AsyncSequence` that immediately fails
* [AsyncJustSequence](./Sources/Creators/AsyncJustSequence.swift): Creates an `AsyncSequence` that emits an element an finishes
* [AsyncThrowingJustSequence](./Sources/Creators/AsyncThrowingJustSequence.swift): Creates an `AsyncSequence` that emits an elements and finishes bases on a throwing closure
* [AsyncLazySequence](./Sources/Creators/AsyncLazySequence.swift): Creates an `AsyncSequence` of the elements from the base sequence
* [AsyncTimerSequence](./Sources/Creators/AsyncTimerSequence.swift): Creates an `AsyncSequence` that emits a date value periodically
* [AsyncStream Pipe](./Sources/Creators/AsyncStream+Pipe.swift): Creates an AsyncStream and returns a tuple standing for its inputs and outputs

Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -69,27 +70,24 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case finished

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -155,12 +153,12 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda

func next(onSuspend: (() -> Void)? = nil) async -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return await withTaskCancellationHandler {
await withUnsafeContinuation { [state] (continuation: UnsafeContinuation<Element?, Never>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -184,7 +182,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .finished:
Expand All @@ -200,12 +204,13 @@ public final class AsyncBufferedChannel<Element: Sendable>: AsyncSequence, Senda
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
33 changes: 19 additions & 14 deletions Sources/AsyncChannels/AsyncThrowingBufferedChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Created by Thibault Wittemberg on 07/01/2022.
//

import Atomics
import DequeModule
import OrderedCollections

Expand Down Expand Up @@ -80,27 +81,24 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
enum State: @unchecked Sendable {
case idle
case queued(Deque<Value>)
case awaiting(OrderedSet<Awaiting>)
case awaiting([Awaiting])
case terminated(Termination)

static var initial: State {
.idle
}
}

let ids: ManagedCriticalState<Int>
let ids: ManagedAtomic<Int>
let state: ManagedCriticalState<State>

public init() {
self.ids = ManagedCriticalState(0)
self.ids = ManagedAtomic(0)
self.state = ManagedCriticalState(.initial)
}

func generateId() -> Int {
self.ids.withCriticalRegion { ids in
ids += 1
return ids
}
ids.wrappingIncrementThenLoad(by: 1, ordering: .relaxed)
}

var hasBufferedElements: Bool {
Expand Down Expand Up @@ -176,12 +174,12 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS

func next(onSuspend: (() -> Void)? = nil) async throws -> Element? {
let awaitingId = self.generateId()
let cancellation = ManagedCriticalState<Bool>(false)
let cancellation = ManagedAtomic<Bool>(false)

return try await withTaskCancellationHandler {
try await withUnsafeThrowingContinuation { [state] (continuation: UnsafeContinuation<Element?, Error>) in
let decision = state.withCriticalRegion { state -> AwaitingDecision in
let isCancelled = cancellation.withCriticalRegion { $0 }
let isCancelled = cancellation.load(ordering: .acquiring)
guard !isCancelled else { return .resume(nil) }

switch state {
Expand All @@ -208,7 +206,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
return .suspend
}
case .awaiting(var awaitings):
awaitings.updateOrAppend(Awaiting(id: awaitingId, continuation: continuation))
let awaiting = Awaiting(id: awaitingId, continuation: continuation)

if let index = awaitings.firstIndex(where: { $0 == awaiting }) {
awaitings[index] = awaiting
} else {
awaitings.append(awaiting)
}
state = .awaiting(awaitings)
return .suspend
case .terminated(.finished):
Expand All @@ -227,12 +231,13 @@ public final class AsyncThrowingBufferedChannel<Element, Failure: Error>: AsyncS
}
} onCancel: { [state] in
let awaiting = state.withCriticalRegion { state -> Awaiting? in
cancellation.withCriticalRegion { cancellation in
cancellation = true
}
cancellation.store(true, ordering: .releasing)
switch state {
case .awaiting(var awaitings):
let awaiting = awaitings.remove(.placeHolder(id: awaitingId))
let index = awaitings.firstIndex(where: { $0 == .placeHolder(id: awaitingId) })
guard let index else { return nil }
let awaiting = awaitings[index]
awaitings.remove(at: index)
if awaitings.isEmpty {
state = .idle
} else {
Expand Down
57 changes: 30 additions & 27 deletions Sources/AsyncSubjects/AsyncCurrentValueSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,54 +67,57 @@ public final class AsyncCurrentValueSubject<Element>: AsyncSubject where Element
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.current = element
for channel in state.channels.values {
channel.send(element)
}
return Array(state.channels.values)
}

for channel in channels {
channel.send(element)
}
}

/// Finishes the async sequences with a normal ending.
/// - Parameter termination: The termination to finish the subject.
public func send(_ termination: Termination<Failure>) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
for channel in channels {
channel.finish()
}
return channels
}

for channel in channels {
channel.finish()
}
}

func handleNewConsumer() -> (iterator: AsyncBufferedChannel<Element>.Iterator, unregister: @Sendable () -> Void) {
let asyncBufferedChannel = AsyncBufferedChannel<Element>()
var consumerId: Int!
var unregister: (@Sendable () -> Void)?

let (terminalState, current) = self.state.withCriticalRegion { state -> (Termination?, Element) in
(state.terminalState, state.current)
}

if let terminalState = terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
return (asyncBufferedChannel.makeAsyncIterator(), {})
}

asyncBufferedChannel.send(current)

let consumerId = self.state.withCriticalRegion { state -> Int in
state.ids += 1
state.channels[state.ids] = asyncBufferedChannel
return state.ids
self.state.withCriticalRegion { state in
let terminalState = state.terminalState
if let terminalState, terminalState.isFinished {
asyncBufferedChannel.finish()
} else {
state.ids &+= 1
consumerId = state.ids
state.channels[consumerId] = asyncBufferedChannel
asyncBufferedChannel.send(state.current)
}
}

let unregister = { @Sendable [state] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
if let consumerId {
unregister = { @Sendable [state, consumerId] in
state.withCriticalRegion { state in
state.channels[consumerId] = nil
}
}
}

return (asyncBufferedChannel.makeAsyncIterator(), unregister)
return (asyncBufferedChannel.makeAsyncIterator(), unregister ?? {})
}

public func makeAsyncIterator() -> AsyncIterator {
Expand Down
20 changes: 12 additions & 8 deletions Sources/AsyncSubjects/AsyncPassthroughSubject.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,23 +52,27 @@ public final class AsyncPassthroughSubject<Element: Sendable>: AsyncSubject {
/// Sends a value to all consumers
/// - Parameter element: the value to send
public func send(_ element: Element) {
self.state.withCriticalRegion { state in
for channel in state.channels.values {
channel.send(element)
}
let channels = self.state.withCriticalRegion { state in
state.channels.values
}

for channel in channels {
channel.send(element)
}
}

/// Finishes the subject with a normal ending.
/// - Parameter termination: The termination to finish the subject
public func send(_ termination: Termination<Failure>) {
self.state.withCriticalRegion { state in
let channels = self.state.withCriticalRegion { state -> [AsyncBufferedChannel<Element>] in
state.terminalState = termination
let channels = Array(state.channels.values)
state.channels.removeAll()
for channel in channels {
channel.finish()
}
return channels
}

for channel in channels {
channel.finish()
}
}

Expand Down
Loading