mikemadisonweb / yii2-rabbitmq Goto Github PK
View Code? Open in Web Editor NEWRabbitMQ Extension for Yii2
License: MIT License
RabbitMQ Extension for Yii2
License: MIT License
Hi - when starting a consumer up, I get the message "Consumer Outbound
doesn't exist."
Command: ./yii rabbitmq/consume Outbound
Config File Snippet for Consumer
'consumers' => [
[
'name' => 'Outbound',
// Every consumer should define one or more callbacks for corresponding queues
'callbacks' => [
// queue name => callback class name
'Forwarding Outbound' => \components\rabbitmq\OutboundForwarding::class
],
],
]
Hi! Firstly... thanks to all contributors for awesome package! The second one, can some one explain to me why my producer throw this error (if i trying place simple message to my queue).
Missing required parameter "user" when instantiating "PhpAmqpLib\Connection\AbstractConnection".
rabbitmq component configuration (in both... console.php and web.php file configs)
return [
'class' => 'mikemadisonweb\rabbitmq\Configuration',
'connections' => [
'default' => [
'host' => '192.168.22.101',
'port' => '5672',
'user' => 'loginxxx',
'password' => 'xxx',
'vhost' => 'interpretation_host',
'heartbeat' => 0,
],
],
'producers' => [
'event_main_interpretation' => [
'connection' => 'default',
'exchange_options' => [
'name' => 'interpretation',
'type' => 'direct',
],
],
],
'consumers' => [
'event_main_interpretation' => [
'connection' => 'default',
'exchange_options' => [
'name' => 'main_interpretation',
'type' => 'direct',
],
'queue_options' => [
'name' => 'interpretation',
'routing_keys' => ['interpretation'],
'durable' => true,
'auto_delete' => false,
],
'callback' => \app\commands\interpretation\InterpretationDataConsumer::class,
],
],
];
My container definition ("included" in web and console config)
return [
'definitions' => [],
'singletons' => [
'rabbitmq.main-interpretation.consumer' => [
[
'class' => \app\commands\interpretation\InterpretationDataConsumer::class,
],
[
'some-dependency-consumer' => \yii\di\Instance::of('\app\commands\interpretation\InterpretationDataConsumer'),
],
],
'rabbitmq.main-interpretation.producer' => [
[
'class' => \app\commands\interpretation\InterpretationDataProducer::class,
],
[
'some-dependency-producer' => \yii\di\Instance::of('\app\commands\interpretation\InterpretationDataProducer'),
],
],
],
];
Code in controller which place message to queue:
\Yii::$app->rabbitmq->load();
$producer = \Yii::$container->get('rabbitmq.main-interpretation.producer');
$msg = serialize([
'val1' => 1,
'val2' => 2,
]);
$producer->publish($msg, 'interpretation');
Producer (\app\commands\interpretation\InterpretationDataProducer)
<?php
namespace app\commands\interpretation;
use mikemadisonweb\rabbitmq\components\Producer;
class InterpretationDataProducer extends Producer
{
}
Can some one help to me?
Hi everyone
At first, thanks for this useful repository
I have problem on consumer connection
It not work after few minutes, no error, no any response in console
I am using Supervisor
on docker
My supervisor config:
[supervisord]
logfile=/dev/stdout
logfile_maxbytes=0
logfile_backups=0
loglevel=info
nodaemon=true
[program:php-fpm]
command=php-fpm -F -y /usr/local/etc/php-fpm.conf -c /usr/local/etc/php/conf.d/local.ini -R
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr
[program:consume]
command=php /home/www/app/yii rabbitmq/consume shahkar
autorestart=true
stdout_events_enabled=true
stderr_events_enabled=true
stdout_logfile_maxbytes=0
stderr_logfile_maxbytes=0
stdout_logfile=/dev/stdout
stderr_logfile=/dev/stderr
My rabbit config:
[
//Config help:: https://github.com/mikemadisonweb/yii2-rabbitmq
'class' => \mikemadisonweb\rabbitmq\Configuration::class,
'auto_declare' => true,
'connections' =>
[
[
'name' => 'rabbit',
// You can pass these parameters as a single `url` option: https://www.rabbitmq.com/uri-spec.html
'host' => ENV::get('RABBIT_HOST'),
'port' => ENV::get('RABBIT_PORT'),
'user' => ENV::get('RABBIT_USER'),
'password' => ENV::get('RABBIT_PASSWORD'),
'vhost' => ENV::get('RABBIT_VHOST'),
'type' => AMQPLazyConnection::class,
'url' => null,
'connection_timeout' => 3,
'read_write_timeout' => 3,
'ssl_context' => null,
'keepalive' => false,
'heartbeat' => 0,
'channel_rpc_timeout' => 0.0
]
// When multiple connections is used you need to specify a `name` option for each one and define them in producer and consumer configuration blocks
],
'exchanges' =>
[
[
'name' => 'd_exchange',
'type' => 'direct',
'passive' => false,
'durable' => true,
'auto_delete' => false,
'internal' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
],
],
'queues' =>
[
[
'name' => 'sms',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
// Queue can be configured here the way you want it:
//'durable' => true,
//'auto_delete' => false,
],
[
'name' => 'log',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
],
[
'name' => 'nationalID',
'passive' => false,
'durable' => true,
'exclusive' => false,
'auto_delete' => false,
'nowait' => false,
'arguments' => null,
'ticket' => null,
]
],
'bindings' =>
[
[
'exchange' => 'd_exchange',
'queue' => 'sms',
'to_exchange' => null,
'routing_keys' => ['sms'],
],
[
'exchange' => 'd_exchange',
'queue' => 'log',
'to_exchange' => null,
'routing_keys' => ['log'],
],
],
'producers' =>
[
[
'name' => 'producer',
'connection' => 'rabbit',
'safe' => true,
'content_type' => 'text/plain',
'delivery_mode' => 2,
'serializer' => 'serialize',
],
[
'name' => 'log',
'connection' => 'rabbit',
'safe' => true,
'content_type' => 'text/plain',
'delivery_mode' => 2,
'serializer' => 'serialize',
],
],
'consumers' =>
[
[
'name' => 'shahkar',
// Every consumer should define one or more callbacks for corresponding queues
'connection' => 'rabbit',
'qos' => [
'prefetch_size' => 0,
'prefetch_count' => 0,
'global' => false,
],
'idle_timeout' => 0,
'idle_timeout_exit_code' => null,
'proceed_on_exception' => false,
'deserializer' => 'unserialize',
'callbacks' =>
[
// queue name => callback class name
'sms' => ShahkarConsumer::class,
],
],
],
'on before_consume' => function ($event) {
if (\Yii::$app->has('db') && \Yii::$app->db->isActive) {
try {
\Yii::$app->db->createCommand('SELECT 1')->query();
} catch (\yii\db\Exception $exception) {
\Yii::$app->db->close();
}
}
},
'logger' => [
'log' => true,
'category' => 'application',
'print_console' => true,
'system_memory' => true,
],
]
My consumer class:
namespace app\consumers;
use app\base\BaseConsumer;
use app\models\ModuleCorporates;
use app\models\ModuleTemp;
use Faker\Factory;
use Faker\Provider\fa_IR\Address;
use Faker\Provider\fa_IR\Person;
use Kavenegar\KavenegarApi;
use mikemadisonweb\rabbitmq\components\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
class ShahkarConsumer extends BaseConsumer implements \mikemadisonweb\rabbitmq\components\ConsumerInterface
{
/**
* @inheritDoc
*/
public function execute(AMQPMessage $msg)
{
$data = json_decode($msg->body);
/**
* Data format for `request_corporate_details` is : Object
*[
* 'mode' => 'request_corporate_details',
* 'corporate_national_id' => '.........STRING..........',
* 'ceo_nation_code' => '.........STRING..........',
* 'ceo_uid' => '..........INT.............'
*]
*/
try {
$faker = Factory::create();
$person = new Person;
$model = new ModuleTemp();
$fields =
[
'name' => $person->name(),
'operating_license_number' => $faker->numberBetween(1111111, 9999999),
'year_of_operation' => $faker->numberBetween(1350, 1100),
'description' => $faker->text(255),
'address' => $faker->address,
'phone' => $faker->phoneNumber,
'economic_code' => $faker->numberBetween(1111111111, 21399999999),
'legal_mode' => $faker->numberBetween(1, 3),
'site' => $faker->url,
'field_id' => null,
'category_id' => null,
'ceo_uid' => $data->uid,
'region_id' => null,
'province_id' => null,
'city_id' => null,
'national_id' => $data->national_id,
'ceo_name' => $faker->name,
'ceo_phone' => $faker->phoneNumber,
'ceo_national_code' => $person::nationalCode(),
'postal_code' => Address::postcode(),
'production_plan' => null,
];
$model->key = 'corp_'.$data->ceo_uid;
$model->value = json_encode($fields);
$model->created_at = date('Y-m-d H:i:s');
$model->age = 10800;
$model->save();
} catch (\Exception $e) {
$this->logException($e);
return ConsumerInterface::MSG_REQUEUE;
}
return ConsumerInterface::MSG_ACK;
}
}
Top result in Linux after few minutes, when connection is freezed:
Mem: 7993764K used, 173456K free, 83884K shrd, 1878812K buff, 3713880K cached
CPU: 3% usr 1% sys 0% nic 95% idle 0% io 0% irq 0% sirq
Load average: 0.16 0.15 0.12 3/706 31
PID PPID USER STAT VSZ %VSZ CPU %CPU COMMAND
21 17 www-data S 169m 2% 0 0% php-fpm: pool www
22 17 www-data S 169m 2% 2 0% php-fpm: pool www
17 7 root S 169m 2% 2 0% php-fpm: master process (/usr/local/etc/php-fpm.conf)
7 1 root S 138m 2% 1 0% supervisord -c /home/www/.deploy/config/supervisor.conf
16 7 root S 47100 1% 1 0% php /home/www/app/yii rabbitmq/consume sms
25 0 root S 1676 0% 1 0% /bin/sh
1 0 root S 1608 0% 3 0% sh /entrypoint.sh
31 25 root R 1604 0% 3 0% top
After few minutes consumer is running but in fact it is freezed and dont work.
Any idea?
Here is my stack trace when I tried this command
php yii rabbitmq/declare-all
Exception 'PhpAmqpLib\Exception\AMQPRuntimeException' with message 'Broken pipe or closed connection'
in /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:211
Stack trace:
#0 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(149): PhpAmqpLib\Wire\IO\StreamIO->read(7)
#1 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106): PhpAmqpLib\Wire\AMQPReader->rawread(7)
#2 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(513): PhpAmqpLib\Wire\AMQPReader->read(7)
#3 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(560): PhpAmqpLib\Connection\AbstractConnection->wait_frame(0)
#4 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(217): PhpAmqpLib\Connection\AbstractConnection->wait_channel(0, 0)
#5 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(327): PhpAmqpLib\Channel\AbstractChannel->next_frame(0)
#6 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(205): PhpAmqpLib\Channel\AbstractChannel->wait(Array)
#7 /var/www/html/API-Server/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AMQPLazyConnection.php(23): PhpAmqpLib\Connection\AbstractConnection->connect()
#8 /var/www/html/API-Server/vendor/mikemadisonweb/yii2-rabbitmq/components/Routing.php(186): PhpAmqpLib\Connection\AMQPLazyConnection->channel()
#9 /var/www/html/API-Server/vendor/mikemadisonweb/yii2-rabbitmq/components/Routing.php(65): mikemadisonweb\rabbitmq\components\Routing->declareExchange('UserRegister')
#10 /var/www/html/API-Server/vendor/mikemadisonweb/yii2-rabbitmq/controllers/RabbitMQController.php(141): mikemadisonweb\rabbitmq\components\Routing->declareAll()
#11 [internal function]: mikemadisonweb\rabbitmq\controllers\RabbitMQController->actionDeclareAll('default')
#12 /var/www/html/API-Server/vendor/yiisoft/yii2/base/InlineAction.php(57): call_user_func_array(Array, Array)
#13 /var/www/html/API-Server/vendor/yiisoft/yii2/base/Controller.php(157): yii\base\InlineAction->runWithParams(Array)
#14 /var/www/html/API-Server/vendor/yiisoft/yii2/console/Controller.php(135): yii\base\Controller->runAction('declare-all', Array)
#15 /var/www/html/API-Server/vendor/yiisoft/yii2/base/Module.php(528): yii\console\Controller->runAction('declare-all', Array)
#16 /var/www/html/API-Server/vendor/yiisoft/yii2/console/Application.php(180): yii\base\Module->runAction('rabbitmq/declar...', Array)
#17 /var/www/html/API-Server/vendor/yiisoft/yii2/console/Application.php(147): yii\console\Application->runAction('rabbitmq/declar...', Array)
#18 /var/www/html/API-Server/vendor/yiisoft/yii2/base/Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request))
#19 /var/www/html/API-Server/yii(20): yii\base\Application->run()
#20 {main}
I have config
'rabbitmq' => [
'class' => 'mikemadisonweb\rabbitmq\Configuration',
'connections' => [
'default' => $params['rabbitmq']
],
'producers' => [
'queue_dispatcher' => [
'connection' => 'default',
'exchange_options' => [
'name' => 'queue.exchange',
'type' => 'topic',
],
],
],
In my controller I'm trying to get producer
$producer = \Yii::$container->get(sprintf(BaseRabbitMQ::PRODUCER_SERVICE_NAME, 'queue_dispatcher'));
And I get error
Class rabbit_mq.producer.queue_dispatcher does not exist
How can I get producer instance? Why producer is not registered in container?
Judging by the code in components/Routing.php, the 'declare' property in the configuration of a queue or an exchange is ignored, and the declaration is done regardless of the property being false. I would have expected it to result in the queue or exchange just being flagged as declared, without actually declaring them.
It is necessary to add support for other versions of PHP, in addition to 5.5 - need to change the mechanics of getting the class name.
Composer.lock should be committed so the required versions are locked.
See https://www.engineyard.com/blog/composer-its-all-about-the-lock-file
When initializing a multipleConsumer, it doesn't read the queue_options
configuration.
Hi @mikemadisonweb ,
I have a problem with multiple consumers. It doesn't work with a flag "messagesLimit", the flag has limit which I set in ConsumerController but I can't set the limit from the command:
It doesn't work:
php yii rabbitmq-consumer/multiple integration -m=10
It works:
php yii rabbitmq-consumer/single integration -m=10
The Producer passes the routing key as routing_key
, but the Logger expects it as routingKey
.
Producer:
yii2-rabbitmq/components/Logger.php
Line 76 in 54b48c4
Logger:
yii2-rabbitmq/components/Producer.php
Line 148 in 54b48c4
hey mike,
if you could update the readme with configuration example and implementation example, that would be very helpful!
The default setup of a producer is creating a queue. This is a strange behavior. You specifically need to set 'declare' => false
to stop this.
I guess this should be default behavior to not create a queue when it's not specified. Every time a worker is restarted, the autoSetupFabric makes sure setupFabric is run again, and a new queue is created resulting in tons of queues.
'producers' => [
'server' => [
'connection' => 'conn',
'exchange_options' => [
'name' => 'exchange',
'type' => 'topic',
],
'queue_options' => [
'declare' => false,
]
],
],
...
PHP Recoverable Error 'yii\base\ErrorException' with message 'Argument 1 passed to mikemadisonweb\rabbitmq\components\Consumer::logError() must be an instance of Throwable, instance of yii\base\InvalidParamException given, called in /home/test2/www/xxx.ru/yii2/vendor/mikemadisonweb/yii2-rabbitmq/components/Consumer.php on line 135 and defined'
in /home/test2/www/xxx.ru/yii2/vendor/mikemadisonweb/yii2-rabbitmq/components/Consumer.php:289
Stack trace:
#0 /home/test2/www/xxx.ru/yii2/vendor/mikemadisonweb/yii2-rabbitmq/components/Consumer.php(154): mikemadisonweb\rabbitmq\components\Consumer->processMessageQueueCallback()
#1 /home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(983): mikemadisonweb\rabbitmq\components\Consumer->processMessage()
#2 /home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(983): ::call_user_func:{/home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:983}()
#3 /home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(202): PhpAmqpLib\Channel\AMQPChannel->basic_deliver()
#4 /home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(202): ::call_user_func:{/home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:202}()
#5 /home/test2/www/xxx.ru/yii2/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(341): PhpAmqpLib\Channel\AbstractChannel->dispatch()
#6 /home/test2/www/xxx.ru/yii2/vendor/mikemadisonweb/yii2-rabbitmq/components/Consumer.php(49): PhpAmqpLib\Channel\AbstractChannel->wait()
#7 /home/test2/www/xxx.ru/yii2/vendor/mikemadisonweb/yii2-rabbitmq/controllers/ConsumerController.php(97): mikemadisonweb\rabbitmq\components\Consumer->consume()
#8 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/InlineAction.php(57): mikemadisonweb\rabbitmq\controllers\ConsumerController->actionSingle()
#9 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/InlineAction.php(57): ::call_user_func_array:{/home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/InlineAction.php:57}()
#10 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/Controller.php(156): yii\base\InlineAction->runWithParams()
#11 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/console/Controller.php(128): yii\base\Controller->runAction()
#12 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/Module.php(523): yii\console\Controller->runAction()
#13 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/console/Application.php(180): yii\base\Module->runAction()
#14 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/console/Application.php(147): yii\console\Application->runAction()
#15 /home/test2/www/xxx.ru/yii2/vendor/yiisoft/yii2/base/Application.php(380): yii\console\Application->handleRequest()
#16 /home/test2/www/xxx.ru/yii2/yii(20): yii\base\Application->run()
I use follow producer settings:
'producers' => [
'test' => [
'connection' => 'default',
'exchange_options' => [
'name' => 'test',
'type' => 'direct',
'declare' => true,
],
],
],
and I dont want use logger.
But I get error 'Undefined index: enable'
I explain this library and I found this code
if ($this->logger['enable']) {
.....
}
in vendor/mikemadisonweb/yii2-rabbitmq/components/Producer.php:73
but in this place $this->logger
is equal null.
PS: also I found this code:
$this->logger = array_replace($this->getDefaultLoggerOptions(), $this->logger);
in vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php:143 (in method loadConsumers)
Why you didnt use getDefaultLoggerOptions
in method loadProducers ?
How to fix it?
Hi Mikhail , could you make a new release? Then I can use your repo instead of my own fork in our project. Thank you in advance!
I have following but not seeing a "message consumed" output msg on console?
'logger' => [
'log' => true,
'category' => 'application',
'print_console' => true,
'system_memory' => false,
],
I'm trying to connect with a broker over an SSL Connection. The below snippet is a copy of the connection settings part in the main.php
file.
'connections' => [
[
'type' => AMQPSSLConnection::class,
'host' => $host,
'port' => 5671,
'user' => $username,
'password' => $password,
'vhost' => $vhost,
'ssl_context' => [
'verify_peer' => true,
],
]
],
In AbstractConnectionFactory.php
, the createConnection
method sets up the connection. The settings are passed to the class as follows:
return new $this->_class(
$this->_parameters['host'],
$this->_parameters['port'],
$this->_parameters['user'],
$this->_parameters['password'],
$this->_parameters['vhost'],
false, // insist
'AMQPLAIN', // login_method
null, // login_response
'en_EN', // locale
$this->_parameters['connection_timeout'],
$this->_parameters['read_write_timeout'],
$this->_parameters['ssl_context'],
$this->_parameters['keepalive'],
$this->_parameters['heartbeat'],
$this->_parameters['channel_rpc_timeout']
);
With working settings, the connection fails with the following error message:
Exception 'PhpAmqpLib\Exception\AMQPConnectionClosedException' with message 'Broken pipe or closed connection'
When I change the passing of the variables to this order/variables, I can successfully connect to the broker:
return new $this->_class(
$this->_parameters['host'],
$this->_parameters['port'],
$this->_parameters['user'],
$this->_parameters['password'],
$this->_parameters['vhost'],
$this->_parameters['ssl_context'],
);
Even though it works, I rather don't want to change the code of this extension. Am I doing something wrong in setting up the connection through the standard AbstractConnection
or something in my main.php
settings that's wrong?
Hi,
In the update to version 2.3.1 is a little configuration bug.
Problem:
The validation does not approve of the ssl_options
setting for a connection configured by the AMQPSSLConnection class. (https://github.com/php-amqplib/php-amqplib/blob/v2.9.0/PhpAmqpLib/Connection/AMQPSSLConnection.php)
Validator checks configured connection options against the default connection options set at https://github.com/mikemadisonweb/yii2-rabbitmq/blob/master/Configuration.php#L35
The newly added ssl_options
property misses here.
Solution:
ssl_options
property to the default configuration at https://github.com/mikemadisonweb/yii2-rabbitmq/blob/master/Configuration.php#L35. Set to NULL.ssl_context
option. This is prevented by the new check now at https://github.com/mikemadisonweb/yii2-rabbitmq/blob/2.3.1/Configuration.php#L279. This check should allow AMQPStreamConnection to be used in combination with the ssl_context
option.Explanation
I can see the latest commit was to solve a SSL problem:
2c5552a
This made it mandatory to use the AMQPSSLConnection class. (https://github.com/php-amqplib/php-amqplib/blob/v2.9.0/PhpAmqpLib/Connection/AMQPSSLConnection.php)
Before this update i would set the ssl_context option like so:
return [
'class' => \mikemadisonweb\rabbitmq\Configuration::class,
'auto_declare' => false,
'connections' => [
[
'name' => 'default',
'type' => $amqpConnectionType, // AMQP Lazy connection type
'host' => $amqpHost,
'port' => $amqpPort,
'user' => $amqpUser,
'password' => $amqpPassword,
'vhost' => $amqpVhost,
'ssl_context' => $amqpSslContext,
],
...
This results in the following exception:
Now you would get this error first:
Exception 'mikemadisonweb\rabbitmq\exceptions\InvalidConfigException' with message 'If you are using a ssl connection, the connection type must be AMQPSSLConnection::class'
So if i change the configuration to use the AMQPSSLConnection::class
and set the ssl_options
i get the following:
return [
'class' => \mikemadisonweb\rabbitmq\Configuration::class,
'auto_declare' => false,
'connections' => [
[
'name' => 'default',
'type' => $amqpConnectionType, // AMQP SSL connection type
'host' => $amqpHost,
'port' => $amqpPort,
'user' => $amqpUser,
'password' => $amqpPassword,
'vhost' => $amqpVhost,
'ssl_options' => $amqpSslOptions,
],
...
Notice we now have to use ssl_options which will create the context for us instead of making the context ourselves.
Now i get the following error:
Exception 'mikemadisonweb\rabbitmq\exceptions\InvalidConfigException' with message 'Unknown options: {"ssl_options":{"peer_name":"produqt-core.local","verify_peer":true}}'
in /app/vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php:382
Stack trace:
#0 /app/vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php(263): mikemadisonweb\rabbitmq\Configuration->validateArrayFields(Array, Array)
#1 /app/vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php(208): mikemadisonweb\rabbitmq\Configuration->validateRequired()
#2 /app/vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php(140): mikemadisonweb\rabbitmq\Configuration->validate()
#3 /app/vendor/mikemadisonweb/yii2-rabbitmq/DependencyInjection.php(29): mikemadisonweb\rabbitmq\Configuration->getConfig()
Which is to conclude that either the ssl_options
property has to be added to the default configuration (constant DEFAULTS)
Hello!
Please look at https://github.com/queue-interop/queue-interop project. Using interfaces from it allows us reuse some implementations like enqueue, which supports a lot of transports. You can outsource some code
Hi again!
I was tinkering with some reject messages and noticed the following error:
Undefined class constant 'MSG_REJECT_REQUEUE'
This MSG_REJECT_REQUEUE is documented in the readme. When investigating, I came came across ConsumerInterface.php
which has three constants, MSG_ACK
, MSG_REJECT
and MSG_REQUEUE
.
Bug seems to be a typo between MSG_REQUEUE
and MSG_REJECT_REQUEUE
.
Fix should be easy but not sure which of the two is preferred. I personally prefer the use of MSG_REJECT_REQUEUE
.
Hi ,
I am getting the following error while running the consumer on server ubuntu 14, while its working on my local machine which is ubuntu 16.04
in /var/www/html/clm/v1.0/vendor/mikemadisonweb/yii2-rabbitmq/DependencyInjection.php:54
Stack trace:
#0 [internal function]: mikemadisonweb\rabbitmq\DependencyInjection->mikemadisonweb\rabbitmq{closure}(Object(yii\di\Container), Array, Array)
#1 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/di/Container.php(163): call_user_func(Object(Closure), Object(yii\di\Container), Array, Array)
#2 /var/www/html/clm/v1.0/vendor/mikemadisonweb/yii2-rabbitmq/Configuration.php(155): yii\di\Container->get('rabbit_mq.conne...')
#3 /var/www/html/clm/v1.0/vendor/mikemadisonweb/yii2-rabbitmq/controllers/RabbitMQController.php(205): mikemadisonweb\rabbitmq\Configuration->getConnection('default')
#4 [internal function]: mikemadisonweb\rabbitmq\controllers\RabbitMQController->actionDeleteAll('default')
#5 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/base/InlineAction.php(57): call_user_func_array(Array, Array)
#6 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/base/Controller.php(157): yii\base\InlineAction->runWithParams(Array)
#7 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/console/Controller.php(148): yii\base\Controller->runAction('delete-all', Array)
#8 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/base/Module.php(528): yii\console\Controller->runAction('delete-all', Array)
#9 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/console/Application.php(180): yii\base\Module->runAction('rabbitmq/delete...', Array)
#10 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/console/Application.php(147): yii\console\Application->runAction('rabbitmq/delete...', Array)
#11 /var/www/html/clm/v1.0/vendor/yiisoft/yii2/base/Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request))
#12 /var/www/html/clm/v1.0/yii(20): yii\base\Application->run()
#13 {main}
I've noticed that the usual console logging I do using Yii::info() doesn't work when called within my consumer's execute method.
The console method which publishes the rabbitmq message calls Yii::info('Publishing msg ...') and it gets logged correctly (in both app.log and the db log table where I configured it to go to), but the execute method in the consumer does not. I noticed that when the consumer is killed, the messages DO finally get written but this is not ideal as the consumer is supposed to run continuously and never die so log messages will never get written.
For error logging purposes it would be good if Yii::info / Yii::error() worked in the consumer (in realtime as for other console scripts), is there a way of achieving this?
Btw thanks for a great yii2 extension
How I can to use delayed-message-exchange plugin?
Hi Mikhail,
I noticed a situation where I have one consumer defined, with one callback for one queue with one wildcard binding ('#"), and I get one connection (as expected), but this connection has 4 channels. Only the 4th channel seems to be getting data. I observe this in the standard RabbitMQ admin webtool. An example yii2 config is attached below.
I've tried the same setup based on https://www.rabbitmq.com/tutorials/tutorial-five-php.html to try to identify the source of this behavior, but then I only get one channel, which is more or less what I expect.
Any comments or suggestions? It doesn't look like any messages are lost, but I haven't really put this to a stress test. It could become a factor in memory consumption for consumers, I think.
Test config (slightly edited to remove comments and application specific identifiers):
<?php
return [
'class' => \mikemadisonweb\rabbitmq\Configuration::class,
'auto_declare' => true,
'connections' => [
[
'name' => 'default',
'host' => '127.0.0.1',
'port' => 5672,
'user' => 'test',
'password' => 'test',
'vhost' => 'test',
],
],
'exchanges' => [
[
'name' => 'test.in',
'type' => 'topic',
'passive' => false,
'durable' => true,
'auto_delete' => false,
],
],
'queues' => [
[
'name' => 'incoming-bucket',
'passive' => false,
'durable' => true,
'auto_delete' => false,
],
],
'bindings' => [
[
'queue' => 'incoming-bucket',
'exchange' => 'test.in',
'routing_keys' => ['#'],
],
],
'producers' => [
],
'consumers' => [
[
'name' => 'incoming',
'deserializer' => 'json_decode',
'callbacks' => [
'incoming-bucket' => \app\modules\amqp\components\IncomingConsumer::class,
],
],
],
'logger' => [
'log' => false,
'category' => 'rabbitmq',
'print_console' => true,
'system_memory' => false,
],
];
Seems like it's impossible to configure binding to the exchange of type HEADERS.
$channel->queue_bind($queueName, $exchangeName, '', false, new AMQPTable($bindArguments));
First, thanks for this extension, i really enjoyed it.
But i'm not sure how i make an RPC Client whit this extension.
Can you help me?
Do you have any example?
Since the upgrade of php-amqplib > 2.9.0, the default config of this package is causing it to loop with 100% CPU.
After a long debugging session, it seems that the default value of null
of the idle_timeout
of the Consumer causes this. It should be 0
.
Hi Mikhail,
I'm running into this issue where if I send a signal to one of my consumers (e.g by doing a kill -1 <pid>
), before the the signal handler is called, I get an exception:
2019-02-12 18:50:56 [-][-][-][error][ErrorException:2] ErrorException: stream_select(): unable to select [4]: Interrupted system call (max_fd=3) in /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php:473
Stack trace:
#0 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php(474): PhpAmqpLib\Wire\IO\StreamIO->cleanup_error_handler()
#1 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php(255): PhpAmqpLib\Wire\IO\StreamIO->select(3, 0)
#2 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(151): PhpAmqpLib\Wire\IO\StreamIO->read(7)
#3 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(106): PhpAmqpLib\Wire\AMQPReader->rawread(7)
#4 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(530): PhpAmqpLib\Wire\AMQPReader->read(7)
#5 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Connection/AbstractConnection.php(578): PhpAmqpLib\Connection\AbstractConnection->wait_frame(NULL)
#6 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(227): PhpAmqpLib\Connection\AbstractConnection->wait_channel(1, NULL)
#7 /app/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(338): PhpAmqpLib\Channel\AbstractChannel->next_frame(NULL)
#8 /app/vendor/mikemadisonweb/yii2-rabbitmq/components/Consumer.php(202): PhpAmqpLib\Channel\AbstractChannel->wait(NULL, false, NULL)
#9 /app/vendor/mikemadisonweb/yii2-rabbitmq/controllers/RabbitMQController.php(95): mikemadisonweb\rabbitmq\components\Consumer->consume(500)
#10 [internal function]: mikemadisonweb\rabbitmq\controllers\RabbitMQController->actionConsume('operational-sto...')
#11 /app/vendor/yiisoft/yii2/base/InlineAction.php(57): call_user_func_array(Array, Array)
#12 /app/vendor/yiisoft/yii2/base/Controller.php(157): yii\base\InlineAction->runWithParams(Array)
#13 /app/vendor/yiisoft/yii2/console/Controller.php(148): yii\base\Controller->runAction('consume', Array)
#14 /app/vendor/yiisoft/yii2/base/Module.php(528): yii\console\Controller->runAction('consume', Array)
#15 /app/vendor/yiisoft/yii2/console/Application.php(180): yii\base\Module->runAction('rabbitmq/consum...', Array)
#16 /app/vendor/yiisoft/yii2/console/Application.php(147): yii\console\Application->runAction('rabbitmq/consum...', Array)
#17 /app/vendor/yiisoft/yii2/base/Application.php(386): yii\console\Application->handleRequest(Object(yii\console\Request))
#18 /app/yii(30): yii\base\Application->run()
#19 {main}
2019-02-12 18:50:56 [-][-][-][info][mikemadisonweb\rabbitmq\components\Consumer::restartDaemon] RESTARTING
The last line above is the result of me adding a call to Yii:info()
at the top of Consumer::restartDaemon()
, just to see if it is called at all. Am I missing something in my code or configuration? It would be great to get the consumers to cleanly restart or exit based on signals I can send.
Thanks in advance.
Joor
Hi Mikhail,
Sending a SIGHUP to a consumer results in an error:
2019-02-13 11:54:34 [-][-][-][error][Error] Error: Call to undefined method PhpAmqpLib\Connection\AMQPLazyConnection::renew() in /app/mikemadisonweb/yii2-rabbitmq/components/BaseRabbitMQ.php:69
Stack trace:
#0 /app/mikemadisonweb/yii2-rabbitmq/components/Consumer.php(241): mikemadisonweb\rabbitmq\components\BaseRabbitMQ->renew()
#1 [internal function]: mikemadisonweb\rabbitmq\components\Consumer->restartDaemon(1, Array)
#2 /app/php-amqplib/php-amqplib/PhpAmqpLib/Wire/IO/StreamIO.php(233): pcntl_signal_dispatch()
It looks like BaseRabbitMQ::renew()
should be calling $this->conn->reconnect()
in stead of $this->conn->renew()
.
Joor
hello, what's your email address? I've got a few questions seeking for your help.
e.g. how to use this extension?
Looking forward to your reply.
Thanks.
Any reason why $additionalProperties
is dropped from Producer::publish()
in v2.0? This could be useful to support the priority
flag you can set on a message.
I would also need a way to define x-max-priority
on the queue, but that also seemed missing in the previous version.
The consumer calls only __constructor method, but not execute, I see this because of debugging by echoing either from __construct() and execute methods.
There is no errors in console, neither in amqp.log.
Hope U`ll help soon.
Thx.
Hi Mikhail,
I ran into the situation where I can't delete an exchange on the fly (Exception 'mikemadisonweb\rabbitmq\exceptions\RuntimeException' with message 'Queue xxxx is not configured. Delete is aborted.')
The reason may be clear from the message, Routing::deleteExchange() checks the configured queues in stead of the exchanges.
Joor
Hi Mikhail,
We're doing some upgrades to PHP8, which requires a few updates to your package. One of my coworkers has provided a PR (#58). Any chance of getting this merged?
Joor
Getting unknown property: yii\web\Application::rabbitmq
RAEDME "Logger" section configuration example is invalid. According to code in example
logger configuration contains key 'enable' => true
.
But it generates exception 'mikemadisonweb\rabbitmq\exceptions\InvalidConfigException' with message 'Unknown options: {"enable":true}'
Please fix documentation, because the key responsible for enabling is 'log' => true
.
And also by default it disabled. But in docs has written another
A declarative, efficient, and flexible JavaScript library for building user interfaces.
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. ๐๐๐
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google โค๏ธ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.