I want to combine my input streams with some non-stream reference data that is big (many records) and slow-moving (it changes, but nowhere near as fast as my input streams). How do I best approach this with Trill?
I can think of a couple of approaches, but neither seems ideal, and they may not even be valid (I'm new to Trill), so I wanted to ask whether there is a recommended solution for this kind of scenario.
Scenario
I have an input stream of sensor data and a database that stores the relationship between each sensor and the operator who owns it.
I take my sensor data input stream and apply some fancy heuristics to detect outliers. I want to notify the operator who owns the sensor about those outliers, so I need to join these events with that database table somehow.
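The enrichment both solutions below are after is an ordinary inner equi-join on SensorId. As a minimal sketch of the intended result only, here it is with LINQ to Objects over in-memory arrays (not Trill, and with no notion of time). The sample data are made up, and the field names are simply the ones assumed by the snippets below.

```csharp
using System;
using System.Linq;

// Hypothetical sample data: outliers as the heuristics might emit them, plus rows of sensor_owner.
var outliers = new[]
{
    (SensorId: 1, Measurement: 98.6),
    (SensorId: 2, Measurement: 451.0),
    (SensorId: 3, Measurement: -40.0), // no owner row, so the inner join drops it
};
var owners = new[]
{
    (SensorId: 1, OwnerEmailAddress: "alice@example.com"),
    (SensorId: 2, OwnerEmailAddress: "bob@example.com"),
};

// Inner equi-join on SensorId, with the same result shape as the Join in Solution A.
var notifications = outliers
    .Join(owners,
          l => l.SensorId,
          r => r.SensorId,
          (l, r) => new { l.SensorId, l.Measurement, r.OwnerEmailAddress })
    .ToList();

foreach (var n in notifications)
    Console.WriteLine(n);
```

Note that with inner-join semantics an outlier from a sensor that has no owner row (sensor 3 here) produces no notification at all, which matters whichever of the approaches below is used.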
Potential Solution A: Sensor-owner table to a streamable
Lift the rows of my database table into a streamable and use Trill's Join operator.
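    // Load all sensor -> owner relationships once and cache them as a streamable.
    var owners = db.Query<Owner>("select sensor_id, owner_email_address from sensor_owner")
        .ToObservable()
        .ToStreamable()
        .Cache();

    sensorStream
        .DoFancyHeuristics()   // placeholder: emits outlier events
        .Join(owners, l => l.SensorId, r => r.SensorId, (l, r) => new {
            l.SensorId,
            l.Measurement,
            r.OwnerEmailAddress
        })
        .SendEmailEtc();       // placeholder: notify the owner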
However, what if that sensor_owner table is freaking large? (Or we don't have any viable way of scraping the entire set of relationships?)
What about when there are changes made to it? (We could refresh our streamable cache, but we might not be able to do anything smarter than refreshing all of the rows.)
We could instead defer a (per-sensor) relationship query until an outlier event actually occurs, which brings me to...
Potential Solution B: Streamable to observable and back again
Leave the realm of streamables and use an Rx operator.
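    sensorStream
        .DoFancyHeuristics()
        .ToStreamEventObservable()
        .Where(e => e.IsData)  // skip punctuations; only data events carry a payload
        .Select(e => db
            .Query<Owner>("select sensor_id, owner_email_address from sensor_owner where sensor_id = $1", e.Payload.SensorId)
            // Re-wrap each result as a point event at the outlier's original timestamp.
            .Select(owner => StreamEvent.CreatePoint(e.SyncTime,
                new { e.Payload.SensorId, e.Payload.Measurement, owner.OwnerEmailAddress }))
            .ToObservable())
        .Concat()              // one lookup at a time, preserving event order
        .ToStreamable()
        .SendEmailEtc();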
Here we're paying the cost of a network call on each outlier because we don't want to (or can't) pay the cost of storing it all in memory.
Intuitively, leaving and re-entering the realm of streamables seems a little off. Would it actually be fine in practice? One concern that comes to mind: if I'm using the HA feature, I now have two checkpoints to take, so I no longer have a consistent snapshot of my query state.
(I understand in this scenario I could very well not return to streamables, but in similar scenarios I might want to continue composing operators.)
Other solutions
Leaving Trill
I understand I could leave streamables once I've translated my sensor data into outlier events, but the query is likely to grow in complexity in ways that would make me very much want to stay in the realm of streamables (e.g. now I want to batch these outliers into one notification to operators).
Higher-order streamables
I think I could achieve this succinctly if Trill supported higher-order streamables in the future, but I also understand that such a thing might not be appropriate for, or compatible with, the design of Trill. (E.g. something like Rx's Merge operator is less useful once you add Trill's strict temporal semantics, though I think it is still possible...)
    sensorStream
        .DoFancyHeuristics()
        .Select(e => db
            .Query<Owner>("select sensor_id, owner_email_address from sensor_owner where sensor_id = $1", e.SensorId)
            .Select(owner => new { e.SensorId, e.Measurement, owner.OwnerEmailAddress })
            .ToObservable()
            .ToStreamable())
        .Concat()
        .SendEmailEtc();