Hi @wassim-k,
We couldn't get any answer to this question. Trill is a very good product but unfortunately lacks community support. Hence, we decided to go with Kafka Streams instead.
@peterfreiling, @badrishc, @cybertyche, can you please help us? Thanks in advance!
Is the problem that Trill is outputting too frequently/inputs are being dropped, or that Trill is not outputting frequently enough? Punctuations will also move time forward to the punctuation timestamp, but if that is not required, you could periodically call Flush on the Process (returned from QueryContainer.Restore). I don't think there is a way to configure Trill to output only when the session window ends, but I don't see why you would need that behavior either.
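To illustrate why time has to advance before a session can be reported, here is a minimal plain-C# model. It does not use Trill: the helper `EmittableSessionEnds` and the session end times 36 and 70 are hypothetical, and it assumes a session result can only be emitted once the stream's current time, moved forward by a data event or a punctuation, has reached that session's end.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model, not Trill's implementation: the result for a session that
// ends at `end` can only be emitted once the stream's current time (moved forward
// by data events or punctuations) has reached that end time.
List<long> EmittableSessionEnds(IEnumerable<long> sessionEnds, long currentTime) =>
    sessionEnds.Where(end => end <= currentTime).OrderBy(end => end).ToList();

// Hypothetical open sessions that end at times 36 and 70.
var pendingSessionEnds = new List<long> { 36, 70 };

// The last data event arrived at time 40: only the first session can be reported.
var afterData = EmittableSessionEnds(pendingSessionEnds, currentTime: 40);

// No more data arrives, but a periodic punctuation at time 75 advances time.
var afterPunctuation = EmittableSessionEnds(pendingSessionEnds, currentTime: 75);

Console.WriteLine($"After data at 40: [{string.Join(", ", afterData)}]");               // [36]
Console.WriteLine($"After punctuation at 75: [{string.Join(", ", afterPunctuation)}]"); // [36, 70]
```

In this model, the second session stays pending forever if neither data nor punctuations arrive, which matches the behavior described in the thread when no punctuations were pushed.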
Hi @peterfreiling, I'll rephrase and try to explain our problem. We are sending data from multiple sensors into a service bus, and a MessageConsumer picks up messages from the bus and adds them to our root stream. On this stream we do a GroupBy to separate the data by sensor and apply SessionTimeoutWindow, because we want to know when a sensor has stopped sending data so that we can analyze it and persist the aggregated data to a DB.
For the first part of the question, we figured out that Trill doesn't automatically generate punctuations when there is no incoming data. To handle that, we are currently pushing punctuations in manually using
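// Push a punctuation every second so that application time keeps advancing
// (and session windows can close) even when no sensor data arrives.
// Punctuation times share the sync-time domain of the data events.
Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(t =>
{
    var timeTicker = DateTime.Now.Ticks;
    this.rootStream.OnNext(StreamEvent.CreatePunctuation<DeviceMessage>(timeTicker + 1));
});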
and the actual data is sent via
rootStream.OnNext(StreamEvent.CreateStart<DeviceMessage>(syncTime, data));
This works perfectly for our case. However, we are still having issues with the second part: running some aggregations and persisting the data for a particular session. In a similar question (#112), you mentioned that the output of the SessionTimeout operator would look like this:
Start 1:10
Start 1:12
Start 1:13
and then End edges for all of them when the session is supposed to end. Instead, we are getting output like this:
Start 1:10
End 1:12
Start 1:12
End 1:13
Start 1:13
End 1:43
(assuming a 30 s timeout). My query is:
var query = inputStream.GroupApply(
    msg => msg.DeviceGuid,                                    // one group per sensor
    stream => stream
        .SessionTimeoutWindow(TimeSpan.FromMinutes(1).Ticks)  // stretch lifetimes to the session end
        .Aggregate(window => window.SessionAggregate(message => message)),
    (group, value) => new GroupedAggregate(group.Key, value));
where SessionAggregate is a simple implementation of IAggregate that returns a count, and GroupedAggregate is a simple class with a string key and a ulong value.
The value I get from SessionAggregate is correct: it returns the number of messages belonging to that session. But we have three questions:
- Is the Start and End edge behavior we are seeing correct for a session window?
- Even if the grouped aggregate operator cannot emit an event only when the session expires, can we somehow detect that inside our SessionAggregate implementation? We want to persist something to the DB only when the session is over.
- Currently we create the stream event with StreamEvent.CreateStart(data). If we use StreamEvent.CreatePoint(data), nothing but punctuations shows up in our final query stream. Is this also expected?
Hopefully this clarifies our problem. Thanks for your help.
Hi @arunkm, did you get a chance to look at this issue?
@peterfreiling and @arunkm, please let us know if you need more information! We're blocked, and any help would be appreciated. Thanks in advance!
- This behavior is correct for an Aggregate. An aggregate typically has to produce values with mutually exclusive lifetimes, so an End and a new Start are emitted whenever the aggregate state/value changes.
- No, the aggregate doesn't have much control over the data flow.
- StreamEvent.CreateStart(data) and StreamEvent.CreatePoint(data) inputs passed through SessionTimeoutWindow should produce identical results in this case.
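The first answer can be illustrated with a small plain-C# model of a snapshot Count over one session, reading the times from the earlier output as m:ss converted to seconds (1:10 becomes 70, 1:43 becomes 103). The helper `CountSnapshots` is hypothetical and only approximates Trill's Aggregate; it shows why each new event ends the previous result and starts a new one, and why only the last result ends at the session end.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified model, not Trill's implementation: a snapshot Count over events whose
// lifetimes were all stretched to the session end. Results never overlap in time,
// so each change in the count ends the previous interval and starts a new one.
List<(long Start, long End, int Count)> CountSnapshots(IReadOnlyList<long> eventTimes, long sessionEnd)
{
    var result = new List<(long Start, long End, int Count)>();
    var times = eventTimes.Distinct().OrderBy(t => t).ToList();
    for (int i = 0; i < times.Count; i++)
    {
        long start = times[i];
        long end = i + 1 < times.Count ? times[i + 1] : sessionEnd; // last interval ends with the session
        int count = eventTimes.Count(t => t <= start);                // events alive during [start, end)
        result.Add((start, end, count));
    }
    return result;
}

// Events at 1:10, 1:12 and 1:13 (as seconds) with a 30 s timeout, so the session ends at 1:43 (103).
var snapshots = CountSnapshots(new long[] { 70, 72, 73 }, sessionEnd: 103);
foreach (var (startTime, endTime, value) in snapshots)
    Console.WriteLine($"Start {startTime}, End {endTime}: count = {value}");
// Start 70, End 72: count = 1
// Start 72, End 73: count = 2
// Start 73, End 103: count = 3
```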
Hi @arunkm
As you mentioned, the Aggregate operator by itself cannot control the flow of data, and the Start and End edges will change because we pass the stream through a grouped Aggregate operator.
But the scenario I described above, with multiple sensors sending data and a session timeout plus some aggregation applied to each sensor individually, still remains. Do you have any insights into how that could be achieved?
Also, regarding the third point, we are seeing inconsistent behavior when using CreatePoint vs. CreateStart even when all the other code stays the same. Do you think it could be a bug in the Trill library?
I've run into the same issue.
I expected that, similar to a tumbling window, the session timeout window would only produce an output once the window closes after the specified period elapses, but instead it produces an output for every ingressed event plus the final aggregation at the end of the window.
Is there any way to change it so that it only outputs the aggregation once per session window?
Sorry for the unresponsiveness. There are two of us maintaining this project part-time among our other commitments.
@shreyasraghunath / @agarwalshashank95 / @wassim-k, let me try to answer your questions. SessionTimeoutWindow simply modifies the lifetimes of the events so that each event's end time is set to the end time of its session window; it does not group or aggregate them. So for an input sequence of:
StreamEvent.CreatePoint(0, 1),
StreamEvent.CreatePoint(2, 2),
StreamEvent.CreatePoint(6, 3),
StreamEvent.CreatePoint(40, 4),
...
SessionTimeoutWindow(timeout: 30) will produce the following:
StreamEvent.CreateInterval(0, 36, 1),
StreamEvent.CreateInterval(2, 36, 2),
StreamEvent.CreateInterval(6, 36, 3),
...
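The lifetime change described above can be modeled in a few lines of plain C#. This is a sketch, not Trill's implementation: the helper `AssignSessionEnds` is hypothetical, it assumes the input is sorted by time, and it assumes an event arriving at or after the current session end starts a new session. It reproduces the three intervals above; the fourth event's interval (40, 70) is only this model's result when no further input arrives, since the example elides it.

```csharp
using System;
using System.Collections.Generic;

// Simplified model, not Trill's implementation. Assumptions: input is sorted by time,
// each point at time t keeps its session open until t + timeout, and a point at or
// after the current session end starts a new session. Every event in a session
// receives that session's final end time.
List<(long Start, long End, int Payload)> AssignSessionEnds(
    IReadOnlyList<(long Time, int Payload)> points, long timeout)
{
    var result = new List<(long Start, long End, int Payload)>();
    var session = new List<(long Time, int Payload)>();
    long sessionEnd = long.MinValue;

    foreach (var p in points)
    {
        if (session.Count > 0 && p.Time >= sessionEnd)
        {
            // Close the current session: stamp its end time on every member.
            foreach (var s in session) result.Add((s.Time, sessionEnd, s.Payload));
            session.Clear();
        }
        session.Add(p);
        sessionEnd = p.Time + timeout;
    }
    foreach (var s in session) result.Add((s.Time, sessionEnd, s.Payload)); // last session
    return result;
}

var input = new List<(long Time, int Payload)> { (0, 1), (2, 2), (6, 3), (40, 4) };
foreach (var e in AssignSessionEnds(input, timeout: 30))
    Console.WriteLine($"Interval({e.Start}, {e.End}, {e.Payload})");
// Interval(0, 36, 1)
// Interval(2, 36, 2)
// Interval(6, 36, 3)
// Interval(40, 70, 4)
```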
In order to aggregate the events within this session, you could do something like the following:
var outputStream = inputStream
.SessionTimeoutWindow(timeout: 30) // modify end times of events to window termination
.PointAtEnd() // modify events to be single points in time at the end of the window
.Sum(e => e); // aggregate all events in the session
This will produce events like:
StreamEvent.CreatePoint(36, 6),
...
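A plain-C# model of the SessionTimeoutWindow, PointAtEnd, Sum chain is sketched below. The helper `SumPerSession` is hypothetical and is not Trill's operator code; it assumes sorted input and that a gap of at least the timeout closes a session, and it emits one (time, sum) point per session, matching the `CreatePoint(36, 6)` result above.

```csharp
using System;
using System.Collections.Generic;

// Simplified model of SessionTimeoutWindow(30).PointAtEnd().Sum(e => e), not Trill's
// implementation: it accumulates each session's payloads and emits a single point
// at the session end. Assumptions: input is sorted by time, and a gap of at least
// `timeout` closes a session.
List<(long Time, int Sum)> SumPerSession(IReadOnlyList<(long Time, int Payload)> points, long timeout)
{
    var output = new List<(long Time, int Sum)>();
    long sessionEnd = long.MinValue;
    int sum = 0;
    bool open = false;
    foreach (var (time, payload) in points)
    {
        if (open && time >= sessionEnd)
        {
            output.Add((sessionEnd, sum)); // point at the end of the finished session
            sum = 0;
        }
        sum += payload;
        sessionEnd = time + timeout;
        open = true;
    }
    if (open) output.Add((sessionEnd, sum)); // the last session
    return output;
}

var results = SumPerSession(
    new List<(long Time, int Payload)> { (0, 1), (2, 2), (6, 3), (40, 4) }, timeout: 30);
foreach (var (end, total) in results)
    Console.WriteLine($"Point({end}, {total})");
// Point(36, 6)
// Point(70, 4)
```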
These principles can be applied to a grouped stream via GroupApply or Map/Reduce.
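For the per-sensor scenario from the original question, the same idea can be applied per key. The sketch below is plain C#, not Trill: the helper `CountSessionsPerDevice`, the device names, and the timestamps are hypothetical, and it only models what a GroupApply over SessionTimeoutWindow, PointAtEnd and a count would conceptually produce, namely one row per device session that could then be persisted.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Simplified per-key model, not Trill's implementation: each device is sessionized
// independently (as GroupApply isolates groups), and every finished session yields
// one (device, sessionEnd, messageCount) row, e.g. something to persist to a database.
// Assumption: a gap of at least `timeout` between a device's messages closes its session.
List<(string Device, long SessionEnd, int Count)> CountSessionsPerDevice(
    IEnumerable<(string Device, long Time)> messages, long timeout)
{
    var rows = new List<(string Device, long SessionEnd, int Count)>();
    foreach (var deviceGroup in messages.GroupBy(m => m.Device))
    {
        long sessionEnd = long.MinValue;
        int count = 0;
        foreach (var time in deviceGroup.Select(m => m.Time).OrderBy(t => t))
        {
            if (count > 0 && time >= sessionEnd)
            {
                rows.Add((deviceGroup.Key, sessionEnd, count)); // this device's session is over
                count = 0;
            }
            count++;
            sessionEnd = time + timeout;
        }
        if (count > 0) rows.Add((deviceGroup.Key, sessionEnd, count)); // last session of this device
    }
    return rows.OrderBy(r => r.SessionEnd).ThenBy(r => r.Device).ToList();
}

// Hypothetical messages from two sensors, timeout of 30 time units.
var sampleMessages = new List<(string Device, long Time)>
{
    ("sensor-a", 0), ("sensor-b", 1), ("sensor-a", 5), ("sensor-b", 50), ("sensor-a", 60),
};
foreach (var row in CountSessionsPerDevice(sampleMessages, timeout: 30))
    Console.WriteLine($"{row.Device}: session ended at {row.SessionEnd} with {row.Count} message(s)");
// sensor-b: session ended at 31 with 1 message(s)
// sensor-a: session ended at 35 with 2 message(s)
// sensor-b: session ended at 80 with 1 message(s)
// sensor-a: session ended at 90 with 1 message(s)
```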
As for the Start vs. Point inconsistencies with SessionTimeoutWindow, these should produce identical results. If you have sample input that produces different results, please provide it so we can investigate.
That worked perfectly. Thank you, @peterfreiling; your help so far has been invaluable for our project.