Giter Club home page Giter Club logo

akka-stream-json's People

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

akka-stream-json's Issues

Document some example usages

Would be nice with some examples on how to parse diverse streaming JSON for example as this SO issue, im the poster of that i have been stuck on this for some time now:

SO Issue

Support for JSON EntityStreamingSupport and example needed

It's not clear to me if akka-stream-json can be used together with Source Streaming/JSON streaming as documented by http://doc.akka.io/docs/akka/2.4/scala/http/routing-dsl/source-streaming-support.html.
In particular I don't understand if the framing done by akka-stream-json can work together with EntityStreamingSupport.json() (and the configuration of it) or is the framing done explicitly by akka-stream-json?
I think an example with a web socket client which opens a connection and streams entities would be useful to illustrate concretely the API's use, either with a concrete case class object or a general Json object, especially as there are not many example around (example, let's not just do the canonical Sink.foreach(println), can the example illustrate working with the case class)?

Do not cancel the stream under any circumstances

In diagnosing the issue here akka/akka-http#1459, we found an apparent root cause.

It turns out that under some circumstances, akka-stream-json can cancel the JSON stream which ends up propagating and cancelling the HTTP connection stream (causing the UnexpectedConnectionClosureException issue)

Would it be possible to make sure the akka-stream-json will never cancel the stream that it is working on? There is also an issue on akka-http end here akka/akka-http#1479 to add a feature so that cancelling the entity stream will not cancel the connection.

Find another maintainer?

We are currently being blocked by #14 due to binary compatibility issues and it appears that this repository is no longer being maintained, @knutwalker is it possible to find another maintainer for such reasons?

java.lang.OutOfMemoryError: Java heap space after multiple files parsing.

I am not really sure if this is related to the parser itself. I do have the code below now:

FileIO.fromFile(filePath.toFile)
        .withAttributes(Attributes.logLevels(onElement = Logging.DebugLevel))
        .via(JsonStreamParser[JsValue])
        .runWith(Sink.foreach(println))
        .fallbackTo(Future { Files.deleteIfExists(filePath) })

When I ran this with 16 files of size 9.6 mb dropped at one time into the specified folder. It ran for while and threw:

Uncaught error from thread [Sys-akka.actor.default-dispatcher-12] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled for ActorSystem[Sys]
[error] java.lang.OutOfMemoryError: Java heap space
[error] at java.util.Arrays.copyOf(Arrays.java:3332)
[error] at java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
[error] at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
[error] at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
[error] at java.lang.StringBuilder.append(StringBuilder.java:136)
[error] at spray.json.JsonPrinter$class.printString(JsonPrinter.scala:62)
[error] at spray.json.CompactPrinter$.printString(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$$anonfun$printObject$2.apply(CompactPrinter.scala:37)
[error] at spray.json.CompactPrinter$$anonfun$printObject$2.apply(CompactPrinter.scala:36)
[error] at spray.json.JsonPrinter$$anonfun$printSeq$1.apply(JsonPrinter.scala:92)
[error] at spray.json.JsonPrinter$$anonfun$printSeq$1.apply(JsonPrinter.scala:90)
[error] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
[error] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
[error] at spray.json.JsonPrinter$class.printSeq(JsonPrinter.scala:90)
[error] at spray.json.CompactPrinter$.printSeq(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$class.printObject(CompactPrinter.scala:36)
[error] at spray.json.CompactPrinter$.printObject(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$class.print(CompactPrinter.scala:28)
[error] at spray.json.CompactPrinter$.print(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$$anonfun$printObject$2.apply(CompactPrinter.scala:39)
[error] at spray.json.CompactPrinter$$anonfun$printObject$2.apply(CompactPrinter.scala:36)
[error] at spray.json.JsonPrinter$$anonfun$printSeq$1.apply(JsonPrinter.scala:92)
[error] at spray.json.JsonPrinter$$anonfun$printSeq$1.apply(JsonPrinter.scala:90)
[error] at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:221)
[error] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
[error] at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:428)
[error] at spray.json.JsonPrinter$class.printSeq(JsonPrinter.scala:90)
[error] at spray.json.CompactPrinter$.printSeq(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$class.printObject(CompactPrinter.scala:36)
[error] at spray.json.CompactPrinter$.printObject(CompactPrinter.scala:51)
[error] at spray.json.CompactPrinter$class.print(CompactPrinter.scala:28)
[error] at spray.json.CompactPrinter$.print(CompactPrinter.scala:51)
java.lang.RuntimeException: Nonzero exit code returned from runner: 255
at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
error Nonzero exit code returned from runner: 255
[error] Total time: 556 s, completed Jun 11, 2016 10:46

Thanks for any help or suggestion.

The stream gets stuck with the parser when large number of files are pushed thru.

Hi,

I managed to get file delete into the stream processing by defining a runnable graph as below:

  def g(source: Source[Path, Unit]) = RunnableGraph.fromGraph(GraphDSL.create() {
    implicit builder =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[Path](2))

      val A: FlowShape[Path, JsValue] = builder.add(flow)
      val B: FlowShape[Path, Path] = builder.add(id)
      val C: FlowShape[Path, Boolean] = builder.add(deleteFlow)
      val zip = builder.add(Zip[JsValue, Path]())
      val unzip = builder.add(Unzip[JsValue, Path]())

      val out = Sink.foreach(println)
      source ~> broadcast ~> A ~> zip.in0
                broadcast ~> B ~> zip.in1
                zip.out ~> unzip.in
                unzip.out0 ~> out
                unzip.out1 ~> C ~> Sink.ignore
      ClosedShape
  })

However, if I have the flow as

val flow = Flow[Path]
    .filter(_.getFileName.toString.trim.endsWith(".json"))
    .throttle(1, 1 second, 1, p => 2, ThrottleMode.Shaping)
    .flatMapConcat(p => FileIO.fromFile(p.toFile))
    .via(JsonStreamParser[JsValue])

The stream will get stuck if I have like 50 files pushed thru in succession. But it would work slowly thru if I have the following code instead:

val flow = Flow[Path]
    .filter(_.getFileName.toString.trim.endsWith(".json"))
    .map(p => ByteBuffer.wrap(scala.io.Source.fromFile(p.toString).toArray.map(_.toByte)))
    .map(Parser.parseFromByteBuffer)
    .map(_.get)

the error that I got: java.lang.RuntimeException: Nonzero exit code returned from runner: 255
at scala.sys.package$.error(package.scala:27)

I know the _.get is unsafe. But I am just testing out and it turns out to have different behavior. Again, I am not really sure if this has anything to do with the parser. Any hint or idea how I should investigate this further is appreciated. Thanks.

Can you provide an example using the library with Akka-Stream?

Hi,

I came across the library and I would like to try to parse some json stream that I will be reading from a file. I am not really sure how to use the library yet.

When I tried to parse the stream using Circe, I do get

could not find implicit value for evidence parameter of type io.circe.Decoder[jawn.ast.JValue]

I have looked at Circe source and found the support for spray-json file. So I include them as below to resolve the implicit issue.

import jawn.ast.JValue
import jawn.support.spray.Parser._
import spray.json.{ DefaultJsonProtocol, JsValue }
import io.circe._
import io.circe.generic.auto._
import io.circe.parser._
import io.circe.syntax._

But when I ran the the flow:

FileIO.fromFile(someFile)
      .via(Framing.delimiter(ByteString(System.lineSeparator), maximumFrameLength = 512, allowTruncation = true))
      //.via(de.knutwalker.akka.stream.support.CirceStreamSupport.decode[JsValue])
      .map(s => { println(s); s.toString })
      .runWith(Sink.foreach(println))

The first via prints out the content while the second just does not process anything.

Would it be possible to have some sample code for such a scenario? Thanks for your help.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.