Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 20 additions & 20 deletions src/FSharp.Control.AsyncSeq/AsyncSeq.fs
Original file line number Diff line number Diff line change
Expand Up @@ -551,31 +551,31 @@
let state = ref (TryWithState.NotStarted inp)
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match !state with
async { match state.Value with
| TryWithState.NotStarted inp ->
let res = ref Unchecked.defaultof<_>
try
res := Choice1Of2 (inp.GetEnumerator())
res.Value <- Choice1Of2 (inp.GetEnumerator())
with exn ->
res := Choice2Of2 exn
res.Value <- Choice2Of2 exn
match res.Value with
| Choice1Of2 r ->
return!
(state := TryWithState.HaveBodyEnumerator r
(state.Value <- TryWithState.HaveBodyEnumerator r
x.MoveNext())
| Choice2Of2 exn ->
return!
(x.Dispose()
let enum = (handler exn).GetEnumerator()
state := TryWithState.HaveHandlerEnumerator enum
state.Value <- TryWithState.HaveHandlerEnumerator enum
x.MoveNext())
| TryWithState.HaveBodyEnumerator e ->
let res = ref Unchecked.defaultof<_>
try
let! r = e.MoveNext()
res := Choice1Of2 r
res.Value <- Choice1Of2 r
with exn ->
res := Choice2Of2 exn
res.Value <- Choice2Of2 exn
match res.Value with
| Choice1Of2 res ->
return
Expand All @@ -587,7 +587,7 @@
return!
(x.Dispose()
let e = (handler exn).GetEnumerator()
state := TryWithState.HaveHandlerEnumerator e
state.Value <- TryWithState.HaveHandlerEnumerator e
x.MoveNext())
| TryWithState.HaveHandlerEnumerator e ->
let! res = e.MoveNext()
Expand All @@ -597,9 +597,9 @@
| _ ->
return None }
member x.Dispose() =
match !state with
match state.Value with
| TryWithState.HaveBodyEnumerator e | TryWithState.HaveHandlerEnumerator e ->
state := TryWithState.Finished
state.Value <- TryWithState.Finished
dispose e
| _ -> () }) :> AsyncSeq<'T>

Expand All @@ -617,11 +617,11 @@
let state = ref (TryFinallyState.NotStarted inp)
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match !state with
async { match state.Value with
| TryFinallyState.NotStarted inp ->
return!
(let e = inp.GetEnumerator()
state := TryFinallyState.HaveBodyEnumerator e
state.Value <- TryFinallyState.HaveBodyEnumerator e
x.MoveNext())
| TryFinallyState.HaveBodyEnumerator e ->
let! res = e.MoveNext()
Expand All @@ -633,9 +633,9 @@
| _ ->
return None }
member x.Dispose() =
match !state with
match state.Value with
| TryFinallyState.HaveBodyEnumerator e->
state := TryFinallyState.Finished
state.Value <- TryFinallyState.Finished
dispose e
compensation()
| _ -> () }) :> AsyncSeq<'T>
Expand Down Expand Up @@ -770,10 +770,10 @@
let state = ref (MapState.NotStarted inp)
{ new IAsyncSeqEnumerator<'T> with
member x.MoveNext() =
async { match !state with
async { match state.Value with
| MapState.NotStarted inp ->
let e = inp.GetEnumerator()
state := MapState.HaveEnumerator e
state.Value <- MapState.HaveEnumerator e
return! x.MoveNext()
| MapState.HaveEnumerator e ->
return
Expand All @@ -784,9 +784,9 @@
None)
| _ -> return None }
member x.Dispose() =
match !state with
match state.Value with
| MapState.HaveEnumerator e ->
state := MapState.Finished
state.Value <- MapState.Finished
dispose e
| _ -> () }) :> AsyncSeq<'T>

Expand Down Expand Up @@ -2167,7 +2167,7 @@
while cur.Value.IsSome do
yield cur.Value.Value
let! next = ie.MoveNext()
cur := next
cur.Value <- next
finally
ie.Dispose() }
return first, rest }
Expand Down Expand Up @@ -2500,7 +2500,7 @@
| None ->
let t = System.Threading.Tasks.TaskCompletionSource()
tasks.[i] <- t.Task // result never gets set
fin := fin.Value - 1
fin.Value <- fin.Value - 1
}

let combineLatestWithAsync (f:'a -> 'b -> Async<'c>) (source1:AsyncSeq<'a>) (source2:AsyncSeq<'b>) : AsyncSeq<'c> =
Expand Down Expand Up @@ -2703,7 +2703,7 @@

[<CompilerMessage("The result of groupBy must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.", 9999)>]
let groupBy (p:'a -> 'k) (s:AsyncSeq<'a>) : AsyncSeq<'k * AsyncSeq<'a>> =
groupByAsync (p >> async.Return) s

Check warning on line 2706 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.

Check warning on line 2706 in src/FSharp.Control.AsyncSeq/AsyncSeq.fs

View workflow job for this annotation

GitHub Actions / build

The result of groupByAsync must be consumed with a parallel combinator such as AsyncSeq.mapAsyncParallel. Sequential consumption will deadlock because sub-sequence completion depends on other sub-sequences being consumed concurrently.
#endif
#endif

Expand Down
16 changes: 8 additions & 8 deletions tests/fable/FSharp.Control.AsyncSeq.Tests/AsyncSeq.test.fs
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ Jest.describe("AsyncSeq.try", fun () ->
let s =
asyncSeq {
try yield 1
finally x := x.Value + 3
finally x.Value <- x.Value + 3
}

Jest.expect(x.Value).toBe(0)
Expand All @@ -476,8 +476,8 @@ Jest.describe("AsyncSeq.try", fun () ->
try
yield 1
failwith "fffail"
finally x := x.Value + 1
finally x := x.Value + 2
finally x.Value <- x.Value + 1
finally x.Value <- x.Value + 2
}

Jest.expect(x.Value).toBe(0)
Expand All @@ -498,7 +498,7 @@ Jest.describe("AsyncSeq.try", fun () ->
let s =
asyncSeq {
try failwith "ffail"
with _ -> x := x.Value + 3
with _ -> x.Value <- x.Value + 3
}

Jest.expect(x.Value).toBe(0)
Expand All @@ -517,7 +517,7 @@ Jest.describe("AsyncSeq.try", fun () ->
let s =
asyncSeq {
try yield 1
with _ -> x := x.Value + 3
with _ -> x.Value <- x.Value + 3
}

Jest.expect(x.Value).toBe(0)
Expand Down Expand Up @@ -857,7 +857,7 @@ Jest.test("AsyncSeq.while should allow do at end", async {
asyncSeq {
while false do
yield 1
do! async { x := x.Value + 3 }
do! async { x.Value <- x.Value + 3 }
}
|> AsyncSeq.toArrayAsync

Expand Down Expand Up @@ -915,7 +915,7 @@ Jest.describe("AsyncSeq.intervalMs", fun () ->
while actual.Value.Length < 10 do
do! Async.Sleep 10
let! timestamp = interval |> AsyncSeq.take 1 |> AsyncSeq.toArrayAsync
actual := (timestamp |> Array.map (fun d -> d.Ticks)) |> Array.append actual.Value
actual.Value <- (timestamp |> Array.map (fun d -> d.Ticks)) |> Array.append actual.Value
}
|> Async.StartImmediate

Expand Down Expand Up @@ -962,7 +962,7 @@ let observe vs err =
if err then
observer.OnError (Failure "fail")
observer.OnCompleted()
{ new IDisposable with member __.Dispose() = discarded := true } },
{ new IDisposable with member __.Dispose() = discarded.Value <- true } },
(fun _ -> discarded.Value)

Jest.describe("AsyncSeq.ofObservableBuffered", fun () ->
Expand Down
Loading