diff --git a/.hlint.ignore b/.hlint.ignore index 4209b68b9a..c4c43ac137 100644 --- a/.hlint.ignore +++ b/.hlint.ignore @@ -22,9 +22,7 @@ test/Streamly/Test/Unicode/Stream.hs test/Streamly/Test/Data/Unbox.hs benchmark/lib/Streamly/Benchmark/Common.hs benchmark/lib/Streamly/Benchmark/Common/Handle.hs -benchmark/NanoBenchmarks.hs benchmark/Streamly/Benchmark/Data/Array.hs benchmark/Streamly/Benchmark/Data/ParserK.hs -benchmark/Streamly/Benchmark/Data/Stream/StreamK.hs benchmark/Streamly/Benchmark/Data/Unfold.hs benchmark/Streamly/Benchmark/FileSystem/Handle.hs diff --git a/core/src/Streamly/Internal/Data/Stream/Nesting.hs b/core/src/Streamly/Internal/Data/Stream/Nesting.hs index 55f230a15f..d7d70156e8 100644 --- a/core/src/Streamly/Internal/Data/Stream/Nesting.hs +++ b/core/src/Streamly/Internal/Data/Stream/Nesting.hs @@ -36,11 +36,22 @@ module Streamly.Internal.Data.Stream.Nesting -- -- @Stream m a -> Stream m a -> Stream m a@. + -- *** Appending + AppendUnfoldLastState (..) + , AppendIfEmptyState (..) + + -- stateful appends + , appendIfEmpty + -- , appendUnfoldFirst -- XXX this may not be useful + , appendUnfoldLast + -- , appendUnfoldIndices + , appendMapLast + -- *** Interleaving -- | Interleave elements from two streams alternately. A special case of -- unfoldEachInterleave. Interleave is equivalent to mergeBy with a round -- robin merge function. - InterleaveState(..) + , InterleaveState(..) , interleave , interleaveEndBy' , interleaveSepBy' @@ -75,6 +86,12 @@ module Streamly.Internal.Data.Stream.Nesting -- unfoldEach: Unfold m a b -> Stream m a -> Stream m b -- @ + -- *** Stateful unfolds + , UnfoldFirstState(..) + , unfoldFirst + , UnfoldLastState(..) + , unfoldLast + -- *** unfoldEach -- | Generate streams by using an unfold on each element of an input -- stream, append the resulting streams and flatten. 
A special case of @@ -104,6 +121,10 @@ module Streamly.Internal.Data.Stream.Nesting , intercalateSepBy , intercalateEndBy + -- *** stateful concatMap + , concatMapFirst + , concatMapLast + -- *** concatMap , fairConcatMapM , fairConcatMap @@ -168,6 +189,161 @@ import Prelude hiding (concatMap, zipWith) #include "DocTestDataStream.hs" +------------------------------------------------------------------------------ +-- Appending +------------------------------------------------------------------------------ + +-- NOTE: If we want to view this as the second stream transformed by the first +-- stream then we can use the name "onEmpty" instead. +-- +-- The name appendIfEmpty might sound like there is nothing being appended, so +-- why append in the name. But actually the first stream is fully evaluated +-- even when it is empty, so the effects are generated even in that case, the +-- second stream is appended to the first's effects if there is no visible +-- output. + +{-# ANN type AppendIfEmptyState Fuse #-} +data AppendIfEmptyState s1 s2 = + AppendIfEmptyFirstUnseen s1 + | AppendIfEmptyFirstSeen s1 + | AppendIfEmptySecond s2 + +-- | Use the second stream only if the first stream is empty. +-- +-- The first stream is evaluated completely. If it yields at least one +-- element, its output is used and the second stream is discarded. +-- Otherwise, the second stream is evaluated and used instead. +-- +-- >>> Stream.toList $ Stream.appendIfEmpty (Stream.fromList [1,2 :: Int]) (Stream.fromList [3,4]) +-- [1,2] +-- >>> Stream.toList $ Stream.appendIfEmpty (Stream.fromList ([] :: [Int])) (Stream.fromList [3,4]) +-- [3,4] +-- +-- This is analogous to a left-biased alternative on streams. 
+-- +{-# INLINE_NORMAL appendIfEmpty #-} +appendIfEmpty :: Applicative m => Stream m a -> Stream m a -> Stream m a +appendIfEmpty (Stream step1 state1) (Stream step2 state2) = + Stream step (AppendIfEmptyFirstUnseen state1) + + where + + {-# INLINE_LATE step #-} + step gst (AppendIfEmptyFirstUnseen st) = + (\case + Yield x s -> Yield x (AppendIfEmptyFirstSeen s) + Skip s -> Skip (AppendIfEmptyFirstUnseen s) + Stop -> Skip (AppendIfEmptySecond state2) + ) <$> step1 gst st + + step gst (AppendIfEmptyFirstSeen st) = + (\case + Yield x s -> Yield x (AppendIfEmptyFirstSeen s) + Skip s -> Skip (AppendIfEmptyFirstSeen s) + Stop -> Stop + ) <$> step1 gst st + + step gst (AppendIfEmptySecond st) = + (\case + Yield x s -> Yield x (AppendIfEmptySecond s) + Skip s -> Skip (AppendIfEmptySecond s) + Stop -> Stop + ) <$> step2 gst st + +{-# ANN type AppendUnfoldLastState Fuse #-} +data AppendUnfoldLastState o i b = + AppendUnfoldLastInput o (Maybe b) + | AppendUnfoldLastInject (Maybe b) + | AppendUnfoldLastOutput i + +-- | Append to the input stream a stream generated from terminal state +-- accumulated while consuming the input stream. +-- +-- The unfold is seeded with: +-- +-- * 'Just x' where @x@ is the final element of the stream +-- * 'Nothing' if the stream is empty +-- +-- This is useful when a stream processing phase needs to hand over +-- terminal state to another stream generation phase. +-- +-- For example, one stream may incrementally compute state using scans +-- or folds, and the resulting terminal state can then be used to +-- generate a follow-up stream using general unfolding primitives. 
+-- +-- >>> trailer = Unfold.lmap (maybe [-1] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,3,30,300] +-- >>> Stream.toList $ Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int])) +-- [-1] +-- +{-# INLINE_NORMAL appendUnfoldLast #-} +appendUnfoldLast :: Applicative m + => Unfold m (Maybe b) b + -> Stream m b + -> Stream m b +appendUnfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (AppendUnfoldLastInput ost Nothing) + + where + + {-# INLINE_LATE step #-} + step gst (AppendUnfoldLastInput o lst) = + (\case + Yield x o1 -> Yield x (AppendUnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (AppendUnfoldLastInput o1 lst) + Stop -> Skip (AppendUnfoldLastInject lst) + ) <$> ostep (adaptState gst) o + + step _ (AppendUnfoldLastInject lst) = + Skip . AppendUnfoldLastOutput <$> inject lst + + step _ (AppendUnfoldLastOutput i) = + (\case + Yield x i1 -> Yield x (AppendUnfoldLastOutput i1) + Skip i1 -> Skip (AppendUnfoldLastOutput i1) + Stop -> Stop + ) <$> ustep i + +-- | Append to the input stream a stream generated from terminal state +-- accumulated while consuming the input stream. +-- +-- The function is invoked with: +-- +-- * 'Just x' where @x@ is the final element of the stream +-- * 'Nothing' if the stream is empty +-- +-- This is the 'concatMap' analogue of 'appendUnfoldLast'. Prefer +-- 'appendUnfoldLast' in performance critical code as it fuses better. 
+-- +-- >>> trailer = maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.appendMapLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,3,30,300] +-- >>> Stream.toList $ Stream.appendMapLast trailer (Stream.fromList ([] :: [Int])) +-- [-1] +-- +{-# INLINE_NORMAL appendMapLast #-} +appendMapLast :: Applicative m + => (Maybe b -> Stream m b) -> Stream m b -> Stream m b +appendMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) + + where + + {-# INLINE_LATE step #-} + step gst (Left (o, lst)) = + (\case + Yield x o1 -> Yield x (Left (o1, Just x)) + Skip o1 -> Skip (Left (o1, lst)) + Stop -> Skip (Right (f lst)) + ) <$> ostep (adaptState gst) o + + step gst (Right (UnStream istep ist)) = + (\case + Yield x i1 -> Yield x (Right (Stream istep i1)) + Skip i1 -> Skip (Right (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist + ------------------------------------------------------------------------------ -- Interleaving ------------------------------------------------------------------------------ @@ -649,6 +825,243 @@ mergeFstBy :: -- Monad m => mergeFstBy _f _m1 _m2 = undefined -- fromStreamK $ D.mergeFstBy f (toStreamD m1) (toStreamD m2) +------------------------------------------------------------------------------ +-- Selective unfold +------------------------------------------------------------------------------ + +{-# ANN type UnfoldFirstState Fuse #-} +data UnfoldFirstState o i a = + UnfoldFirstWaitInput o + | UnfoldFirstInjectEmpty + | UnfoldFirstInjectSome a o + | UnfoldFirstOutputEmpty i + | UnfoldFirstOutputSome i o + | UnfoldFirstRest o + +-- | Replace the first element of a stream by unfolding a stream from it. +-- +-- The first element is removed from the output stream and replaced by the +-- stream generated from it using the supplied 'Unfold'. The remaining input +-- elements are emitted after the unfolded stream. 
+-- +-- > [a,b,c] -> unfold (Just a) <> [b,c] +-- > [] -> unfold Nothing +-- +-- This is analogous to 'concatMap', but applied only to the first element of +-- the stream. If the unfold yields an empty stream for 'Nothing', the result +-- is empty for an empty input. +-- +-- >>> header = Unfold.lmap (maybe [] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.unfoldFirst header (Stream.fromList [1,2,3 :: Int]) +-- [10,100,2,3] +-- >>> Stream.toList $ Stream.unfoldFirst header (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL unfoldFirst #-} +unfoldFirst :: Applicative m + => Unfold m (Maybe a) a -> Stream m a -> Stream m a +unfoldFirst (Unfold ustep inject) (Stream ostep ost) = + Stream step (UnfoldFirstWaitInput ost) + + where + + {-# INLINE_LATE step #-} + step gst (UnfoldFirstWaitInput o) = + (\case + Yield x o1 -> Skip (UnfoldFirstInjectSome x o1) + Skip o1 -> Skip (UnfoldFirstWaitInput o1) + Stop -> Skip UnfoldFirstInjectEmpty + ) <$> ostep (adaptState gst) o + + step _ UnfoldFirstInjectEmpty = + Skip . UnfoldFirstOutputEmpty <$> inject Nothing + + step _ (UnfoldFirstInjectSome x o) = + (\i -> Skip (UnfoldFirstOutputSome i o)) <$> inject (Just x) + + step _ (UnfoldFirstOutputEmpty i) = + (\case + Yield y i1 -> Yield y (UnfoldFirstOutputEmpty i1) + Skip i1 -> Skip (UnfoldFirstOutputEmpty i1) + Stop -> Stop + ) <$> ustep i + + step _ (UnfoldFirstOutputSome i o) = + (\case + Yield y i1 -> Yield y (UnfoldFirstOutputSome i1 o) + Skip i1 -> Skip (UnfoldFirstOutputSome i1 o) + Stop -> Skip (UnfoldFirstRest o) + ) <$> ustep i + + step gst (UnfoldFirstRest o) = + (\case + Yield y o1 -> Yield y (UnfoldFirstRest o1) + Skip o1 -> Skip (UnfoldFirstRest o1) + Stop -> Stop + ) <$> ostep (adaptState gst) o + +{-# ANN type UnfoldLastState Fuse #-} +data UnfoldLastState o i a = + UnfoldLastInput o (Maybe a) + | UnfoldLastInject (Maybe a) + | UnfoldLastOutput i + +-- | Replace the final element of a stream by unfolding a stream from it. 
+-- +-- The final element is removed from the output stream and used as the +-- seed to generate a replacement stream using the supplied 'Unfold'. +-- +-- > [a,b,c] -> [a,b] <> unfold (Just c) +-- > [] -> unfold Nothing +-- +-- This is analogous to 'concatMap', but applied only to the terminal +-- element of the stream. If the unfold yields an empty stream for +-- 'Nothing', the result is empty for an empty input. +-- +-- >>> trailer = Unfold.lmap (maybe [] (\x -> [x*10, x*100])) Unfold.fromList +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,30,300] +-- >>> Stream.toList $ Stream.unfoldLast trailer (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL unfoldLast #-} +unfoldLast :: Applicative m + => Unfold m (Maybe a) a -> Stream m a -> Stream m a +unfoldLast (Unfold ustep inject) (Stream ostep ost) = + Stream step (UnfoldLastInput ost Nothing) + + where + + {-# INLINE_LATE step #-} + step gst (UnfoldLastInput o lst) = + (\case + Yield x o1 -> case lst of + Nothing -> Skip (UnfoldLastInput o1 (Just x)) + Just prev -> Yield prev (UnfoldLastInput o1 (Just x)) + Skip o1 -> Skip (UnfoldLastInput o1 lst) + Stop -> Skip (UnfoldLastInject lst) + ) <$> ostep (adaptState gst) o + + step _ (UnfoldLastInject lst) = + Skip . UnfoldLastOutput <$> inject lst + + step _ (UnfoldLastOutput i) = + (\case + Yield x i1 -> Yield x (UnfoldLastOutput i1) + Skip i1 -> Skip (UnfoldLastOutput i1) + Stop -> Stop + ) <$> ustep i + +------------------------------------------------------------------------------ +-- Selective ConcatMap +------------------------------------------------------------------------------ + +data ConcatMapFirstState m o a = + ConcatMapFirstWaitInput o + | ConcatMapFirstOutputEmpty (Stream m a) + | ConcatMapFirstOutputSome (Stream m a) o + | ConcatMapFirstRest o + +-- | Replace the first element of a stream using a stream-valued mapping. 
+-- +-- The first element is removed from the output stream and replaced by the +-- stream generated by applying the supplied function to it. The remaining +-- input elements are emitted after the generated stream. +-- +-- > [a,b,c] -> f (Just a) <> [b,c] +-- > [] -> f Nothing +-- +-- This is the 'concatMap' analogue of 'unfoldFirst'. If @f Nothing@ is +-- the empty stream, the result is empty for an empty input. +-- +-- Prefer 'unfoldFirst' in performance critical code as it fuses better. +-- +-- >>> header = maybe Stream.nil (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.concatMapFirst header (Stream.fromList [1,2,3 :: Int]) +-- [10,100,2,3] +-- >>> Stream.toList $ Stream.concatMapFirst header (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL concatMapFirst #-} +concatMapFirst :: Applicative m + => (Maybe a -> Stream m a) -> Stream m a -> Stream m a +concatMapFirst f (Stream ostep ost) = + Stream step (ConcatMapFirstWaitInput ost) + + where + + {-# INLINE_LATE step #-} + step gst (ConcatMapFirstWaitInput o) = + (\case + Yield x o1 -> Skip (ConcatMapFirstOutputSome (f (Just x)) o1) + Skip o1 -> Skip (ConcatMapFirstWaitInput o1) + Stop -> Skip (ConcatMapFirstOutputEmpty (f Nothing)) + ) <$> ostep (adaptState gst) o + + step gst (ConcatMapFirstOutputEmpty (UnStream istep ist)) = + (\case + Yield y i1 -> Yield y (ConcatMapFirstOutputEmpty (Stream istep i1)) + Skip i1 -> Skip (ConcatMapFirstOutputEmpty (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist + + step gst (ConcatMapFirstOutputSome (UnStream istep ist) o) = + (\case + Yield y i1 -> Yield y (ConcatMapFirstOutputSome (Stream istep i1) o) + Skip i1 -> Skip (ConcatMapFirstOutputSome (Stream istep i1) o) + Stop -> Skip (ConcatMapFirstRest o) + ) <$> istep gst ist + + step gst (ConcatMapFirstRest o) = + (\case + Yield y o1 -> Yield y (ConcatMapFirstRest o1) + Skip o1 -> Skip (ConcatMapFirstRest o1) + Stop -> Stop + ) <$> ostep (adaptState gst) o + +-- | Replace the final 
element of a stream using a stream-valued mapping. +-- +-- The final element is removed from the output stream and replaced by +-- the stream generated by applying the supplied function to it. +-- +-- > [a,b,c] -> [a,b] <> f (Just c) +-- > [] -> f Nothing +-- +-- This is the 'concatMap' analogue of 'unfoldLast'. If @f Nothing@ is +-- the empty stream, the result is empty for an empty input. +-- +-- Prefer 'unfoldLast' in performance critical code as it fuses better. +-- +-- >>> trailer = maybe Stream.nil (\x -> Stream.fromList [x*10, x*100]) +-- >>> Stream.toList $ Stream.concatMapLast trailer (Stream.fromList [1,2,3 :: Int]) +-- [1,2,30,300] +-- >>> Stream.toList $ Stream.concatMapLast trailer (Stream.fromList ([] :: [Int])) +-- [] +-- +{-# INLINE_NORMAL concatMapLast #-} +concatMapLast :: Applicative m + => (Maybe a -> Stream m a) -> Stream m a -> Stream m a +concatMapLast f (Stream ostep ost) = Stream step (Left (ost, Nothing)) + + where + + {-# INLINE_LATE step #-} + step gst (Left (o, lst)) = + (\case + Yield x o1 -> case lst of + Nothing -> Skip (Left (o1, Just x)) + Just prev -> Yield prev (Left (o1, Just x)) + Skip o1 -> Skip (Left (o1, lst)) + Stop -> Skip (Right (f lst)) + ) <$> ostep (adaptState gst) o + + step gst (Right (UnStream istep ist)) = + (\case + Yield x i1 -> Yield x (Right (Stream istep i1)) + Skip i1 -> Skip (Right (Stream istep i1)) + Stop -> Stop + ) <$> istep gst ist + ------------------------------------------------------------------------------ -- Combine N Streams - unfoldEach ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Stream/Transform.hs b/core/src/Streamly/Internal/Data/Stream/Transform.hs index 156a23f6ec..7512b7dd39 100644 --- a/core/src/Streamly/Internal/Data/Stream/Transform.hs +++ b/core/src/Streamly/Internal/Data/Stream/Transform.hs @@ -28,49 +28,60 @@ module Streamly.Internal.Data.Stream.Transform , foldlS -- * Composable Scans - , postscanl , 
scanl + , postscanl , scanlMany , scanr , pipe - -- * Splitting - - -- NOTE: splitEndBy can be written as an idiom via folds: - -- > splitEndBy p f = Stream.foldMany (Fold.takeEndBy p f) - -- > splitEndBy_ p f = Stream.foldMany (Fold.takeEndBy_ p f) - -- Unlike the Sequence style splitters there is no splitSepByAny here - -- because the default splitSepBy itself is in fact same as splitSepByAny. - - , splitSepBy_ - -- splitSepBy -- keeps the delimiters - - -- * Ad-hoc Scans - -- | Left scans. Stateful, mostly one-to-one maps. + -- * Traditional Left Scans + -- | Stateful, mostly one-to-one maps. + -- ** Strict Scans , scanlM' - , scanlMAfter' , scanl' - , scanlM - , scanlBy , scanl1M' , scanl1' - , scanl1M - , scanl1 + -- Prescans , prescanl' , prescanlM' - , postscanlBy - , postscanlM + -- Postscans , postscanl' , postscanlM' - , postscanlMAfter' + , mapAccumM + + -- ** Lazy Scans + , scanlBy -- same as traditional "scanl" + , scanlM + , scanl1M + , scanl1 + , postscanlBy -- same as traditional lazy "postscanl" + , postscanlM + + -- *** with extraction (for foldl compatibility) , postscanlx' , postscanlMx' , scanlMx' , scanlx' + -- *** Specialized scans + -- transform the terminal state and append a single element + , scanlMAfter' + , postscanlMAfter' + + -- * Splitting + + -- NOTE: splitEndBy can be written as an idiom via folds: + -- > splitEndBy p f = Stream.foldMany (Fold.takeEndBy p f) + -- > splitEndBy_ p f = Stream.foldMany (Fold.takeEndBy_ p f) + -- Unlike the Sequence style splitters there is no splitSepByAny here + -- because the default splitSepBy itself is in fact same as splitSepByAny. + + , splitSepBy_ + -- splitSepBy -- keeps the delimiters + -- * Filtering -- delete is for once like insert, filter is for many like intersperse. 
@@ -160,6 +171,12 @@ module Streamly.Internal.Data.Stream.Transform , rollingMapM , rollingMap2 + -- * Selective Maps + -- , modifyFirst + , modifyLast + -- , modifyLastN -- using a ring buffer + -- , modifyIndices + -- * Maybe Streams , mapMaybe , mapMaybeM @@ -778,8 +795,23 @@ scanlx' fstep begin done = ------------------------------------------------------------------------------ -- Adapted from the vector package. + +-- | postscan and mapAccum (without terminal extraction) are identical in +-- behavior and implementable in terms of each other. mapAccum is cleaner to +-- have because postscan implementation in terms of it is clean whereas the +-- vice-versa requires an undefined. +-- {-# INLINE_NORMAL postscanlM' #-} postscanlM' :: Monad m => (b -> a -> m b) -> m b -> Stream m a -> Stream m b +{- +postscanlM' step = mapAccumM step1 + + where + + step1 s a = do + b <- step s a + pure (b, b) +-} postscanlM' fstep begin (Stream step state) = Stream step' Nothing where @@ -1027,6 +1059,48 @@ scanl1M' fstep (Stream step state) = Stream step' (state, Nothing) scanl1' :: Monad m => (a -> a -> a) -> Stream m a -> Stream m a scanl1' f = scanl1M' (\x y -> return (f x y)) +------------------------------------------------------------------------------- +-- Mealy style scans +------------------------------------------------------------------------------- + +-- | postscan and mapAccum (without terminal extraction) are identical in +-- behavior and implementable in terms of each other. mapAccum is cleaner to +-- have because postscan implementation in terms of it is clean whereas the +-- vice-versa requires an undefined. +-- +{-# INLINE_NORMAL mapAccumM #-} +mapAccumM :: Monad m => + (s -> a -> m (s, b)) + -> m s + -> Stream m a + -> Stream m b +{- +-- It can be implemented in terms of 'postscanlM'' but +-- requires an @undefined@ for the initial output. 
+mapAccumM step initial stream = + let r = postscanlM' + (\(s, _) a -> step s a) + (fmap (,undefined) initial) + stream + in fmap snd r + -} +mapAccumM fstep begin (Stream step state) = + Stream step' Nothing + where + {-# INLINE_LATE step' #-} + step' _ Nothing = do + !x <- begin + return $ Skip (Just (state, x)) + + step' gst (Just (st, acc)) = do + r <- step (adaptState gst) st + case r of + Yield x s -> do + (!acc1, !y) <- fstep acc x + return $ Yield y (Just (s, acc1)) + Skip s -> return $ Skip (Just (s, acc)) + Stop -> return Stop + ------------------------------------------------------------------------------- -- Filtering ------------------------------------------------------------------------------- @@ -2018,6 +2092,53 @@ rollingMap2 f = catMaybes . rollingMap g g Nothing _ = Nothing g (Just x) y = Just (f x y) +------------------------------------------------------------------------------ +-- Selective Map +------------------------------------------------------------------------------ + +{-# ANN type ModifyLastState Fuse #-} +data ModifyLastState s a = + ModifyLastInit s + | ModifyLastBuf s a + | ModifyLastDone + +-- | Modify the final element of a stream. +-- +-- The supplied function is applied only to the terminal element of +-- the stream. All preceding elements are emitted unchanged. +-- +-- >>> Stream.toList $ Stream.modifyLast negate $ Stream.fromList [1,2,3] +-- [1,2,-3] +-- +-- If the stream is empty, the result is empty. 
+-- +-- Space: @O(1)@ +-- +-- /Pre-release/ +-- +{-# INLINE_NORMAL modifyLast #-} +modifyLast :: Monad m => (a -> a) -> Stream m a -> Stream m a +modifyLast f (Stream step1 state1) = Stream step (ModifyLastInit state1) + + where + + {-# INLINE_LATE step #-} + step gst (ModifyLastInit s1) = do + r <- step1 (adaptState gst) s1 + return $ case r of + Yield x s2 -> Skip (ModifyLastBuf s2 x) + Skip s2 -> Skip (ModifyLastInit s2) + Stop -> Stop + + step gst (ModifyLastBuf s1 a) = do + r <- step1 (adaptState gst) s1 + return $ case r of + Yield x s2 -> Yield a (ModifyLastBuf s2 x) + Skip s2 -> Skip (ModifyLastBuf s2 a) + Stop -> Yield (f a) ModifyLastDone + + step _ ModifyLastDone = return Stop + ------------------------------------------------------------------------------ -- Maybe Streams ------------------------------------------------------------------------------ diff --git a/core/src/Streamly/Internal/Data/Stream/Type.hs b/core/src/Streamly/Internal/Data/Stream/Type.hs index 3dde91b070..a43b33ca47 100644 --- a/core/src/Streamly/Internal/Data/Stream/Type.hs +++ b/core/src/Streamly/Internal/Data/Stream/Type.hs @@ -343,6 +343,11 @@ uncons (UnStream step state) = go SPEC state data UnfoldState s = UnfoldNothing | UnfoldJust s +-- XXX Because the inject function is monadic we need a separate state for +-- inject. If we had a pure Unfold type then conversion to stream is trivial. +-- We can possibly have Unfold and UnfoldM or Unfold_ (pure). Which use cases +-- require the monadic inject? + -- | Convert an 'Unfold' into a stream by supplying it an input seed. -- -- >>> s = Stream.unfold Unfold.replicateM (3, putStrLn "hello") @@ -783,6 +788,11 @@ head = fold Fold.one head = foldrM (\x _ -> return (Just x)) (return Nothing) #endif +-- XXX This is not worth keeping as it is trivial to express. + +-- | +-- >> headElse def = fmap (fromMaybe def) . 
head +-- {-# INLINE_NORMAL headElse #-} headElse :: Monad m => a -> Stream m a -> m a headElse a = foldrM (\x _ -> return x) (return a) diff --git a/core/src/Streamly/Internal/Data/Unfold/Type.hs b/core/src/Streamly/Internal/Data/Unfold/Type.hs index 3150fe605c..d94822a04d 100644 --- a/core/src/Streamly/Internal/Data/Unfold/Type.hs +++ b/core/src/Streamly/Internal/Data/Unfold/Type.hs @@ -70,11 +70,12 @@ module Streamly.Internal.Data.Unfold.Type , lmapM , map , mapM - , both - , supply - , first - , second - , carry + , supply -- input or useInput + , first -- asFirst + , second --asSecond + , carry -- XXX carryInput? + , consInput + , consInputWith -- * Trimming , takeWhileM @@ -114,6 +115,7 @@ module Streamly.Internal.Data.Unfold.Type , map2 , mapM2 , takeWhileMWithInput + , both ) where @@ -472,6 +474,40 @@ carry (Unfold ustep uinject) = Unfold step (\a -> (a,) <$> uinject a) {-# INLINE_LATE step #-} step (a, st) = fmap (func a) (ustep st) +{-# ANN type ConsInputState Fuse #-} +data ConsInputState a s = ConsInputFirst a s | ConsInputRest s + +-- | Prepend @f a@ to the output of the unfold, where @a@ is the input seed. +-- +-- >>> Unfold.fold Fold.toList (Unfold.consInputWith length Unfold.fromList) [1,2,3] +-- [3,1,2,3] +-- +{-# INLINE_NORMAL consInputWith #-} +consInputWith :: Applicative m => (a -> b) -> Unfold m a b -> Unfold m a b +consInputWith f (Unfold ustep uinject) = Unfold step inject + + where + + inject a = ConsInputFirst a <$> uinject a + + next r = case r of + Yield x s1 -> Yield x (ConsInputRest s1) + Skip s1 -> Skip (ConsInputRest s1) + Stop -> Stop + + {-# INLINE_LATE step #-} + step (ConsInputFirst a s) = pure $ Yield (f a) (ConsInputRest s) + step (ConsInputRest s) = next <$> ustep s + +-- | Prepend the input seed to the output of the unfold. 
+-- +-- >>> Unfold.fold Fold.toList (Unfold.consInput (Unfold.function (* 2))) 3 +-- [3,6] +-- +{-# INLINE consInput #-} +consInput :: Applicative m => Unfold m a a -> Unfold m a a +consInput = consInputWith id + {-# DEPRECATED map2 "Use carry with map instead." #-} {-# INLINE_NORMAL map2 #-} map2 :: Functor m => (a -> b -> c) -> Unfold m a b -> Unfold m a c diff --git a/docs/Developer/Data.Scanl.md b/docs/Developer/Data.Scanl.md new file mode 100644 index 0000000000..ce3171328f --- /dev/null +++ b/docs/Developer/Data.Scanl.md @@ -0,0 +1,499 @@ +## Moore vs Mealy Machine + +Moore: observes states +Mealy: observes transitions + +Moore naturally exposes an initial observation. +Mealy naturally supports terminal extraction. + +Moore and Mealy both can represent stream transformations as well as folds +equally well. The Mealy formulation is usually more straightforward to think of as +an explicit state machine. We have an initial state, each input causes a state +transition and an output. + +Producer: A state machine that produces output without any input +Transducer: A state machine that produces output on input + +## Unfold and Stream (Simplest Producer) + +A state machine with no input: +``` +unfold :: (s -> m (s, b)) -> m s -> Stream m b +``` + +Reification of the above unfold function as a data type with termination +support and open injectable state. +``` +data Step s b = Yield b s | Stop +data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s) +``` + +For filtering we can use `Maybe b` as the output type of the machine or +equivalently we can use an explicit `Skip` constructor: +``` +data Step s b = Yield b s | Skip s | Stop +data Unfold m a b = forall s. Unfold (s -> m (Step s b)) (a -> m s) +``` + +Stream is an unfold with a closed state. +``` +data Stream m a = forall s. 
Stream (s -> m (Step s a)) s +``` + +In a producer machine, the driver keeps cranking the machine until the machine +stops; the machine decides how many items to produce and when to stop. The +machine also controls whether to produce an output or just pass on each crank. + +## Rescan and Scan (Simplest Transducer) + +The simplest transducer is a Mealy style scan without a terminal state +emission or we can say Moore without an initial state emission: +``` +transduce :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b +``` + +Reification of the above transduce function as a data type with termination +support and open injectable state. +``` +data Step s b = Yield b s | Skip s | Stop +data Rescan m c a b = forall s. Rescan (s -> a -> m (Step s b)) (c -> m s) +``` + +Equivalently instead of using the Skip constructor we can use a `Maybe b` type +if we want to filter outputs (consumer mode). And we can use `Maybe a` +type if we want to allow the machine to be cranked without any input (producer +mode). + +Scan is a Rescan with a closed state: +``` +data Scan m a b = forall s. Scan (s -> a -> m (Step s b)) s +``` + +In a scan the driver cranks the machine and decides whether to supply an input +or crank without one. On each crank the machine decides whether to produce an +output or pass without one. So the driver controls the input side and the +machine controls the output side, but neither can seize control of the cranking +rhythm: the machine cannot enter a producer phase where it forces the driver to +keep cranking until the machine signals it is ready to accept input again. +(That producer phase is what distinguishes a Pipe.) 
+ +## Scanl (Moore and Mealy Machines) + +### Moore Style Scan + +foldl and scanl in Streamly are Moore machine style representations: +``` +foldl :: (b -> a -> b) -> b -> Stream m a -> m b +scanl :: (s -> a -> s) -> s -> Stream m a -> Stream m s +``` + +A Moore machine emits the states of the machine, the output is solely a +function of the state. scanl emits the initial value of the accumulator +followed by one value per input, the next state of the machine. If the input +stream size is n the output stream size is n + 1. + +The Scanl type in streamly is a data representation of strict left +scan. The Fold type is a data representation of strict left fold. + +### Mealy Style mapAccum + +mapAccum is a Mealy machine style representation: +``` +mapAccum :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b +mapAccumM :: (s -> a -> m (s, b)) -> m s -> Stream m a -> Stream m b +mapAccumEnd :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b +mapAccumEndM :: (s -> a -> m (s, b)) -> m s -> (s -> m b) -> Stream m a -> Stream m b +``` + +Here, the state and the output are separate, each input generates the next +state and an output. Notice, the step function has the same shape as mapAccumL, +but mapAccumL returns the final accumulator separately while the mapAccumEnd +implementation above emits a final value into the stream. mapAccum emits values +only on inputs, it does not emit an initial value. However, the extraction +function (s -> b) in mapAccumEnd generates an additional terminal value. While +in Moore we have an additional initial value, in Mealy we have an additional +final value. + +`mapAccumEnd` emits one output per input (from the step's b), followed by one +terminal value (from extract). If the input stream size is n the output +stream size is n + 1. + +mapAccumEnd (Mealy) and scanl (Moore) can be interconverted. 
+ +### Implementation Optimizations (Scan Type) + +We can write a variant of `scanl` that does not emit the initial value +(postscan), and similarly a variant of scan that does not emit the terminal +value. Use cases where we do not require these additional values can benefit +from these more efficient simpler variants. + +In a scanl, the driver must emit the initial value before processing any +input. Since this cannot be folded into the per-input step, the driver +needs a distinct pre-input state to run the initialization effect and +transition into the normal wait-for-input loop. That state remains in +the dispatch path for the lifetime of the stream. + +In postscan, the initial value is not emitted, so this extra state can +usually be eliminated. However, a monadic initial value still requires a +separate initialization state to execute the effect exactly once before +entering the main state machine. + +Here is a snippet from real postscan code: + +``` + step _ (ScanInit st) = do + res <- initial + return + $ case res of + Partial fs -> Skip $ ScanDo st fs + Done b -> Yield b ScanDone + step gst (ScanDo st fs) = do + res <- sstep st + case res of + Yield x s -> do + ... +``` + +However, when the state is pure, GHC fusion optimizations can +potentially eliminate the initial state, if it can happen reliably we do +not need a separate postscan type for this simplification. In several +cases we have actually seen fusion issues or inefficiencies due to this +initial state where it is not required. + +The Mealy side is symmetric at the other end. In a scan without terminal +state extraction and with pure finalizer we do not need a separate +state. However, a monadic finalizer forces a post-input state even +without terminal state extraction. + +A Moore machine without the initial value and a Mealy machine without +the terminal value are the simplest to implement and the most reliably +fused. This is basically the `Scan` type we discussed earlier. 
+ +### Scanl Type + +Both Moore and Mealy can represent transformations as well as consumers. +Operations like map and filter, where only transitions are important and +an empty stream does not require any special processing, are efficiently +expressible by Mealy style without extraction or Moore without an +initial step. On the other hand, operations like sum, where an empty stream +sums to 0, are more convenient in Moore style or Mealy with a final +extraction step. + +Streamly's Scanl type is Moore style, it emits an initial value extracted from +the initial state. The extract function extracts the output from the state, it +can be called after every `Partial` step to get the output. It also has a +finalizer action, which is called only when the scan is terminated externally +rather than completing on its own (via Done). In that case `final` can be used +to perform any resource finalization, it does not return any output, just an +effect, and should be called only once. It supports resource bracketing by +resource allocation in the initial state and release in the final. However, always +emitting a value in the initial step can make implementation of operations +like map and filter more complicated than they need to be. Separate types can +be used for different use cases which would be less ergonomic but can be +optimized better. + +``` +data Step s b = Partial !s | Done !b + +data Scanl m a b = + -- | @Scanl@ @step@ @initial@ @extract@ @final@ + forall s. Scanl (s -> a -> m (Step s b)) (m (Step s b)) (s -> m b) (s -> m ()) +``` + +We require the general Scanl type for resource bracketing. For fusion +optimizations, a postscan (no initial emission) type may be helpful for the +case when no resource bracketing is required.
+ +## concatScanlM and concatMapAccumM + +The Mealy formulation generalizes to concatMapAccum, without a final state +emission: + + concatMapAccumM :: (s -> a -> m (s, Stream m b)) -> m s -> Stream m a -> Stream m b + +This is essentially a nested state machine, a state machine composed of many +state machines. + +The Moore style formulation of the same would require an extract function, the +extract function is called once per input as the state advances: + + concatScanlM :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b + +This represents a uni-directional pipe, we can represent it using a data type +UniPipe. + +## Streaming Types + +Unfold: injectable state stream generation +Stream: opaque state stream + +Rescan: injectable state transducer +Scan: opaque state transducer + +Pipe: Combination of Rescan and Unfold + +unfoldAccum/concatMapAccum is a restricted Pipe: it has the fixed +rhythm of consuming one input and unfolding/concatenating its +sub-stream, whereas a Pipe can interleave consumption and production +arbitrarily. (Note: a concatMapAccum whose sub-streams are opaque +Streams cannot be expressed in the fused Pipe representation, because +the sub-stream's existential state cannot be merged; the fused form +corresponds to sub-streams produced by Unfold.) + +Rescanl: injectable state Moore machine (with initial state) +Scanl: opaque state Moore machine + +Refold: Rescanl with only the final output +Fold: Scanl with only the final output +Parser: A Fold with error and backtracking + +## Scan Variants + +scanl, prescanl, postscanl: +scanl provides the initial value of the accumulator and the next value +on each input. postscanl drops the initial value. prescanl drops the +final value. + +scan is similar to postscanl in that it does not (and cannot) emit an +initial value. + +There are strict variants of these like scanl', monadic variants like scanlM', +variants where we use the first input as accumulator like scanl1'.
+ +## Scan and Fold Modules + +* The core scan module representing a stateful transformation or state + machine without emitting an initial or terminal value is to be named as + `Scan`. +* The `Scanl` type and module is a further specialization of the `Scan` + type which adds an emission of an initial value to the state machine. +* The Fold type is exactly the same as the `Scanl` type except that it + does not emit any intermediate values. +* Since there is no Foldr, the fold module was named Fold. Ideally it can be + called `Foldl` to mirror the `Scanl` naming. But we are not going to change + it now. + +## Where the runners live + +The scan runner for Stream lives in the Stream module, for fold in the Fold +module, for unfold in the Unfold module and so on. If we put the runners in the +scan module then we will need name disambiguation like scanStream, scanFold, +scanScan, scanUnfold etc. + +## Scan Naming in Data.Stream + +Scanl type runners: + + Stream.scanl: running a Scanl type + Stream.postscanl: running a Scanl type dropping the initial value + (equivalent to Scanl.toScan followed by Stream.scan) + +Note these are the Stream runners and remain unchanged. Separately, in the +Scanl module the combinator to scan the input of a scan (the operation +currently named `scanl`, not the runner) is to be renamed to `compose` as it +composes two scans together to create a single scan by feeding the input from +one to the other, it chains them. This is similar to how we compose the `Scan` +type as well as the `Pipe` type. It makes sense to have a special name for this +operation rather than using `scanl`. The restart-on-termination variant +`scanlMany` is renamed to `composeMany` accordingly, so that the `scanl` root +is fully freed. And `scanlMaybe` to `composeMaybe`. + +Directly running a scan using a step function + Stream.scanl' + Stream.postscanl' + ... 
+ +The proposal is to not expose these and just suggest using the Scanl +constructors, for the following reasons, (1) there is no performance advantage, +(2) we are giving too many options to the user, (3) the names create +confusion with the Scanl runners, (4) if we have these in Data.Stream then why +not in Data.Scanl, Data.Unfold and Data.Fold as well, (5) we have Scanl +constructors with exact same names as the Data.List APIs therefore +discoverability of traditional APIs should not be a problem. + +Directly running a mapAccum using a step function + + Stream.mapAccum(M) + +The reified type for mapAccum is the `Scan` type. Similar to the above +operations we should not expose these direct operations, rather we should use +the `Scan` type to run these using the `scan` runner. The `Scan` type can have +constructors named `mapAccum` and variants of that, also constructors named +`postscanl'` and variants make sense for the `Scan` type. + +Directly running a concatMapAccum using a step function + + Stream.unfoldAccum(M) + Stream.concatMapAccum(M) + +These are stateful 1-to-n transformations. unfoldAccum is the fused version and +concatMapAccum is the unfused version because of the existential type in the +Stream. The reified type that can represent unfoldAccum is the `Pipe` type. +However, the Pipe type may be more general because it can do n-to-1 +transformations and 1-to-n as well i.e. it can fold and produce. There is no +reified type corresponding to concatMapAccum. + +Scanl constructors: + + Scanl.mkScanl + Scanl.mkScanl1 + ... + +The proposal is to change these names to "scanl'", "scanl1'" etc for the +following reasons, (1) consistency with the Fold module constructors, (2) once +we rename the `scanl` composition operation to `compose`, the `scanl` root is +free, so "scanl'" is no longer in conflict with any operation and can be used +as a constructor. 
+ +Another alternative is to change the Fold constructor names adding the prefix +mk, reasons against doing that, (1) wider deprecation change, (2) the change is +not required otherwise because in the Fold module there is no "fold/foldl" +runner so there is no naming confusion, (3) these names maintain +discoverability of the replacements of traditional fold functions from +Data.List. + +## Scan Type + +The `Scan` type is for representing a stateful transformation or state +machine without emitting an initial or terminal value. This can be thought of +as the mapAccum type reified. + +Scan type runners: + + Stream.scan: running a Scan type + +Scan type constructors: + Scan.postscanl': constructor using Moore style step function + Scan.postscanlM': monadic version of postscanl' + Scan.mapAccum: constructor using Mealy style step function + Scan.mapAccumM: monadic version of mapAccum + +Scan type adapters: + Scanl.toScan: convert Scanl to Scan + Scanl.fromScan: convert Scan to Scanl + Fold.fromScan: convert Scan to Fold + +## Constructor Naming in Data.Scanl + +Note, currently we cannot use the name "scanl" as a constructor +as we have an operation of the same name in the same module for +scanning the input of a scan. So the "mk" prefix makes the distinction. +Also prime is not necessary in constructor names as the scan is always +strict, there is no choice of strict vs lazy. + +However, once we rename that composition operation from "scanl" to "compose" +(as proposed above), the "scanl" root is no longer used by any operation, so we +can drop the mk prefix and use the exact same names as the Data.List scan +operations. That way it will become consistent with the Fold module and the +names will be discoverable and familiar. + +## State Machines: At a Glance + +* Stream consumers, producers, and transformations can be represented + as state machines. +* A state machine transition is represented by a function. 
+* iterate (basic producer step): f :: s -> s +* Builder (foldr style) step (consumer or transformation): f :: a -> b -> b +* Accumulator (foldl style) step (consumer or transformation): f :: b -> a -> b +* Moore step: f :: s -> a -> s, state transformation, output implicit in state +* Mealy step: f :: s -> a -> (s, b), state transformation with explicit output + +* Moore style stream producer (iterate) + * iterate can be viewed as a degenerate Moore machine with no external input. + * Moore style producer step (iterate): f :: s -> s + * Moore style stream producer (iterate): f :: (s -> s) -> s -> Stream m s + * Endo represents the transition algebra used by iterate. + +* Moore style stream consumer (iterate-style fold) + * When input ~ state, append becomes a degenerate Moore transition. + * Moore style step (append): f :: s -> s -> s + * Moore style transformation: f :: (s -> s -> s) -> s -> Stream m s -> Stream m s + * Moore style consumer (fold): f :: (s -> s -> s) -> s -> Stream m s -> m s + * Semigroup can be used to fold by additionally supplying an initial value + * Monoid can be used to fold without supplying an additional initial value (e.g. Sum) + +* Mealy style stream producer (unfold) + * unfold can be viewed as a degenerate Mealy machine with no input. + * Unfold step: f :: s -> (s, b), Mealy with no input + * Unfold: Mealy machine with no input, explicit state and termination + * unfold: f :: (s -> (s, b)) -> s -> Stream m b + * The data representation is Unfold + * A stream can be represented as a Mealy machine with no input, + using existential state and termination. + * The streamly representation is the `Stream` type.
+ +* Moore stream transducer (scan): + * treating `s` as input in iterate style fold is restrictive + * it can only do `Stream m s -> Stream m s` transformation + * so separate the input from state + * output is derived from state after transition + * scan using the same state and output type: + * f :: (s -> a -> s) -> s -> Stream m a -> Stream m s + * scan using a separate state and output type: + * f :: (s -> a -> s) -> s -> (s -> b) -> Stream m a -> Stream m b + +* Mealy stream transducer (mapAccum): + * treating `s` as output is restrictive + * So split the state and output + * output produced by the transition itself + * mapAccum: Mealy machine with existential state and termination + * Scan step: f :: s -> a -> (s, b) + * Scan without final extract: + * f :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b + * Scan with final extract: + * f :: (s -> a -> (s, b)) -> s -> (s -> b) -> Stream m a -> Stream m b + +* Streamly Scanl (conceptual): + * Scan with initial projection and final extraction + * f step initial project final + * f :: (s -> a -> s) -> s -> (s -> b) -> (s -> b) -> Stream m a -> Stream m b + +* Other state machine formulations: + * Moore with s ~ (s, b), split state and output + * Dropping first element and "fmap snd" gives Mealy + * f :: ((s, b) -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m (s, b) + * But we do not need to pass the output back, only the state + * f :: (s -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m (s, b) + * If we do not need the final state + * f :: (s -> a -> (s, b)) -> (s, b) -> Stream m a -> Stream m b + * If we do not need the initial output + * f :: (s -> a -> (s, b)) -> s -> Stream m a -> Stream m b + +* State machines composed of state machines (concatMapAccum) + * Moore style: + * f :: (s -> a -> m s) -> s -> (s -> Stream m b) -> Stream m a -> Stream m b + * Mealy style: + * f :: (s -> a -> (s, Stream m b)) -> s -> Stream m a -> Stream m b + +## TODO + +* Formulate the Scanl/Fold type using the Scan 
type. + +* Formulate the Step type as `Partial s b` so that we do not need the extract + function. We won't have to thread around the `b` anymore as we do not need to + return it in `final`. + +* Change the final function in the Scanl constructor to not return the output type. + This makes sense in folds but in scans a single element generation at the end + is a half-hearted solution. Either we should have the ability to generate a + stream in the beginning as well as in the end or nothing. The initial as well + as final operations should be of Step type. In addition, we should be able to + specify these as unfolds which we can internally adapt to the Step type. + This will make all the drivers complicated though, especially the combinators. + Maybe we can just return the singleton residual state as the last + element and then externally unfold the output of the scan into a + stream. + + For supporting something like scanlMAfter' we may need to emit the final state + during extract. We may have to return `Maybe b` in extract or `Stream m b`. + The `final` function has to work on the state such that it cannot return the + last emitted value, it will always return the values after the last + emitted value that may have to be drained. + +* scanlMAfter' can be implemented by something like modifyLast after generating + the final state value in a Moore machine. We can also generate a stream in + the end using unfoldLast. + +* Monadic inject in Unfold forces a separate init state when converting to a + stream. We can remove the monadic inject or create a pure type or ensure that + the additional state fuses in the most efficient way.
diff --git a/test/Streamly/Test/Data/Stream/Serial/Common.hs b/test/Streamly/Test/Data/Stream/Serial/Common.hs index 9e5e80d071..06f7585cbf 100644 --- a/test/Streamly/Test/Data/Stream/Serial/Common.hs +++ b/test/Streamly/Test/Data/Stream/Serial/Common.hs @@ -992,6 +992,19 @@ transformCombineOpsCommon constr desc eq = do (S.scanlM' (\_ a -> return a) (return 0)) prop (desc <> " postscanlM'") $ transform (tail . scanl' (const id) 0) (S.postscanlM' (\_ a -> return a) (return 0)) + prop (desc <> " mapAccumM running sum") $ + transform (tail . scanl' (+) 0) + (S.mapAccumM (\s a -> let s1 = s + a in return (s1, s1)) + (return 0)) + prop (desc <> " mapAccumM identity (== postscanlM')") $ + transform (tail . scanl' (const id) 0) + (S.mapAccumM (\_ a -> return (a, a)) (return 0)) + let modifyLastList _ [] = [] + modifyLastList f xs = init xs ++ [f (last xs)] + prop (desc <> " modifyLast negate") $ + transform (modifyLastList negate) (S.modifyLast negate) + prop (desc <> " modifyLast id") $ + transform (modifyLastList id) (S.modifyLast id) prop (desc <> " scanl1'") $ transform (scanl1 (const id)) (S.scanl1' (const id)) prop (desc <> " scanl1M'") $ transform (scanl1 (const id)) diff --git a/test/Streamly/Test/Data/Stream/Type.hs b/test/Streamly/Test/Data/Stream/Type.hs index b1a629dffb..84ab0f5662 100644 --- a/test/Streamly/Test/Data/Stream/Type.hs +++ b/test/Streamly/Test/Data/Stream/Type.hs @@ -39,6 +39,148 @@ testgroupsOf = (Stream.groupsOf 2 Fold.sum (Stream.enumerateFromTo 1 10)) `shouldReturn` [3::Int, 7, 11, 15, 19] +testAppendUnfoldLastNonEmpty :: Expectation +testAppendUnfoldLastNonEmpty = + Stream.toList + (Stream.appendUnfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 3, 30, 300] + + where + + trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList + +testAppendUnfoldLastEmpty :: Expectation +testAppendUnfoldLastEmpty = + Stream.toList + (Stream.appendUnfoldLast trailer (Stream.fromList ([] :: [Int]))) + 
`shouldReturn` [-1] + + where + + trailer = Unfold.lmap (maybe [-1] (\x -> [x * 10, x * 100])) Unfold.fromList + +testAppendMapLastNonEmpty :: Expectation +testAppendMapLastNonEmpty = + Stream.toList + (Stream.appendMapLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 3, 30, 300] + + where + + trailer = + maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x * 10, x * 100]) + +testAppendMapLastEmpty :: Expectation +testAppendMapLastEmpty = + Stream.toList + (Stream.appendMapLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [-1] + + where + + trailer = + maybe (Stream.fromList [-1]) (\x -> Stream.fromList [x * 10, x * 100]) + +testUnfoldLastNonEmpty :: Expectation +testUnfoldLastNonEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 30, 300] + + where + + trailer = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList + +testUnfoldLastEmpty :: Expectation +testUnfoldLastEmpty = + Stream.toList + (Stream.unfoldLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + trailer = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList + +testConcatMapLastNonEmpty :: Expectation +testConcatMapLastNonEmpty = + Stream.toList + (Stream.concatMapLast trailer (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [1, 2, 30, 300] + + where + + trailer = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + +testConcatMapLastEmpty :: Expectation +testConcatMapLastEmpty = + Stream.toList + (Stream.concatMapLast trailer (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + trailer = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + +testAppendIfEmptyNonEmpty :: Expectation +testAppendIfEmptyNonEmpty = + Stream.toList + (Stream.appendIfEmpty + (Stream.fromList [1, 2 :: Int]) + (Stream.fromList [3, 4])) + `shouldReturn` [1, 2] + +testAppendIfEmptyEmpty :: Expectation +testAppendIfEmptyEmpty = + 
Stream.toList + (Stream.appendIfEmpty + (Stream.fromList ([] :: [Int])) + (Stream.fromList [3, 4])) + `shouldReturn` [3, 4] + +testUnfoldFirstNonEmpty :: Expectation +testUnfoldFirstNonEmpty = + Stream.toList + (Stream.unfoldFirst header (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [10, 100, 2, 3] + + where + + header = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList + +testUnfoldFirstEmpty :: Expectation +testUnfoldFirstEmpty = + Stream.toList + (Stream.unfoldFirst header (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + header = Unfold.lmap (foldMap (\x -> [x * 10, x * 100])) Unfold.fromList + +testConcatMapFirstNonEmpty :: Expectation +testConcatMapFirstNonEmpty = + Stream.toList + (Stream.concatMapFirst header (Stream.fromList [1, 2, 3 :: Int])) + `shouldReturn` [10, 100, 2, 3] + + where + + header = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + +testConcatMapFirstEmpty :: Expectation +testConcatMapFirstEmpty = + Stream.toList + (Stream.concatMapFirst header (Stream.fromList ([] :: [Int]))) + `shouldReturn` [] + + where + + header = + maybe (Stream.fromList []) (\x -> Stream.fromList [x * 10, x * 100]) + moduleName :: String moduleName = "Data.Stream" @@ -155,3 +297,31 @@ main = hspec describe "Tests for Stream.groupsOf" $ do prop "testgroupsOf" testgroupsOf + + describe "Tests for Stream.appendUnfoldLast" $ do + prop "testAppendUnfoldLastNonEmpty" testAppendUnfoldLastNonEmpty + prop "testAppendUnfoldLastEmpty" testAppendUnfoldLastEmpty + + describe "Tests for Stream.appendMapLast" $ do + prop "testAppendMapLastNonEmpty" testAppendMapLastNonEmpty + prop "testAppendMapLastEmpty" testAppendMapLastEmpty + + describe "Tests for Stream.unfoldLast" $ do + prop "testUnfoldLastNonEmpty" testUnfoldLastNonEmpty + prop "testUnfoldLastEmpty" testUnfoldLastEmpty + + describe "Tests for Stream.concatMapLast" $ do + prop "testConcatMapLastNonEmpty" testConcatMapLastNonEmpty + prop "testConcatMapLastEmpty" 
testConcatMapLastEmpty + + describe "Tests for Stream.appendIfEmpty" $ do + prop "testAppendIfEmptyNonEmpty" testAppendIfEmptyNonEmpty + prop "testAppendIfEmptyEmpty" testAppendIfEmptyEmpty + + describe "Tests for Stream.unfoldFirst" $ do + prop "testUnfoldFirstNonEmpty" testUnfoldFirstNonEmpty + prop "testUnfoldFirstEmpty" testUnfoldFirstEmpty + + describe "Tests for Stream.concatMapFirst" $ do + prop "testConcatMapFirstNonEmpty" testConcatMapFirstNonEmpty + prop "testConcatMapFirstEmpty" testConcatMapFirstEmpty diff --git a/test/Streamly/Test/Data/Unfold.hs b/test/Streamly/Test/Data/Unfold.hs index ac5d60e813..fd7bdf7ae9 100644 --- a/test/Streamly/Test/Data/Unfold.hs +++ b/test/Streamly/Test/Data/Unfold.hs @@ -100,6 +100,16 @@ swap = let unf = UF.swap (UF.function id) in testUnfold unf ((1, 2) :: (Int, Int)) [(2, 1)] +consInput :: Bool +consInput = + let unf = UF.consInput (UF.function (* 2)) + in testUnfold unf (3 :: Int) [3, 6] + +consInputWith :: Bool +consInputWith = + let unf = UF.consInputWith length UF.fromList + in testUnfold unf ([1, 2, 3] :: [Int]) [3, 1, 2, 3] + ------------------------------------------------------------------------------- -- Stream generation ------------------------------------------------------------------------------- @@ -590,6 +600,8 @@ testInputOps = prop "discardFirst" discardFirst prop "discardSecond" discardSecond prop "swap" swap + prop "consInput" consInput + prop "consInputWith" consInputWith testGeneration :: Spec testGeneration =