contributte / rabbitmq
🐰 RabbitMQ (AMQP, STOMP, MQTT) using BunnyPHP for Nette Framework (@nette).
Home Page: https://contributte.org/packages/contributte/rabbitmq.html
License: MIT License
Hello, I am trying to set up and test-run this on Nette 3 (with Nette 2.4 it worked fine).
Publishing messages to RabbitMQ works, but consuming does not.
When I execute the consume command, it returns:
<h1>Redirect</h1>
<p><a href="http:/sign/in">Please click here to continue</a>.</p>
I don't know whether I forgot something or whether it is a Nette 3 issue.
Any ideas?
config.neon
services:
    - App\RabbitMq\Consumer\TestConsumer
    - App\RabbitMq\Queue\TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))

extensions:
    rabbitmq: Gamee\RabbitMQ\DI\RabbitMQExtension

rabbitmq:
    connections:
        default: %rabbitmq_connection%
    queues:
        testQueue:
            connection: default
            autoCreate: true
    producers:
        testProducer:
            queue: testQueue
            contentType: application/json
            deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT
    consumers:
        testConsumer:
            queue: testQueue
            callback: [@App\RabbitMq\Consumer\TestConsumer, consume]
            qos:
                prefetchSize: 0
                prefetchCount: 5
TestConsumer
<?php declare(strict_types=1);

namespace App\RabbitMq\Consumer;

use Bunny\Message;
use Gamee\RabbitMQ\Consumer\IConsumer;

/**
 * Class TestConsumer
 * @package App\RabbitMq\Consumer
 */
final class TestConsumer implements IConsumer
{
    /**
     * @param Message $message
     * @return int
     */
    public function consume(Message $message): int
    {
        $messageData = json_decode($message->content);
        $headers = $message->headers;
        var_dump($messageData);

        return IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
    }
}
TestQueue
<?php declare(strict_types=1);

namespace App\RabbitMq\Queue;

use Gamee\RabbitMQ\Producer\Producer;

/**
 * Class TestQueue
 * @package App\RabbitMq\Queue
 */
final class TestQueue
{
    /**
     * @var Producer
     */
    private $testProducer;

    /**
     * TestQueue constructor.
     * @param Producer $testProducer
     */
    public function __construct(Producer $testProducer)
    {
        $this->testProducer = $testProducer;
    }

    /**
     * @param string $message
     */
    public function publish(string $message): void
    {
        $json = json_encode(['message' => $message]);
        $headers = [];
        $this->testProducer->publish($json, $headers);
    }
}
index.php
<?php

declare(strict_types=1);

require __DIR__ . '/../vendor/autoload.php';

App\Bootstrap::boot()
    ->createContainer()
    ->getByType(Nette\Application\Application::class)
    ->run();
Versions 5.x and 6.x are supposed to be compatible with PHP 7.3, but the code uses a lot of PHP 7.4 syntax, so it doesn't work.
Hello,
I have a problem with the queue/consumer definition in the config. If the configuration is in the main config.neon (root/app/config/config.neon), everything works. But I need the configuration in a different config file (root/libs/Ext/src/DI/ext_config.neon). When I move it into this 'extension' config, it doesn't work:
Gamee\RabbitMQ\Producer\Exception\ProducerFactoryException
Producer [testProducer] does not exist
Some parts of my app are organized as small extensions (root/libs).
All services (consumer and queue) are in the same config file (ext_config.neon) as the RabbitMQ configuration.
Is there any way to make this work?
Thank you very much
// root/libs/Ext/src/DI/ext_config.neon
services:
    # rabbitMQ
    - Ext\RabbitMq\Consumer\TestConsumer
    - Ext\RabbitMq\Queue\TestQueue(@Gamee\RabbitMQ\Client::getProducer(testProducer))

rabbitmq:
    connections:
        default:
            user: guest
            password: guest
            host: localhost
            port: 5672
    queues:
        testQueue:
            connection: default
            autoCreate: true
    producers:
        testProducer:
            queue: testQueue
            contentType: application/json
            deliveryMode: 2 # Producer::DELIVERY_MODE_PERSISTENT
    consumers:
        testConsumer:
            queue: testQueue
            callback: [@Ext\RabbitMq\Consumer\TestConsumer, consume]
            qos:
                prefetchSize: 0
                prefetchCount: 10
// root/libs/Ext/src/DI/ExtExtension.php
<?php

namespace Ext\DI;

use Nette;

class ExtExtension extends Nette\DI\CompilerExtension {

    public function loadConfiguration() {
        $config = $this->loadFromFile(__DIR__ . '/ext_config.neon');
        $config = Nette\DI\Helpers::expand($config, $this->validateConfig($this->config));

        Nette\DI\Compiler::loadDefinitions(
            $this->getContainerBuilder(),
            $config['services'],
            $this->name
        );
    }
}
Hello, RabbitMQ supports binding one queue to an exchange with multiple binding keys. This library is missing that feature, and I would really appreciate it if it could be added.
If it is already possible, can you tell me how?
Currently only a string value can be passed to the routingKey attribute.
exchanges:
    testExchange:
        connection: default
        type: fanout
        queueBindings:
            testQueue:
                routingKey: testRoutingKey
        # force exchange declare on first exchange operation during request
        # autoCreate: true
It would be great if something like this were possible:
exchanges:
    testExchange:
        connection: default
        type: fanout
        queueBindings:
            testQueue:
                routingKey:
                    - 'FirstKey.#'
                    - 'SecondKey.*'
        # force exchange declare on first exchange operation during request
        # autoCreate: true
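A minimal sketch of how a configuration value that is either a single string or a list of keys could be turned into one bind call per key. The names `normalizeRoutingKeys` and `bindQueue`, and the `$bind` callable standing in for the real channel bind call, are hypothetical and not part of the library. Note also that wildcard patterns such as `#` and `*` only take effect on topic exchanges; a fanout exchange ignores routing keys.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical helper (not part of the library): accepts a single routing
 * key or a list of keys and always returns a list of strings.
 *
 * @param string|string[]|null $routingKey
 * @return string[]
 */
function normalizeRoutingKeys($routingKey): array
{
    if ($routingKey === null) {
        // No key configured: bind once with an empty routing key.
        return [''];
    }

    // A string becomes a one-element array; an array is re-indexed.
    return array_values((array) $routingKey);
}

/**
 * Issues one bind per routing key. $bind is an assumed stand-in for the
 * real broker call and receives (queue, exchange, routingKey).
 *
 * @param string|string[]|null $routingKey
 */
function bindQueue(callable $bind, string $queue, string $exchange, $routingKey): void
{
    foreach (normalizeRoutingKeys($routingKey) as $key) {
        $bind($queue, $exchange, $key);
    }
}
```

With this shape, the existing single-string configuration keeps working unchanged, while a list produces several bindings for the same queue.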
Hello, is there any way to stop a consumer when an error occurs? Typically, when the Doctrine entity manager has been closed, I need the consumer job to exit so that supervisor starts it again.
If not, may I send a PR adding a new case branch here? https://github.com/gameeapp/nette-rabbitmq/blob/master/src/Consumer/Consumer.php#L79
Thank you for your answer.
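The idea of an extra return code that ends the consume loop can be sketched as a self-contained simulation. The constant `MESSAGE_ACK_AND_TERMINATE`, its value, and the `runConsumer` loop are assumptions made for illustration; they do not describe the library's actual `Consumer` class.

```php
<?php declare(strict_types=1);

// Hypothetical return codes; only the idea of a "stop" code comes from the request.
const MESSAGE_ACK = 1;
const MESSAGE_ACK_AND_TERMINATE = 99; // assumed name and value

/**
 * Simulated consume loop: handles messages until the callback asks to stop.
 * Returns the number of messages that were handled.
 */
function runConsumer(iterable $messages, callable $callback): int
{
    $handled = 0;
    foreach ($messages as $message) {
        $result = $callback($message);
        $handled++;
        if ($result === MESSAGE_ACK_AND_TERMINATE) {
            // Leave the loop so the process can exit and be restarted by supervisor.
            break;
        }
    }

    return $handled;
}

$callback = function (string $message): int {
    try {
        if ($message === 'bad') {
            // Simulated failure, e.g. a closed entity manager.
            throw new RuntimeException('EntityManager is closed');
        }

        return MESSAGE_ACK;
    } catch (RuntimeException $e) {
        return MESSAGE_ACK_AND_TERMINATE;
    }
};

$handled = runConsumer(['ok', 'bad', 'never-reached'], $callback);
```

The design choice here is that the callback, not the loop, decides when a failure is unrecoverable, which keeps the loop itself free of application-specific error handling.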
I suspect that BulkConsumer silently swallows Throwable errors. I quickly looked into the code, and there is a try-catch block in BulkConsumer.php that probably causes this.
I would prefer to remove this block and let the developer control the returned values.
rabbitmq/src/Consumer/BulkConsumer.php
Line 104 in 96a55a9
The exception Bunny\Exception\ClientException (Broken pipe or closed connection) is raised by calling Gamee\RabbitMQ\Producer\ProducerFactory->getProducer(xxx)
CALLSTACK
.../web/vendor/bunny/bunny/src/Bunny/Client.php:75 source Bunny\AbstractClient->read()
.../web/vendor/bunny/bunny/src/Bunny/ClientMethods.php:302 source Bunny\Client->feedReadBuffer()
.../web/vendor/bunny/bunny/src/Bunny/Client.php:110 source Bunny\AbstractClient->awaitConnectionTune()
.../gamee/nette-rabbitmq/src/Connection/Connection.php:64 source Bunny\Client->connect()
.../src/Connection/ConnectionFactory.php:76 source Gamee\RabbitMQ\Connection\Connection->__construct(arguments)
.../src/Connection/ConnectionFactory.php:46 source Gamee\RabbitMQ\Connection\ConnectionFactory->create(arguments)
.../nette-rabbitmq/src/Exchange/ExchangeFactory.php:90 source Gamee\RabbitMQ\Connection\ConnectionFactory->getConnection(arguments)
.../nette-rabbitmq/src/Exchange/ExchangeFactory.php:67 source Gamee\RabbitMQ\Exchange\ExchangeFactory->create(arguments)
.../nette-rabbitmq/src/Producer/ProducerFactory.php:82 source Gamee\RabbitMQ\Exchange\ExchangeFactory->getExchange(arguments)
.../nette-rabbitmq/src/Producer/ProducerFactory.php:58 source Gamee\RabbitMQ\Producer\ProducerFactory->create(arguments)
.../web/vendor/gamee/nette-rabbitmq/src/Client.php:41 source Gamee\RabbitMQ\Producer\ProducerFactory->getProducer(arguments)
.../cache/Nette.Configurator/Container_95a270975e.php:1700 source Gamee\RabbitMQ\Client->getProducer(arguments)
1690:
1691: public function createServiceSecurity__userStorage(): Nette\Security\IUserStorage
1692: {
1693: $service = new Nette\Http\UserStorage($this->getService('session.session'));
1694: return $service;
1695: }
1696:
1697:
1698: public function createServiceSegmentationQueue(): App\Model\SegmentationQueue
1699: {
1700: $service = new App\Model\SegmentationQueue($this->getService('rabbitmq.client')->getProducer('segmentationProducer'));
1701: return $service;
1702: }
1703:
1704:
.../web/vendor/nette/di/src/DI/Container.php:181 source Container_95a270975e->createServiceSegmentationQueue()
.../web/vendor/nette/di/src/DI/Container.php:107 source Nette\DI\Container->createService(arguments)
.../cache/Nette.Configurator/Container_95a270975e.php:469 source Nette\DI\Container->getService(arguments)
.../web/vendor/nette/di/src/DI/Container.php:181 source Container_95a270975e->createService__37_App_Presenters_ApiSegmentationPresenter()
.../Bridges/ApplicationDI/PresenterFactoryCallback.php:59 source Nette\DI\Container->createService(arguments)
inner-code Nette\Bridges\ApplicationDI\PresenterFactoryCallback->__invoke(arguments)
.../application/src/Application/PresenterFactory.php:49 source call_user_func(arguments)
.../nette/application/src/Application/Application.php:140 source Nette\Application\PresenterFactory->createPresenter(arguments)
.../nette/application/src/Application/Application.php:83 source Nette\Application\Application->processRequest(arguments)
/application/web/www/index.php:6 source Nette\Application\Application->run()
In ExchangeFactory::create at line 82, this expression:
...$queueBinding['routingKeys']
When the array key routingKeys is not set, the expression evaluates to null, which cannot be unpacked with the ... operator.
This causes: TypeError Only arrays and Traversables can be unpacked
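A minimal sketch of one way to guard the unpack, assuming the binding is a plain array in which `routingKeys` may be absent. The function `collectKeys` is a hypothetical stand-in for whatever receives the unpacked keys in the library.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical stand-in for the code that receives the unpacked keys.
 *
 * @return string[]
 */
function collectKeys(string ...$keys): array
{
    return $keys;
}

// A binding without a 'routingKeys' entry, as in the failing case.
$queueBinding = ['arguments' => []];

// Unpacking the missing key directly would fail, because
// $queueBinding['routingKeys'] evaluates to null:
//   collectKeys(...$queueBinding['routingKeys']);

// Falling back to an empty array makes the unpack safe.
$keys = collectKeys(...($queueBinding['routingKeys'] ?? []));

// A binding that does define keys still passes them through unchanged.
$withKeys = collectKeys(...(['routingKeys' => ['a', 'b']]['routingKeys'] ?? []));
```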
Hello, could you please update the symfony/console dependency to allow the latest version (7)? Thank you so much!
When I run the command to start a consumer, it runs all of the declared consumers, not just the one I named.
/usr/bin/php7.2 /var/www/www.myweb.com/www/index.php rabbitmq:consumer accessInsertConsumer 5
Output from __construct of the declared consumers:
App\AccessInsertConsumer
App\AccessUpdateConsumer
App\BugConsumer
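The output above shows every consumer class being constructed. One possible remedy, stated here as an assumption rather than a diagnosis of the library, is to register factories and build a consumer only when it is requested by name. `LazyConsumerRegistry` is a hypothetical class used purely for illustration.

```php
<?php declare(strict_types=1);

/** Hypothetical registry that stores factories and builds a consumer only on request. */
final class LazyConsumerRegistry
{
    /** @var array<string, callable> */
    private $factories = [];

    /** @var array<string, object> */
    private $instances = [];

    public function register(string $name, callable $factory): void
    {
        $this->factories[$name] = $factory;
    }

    public function get(string $name): object
    {
        if (!isset($this->factories[$name])) {
            throw new InvalidArgumentException("Consumer [$name] does not exist");
        }

        // Build on first use, then reuse the cached instance.
        if (!isset($this->instances[$name])) {
            $this->instances[$name] = ($this->factories[$name])();
        }

        return $this->instances[$name];
    }
}

// Track which consumers actually get constructed.
$constructed = [];
$registry = new LazyConsumerRegistry();
foreach (['accessInsertConsumer', 'accessUpdateConsumer', 'bugConsumer'] as $name) {
    $registry->register($name, function () use ($name, &$constructed): object {
        $constructed[] = $name;

        return new stdClass();
    });
}

// Only the requested consumer is built.
$registry->get('accessInsertConsumer');
```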
In the current version, queues and exchanges are by default declared automatically on every interaction. This can be overridden by setting autoCreate: false on queues and exchanges.
The proposal is to make autoCreate: false the default and to add a console command that creates all queues and exchanges. The command is intended to be run during the deploy process to create missing queues/exchanges according to the configs, while the code executed when producing and consuming messages will by default behave more efficiently.
Every time a producer/consumer touches any exchange/queue, it is redeclared in QueueFactory or ExchangeFactory. This is a huge waste of performance, and it killed our production RabbitMQ server. We would like to be able to switch off this behaviour in the config, similar to autoSetupFabric: off in the Kdyby/RabbitMq extension.
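The deploy-time declaration step described above can be sketched as a single pass over the configuration. The function `declareAll` and the `$declare` callable, which stands in for the real broker call, are hypothetical; the array layout merely mirrors the `queues` and `exchanges` sections of the NEON config.

```php
<?php declare(strict_types=1);

/**
 * Hypothetical deploy-time step: walk the configuration once and declare
 * every exchange and queue. $declare is an assumed stand-in for the real
 * broker call and receives (section, name).
 *
 * @return string[] the declared items, in order, as "section:name"
 */
function declareAll(array $config, callable $declare): array
{
    $declared = [];
    foreach (['exchanges', 'queues'] as $section) {
        foreach (array_keys($config[$section] ?? []) as $name) {
            $declare($section, (string) $name);
            $declared[] = "$section:$name";
        }
    }

    return $declared;
}

// Runtime code would leave autoCreate off and rely on this one-time step.
$config = [
    'queues' => [
        'testQueue' => ['connection' => 'default', 'autoCreate' => false],
    ],
    'exchanges' => [
        'testExchange' => ['connection' => 'default', 'type' => 'fanout', 'autoCreate' => false],
    ],
];

$log = [];
$declared = declareAll($config, function (string $section, string $name) use (&$log): void {
    $log[] = "declare $section $name";
});
```

Running such a step once per deploy means producers and consumers no longer pay for a redeclare on every interaction.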