rxrust's Introduction

rxRust: a Rust implementation of Reactive Extensions

Usage

Add this to your Cargo.toml:

[dependencies]
rxrust = "1.0.0-beta.0"

Example

use rxrust::prelude::*;

let mut numbers = observable::from_iter(0..10);
// create an even stream by filter
let even = numbers.clone().filter(|v| v % 2 == 0);
// create an odd stream by filter
let odd = numbers.clone().filter(|v| v % 2 != 0);

// merge odd and even stream again
even.merge(odd).subscribe(|v| print!("{} ", v));
// "0 2 4 6 8 1 3 5 7 9" will be printed.

Clone Stream

In rxrust almost all extensions consume the upstream. So when you try to subscribe to a stream twice, the compiler will complain.

 use rxrust::prelude::*;
 let o = observable::from_iter(0..10);
 o.subscribe(|_| println!("consume in first"));
 o.subscribe(|_| println!("consume in second"));

In this case, we must clone the stream.

 use rxrust::prelude::*;
 let o = observable::from_iter(0..10);
 o.clone().subscribe(|_| println!("consume in first"));
 o.clone().subscribe(|_| println!("consume in second"));

If you want to share the same observable, you can use Subject.
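The consume-then-clone behaviour can be reproduced with the standard library alone. The sketch below is only an illustration: `FromIter` and `collect_twice` are hypothetical stand-ins, not rxrust types. Because `subscribe` takes `self` by value, a second call on the same value would not compile, while cloning first keeps the original usable.

```rust
// Hypothetical stand-in for an observable; this is not rxrust's real type.
#[derive(Clone)]
struct FromIter<I> {
    iter: I,
}

impl<I: Iterator> FromIter<I> {
    // Takes `self` by value, so subscribing consumes the stream.
    fn subscribe<F: FnMut(I::Item)>(self, mut observer: F) {
        for item in self.iter {
            observer(item);
        }
    }
}

fn collect_twice() -> (Vec<i32>, Vec<i32>) {
    let o = FromIter { iter: 0..3 };
    let mut first = Vec::new();
    let mut second = Vec::new();
    // The clone is consumed here; `o` itself stays available.
    o.clone().subscribe(|v| first.push(v));
    // The last subscriber may consume `o` directly.
    o.subscribe(|v| second.push(v));
    (first, second)
}

fn main() {
    let (first, second) = collect_twice();
    println!("{:?} {:?}", first, second);
}
```

Both subscribers see the full sequence because each one drives its own copy of the source.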

Scheduler

rxrust uses the runtime of the Future as its scheduler. LocalPool and ThreadPool from futures::executor can be used as schedulers directly, and tokio::runtime::Runtime is also supported, though you need to enable the futures-scheduler feature. Use Scheduler to implement a custom scheduler. Some observable operators (such as delay and debounce) need the ability to delay. futures-time provides this when the timer feature is enabled, but you can also customize it by assigning your own new_timer function to NEW_TIMER_FN and removing the timer feature.

use rxrust::prelude::*;

// `FuturesThreadPoolScheduler` is the alias of `futures::executor::ThreadPool`.
let threads_scheduler = FuturesThreadPoolScheduler::new().unwrap();

observable::from_iter(0..10)
  .subscribe_on(threads_scheduler.clone())
  .map(|v| v*2)
  .observe_on_threads(threads_scheduler)
  .subscribe(|v| println!("{},", v));

Also, rxrust supports WebAssembly by enabling the feature wasm-scheduler and using the crate wasm-bindgen. A simple example is here.

Converts from a Future

Just use observable::from_future to convert a Future to an observable sequence.

use rxrust::prelude::*;

let mut scheduler_pool = FuturesLocalSchedulerPool::new();
observable::from_future(std::future::ready(1), scheduler_pool.spawner())
  .subscribe(move |v| println!("subscribed with {}", v));

// Wait for the task to finish.
scheduler_pool.run();

A from_future_result function is also provided to propagate errors from a Future.

Missing Features List

See missing features to know what rxRust does not have yet.

All contributions are welcome

We are looking for contributors! Feel free to open issues for asking questions, suggesting features or other things!

Help and contributions can be any of the following:

  • use the project and report issues to the project issues page
  • documentation and README enhancement (VERY important)
  • continuous improvement of the CI pipeline
  • implement any unimplemented operator; remember to create a pull request before you start coding, so other people know you are working on it.

You can enable the default timer with the timer feature, or set a timer through the new_timer_fn function.

rxrust's People

Contributors

achary, aggalex, antonoellerer, atry, cfeitong, chmodas, chylynsky, dkuhnert, dpietrzyk, dspasojevic, fsandhei, godhand4826, helgoboss, ishitatsuyuki, johntitor, kamva9697, m-adoo, marcusgrass, mesinger, mnwa, mvalenzuelamoixa, mysticfall, neo-ciber94, roman, sdykae, sw0000, trequetrum, utilforever, wjian23, wufniks

rxrust's Issues

Using MutRefSubject can easily cause heap corruption

This happens in a real-world use case: in my application I have an AppSession struct which is like a singleton, so it pretty much outlives everything else. It contains a subject to which I want to publish some events over time. I want these events to be references because at the time of pushing the event I'm in a constrained thread (audio thread) where heap allocation is bad and too much copying should be avoided. rxRust has MutRefItemSubject, so I thought I'd give it a try (although I actually only need immutable references, not mutable ones). The events are very short-lived and only valid until the call to next() returns.

The problem is, I can't get the lifetimes right. Even worse, it's very easy to get heap corruptions. Here's some reduced test code which likely causes a corruption:

  #[test]
  fn heap_corruption() {
    // Preparation
    #[derive(Debug)]
    struct Event(String);

    impl Drop for Event {
      fn drop(&mut self) {
        println!("Dropped {:?}", self)
      }
    }

    struct AppSession {
      event_subject: MutRefItemSubject<'static, Event, ()>
    }

    impl AppSession {
      pub fn publish_event(&mut self, evt: &mut Event) {
        self.event_subject.next(evt)
      }
    }

    // Do some things
    let mut app_session = AppSession {
      event_subject: LocalSubject::new().mut_ref_item()
    };
    let mut previous_events: Vec<&mut Event> = vec!();
    app_session.event_subject.clone().subscribe(move |current_event| {
      if !previous_events.is_empty() {
        println!("Previous events were: {:?}", previous_events);
        // This is the final straw. Without this line it sometimes works, but only because we
        // are lucky. The actual root issue, however, is at the DANGER comment.
        previous_events.iter_mut().for_each(|evt| evt.0.push_str(" modified"))
      }
      println!("Got new event: {:?}\n", current_event);
      // DANGER: This is bad. Ideally, that shouldn't be possible, but it is.
      previous_events.push(current_event);
    });
    {
      let mut short_living_event = Event(String::from("1"));
      app_session.publish_event(&mut short_living_event);
    }
    {
      let mut short_living_event = Event(String::from("2"));
      app_session.publish_event(&mut short_living_event);
    }
  }

See the DANGER comment. Here I would expect current_event to be exposed with a lifetime that doesn't exceed the observer's next method call. Then the Rust compiler would refuse to compile this code.

@M-Adoo I know this is made possible because of the unsafe line in mut_ref_subject.rs ... do you think there's a safe way of implementing (mut) ref subjects?
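One safe shape, sketched here with the standard library only, is to store observers as closures that must accept a reference of any lifetime (a higher-ranked bound). `EventSubject` and `run` are hypothetical names for illustration, not rxRust's API, and whether this design fits rxRust's operator chain is an open question. With this bound, an observer that tries to push the `&mut Event` into a `Vec` it owns is rejected by the borrow checker, while copying data out of the event is still allowed.

```rust
use std::cell::RefCell;
use std::rc::Rc;

struct Event(String);

// `dyn FnMut(&mut Event)` is shorthand for `dyn for<'a> FnMut(&'a mut Event)`:
// each observer must work for every lifetime, so it cannot keep the reference.
struct EventSubject {
    observers: Vec<Box<dyn FnMut(&mut Event)>>,
}

impl EventSubject {
    fn new() -> Self {
        EventSubject { observers: Vec::new() }
    }

    fn subscribe(&mut self, observer: impl FnMut(&mut Event) + 'static) {
        self.observers.push(Box::new(observer));
    }

    fn next(&mut self, event: &mut Event) {
        for observer in self.observers.iter_mut() {
            // Reborrow, so each observer in turn gets exclusive access.
            observer(&mut *event);
        }
    }
}

fn run() -> Vec<String> {
    let seen = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&seen);
    let mut subject = EventSubject::new();
    subject.subscribe(move |evt| {
        evt.0.push_str(" seen");
        // Copying the data out is fine; storing `evt` itself would not compile.
        sink.borrow_mut().push(evt.0.clone());
    });
    {
        let mut short_lived = Event(String::from("1"));
        subject.next(&mut short_lived);
    }
    {
        let mut short_lived = Event(String::from("2"));
        subject.next(&mut short_lived);
    }
    let result = seen.borrow().clone();
    result
}

fn main() {
    println!("{:?}", run());
}
```

The `clone()` in the observer allocates and is there only to make the result observable in this demo; an audio-thread observer would read or mutate the event in place instead.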

Unit tests fail when running in docker container

I cloned the master branch (commit dec6bff) and ran cargo test in a Docker container (rustlang/rust:nightly-buster-slim from Docker Hub). The unit test run fails with a few errors.

Most of them seem to be caused by the sleep-based timing used in the test code.
It looks like the timing requirements of those tests cannot be met in some environments (in this case, my machine plus a Docker container).

Here is an example trace:

---- ops::throttle_time::smoke stdout ----
thread 'ops::throttle_time::smoke' panicked at 'assertion failed: `(left == right)`
  left: `[9, 18, 26, 32, 41, 51, 61, 71, 78]`,
 right: `[9, 19, 29, 39, 49, 59, 69, 79, 89, 99]`', src/ops/throttle_time.rs:202:3
stack backtrace:
   0: backtrace::backtrace::libunwind::trace
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/libunwind.rs:88
   1: backtrace::backtrace::trace_unsynchronized
             at /cargo/registry/src/github.com-1ecc6299db9ec823/backtrace-0.3.40/src/backtrace/mod.rs:66
   2: std::sys_common::backtrace::_print_fmt
             at src/libstd/sys_common/backtrace.rs:77
   3: <std::sys_common::backtrace::_print::DisplayBacktrace as core::fmt::Display>::fmt
             at src/libstd/sys_common/backtrace.rs:61
   4: core::fmt::write
             at src/libcore/fmt/mod.rs:1028
   5: std::io::Write::write_fmt
             at /rustc/50f8aadd746ebc929a752e5ffb133936ee75c52f/src/libstd/io/mod.rs:1412
   6: std::io::impls::<impl std::io::Write for alloc::boxed::Box<W>>::write_fmt
             at src/libstd/io/impls.rs:141
   7: std::sys_common::backtrace::_print
             at src/libstd/sys_common/backtrace.rs:65
   8: std::sys_common::backtrace::print
             at src/libstd/sys_common/backtrace.rs:50
   9: std::panicking::default_hook::{{closure}}
             at src/libstd/panicking.rs:188
  10: std::panicking::default_hook
             at src/libstd/panicking.rs:202
  11: std::panicking::rust_panic_with_hook
             at src/libstd/panicking.rs:464
  12: std::panicking::continue_panic_fmt
             at src/libstd/panicking.rs:373
  13: std::panicking::begin_panic_fmt
             at src/libstd/panicking.rs:328
  14: rxrust::ops::throttle_time::smoke
             at src/ops/throttle_time.rs:202
  15: rxrust::ops::throttle_time::smoke::{{closure}}
             at src/ops/throttle_time.rs:184
  16: core::ops::function::FnOnce::call_once
             at /rustc/50f8aadd746ebc929a752e5ffb133936ee75c52f/src/libcore/ops/function.rs:227
  17: <alloc::boxed::Box<F> as core::ops::function::FnOnce<A>>::call_once
             at /rustc/50f8aadd746ebc929a752e5ffb133936ee75c52f/src/liballoc/boxed.rs:942
  18: __rust_maybe_catch_panic
             at src/libpanic_unwind/lib.rs:79
  19: std::panicking::try
             at /rustc/50f8aadd746ebc929a752e5ffb133936ee75c52f/src/libstd/panicking.rs:265
  20: std::panic::catch_unwind
             at /rustc/50f8aadd746ebc929a752e5ffb133936ee75c52f/src/libstd/panic.rs:396
  21: test::run_test_in_process
             at src/libtest/lib.rs:570
  22: test::run_test::run_test_inner::{{closure}}
             at src/libtest/lib.rs:473
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.


failures:
    ops::subscribe_on::test::new_thread
    ops::throttle_time::smoke

Sometimes these errors are due to really near misses, like:

---- ops::throttle_time::smoke stdout ----
thread 'ops::throttle_time::smoke' panicked at 'assertion failed: `(left == right)`
  left: `[9, 18, 28, 38, 48, 58, 68, 78, 88, 98]`,
 right: `[9, 19, 29, 39, 49, 59, 69, 79, 89, 99]`', src/ops/throttle_time.rs:202:3
stack backtrace:
...

being off by a single millisecond, but still.
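One common way to make such tests independent of the machine is to feed the logic explicit timestamps instead of sleeping. The sketch below is a simplified trailing-edge throttle written only for illustration; `throttle_trailing` is a hypothetical function and does not claim to match rxRust's `throttle_time` implementation. Given one event per millisecond and a 10 ms window, it reproduces the expected values from the trace above on every run.

```rust
// Simplified model: within each window keep only the last value, and emit it
// when the first event of the next window arrives (or at the end of input).
fn throttle_trailing(events: &[(u64, u32)], window_ms: u64) -> Vec<u32> {
    let mut out = Vec::new();
    let mut window_end: Option<u64> = None;
    let mut pending: Option<u32> = None;
    for &(at_ms, value) in events {
        match window_end {
            // Still inside the current window: remember the newest value.
            Some(end) if at_ms < end => pending = Some(value),
            // No window yet, or the window has elapsed: flush and restart.
            _ => {
                if let Some(last) = pending.take() {
                    out.push(last);
                }
                window_end = Some(at_ms + window_ms);
                pending = Some(value);
            }
        }
    }
    if let Some(last) = pending {
        out.push(last);
    }
    out
}

fn main() {
    // One event per virtual millisecond; no real time passes.
    let events: Vec<(u64, u32)> = (0u32..100).map(|i| (u64::from(i), i)).collect();
    println!("{:?}", throttle_trailing(&events, 10));
}
```

A test written this way asserts on the exact sequence without depending on scheduler latency inside a container.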

doesn't work on Rust's stable release channel?

Thanks for sharing the lib.

Tried to follow the docs on crates.io and, when compiling, I got:

error[E0554]: #![feature] may not be used on the stable release channel
at rxrust-0.10.0/src/lib.rs:2:1

It would be nice to add why using the non stable Rust would be a good idea and why this dependency requires it...
and, if possible, to give an estimate when this library may become usable by the stable Rust -- when it will be "Production Ready".

subject should support emit value by mut ref

An Observer emits items by value. For Subject we bound the item with Copy, because a subject is a multicast stream and needs to pass the item to multiple subscribers. So we can emit an item T: Copy or &T, but we can't emit &mut T.

// these work
Subject::local().next(1);
Subject::local().next(&1);

// emitting `&mut T` will not compile.
Subject::local().next(&mut 1);

But accepting an item and changing it is a useful and common scenario: subscribe to a stream and, when an event is emitted, do something to the source emitter (which may need to change its state).

subject.subscribe(|obj: &mut T| {
   obj.change_to_next_state();
   // or
   obj.x = x;
})

Logically, this should work. For now an Observable can emit &mut T, but a Subject can't because of the Copy bound. Yet the subject only passes the item to multiple subscribers one after another and never aliases it; what it really does is something like:

fn next<Item>(&mut self, value: &mut Item) {
  self.observers.iter_mut().for_each(|subscriber| {
    subscriber.next(value);
  })
}

But I can't find a way to write a specialized implementation of Observer<&mut Item, _> for Subject.

We should track rust-lang/rust#31844.

Update old group_by example in the docs

I imagine the group_by example in the docs was created before flat_map was implemented. My feeling is that this should be updated. A common mantra in the reactive extensions crowd is to never nest subscriptions.

The following should be semantically equivalent:

Current Example:

observable::from_iter([
  Person{ name: String::from("John"), age: 26 },
  Person{ name: String::from("Anne"), age: 28 },
  Person{ name: String::from("Gregory"), age: 24 },
  Person{ name: String::from("Alice"), age: 28 },
])
.group_by(|person: &Person| person.age)
.subscribe(|group| {
  group
  .reduce(|acc, person| format!("{} {}", acc, person.name))
  .subscribe(|result| println!("{}", result));
});

My Suggestion:

observable::from_iter([
    Person{ name: String::from("John"), age: 26 },
    Person{ name: String::from("Anne"), age: 28 },
    Person{ name: String::from("Gregory"), age: 24 },
    Person{ name: String::from("Alice"), age: 28 },
])
.group_by(|person: &Person| person.age)
.flat_map(|group| group.reduce(|acc, person| format!("{} {}", acc, person.name)))
.subscribe(|result| println!("{}", result));

Trait refactoring

I believe traits used in rxRust contain a couple of anti-patterns and we should change some of those definitions.

  1. Observable should be a trait. The current struct Observable should be renamed. (Even official ReactiveX libraries have Observable as an abstract class, so having it as a struct is pretty strange.)
    EDIT: Also, consider using associated type for Observable, otherwise it would cause trouble when used as a bound.
    It seems that what should be (the core of) Observable is currently called RawSubscribable. See below for additional provided methods that should be included in the Observable trait.
  2. RawSubscribable is generic on Subscriber. It shouldn't be like this; it should be generic on Item and Err instead.
  3. Operators are exposed as extension traits, and that's even different from how extension traits are commonly used. See an example from Differential Dataflow for how extension traits are commonly implemented. Bounds should exist on the implementation, not the trait itself. Otherwise, it results in indecipherable generated API docs.
    Though, in the case of rxRust I believe we don't need them, as there's only one type that we are interested in: Observable. So we should just implement all of the operators as provided methods on the Observable trait. (This kind of design is more common, examples are Iterator & itertools and Future)
  4. Maybe also consider eliminating the Error type on observables? Although this will make rxRust different from other implementations, a few crates, notably futures, have moved to opt-in error handling with Result, instead of Item, Err generic arguments.
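Points 1 to 3 can be sketched in the style of `Iterator`: one trait with an associated `Item`, a single required method, and operators as provided methods that return adapter structs. Everything below (`Observable`, `FromIter`, `Map`, `Filter`, `doubled_evens`) is a hypothetical illustration of the proposal, not rxRust's actual definitions, and it leaves out errors, completion and unsubscription.

```rust
// Hypothetical trait following the Iterator pattern.
trait Observable: Sized {
    type Item;

    // The only required method.
    fn subscribe<O: FnMut(Self::Item)>(self, observer: O);

    // Operators are provided methods; bounds live here, not on the trait.
    fn map<B, F: FnMut(Self::Item) -> B>(self, f: F) -> Map<Self, F> {
        Map { source: self, f }
    }

    fn filter<P: FnMut(&Self::Item) -> bool>(self, predicate: P) -> Filter<Self, P> {
        Filter { source: self, predicate }
    }
}

struct FromIter<I>(I);

impl<I: Iterator> Observable for FromIter<I> {
    type Item = I::Item;
    fn subscribe<O: FnMut(Self::Item)>(self, mut observer: O) {
        for item in self.0 {
            observer(item);
        }
    }
}

struct Map<S, F> {
    source: S,
    f: F,
}

impl<S: Observable, B, F: FnMut(S::Item) -> B> Observable for Map<S, F> {
    type Item = B;
    fn subscribe<O: FnMut(B)>(self, mut observer: O) {
        let mut f = self.f;
        self.source.subscribe(move |item| observer(f(item)));
    }
}

struct Filter<S, P> {
    source: S,
    predicate: P,
}

impl<S: Observable, P: FnMut(&S::Item) -> bool> Observable for Filter<S, P> {
    type Item = S::Item;
    fn subscribe<O: FnMut(S::Item)>(self, mut observer: O) {
        let mut predicate = self.predicate;
        self.source.subscribe(move |item| {
            if predicate(&item) {
                observer(item);
            }
        });
    }
}

fn doubled_evens() -> Vec<i32> {
    let mut out = Vec::new();
    FromIter(0..10_i32)
        .filter(|v| v % 2 == 0)
        .map(|v| v * 2)
        .subscribe(|v| out.push(v));
    out
}

fn main() {
    println!("{:?}", doubled_evens());
}
```

With the associated type, a bound such as `S: Observable<Item = i32>` is straightforward to write, which is the difficulty the EDIT under point 1 refers to.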

FlattenOp<S, Inner> doesn't satisfy _: LocalObservable<'_> or _: SubscribeNext<_>

Hi,
I use rxrust version: 0.14 and cannot subscribe to the sequence.

The example as follows:

fn test() {
    rxrust::observable::of(100)
        .flat_map(|crit| find_by(&crit))
        .subscribe(|result| println!("{}", result));
}

fn find_by(crit: &usize) -> LocalBoxOp<String, ()> {
    rxrust::observable::repeat(
        || "FooBar",
        *crit)
        .map(|f| f())
        .box_it()
}

The compilation fails with the error message:

error[E0599]: the method `subscribe` exists for struct `FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>`, but its trait bounds were not satisfied
  --> src/test.rs:81:10
   |
81 |         .subscribe(|result| println!("{}", result));
   |          ^^^^^^^^^ method cannot be called on `FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>` due to unsatisfied trait bounds
   | 
  ::: /XXX/.cargo/registry/src/github.com-1ecc6299db9ec823/rxrust-0.14.0/src/ops/flatten.rs:11:1
   |
11 | pub struct FlattenOp<S, Inner> {
   | ------------------------------
   | |
   | doesn't satisfy `_: LocalObservable<'_>`
   | doesn't satisfy `_: SubscribeNext<_>`
   |
   = note: the following trait bounds were not satisfied:
           `FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: LocalObservable<'_>`
           which is required by `FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `<&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>> as Observable>::Err = ()`
           which is required by `&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: Observable`
           which is required by `&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: LocalObservable<'_>`
           which is required by `&FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `<&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>> as Observable>::Err = ()`
           which is required by `&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: Observable`
           which is required by `&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`
           `&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: LocalObservable<'_>`
           which is required by `&mut FlattenOp<MapOp<ObservableBase<OfEmitter<usize>>, [closure@src/test.rs:80:19: 80:40]>, BoxOp<Box<dyn BoxObservable<'_, Item = std::string::String, Err = ()>>>>: SubscribeNext<_>`

How can I fix it? What am I doing wrong here?

trait `Observable` should generic on Item and Err, not on Observer and Subscription

It should look like this:

pub trait Observable<'a> {
  type Item;
  type Err;
  type Unsub: SubscriptionLike + 'static;
  fn actual_subscribe<O: Observer<Self::Item, Self::Err> + 'a>(
    self,
    subscriber: Subscriber<O, LocalSubscription>,
  ) -> Self::Unsub;
}

Blocked on

  • 1. #68
  • 2. ObservableFromFn should use a boxed subscriber in its closure; this source Observable will then no longer be a zero-cost abstraction. #73
  • 3. LocalSubject emitting values by mut ref should no longer depend on HRTBs. #73
  • 4. shared bounds #73
    When actual_subscribe is generic and not bound by Send + Sync, how do we convert a thread-local observable into a shared observable?

How does this interop with Rust's async ecosystem?

I'm very curious about how this project will interoperate with Rust's async ecosystem. Specifically, what's the relation between Observables and futures-rs's Streams, and between Operators and Stream combinators? It seems that both can represent a series of asynchronous values, and we probably want to avoid duplicate work here.

Struggling to return an observable from a function

I can use the library by specifying the whole chain:

from_iter(1..3).subscribe(|v| print!("{}", v))

But if I try to return the intermediate value it does not work:

pub fn get_ob() -> ObservableBase<IterEmitter<SensorEmitter>> {
  from_iter(1..3)
}

This is the type returned by from_iter, but IterEmitter is private, so I cannot find a return type that works. How am I supposed to use it?

Unit test failed

The unit test ops::observe_on::test::switch_thread always fails on a single-core machine. It may also fail on multi-core machines.

I have feature request

Maybe this feature already exists, but is there a PublishSubject or a BehaviouralSubject or any kind of Subject in rxRust?

Kind Regards, Jakob.

PS: I love your library! 👍

`RawSubscribable` redesign

For now, RawSubscribable is defined like this:

pub trait RawSubscribable {
  /// The type of the elements being emitted.
  type Item;
  /// The type of the error that may be propagated.
  type Err;

  fn raw_subscribe(
    self,
    subscribe: impl RxFn(RxValue<&'_ Self::Item, &'_ Self::Err>)
    + Send
    + Sync
    + 'static,
  ) -> Box<dyn Subscription + Send + Sync>;
}

We add the bounds Sync + Send + 'static to subscribe, so all user code must be thread-safe. As a result, we wrap data in Arc, Mutex, RwLock, or similar far too often. But much of the time the code runs on a single thread, and we shouldn't pay these costs or suffer these limits.

Those bounds should only be needed when a Scheduler is used to switch threads.

On the other hand, it looks like every operator except observe_on always runs on the same thread. So we needn't share data between threads, and data races can be avoided.

So we need to redesign RawSubscribable:

pub trait RawSubscribable<S>
where
  S: Subscribe,
{
  /// a type implemented [`Subscription`]
  type Unsub;

  fn raw_subscribe(self, subscribe: S) -> Self::Unsub;
}

We make subscribe a generic type, so when we implement an operator we can choose whether to add a Send bound to it. Only if subscribe and the upstream/downstream satisfy Send can it switch to another thread.
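The idea of paying for `Send` only at the point where a thread switch happens can be shown with plain closures. This is a minimal sketch using the standard library; `subscribe_local` and `subscribe_on_thread` are hypothetical helpers, not rxRust functions. The local version accepts an observer that captures an `Rc<RefCell<_>>`, while the threaded version demands `Send + 'static` and therefore needs `Arc<Mutex<_>>`.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;

// Local subscription: no `Send`, `Sync` or `'static` bound on the observer.
fn subscribe_local<F: FnMut(i32)>(items: Vec<i32>, mut observer: F) {
    for item in items {
        observer(item);
    }
}

// Switching threads is the only place where `Send + 'static` is required.
fn subscribe_on_thread<F>(items: Vec<i32>, mut observer: F) -> thread::JoinHandle<()>
where
    F: FnMut(i32) + Send + 'static,
{
    thread::spawn(move || {
        for item in items {
            observer(item);
        }
    })
}

fn local_sum() -> i32 {
    // `Rc<RefCell<_>>` is not `Send`, yet it is accepted locally.
    let sum = Rc::new(RefCell::new(0));
    let sink = Rc::clone(&sum);
    subscribe_local(vec![1, 2, 3], move |v| {
        *sink.borrow_mut() += v;
    });
    let total = *sum.borrow();
    total
}

fn threaded_sum() -> i32 {
    // Crossing a thread boundary needs the thread-safe wrappers.
    let sum = Arc::new(Mutex::new(0));
    let sink = Arc::clone(&sum);
    subscribe_on_thread(vec![1, 2, 3], move |v| {
        *sink.lock().unwrap() += v;
    })
    .join()
    .unwrap();
    let total = *sum.lock().unwrap();
    total
}

fn main() {
    println!("{} {}", local_sum(), threaded_sum());
}
```

Passing the `Rc`-capturing closure to `subscribe_on_thread` is a compile error, so the cost of synchronization is opt-in rather than imposed on every subscriber.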

What is the correct way to construct an observable trait object?

I have a situation where I often need to accept a list of observables but cannot figure out a common trait between them that is object safe. Specifically, subscribing to a dyn Observable, where sometimes Item is a specific type, and other times a generic one.

Any help would be appreciated.

Module `interval` is private

Hi!
Thanks for sharing this library!

Problem

I would like to periodically (e.g. every 5 min) fetch some data.
This data will be transformed etc, that's why I want to encapsulate the whole logic inside one method and be able to create/expose Observable which will be emitting transformed data structure.

Unfortunately, when I try to return the "Stream" from a function, I get error[E0603]: module 'interval' is private

Simple code example

A simplified version of my problem:

use rxrust::prelude::*;

use rxrust::ops::map::MapOp;
use futures::executor::{LocalSpawner, LocalPool};
use std::time::Duration;

// use rxrust::observable::interval::IntervalEmitter;          // <-- error[E0603]: module `interval` is private
// use rxrust::prelude::observable::interval::IntervalEmitter; // <-- error[E0603]: module `interval` is private
use rxrust::prelude::interval::IntervalEmitter;                // <-- error[E0603]: module `interval` is private

pub fn get_data_stream() -> MapOp<ObservableBase<IntervalEmitter<LocalSpawner>>, fn(usize) -> i8> {
    let mut local_scheduler = LocalPool::new();
    let duration = Duration::from_secs(1);

    let int = interval(duration, local_scheduler.spawner())
        .map(|_| 4);

    local_scheduler.run();

    int
}

This produces the E0603 error shown in the comments above.

Is this a bug or a feature? 🙈

I am quite new to the Rust world (I come from the JS world, where the RxJS library is well known) and I am not sure whether I am doing something incorrectly or there is a bug in this library 😭

I will be grateful for any help 🙏

How can I return multiple type of observable in the same function?

Let's say I want to write a function like the following:

  fn foo(condition: bool) -> Box<dyn Observable<Item=i32, Err=()>> {
    if condition {
      Box::new(observable::of(100))
    }
    else {
      Box::new(observable::of(100).map(|x| x+1))
    }
  }

This is not possible now, because the Observable trait is not object safe.
But, from what I understand, it was object safe before the Sized marker was deleted in commit 49503e7.
I get that deleting the Sized marker made the APIs a lot cleaner, but I am wondering if we can bring back object safety for Observable.
Or please let me know if there is some way of doing this without using a trait object.
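A general way to regain object safety is a companion trait whose only method has no generic parameters and takes `self: Box<Self>` plus a boxed observer. The sketch below shows that pattern with the standard library; `BoxedObservable`, `Of`, `MapOp` and `collect` are hypothetical names chosen for illustration and are fixed to `i32` items for brevity. Another issue on this page calls `.box_it()` and names a `LocalBoxOp` type, which suggests rxRust offers its own boxing helper, so check the version in use before rolling a custom one.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical object-safe trait: no generic methods, and `self: Box<Self>`
// still lets the call consume the observable.
trait BoxedObservable {
    fn subscribe_boxed(self: Box<Self>, observer: Box<dyn FnMut(i32)>);
}

struct Of(i32);

impl BoxedObservable for Of {
    fn subscribe_boxed(self: Box<Self>, mut observer: Box<dyn FnMut(i32)>) {
        observer(self.0);
    }
}

struct MapOp<S, F> {
    source: S,
    f: F,
}

impl<S, F> BoxedObservable for MapOp<S, F>
where
    S: BoxedObservable,
    F: FnMut(i32) -> i32 + 'static,
{
    fn subscribe_boxed(self: Box<Self>, mut observer: Box<dyn FnMut(i32)>) {
        let MapOp { source, mut f } = *self;
        Box::new(source).subscribe_boxed(Box::new(move |v| observer(f(v))));
    }
}

// Both branches have different concrete types but one return type.
fn foo(condition: bool) -> Box<dyn BoxedObservable> {
    if condition {
        Box::new(Of(100))
    } else {
        Box::new(MapOp { source: Of(100), f: |x: i32| x + 1 })
    }
}

// Helper for the demo: gather everything the observable emits.
fn collect(source: Box<dyn BoxedObservable>) -> Vec<i32> {
    let out = Rc::new(RefCell::new(Vec::new()));
    let sink = Rc::clone(&out);
    source.subscribe_boxed(Box::new(move |v| sink.borrow_mut().push(v)));
    let items = out.borrow().clone();
    items
}

fn main() {
    println!("{:?} {:?}", collect(foo(true)), collect(foo(false)));
}
```

The price is one allocation and one dynamic call per boxed layer, which is why a design might keep the generic trait as the default and box only at such boundaries.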

Implement `Retry` Operator

Hello rxRust team,

I am currently learning Rust. I have worked a lot with RxJs in Typescript.
I would be interested in having a go at implementing the Retry operator.

Are you open to reviewing and merging (given correct implementation and adherence to your style) a PR for this?
Do you have any pointers for me before I start?

Best regards,
Sasha

Consider renaming `Fork`

Fork seems to be a name that no one is familiar with. It's also confusing because it isn't exactly multicast as you would imagine from the word "fork".

I'd suggest that we just make it Clone (with a bound) instead. Does that make sense and would that work with the current implementations?

Run tasks in parallel with `observe_on`

Hey,
While further developing with the library, I noticed behaviour which I did not expect from the documentation:

  #[test]
  fn should_run_map_in_parallel() {
    let pool = ThreadPool::new().unwrap();
    observable::from_iter(1..10)
      .observe_on(pool.clone())
      .map(|v| {
        thread::sleep(Duration::from_secs(1));
        println!(
          "Mapping {} on {:?} at {:?}",
          v,
          thread::current().id(),
          SystemTime::now()
        );
        v
      })
      .into_shared()
      .subscribe(|v| {
        let handle = thread::current();
        println!(
          "Received {} on {:?} at {:?}",
          v,
          handle.id(),
          SystemTime::now()
        )
      });
    thread::sleep(Duration::from_secs(20));
  }

While one would expect that the tasks in the map function would run in parallel, I observe the following output:

Mapping 1 on ThreadId(3) at SystemTime { tv_sec: 1672906363, tv_nsec: 227367839 }
Received 1 on ThreadId(3) at SystemTime { tv_sec: 1672906363, tv_nsec: 227382646 }
Mapping 2 on ThreadId(4) at SystemTime { tv_sec: 1672906364, tv_nsec: 227563828 }
Received 2 on ThreadId(4) at SystemTime { tv_sec: 1672906364, tv_nsec: 227578774 }
Mapping 3 on ThreadId(5) at SystemTime { tv_sec: 1672906365, tv_nsec: 227742914 }
Received 3 on ThreadId(5) at SystemTime { tv_sec: 1672906365, tv_nsec: 227777766 }
Mapping 4 on ThreadId(7) at SystemTime { tv_sec: 1672906366, tv_nsec: 228213876 }
Received 4 on ThreadId(7) at SystemTime { tv_sec: 1672906366, tv_nsec: 228274220 }
Mapping 5 on ThreadId(6) at SystemTime { tv_sec: 1672906367, tv_nsec: 228547805 }
Received 5 on ThreadId(6) at SystemTime { tv_sec: 1672906367, tv_nsec: 228563799 }
Mapping 6 on ThreadId(8) at SystemTime { tv_sec: 1672906368, tv_nsec: 228751755 }
Received 6 on ThreadId(8) at SystemTime { tv_sec: 1672906368, tv_nsec: 228787096 }
Mapping 7 on ThreadId(9) at SystemTime { tv_sec: 1672906369, tv_nsec: 228991885 }
Received 7 on ThreadId(9) at SystemTime { tv_sec: 1672906369, tv_nsec: 229009486 }
Mapping 8 on ThreadId(10) at SystemTime { tv_sec: 1672906370, tv_nsec: 229211760 }
Received 8 on ThreadId(10) at SystemTime { tv_sec: 1672906370, tv_nsec: 229229430 }
Mapping 9 on ThreadId(11) at SystemTime { tv_sec: 1672906371, tv_nsec: 229670988 }
Received 9 on ThreadId(11) at SystemTime { tv_sec: 1672906371, tv_nsec: 229689636 }

Did I make a mistake here, or is there some issue in the observe_on implementation?

Better usage examples

I have been trying to figure out this library on and off for a few months, and its complexity and lack of examples have been quite a challenge. I would love to see how people are managing lifetimes and threading with types like SharedSubject.

`group_by` subscribes again on each new group

Hey,
While writing a short program which should feed objects read from a TCP connection into a stream, I noticed that there
seems to be an issue with the group_by operator, which results in it not completing. (No output is printed)
Do you know how to solve this, or whether this is a bug in the operator?

#[cfg(test)]
mod tests {
    use std::net::TcpListener;
    use rxrust::prelude::*;

    #[test]
    fn it_groups_with_tcp_listener() {
        let mut obs_count = 0;
        observable::create(|subscriber| {
            if let Ok(_) = TcpListener::bind("127.0.0.1:8080") {
                subscriber.next(1);
                subscriber.complete();
            };
        })
        .group_by(|value: &i64| *value)
        .subscribe(move |group| {
            group.subscribe(|_| {
                obs_count += 1;
            });
        });
        assert_eq!(1, obs_count);
    }
}
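Separately from whatever group_by does, the counter in this test cannot reach 1 as written: `obs_count` is an integer (`Copy`), so the `move` closure increments its own copy and the assertion reads the untouched original. The std-only sketch below demonstrates that effect and one way around it; `call_once_with`, `counted_by_copy` and `counted_by_shared_cell` are hypothetical helpers, not part of rxRust.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Stand-in for any API that takes an observer closure and calls it once.
fn call_once_with<F: FnMut(i32)>(mut f: F) {
    f(1);
}

fn counted_by_copy() -> i32 {
    let mut count = 0;
    // `move` copies `count` into the closure; only the copy is incremented.
    call_once_with(move |_| count += 1);
    count
}

fn counted_by_shared_cell() -> i32 {
    // A shared cell lets the closure and the caller see the same counter.
    let count = Rc::new(Cell::new(0));
    let inner = Rc::clone(&count);
    call_once_with(move |_| inner.set(inner.get() + 1));
    count.get()
}

fn main() {
    println!("{} {}", counted_by_copy(), counted_by_shared_cell());
}
```

Switching the test to a shared counter makes its result reflect what the stream actually emitted, which helps when narrowing down the operator's behaviour.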

Best regards
Anton Oellerer

Allow subjects to be used for non-copyable items

Currently subjects only work for copyable items or references (please correct me if I'm wrong or if there's a workaround). I wish there would be a way to support items which are not copyable but implement Clone. Or at least a way to use items wrapped in shared pointers (Rc).

I would be happy to make an implementation attempt once the trait refactoring (#62) has progressed.

Background: I have a struct which I would like to use as observable item. However, it's impossible to implement Copy because it owns a String. One workaround would be to use references but then I have to deal with lifetime issues which I don't want and is also unnecessary in my case. So I was thinking of wrapping the item in an Rc (cloning String over and over again wouldn't be the best idea anyway) ... but Rc is also not copyable.
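What the request amounts to can be sketched without rxRust: a subject bounded by `Clone` instead of `Copy` hands each observer its own clone, and with `Rc<T>` items that clone is only a reference-count increment. `CloneSubject`, `Message` and `run` below are hypothetical names for illustration, not existing rxRust types.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Hypothetical multicast subject that requires `Clone` rather than `Copy`.
struct CloneSubject<T> {
    observers: Vec<Box<dyn FnMut(T)>>,
}

impl<T: Clone> CloneSubject<T> {
    fn new() -> Self {
        CloneSubject { observers: Vec::new() }
    }

    fn subscribe(&mut self, observer: impl FnMut(T) + 'static) {
        self.observers.push(Box::new(observer));
    }

    fn next(&mut self, value: T) {
        for observer in self.observers.iter_mut() {
            // For `Rc<_>` items this only bumps a reference count.
            observer(value.clone());
        }
    }
}

// Owns a `String`, so it cannot be `Copy`.
struct Message {
    text: String,
}

fn run() -> (Vec<String>, usize) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let mut subject = CloneSubject::new();
    for name in ["first", "second"] {
        let log = Rc::clone(&log);
        subject.subscribe(move |msg: Rc<Message>| {
            log.borrow_mut().push(format!("{}: {}", name, msg.text));
        });
    }
    let message = Rc::new(Message { text: String::from("hello") });
    subject.next(Rc::clone(&message));
    // Every per-observer clone has been dropped again by now.
    let entries = log.borrow().clone();
    (entries, Rc::strong_count(&message))
}

fn main() {
    println!("{:?}", run());
}
```

The `String` inside `Message` is never duplicated; both observers read the same allocation through their own `Rc` handle.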

GTK-rs and observe_on

Hi, I'm new to Rust but pretty familiar with ReactiveX (both RxJava and RxPy).
I'm currently trying to write a GTK application with Rust and I was wondering if I could use RxRust for this, like I already do in my Python apps.

In particular I would like to know how, with RxRust, I can go back to the original thread if I change the scheduler using the subscribe_on operator.
In RxPy I can go back to the GTK mainloop with observe_on(GtkScheduler(GLib)). This is necessary because GTK widgets can be updated only from the main thread.

Is there already something similar with RxRust?

Custom "Disposable" from create

When creating a new Observable using create, the original C# implementation allows returning a custom IDisposable, which gives more control over the completion of the Observable. With rxRust there is apparently no way to do that?
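
To make the request concrete, here is a rough sketch of what a `create` with a custom teardown could look like. Everything in it (`Subscription`, `create_with_teardown`) is hypothetical and written against the standard library only; it is not rxRust's API. The producer closure returns a boxed teardown, which plays the role of the IDisposable.

```rust
use std::cell::Cell;
use std::rc::Rc;

// Hypothetical subscription handle that owns the producer's teardown logic.
struct Subscription {
    teardown: Option<Box<dyn FnOnce()>>,
}

impl Subscription {
    // Runs the teardown at most once.
    fn unsubscribe(&mut self) {
        if let Some(teardown) = self.teardown.take() {
            teardown();
        }
    }
}

// Hypothetical `create`: the producer emits through `next` and returns its own
// teardown, similar in spirit to returning an IDisposable in C#.
fn create_with_teardown<T, P, N>(producer: P, mut next: N) -> Subscription
where
    P: FnOnce(&mut dyn FnMut(T)) -> Box<dyn FnOnce()>,
    N: FnMut(T),
{
    let teardown = producer(&mut next);
    Subscription { teardown: Some(teardown) }
}

// Returns (values seen, released before unsubscribe, released after unsubscribe).
fn teardown_demo() -> (Vec<i32>, bool, bool) {
    let released = Rc::new(Cell::new(false));
    let flag = Rc::clone(&released);
    let mut seen: Vec<i32> = Vec::new();
    let mut subscription = create_with_teardown(
        move |next| {
            next(1);
            next(2);
            Box::new(move || flag.set(true)) as Box<dyn FnOnce()>
        },
        |value| seen.push(value),
    );
    let released_before = released.get();
    subscription.unsubscribe();
    (seen, released_before, released.get())
}
```

In this sketch the producer is synchronous, so the teardown only releases resources; an asynchronous producer would also use it to stop emitting.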

How can I return an observable sequence from a function declared in a trait?

Hi,

I have a case as follows:

pub trait Foo {
  fn foo(&self) -> Box<dyn rxrust::Observable<Item=Bar, Err=()>>;
}

I cannot return a boxed trait object because of the Sized constraint. What options do I have for returning a sequence as a function result? I know that:

fn foo() -> impl rxrust::Observable<Item=Bar, Err=()> { ... }

will work, but I need the function foo to be bound to a struct.
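
One general Rust pattern for this situation is an associated type on the trait, so that each implementor names its own concrete stream type. The sketch below uses a hypothetical, simplified `Observable` trait (no `Err` type) and a hypothetical `FromVec` source rather than rxRust's real types, so treat it as an illustration of the pattern only.

```rust
// Hypothetical, simplified observable trait; not rxRust's real trait.
trait Observable {
    type Item;
    // Consuming `self` is the kind of signature that makes `Box<dyn Observable>` awkward.
    fn subscribe(self, next: impl FnMut(Self::Item));
}

struct Bar(u32);

// Hypothetical concrete observable backed by a Vec.
struct FromVec(Vec<Bar>);

impl Observable for FromVec {
    type Item = Bar;
    fn subscribe(self, mut next: impl FnMut(Bar)) {
        for bar in self.0 {
            next(bar);
        }
    }
}

// The trait names the returned stream through an associated type instead of
// `Box<dyn ...>` or `impl Trait`.
trait Foo {
    type Stream: Observable<Item = Bar>;
    fn foo(&self) -> Self::Stream;
}

struct Source {
    count: u32,
}

impl Foo for Source {
    type Stream = FromVec;
    fn foo(&self) -> FromVec {
        FromVec((0..self.count).map(Bar).collect())
    }
}

// Works for any implementor of `Foo`, without knowing the concrete stream type.
fn collect_values<S: Foo>(source: &S) -> Vec<u32> {
    let mut out = Vec::new();
    source.foo().subscribe(|bar: Bar| out.push(bar.0));
    out
}
```

This only works when the implementor can name its stream type. If the type contains closures, boxing behind an object-safe trait is the usual alternative.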

Lifetime limit by `Subject` when emit `&mut _`

Currently, when we call Subject::next to emit a &mut _, the item's lifetime must be longer than that of the subject itself, so we must write it as below:

let mut item = 0;
let mut subject: LocalSubject<'_, _, ()> = Subject::local();
subject.next(&mut item);

A more convenient form is not allowed:

    let mut subject: LocalSubject<'_, _, ()> = Subject::local();
    subject.next(&mut 1);

That is because, in the Subject implementation, Box<Publisher<Item, Value> + 'a> requires the item to have a longer lifetime.

Is there any way to avoid it?

from_fn | Correctly typing result

Hello Team!

First of all, I am relatively new to rust, so please do bear with me.

Following problem:
I'm trying to create composable functions which use observables as a streaming mechanism.

e.g.

fn some_composable_stream() -> ???? {
    observable::create(|mut subscriber: Subscriber<????>| {
        let mut counter = 0;
        while counter < 1000 {
            counter += 1;
            subscriber.next(counter);
            sleep(Duration::from_millis(50));
        }

        if (counter > 1000) {
            subscriber.error("Guess numbers are broken now")
        } else {
            subscriber.complete();
        }
    })
}

However, there are two problems:

  1. To use the resulting observable as a type at all, I need to declare Item and Err, because without the subscribe call Rust complains that it cannot infer the generic parameters.
  2. To return the observable properly, I also need the correct type.

I've tried a lot of variations now but I am pretty stuck on how to define the type correctly.

Maybe you are able to give me some assistance here.

`rxrust = "0.15"` seems to be broken, and thus won't build

Hello, I was just stopping by to give rxrust a try; however, it seems to be broken. The resulting error message is as follows:

 Checking rxrust v0.15.0
error[E0277]: `*mut u8` cannot be sent between threads safely
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:33:16
   |
14 | ) -> (impl Future<Output = ()>, SpawnHandle) {
   |       ------------------------ within this `impl futures::Future<Output = ()>`
...
33 |     self.spawn(f);
   |          ----- ^ `*mut u8` cannot be sent between threads safely
   |          |
   |          required by a bound introduced by this call
   |
   = help: within `impl futures::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut u8`    
   = note: required because it appears within the type `PhantomData<*mut u8>`
   = note: required because it appears within the type `wasm_bindgen::JsValue`
   = note: required because it appears within the type `ManuallyDrop<wasm_bindgen::JsValue>`
   = note: required because it appears within the type `wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `Option<wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>>` 
   = note: required because it appears within the type `gloo_timers::callback::Timeout`
   = note: required because it appears within the type `gloo_timers::future::TimeoutFuture`
   = note: required because it appears within the type `async_std::utils::timer::Timer`
   = note: required because it appears within the type `async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>`
   = note: required because it appears within the type `Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>`
   = note: required because it appears within the type `futures::future::future::map::Map<Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:17:12: 17:18]>`
   = note: required because it appears within the type `futures::future::Map<Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:17:12: 17:18]>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
note: required by a bound in `scheduler::SharedScheduler::spawn`
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:24:32
   |
22 |   fn spawn<Fut>(&self, future: Fut)
   |      ----- required by a bound in this
23 |   where
24 |     Fut: Future<Output = ()> + Send + 'static;
   |                                ^^^^ required by this bound in `scheduler::SharedScheduler::spawn`

error[E0277]: `(dyn FnMut() + 'static)` cannot be sent between threads safely
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:33:16
   |
33 |     self.spawn(f);
   |          ----- ^ `(dyn FnMut() + 'static)` cannot be sent between threads safely
   |          |
   |          required by a bound introduced by this call
   |
   = help: the trait `std::marker::Send` is not implemented for `(dyn FnMut() + 'static)`
   = note: required because of the requirements on the impl of `std::marker::Send` for `Unique<(dyn FnMut() + 'static)>` 
   = note: required because it appears within the type `Box<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `ManuallyDrop<Box<(dyn FnMut() + 'static)>>`
   = note: required because it appears within the type `wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `Option<wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>>` 
   = note: required because it appears within the type `gloo_timers::callback::Timeout`
   = note: required because it appears within the type `gloo_timers::future::TimeoutFuture`
   = note: required because it appears within the type `async_std::utils::timer::Timer`
   = note: required because it appears within the type `async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>`
   = note: required because it appears within the type `Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>`
   = note: required because it appears within the type `futures::future::future::map::Map<Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:17:12: 17:18]>`
   = note: required because it appears within the type `futures::future::Map<Abortable<async_std::future::future::delay::DelayFuture<futures::future::Lazy<[closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:15:18: 15:33]>>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:17:12: 17:18]>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
note: required by a bound in `scheduler::SharedScheduler::spawn`
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:24:32
   |
22 |   fn spawn<Fut>(&self, future: Fut)
   |      ----- required by a bound in this
23 |   where
24 |     Fut: Future<Output = ()> + Send + 'static;
   |                                ^^^^ required by this bound in `scheduler::SharedScheduler::spawn`

error[E0277]: `*mut u8` cannot be sent between threads safely
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:16
   |
44 |     self.spawn(f.map(|_| ()));
   |          ----- ^^^^^^^^^^^^^ `*mut u8` cannot be sent between threads safely
   |          |
   |          required by a bound introduced by this call
   |
   = help: within `futures::future::Map<impl futures::Future<Output = ()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:22: 44:28]>`, the trait `std::marker::Send` is not implemented for `*mut u8`
   = note: required because it appears within the type `PhantomData<*mut u8>`
   = note: required because it appears within the type `wasm_bindgen::JsValue`
   = note: required because it appears within the type `ManuallyDrop<wasm_bindgen::JsValue>`
   = note: required because it appears within the type `wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `Option<wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>>` 
   = note: required because it appears within the type `gloo_timers::callback::Timeout`
   = note: required because it appears within the type `gloo_timers::future::TimeoutFuture`
   = note: required because it appears within the type `async_std::utils::timer::Timer`
   = note: required because it appears within the type `async_std::future::future::delay::DelayFuture<futures::future::Then<futures::future::Ready<()>, ForEach<Interval, futures::future::Ready<()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:162:63: 166:8]>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:160:11: 167:6]>>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
   = note: required because it appears within the type `Abortable<impl futures::Future<Output = ()>>`
   = note: required because it appears within the type `futures::future::future::map::Map<Abortable<impl futures::Future<Output = ()>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:149:12: 149:18]>`
   = note: required because it appears within the type `futures::future::Map<Abortable<impl futures::Future<Output = ()>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:149:12: 149:18]>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
   = note: required because it appears within the type `futures::future::future::map::Map<impl futures::Future<Output = ()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:22: 44:28]>`
   = note: required because it appears within the type `futures::future::Map<impl futures::Future<Output = ()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:22: 44:28]>`
note: required by a bound in `scheduler::SharedScheduler::spawn`
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:24:32
   |
22 |   fn spawn<Fut>(&self, future: Fut)
   |      ----- required by a bound in this
23 |   where
24 |     Fut: Future<Output = ()> + Send + 'static;
   |                                ^^^^ required by this bound in `scheduler::SharedScheduler::spawn`

error[E0277]: `(dyn FnMut() + 'static)` cannot be sent between threads safely
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:16
   |
44 |     self.spawn(f.map(|_| ()));
   |          ----- ^^^^^^^^^^^^^ `(dyn FnMut() + 'static)` cannot be sent between threads safely
   |          |
   |          required by a bound introduced by this call
   |
   = help: the trait `std::marker::Send` is not implemented for `(dyn FnMut() + 'static)`
   = note: required because of the requirements on the impl of `std::marker::Send` for `Unique<(dyn FnMut() + 'static)>` 
   = note: required because it appears within the type `Box<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `ManuallyDrop<Box<(dyn FnMut() + 'static)>>`
   = note: required because it appears within the type `wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>`
   = note: required because it appears within the type `Option<wasm_bindgen::closure::Closure<(dyn FnMut() + 'static)>>` 
   = note: required because it appears within the type `gloo_timers::callback::Timeout`
   = note: required because it appears within the type `gloo_timers::future::TimeoutFuture`
   = note: required because it appears within the type `async_std::utils::timer::Timer`
   = note: required because it appears within the type `async_std::future::future::delay::DelayFuture<futures::future::Then<futures::future::Ready<()>, ForEach<Interval, futures::future::Ready<()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:162:63: 166:8]>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:160:11: 167:6]>>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
   = note: required because it appears within the type `Abortable<impl futures::Future<Output = ()>>`
   = note: required because it appears within the type `futures::future::future::map::Map<Abortable<impl futures::Future<Output = ()>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:149:12: 149:18]>`
   = note: required because it appears within the type `futures::future::Map<Abortable<impl futures::Future<Output = ()>>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:149:12: 149:18]>`
   = note: required because it appears within the type `impl futures::Future<Output = ()>`
   = note: required because it appears within the type `futures::future::future::map::Map<impl futures::Future<Output = ()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:22: 44:28]>`
   = note: required because it appears within the type `futures::future::Map<impl futures::Future<Output = ()>, [closure@C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:44:22: 44:28]>`
note: required by a bound in `scheduler::SharedScheduler::spawn`
  --> C:\Users\jose2\.cargo\registry\src\github.com-1ecc6299db9ec823\rxrust-0.15.0\src\scheduler.rs:24:32
   |
22 |   fn spawn<Fut>(&self, future: Fut)
   |      ----- required by a bound in this
23 |   where
24 |     Fut: Future<Output = ()> + Send + 'static;
   |                                ^^^^ required by this bound in `scheduler::SharedScheduler::spawn`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `rxrust` due to 4 previous errors
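
The chain of notes in the log comes down to one rule: a value that contains `PhantomData<*mut u8>` is not `Send`, so anything owning it cannot satisfy a `Send + 'static` bound. The sketch below reproduces that rule with the standard library only. `LocalOnly` and `spawn_shared` are hypothetical stand-ins, and a closure is used instead of a future so that no async runtime is needed.

```rust
use std::marker::PhantomData;
use std::thread;

// Hypothetical stand-in for a handle such as `JsValue`: the raw-pointer marker
// makes the type `!Send`.
#[allow(dead_code)]
struct LocalOnly(PhantomData<*mut u8>);

// Same kind of bound as `SharedScheduler::spawn` in the log (`Send + 'static`),
// applied to a closure rather than a future.
fn spawn_shared<F>(task: F) -> thread::JoinHandle<usize>
where
    F: FnOnce() -> usize + Send + 'static,
{
    thread::spawn(task)
}

fn send_demo() -> usize {
    // Accepted: the closure captures only `Send` data.
    let text = String::from("send me");
    let handle = spawn_shared(move || text.len());

    // Rejected if uncommented: the closure would own a `LocalOnly`, so it is
    // not `Send` and the compiler reports E0277.
    let local = LocalOnly(PhantomData);
    // spawn_shared(move || { let _keep = local; 0 });
    let _ = local;

    handle.join().unwrap()
}
```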

Bug in `scan` operator when mixed types are involved.

I'm sad to report an issue I found while playing with the scan operator that I've just added. It turned out that this extra test case would not compile, and it clearly exposes an input/output type mismatch. I admit this is easy to overlook.

  #[test]
  fn scan_fork_and_shared_mixed_types() {
    // type to type can fork
    let m = observable::from_iter(vec!['a', 'b', 'c']).scan(|acc, v| 1);
    m.fork()
      .scan(|acc, v| *v as f32)
      .fork()
      .to_shared()
      .fork()
      .to_shared()
      .subscribe(|_| {});
  }

I will post a patch for it soon, but I decided to report it as an issue just for the record.

The lesson I take from this is to ALWAYS add a mixed-type unit test for operators combined with fork and into_shared, where input and output types are not guaranteed to be the same. I was clearly lacking one initially.
There was a mixed-type unit test, but only for a simple subscription; the reversed notation somehow cancelled out there and did not expose the problem properly.
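
As an illustration of what "mixed types" means for a scan-like operator, here is a small standalone sketch. `scan_mixed` is a hypothetical free function over iterators, not rxRust's `scan`; it only shows that the accumulator type `Out` is independent of the input type `In`, so an implementation must never assume they are the same.

```rust
// Hypothetical helper, not rxRust's operator: emits every intermediate
// accumulator value, with `Out` allowed to differ from `In`.
fn scan_mixed<In, Out, F>(items: impl IntoIterator<Item = In>, init: Out, mut f: F) -> Vec<Out>
where
    Out: Clone,
    F: FnMut(Out, In) -> Out,
{
    let mut acc = init;
    let mut emitted = Vec::new();
    for item in items {
        acc = f(acc, item);
        emitted.push(acc.clone());
    }
    emitted
}

// Chains two scans with different type pairs: char -> i32, then i32 -> f32.
fn mixed_chain() -> Vec<f32> {
    let counts = scan_mixed(vec!['a', 'b', 'c'], 0i32, |acc, _c| acc + 1);
    scan_mixed(counts, 0.0f32, |acc, n| acc + n as f32)
}
```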

IntoShared on LocalSubscription uses unneeded transmute

This line of transmute is unsound:

unsafe { transmute(inner) };

Basically it adds the Send + Sync + 'static bound to the dyn SubscriptionLike without verifying if it holds. In the current implementation there's no way to do this conversion safely, so either the impl IntoShared has to be removed or LocalSubscription and SharedSubscription should get a rewrite.
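
For contrast, here is a sketch of a conversion where the compiler checks the bounds instead of `transmute` asserting them. The `SubscriptionLike` trait here is a simplified, hypothetical version of the one named in the issue, and `into_shared` is an illustrative free function rather than the existing `IntoShared` impl.

```rust
use std::thread;

// Hypothetical, simplified version of the trait named in the issue.
trait SubscriptionLike {
    fn unsubscribe(&mut self);
    fn is_closed(&self) -> bool;
}

struct Flag {
    closed: bool,
}

impl SubscriptionLike for Flag {
    fn unsubscribe(&mut self) {
        self.closed = true;
    }
    fn is_closed(&self) -> bool {
        self.closed
    }
}

// Safe conversion: `Send + Sync + 'static` is required while the concrete type
// is still known, so a type that is not thread-safe is rejected at compile time.
fn into_shared<S>(subscription: S) -> Box<dyn SubscriptionLike + Send + Sync>
where
    S: SubscriptionLike + Send + Sync + 'static,
{
    Box::new(subscription)
}

// The boxed subscription can then be moved to another thread soundly.
fn unsubscribe_on_other_thread() -> bool {
    let mut shared = into_shared(Flag { closed: false });
    thread::spawn(move || {
        shared.unsubscribe();
        shared.is_closed()
    })
    .join()
    .unwrap()
}
```

A subscription built on `Rc` would fail the bound in `into_shared`, which is exactly the check that the `transmute` skips.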

Audit Nomenclature: Subscription | Subscriber | Observer

This codebase likes to shift back and forth between various nomenclature for various actors. It's not clear whether to call a consumer of values a Subscriber or an Observer. The trait's name is Observer:

pub trait Observer {
  type Item;
  type Err;
  fn next(&mut self, value: Self::Item);
  fn error(&mut self, err: Self::Err);
  fn complete(&mut self);
}

This agrees with ReactiveX documentation elsewhere (e.g. its most popular implementation, RxJS), though this isn't as consistent as we might hope. Throughout the code, tests, and docs, instances of observers are often called subscribers. rxRust should endeavor to choose one name and stick with it everywhere.

Is the following subscriber actually an observer? It seems to implement the Observer trait.

let o = observable::create(|subscriber| {
    subscriber.complete();
});

The docs also often refer to Subscriptions or SubscriptionLike as Subscriber, meaning that sometimes Subscriber means Observer and sometimes it means SubscriptionLike. For example:

/// Returns a new Observable that multicast (shares) the original
/// Observable. As long as there is at least one Subscriber this
/// Observable will be subscribed and emitting data.

Subscribing with an Observer returns a SubscriptionLike (via SubscriptionWrapper), and it's ostensibly this Subscription that determines how long an Observable lives.

I'm not sure I get all the nuance at work here, but I think some tightening of the common vernacular might help!

Use Infallible instead of unit to represent an error that will not happen

Currently the unit type () is used to represent the absence of errors, but the idiomatic way in Rust for those cases is the never type or Infallible: https://doc.rust-lang.org/std/convert/enum.Infallible.html

I think Infallible is the type which should be used for those cases.

So types like ObserverItem<N> should be rewritten as:

#[derive(Clone)]
pub struct ObserverItem<N> {
  next: N,
}

impl<Item, N> Observer<Item, Infallible> for ObserverItem<N>
where
  N: FnMut(Item),
{
  fn next(&mut self, value: Item) {
    (self.next)(value);
  }

  #[inline]
  fn error(self, _err: Infallible) {}

  #[inline]
  fn complete(self) {}

  #[inline]
  fn is_finished(&self) -> bool {
    false
  }
}
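
A short standard-library sketch of why `Infallible` is a better fit than `()` here. The helper names are made up for illustration. Because `Infallible` has no values, the error arm can be discharged with an empty match, whereas `Err(())` is a real value that callers still have to handle.

```rust
use std::convert::Infallible;

// `Infallible` is an empty enum, so the `Err` arm is provably unreachable and
// needs no panic or `unreachable!()`.
fn into_ok<T>(result: Result<T, Infallible>) -> T {
    match result {
        Ok(value) => value,
        Err(never) => match never {},
    }
}

// With `()` as the error type, `Err(())` can actually be constructed, so the
// caller must still deal with a failure case.
fn into_ok_unit<T>(result: Result<T, ()>) -> Option<T> {
    result.ok()
}
```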

What is delay() supposed to do?

@M-Adoo I've a question regarding the delay operator. I thought it delays the emission of source items (as e.g. in rxjs), but in my case it seems to delay the whole subscription instead. Once the subscription is made (after the delay), items are emitted immediately.

Need block_on like future

Rust futures have a block_on method for waiting until an async function completes, but the rxRust examples just use sleep to wait for the result. I think rxRust should have a block_on method like futures, or some method to convert a subscription into a future, so that it can be passed to block_on or awaited.
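
Until something like that exists, one way to avoid guessing a sleep duration is to block on a completion signal. The sketch below uses only std channels and threads. `subscribe_async` is a hypothetical stand-in for an observable that emits on another thread, not an rxRust function.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical asynchronous source: emits three values on another thread and
// then signals completion.
fn subscribe_async(
    mut next: impl FnMut(u32) + Send + 'static,
    complete: impl FnOnce() + Send + 'static,
) {
    thread::spawn(move || {
        for value in 0..3 {
            next(value);
        }
        complete();
    });
}

// Blocks the caller until `complete` fires, then returns everything emitted.
fn block_until_complete() -> Vec<u32> {
    let (value_tx, value_rx) = mpsc::channel();
    let (done_tx, done_rx) = mpsc::channel::<()>();
    subscribe_async(
        move |value| {
            value_tx.send(value).unwrap();
        },
        move || {
            done_tx.send(()).unwrap();
        },
    );
    done_rx.recv().unwrap(); // waits for the completion signal, no sleep needed
    value_rx.try_iter().collect()
}
```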

`wasm` support

It was brought up in a comment in #176 that wasm has not been tested. I believe it would be a fantastic idea to support wasm, specifically the wasm32-unknown-unknown target, since the observable spec is most useful for reactive front-end web frameworks, which can currently only target wasm32-unknown-unknown.

Wait GAT

In #134, we removed InnerDeref and implemented both local and shared versions of Observer for operators. Once GAT (rust-lang/rust#44265) is ready, we should use generics to merge these two implementations into one, and also support emitting reference values.

  • merge local & shared implementations of Observer for operators.
  • support emitting reference values.

Missing features list

Thank you for the great project.

Is it possible to have a list of features that are missing so that people can contribute to the project easily?

Consider replacing PayloadCopy with Clone bound

In the last months I have worked a lot with rxRust in a real-world application. Overall, it works quite well. Although many operators are not available yet, it proved to be very useful already and in some areas it's even indispensable for me.

In some areas it's still a bit hard to use, compared to other Rx implementations. Obviously this is partly related to the nature of Rust itself, which sometimes has to sacrifice convenience for safety - this is totally fine and people who use Rust appreciate this anyway. But there are a few other aspects about rxRust which I think could be improved a bit so that it becomes easier to use.

One of them is the requirement for PayloadCopy. I found that in practice, I often deal with items which don't implement Copy. Because I don't want them to be cloned a dozen times when sending them through an observable chain, I often put them into an Rc<T>. AFAIK there's no way to implement PayloadCopy for Rc<T>, so I end up wrapping it in a custom Payload<T> type which implements PayloadCopy (by cloning its value). Of course, this Payload type then becomes part of my function signatures - a part of the public API, in case of a library. This is quite verbose.

The verbosity of having an extra wrapper could be avoided if rxRust were satisfied with items that implement Clone. I know, the initial argument for requiring Copy/PayloadCopy was to make it harder for users to do anything stupid performance-wise (see #74). But IMHO it would be better to just mention this in the docs instead of making rxRust harder to use.

@M-Adoo What do you think?

A few delay and thread tests fail when run under tarpaulin

Tarpaulin is a test coverage tool for Rust. The good news is that rxRust seems to have 93% coverage which is great! But when running under tarpaulin, I see some test failures:

rustup run nightly cargo tarpaulin
running 119 tests
...............................F.thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/ops/delay.rs:75:8
...............................F..thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/ops/observe_on.rs:164:7
thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/ops/observe_on.rs:88:11
.................F.thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/ops/subscribe_on.rs:117:9
.............. 100/119
..........F.thread '<unnamed>' panicked at 'called `Result::unwrap()` on an `Err` value: "PoisonError { inner: .. }"', src/subject.rs:451:10
......F
failures:

---- ops::delay::smoke stdout ----
thread 'ops::delay::smoke' panicked at 'assertion failed: `(left == right)`
  left: `0`,
 right: `1`', src/ops/delay.rs:79:3
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

---- ops::observe_on::test::switch_thread stdout ----
thread 'ops::observe_on::test::switch_thread' panicked at 'index out of bounds: the len is 0 but the index is 0', /rustc/58b834344fc7b9185e7a50db1ff24e5eb07dae5e/src/libcore/slice/mod.rs:2791:10

---- ops::subscribe_on::test::new_thread stdout ----
thread 'ops::subscribe_on::test::new_thread' panicked at 'assertion failed: `(left == right)`
  left: `[]`,
 right: `[1, 2, 3, 4]`', src/ops/subscribe_on.rs:123:5

---- subject::test::empty_local_subject_can_convert_to_shared stdout ----
thread 'subject::test::empty_local_subject_can_convert_to_shared' panicked at 'assertion failed: `(left == right)`
  left: `0`,
 right: `100`', src/subject.rs:458:5

---- ops::throttle_time::smoke stdout ----
thread 'ops::throttle_time::smoke' panicked at 'assertion failed: `(left == right)`
  left: `[9, 19, 29, 39, 49, 59, 69, 79, 89]`,
 right: `[9, 19, 29, 39, 49, 59, 69, 79, 89, 99]`', src/ops/throttle_time.rs:221:3


failures:
    ops::delay::smoke
    ops::observe_on::test::switch_thread
    ops::subscribe_on::test::new_thread
    ops::throttle_time::smoke
    subject::test::empty_local_subject_can_convert_to_shared
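
The `PoisonError` lines in the output suggest that some thread panicked while holding a lock, after which every later `lock().unwrap()` on that mutex panics as well. The sketch below reproduces the mechanism with the standard library only; it is not taken from rxRust's code and makes no claim about which panic came first in the run above.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Returns (whether the lock was poisoned, the value recovered from it).
fn poison_demo() -> (bool, i32) {
    let shared = Arc::new(Mutex::new(0));
    let worker = Arc::clone(&shared);

    // The worker panics while holding the guard, which poisons the mutex.
    let result = thread::spawn(move || {
        let mut guard = worker.lock().unwrap();
        *guard += 1;
        panic!("simulated failure while holding the lock");
    })
    .join();
    assert!(result.is_err());

    // `shared.lock().unwrap()` would panic here with a PoisonError.
    // Matching on the result lets the caller recover the data instead.
    let outcome = match shared.lock() {
        Ok(guard) => (false, *guard),
        Err(poisoned) => (true, *poisoned.into_inner()),
    };
    outcome
}
```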

For what it's worth, here is the coverage report:

[INFO tarpaulin] Coverage Results:
|| Tested/Total Lines:
|| src/observable/connectable_observable.rs: 27/31 +0%
|| src/observable/from.rs: 112/116 +0%
|| src/observable/from_future.rs: 17/17 +0%
|| src/observable/interval.rs: 20/21 +0%
|| src/observable/trivial.rs: 24/24 +0%
|| src/observable.rs: 67/69 +0%
|| src/observer.rs: 10/12 +0%
|| src/ops/average.rs: 42/42 +0%
|| src/ops/count.rs: 16/16 +0%
|| src/ops/delay.rs: 21/24 +0%
|| src/ops/filter.rs: 22/27 +0%
|| src/ops/filter_map.rs: 36/41 +0%
|| src/ops/first.rs: 75/76 +0%
|| src/ops/last.rs: 106/106 +0%
|| src/ops/map.rs: 46/48 +0%
|| src/ops/merge.rs: 97/103 +0%
|| src/ops/minmax.rs: 104/107 +0%
|| src/ops/observe_on.rs: 50/68 +0%
|| src/ops/publish.rs: 10/11 +0%
|| src/ops/reduce.rs: 40/40 +0%
|| src/ops/ref_count.rs: 69/74 +0%
|| src/ops/scan.rs: 58/61 +0%
|| src/ops/subscribe_on.rs: 30/33 +0%
|| src/ops/sum.rs: 24/24 +0%
|| src/ops/take.rs: 51/52 +0%
|| src/ops/take_last.rs: 53/55 +0%
|| src/ops/throttle_time.rs: 71/85 +0%
|| src/ops.rs: 4/4 +0%
|| src/scheduler/thread_pool_scheduler.rs: 7/7 +0%
|| src/scheduler/thread_scheduler.rs: 10/11 +0%
|| src/scheduler.rs: 18/20 +0%
|| src/subject.rs: 134/143 +0%
|| src/subscribable/subscribable_all.rs: 10/11 +0%
|| src/subscribable/subscribable_comp.rs: 4/5 +0%
|| src/subscribable/subscribable_err.rs: 6/6 +0%
|| src/subscribable/subscribable_pure.rs: 6/7 +0%
|| src/subscribable.rs: 1/1 +0%
|| src/subscriber.rs: 74/76 +0%
|| src/subscription.rs: 82/87 +0%
|| src/util.rs: 2/2 +0%
|| 
93.93% coverage, 1656/1763 lines covered, +0% change in coverage

Types mismatch when using `map` operator

When map converts source observable type to a different one, the program does not compile.

This is a simple error reproducer that can be added to the unit tests section of the `map.rs` file:

  #[test]
  fn map_types_mixed() {
    let mut i = 0;
    observable::from_iter!(vec!['a','b','c'])
      .map(|v| 1)
      .subscribe(|v| i += *v);
    assert_eq!(i, 3);
  }

The compilation error received is:

   Compiling rxrust v0.4.1-alpha.0 (/workspaces/rxRust)
error[E0277]: cannot add-assign `char` to `{integer}`
   --> src/ops/map.rs:223:24
    |
223 |       .subscribe(|v| i += *v);
    |                        ^^ no implementation for `{integer} += char`
    |
    = help: the trait `std::ops::AddAssign<char>` is not implemented for `{integer}`

error[E0631]: type mismatch in closure arguments
   --> src/observable/from.rs:7:31
    |
7   |     let _: &Observer<_, ()> = &subscriber;
    |                               ^^^^^^^^^^^ expected signature of `for<'r> fn(&'r {integer}) -> _`
    | 
   ::: src/ops/map.rs:221:5
    |
221 |     observable::from_iter!(vec!['a','b','c'])
    |     ----------------------------------------- in this macro invocation
222 |       .map(|v| 1)
223 |       .subscribe(|v| i += *v);
    |                  ----------- found signature of `for<'r> fn(&'r char) -> _`
    |
    = note: required because of the requirements on the impl of `subscribable::Observer<{integer}, ()>` for `subscribable::subscribable_pure::SubscribePure<[closure@src/ops/map.rs:223:18: 223:29 i:_]>`
    = note: required because of the requirements on the impl of `subscribable::Observer<char, ()>` for `ops::map::MapSubscribe<subscribable::subscribable_pure::SubscribePure<[closure@src/ops/map.rs:223:18: 223:29 i:_]>, [closure@src/ops/map.rs:222:12: 222:17]>`
    = note: required because of the requirements on the impl of `subscribable::Observer<char, ()>` for `subscriber::Subscriber<ops::map::MapSubscribe<subscribable::subscribable_pure::SubscribePure<[closure@src/ops/map.rs:223:18: 223:29 i:_]>, [closure@src/ops/map.rs:222:12: 222:17]>, subscription::LocalSubscription>`
    = note: required for the cast to the object type `dyn subscribable::Observer<char, ()>`
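
The notes above show the downstream closure being checked against the upstream item type (`char`) rather than the mapped type. The sketch below shows how a map adapter keeps the two apart. The `Observer` trait, `Collect`, and `MapObserver` are hypothetical, simplified types and not rxRust's real `MapSubscribe` code.

```rust
// Hypothetical minimal observer trait; rxRust's real traits differ.
trait Observer<Item> {
    fn next(&mut self, value: Item);
}

// Hypothetical downstream observer that stores what it receives.
struct Collect<T>(Vec<T>);

impl<T> Observer<T> for Collect<T> {
    fn next(&mut self, value: T) {
        self.0.push(value);
    }
}

// Adapter for a map operator: it observes `In` from upstream and forwards
// `Out` downstream, so the downstream bound must use `Out`, not `In`.
struct MapObserver<O, F> {
    downstream: O,
    func: F,
}

impl<In, Out, O, F> Observer<In> for MapObserver<O, F>
where
    O: Observer<Out>,
    F: FnMut(In) -> Out,
{
    fn next(&mut self, value: In) {
        let mapped = (self.func)(value);
        self.downstream.next(mapped);
    }
}

// Mirrors the reproducer: three chars mapped to 1 and summed downstream.
fn map_types_mixed() -> i32 {
    let mut mapper = MapObserver {
        downstream: Collect(Vec::<i32>::new()),
        func: |_c: char| 1i32,
    };
    for c in vec!['a', 'b', 'c'] {
        mapper.next(c);
    }
    mapper.downstream.0.iter().sum()
}
```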
