
Trill

Introduction

Trill is a high-performance one-pass in-memory streaming analytics engine from Microsoft Research. It can handle both real-time and offline data, and is based on a temporal data and query model. Trill can be used as a streaming engine, a lightweight in-memory relational engine, and as a progressive query processor (for early query results on partial data).
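
For orientation, here is a minimal end-to-end sketch assembled only from APIs that appear elsewhere on this page (Observable.Range, StreamEvent.CreatePoint, ToStreamable, ToStreamEventObservable); the payload and query are illustrative:

using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;

public static class HelloTrill
{
    public static void Main()
    {
        // Ingress: lift an observable of point events into a streamable.
        var source = Observable.Range(0, 10)
            .Select(i => StreamEvent.CreatePoint(startTime: i, payload: i))
            .ToStreamable();

        // Query: a simple relational filter.
        var query = source.Where(x => x % 2 == 0);

        // Egress: observe the resulting stream events.
        query.ToStreamEventObservable()
             .Where(e => e.IsData)
             .ForEachAsync(e => Console.WriteLine($"{e.StartTime}: {e.Payload}"))
             .Wait();
    }
}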

Getting Started

Building Trill

  1. Of course, the sources are right here!
  2. Clone the repo and make sure you have Visual Studio 2017 installed
  3. Open the Trill.sln solution in ./Sources with Visual Studio 2017
  4. Build Trill

Samples using Trill

If you don't want to compile Trill yourself, you can get binaries from our NuGet feed. Samples of Trill usage are available at our samples repository. Make sure you start with the Hello World sample to get comfortable with Trill.

Learn More

Contact/Feedback

You can create GitHub issues in this repo, or contact the team by email.

Contributing

This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.

When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.

This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact [email protected] with any additional questions or comments.


Issues

Enabling multiple disorder policy support

Currently, the Trill ingress code only allows a single disorder policy to be specified. Research has been done to loosen that restriction to multiple disorder policies set at different lag allowances. However, this work has not yet been imported into the main code base because of API issues; we need to figure out how to integrate the work more seamlessly.

Research paper link:
https://ieeexplore.ieee.org/document/8509288
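
For context, a minimal sketch of how a single disorder policy is specified at ingress today, using the policy types shown in other snippets on this page:

var input = new Subject<StreamEvent<int>>();

// Exactly one disorder policy per ingress site; the proposal above would
// allow several policies with different lag allowances.
var stream = input.ToStreamable(
    DisorderPolicy.Drop(TimeSpan.FromSeconds(10).Ticks), // drop events more than 10s late
    FlushPolicy.FlushOnPunctuation,
    PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));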

QueryContainer.Flush?

When would someone use that method versus, say, managing flushes with punctuations?
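
For reference, a minimal sketch of the punctuation-driven flushing the question alludes to, following the same pattern as the test code further down this page:

var subject = new Subject<StreamEvent<int>>();
var stream = subject.ToStreamable(
    disorderPolicy: DisorderPolicy.Throw(),
    flushPolicy: FlushPolicy.FlushOnPunctuation,
    periodicPunctuationPolicy: null);

subject.OnNext(StreamEvent.CreateStart(100, 42));
// Moving time forward with a punctuation triggers a flush of batched output.
subject.OnNext(StreamEvent.CreatePunctuation<int>(punctuationTime: 101));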

SyncTime public?

I’d submit that SyncTime and OtherTime should not be internal but public in the StreamEvent struct.
They are both integral concepts of the library.

Question

I have a stream of the following 6 events:
A1, B1, B2, A2, B3, B4
and I want to produce another stream of the B's, each paired with the most recent A:
(B1,A1), (B2,A1), (B3,A2), (B4,A2)
How can I go about it?
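
One possible approach, sketched under assumptions: give each A an open-ended lifetime, clip it against the next A (the same self-clip pattern used in the Realtime Join issue below), and then join the B points against the resulting A intervals. The aStream/bStream names and the constant key are illustrative:

// Give each A an open-ended lifetime, then end it at the next A (self-clip).
var lastA = aStream
    .AlterEventDuration(StreamEvent.InfinitySyncTime)
    .ClipEventDuration(aStream, a => 0, a => 0);

// Each B point joins the single A interval that covers its timestamp.
var paired = bStream.Join(lastA, b => 0, a => 0, (b, a) => new { B = b, A = a });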

Global SyncTime

Does Trill have one notion of time across all streams or only per stream?
If it does, is there a way to access that long property?

Batch Processing : How to withhold output until input terminates

Hello, thank you very much for designing and open-sourcing this system. I've been reading y'all's papers on Trill, FASTER, and Quill for years now.

I have a few questions about how to use it.

Trill for batch processing

I realize that Trill is primarily a streaming data engine, but I also work with a lot of batched data that I would like to query using Trill.

Maybe I don't have the egress side of things set up correctly, but my setup looks like this:

public static async Task EventsPerHour(string rootFolder)
{
    await
        LogExtractor
        .Create()
        .ExtractSingleSiteDirectory(rootFolder)
        .AsObservable()
        .ToTemporalStreamable(
            e => e.Entry.Timestamp.Ticks,
            DisorderPolicy.Drop(TimeSpan.TicksPerHour),
            FlushPolicy.None,
            PeriodicPunctuationPolicy.None())
        .GroupApply(
            e => e.Entry.Timestamp.Hour,
            g => g.Count(),
            (g, v) => new { Hour = g.Key, Count = v })
        .ToStreamEventObservable()
        .ForEachAsync(i => {
            Console.WriteLine(i.Payload);
        });
}

So what I would like to get out of this is a single list of (Hour, Count) pairs.

What I actually get is a lot of incremental updates as the data flows in. To compensate, I made a handler method that tracks all the updates per group and keeps only the last one. It produces correct output, but it seems wasteful to have the engine continue to produce output that I'm discarding.

Can I tell Trill to withhold output until the input stream terminates? If so, how?
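
For what it's worth, a minimal sketch of the workaround described above, keeping only the last update per group. It assumes the grouped streamable from the snippet is named counts and that the Count() aggregate yields a ulong:

var latestPerHour = new Dictionary<int, ulong>();

// Overwrite earlier incremental updates; only the final value per hour survives.
counts.ToStreamEventObservable()
    .Where(e => e.IsData)
    .ForEachAsync(e => latestPerHour[e.Payload.Hour] = e.Payload.Count)
    .Wait();

foreach (var pair in latestPerHour)
    Console.WriteLine($"Hour {pair.Key}: {pair.Value}");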

Weakened Discoverability

Also, why are so many things marked [EditorBrowsable(EditorBrowsableState.Never)]? For example, I see people using the 3-argument version of GroupApply in examples, but for whatever reason you have GroupSelectorInput<T>.Key marked as never browsable, making the result selector function seem useless initially.

Is there a reason this property (and others like it) is hidden?

GroupApply vs Partition+Aggregate+SelectByKey

Are these two constructions equivalent? If so, which should I prefer?

 .GroupApply(
    e => e.Entry.Timestamp.Hour,
    g => g.Count(),
    (g, v) => new { Hour = g.Key, Count = v })
// yields type IStreamable<Empty, 'a>

vs.

.Partition(e => e.Entry.Timestamp.Hour)
.Aggregate(g => g.Count())
.SelectByKey((time, key, count) => new { Hour = key, Count = count })
// yields type IStreamable<PartitionKey<int>, 'a>

External state management using FASTER

In the present (default) implementation of Trill, all state is kept in memory and managed by .Net. We are in the process of implementing Trill operators that use the FASTER key-value store engine for state management instead. Doing so will allow operator state to span both memory and disk, and would allow for potentially faster checkpointing and recovery.

Should the FASTER version of operators prove to have similar or better performance than their previous versions, we will remove the previous versions in favor of the new ones to reduce code duplication.

Repo link for FASTER:
https://github.com/microsoft/faster

Second query on Streamable causes exception

In the following code I am saving a Streamable to disk, then restoring it such that I can execute queries on it. The first one is fine, but I get an exception with the second.

All I am doing here is issuing a Count query twice. Doing some digging, I found that the position on the Stream is at the end after the first query. If you uncomment the Position manipulation, the second query succeeds.

Is the behaviour I am seeing intended? I feel like this is going to make Streamables backed by on-disk Streams very difficult to use.

 internal struct Payload
 {
    public long field1;
    public long field2;
 }

public static void Test()
{
    Random random = new Random(Guid.NewGuid().GetHashCode());
    string filePath = @"C:\temp\streamtest.bin";
    var dataset = Observable.Range(0, TotalInputEvents)
        .Select(e => StreamEvent.CreatePoint(0, new Payload { field1 = e, field2 = random.Next(100, 250) }))
        .ToStreamable();

    using (var stream = File.Create(filePath))
    {
        dataset.ToBinaryStream(stream, writePropertiesToStream: true);
    }

    using (var stream = File.OpenRead(filePath))
    {
        var streamable = stream.ToStreamable<Payload>(readPropertiesFromStream: true);
        // long position = stream.Position;

        streamable.ToStreamEventObservable().Where(e => e.IsPoint).Count().ForEachAsync(e => Console.WriteLine(e)).Wait();

        // stream.Seek(position, SeekOrigin.Begin);

        streamable.ToStreamEventObservable().Where(e => e.IsPoint).Count().ForEachAsync(e => Console.WriteLine(e)).Wait();
    }
}

Unhandled Exception: System.AggregateException: One or more errors occurred. (Invalid integer long in the input stream.) ---> System.Runtime.Serialization.SerializationException: Invalid integer long in the input stream.
   at Microsoft.StreamProcessing.Serializer.BinaryDecoder.DecodeLong() in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Serializer\Encoders\BinaryDecoder.cs:line 88
   at lambda_method(Closure , BinaryDecoder )
   at Microsoft.StreamProcessing.BinaryIngressReader`2.Ingress(IStreamObserver`2 observer) in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Ingress\Binary\BinaryIngressReader.cs:line 68
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)
   at System.Threading.Tasks.Task.Wait()
   at PerformanceTest.Program.Test() in C:\Git\TrillSamples\TrillSamples\PerformanceTest\Program.cs:line 87
   at PerformanceTest.Program.Main(String[] args) in C:\Git\TrillSamples\TrillSamples\PerformanceTest\Program.cs:line 148

Direct support for custom data formats

One can always create custom adapters for data in any format and convert it into IObservable<T> or IObservable<StreamEvent<T>>. However, for some operations, it may make sense to compile the query into operators that work directly over data in its native format. For instance, instead of a batch of T payloads or a batch of columns of T, one may have a batch of data in Parquet format.

The current heuristic for data flowing through Trill is that if it begins in "row mode", it stays in row mode for the duration of the query. If it begins in "column mode", it stays that way until it hits an operator that cannot operate on columnar data, at which point it switches to "row mode". We may be able to add an extensibility point that allows a query to operate over data in "parquet mode", "JSON mode", or some other mode, and then, when it can no longer do so, switch to either "column mode" or "row mode".

Flushes in joins issue?

Bug in joins?

    public struct IONRecord
    {
        public string Id;
        public string Data;

        public override string ToString() => $" {{{nameof(Id)}:{Id}, {nameof(Data)}:{Data}}}";
    }

        [Fact]
        public void LeftOuterJoin_FlushProblem_Test()
        {
            var left_subject = new Subject<StreamEvent<IONRecord>>();
            var right_subject = new Subject<StreamEvent<IONRecord>>();

            var left_stream = left_subject.ToStreamable(disorderPolicy: DisorderPolicy.Throw(), flushPolicy: FlushPolicy.FlushOnPunctuation, periodicPunctuationPolicy: null);
            var right_stream = right_subject.ToStreamable(disorderPolicy: DisorderPolicy.Throw(), flushPolicy: FlushPolicy.FlushOnPunctuation, periodicPunctuationPolicy: null);

            var output_events = new List<string>();
            var output_stream = left_stream
                .LeftOuterJoin(right: right_stream,
                               leftKeySelector: l => l.Id,
                               rightKeySelector: r => r.Id,
                               outerResultSelector: l => new
                               {
                                   l_Id = l.Id,
                                   r_Id = (string)null
                               },
                               innerResultSelector: (l, r) => new
                               {
                                   l_Id = l.Id,
                                   r_Id = r.Id
                               });

            var output_observable = output_stream.ToStreamEventObservable(reshapingPolicy: ReshapingPolicy.None);
            output_observable.Subscribe(onNext: se =>
            {
                if (se.IsData) output_events.Add(se.ToString());
            });

            //Start with no output events
            Assert.Equal(0, output_events.Count);

            //No flush on first left is ok (I guess...)... waiting for sync time to move forward
            left_subject.OnNext(StreamEvent.CreateStart(701, new IONRecord { Id = "c1", Data = "shortdes1" }));
            Assert.Equal(0, output_events.Count);

           
            //Eh? Sync time on the left moved forward... but no flush yet
            left_subject.OnNext(StreamEvent.CreatePunctuation<IONRecord>(punctuationTime: 702));
            Assert.Equal(0, output_events.Count);

            //Only flushes when both left and right move forward... I thought that in joins the sync time
            //is supposed to be the max sync time of the left and right
            right_subject.OnNext(StreamEvent.CreatePunctuation<IONRecord>(punctuationTime: 702));
            Assert.Equal(new[] {
                "[Start: 701,{ l_Id = c1, r_Id =  }]",
            }, output_events);
        }
 

Stitch doesn't seem to work for Partitioned streams

        [Fact]
        public void PartitionedStreamEvent_EndStart_Stitching_Test()
        {
            PartitionedStreamEvent<string, IONRecord> last;
            var input = new List<PartitionedStreamEvent<string, IONRecord>>
            {
                (last = PartitionedStreamEvent.CreateStart(key: "c1", startTime: 1, payload: new IONRecord { Id = "c1", Data = "shortdes1" })),
                PartitionedStreamEvent.CreateEnd(key: "c1", endTime: 7, originalStartTime: last.StartTime, payload: last.Payload),

                (last = PartitionedStreamEvent.CreateStart(key: "c1", startTime: 7, payload: new IONRecord { Id = "c1", Data = "shortdes1" }))
            }.ToObservable().ToStreamable();

            UnitTests_Funcs.RunAndAssert(new[]
            {
                " #                                         01234567 ",
                " 1 (c1)[Start: 1, {Id:c1, Data:shortdes1}]  [       ",
                " 2 (c1)[End: 7,1, {Id:c1, Data:shortdes1}]  ------) ",
                " 3 (c1)[Start: 7, {Id:c1, Data:shortdes1}]        [ ",
                " 4 [LowWatermark: +inf]                             ",
            }, input, onlyData: false);

            var output = input.Stitch();
            UnitTests_Funcs.RunAndAssert(new[]
            {
                " #                                         01234567 ",
                " 1 (c1)[Start: 1, {Id:c1, Data:shortdes1}]  [       ",
                " 2 (c1)[End: 7,1, {Id:c1, Data:shortdes1}]  ------) ",
                " 3 (c1)[Start: 7, {Id:c1, Data:shortdes1}]        [ ",
                " 4 [LowWatermark: +inf]                             ",
            }, output, onlyData: false);
        }

Direct support for .Net 4.7 and later features

.Net 4.7 introduced the Span<T> data type for better direct memory access. This feature, as well as potentially many others, could be used either to simplify the Trill code base or to improve performance for users of later framework versions.
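
For illustration, a small stand-alone (non-Trill) example of the allocation-free slicing Span<T> enables:

using System;

// Work with two fixed-width regions of a buffer without allocating copies.
Span<byte> buffer = stackalloc byte[8];
buffer.Fill(0x2A);

Span<byte> head = buffer.Slice(0, 4);  // a view, not a copy
Span<byte> tail = buffer.Slice(4);     // mutations through a slice hit the original buffer
tail[0] = 0xFF;

Console.WriteLine(buffer[4]); // 255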

[Question] Ingress data from real-time source

I have a real-time source tracking links on a website. I have to collect the links that have been requested and aggregate the data (group, count, ...) for each time period before saving the info (url, request count, ...) into a database.
How can I best use Trill for this case? I have read the samples, but they ingress offline data; there is only one RealTimeExample, and I can't apply it to my case.
I don't need to save the data permanently; I only need to use Trill as a buffer to aggregate (query/group/count...) over each time period, after which the data can be deleted.
Can you guide me on how to use Trill in this case? Thank you!
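
A hedged sketch of one way this kind of pipeline is often set up: real-time ingress from a Subject with periodic punctuations driving flushes, then a per-period grouped count. PageView and SaveToDatabase are hypothetical; TumblingWindowLifetime is the tumbling-window macro referred to later on this page:

var input = new Subject<StreamEvent<PageView>>();  // PageView { string Url } is a hypothetical payload

var counts = input
    .ToStreamable(
        DisorderPolicy.Drop(TimeSpan.FromSeconds(5).Ticks),
        FlushPolicy.FlushOnPunctuation,
        PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(30).Ticks))
    .TumblingWindowLifetime(TimeSpan.FromMinutes(5).Ticks)  // aggregate per 5-minute period
    .GroupApply(
        e => e.Url,
        g => g.Count(),
        (g, c) => new { Url = g.Key, Count = c });

counts.ToStreamEventObservable()
      .Where(e => e.IsData)
      .ForEachAsync(e => SaveToDatabase(e.Payload));  // hypothetical sink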

Question on payload representation value assignment in AfaMultiEventListTransformer.cs

This code below from AfaMultiEventListTransformer.cs looks strange, but I can't quite tell if it is a bug or just a candidate for some code-cleanup?

Is a different payloadRepresentation value supposed to be used if Config.ForceRowBasedExecution == true?
Currently, both branches of the C ? A : B statement below end up making the same assignment with a new ColumnarRepresentation(typeof(TPayload)).

https://github.com/Microsoft/Trill/blob/eaefac3943f76941955f2169d14f92820e58e6f2/Sources/Core/Microsoft.StreamProcessing/Operators/Afa/AfaMultiEventListTransformer.cs#L52-L58

If this code should always assign the same new ColumnarRepresentation, then I can easily submit a PR to simplify and clarify the code down to:

var payloadRepresentation = new ColumnarRepresentation(typeof(TPayload));

Progressive version of Quantize operator

Implement a progressive version of the quantize operator (a hypothetical signature sketch follows the list below):

  • Add a new overload to IStreamable.Quantize that accepts an additional parameter representing the interval over which intermediate data results should be produced.
  • The parameter should be a proper factor/clean divisor of the hop size parameter
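
A purely hypothetical shape for the proposed overload; the parameter list is illustrative and not necessarily the current Quantize signature:

public static class ProgressiveQuantize
{
    // Hypothetical proposal sketch: progressInterval must cleanly divide the hop
    // size and controls how often intermediate results are produced per window.
    public static IStreamable<TKey, TPayload> Quantize<TKey, TPayload>(
        this IStreamable<TKey, TPayload> source,
        long width, long skip, long offset, long progressInterval)
        => throw new NotImplementedException(); // illustrative only
}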

F# API?

Streams are a very FP-friendly concept, but C# is not a very FP-friendly language; that's why building ETL processes with F# is much easier (currying, pipelining, etc.).

Do you have any plans to make an F#-friendly facade?

Difficulty writing to and restoring from binary stream

Is there any sample code available that demonstrates how to write to and read from binary streams?

Currently I have the following adapted from one of the existing Trill samples:

var data = CreateStream(false);
string filePath = @"C:\temp\trill_stream.bin";

using (var stream = new FileStream(filePath, FileMode.Open))
{
    data.ToBinaryStream(stream, true);
}

var fromFile = filePath.ToStreamableFromFile<SensorReading>();

fromFile.ToStreamEventObservable().ForEachAsync(e => Console.WriteLine(e)).Wait();

When I run this code I get a null reference exception on that last line with the following stack trace:

at Microsoft.StreamProcessing.StreamEventEgressPipe`1.OnNext(StreamMessage`2 batch) in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Egress\Temporal\TemporalEgressPipe.cs:line 28    
at Microsoft.StreamProcessing.BinaryIngressReader`2.Ingress(IStreamObserver`2 observer) in D:\a\1\s\Sources\Core\Microsoft.StreamProcessing\Ingress\Binary\BinaryIngressReader.cs:line 69

Realtime Join

Hello,

I have the following query. My issue is that I don't get any output on my first publication to
parentSubject and childSubject, but I do start to receive output on subsequent publications, though from previous publications. For example, I publish A, but I don't get the result of A until I publish B.

My impression was that the streams would get flushed every 1 and 2 seconds and produce output on every flush.

I create events using StreamEvent.CreateStart.

var parentStreamable = parentSubject
    .ToStreamable(null, FlushPolicy.FlushOnPunctuation, PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(1).Ticks));
var clippedParentStreamable = parentStreamable
    .ClipEventDuration(parentStreamable, x => x.Id, x => x.Id);

var childStreamable = childSubject
    .ToStreamable(null, FlushPolicy.FlushOnPunctuation, PeriodicPunctuationPolicy.Time((ulong)TimeSpan.FromSeconds(2).Ticks));
var clippedChildStreamable = childStreamable
    .ClipEventDuration(childStreamable, x => x.Id, x => x.Id);

var output = clippedParentStreamable.Where(x => x.State == OrderState.Open)
    .Join(clippedChildStreamable.Where(x => x.State == OrderState.Open),
        x => x.Id,
        x => x.ParentId,
        (left, right) => new { Parent = left, Child = right })
    .GroupApply(
        x => x.Parent,
        x => x.Count(),
        (key, count) => new { ParentOrder = key.Key, ChildCount = count })
    .Where(x => x.ChildCount > 0);

Help calculating duration of condition

Trying to write code for a stream containing temperature values: if they are above a specified threshold (e.g., 70 degrees) for more than 20 minutes, then output a "TEMPERATURE LIMIT EXCEEDED" error value. Once the condition is no longer satisfied, output a "TEMPERATURE NORMAL" value.

It's probably quite straightforward, but I'm currently struggling to work out how to approach this, since I need to calculate the passing of time between values while the condition is met.

Nuspec definition / assembly conflicts

The nuspec definitions for the net46 assembly do not seem to be in line with the .Net Standard version.
For example, it declares the following dependency:

The .Net Standard version has far fewer dependencies listed and refers to 2.9.0.
The result is that I get assembly conflicts and end up using the 1.3.2 version instead of the 2.9.0 version.

Could you update the nuspec definition and publish to NuGet?

Trill vs Rx

I'm looking at Trill and it seems similar to Rx. In fact Trill seems to use IObservable for ingress and egress to the pipeline. I'm trying to understand the differences. Some that I see are:

  • allows things to be out of sequence (and supports reordering)
  • ??

Areas where it is the same:

  • has LINQ with operators like Join, Where, and Window
  • others

Questions:
Is it different on:

  • performance?
  • did I read somewhere that it has a concept of checkpointing? So I could resubscribe to a stream and it would ignore events until it got to a checkpoint?
  • we operate on batches, but isn't that really the same as saying that I have an IList?
  • why might I prefer to use Trill instead of Rx?

I'll probably ask more, but I figure this should be the start of a conversation or at least a FAQ item.

Ingress data policy: forward-looking outliers

There are already data policies at ingress for data that arrives "late". We can drop, adjust, or throw when data arrives late, and we can hold data in reserve for a certain period of time to allow some reordering.

However, if a data point arrives "too early" we do not have a way to deal with it currently. For instance, if the current data time is X, and the next data point arrives with a timestamp of X + 2 days, this may be a result of:

  • A spurious data value whose timestamp is garbled
  • Some data that has arrived well ahead of other valid data

Today, however, we accept this value as valid and current, and the sync time is advanced all the way to X + 2 days. Any further data will now be compared against the new sync time, and thus data may start to get dropped or adjusted improperly.

We would like to add an ingress policy that allows a threshold to be specified for maximum sync time advancement. If a data value arrives so far into the future that the maximum advancement would be exceeded, the value is handled in one of the following ways (a hypothetical API sketch follows this list):

  • Dropped
  • Adjusted to the current sync time + maximum delta
  • Rejected with an exception (throw)
  • Held in a queue until time advances
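
A purely hypothetical sketch of what such a policy type might look like, mirroring the existing DisorderPolicy factory pattern; all names are illustrative and none of this is current Trill API:

// Hypothetical sketch: caps how far a single event may advance sync time.
public enum AdvanceAction { Drop, Adjust, Throw, Hold }

public sealed class AdvancePolicy
{
    public AdvanceAction Action { get; }
    public long MaxDelta { get; }   // maximum allowed sync-time advancement, in ticks

    private AdvancePolicy(AdvanceAction action, long maxDelta)
    {
        Action = action;
        MaxDelta = maxDelta;
    }

    public static AdvancePolicy Drop(long maxDelta) => new AdvancePolicy(AdvanceAction.Drop, maxDelta);
    public static AdvancePolicy Adjust(long maxDelta) => new AdvancePolicy(AdvanceAction.Adjust, maxDelta);
    public static AdvancePolicy Throw(long maxDelta) => new AdvancePolicy(AdvanceAction.Throw, maxDelta);
    public static AdvancePolicy Hold(long maxDelta) => new AdvancePolicy(AdvanceAction.Hold, maxDelta);
}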

.Net Standard 2.0 implementation of DSP FFT

Performance numbers for FFT in the digital signal processing library were based on a native implementation of the algorithms in .Net Framework. The OSS release of Trill and all of its features must be .Net Standard 2.0 compliant, and it cannot rely on any library for which there is not proper licensing. We need to do one of the following:

  • Implement our own FFT library over .Net Standard 2.0 (not preferable)
  • Find a .Net Standard library over which we can take a dependency (proper licensing, good performance)
  • Find a native library over which we can take a dependency and implement .Net Standard interop (proper licensing, good performance) (not preferable)

First-class support for enumerables

All ingress methods into Trill currently rely on IObservable and thus on push semantics. If one wants to ingress data from an IEnumerable, one must first use an external framework or custom code to convert the IEnumerable to an IObservable, as sketched below. For multi-input queries, doing so can cause threading issues.
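
For reference, a minimal sketch of that conversion workaround using Rx's ToObservable (the same conversion other snippets on this page use):

using System.Linq;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;

var events = Enumerable.Range(0, 100)
    .Select(i => StreamEvent.CreatePoint(startTime: i, payload: i));

// The pull-based IEnumerable is converted to a push-based IObservable before ingress.
var stream = events.ToObservable().ToStreamable();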

We would like to add first-class support for IEnumerable ingress. Doing so will likely require implementation of back pressure mechanisms in binary operators, where data is drawn from one side or the other to maintain optimal data flow (instead of relying on the ingress operators to handle control flow).

Columnar data format expressiveness: polymorphic types

At present, columnar data batches are only supported for non-polymorphic data types. They do not support interfaces, nor do they support non-sealed classes.

  • For non-sealed classes, if the query never does a type test on the objects, and therefore does not ever refer to fields or properties of subclasses, we can lift this restriction and allow columnar on that type for that query. Same for interfaces and for implementers of those interfaces.
  • For queries that do type tests and refer to properties of derived types, we shall add a type reference column (a discriminator column to say what type each row is) and additional columns for any referenced fields and properties. The resulting layout will be similar to a "table-per-hierarchy" layout for mapping an object hierarchy to a relational table.

Support for Apache Arrow

Support Apache Arrow, which is a cross-language development platform for in-memory data. It specifies a standardized, language-independent columnar memory format for flat and hierarchical data, organized for efficient analytic operations on modern hardware. It also provides computational libraries and zero-copy streaming messaging and interprocess communication.

Apache Arrow is backed by key developers of 13 major open source projects, including Calcite, Cassandra, Drill, Hadoop, HBase, Ibis, Impala, Kudu, Pandas, Parquet, Phoenix, Spark, and Storm, making it the de facto standard for columnar in-memory analytics.

NullReferenceException upon disposing a subscription

I run into an exception when trying to dispose a subscription.
For example:
subscription = observableInput
    .ToAtemporalStreamable(TimelinePolicy.Sequence(1))
    .Count()
    .ToStreamEventObservable()
    .Where(c => c.IsData)
    .Subscribe(...); // handler elided in the original report

My input feed will continue to deliver input data, but if I dispose of the subscription then this will result in a null reference exception.

I did some analysis of the codebase and noticed the following:
The DisposeState method in SubscriptionBase calls the disposer?.Dispose() method. However, the "disposer" field is never set.
In addition, it releases the "currentBatch" buffer before any dispose call as well.

The result is that currentBatch is set to null before the dispose call. This means another OnNext could happen, which will then result in a null reference on currentBatch.

A possible solution is to make sure the "disposer" field is set to the subscription, and to call disposer?.Dispose() first in the DisposeState method in order to properly dispose of the observable chain.

There is a second problem with MonotonicSubscriptionWallClock: it overrides the DisposeState method and never calls the base implementation, which means it never disposes of the subscription and keeps on processing events after the subscription has been disposed.

Progressive versions of hopping and tumbling windows

Implement progressive versions of hopping and tumbling windows (a hypothetical sketch follows the list below):

  • Both window macro methods should get added versions that take an additional parameter
  • The parameter should represent the time interval that should be used to produce intermediate results of aggregations
  • The parameter should be a clean divisor of the tumble size for tumbling windows and the hop size for hopping windows
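
As with the Quantize item above, a purely hypothetical shape for the added overloads; parameter lists are illustrative only:

public static class ProgressiveWindows
{
    // Hypothetical proposal sketch: progressInterval must cleanly divide the
    // tumble size (tumbling) or the hop size (hopping).
    public static IStreamable<TKey, TPayload> TumblingWindowLifetime<TKey, TPayload>(
        this IStreamable<TKey, TPayload> source, long windowSize, long progressInterval)
        => throw new NotImplementedException(); // illustrative only

    public static IStreamable<TKey, TPayload> HoppingWindowLifetime<TKey, TPayload>(
        this IStreamable<TKey, TPayload> source, long windowSize, long period, long progressInterval)
        => throw new NotImplementedException(); // illustrative only
}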

Operator fusion: Join

At the moment, operator fusion of the select, select-many, where, and alter-duration-by-constant operators is in place for ingress. Those operations can effectively become part of the ingress operation, localizing some of the computation.

We would like to do the same for join operations, allowing that subset of operations to fuse into the output of a join operator. The result will:

  • Take a row as it is being added to an output batch and intercept it
  • Apply the fused operations to that row
  • Add the result to the output batch

left outer join

I'd like to do a LEFT OUTER JOIN between two streams.
It looks like I could use DefinePattern. Is there a better way?
Thanks,
Nestor
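
For what it's worth, Trill's LeftOuterJoin operator appears in test code elsewhere on this page; a minimal sketch with hypothetical streams and payload fields:

var joined = leftStream.LeftOuterJoin(
    right: rightStream,
    leftKeySelector: l => l.Id,
    rightKeySelector: r => r.Id,
    outerResultSelector: l => new { l.Id, Right = (string)null },  // unmatched left rows
    innerResultSelector: (l, r) => new { l.Id, Right = r.Data });  // matched rows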

Big and slow-moving reference data

I want to combine my input streams with some non-stream-based reference data which is big (many records) and slow-moving (it changes, but nowhere near the speed of my input streams). How do I best approach this with Trill?

I could think of a couple of ways of approaching it, but neither seemed ideal—and may not even be valid anyway (I'm new to Trill)—so I wanted to ask if there were a recommended solution for such a scenario.

Scenario

I have a input stream of sensor data and a database which stores a relationship between a particular sensor and the operator who owns it.

I take my sensor data input stream, apply some fancy heuristics and determine outliers. I want to notify the operator who owns the sensor of those outliers, so I need to join these events with that database table somehow.

Potential Solution A: Sensor-owner table to a streamable

Lift the rows from my database table to a streamable and use a Trill join operator.

var owners = db.Query<Owner>("select sensor_id, owner_email_address from sensor_owner")
	.ToObservable()
	.ToStreamable()
	.Cache();

sensorStream
	.DoFancyHeuristics()
	.Join(owners, l => l.SensorId, r => r.SensorId, (l, r) => new {
		l.SensorId,
		l.Measurement,
		r.OwnerEmailAddress
	})
	.SendEmailEtc();

However, what if that sensor_owner table is freaking large? (Or we don't have any viable way of scraping the entire set of relationships?)
What about when there are changes made to it? (We could refresh our streamable cache, but we might not be able to do anything smarter than refreshing all of the rows.)

We could instead defer a (specific) relationship query till after there's one of these outliers events, which brings me to...

Potential Solution B: Streamable to observable and back again

Leave the realm of streamables and use an Rx operator.

sensorStream
	.DoFancyHeuristics()
	.ToStreamEventObservable()
	.Select(e => db
		.Query<Owner>("select sensor_id, owner_email_address from sensor_owner where sensor_id = $1", e.Payload.SensorId)
		.Select(owner => new { e.Payload.SensorId, e.Payload.Measurement, owner.OwnerEmailAddress })
		.ToObservable())
	.Concat()
	.ToStreamable()
	.SendEmailEtc();

Here we're paying the cost of a network call on each outlier because we don't want to (or can't) pay the cost of storing it all in memory.
Leaving and re-entering the realm of streamables seems a little off intuitively; would it actually be fine in practice? One thing that comes to mind: if I'm using the HA feature, I now have two checkpoints to take, so I no longer have a consistent snapshot of my query state.
(I understand that in this scenario I could very well not return to streamables, but in similar scenarios I might want to continue composing operators.)

Other solutions

Leaving Trill

I understand I could leave streamables after I've translated my sensor data into my outlier events, but the complexity of the query is likely to evolve in ways that would make me very much want to remain up in the realm of streamables. (e.g. now I want to batch these outliers into one notification to operators.)

Higher-order streamables

I think I could achieve this succinctly if there were support for higher-order streamables in the future, but I also understand that such a thing might not be appropriate or compatible with the design of Trill. (e.g. something like Rx's Merge operator is not as useful once you add the strict temporal semantics of Trill, though still possible I think...)

sensorStream
	.DoFancyHeuristics()
	.Select(e => db
		.Query<Owner>("select sensor_id, owner_email_address from sensor_owner where sensor_id = $1", e.SensorId)
		.Select(owner => new { e.SensorId, e.Measurement, owner.OwnerEmailAddress })
		.ToObservable()
		.ToStreamable())
	.Concat()
	.SendEmailEtc()

Provider model for IStreamable

Requirement 1: Create an analogue to IQueryable or IQbservable for IStreamable.

That means having a new interface, IQStreamable, and a provider class, IStreamProvider, that can build up an expression in LINQ and then evaluate it later. The expression is inspectable and manipulable just like other LINQ expressions.
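
A purely hypothetical sketch of the proposed pair, mirroring the IQueryable/IQbservable pattern; the interface names come from the proposal above, but the members are illustrative:

using System;
using System.Linq.Expressions;

// Hypothetical: a streamable that carries an expression tree instead of a pipeline.
public interface IQStreamable<TPayload>
{
    Expression Expression { get; }
    IStreamProvider Provider { get; }
}

// Hypothetical: the provider builds and later evaluates the expression,
// so an engine other than Trill could implement the same API.
public interface IStreamProvider
{
    IQStreamable<TResult> CreateQuery<TResult>(Expression expression);
    IObservable<TResult> Execute<TResult>(Expression expression);
}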

Requirement 2: Ensure that the provider can stand independently of the engine.

Not only would this allow someone to implement an engine to the Trill API with different characteristics, but it would also allow the API implementation to evolve independently of the API if it needs to.

Biggest open question: Do we precisely mirror the existing IStreamable API, or do we instead try to implement API 2.0 with lessons learned?

For the most part, we are incredibly proud of our API. However, there are a few places where we would make changes if we could. Those places are:

  • Making IQStreamable have one type parameter instead of two
  • Consequently, also having a group-by syntax that is actually identical to IQueryable instead of GroupApply()
  • No longer requiring QueryContainer to register output, only input

Single invocation of code generation per query

In the current implementation, generated code artifacts are created on demand. That implies a code generation session for each generated operator, a generation session for each generated batch subtype, and a generation session for each generated dictionary subclass.

We speculate that if we gather all code generation into a single session that occurs just before data source subscription, then startup time for the query should decrease.

Note: If this feature does not result in a significant gain in startup performance, or it causes runtime throughput or latency regression, it should be abandoned.

Operator fusion: aggregates

At the moment, operator fusion of the select, select-many, where, and alter-duration-by-constant operators is in place for ingress. Those operations can effectively become part of the ingress operation, localizing some of the computation.

We would like to do the same for aggregate operations, allowing that subset of operations to fuse into the output of an aggregate operator. The result will:

  • Take a row as it is being added to an output batch and intercept it
  • Apply the fused operations to that row
  • Add the result to the output batch

Digital Signal Processing extension - API review and integration

The prototype/research implementation of DSP over Trill has been developed, but it is not in product-ready shape. It needs a thorough API review, a cleanup of the code, and tests added.

Key concepts to review:

  • DSP relies on a key notion of a "signal", a data stream that is known to have only one valid value per key. It also relies on equal spacing among data points, and allows one to sample or interpolate to achieve it. How this is done needs to be simplified.
  • How the above interacts with grouping is also to be reviewed and approved; vanilla grouping in Trill is already a way to establish uniqueness among keys

Research paper:
https://dl.acm.org/citation.cfm?doid=3035918.3035935

Does not have a valid equality operator for columnar mode

I changed my payload from a struct to a class, and now after doing a join I get the error:
Type of left side of join, 'xxxxxxx', does not have a valid equality operator for columnar mode.
Can you help me quickly? (It usually takes many days to get a reply here... it'd be great to either have better documentation or more prompt responses. Thanks much!)
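
A hedged sketch of one way to give a class payload value-equality semantics, which is what the error message asks for; whether this alone satisfies the columnar check is an assumption:

using System;

// Hypothetical payload: value equality via IEquatable plus == / != operators.
public sealed class MyPayload : IEquatable<MyPayload>
{
    public string Id;
    public string Data;

    public bool Equals(MyPayload other)
        => other != null && Id == other.Id && Data == other.Data;

    public override bool Equals(object obj) => Equals(obj as MyPayload);

    public override int GetHashCode() => (Id, Data).GetHashCode();

    public static bool operator ==(MyPayload a, MyPayload b)
        => a is null ? b is null : a.Equals(b);

    public static bool operator !=(MyPayload a, MyPayload b) => !(a == b);
}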

Cannot run performance test

I cannot run the perf test. The error is in a generated file, so I cannot easily set a breakpoint and debug it. The code in the PerformanceTesting project hasn't changed since the initial commit, and there is a TODO in the generated file, so the question is whether the test is supposed to work at all at the moment.


Trill multicast vs publish

Hello,

I'd like to experiment with the Trill library a bit and was wondering what the best approach is for a use case where there is a single 'source stream', for example trading data (individual trades), and many queries over that source stream, where the first query would be a Where that narrows the trades down to individual instruments. In the writing-queries guide I've read about Multicast, which is necessary for such a use case, but looking at the source code I've also found Publish.
I'm not really familiar with Rx, so it's a little confusing for me when to use Multicast vs Publish; what would you suggest? Or perhaps it's better to create a separate stream for each Where instead of a single one, so that Multicast is not necessary? Do you know of any guidelines or lessons learned about this? I'd love to read more, but couldn't find anything in the docs.

I've also seen mentions of partitioning in the source code; is this only related to the group operator, or is it something that could also be useful for my use case?

I'll be setting a low batch size (< 5, maybe less); are there any settings I could tweak for very near-real-time queries to get the best performance, sacrificing throughput but getting the lowest latency possible?

Thanks a lot!

Columnar data format efficiency: do not create unnecessary columns

At present, the columnar data format creates columns for every field and property in the type. For instance, a struct with fields a, b, and c will yield batches that have arrays for a, b, and c as well. However, if no downstream operators require access to field b, it makes no sense to allocate arrays for it, let alone populate them. This change need not change the type of the generated columnar format; the arrays for unneeded fields simply don't get allocated or populated.

Problem with QueryContainer

See the following code:

        [Fact]
        public void QueryContainer_MultipleOutputs_Test()
        {
            var container = new QueryContainer();

            var asset_subject = new Subject<StreamEvent<IONRecord_Struct>>();
            var assets_input = container.RegisterInput(asset_subject, identifier: "input1");

            var prices_subject = new Subject<StreamEvent<IONRecord_Struct>>();
            var prices_input = container.RegisterInput(prices_subject, identifier: "input2");

            var join = assets_input.Join(
                right: prices_input,
                leftKeySelector: l => l.Id,
                rightKeySelector: r => r.Id,
                resultSelector: (l, r) => new
                {
                    l.Id,
                    l.Data,
                    r_Data = r.Data,
                });
            container.RegisterOutput(join, identifier: "output1").Subscribe(onNext: p => Debug.Print($"{p}"));

            var left_join = assets_input
                .LeftOuterJoin(right: prices_input,
                    leftKeySelector: l => l.Id,
                    rightKeySelector: r => r.Id,
                    outerResultSelector: l => new
                    {
                        l.Id,
                        l.Data,
                        r_Data = string.Empty,
                    },
                    innerResultSelector: (l, r) => new
                    {
                        l.Id,
                        l.Data,
                        r_Data = r.Data
                    });

            //NEXT LINE THROWS !??
            //System.InvalidOperationException: 'Operation is not valid due to the current state of the object.'
            container.RegisterOutput(left_join, identifier: "output2").Subscribe(onNext: p => Debug.Print($"{p}"));

            var process = container.Restore(inputStream: null);
        }

Columnar data format efficiency: create extra columns for needed expressions

For a data structure with fields a, b, and c, if the downstream query operators never refer to field a directly but instead refer to a.d.z, or a["bacon"], or some other constant expression, it may make sense to have a column representing a.d.z or a["bacon"] instead of a. This change would require an alteration of the data type structure of the generated columnar batch, and it would change the way that generated operators over those columns reference fields.
