@@ -1401,110 +1401,6 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
14011401 * @param timeout maximum time that input elements are held in the buffer
14021402 * before being emitted by the resulting stream.
14031403 */
1404- def groupWithin1 [F2 [x] >: F [x]](
1405- chunkSize : Int ,
1406- timeout : FiniteDuration
1407- )(implicit F : Temporal [F2 ]): Stream [F2 , Chunk [O ]] = {
1408-
1409- case class JunctionBuffer [T ](
1410- data : Vector [T ],
1411- endOfSupply : Option [Either [Throwable , Unit ]],
1412- endOfDemand : Option [Either [Throwable , Unit ]]
1413- ) {
1414- def splitAt (n : Int ): (JunctionBuffer [T ], JunctionBuffer [T ]) =
1415- if (this .data.size >= n) {
1416- val (head, tail) = this .data.splitAt(n.toInt)
1417- (this .copy(tail), this .copy(head))
1418- } else {
1419- (this .copy(Vector .empty), this )
1420- }
1421- }
1422-
1423- val outputLong = chunkSize.toLong
1424- fs2.Stream .force {
1425- for {
1426- demand <- Semaphore [F2 ](outputLong)
1427- supply <- Semaphore [F2 ](0L )
1428- buffer <- Ref [F2 ].of(
1429- JunctionBuffer [O ](Vector .empty[O ], endOfSupply = None , endOfDemand = None )
1430- )
1431- } yield {
1432- /* - Buffer: stores items from input to be sent on next output chunk
1433- * - Demand Semaphore: to avoid adding too many items to buffer
1434- * - Supply: counts filled positions for next output chunk */
1435- def enqueue (t : O ): F2 [Boolean ] =
1436- for {
1437- _ <- demand.acquire
1438- buf <- buffer.modify(buf => (buf.copy(buf.data :+ t), buf))
1439- _ <- supply.release
1440- } yield buf.endOfDemand.isEmpty
1441-
1442- val dequeueNextOutput : F2 [Option [Vector [O ]]] = {
1443- // Trigger: waits until the supply buffer is full (with acquireN)
1444- val waitSupply = supply.acquireN(outputLong).guaranteeCase {
1445- case Outcome .Succeeded (_) => supply.releaseN(outputLong)
1446- case _ => F .unit
1447- }
1448-
1449- val onTimeout : F2 [Long ] =
1450- for {
1451- _ <- supply.acquire // waits until there is at least one element in buffer
1452- m <- supply.available
1453- k = m.min(outputLong - 1 )
1454- b <- supply.tryAcquireN(k)
1455- } yield if (b) k + 1 else 1
1456-
1457- // in JS cancellation doesn't always seem to run, so race conditions should restore state on their own
1458- for {
1459- acq <- F .race(F .sleep(timeout), waitSupply).flatMap {
1460- case Left (_) => onTimeout
1461- case Right (_) => supply.acquireN(outputLong).as(outputLong)
1462- }
1463- buf <- buffer.modify(_.splitAt(acq.toInt))
1464- _ <- demand.releaseN(buf.data.size.toLong)
1465- res <- buf.endOfSupply match {
1466- case Some (Left (error)) => F .raiseError(error)
1467- case Some (Right (_)) if buf.data.isEmpty => F .pure(None )
1468- case _ => F .pure(Some (buf.data))
1469- }
1470- } yield res
1471- }
1472-
1473- def endSupply (result : Either [Throwable , Unit ]): F2 [Unit ] =
1474- buffer.update(_.copy(endOfSupply = Some (result))) *> supply.releaseN(Int .MaxValue )
1475-
1476- def endDemand (result : Either [Throwable , Unit ]): F2 [Unit ] =
1477- buffer.update(_.copy(endOfDemand = Some (result))) *> demand.releaseN(Int .MaxValue )
1478-
1479- def toEnding (ec : ExitCase ): Either [Throwable , Unit ] = ec match {
1480- case ExitCase .Succeeded => Right (())
1481- case ExitCase .Errored (e) => Left (e)
1482- case ExitCase .Canceled => Right (())
1483- }
1484-
1485- val enqueueAsync = F .start {
1486- this
1487- .evalMap(enqueue)
1488- .forall(identity)
1489- .onFinalizeCase(ec => endSupply(toEnding(ec)))
1490- .compile
1491- .drain
1492- }
1493-
1494- val outputStream : Stream [F2 , Chunk [O ]] =
1495- Stream
1496- .eval(dequeueNextOutput)
1497- .repeat
1498- .collectWhile { case Some (data) => Chunk .vector(data) }
1499-
1500- Stream
1501- .bracketCase(enqueueAsync) { case (upstream, exitCase) =>
1502- endDemand(toEnding(exitCase)) *> upstream.cancel
1503- } >> outputStream
1504- }
1505- }
1506- }
1507-
15081404 def groupWithin [F2 [x] >: F [x]](
15091405 chunkSize : Int ,
15101406 timeout : FiniteDuration
0 commit comments