qactive's Introduction

Qactive

A reactive queryable observable framework.

Download from NuGet

Qactive.Providers.Tcp
Depends on Rx, Qactive.Providers.Streaming, Qactive.Expressions and Qactive
Runtimes: .NET Framework 4.6.1; 4.5.2; 4.0

Qactive.Providers.Streaming
Depends on Rx, Qactive.Expressions and Qactive
Runtimes: .NET Framework 4.6.1; 4.5.2; 4.0

Qactive
Depends on Rx
Runtimes: .NET Framework 4.6.1; 4.5.2; 4.0; ASP.NET Core 1.0; Windows 8; Windows Phone 8.1; Xamarin.Android; Xamarin.iOS

Qactive.Expressions
No dependencies
Runtimes: .NET Framework 4.6.1; 4.5.2; 4.0

Overview

Qactive builds on the queryable observable providers of Reactive Extensions (Rx), enabling you to write elegant reactive queries in LINQ that execute server-side, even though they are written on the client. Qactive makes the extremely powerful act of querying a reactive service as easy as writing a typical Rx query.

More specifically, Qactive enables you to easily expose IQbservable<T> services for clients to query. When a client defines a query and subscribes, a connection is made to the server and the serialized query is transmitted to the server as an expression tree. The server deserializes the expression tree and executes it as a standing query. Any output from the query is marshaled back to the client over a persistent, full-duplex connection. Members on closures and static members that are local to the client are invoked from within the service automatically via full-duplex messaging. Anonymous types are automatically serialized as well.
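To make the idea of "a query transmitted as an expression tree" concrete, here is a minimal sketch that uses only System.Linq.Expressions from the base class library. It does not show Qactive's own serialization or wire format; the class name ExpressionTreeSketch and its members are hypothetical and exist only for illustration. It shows that a lambda can be held as inspectable data and compiled later, which is the property that lets a server rebuild and run a client's query.

```csharp
using System;
using System.Linq.Expressions;

public static class ExpressionTreeSketch
{
    // The predicate from the Simple Example, captured as data
    // (an expression tree) rather than as compiled code.
    public static readonly Expression<Func<long, bool>> Filter =
        value => value <= 5 || value >= 8;

    // Describes the root node of the tree. A provider walks nodes like
    // this one when it translates or serializes a query.
    public static string DescribeRoot() =>
        Filter.Body.NodeType + " over parameter '" + Filter.Parameters[0].Name + "'";

    // Compiles the tree into a delegate and runs it, roughly what a
    // receiver can do once it holds an equivalent tree.
    public static bool Evaluate(long value) => Filter.Compile()(value);
}
```

ExpressionTreeSketch.DescribeRoot() returns "OrElse over parameter 'value'", and ExpressionTreeSketch.Evaluate(6) returns false because 6 fails both halves of the predicate.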

For more information, see this series of blog posts.

Warning: Qactive allows clients to execute arbitrary code on your server. Security mechanisms are in place by default to prevent malicious clients, but only to a point; security hasn't been fully considered yet. Do not expose a Qbservable service on a public server without taking the necessary precautions to secure it first.

See Security Guidelines for more information.

Features

Please refer to the list of features in the wiki.

Getting Started

Qactive is a set of .NET class libraries that you can reference in your projects. NuGet is recommended.

Add a reference to the Qactive.Providers.Tcp package in your Visual Studio project. That package references the other packages as dependencies, so NuGet will automatically download all of them for you.

Note: Currently, the TCP provider is the only provider available.

The source code's Examples folder contains projects that show various usages of Qactive, from a simple query over a timer to a real-time chat application.

To run the examples:

  1. Run QbservableServer.exe.
    a. The server will start hosting example Qbservable services as soon as the console application begins.
    b. Pressing a key at any time will stop the server.
  2. Run QbservableClient.exe.
    a. You can run several client console applications at the same time.
  3. When the client console application starts, press any key to connect to the server. The client will begin running the first example.
  4. Press any key to stop the current example and start the following example.

To build the source code:

  1. Set the QbservableServer project as the startup project.
  2. Build and run. The server will start as soon as the console application begins.
  3. Set the QbservableClient project as the startup project.
  4. Build and run. You can run several client console applications at the same time.
  5. When the client console application starts, press any key to connect to the server.

Tip: To see the original and rewritten expression trees, run the client application with the debugger attached and look at the Output window.

Simple Example

The following example creates a cold observable sequence that generates a new notification every second and exposes it as an IQbservable<long> service over TCP port 3205 on the local computer.

Server

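// Cold source: emits 0, 1, 2, ... at one-second intervals.
IObservable<long> source = Observable.Interval(TimeSpan.FromSeconds(1));

// Expose the source as an IQbservable<long> service on TCP port 3205 (loopback only).
var service = source.ServeQbservableTcp(new IPEndPoint(IPAddress.Loopback, 3205));

// Each notification from the service reports that a client has terminated.
using (service.Subscribe(
  client => Console.WriteLine("Client shutdown."),
  ex => Console.WriteLine("Fatal error: {0}", ex.Message),
  () => Console.WriteLine("This will never be printed because a service host never completes.")))
{
  // Keep hosting until a key is pressed.
  Console.ReadKey();
}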

The following example creates a LINQ query over the IQbservable<long> service that is created by the previous example. Subscribing to the query on the client causes the query to be serialized to the server and executed there. In other words, the where clause is actually executed on the server so that the client only receives the data that it requested without having to do any filtering itself. The client receives the first six values (0 through 5), one per second. The server then filters out the next two values (6 and 7) and does not send them to the client. Finally, the remaining values are sent to the client until either the client or the server disposes of the subscription.

Client

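// Identify the service endpoint; the connection is made when the query is subscribed.
var client = new TcpQbservableClient<long>(new IPEndPoint(IPAddress.Loopback, 3205));

// Defining the query sends nothing; the where clause will run on the server.
IQbservable<long> query =
  from value in client.Query()
  where value <= 5 || value >= 8
  select value;

// Subscribing serializes the query to the server and starts receiving results.
using (query.Subscribe(
  value => Console.WriteLine("Client observed: " + value),
  ex => Console.WriteLine("Error: {0}", ex.Message),
  () => Console.WriteLine("Completed")))
{
  // Keep the subscription alive until a key is pressed.
  Console.ReadKey();
}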
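To check which values the client should expect, the same where clause can be applied to an in-memory sequence. This is only a local preview built with LINQ to Objects, assuming the source counts up from 0 as Observable.Interval does; it involves no server, no timing and no Qactive types. The name FilterPreview is hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class FilterPreview
{
    // Applies the client's where clause to the values 0..count-1,
    // the same values an interval source produces, but without timing.
    public static List<long> Apply(int count) =>
        Enumerable.Range(0, count)
                  .Select(i => (long)i)
                  .Where(value => value <= 5 || value >= 8)
                  .ToList();
}
```

FilterPreview.Apply(11) yields 0, 1, 2, 3, 4, 5, 8, 9, 10: six values, a gap where 6 and 7 were filtered out, then the remaining values.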

qactive's People

Contributors

rxdave

qactive's Issues

Review Socket Shutdown

Ensure that sockets are terminated by the client rather than the server, wherever possible.
Also ensure that observable termination yields deterministic results upon graceful shutdown; e.g., the client termination notification on the server should indicate that the observable terminated, not that the client terminated or that either side was forcibly closed by the remote connection.

Server not online example?

Hi

Can you suggest the best approach for when the server is not online? For example, my clients can live without the server being online, but I do not want to restart them when the server goes offline. A similar case is when the server is started after the client.

[Question] Still active?

This looks incredible!

Just a couple of questions on the project itself:

  • Are you still actively developing this?
  • Are you using it in any production applications?
  • Have you spoken to anyone at Microsoft regarding Reactor and their road to open-sourcing bits of it?

TcpQbservableClient : IQbservable

Consider having TcpQbservableClient (and its base class) implement IQbservable so that clients don't have to invoke the Query method at all, unless they want to pass in a subscription parameter.

Duplex Task<T>

Implement duplex support for Task<T> similar to IObservable<T>.

Primary use case:
When closing over a local method for duplex invocation, the server blocks until the client returns. This is inefficient; however, if you define an async method instead (that returns Task or Task<T>, but not void), then the query will fail because it would try to serialize the Task result to the server rather than installing a proxy, like it does for IObservable<T> closures.

Custom Serialization

Rather than (or in addition to) protobuf (#3) or the BinarySerializationFormatter that is used by default, reuse my custom serialization (not open-sourced yet) via a new interface defined in a portable class library, as an optional NuGet package.

Benefits:

  • Lightweight for performance and memory
  • Explicit serialization for clarity and full control
  • Enables serialization for the portable versions of Qactive, since they don't support SerializableAttribute.
  • Optional NuGet package can be plugged in easily; referencing the package could make it the default serialization format automatically (although a way to opt out is still needed, as a cross-cutting concern)

Any type that must be serialized will provide its own serialization by writing itself to and reading itself from a stream (or via a factory pattern; the details have to be considered). Serialization won't include any type info at all, and it will require that each type serializes itself, which means that optional fields and default values will be explicit. (There will probably be a library of helper functions anyway.)

Consider these design guidelines:

  1. Avoid reflection wherever possible.
  2. Reuse byte arrays (first-class) and any other objects required during serialization to avoid GC pressure.
  3. Consider offering an attribute model, an interface and a factory model.
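The "each type serializes itself" idea could look roughly like the following sketch. It is not the custom serialization library mentioned in this issue, which is not public; ISelfSerializable, ChatMessage and SelfSerializer are hypothetical names, and only BinaryWriter and BinaryReader from the base class library are used. No type information is written, no reflection is involved, and the optional field is made explicit with a presence flag.

```csharp
using System.IO;

// Hypothetical contract: each type writes and reads its own fields.
public interface ISelfSerializable
{
    void WriteTo(BinaryWriter writer);
    void ReadFrom(BinaryReader reader);
}

// Hypothetical message type used only to demonstrate the contract.
public sealed class ChatMessage : ISelfSerializable
{
    public string User = "";
    public long Sequence;
    public string Text;   // optional; null means "not set"

    public void WriteTo(BinaryWriter writer)
    {
        writer.Write(User);
        writer.Write(Sequence);
        // The optional field is explicit: a presence flag, then the value.
        writer.Write(Text != null);
        if (Text != null) writer.Write(Text);
    }

    public void ReadFrom(BinaryReader reader)
    {
        // Fields are read in exactly the order they were written.
        User = reader.ReadString();
        Sequence = reader.ReadInt64();
        Text = reader.ReadBoolean() ? reader.ReadString() : null;
    }
}

public static class SelfSerializer
{
    public static byte[] ToBytes(ISelfSerializable value)
    {
        using (var stream = new MemoryStream())
        using (var writer = new BinaryWriter(stream))
        {
            value.WriteTo(writer);
            writer.Flush();
            return stream.ToArray();
        }
    }

    public static T FromBytes<T>(byte[] bytes) where T : ISelfSerializable, new()
    {
        using (var reader = new BinaryReader(new MemoryStream(bytes)))
        {
            var value = new T();
            value.ReadFrom(reader);
            return value;
        }
    }
}
```

This sketch allocates a new stream and array on every call, so it does not address the guideline about reusing byte arrays; a pooled buffer would be a natural next step.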

AppDomain Security vs Reflection Emit

The malicious client example app doesn't work as it used to.

Seems that Reflection Emit is demanding full trust now. Qactive only asserts Reflection permission but apparently that's not enough. Did Emit always demand full trust or did something change in Rx?

Unfortunately, if full trust is required by Rx's Qbservable Provider (the call to Subscribe) then CAS security may be useless here and all of the AppDomain code may be removed.

Alternatively, consider writing a custom Qbservable Provider just like Rx but relaxing the permission set, if possible. Perhaps one way to do this is to ensure that the expression tree is compiled outside of the call to Subscribe and therefore full trust can be asserted? (Note that asserting full trust around the call to Subscribe entirely defeats the purpose of using CAS because it means that the client's entire query will be running with full trust!)

Support ObserveOn And Known Schedulers

Currently, the ObserveOn operator can only be used with serializable IScheduler implementations; however, this is not the typically intended usage of ObserveOn. Rather, its usage implies that the server must perform scheduling using the specified scheduler, which is identified via a static property such as NewThreadScheduler.Default.

The client must rewrite queries containing constant scheduler references by replacing them with references to their corresponding static properties.
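A rewrite of this kind can be sketched with an ExpressionVisitor. The example below is an assumption about how such a rewrite might be approached, not Qactive's implementation. To stay free of package references it uses a stand-in type, FakeScheduler, in place of a real Rx scheduler; FakeScheduler and KnownInstanceRewriter are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;

// Stand-in for a scheduler type with a well-known static instance
// (hypothetical; it plays the role of NewThreadScheduler.Default).
public sealed class FakeScheduler
{
    public static FakeScheduler Default { get; } = new FakeScheduler();
}

// Replaces captured references to known singleton instances with
// references to the static properties that expose them.
public sealed class KnownInstanceRewriter : ExpressionVisitor
{
    private readonly Dictionary<object, PropertyInfo> known =
        new Dictionary<object, PropertyInfo>();

    // Records the value of every public static property on the given types.
    public KnownInstanceRewriter(params Type[] hostTypes)
    {
        foreach (var type in hostTypes)
            foreach (var property in type.GetProperties(BindingFlags.Public | BindingFlags.Static))
            {
                var value = property.GetValue(null);
                if (value != null) known[value] = property;
            }
    }

    // A constant node that holds a known instance directly.
    protected override Expression VisitConstant(ConstantExpression node)
    {
        return node.Value != null && known.TryGetValue(node.Value, out var property)
            ? Expression.Property(null, property)
            : base.VisitConstant(node);
    }

    // A captured local appears as a field read on a compiler-generated
    // closure object, so the field is evaluated here to find its value.
    protected override Expression VisitMember(MemberExpression node)
    {
        if (node.Expression is ConstantExpression closure && node.Member is FieldInfo field)
        {
            var value = field.GetValue(closure.Value);
            if (value != null && known.TryGetValue(value, out var property))
                return Expression.Property(null, property);
        }
        return base.VisitMember(node);
    }
}
```

After the rewrite, the tree refers to the static property by name rather than carrying the instance itself, so nothing about the scheduler needs to be serializable.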

Instrumentation

Define a TraceSource and log any potentially useful infrastructure operations.
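A minimal sketch of what this could look like with System.Diagnostics.TraceSource follows. The source name "Qactive", the event IDs and the two helper methods are assumptions chosen for illustration, not the names the library uses.

```csharp
using System.Diagnostics;

public static class QactiveTraceSketch
{
    // "Qactive" as the source name is an assumption for illustration.
    public static readonly TraceSource Source =
        new TraceSource("Qactive", SourceLevels.Information);

    // Logged at Information level, so it passes the default switch above.
    public static void ClientConnected(string endPoint) =>
        Source.TraceEvent(TraceEventType.Information, 1, "Client connected: {0}", endPoint);

    // Logged at Verbose level because message traffic is high-volume;
    // it is dropped unless the switch level is raised to Verbose.
    public static void MessageSent(int sizeInBytes) =>
        Source.TraceEvent(TraceEventType.Verbose, 2, "Sent message: {0} bytes", sizeInBytes);
}
```

Setting QactiveTraceSketch.Source.Switch.Level = SourceLevels.Verbose would let the message-size events through; listeners can be attached through Source.Listeners.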

Review all uses of Tuple

In building the original proof of concept I took some shortcuts with types for the sake of time. For clarity and performance, review all uses of types such as Tuple and see if replacing them with discrete types is justifiable.

Rx 3.0

Compile against the latest version of Rx.

.NET 4.5.2 Source Not Serializable

The portable Qactive library doesn't apply SerializableAttribute to any of the types that must be serializable because that attribute isn't available in the Framework subset of its portable profile. As a result, the other projects strictly targeting .NET 4.5.2 (Expressions, Streaming, TCP) don't work at all because the core library's types can't be serialized. Clients get the following error immediately:

System.Runtime.Serialization.SerializationException: 
Type 'Qactive.QbservableSourcePlaceholder`1[[System.Int64, 
mscorlib, Version=4.0.0.0, Culture=neutral, 
PublicKeyToken=b77a5c561934e089]]' in Assembly 
'Qactive, Version=2.0.1.0, Culture=neutral, 
PublicKeyToken=55a053e6184a046e' is not marked as serializable.

There may not be any workaround to this problem through the portable target. The only solution is probably to create another flavor of Qactive that targets .NET 4.5.2 directly, and thus can apply SerializableAttribute as needed.

Visitor Overload

Consider defining an overload of the service operators (and secure variants) accepting an expression tree visitor rather than an IQbservableProvider. This could make pre-processing much easier for services since in many cases the only reason for creating their own providers, which can be fairly complex, is to have access to a single visitor. Also consider an overload that takes a sequence of visitors and applies them to the client's expression in the order in which they're specified.
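The "sequence of visitors applied in order" part of this proposal can be sketched independently of any service operator. The following uses only ExpressionVisitor from the base class library; VisitorPipeline and the two sample visitors are hypothetical names, and the proposed overloads themselves are not shown because they do not exist yet.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Sample visitor: rejects any method call declared on System.IO.File.
public sealed class ForbidFileAccessVisitor : ExpressionVisitor
{
    protected override Expression VisitMethodCall(MethodCallExpression node)
    {
        if (node.Method.DeclaringType == typeof(System.IO.File))
            throw new InvalidOperationException("File access is not allowed in queries.");
        return base.VisitMethodCall(node);
    }
}

// Sample visitor: folds "constant + constant" integer additions.
public sealed class FoldIntAdditionVisitor : ExpressionVisitor
{
    protected override Expression VisitBinary(BinaryExpression node)
    {
        // Visit children first so nested additions are folded bottom-up.
        var visited = base.VisitBinary(node);
        if (visited is BinaryExpression binary
            && binary.NodeType == ExpressionType.Add
            && binary.Left is ConstantExpression left && left.Value is int x
            && binary.Right is ConstantExpression right && right.Value is int y)
        {
            return Expression.Constant(x + y);
        }
        return visited;
    }
}

public static class VisitorPipeline
{
    // Feeds the output of each visitor into the next, in the order given.
    public static Expression Apply(Expression expression, IEnumerable<ExpressionVisitor> visitors) =>
        visitors.Aggregate(expression, (current, visitor) => visitor.Visit(current));
}
```

Because each visitor receives the previous visitor's output, order matters: placing a validating visitor first means later rewrites never run on an expression that was rejected.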

ClientTermination event when client crashed

ChatService and ChatClient example:
When I dispose the observable on the client side, ClientTermination is sent.
But when I close the client (Alt+F4 or a crash, for example), the server side does not receive a message.

Maybe I should use some options when creating the server?

Separate Transport

Create a clearer separation of the protocol and transport layers.
Continue to provide the current TCP layer as the default option. (Separate NuGet package?)
Also consider using, within the TCP layer, a specialized interface for serialization rather than IRemotingFormatter.

Trace Message Size

Write the size of messages (I/O) in bytes to the Qactive trace source.

Support for .NET Core 2.x?

Does it work with .NET Core 2.x? The README file says that it works with ASP.NET Core 1.0, but the *.csproj files only target 4.5.x. How can I make it work with .NET Core 2.x and the .NET Core stack?

Protocol: C# Script Input

Consider updating the protocol to support raw C# input from clients. This will make it possible for JS clients to communicate over the new WebSocket provider.

The idea is simple: Any client can send a query as a string containing a C# code snippet, with the following constraints.

  • MUST NOT exceed a maximum length (an optional security constraint specified by the service).
  • MUST NOT contain any { } or ; characters (except in strings and comments).
  • MUST contain an unambiguous placeholder for the query source; e.g., {{SOURCE}}
  • MAY contain any number of using statements before the query.
  • MAY contain closures via unambiguous identifiers; e.g., from x in {{OBSERVABLE:myClosure}}
  • consider other constraints...

Consider using Roslyn on the server-side to parse queries into expression trees and then treat them as if they were normal deserialized expression trees.

ChatService example OnNext scheduled on current thread

Hello,

why are the hooks in the chat service example pushed on the current thread scheduler?
https://github.com/RxDave/Qactive/blob/master/Examples/QbservableServer/ChatService.cs

As I have understood the current thread scheduler, it shouldn't make any difference to leave it out. The current thread scheduler only puts nested calls into the message queue.
http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#Current

Or am I wrong?

PID Controller

Create a PID operator that automatically throttles notifications from the server based on the changing performance of the client.

For example, the operator could buffer notifications on the client while they're being processed. The goal is to keep the buffer at zero. As the length of the buffer increases, a closed-over observable pushes notifications to the server indicating the calculated throttle period that will help the client to consume items in its buffer faster than it receives them; e.g., as the buffer length increases, the throttle period is increased, which decreases the number of notifications arriving on the client, thus eventually decreasing the buffer size to zero. As it's decreasing, the throttle period may also be decreasing, as long as the buffer size continues to decrease.

For situations in which dropping notifications is unacceptable, yet batching notifications is at least more efficient than processing them individually, the server can use batching instead of throttling (in this case, the client would push buffer count changes rather than throttle period changes).

And perhaps some combination of both could be used, for extreme cases where some upper limit is reached and the server must transition from batching to throttling.

https://en.wikipedia.org/wiki/PID_controller
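The control loop described above can be sketched as a textbook PID calculation in which the setpoint is a buffer length of zero, so the error is simply the current buffer length. This is a simplified illustration, not a proposed operator: ThrottlePidController is a hypothetical name, the gains are placeholders that would need tuning, and the Rx plumbing that would push the result to the server is left out.

```csharp
using System;

public sealed class ThrottlePidController
{
    private readonly double kp, ki, kd;
    private double integral;
    private double previousError;

    public ThrottlePidController(double kp, double ki, double kd)
    {
        this.kp = kp;
        this.ki = ki;
        this.kd = kd;
    }

    // Returns the throttle period to request from the server, given the
    // current client buffer length and the time since the last sample.
    public TimeSpan Next(int bufferLength, TimeSpan elapsed)
    {
        double dt = elapsed.TotalSeconds;
        double error = bufferLength;              // setpoint is zero

        integral += error * dt;                   // accumulated backlog
        double derivative = dt > 0 ? (error - previousError) / dt : 0;
        previousError = error;

        // Output is interpreted as milliseconds; a negative result means
        // the buffer is draining, so no throttling is requested.
        double outputMs = kp * error + ki * integral + kd * derivative;
        return TimeSpan.FromMilliseconds(Math.Max(0, outputMs));
    }
}
```

With gains of (10, 0, 0), a buffer of 5 items produces a 50 ms throttle period, and an empty buffer produces none. A growing buffer raises the period through the proportional and derivative terms, and a shrinking buffer lowers it again, which matches the behavior described in this issue.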

Protobuf

Provide support for protobuf. (Separate NuGet package?)

It would work something like this:

  1. Client defines proto for itself.
  2. When Subscribe is called on the Query(), the proto-aware transport layer has a specific connection negotiation step that transmits the proto to the server (directly written to the NetworkStream).
  3. Server accepts the proto and installs it within its proto-aware transport layer.
  4. All further communication is serialized/deserialized through a managed protobuf library.

Rename Query method to AsQueryable

Got some feedback from developers new to Rx who said that the name Query was confusing. I think I agree, but perhaps for a different reason: it seems to imply that side effects will occur immediately, which isn't true.

Reconsider Subject Closures

Check to see whether subject closures work as expected (I believe they do not at the moment). Shouldn't closing over a subject be treated as an IObservable closure by default? Even though it's ambiguous, assuming that the IObserver side was the author's intention seems wrong most times; however sometimes it's not, as in a chat app. So perhaps calls to Subscribe can be special-cased to treat subject closures as IObserver, but IObservable in any other case.

Ability to take Serialization of Qbservables without the tcp component?

I'm super interested in this stuff, since ms seems to be dropping the ball on it in a bad way. I'd like to be able to just serialize the Qbservable... is it reasonable to just convert an expression to a SerializableExpression and then serialize via whatever serializer I choose to and push that over to other end via websockets or rest endpoint?

Service Args Observable Must Be Hot

Currently, if you use the QbservableServer.Create method to accept arguments, the IObservable<TSource> that is supposed to represent all clients' arguments is actually just a singleton observable that is instantiated once for each client. This defies intuition and restricts important scenarios regarding static vs. published service implementations.

For example, the following wouldn't work as expected since the function that is passed strings will actually be invoked once for each connected client. The strings observable is actually Observable.Return(...args…), for each connected client. Instead, it should be a hot observable that actually pushes all connected clients' arguments, thus service authors could use the Publish operator to construct singleton service instances.

TcpQbservableServer.Create<string, QueryContext>(…, …, 
   strings => (from name in strings select new User(name))
   .Publish(  // this isn't what you'd think - the publish function will be called once for each client
       users => {  // users isn't what you'd think - it only contains a single user, per invocation
           var singleton1 = new MyService1();  // Bad assumption - instance is created for each client
           var singleton2 = new MyService2(users);  // Bad assumption - instance is created for each client
           return from user in users
                      select new QueryContext(     // QueryContext created per user - works as expected
                          singleton1, 
                          singleton2, 
                          user);
      }))
.Subscribe(….);

Support Async Duplex Only

Notifications from clients arrive on I/O completion ports. Blocking these threads causes deadlocks and/or severe performance penalties. Therefore, clients MUST NOT be permitted to close over enumerables or synchronous functions of any kind within their queries. Testing these features over the years has often resulted in deadlocks and unexpected behavior, and frankly it's much harder to debug a client query with them in place.

Furthermore, considering all of the great async support that has come to be since having written this framework, it only makes sense to avoid sync callbacks altogether.

Therefore, only the following scenarios SHOULD be supported:

  1. Closing over any IObservable<T>, or any type that implements IObservable<T> (e.g., Subject<T>)
  2. Closing over any local function that returns IObservable<T>, or any type that implements it.
    a. Note that this is acceptable because the function call from the server to the client can be async since an observable proxy can be created on the server. After all, the call to Subscribe is inherently async, thus the server can continue executing the query and call Subscribe on the proxy observable, while the duplex mechanism asynchronously sends the invoke message to the client and calls Subscribe on the client-side.
  3. Closing over any Task or Task<T>.
  4. Closing over any local function that returns a Task or Task<T>.
  5. Closing over any IAsyncEnumerable<T> -- (For future consideration).
  6. Closing over any local function that returns IAsyncEnumerable<T> -- (For future consideration).
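The rules in the list above amount to a check on the static type of each closure. The sketch below expresses items 1 through 4 as a type test using reflection; it is an illustration of the rule, not code from Qactive, and DuplexClosureRules is a hypothetical name. "Local function" is modeled here as any delegate type, and IAsyncEnumerable<T> is left out because the list marks it for future consideration.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class DuplexClosureRules
{
    // True when the type is, derives from, or implements
    // IObservable<T>, Task or Task<T>.
    public static bool IsAsyncShape(Type type)
    {
        if (typeof(Task).IsAssignableFrom(type)) return true;   // also covers Task<T>
        return IsObservable(type) || type.GetInterfaces().Any(IsObservable);
    }

    private static bool IsObservable(Type type) =>
        type.IsGenericType && type.GetGenericTypeDefinition() == typeof(IObservable<>);

    // A closure is acceptable if it is an async-shaped value, or a
    // delegate whose return type is async-shaped.
    public static bool IsAllowedClosure(Type closureType)
    {
        if (typeof(Delegate).IsAssignableFrom(closureType))
        {
            var invoke = closureType.GetMethod("Invoke");
            return invoke != null && IsAsyncShape(invoke.ReturnType);
        }
        return IsAsyncShape(closureType);
    }
}
```

For example, IObservable<int>, Task<string> and Func<int, Task> are accepted, while List<int>, Func<int> and Action are rejected because using them would require a synchronous round trip to the client.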

Enhanced Security Model

See Security Guidelines (Artifacts/Security Guidelines.md) for ideas on enhancements to the core library, which should create a secure environment by default, as much as possible.

Anonymous Projection

Edit: The original description was overcomplicated. Simply provide support for projecting to anonymous types from within a client's query. No options or protocol negotiation is required.

Consider supporting projection of anonymous types in the IQbservable by allowing clients to opt-in (via another option flag?) to it and specify a concrete type for T with matching properties. The client proxy would automatically map an instance of the anonymous type into a new instance of T.

To ensure that mismatches cause errors as soon as possible, if the server proxy detects that an anonymous type is being projected, then as part of the negotiation protocol it serializes a single empty instance of the type and transmits it to the client. The client then validates T against the anonymous type's definition during the client-side negotiation process and fails immediately if there is a mismatch.

Semantic Log Parser

Expose a type that can parse semantic log information back into object form.

Support Nested IQbservable

Services that are themselves clients of upstream Qactive services (or any other IQbservable<T> implementation) may wish to expose their IQbservable<T> to downstream clients. Ensure that this is possible.

More C# 6

Refactor to make use of more C# 6 features.
