Giter Club home page Giter Club logo

mojo-rabbitmq-client's People

Contributors

adamwill avatar christopherraa avatar coolo avatar gregoa avatar horshack avatar kraih avatar manwar avatar okurz avatar prg avatar sjn avatar spodjasek avatar tyldum avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

mojo-rabbitmq-client's Issues

Update documentation

  • add info about new Mojo::Promise based methods
  • update Client.pm with with new Publisher API example

Early success event from ->publish

This one took me quite some time to figure out. It appears that the object returned by $channel->publish emits the success event unconditionally here, even before the message frame has been written to the socket.

my $message = $channel->publish(
  exchange    => 'pubsub',
  routing_key => 'anonymous.info',
  body        => 'Hello World!',
);
$message->on(success => sub {

  # This actually prevents the message from even getting written to the socket
  Mojo::IOLoop->stop;
});
$message->on(error => sub { warn pop });
$message->deliver;

Of course i assumed i could stop the event loop once the success event had been emitted, which is not the case. That has to be a bug, right?

Implement blocking publisher API

Now when we use Promises I believe it would be very convinient to implement default publisher as blocking, when you can enforce non-blocking by using _p methods.
This change would allow us to make one-line publishing scripts:

Mojo::RabbitMQ::Client->publisher(url => 'amqp://.../?exchange=...&routing_key=test')
  ->publish('plain text');

Publisher documentation's examples are hard to parse

While trying to grok how this module works, I was very confused by the Publisher docs. Specifically, this bit:

"So this:

$publisher->publish($body, { header => 'content' });

can be also written like this:

$publisher->publish($body, header => { header => 'content' });

But beware - headers get merged, but params override values so when you write this:

$publisher->publish({ json => 'object' }, header => { header => 'content' });

message will lack C<content_type> header!"

The third example there is very confusing. The fact that the code in that example differs from the code in the previous example strongly implies that this difference is why the content_type header would be missing, and that the content_type header would not be missing in the second example. But IIUC, that is not true. In fact the content_type header would be present for the first example, but missing for both the second and third examples. It's missing because we're passing in header as one of the params, so that overrides the header constructed by the Publisher code (which includes the content_type), not because of the syntax used to construct the body - right?

If I'm right, it'd be great if this could be made clearer...

publish with mandatory but without routing_key leads to uninitialized warning

When a message is published to an exchange with mandatory => 1 but without any routing_key, because it's not a topic exchange, the following warning is emitted:

Use of uninitialized value in concatenation (.) or string at /home/ahartmai/.plenv/versions/5.34.0/lib/perl5/site_perl/5.34.0/Mojo/RabbitMQ/Client/Method/Publish.pm line 32.

This might also affect the functionality of the callback sub.

Unable to use consumer API

It might just be the lack of documentation, but i've been unable to replicate this manually written consumer with the Mojo::RabbitMQ::Client::Consumer API. But just in case there's more to it, i figured it might be worth opening an issue. Otherwise please consider this a request for more documentation.

hard to debug connection failure

I'm struggling to debug a Mojo::IOLoop based daemon issue that manifests itself with the follow two warnings and the following exception:

Use of uninitialized value $exp in concatenation (.) or string at /home/nac/.plenv/versions/COLLECTOR-Debian9-Perl5.28.0/lib/perl5/site_perl/5.28.0/Mojo/RabbitMQ/Client.pm line 545.
Use of uninitialized value $expected[0] in join or string at /home/nac/.plenv/versions/COLLECTOR-Debian9-Perl5.28.0/lib/perl5/site_perl/5.28.0/Mojo/RabbitMQ/Client.pm line 549.
Mojo::Reactor::Poll: I/O watcher failed: Mojo::RabbitMQ::Client::Method: Method is not . It's Net::AMQP::Protocol::Channel::OpenOk at /home/nac/.plenv/versions/COLLECTOR-Debian9-Perl5.28.0/lib/perl5/site_perl/5.28.0/Mojo/EventEmitter.pm line 19

Something passes undef as the expected frame(s) to sub _expect. As it only happens once every few weeks I haven't been able to collect DEBUG logs from the module.

Did you encounter this issue before or have a suggestion how to debug it?
Thanks!

Channel is closed in forked childs

We're using openQA Mojo subprocess()es to handle longer running commands. And it appears that every subprocess inherits the rabbitmq channel, which is then DESTROYed on ioloop::reset.

So it sounds like the channel needs to be fork aware and only disconnect when it's DESTROYed in the same process it was created in.

See https://progress.opensuse.org/issues/40652#note-2 or a more detailed backtrace.

Mojo::RabbitMQ::Client gets destroyed too soon using morbo

Hello

When starting Mojolicious::Lite app with morbo (code from chat.pl) Client object gets destroyed right after start
Starting AMQP
Starting app
-- Emit connect in Mojo::IOLoop::Client (1)
-- Emit connect in Mojo::RabbitMQ::Client (1)
Connect
-> "AMQP\1\1\0\t"
Destroying client at /usr/local/lib/perl5/site_perl/Mojo/RabbitMQ/Client.pm line 605.
Mojo::RabbitMQ::Client::DESTROY(Mojo::RabbitMQ::Client=HASH(0x8080b0378)) called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/Poll.pm line 143
eval {...} called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/Poll.pm line 143
eval {...} called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/Poll.pm line 143
Mojo::Reactor::Poll::_try(Mojo::Reactor::EV=HASH(0x80629e480), "I/O watcher", "*Mojo::IOLoop::Stream::new", 1) called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/EV.pm line 49
Mojo::Reactor::EV::ANON(EV::IO=SCALAR(0x80809a708), 2) called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/EV.pm line 25
eval {...} called at /usr/local/lib/perl5/site_perl/Mojo/Reactor/EV.pm line 25
Mojo::Reactor::EV::start(Mojo::Reactor::EV=HASH(0x80629e480)) called at /usr/local/lib/perl5/site_perl/Mojo/IOLoop.pm line 140
Mojo::IOLoop::start(Mojo::IOLoop=HASH(0x8061f1b70)) called at /usr/local/lib/perl5/site_perl/Mojo/Server/Daemon.pm line 43
Mojo::Server::Daemon::run(Mojo::Server::Daemon=HASH(0x803186258)) called at /usr/local/lib/perl5/site_perl/Mojo/Server/Morbo.pm line 69
Mojo::Server::Morbo::_spawn(Mojo::Server::Morbo=HASH(0x8020e2528)) called at /usr/local/lib/perl5/site_perl/Mojo/Server/Morbo.pm line 54
Mojo::Server::Morbo::_manage(Mojo::Server::Morbo=HASH(0x8020e2528)) called at /usr/local/lib/perl5/site_perl/Mojo/Server/Morbo.pm line 34
Mojo::Server::Morbo::run(Mojo::Server::Morbo=HASH(0x8020e2528), "./BinanceInfo") called at /usr/local/bin/morbo line 19
-- Emit write in Mojo::IOLoop::Stream (0)
-- Emit drain in Mojo::IOLoop::Stream (0)
-- Emit read in Mojo::IOLoop::Stream (1)
-- Emit error in Mojo::Reactor::EV (1)
Mojo::Reactor::EV: I/O watcher failed: Can't call method "_read" on an undefined value at /usr/local/lib/perl5/site_perl/Mojo/RabbitMQ/Client.pm line 386.

Publisher API

AFAICS, the routing key is derived from the ?queue query-param.
This means you need one publisher per routing key, while my design has a very dynamic routing key that is unlikely to repeat more than a few times per week.

I would like to introduce the routing key to publish(). I can create a PR, but I was debating with myself over semantics:

->publish('body',%headers,'routing_key') vs
->publish('body', { headers => %headers, routing_key => 'some.routing.key' })
The second one opens up for more flexibility for other options needed.

Body larger than frame_max not split into multiple frames

Great module!!.

When sending a body frame larger than the default frame_max value for a standard RabbitMQ installation (i.e. 131072) the following error is returned in the MQ logs operation none caused a connection exception frame_error: "type 3, all octets = <<>>: {frame_too_large,402712,131064}".

AnyEvent::RabbitMQ::Channel line starting 521 implements this feature. Wondering if the same could be done for this Mojo client.

Problem with heartbeats

This fairly simple script is supposed to run forever and print all the messages it receives.

use Mojo::Base -strict;
use Mojo::RabbitMQ::Client;

my $client = Mojo::RabbitMQ::Client->new(url => 'amqp://...:5672');
$client->on(error => sub { die pop });
$client->on(
  open => sub {
    my $client = shift;
    say 'Connected';
    my $channel = Mojo::RabbitMQ::Client::Channel->new();
    $channel->on(error => sub { die pop });
    $channel->on(
      open => sub {
        my $channel = shift;
        say 'Channel open';

        # Exchange
        my $exchange = $channel->declare_exchange(
          exchange => 'pubsub',
          type     => 'topic',
          passive  => 1,
          durable  => 1
        );
        $exchange->on(
          success => sub {
            say 'Declared exchange';

            # Declare queue
            my $queue = $channel->declare_queue(exclusive => 1);
            $queue->on(
              success => sub {
                my $result = pop;
                say 'Declared queue';

                # Bind queue
                my $queue_name = $result->method_frame->{queue};
                my $queue      = $channel->bind_queue(
                  exchange    => 'pubsub',
                  queue       => $queue_name,
                  routing_key => '#'
                );
                $queue->on(
                  success => sub {
                    say 'Bound queue';

                    # Consume messages
                    my $consumer
                      = $channel->consume(queue => $queue_name, no_ack => 1);
                    $consumer->on(
                      message => sub {
                        my $message = pop->{body}->to_raw_payload;
                        say $message;
                      }
                    );
                    $consumer->deliver;
                  }
                );
                $queue->deliver;
              }
            );
            $queue->deliver;
          }
        );
        $exchange->deliver;
      }
    );
    $client->open_channel($channel);
  }
);
$client->connect;

Mojo::IOLoop->start;

Which it does for a few seconds, then it produces this.

Mojo::Reactor::Poll: Timer failed: Can't call method "isa" on an undefined value at /usr/lib/perl5/vendor_perl/5.26.1/Mojo/RabbitMQ/Client.pm line 276.

And a few seconds later.

Mojo::Reactor::Poll: Timer failed: Consumer callback failure: Received data is not body frame at test.pl line 13.
Mojo::Reactor::Poll: Timer failed: Consumer callback failure: Received data is not header frame at test.pl line 13.
Mojo::Reactor::Poll: Timer failed: Consumer callback failure: Received data is not body frame at test.pl line 13.
Mojo::Reactor::Poll: Timer failed: Consumer callback failure: Received data is not header frame at test.pl line 13.
...

I was using Mojolicious 7.58 and Mojo::RabbitMQ::Client 0.0.9 for this experiment. Is there something obvious i was doing wrong, or have i stumbled over a bug?

Publisher does not appear to work at all

I've tried the minimal example from the documentation.

use Mojo::Base -strict;

use Mojo::RabbitMQ::Client;
my $publisher
  = Mojo::RabbitMQ::Client->publisher(url => 'amqp://.../?exchange=pubsub&queue=test');
$publisher->on(success => sub { say "Publisher ready" });
$publisher->on(error => sub { say 'Error: ', pop });
$publisher->publish('plain text');
$publisher->publish({encode => {to => 'json'}});

Mojo::IOLoop->start unless Mojo::IOLoop->is_running;

And all i got was this.

Mojo::Reactor::Poll: I/O watcher failed: Can't call method "_read" on an undefined value at /usr/lib/perl5/vendor_perl/5.26.1/Mojo/RabbitMQ/Client.pm line 344.
Mojo::Reactor::Poll: Timer failed: Can't use an undefined value as a HASH reference at /usr/lib/perl5/vendor_perl/5.26.1/Mojo/RabbitMQ/Client/Publisher.pm line 40.

Making Mojo::RabbitMQ::Client::Consumer use promises

Following up on the talk on IRC I've been looking at the Consumer and think it would be cool if Mojo::RabbitMQ::Client::Consumer also would use promises now that the publisher does so.

Locally I've made the consumer use promises. In the process of cleaning the code up now. Creating this issue to let anyone interested know that I'm on it. Will create a PR once it is cleaned up.

I will definitly need some help in writing good tests for the new code. Promises are still new to me.

Proper implementation of AMQP URI specs

As pointed out in #8 uri handling code needs to be compatible with RabbitMQ URI Specification at: https://www.rabbitmq.com/uri-spec.html.

Properly handle those test examples:

URI Username Password Host Port Vhost
amqp://user:pass@host:10000/vhost "user" "pass" "host" 10000 "vhost"
amqp://user%61:%61pass@ho%61st:10000/v%2fhost "usera" "apass" "hoast" 10000 "v/host"
amqp://
amqp://:@/ "" "" ""
amqp://user@ "user"
amqp://user:pass@ "user" "pass"
amqp://host "host"
amqp://:10000 10000
amqp:///vhost "vhost"
amqp://host/ "host" ""
amqp://host/%2f "host" "/"
amqp://[::1] "[::1]" (i.e. the IPv6 address ::1)

'weaken $self' leads to undefined errors

The 'weaken $self' on line 32 leads to messages being rejected with:

Can't call method "url" on an undefined value at local/lib/perl5/Mojo/RabbitMQ/Client/Publisher.pm line 39.

This happens when logging events from requests to a Mojolicious application, but not in standalone tests. I've not yet discovered why.

Method API

I'm still reading up on the code, so maybe i'm mistaken, but is there a reason for why methods like Mojo::RabbitMQ::Client::Channel::declare_exchange have to return event emitting objects (Mojo::RabbitMQ::Client::Method)?

  my $exchange = $channel->declare_exchange(
    exchange => 'mojo',
    type => 'fanout'
  );
  $exchange->on(success => sub {
    my ($exchange, $frame) = @_;
    ...
  });
  $exchange->on(error => sub {
    my ($exchange, $err) = @_;
    ...
  });
  $exchange->deliver;

As far as i can see they usually just emit two events (success/error), and those just once. Wouldn't it make sense to use one callback and/or a promise? Seems quite a bit more efficient.

# Callback
$channel->declare_exchange((exchange => 'mojo', type => 'fanout') => sub {
  my ($err, $frame) = @_;
  die $err if $err;
  ...
});

# Promise
$channel->declare_exchange_p(exchange => 'mojo', type => 'fanout')->then(sub {
  my $frame = shift;
  ...
})->catch(sub {
  my $err = shift;
  ...
});

RMQ message not sent intermittently

We have been trying to use Mojo::RabbitMQ::Client::Publisher for our tool and have been trying to test it for its robustness and found that it does not send message like 1 out of 15 messages when we tried to send messages from different machines. However, the then() part is executed.

$publisher->publish_p($payload, routing_key => $routing_key)->then(sub {...})->catch(sub{...})->wait;

The catch routine does not execute on this intermittent failure.

How to reject a message and I'd like to contribute

Hi, I struggled for the past couple of deas getting a dead-letter-exchange to be created, but now I'm unable to use it.

I can ack a message when I set no_ack to false. Note this is in an on-message callback of a channel:

$consumer->channel->ack($message)->deliver;

Though when I reject a message like this to forward it to the dead-letter-exchange:

$consumer->channel->reject(requeue => 0)->deliver;

I get:

[2022-07-28 07:32:08.44986] [134610] [info] RabbitMQ[#1] Consumer ready
[2022-07-28 07:32:08.45701] [134610] [debug] RabbitMQ: received message
-> "\1\0\1\0\0\0\r\0<\0Z\0\0\0\0\0\0\0\0\0\316"
<- "\1\0\1\0\0\0007\0\24\0(\1\226,PRECONDITION_FAILED - unknown delivery tag 0\0<\0Z\316"
-> "\1\0\1\0\0\0\4\0\24\0)\316"
[2022-07-28 07:32:08.51695] [134610] [warn] RabbitMQ[#1] Channel closed

So, after digging more around I found out that I need to set multiple => 0, so this works:

$consumer->channel->reject(delivery_tag => 1, multiple => 0, requeue => 0)->deliver;

To spare others from going through this rabbit hole (pun intended), I'd like to contribute to the documentation.
Can you please give me a commit bit here on gitlab and perhaps co-maint on pause? My PAUSE-ID is FROGGS.

Circular includes

In some of the modules in this dist there are circular includes. Specifically this is due to the user of use in Mojo::RabbitMQ::Client, Mojo::RabbitMQ::Client::Consumer and Mojo::RabbitMQ::Client::Publisher;

$ for file in $( find lib/ -type f ); do perl -Ilib -c $file; done
Subroutine connect redefined at lib/Mojo/RabbitMQ/Client.pm line 47.
Subroutine connect_p redefined at lib/Mojo/RabbitMQ/Client.pm line 58.
Subroutine consumer redefined at lib/Mojo/RabbitMQ/Client.pm line 80.
Subroutine publisher redefined at lib/Mojo/RabbitMQ/Client.pm line 87.
Subroutine param redefined at lib/Mojo/RabbitMQ/Client.pm line 94.
Subroutine add_channel redefined at lib/Mojo/RabbitMQ/Client.pm line 100.
Subroutine acquire_channel_p redefined at lib/Mojo/RabbitMQ/Client.pm line 132.
Subroutine open_channel redefined at lib/Mojo/RabbitMQ/Client.pm line 147.
Subroutine delete_channel redefined at lib/Mojo/RabbitMQ/Client.pm line 159.
Subroutine close redefined at lib/Mojo/RabbitMQ/Client.pm line 164.
Subroutine _loop redefined at lib/Mojo/RabbitMQ/Client.pm line 181.
Subroutine _error redefined at lib/Mojo/RabbitMQ/Client.pm line 183.
Subroutine _uri_handler redefined at lib/Mojo/RabbitMQ/Client.pm line 189.
Subroutine _close redefined at lib/Mojo/RabbitMQ/Client.pm line 261.
Subroutine _handle redefined at lib/Mojo/RabbitMQ/Client.pm line 266.
Subroutine _read redefined at lib/Mojo/RabbitMQ/Client.pm line 274.
Subroutine _parse_frames redefined at lib/Mojo/RabbitMQ/Client.pm line 308.
Subroutine _connect redefined at lib/Mojo/RabbitMQ/Client.pm line 352.
Subroutine _connected redefined at lib/Mojo/RabbitMQ/Client.pm line 392.
Subroutine _tune redefined at lib/Mojo/RabbitMQ/Client.pm line 451.
Subroutine _write_expect redefined at lib/Mojo/RabbitMQ/Client.pm line 511.
Subroutine _expect redefined at lib/Mojo/RabbitMQ/Client.pm line 530.
Subroutine _write_frame redefined at lib/Mojo/RabbitMQ/Client.pm line 575.
Subroutine _write redefined at lib/Mojo/RabbitMQ/Client.pm line 588.
Subroutine DESTROY redefined at lib/Mojo/RabbitMQ/Client.pm line 601.
lib/Mojo/RabbitMQ/Client.pm syntax OK
lib/Mojo/RabbitMQ/Client/Method.pm syntax OK
lib/Mojo/RabbitMQ/Client/Method/Publish.pm syntax OK
lib/Mojo/RabbitMQ/Client/Publisher.pm syntax OK
lib/Mojo/RabbitMQ/Client/Consumer.pm syntax OK
lib/Mojo/RabbitMQ/Client/LocalQueue.pm syntax OK
lib/Mojo/RabbitMQ/Client/Channel.pm syntax OK

The code works as intended but any application that does linting, syntax checking etc will complain. An example of this is IDE's where the message would appear as an issue to be adressed. Example screenshot from Emacs:

2018-01-20-091627_597x125_scrot

repo and tarballs include non-free file without notification

The top-level LICENSE file is the Artistic License, and the top-level README.md file just says "This program is free software, you can redistribute it and/or modify it under the terms of the Artistic License version 2.0."

However, both the repo and the tarball distributions include two files that are not under the Artistic License - the AMQP specs in share/. The 0.8 spec is not freely licensed at all, which is a problem for distributions with strict licensing policies: in Fedora we actually have to ship a modified tarball with this file stripped out. The 0.9.1 extended spec is freely licensed, but not under the Artistic license - it is under a BSD-style license.

It'd be ideal for Fedora if the 0.8 spec was dropped and replaced with simply a reference to a URL in the README or something, as then we wouldn't have to modify the tarball every time. For the 0.9.1 extended spec, it'd be useful to add a line to the README explaining it is not under Artistic.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.