rxdave / qactive
Reactive queryable observable framework.
License: Apache License 2.0
Edit: The original description was overcomplicated. Simply provide support for projecting to anonymous types from within a client's query. No options or protocol negotiation is required.
Consider supporting projection of anonymous types in the IQbservable by allowing clients to opt-in (via another option flag?) to it and specify a concrete type for T with matching properties. The client proxy would automatically map an instance of the anonymous type into a new instance of T.
To ensure that mismatches cause errors as soon as possible, if the server proxy detects that an anonymous type is being projected, then as part of the negotiation protocol it serializes a single empty instance of the type and transmits it to the client. The client then validates T against the anonymous type's definition during the client-side negotiation process and fails immediately if there is a mismatch.
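If the validation step described above were kept, the client-side check could be sketched with plain reflection. This is only an illustration under assumptions: the `ProjectionShape` class and its `FindMismatches` method are hypothetical names, not part of Qactive, and the sketch compares only public property names and types.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical helper (not a Qactive API): compares the shape of a projected
// anonymous type with the concrete type T chosen by the client.
public static class ProjectionShape
{
    // Returns one message per mismatch; an empty list means T can receive the projection.
    public static IReadOnlyList<string> FindMismatches(Type projected, Type target)
    {
        const BindingFlags flags = BindingFlags.Public | BindingFlags.Instance;
        var errors = new List<string>();
        var targetProperties = target.GetProperties(flags).ToDictionary(p => p.Name);

        foreach (var source in projected.GetProperties(flags))
        {
            if (!targetProperties.TryGetValue(source.Name, out var match))
            {
                errors.Add($"'{target.Name}' has no property named '{source.Name}'.");
            }
            else if (!match.PropertyType.IsAssignableFrom(source.PropertyType))
            {
                errors.Add($"Property '{source.Name}' is {match.PropertyType.Name} but the projection yields {source.PropertyType.Name}.");
            }
        }

        return errors;
    }
}
```

A client could call `ProjectionShape.FindMismatches(anonymousType, typeof(T))` once, before subscribing, and fail immediately if the list is not empty.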
Research, and consider refactoring for performance.
Currently, if you use the `QbservableServer.Create` method to accept arguments, the `IObservable<TSource>` that is supposed to represent all clients' arguments is actually just a singleton observable that is instantiated once for each client. This defies intuition and restricts important scenarios regarding static vs. published service implementations.
For example, the following wouldn't work as expected, since the function that is passed `strings` will actually be invoked once for each connected client. The `strings` observable is actually `Observable.Return(...args…)` for each connected client. Instead, it should be a hot observable that actually pushes all connected clients' arguments, so that service authors could use the `Publish` operator to construct singleton service instances.
TcpQbservableServer.Create<string, QueryContext>(…, …,
    strings => (from name in strings select new User(name))
        .Publish( // this isn't what you'd think - the publish function will be called once for each client
            users => { // users isn't what you'd think - it only contains a single user, per invocation
                var singleton1 = new MyService1(); // Bad assumption - instance is created for each client
                var singleton2 = new MyService2(users); // Bad assumption - instance is created for each client
                return from user in users
                       select new QueryContext( // QueryContext created per user - works as expected
                           singleton1,
                           singleton2,
                           user);
            }))
    .Subscribe(….);
Consider having `TcpQbservableClient` (and its base class) implement `IQbservable` so that clients don't have to invoke the `Query` method at all, unless they want to pass in a subscription parameter.
Services that are themselves clients of upstream Qactive services (or any other `IQbservable<T>` implementation) may wish to expose their `IQbservable<T>` to downstream clients. Ensure that this is possible.
Check to see whether subject closures work as expected (I believe they do not at the moment). Shouldn't closing over a subject be treated as an IObservable closure by default? Even though it's ambiguous, assuming that the IObserver side was the author's intention seems wrong most times; however sometimes it's not, as in a chat app. So perhaps calls to Subscribe can be special-cased to treat subject closures as IObserver, but IObservable in any other case.
Create a clearer separation of the protocol and transport layers.
Continue to provide the current TCP layer as the default option. (Separate NuGet package?)
Also consider using, within the TCP layer, a specialized interface for serialization rather than IRemotingFormatter.
Implement duplex support for `Task<T>` similar to `IObservable<T>`.
Primary use case:
When closing over a local method for duplex invocation, the server blocks until the client returns. This is inefficient; however, if you define an async method instead (that returns `Task` or `Task<T>`, but not `void`), then the query will fail because it would try to serialize the `Task` result to the server rather than installing a proxy, like it does for `IObservable<T>` closures.
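One way to picture a `Task<T>` proxy is a `TaskCompletionSource<T>` that the server awaits and that the duplex layer completes when the client's reply arrives. The sketch below is an assumption about how such a proxy might look; `DuplexTaskProxy<T>`, `OnResponse` and `OnError` are hypothetical names, not part of Qactive.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical server-side stand-in for a Task<T> that actually runs on the client.
public sealed class DuplexTaskProxy<T>
{
    // Continuations run asynchronously so that completing the proxy never
    // executes query code inline on the thread that received the reply.
    private readonly TaskCompletionSource<T> completion =
        new TaskCompletionSource<T>(TaskCreationOptions.RunContinuationsAsynchronously);

    // The query awaits this task instead of blocking until the client returns.
    public Task<T> Task => completion.Task;

    // Called by the (assumed) duplex layer when the client sends back a result.
    public void OnResponse(T value) => completion.TrySetResult(value);

    // Called by the (assumed) duplex layer when the client reports a failure.
    public void OnError(Exception error) => completion.TrySetException(error);
}
```

The server would send the invoke message, hand `proxy.Task` to the query, and return to its I/O loop; the query resumes only after `OnResponse` or `OnError` is called.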
Provide support for protobuf. (Separate NuGet package?)
It would work something like this:
I'm super interested in this stuff, since ms seems to be dropping the ball on it in a bad way. I'd like to be able to just serialize the Qbservable... is it reasonable to just convert an expression to a SerializableExpression and then serialize via whatever serializer I choose to and push that over to other end via websockets or rest endpoint?
Currently, the `ObserveOn` operator can only be used with serializable `IScheduler` implementations; however, this is not the typically intended usage of `ObserveOn`. Rather, its usage implies that the server must perform scheduling using the specified scheduler, which is identified via a static property such as `NewThreadScheduler.Default`.
The client must rewrite queries containing constant scheduler references by replacing them with references to their corresponding static properties.
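The rewrite described above can be sketched as an `ExpressionVisitor` that swaps a constant for a reference to the static property that returns the same instance. To stay self-contained, the sketch is generic rather than tied to Rx; `StaticPropertyRewriter` and `Register` are hypothetical names, and it assumes that closures have already been evaluated into `ConstantExpression` nodes before the visitor runs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical visitor: replaces constants that are the value of a registered
// static property with a reference to that property.
public sealed class StaticPropertyRewriter : ExpressionVisitor
{
    private readonly List<PropertyInfo> knownProperties = new List<PropertyInfo>();

    // Registers a public static property, e.g. (typeof(NewThreadScheduler), "Default") with Rx.
    public StaticPropertyRewriter Register(Type owner, string propertyName)
    {
        var property = owner.GetProperty(propertyName, BindingFlags.Public | BindingFlags.Static)
            ?? throw new ArgumentException($"No public static property '{propertyName}' on {owner}.");
        knownProperties.Add(property);
        return this;
    }

    protected override Expression VisitConstant(ConstantExpression node)
    {
        if (node.Value != null)
        {
            foreach (var property in knownProperties)
            {
                // Match by reference: the constant must be the very instance the property returns.
                if (ReferenceEquals(property.GetValue(null), node.Value))
                {
                    Expression access = Expression.Property(null, property);
                    return access.Type == node.Type ? access : Expression.Convert(access, node.Type);
                }
            }
        }

        return base.VisitConstant(node);
    }
}
```

Because the rewritten node is a member access rather than a constant, the scheduler instance itself never has to be serialized; only the property reference travels to the server.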
Rather than (or in addition to) protobuf (#3) or the `BinarySerializationFormatter` that is used by default, reuse my custom serialization (not open-sourced yet) via a new interface defined in a portable class library, as an optional NuGet package.
Benefits:
`SerializationAttribute`.
Any type that must be serialized will provide its own serialization by simply writing itself to, and reading itself from, a stream (or via a factory pattern; the details have to be considered). Serialization won't include any type info at all, and it will require that each type serializes itself, which means that optional fields and default values will be explicit. (There will probably be a library of helper functions anyway.)
Consider these design guidelines:
This looks incredible!
Just a couple of questions on the project itself:
Consider following the path of the Rx project and target only these platforms:
Write the size of messages (I/O) in bytes to the `Qactive` trace source.
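A minimal sketch of such logging with `System.Diagnostics.TraceSource` follows. The `MessageSizeTrace` class, its method names and the message format are assumptions for illustration; only the source name `Qactive` comes from the text above.

```csharp
using System.Diagnostics;

// Hypothetical helper: records the size of each message sent or received.
public static class MessageSizeTrace
{
    // Assumes the trace source is named "Qactive"; listeners are configured by the host.
    public static readonly TraceSource Source = new TraceSource("Qactive", SourceLevels.Information);

    // Kept separate from the tracing calls so the format is easy to test and change.
    public static string Format(string direction, string clientId, long byteCount)
        => $"{direction} {byteCount} bytes ({clientId})";

    public static void Sent(string clientId, long byteCount)
        => Source.TraceInformation(Format("Sent", clientId, byteCount));

    public static void Received(string clientId, long byteCount)
        => Source.TraceInformation(Format("Received", clientId, byteCount));
}
```

The transport would call `MessageSizeTrace.Sent` or `MessageSizeTrace.Received` with the length of the serialized payload right after each write or read.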
All public types and members.
Got some feedback from developers new to Rx who said that the name `Query` was confusing. I think I agree, but perhaps for a different reason: it seems to imply that side effects will occur immediately, which isn't true.
Create a PID operator that automatically throttles notifications from the server based on the changing performance of the client.
For example, the operator could buffer notifications on the client while they're being processed. The goal is to keep the buffer at zero. As the length of the buffer increases, a closed-over observable pushes notifications to the server indicating the calculated throttle period that will help the client to consume items in its buffer faster than it receives them; e.g., as the buffer length increases, the throttle period is increased, which decreases the number of notifications arriving on the client, thus eventually decreasing the buffer size to zero. As it's decreasing, the throttle period may also be decreasing, as long as the buffer size continues to decrease.
For situations in which dropping notifications is unacceptable, yet batching notifications is at least more efficient than processing notifications individually, then instead of throttling the server can use batching (in this case, the client would push buffer count changes rather than throttle period changes).
And perhaps some combination of both could be used, for extreme cases where some upper limit is reached and the server must transition from batching to throttling.
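The feedback loop described above can be sketched as a small PID calculation on the client, where the error is the buffer length (the target is zero) and the output is the throttle period pushed to the server. `ThrottleController`, its gains and its clamping policy are assumptions for illustration, not an existing Qactive operator.

```csharp
using System;

// Hypothetical client-side controller: maps the current buffer length to a throttle period.
public sealed class ThrottleController
{
    private readonly double kp, ki, kd;      // gains, in milliseconds per buffered item
    private readonly TimeSpan maxPeriod;     // upper bound on the period sent to the server
    private double integral;
    private double previousError;

    public ThrottleController(double kp, double ki, double kd, TimeSpan maxPeriod)
    {
        this.kp = kp;
        this.ki = ki;
        this.kd = kd;
        this.maxPeriod = maxPeriod;
    }

    // Call periodically with the buffer length and the time since the previous call.
    public TimeSpan Next(int bufferLength, TimeSpan elapsed)
    {
        double error = bufferLength;         // the setpoint is an empty buffer
        double dt = elapsed.TotalSeconds;

        integral = Math.Max(0, integral + error * dt);
        double derivative = dt > 0 ? (error - previousError) / dt : 0;
        previousError = error;

        double milliseconds = kp * error + ki * integral + kd * derivative;
        milliseconds = Math.Min(maxPeriod.TotalMilliseconds, Math.Max(0, milliseconds));
        return TimeSpan.FromMilliseconds(milliseconds);
    }
}
```

Each value returned by `Next` would be pushed through the closed-over observable, so that a growing buffer lengthens the server's throttle period and a draining buffer shortens it again. For the batching variant, the same loop could push buffer counts instead of periods.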
Hello,
why are the hooks in the chat service example pushed on the current thread scheduler?
https://github.com/RxDave/Qactive/blob/master/Examples/QbservableServer/ChatService.cs
As I have understood the current thread scheduler, it shouldn't make any difference to leave it out. The current thread scheduler only puts nested calls into the message queue.
http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#Current
Or am I wrong?
Ensure that sockets are terminated by the client rather than the server, wherever possible.
Also ensure that observable termination yields deterministic results upon graceful shutdown; e.g., the client termination notification on the server should indicate that the observable terminated, not that the client terminated or that either side was forcibly closed by the remote connection.
The portable Qactive library doesn't apply `SerializableAttribute` to any of the types that must be serializable because that attribute isn't available in the Framework subset of its portable profile. As a result, the other projects strictly targeting .NET 4.5.2 (Expressions, Streaming, TCP) don't work at all because the core library's types can't be serialized. Clients get the following error immediately:
System.Runtime.Serialization.SerializationException:
Type 'Qactive.QbservableSourcePlaceholder`1[[System.Int64,
mscorlib, Version=4.0.0.0, Culture=neutral,
PublicKeyToken=b77a5c561934e089]]' in Assembly
'Qactive, Version=2.0.1.0, Culture=neutral,
PublicKeyToken=55a053e6184a046e' is not marked as serializable.
There may not be any workaround to this problem through the portable target. The only solution is probably to create another flavor of Qactive that targets .NET 4.5.2 directly, and thus can apply `SerializableAttribute` as needed.
The malicious client example app doesn't work as it used to.
Seems that Reflection Emit is demanding full trust now. Qactive only asserts Reflection permission but apparently that's not enough. Did Emit always demand full trust or did something change in Rx?
Unfortunately, if full trust is required by Rx's Qbservable Provider (the call to Subscribe) then CAS security may be useless here and all of the AppDomain code may be removed.
Alternatively, consider writing a custom Qbservable Provider just like Rx but relaxing the permission set, if possible. Perhaps one way to do this is to ensure that the expression tree is compiled outside of the call to Subscribe and therefore full trust can be asserted? (Note that asserting full trust around the call to Subscribe entirely defeats the purpose of using CAS because it means that the client's entire query will be running with full trust!)
I wonder whether it works with .NET Core 2.x. I have seen in the README file that it works with `ASP.NET Core 1.0`, but I have checked the `*.csproj` files and they only target 4.5.x. My question is: how can I make it work with .NET Core 2.x and the .NET Core stack?
Define a TraceSource and log any potentially useful infrastructure operations.
Consider updating the protocol to support raw C# input from clients. This will make it possible for JS clients to communicate over the new WebSocket provider.
The idea is simple: Any client can send a query as a string containing a C# code snippet, with the following constraints.
`using` statements before the query.
Consider using Roslyn on the server side to parse queries into expression trees and then treat them as if they were normal deserialized expression trees.
See [Security Guidelines](../blob/master/Artifacts/Security Guidelines.md) for ideas on enhancements to the core library, which should create a secure environment by default, as much as possible.
Refactor to make use of more C# 6 features.
Compile against the latest version of Rx.
Hi
Can you suggest the best approach when the server is not online? For example my clients can live without the server online but I do not want to restart them when server goes offline, or a similar case is when server is started after the client
Notifications from clients arrive on I/O completion ports. Blocking these threads causes deadlocks and/or severe performance penalties. Therefore, allowing clients to close over enumerables, or synchronous functions of any kind, within their queries MUST NOT be permitted. Testing these features over the years has often resulted in deadlocks and unexpected behavior, and frankly it's much harder to debug a client query with them in place.
Furthermore, considering all of the great async support that has come to be since having written this framework, it only makes sense to avoid sync callbacks altogether.
Therefore, only the following scenarios SHOULD be supported:
- `IObservable<T>`, or any type that implements `IObservable<T>` (e.g., `Subject<T>`)
- `IObservable<T>`, or any type that implements it. `Subscribe` is inherently async, thus the server can continue executing the query and call `Subscribe` on the proxy observable, while the duplex mechanism asynchronously sends the invoke message to the client and calls `Subscribe` on the client side.
- `Task` or `Task<T>`.
- `Task` or `Task<T>`.
- `IAsyncEnumerable<T>` -- (For future consideration).
- `IAsyncEnumerable<T>` -- (For future consideration).
Add code contracts.
In building the original proof of concept I took some shortcuts with types for the sake of time. For clarity and performance, review all uses of types such as Tuple and see if replacing them with discrete types is justifiable.
Upon disconnection, unload the query. Cheaper than AppDomains, which aren't available in .NET Core anyway.
Consider defining an overload of the service operators (and secure variants) accepting an expression tree visitor rather than an `IQbservableProvider`. This could make pre-processing much easier for services since in many cases the only reason for creating their own providers, which can be fairly complex, is to have access to a single visitor. Also consider an overload that takes a sequence of visitors and applies them to the client's expression in the order in which they're specified.
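Applying a sequence of visitors in order amounts to a fold over the expression. The sketch below shows that step only; `VisitorPipeline` and `ReplaceConstant` are hypothetical names used for illustration, and how the result would be wired into the service operators is left open.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;

// Hypothetical helper: runs each visitor over the result of the previous one.
public static class VisitorPipeline
{
    public static Expression Apply(Expression query, IEnumerable<ExpressionVisitor> visitors)
        => visitors.Aggregate(query, (current, visitor) => visitor.Visit(current));
}

// Sample visitor used only to show that the order of the visitors matters.
public sealed class ReplaceConstant : ExpressionVisitor
{
    private readonly object from;
    private readonly object to;

    public ReplaceConstant(object from, object to)
    {
        this.from = from;
        this.to = to;
    }

    protected override Expression VisitConstant(ConstantExpression node)
        => Equals(node.Value, from) ? Expression.Constant(to, node.Type) : base.VisitConstant(node);
}
```

With the visitors `ReplaceConstant(1, 2)` then `ReplaceConstant(2, 3)`, the constant `1` ends up as `3`; with the order reversed it ends up as `2`, since the first visitor finds nothing to replace.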
ChatService and ChatClient example
When I dispose the observable on the client side, ClientTermination is sent.
But when I close the client (Alt+F4 or a crash, for example), the server side does not receive a message.
Maybe I should use some options when creating the server?
Expose a type that can parse semantic log information back into object form.
Implement all of the ToString methods and debugger view just like the real `Expression` implementations.