shreyasraghunath commented on May 24, 2024

Hi @wassim-k ,
We couldn't get any answer to this question. Trill is a very good product but unfortunately lacks community support. Hence, we decided to go with Kafka Streams itself.

from trill.

shreyasraghunath commented on May 24, 2024

@peterfreiling, @badrishc, @cybertyche, can you please help us? Thanks in advance!

peterfreiling commented on May 24, 2024

Is the problem that Trill is outputting too frequently or dropping inputs, or that Trill is not outputting frequently enough? Punctuations will also progress time forward to the punctuation timestamp, but if this is not required, you could periodically call Flush on the Process (returned from QueryContainer.Restore). I don't think there is a way to configure Trill to only output when the session window ends, but I don't see why you would need that behavior either.

agarwalshashank95 commented on May 24, 2024

Hi @peterfreiling, I'll rephrase and try to explain our problem. We are sending data from multiple sensors into a service bus, and a MessageConsumer picks up messages from the bus and adds them to our root stream. On this stream we do a GroupBy to separate the data by sensor, and we apply SessionTimeoutWindow because we want to know when a sensor has stopped sending data, so that we can analyze that data and persist the aggregated result to a DB.

For the first part of the question we figured out that Trill doesn't automatically populate the punctuation if there is no incoming data. For that we are currently manually pushing in the punctuation using

Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(t =>
{
    var timeTicker = DateTime.Now.Ticks;
    this.rootStream.OnNext(StreamEvent.CreatePunctuation<DeviceMessage>(timeTicker + 1));
});

and the actual data is sent via

rootStream.OnNext(StreamEvent.CreateStart<DeviceMessage>(syncTime, data))

and this works perfectly for our case. But we are still having issues with the second part: running some aggregations on the data from a particular session and persisting the result. In a similar question, #112, you mentioned that the output from a SessionTimeout operator would look like

Start 1:10
Start 1:12
Start 1:13

followed by End edges for all of them when the session is supposed to end. Instead, we are getting output like

Start 1:10
End 1:12
Start 1:12
End 1:13
Start 1:13
End 1:43

assuming that timeout was for 30s. My query is

var query = inputStream.GroupApply(
    (msg) => msg.DeviceGuid,
    (stream) => stream
        .SessionTimeoutWindow(TimeSpan.FromMinutes(1).Ticks)
        .Aggregate(window => window.SessionAggregate(message => message)),
    (group, value) => new GroupedAggrgate(group.Key, value));

where SessionAggregate is a simple implementation of IAggregate that returns a count, and GroupedAggrgate is a simple class that has a string key and a ulong value.
The value that I get from SessionAggregate is correct: it is the number of messages belonging to that session. But we have 3 questions:

  1. Is the Start and End edge behavior we are seeing correct for a session window?
  2. Even if the grouped aggregate operator cannot return an event only when the session expires, can we somehow detect that inside our SessionAggregate implementation? We want to persist something to the DB only when the session is over.
  3. Currently we create the stream event as StreamEvent.CreateStart(data). If we use StreamEvent.CreatePoint(data), then nothing comes up in our final query stream, just punctuations. Is this also expected?

Hopefully this clarifies our problem. Thanks for your help

agarwalshashank95 commented on May 24, 2024

Hi @arunkm did you get a chance to look at this issue?

shreyasraghunath commented on May 24, 2024

@peterfreiling and @arunkm please let us know if you need more information! We're blocked and any help would be appreciated. Thanks in advance

arunkm commented on May 24, 2024

Hi @shreyasraghunath,

  1. This behavior is right for an Aggregate. An aggregate typically has to produce values with mutually exclusive lifetimes, so an End and another Start are created whenever the aggregate state/value changes.

  2. No, the aggregate doesn't have much control over data flow.

  3. The StreamEvent.CreateStart(data) and StreamEvent.CreatePoint(data) input through SessionTimeoutWindow should produce identical results in this case.

agarwalshashank95 commented on May 24, 2024

Hi @arunkm

As you mentioned, the Aggregate operator by itself cannot control the flow of data, and the Start and End edges will change since we pass the stream through a GroupAggregate operator.

But the scenario I mentioned above (multiple sensors sending data, with a session timeout and some aggregation for each sensor individually) still remains the same. Do you have any insights into how that could be achieved?

Also, regarding the 3rd point, we are seeing inconsistent behavior when using CreatePoint vs CreateStart even if all the other code remains the same. Do you think it could be a bug in the Trill library?

wassim-k commented on May 24, 2024

I've run into the same issue.
I expected that, similar to a tumbling window, the session timeout window would only produce an output once the window closes after the specified period elapses. Instead, it produces an output for every ingressed event, plus the final aggregation at the end of the window.

Is there any way to change it so that it only outputs the aggregation once per session window?

peterfreiling commented on May 24, 2024

Sorry for the unresponsiveness. There are two of us maintaining this project part-time among our other commitments.

@shreyasraghunath / @agarwalshashank95 / @wassim-k Let me try to answer your questions. SessionTimeoutWindow simply modifies the lifetime of the events so that the event's end time is set to the end time of the session window, and does not group or aggregate them. So for an input sequence of:

StreamEvent.CreatePoint(0, 1),
StreamEvent.CreatePoint(2, 2),
StreamEvent.CreatePoint(6, 3),
StreamEvent.CreatePoint(40, 4),
...

SessionTimeoutWindow(timeout: 30) will produce the following:

StreamEvent.CreateInterval(0, 36, 1),
StreamEvent.CreateInterval(2, 36, 2),
StreamEvent.CreateInterval(6, 36, 3),
...
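
To make the arithmetic behind the end time of 36 concrete, here is a minimal plain C# sketch that does not use Trill at all. It assumes a session closes once no event arrives within `timeout` of the previous one, and that every event in the session then gets the end time `lastEventTime + timeout`. The `SessionMath` class and its `ToSessionIntervals` method are hypothetical names for this illustration, not part of the Trill API, and the treatment of an event landing exactly on the session end is an assumption.

```csharp
using System;
using System.Collections.Generic;

// Input points from the example above: (timestamp, payload), with timeout 30.
var points = new (long Time, int Payload)[] { (0, 1), (2, 2), (6, 3), (40, 4) };

foreach (var (start, end, payload) in SessionMath.ToSessionIntervals(points, 30))
{
    Console.WriteLine($"Interval({start}, {end}, {payload})");
}

// Hypothetical helper that mimics the lifetime rewrite; not Trill's implementation.
static class SessionMath
{
    // Expects points sorted by ascending time.
    public static List<(long Start, long End, int Payload)> ToSessionIntervals(
        IReadOnlyList<(long Time, int Payload)> points, long timeout)
    {
        var result = new List<(long Start, long End, int Payload)>();
        int sessionStart = 0; // index of the first event in the current session

        for (int i = 0; i < points.Count; i++)
        {
            bool isLast = i == points.Count - 1;

            // Assumption: an event at or after (previous time + timeout) starts a new session.
            if (isLast || points[i + 1].Time >= points[i].Time + timeout)
            {
                long sessionEnd = points[i].Time + timeout;

                // Every event in the session keeps its start time and gets the shared end time.
                for (int j = sessionStart; j <= i; j++)
                {
                    result.Add((points[j].Time, sessionEnd, points[j].Payload));
                }

                sessionStart = i + 1;
            }
        }

        return result;
    }
}
```

For the first three points the shared end time is 6 + 30 = 36, matching the intervals listed above. The fourth point gets an end time of 70 in this sketch only because the sample input stops there; in a live stream a later event within the timeout would extend that session.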

In order to aggregate the events within this session, you could do something like the following:

var outputStream = inputStream
    .SessionTimeoutWindow(timeout: 30) // modify end times of events to window termination
    .PointAtEnd() // modify events to be single points in time at the end of the window
    .Sum(e => e); // aggregate all events in the session

This will produce events like:

StreamEvent.CreatePoint(36, 6),
...

These principles can be applied to a grouped stream via GroupApply or Map/Reduce.
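
As a rough sketch of the grouped variant, the same SessionTimeoutWindow / PointAtEnd pattern can be placed inside GroupApply so that each device gets its own sessions. This assumes the Microsoft.StreamProcessing (Trill) and System.Reactive packages are referenced. The `Reading` and `DeviceCount` types, the device IDs, and the sample timestamps are hypothetical, and Count() stands in for whatever aggregate is actually needed.

```csharp
using System;
using System.Reactive.Linq;
using Microsoft.StreamProcessing;

// Hypothetical sample input: two devices, timestamps in arbitrary ticks.
var events = new[]
{
    StreamEvent.CreatePoint(0, new Reading { DeviceId = "a", Value = 1 }),
    StreamEvent.CreatePoint(2, new Reading { DeviceId = "b", Value = 2 }),
    StreamEvent.CreatePoint(6, new Reading { DeviceId = "a", Value = 3 }),
    StreamEvent.CreatePoint(40, new Reading { DeviceId = "a", Value = 4 }),
    // Advance time to infinity so that every open session is closed.
    StreamEvent.CreatePunctuation<Reading>(StreamEvent.InfinitySyncTime),
};

var perDeviceSessions = events
    .ToObservable()
    .ToStreamable()
    .GroupApply(
        r => r.DeviceId,                      // one sub-stream per device
        g => g.SessionTimeoutWindow(30)       // extend lifetimes to the session end
              .PointAtEnd()                   // collapse each event to a point at that end
              .Count(),                       // one aggregate per device session
        (g, count) => new DeviceCount { DeviceId = g.Key, Count = count });

perDeviceSessions
    .ToStreamEventObservable()
    .Where(e => e.IsData)                     // skip punctuations
    .ForEachAsync(e => Console.WriteLine(
        $"{e.SyncTime}: {e.Payload.DeviceId} -> {e.Payload.Count}"))
    .Wait();

// Hypothetical payload and result types for this sketch.
public struct Reading { public string DeviceId; public int Value; }
public struct DeviceCount { public string DeviceId; public ulong Count; }
```

Because each result is a single point at the session end, a subscriber could persist it to a database directly rather than tracking Start and End edges. In a live pipeline, punctuations still have to advance time past the session end before the result is emitted.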

As for the Start vs. Point inconsistencies with SessionTimeoutWindow, these should produce identical results. If you have sample input that produces different results, please provide it so we can investigate.

wassim-k commented on May 24, 2024

That worked perfectly. Thank you, @peterfreiling; your help so far has been invaluable for our project.
