-
-
Notifications
You must be signed in to change notification settings - Fork 629
Fixing interruption behaviour #3183
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
armanbilge
merged 13 commits into
typelevel:main
from
Angel-O:fixing-interruption-behaviour
May 12, 2023
Merged
Changes from 7 commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
b720a5b
* fixing interruption behaviour
Angel-O b03e1f8
fmt
Angel-O 4955b51
reducing rangeLength by a factor of 10 to avoid timeout on CI
Angel-O b785e49
Merge branch 'typelevel:main' into fixing-interruption-behaviour
Angel-O 8182c8e
simplifying stream, using testcontrol
Angel-O fdc2d16
minor
Angel-O d0f0a3a
reducing rangeLength by a factor of 10 to prevent timeout on js
Angel-O ff688be
Update core/shared/src/test/scala/fs2/StreamCombinatorsSuite.scala
Angel-O c73ab66
removing unused ref
Angel-O b64eb5a
downstreamtimeout
Angel-O 8d57590
adding assertion
Angel-O 9ae18d7
supply increase made clear
Angel-O 89889fa
Merge branch 'typelevel:main' into fixing-interruption-behaviour
Angel-O File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -23,7 +23,7 @@ package fs2 | |||||||||
|
|
||||||||||
| import cats.effect.kernel.Deferred | ||||||||||
| import cats.effect.kernel.Ref | ||||||||||
| import cats.effect.std.{Semaphore, Queue} | ||||||||||
| import cats.effect.std.{Queue, Semaphore} | ||||||||||
| import cats.effect.testkit.TestControl | ||||||||||
| import cats.effect.{IO, SyncIO} | ||||||||||
| import cats.syntax.all._ | ||||||||||
|
|
@@ -34,6 +34,7 @@ import org.scalacheck.Prop.forAll | |||||||||
|
|
||||||||||
| import scala.concurrent.duration._ | ||||||||||
| import scala.concurrent.TimeoutException | ||||||||||
| import scala.util.control.NoStackTrace | ||||||||||
|
|
||||||||||
| class StreamCombinatorsSuite extends Fs2Suite { | ||||||||||
| override def munitIOTimeout = 1.minute | ||||||||||
|
|
@@ -834,6 +835,77 @@ class StreamCombinatorsSuite extends Fs2Suite { | |||||||||
| ) | ||||||||||
| .assertEquals(0.millis) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test("upstream failures are propagated downstream") { | ||||||||||
| TestControl.executeEmbed { | ||||||||||
| case object SevenNotAllowed extends NoStackTrace | ||||||||||
|
|
||||||||||
| val source = Stream | ||||||||||
| .iterate(0)(_ + 1) | ||||||||||
| .covary[IO] | ||||||||||
| .evalTap(n => IO.raiseError(SevenNotAllowed).whenA(n == 7)) | ||||||||||
|
|
||||||||||
| val downstream = source.groupWithin(100, 2.seconds) | ||||||||||
|
|
||||||||||
| downstream.intercept[SevenNotAllowed.type] | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we make an assertion here about what the downstream has / has not received before the error? |
||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test( | ||||||||||
| "upstream interruption causes immediate downstream termination with all elements being emitted" | ||||||||||
| ) { | ||||||||||
|
|
||||||||||
| val sourceTimeout = 5.5.seconds | ||||||||||
| val downstreamTimeout = sourceTimeout + 2.seconds | ||||||||||
|
|
||||||||||
| TestControl | ||||||||||
| .executeEmbed( | ||||||||||
| Ref[IO] | ||||||||||
| .of(0.millis) | ||||||||||
| .flatMap { ref => | ||||||||||
| val source: Stream[IO, Int] = | ||||||||||
| Stream | ||||||||||
| .iterate(0)(_ + 1) | ||||||||||
| .covary[IO] | ||||||||||
| .meteredStartImmediately(1.second) | ||||||||||
| .interruptAfter(sourceTimeout) | ||||||||||
|
|
||||||||||
| // large chunkSize and timeout (no emissions expected in the window | ||||||||||
| // specified, unless source ends, due to interruption or | ||||||||||
| // natural termination (i.e runs out of elements) | ||||||||||
| val downstream: Stream[IO, Chunk[Int]] = | ||||||||||
| source.groupWithin(Int.MaxValue, 1.day) | ||||||||||
|
|
||||||||||
| downstream.compile.lastOrError | ||||||||||
| .map(_.toList) | ||||||||||
| .timeout(downstreamTimeout) | ||||||||||
| .flatTap(_ => IO.monotonic.flatMap(ref.set)) | ||||||||||
| .flatMap(emit => ref.get.map(timeLapsed => (timeLapsed, emit))) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||
| } | ||||||||||
| ) | ||||||||||
| .assertEquals( | ||||||||||
| // downstream ended immediately (i.e timeLapsed = sourceTimeout) | ||||||||||
| // emitting whatever was accumulated at the time of interruption | ||||||||||
| (sourceTimeout, List(0, 1, 2, 3, 4, 5)) | ||||||||||
| ) | ||||||||||
| } | ||||||||||
|
|
||||||||||
| test("stress test: all elements are processed") { | ||||||||||
| val rangeLength = 10000 | ||||||||||
|
|
||||||||||
| Stream | ||||||||||
| .eval(Ref.of[IO, Int](0)) | ||||||||||
| .flatMap { counter => | ||||||||||
| Stream | ||||||||||
| .range(0, rangeLength) | ||||||||||
| .covary[IO] | ||||||||||
| .groupWithin(4096, 100.micros) | ||||||||||
| .evalTap(ch => counter.update(_ + ch.size)) *> Stream.eval(counter.get) | ||||||||||
| } | ||||||||||
| .compile | ||||||||||
| .lastOrError | ||||||||||
| .assertEquals(rangeLength) | ||||||||||
| } | ||||||||||
| } | ||||||||||
|
|
||||||||||
| property("head")(forAll((s: Stream[Pure, Int]) => assertEquals(s.head.toList, s.toList.take(1)))) | ||||||||||
|
|
||||||||||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, dumb question: why is
Int.MaxValuea "magic number" in this context? I would have thought it's effectively maxing out the semaphore, but if it needs+ outputLongto work then I feel like it must have more significance?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Legit question to be fair. Had to think about it again.
Interruption of the upstream fiber (i.e. Outcome.Cancelled) is handled downstream by doing nothing (permits are never released)
So by increasing the supply to
Int.MaxValuewe are just evening out the negative balance (Int.MaxValueis to account for the worst case scenario: at most thechunkSizeparameter will be equal toInt.MaxValue)Now after getting past the "checkpoint" above we are acquiring
outputLongpermits againSo in order to get past this point we need to release an additional
outputLongpermits and that allows the stream to be unblockedEDIT
uhm well actually I've just tested it, it is not handled with
Outcome.Cancelled...There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for that explanation!
So could we just use
chunkSizehere, instead ofInt.MaxValue?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@armanbilge apologies I was wrong, that's not what's happening here. I'm just doing some tests to figure out why we need the additional
outputLongThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, if these implementations details are no longer relevant after your rewrite in the other PR, then let's not get too hung up on this one :)
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok I think I've figured it out (might be useful for the other implementation actually)
basically the problem is that we need enough supply to cover 2 iterations of the race loop. So if we only increase it by
Int.MaxValuethe following will happenif instead we increase it by
Int.MaxValue + outputLongoutputLongSo since the chunkSize can be as high as
Int.MaxValuethen the minimum supply to unblock the semaphore should beInt.MaxValue + outputLongThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Key word being "can". Wouldn't
chunkSize + outputLongbe sufficient?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah that should work. The test still passes, I'll change it to
outputLong * 2since chunkSize == outputLong