From 05ecb3ee0394023b91bbe190ae2891a3159bfeb6 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 21 May 2026 11:40:34 +0530 Subject: [PATCH 01/22] Add consInput unfold operation --- .../src/Streamly/Internal/Data/Unfold/Type.hs | 40 ++++++++++++++++--- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 3150fe605c..34da5212bd 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -70,11 +70,12 @@ module Streamly.Internal.Data.Unfold.Type , lmapM , map , mapM - , both - , supply - , first - , second - , carry + , supply -- input or useInput + , first -- asFirst + , second --asSecond + , carry -- XXX carryInput? + , consInput + , consInputWith -- * Trimming , takeWhileM @@ -114,6 +115,7 @@ module Streamly.Internal.Data.Unfold.Type , map2 , mapM2 , takeWhileMWithInput + , both ) where @@ -472,6 +474,34 @@ carry (Unfold ustep uinject) = Unfold step (\a -> (a,) <$> uinject a) {-# INLINE_LATE step #-} step (a, st) = fmap (func a) (ustep st) +{-# ANN type ConsInputState Fuse #-} +data ConsInputState a s = ConsInputFirst a s | ConsInputRest s + +-- | Prepend @f a@ to the output of the unfold, where @a@ is the input seed. +-- +{-# INLINE_NORMAL consInputWith #-} +consInputWith :: Applicative m => (a -> b) -> Unfold m a b -> Unfold m a b +consInputWith f (Unfold ustep uinject) = Unfold step inject + + where + + inject a = ConsInputFirst a <$> uinject a + + next r = case r of + Yield x s1 -> Yield x (ConsInputRest s1) + Skip s1 -> Skip (ConsInputRest s1) + Stop -> Stop + + {-# INLINE_LATE step #-} + step (ConsInputFirst a s) = pure $ Yield (f a) (ConsInputRest s) + step (ConsInputRest s) = next <$> ustep s + +-- | Prepend the input seed to the output of the unfold. +-- +{-# INLINE consInput #-} +consInput :: Applicative m => Unfold m a a -> Unfold m a a +consInput = consInputWith id + {-# DEPRECATED map2 "Use carry with map instead." #-} {-# INLINE_NORMAL map2 #-} map2 :: Functor m => (a -> b -> c) -> Unfold m a b -> Unfold m a c From 796f054bacb83a3370f739eb4b324d53e0453859 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 21 May 2026 11:59:51 +0530 Subject: [PATCH 02/22] Add tests for consInput unfold combinator --- core/src/Streamly/Internal/Data/Unfold/Type.hs | 6 ++++++ test/Streamly/Test/Data/Unfold.hs | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 34da5212bd..d94822a04d 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -479,6 +479,9 @@ data ConsInputState a s = ConsInputFirst a s | ConsInputRest s -- | Prepend @f a@ to the output of the unfold, where @a@ is the input seed. -- +-- >>> Unfold.fold Fold.toList (Unfold.consInputWith length Unfold.fromList) [1,2,3] +-- [3,1,2,3] +-- {-# INLINE_NORMAL consInputWith #-} consInputWith :: Applicative m => (a -> b) -> Unfold m a b -> Unfold m a b consInputWith f (Unfold ustep uinject) = Unfold step inject @@ -498,6 +501,9 @@ consInputWith f (Unfold ustep uinject) = Unfold step inject -- | Prepend the input seed to the output of the unfold. -- +-- >>> Unfold.fold Fold.toList (Unfold.consInput (Unfold.function (* 2))) 3 +-- [3,6] +-- {-# INLINE consInput #-} consInput :: Applicative m => Unfold m a a -> Unfold m a a consInput = consInputWith id diff --git a/test/Streamly/Test/Data/Unfold.hs b/test/Streamly/Test/Data/Unfold.hs index ac5d60e813..fd7bdf7ae9 100644 --- a/test/Streamly/Test/Data/Unfold.hs +++ b/test/Streamly/Test/Data/Unfold.hs @@ -100,6 +100,16 @@ swap = let unf = UF.swap (UF.function id) in testUnfold unf ((1, 2) :: (Int, Int)) [(2, 1)] +consInput :: Bool +consInput = + let unf = UF.consInput (UF.function (* 2)) + in testUnfold unf (3 :: Int) [3, 6] + +consInputWith :: Bool +consInputWith = + let unf = UF.consInputWith length UF.fromList + in testUnfold unf ([1, 2, 3] :: [Int]) [3, 1, 2, 3] + ------------------------------------------------------------------------------- -- Stream generation ------------------------------------------------------------------------------- @@ -590,6 +600,8 @@ testInputOps = prop "discardFirst" discardFirst prop "discardSecond" discardSecond prop "swap" swap + prop "consInput" consInput + prop "consInputWith" consInputWith testGeneration :: Spec testGeneration = From 9ab295059a3f02e1659247366a8bdf5c3808173d Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 20 May 2026 18:35:25 +0530 Subject: [PATCH 03/22] Add unfoldLast in Data.Stream --- .../src/Streamly/Internal/Data/Stream/Type.hs | 48 +++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 3dde91b070..1345024280 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -1645,6 +1645,54 @@ unfoldCross :: Monad m => Unfold m (a,b) c -> Stream m a -> Stream m b -> Stream m c unfoldCross unf m1 m2 = unfoldEach unf $ crossWith (,) m1 m2 +{-# ANN type UnfoldLastState Fuse #-} +data UnfoldLastState o i b = + UnfoldLastInput o (Maybe b) + | UnfoldLastInject (Maybe b) + | UnfoldLastOutput i + +-- | Append to the input stream a stream generated from terminal state +-- accumulated while consuming the input stream. +-- +-- The unfold is seeded with: +-- +-- * 'Just x' where @x@ is the final element of the stream +-- * 'Nothing' if the stream is empty +-- +-- This is useful when a stream processing phase needs to hand over +-- terminal state to another stream generation phase. +-- +-- For example, one stream may incrementally compute state using scans +-- or folds, and the resulting terminal state can then be used to +-- generate a follow-up stream using general unfolding primitives. +unfoldLast :: Applicative m + => Unfold m (Maybe b) b + -> Stream m b + -> Stream m b +{-# INLINE_NORMAL unfoldLast #-} +unfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (UnfoldLastInput ost Nothing) + + where + + {-# INLINE_LATE step #-} + step gst (UnfoldLastInput o lst) = + (\r -> case r of + Yield x o1 -> Yield x (UnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (UnfoldLastInput o1 lst) + Stop -> Skip (UnfoldLastInject lst) + ) <$> ostep (adaptState gst) o + + step _ (UnfoldLastInject lst) = + (Skip . UnfoldLastOutput) <$> inject lst + + step _ (UnfoldLastOutput i) = + (\r -> case r of + Yield x i1 -> Yield x (UnfoldLastOutput i1) + Skip i1 -> Skip (UnfoldLastOutput i1) + Stop -> Stop + ) <$> ustep i + ------------------------------------------------------------------------------ -- Combine N Streams - concatMap ------------------------------------------------------------------------------ From ec411172504910eb9258d3888c553af9977ea9db Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Wed, 20 May 2026 19:12:33 +0530 Subject: [PATCH 04/22] Add test case for unfoldLast --- .../src/Streamly/Internal/Data/Stream/Type.hs | 11 +++++++++ test/Streamly/Test/Data/Stream/Type.hs | 24 +++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 1345024280..353c4302a4 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -124,6 +124,10 @@ module Streamly.Internal.Data.Stream.Type -- * UnfoldCross , unfoldCross + -- * Unfold Last + , UnfoldLastState (..) + , unfoldLast + -- * ConcatMap -- | Generate streams by mapping a stream generator on each element of an -- input stream, append the resulting streams and flatten. @@ -1665,6 +1669,13 @@ data UnfoldLastState o i b = -- For example, one stream may incrementally compute state using scans -- or folds, and the resulting terminal state can then be used to -- generate a follow-up stream using general unfolding primitives. +-- +-- >>> trailer = Unfold.lmap (maybe [-1] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,3,30,300] +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList ([] :: [Int])) +-- [-1] +-- unfoldLast :: Applicative m => Unfold m (Maybe b) b -> Stream m b diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index b1a629dffb..c63c95b2ee 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -39,6 +39,26 @@ testgroupsOf = (Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)) `shouldReturn` [3::Int, 7, 11, 15, 19] +testUnfoldLastNonEmpty :: Expectation +testUnfoldLastNonEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 3, 30, 300] + + where + + trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList + +testUnfoldLastEmpty :: Expectation +testUnfoldLastEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [-1] + + where + + trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList + moduleName :: String moduleName = "Data.Stream" @@ -155,3 +175,7 @@ main = hspec describe "Tests for Stream.groupsOf" $ do prop "testgroupsOf" testgroupsOf + + describe "Tests for Stream.unfoldLast" $ do + prop "testUnfoldLastNonEmpty" testUnfoldLastNonEmpty + prop "testUnfoldLastEmpty" testUnfoldLastEmpty From 9f90e409939b3cc79a3478550706835603a5aad2 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 21 May 2026 12:08:14 +0530 Subject: [PATCH 05/22] Rename unfoldLast to appendUnfoldLast --- .../src/Streamly/Internal/Data/Stream/Type.hs | 44 +++++++++---------- test/Streamly/Test/Data/Stream/Type.hs | 18 ++++---- 2 files changed, 31 insertions(+), 31 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 353c4302a4..2a019f49d3 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -125,8 +125,8 @@ module Streamly.Internal.Data.Stream.Type , unfoldCross -- * Unfold Last - , UnfoldLastState (..) - , unfoldLast + , AppendUnfoldLastState (..) + , appendUnfoldLast -- * ConcatMap -- | Generate streams by mapping a stream generator on each element of an @@ -1649,11 +1649,11 @@ unfoldCross :: Monad m => Unfold m (a,b) c -> Stream m a -> Stream m b -> Stream m c unfoldCross unf m1 m2 = unfoldEach unf $ crossWith (,) m1 m2 -{-# ANN type UnfoldLastState Fuse #-} -data UnfoldLastState o i b = - UnfoldLastInput o (Maybe b) - | UnfoldLastInject (Maybe b) - | UnfoldLastOutput i +{-# ANN type AppendUnfoldLastState Fuse #-} +data AppendUnfoldLastState o i b = + AppendUnfoldLastInput o (Maybe b) + | AppendUnfoldLastInject (Maybe b) + | AppendUnfoldLastOutput i -- | Append to the input stream a stream generated from terminal state -- accumulated while consuming the input stream. @@ -1671,36 +1671,36 @@ data UnfoldLastState o i b = -- generate a follow-up stream using general unfolding primitives. -- -- >>> trailer = Unfold.lmap (maybe [-1] (\x -> [x*10, x*100])) Unfold.fromList --- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList [1,2,3 :: Int]) -- [1,2,3,30,300] --- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList ([] :: [Int])) +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int])) -- [-1] -- -unfoldLast :: Applicative m +appendUnfoldLast :: Applicative m => Unfold m (Maybe b) b -> Stream m b -> Stream m b -{-# INLINE_NORMAL unfoldLast #-} -unfoldLast (Unfold ustep inject) (Stream ostep ost) = - Stream step (UnfoldLastInput ost Nothing) +{-# INLINE_NORMAL appendUnfoldLast #-} +appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (AppendUnfoldLastInput ost Nothing) where {-# INLINE_LATE step #-} - step gst (UnfoldLastInput o lst) = + step gst (AppendUnfoldLastInput o lst) = (\r -> case r of - Yield x o1 -> Yield x (UnfoldLastInput o1 (Just x)) - Skip o1 -> Skip (UnfoldLastInput o1 lst) - Stop -> Skip (UnfoldLastInject lst) + Yield x o1 -> Yield x (AppendUnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (AppendUnfoldLastInput o1 lst) + Stop -> Skip (AppendUnfoldLastInject lst) ) <$> ostep (adaptState gst) o - step _ (UnfoldLastInject lst) = - (Skip . UnfoldLastOutput) <$> inject lst + step _ (AppendUnfoldLastInject lst) = + (Skip . AppendUnfoldLastOutput) <$> inject lst - step _ (UnfoldLastOutput i) = + step _ (AppendUnfoldLastOutput i) = (\r -> case r of - Yield x i1 -> Yield x (UnfoldLastOutput i1) - Skip i1 -> Skip (UnfoldLastOutput i1) + Yield x i1 -> Yield x (AppendUnfoldLastOutput i1) + Skip i1 -> Skip (AppendUnfoldLastOutput i1) Stop -> Stop ) <$> ustep i diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index c63c95b2ee..6c180174d7 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -39,20 +39,20 @@ testgroupsOf = (Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)) `shouldReturn` [3::Int, 7, 11, 15, 19] -testUnfoldLastNonEmpty :: Expectation -testUnfoldLastNonEmpty = +testAppendUnfoldLastNonEmpty :: Expectation +testAppendUnfoldLastNonEmpty = Stream.toList - (Stream.unfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) + (Stream.appendUnfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) `shouldReturn` [1, 2, 3, 30, 300] where trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList -testUnfoldLastEmpty :: Expectation -testUnfoldLastEmpty = +testAppendUnfoldLastEmpty :: Expectation +testAppendUnfoldLastEmpty = Stream.toList - (Stream.unfoldLast trailer (Stream.fromList ([] :: [Int]))) + (Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int]))) `shouldReturn` [-1] where @@ -176,6 +176,6 @@ main = hspec describe "Tests for Stream.groupsOf" $ do prop "testgroupsOf" testgroupsOf - describe "Tests for Stream.unfoldLast" $ do - prop "testUnfoldLastNonEmpty" testUnfoldLastNonEmpty - prop "testUnfoldLastEmpty" testUnfoldLastEmpty + describe "Tests for Stream.appendUnfoldLast" $ do + prop "testAppendUnfoldLastNonEmpty" testAppendUnfoldLastNonEmpty + prop "testAppendUnfoldLastEmpty" testAppendUnfoldLastEmpty From 8bc80acbeb0d8074d63396c346475b0ebf74348d Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 21 May 2026 20:09:09 +0530 Subject: [PATCH 06/22] Add signatures of some append/unfold stream ops --- .../Streamly/Internal/Data/Stream/Nesting.hs | 156 +++++++++++++++++- .../Internal/Data/Stream/Transform.hs | 25 +++ .../src/Streamly/Internal/Data/Stream/Type.hs | 64 +------ 3 files changed, 185 insertions(+), 60 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index 55f230a15f..b20972fccc 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -36,11 +36,21 @@ module Streamly.Internal.Data.Stream.Nesting -- -- @Stream m a -> Stream m a -> Stream m a@. + -- *** Appending + AppendUnfoldLastState (..) + + -- stateful appends + , appendIfEmpty + -- , appendUnfoldFirst -- XXX this may not be useful + , appendUnfoldLast + -- , appendUnfoldIndices + , appendMapLast + -- *** Interleaving -- | Interleave elements from two streams alternately. A special case of -- unfoldEachInterleave. Interleave is equivalent to mergeBy with a round -- robin merge function. - InterleaveState(..) + , InterleaveState(..) , interleave , interleaveEndBy' , interleaveSepBy' @@ -75,6 +85,10 @@ module Streamly.Internal.Data.Stream.Nesting -- unfoldEach: Unfold m a b -> Stream m a -> Stream m b -- @ + -- *** Stateful unfolds + -- unfoldFirst + , unfoldLast + -- *** unfoldEach -- | Generate streams by using an unfold on each element of an input -- stream, append the resulting streams and flatten. A special case of @@ -104,6 +118,10 @@ module Streamly.Internal.Data.Stream.Nesting , intercalateSepBy , intercalateEndBy + -- *** stateful concatMap + -- concatMapFirst + , concatMapLast + -- *** concatMap , fairConcatMapM , fairConcatMap @@ -168,6 +186,98 @@ import Prelude hiding (concatMap, zipWith) #include "DocTestDataStream.hs" +------------------------------------------------------------------------------ +-- Appending +------------------------------------------------------------------------------ + +-- NOTE: If we want to view this as the second stream transformed by the first +-- stream then we can use the name "onEmpty" instead. +-- +-- The name appendIfEmpty might sound like there is nothing being appended, so +-- why append in the name. But actually the first stream is fully evaluated +-- even when it is empty, so the effects are generated even in that case, the +-- second stream is appended to the first's effects if there is no visible +-- output. + +-- | Use the second stream only if the first stream is empty. +-- +-- The first stream is evaluated completely. If it yields at least one +-- element, its output is used and the second stream is discarded. +-- Otherwise, the second stream is evaluated and used instead. +-- +-- > appendIfEmpty [1,2] [3,4] == [1,2] +-- > appendIfEmpty [] [3,4] == [3,4] +-- +-- This is analogous to a left-biased alternative on streams. +-- +-- /Unimplemented/ +-- +appendIfEmpty :: Stream m a -> Stream m a -> Stream m a +-- ifEmpty s1 s2 = appendMapLast (\x -> maybe s2 (const nil) x) s1 +appendIfEmpty = undefined + +{-# ANN type AppendUnfoldLastState Fuse #-} +data AppendUnfoldLastState o i b = + AppendUnfoldLastInput o (Maybe b) + | AppendUnfoldLastInject (Maybe b) + | AppendUnfoldLastOutput i + +-- | Append to the input stream a stream generated from terminal state +-- accumulated while consuming the input stream. +-- +-- The unfold is seeded with: +-- +-- * 'Just x' where @x@ is the final element of the stream +-- * 'Nothing' if the stream is empty +-- +-- This is useful when a stream processing phase needs to hand over +-- terminal state to another stream generation phase. +-- +-- For example, one stream may incrementally compute state using scans +-- or folds, and the resulting terminal state can then be used to +-- generate a follow-up stream using general unfolding primitives. +-- +-- >>> trailer = Unfold.lmap (maybe [-1] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,3,30,300] +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int])) +-- [-1] +-- +{-# INLINE_NORMAL appendUnfoldLast #-} +appendUnfoldLast :: Applicative m + => Unfold m (Maybe b) b + -> Stream m b + -> Stream m b +appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (AppendUnfoldLastInput ost Nothing) + + where + + {-# INLINE_LATE step #-} + step gst (AppendUnfoldLastInput o lst) = + (\r -> case r of + Yield x o1 -> Yield x (AppendUnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (AppendUnfoldLastInput o1 lst) + Stop -> Skip (AppendUnfoldLastInject lst) + ) <$> ostep (adaptState gst) o + + step _ (AppendUnfoldLastInject lst) = + (Skip . AppendUnfoldLastOutput) <$> inject lst + + step _ (AppendUnfoldLastOutput i) = + (\r -> case r of + Yield x i1 -> Yield x (AppendUnfoldLastOutput i1) + Skip i1 -> Skip (AppendUnfoldLastOutput i1) + Stop -> Stop + ) <$> ustep i + +-- | +-- /Unimplemented/ +-- +{-# INLINE_NORMAL appendMapLast #-} +appendMapLast :: (a -> Stream m a) -> Stream m b -> Stream m b +appendMapLast = undefined + ------------------------------------------------------------------------------ -- Interleaving ------------------------------------------------------------------------------ @@ -649,6 +759,50 @@ mergeFstBy :: -- Monad m => mergeFstBy _f _m1 _m2 = undefined -- fromStreamK $ D.mergeFstBy f (toStreamD m1) (toStreamD m2) +------------------------------------------------------------------------------ +-- Selective unfold +------------------------------------------------------------------------------ + +-- | Replace the final element of a stream by unfolding a stream from it. +-- +-- The final element is removed from the output stream and used as the +-- seed to generate a replacement stream using the supplied 'Unfold'. +-- +-- > [a,b,c] -> [a,b] <> unfold c +-- +-- This is analogous to 'concatMap', but applied only to the terminal +-- element of the stream. +-- +-- If the stream is empty, the result is empty. +-- +-- /Unimplemented/ +-- +{-# INLINE unfoldLast #-} +unfoldLast :: Unfold m (Maybe a) a -> Stream m a -> Stream m a +unfoldLast = undefined + +------------------------------------------------------------------------------ +-- Selective ConcatMap +------------------------------------------------------------------------------ + +-- | Replace the final element of a stream using a stream-valued mapping. +-- +-- The final element is removed from the output stream and replaced by +-- the stream generated by applying the supplied function to it. +-- +-- > [a,b,c] -> [a,b] <> f c +-- +-- This is analogous to 'concatMap', but applied only to the terminal +-- element of the stream. +-- +-- If the stream is empty, the result is empty. +-- +-- /Unimplemented/ +-- +{-# INLINE concatMapLast #-} +concatMapLast :: (Maybe a -> Stream m a) -> Stream m a -> Stream m a +concatMapLast = undefined + ------------------------------------------------------------------------------ -- Combine N Streams - unfoldEach ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 156a23f6ec..dd13fbad0b 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -160,6 +160,12 @@ module Streamly.Internal.Data.Stream.Transform , rollingMapM , rollingMap2 + -- * Selective Maps + -- , modifyFirst + , modifyLast + -- , modifyLastN -- using a ring buffer + -- , modifyIndices + -- * Maybe Streams , mapMaybe , mapMaybeM @@ -2018,6 +2024,25 @@ rollingMap2 f = catMaybes . rollingMap g g Nothing _ = Nothing g (Just x) y = Just (f x y) +------------------------------------------------------------------------------ +-- Selective Map +------------------------------------------------------------------------------ + +-- | Modify the final element of a stream. +-- +-- The supplied function is applied only to the terminal element of +-- the stream. All preceding elements are emitted unchanged. +-- +-- > modifyLast f [a,b,c] == [a,b,f c] +-- +-- If the stream is empty, the result is empty. +-- +-- /Unimplemented/ +-- +{-# INLINE modifyLast #-} +modifyLast :: (a -> a) -> Stream m a -> Stream m a +modifyLast = undefined + ------------------------------------------------------------------------------ -- Maybe Streams ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 2a019f49d3..e0a18cb51b 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -124,10 +124,6 @@ module Streamly.Internal.Data.Stream.Type -- * UnfoldCross , unfoldCross - -- * Unfold Last - , AppendUnfoldLastState (..) - , appendUnfoldLast - -- * ConcatMap -- | Generate streams by mapping a stream generator on each element of an -- input stream, append the resulting streams and flatten. @@ -787,6 +783,11 @@ head = fold Fold.one head = foldrM (\x _ -> return (Just x)) (return Nothing) #endif +-- XXX This is not worth keeping as it is trivial to express. + +-- | +-- >> headElse def = fmap (fromMaybe def) . head +-- {-# INLINE_NORMAL headElse #-} headElse :: Monad m => a -> Stream m a -> m a headElse a = foldrM (\x _ -> return x) (return a) @@ -1649,61 +1650,6 @@ unfoldCross :: Monad m => Unfold m (a,b) c -> Stream m a -> Stream m b -> Stream m c unfoldCross unf m1 m2 = unfoldEach unf $ crossWith (,) m1 m2 -{-# ANN type AppendUnfoldLastState Fuse #-} -data AppendUnfoldLastState o i b = - AppendUnfoldLastInput o (Maybe b) - | AppendUnfoldLastInject (Maybe b) - | AppendUnfoldLastOutput i - --- | Append to the input stream a stream generated from terminal state --- accumulated while consuming the input stream. --- --- The unfold is seeded with: --- --- * 'Just x' where @x@ is the final element of the stream --- * 'Nothing' if the stream is empty --- --- This is useful when a stream processing phase needs to hand over --- terminal state to another stream generation phase. --- --- For example, one stream may incrementally compute state using scans --- or folds, and the resulting terminal state can then be used to --- generate a follow-up stream using general unfolding primitives. --- --- >>> trailer = Unfold.lmap (maybe [-1] (\x -> [x*10, x*100])) Unfold.fromList --- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList [1,2,3 :: Int]) --- [1,2,3,30,300] --- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int])) --- [-1] --- -appendUnfoldLast :: Applicative m - => Unfold m (Maybe b) b - -> Stream m b - -> Stream m b -{-# INLINE_NORMAL appendUnfoldLast #-} -appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = - Stream step (AppendUnfoldLastInput ost Nothing) - - where - - {-# INLINE_LATE step #-} - step gst (AppendUnfoldLastInput o lst) = - (\r -> case r of - Yield x o1 -> Yield x (AppendUnfoldLastInput o1 (Just x)) - Skip o1 -> Skip (AppendUnfoldLastInput o1 lst) - Stop -> Skip (AppendUnfoldLastInject lst) - ) <$> ostep (adaptState gst) o - - step _ (AppendUnfoldLastInject lst) = - (Skip . AppendUnfoldLastOutput) <$> inject lst - - step _ (AppendUnfoldLastOutput i) = - (\r -> case r of - Yield x i1 -> Yield x (AppendUnfoldLastOutput i1) - Skip i1 -> Skip (AppendUnfoldLastOutput i1) - Stop -> Stop - ) <$> ustep i - ------------------------------------------------------------------------------ -- Combine N Streams - concatMap ------------------------------------------------------------------------------ From 06d1213e27af5666f3bb8371df2de84c567283a6 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Thu, 21 May 2026 18:57:30 +0530 Subject: [PATCH 07/22] Add design notes for the Scan module --- docs/Developer/Data.Scanl.md | 368 +++++++++++++++++++++++++++++++++++ 1 file changed, 368 insertions(+) create mode 100644 docs/Developer/Data.Scanl.md diff --git a/docs/Developer/Data.Scanl.md b/docs/Developer/Data.Scanl.md new file mode 100644 index 0000000000..3600c1d16c --- /dev/null +++ b/docs/Developer/Data.Scanl.md @@ -0,0 +1,368 @@ +## Scans as Moore and Mealy Machines + +foldl and scanl in Streamly are Moore machine style representations: + + foldl :: (b -> a -> b) -> b -> Stream m a -> m b + scanl :: (s -> a -> s) -> s -> Stream m a -> Stream m s + +A Moore machine emits the states of the machine, the output is solely a +function of the state. scanl emits the initial value of the accumulator +followed by one value per input, the next state of the machine. If the input +stream size is n the output stream size is n + 1. + +The Scanl type in streamly is a data representation of strict left +scan. The Fold type is a data representation of strict left fold. + +mapAccum is a Mealy machine style representation: + + mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b + mapAccumM :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b + mapAccumEnd :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b + mapAccumEndM :: (s -> a -> m (s, b)) -> m s -> (s -> m b) -> Stream m a -> Stream m b + +Here, the state and the output are separate, each input generates the next +state and an output. Notice, the step function has the same shape as mapAccumL, +but mapAccumL returns the final accumulator separately while scan emits a final +value into the stream. scan emits values only on inputs, it does not emit an +initial value. However, the extraction function (s -> b) generates an +additional terminal value. While in Moore we have an additional initial value, +in Mealy we have an additional final value. + +`scan` emits one output per input (from the step's b), followed by one +terminal value (from extract). If the input stream size is n the output +stream size is n + 1. + +scan (Mealy) and scanl (Moore) can be interconverted. + +## Moore vs Mealy Machine + +Moore: observes states +Mealy: observes transitions + +Moore naturally exposes an initial observation. +Mealy naturally supports terminal extraction. + +Moore and Mealy both can represent stream transformations as well as folds +equally well. The Mealy formulation is usually more straightforward to think as +an explicit state machine. We have an initial state, each input causes a state +transition and an output. + +The simplest to think about is a Mealy style scan without extraction: + +``` + mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b +``` + +## Implementation Optimizations + +We can write a variant of `scanl` that does not emit the initial +value (postscan), and similarly a variant of scan that does not emit +the terminal value. Use cases where we do not require +these additional values can benefit from these more efficient simpler +variants. + +In a scanl, the driver must emit the initial value before processing any +input. Since this cannot be folded into the per-input step, the driver +needs a distinct pre-input state to run the initialization effect and +transition into the normal wait-for-input loop. That state remains in +the dispatch path for the lifetime of the stream. + +In postscan, the initial value is not emitted, so this extra state can +usually be eliminated. However, a monadic initial value still requires a +separate initialization state to execute the effect exactly once before +entering the main state machine. + +Here is a snippet from real postscan code: + +``` + step _ (ScanInit st) = do + res <- initial + return + $ case res of + Partial fs -> Skip $ ScanDo st fs + Done b -> Yield b ScanDone + step gst (ScanDo st fs) = do + res <- sstep st + case res of + Yield x s -> do + ... +``` + +However, when the state is pure, GHC fusion optimizations can +potentially eliminate the initial state, if it can happen reliably we do +not need a separate postscan type for this simplification. In several +cases we have actually seen fusion issues or inefficiencies due to this +initial state where it is not required. + +The Mealy side is symmetric at the other end. In a scan without terminal +state extraction and with pure finalizer we do not need a separate +state. However, a monadic finalizer forces a post-input state even +without terminal state extraction. + +A Moore machine without the initial value and a Mealy machine without +the terminal value are the simplest to implement and the most reliably +fused. + +## Scanl Type + +Both Moore and Mealy can represent transformations as well as consumers. +Operations like map and filter where only transitions are important, and +empty stream does not require any special processing are efficiently +expressible by Mealy style without extraction or Moore without an +initial step. On the other hand operations like sum where empty stream +sums to 0 are more convenient in Moore style or Mealy with a final +extraction step. + +Streamly's Scanl type is Moore style, it emits an initial value extracted from +the initial state. The extract function extracts the output from the state, it +can be called after every `Partial` step to get the output. It also has a +finalizer action, which is called only when the scan is terminated externally +rather than completing on its own (via Done). In that case `final` can be used +to perform any resource finalization, it does not return any output, just an +effect, and should be called only once. It supports resource bracketing by +resource allocation in initial state and release in the final. However always +emitting a value in the initial step can make implementation of operations +like map and filter more complicated than they need to be. Separate types can +be used for different use cases which would be less ergonomic but can be +optimized better. + +``` +data Step s b = Partial !s | Done !b + +data Scanl m a b = + -- | @Scanl@ @step@ @initial@ @extract@ @final@ + forall s. Scanl (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m ()) +``` + +We require the general Scanl type for resource bracketing. For fusion +optimizations, a postscan (no initial emission) type may be helpful for the +case when no resource bracketing is required. + +## concatScanM and concatMapAccumM + +The Mealy formulation generalizes to concatMapAccum, without a final state +emission: + + concatMapAccumM :: (s -> a -> m (s, Stream m b)) -> m s -> Stream m a -> Stream m b + +This is essentially a nested state machine, a state machine composed of many +state machines. + +The Moore style formulation of the same would require an extract function, the +extract function is called once per input as the state advances: + + concatScanM :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b + +This represents a uni-directional pipe, we can represent it using a data type +UniPipe. + +## Scan Types + +Fold = terminal observation +Scanl = Moore observation stream +mapAccum = Mealy transition stream +concatScan = Moore style nested state machine +concatMapAccum = Mealy style nested state machine + +## Scan Variants + +scanl, prescanl, postscanl: +scanl, provides the initial value of the accumulator and the next value +on each input. postscanl drops the initial value. prescanl drops the +final value. + +scan is similar to postscanl in that it does not (and cannot) emit an +initial value. + +There are strict variants of these like scanl', monadic variants like scanlM', +variants where we use the first input as accumulator like scanl1' . + +## Scanl Module Name + +* Since there is no Foldr, the fold module was named Fold. +* Currently there is an inconsistency because fold module is called `Fold` and + the scan module is called `Scanl`. Either the fold module should be `Foldl` + or the scan module should be `Scan`. +* There is not going to be a Scanr as well so the scan module could have been + named just `Scan`, but it was probably a consistency mistake to not name it + that. + +We could rename it now or live with it. Let's live with it, not a big deal. + +## Where the runners live + +The scan runner for Stream lives in the Stream module, for fold in the Fold +module, for unfold in the Unfold module and so on. If we put the runners in the +scan module then we will need name disambiguation like scanStream, scanFold, +scanScan, scanUnfold etc. + +## Scan Naming in Data.Stream + +Scanl type runners: + + Stream.scanl: running a Scanl type + Stream.postscanl: running a Scanl type dropping the initial value + +The proposal is to change `scanl` to `scan`, to disambiguate it a little more +than single character difference from the constructor "scanl'" in the Scanl +module. Note that this will also require a change in Fold, Scanl, Unfold +modules. And across all variants like scanlMany, scanlMaybe etc. + +We keep `postscanl` as it is for future introduction of a `Postscan` type. If +we introduce that then we will have to use the `postscan` name for that, and +for running Scanl as postscan we can use a `toPostscan` converter for +converting from Scanl to Postscan and then run it using `postscan`. In that +case we can retire the `postscanl` variants. + +Directly running a scan using a step function + Stream.scanl' + Stream.postscanl' + ... + +The proposal is to not expose these and just suggest using the Scanl +constructors, for the following reasons, (1) there is no performance advantage, +(2) we are giving too many options to the user, (3) the names create +confusion with the Scanl runners, (4) if we have these in Data.Stream then why +not in Data.Scanl, Data.Unfold and Data.Fold as well, (5) we have Scanl +constructors with exact same names as the Data.List APIs therefore +discoverability of traditional APIs should not be a problem. + +Scanl constructors: + + Scanl.mkScanl + Scanl.mkScanl1 + ... + +The proposal is to change these names to "scanl'", "scanl1'" etc for the +following reasons, (1) consistency with the Fold module constructors, (2) once +we change the runner `scanl` to `scan`, the `scanl` root is free, so "scanl'" +is no longer in conflict with any operation and can be used as a constructor. + +Another alternative is to change the Fold constructor names adding the prefix +mk, reasons against doing that, (1) wider deprecation change, (2) in the Fold +module there is no "fold/foldl" runner so there is no naming confusion, (3) +these names maintain discoverability of the replacements of traditional fold +functions from Data.List. + +Directly running a mapAccum using a step function + + Stream.mapAccum(M) + Stream.concatMapAccum(M) + +There is no reified type for mapAccum. + +## Postscan Type + +This type does not exist as of now, but there is a possibility that we might +add it for performance reasons. + +Postscan type (separate type) runners and constructors: + + Stream.postscan: running a Postscan type + Postscan.postscanl': constructor using a step function + Scanl.toPostscan: convert Scanl to Postscan and run using postscan + +## Constructor Naming in Data.Scanl + +Note, currently we cannot use the name "scanl" as a constructor +as we have an operation of the same name in the same module for +scanning the input of a scan. So the "mk" prefix makes the distinction. +Also prime is not necessary in constructor names as the scan is always +strict, there is no choice of strict vs lazy. + +However, once we rename that operation from "scanl" to "scan" (as proposed +above), the "scanl" root is no longer used by any operation, so we can drop the +mk prefix and use the exact same names as the Data.List scan operations. That +way it will become consistent with the Fold module and the names will be +discoverable and familiar. + +## State Machines: At a Glance + +* Stream consumers, producers, and transformations can be represented + as state machines. +* A state machine transition is represented by a function. +* iterate (basic producer step): f :: s -> s +* Builder (foldr style) step (consumer or transformation): f :: a -> b -> b +* Accumulator (foldl style) step (consumer or transformation): f :: b -> a -> b +* Moore step: f :: s -> a -> s, state transformation, output implicit in state +* Mealy step: f :: s -> a -> (s, b), state transformation with explicit output + +* Moore style stream producer (iterate) + * iterate can be viewed as a degenerate Moore machine with no external input. + * Moore style producer step (iterate): f :: s -> s + * Moore style stream producer (iterate): f :: (s -> s) -> s -> Stream m s + * Endo represents the transition algebra used by iterate. + +* Moore style stream consumer (iterate) + * When input ~ state, append becomes a degenerate Moore transition. + * Moore style step (append): f :: s -> s -> s + * Moore style transformation: f :: (s -> s -> s) -> s -> Stream m s -> Stream m s + * Moore style consumer (fold): f :: (s -> s -> s) -> s -> Stream m s -> s + * Semigroup can be used to fold by additionally supplying an initial value + * Monoid can be used to fold without supplying an additional initial value (e.g. Sum) + +* Mealy style stream producer (unfold) + * unfold can be viewed as a degenerate Mealy machine with no input. + * Unfold step: f :: s -> (s, b), Mealy with no input + * Unfold: Mealy machine with no input, explicit state and termination + * unfold: f :: (s -> (s, b)) -> s -> Stream m b + * The data representation is Unfold + * A stream can be represented as a Mealy machine with no input, + using existential state and termination. + * The streamly representation is `Stream` type. + +* Moore stream transducer (scan): + * treating `s` as input in iterate style fold is restrictive + * it can only do `Stream m s -> Stream m s` transformation + * so use a separate input + * output derived from state after transition + * scan using the same state and output type: + * f :: (s -> a -> s) -> s -> Stream m a -> Stream m s + * scan using a separate state and output type: + * f :: (s -> a -> s) -> s -> (s -> b) -> Stream m a -> Stream m b + +* Mealy stream transducer (mapAccum): + * treating `s` as output is restrictive + * So split the state and output + * output produced by the transition itself + * mapAccum: Mealy machine with existential state and termination + * Scan step: f :: s -> a -> (s, b) + * Scan without final extract: + * f :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b + * Scan with final extract: + * f :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b + +* Streamly Scanl (conceptual): + * Scan with initial projection and final extraction + * f step initial project final + * f :: (s -> a -> s) -> s -> (s -> b) -> (s -> b) -> Stream m a -> Stream m b + +* Other state machine formulations: + * Moore with s ~ (s, b), split state and output + * Dropping first element and "fmap snd" gives Mealy + * f :: ((s, b) -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m (s, b) + * But we do not need to pass the output back, only the state + * f :: (s -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m (s, b) + * If we do not need the final state + * f :: (s -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m b + * If we do not need the initial output + * f :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b + +* State machines composed of state machines (concatMapAccum) + * Moore style: + * f :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b + * Mealy style: + * f :: (s -> a -> (s, Stream m b)) -> s -> Stream m a -> Stream m b + +## TODO + +* Formulate the Step type as `Partial s b` so that we do not need the extract + function. We won't have to thread around the `b` anymore as we do not need to + return it in `final`. + +* Formulate the Scanl/Fold type using a lower level Moore/Mealy step, like + Unfold in streams. We can then wrap that into another type to emit the + initial value. That will allow use to reuse the same underlying machinery for + both Postscanl and Scanl. We can name Postscanl as Transduce? + +* Change the final function in Scanl constructor to not return output type. From 484ca1352794d3bbb5054a4490b58ca7e8d0cdb0 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 24 May 2026 19:54:49 +0530 Subject: [PATCH 08/22] Update the Scanl design doc incorporating the Scan type --- docs/Developer/Data.Scanl.md | 115 +++++++++++++++++++++-------------- 1 file changed, 70 insertions(+), 45 deletions(-) diff --git a/docs/Developer/Data.Scanl.md b/docs/Developer/Data.Scanl.md index 3600c1d16c..4dd68d1f15 100644 --- a/docs/Developer/Data.Scanl.md +++ b/docs/Developer/Data.Scanl.md @@ -177,17 +177,18 @@ initial value. There are strict variants of these like scanl', monadic variants like scanlM', variants where we use the first input as accumulator like scanl1' . -## Scanl Module Name - -* Since there is no Foldr, the fold module was named Fold. -* Currently there is an inconsistency because fold module is called `Fold` and - the scan module is called `Scanl`. Either the fold module should be `Foldl` - or the scan module should be `Scan`. -* There is not going to be a Scanr as well so the scan module could have been - named just `Scan`, but it was probably a consistency mistake to not name it - that. - -We could rename it now or live with it. Let's live with it, not a big deal. +## Scan and Fold Modules + +* The core scan module representing a stateful transformation or state + machine without emitting an initial or terminal value is to be named as + `Scan`. +* The `Scanl` type and module is a further specialization of the `Scan` + type which adds an emission of an initial value to the state machine. +* The Fold type is exactly the same as the `Scanl` type except that it + does not emit any intermediate values. +* Since there is no Foldr, the fold module was named Fold. Ideally it can be + called `Foldl` to mirror the `Scanl` naming. But we are not going to change + it now. ## Where the runners live @@ -202,17 +203,17 @@ Scanl type runners: Stream.scanl: running a Scanl type Stream.postscanl: running a Scanl type dropping the initial value - -The proposal is to change `scanl` to `scan`, to disambiguate it a little more -than single character difference from the constructor "scanl'" in the Scanl -module. Note that this will also require a change in Fold, Scanl, Unfold -modules. And across all variants like scanlMany, scanlMaybe etc. - -We keep `postscanl` as it is for future introduction of a `Postscan` type. If -we introduce that then we will have to use the `postscan` name for that, and -for running Scanl as postscan we can use a `toPostscan` converter for -converting from Scanl to Postscan and then run it using `postscan`. In that -case we can retire the `postscanl` variants. + (equivalent to Scanl.toScan followed by Stream.scan) + +Note these are the Stream runners and remain unchanged. Separately, in the +Scanl module the combinator to scan the input of a scan (the operation +currently named `scanl`, not the runner) is to be renamed to `compose` as it +composes two scans together to create a single scan by feeding the input from +one to the other, it chains them. This is similar to how we compose the `Scan` +type as well as the `Pipe` type. It makes sense to have a special name for this +operation rather than using `scanl`. The restart-on-termination variant +`scanlMany` is renamed to `composeMany` accordingly, so that the `scanl` root +is fully freed. And `scanlMaybe` to `composeMaybe`. Directly running a scan using a step function Stream.scanl' @@ -227,6 +228,25 @@ not in Data.Scanl, Data.Unfold and Data.Fold as well, (5) we have Scanl constructors with exact same names as the Data.List APIs therefore discoverability of traditional APIs should not be a problem. +Directly running a mapAccum using a step function + + Stream.mapAccum(M) + +The reified type for mapAccum is the `Scan` type. Similar to the above +operations we should not expose these direct operations, rather we should use +the `Scan` type to run these using the `scan` runner. The `Scan` type can have +constructors named `mapAccum` and variants of that, also constructors named +`postscanl'` and variants make sense for the `Scan` type. + +Directly running a concatMapAccum using a step function + + Stream.concatMapAccum(M) + +concatMapAccum is not 1-to-1 (each input may emit many outputs), so it does not +fit the `Scan` type; its natural reified form is perhaps the `Pipe` type. If we +do not have a corresponding type we can have this operation exposed from +Data.Stream. + Scanl constructors: Scanl.mkScanl @@ -235,32 +255,37 @@ Scanl constructors: The proposal is to change these names to "scanl'", "scanl1'" etc for the following reasons, (1) consistency with the Fold module constructors, (2) once -we change the runner `scanl` to `scan`, the `scanl` root is free, so "scanl'" -is no longer in conflict with any operation and can be used as a constructor. +we rename the `scanl` composition operation to `compose`, the `scanl` root is +free, so "scanl'" is no longer in conflict with any operation and can be used +as a constructor. Another alternative is to change the Fold constructor names adding the prefix -mk, reasons against doing that, (1) wider deprecation change, (2) in the Fold -module there is no "fold/foldl" runner so there is no naming confusion, (3) -these names maintain discoverability of the replacements of traditional fold -functions from Data.List. +mk, reasons against doing that, (1) wider deprecation change, (2) the change is +not required otherwise because in the Fold module there is no "fold/foldl" +runner so there is no naming confusion, (3) these names maintain +discoverability of the replacements of traditional fold functions from +Data.List. -Directly running a mapAccum using a step function - - Stream.mapAccum(M) - Stream.concatMapAccum(M) +## Scan Type -There is no reified type for mapAccum. +The `Scan` type is for representing a stateful transformation or state +machine without emitting an initial or terminal value. This can be thought of +as the mapAccum type reified. -## Postscan Type +Scan type runners: -This type does not exist as of now, but there is a possibility that we might -add it for performance reasons. + Stream.scan: running a Scan type -Postscan type (separate type) runners and constructors: +Scan type constructors: + Scan.postscanl': constructor using Moore style step function + Scan.postscanlM': monadic version of postscanl' + Scan.mapAccum: constructor using Mealy style step function + Scan.mapAccumM: monadic version of mapAccum - Stream.postscan: running a Postscan type - Postscan.postscanl': constructor using a step function - Scanl.toPostscan: convert Scanl to Postscan and run using postscan +Scan type adapters: + Scanl.toScan: convert Scanl to Scan + Scanl.fromScan: convert Scan to Scanl + Fold.fromScan: convert Scan to Fold ## Constructor Naming in Data.Scanl @@ -270,11 +295,11 @@ scanning the input of a scan. So the "mk" prefix makes the distinction. Also prime is not necessary in constructor names as the scan is always strict, there is no choice of strict vs lazy. -However, once we rename that operation from "scanl" to "scan" (as proposed -above), the "scanl" root is no longer used by any operation, so we can drop the -mk prefix and use the exact same names as the Data.List scan operations. That -way it will become consistent with the Fold module and the names will be -discoverable and familiar. +However, once we rename that composition operation from "scanl" to "compose" +(as proposed above), the "scanl" root is no longer used by any operation, so we +can drop the mk prefix and use the exact same names as the Data.List scan +operations. That way it will become consistent with the Fold module and the +names will be discoverable and familiar. ## State Machines: At a Glance From 18b119422f39e2f2e1872d58df9a32d5bbd79f80 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 01:46:22 +0530 Subject: [PATCH 09/22] Add more details for different types in Scanl design doc --- docs/Developer/Data.Scanl.md | 232 +++++++++++++++++++++++++---------- 1 file changed, 169 insertions(+), 63 deletions(-) diff --git a/docs/Developer/Data.Scanl.md b/docs/Developer/Data.Scanl.md index 4dd68d1f15..ce3171328f 100644 --- a/docs/Developer/Data.Scanl.md +++ b/docs/Developer/Data.Scanl.md @@ -1,9 +1,91 @@ -## Scans as Moore and Mealy Machines +## Moore vs Mealy Machine -foldl and scanl in Streamly are Moore machine style representations: +Moore: observes states +Mealy: observes transitions + +Moore naturally exposes an initial observation. +Mealy naturally supports terminal extraction. - foldl :: (b -> a -> b) -> b -> Stream m a -> m b - scanl :: (s -> a -> s) -> s -> Stream m a -> Stream m s +Moore and Mealy both can represent stream transformations as well as folds +equally well. The Mealy formulation is usually more straightforward to think as +an explicit state machine. We have an initial state, each input causes a state +transition and an output. + +Producer: A state machine that produces output without any input +Transducer: A state machine that produces output on input + +## Unfold and Stream (Simplest Producer) + +A state machine with no input: +``` +unfold :: (s -> m (s, b)) -> m s -> Stream m b +``` + +Reification of the above unfold function as a data type with termination +support and open injectable state. +``` +data Step s b = Yield b s | Stop +data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s) +``` + +For filtering we can use `Maybe b` as the output type of the machine or +equivalently we can use an explicit `Skip` constructor: +``` +data Step s b = Yield b s | Skip s | Stop +data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s) +``` + +Stream is an unfold with a closed state. +``` +data Stream m a = forall s. Stream (s -> m (Step s a)) s +``` + +In a producer machine, the driver keeps cranking the machine until the machine +stops, the machines decides how many items to produce and when to stop. The +machine also controls whether to produce an output or just pass on each crank. + +## Rescan and Scan (Simplest Transducer) + +The simplest transducer is a Mealy style scan without a terminal state +emission or we can say Moore without an initial state emission: +``` +transduce :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b +``` + +Reification of the above transduce function as a data type with termination +support and open injectable state. +``` +data Step s b = Yield b s | Skip s | Stop +data Rescan m c a b = forall s. Rescan (s -> a -> m (Step s b)) (c -> m s) +``` + +Equivalently instead of using the Skip constructor we can use a `Maybe b` type +instead if we want to filter outputs (consumer mode). And we can use `Maybe a` +type if we want to allow the machine to be cranked without any input (producer +mode). + +Scan is a Rescan with a closed state: +``` +data Scan m a b = forall s. Scan (s -> a -> m (Step s b)) s +``` + +In a scan the driver cranks the machine and decides whether to supply an input +or crank without one. On each crank the machine decides whether to produce an +output or pass without one. So the driver controls the input side and the +machine controls the output side, but neither can seize control of the cranking +rhythm: the machine cannot enter a producer phase where it forces the driver to +keep cranking until the machine signals it is ready to accept input again. +(That producer phase is what distinguishes a Pipe.) + +## Scanl (Moore and Mealy Machines) + +### Moore Style Scan + +foldl and scanl in Streamly are Moore machine style representations: +``` +foldl :: (b -> a -> b) -> b -> Stream m a -> m b +scanl :: (s -> a -> s) -> s -> Stream m a -> Stream m s +``` A Moore machine emits the states of the machine, the output is solely a function of the state. scanl emits the initial value of the accumulator @@ -13,53 +95,37 @@ stream size is n the output stream size is n + 1. The Scanl type in streamly is a data representation of strict left scan. The Fold type is a data representation of strict left fold. -mapAccum is a Mealy machine style representation: +### Mealy Style mapAccum - mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b - mapAccumM :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b - mapAccumEnd :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b - mapAccumEndM :: (s -> a -> m (s, b)) -> m s -> (s -> m b) -> Stream m a -> Stream m b +mapAccum is a Mealy machine style representation: +``` +mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b +mapAccumM :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b +mapAccumEnd :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b +mapAccumEndM :: (s -> a -> m (s, b)) -> m s -> (s -> m b) -> Stream m a -> Stream m b +``` Here, the state and the output are separate, each input generates the next state and an output. Notice, the step function has the same shape as mapAccumL, -but mapAccumL returns the final accumulator separately while scan emits a final -value into the stream. scan emits values only on inputs, it does not emit an -initial value. However, the extraction function (s -> b) generates an -additional terminal value. While in Moore we have an additional initial value, -in Mealy we have an additional final value. +but mapAccumL returns the final accumulator separately while the mapAccumEnd +implementation above emits a final value into the stream. mapAccum emits values +only on inputs, it does not emit an initial value. However, the extraction +function (s -> b) in mapAccumEnd generates an additional terminal value. While +in Moore we have an additional initial value, in Mealy we have an additional +final value. -`scan` emits one output per input (from the step's b), followed by one +`mapAccumEnd` emits one output per input (from the step's b), followed by one terminal value (from extract). If the input stream size is n the output stream size is n + 1. -scan (Mealy) and scanl (Moore) can be interconverted. - -## Moore vs Mealy Machine - -Moore: observes states -Mealy: observes transitions - -Moore naturally exposes an initial observation. -Mealy naturally supports terminal extraction. - -Moore and Mealy both can represent stream transformations as well as folds -equally well. The Mealy formulation is usually more straightforward to think as -an explicit state machine. We have an initial state, each input causes a state -transition and an output. - -The simplest to think about is a Mealy style scan without extraction: - -``` - mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b -``` +mapAccumEnd (Mealy) and scanl (Moore) can be interconverted. -## Implementation Optimizations +### Implementation Optimizations (Scan Type) -We can write a variant of `scanl` that does not emit the initial -value (postscan), and similarly a variant of scan that does not emit -the terminal value. Use cases where we do not require -these additional values can benefit from these more efficient simpler -variants. +We can write a variant of `scanl` that does not emit the initial value +(postscan), and similarly a variant of scan that does not emit the terminal +value. Use cases where we do not require these additional values can benefit +from these more efficient simpler variants. In a scanl, the driver must emit the initial value before processing any input. Since this cannot be folded into the per-input step, the driver @@ -101,9 +167,9 @@ without terminal state extraction. A Moore machine without the initial value and a Mealy machine without the terminal value are the simplest to implement and the most reliably -fused. +fused. This is basically the `Scan` type we discussed earlier. -## Scanl Type +### Scanl Type Both Moore and Mealy can represent transformations as well as consumers. Operations like map and filter where only transitions are important, and @@ -138,7 +204,7 @@ We require the general Scanl type for resource bracketing. For fusion optimizations, a postscan (no initial emission) type may be helpful for the case when no resource bracketing is required. -## concatScanM and concatMapAccumM +## concatScanlM and concatMapAccumM The Mealy formulation generalizes to concatMapAccum, without a final state emission: @@ -151,18 +217,35 @@ state machines. The Moore style formulation of the same would require an extract function, the extract function is called once per input as the state advances: - concatScanM :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b + concatScanlM :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b This represents a uni-directional pipe, we can represent it using a data type UniPipe. -## Scan Types +## Streaming Types + +Unfold: injectible state stream generation +Stream: opaque state stream + +Rescan: injectible state transducer +Scan: opaque state transducer + +Pipe: Combination of Rescan and Unfold -Fold = terminal observation -Scanl = Moore observation stream -mapAccum = Mealy transition stream -concatScan = Moore style nested state machine -concatMapAccum = Mealy style nested state machine +unfoldAccum/concatMapAccum is a restricted Pipe: it has the fixed +rhythm of consuming one input and unfolding/concatenating its +sub-stream, whereas a Pipe can interleave consumption and production +arbitrarily. (Note: a concatMapAccum whose sub-streams are opaque +Streams cannot be expressed in the fused Pipe representation, because +the sub-stream's existential state cannot be merged; the fused form +corresponds to sub-streams produced by Unfold.) + +Rescanl: injectible state Moore machine (with initial state) +Scanl: opaque state Moore machine + +Refold: Rescanl with only the final output +Fold: Scanl with only the final output +Parser: A Fold with error and backtracking ## Scan Variants @@ -240,12 +323,15 @@ constructors named `mapAccum` and variants of that, also constructors named Directly running a concatMapAccum using a step function + Stream.unfoldAccum(M) Stream.concatMapAccum(M) -concatMapAccum is not 1-to-1 (each input may emit many outputs), so it does not -fit the `Scan` type; its natural reified form is perhaps the `Pipe` type. If we -do not have a corresponding type we can have this operation exposed from -Data.Stream. +These are stateful 1-to-n transformations. unfoldAccum is the fused version and +concatMapAccum is the unfused version because of the existential type in the +Stream. The reified type that can represent unfoldAccum is the `Pipe` type. +However, the Pipe type may be more general because it can do n-to-1 +transformations and 1-to-n as well i.e. it can fold and produce. There is no +reified type corresponding to concatMapAccum. Scanl constructors: @@ -339,8 +425,8 @@ names will be discoverable and familiar. * Moore stream transducer (scan): * treating `s` as input in iterate style fold is restrictive * it can only do `Stream m s -> Stream m s` transformation - * so use a separate input - * output derived from state after transition + * so separate the input from state + * output is derived from state after transition * scan using the same state and output type: * f :: (s -> a -> s) -> s -> Stream m a -> Stream m s * scan using a separate state and output type: @@ -349,7 +435,7 @@ names will be discoverable and familiar. * Mealy stream transducer (mapAccum): * treating `s` as output is restrictive * So split the state and output - * output produced by the transition itself + * output produced by the transition itself * mapAccum: Mealy machine with existential state and termination * Scan step: f :: s -> a -> (s, b) * Scan without final extract: @@ -381,13 +467,33 @@ names will be discoverable and familiar. ## TODO +* Formulate the Scanl/Fold type using the Scan type. + * Formulate the Step type as `Partial s b` so that we do not need the extract function. We won't have to thread around the `b` anymore as we do not need to return it in `final`. -* Formulate the Scanl/Fold type using a lower level Moore/Mealy step, like - Unfold in streams. We can then wrap that into another type to emit the - initial value. That will allow use to reuse the same underlying machinery for - both Postscanl and Scanl. We can name Postscanl as Transduce? - * Change the final function in Scanl constructor to not return output type. + This makes sense in folds but in scans a single element generation at the end + is half-hearted solution. Either we should have the ability to generate a + stream in the beginning as well as in the end or nothing. The initial as well + as final operations should be of Step type. In addition, we should be able to + specify these as unfolds which we can internally adapt to the Step type. + This will make all the drivers complicated though especially the combinators. + Maybe we can just return the singleton residual state as the last + element and then externally unfold the output of the scan into a + stream. + + For supporting something like scanlMAfter' we may need to emit the final state + during extract. We may have to return `Maybe b` in extract or `Stream m b`. + The `final` function has to work on the state such that it cannot return the + last emitted value, it will always return the values after the last + emitted value that may have to be drained. + +* scanlMAfter' can be implemented by something like modifyLast after generating + the final state value in a Moore machine. We can also generate a stream in + the end using unfoldLast. + +* Monadic inject in Unfold forces a separate init state when converting to a + stream. We can remove the monadic inject or create a pure type or ensure that + the additional state fuses in the most efficient way. From 86e0aa8683dd034f088dbca8c0c958d24b8218f8 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 01:56:25 +0530 Subject: [PATCH 10/22] Add a note about pure vs monadic unfold --- core/src/Streamly/Internal/Data/Stream/Type.hs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index e0a18cb51b..a43b33ca47 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -343,6 +343,11 @@ uncons (UnStream step state) = go SPEC state data UnfoldState s = UnfoldNothing | UnfoldJust s +-- XXX Because the inject function is monadic we need a separate state for +-- inject. If we had a pure Unfold type then conversion to stream is trivial. +-- We can possibly have Unfold and UnfoldM or Unfold_ (pure). Which use cases +-- require the monadic inject? + -- | Convert an 'Unfold' into a stream by supplying it an input seed. -- -- >>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello") From 6c1d4465fe667e8e5d8ce4b8ef6541efa4e733f2 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 24 May 2026 18:31:17 +0530 Subject: [PATCH 11/22] Add mapAccumM and reorg the exports in Stream.Transform --- .../Internal/Data/Stream/Transform.hs | 94 ++++++++++++++----- 1 file changed, 72 insertions(+), 22 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index dd13fbad0b..e621b1c96e 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -28,49 +28,60 @@ module Streamly.Internal.Data.Stream.Transform , foldlS -- * Composable Scans - , postscanl , scanl + , postscanl , scanlMany , scanr , pipe - -- * Splitting - - -- NOTE: splitEndBy can be written as an idiom via folds: - -- > splitEndBy p f = Stream.foldMany (Fold.takeEndBy p f) - -- > splitEndBy_ p f = Stream.foldMany (Fold.takeEndBy_ p f) - -- Unlike the Sequence style splitters there is no splitSepByAny here - -- because the default splitSepBy itself is in fact same as splitSepByAny. - - , splitSepBy_ - -- splitSepBy -- keeps the delimiters - - -- * Ad-hoc Scans - -- | Left scans. Stateful, mostly one-to-one maps. + -- * Traditional Left Scans + -- | Stateful, mostly one-to-one maps. + -- ** Strict Scans , scanlM' - , scanlMAfter' , scanl' - , scanlM - , scanlBy , scanl1M' , scanl1' - , scanl1M - , scanl1 + -- Prescans , prescanl' , prescanlM' - , postscanlBy - , postscanlM + -- Postscans , postscanl' , postscanlM' - , postscanlMAfter' + , mapAccumM + -- ** Lazy Scans + , scanlBy -- same as traditional "scanl" + , scanlM + , scanl1M + , scanl1 + + , postscanlBy -- same as traditional lazy "postscanl" + , postscanlM + + -- *** with extraction (for foldl compatibility) , postscanlx' , postscanlMx' , scanlMx' , scanlx' + -- *** Specialized scans + -- transform the terminal state and append a single element + , scanlMAfter' + , postscanlMAfter' + + -- * Splitting + + -- NOTE: splitEndBy can be written as an idiom via folds: + -- > splitEndBy p f = Stream.foldMany (Fold.takeEndBy p f) + -- > splitEndBy_ p f = Stream.foldMany (Fold.takeEndBy_ p f) + -- Unlike the Sequence style splitters there is no splitSepByAny here + -- because the default splitSepBy itself is in fact same as splitSepByAny. + + , splitSepBy_ + -- splitSepBy -- keeps the delimiters + -- * Filtering -- delete is for once like insert, filter is for many like intersperse. @@ -784,8 +795,23 @@ scanlx' fstep begin done = ------------------------------------------------------------------------------ -- Adapted from the vector package. + +-- | postscan and mapAccum (without terminal extraction) are identical in +-- behavior and implementable in terms of each other. mapAccum is cleaner to +-- have because postscan implementation in terms of it is clean whereas the +-- vice-versa requires an undefined. +-- {-# INLINE_NORMAL postscanlM' #-} postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b +{- +postscanlM' step = mapAccumM step1 + + where + + step1 s a = do + b <- step s a + pure (b, b) +-} postscanlM' fstep begin (Stream step state) = Stream step' Nothing where @@ -1033,6 +1059,30 @@ scanl1M' fstep (Stream step state) = Stream step' (state, Nothing) scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a scanl1' f = scanl1M' (\x y -> return (f x y)) +------------------------------------------------------------------------------- +-- Mealy style scans +------------------------------------------------------------------------------- + +-- XXX implement this directly instead of using postscanlM' + +-- | postscan and mapAccum (without terminal extraction) are identical in +-- behavior and implementable in terms of each other. mapAccum is cleaner to +-- have because postscan implementation in terms of it is clean whereas the +-- vice-versa requires an undefined. +-- +{-# INLINE mapAccumM #-} +mapAccumM :: Monad m => + (s -> a -> m (s, b)) + -> m s + -> Stream m a + -> Stream m b +mapAccumM step initial stream = + let r = postscanlM' + (\(s, _) a -> step s a) + (fmap (,undefined) initial) + stream + in fmap snd r + ------------------------------------------------------------------------------- -- Filtering ------------------------------------------------------------------------------- From 8e22ae74d2ecb78705c54f475a8ca5edb7f9f9f1 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:40:32 +0530 Subject: [PATCH 12/22] Add an implementation of mapAccumM --- .../Internal/Data/Stream/Transform.hs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index e621b1c96e..837b06bb0f 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -1063,25 +1063,43 @@ scanl1' f = scanl1M' (\x y -> return (f x y)) -- Mealy style scans ------------------------------------------------------------------------------- --- XXX implement this directly instead of using postscanlM' - -- | postscan and mapAccum (without terminal extraction) are identical in -- behavior and implementable in terms of each other. mapAccum is cleaner to -- have because postscan implementation in terms of it is clean whereas the -- vice-versa requires an undefined. -- -{-# INLINE mapAccumM #-} +{-# INLINE_NORMAL mapAccumM #-} mapAccumM :: Monad m => (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b +{- +-- It can be implemented in terms of 'postscanlM'' but +-- requires an @undefined@ for the initial output. mapAccumM step initial stream = let r = postscanlM' (\(s, _) a -> step s a) (fmap (,undefined) initial) stream in fmap snd r + -} +mapAccumM fstep begin (Stream step state) = + Stream step' Nothing + where + {-# INLINE_LATE step' #-} + step' _ Nothing = do + !x <- begin + return $ Skip (Just (state, x)) + + step' gst (Just (st, acc)) = do + r <- step (adaptState gst) st + case r of + Yield x s -> do + (!acc1, !y) <- fstep acc x + return $ Yield y (Just (s, acc1)) + Skip s -> return $ Skip (Just (s, acc)) + Stop -> return Stop ------------------------------------------------------------------------------- -- Filtering From 75718023d6f7940e5d72f0ed044ea0614b247d85 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:41:32 +0530 Subject: [PATCH 13/22] Add tests for mapAccumM --- test/Streamly/Test/Data/Stream/Serial/Common.hs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/test/Streamly/Test/Data/Stream/Serial/Common.hs b/test/Streamly/Test/Data/Stream/Serial/Common.hs index 9e5e80d071..790058391b 100644 --- a/test/Streamly/Test/Data/Stream/Serial/Common.hs +++ b/test/Streamly/Test/Data/Stream/Serial/Common.hs @@ -992,6 +992,13 @@ transformCombineOpsCommon constr desc eq = do (S.scanlM' (\_ a -> return a) (return 0)) prop (desc <> " postscanlM'") $ transform (tail . scanl' (const id) 0) (S.postscanlM' (\_ a -> return a) (return 0)) + prop (desc <> " mapAccumM running sum") $ + transform (tail . scanl' (+) 0) + (S.mapAccumM (\s a -> let s1 = s + a in return (s1, s1)) + (return 0)) + prop (desc <> " mapAccumM identity (== postscanlM')") $ + transform (tail . scanl' (const id) 0) + (S.mapAccumM (\_ a -> return (a, a)) (return 0)) prop (desc <> " scanl1'") $ transform (scanl1 (const id)) (S.scanl1' (const id)) prop (desc <> " scanl1M'") $ transform (scanl1 (const id)) From 8e51b4c7d67399bace59401b0eb09efb669eb04e Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:45:56 +0530 Subject: [PATCH 14/22] Implement modifyLast scan --- .../Internal/Data/Stream/Transform.hs | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 837b06bb0f..7512b7dd39 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -2096,20 +2096,48 @@ rollingMap2 f = catMaybes . rollingMap g -- Selective Map ------------------------------------------------------------------------------ +{-# ANN type ModifyLastState Fuse #-} +data ModifyLastState s a = + ModifyLastInit s + | ModifyLastBuf s a + | ModifyLastDone + -- | Modify the final element of a stream. -- -- The supplied function is applied only to the terminal element of -- the stream. All preceding elements are emitted unchanged. -- --- > modifyLast f [a,b,c] == [a,b,f c] +-- >>> Stream.toList $ Stream.modifyLast negate $ Stream.fromList [1,2,3] +-- [1,2,-3] -- -- If the stream is empty, the result is empty. -- --- /Unimplemented/ +-- Space: @O(1)@ +-- +-- /Pre-release/ -- -{-# INLINE modifyLast #-} -modifyLast :: (a -> a) -> Stream m a -> Stream m a -modifyLast = undefined +{-# INLINE_NORMAL modifyLast #-} +modifyLast :: Monad m => (a -> a) -> Stream m a -> Stream m a +modifyLast f (Stream step1 state1) = Stream step (ModifyLastInit state1) + + where + + {-# INLINE_LATE step #-} + step gst (ModifyLastInit s1) = do + r <- step1 (adaptState gst) s1 + return $ case r of + Yield x s2 -> Skip (ModifyLastBuf s2 x) + Skip s2 -> Skip (ModifyLastInit s2) + Stop -> Stop + + step gst (ModifyLastBuf s1 a) = do + r <- step1 (adaptState gst) s1 + return $ case r of + Yield x s2 -> Yield a (ModifyLastBuf s2 x) + Skip s2 -> Skip (ModifyLastBuf s2 a) + Stop -> Yield (f a) ModifyLastDone + + step _ ModifyLastDone = return Stop ------------------------------------------------------------------------------ -- Maybe Streams From 03e025ec17db697ec2fc711081f27ec5122ef1b4 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:48:00 +0530 Subject: [PATCH 15/22] Add a test case for modifyLast --- test/Streamly/Test/Data/Stream/Serial/Common.hs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/Streamly/Test/Data/Stream/Serial/Common.hs b/test/Streamly/Test/Data/Stream/Serial/Common.hs index 790058391b..06f7585cbf 100644 --- a/test/Streamly/Test/Data/Stream/Serial/Common.hs +++ b/test/Streamly/Test/Data/Stream/Serial/Common.hs @@ -999,6 +999,12 @@ transformCombineOpsCommon constr desc eq = do prop (desc <> " mapAccumM identity (== postscanlM')") $ transform (tail . scanl' (const id) 0) (S.mapAccumM (\_ a -> return (a, a)) (return 0)) + let modifyLastList _ [] = [] + modifyLastList f xs = init xs ++ [f (last xs)] + prop (desc <> " modifyLast negate") $ + transform (modifyLastList negate) (S.modifyLast negate) + prop (desc <> " modifyLast id") $ + transform (modifyLastList id) (S.modifyLast id) prop (desc <> " scanl1'") $ transform (scanl1 (const id)) (S.scanl1' (const id)) prop (desc <> " scanl1M'") $ transform (scanl1 (const id)) From 0cb16f42b5323f289b892fa01901bc6b8b06dd34 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:58:26 +0530 Subject: [PATCH 16/22] Implement appendMapLast --- .../Streamly/Internal/Data/Stream/Nesting.hs | 40 +++++++++++++++++-- 1 file changed, 36 insertions(+), 4 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index b20972fccc..b0e3d75542 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -271,12 +271,44 @@ appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = Stop -> Stop ) <$> ustep i --- | --- /Unimplemented/ +-- | Append to the input stream a stream generated from terminal state +-- accumulated while consuming the input stream. +-- +-- The function is invoked with: +-- +-- * 'Just x' where @x@ is the final element of the stream +-- * 'Nothing' if the stream is empty +-- +-- This is the 'concatMap' analogue of 'appendUnfoldLast'. Prefer +-- 'appendUnfoldLast' in performance critical code as it fuses better. +-- +-- >>> trailer = maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.appendMapLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,3,30,300] +-- >>> Stream.toList $ Stream.appendMapLast trailer (Stream.fromList ([] :: [Int])) +-- [-1] -- {-# INLINE_NORMAL appendMapLast #-} -appendMapLast :: (a -> Stream m a) -> Stream m b -> Stream m b -appendMapLast = undefined +appendMapLast :: Applicative m + => (Maybe b -> Stream m b) -> Stream m b -> Stream m b +appendMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) + + where + + {-# INLINE_LATE step #-} + step gst (Left (o, lst)) = + (\r -> case r of + Yield x o1 -> Yield x (Left (o1, Just x)) + Skip o1 -> Skip (Left (o1, lst)) + Stop -> Skip (Right (f lst)) + ) <$> ostep (adaptState gst) o + + step gst (Right (UnStream istep ist)) = + (\r -> case r of + Yield x i1 -> Yield x (Right (Stream istep i1)) + Skip i1 -> Skip (Right (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist ------------------------------------------------------------------------------ -- Interleaving From ac2d1088bf17b5d2d2985d4de7f384a28e57ee68 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 12:59:32 +0530 Subject: [PATCH 17/22] Add a test case for appendMapLast --- test/Streamly/Test/Data/Stream/Type.hs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index 6c180174d7..4213746c20 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -59,6 +59,28 @@ testAppendUnfoldLastEmpty = trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList +testAppendMapLastNonEmpty :: Expectation +testAppendMapLastNonEmpty = + Stream.toList + (Stream.appendMapLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 3, 30, 300] + + where + + trailer = + maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x * 10, x * 100]) + +testAppendMapLastEmpty :: Expectation +testAppendMapLastEmpty = + Stream.toList + (Stream.appendMapLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [-1] + + where + + trailer = + maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x * 10, x * 100]) + moduleName :: String moduleName = "Data.Stream" @@ -179,3 +201,7 @@ main = hspec describe "Tests for Stream.appendUnfoldLast" $ do prop "testAppendUnfoldLastNonEmpty" testAppendUnfoldLastNonEmpty prop "testAppendUnfoldLastEmpty" testAppendUnfoldLastEmpty + + describe "Tests for Stream.appendMapLast" $ do + prop "testAppendMapLastNonEmpty" testAppendMapLastNonEmpty + prop "testAppendMapLastEmpty" testAppendMapLastEmpty From 144b13237a09fb605c88263df1903742b8e252e8 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 13:10:23 +0530 Subject: [PATCH 18/22] Implement unfoldLast and concatMapLast --- .../Streamly/Internal/Data/Stream/Nesting.hs | 92 +++++++++++++++---- test/Streamly/Test/Data/Stream/Type.hs | 50 ++++++++++ 2 files changed, 126 insertions(+), 16 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index b0e3d75542..ae260b7f5c 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -87,6 +87,7 @@ module Streamly.Internal.Data.Stream.Nesting -- *** Stateful unfolds -- unfoldFirst + , UnfoldLastState(..) , unfoldLast -- *** unfoldEach @@ -795,23 +796,57 @@ mergeFstBy _f _m1 _m2 = undefined -- Selective unfold ------------------------------------------------------------------------------ +{-# ANN type UnfoldLastState Fuse #-} +data UnfoldLastState o i a = + UnfoldLastInput o (Maybe a) + | UnfoldLastInject (Maybe a) + | UnfoldLastOutput i + -- | Replace the final element of a stream by unfolding a stream from it. -- -- The final element is removed from the output stream and used as the -- seed to generate a replacement stream using the supplied 'Unfold'. -- --- > [a,b,c] -> [a,b] <> unfold c +-- > [a,b,c] -> [a,b] <> unfold (Just c) +-- > [] -> unfold Nothing -- -- This is analogous to 'concatMap', but applied only to the terminal --- element of the stream. --- --- If the stream is empty, the result is empty. +-- element of the stream. If the unfold yields an empty stream for +-- 'Nothing', the result is empty for an empty input. -- --- /Unimplemented/ +-- >>> trailer = Unfold.lmap (maybe [] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,30,300] +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList ([] :: [Int])) +-- [] -- -{-# INLINE unfoldLast #-} -unfoldLast :: Unfold m (Maybe a) a -> Stream m a -> Stream m a -unfoldLast = undefined +{-# INLINE_NORMAL unfoldLast #-} +unfoldLast :: Applicative m + => Unfold m (Maybe a) a -> Stream m a -> Stream m a +unfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (UnfoldLastInput ost Nothing) + + where + + {-# INLINE_LATE step #-} + step gst (UnfoldLastInput o lst) = + (\r -> case r of + Yield x o1 -> case lst of + Nothing -> Skip (UnfoldLastInput o1 (Just x)) + Just prev -> Yield prev (UnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (UnfoldLastInput o1 lst) + Stop -> Skip (UnfoldLastInject lst) + ) <$> ostep (adaptState gst) o + + step _ (UnfoldLastInject lst) = + (Skip . UnfoldLastOutput) <$> inject lst + + step _ (UnfoldLastOutput i) = + (\r -> case r of + Yield x i1 -> Yield x (UnfoldLastOutput i1) + Skip i1 -> Skip (UnfoldLastOutput i1) + Stop -> Stop + ) <$> ustep i ------------------------------------------------------------------------------ -- Selective ConcatMap @@ -822,18 +857,43 @@ unfoldLast = undefined -- The final element is removed from the output stream and replaced by -- the stream generated by applying the supplied function to it. -- --- > [a,b,c] -> [a,b] <> f c +-- > [a,b,c] -> [a,b] <> f (Just c) +-- > [] -> f Nothing -- --- This is analogous to 'concatMap', but applied only to the terminal --- element of the stream. +-- This is the 'concatMap' analogue of 'unfoldLast'. If @f Nothing@ is +-- the empty stream, the result is empty for an empty input. -- --- If the stream is empty, the result is empty. +-- Prefer 'unfoldLast' in performance critical code as it fuses better. -- --- /Unimplemented/ +-- >>> trailer = maybe Stream.nil (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.concatMapLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,30,300] +-- >>> Stream.toList $ Stream.concatMapLast trailer (Stream.fromList ([] :: [Int])) +-- [] -- -{-# INLINE concatMapLast #-} -concatMapLast :: (Maybe a -> Stream m a) -> Stream m a -> Stream m a -concatMapLast = undefined +{-# INLINE_NORMAL concatMapLast #-} +concatMapLast :: Applicative m + => (Maybe a -> Stream m a) -> Stream m a -> Stream m a +concatMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) + + where + + {-# INLINE_LATE step #-} + step gst (Left (o, lst)) = + (\r -> case r of + Yield x o1 -> case lst of + Nothing -> Skip (Left (o1, Just x)) + Just prev -> Yield prev (Left (o1, Just x)) + Skip o1 -> Skip (Left (o1, lst)) + Stop -> Skip (Right (f lst)) + ) <$> ostep (adaptState gst) o + + step gst (Right (UnStream istep ist)) = + (\r -> case r of + Yield x i1 -> Yield x (Right (Stream istep i1)) + Skip i1 -> Skip (Right (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist ------------------------------------------------------------------------------ -- Combine N Streams - unfoldEach diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index 4213746c20..dd3b9555d7 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -81,6 +81,48 @@ testAppendMapLastEmpty = trailer = maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x * 10, x * 100]) +testUnfoldLastNonEmpty :: Expectation +testUnfoldLastNonEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 30, 300] + + where + + trailer = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + +testUnfoldLastEmpty :: Expectation +testUnfoldLastEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + trailer = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + +testConcatMapLastNonEmpty :: Expectation +testConcatMapLastNonEmpty = + Stream.toList + (Stream.concatMapLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 30, 300] + + where + + trailer = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + +testConcatMapLastEmpty :: Expectation +testConcatMapLastEmpty = + Stream.toList + (Stream.concatMapLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + trailer = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + moduleName :: String moduleName = "Data.Stream" @@ -205,3 +247,11 @@ main = hspec describe "Tests for Stream.appendMapLast" $ do prop "testAppendMapLastNonEmpty" testAppendMapLastNonEmpty prop "testAppendMapLastEmpty" testAppendMapLastEmpty + + describe "Tests for Stream.unfoldLast" $ do + prop "testUnfoldLastNonEmpty" testUnfoldLastNonEmpty + prop "testUnfoldLastEmpty" testUnfoldLastEmpty + + describe "Tests for Stream.concatMapLast" $ do + prop "testConcatMapLastNonEmpty" testConcatMapLastNonEmpty + prop "testConcatMapLastEmpty" testConcatMapLastEmpty From 43b37b6076ffddc50ec0a9e1bd0b84578062c96c Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 13:31:34 +0530 Subject: [PATCH 19/22] Implement appendIfEmpty --- .../Streamly/Internal/Data/Stream/Nesting.hs | 46 ++++++++++++++++--- test/Streamly/Test/Data/Stream/Type.hs | 20 ++++++++ 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index ae260b7f5c..c12747830d 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -38,6 +38,7 @@ module Streamly.Internal.Data.Stream.Nesting -- *** Appending AppendUnfoldLastState (..) + , AppendIfEmptyState (..) -- stateful appends , appendIfEmpty @@ -200,22 +201,53 @@ import Prelude hiding (concatMap, zipWith) -- second stream is appended to the first's effects if there is no visible -- output. +{-# ANN type AppendIfEmptyState Fuse #-} +data AppendIfEmptyState s1 s2 = + AppendIfEmptyFirstUnseen s1 + | AppendIfEmptyFirstSeen s1 + | AppendIfEmptySecond s2 + -- | Use the second stream only if the first stream is empty. -- -- The first stream is evaluated completely. If it yields at least one -- element, its output is used and the second stream is discarded. -- Otherwise, the second stream is evaluated and used instead. -- --- > appendIfEmpty [1,2] [3,4] == [1,2] --- > appendIfEmpty [] [3,4] == [3,4] +-- >>> Stream.toList $ Stream.appendIfEmpty (Stream.fromList [1,2 :: Int]) (Stream.fromList [3,4]) +-- [1,2] +-- >>> Stream.toList $ Stream.appendIfEmpty (Stream.fromList ([] :: [Int])) (Stream.fromList [3,4]) +-- [3,4] -- -- This is analogous to a left-biased alternative on streams. -- --- /Unimplemented/ --- -appendIfEmpty :: Stream m a -> Stream m a -> Stream m a --- ifEmpty s1 s2 = appendMapLast (\x -> maybe s2 (const nil) x) s1 -appendIfEmpty = undefined +{-# INLINE_NORMAL appendIfEmpty #-} +appendIfEmpty :: Applicative m => Stream m a -> Stream m a -> Stream m a +appendIfEmpty (Stream step1 state1) (Stream step2 state2) = + Stream step (AppendIfEmptyFirstUnseen state1) + + where + + {-# INLINE_LATE step #-} + step gst (AppendIfEmptyFirstUnseen st) = + (\r -> case r of + Yield x s -> Yield x (AppendIfEmptyFirstSeen s) + Skip s -> Skip (AppendIfEmptyFirstUnseen s) + Stop -> Skip (AppendIfEmptySecond state2) + ) <$> step1 gst st + + step gst (AppendIfEmptyFirstSeen st) = + (\r -> case r of + Yield x s -> Yield x (AppendIfEmptyFirstSeen s) + Skip s -> Skip (AppendIfEmptyFirstSeen s) + Stop -> Stop + ) <$> step1 gst st + + step gst (AppendIfEmptySecond st) = + (\r -> case r of + Yield x s -> Yield x (AppendIfEmptySecond s) + Skip s -> Skip (AppendIfEmptySecond s) + Stop -> Stop + ) <$> step2 gst st {-# ANN type AppendUnfoldLastState Fuse #-} data AppendUnfoldLastState o i b = diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index dd3b9555d7..bc32b70942 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -123,6 +123,22 @@ testConcatMapLastEmpty = trailer = maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) +testAppendIfEmptyNonEmpty :: Expectation +testAppendIfEmptyNonEmpty = + Stream.toList + (Stream.appendIfEmpty + (Stream.fromList [1, 2 :: Int]) + (Stream.fromList [3, 4])) + `shouldReturn` [1, 2] + +testAppendIfEmptyEmpty :: Expectation +testAppendIfEmptyEmpty = + Stream.toList + (Stream.appendIfEmpty + (Stream.fromList ([] :: [Int])) + (Stream.fromList [3, 4])) + `shouldReturn` [3, 4] + moduleName :: String moduleName = "Data.Stream" @@ -255,3 +271,7 @@ main = hspec describe "Tests for Stream.concatMapLast" $ do prop "testConcatMapLastNonEmpty" testConcatMapLastNonEmpty prop "testConcatMapLastEmpty" testConcatMapLastEmpty + + describe "Tests for Stream.appendIfEmpty" $ do + prop "testAppendIfEmptyNonEmpty" testAppendIfEmptyNonEmpty + prop "testAppendIfEmptyEmpty" testAppendIfEmptyEmpty From eaab2eb7b72a6a2d2a1635c1bb820f34be39fd90 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 13:50:25 +0530 Subject: [PATCH 20/22] Implement unfoldFirst and concatMapFirst --- .../Streamly/Internal/Data/Stream/Nesting.hs | 139 +++++++++++++++++- test/Streamly/Test/Data/Stream/Type.hs | 50 +++++++ 2 files changed, 187 insertions(+), 2 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index c12747830d..2a8144e0bc 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -87,7 +87,8 @@ module Streamly.Internal.Data.Stream.Nesting -- @ -- *** Stateful unfolds - -- unfoldFirst + , UnfoldFirstState(..) + , unfoldFirst , UnfoldLastState(..) , unfoldLast @@ -121,7 +122,7 @@ module Streamly.Internal.Data.Stream.Nesting , intercalateEndBy -- *** stateful concatMap - -- concatMapFirst + , concatMapFirst , concatMapLast -- *** concatMap @@ -828,6 +829,77 @@ mergeFstBy _f _m1 _m2 = undefined -- Selective unfold ------------------------------------------------------------------------------ +{-# ANN type UnfoldFirstState Fuse #-} +data UnfoldFirstState o i a = + UnfoldFirstWaitInput o + | UnfoldFirstInjectEmpty + | UnfoldFirstInjectSome a o + | UnfoldFirstOutputEmpty i + | UnfoldFirstOutputSome i o + | UnfoldFirstRest o + +-- | Replace the first element of a stream by unfolding a stream from it. +-- +-- The first element is removed from the output stream and replaced by the +-- stream generated from it using the supplied 'Unfold'. The remaining input +-- elements are emitted after the unfolded stream. +-- +-- > [a,b,c] -> unfold (Just a) <> [b,c] +-- > [] -> unfold Nothing +-- +-- This is analogous to 'concatMap', but applied only to the first element of +-- the stream. If the unfold yields an empty stream for 'Nothing', the result +-- is empty for an empty input. +-- +-- >>> header = Unfold.lmap (maybe [] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.unfoldFirst header (Stream.fromList [1,2,3 :: Int]) +-- [10,100,2,3] +-- >>> Stream.toList $ Stream.unfoldFirst header (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL unfoldFirst #-} +unfoldFirst :: Applicative m + => Unfold m (Maybe a) a -> Stream m a -> Stream m a +unfoldFirst (Unfold ustep inject) (Stream ostep ost) = + Stream step (UnfoldFirstWaitInput ost) + + where + + {-# INLINE_LATE step #-} + step gst (UnfoldFirstWaitInput o) = + (\r -> case r of + Yield x o1 -> Skip (UnfoldFirstInjectSome x o1) + Skip o1 -> Skip (UnfoldFirstWaitInput o1) + Stop -> Skip UnfoldFirstInjectEmpty + ) <$> ostep (adaptState gst) o + + step _ UnfoldFirstInjectEmpty = + (Skip . UnfoldFirstOutputEmpty) <$> inject Nothing + + step _ (UnfoldFirstInjectSome x o) = + (\i -> Skip (UnfoldFirstOutputSome i o)) <$> inject (Just x) + + step _ (UnfoldFirstOutputEmpty i) = + (\r -> case r of + Yield y i1 -> Yield y (UnfoldFirstOutputEmpty i1) + Skip i1 -> Skip (UnfoldFirstOutputEmpty i1) + Stop -> Stop + ) <$> ustep i + + step _ (UnfoldFirstOutputSome i o) = + (\r -> case r of + Yield y i1 -> Yield y (UnfoldFirstOutputSome i1 o) + Skip i1 -> Skip (UnfoldFirstOutputSome i1 o) + Stop -> Skip (UnfoldFirstRest o) + ) <$> ustep i + + step gst (UnfoldFirstRest o) = + (\r -> case r of + Yield y o1 -> Yield y (UnfoldFirstRest o1) + Skip o1 -> Skip (UnfoldFirstRest o1) + Stop -> Stop + ) <$> ostep (adaptState gst) o + {-# ANN type UnfoldLastState Fuse #-} data UnfoldLastState o i a = UnfoldLastInput o (Maybe a) @@ -884,6 +956,69 @@ unfoldLast (Unfold ustep inject) (Stream ostep ost) = -- Selective ConcatMap ------------------------------------------------------------------------------ +data ConcatMapFirstState m o a = + ConcatMapFirstWaitInput o + | ConcatMapFirstOutputEmpty (Stream m a) + | ConcatMapFirstOutputSome (Stream m a) o + | ConcatMapFirstRest o + +-- | Replace the first element of a stream using a stream-valued mapping. +-- +-- The first element is removed from the output stream and replaced by the +-- stream generated by applying the supplied function to it. The remaining +-- input elements are emitted after the generated stream. +-- +-- > [a,b,c] -> f (Just a) <> [b,c] +-- > [] -> f Nothing +-- +-- This is the 'concatMap' analogue of 'unfoldFirst'. If @f Nothing@ is +-- the empty stream, the result is empty for an empty input. +-- +-- Prefer 'unfoldFirst' in performance critical code as it fuses better. +-- +-- >>> header = maybe Stream.nil (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.concatMapFirst header (Stream.fromList [1,2,3 :: Int]) +-- [10,100,2,3] +-- >>> Stream.toList $ Stream.concatMapFirst header (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL concatMapFirst #-} +concatMapFirst :: Applicative m + => (Maybe a -> Stream m a) -> Stream m a -> Stream m a +concatMapFirst f (Stream ostep ost) = + Stream step (ConcatMapFirstWaitInput ost) + + where + + {-# INLINE_LATE step #-} + step gst (ConcatMapFirstWaitInput o) = + (\r -> case r of + Yield x o1 -> Skip (ConcatMapFirstOutputSome (f (Just x)) o1) + Skip o1 -> Skip (ConcatMapFirstWaitInput o1) + Stop -> Skip (ConcatMapFirstOutputEmpty (f Nothing)) + ) <$> ostep (adaptState gst) o + + step gst (ConcatMapFirstOutputEmpty (UnStream istep ist)) = + (\r -> case r of + Yield y i1 -> Yield y (ConcatMapFirstOutputEmpty (Stream istep i1)) + Skip i1 -> Skip (ConcatMapFirstOutputEmpty (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist + + step gst (ConcatMapFirstOutputSome (UnStream istep ist) o) = + (\r -> case r of + Yield y i1 -> Yield y (ConcatMapFirstOutputSome (Stream istep i1) o) + Skip i1 -> Skip (ConcatMapFirstOutputSome (Stream istep i1) o) + Stop -> Skip (ConcatMapFirstRest o) + ) <$> istep gst ist + + step gst (ConcatMapFirstRest o) = + (\r -> case r of + Yield y o1 -> Yield y (ConcatMapFirstRest o1) + Skip o1 -> Skip (ConcatMapFirstRest o1) + Stop -> Stop + ) <$> ostep (adaptState gst) o + -- | Replace the final element of a stream using a stream-valued mapping. -- -- The final element is removed from the output stream and replaced by diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index bc32b70942..b4c2e325ac 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -139,6 +139,48 @@ testAppendIfEmptyEmpty = (Stream.fromList [3, 4])) `shouldReturn` [3, 4] +testUnfoldFirstNonEmpty :: Expectation +testUnfoldFirstNonEmpty = + Stream.toList + (Stream.unfoldFirst header (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [10, 100, 2, 3] + + where + + header = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + +testUnfoldFirstEmpty :: Expectation +testUnfoldFirstEmpty = + Stream.toList + (Stream.unfoldFirst header (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + header = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + +testConcatMapFirstNonEmpty :: Expectation +testConcatMapFirstNonEmpty = + Stream.toList + (Stream.concatMapFirst header (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [10, 100, 2, 3] + + where + + header = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + +testConcatMapFirstEmpty :: Expectation +testConcatMapFirstEmpty = + Stream.toList + (Stream.concatMapFirst header (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + header = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + moduleName :: String moduleName = "Data.Stream" @@ -275,3 +317,11 @@ main = hspec describe "Tests for Stream.appendIfEmpty" $ do prop "testAppendIfEmptyNonEmpty" testAppendIfEmptyNonEmpty prop "testAppendIfEmptyEmpty" testAppendIfEmptyEmpty + + describe "Tests for Stream.unfoldFirst" $ do + prop "testUnfoldFirstNonEmpty" testUnfoldFirstNonEmpty + prop "testUnfoldFirstEmpty" testUnfoldFirstEmpty + + describe "Tests for Stream.concatMapFirst" $ do + prop "testConcatMapFirstNonEmpty" testConcatMapFirstNonEmpty + prop "testConcatMapFirstEmpty" testConcatMapFirstEmpty From 0cad865cd9955e7097e7db8fbdf3138c1713bb64 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 14:08:05 +0530 Subject: [PATCH 21/22] Remove non-existing files from .hlint.ignore --- .hlint.ignore | 2 -- 1 file changed, 2 deletions(-) diff --git a/.hlint.ignore b/.hlint.ignore index 4209b68b9a..c4c43ac137 100644 --- a/.hlint.ignore +++ b/.hlint.ignore @@ -22,9 +22,7 @@ test/Streamly/Test/Unicode/Stream.hs test/Streamly/Test/Data/Unbox.hs benchmark/lib/Streamly/Benchmark/Common.hs benchmark/lib/Streamly/Benchmark/Common/Handle.hs -benchmark/NanoBenchmarks.hs benchmark/Streamly/Benchmark/Data/Array.hs benchmark/Streamly/Benchmark/Data/ParserK.hs -benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs benchmark/Streamly/Benchmark/Data/Unfold.hs benchmark/Streamly/Benchmark/FileSystem/Handle.hs From 9b3a3137797321b53cd6b417bb78243a17b6be0a Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 25 May 2026 14:18:43 +0530 Subject: [PATCH 22/22] Fix hlint issues --- .../Streamly/Internal/Data/Stream/Nesting.hs | 44 +++++++++---------- test/Streamly/Test/Data/Stream/Type.hs | 8 ++-- 2 files changed, 26 insertions(+), 26 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index 2a8144e0bc..d7d70156e8 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -230,21 +230,21 @@ appendIfEmpty (Stream step1 state1) (Stream step2 state2) = {-# INLINE_LATE step #-} step gst (AppendIfEmptyFirstUnseen st) = - (\r -> case r of + (\case Yield x s -> Yield x (AppendIfEmptyFirstSeen s) Skip s -> Skip (AppendIfEmptyFirstUnseen s) Stop -> Skip (AppendIfEmptySecond state2) ) <$> step1 gst st step gst (AppendIfEmptyFirstSeen st) = - (\r -> case r of + (\case Yield x s -> Yield x (AppendIfEmptyFirstSeen s) Skip s -> Skip (AppendIfEmptyFirstSeen s) Stop -> Stop ) <$> step1 gst st step gst (AppendIfEmptySecond st) = - (\r -> case r of + (\case Yield x s -> Yield x (AppendIfEmptySecond s) Skip s -> Skip (AppendIfEmptySecond s) Stop -> Stop @@ -289,17 +289,17 @@ appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = {-# INLINE_LATE step #-} step gst (AppendUnfoldLastInput o lst) = - (\r -> case r of + (\case Yield x o1 -> Yield x (AppendUnfoldLastInput o1 (Just x)) Skip o1 -> Skip (AppendUnfoldLastInput o1 lst) Stop -> Skip (AppendUnfoldLastInject lst) ) <$> ostep (adaptState gst) o step _ (AppendUnfoldLastInject lst) = - (Skip . AppendUnfoldLastOutput) <$> inject lst + Skip . AppendUnfoldLastOutput <$> inject lst step _ (AppendUnfoldLastOutput i) = - (\r -> case r of + (\case Yield x i1 -> Yield x (AppendUnfoldLastOutput i1) Skip i1 -> Skip (AppendUnfoldLastOutput i1) Stop -> Stop @@ -331,14 +331,14 @@ appendMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) {-# INLINE_LATE step #-} step gst (Left (o, lst)) = - (\r -> case r of + (\case Yield x o1 -> Yield x (Left (o1, Just x)) Skip o1 -> Skip (Left (o1, lst)) Stop -> Skip (Right (f lst)) ) <$> ostep (adaptState gst) o step gst (Right (UnStream istep ist)) = - (\r -> case r of + (\case Yield x i1 -> Yield x (Right (Stream istep i1)) Skip i1 -> Skip (Right (Stream istep i1)) Stop -> Stop @@ -867,34 +867,34 @@ unfoldFirst (Unfold ustep inject) (Stream ostep ost) = {-# INLINE_LATE step #-} step gst (UnfoldFirstWaitInput o) = - (\r -> case r of + (\case Yield x o1 -> Skip (UnfoldFirstInjectSome x o1) Skip o1 -> Skip (UnfoldFirstWaitInput o1) Stop -> Skip UnfoldFirstInjectEmpty ) <$> ostep (adaptState gst) o step _ UnfoldFirstInjectEmpty = - (Skip . UnfoldFirstOutputEmpty) <$> inject Nothing + Skip . UnfoldFirstOutputEmpty <$> inject Nothing step _ (UnfoldFirstInjectSome x o) = (\i -> Skip (UnfoldFirstOutputSome i o)) <$> inject (Just x) step _ (UnfoldFirstOutputEmpty i) = - (\r -> case r of + (\case Yield y i1 -> Yield y (UnfoldFirstOutputEmpty i1) Skip i1 -> Skip (UnfoldFirstOutputEmpty i1) Stop -> Stop ) <$> ustep i step _ (UnfoldFirstOutputSome i o) = - (\r -> case r of + (\case Yield y i1 -> Yield y (UnfoldFirstOutputSome i1 o) Skip i1 -> Skip (UnfoldFirstOutputSome i1 o) Stop -> Skip (UnfoldFirstRest o) ) <$> ustep i step gst (UnfoldFirstRest o) = - (\r -> case r of + (\case Yield y o1 -> Yield y (UnfoldFirstRest o1) Skip o1 -> Skip (UnfoldFirstRest o1) Stop -> Stop @@ -934,7 +934,7 @@ unfoldLast (Unfold ustep inject) (Stream ostep ost) = {-# INLINE_LATE step #-} step gst (UnfoldLastInput o lst) = - (\r -> case r of + (\case Yield x o1 -> case lst of Nothing -> Skip (UnfoldLastInput o1 (Just x)) Just prev -> Yield prev (UnfoldLastInput o1 (Just x)) @@ -943,10 +943,10 @@ unfoldLast (Unfold ustep inject) (Stream ostep ost) = ) <$> ostep (adaptState gst) o step _ (UnfoldLastInject lst) = - (Skip . UnfoldLastOutput) <$> inject lst + Skip . UnfoldLastOutput <$> inject lst step _ (UnfoldLastOutput i) = - (\r -> case r of + (\case Yield x i1 -> Yield x (UnfoldLastOutput i1) Skip i1 -> Skip (UnfoldLastOutput i1) Stop -> Stop @@ -992,28 +992,28 @@ concatMapFirst f (Stream ostep ost) = {-# INLINE_LATE step #-} step gst (ConcatMapFirstWaitInput o) = - (\r -> case r of + (\case Yield x o1 -> Skip (ConcatMapFirstOutputSome (f (Just x)) o1) Skip o1 -> Skip (ConcatMapFirstWaitInput o1) Stop -> Skip (ConcatMapFirstOutputEmpty (f Nothing)) ) <$> ostep (adaptState gst) o step gst (ConcatMapFirstOutputEmpty (UnStream istep ist)) = - (\r -> case r of + (\case Yield y i1 -> Yield y (ConcatMapFirstOutputEmpty (Stream istep i1)) Skip i1 -> Skip (ConcatMapFirstOutputEmpty (Stream istep i1)) Stop -> Stop ) <$> istep gst ist step gst (ConcatMapFirstOutputSome (UnStream istep ist) o) = - (\r -> case r of + (\case Yield y i1 -> Yield y (ConcatMapFirstOutputSome (Stream istep i1) o) Skip i1 -> Skip (ConcatMapFirstOutputSome (Stream istep i1) o) Stop -> Skip (ConcatMapFirstRest o) ) <$> istep gst ist step gst (ConcatMapFirstRest o) = - (\r -> case r of + (\case Yield y o1 -> Yield y (ConcatMapFirstRest o1) Skip o1 -> Skip (ConcatMapFirstRest o1) Stop -> Stop @@ -1047,7 +1047,7 @@ concatMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) {-# INLINE_LATE step #-} step gst (Left (o, lst)) = - (\r -> case r of + (\case Yield x o1 -> case lst of Nothing -> Skip (Left (o1, Just x)) Just prev -> Yield prev (Left (o1, Just x)) @@ -1056,7 +1056,7 @@ concatMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) ) <$> ostep (adaptState gst) o step gst (Right (UnStream istep ist)) = - (\r -> case r of + (\case Yield x i1 -> Yield x (Right (Stream istep i1)) Skip i1 -> Skip (Right (Stream istep i1)) Stop -> Stop diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index b4c2e325ac..84ab0f5662 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -89,7 +89,7 @@ testUnfoldLastNonEmpty = where - trailer = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + trailer = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList testUnfoldLastEmpty :: Expectation testUnfoldLastEmpty = @@ -99,7 +99,7 @@ testUnfoldLastEmpty = where - trailer = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + trailer = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList testConcatMapLastNonEmpty :: Expectation testConcatMapLastNonEmpty = @@ -147,7 +147,7 @@ testUnfoldFirstNonEmpty = where - header = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + header = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList testUnfoldFirstEmpty :: Expectation testUnfoldFirstEmpty = @@ -157,7 +157,7 @@ testUnfoldFirstEmpty = where - header = Unfold.lmap (maybe [] (\x -> [x * 10, x * 100])) Unfold.fromList + header = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList testConcatMapFirstNonEmpty :: Expectation testConcatMapFirstNonEmpty =