leroy-merlin-br / metamorphosis Goto Github PK
View Code? Open in Web Editor NEWEasy and flexible Kafka Library for Laravel and PHP 7
License: MIT License
Easy and flexible Kafka Library for Laravel and PHP 7
License: MIT License
It seems to me that we should prefix our namespace with \LeroyMerlin
or \LeroyMerlinBr
Estamos migrando alguns de nossos serviços para o PHP 8.1 e nesse processo a produção de mensagens parou de funcionar.
Após algum tempo de análise identificamos que o problema foi causado por uma breaking change do rdkafka 4, onde ele parou de enviar as mensagens durante a "destruição" de seus objetos.
https://github.com/arnaud-lb/php-rdkafka/releases/tag/4.0.0
Mesmo antes da migração o código aparentemente funcionava por um erro. É esperado que as mensagens sejam enviadas conforme um poll de mensagens, mas não é o que acontece.
Ao fim da execução do Metamorphosis::produce
todos os objetos do rdkafka e muitos do metamorphosis são destruídos, pois não tem nada que segure uma referencia dos objetos.
O rdkafka 3 faz um flush durante a destruição dos seus objetos, dando a impressão que esta tudo funcionando. Já o rdkafka 4+ não faz, evidenciando o problema.
Segue um código simples que funciona no rdkafa 3 e não funciona no rdkafka 4 que ilustra o mesmo comportamento do Metamorphosis::produce
:
function produce () {
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload 1 " . random_int(1000, 9999));
$producer->poll(0);
}
// Como nada faz referencia aos objetos do rdkafka os mesmos são destruidos.
// no rdkafka 3 um flush é executado durante a destruição dos seus objetos
// no rdfaka 4 esse flush não acontece
produce();
sleep(10);
Segue um outro exemplo de um código que funciona, removendo esse comportamento da destruição dos objetos:
$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'kafka:9092');
$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic("test");
$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload 1 " . random_int(1000, 9999));
$producer->poll(0);
// as mensagens são enviadas durante o sleep
sleep(10);
Como uma solução paliativa, configuramos o Metamorphosis para funcionar de forma sincronia e com required_acknowledgment
ativado.
Em alguns de nossos projetos o problema não ocorre pois estamos usando as classes internas do Metamorphosis ao invés do métodoMetamorphosis::produce
, segurando as instancias necessárias em memória durante toda a execução do script.
There is no need to use fpm
on this project.
We should add some more control over our runner loop, such as listening for async signals, graceful stoping, memory limit control.
Reference:
https://github.com/laravel/framework/blob/5.6/src/Illuminate/Queue/Worker.php#L85
When the topic has no config for auto-commit a record, the consumer takes de responsibility to make this action, and there's support for that in the RdKafka lib.
This should come with the package, making it easy to commit records for the consumer.
Refs:
Usually when defining a custom partition and an offset, we just want a single or a couple of records to be processed.
Ideas:
--once
option similar to what Laravel has for queue:work
command--limit
optionright now, we don't have any docs about how producer works :(
When starting a consumer via command php artisan kafka:consume
, it would be nice to have an option to change what broker this will use to run the command.
An option like:
$ php artisan kafka:consume offers --broker=some-other-broker
in files:
/vendor/leroy-merlin-br/metamorphosis/src/Consumers/ConsumerInterface.php:8
/vendor/leroy-merlin-br/metamorphosis/src/Consumers/HighLevel.php:29
/vendor/leroy-merlin-br/metamorphosis/src/Consumers/LowLevel.php:35
Returns a RdKafka\Message or NULL on timeout.
wish fix this bug. thanks.
I think that should be a good implementation of this functionality inside the while(true) of the consumer Runner.php
https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-conf.seterrorcb.html
Would be nice to support this feature:
https://github.com/arnaud-lb/php-rdkafka#low-level-consuming-from-multiple-topics--partitions
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.