Giter Club home page Giter Club logo

rabbitmqbundle's Introduction

RabbitMqBundle

Latest Version Test Scrutinizer Code Quality Code Coverage PHPStan Join the chat at https://gitter.im/php-amqplib/RabbitMqBundle

About

The RabbitMqBundle incorporates messaging in your application via RabbitMQ using the php-amqplib library.

The bundle implements several messaging patterns as seen on the Thumper library. Therefore publishing messages to RabbitMQ from a Symfony controller is as easy as:

$msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
$this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));

Later when you want to consume 50 messages out of the upload_pictures queue, you just run on the CLI:

$ ./app/console rabbitmq:consumer -m 50 upload_picture

All the examples expect a running RabbitMQ server.

This bundle was presented at Symfony Live Paris 2011 conference. See the slides here.

Version 2

Due to the breaking changes happened caused by Symfony >=4.4, a new tag was released, making the bundle compatible with Symfony >=4.4.

Installation

For Symfony Framework >= 4.4

Require the bundle and its dependencies with composer:

$ composer require php-amqplib/rabbitmq-bundle

Register the bundle:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

Enjoy !

For a console application that uses Symfony Console, Dependency Injection and Config components

If you have a console application used to run RabbitMQ consumers, you do not need Symfony HttpKernel and FrameworkBundle. From version 1.6, you can use the Dependency Injection component to load this bundle configuration and services, and then use the consumer command.

Require the bundle in your composer.json file:

{
    "require": {
        "php-amqplib/rabbitmq-bundle": "^2.0",
    }
}

Register the extension and the compiler pass:

use OldSound\RabbitMqBundle\DependencyInjection\OldSoundRabbitMqExtension;
use OldSound\RabbitMqBundle\DependencyInjection\Compiler\RegisterPartsPass;

// ...

$containerBuilder->registerExtension(new OldSoundRabbitMqExtension());
$containerBuilder->addCompilerPass(new RegisterPartsPass());

Warning - BC Breaking Changes

  • Since 2012-06-04 Some default options for exchanges declared in the "producers" config section have changed to match the defaults of exchanges declared in the "consumers" section. The affected settings are:

    • durable was changed from false to true,
    • auto_delete was changed from true to false.

    Your configuration must be updated if you were relying on the previous default values.

  • Since 2012-04-24 The ConsumerInterface::execute method signature has changed

  • Since 2012-01-03 the consumers execute method gets the whole AMQP message object and not just the body. See the CHANGELOG file for more details.

Usage

Add the old_sound_rabbit_mq section in your configuration file:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3
            
            # the timeout when waiting for a response from rabbitMQ (0.0 means waits forever)
            channel_rpc_timeout: 0.0

            # requires php-amqplib v2.4.1+ and PHP5.4+
            keepalive: false

            # requires php-amqplib v2.4.1+
            heartbeat: 0

            #requires php_sockets.dll
            use_socket: true # default false
            
            login_method: 'AMQPLAIN' # default 'AMQPLAIN', can be 'EXTERNAL' or 'PLAIN', see https://www.rabbitmq.com/docs/access-control#mechanisms
            
        another:
            # A different (unused) connection defined by an URL. One can omit all parts,
            # except the scheme (amqp:). If both segment in the URL and a key value (see above)
            # are given the value from the URL takes precedence.
            # See https://www.rabbitmq.com/uri-spec.html on how to encode values.
            url: 'amqp://guest:password@localhost:5672/vhost?lazy=1&connection_timeout=6'
    producers:
        upload_picture:
            connection:            default
            exchange_options:      {name: 'upload-picture', type: direct}
            service_alias:         my_app_service # no alias by default
            default_routing_key:   'optional.routing.key' # defaults to '' if not set
            default_content_type:  'content/type' # defaults to 'text/plain'
            default_delivery_mode: 2 # optional. 1 means non-persistent, 2 means persistent. Defaults to "2".
    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         upload_picture_service
            options:
                no_ack:       false # optional. If set to "true", automatic acknowledgement mode will be used by this consumer. Default "false". See https://www.rabbitmq.com/confirms.html for details.

Here we configure the connection service and the message endpoints that our application will have. In this example your service container will contain the service old_sound_rabbit_mq.upload_picture_producer and old_sound_rabbit_mq.upload_picture_consumer. The later expects that there's a service called upload_picture_service.

If you don't specify a connection for the client, the client will look for a connection with the same alias. So for our upload_picture the service container will look for an upload_picture connection.

If you need to add optional queue arguments, then your queue options can be something like this:

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

another example with message TTL of 20 seconds:

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': ['I', 20000]}}

The argument value must be a list of datatype and value. Valid datatypes are:

  • S - String
  • I - Integer
  • D - Decimal
  • T - Timestamps
  • F - Table
  • A - Array
  • t - Bool

Adapt the arguments according to your needs.

If you want to bind queue with specific routing keys you can declare it in producer or consumer config:

queue_options:
    name: "upload-picture"
    routing_keys:
      - 'android.#.upload'
      - 'iphone.upload'

Important notice - Lazy Connections

In a Symfony environment all services are fully bootstrapped for each request, from version >= 4.3 you can declare a service as lazy (Lazy Services). This bundle still doesn't support new Lazy Services feature but you can set lazy: true in your connection configuration to avoid unnecessary connections to your message broker in every request. It's extremely recommended to use lazy connections because performance reasons, nevertheless lazy option is disabled by default to avoid possible breaks in applications already using this bundle.

Important notice - Heartbeats

It's a good idea to set the read_write_timeout to 2x the heartbeat so your socket will be open. If you don't do this, or use a different multiplier, there's a risk the consumer socket will timeout.

Please bear in mind, that you can expect problems, if your tasks are generally running longer than the heartbeat period, to which there are no good solutions (link). Consider using either a big value for the heartbeat or leave the heartbeat disabled in favour of the tcp's keepalive (both on the client and server side) and the graceful_max_execution_timeout feature.

Multiple Hosts

You can provide multiple hosts for a connection. This will allow you to use RabbitMQ cluster with multiple nodes.

  old_sound_rabbit_mq:
      connections:
          default:
              hosts:
                - host: host1
                  port: 3672
                  user: user1
                  password: password1
                  vhost: vhost1
                - url: 'amqp://guest:password@localhost:5672/vhost'
              connection_timeout: 3
              read_write_timeout: 3

Pay attention that you can not specify

  connection_timeout 
  read_write_timeout
  use_socket
  ssl_context
  keepalive
  heartbeat
  connection_parameters_provider 

parameters to each host separately.

Dynamic Connection Parameters

Sometimes your connection information may need to be dynamic. Dynamic connection parameters allow you to supply or override parameters programmatically through a service.

e.g. In a scenario when the vhost parameter of the connection depends on the current tenant of your white-labeled application and you do not want (or can't) change it's configuration every time.

Define a service under connection_parameters_provider that implements the ConnectionParametersProviderInterface, and add it to the appropriate connections configuration.

connections:
    default:
        host:     'localhost'
        port:     5672
        user:     'guest'
        password: 'guest'
        vhost:    'foo' # to be dynamically overridden by `connection_parameters_provider`
        connection_parameters_provider: connection_parameters_provider_service

Example Implementation:

class ConnectionParametersProviderService implements ConnectionParametersProvider {
    ...
    public function getConnectionParameters() {
        return array('vhost' => $this->getVhost());
    }
    ...
}

In this case, the vhost parameter will be overridden by the output of getVhost().

Producers, Consumers, What?

In a messaging application, the process sending messages to the broker is called producer while the process receiving those messages is called consumer. In your application you will have several of them that you can list under their respective entries in the configuration.

Producer

A producer will be used to send messages to the server. In the AMQP Model, messages are sent to an exchange, this means that in the configuration for a producer you will have to specify the connection options along with the exchange options, which usually will be the name of the exchange and the type of it.

Now let's say that you want to process picture uploads in the background. After you move the picture to its final location, you will publish a message to server with the following information:

public function indexAction($name)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg));
}

As you can see, if in your configuration you have a producer called upload_picture, then in the service container you will have a service called old_sound_rabbit_mq.upload_picture_producer.

Besides the message itself, the OldSound\RabbitMqBundle\RabbitMq\Producer#publish() method also accepts an optional routing key parameter and an optional array of additional properties. The array of additional properties allows you to alter the properties with which an PhpAmqpLib\Message\AMQPMessage object gets constructed by default. This way, for example, you can change the application headers.

You can use setContentType and setDeliveryMode methods in order to set the message content type and the message delivery mode respectively, overriding any default set in the "producers" config section. If not overriden by either the "producers" configuration or an explicit call to these methods (as per the below example), the default values are text/plain for content type and 2 for delivery mode.

$this->get('old_sound_rabbit_mq.upload_picture_producer')->setContentType('application/json');

If you need to use a custom class for a producer (which should inherit from OldSound\RabbitMqBundle\RabbitMq\Producer), you can use the class option:

    ...
    producers:
        upload_picture:
            class: My\Custom\Producer
            connection: default
            exchange_options: {name: 'upload-picture', type: direct}
    ...

The next piece of the puzzle is to have a consumer that will take the message out of the queue and process it accordingly.

Consumers

A consumer will connect to the server and start a loop waiting for incoming messages to process. Depending on the specified callback for such consumer will be the behavior it will have. Let's review the consumer configuration from above:

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

As we see there, the callback option has a reference to an upload_picture_service. When the consumer gets a message from the server it will execute such callback. If for testing or debugging purposes you need to specify a different callback, then you can change it there.

Apart from the callback we also specify the connection to use, the same way as we do with a producer. The remaining options are the exchange_options and the queue_options. The exchange_options should be the same ones as those used for the producer. In the queue_options we will provide a queue name. Why?

As we said, messages in AMQP are published to an exchange. This doesn't mean the message has reached a queue. For this to happen, first we need to create such queue and then bind it to the exchange. The cool thing about this is that you can bind several queues to one exchange, in that way one message can arrive to several destinations. The advantage of this approach is the decoupling from the producer and the consumer. The producer does not care about how many consumers will process his messages. All it needs is that his message arrives to the server. In this way we can expand the actions we perform every time a picture is uploaded without the need to change code in our controller.

Now, how to run a consumer? There's a command for it that can be executed like this:

$ ./app/console rabbitmq:consumer -m 50 upload_picture

What does this mean? We are executing the upload_picture consumer telling it to consume only 50 messages. Every time the consumer receives a message from the server, it will execute the configured callback passing the AMQP message as an instance of the PhpAmqpLib\Message\AMQPMessage class. The message body can be obtained by calling $msg->body. By default the consumer will process messages in an endless loop for some definition of endless.

If you want to be sure that consumer will finish executing instantly on Unix signal, you can run command with flag -w.

$ ./app/console rabbitmq:consumer -w upload_picture

Then the consumer will finish executing instantly.

For using command with this flag you need to install PHP with PCNTL extension.

If you want to establish a consumer memory limit, you can do it by using flag -l. In the following example, this flag adds 256 MB memory limit. Consumer will be stopped five MB before reaching 256MB in order to avoid a PHP Allowed memory size error.

$ ./app/console rabbitmq:consumer -l 256

If you want to remove all the messages awaiting in a queue, you can execute this command to purge this queue:

$ ./app/console rabbitmq:purge --no-confirmation upload_picture

For deleting the consumer's queue, use this command:

$ ./app/console rabbitmq:delete --no-confirmation upload_picture

Consumer Events

This can be useful in many scenarios. There are 3 AMQPEvents:

ON CONSUME
class OnConsumeEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_CONSUME;

    /**
     * OnConsumeEvent constructor.
     *
     * @param Consumer $consumer
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
    }
}

Lets say you need to sleep / stop consumer/s on a new application deploy. You can listen for OldSound\RabbitMqBundle\Event\OnConsumeEvent` and check for new application deploy.

BEFORE PROCESSING MESSAGE
class BeforeProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::BEFORE_PROCESSING_MESSAGE;

    /**
     * BeforeProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}

Event raised before processing a AMQPMessage.

AFTER PROCESSING MESSAGE
class AfterProcessingMessageEvent extends AMQPEvent
{
    const NAME = AMQPEvent::AFTER_PROCESSING_MESSAGE;

    /**
     * AfterProcessingMessageEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer, AMQPMessage $AMQPMessage)
    {
        $this->setConsumer($consumer);
        $this->setAMQPMessage($AMQPMessage);
    }
}

Event raised after processing a AMQPMessage. If the process message will throw an Exception the event will not raise.

IDLE MESSAGE
<?php
class OnIdleEvent extends AMQPEvent
{
    const NAME = AMQPEvent::ON_IDLE;

    /**
     * OnIdleEvent constructor.
     *
     * @param AMQPMessage $AMQPMessage
     */
    public function __construct(Consumer $consumer)
    {
        $this->setConsumer($consumer);
        
        $this->forceStop = true;
    }
}

Event raised when wait method exit by timeout without receiving a message. In order to make use of this event a consumer idle_timeout has to be configured. By default process exit on idle timeout, you can prevent it by setting $event->setForceStop(false) in a listener.

Idle timeout

If you need to set a timeout when there are no messages from your queue during a period of time, you can set the idle_timeout in seconds. The idle_timeout_exit_code specifies what exit code should be returned by the consumer when the idle timeout occurs. Without specifying it, the consumer will throw an PhpAmqpLib\Exception\AMQPTimeoutException exception.

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service
        idle_timeout:           60
        idle_timeout_exit_code: 0

Timeout wait

Set the timeout_wait in seconds. The timeout_wait specifies how long the consumer will wait without receiving a new message before ensuring the current connection is still valid.

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service
        idle_timeout:           60
        idle_timeout_exit_code: 0
        timeout_wait:           10

Graceful max execution timeout

If you'd like your consumer to be running up to certain time and then gracefully exit, then set the graceful_max_execution.timeout in seconds. "Gracefully exit" means, that the consumer will exit either after the currently running task or immediatelly, when waiting for new tasks. The graceful_max_execution.exit_code specifies what exit code should be returned by the consumer when the graceful max execution timeout occurs. Without specifying it, the consumer will exit with status 0.

This feature is great in conjuction with supervisord, which together can allow for periodical memory leaks cleanup, connection with database/rabbitmq renewal and more.

consumers:
    upload_picture:
        connection:             default
        exchange_options:       {name: 'upload-picture', type: direct}
        queue_options:          {name: 'upload-picture'}
        callback:               upload_picture_service

        graceful_max_execution:
            timeout: 1800 # 30 minutes 
            exit_code: 10 # default is 0 

Fair dispatching

You might have noticed that the dispatching still doesn't work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn't know anything about that and will still dispatch messages evenly.

This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn't look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.

In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don't dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.

From: http://www.rabbitmq.com/tutorials/tutorial-two-python.html

Be careful as implementing the fair dispatching introduce a latency that will hurt performance (see this blogpost). But implemeting it allow you to scale horizontally dynamically as the queue is increasing. You should evaluate, as the blogpost recommends, the right value of prefetch_size accordingly with the time taken to process each message and your network performance.

With RabbitMqBundle, you can configure that qos_options per consumer like that:

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        qos_options:      {prefetch_size: 0, prefetch_count: 1, global: false}

Autowiring producers and consumers

If used with Symfony 4.2+ bundle declares in container set of aliases for producers and regular consumers. Those are used for arguments autowiring based on declared type and argument name. This allows you to change previous producer example to:

public function indexAction($name, ProducerInterface $uploadPictureProducer)
{
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $uploadPictureProducer->publish(serialize($msg));
}

Name of argument is constructed from producer or consumer name from configuration and suffixed with producer or consumer word according to type. In contrast to container items naming convention word suffix (producer or consumer) will not be duplicated if name is already suffixed. upload_picture producer key will be changed to $uploadPictureProducer argument name. upload_picture_producer producer key would also be aliased to $uploadPictureProducer argument name. It is best to avoid names similar in such manner.

All producers are aliased to OldSound\RabbitMqBundle\RabbitMq\ProducerInterface and producer class option from configuration. In sandbox mode only ProducerInterface aliases are made. It is highly recommended to use ProducerInterface class when type hinting arguments for producer injection.

All consumers are aliased to OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface and %old_sound_rabbit_mq.consumer.class% configuration option value. There is no difference between regular and sandbox mode. It is highly recommended to use ConsumerInterface when type hinting arguments for client injection.

Callbacks

Here's an example callback:

<?php

//src/Acme/DemoBundle/Consumer/UploadPictureConsumer.php

namespace Acme\DemoBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $msg)
    {
        //Process picture upload.
        //$msg will be an instance of `PhpAmqpLib\Message\AMQPMessage` with the $msg->body being the data sent over RabbitMQ.

        $isUploadSuccess = someUploadPictureMethod();
        if (!$isUploadSuccess) {
            // If your image upload failed due to a temporary error you can return false
            // from your callback so the message will be rejected by the consumer and
            // requeued by RabbitMQ.
            // Any other value not equal to false will acknowledge the message and remove it
            // from the queue
            return false;
        }
    }
}

As you can see, this is as simple as implementing one method: ConsumerInterface::execute.

Keep in mind that your callbacks need to be registered as normal Symfony services. There you can inject the service container, the database service, the Symfony logger, and so on.

See https://github.com/php-amqplib/php-amqplib/blob/master/doc/AMQPMessage.md for more details of what's part of a message instance.

To stop the consumer, callback can throw StopConsumerException (the last consumed message will not be ack) or AckStopConsumerException (the message will be ack). If using demonized, ex: supervisor, the consumer will actually restart.

Recap

This seems to be quite a lot of work for just sending messages, let's recap to have a better overview. This is what we need to produce/consume messages:

  • Add an entry for the consumer/producer in the configuration.
  • Implement your callback.
  • Start the consumer from the CLI.
  • Add the code to publish messages inside the controller.

And that's it!

Audit / Logging

This was a requirement to have a traceability of messages received/published. In order to enable this you'll need to add enable_logger config to consumers or publishers.

consumers:
    upload_picture:
        connection:       default
        exchange_options: {name: 'upload-picture', type: direct}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service
        enable_logger: true

If you would like you can also treat logging from queues with different handlers in monolog, by referencing channel phpamqplib.

RPC or Reply/Response

So far we just have sent messages to consumers, but what if we want to get a reply from them? To achieve this we have to implement RPC calls into our application. This bundle makes it pretty easy to achieve such things with Symfony.

Let's add a RPC client and server into the configuration:

rpc_clients:
    integer_store:
        connection: default #default: default
        unserializer: json_decode #default: unserialize
        lazy: true #default: false
        direct_reply_to: false
rpc_servers:
    random_int:
        connection: default
        callback:   random_int_server
        qos_options: {prefetch_size: 0, prefetch_count: 1, global: false}
        exchange_options: {name: random_int, type: topic}
        queue_options: {name: random_int_queue, durable: false, auto_delete: true}
        serializer: json_encode

For a full configuration reference please use the php app/console config:dump-reference old_sound_rabbit_mq command.

Here we have a very useful server: it returns random integers to its clients. The callback used to process the request will be the random_int_server service. Now let's see how to invoke it from our controllers.

First we have to start the server from the command line:

$ ./app/console_dev rabbitmq:rpc-server random_int

And then add the following code to our controller:

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'request_id');
    $replies = $client->getReplies();
}

As you can see there, if our client id is integer_store, then the service name will be old_sound_rabbit_mq.integer_store_rpc. Once we get that object we place a request on the server by calling addRequest that expects three parameters:

  • The arguments to be sent to the remote procedure call.
  • The name of the RPC server, in our case random_int.
  • A request identifier for our call, in this case request_id.

The arguments we are sending are the min and max values for the rand() function. We send them by serializing an array. If our server expects JSON information, or XML, we will send such data here.

The final piece is to get the reply. Our PHP script will block till the server returns a value. The $replies variable will be an associative array where each reply from the server will contained in the respective request_id key.

By default the RPC Client expects the response to be serialized. If the server you are working with returns a non-serialized result then set the RPC client expect_serialized_response option to false. For example, if the integer_store server didn't serialize the result the client would be set as below:

rpc_clients:
    integer_store:
        connection: default
        expect_serialized_response: false

You can also set a expiration for request in milliseconds, after which message will no longer be handled by server and client request will simply time out. Setting expiration for messages works only for RabbitMQ 3.x and above. Visit http://www.rabbitmq.com/ttl.html#per-message-ttl for more information.

public function indexAction($name)
{
    $expiration = 5000; // milliseconds
    $client = $this->get('old_sound_rabbit_mq.integer_store_rpc');
    $client->addRequest($body, $server, $requestId, $routingKey, $expiration);
    try {
        $replies = $client->getReplies();
        // process $replies['request_id'];
    } catch (\PhpAmqpLib\Exception\AMQPTimeoutException $e) {
        // handle timeout
    }
}

As you can guess, we can also make parallel RPC calls.

Parallel RPC

Let's say that for rendering some webpage, you need to perform two database queries, one taking 5 seconds to complete and the other one taking 2 seconds โ€“very expensive queriesโ€“. If you execute them sequentially, then your page will be ready to deliver in about 7 seconds. If you run them in parallel then you will have your page served in about 5 seconds. With RabbitMqBundle we can do such parallel calls with ease. Let's define a parallel client in the config and another RPC server:

rpc_clients:
    parallel:
        connection: default
rpc_servers:
    char_count:
        connection: default
        callback:   char_count_server
    random_int:
        connection: default
        callback:   random_int_server

Then this code should go in our controller:

public function indexAction($name)
{
    $client = $this->get('old_sound_rabbit_mq.parallel_rpc');
    $client->addRequest($name, 'char_count', 'char_count');
    $client->addRequest(serialize(array('min' => 0, 'max' => 10)), 'random_int', 'random_int');
    $replies = $client->getReplies();
}

Is very similar to the previous example, we just have an extra addRequest call. Also we provide meaningful request identifiers so later will be easier for us to find the reply we want in the $replies array.

Direct Reply-To clients

To enable direct reply-to clients you just have to enable option direct_reply_to on the rpc_clients configuration for the client.

This option will use pseudo-queue amq.rabbitmq.reply-to when doing RPC calls. On the RPC server there is no modification needed.

Priority queue

RabbitMQ has priority queue implementation in the core as of version 3.5.0. Any queue can be turned into a priority one using client-provided optional arguments (but, unlike other features that use optional arguments, not policies). The implementation supports a limited number of priorities: 255. Values between 1 and 10 are recommended. Check documentation

here is how you can declare a priority queue

    consumers:
        upload_picture:
            connection:       default
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture', arguments: {'x-max-priority': ['I', 10]} }
            callback:         upload_picture_service

if upload-picture queue exist before you must delete this queue before you run rabbitmq:setup-fabric command

Now let's say you want to make a message with high priority, you have to publish the message with this additional information

public function indexAction()
{    
    $msg = array('user_id' => 1235, 'image_path' => '/path/to/new/pic.png');
    $additionalProperties = ['priority' => 10] ; 
    $routing_key = '';
    $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(serialize($msg), $routing_key , $additionalProperties );
}

Multiple Consumers

It's a good practice to have a lot of queues for logic separation. With a simple consumer you will have to create one worker (consumer) per queue and it can be hard to manage when dealing with many evolutions (forget to add a line in your supervisord configuration?). This is also useful for small queues as you may not want to have as many workers as queues, and want to regroup some tasks together without losing flexibility and separation principle.

Multiple consumers allow you to handle this use case by listening to multiple queues on the same consumer.

Here is how you can set a consumer with multiple queues:

multiple_consumers:
    upload:
        connection:       default
        exchange_options: {name: 'upload', type: direct}
        queues_provider: queues_provider_service
        queues:
            upload-picture:
                name:     upload_picture
                callback: upload_picture_service
                routing_keys:
                    - picture
            upload-video:
                name:     upload_video
                callback: upload_video_service
                routing_keys:
                    - video
            upload-stats:
                name:     upload_stats
                callback: upload_stats

The callback is now specified under each queues and must implement the ConsumerInterface like a simple consumer. All the options of queues-options in the consumer are available for each queue.

Be aware that all queues are under the same exchange, it's up to you to set the correct routing for callbacks.

The queues_provider is a optional service that dynamically provides queues. It must implement QueuesProviderInterface.

Be aware that queues providers are responsible for the proper calls to setDequeuer and that callbacks are callables (not ConsumerInterface). In case service providing queues implements DequeuerAwareInterface, a call to setDequeuer is added to the definition of the service with a DequeuerInterface currently being a MultipleConsumer.

Arbitrary Bindings

You may find that your application has a complex workflow and you need to have arbitrary binding. Arbitrary binding scenarios might include exchange to exchange bindings via destination_is_exchange property.

bindings:
    - {exchange: foo, destination: bar, routing_key: 'baz.*' }
    - {exchange: foo1, destination: foo, routing_key: 'baz.*', destination_is_exchange: true}

The rabbitmq:setup-fabric command will declare exchanges and queues as defined in your producer, consumer and multi consumer configurations before it creates your arbitrary bindings. However, the rabbitmq:setup-fabric will NOT declare addition queues and exchanges defined in the bindings. It's up to you to make sure exchanges/queues are declared.

Dynamic Consumers

Sometimes you have to change the consumer's configuration on the fly. Dynamic consumers allow you to define the consumers queue options programmatically, based on the context.

e.g. In a scenario when the defined consumer must be responsible for a dynamic number of topics and you do not want (or can't) change it's configuration every time.

Define a service queue_options_provider that implements the QueueOptionsProviderInterface, and add it to your dynamic_consumers configuration.

dynamic_consumers:
    proc_logs:
        connection: default
        exchange_options: {name: 'logs', type: topic}
        callback: parse_logs_service
        queue_options_provider: queue_options_provider_service

Example Usage:

$ ./app/console rabbitmq:dynamic-consumer proc_logs server1

In this case the proc_logs consumer runs for server1 and it can decide over the queue options it uses.

Anonymous Consumers

Now, why will we ever need anonymous consumers? This sounds like some internet threat or somethingโ€ฆ Keep reading.

In AMQP there's a type of exchange called topic where the messages are routed to queues based on โ€“you guessโ€“ the topic of the message. We can send logs about our application to a RabbiMQ topic exchange using as topic the hostname where the log was created and the severity of such log. The message body will be the log content and our routing keys the will be like this:

  • server1.error
  • server2.info
  • server1.warning
  • ...

Since we don't want to be filling up queues with unlimited logs what we can do is that when we want to monitor the system, we can launch a consumer that creates a queue and attaches to the logs exchange based on some topic, for example, we would like to see all the errors reported by our servers. The routing key will be something like: #.error. In such case we have to come up with a queue name, bind it to the exchange, get the logs, unbind it and delete the queue. Happily AMPQ provides a way to do this automatically if you provide the right options when you declare and bind the queue. The problem is that you don't want to remember all those options. For such reason we implemented the Anonymous Consumer pattern.

When we start an Anonymous Consumer, it will take care of such details and we just have to think about implementing the callback for when the messages arrive. Is it called Anonymous because it won't specify a queue name, but it will wait for RabbitMQ to assign a random one to it.

Now, how to configure and run such consumer?

anon_consumers:
    logs_watcher:
        connection:       default
        exchange_options: {name: 'app-logs', type: topic}
        callback:         logs_watcher

There we specify the exchange name and it's type along with the callback that should be executed when a message arrives.

This Anonymous Consumer is now able to listen to Producers, which are linked to the same exchange and of type topic:

    producers:
        app_logs:
            connection:       default
            exchange_options: {name: 'app-logs', type: topic}

To start an Anonymous Consumer we use the following command:

$ ./app/console_dev rabbitmq:anon-consumer -m 5 -r '#.error' logs_watcher

The only new option compared to the commands that we have seen before is the one that specifies the routing key: -r '#.error'.

Batch Consumers

In some cases you will want to get a batch of messages and then do some processing on all of them. Batch consumers will allow you to define logic for this type of processing.

e.g: Imagine that you have a queue where you receive a message for inserting some information in the database, and you realize that if you do a batch insert is much better then by inserting one by one.

Define a callback service that implements BatchConsumerInterface and add the definition of the consumer to your configuration.

batch_consumers:
    batch_basic_consumer:
        connection:       default
        exchange_options: {name: 'batch', type: fanout}
        queue_options:    {name: 'batch'}
        callback:         batch.basic
        qos_options:      {prefetch_size: 0, prefetch_count: 2, global: false}
        timeout_wait:     5
        auto_setup_fabric: false
        idle_timeout_exit_code: -2
        keep_alive: false
        graceful_max_execution:
            timeout: 60

Note: If the keep_alive option is set to true, idle_timeout_exit_code will be ignored and the consumer process continues.

You can implement a batch consumer that will acknowledge all messages in one return or you can have control on what message to acknowledge.

namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        foreach ($messages as $message) {
            $this->executeSomeLogicPerMessage($message);
        }

        // you ack all messages got in batch
        return true; 
    }
}
namespace AppBundle\Service;

use OldSound\RabbitMqBundle\RabbitMq\BatchConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class DevckBasicConsumer implements BatchConsumerInterface
{
    /**
     * @inheritDoc
     */
    public function batchExecute(array $messages)
    {
        echo sprintf('Doing batch execution%s', PHP_EOL);
        $result = [];
        /** @var AMQPMessage $message */
        foreach ($messages as $message) {
            $result[$message->getDeliveryTag()] = $this->executeSomeLogicPerMessage($message);
        }

        // you ack only some messages that have return true
        // e.g:
        // $return = [
        //      1 => true,
        //      2 => true,
        //      3 => false,
        //      4 => true,
        //      5 => -1,
        //      6 => 2,
        //  ];
        // The following will happen:
        //  * ack: 1,2,4
        //  * reject and requeq: 3
        //  * nack and requeue: 6
        //  * reject and drop: 5
        return $result;
    }
}

How to run the following batch consumer:

    $ ./bin/console rabbitmq:batch:consumer batch_basic_consumer -w

Important: BatchConsumers will not have the -m|messages option available Important: BatchConsumers can also have the -b|batches option available if you want to only consume a specific number of batches and then stop the consumer. Give the number of the batches only if you want the consumer to stop after those batch messages were consumed!

STDIN Producer

There's a Command that reads data from STDIN and publishes it to a RabbitMQ queue. To use it first you have to configure a producer service in your configuration file like this:

producers:
    words:
      connection:       default
      exchange_options: {name: 'words', type: direct}

That producer will publish messages to the words direct exchange. Of course you can adapt the configuration to whatever you like.

Then let's say you want to publish the contents of some XML files so they are processed by a farm of consumers. You could publish them by just using a command like this:

$ find vendor/symfony/ -name "*.xml" -print0 | xargs -0 cat | ./app/console rabbitmq:stdin-producer words

This means you can compose producers with plain Unix commands.

Let's decompose that one liner:

$ find vendor/symfony/ -name "*.xml" -print0

That command will find all the .xml files inside the symfony folder and will print the file name. Each of those file names is then piped to cat via xargs:

$ xargs -0 cat

And finally the output of cat goes directly to our producer that is invoked like this:

$ ./app/console rabbitmq:stdin-producer words

It takes only one argument which is the name of the producer as you configured it in your config.yml file.

Other Commands

Setting up the RabbitMQ fabric

The purpose of this bundle is to let your application produce messages and publish them to some exchanges you configured.

In some cases and even if your configuration is right, the messages you are producing will not be routed to any queue because none exist. The consumer responsible for the queue consumption has to be run for the queue to be created.

Launching a command for each consumer can be a nightmare when the number of consumers is high.

In order to create exchanges, queues and bindings at once and be sure you will not lose any message, you can run the following command:

$ ./app/console rabbitmq:setup-fabric

When desired, you can configure your consumers and producers to assume the RabbitMQ fabric is already defined. To do this, add the following to your configuration:

producers:
    upload_picture:
      auto_setup_fabric: false
consumers:
    upload_picture:
      auto_setup_fabric: false

By default a consumer or producer will declare everything it needs with RabbitMQ when it starts. Be careful using this, when exchanges or queues are not defined, there will be errors. When you've changed any configuration you need to run the above setup-fabric command to declare your configuration.

How To Contribute

To contribute just open a Pull Request with your new code taking into account that if you add new features or modify existing ones you have to document in this README what they do. If you break BC then you have to document it as well. Also you have to update the CHANGELOG. So:

  • Document New Features.
  • Update CHANGELOG.
  • Document BC breaking changes.

License

See: resources/meta/LICENSE.md

Credits

The bundle structure and the documentation is partially based on the RedisBundle

rabbitmqbundle's People

Contributors

alexbumbacea avatar andreea-anamaria avatar bburnichon avatar caciobanu avatar come avatar doppynl avatar eloar avatar fatmuemoo avatar goetas avatar haswalt avatar icolomina avatar igaponov avatar igrizzli avatar ikwattro avatar joelwurtz avatar mihaileu avatar nathanjrobertson avatar passkey1510 avatar ramunasd avatar ruudk avatar sixdayz avatar skafandri avatar steveyeah avatar stloyd avatar stof avatar torondor27 avatar trompette avatar twistedlogic avatar vicb avatar videlalvaro avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabbitmqbundle's Issues

Create rpc server

Hi,
I want to write a rpc server and I found that the document only mentioned rpc client,
I read the source code and guessed the way to write rpc server is to write a callback "function"
which is defined in service.yml, but in this way I would get a "object", not "function".

Thanks in advanced.

RpcClient infinite loop if the RpcServer dies

The current implementation makes RpcClient wait in an infinite loop if the processing Server side dies silently (segfault, or unhandled php error like out of memory error).
This is caused by server, because it confirms the message before calling the user function.
If the two lines are exchanged (and the ack placed in the catch as well) it will be more robust, more tolerant to errors.
The backdraw is that this case the call maybe processed twice or more, depends where the exception happens in the RPC code.

hardcoded values in exchange_declare call

Hi,

RabbitMq/Producer.php, line 12:
$this->ch->exchange_declare($this->exchangeOptions['name'], $this->exchangeOptions['type'], false, true, false);

Why are you using hardcoded values here? I have completely equal declaration of producer and consumer (with auto_delete set to true, and durable to false) and the code above tries to redeclare (with no success of course) the queue with other options just because it ignores my settings

Is it a bug or am I missing something?

Thanks

Queues/Messages not created automatically ?

Hi,

I have the following configuration :

old_sound_rabbit_mq:
    connections:
        default:
            host:      %rabbitmq.host%
            port:      %rabbitmq.port%
            user:      %rabbitmq.user%
            password:  %rabbitmq.password%
            vhost:     '/'
    producers:
        main:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
    consumers:
       post_processing:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'post_processing', durable: true}
            callback:          consumer.post_processing
        permission:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'permission', durable: true}
            callback:         consumer.permission
        index:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'index', durable: true}
            callback:         consumer.index_document
        following_encode:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'following_encode', durable: true}
            callback:         consumer.encode_following_list
        user_activity:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'user_activity', durable: true}
            callback:         consumer.user_activity
        build_feed:
            connection: default
            exchange_options: {name: 'main_direct', type: direct}
            queue_options:    {name: 'build_feed', durable: true}
            callback:         consumer.build_feed

And i add jobs through a manager which basically does :

    public function addJob($queue, $data, $priority = 0)
    {
        $message = serialize($data);
        $this->producer->publish($message, $queue);
    }

$queue being the queue name defined in the configuration.

The things is that with this configuration I have no queue nor message being added.
sudo rabbitmqctl list_queues returns nothing.

What am i doing wrong ?

Thanks !

There is no way to setup queues\bindings

Currently you manually should declare queues and bindings between queues and exchanges. It will be cool to have a CLI command that will setup necessary relations for the application based on some configuration. Should it be implemented in this bundle and how would you like to see it? What's your opinion on that feature?

[RFC] Added Stub producer

Hello.

IMHO, it could be usefull to add a stub producer into this bundle (or in php-aqm lib) that will mock a producer. It will we usefull in functionnal tests to mock the real rabbitmq.
Thanks to the data-collector, we could ask to the mock if new message has been pushed to rabbitmq, etc.

What do you think ?

[Feature Request]

It looks like this requirement may overlap a couple of existing requests.

We need the ability to dynamically create and destroy queues - while running as a consumer - using the same connection.

I will continue to review the code - maybe this can be done already - I was just concerned reading "lazy" threads.

Do Consumer have the timeout mechanism?

Hi, I'm using this bundle to deal with the data transfer, it really help me a lot.
I have several consumers, each use the bulk insert to get better performance.
But i find out every consumer hold some data because the amount didn't reach the batch size.
Will you add the timeout mechanism to Consumer?
Or there has any method can let Consumer know the queue situation?

Consumer architecture

Hello @videlalvaro

I review your bundle a bit and I have couple of issues.

  1. Does producer should know anything about queues? As far as I understand producer just publishes messages to exchange. The only thing producer should declare is an exchange.
  2. Does consumer should know anything about exchanges and routing keys? As far as I understand consumer just consumes the message from a queue and doesn't care where it come from. The only thing consumer should declare is a queue he consumes.
  3. If both questions above should be answered "no" then we need a way to setup exchanges,queues and bindings between them and here comes my issue #27. I can implement these changes, but I just need your opinion maybe I have missed something important.

InvalidDefinitionException

After upgrade symfony to 2.1, we got this error.

  [Symfony\Component\Config\Definition\Exception\InvalidDefinitionException]                              
  ->addDefaultsIfNotSet() is not applicable to prototype nodes at path "old_sound_rabbit_mq.connections"  

Exchange names cannot contain the "period" character

Messages sent to an exchange which name contains a period (.) never arrive to the RabbitMQ broker. Here's the config for such a producer:

producers:
    accounts_update:
        connection: default
        exchange_options: { name: 'accounts.update', type: direct }

According to RabbitMQ's reference for AMQP 0-9-1 periods are allowed in the exchange name:

The exchange name consists of a non-empty sequence of these characters: 
letters, digits, hyphen, underscore, period, or colon.

Make RabbitMQ unavailability catchable

Currently if the server is unavailable then even on pages that do not use rabbitmq, our app dies with a fatal error.

I want to implement a fallback in this case but seem unable to prevent the fatal errors.

As long as the rabbitmq bundle is configured in config.yml, then it seems there is no way to operate the app in the case of the rabbit server being unavailable, so this is a problem.

Consumer and Producer config

Default parameters for the consumer and the producer aren't the same. So when you don't explicitly specify all the parameters you will have tha kind of exceptions

PRECONDITION_FAILED - cannot redeclare exchange 'exchange-name' in vhost '/' with different type, durable, internal or autodelete value

with that config.yml

old_sound_rabbit_mq:
    connections:
        default:
            host:      'localhost'
            port:      5672
            user:      'guest'
            password:  'guest'
            vhost:     '/'
    producers:
        sample_task:
            connection: default
            exchange_options: {name: 'exchange-name', type: direct}
    consumers:
        sample_task:
            connection: default
            exchange_options: {name: 'exchange-name', type: direct}
            queue_options:    {name: 'queue-name'}
            callback:         sample_task_service

requeue loop

Hi,

I'm using rabbitmq to verify a status change in a third-party service, reading messages from a queue and sending them to a webservice that returns if it's status has changed or not. If there is no status change, I have to requeue the message to check it again later (I'm doing this just returning false in the consumer's execute method), but the message is again delivered to consumer instantaneously, causing a loop until the message is dropped. Is there any way to avoid this behavior? Some way to postpone the delivery of the message to consumer or postpone the requeuing of the message?

Reconnect if connection is closed

I have a consumer that once started performs some time-consuming operations and then tries to write to Rabbit. If the connection was closed by RabbitMQ server, then the application crashes with:

Notice: fwrite(): send of 31 bytes failed with errno=104 Connection reset by peer in vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 157

What can be done about it? Doesn't the bundle have possibility to re-connect to RabbitMQ server?

Message rates stop at 25 msg/s

I have a simple command

config.yml:

producers:
    my-producer':
        connection: default
        exchange_options: {name: 'my-producer', type: direct}

SimpleCommand.php

    ...
    $producer = $this->getContainer()
            ->get('old_sound_rabbit_mq.my-producer');

    for($i = 0; $i < 10000; $i++) {
        $producer->publish('for_test');
    }
    ...

It runs correctly but the message rates display on the RabbitMq management is alway 25 msg/s.
Run 2 process at the same time can get 50 msg/s.

At the same machine, I can get about 2000 msg/s using node-amqp.
Do I miss something?

debian 6
symfony 2.0.3
rabbitmq-server version 2.6.1 / R14A

multiple routing key

hi,

I would like to know if it is possible to have several routing key in the configuration:
my configuartion is:

# define producers
rabbitmq_producers:
    upload_picture:
        connection:         default
        exchange_options:   {name: 'upload-picture', type: topic, durable: true }
    content_manager:
        connection:         default
        exchange_options:   {name: 'Service', type: topic, durable: true }

rabbitmq_consumers:
    upload_picture:
        connection: default
        exchange_options: {name: 'upload-picture', type: topic}
        queue_options:    {name: 'upload-picture'}
        callback:         upload_picture_service

rabbitmq_urn:
    upload_picture:

rabbitmq_routing_key:  upload_picture.state

I would like to have something like :

rabbitmq_producers:
    upload_picture:
        connection:         default
        exchange_options:   {name: 'upload-picture', type: topic, durable: true }
        routing_key: upload_picture.state
    content_manager:
        connection:         default
        exchange_options:   {name: 'Service', type: topic, durable: true 
         routing_key: content.state

it is possible for consumers too?
thank you for ur help =)

New Symfony preview release compatibility

The Symfony DependencyInjector has no more Extension class, it's now an Interface, so the actual call
use Symfony\Component\DependencyInjection\Extension\Extension
in
src/OldSound/RabbitMqBundle/DependencyInjection/OldSoundRabbitmqExtension.php
is broken. We tried to use the ExtensionInterface instead but we faced a missing "Debug" method when launching a consumer

Queue on producer

Hi there,
I'm working with AMQP since few days.
I have build a Java application which use AMQP & do some little test with phpamqp-lib out-of-box.

Right now, I'm trying to integrate the AMQP producer to our Symfony2 application.
I'm looking on your RabbitMqBundle.

The first question is about queue.
When work with AMQP, Producer & Consumer have to work with a specific channel. This channel can be configure to use a specific queue_name. Isn't it ?

The problem is, that I have not found any method to specify a queue_name for the producer channel.

For the consumer, the configuration have a dedicated part for queue options.. Like queue_name.

Thanks for help.

Queue configuration doesn't handle argument passing correctly

When trying to pass an argument array into the queue declaration the Configuration.php doesn't accept it because it actually isn't a scalar value.

queue_options:    {name: 'search_indexer', arguments: { x-ha-policy: ['S', 'all']} }

Results in:

  [Symfony\Component\Config\Definition\Exception\InvalidTypeException]                                                           
  Invalid type for path "old_sound_rabbit_mq.consumers.search_indexer.queue_options.arguments". Expected scalar, but got array.  

I tried to modify the TreeBuilder configuration a bit, but without success. The variant below cuts off the key 'x-ha-policy' key.

    public function addQueueConfiguration(NodeBuilder $nb)
    {
        return $nb
            ->arrayNode('queue_options')
                ->children()
                    ->scalarNode('name')->end()
                    ->booleanNode('passive')->defaultFalse()->end()
                    ->booleanNode('durable')->defaultTrue()->end()
                    ->booleanNode('exclusive')->defaultFalse()->end()
                    ->booleanNode('auto_delete')->defaultFalse()->end()
                    ->booleanNode('nowait')->defaultFalse()->end()
                    ->arrayNode('arguments')->defaultNull()
                        ->prototype('array')
                            ->prototype('scalar')->end()
                        ->end()
                    ->end()
                    ->scalarNode('ticket')->defaultNull()->end()
                ->end()
            ->end();
    }

Results in

array(8) {
  ["name"]=>
  string(14) "search_indexer"
  ["arguments"]=>
  array(1) {
    [0]=>
    array(2) {
      [0]=>
      string(1) "S"
      [1]=>
      string(3) "all"
    }
  }
  ["passive"]=>
  bool(false)
  ["durable"]=>
  bool(true)
  ["exclusive"]=>
  bool(false)
  ["auto_delete"]=>
  bool(false)
  ["nowait"]=>
  bool(false)
  ["ticket"]=>
  NULL
}

Connect to RabbitMQ server even without using it

Hi,
I find that RabbitMqBundle will try to connect to the server if we declare a producer service.
When we are doing some functional test, we can see a lot of connection at the RabbitMQ server even these tests is not about RabbitMqBundle.

We are updating RabbitMQ to the latest master, and aware this new behavior.
Could we avoid this? I think this is not necessary.

Bundle INACTIVE/DEAD?

Hi @videlalvaro ,

Is this bundle/project considered inactive or dead? Questions/Issues are not answered and code was not maintained within the last 6 months.

Enrico

Consumer performance decreasing with increasing run time

I've run a simple consumer that does nothing but ackonwledges the messages. When run it consumes 30-40 messages per second, but then performance decreases constantly, dropping to 2-3 messages per second after 2 minutes. The process uses about 90MB of memory and it doesn't increase much during those 2 minutes. Any ideas what might be the reason? How can this solution be used for consumes supposed to run constantly, if performance drops that fast?

Another question is: when started, consumer fetches all messages from the queue (11k messages, all move from Ready to Unacked in RabbitMQ panel). When I run another consumer it has nothing to process, which makes it impossible to paralelize processing. Is there a way not to fetch all messages from the queue? I tried running it with "-m" parameter to limit number of messages to process, but it doesn't help.

Handle messages that can't be delivered

According to my experience, currently when a consumer fails to consume a message (the callback ends with fatal error, exception, etc), the message is still considered as delivered and is not restored.

Is there any workaround for this?

SF2 app with RabbitMqBundle failed when rabbitmq is stopped

Hi @videlalvaro,

there is an issue with the RabbitMqBundle and php-amqplib. Here the steps to reproduce the bug:

  1. bootstrap a Symfony2 app with "oldsound/rabbitmq-bundle": "dev-master" in composer.json
  2. don't install rabbitmq server or stop it.
  3. Try to display any page on the bootstrapped SF2 app with rabbitmq-bundle enabled

=> you will have the following exception:

ErrorException: Warning: stream_socket_client() [function.stream-socket-client]: unable to connect to tcp://vserver-jrouff.dev.sensio.net:5672 (Connection refused) in /Users/joseph.rouff/projects/insight/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 80

It would be cool that RabbitMqBundle fail silently without any exception (or just a warning on the datacollector icon).

Thanks for you answer :).

[RPC Server] Error in Tutorial example: Undefined index: routing_keys

Hi Alvaro,
when I configure RPC or Reply/Response as defined in the Usage section and try to start random_int server via command line, I give the following error:

[ErrorException]
Notice: Undefined index: routing_keys in /home/matteo/git_prj/sf21/vendor/oldsound/rabbitmq-bundle/OldSound/RabbitMqBundle/RabbitMq/BaseAmqp.php line 135

If I add the routing_keys parameter in the queueOptions array in the abstract class BaseAmqp:

protected $queueOptions = array(
    'name' => '',
    'passive' => false,
    'durable' => true,
    'exclusive' => false,
    'auto_delete' => false,
    'nowait' => false,
    'arguments' => null,
    'ticket' => null,
    'routing_keys'=> null
);

Then run succesfully.

I missing some configuration or is a bug?

Thanks in advice
Matteo

Producers Not Declaring Queue

Great work on this, it has saved us tons of time.

One thing however... In my somewhat brief exposure to the AMQP world, I find it is usually recommended to create your exchange and queues on the sending as well as the receiving side. This prevents messages from never reaching the exchange/queue due to it being deleted etc...

We are pushing into a queue that is talking to a log server. The server will create the queue, but it would be good if that server were to go down and the queue was to somehow get deleted, we could have assurance that it would recreate and continue sending.

We are working on a pull for this but wanted to find out if there was a rationale behind it.

[Feature request] Support for configuring with an amqp uri

It would be great if the config could accept an amqp uri and convert it internally to extract each setting. This would make it easier to use the bundle with hosted rabbitmq providing an amqp uri with the settings (for instance, CloudAMQP sets the uri directly as an environment variable on heroku)

get channel

Hello, is there any easy way to get the channel from inside a consumer interface?

example, you have this consumer:

<?php

//src/Sensio/HelloBundle/Consumer/UploadPictureConsumer.php

namespace Sensio\HelloBundle\Consumer;

use Symfony\Component\DependencyInjection\ContainerAware;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class UploadPictureConsumer extends ContainerAware implements ConsumerInterface
{
    public function execute($msg)
    {
        //Process picture upload. 
        //$msg will be what was published from the Controller.
    }
}

what I want to do is something like this:

<?php

//src/Sensio/HelloBundle/Consumer/UploadPictureConsumer.php

namespace Sensio\HelloBundle\Consumer;

use Symfony\Component\DependencyInjection\ContainerAware;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;

class UploadPictureConsumer extends ContainerAware implements ConsumerInterface
{
    public function execute($msg)
    {
        // Get the channel
        $channel = $this->getChannel();
    }
}

Should I code it by myself or there is a way to do it currently cuz from what I see from the interface there isn't any way to do it

Can not set 'x-message-ttl' in arguments in queue_options

Following the example here:

queue_options: {name: 'upload-picture', arguments: {'x-ha-policy': ['S', 'all']}}

I'm trying to do:

queue_options: {name: 'upload-picture', arguments: {'x-message-ttl': 3600000}}

as I'm trying to set the message ttl for one hour

Using routing_keys parameter

Hello there,
If I try add routing_keys parameter into config.yml like this:

!#yaml
old_sound_rabbit_mq:
    connections:
        default:
            host:      'localhost'
            port:      5672
            user:      'guest'
            password:  'guest'
            vhost:     '/'
    producers:
        twitter_metrics:
            connection: default
            exchange_options: {name: 'social-networks', type: topic}
    consumers:
        twitter_metrics:
            connection: default
            exchange_options:
                name: 'social-networks'
                type: topic
            queue_options:
                name: 'twitter-metrics-queue'
                routing_keys:
                  - 'metrics.twitter'
            callback: twitter_metrics_service

I tried add routing_keys parameter into producers too and variants only in consumers and producers. But when I try to use symfony2 console, I get this:

[Symfony\Component\Config\Definition\Exception\InvalidConfigurationException]
Unrecognized options "routing_keys" under "old_sound_rabbit_mq.consumers.twitter_metrics.queue_options"

What I'm doing wrong?

If this issue isn't suitable here, please delete it and asnwer at stackoverflow

Symfony 2.2 Compatibility

Trying to upgrade an app to 2.2, but this bundle is still requiring 2.1. Are there any known issues with 2.2?

Not register as a service ??

Hi,

I recently create one consumer and it's fully works even if it's not declare as a Symfony service.
According to what I understand of your code, that sounds logic but if I read the manual it's not since :
"Keep in mind that your callbacks need to be registered as normal Symfony2 services."

My question is : Which one is wrong ? If none, what did I miss ?

Regards,

PxlCtzn.

timeout for RPC client + custom (de)serialization

This bundle is great and we are very happy with it.

Only thing we are missing at the moment are these 2 features:

Timeout for RPC client:
Our RPC client works fine when RPC server is listening but in case that RPC server is offline, client is blocking whole apache which is very unfortunate. I tried to build a timeout mechanism around getReplies() function like it is mentioned here: https://gist.github.com/3085581 but it is very unstable. It works first few times (throws timeout exception as we want) but then same old issue occurs where whole apache thread is blocked and no other web requests can be made.

Custom (de)serialization:
Now serialization and deserialization are both done using php native serialize command. We wish to have a custom setting for adding our own serialization (for use with JMSSerializerBundle for example https://github.com/schmittjoh/JMSSerializerBundle/blob/master/Resources/doc/index.rst)

pcntl signals processed only after message is handled

Hey @videlalvaro,

Great bundle. We use it in many of our projects.

Recently, we've had some issues with one of our consumers that doesn't receive messages that often. 1 per 30 minutes or so.

The issue we have is with handling the pcntl signals. Presently, handling signals happens after the next message is processed. When your script is processing messages at a rate of 1 per second, sending a SIGINT will only have you waiting 1 second before it's handled properly.

Now, in the case of 1 per 30 minutes, this is obviously a problem. We're specifically having the issue during deploys. We want to restart all consumers each deploy. Waiting 30 minutes really isn't an option.

Is there a way to have the following happen?

  • If a signal arrives while the consumer is waiting for the message, it just handles it immediately and close down.
  • If a signal arrives during the processing of one of these messages, it obviously should finish handling the message first, then handle the signal and close down.

I tried playing around with the bundle and the underlying lib, but the architecture seems to make this hard to accomplish.

Any ideas? I'm happy to write the code if there's a good solution without a ton of refactoring.

Receive fwrite error exception

Hi,
I wrote some producer/consumer commands in sf2, and when I excuted a certain producer command,
I continuously receive the following error exception:

[ErrorException]                                                                                                                                              
  Notice: fwrite(): send of 10 bytes failed with errno=11 Resource temporarily unavailable in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php line 141  






PHP Fatal error:  Uncaught exception 'Exception' with message 'Error reading data. Recevived 0 instead of expected 1 bytes' in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:64
Stack trace:
#0 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(95): PhpAmqpLib\Wire\AMQPReader->rawread(1)
#1 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(260): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(280): PhpAmqpLib\Connection\AMQPConnection->wait_frame()#3 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(119): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(225): PhpAmqpLib\Channel\AbstractChannel->next_frame()#5 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(321): PhpAmq in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 64
Fatal error: Uncaught exception 'Exception' with message 'Error reading data. Recevived 0 instead of expected 1 bytes' in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php:64 
Stack trace:
#0 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php(95): PhpAmqpLib\Wire\AMQPReader->rawread(1) #1 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(260): PhpAmqpLib\Wire\AMQPReader->read_octet()
#2 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(280): PhpAmqpLib\Connection\AMQPConnection->wait_frame()
#3 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(119): PhpAmqpLib\Connection\AMQPConnection->wait_channel(0)
#4 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(225): PhpAmqpLib\Channel\AbstractChannel->next_frame()
#5 /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Connection/AMQPConnection.php(321): PhpAmq in /root/convert_src/vendor/videlalvaro/php-amqplib/PhpAmqpLib/Wire/AMQPReader.php on line 64

It would throw exception until it had published about 88000 or above messages,

Thanks in advanced.

Can't install with composer, wrong deps

There seems to be a problem with the name of the branches, I think they have changed. When I try to install with composer, I get this:

Problem 1
    - Installation request for oldsound/rabbitmq-bundle dev-master -> satisfiable by oldsound/rabbitmq-bundle dev-master.
    - oldsound/rabbitmq-bundle dev-master requires videlalvaro/php-amqplib dev-master -> no matching package found.

Container is not being passed into consumers

The documentation indicates that by declaring your consumer to extend ContainerAware, the consumer should have access to the DIC. In practice, however, this does not appear to work.

$this->container is always set to null, and defining the setContainer function:

public function setContainer(ContainerInterface $container = null) {
  die('Consumer container set');
  $this->container = $container;
}

the setContainer function is never called.

Is this an implementation problem with RabbitMQBundle or a symfony core issue?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.