
Comments (3)

RxDave avatar RxDave commented on May 25, 2024

CurrentThreadScheduler simply ensures that the observable can be disposed even if the call to observer.OnNext blocks. The reason is that calling observer.OnNext results in a network I/O operation, which may take some time to complete. If the server needs to shut down, for example, then we don't want it hanging because the Subscribe method hasn't returned yet. The real problem here is that there is no OnNextAsync method to call and no SubscribeAsync method to begin the operation, either of which could allow for cancellation using a token, similar to the TAP (Task-based Asynchronous Pattern). NOTE: I believe the Rx team may be addressing this design issue in some future version of Rx, based on their internal, redesigned, distributed version of Rx. Don't quote me on that.

That chat example wasn't very well thought out anyway. Looking at it again, I don't think it's even thread-safe, since the Subject isn't synchronized!

I'm considering recording a video to describe some of the patterns that can be used in Qactive, including the hook pattern used in the chat example.

from qactive.

Okarin99 avatar Okarin99 commented on May 25, 2024

Ok I think now I get it.

I've added some console output to your chat service example.

Console.WriteLine("Observer creating Thread ID: " + Thread.CurrentThread.ManagedThreadId);
Scheduler.CurrentThread.Schedule(() =>
{
    Console.WriteLine("Scheduler entered Thread ID: " + Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(10000);
    observer.OnNext(hooks);
    Console.WriteLine("Scheduler exited Thread ID: " + Thread.CurrentThread.ManagedThreadId);
});

Console.WriteLine("Observer created Thread ID: " + Thread.CurrentThread.ManagedThreadId);

If a client connects, the output is:

Observer creating Thread ID: 5
Observer created Thread ID: 5
Scheduler entered Thread ID: 5
Scheduler exited Thread ID: 5

So all code scheduled with the current thread scheduler is put on the message queue.

Then I created a similar example without Qactive.

using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;

class Program
{
    // Runs first and schedules InnerAction on the same scheduler.
    private static IDisposable OuterAction(IScheduler scheduler, string state)
    {
        Console.WriteLine("{0} start. ThreadId: {1}",
            state,
            Thread.CurrentThread.ManagedThreadId);

        scheduler.Schedule(state + ".inner", InnerAction);
        Console.WriteLine("{0} end. ThreadId: {1}",
            state,
            Thread.CurrentThread.ManagedThreadId);
        return Disposable.Empty;
    }

    // Scheduled by OuterAction; schedules LeafAction in turn.
    private static IDisposable InnerAction(IScheduler scheduler, string state)
    {
        Console.WriteLine("{0} start. ThreadId: {1}",
            state,
            Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(1000);
        scheduler.Schedule(state + ".Leaf", LeafAction);
        Console.WriteLine("{0} end. ThreadId: {1}",
            state,
            Thread.CurrentThread.ManagedThreadId);
        return Disposable.Empty;
    }

    // Last action in the chain; schedules nothing further.
    private static IDisposable LeafAction(IScheduler scheduler, string state)
    {
        Console.WriteLine("{0}. ThreadId: {1}",
            state,
            Thread.CurrentThread.ManagedThreadId);
        Thread.Sleep(1000);
        return Disposable.Empty;
    }

    static void Main(string[] args)
    {
        Console.WriteLine("Starting on thread: {0}",
            Thread.CurrentThread.ManagedThreadId);
        Scheduler.CurrentThread.Schedule("A", OuterAction);

        Console.WriteLine("Ending on thread: {0}",
            Thread.CurrentThread.ManagedThreadId);

        Console.ReadKey();
    }
}

The output is:

Starting on thread: 10
A start. ThreadId: 10
A end. ThreadId: 10
A.inner start. ThreadId: 10
A.inner end. ThreadId: 10
A.inner.Leaf. ThreadId: 10
Ending on thread: 10

So the message queue only exists for the duration of the outermost call to Scheduler.CurrentThread.Schedule.
That means qactive subscribes on the CurrentThread scheduler, because the message queue is already created in the first example.

With synchronize you mean the synchronize operator, because two clients may send a message at the same time, am I right?
http://reactivex.io/documentation/operators/serialize.html

A video would be really nice :).
In particular, some limitations of IQbservable are not clear to me. For example, if it is possible for the client to watch several observables on the service, or if it can only watch the messages coming from the IQbservable.


RxDave avatar RxDave commented on May 25, 2024

Ok I think now I get it.

Yep, I think you do. Those code samples look good to me.

That means qactive subscribes on the CurrentThread scheduler, because the message queue is already created in the first example.

Rx actually refers to the queue as the "trampoline", which is really just an internal queue held by the CurrentThreadScheduler. When you call Subscribe, Rx ensures that the trampoline is already created, and if not, it creates the trampoline and enqueues that first call to Subscribe, so that every subsequent call to Subscribe is on the trampoline. This is automatic. This allows us to use the CurrentThreadScheduler from within our own Subscribe implementations to ensure that any previous work that has already been enqueued completes first. Of course, the outermost call to Subscribe doesn't return until the trampoline has been emptied, because it's not introducing any concurrency; there is only a single thread available to do all of the work.
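To make the trampoline visible, here is a minimal sketch that uses only System.Reactive and no Qactive. The class name TrampolineDemo is made up for illustration. It relies on the static CurrentThreadScheduler.IsScheduleRequired property, which reports whether the calling thread still needs a trampoline to be installed.

```csharp
using System;
using System.Reactive.Concurrency;

static class TrampolineDemo
{
    static void Main()
    {
        // No scheduled work is running yet, so a trampoline would have to be created.
        Console.WriteLine("Before: schedule required = {0}",
            CurrentThreadScheduler.IsScheduleRequired);

        Scheduler.CurrentThread.Schedule(() =>
        {
            // This action runs on the trampoline created by the outermost Schedule call.
            Console.WriteLine("Outer start: schedule required = {0}",
                CurrentThreadScheduler.IsScheduleRequired);

            // Nested work is only enqueued here; it runs after the outer action returns.
            Scheduler.CurrentThread.Schedule(() => Console.WriteLine("Inner runs"));

            Console.WriteLine("Outer end");
        });

        // The outermost Schedule call returns only once the queue has been drained.
        Console.WriteLine("After");
    }
}
```

The flag should read True before the outermost call and False inside it, and "Outer end" should be printed before "Inner runs", since the nested action waits on the queue until the outer action has finished.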

With synchronize you mean the synchronize operator, ...

Well, there is a method to synchronize Subjects specifically: var synced = Subject.Synchronize(unsynced); It has a few useful overloads. You could also use the Synchronize operator on the output observable, but that would add an additional, unnecessary observable into the monad in my chat example, because you can use Subject.Synchronize directly on the source.
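As a rough illustration of why this matters, the sketch below pushes values into a subject from many threads through the wrapper returned by Subject.Synchronize. The class name and the counter are made up for the example; it is not the chat service code.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading.Tasks;

static class SynchronizedSubjectDemo
{
    static void Main()
    {
        var unsynced = new Subject<int>();

        // Wraps the subject so that concurrent OnNext calls are serialized.
        ISubject<int> synced = Subject.Synchronize(unsynced);

        var count = 0;

        // This observer is not thread-safe on its own; it relies on serialized notifications.
        unsynced.Subscribe(_ => count++);

        // Many threads push values at once, but only through the synchronized wrapper.
        Parallel.For(0, 10000, i => synced.OnNext(i));

        Console.WriteLine(count); // prints 10000
    }
}
```

Calling unsynced.OnNext directly from Parallel.For instead would break the Rx contract of serialized notifications, and the counter could then lose increments.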

A video would be really nice :).

Okay, I'll try to make the time for it!

is possible for the client to watch several observables on the service, ...

A client may connect multiple times, of course, but that would create multiple socket connections. If you want to use a single socket connection, then use the Either<TLeft, TRight> pattern as your output data; e.g., IQbservable<Either<TLeft, TRight>>. An instance of the Either type contains the "TLeft" data OR the "TRight" data, but never both and never neither. It's like the disjunction to the Tuple's conjunction. As you might imagine, you can define any number of Either classes with additional type parameters, depending on how many distinct data types you need.

Essentially, this allows you to merge multiple observables into a single observable, even if they all have different data types!
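The Either pattern described above could be sketched roughly as follows. This Either class is a hypothetical minimal version written for this example, not a type taken from Qactive; the HasLeft, HasRight, Left and Right members are assumed to match the client snippets in this thread. Any serialization requirements for sending it over a socket are ignored, and the two sources are plain local observables.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical minimal Either: holds a left value or a right value, never both.
public sealed class Either<TLeft, TRight>
{
    private Either(bool hasLeft, TLeft left, TRight right)
    {
        HasLeft = hasLeft;
        Left = left;
        Right = right;
    }

    public bool HasLeft { get; }
    public bool HasRight => !HasLeft;
    public TLeft Left { get; }
    public TRight Right { get; }

    public static Either<TLeft, TRight> FromLeft(TLeft value) =>
        new Either<TLeft, TRight>(true, value, default(TRight));

    public static Either<TLeft, TRight> FromRight(TRight value) =>
        new Either<TLeft, TRight>(false, default(TLeft), value);
}

static class EitherMergeDemo
{
    static void Main()
    {
        // Two sources with different element types.
        IObservable<int> numbers = Observable.Range(1, 2);
        IObservable<string> words = new[] { "a", "b" }.ToObservable();

        // Wrap each element so that both sources share one element type, then merge.
        IObservable<Either<int, string>> merged =
            numbers.Select(n => Either<int, string>.FromLeft(n))
                   .Merge(words.Select(w => Either<int, string>.FromRight(w)));

        merged.Subscribe(e =>
            Console.WriteLine(e.HasLeft ? "Left: " + e.Left : "Right: " + e.Right));
    }
}
```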

On the client side, you can split the observable into multiple pieces like this:

var published = client.Query().YourServerQueryHere().AsObservable().Publish();
// Distinct names avoid a clash between the subscription variables and the lambda parameters.
var leftSubscription = published.Where(e => e.HasLeft).Select(e => e.Left).Subscribe(left => { });
var rightSubscription = published.Where(e => e.HasRight).Select(e => e.Right).Subscribe(right => { });
published.Connect();

In many cases, though, you can avoid having to break out of the monad by using a special overload of Publish. It all depends on your needs.

client.Query().YourServerQueryHere().AsObservable().Publish(published =>
{
  var left = published.Where(e => e.HasLeft).Select(e => e.Left);
  var right = published.Where(e => e.HasRight).Select(e => e.Right);
  return left.YourQueryPossiblyWithSideEffects().Select(result => new CommonOutput(result))
             .Merge(right.YourQueryPossiblyWithSideEffects().Select(result => new CommonOutput(result)));
})
.Subscribe(output => { });

where CommonOutput isn't an actual type, but a placeholder where you can ensure that the output of both queries is the same. A common type to use here is string, which can then be used for logging in the outermost Subscribe.
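For a version of the selector overload of Publish that can be run locally, here is a small sketch that needs neither Qactive nor an Either type. It splits a plain sequence of integers into two branches and uses string as the common output type. The source sequence and the class name are made up for the example.

```csharp
using System;
using System.Reactive.Linq;

static class PublishSelectorDemo
{
    static void Main()
    {
        IObservable<int> source = Observable.Range(1, 6);

        source.Publish(published =>
        {
            // Both branches share the single subscription to the source.
            var evens = published.Where(n => n % 2 == 0);
            var odds = published.Where(n => n % 2 != 0);

            // Project each branch to the common output type and merge them again.
            return evens.Select(n => "even: " + n)
                        .Merge(odds.Select(n => "odd: " + n));
        })
        .Subscribe(Console.WriteLine);
    }
}
```

Because the splitting and merging happen inside the selector, there is no separate Connect call and only one subscription to manage.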
