fs2's Introduction

FS2: Functional Streams for Scala

Overview

FS2 is a library for purely functional, effectful, and polymorphic stream processing in the Scala programming language. Its design goals are compositionality, expressiveness, resource safety, and speed. The name is a modified acronym for Functional Streams for Scala (FSS, or FS2).

FS2 is available for Scala 2.12, Scala 2.13, and Scala 3, as well as for Scala.js and Scala Native. FS2 is built upon two major functional libraries for Scala, Cats and Cats-Effect. Despite those dependencies, FS2 core types (streams and pulls) are polymorphic in the effect type (as long as it is compatible with the cats-effect typeclasses), and thus FS2 can be used with other effect libraries, such as Monix.

Getting Started

Documentation and getting help

  • Scaladoc API documentation is available for the core library, which defines and implements the core types for streams and pulls, as well as the type aliases for pipes and sinks. The io library provides FS2 bindings for NIO-based file I/O and TCP/UDP networking.
  • The official guide is a good starting point for learning more about the library.
  • The documentation page is intended to serve as a list of all references, including conference presentation recordings, academic papers, and blog posts, on the use and implementation of fs2.
  • The FAQ has frequently asked questions. Feel free to open issues or PRs with additions to the FAQ!
  • Also feel free to come discuss and ask/answer questions in the Typelevel Discord channel and/or on StackOverflow using the tag FS2. Discord will generally get you a quicker answer.

Projects using FS2

You can find a list of libraries and integrations with data stores built on top of FS2 here: https://fs2.io/#/ecosystem.

If you have a project you'd like to include in this list, please open a PR or let us know in the Discord channel and we'll add a link to it.

Acknowledgments

YourKit

Special thanks to YourKit for supporting this project's ongoing performance tuning efforts with licenses to their excellent product.

Code of Conduct

See the Code of Conduct.

fs2's People

Contributors

adamchlupacek, alissapajer, amarrella, armanbilge, balmungsan, bplommer, christopherdavenport, danicheg, diesalbla, djspiewak, domaspoliakas, fiadliel, fthomas, gvolpe, kubukoz, larsrh, mpilquist, nikiforo, pchiusano, pchlupacek, rossabaker, runarorama, scala-steward, systemfw, timwspence, tom91136, typelevel-steward[bot], vasilmkd, yilinwei, zaneli

fs2's Issues

Resource is not properly closed

Hi!

The resource function doesn't handle resource management properly. Here is my example:

package bug

import java.io.{File, FileInputStream}
import scalaz.stream.{Process, Bytes, processes}
import scalaz.concurrent.Task
import Process.Sink

object Bug {
  type Chunk = Array[Byte] => Task[Bytes]

  def main(args: Array[String]) {
    val source = processes.unsafeChunkR(new FileInputStream(new File(".", "foo.txt")))

    source.to(sink).run.run
  }

  val sink: Sink[Task, Chunk] = {
    val buffer = new Array[Byte](8192)

    def go(f: Chunk) =
      for {
        bytes <- f(buffer)
      } yield println(buffer.dropRight(buffer.size - bytes.size))

    Process.await(Task.now[Chunk => Task[Unit]](go))(Process.emit).repeat
  }
}

Here is what I got once that program finished (in SBT):

[yoeight@gikoo scalaz-stream-bug]$ lsof foo.txt
COMMAND   PID    USER   FD   TYPE DEVICE SIZE/OFF     NODE NAME
java    26928 yoeight   78r   REG   8,18       15 17700895 foo.txt

I have been browsing the codebase without finding any clue so far.

I'm up to date with master.

Regards,
Yorick

wye.either will fail on Await.Both

wye.either will fail to match on Await.Both:

scala.MatchError: Await(Both,,Halt(scalaz.stream.Process$End$),Halt(scalaz.stream.Process$End$)) (of class scalaz.stream.Process$Await)

I think it is because there is only contramapL and R used?

Fix SOE and heap overflow in wye

Wye uses Nondeterminism.chooseAny, which unfortunately leaks on both the stack and the heap. We have to reimplement wye, and probably specialise it to Task, to work around this.

Only wyes using the AwaitBoth pattern (like merge, either, interrupt) are affected.

pipe and tee discard the failure cause, converting all failures to normal termination

pipe and tee discard the failure cause, converting all failures to normal termination. For example:

scala> import scalaz.stream._; import scalaz.concurrent.Task
scala> val p = Process.fail(new Exception("oh no!")): Process[Task,Int]
p: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Halt(java.lang.Exception: oh no!)

scala> p.pipe(process1.id)
res4: scalaz.stream.Process[scalaz.concurrent.Task,Int] = Halt(scalaz.stream.Process$End$)

This means that possiblyFailingProc.pipe(anything) will convert errors to ordinary termination! So possiblyFailingProc.pipe(anything).attempt() gives no errors, rather defeating the purpose of attempt.

Note: I believe this should be fixed by the new trampolined representation of await, with just the single receive function, but wanted to track this as an issue. @pchlupacek could you take a look and make sure your branch addresses this case? We should get that branch merged as the next big item.

Process with map and filter does too much work

Solved in the "lazy" branch:

https://github.com/scalaz/scalaz-stream/tree/lazy

The problem is demonstrated by the following:

val src = io.linesR("/tmp/limerick.txt")

val fiveLines: Process[Task, Vector[String]] = src.chunk(5)

val limericks: Process[Task, Unit] = fiveLines.filter(_.nonEmpty).map(println)

val check = limericks.runLog.run

The result is that println receives and prints all the intermediate vectors. The reason is that map and flatMap will strictly evaluate the fallback and cleanup processes. In the case that they are emits, the subsequent map over those emits will be called repeatedly.

I.e. if the input file contains the following lines

There was a young man of Japan
Whose limericks never would scan.
When asked why this was,
He replied "It's because
I always try to fit as many syllables into the last line as ever I possibly can."

The following will be printed:

Vector(There was a young man of Japan)
Vector(There was a young man of Japan, Whose limericks never would scan.)
Vector(There was a young man of Japan, Whose limericks never would scan., When asked why this was,)
Vector(There was a young man of Japan, Whose limericks never would scan., When asked why this was,, He replied "It's because, I always try to fit as many syllables into the last line as ever I possibly can.") 

If the process after the filter is doing a lot of work, all of it will be done n times where n is the argument to chunk.

Consider renaming collect

The current implementation uses collect to gather the A values from the process into an IndexedSeq[A].

In the Scala collections framework, collect means filter + map. I propose renaming the current collect to gatherAll and implementing collect on processes as a combination of filter and map (or maybe via flatMap). If we agree on that, I would like to submit a PR.
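For reference, a minimal sketch of the standard-collections meaning of collect referred to above. It uses only the Scala standard library; the object and value names are illustrative and not part of scalaz-stream.

```scala
object CollectSemantics {
  val input: List[Int] = List(1, 2, 3, 4, 5, 6)

  // collect takes a partial function: elements where it is defined are kept
  // and transformed in the same pass.
  val viaCollect: List[Int] = input.collect { case i if i % 2 == 0 => i * 10 }

  // The same result written as an explicit filter followed by a map.
  val viaFilterMap: List[Int] = input.filter(_ % 2 == 0).map(_ * 10)
}
```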

P.

consider renaming when

when clashes with the specification defaults and with the Mockito framework.

Maybe we should use a different name for it; it's too generic...

:-)

Infinite recursive call when Exception thrown inside Process

A process gets in an infinite loop when an exception is thrown as a result of the evaluation of one of its inputs. An example of this behaviour can be observed in the following snippet:

class PersonalSpec extends Specification {


  object PersonalProcess {

    def sum : Process1[Double, Double] = {
      def go(acc: Double) : Process1[Double, Double] = {
        await1[Double].flatMap(d => emit(d+acc) ++ go(d+acc))
      }
      go(0.0)
    }
  }

  "My personal process" should {

    "sum the right amount for a sequence of tasks that contain exceptions" in {

      val myChannel: Process[Task, Double] = emitAll(List(Task(1.0), Task(2.0), Task.fail(new RuntimeException("My error")), Task(3.0))).eval
      val r1 = io.collectTask(myChannel.pipe(PersonalProcess.sum))
      r1 must equalTo(Seq(1.0, 3.0, 6.0))

    }
  }
}

One would expect the flow to halt when an exception is thrown, ideally giving the possibility to at least log the error. A nice-to-have would be the possibility to handle the exception and keep consuming from the flow.

The infinite recursion seems to be caused by appending failTask to err when handling the exception in the collectTask method:

case Await(req, recv, fb, err) =>
  val next =
    try recv(req.run)
    catch {
      case End => fb // Normal termination
      case e: Exception => err ++ failTask(e) // Helper function, defined below
    }
  go(next, acc)

Odd behavior of chunkBy

As I was writing more tests for process1 processes I noticed an odd behavior of chunkBy:

Process(1, 3, 8, 5, 2, 4, 7).chunkBy(_ < 4).toList
res2: List[Vector[Int]] = List(Vector(1, 3, 8), Vector(5, 2, 4), Vector(7))

The emitted chunks contain elements that satisfy and that do not satisfy the predicate. Is this behavior intended? Because, I would have expected the following result:

List(Vector(1, 3), Vector(8, 5), Vector(2), Vector(4, 7))

Since there is no test for chunkBy I do not know what the result should have been. So either the documentation should be extended or the implementation should be fixed.

Btw, here is a version of chunkBy that produces the result I had expected:

def chunkByFixed[I](f: I => Boolean): Process1[I, Vector[I]] = {
  def go(acc: Vector[I], last: Boolean): Process1[I, Vector[I]] =
    await1[I].flatMap { i =>
      val cur = f(i)
      if (cur == last) go(acc :+ i, cur)
      else emit(acc) fby go(Vector(i), cur)
    } orElse emit(acc)
  await1[I].flatMap(i => go(Vector(i), f(i)))
}
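To make the expected grouping concrete without depending on Process1, here is a small model on plain lists. It is only a sketch of the semantics proposed above (one chunk per run of consecutive elements that agree on the predicate); the names ChunkByModel and chunkByRuns are hypothetical.

```scala
object ChunkByModel {
  // Groups consecutive elements for which the predicate gives the same answer.
  // Runs are accumulated in reverse order and flipped once at the end.
  def chunkByRuns[A](xs: List[A])(f: A => Boolean): List[Vector[A]] =
    xs.foldLeft(List.empty[(Boolean, Vector[A])]) {
      // Same predicate value as the current run: extend that run.
      case ((last, run) :: rest, a) if f(a) == last => (last, run :+ a) :: rest
      // First element, or the predicate value flipped: start a new run.
      case (runs, a) => (f(a), Vector(a)) :: runs
    }.reverse.map(_._2)
}
```

Applied to the input from the example, this model yields the chunks listed as the expected result.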

Introduce dynamic process variant of gatherUnordered (multi-merge)

When we have multiple streams that eventually need to be combined in a non-deterministic way, we can use p1 merge p2. While this is fine for a few processes, it does not scale in an environment where there may be a lot of short-lived processes that need to be joined non-deterministically and to leave the merge once terminated.

One of the solutions today is to have a queue and a driving process that runs the processes (possibly in new threads) and feeds their results to a common queue, which is then drained by some other process.

Although this is probably a scalable solution, it relies on a lot of side effects and potentially brings a lot of issues with resource safety, ordered termination, etc.

The idea is to either wrap the queue in some sort of combinator that does the job behind the scenes, or introduce a variant of merge that, via recursion, allows more processes to be accumulated together.
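The queue-based approach described above can be sketched with the standard library alone. This is an illustration of the side-effecting pattern, not scalaz-stream code: QueueMerge and mergeAll are hypothetical names, plain Iterators stand in for processes, and a None marker signals that one source has finished.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.{ExecutionContext, Future}

object QueueMerge {
  // Runs every source on the given execution context, feeds all results into
  // one shared queue, and drains the queue until every source has signalled
  // completion. The output order across sources is non-deterministic.
  def mergeAll[A](sources: List[Iterator[A]])(implicit ec: ExecutionContext): Vector[A] = {
    val queue = new LinkedBlockingQueue[Option[A]]()
    sources.foreach { src =>
      Future {
        try src.foreach(a => queue.put(Some(a)))
        finally queue.put(None) // end marker, sent even if the source fails
      }
    }
    var remaining = sources.size
    val out = Vector.newBuilder[A]
    while (remaining > 0) {
      queue.take() match {
        case Some(a) => out += a
        case None    => remaining -= 1
      }
    }
    out.result()
  }
}
```

The sketch shows why the approach is awkward: termination and failure handling live in ad hoc conventions (the None marker, the finally block) rather than in the stream type itself.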

Potentially enhance cleanup to pass reason for termination

A resource cleanup procedure may need to behave differently depending on whether the process terminated successfully or because of an exception.
A new combinator, such as EnhancedResource or similar, may be required?
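One possible shape for such a combinator, sketched on plain functions rather than on Process: the release action receives None after normal completion and Some(error) after a failure. The names CleanupWithReason and useResource are hypothetical, and nothing here is an existing scalaz-stream API.

```scala
import scala.util.control.NonFatal

object CleanupWithReason {
  // Acquires a resource, uses it, and always releases it, telling the release
  // action why the use phase ended: None for success, Some(e) for a failure.
  def useResource[R, A](acquire: => R)(use: R => A)(release: (R, Option[Throwable]) => Unit): A = {
    val r = acquire
    val result =
      try use(r)
      catch {
        case NonFatal(e) =>
          release(r, Some(e)) // cleanup informed about the failure cause
          throw e
      }
    release(r, None) // cleanup after normal termination
    result
  }
}
```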

Description of process1.sum does not match its implementation

This is process1.sum:

/**
 * Emit a running sum of the values seen so far. The first value emitted will be the
 * first number seen (not `0`). The length of the output `Process` always matches the
 * length of the input `Process`.
 */
def sum[N](implicit N: Numeric[N]): Process1[N,N] =
  reduce(N.plus)

The description says that the output will match the length of the input, but actually sum reduces its input to a single value.

Here is an example:

(Process.range(1,4) |> sum).runLog.run
res13: scala.collection.immutable.IndexedSeq[Int] = Vector(6)

According to the description I would have expected this result:

(Process.range(1,4) |> sum).runLog.run
res13: scala.collection.immutable.IndexedSeq[Int] = Vector(1,3,6)
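The difference between the two behaviours can be seen on an ordinary List, assuming only the Scala standard library (RunningSum is an illustrative name): reduce collapses the input to one value, while a scan keeps one output per input.

```scala
object RunningSum {
  val input: List[Int] = List(1, 2, 3)

  // What the implementation does: fold everything into a single total.
  val reduced: Int = input.reduce(_ + _)

  // What the description promises: a running sum, one value per input.
  // scanLeft also emits the initial 0, so it is dropped with tail.
  val running: List[Int] = input.scanLeft(0)(_ + _).tail
}
```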

Process().toSource and emitAll().eval behave differently on repeat

I believe Process().toSource and emitAll().eval should behave the same way when used with the repeat combinator:

((Process(1).toSource.repeat) |> take(5)).collect.run  //will return Seq(1)

while

((emitAll(Seq(1)).eval.repeat) |> take(5)).collect.run  //will return Seq(1,1,1,1,1) as expected

Process signature changes proposal

Hi,
in several issues we have been discussing the non-optimal situation with resource cleanup and strict behaviour, which makes life much harder. I put this proposal together and would like to get feedback before implementing it.

@pchiusano @runarorama @fthomas @radekm I would appreciate it if you could share your opinion on it.

The main idea is to change the signatures of Await and Emit as follows:

Await[R,O](
  req: F[R]
  , rcv: (Throwable \/ R) => Trampoline[Process[F,O]]
  , cleanup: Option[Throwable] => Trampoline[Process[F,Nothing]]
)

Emit[O](
  head: Seq[O]
  , tail: Trampoline[Process[F,O]]
  , cleanup: Option[Throwable] => Trampoline[Process[F,Nothing]]
)

The reasons:

  • Trampoline - It will most likely remove most of the SOE dangers. A particularly nasty danger of using just a pure function is that if we nest awaits too deep and alter recv/cleanup at every step, the evaluation of rcv/cleanup may throw an SOE.
  • Emit - Currently we have specific handling in flatMap and in various other places for the case where the pure f throws an exception. This would eliminate the problem and allow us to just build the next trampoline step in case of failure. IMHO this is more cosmetic, and I don't think it is that necessary.
  • Emit-cleanup - At this moment, in various places (like in flatMap), we have issues with cleanup code not running (try (emit("S") onComplete that).flatMap(_ => Halt(new Exception))). IMHO this is unfortunately not solvable unless we know at the Emit step which part of the code is cleanup and which part is tail.
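As a generic illustration of why a trampoline removes the SOE danger, here is a sketch using the standard library's scala.util.control.TailCalls instead of scalaz's Trampoline (a substitution made so that the example is self-contained). The mutually recursive functions stand in for deeply nested rcv/cleanup steps; TrampolineSketch and its methods are illustrative names.

```scala
import scala.util.control.TailCalls.{TailRec, done, tailcall}

object TrampolineSketch {
  // Each step returns a description of the next step instead of calling it,
  // so the depth of the JVM stack stays constant however many steps there are.
  def isEven(n: Int): TailRec[Boolean] =
    if (n == 0) done(true) else tailcall(isOdd(n - 1))

  def isOdd(n: Int): TailRec[Boolean] =
    if (n == 0) done(false) else tailcall(isEven(n - 1))

  // The loop inside result interprets the steps one at a time.
  def run(n: Int): Boolean = isEven(n).result
}
```

Written as direct mutual recursion, the same functions would typically overflow the stack for inputs of this size.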

Overall there will be convenience apply methods on the object to hide this kind of complexity from the user, e.g. await may look like:

def await(req: F[R])(
  recv: R => Process[F,O]
  , fb: => Process[F,O] = halt
  , c: Option[Throwable] => Process[F,Nothing] = _ => halt
  , fbc: Option[Throwable => Process[F,O]] = None
) = new Await(req, {
  case -\/(End) => Trampoline.suspend(fb)
  case -\/(rsn) => Trampoline.suspend(fbc(rsn))
  case \/-(r) => Trampoline.suspend(recv(r))
}
, c)

Thanks for thoughts, improvements etc...

Consider to make Await.fallback and Await.cleanup lazy

The current implementation of the fallback and cleanup combinators is lazy by default, so they are not evaluated unless used.

However, if orElse is used in combination with other combinators such as flatMap, the fallback and cleanup are evaluated.

Even when there are no side effects and the cleanup and fallback passed to orElse are pure, this may still involve a costly operation (specifically for fallback).

def sum1 : Process1[Int,Int] = {
 def go(t:Int):  Process1[Int,Int]  = {
   def total = {
      println("total " + t)
      emit(t)
   }

   await1[Int].flatMap {
     case  i if i % 2 == 0 => emit(i) ++ go(t+i)
     case i => go (t+i)
   } orElse total
  }
  go(0)
}

(Process(0 until 10: _*) |> sum1).toList

will output

total 0
total 0
total 1
total 3
total 6
total 10
total 15
total 21
total 28
total 36
total 45
res8: List[Int] = List(0, 2, 4, 6, 8, 45)

`kill` may result in resource leakage

Using kill may result in resource leakage. Suppose we have the following definitions:

import scalaz.concurrent.Task
import scalaz.stream._
import Process._

def acquire          = Task.delay(println("acquire"))
def release(r: Unit) = Task.delay(println("release"))
def step(r: Unit)    = Task.delay(println("step"))

The simplest example of leakage is

await(acquire)(
  r => eval(step(r)) onComplete eval_(release(r)),
  halt,
  halt
).once.runLog.run

The reason why the above code leaks is that it's not kill-safe (pipe calls kill after reading the first unit produced by step; there is no leakage without once).

The code can be fixed by replacing eval_(release(r)) with await(release(r))(_ => halt, halt, eval_(release(r))). IMO the problem is that the original code seems correct and the fix is counterintuitive without knowing how once, pipe and kill work.

Here is another example, with tee, which leaks. The nature of the example is the same:

emit(1).zip(io.resource(acquire)(release)(step)).once.runLog.run

Note that after swapping sides it doesn't leak:

io.resource(acquire)(release)(step).zip(emit(1)).once.runLog.run

wye.interrupt SOE

When the right side of wye.interrupt terminates and the left side is a repeated continuous process, an SOE is thrown:

val p1 = Process(1,2,3,4,6).toSource

val i1 = repeatEval(Task.now(false))  

i1.wye(p1)(wye.interrupt).runLog.run

How to construct resource-safe processes?

It's currently very hard to construct a process which is resource-safe. The reason is that the process may be interrupted by pipe, tee and wye. Both pipe and tee may interrupt the process after Emit. wye may interrupt the process at any time (even during an evaluation of a task from Await - if the task is asynchronous).

  1. In the case of an interruption a cleanup is called. The problem is that different functions call different cleanups - it's not consistent. For example wye calls the cleanup from the last evaluated Await, pipe and tee use kill which calls the cleanup from the next Await.

  2. Now suppose that the process is interrupted when it's cleaning resources - the cleanup part of the process is running. Interpreter of the process doesn't know about it and so it finds some Await and runs the cleanup from that Await. This may result in resource leakage if the cleanup part wasn't ready for this (for example if the cleanup part was eval_(cleanupTask)) - see https://github.com/scalaz/scalaz-stream/issues/72 for more details.

Problem 2 can be overcome by constructing the process differently, but this is currently difficult; please see https://github.com/scalaz/scalaz-stream/issues/72 again. To make it easier we can introduce a new method, for example whole, which alters every Await without a cleanup so that it sets itself as the cleanup (but not recursively).

Problem 1 can be fixed by precisely defining which cleanup is called when, and by constructing processes with these rules in mind.

The rules for cleanup when interrupting can be:

  • When the process is Emit - find the next Await and run its cleanup.
  • When the process is Await and we haven't started its task - run its cleanup.
  • When the process is Await and we have interrupted evaluation of its task - run its cleanup (tasks in cleanups shouldn't be asynchronous, so they won't be interrupted).
  • When the process is Await and we have unsuccessfully evaluated its task - run its cleanup (even if the exception was End).
  • When the process is Await and we have successfully evaluated its task - find the next Await and run its cleanup.
  • recv from Await should always produce process - but if it doesn't, run the cleanup from that Await.
  • When f in flatMap fails to produce a process (the process which is being flat-mapped is in Emit) - find the next Await and run its cleanup.

io.chunkR/W uses unnecessary BufferedInputStream/BufferedOutputStream

Trying to get my head around Process, I came across the fileChunkR/W methods in io. From what I can see (which is admittedly not a lot at all), there is no need to use the buffered streams: the mark/reset functionality they provide isn't needed, and the implementation effectively buffers already (by writing into an Array[Byte]).
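To illustrate the point, chunked reading can be done directly on an unbuffered InputStream, because each read call already fills a caller-supplied array. This is a standard-library sketch, not the io implementation; ChunkedRead and readChunks are hypothetical names.

```scala
import java.io.InputStream

object ChunkedRead {
  // Reads the stream in chunks of at most `size` bytes using one reusable
  // array. No BufferedInputStream is involved: the array is the buffer.
  def readChunks(in: InputStream, size: Int): Vector[Array[Byte]] = {
    val buf = new Array[Byte](size)
    val out = Vector.newBuilder[Array[Byte]]
    var n = in.read(buf)
    while (n != -1) {
      // Copy only the bytes actually read, since buf is reused next round.
      if (n > 0) out += java.util.Arrays.copyOf(buf, n)
      n = in.read(buf)
    }
    out.result()
  }
}
```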

Seq is a poor choice for the block value type.

Using Seq as the block value type inhibits users from being able to reason about the performance characteristics of operations on blocks of data. I think it would be preferable to either choose a single, specific block type (say Vector) or else to use implicit typeclass constraints and/or explicit dictionaries to provide the operations that are available for block values. Seq is a very muddled type, and doesn't seem to me to really have a place in so principled a library as scalaz-stream.

Functions from sink can outlive resource

It's nice that we don't need anything special for sinks - they're just processes producing functions. But the disadvantage is that these functions can outlive the resource which they're writing to:

val data = Array[Byte](65, 66, 67)
val sink = scalaz.stream.io.fileChunkW("foo.txt", 2)

// Example 1
val f = sink.once.runLast.run.get
f(data).run

// Example 2
sink.once.last.map(_(data)).eval.run.run

Both examples end with java.io.IOException. The important thing in the second example is last which ensures that output stream is closed before we write into it.
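The same hazard can be reproduced without scalaz-stream: any function that closes over a stream can be returned from the scope that owns the stream. The sketch below uses only java.io and a temporary file; EscapedWriter and its methods are illustrative names.

```scala
import java.io.{FileOutputStream, IOException}
import java.nio.file.Files

object EscapedWriter {
  // Opens a file, builds a write function over it, closes the file, and then
  // returns the function. The function has outlived its resource.
  def leakWriter(): Array[Byte] => Unit = {
    val path = Files.createTempFile("sink-demo", ".bin")
    path.toFile.deleteOnExit()
    val out = new FileOutputStream(path.toFile)
    val write: Array[Byte] => Unit = bytes => out.write(bytes)
    out.close()
    write
  }

  // Calling the escaped function fails because the stream is already closed.
  def writeAfterCloseFails(): Boolean =
    try { leakWriter()(Array[Byte](65, 66, 67)); false }
    catch { case _: IOException => true }
}
```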

is Process a good name?

The default java.lang package has a Process class. This means that one has to do the following:

import java.lang.{Process=>_}
import scalaz.stream._

At this early stage finding another name is still easy...
I don't have a good intuition on what it could be yet.

bugs in Bytes

There are a few bugs in the Bytes objects including the ability to pattern match to the underlying arrays and the exception throwing bug below.

val aa = Bytes.of(Array(1,2,3).map(_.toByte))
val arr = Array.ofDim[Byte](3)
aa.copyToArray(arr)
scala.MatchError: Bytes1: pos=0, length=3, src: (1,2,3) (of class scalaz.stream.Bytes1)

I'll attach a pull request that addresses these issues and improves a few implementations as well.

Read lines from file

The comments for resource in scalaz.stream.io currently mention "See lines below for an example use.", but there is no "lines" function. There is, however, one in the book answer code.

I tried adapting that one, results below:

  def lines(filename: String): Process[Task, String] =
    resource(Task(new BufferedReader(new FileReader(filename))))(r => Task(r.close))(r => 
        Task( if (r.ready) r.readLine else throw End )
      )

I used scala.io.Source at first, but tried a BufferedReader after noticing that it would just repeat the first line of the file forever. Changing to BufferedReader seemed to make no difference in that behavior, though.
Perhaps the nio example from the book code would be best anyway?

Also, if an exception is thrown, it seems to produce just an eternal stream of Awaits.
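Independently of how resource behaves, the end-of-input test can be written against the documented contract of BufferedReader.readLine, which returns null once the input is exhausted; ready only reports whether a read would block. A standard-library sketch (LineReading and readAllLines are hypothetical names):

```scala
import java.io.BufferedReader

object LineReading {
  // Pulls lines until readLine returns null, the reader's end-of-input signal.
  def readAllLines(r: BufferedReader): Vector[String] =
    Iterator.continually(r.readLine()).takeWhile(_ != null).toVector
}
```

In a step function for resource, the null check would take the place of the r.ready test before deciding whether to signal End.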

zipAll Has Incorrect Behavior

For the test

val is = Process.range(0,5)
val os = Process.range(0,4)
os.tee(is)(tee.zipAll(-1, 1)).to(repeatEval(now((t: Any) => now(print(t))))).run.run

I am getting

(0,0)(1,1)(2,2)(3,3)
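For comparison, this is what the standard-library zipAll produces for the same inputs. Treating it as the reference for tee.zipAll is an assumption, and ZipAllReference is an illustrative name.

```scala
object ZipAllReference {
  val os: List[Int] = List.range(0, 4) // 0, 1, 2, 3
  val is: List[Int] = List.range(0, 5) // 0, 1, 2, 3, 4

  // zipAll(that, thisElem, thatElem): the shorter side is padded, so the
  // left side is filled with -1 once os runs out.
  val zipped: List[(Int, Int)] = os.zipAll(is, -1, 1)
}
```

Under that assumption a fifth pair, (-1,4), would be expected after (3,3).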

Thanks

Possible exponential run time when zipping multiple streams

Following the discussion here, it seems that zipping multiple streams together may trigger a run time cost that is exponential in the number of zipped streams.

The code to blame is the killing part of the wye implementation: the number of kill instructions may grow exponentially in the number of zipped Process instances.

The growth depends on the cleanup and fallback Processes of the Await instances being run. Everything is okay when they are instances of Halt, but the cost goes exponential on emits.

Here's a runnable example of this behavior (that's not a real benchmark, but demonstrates the point well enough):

import scalaz.concurrent.Task
import scalaz.concurrent.Task._
import scalaz.stream._
import scalaz.stream.Process._

object Test extends App {

  /** Pseudo benchmarking code. */
  def time[R](block: => R): R = {
    val t0 = System.currentTimeMillis()
    val result = block
    val t1 = System.currentTimeMillis()
    println("\tElapsed time: " + (t1 - t0) / 1000 + "s")
    result
  }

  /** Balanced zipping of n streams. */
  def zipN[F[_], A](xs: List[Process[F, A]]): Process[F, Seq[A]] = xs match {
    case Nil => Process.halt
    case List(x) => x map (Vector(_)) // somehow the type argument F gets broken if I do a :: pattern match
    case _ => {
      val ys = xs.splitAt(xs.length / 2) // same here
      zipN[F, A](ys._1).zipWith(zipN(ys._2))(_ ++ _)
    }
  }

  def constant[A](a: A, fallback: Process[Task, A], cleanup: Process[Task, A]): Process[Task, A] = {
    lazy val go: Process[Task, A] =
      await(Task.now(a))(a => Emit(List(a), go), fallback, cleanup)
    go
  }

  def run[A](name: String, fallback: Process[Task, A], cleanup: Process[Task, A]) = {
    println(name)
    for (i <- 1 to 8) {
      val procs = (1 to i).map(constant(_, fallback, cleanup).take(500000))
      println(s"\tZipping $i streams")
      time {
        zipN(procs.toList).runLast.run
      }
      println("\t-----")
    }
  }

  val emitVal = Process.emit(1) // doesn't really matter what to emit, as long as it's not a halt and it type checks
  run("With no fallback and no cleanup", halt, halt)
  run("With fallback", emitVal, halt)
  run("With cleanup", halt, emitVal)
  run("With fallback and cleanup", emitVal, emitVal)
}

The output on my machine is:

With no fallback and no cleanup
    Zipping 1 streams
    Elapsed time: 1s
    -----
    Zipping 2 streams
    Elapsed time: 2s
    -----
    Zipping 3 streams
    Elapsed time: 3s
    -----
    Zipping 4 streams
    Elapsed time: 4s
    -----
    Zipping 5 streams
    Elapsed time: 6s
    -----
    Zipping 6 streams
    Elapsed time: 7s
    -----
    Zipping 7 streams
    Elapsed time: 8s
    -----
    Zipping 8 streams
    Elapsed time: 10s
    -----
With fallback
    Zipping 1 streams
    Elapsed time: 1s
    -----
    Zipping 2 streams
    Elapsed time: 4s
    -----
    Zipping 3 streams
    Elapsed time: 8s
    -----
    Zipping 4 streams
    Elapsed time: 12s
    -----
    Zipping 5 streams
    Elapsed time: 17s
    -----
    Zipping 6 streams
    Elapsed time: 23s
    -----
    Zipping 7 streams
    Elapsed time: 29s
    -----
    Zipping 8 streams
    Elapsed time: 35s
    -----
With cleanup
    Zipping 1 streams
    Elapsed time: 1s
    -----
    Zipping 2 streams
    Elapsed time: 4s
    -----
    Zipping 3 streams
    Elapsed time: 9s
    -----
    Zipping 4 streams
    Elapsed time: 15s
    -----
    Zipping 5 streams
    Elapsed time: 23s
    -----
    Zipping 6 streams
    Elapsed time: 34s
    -----
    Zipping 7 streams
    Elapsed time: 49s
    -----
    Zipping 8 streams
    Elapsed time: 71s
    -----
With fallback and cleanup
    Zipping 1 streams
    Elapsed time: 1s
    -----
    Zipping 2 streams
    Elapsed time: 8s
    -----
    Zipping 3 streams
    Elapsed time: 21s
    -----
    Zipping 4 streams
    Elapsed time: 45s
    -----
    Zipping 5 streams
    Elapsed time: 97s
    -----
    Zipping 6 streams
    Elapsed time: 199s
    -----
    Zipping 7 streams
    Elapsed time: 406s
    -----
    Zipping 8 streams
    Elapsed time: 841s
    -----

release 0.0.1

It'd be nice to get a stable released artifact for those of us coming from scala-machines.

Function passed to map being called multiple times for each emitted value

I'm attempting to transform the final output of a process with an expensive operation. I see the expected result, but with some unexpected invocations of expensive.

import scalaz._, Scalaz._
import scalaz.concurrent.Task
import scalaz.stream.Process, Process._
object RangeTest extends App {
  def expensive(i: Int): Int = { println(s"expensive(${i})"); i * 10 }
  def runRange(p: Process[Task, Int]) = p.last.map(expensive).runLog.run
  println("emitRange = "+runRange(emitRange(0, 5)))
  println("range = "+runRange(range(0, 5)))
}

Output:

expensive(4)
emitRange = Vector(40)
expensive(0)
expensive(0)
expensive(1)
expensive(1)
expensive(2)
expensive(2)
expensive(3)
expensive(3)
expensive(4)
expensive(4)
expensive(4)
range = Vector(40)

The emitRange case behaves exactly as I expect. I'm lost as to why expensive is applied twice to each element of the range. Is this a bug, or have I not yet comprehended the consequences of range's laziness? Thanks for any pointers.

Add support for SSL

This is a feature request. We'd like to implement the following combinator, or something like it:

def ssl(config: SSLParameters)(e: Exchange[Bytes, Bytes]): Exchange[Bytes,Bytes]

Where config is perhaps of type SSLParameters, or whatever type encapsulates all the options one might use in configuring an SSL connection.

I'd like it to be done as an Exchange transformer so this can be easily used with the nio client and servers that have already been written (as well as with ordinary blocking java.io sockets).

The returned Exchange should block on completion of an SSL handshake before sending or receiving any bytes, and after the handshake is complete, bytes should be encrypted using the negotiated encrypted channel.

It seems like one could use SSLEngine directly for this. It's a pretty awful API, which is why wrapping it in something sane would be valuable. I am open to borrowing or stealing code from other libraries to help with this task, but would strongly prefer not to include any other dependencies in scalaz-stream.
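As a starting point, the first step of a client-side handshake with SSLEngine can be sketched with JDK classes only. The sketch assumes the JVM's default SSLContext is usable and leaves out the unwrap side, delegated tasks, and buffer overflow handling; SslHandshakeSketch and its methods are hypothetical names.

```scala
import java.nio.ByteBuffer
import javax.net.ssl.{SSLContext, SSLEngine}
import javax.net.ssl.SSLEngineResult.HandshakeStatus

object SslHandshakeSketch {
  // Creates an engine in client mode from the JVM's default SSL context.
  def clientEngine(): SSLEngine = {
    val engine = SSLContext.getDefault.createSSLEngine()
    engine.setUseClientMode(true)
    engine
  }

  // Starts the handshake and produces the first bytes that would have to be
  // sent to the peer. Returns the status seen right after beginHandshake
  // together with those bytes.
  def firstFlight(engine: SSLEngine): (HandshakeStatus, Array[Byte]) = {
    engine.beginHandshake()
    val status = engine.getHandshakeStatus
    val net = ByteBuffer.allocate(engine.getSession.getPacketBufferSize)
    engine.wrap(ByteBuffer.allocate(0), net) // no application data yet
    net.flip()
    val bytes = new Array[Byte](net.remaining())
    net.get(bytes)
    (status, bytes)
  }
}
```

An Exchange transformer would have to keep driving this loop (wrap, send, receive, unwrap) until the engine reports that the handshake has finished, and only then pass application bytes through.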

If you're interested in contributing and have questions about the requirements or how to approach this, please feel free to ask questions in the comments.

Exponential wrapping of Emitted sequences by wye

The align and unalign method calls in wye
https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/Process.scala#L384
https://github.com/scalaz/scalaz-stream/blob/master/src/main/scala/scalaz/stream/Process.scala#L409
will generate two wrappers around any leftover emitted Seq every time the Wye enters an Emit state. This causes the following example to crash with a 'java.lang.OutOfMemoryError: GC overhead limit exceeded' exception:

emitSeq(0 until 1000000) zip emitSeq(0 until 1000000)

I've fixed this issue, and will submit a pull request.

Unfortunately, the wye method is not stack safe. While the pull request fixes the wrapping issue, the same example code will blow the stack due to the recursive nature of wye. This requires a trampoline, either by punting to the Monad, which will require that Tee and friends all receive a Monad instead of a null, or by making the tail of emit a thunk.
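The second option, a thunk as the tail of emit, can be illustrated with a toy stream type. This is a simplified model, not the Process representation: Steps, Emit, and Done are hypothetical names, and there is no effect type.

```scala
import scala.annotation.tailrec

sealed trait Steps[+A]
case object Done extends Steps[Nothing]
// The tail is a thunk: building an Emit never forces the rest of the stream.
final case class Emit[+A](head: A, tail: () => Steps[A]) extends Steps[A]

object Steps {
  def range(from: Int, until: Int): Steps[Int] =
    if (from >= until) Done else Emit(from, () => range(from + 1, until))

  // zip produces one pair and defers the recursive call inside the thunk,
  // so calling zip never recurses more than one level deep.
  def zip[A, B](l: Steps[A], r: Steps[B]): Steps[(A, B)] = (l, r) match {
    case (Emit(a, ta), Emit(b, tb)) => Emit((a, b), () => zip(ta(), tb()))
    case _                          => Done
  }

  // The driver loop forces one thunk per iteration in constant stack space.
  @tailrec
  def count[A](s: Steps[A], acc: Long = 0L): Long = s match {
    case Done       => acc
    case Emit(_, t) => count(t(), acc + 1)
  }
}
```

With this shape, zipping two streams of a million elements each is driven by the tail-recursive loop rather than by nested calls.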

Exception safety problem in implementation of `wye`

Minimize this later. It appears that when an exception is thrown while mapping over the result of a.merge(b), the finalizer for a is not run.

  import scala.concurrent.duration._
  import scalaz.concurrent.{Strategy, Task}
  import scalaz.stream._
  val s = async.signal[Double]
  val s2 = async.signal[Double]; s.value.set(0)
  val S = Strategy.DefaultStrategy
  S { Process.awakeEvery(1 seconds).map { _ =>
    s.value.set(math.random)
    s2.value.set(math.random)
  }.run.run
  }
  val t = s.discrete.onComplete(Process.eval_ {
    Task.delay { println("completed") }
  }).map(_.toString).merge(s2.discrete.map(_.toString)).evalMap {
    line => Task.delay {
      if (math.random < .3) sys.error("die")
      else line
    }
  }

This code does not print "completed", which is registered via onComplete. Replacing the evalMap with a regular map and throwing the exception in pure code has the same issue. Removing the call to merge fixes the issue.

Decide on Bytes

The current implementation of Bytes is something of a second-class citizen in the library.

We should probably extend it or decide on an alternative approach that does not require allocating an array on every read request.

I think the contract should be one of the following:

  • A read-only wrapper around the Array[Byte]. The reference to the Array will be held separately from the wrapper, closed over by the Process that actually reads the data from the input source.
import java.io.{InputStream, OutputStream}
import java.nio.ByteBuffer

trait Bytes {
  // writes to os
  def writeTo(os: OutputStream): Int

  // returns a read-only nio.ByteBuffer
  def toBuffer: ByteBuffer

  // efficient append
  def append(other: Bytes): Bytes

  def ++(other: Bytes): Bytes

  // copies the content to a new array
  def toArray: Array[Byte]
}

// somewhere in the process
var array = Array.ofDim[Byte](1024)

val is: InputStream = ???

// in a step
Bytes.apply(is.read(array), array): Bytes

// on the other end of the Process
bytes ++ otherBytes
// or
(bytes: Bytes).writeTo(output: OutputStream)

  • A state machine that will not expose its internal array publicly (but will keep it internally) and will be in either a Full or an Empty state. In the Full state it will allow only operations that write content to some input to be performed; in the Empty state it will allow only a single read from some output to be performed.

Both scenarios will have efficient append and seek operations, inspired for example by akka.ByteString: http://doc.akka.io/api/akka/2.0/akka/util/ByteString.html

What are your thoughts? We need efficient byte streaming for scalaz-stream-mongo, and other scalaz-stream goodies....

Thanks,

P.

Implement java.nio.socketXX

Implement Processes that encapsulate the non-blocking sockets of Java NIO.

I suggest starting with sockets, since their non-blocking nature brings, IMHO, significant added value to streams. The FileChannel can be implemented later, once we decide on #25.

The basic idea for now is to have :

package nio

trait tcp {

def bind(addr:InetSocketAddress):Process[Task,SocketClient] = ...

def connect(addr:InetSocketAddress):SocketClient = ...

}


trait SocketClient {

 def receive : Process[Task, Array[Byte]]

 def send : Sink[Task,Array[Byte]]

}

Even though SocketClient has a signature similar to Topic, where you can obtain multiple receive and send sides, it should, unlike Topic, not allow multiple receives or sends to be acquired: only the first send and the first receive Process should be alive, and any others should be in a failed or finalized state.

I believe this is needed to implement back-pressure correctly, so that we do not take more elements from the receive side than we can handle and do not write more bytes to the socket than it can send to the client. Anyone can still combine it with Topic or Queue when different behaviour is desired.

Also, once the send or receive side is closed for whatever reason, we should close the socket.

Running either send or receive should initiate the connection with the other party. One must ensure that receive and send are run when needed.
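
The "only the first acquisition is alive" rule could be sketched roughly as below. `Once` is a hypothetical helper invented for illustration, and the `Either` result merely stands in for a failed or finalized Process; none of this is part of scalaz-stream.

```scala
import java.util.concurrent.atomic.AtomicBoolean

object SingleAcquireSketch {
  // Hypothetical guard: the first caller gets the resource,
  // every later caller gets a failure instead.
  final class Once[A](resource: A) {
    private val taken = new AtomicBoolean(false)

    def acquire(): Either[String, A] =
      // compareAndSet succeeds for exactly one caller, even under contention
      if (taken.compareAndSet(false, true)) Right(resource)
      else Left("already acquired")
  }
}
```

A SocketClient following this idea would hold one such guard for receive and another for send, so that each side can be handed out at most once.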

Can you please think about it for a while so we can decide on the basics here?

Bug in Bytes.slice

scala> (Bytes.of("0".getBytes) ++ Bytes.of("0".getBytes)).splitAt(1)
res24: (scalaz.stream.Bytes, scalaz.stream.Bytes) = (BytesN(48),BytesN(48))

scala> (Bytes.of("0".getBytes) ++ Bytes.of("0".getBytes)).splitAt(1)._2.splitAt(0)
res25: (scalaz.stream.Bytes, scalaz.stream.Bytes) = (BytesN(),BytesN())

res24 is okay, but res25 should be (BytesN(),BytesN(48)), right?
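
For comparison, the expected result can be checked against `Vector.splitAt` from the standard library. The `Vector[Byte]` here is only a model of the intended semantics, not the `Bytes` implementation.

```scala
object SplitAtModel {
  // Model the two concatenated one-byte chunks as a plain Vector[Byte].
  val joined: Vector[Byte] =
    "0".getBytes("UTF-8").toVector ++ "0".getBytes("UTF-8").toVector

  // Mirrors res24: one byte on each side.
  val first: (Vector[Byte], Vector[Byte]) = joined.splitAt(1)

  // Mirrors res25: splitting the right half at 0 should keep its byte on the right.
  val second: (Vector[Byte], Vector[Byte]) = first._2.splitAt(0)
}
```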

More principled `Step` type

It is sometimes useful / necessary when binding to an external API to grant control over traversing the stream to some external process. For these situations, we could use a more principled way of 'stepping' a stream, such that after each step, the caller receives the current emitted values and a next step, as well as the latest finalizer.
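
One possible shape for such a type is sketched below, with effects left out for brevity. The names `Step`, `emitted`, `cleanup`, `next`, `fromChunks`, and `drive` are all hypothetical, and a real version would presumably run inside the stream's effect type rather than use plain functions.

```scala
object StepSketch {
  // Hypothetical shape: the values emitted by this step, the finalizer to run
  // if the caller stops here, and an optional continuation for the next step.
  final case class Step[A](
      emitted: Seq[A],
      cleanup: () => Unit,
      next: Option[() => Step[A]]
  )

  // Builds a chain of steps from pre-chunked input, sharing one finalizer.
  def fromChunks[A](chunks: List[Seq[A]], cleanup: () => Unit): Step[A] =
    chunks match {
      case Nil          => Step(Seq.empty, cleanup, None)
      case head :: tail => Step(head, cleanup, Some(() => fromChunks(tail, cleanup)))
    }

  // An external driver: consumes the first step and then up to `limit - 1` more
  // (assumes limit >= 1), and finally runs the latest finalizer it was given.
  def drive[A](start: Step[A], limit: Int): Vector[A] = {
    var current = start
    var taken = 1
    var out = Vector.empty[A] ++ current.emitted
    while (taken < limit && current.next.isDefined) {
      current = current.next.get()
      out = out ++ current.emitted
      taken += 1
    }
    current.cleanup()
    out
  }
}
```

The point of the design is that the caller, not the stream, decides when to advance and when to stop, while still being handed what it needs to release resources.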

Fix SOE in `enqueue`

Process.range(0, 400).map(i => (i+1).toString).connect(io.stdOutLines)(wye.boundedQueue(400)).run.run

Results in a StackOverflowError (SOE). We need to rewrite enqueue, which is used by connect, to use either a custom actor or other stack-safe library primitives.

Also see #50.

Official 0.2 version

We are currently trying to remove SNAPSHOT version from our build. Is it possible to publish 0.2 and move the current issues to a 0.3-SNAPSHOT milestone?

Thanks.

Consider changing representation of `Channel`

From Process[F, A => F[B]] to Process[F, A => Process[F,B]]. This is more flexible: a channel can emit zero or more values for each input, rather than exactly one. I also suspect we can reimplement all the existing combinators.

Need to do some investigation of this to see if it's possible.
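
To make the difference concrete, here is a simplified model in which an effect is a plain thunk and a stream of results is a `List`. These stand-ins, and the names `OldChannelFn`, `NewChannelFn`, `lift`, and `through`, are assumptions made for illustration and are not scalaz-stream types.

```scala
object ChannelSketch {
  // Simplified stand-ins: an effect is a thunk, a stream of results is a List.
  type Effect[A] = () => A
  type OldChannelFn[A, B] = A => Effect[B]       // exactly one B per input
  type NewChannelFn[A, B] = A => Effect[List[B]] // zero or more Bs per input

  // Every old-style function lifts into the new shape as a one-element result.
  def lift[A, B](f: OldChannelFn[A, B]): NewChannelFn[A, B] =
    a => () => List(f(a)())

  // Feeding inputs through a new-style function concatenates the outputs.
  def through[A, B](inputs: List[A], f: NewChannelFn[A, B]): List[B] =
    inputs.flatMap(a => f(a)())

  // One output per input.
  val lengthOf: OldChannelFn[String, Int] = s => () => s.length

  // Zero or more outputs per input.
  val words: NewChannelFn[String, String] =
    line => () => line.split(" ").toList.filter(_.nonEmpty)
}
```

In this model the old representation embeds into the new one through `lift`, which is the sense in which existing combinators might be recoverable; whether that carries over to the real types is the open question.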

improve async.topic test

There are typos that change the semantics of the test. The test also depends on closing the topic, which will not be a typical usage scenario; the subscribers should receive the messages while the topic is running.

then is deprecated

Consider renaming Process.then to something else, since then has been a reserved word since Scala 2.10.

process1 aliases

I noticed that some process1 combinators, such as awaitOption, init, and skip (to name a few), do not have a this |> alias in Process. What should the rule be here? Should every combinator have an alias? If not, what are the criteria for providing one?
