
streamly's Introduction

Streamly: Idiomatic Haskell with C-Like Performance


Upgrading to 0.9.0+

Please read the Streamly 0.9.0 Upgrade Guide.

Overview

Streamly is a powerful Haskell library that provides developers with the essential building blocks to create safe, scalable, modular, and high-performance software. With Streamly, developers can enjoy the benefits of Haskell's type safety while leveraging C-like program performance. Streamly offers a comprehensive range of features, comprising:

  • Haskell's strong type safety.
  • C-program-like performance capabilities.
  • Flexible, modular building blocks.
  • Idiomatic functional programming.
  • Fearless, declarative concurrency for seamless parallel execution.
  • A collection of ecosystem libraries for fast and efficient development.

Check out the Streamly Getting Started Guide and Quick Overview for an introduction to the library. For more detailed documentation, visit the Haskell Streamly website.

Blazing Fast

Streamly delivers C-like speed in Haskell by fusing stream pipelines using the stream-fusion technique, resulting in compiled code that is equivalent to handwritten C code, eliminating intermediate allocations and function calls.

For a comprehensive comparison of Streamly with other Haskell streaming libraries, check out our streaming benchmarks page. Streamly's fused loops can be up to 100 times faster than those of libraries without stream fusion.
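To make the fusion idea concrete, here is a simplified, pure model of the stream-fusion representation in the style of the original stream-fusion work: each producer or transformer is a non-recursive step function over a hidden state, and only the final fold contains a loop. This is an illustration, not streamly's actual internal type (streamly's streams are monadic), and the names enumFromToS, mapS, filterS and sumS are hypothetical. Because mapS and filterS only wrap the step function, GHC can typically inline them into the loop of sumS when optimizing, so no intermediate lists are allocated.

```haskell
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE ExistentialQuantification #-}
-- Simplified, pure model of stream fusion (illustrative only; not
-- streamly's internal types, which are monadic).
module Main (main) where

-- One step of a stream: emit an element, skip, or finish.
data Step s a = Yield a s | Skip s | Done

-- A stream is a non-recursive step function plus a hidden seed state.
data Stream a = forall s. Stream (s -> Step s a) s

-- Producer: the integers from lo to hi.
enumFromToS :: Int -> Int -> Stream Int
enumFromToS lo hi = Stream next lo
  where
    next i
      | i > hi = Done
      | otherwise = Yield i (i + 1)

-- Transformers only wrap the step function; they contain no loop.
mapS :: (a -> b) -> Stream a -> Stream b
mapS f (Stream next s0) = Stream next' s0
  where
    next' s = case next s of
      Yield a s' -> Yield (f a) s'
      Skip s' -> Skip s'
      Done -> Done

filterS :: (a -> Bool) -> Stream a -> Stream a
filterS p (Stream next s0) = Stream next' s0
  where
    next' s = case next s of
      Yield a s'
        | p a -> Yield a s'
        | otherwise -> Skip s'
      Skip s' -> Skip s'
      Done -> Done

-- The only recursive function: a strict fold that drives the loop.
sumS :: Stream Int -> Int
sumS (Stream next s0) = go 0 s0
  where
    go !acc s = case next s of
      Yield a s' -> go (acc + a) s'
      Skip s' -> go acc s'
      Done -> acc

main :: IO ()
main = print (sumS (mapS (* 2) (filterS even (enumFromToS 1 10)))) -- prints 60
```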

Declarative Concurrency

Streamly introduces declarative concurrency to standard functional streaming abstractions. Declarative concurrency abstracts away the low-level details of concurrency management, such as locks and threads, and allows for easier and safer parallelization of code. For example, with Streamly you can do things like repeat actions concurrently to generate a stream of results, map functions concurrently on a stream, and combine multiple streams concurrently to create a single output stream.

Unified API

Streamly provides a comprehensive and unified API for basic programming needs, covering a wide range of areas including streaming, concurrency, logic programming, reactive programming, pinned and unpinned arrays, serialization, builders, parsers, unicode processing, file-io, file system events, and network-io. By unifying functionality from disparate Haskell libraries, Streamly simplifies development while delivering equivalent or improved performance. Additionally, the complexity of handling combinations of lazy, strict, bytestring, and text is eliminated by using streams for lazy evaluation, and by generalizing bytestring and text to arrays.

Check out Streamly's documentation for more information about Streamly's features.

Batteries Included

In addition to the fundamental programming constructs, Streamly also provides higher-level functionality through supporting packages such as streamly-process, streamly-shell, and streamly-coreutils that are essential for general programming tasks. Check out the streamly-examples repository for some program snippets.

Highly Modular

Traditionally, you must choose between modularity and performance when writing code. However, with Haskell Streamly, you can have the best of both worlds. By taking advantage of GHC's stream fusion optimizations (such as case-of-case and spec-constr), Streamly achieves performance comparable to an equivalent C program while still allowing for highly modular code.

Credits

The following authors/libraries have influenced or inspired this library in a significant way:

Please see the credits directory for a full list of contributors, credits and licenses.

Licensing

Streamly is an open source project available under a liberal BSD-3-Clause license.

Contributing to Streamly

As an open project we welcome contributions:

Getting Support

Professional support is available for Streamly: please contact [email protected].

You can also join our community chat channel on Gitter.

streamly's People

Contributors

abhiroop, adithyaov, aravindgopall, bodigrim, bwignall, fgaz, georgefst, harendra-kumar, hasufell, hezhenxing, hussein-aitlahcen, jkoshy, kirelagin, lucianu, luke-clifton, mgsloan, mryndzionek, nhenin, pranaysashank, psibi, rnjtranjan, sanchayanmaity, shlok, specdrake, timbuckley, treeowl, twitu, tymefighter, wildcat26, wygulmage

streamly's Issues

Relationship to other streaming I/O frameworks?

I would like to encourage you to further explain how this library came to be. Streaming I/O has been a well-explored space in Haskell (in response to a very knotty problem): iteratees and enumerator originally, followed by conduit (which solved one set of problems to do with finalizers and space leaks), pipes (which was built by construction to solve others and also derived in a categorical way) and io-streams (which concentrated on IO and performance for webservers). All three converged on similar internal representations. There have been a few fringe contributions since then, quiver and streaming, both of which look at the Functor side of things rather than the Monad [Transformer] one. Finally, there are machines and transient, which are special kinds of crazy.

Given this rich history, it's natural for someone encountering this for the first time to wonder where streamly fits in. It seems you got here in a somewhat ad-hoc fashion but clearly you know what you're about and have put an extraordinary amount of work into this library and benchmarking.

You described some of your motivations in #2 (comment); I would encourage you to lift that up to a paragraph, perhaps at the conclusion of your top-level README.

I think it's incredibly cool that you have fairly transparently muxed threaded concurrency and streaming I/O. I look forward to trying this!

AfC

RFC - Module organization

Currently we have Streamly as the top level module. I am considering moving it to Data.Streamly to be consistent with other such libraries e.g. Data.List, Data.Machines, Data.Conduit. On the other hand, libraries like Streaming and Pipes use a top level name convention and Streamly currently follows that. However, the former seems more prevalent and perhaps a little bit more organized.

Any comments/feedback on this is welcome.

Alternative instances

With #60 we no longer have Alternative instances for streams. This issue is a placeholder for adding meaningful Alternative instances for all stream types. I can think of two ways in which Alternative instances can work: (1) choose one of the streams as a whole, e.g. a <|> b <|> c would choose the first non-empty stream; (2) choose one element from the heads of the streams and yield it, then choose one element from the next elements in the streams, and build the resulting stream like that.

I think the second option makes more sense and is the dual of zip, so that might be the one we want.
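A list-based sketch of the two candidate semantics may help compare them. firstNonEmpty and roundRobin are hypothetical names modeling option (1) and option (2) on plain lists. Like zip, roundRobin takes one element from each stream per round, but instead of stopping at the shortest stream it continues with the streams that still have elements.

```haskell
-- List model of the two candidate Alternative semantics (hypothetical names).
module Main (main) where

import Data.List (find)
import Data.Maybe (fromMaybe)

-- Option (1): choose the first non-empty stream as a whole.
firstNonEmpty :: [[a]] -> [a]
firstNonEmpty xss = fromMaybe [] (find (not . null) xss)

-- Option (2): yield one element from the head of each stream, then one
-- from each remaining tail, and so on. Unlike zip, it does not stop at
-- the shortest stream; exhausted streams simply drop out.
roundRobin :: [[a]] -> [a]
roundRobin xss = case [(x, rest) | (x : rest) <- xss] of
  [] -> []
  pairs -> map fst pairs ++ roundRobin (map snd pairs)

main :: IO ()
main = do
  print (firstNonEmpty [[], [1, 2], [3 :: Int]]) -- [1,2]
  print (roundRobin [[1, 4], [2, 5, 6], [3 :: Int]]) -- [1,2,3,4,5,6]
```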

Merge the stop continuation with yield continuation

Currently our type looks like this:

newtype Stream m a =
    Stream {
        runStream :: forall r.
               Maybe (SVar m a)               -- local state
            -> m r                               -- stop
            -> (a -> Maybe (Stream m a) -> m r)  -- yield
            -> m r
    }

I have been thinking of merging the stop case into the yield continuation, using a Maybe-like Step type. That way we do not have to call a separate continuation to check the empty case. I am hoping this will result in better performance. Like this:

data Step m a = Stop | Step a (Stream m a)

newtype Stream m a =
    Stream {
        runStream :: forall r.
               Maybe (SVar m a)   -- local state
            -> (Step m a -> m r)  -- yield
            -> m r
    }

Further, we can represent singleton streams separately, as there is no continuation in that case. This can perhaps optimize some cases, but it is also a source of confusion and bugs, because a stream can end either with a Stop or with a Singleton.

data Step m a = Stop | Singleton a | Step a (Stream m a)

newtype Stream m a =
    Stream {
        runStream :: forall r.
               Maybe (SVar m a)   -- local state
            -> (Step m a -> m r)  -- yield
            -> m r
    }

The Singleton constructor can optimize the case when the value is not really a stream but just a single value, for example when the StreamT monad is used just as an IO monad (single values) instead of as a stream in the IO monad. However, we do pay the extra cost of an indirection because we have to identify the constructor by its tag, but I guess that should be cheap.
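A stripped-down, runnable version of the merged-continuation design is sketched below. It assumes the SVar local-state argument is dropped and leaves out Singleton for brevity; nil, cons and toList are illustrative names. The consumer supplies a single continuation and pattern matches on Stop versus Step instead of passing separate stop and yield continuations.

```haskell
{-# LANGUAGE RankNTypes #-}
-- Stripped-down sketch of the merged stop/yield continuation design.
-- Assumptions: the SVar argument and the Singleton constructor are
-- omitted; nil, cons and toList are illustrative names.
module Main (main) where

import Data.Functor.Identity (Identity (..))

data Step m a = Stop | Step a (Stream m a)

newtype Stream m a = Stream
  { runStream :: forall r. (Step m a -> m r) -> m r
  }

nil :: Stream m a
nil = Stream (\k -> k Stop)

cons :: a -> Stream m a -> Stream m a
cons a rest = Stream (\k -> k (Step a rest))

-- One continuation inspects the Step instead of receiving separate
-- stop and yield continuations.
toList :: Monad m => Stream m a -> m [a]
toList s = runStream s k
  where
    k Stop = return []
    k (Step a rest) = (a :) <$> toList rest

main :: IO ()
main = print (runIdentity (toList (cons 1 (cons 2 (cons (3 :: Int) nil))))) -- [1,2,3]
```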

Add property tests for streams

Currently we have unit tests where we construct different cases of streams and compositions manually and test them. It may be a good idea to do this automatically using QuickCheck Arbitrary instances instead. It would mean less manual labor and more testing.

Test exception propagation

When a child task throws an exception, the exception is propagated to the parent. However, this exception handling is not tested. We need to add tests to make sure that it works as expected. This can be tested easily by having a task in an a <|> b composition throw an exception and catching that exception in the parent. We need to test this both in an Alternative composition using <|> and in a monadic composition using ParallelT and/or AsyncT.

In terms of code, the handleException and handleChildException functions need to be covered by these tests.

Use hindent to automate formatting as much as we can

@Abhiroop I tried hindent --style johan-tibell --line-length 79 --indent-size 4 --sort-imports on Prelude.hs and the result is not much different from the existing style so I have no problem with whatever changes it suggests except the bug.

Bug

There is an unfixed bug in it that causes haddock to fail. Essentially it changes:

module Streamly.Prelude
    (
    -- * Construction
      nil
    , cons

To

module Streamly.Prelude
    -- * Construction
    ( nil
    , cons

Note that the "Construction" heading comment moves out of the parenthesized export list, which haddock does not like.

Stylistic differences

Other changes differ from the current style; I can forget about the minor ones in favor of fully automated formatting:

  • It removes several blank lines that I usually keep for better visual separation between comment sections and in the export list. I guess I can live with that.
  • where gets a two-space indentation, which is fine; I like it because it makes where more visible without losing indentation.
  • It changes single line signatures to multiline like this:
takeWhile :: Streaming t => (a -> Bool) -> t m a -> t m a

becomes

takeWhile
    :: Streaming t
    => (a -> Bool) -> t m a -> t m a

This is less compact but maybe a bit more readable (subjective), so accepted.

  • Some manually aligned code gets unaligned, e.g. this code was aligned on the equals sign:
        yield a Nothing  = return (Just (a, nil))
        yield a (Just x) = return (Just (a, fromStream x))

It becomes:

        yield a Nothing = return (Just (a, nil))
        yield a (Just x) = return (Just (a, fromStream x))
  • guards that were on a single line go to the next line:
        let yield a Nothing  | p a       = yld a Nothing
                             | otherwise = stp
            yield a (Just x) | p a       = yld a (Just (go x))
                             | otherwise = stp

becomes

            let yield a Nothing
                    | p a = yld a Nothing
                    | otherwise = stp
                yield a (Just x)
                    | p a = yld a (Just (go x))
                    | otherwise = stp

Not a big deal I guess.

  • It does not keep DoAndIfThenElse style if-then-else:
        in if n1 <= 0
           then (runStream m1) ctx stp yld
           else (runStream m1) ctx stp yield

became

            in if n1 <= 0
                   then (runStream m1) ctx stp yld
                   else (runStream m1) ctx stp yield

I like the first style better.

  • I tend to use if-then-else in a single line where it is possible:
            yield a (Just x) = if a == e then return True else go x

became

            yield a (Just x) =
                if a == e
                    then return True
                    else go x

I am OK with it if it can at least follow the DoAndIfThenElse style.

  • I use let/in like this, where the ends of the keywords align, not the beginnings:
        let yield a Nothing  = return $ done a
         in (runStream m1) Nothing undefined yield

becomes

        let yield a Nothing = return $ done a
        in (runStream m1) Nothing undefined yield

Minor thing.

Conclusion

Other than the bug and the DoAndIfThenElse I am ok with the other changes. There is already an issue raised on hindent for the bug, maybe we can raise an issue for supporting the DoAndIfThenElse style.

@Abhiroop if you want to have a go at it you can submit a PR. If we change the style then it will be nice if we change all files to match the same style, it can be one at a time though.

Request for comments: API naming and signatures

Compatibility with base/Data.List

Streamly streams are very similar to standard Haskell lists, both in nature and in signatures. The only difference is that streamly streams are monadic. Anyone who knows a list function should be able to expect it to work exactly the same way in streamly. In other words, streamly streams are just better, generalized lists that come in the form of monadic streams with safe concurrent composition.

For consistency, we strive to keep all streamly functions along the lines of base:Data.List functions, as long as it makes sense and is possible.

Differences with Lists

Following are some differences that we have or may deliberately concede compared to base/Data.List.

The main difference is due to the fact that streamly streams are monadic while lists are pure.

  • streamly streams are not instances of Foldable and Traversable though they provide equivalent functionality without the type classes.
  • Fold functions have slightly different but similar signatures
  • Monadic traversal functions like mapM and sequence have slightly different but similar signatures
  • We also have monadic versions of APIs (APIs that take a monadic function instead of a pure one) wherever it makes sense. These APIs are generally suffixed with an M.

The other key difference is due to partial functions:

  • We may not have partial functions like head and tail; instead, we may have safe versions of those with the same names but different signatures, returning a Maybe.

Compatibility with foldl/Control.Foldl

We would also like streamly to have fold-like functions that work with the foldl library. For that we need fold functions with an extraction argument, and we should name them differently from the standard ones. We can perhaps reuse the name fold (as streamly does not have a Foldable instance) for such a composable fold function. Similarly, the name scan can be used for a composable scan.
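For reference, a composable fold with an extraction argument can be modeled as a step function, an initial state and an extraction function, in the style of the foldl library's Fold type. The sketch below is a self-contained approximation, not the exact API of foldl or streamly (sumF, lengthF, fold and average are illustrative names). The Applicative instance is what makes such folds composable: average combines two folds that consume the input together in a single pass.

```haskell
{-# LANGUAGE ExistentialQuantification #-}
-- A foldl-style composable fold: step function, initial state and
-- extraction function. Illustrative only; not the foldl or streamly API.
module Main (main) where

import Data.List (foldl')

data Fold a b = forall x. Fold (x -> a -> x) x (x -> b)

instance Functor (Fold a) where
  fmap f (Fold step begin done) = Fold step begin (f . done)

-- Run two folds side by side over the same input, then combine results.
instance Applicative (Fold a) where
  pure b = Fold (\() _ -> ()) () (const b)
  Fold stepL beginL doneL <*> Fold stepR beginR doneR =
    Fold step (beginL, beginR) done
    where
      step (xL, xR) a = (stepL xL a, stepR xR a)
      done (xL, xR) = doneL xL (doneR xR)

sumF :: Num a => Fold a a
sumF = Fold (+) 0 id

lengthF :: Fold a Int
lengthF = Fold (\n _ -> n + 1) 0 id

-- Drive a fold over a list in one strict left-to-right pass.
fold :: Fold a b -> [a] -> b
fold (Fold step begin done) xs = done (foldl' step begin xs)

-- Both component folds consume the input in the same single pass.
average :: Fold Double Double
average = (/) <$> sumF <*> fmap fromIntegral lengthF

main :: IO ()
main = print (fold average [1, 2, 3, 4]) -- 2.5
```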

Changes required

Let's fix anything that currently deviates from these conventions. I can see the following:

  • foldl to be renamed to fold, and we can have a list-compatible foldl written in terms of fold. We do not have a Foldable instance, so this name will not conflict with Foldable's fold.
  • similarly foldlM to be renamed to foldM

Pass a state through the continuations in all cases

Currently we pass the SVar state only when needed. SVar is a specific state that we need in specific cases. However we need a generalized state passing for configuration type combinators to work. The SVar state could be a subset of the general state that is always passed.

We may need combinators like the following, that can be implemented using a general state passing:

  • trace - to trace the threads created and deleted by the stream
  • buffer - to specify the size of the buffer to be used on producer side
  • threads - to specify a limit of the worker threads on the producer side
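One way to picture the generalized state passing is a toy model in which every stream receives a configuration record and combinators such as threads and buffer modify that record for the stream they wrap. Config, defaultConfig, askConfig, traced and the field names are hypothetical; a real implementation would have to thread this state through the monadic continuations and the SVar. In this model the innermost combinator wins, since it is applied last.

```haskell
-- Toy model of passing a configuration record through a stream so that
-- combinators can adjust it for the stream they wrap. All names here are
-- hypothetical; real streams would thread this through continuations.
module Main (main) where

data Config = Config
  { cfgThreads :: Int -- limit on producer-side worker threads
  , cfgBuffer :: Int -- producer-side buffer size
  , cfgTrace :: Bool -- whether to trace thread creation/deletion
  }
  deriving (Eq, Show)

defaultConfig :: Config
defaultConfig = Config {cfgThreads = 1, cfgBuffer = 16, cfgTrace = False}

-- In this model, a stream is a function from the configuration it runs
-- with to the elements it produces.
newtype Stream a = Stream {runStream :: Config -> [a]}

fromList :: [a] -> Stream a
fromList xs = Stream (const xs)

-- Emits the configuration in effect, to observe the combinators.
askConfig :: Stream Config
askConfig = Stream (\cfg -> [cfg])

threads :: Int -> Stream a -> Stream a
threads n s = Stream (\cfg -> runStream s (cfg {cfgThreads = n}))

buffer :: Int -> Stream a -> Stream a
buffer n s = Stream (\cfg -> runStream s (cfg {cfgBuffer = n}))

traced :: Stream a -> Stream a
traced s = Stream (\cfg -> runStream s (cfg {cfgTrace = True}))

main :: IO ()
main = do
  print (runStream (threads 4 (buffer 64 askConfig)) defaultConfig)
  print (runStream (threads 2 (fromList "abc")) defaultConfig)
```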

Other than implementing the combinators using the internal state passing, as an experimental thought, the same state passing functionality can be used to implement a state monad functionality in streamly itself. Even though a StateT can be used, it may be more efficient and beginner friendly to have integrated get/put.

Race vs parallel composition

Currently we use the Alternative <|> operator to compose streams in parallel. This composition runs all the streams in parallel and combines all of them in a single stream. The dual scenario is to race the streams in parallel, take the first result and cancel the rest. This is how <|> behaves in the async package for example. We can use two separate operators for these two behaviors. One proposal is to use <|> for racing and <||> for parallel composition, the vertical parallel bars meaning parallel, though it is a bit longer operator with four chars.

Operations involving multiple layers of containers

A typical example of multi-layer operations is concat with similar semantics as the list concat operation here. Streaming/Pipes/Machines concat a Foldable whereas Conduit concats a MonoFoldable which includes monomorphic containers as well. I am not clear about the pros and cons of both except for the high level fact that MonoFoldable includes more containers than Foldable. For example MonoFoldable includes Text and ByteString as well.

Any feedback on this is welcome.

Operations from base/Data.List

Involving two layers of containers, we need to decide on how to represent layers (Foldable/MonoFoldable/Streams) in general before attempting these:

Generating layers:

  • subsequences
  • permutations
  • inits
  • tails

Splitting:

  • splitAt
  • span
  • break
  • partition

Grouping:

Transformation with layers:

  • intercalate
  • transpose

Eliminating layers:

  • concat
  • concatMap

Special case of Strings:

  • lines
  • words
  • unlines
  • unwords

Unzipping:

  • unzip

Question about consuming infinite monadic streams

Given a function getInfiniteStream which produces an infinite list of monadic actions (say HTTP calls), I'm trying to consume the first n elements of this list like this:

S.take 10 $ (asyncly getInfiniteStream)

I was hoping that the producer would only create 10 items in the stream because that's all the consumer needs, but what I'm seeing is that many items are produced and the first 10 that come back are consumed by the consumer (i.e. many more than 10 HTTP calls are made).

My end goal is this concept of an infinite stream that can be consumed with S.take, S.takeWhile, S.filter... and combinations of those, but the underlying stream to be produced non-serially.

Is there a way to do this with Streamly?

rename "forEachWith" to "foldForWith"

We have three convenience APIs to fold Foldable containers, converting them to different types of streams: foldWith, foldMapWith and forEachWith. The name of the third one is not consistent with the others; foldForWith would be a more consistent name, reflecting the map/for convention widely used in Haskell.
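A list-based model of the three functions shows the naming relationship: the third function is just foldMapWith with its container and function arguments flipped, the same relationship forM has to mapM. Here a combining function on lists stands in for a stream composition operator such as <> or <|>; these list versions, including the proposed name foldForWith, are hypothetical illustrations.

```haskell
-- List model of foldWith / foldMapWith / foldForWith. A combining
-- function on lists stands in for a stream operator such as <> or <|>.
module Main (main) where

-- Combine a container of streams.
foldWith :: Foldable f => ([a] -> [a] -> [a]) -> f [a] -> [a]
foldWith combine = foldr combine []

-- The "map" variant: turn each element into a stream, then combine.
foldMapWith :: Foldable f => ([b] -> [b] -> [b]) -> (a -> [b]) -> f a -> [b]
foldMapWith combine g = foldr (combine . g) []

-- The "for" variant: foldMapWith with arguments flipped, as forM is to
-- mapM. This is the function currently named forEachWith.
foldForWith :: Foldable f => ([b] -> [b] -> [b]) -> f a -> (a -> [b]) -> [b]
foldForWith combine xs g = foldMapWith combine g xs

main :: IO ()
main = do
  print (foldWith (++) [[1], [2, 3 :: Int]]) -- [1,2,3]
  print (foldForWith (++) [1, 2, 3 :: Int] (\x -> [x, x * 10])) -- [1,10,2,20,3,30]
```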

A type for execute ahead but consume in order

We can have a hybrid of SerialT and CoparallelT type called AheadT. The behavior of CoparallelT is to execute a batch of actions at the head of the stream in parallel. The results of these actions may be consumed out of order. The behavior of AheadT would be to execute a batch of actions at the head of the stream in parallel (like CoparallelT) but consume the results in the same order (like SerialT). This means we may execute ahead but maintain the serial order. This is like the concept of readahead or we may call it IOahead. The type combinator for this could be ahead.

Add a dual of StreamT

The StreamT type composes first element of the outer loop with all the elements in the inner loop and then goes ahead with the second element of the outer loop. There is a dual possible where we can compose the first element of the inner loop with all the elements of the outer loop and then do the same with the second element of inner loop.

x <- [1,2]
y <- [3,4]
return (x,y)

StreamT produces (1,3) (1,4) (2,3) (2,4) whereas the dual will produce (1,3) (2,3) (1,4) (2,4).
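The two orders can be reproduced with plain lists. In the sketch below (function names hypothetical), the list monad yields the StreamT order, and the dual order comes from swapping the two generators by hand; the point of a dual stream type would be to obtain the second order without rewriting the code.

```haskell
-- Plain-list illustration of the two loop orders (hypothetical names).
module Main (main) where

-- The list monad runs the outer loop first, like StreamT.
serialOrder :: [a] -> [b] -> [(a, b)]
serialOrder xs ys = do
  x <- xs
  y <- ys
  return (x, y)

-- The dual order, obtained here by swapping the generators by hand.
dualOrder :: [a] -> [b] -> [(a, b)]
dualOrder xs ys = do
  y <- ys
  x <- xs
  return (x, y)

main :: IO ()
main = do
  print (serialOrder [1, 2 :: Int] [3, 4 :: Int]) -- [(1,3),(1,4),(2,3),(2,4)]
  print (dualOrder [1, 2 :: Int] [3, 4 :: Int]) -- [(1,3),(2,3),(1,4),(2,4)]
```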

This has the effect of inverting the order of the loops without changing the code. In many cases the efficiency of a program depends on the order of its loops. The dual of StreamT can help invert the order just by changing the type, the same way all other streamly stream types work.

What do we call the type? CoStreamT, InverseT, InverseStreamT? The combinator for this could be inversely?

Another possibility could be to have a combinator that just flips the current order?

Please add `fmapMaybe`

Since streams can be filtered, they should have an instance of Control.Compactable, or at the very least fmapMaybe :: (IsStream t, Monad m) => (a -> Maybe b) -> t m a -> t m b.
This would eliminate the following anti-pattern appearing in code:

foo = fmap fromJust . S.filter isJust
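For plain lists, base already provides this operation as Data.Maybe.mapMaybe, which suggests the signature for streams. The sketch below compares the anti-pattern with a single total pass; fmapMaybe here is only a list-specialized alias used for illustration, and readDigit is a hypothetical helper.

```haskell
-- List model: the filter-then-fromJust anti-pattern versus one total
-- pass. fmapMaybe is a hypothetical list alias for Data.Maybe.mapMaybe.
module Main (main) where

import Data.Char (digitToInt, isDigit)
import Data.Maybe (fromJust, isJust, mapMaybe)

-- The anti-pattern: a partial fromJust guarded by a separate filter.
antiPattern :: [Maybe b] -> [b]
antiPattern = fmap fromJust . filter isJust

-- Map and filter in one step, with no partial functions.
fmapMaybe :: (a -> Maybe b) -> [a] -> [b]
fmapMaybe = mapMaybe

readDigit :: Char -> Maybe Int
readDigit c
  | isDigit c = Just (digitToInt c)
  | otherwise = Nothing

main :: IO ()
main = do
  print (antiPattern (map readDigit "a1b2c3")) -- [1,2,3]
  print (fmapMaybe readDigit "a1b2c3") -- [1,2,3]
```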

Operators for Applicative zip and async applicative zip of streams

Define two new operators, one for applicative zipping of streams irrespective of the type of the stream and another one for async zipping of streams irrespective of the type of the stream. The proposed operators are:

  • >< zip applicative
  • >|< async zip applicative
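To pin down the intended semantics of the zip applicative operator, here is a plain-list model. The operator name >< comes from the proposal; defining it as zipWith ($) on lists is an assumption for illustration. With the usual list Applicative, <*> forms all combinations, whereas >< pairs elements position by position and stops at the shorter input.

```haskell
-- List model of the proposed zip-applicative operator (><). Defining it
-- as zipWith ($) is an illustration, not streamly's implementation.
module Main (main) where

infixl 4 ><

-- Apply the n-th function to the n-th argument, stopping at the
-- shorter input, whatever the normal Applicative of the container is.
(><) :: [a -> b] -> [a] -> [b]
(><) = zipWith ($)

main :: IO ()
main = do
  print ((,) <$> [1, 2, 3 :: Int] >< "ab") -- [(1,'a'),(2,'b')]
  print ((,) <$> [1, 2, 3 :: Int] <*> "ab") -- all six combinations
```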

missing import, extensions in examples of readme.md

I get an error when running the loops examples in the readme file.

Parse error: $
Perhaps you intended to use TemplateHaskell

I have not yet found the time to look at TemplateHaskell... Any hint welcome.

For the "sum of the square roots" example, I get "ambiguous type variable 'm0' arising from a do statement" in the line with the forEachWith. Am I missing an extension?

I could get the other examples to work. They look great! Many thanks for this package!

Tackle ambiguous type variable type errors more gracefully

One of the common mistakes when using streamly is to use all generic code and miss specifying a specific type for the stream. For example:

import Streamly
import Streamly.Prelude

loops = do
    x <- each [1,2]
    y <- each [3,4]
    liftIO $ putStrLn $ show (x, y)

main = runStreaming $ loops

The code above will generate a type error. We should use 'serially' or 'asyncly' or any other such combinator to specify the type. Or we should specify a specific type in the signature of loops. Like this:

main = runStreaming $ serially $ loops

With this little omission, GHC generates a multipage scary type error like this:

xx.hs:5:5: error:
    • Ambiguous type variable ‘t0’ arising from a do statement
      prevents the constraint ‘(Monad (t0 IO))’ from being solved.
      Relevant bindings include loops :: t0 IO () (bound at xx.hs:4:1)
      Probable fix: use a type annotation to specify what ‘t0’ should be.
      These potential instances exist:
        instance MonadAsync m => Monad (AsyncT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance Monad m => Monad (InterleavedT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance MonadAsync m => Monad (ParallelT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        ...plus one other
        ...plus two instances involving out-of-scope types
        (use -fprint-potential-instances to see them all)
    • In a stmt of a 'do' block: x <- each [1, 2]
      In the expression:
        do x <- each [1, 2]
           y <- each [3, 4]
           liftIO $ putStrLn $ show (x, y)
      In an equation for ‘loops’:
          loops
            = do x <- each [1, ....]
                 y <- each [3, ....]
                 liftIO $ putStrLn $ show (x, y)
  |
5 |     x <- each [1,2]
  |     ^^^^^^^^^^^^^^^

xx.hs:5:10: error:
    • Ambiguous type variable ‘t0’ arising from a use of ‘each’
      prevents the constraint ‘(Streaming t0)’ from being solved.
      Relevant bindings include loops :: t0 IO () (bound at xx.hs:4:1)
      Probable fix: use a type annotation to specify what ‘t0’ should be.
      These potential instances exist:
        instance Streaming AsyncT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance Streaming InterleavedT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance Streaming ParallelT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        ...plus three others
        (use -fprint-potential-instances to see them all)
    • In a stmt of a 'do' block: x <- each [1, 2]
      In the expression:
        do x <- each [1, 2]
           y <- each [3, 4]
           liftIO $ putStrLn $ show (x, y)
      In an equation for ‘loops’:
          loops
            = do x <- each [1, ....]
                 y <- each [3, ....]
                 liftIO $ putStrLn $ show (x, y)
  |
5 |     x <- each [1,2]
  |          ^^^^^^^^^^

xx.hs:7:5: error:
    • Ambiguous type variable ‘t0’ arising from a use of ‘liftIO’
      prevents the constraint ‘(MonadIO (t0 IO))’ from being solved.
      Relevant bindings include loops :: t0 IO () (bound at xx.hs:4:1)
      Probable fix: use a type annotation to specify what ‘t0’ should be.
      These potential instances exist:
        instance MonadAsync m => MonadIO (AsyncT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance MonadIO m => MonadIO (InterleavedT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance MonadAsync m => MonadIO (ParallelT m)
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        ...plus one other
        ...plus two instances involving out-of-scope types
        (use -fprint-potential-instances to see them all)
    • In a stmt of a 'do' block: liftIO $ putStrLn $ show (x, y)
      In the expression:
        do x <- each [1, 2]
           y <- each [3, 4]
           liftIO $ putStrLn $ show (x, y)
      In an equation for ‘loops’:
          loops
            = do x <- each [1, ....]
                 y <- each [3, ....]
                 liftIO $ putStrLn $ show (x, y)
  |
7 |     liftIO $ putStrLn $ show (x, y)
  |     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

xx.hs:10:8: error:
    • Ambiguous type variable ‘t0’ arising from a use of ‘runStreaming’
      prevents the constraint ‘(Streaming t0)’ from being solved.
      Probable fix: use a type annotation to specify what ‘t0’ should be.
      These potential instances exist:
        instance Streaming AsyncT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance Streaming InterleavedT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        instance Streaming ParallelT
          -- Defined in ‘streamly-0.1.1:Streamly.Streams’
        ...plus three others
        (use -fprint-potential-instances to see them all)
    • In the expression: runStreaming $ loops
      In an equation for ‘main’: main = runStreaming $ loops
   |
10 | main = runStreaming $ loops
   |        ^^^^^^^^^^^^^^^^^^^^

I am wondering whether it is possible to teach GHC to produce a better, customized type error in this case. Or, even better, can we teach it to produce a warning and use a default type somehow?
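As far as typing is concerned, a combinator like serially acts as an identity function with a restricted type: applying it fixes the otherwise ambiguous stream type variable. The toy model below reproduces the shape of the problem with base only; the class, the ReverseT type and reversely are hypothetical stand-ins, not streamly's API. On its own, toList numbers is ambiguous, while toList (serially numbers) is not.

```haskell
-- Toy model of why a type-restricting combinator resolves the
-- ambiguity. The class and types are hypothetical stand-ins.
module Main (main) where

class Streaming t where
  each :: [a] -> t a
  toList :: t a -> [a]

newtype SerialT a = SerialT [a]

newtype ReverseT a = ReverseT [a] -- hypothetical second stream type

instance Streaming SerialT where
  each = SerialT
  toList (SerialT xs) = xs

instance Streaming ReverseT where
  each = ReverseT . reverse
  toList (ReverseT xs) = xs

-- Generic code: the stream type t is left open.
numbers :: Streaming t => t Int
numbers = each [1, 2, 3]

-- Identity functions whose only job is to fix the stream type.
serially :: SerialT a -> SerialT a
serially = id

reversely :: ReverseT a -> ReverseT a
reversely = id

main :: IO ()
main = do
  print (toList (serially numbers)) -- [1,2,3]
  print (toList (reversely numbers)) -- [3,2,1]
  -- print (toList numbers) would be rejected: ambiguous type variable
```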

Support for running parts of a stream in a different monad

Conduit addresses this problem with dedicated functions for some common monads. Currently the only alternative is to use hoist, but as described here this is broken for stateful monads (AIUI, it's equivalent to transPipe).

My use case is that I wanted to add a state monad to process part of the stream without leaking its implementation (the type of the state) to the rest of the transformer stack.

Change the return types to stay in the stream monad

We have kept the types of fold and fold-like functions to return the output in the underlying monad of the stream. These are all singleton values. I am proposing to change these types to return a singleton stream in the stream monad instead of returning in the underlying monad. The reason is that these operations will usually be used in the stream monad and composed with other stream operations. Returning in the underlying monad means that we have to immediately lift the value into the stream so that we can use it. So we gain nothing by returning to the underlying monad, and we lose simplicity of use because we have to use lift. We would like to avoid lift wherever possible. If we want to convert a value back to the underlying monad, we can use the single exit to the monad, i.e. the toList function.

I also looked at how other streaming libraries are doing it. conduit and machines do not exit to the underlying monad for these operations whereas pipes and streaming exit to the monad. I prefer staying in the stream monad for the reasons cited earlier.

Missing prelude operations

Some base/Data.List operations are missing and can be added to Streamly:

These operations are fairly easy or in some cases trivial or mechanical to implement. They can be modeled on the implementations of similar existing operations. I will be happy to help in bootstrapping if someone wants to pick up some of these.

Basic:

  • tail
  • init
  • null

Infinite streams:

  • iterate
  • repeat
  • replicate
  • cycle

Lookups:

  • lookup
  • find

Indices:

  • (!!)
  • elemIndex
  • elemIndices
  • findIndex
  • findIndices

Folds:

  • foldl1
  • foldr1
  • and
  • or
  • minimumBy
  • maximumBy

Transformation:

  • scanl
  • scanl1
  • scanr
  • scanr1

Filtering:

  • delete/deleteBy

Inserting/deleting:

  • intersperse
  • insert/insertBy

Reordering (finite streams):

  • reverse
  • sort/sortOn/sortBy
  • sortWith from GHC.Exts

SubStreams (Zip and fold):

  • isPrefixOf
  • isInfixOf
  • isSuffixOf
  • isSubsequenceOf
  • stripPrefix

Map and Fold:

  • mapAccumL
  • mapAccumR

Set operations - Note that these operations cannot work on infinite streams. Though it may be possible to define them on sorted infinite streams.

  • the from GHC.Exts
  • nub* - See here
  • (\\)
  • union/unionBy
  • intersect/intersectBy
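As an illustration of how mechanical these can be, the sketch below implements findIndices, a scanl variant (named scanlS to avoid clashing with Prelude.scanl) and intersperse over a minimal monadic stream represented by a monadic uncons. The Stream type and its helpers are hypothetical and much simpler than streamly's own representation; they only show the shape of such implementations.

```haskell
-- Sketch of a few missing list operations over a hypothetical minimal
-- monadic stream (a monadic uncons); not streamly's representation.
module Main (main) where

import Data.Functor.Identity (Identity (..))

newtype Stream m a = Stream {uncons :: m (Maybe (a, Stream m a))}

nil :: Monad m => Stream m a
nil = Stream (return Nothing)

cons :: Monad m => a -> Stream m a -> Stream m a
cons a s = Stream (return (Just (a, s)))

fromList :: Monad m => [a] -> Stream m a
fromList = foldr cons nil

toList :: Monad m => Stream m a -> m [a]
toList s = do
  r <- uncons s
  case r of
    Nothing -> return []
    Just (a, rest) -> (a :) <$> toList rest

-- Indices of the elements satisfying the predicate.
findIndices :: Monad m => (a -> Bool) -> Stream m a -> Stream m Int
findIndices p = go 0
  where
    go i s = Stream $ do
      r <- uncons s
      case r of
        Nothing -> return Nothing
        Just (a, rest)
          | p a -> return (Just (i, go (i + 1) rest))
          | otherwise -> uncons (go (i + 1) rest)

-- Like Data.List.scanl: emits the seed first, then each running result.
scanlS :: Monad m => (b -> a -> b) -> b -> Stream m a -> Stream m b
scanlS f z s = cons z $ Stream $ do
  r <- uncons s
  case r of
    Nothing -> return Nothing
    Just (a, rest) -> uncons (scanlS f (f z a) rest)

-- Inserts sep between consecutive elements.
intersperse :: Monad m => a -> Stream m a -> Stream m a
intersperse sep s = Stream $ do
  r <- uncons s
  case r of
    Nothing -> return Nothing
    Just (a, rest) -> return (Just (a, prefixAll rest))
  where
    -- Emits sep before every remaining element.
    prefixAll t = Stream $ do
      r <- uncons t
      case r of
        Nothing -> return Nothing
        Just (a, rest) -> return (Just (sep, cons a (prefixAll rest)))

main :: IO ()
main = do
  print (runIdentity (toList (findIndices even (fromList [1, 2, 3, 4 :: Int])))) -- [1,3]
  print (runIdentity (toList (scanlS (+) 0 (fromList [1, 2, 3 :: Int])))) -- [0,1,3,6]
  print (runIdentity (toList (intersperse ',' (fromList "abc")))) -- "a,b,c"
```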

Sum style composition via types or via explicit operators?

Currently we use types (StreamT, ParallelT etc.) to determine how streams compose in the product composition. However, for sum-style composition we use explicit operators, i.e. <>, <=>, <| and <|>, to combine streams. They combine streams in an operator-specific manner irrespective of the type (i.e. StreamT, ParallelT etc.). The MonadPlus instance is the same for all types: it combines the streams in parallel using <|>.

This issue proposes changing the MonadPlus instance to make it type specific, so that we have a type-specific way of combining streams in a sum style. It will allow us to use msum on Foldables to turn them into streams based on the type. We can also choose an operator for mplus, e.g. <.> or <+>, consistent with the other operators but combining in a type-specific way. The type-specific operator will allow us to change the behavior just by changing the type annotation, whereas the type-independent operators fix the behavior. Both of these may be necessary for different use cases.

Since this is a breaking change, it is better to do it sooner rather than later if we decide to do it.
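A minimal sketch of what type-specific sum composition could look like, using plain lists rather than streamly's stream types. Serial and Wide are hypothetical newtypes that differ only in their Alternative instance, so the same asum (the Alternative counterpart of msum) over the same Foldable composes streams depth first or interleaved depending only on the type.

```haskell
{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import Control.Applicative (Alternative (..))
import Data.Foldable (asum)

-- Hypothetical stream types modelled as lists; only the Alternative
-- instance (the "sum" composition) differs between them.
newtype Serial a = Serial { runSerial :: [a] }
  deriving (Functor, Applicative)

newtype Wide a = Wide { runWide :: [a] }
  deriving (Functor, Applicative)

-- Serial: exhaust the first stream, then the second (depth first).
instance Alternative Serial where
  empty = Serial []
  Serial xs <|> Serial ys = Serial (xs ++ ys)

-- Wide: alternate elements of the two streams (breadth first).
instance Alternative Wide where
  empty = Wide []
  Wide xs <|> Wide ys = Wide (interleave xs ys)
    where
      interleave [] bs = bs
      interleave (a : rest) bs = a : interleave bs rest

main :: IO ()
main = do
  let sources = [[1, 2], [3, 4]] :: [[Int]]
  -- The same asum; the behaviour is chosen by the type alone.
  print (runSerial (asum (map Serial sources)))  -- [1,2,3,4]
  print (runWide (asum (map Wide sources)))      -- [1,3,2,4]
```

Switching from Serial to Wide changes only the wrapper, which is the "change the type annotation to change the behavior" property the issue asks for.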

Equivalent code producing different output

I have found that two equivalent pieces of code produce different output:

  • the first flow produces [(1, 1), (2, 2), (3, 3)]
  • while the second one produces a single tuple, [(1, 1)]

Example:

import           Control.Monad    (join)
import           Streamly
import qualified Streamly.Prelude as S

main = mapM_ print $ join (S.toList . zipping $ flow)
  where
    flow = (,) <$> stream <*> stream
    -- flow = pure (,) <*> stream <*> stream
    stream = S.each [1, 2, 3]
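By the Applicative law pure f <*> x = f <$> x, the two flows should agree. One plausible cause, not confirmed by the report, is a pure that produces a single-element stream: for zip-style application, pure has to repeat its value forever, as base's ZipList does. This sketch uses lists, not streamly; Zip1 is a hypothetical, deliberately unlawful instance that reproduces the reported [(1, 1)].

```haskell
import Control.Applicative (ZipList (..))

-- Hypothetical zip-style applicative whose 'pure' yields ONE element.
-- This instance is deliberately unlawful.
newtype Zip1 a = Zip1 { getZip1 :: [a] }

instance Functor Zip1 where
  fmap f (Zip1 xs) = Zip1 (map f xs)

instance Applicative Zip1 where
  pure x = Zip1 [x]                          -- singleton: breaks the law
  Zip1 fs <*> Zip1 xs = Zip1 (zipWith ($) fs xs)

stream :: [Int]
stream = [1, 2, 3]

main :: IO ()
main = do
  -- base's ZipList has pure x = ZipList (repeat x), so both forms agree.
  print (getZipList ((,) <$> ZipList stream <*> ZipList stream))  -- [(1,1),(2,2),(3,3)]
  print (getZipList (pure (,) <*> ZipList stream <*> ZipList stream))  -- [(1,1),(2,2),(3,3)]
  -- With a singleton pure, the second form is cut down to one element.
  print (getZip1 ((,) <$> Zip1 stream <*> Zip1 stream))  -- [(1,1),(2,2),(3,3)]
  print (getZip1 (pure (,) <*> Zip1 stream <*> Zip1 stream))  -- [(1,1)]
```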

rename "each" to "fromFoldable"

Our naming convention for APIs that convert to or from other types is toType and fromType. Let's make this one conform to that convention as well. The name each suggests non-determinism rather than a type conversion.

Cancel any pending threads when we do not drain the whole stream

For example when we use:

take 5 $ each [1..10] <|> each [1..10] 

Once we are done extracting the first 5 elements from the stream, we need to make sure that no threads are left running. There are multiple ways to implement this:

  • before we call the stop continuation in take, cancel all waiting threads. This requires API-specific handling and has to be done in every API.
  • when an SVAR is garbage collected, cancel any threads waiting on it. This should take care of all cases in general. However, it may take a while for the garbage collector to get to it; until then the threads may stay busy generating work that we do not need, though that work is limited by the size of the buffer on the SVAR.

Which is the right way? We could also implement both: the second guarantees that cleanup always happens, while the first makes it prompt. Even if we miss doing the first in some API, the second will catch it, so at worst there is a promptness problem. Are there any correctness or semantics issues with either of these?
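The first option can be sketched with base concurrency primitives. takeConcurrently is a hypothetical helper, not a streamly API, and lists stand in for streams: each source runs in its own producer thread, the consumer takes the first n items from a shared channel, and every producer is then killed. The second option could plausibly be built as a backstop on a weak-pointer finalizer (e.g. Data.IORef.mkWeakIORef), which is not shown here.

```haskell
import Control.Concurrent (forkIO, killThread)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Exception (finally)
import Control.Monad (forM, replicateM)

-- | Hypothetical helper: run each source in its own producer thread, take
-- the first n items from the shared channel, then cancel every producer.
-- Assumes the sources yield at least n items in total; otherwise readChan
-- blocks.
takeConcurrently :: Int -> [[a]] -> IO [a]
takeConcurrently n sources = do
  chan <- newChan
  tids <- forM sources $ \xs -> forkIO (mapM_ (writeChan chan) xs)
  -- 'finally' cancels the producers promptly, and also when the consumer
  -- is interrupted by an exception.
  replicateM n (readChan chan) `finally` mapM_ killThread tids

main :: IO ()
main = do
  xs <- takeConcurrently 5 [[1 .. 10 :: Int], [1 .. 10]]
  -- Which elements arrive first is nondeterministic; the count is not.
  print (length xs)
```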

Put a bound on the buffer between producer and consumer

When using a parallel-style composition we use a buffer between the producer threads and the consumer thread. The code was designed around a bounded buffer: producers block when the buffer is full, so that they adapt their production rate to the consumer's consumption rate. However, we have not yet put that bound on the queue. It is just a matter of adding it; all the code needed to enable it is already present.
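The intended back-pressure can be illustrated with a small bounded buffer built from base primitives. This is a hypothetical sketch, not streamly's SVar: an unbounded Chan is paired with a QSem that counts free slots, so a producer blocks once the buffer holds capacity items and resumes as the consumer takes them.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.QSem (QSem, newQSem, signalQSem, waitQSem)
import Control.Monad (replicateM)

-- | Hypothetical bounded buffer: a Chan plus a semaphore of free slots.
data BoundedBuf a = BoundedBuf (Chan a) QSem

newBoundedBuf :: Int -> IO (BoundedBuf a)
newBoundedBuf capacity = BoundedBuf <$> newChan <*> newQSem capacity

-- | Producers block here once 'capacity' items are waiting, so production
-- slows down to the consumer's pace.
push :: BoundedBuf a -> a -> IO ()
push (BoundedBuf chan free) x = waitQSem free >> writeChan chan x

-- | Taking an item frees a slot and may wake a blocked producer.
pop :: BoundedBuf a -> IO a
pop (BoundedBuf chan free) = do
  x <- readChan chan
  signalQSem free
  return x

-- | Send a list through a buffer of the given capacity using one producer
-- thread, collecting everything on the consumer side.
roundTrip :: Int -> [a] -> IO [a]
roundTrip capacity xs = do
  buf <- newBoundedBuf capacity
  _ <- forkIO (mapM_ (push buf) xs)
  replicateM (length xs) (pop buf)

main :: IO ()
main = roundTrip 4 [1 .. 100 :: Int] >>= print . sum  -- 5050
```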

MonadBaseControl instances for stream types

The underlying monad of a parallel stream needs a MonadBaseControl instance for forking a thread. This is fine when the underlying monad is IO or any other monad with such an instance, but since none of the stream types are MonadBaseControl instances, they cannot be used as the underlying monad of a stream monad.

This issue is for exploring how to make the streams instances of MonadBaseControl or maybe MonadUnliftIO. We should revisit this only after we are done with #29.

logict example ?

The first page of Streamly's documentation says "streamly subsumes the functionality of (..) the logic programming library logict." It's not clear how. Could you provide some hints? Could you add an example to the example directory?

Add performance benchmarking regression auto-detection in CI

There is no point in measuring unless we act on the results of the benchmarks. When we are measuring scores of operations, it is not practical to run the benchmarks and manually compare the results with older ones every time. We need a CI framework that compares the benchmarks with the old results automatically.

This requires running the benchmarks twice, once for the old code and once for the new code, in quick succession. Ideally we benchmark an op on the old code and then immediately the same op on the new code, so that the computing environment (e.g. CPU load from other tests running on the CI machine) does not change significantly between the runs and the comparison is fair.
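The automatic comparison step could look like this sketch. Result, regressions and the 10% tolerance are hypothetical, and the timings in main are made-up inputs; in CI they would come from running each op on the old build and then the new build back to back.

```haskell
-- | One benchmark result: op name, old and new time (seconds).
-- Hypothetical record; real values would come from the benchmark runner.
data Result = Result
  { opName  :: String
  , oldTime :: Double
  , newTime :: Double
  }

-- | Names of ops whose new time exceeds the old time by more than the
-- given fraction (e.g. 0.10 for 10%).
regressions :: Double -> [Result] -> [String]
regressions tolerance rs =
  [ opName r | r <- rs, newTime r > oldTime r * (1 + tolerance) ]

main :: IO ()
main = do
  -- Illustrative inputs only.
  let results =
        [ Result "map"    1.00 1.02
        , Result "filter" 1.00 1.50
        , Result "scan"   2.00 1.90
        ]
  mapM_ (\name -> putStrLn ("regressed: " ++ name)) (regressions 0.10 results)
```

A CI job would fail the build when the list is non-empty; the tolerance absorbs ordinary run-to-run noise.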

Add microbenchmarks for all ops

We can either copy the benchmarking framework from the streaming-benchmarks package or just use that package to add more benchmarks covering all the operations. Sometimes we do not realize that a small, careless mistake degrades performance several-fold. If we measure everything, we can catch it immediately. Once the benchmarking is streamlined, adding a new op to the benchmarks should be a mechanical change of a few lines.
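One way to keep adding an op a one-line change is a table of named operations driven by a single harness. This base-only sketch uses plain lists and getCPUTime as a stand-in; ops, timeOp and the chosen operations are illustrative, and a real suite would use a statistics-aware harness such as criterion or gauge.

```haskell
import Control.Exception (evaluate)
import Control.Monad (forM_)
import System.CPUTime (getCPUTime)

-- | Benchmarked operations; adding an op is one more line in this table.
-- Hypothetical ops on plain lists, standing in for stream operations.
ops :: [(String, [Int] -> Int)]
ops =
  [ ("map",    sum . map (+ 1))
  , ("filter", sum . filter even)
  , ("scanl",  last . scanl (+) 0)
  ]

-- | CPU time (picoseconds) taken to evaluate one op on the input.
timeOp :: [Int] -> ([Int] -> Int) -> IO Integer
timeOp input f = do
  start <- getCPUTime
  _ <- evaluate (f input)
  end <- getCPUTime
  return (end - start)

main :: IO ()
main = do
  let input = [1 .. 1000000]
  _ <- evaluate (length input)  -- build the input outside the timings
  forM_ ops $ \(name, f) -> do
    t <- timeOp input f
    putStrLn (name ++ ": " ++ show t ++ " ps")
```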

Make sure that the MonadError instance is correct

We need to test the MonadError instance using throwError and catchError the same way we test exceptions in #47.

We need to make sure that the MonadError instance satisfies the following laws:

throwError e `catchError` f = f e
m `catchError` throwError = m
(m `catchError` f) `catchError` g = m `catchError` (\x -> f x `catchError` g)
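The laws can be written as executable checks. This sketch instantiates them at mtl's Either String instance over a few hand-picked cases; samples and handlers are illustrative. For a stream type, the same functions would compare results via toList, and a property-testing library could generate the cases instead.

```haskell
import Control.Monad.Except (catchError, throwError)

type M = Either String

-- Law 1: throwError e `catchError` f = f e
law1 :: String -> (String -> M Int) -> Bool
law1 e f = (throwError e `catchError` f) == f e

-- Law 2: m `catchError` throwError = m
law2 :: M Int -> Bool
law2 m = (m `catchError` throwError) == m

-- Law 3: nested handlers associate.
law3 :: M Int -> (String -> M Int) -> (String -> M Int) -> Bool
law3 m f g =
  ((m `catchError` f) `catchError` g)
    == (m `catchError` (\x -> f x `catchError` g))

-- A succeeding and a failing computation.
samples :: [M Int]
samples = [Right 1, Left "boom"]

-- A handler that recovers and one that rethrows.
handlers :: [String -> M Int]
handlers = [Right . length, throwError . ("rethrown: " ++)]

main :: IO ()
main = do
  print (and [law1 e f | e <- ["a", "bc"], f <- handlers])
  print (all law2 samples)
  print (and [law3 m f g | m <- samples, f <- handlers, g <- handlers])
```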

Static argument transformation (SAT) for bind and fmap implementations

For example, the StreamT Functor instance can be rewritten as follows, removing the static argument f that is otherwise passed around on every recursive call:

instance Monad m => Functor (StreamT m) where
    fmap f x = go x
        where
        go (StreamT (Stream m)) =
            StreamT $ Stream $ \_ stp yld ->
            let yield a Nothing  = yld (f a) Nothing
                yield a (Just r) = yld (f a)
                                   (Just (getStreamT (go (StreamT r))))
            in m Nothing stp yield

Earlier, when I checked, this did not show a noticeable performance improvement, but we can check again once the better and easier benchmark comparison that I am working on is ready.
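The same transformation on a plain list map, as a self-contained illustration rather than streamly code: the recursion moves into a local go, so f is no longer passed on each call, and the now non-recursive wrapper can be inlined where f is known. Whether that pays off is what the benchmarks have to show.

```haskell
-- Without SAT: 'f' is passed again on every recursive call.
mapNoSAT :: (a -> b) -> [a] -> [b]
mapNoSAT _ []       = []
mapNoSAT f (x : xs) = f x : mapNoSAT f xs

-- With SAT: 'f' is captured once; the local 'go' recurses on the list only.
-- Since 'mapSAT' itself is not recursive, GHC is free to inline it and
-- specialise 'go' for the 'f' known at the call site.
mapSAT :: (a -> b) -> [a] -> [b]
mapSAT f = go
  where
    go []       = []
    go (x : xs) = f x : go xs
{-# INLINE mapSAT #-}

main :: IO ()
main = print (mapSAT (* 2) [1, 2, 3 :: Int] == mapNoSAT (* 2) [1, 2, 3])  -- True
```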

scanx vs. StateT

Thanks for this beautiful, easy-to-learn library! I am starting to learn how to compose streaming programs in Haskell, and it is great that you have also compared data-based and process-based streaming here!

When I need to accumulate information from the beginning of the stream, there are, as far as I know, two ways of doing it:

  • use StateT to form a monad stack (an example: AcidRain)
  • use scanx directly provided by streamly (an example: my DNS monitor project)

Are there any pros and cons between the two in terms of performance? I read the code of scanx and found another stream inside it. I am wondering why it is not implemented with StateT. I also get lost when trying to reason about the performance of the monad stack. My first question is whether the stack order, StateT over SerialT or SerialT over StateT, has any performance (CPU or memory) impact.
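For comparison, here are both approaches on plain lists (an illustrative sketch, not streamly's scanx): the scan threads the accumulator through the stream itself, while the State version keeps it in the monad, where other monadic steps could also read or change it. It says nothing about relative performance, which needs measurement.

```haskell
import Control.Monad.Trans.State.Strict (evalState, get, put)

-- Approach 1: a pure scan carries the running total along the stream.
runningSumScan :: [Int] -> [Int]
runningSumScan = scanl1 (+)

-- Approach 2: the running total lives in a State monad.
runningSumState :: [Int] -> [Int]
runningSumState xs = evalState (mapM step xs) 0
  where
    -- Read the accumulator, add the element, store and emit the new total.
    step x = do
      acc <- get
      let acc' = acc + x
      put acc'
      return acc'

main :: IO ()
main = do
  print (runningSumScan [1, 2, 3])   -- [1,3,6]
  print (runningSumState [1, 2, 3])  -- [1,3,6]
```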

Change the signature of foldrM

The current signature is different from the standard foldrM in Foldable. We can implement it like this:

{-# INLINE foldrM #-}
foldrM :: (Streaming t, Monad m) => (a -> b -> m b) -> b -> t m a -> m b
foldrM consume final m = go (toStream m)
    where
    go m1 =
        let stop = return final
            yield a Nothing  = consume a final
            yield a (Just x) = go x >>= consume a
         in (runStream m1) Nothing stop yield

The toList implementation will then become:

toList = foldrM (\a xs -> return $ a : xs) []

It also improves the performance of toList by 10%.
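To try the proposed signature outside the library, here is a self-contained sketch with a hypothetical minimal continuation-passing stream. It drops the extra Maybe argument that the real runStream takes in the code above, so it models streamly's internal type rather than reproducing it; foldrM and toList follow the definitions above.

```haskell
{-# LANGUAGE RankNTypes #-}

-- | Hypothetical minimal CPS stream: given a stop continuation and a yield
-- continuation that receives an element and, maybe, the rest of the stream.
newtype Stream m a = Stream
  { runStream :: forall r. m r -> (a -> Maybe (Stream m a) -> m r) -> m r }

fromList :: [a] -> Stream m a
fromList []       = Stream (\stp _ -> stp)
fromList [x]      = Stream (\_ yld -> yld x Nothing)
fromList (x : xs) = Stream (\_ yld -> yld x (Just (fromList xs)))

-- | foldrM with the same argument order as Data.Foldable.foldrM.
foldrM :: Monad m => (a -> b -> m b) -> b -> Stream m a -> m b
foldrM consume final = go
  where
    go m1 =
      let stop             = return final
          yield a Nothing  = consume a final
          yield a (Just x) = go x >>= consume a
       in runStream m1 stop yield

toList :: Monad m => Stream m a -> m [a]
toList = foldrM (\a xs -> return (a : xs)) []

main :: IO ()
main = toList (fromList [1, 2, 3 :: Int]) >>= print  -- [1,2,3]
```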

Motivation?

Looks very neat! You mention in the README that this library is inspired by transient. What caused you to write this library instead of using transient, for example? What other motivations led to this work? (I'd recommend adding the answers to these questions directly to the README.)

rename StreamT to SerialT?

Currently all the stream types reflect the nature of the stream except StreamT, which has a generic name even though its combinator is called serially. Calling it SerialT instead would be consistent with the rest of the naming and with the combinator, and would reflect the nature of the stream. It would not have "stream" in its name, but neither do the other types.

rename toList to toListM to enable IsList instances

The toList function conflicts with the function of the same name from the IsList typeclass. We can make the types t Identity a instances of IsList; to do that cleanly, we should rename the existing toList function to toListM.

This will be a breaking change.
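A sketch of what the rename enables, using a hypothetical pull-based stream type rather than streamly's: with the monadic conversion named toListM, the IsList methods fromList and toList from GHC.Exts can be provided for the pure Stream Identity a case.

```haskell
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE TypeFamilies #-}

import Data.Functor.Identity (Identity (..))
import GHC.Exts (IsList (..))

-- | Hypothetical stream: each step yields Nothing at the end, or an
-- element together with the rest of the stream.
newtype Stream m a = Stream { next :: m (Maybe (a, Stream m a)) }

-- | Monadic conversion, renamed to toListM as the issue proposes.
toListM :: Monad m => Stream m a -> m [a]
toListM s = do
  r <- next s
  case r of
    Nothing        -> return []
    Just (x, rest) -> (x :) <$> toListM rest

-- | With the name free, pure streams can be IsList instances.
instance IsList (Stream Identity a) where
  type Item (Stream Identity a) = a
  fromList = foldr (\x rest -> Stream (Identity (Just (x, rest))))
                   (Stream (Identity Nothing))
  toList = runIdentity . toListM

main :: IO ()
main = print (toList (fromList [1, 2, 3] :: Stream Identity Int))  -- [1,2,3]
```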

Add a consM API

We have a cons API to add a pure value at the head of the stream. However, there is no such API for adding a monadic action at the head of the stream, though we can do that like this:

liftIO m <> stream

Since we know that such an action produces a singleton stream, consM can join the streams more efficiently than <>, just as cons is more efficient than building a stream like this: (return 1 <> return 2 <> ...).

consM would look like:

consM :: m a -> t m a -> t m a
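A sketch of the difference on a hypothetical pull-based stream (not streamly's representation): consM builds the new head cell directly, while the general route creates a singleton stream and appends it, which adds a step to detect the singleton's end before switching to the rest.

```haskell
-- | Hypothetical pull-based stream: Nothing at the end, or an element
-- with the rest of the stream.
newtype Stream m a = Stream { next :: m (Maybe (a, Stream m a)) }

nil :: Monad m => Stream m a
nil = Stream (return Nothing)

cons :: Monad m => a -> Stream m a -> Stream m a
cons x rest = Stream (return (Just (x, rest)))

fromList :: Monad m => [a] -> Stream m a
fromList = foldr cons nil

toListM :: Monad m => Stream m a -> m [a]
toListM s = do
  r <- next s
  case r of
    Nothing        -> return []
    Just (x, rest) -> (x :) <$> toListM rest

-- | consM: run the action and put its result in front of the stream.
consM :: Monad m => m a -> Stream m a -> Stream m a
consM action rest = Stream $ do
  x <- action
  return (Just (x, rest))

-- | The general route: a one-element stream ...
singletonM :: Monad m => m a -> Stream m a
singletonM action = Stream (fmap (\x -> Just (x, nil)) action)

-- | ... joined by append, which steps through the first stream and must
-- detect its end before switching to the second.
append :: Monad m => Stream m a -> Stream m a -> Stream m a
append (Stream s1) s2 = Stream $ do
  r <- s1
  case r of
    Nothing        -> next s2
    Just (x, rest) -> return (Just (x, append rest s2))

main :: IO ()
main = do
  let rest = fromList [1, 2, 3 :: Int]
  xs <- toListM (consM (putStrLn "running action" >> return 0) rest)
  ys <- toListM (singletonM (return 0) `append` rest)
  print (xs, ys)  -- ([0,1,2,3],[0,1,2,3])
```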

Add a WAhead stream type

#53 added an AheadT stream type that performs speculative lookahead execution of the stream. The semigroup composition of this stream via <> and the ahead combinator combines streams in a depth-first manner, just like SerialT. We also need a breadth-first, or wide, variant of this stream called WAheadT, which combines streams in a breadth-first manner just like WSerialT does.

last is broken

The implementation of last is broken if the stream ends abruptly using the stop continuation. In that case even though there may have been a last element we would not return it. Consider this:

    go m1 =
        let stop            = return Nothing
            yield a Nothing = return (Just a)
            yield _ (Just x) = go x
        in (runStream m1) Nothing stop yield

As soon as stop is called we return Nothing, which is incorrect. We should remember the last element and thread it through go, so that when stop is called we can return it.
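A sketch of the fix, using a hypothetical minimal continuation-passing stream that omits the extra Maybe argument runStream takes above: go carries the most recently yielded element, so a stop that arrives after some yields returns that element instead of Nothing.

```haskell
{-# LANGUAGE RankNTypes #-}

-- | Hypothetical minimal CPS stream: given a stop continuation and a yield
-- continuation that receives an element and, maybe, the rest.
newtype Stream m a = Stream
  { runStream :: forall r. m r -> (a -> Maybe (Stream m a) -> m r) -> m r }

-- | Every element is yielded with 'Just rest', so the stream always ends
-- with a stop: the case the original 'last' gets wrong.
fromListStop :: [a] -> Stream m a
fromListStop []       = Stream (\stp _ -> stp)
fromListStop (x : xs) = Stream (\_ yld -> yld x (Just (fromListStop xs)))

-- | Fixed 'last': 'go' threads the most recent element, so a stop after
-- some yields returns it.
lastS :: Monad m => Stream m a -> m (Maybe a)
lastS = go Nothing
  where
    go prev m1 =
      let stop             = return prev
          yield a Nothing  = return (Just a)
          yield a (Just x) = go (Just a) x
       in runStream m1 stop yield

main :: IO ()
main = do
  lastS (fromListStop [1, 2, 3 :: Int]) >>= print  -- Just 3
  lastS (fromListStop ([] :: [Int])) >>= print     -- Nothing
```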

RFC: Nomenclature for fold functions

Currently we have foldl and foldlM functions whose signatures are more general than the standard ones and in line with the foldl library. This means they differ slightly from the standard fold functions. To keep full compatibility with the nomenclature of standard lists and Foldable, we should perhaps use different names for the functions modelled after the foldl library.

The names foldl, foldl' and foldl1 are all taken. The name fold is used by the Foldable typeclass; if we are not going to have a Foldable instance, we can use fold and foldM for these general functions. Note that we also have scan, which is a general version of scanl.
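For reference, the shape of a general fold in the style of the foldl library (step function, initial accumulator, final extraction), sketched on plain lists. generalFold is a placeholder name, since choosing the name is the point of this RFC; standardFoldl shows that the usual left fold is the special case where extraction is id.

```haskell
import Data.List (foldl')

-- | General fold: step, initial accumulator, and a final extraction.
-- 'generalFold' is a placeholder name for this sketch only.
generalFold :: (x -> a -> x) -> x -> (x -> b) -> [a] -> b
generalFold step begin done = done . foldl' step begin

-- | The standard left fold is the special case where extraction is 'id'.
standardFoldl :: (b -> a -> b) -> b -> [a] -> b
standardFoldl step begin = generalFold step begin id

-- | The extraction step lets the accumulator be richer than the result,
-- e.g. a (sum, count) pair reduced to a mean.
mean :: [Double] -> Double
mean = generalFold (\(s, n) x -> (s + x, n + 1)) (0, 0 :: Int)
                   (\(s, n) -> s / fromIntegral n)

main :: IO ()
main = do
  print (mean [1, 2, 3, 4])                 -- 2.5
  print (standardFoldl (-) 10 [1, 2 :: Int]) -- 7
```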
