PoC de infraestrutura de comunicação utilizando RabbitMQ e Kafka.
-
Acesse a UI do RabbitMQ Management: http://localhost:15672/
- As credenciais de login são
guest
, para o usuário e a senha
- As credenciais de login são
-
Crie uma nova fila, através do menu Queues, com o nome
plugins
e configurações padrão -
Adicione um binding desta fila à exchange
amq.topic
, utilizando como routing key o valorplugins.*
-
Execute o producer MQTT com o comando:
go run producer/producer.go
A partir deste momento, os eventos publicados pelo producer deverão constar na fila
plugins
do RabbitMQ, criada na etapa anterior.
-
Acesse o Confluent Control Center: http://localhost:9021/
-
Crie um novo tópico de nome
rabbitmq_plugins
, com configurações padrão -
Crie um novo connector para extrair os eventos da fila
plugins
, do RabbitMQ, para o tópicorabbitmq_plugins
, do Kafka:// .docker/kafka-connect/connectors/rabbitmq_plugins.properties { "name": "rabbitmq_plugins", "connector.class": "io.confluent.connect.rabbitmq.RabbitMQSourceConnector", "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", "rabbitmq.host": "rabbitmq", "rabbitmq.username": "guest", "rabbitmq.password": "guest", "confluent.topic.bootstrap.servers": [ "kafka:9092" ], "kafka.topic": "plugins", "rabbitmq.queue": [ "plugins" ] }
A partir deste momento, os eventos publicados pelo agente deverão constar no tópico
rabbitmq_plugins
do Kafka.
-
Acesse o container
comm-infra-ksqldb
e execute a CLI do ksqlDB:docker exec -it comm-infra-ksqldb sh ksql
-
Crie um stream
plugins_csv
para o tópicorabbitmq_plugins
, interpretando-o no formato CSV:SET 'auto.offset.reset' = 'earliest'; CREATE STREAM plugins_csv ( client_id VARCHAR, code VARCHAR, value BIGINT, timestamp BIGINT ) WITH (KAFKA_TOPIC = 'rabbitmq_plugins', VALUE_FORMAT = 'DELIMITED');
-
Crie o tópico
plugins_json
a partir do streamplugins_csv
, desta vez no formato JSON:CREATE STREAM plugins_json WITH (KAFKA_TOPIC = 'plugins_json', VALUE_FORMAT = 'JSON') AS SELECT csv.client_id AS "client_id", csv.code AS "code", csv.value AS "value", FORMAT_TIMESTAMP( FROM_UNIXTIME(csv.timestamp), 'yyyy-MM-dd''T''HH:mm:ss.SSSX' ) AS "created_at" FROM plugins_csv csv EMIT CHANGES;
-
Crie o tópico
plugins_avro
a partir do streamplugins_csv
, desta vez no formato Avro:CREATE STREAM plugins_avro WITH (KAFKA_TOPIC = 'plugins_avro', VALUE_FORMAT = 'AVRO') AS SELECT csv.client_id AS "client_id", csv.code AS "code", csv.value AS "value", FROM_UNIXTIME(csv.timestamp) AS "created_at" FROM plugins_csv csv EMIT CHANGES;
O tópico
plugins_avro
é necessário, pois o conversor de JSON do Kafka Connect não é capaz de inferir o schema dos eventos no formato JSON. Além disso, o Avro suporta tipos virtuais, como o timestamp, que seria tratado como uma string no formato JSON.
-
Crie um novo connector para extrair os eventos do tópico
plugins_avro
e inseri-los no banco de dados Postgres:// .docker/kafka-connect/connectors/jdbc_plugins_avro.properties { "name": "jdbc_plugins_avro", "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "value.converter": "io.confluent.connect.avro.AvroConverter", "value.converter.schema.registry.url": "http://schema-registry:8081/", "topics": [ "plugins_avro" ], "connection.url": "jdbc:postgresql://postgres:5432/comm_infra", "connection.user": "postgres", "connection.password": "postgres", "dialect.name": "PostgreSqlDatabaseDialect", "auto.create": "true" }
A partir deste momento, os eventos publicados pelo agente deverão ser automaticamente inseridos no banco de dados.
-
Acesse a URL do Grafana: http://localhost:3000
- As credenciais padrão são
admin
, para o usuário e a senha
- As credenciais padrão são
-
Em Configuration >> Data sources, adicione o banco de dados Postgres como a fonte padrão de dados
- No campo Host, utilize a URL:
postgres:5432
- No campo Host, utilize a URL:
-
Crie um novo dashboard e, em um novo painel, utilize a query abaixo para criar um gráfico de série temporal:
SELECT created_at AS "time", code AS metric, value FROM plugins_avro WHERE $__timeFilter(created_at) AND code IN ('0001', '0002') ORDER BY 1,2
No momento, a biblioteca do client Kafka do Go não suporta a execução no Windows, sendo necessário executar o comando abaixo de dentro do WSL, ou de um container do Docker.
Além disso, pode ser necessária a adição da linha abaixo ao arquivo /etc/hosts
:
127.0.0.1 kafka
-
Execute o consumer do Kafka com o comando:
go run consumer/consumer.go -t plugins_json -g consumer-group-1
- 1.0.0: Versão inicial
Consulte a documentação das tecnologias abaixo para obter proficiência no desenvolvimento e manutenção deste projeto: