We need to create a thread inside the pre-aggregate operator that collects the outPoolUsage metrics of the downstream operator (for example, a reducer) and adjusts the delay interval used to pre-aggregate tuples. These metrics are exposed through the REST API, with one entry per parallel subtask (the numeric prefix of each id). For example, the request
http://127.0.0.1:8081/jobs/91bef822b69a0b49da5af7f23e0a57bb/vertices/6d2677a0ecc3fd8df0b72ec675edf8f4/metrics?get=0.Shuffle.Netty.Input.Buffers.outPoolUsage,1.Shuffle.Netty.Input.Buffers.outPoolUsage,2.Shuffle.Netty.Input.Buffers.outPoolUsage,3.Shuffle.Netty.Input.Buffers.outPoolUsage
returns:
[
{
"id": "2.Shuffle.Netty.Input.Buffers.outPoolUsage",
"value": "0.1"
},
{
"id": "3.Shuffle.Netty.Input.Buffers.outPoolUsage",
"value": "1.0"
},
{
"id": "1.Shuffle.Netty.Input.Buffers.outPoolUsage",
"value": "0.2"
},
{
"id": "0.Shuffle.Netty.Input.Buffers.outPoolUsage",
"value": "0.1"
}
]
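A minimal sketch of how the collector thread could build this request and turn the response into a map from subtask index to outPoolUsage. It uses only the Python standard library and assumes the endpoint and response shape shown above. The function names are hypothetical, and inside the operator the same steps would be written in the operator's own language.

```python
import json
import urllib.request
from typing import Dict

# Metric name taken from the request above; each id is "<subtask>.<METRIC>".
METRIC = "Shuffle.Netty.Input.Buffers.outPoolUsage"


def metrics_url(base: str, job_id: str, vertex_id: str, parallelism: int) -> str:
    """Build the ?get= query asking for the metric of every subtask."""
    names = ",".join(f"{i}.{METRIC}" for i in range(parallelism))
    return f"{base}/jobs/{job_id}/vertices/{vertex_id}/metrics?get={names}"


def parse_out_pool_usage(body: str) -> Dict[int, float]:
    """Map subtask index -> outPoolUsage from the JSON list response."""
    usage: Dict[int, float] = {}
    for entry in json.loads(body):
        # Split "3.Shuffle.Netty..." into the index "3" and the metric name.
        index, _, name = entry["id"].partition(".")
        if name == METRIC:
            # Values arrive as strings, e.g. "1.0".
            usage[int(index)] = float(entry["value"])
    return usage


def fetch_out_pool_usage(url: str, timeout_s: float = 5.0) -> Dict[int, float]:
    """Query the REST endpoint and parse the response (hypothetical helper)."""
    with urllib.request.urlopen(url, timeout=timeout_s) as resp:
        return parse_out_pool_usage(resp.read().decode("utf-8"))
```

Calling metrics_url("http://127.0.0.1:8081", "91bef822b69a0b49da5af7f23e0a57bb", "6d2677a0ecc3fd8df0b72ec675edf8f4", 4) rebuilds the URL from the note. Parsing the response above yields {2: 0.1, 3: 1.0, 1: 0.2, 0: 0.1}.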
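If any of the buffers has an outPoolUsage higher than 0.25, we should increase the delay used to pre-aggregate tuples. In the response above, subtask 3 reports 1.0, so the delay would be increased.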
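The threshold rule can be sketched as a small pure function. The 0.25 threshold comes from the note. The growth factor (doubling) and the upper bound are hypothetical tuning knobs, since the note does not say by how much the delay should grow. Below the threshold the delay is left unchanged, because the note does not describe decreasing it.

```python
from typing import Mapping

THRESHOLD = 0.25  # from the note: react when any buffer exceeds 25% usage


def next_delay_ms(current_ms: int, usage: Mapping[int, float],
                  threshold: float = THRESHOLD, factor: int = 2,
                  max_ms: int = 60_000) -> int:
    """Return the new pre-aggregation delay in milliseconds.

    factor and max_ms are hypothetical; only the threshold is from the note.
    """
    # usage maps subtask index -> outPoolUsage of the downstream operator.
    if any(value > threshold for value in usage.values()):
        # Grow the delay, but never beyond the (assumed) upper bound.
        return min(current_ms * factor, max_ms)
    return current_ms  # no buffer above the threshold: keep the delay
```

With the sample values {0: 0.1, 1: 0.2, 2: 0.1, 3: 1.0} and a current delay of 1000 ms, subtask 3 exceeds the threshold and the function returns 2000. A buffer at exactly 0.25 does not trigger an increase, because the rule is "higher than".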
As a first step, the thread can listen to an MQTT broker every 10 seconds, while another channel (topic) is used to publish data to this broker.
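A minimal sketch of the periodic thread itself, assuming a 10-second interval as in the note. Both hooks are hypothetical. fetch could query the REST endpoint described earlier, and on_sample could update the pre-aggregation delay or publish the sample to an MQTT topic, for example with the paho-mqtt client's publish(topic, payload). No MQTT library is imported here, so the sketch runs with the standard library only.

```python
import threading
from typing import Callable, Dict


class MetricsPoller(threading.Thread):
    """Background thread that samples outPoolUsage at a fixed interval."""

    def __init__(self, fetch: Callable[[], Dict[int, float]],
                 on_sample: Callable[[Dict[int, float]], None],
                 interval_s: float = 10.0) -> None:
        # daemon=True so the poller never keeps the process alive on exit.
        super().__init__(daemon=True)
        self._fetch = fetch
        self._on_sample = on_sample
        self._interval_s = interval_s
        self._stop_event = threading.Event()

    def run(self) -> None:
        while not self._stop_event.is_set():
            try:
                self._on_sample(self._fetch())
            except Exception as exc:  # keep polling if one sample fails
                print(f"metrics poll failed: {exc}")
            # wait() returns True as soon as stop() is called, so the thread
            # exits without sleeping out the rest of the interval.
            if self._stop_event.wait(self._interval_s):
                break

    def stop(self) -> None:
        self._stop_event.set()
```

Using a threading.Event instead of time.sleep lets the operator stop the poller promptly when it shuts down. The first sample is taken immediately, and later samples follow every interval_s seconds.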