Giter Club home page Giter Club logo

monix's Introduction

Monix

Asynchronous, Reactive Programming for Scala and Scala.js.

monix Scala version support

Build Gitter Discord

Overview

Monix is a high-performance Scala / Scala.js library for composing asynchronous, event-based programs.

It started as a proper implementation of ReactiveX, with stronger functional programming influences and designed from the ground up for back-pressure and made to interact cleanly with Scala's standard library, compatible out-of-the-box with the Reactive Streams protocol. It then expanded to include abstractions for suspending side effects and for resource handling, and is one of the parents and implementors of Cats Effect.

A Typelevel project, Monix proudly exemplifies pure, typeful, functional programming in Scala, while being pragmatic, and making no compromise on performance.

Highlights:

  • exposes the kick-ass Observable, Iterant, Task, IO[E, A], and Coeval data types, along with all the support they need
  • modular, split into multiple sub-projects, only use what you need
  • designed for true asynchronicity, running on both the JVM and Scala.js
  • excellent test coverage, code quality, and API documentation as a primary project policy

Usage

Library dependency (sbt)

For the stable release (compatible with Cats, and Cats-Effect 2.x):

libraryDependencies += "io.monix" %% "monix" % "3.4.1"

Sub-projects

Monix 3.x is modular by design. See the sub-modules graph:

Sub-modules graph

You can pick and choose:

  • monix-execution exposes the low-level execution environment, or more precisely Scheduler, Cancelable, Atomic, Local, CancelableFuture and Future based abstractions from monix-catnap.
  • monix-catnap exposes pure abstractions built on top of the Cats-Effect type classes; depends on monix-execution, Cats 1.x and Cats-Effect
  • monix-eval exposes Task, Coeval; depends on monix-execution
  • monix-reactive exposes Observable for modeling reactive, push-based streams with back-pressure; depends on monix-eval
  • monix-tail exposes Iterant streams for purely functional pull based streaming; depends on monix-eval and makes heavy use of Cats-Effect
  • monix provides all of the above

Documentation

See:

API Documentation:

(contributions are welcome)

Related:

Contributing

The Monix project welcomes contributions from anybody wishing to participate. You must license all code or documentation provided with the Apache License 2.0, see LICENSE.txt.

You must follow the Scala Code of Conduct when discussing Monix on GitHub, Gitter channel, or other venues.

Feel free to open an issue if you notice a bug, have an idea for a feature, or have a question about the code. Pull requests are also gladly accepted. For more information, check out the contributor guide.

If you'd like to donate in order to help with ongoing maintenance:

Adopters

Here's a (non-exhaustive) list of companies that use Monix in production. Don't see yours? Submit a PR ❤️

License

All code in this repository is licensed under the Apache License, Version 2.0. See LICENSE.

monix's People

Contributors

alexandru avatar allantl avatar aoprisan avatar avasil avatar creyer2 avatar ctoomey avatar ghostbuster91 avatar github-actions[bot] avatar golem131 avatar greenhat avatar guizmaii avatar jozic avatar jvican avatar larsrh avatar leandrob13 avatar lorandszakacs avatar lukestephenson avatar n4to4 avatar oleg-py avatar omainegra avatar paualarco avatar pk044 avatar rfkm avatar scala-steward avatar sethtisue avatar tapanvaishnav avatar valenterry avatar wogan avatar wosin avatar xuwei-k avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

monix's Issues

Use case: alive-bit

Problem: A bit changes every second from 0 to 1. If there is no change for a period of time, like for more than 10 seconds then there should be a fallback to another event onNext().

Get rid of sun.misc.Unsafe

Relying on sun.misc.Unsafe introduces portability problems and is error prone. This dependency is now dropped from the project - the atomic references implementation that depended on it is now piggybacking straight on top of Java's own atomic references.

Missing items when merging PublishSubject

I've recently came across some strange behavior that may be a bug:

When merging a publish subject, the items emitted immediately after the subscription are never sent to the observer. Hence, the following dumps nothing to the console:

val i = PublishSubject[Int]()

val merged = Observable.merge(i)

merged.dump("merged").foreach(_ => ())

i.onNext(1)

i.onNext(1) immediately yields Continue as if there where no subscriptions. When subscribing directly to i everything works as expected. Also, rescheduling the onNext call works:

val i = PublishSubject[Int]()

val merged = Observable.merge(i)

merged.dump("merged").foreach(_ => ())

implicitly[Scheduler].execute {
  i.onNext(1)
}

I use monifu only with ScalaJS so I didn't check if this also applies when running in the JVM.

Observable.buffer(interval) violates the contract

This scheduler task here:

        private[this] val task =
          scheduler.scheduleRecursive(timespan, timespan, { reschedule =>
            lock.enter {
              if (!isDone) {
                observer.onNext(queue)
                queue = ArrayBuffer.empty
                reschedule()
              }
            }
          })

Does not respect the contract for onNext (doesn't do back-pressure).

enhance "buffer" to allow overlap

Buffer currently chunks objects into strict subsequences of the input objects.

This enhancement would enhance buffer to have an overlap parameter so that instead of:

1,2,3,4,5,... => [1,2],[3,4],[5,6],...

we get

1,2,3,4,5,... => [1,2], [2,3], [3,4],...

Change signature of `ConnectableObservable.connect()`

I rushed into changing the signature of ConnectableObservable.connect() too soon.
In current version the signature is like this ...

trait ConnectableObservable[+T] extends Observable[T] {
  /**
   * Starts emitting events to subscribers.
   */
  def connect(implicit s: Scheduler): BooleanCancelable
}

Well, this actually does not make sense and makes things harder when implementing your own connectable observables. The connect method should not receive the scheduler. So changing it back to:

trait ConnectableObservable[+T] extends Observable[T] {
  /**
   * Starts emitting events to subscribers.
   */
  def connect(): BooleanCancelable
}

The implication of this is that operators like Observable.multicast(subject) and Observable.publish() now will want an implicit Scheduler. But that's OK.

Add BufferPolicy.DropBuffer

Need to add a new BufferPolicy that specifies to drop the whole buffer in case of overflow, with a corresponding BufferedSubscriber.

Optimize for Javascript engines

Eliminate synchronization concerns

Even though such synchronization is usually light (e.g. SpinLock.enter is a macro that translates into nothing), we've got instances in which logic is in place to deal with JVM-specific concurrency problems, including usage of atomic references that on top of Javascript is totally unneeded and amounts to unnecessary boxing of primitives.

Optimize the Data-structures

In several places, like in BufferedObserver or in the delay() or buffer() operators, we are using Scala's standard data-structures. This is not good, as in terms of efficiency they are not using Javascript's native capabilities and are too heavy on inheritance and indirection, traits that are alleviated by the JVM. Identified needs for data-structures are:

  1. Growable buffer

Scala's ArrayBuffer uses what is called a ResizableArray, which has an internal Array that is resized when the size has been exceeded and this resize operation involves an array copy. In Javascript on the other hand, arrays are not of fixed length and you can natively append to them. So you can do this and it will be pretty efficient:

items = [];
items.push(1);
items.push(2);
  1. Queues

If we're talking about array-baked implementations, that also implies shrinking the array, but one can do what Queue.js is doing, which is basically keeping an internal start offset that's incremented on dequeue and when the full length of the array is over twice the size of the queue, then it does array.splice(0, offset), another native operation.

  1. Ring buffer

When buffering and wanting to drop older items on overflow, a ring buffer should be easily implementable in Javascript, especially since we don't have multi-threading concerns. On top of the JVM, the easiness with which one could implement a ring buffer depends on the scenario - SPSC / SPMC are doable for me, but MPMC might need research and incurring pain.

  1. We should probably get rid of the usage of immutable collections

Immutable collections aren't used much, but in instances in which they are used, we should get rid of that usage, as it can really fuck things up because of the garbage collector. Immutable collections have been used in combination with atomic references for non-blocking synchronization on the JVM, but on top of Javascript that's totally unnecessary.

Implement *DelayError operators

The task is about implementing a second variant for the merge / concat / combineLatest operators that delay emitting errors until the composite observable is complete. Towards this purpose, because we don't want to lose information (e.g. errors being thrown by multiple observables), we're going to model a CompositeException that will be a list of multiple exceptions gathered from multiple sources and emit that at the end.

The operators being implemented:

  • mergeDelayError
  • concatDelayError (and flattenDelayError)
  • combineLatestDelayError
  • flatScanDelayError

Excerpt from the ReactiveX docs:

like merge but reserving onError notifications until all of the merged Observables complete and only then passing it along to the observers

http://reactivex.io/documentation/operators/merge.html

monifu 0.14.1 is not a friend with Intellij Idea anymore

Hey,

Since I started to use

libraryDependencies += "org.monifu" %% "monifu" % "0.14.1"

on JVM, Idea wouldn't compile the project due to weird stuff like PublishSubject.apply() doesn't exist and similar really insane errors. Sbt compiles it fine. I tried both java 1.8 and java 1.7, a few Scala versions, I also tried to create a new sample project with it, finally I just decreased version to 0.14.0 and it was OK.

It doesn't concern JS environment, it is just a JVM thingy...

I guess that this travis java&scala version change is the cause of it : c165c3d.

What java&scala version are you using (I guess you're an Idea user)?

UPDATE: The local build is working fine...

Update project setup

Maintenance stuff, to update:

  • clean project structure
    • collapse project structure, get rid of monifu-core
    • use Scala.js's crossProject in Build.scala
  • update versions for dependencies
  • update copyright headers

Implement operators for error handling (roadmap to 1.0)

This issue is related to issue #10. The following operators are not implemented and need to be in the following 1.0 milestones:

  1. onErrorResumeNext( ) — instructs an Observable to emit a sequence of items if it encounters an error
  2. onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error
  3. retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

The first 2 are the equivalents of recover and recoverWith from Scala's Future API. All 3 of them are badly needed.

UPDATE ...

I don't necessarily agree with the naming, since for example onErrorResumeNext is something that comes from Visual Basic and in the original meaning implies resuming even if terminated normally, a behavior that wasn't picked up by RxJava and now it creates confusion. I also don't like parameter overloading too much. So we'll have the following signatures, which are pretty self explanatory and fairly familiar (made a combination with Future's API :)) ...

def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Observable[U]]): Observable[U]

def onErrorFallbackTo[U >: T](that: => Observable[U]): Observable[U]

def onErrorRetryUnlimited: Observable[T]

def onErrorRetry(maxRetries: Long): Observable[T]

def onErrorRetryIf(p: Throwable => Boolean): Observable[T]

Design change - Scheduler should not be an ExecutionContext

We've got the following issues:

  1. Requiring people to depend on Monifu's Scheduler when working with Observables is too much, as it raises the friction of adopting Monifu in their projects; only a subset of the Observable's operators depend on scheduling tasks in the future anyway, so it's kind of bad design.
  2. Scheduler should not be an ExecutionContext, again it's bad design as it couples 2 things together - this design was imported from Java's Executors and RxJava's Scheduler.

Add tests for Scala.js

Focus on Scala.js has fallen behind. Right now the code compiles and packages get published for Scala.js and it theoretically works, however we need Scala.js specific testing.

Please port as many tests as possible from the monifu-rx module to monifu-rx-js. Don't bother with tests meant for concurrency issues, as Scala.js runs on Javascript runtimes so there are no multithreading issues to speak of.

Here is how a test looks like: https://github.com/monifu/monifu/blob/v0.13.0-RC5/monifu-rx-js/src/test/scala/monifu/reactive/ObservableTest.scala

Two things to be aware about:

  1. for Scala.js we are using the Scala.js port of Jasmine instead of ScalaTest (because ScalaTest is not cross-compiled to Scala.js)
  2. in a Scala.js Jasmine test, we cannot block the current thread waiting for a result processed asynchronously - thus we must work with jasmine.Clock.tick as can be seen in the given example above

Start with ObservableOperatorsOnUnitTest.

BufferPolicy changes - new policies + add Synchronous category

Currently we've got 3 buffer policies:

  • Unbounded (use an unbounded buffer and if we run out of memory, tough luck)
  • OverflowTriggering (terminates the subscription with an onError in case the buffer size is exceeded)
  • BackPressured (once the buffer size is exceeded it applies back-pressure)

We also need 2 policies for dropping events once a buffer size is exceeded:

  1. DropIncoming - once the buffer size is exceeded, it starts dropping incoming events until downstream consumes events from the underlying buffer and room is made for new events
  2. DropIncomingThenSignal - same as above, but when the underlying buffer makes room also signals an overflow event informing downstream of how many events where dropped

Currently the equivalent for DropIncoming would be ...

observable
  .whileBusyDropEvents
  .asyncBoundary(BackPressured(bufferSize=200))

Also, the equivalent for DropIncomingThenSignal would be ...

observable
  .whileBusyDropEventsThenSignalOverflow(onOverflow)
  .asyncBoundary(BackPressured(bufferSize=200))

However in both cases the usage of the BackPressured buffer policy in combination with the whileBusyDropEvents* operators is more inefficient, plus currently there's no way to specify this when building a Channel instance.

We also need to add a Synchronous category to protect the Channel constructors from using BackPressured (which is incompatible with Channels).

Missing way to observe Either of two observables?

Hi,

I'm probably missing something, but it seems that Monifu lacks a built-in combinator for selecting Either of two observables, i.e. a function from (Observable[A], Observable[B]) => Observable[Either[A,B]] which would "trigger" when either of the two observables does.

Is it just an oversight, or are users supposed to implement it on their own? If the latter, how you suggest implementing it?

Regards,

Observable.subscribe should return Cancelable

All overloads of the user-facing Observable.subscribe should return a Cancelable that on cancel() it cancels the stream and sends an onComplete event to the observer.

This doesn't change Observable.subscribeFn or Observable.unsafeSubscribe which are still returning Unit.

Closing resources upon Cancelable.cancel

Hi,

Currently using Monifu with ScalaJS and no performance issues :) Great lib!

I am wrapping a HTML5 api called EventSource which is a continious stream of data. The methods;

class EventSource(URL : scala.Predef.String, settings : scala.scalajs.js.Dynamic = { /* compiled code */ }) extends org.scalajs.dom.raw.EventTarget {
  def url : scala.Predef.String = { /* compiled code */ }
  def withCredentials : scala.Boolean = { /* compiled code */ }
  def readyState : scala.Int = { /* compiled code */ }
  var onopen : scala.scalajs.js.Function1[org.scalajs.dom.raw.Event, _] = { /* compiled code */ }
  var onmessage : scala.scalajs.js.Function1[org.scalajs.dom.raw.MessageEvent, _] = { /* compiled code */ }
  var onerror : scala.scalajs.js.Function1[org.scalajs.dom.raw.Event, _] = { /* compiled code */ }
  def close() : scala.Unit = { /* compiled code */ }
}

When a subscription is complete (in other words, the consumer calls Cancelable.cancel() BEFORE any onNext is called. How do I get a propagation of .cancel() inside my Observable.create. I need to close the EventSource connection to avoid issues 😄

Hope to hear from you soon

Change definition of Observable.merge

The default behavior of merge should be merge(bufferPolicy=BackPressured, batchSize=0). In Monifu version 0.13.0 the default behavior is for a positive batchSize, being an even more surprise to the user than out of memory errors.

Also get rid of the multiple overloads and leave only one merge with default parameter values.

Scheduler optimizations

There are 2 issues ...

  1. Monifu has been using System.nanoTime() for scheduling stuff. But while that has much better resolution, unfortunately it's way slower than System.currentTimeMillis, as explained by this Oracle blog post
  2. the methods on Scheduler take as parameters FiniteDuration arguments for user friendliness, which is cool, except that the conversions necessary from Long (like in the debounce or timeout operators) are wasteful, so we need overrides that take 2 parameters instead a Long and a TimeUnit

50% execution time idled in scala.js Unbounded SubjectChannel

Hi,

would you please take a quick look before I dive deeper into this? I try to simplify it as much as possible. The core problem is that I felt like the app is too slow so I started profiling 2 things on Unbounded Dispatcher(SubjectChannel) with only one Component(Observer), the CPU was on 8% :

  • Dispatcher subject channel processing time from observable start to its completion - I'm just pushing Observable.from(iterableEvents) to the channel : 90 seconds - I run below example 200 times, each pushes 200 events = 4000 events
  • Individual observer/component event processing time from onNext call to Future[Ack] completion : 45 seconds

There simply 45 seconds get lost somewhere in a black hole. It is completely asynchronous, no trampoline scheduler is used, on PhantomJS... Component#onNext returns Future { dom operations & indexedDb persistence }

I think there might be something idling in the BufferedObserver... Please do you have any tips? I might be overseeing something.

UPDATE: If I use BackPressured(2) the difference shrinks to just several seconds

object Main {
      dispatcher.subscribe(mockComponent)
      start = new js.Date().getTime()
      dispatcher.last.asFuture.map {
        profile(start)
    }
}

class Dispatcher (policy: BufferPolicy = Unbounded, s: Scheduler) extends SubjectChannel[RxEvent, RxEvent](PublishSubject[RxEvent]()(s), policy)(s) {
  def subscribe(components: Component*): Unit = {
    val connectable = publish()(s)
    components.foreach( component => 
      connectable.subscribe(component)(s)
    )
    connectable.connect()
    components.foreach(_.emitTo(this))
  }
}

abstract class Component extends Observer[RxEvent] {
  def emitTo(channel: SubjectChannel[RxEvent, RxEvent])
  def onNext(elem: RxEvent): Future[Ack] = {
    try {
      onNextSafe(elem)
    } catch {
      case NonFatal(ex) => 
        onError(ex)
        Continue
    }
  }
  def onNextSafe(elem: RxEvent): Future[Ack]
  def onError(ex: Throwable): Unit
  def onComplete(): Unit
}

class MockComponent extends ComponentImpl {
    override def emitTo(channel: SubjectChannel[RxEvent, RxEvent]) = {
      val events = // 200 events
      Observable.from(events)
        .doOnComplete(channel.pushComplete())
        .foreach(e => channel.pushNext(e))
    }
    override def onNextSafe(elem: RxEvent): Future[Ack] = {
      val start = new js.Date().getTime()
      val name = elem.name
      super.onNextSafe(elem).map { x =>
        profilingMap.get(name) match {
          case Some(total) => profilingMap = profilingMap.updated(name, total + (new js.Date().getTime - start))
          case None => profilingMap = profilingMap.updated(name, new js.Date().getTime - start)
        }
        x
      }
  }

FlatMap alternative

Given this code and output :

"flatMap"- {
  Observable.from(List(1)).doWork(println).doOnComplete(println("1. done")).flatMap { e =>
   println("concatenating " + e)
   Observable.from(List(2)).doWork(println).doOnComplete(println("2. done"))
 }.asFuture
}

1
concatenating 1
2
2. done
1. done

I very often need this output

1
1. done
concatenating 1
2
2. done

Basically I'd need current flatMap that would complete observables (if they do) before the actual concatenation...

Is there currently any way to do that? Or should I write a custom operator?

Update documentation

Monifu needs good documentation and the current documentation is expired. The documentation needs to be updated and touch on the following subjects:

  • the contract of Observable and Observer
  • the API of Observable, Observer and Schedule (operators, builders)
  • usage of provided utilities that are orthogonal to Rx
  • tutorials, using introtorx.com as inspiration

This is an epic task that needs to be ready for 1.0.
Prior tickets on this issue:

  • monifu/monifu.js#6 (the original)
  • monifu/monifu.js#3
  • issue #31

Implement Observable.delay

I'm implementing delay(). It's a tricky one to get right - compared to other Rx implementations, like RxJava and RxJS, in order to be consistent with the rest of Monifu we must do safe buffering (limited buffer, either back-pressured or that triggers an error, unbounded buffering possible but not the default).

So the behavior of delay() goes like this:

  • subscription is created right from the start
  • elements are buffered within possibility and until some kind of event gets triggered
  • upon the event being triggered, streaming of items to our observer happens, starting first with the already buffered items

The following variations on delay() are going to be implemented:

  1. delay(itemDelay), uses the default buffering policy (back-pressured policy with the default buffer size)
  2. delay(policy, itemDelay) allows specifying a different buffering policy for the above
  3. delay(future) waits on the completion of the given future before starting to stream the events, uses the default buffering policy (back-pressured policy with the default buffer size)
  4. delay(policy, future) allows specifying a different buffering policy for the above
  5. delay((() => Unit, Throwable => Unit) => Cancelable) - allows fine tuning the event that ends the delay, taking a callback that is called on subscription (see examples in comment), being the base implementation for all other variants
  6. delay(policy, callback) allows specifying a different buffering policy for the above

Notably missing from RxJava - delay(observable) - so a delay that takes an observable which specifies the delay per item. But I'm not so sure that this is needed or wanted. I'm waiting on a concrete need solved by this.

I've also taken a look at delaySubscription, but from its specification it apparently drops items until the period of time has elapsed, so the naming doesn't make sense - we already have that as drop(timespan).

enhance "buffer" to allow overlap

Buffer currently chunks objects into strict subsequences of the input objects.

This enhancement would enhance buffer to have an overlap parameter so that instead of:

1,2,3,4,5,... => [1,2],[3,4],[5,6],...

we get

1,2,3,4,5,... => [1,2], [2,3], [3,4],...

monifu for scalajs 0.6.0

now that scalajs 0.6.0 is about to be released (it is on maven central already). when can we expect an updated version that works with 0.6.0?

A wish for a wiki entry on reactive application design

Hey Alexandru,

congratulations for merging JVM with JS, I upgraded all my stuff, it works perfectly.

I'm new to programming apps that have UIs. I feel like I grokked RX at the lower level but I'm still struggling at the higher one, at the application architecture level. I've been using akka for half a year and it was super easy and intuitive to design a reactive application. But when using just RX I can't find the best practice on designing an application that is composed of a few interactive components.

If we consider client-side single page app as a bunch of components (imagine twitter bootstrap grid with 4-6 cells, each one containing a component) that yield events based on user's interaction and they depend on each others events, do you have some sort of proven methodology like : a Hot Channel per component + repushing events to dependend components OR just one main observable shared by (passed along as a constructor arg to) all components, etc. etc. You know what I mean ?

Usually if user clicks at a menu item in Navbar, then it may have impact on three other components sequentially and it may throw error at the second one. This fact makes it quite non-trivial and it makes me thing that the solution is having a main hot channel shared via constructor by all components that may subscribe to it multiple times. It's basically a broadcasting with lots of very long partial functions :-) But it seems to be maintainable and sane. I'm starting to think that RX isn't suitable for this job, at least I'm never quite sure what I'm doing :-)) Maybe I should use scala.rx for the UI part but my head was spinning even more when I used it.

Would you mind write it up a little bit? Or just give me some pointers? I can't find literally nothing on the internets and you seem to be one of a few competent programmers able to do that :-)

UPDATE:

I think that flux kinda does what I said. Their dispatcher is like a main hot channel that all components subscribe to in a specific order and any action is dispatched to them accordingly. It's a unidirectional broadcasting with certain order and considering the fact that subscribers may filter out messages they are not interested in, it practically gives you all you need except for bidirectional communication between components which would cause madness anyway :-)

Beyond this pattern, I think that other Observables should be just helpers to deal with asynchronicity, like DB access or Ajax (combineLatest, AsyncSubject, etc). I have a feeling I finally grok RX even at the higher level :-)

Implement `groupBy` operator

Another important operator is the groupBy operator, this allows for the split of an Observable[Map[String, (String, String)] into multiple Observable[(String, String)]. You can find a more detailed explanation here: http://reactivex.io/documentation/operators/groupby.html

Optimize for Javascript engines

Imported from monifu/monifu.js#9.

Should eliminate synchronization concerns.

Even though such synchronization is usually light, we've got instances in which logic is in place to deal with JVM-specific concurrency problems, including usage of atomic references that on top of Javascript is totally unneeded and amounts to unnecessary boxing of primitives.

Also investigate do profiling and optimize where needed.

Add tests for Scala.js

Imported from monifu/monifu.js#2.

Focus on Scala.js has fallen behind. Right now the code compiles and packages get published for Scala.js and it theoretically works, however we need Scala.js specific testing.

Please port as many tests as possible from the monifu-rx module to monifu-rx-js. Don't bother with tests meant for concurrency issues, as Scala.js runs on Javascript runtimes so there are no multithreading issues to speak of.

Here is how a test looks like: https://github.com/monifu/monifu/blob/v0.13.0-RC5/monifu-rx-js/src/test/scala/monifu/reactive/ObservableTest.scala

Two things to be aware about:

  1. for Scala.js we are using the Scala.js port of Jasmine instead of ScalaTest (because ScalaTest is not cross-compiled to Scala.js)
  2. in a Scala.js Jasmine test, we cannot block the current thread waiting for a result processed asynchronously - thus we must work with jasmine.Clock.tick as can be seen in the given example above

Start with ObservableOperatorsOnUnitTest.

Fix / beautify ScalaDoc API documentation

Related to #27

  1. add ScalaDoc explanations where it is still needed
  2. check existing ScalaDoc to see if it renders correctly
  3. resolve issues with ambiguous method links (find out how, ask on mailing list)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.