explore-flink's People

Contributors: codacy-badger, dependabot[bot], felipegutierrez

explore-flink's Issues

Import the Combiner strategy into the Flink source code

Import the static and the dynamic combiner operators, implemented by extending Flink's StreamOperator, into the Flink source code. The idea is to have two types of transformation inside the DataStream.java class. The first is the static combiner, which receives the number of tuples to combine (e.g., DataStream.combine(long numberOfTuples)). The second is the dynamic combiner, which watches the frequency of tuples inside the operator (e.g., DataStream.combine()).
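A Flink-independent sketch of the static (count-based) combiner logic that the proposed DataStream.combine(long) transformation would wrap; the class and method names are illustrative, not Flink API:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative count-based combiner: buffers key/value pairs and emits
// partial aggregates once a fixed number of tuples has been collected,
// mirroring what a per-operator DataStream.combine(numberOfTuples) would do.
class CountCombiner {
    private final long numberOfTuples;
    private final Map<String, Long> buffer = new HashMap<>();
    private long seen = 0;

    CountCombiner(long numberOfTuples) {
        this.numberOfTuples = numberOfTuples;
    }

    // Returns the combined partial counts when the threshold is reached,
    // otherwise an empty list (tuples stay buffered).
    List<Map.Entry<String, Long>> add(String key) {
        buffer.merge(key, 1L, Long::sum);
        seen++;
        if (seen < numberOfTuples) {
            return new ArrayList<>();
        }
        List<Map.Entry<String, Long>> out = new ArrayList<>(buffer.entrySet());
        buffer.clear();
        seen = 0;
        return out;
    }

    public static void main(String[] args) {
        CountCombiner c = new CountCombiner(3);
        c.add("a");
        c.add("b");
        // The third tuple triggers the flush: two distinct keys come out.
        if (c.add("a").size() != 2) throw new AssertionError("expected 2 combined keys");
    }
}
```

The dynamic variant would replace the fixed `numberOfTuples` threshold with one derived from the observed tuple frequency inside the operator.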

Create data sources isolated from the Flink application, with triggers to change the polling frequency dynamically

Application 30 makes it possible to change the frequency at which data is polled from the sources. After the application is initialized, one can use the mosquitto_pub CLI to change the frequency of a specific data source, as shown below:

FLINK_HOME=/home/flink/flink-1.9.0
FLINK_CLI=/home/flink/flink-1.9.0/bin/flink
FLINK_APP=/home/flink/app/explore-flink.jar

$FLINK_CLI run -c org.sense.flink.App $FLINK_APP -app 30 -source 127.0.0.1 -sink 127.0.0.1 \
-offlineData true -frequencyPull 10 -frequencyWindow 30 -syntheticData true

mosquitto_pub -h 127.0.0.1 -t topic-synthetic-frequency-pull -m "TRAFFIC_JAM 1000"
mosquitto_pub -h 127.0.0.1 -t topic-synthetic-frequency-pull -m "AIR_POLLUTION 500"

However, it would be better to isolate the data sources from the actual Flink application running on the cluster, so that they are not handled by the same JVM processes. One possible approach is to implement the data sources using Apache Edgent and change the polling frequency dynamically.

  • Operators that create synthetic data are generating backpressure, and some operators of the job are idle when observed in the Grafana dashboard. The Flink dashboard can identify which operator is currently generating backpressure.
    (Screenshot from 2019-10-02 15-10-50)

  • The implementation of the publisher can be based on this and the consumer on this.

  • How Apache Flink™ handles backpressure
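Independently of MQTT and Flink, the dynamic polling frequency above can be sketched as a source that reads its interval from a shared atomic variable which a control-channel listener updates; class and method names here are illustrative:

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch of a data source whose polling interval can be changed at runtime.
// In the real deployment the update would arrive from the MQTT control topic
// (topic-synthetic-frequency-pull); here it is a plain method call.
class AdjustableSource {
    private final AtomicLong pollIntervalMs;

    AdjustableSource(long initialMs) {
        this.pollIntervalMs = new AtomicLong(initialMs);
    }

    // Called by the control listener, e.g. on the payload "TRAFFIC_JAM 1000":
    // the second token is the new polling interval in milliseconds.
    void onControlMessage(String payload) {
        String[] parts = payload.trim().split("\\s+");
        pollIntervalMs.set(Long.parseLong(parts[1]));
    }

    long currentIntervalMs() {
        return pollIntervalMs.get();
    }

    // The polling loop would simply Thread.sleep(currentIntervalMs())
    // between reads, picking up changes without a restart.
}
```

Because the interval is read on every iteration, the source picks up a new frequency without being restarted, which is the behavior the issue asks for.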

Set cgroups command to launch Flink stream applications

We need to launch stream applications using cgroups in order to be able to set a different CPU quota for each operator. It is necessary to prefix the command that launches the stream application in the launch.sh script with cgexec -g cpu:cpulimited and test it.
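A sketch of what the launch.sh change could look like, assuming cgroup-tools with the cgroups v1 hierarchy; the quota values are illustrative and the commands require root:

```shell
# Create the "cpulimited" group on the cpu controller.
sudo cgcreate -g cpu:/cpulimited

# Illustrative quota: allow the group 50% of one CPU
# (50 ms of every 100 ms scheduling period).
echo 50000  | sudo tee /sys/fs/cgroup/cpu/cpulimited/cpu.cfs_quota_us
echo 100000 | sudo tee /sys/fs/cgroup/cpu/cpulimited/cpu.cfs_period_us

# In launch.sh, prefix the command that starts the stream application, e.g.:
cgexec -g cpu:cpulimited $FLINK_HOME/bin/taskmanager.sh start
```

Note that cgexec expects a controller:path pair (cpu:cpulimited), not just the group name.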

Explore partition strategies in Flink or other data stream engines and use probabilistic data structures to improve them.

References

Implement a semi-join for DataStream use case

A semi-join operator for data stream use cases often does not pay off to implement, usually because it has to keep the state of one of the tables in memory. If the processing window is too large, it becomes impossible to hold such a big state. So we came up with a solution that uses Bloom filters to do this job. It is implemented in the classes ValenciaBloomFilterSemiJoinExample and ValenciaBloomFilterLookupJoinExample.

We have tested using these parameters:

./bin/flink run -c org.sense.flink.App ../app/explore-flink.jar -app 29 -source 127.0.0.1 -sink 127.0.0.1 
 -offlineData true -frequencyPull 60 -frequencyWindow 10 -syntheticData [true|false] 
-optimization [true|false] -lookup [true|false] &

A comparison with a MapReduce semi-join for the batch use case is not fair, because in data stream use cases we cannot hold the whole state in the operators (it is simply too big). A fair comparison would be against a normal join in a data stream. The joins available for data streams are already working. The good question is when these joins fail due to the big state that they hold; at that point it would pay off to use a MapReduce-style semi-join with probabilistic data structures.
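A minimal, Flink-independent sketch of the Bloom-filter pre-filter idea behind the classes above; the filter size, hash scheme, and class name are illustrative, not the repository's actual implementation:

```java
import java.util.BitSet;

// Sketch of the Bloom-filter semi-join: the build side inserts its join keys
// into a compact bit array; the probe stream forwards a tuple only if the
// filter (possibly) contains its key. False positives let a few extra tuples
// through to the exact join; false negatives never happen, so no result is
// lost, and the operator never stores the build-side tuples themselves.
class BloomSemiJoin {
    private final BitSet bits;
    private final int size;
    private final int hashes;

    BloomSemiJoin(int size, int hashes) {
        this.bits = new BitSet(size);
        this.size = size;
        this.hashes = hashes;
    }

    // Double hashing: h1 + i*h2 gives k cheap, reasonably spread positions.
    private int position(String key, int i) {
        int h1 = key.hashCode();
        int h2 = (h1 >>> 16) | 1; // force odd so it is never zero
        return Math.floorMod(h1 + i * h2, size);
    }

    // Build side: record a join key.
    void addBuildKey(String key) {
        for (int i = 0; i < hashes; i++) bits.set(position(key, i));
    }

    // Probe side: true means the tuple survives the semi-join pre-filter.
    boolean mightJoin(String key) {
        for (int i = 0; i < hashes; i++) {
            if (!bits.get(position(key, i))) return false;
        }
        return true;
    }
}
```

The memory cost is fixed by the bit-array size regardless of how many build-side tuples arrive, which is exactly why the approach scales where a stateful join does not.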

References about join in data stream

Related papers

Make the stream application last for a given time

We need to set a parameter when launching the stream application so that it executes for a specific given time. Let's say we want to execute it for 20 minutes: we just need to pass this argument when launching the stream application.
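A minimal sketch of the time-bounded execution, with a deadline check standing in for the real job; in Flink the same effect can be achieved by cancelling the job from a timer thread, and the parameter name is still to be decided:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch: run a workload until a deadline derived from the requested
// duration passes, then stop. The Runnable stands in for one unit of work.
class TimedRun {
    // Runs work until the deadline passes; returns iterations completed.
    static long runFor(Duration duration, Runnable work) {
        Instant deadline = Instant.now().plus(duration);
        long iterations = 0;
        while (Instant.now().isBefore(deadline)) {
            work.run();
            iterations++;
        }
        return iterations;
    }

    public static void main(String[] args) {
        // E.g. a 20-minute run would be runFor(Duration.ofMinutes(20), job).
        long n = runFor(Duration.ofMillis(50), () -> { });
        if (n == 0) throw new AssertionError("expected at least one iteration");
    }
}
```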

Execute an operator in a given CPU core

The operators' threads are bouncing between different cores, which is normal because the operating system is optimizing the execution of the JVM's threads. However, we need to pin an operator to a given CPU core in order to have a more precise mathematical model, even though it will decrease the performance of the whole application. This can be done by using the Thread Affinity library from issue #10.

Create a bash script to deploy stream applications on the cluster

Create a generic script to deploy stream applications and use different arguments as parameters. The stream application must receive the following parameters:

  • parallelism
  • polling frequency
  • window frequency
  • offline or online data
  • disable or enable operators chaining
  • The application also has to accept a different polling frequency while it is running. We want the application to change its polling frequency dynamically (without restarting it).
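A sketch of such a deploy script covering a subset of the parameters above; the getopts letters are arbitrary, and the flags for operator chaining and dynamic frequency updates are omitted here since their names are not yet fixed:

```shell
#!/usr/bin/env bash
# Generic deploy-script sketch for explore-flink. The Flink flags mirror the
# commands shown elsewhere in this issue list; defaults are illustrative.
FLINK_CLI=${FLINK_CLI:-/home/flink/flink-1.9.0/bin/flink}
FLINK_APP=${FLINK_APP:-/home/flink/app/explore-flink.jar}

build_cmd() {
    local parallelism=4 pull=10 window=30 offline=true
    local OPTIND opt
    while getopts "p:f:w:o:" opt; do
        case $opt in
            p) parallelism=$OPTARG ;;  # -p <parallelism>
            f) pull=$OPTARG ;;         # -f <polling frequency>
            w) window=$OPTARG ;;       # -w <window frequency>
            o) offline=$OPTARG ;;      # -o <true|false> offline data
        esac
    done
    echo "$FLINK_CLI run -p $parallelism -c org.sense.flink.App $FLINK_APP" \
         "-offlineData $offline -frequencyPull $pull -frequencyWindow $window"
}

# Dry run: print the command that would be executed.
build_cmd "$@"
```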

Improve Flink metrics on Grafana dashboard

  • @felipe: you can now try running the application using two nodes, so we could also collect network data, besides playing with the parallelization. Is it possible to collect the number of cores being utilized by each Task Manager?
  • Network metric in "Network and CPU": maybe there is none because only one node is running with offline data, right? Are "Bytes IN" and "Bytes OUT" disk metrics?
  • Could you please add the quality metric (i.e. throughput) to Grafana?

Please follow up on this link.

Configure Mesos resource manager with Flink on a cluster

I am following the instructions on the official Flink website. Set the Mesos parameters and variables in the conf/flink-conf.yaml file:

#==============================================================================
# Mesos configuration
#==============================================================================
mesos.master: r03:5050
mesos.initial-tasks: 10
mesos.resourcemanager.tasks.container.type: mesos
jobmanager.heap.mb: 1024
jobmanager.web.address: 130.239.48.136
jobmanager.web.port: 8081
mesos.resourcemanager.tasks.mem: 4096
taskmanager.heap.mb: 3500
# mesos.constraints.hard.hostattribute: r03,r02
mesos.resourcemanager.tasks.cpus: 10
mesos.resourcemanager.tasks.disk: 4096
mesos.resourcemanager.tasks.taskmanager-cmd: "/home/flink/flink-1.9.0/bin/mesos-taskmanager.sh"
mesos.resourcemanager.artifactserver.ssl.enabled: false
mesos.resourcemanager.framework.name: "FLINK_on_MESOS_intensive_cpu_usage"
# mesos.resourcemanager.tasks.hostname: r03
  • The script ./bin/mesos-appmaster.sh & starts Flink on Mesos.
    I am still not sure which command I have to use to start the Flink application. I created a "HelloWorld" project to check which is the correct way. Both commands below are not working. The first returns the timeout Could not allocate all requires slots within timeout of 300000 ms. Slots required: 2 and the second does not work at all.
$ /home/flink/flink-1.9.0/bin/flink run /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
$ ./bin/mesos-appmaster-job.sh run /home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
  • I asked on Flink mail list.

  • I had to fix the permissions of the hidden ssh files on all nodes because passwordless ssh from localhost to localhost was not working.

chmod og-wx ~/.ssh/authorized_keys
chmod 750 $HOME

Collect the outPoolUsage of reducer operators using an asynchronous thread

It is necessary to create a thread inside the pre-aggregate operator that collects the outPoolUsage metric of the following operator (which might be a reducer) and changes the delay interval used to aggregate tuples. These metrics are provided through a REST API, e.g. http://127.0.0.1:8081/jobs/91bef822b69a0b49da5af7f23e0a57bb/vertices/6d2677a0ecc3fd8df0b72ec675edf8f4/metrics?get=0.Shuffle.Netty.Input.Buffers.outPoolUsage,1.Shuffle.Netty.Input.Buffers.outPoolUsage,2.Shuffle.Netty.Input.Buffers.outPoolUsage,3.Shuffle.Netty.Input.Buffers.outPoolUsage, which returns:

[
  { "id": "2.Shuffle.Netty.Input.Buffers.outPoolUsage", "value": "0.1" },
  { "id": "3.Shuffle.Netty.Input.Buffers.outPoolUsage", "value": "1.0" },
  { "id": "1.Shuffle.Netty.Input.Buffers.outPoolUsage", "value": "0.2" },
  { "id": "0.Shuffle.Netty.Input.Buffers.outPoolUsage", "value": "0.1" }
]

If one of the buffers is higher than 0.25, we should increase the delay to pre-aggregate tuples.

We can start by listening to an MQTT broker every 10 seconds and use another channel to publish data on this broker.
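The decision logic described above can be sketched as follows; the doubling/halving policy and the method names are an illustrative choice, not the issue's specification:

```java
import java.util.Arrays;

// Sketch of the adaptive pre-aggregation decision: given the outPoolUsage
// values fetched (by the asynchronous thread) from the REST endpoint, one per
// subtask of the downstream reducer, increase the combine delay when any
// buffer usage exceeds the 0.25 threshold, otherwise relax it again.
class PreAggregateController {
    static final double THRESHOLD = 0.25;

    // Returns the new pre-aggregation delay in milliseconds.
    static long nextDelayMs(long currentDelayMs, double[] outPoolUsage) {
        boolean backpressured =
                Arrays.stream(outPoolUsage).anyMatch(u -> u > THRESHOLD);
        if (backpressured) {
            return currentDelayMs * 2;          // combine longer, emit less
        }
        return Math.max(currentDelayMs / 2, 1); // relax toward a minimum delay
    }
}
```

With the sample REST response above (values 0.1, 1.0, 0.2, 0.1), subtask 3 exceeds the threshold, so the controller would double the delay.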

Monitor in which CPU core the Flink operators are running

We need to monitor on which CPU core each physical instance of a Flink operator is placed. This will help the mathematical model relate the latency and throughput of each physical operator to its CPU consumption. The Thread Affinity library can be used to show on which CPU core an operator is running.

  • Disclose the CPU core used by each operator.
  • Expose this value through the Flink metrics.
  • Show this value in the Prometheus and Grafana dashboards.
