rabbitmq-stomp's Introduction

RabbitMQ STOMP adapter

This repository has been moved to the main unified RabbitMQ "monorepo", including all open issues. You can find the source under /deps/rabbitmq_stomp. All issues have been transferred.

Overview

The STOMP adapter is included in the RabbitMQ distribution. To enable it, use rabbitmq-plugins:

rabbitmq-plugins enable rabbitmq_stomp

Supported STOMP Versions

1.0 through 1.2.

Documentation

RabbitMQ STOMP plugin documentation.

rabbitmq-stomp's People

Contributors

acogoluegnes, dcorbacho, dpw, dumbbell, essen, fenollp, gerhard, gmr, hairyhum, invertedacceleration, kjnilsson, lukebakken, michaelklishin, priviterag, rade, samnela, spring-operator, tonyg, userlocalhost, videlalvaro

rabbitmq-stomp's Issues

Parsing frames with non-ascii characters fails

It seems the rabbitmq-stomp plugin does not handle non-ASCII characters well. Running the code below from a browser crashes the plugin, resulting in a silent failure on the client side, no message in the queue, and a crash report in the RabbitMQ logs (shown after the code).

    var ws = new SockJS('http://127.0.0.1:15674/stomp');
    var client = Stomp.over(ws);

    // Once you have the client object you can follow the APIs exposed by the stomp.js library.
    // The next step is usually to establish a STOMP connection with the broker.

    // SockJS does not support heart-beat: disable heart-beats
    client.heartbeat.outgoing = 0;
    client.heartbeat.incoming = 0;
    client.debug = function() {
        if (window.console && console.log && console.log.apply) {
            console.log.apply(console, arguments);
        }
    };

    var on_connect = function() {
        console.log('connected', arguments);

        var id = client.subscribe('measurements', function(d) {
            console.debug("Got measurement", d);
        });
        var msg = JSON.stringify({ message: "Þetta er íslenska sem fer illa í rabbitmq-stomp" });
        console.log("msg", msg);
        client.send("crashqueue", {"content-type":"text/json"}, msg);

    };
    var on_error =  function() {
        console.log('error', arguments);
    };
    client.connect('guest', 'guest', on_connect, on_error, '/');

Error log entry from rabbit.log:

=ERROR REPORT==== 9-Jun-2015::13:24:01 ===
** Generic server <0.5801.0> terminating 
** Last message in was {'$gen_cast',
                           {sockjs_msg,
                               <<83,69,78,68,10,99,111,110,116,101,110,116,
                                 45,116,121,112,101,58,116,101,120,116,47,
                                 106,115,111,110,10,100,101,115,116,105,110,
                                 97,116,105,111,110,58,99,114,97,115,104,113,
                                 117,101,117,101,10,99,111,110,116,101,110,
                                 116,45,108,101,110,103,116,104,58,54,49,10,
                                 10,123,34,109,101,115,115,97,103,101,34,58,
                                 34,195,158,101,116,116,97,32,101,114,32,195,
                                 173,115,108,101,110,115,107,97,32,115,101,
                                 109,32,102,101,114,32,105,108,108,97,32,195,
                                 173,32,114,97,98,98,105,116,109,113,45,115,
                                 116,111,109,112,34,125,0>>}}
** When Server state == {state,
                            {sockjs_session,
                                {<0.5798.0>,
                                 [{peername,{{127,0,0,1},50344}},
                                  {sockname,{{127,0,0,1},15674}},
                                  {path,"/stomp/905/oxauey2o/websocket"},
                                  {headers,[]}]}},
                            <0.5800.0>,
                            {resume,#Fun<rabbit_stomp_frame.0.85562437>}}
** Reason for termination == 
** {{case_clause,{{badmatch,<<123,34,109,101,115,115,97,103,101,34,58,34,195,
                              158,101,116,116,97,32,101,114,32,195,173,115,
                              108,101,110,115,107,97,32,115,101,109,32,102,
                              101,114,32,105,108,108,97,32,195,173,32,114,97,
                              98,98,105,116,109,113,45,115,116,111,109,112,34,
                              125,0>>},
                  [{rabbit_stomp_frame,parse_body2,4,[]},
                   {rabbit_ws_client,process_received_bytes,3,[]},
                   {rabbit_ws_client,handle_cast,2,[]},
                   {gen_server,try_dispatch,4,
                               [{file,"gen_server.erl"},{line,593}]},
                   {gen_server,handle_msg,5,
                               [{file,"gen_server.erl"},{line,659}]},
                   {proc_lib,init_p_do_apply,3,
                             [{file,"proc_lib.erl"},{line,237}]}]}},
    [{rabbit_ws_client,terminate,2,[]},
     {gen_server,try_terminate,3,[{file,"gen_server.erl"},{line,621}]},
     {gen_server,terminate,7,[{file,"gen_server.erl"},{line,787}]},
     {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,237}]}]}
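One detail in the log above may help narrow this down: the frame carries content-length:61, while the body shown in the badmatch is 64 bytes long before the trailing NUL. STOMP defines content-length as a count of octets. The Python sketch below reproduces both numbers, assuming the client serialized the message compactly, as JSON.stringify does. It suggests, without proving, that the length was computed in characters rather than UTF-8 bytes.

```python
import json

# The message from the report; the non-ASCII letters are written as escapes
# so the example does not depend on how this text was normalized.
message = "\u00deetta er \u00edslenska sem fer illa \u00ed rabbitmq-stomp"

# Compact serialization, mimicking JSON.stringify (assumption).
body = json.dumps({"message": message}, ensure_ascii=False, separators=(",", ":"))

char_count = len(body)                  # length counted in characters
byte_count = len(body.encode("utf-8"))  # length counted in octets

print(char_count, byte_count)
```

This prints 61 64: the first number matches the content-length header in the crash report, the second matches the number of body bytes in the log.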

Unable to connect to the stomp server on Windows

Hello,
I'm using the Windows version (on Windows 10). When I connect to the STOMP server through the open port (61613, the default), I don't get any answer, although the port is open.
I've set
{default_user, [{login, "guest"},
                {passcode, "guest"}]},
and
{implicit_connect, true}

in my config, but I still get ERR_EMPTY_RESPONSE.
I don't see any errors, by the way. How can I fix this?

Thanks.

Temporary destinations do not support client acknowledgment

Normal (non-temporary) destinations can specify the acknowledgment mode with a header in the SUBSCRIBE frame.

With temporary destinations, sending a SUBSCRIBE frame results in an error message like:
Invalid destination '/temp-queue/foo' is not a valid destination for 'SUBSCRIBE'

The RabbitMQ STOMP plugin supports the auto, client, and client-individual acknowledgment modes on subscriptions. But since no SUBSCRIBE frame is sent for a temporary destination, there is no way to set an acknowledgment mode.

As per the RabbitMQ docs, the internal subscription for a temporary queue uses auto-ack mode. With auto-ack, messages are treated as acknowledged as soon as they are sent. Is there a way to minimize the risk of losing messages that have not been processed yet?

Add support for consumer priority (x-priority header)

Hi,
According to https://www.rabbitmq.com/consumer-priority.html
it should be possible to pass the "x-priority" header in the subscribe method. However, this seems to have no effect on the consumer definition when using the STOMP adapter (over WebSocket) https://www.rabbitmq.com/web-stomp.html.

When trying the same with the Java library, we can see that it works (see attachment).

Other args for the queue like "x-message-ttl" work fine.

RabbitMQ version:
3.6.14, Erlang 19.2.3
More detail can be found in the following thread:
discussion in the mailing list

Regards,

Upgrade stomp.py to 4.x

Our version is grossly out of date and has to be patched because it uses SSLv3. We need to upgrade before we can improve our test suite.

Standalone build fails with `behaviour ranch_protocol undefined`

It seems a dependency on ranch causes a standard plugin build to fail (in both rabbitmq-stomp and rabbitmq-mqtt):

$ git clone https://github.com/rabbitmq/rabbitmq-stomp/rabbitmq-stomp.git && cd rabbitmq-stomp && make
[...]
...error is: compile: warnings being treated as errors
src/rabbit_stomp_connection_sup.erl:20: behaviour ranch_protocol undefined
erlang.mk:4971: recipe for target 'ebin/rabbitmq_stomp.app' failed
make[1]: *** [ebin/rabbitmq_stomp.app] Error 1

Adding a newline char after the null char terminating each STOMP Frame creates problems

Starting with RabbitMQ version 3.5.4, a newline char is added after the null char that terminates each STOMP frame, so \0 has become \0\n.
This is creating huge problems for the thousands of embedded IoT devices that have been connected to our RabbitMQ cluster for years using STOMP.
The firmware running in our devices is not able to handle this extra newline char, which means that we are effectively unable to upgrade RabbitMQ beyond version 3.5.3.
Could you make the addition of this newline char configurable in the stomp section of the rabbitmq.config file in future versions of RabbitMQ, so that we can choose whether or not it is appended to each STOMP frame?
This will be a tremendous help to us.
Best Regards
Lars Ellebo
Schneider Electric
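
For context, the STOMP 1.1 and 1.2 specifications allow the NUL that ends a frame to be followed by optional EOLs, so receivers that can be updated may want to tolerate both forms. The Python sketch below shows one way a receiver could do that. The helper name split_frames is hypothetical, and the sketch ignores content-length, so bodies that themselves contain NUL bytes are out of scope.

```python
from typing import List


def split_frames(data: bytes) -> List[bytes]:
    """Split a byte stream into STOMP frames, skipping EOLs between frames."""
    frames = []
    pos = 0
    while pos < len(data):
        # Skip any CR/LF bytes that follow the previous frame's NUL.
        while pos < len(data) and data[pos:pos + 1] in (b"\n", b"\r"):
            pos += 1
        end = data.find(b"\x00", pos)
        if end == -1:
            break  # incomplete frame: wait for more data
        frames.append(data[pos:end])
        pos = end + 1
    return frames


old_style = b"MESSAGE\n\nhi\x00MESSAGE\n\nyo\x00"
new_style = b"MESSAGE\n\nhi\x00\nMESSAGE\n\nyo\x00\n"
print(split_frames(old_style) == split_frames(new_style))
```

Both streams yield the same two frames, so a parser written this way is unaffected by the extra newline.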

Consider emitting stats for connections unconditionally

This makes it much easier to reason about connection state/blocking via management UI and/or monitoring tools: we don't sacrifice accuracy.

The downside is that we emit a bit more stats when a lot of connections are blocked. But now that the new parallel event collector is merged, this sounds like a reasonable trade-off.

Using term_to_binary to generate queue names is not forward-compatible

In OTP 20, term_to_binary changed its atom encoding, with no way to generate the older format.
A hash of term_to_binary output is used in automatic queue name generation.
However, queue name generation does not use atoms, so binaries generated in OTP 20 do not differ from those generated in 19.3.

It still makes sense not to rely on term_to_binary hashes, because the output can change in future versions.

The problem is that the hash is used to generate queue names, and renaming queues, even in upgrade functions, can be hard and cause bugs.
As an option, we can generate a binary ourselves that is identical to the current term_to_binary output.
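
The general idea of hashing an explicitly defined byte layout, rather than the output of a runtime serializer, can be sketched in Python. This is only an illustration of the principle: the function stable_queue_name, the length-prefixed layout, and the choice of SHA-256 are assumptions made for this example and are not the plugin's actual scheme.

```python
import base64
import hashlib


def stable_queue_name(prefix: str, *parts: str) -> str:
    """Derive a name from an explicit byte layout that we fully control."""
    payload = b""
    for part in parts:
        encoded = part.encode("utf-8")
        # 4-byte big-endian length prefix, then the bytes themselves.
        payload += len(encoded).to_bytes(4, "big") + encoded
    digest = hashlib.sha256(payload).digest()[:16]
    suffix = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
    return prefix + suffix


name = stable_queue_name("stomp-subscription-", "/exchange/app/topic.a.*", "sub-1")
print(name)
```

Because the layout is spelled out byte by byte, the generated name depends only on the inputs and not on the serialization format of a particular runtime release.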

Refactoring: copy reader/processor responsibility split from MQTT

The STOMP plugin is older than MQTT. The MQTT plugin has a better thought-out reader/processor responsibility split, and STOMP should follow it:

  • Reader becomes a gen_server2.
  • Processor is just a module, not a process.

Having identical structure for both protocols would help reduce maintenance down the road.

Labeling as effort-medium because I feel it can take more than a few days (but not a few weeks).

Notify reductions, garbage_collection, recv_oct and send_oct in the core metrics

Several STOMP metrics are missing in the management database. recv_oct, send_oct are notified using rabbit_core_metrics:connection_stats/2 which treats them as raw metrics and doesn't calculate stats. rabbit_core_metrics:connection_stats/4 is needed to convert them to calculated stats. reductions and garbage_collection info are also missing.

Stats should be notified as in rabbit_reader.erl

The same as rabbitmq/rabbitmq-mqtt#121

Hide server information header

When the STOMP server accepts a connection, it sends information about the server software and version back to the client. We'd like a configuration parameter to disable sending this information (for security reasons).

a lot of cryptic error reports on busy rabbitmq serving stomp connections

I'm trying to understand error messages I'm seeing in the logs of a single node rabbitmq server that is solely used by stomp clients.
The rabbitmq server runs on rhel6 with version 3.6.0 and erlang 17.5. There are 16k stomp clients (mcollective/stomp gem 1.2.16) connected to the server.
I'm seeing a high rate of error reports of the following type:

=ERROR REPORT==== 3-Feb-2016::10:31:00 ===
** Generic server <0.582.70> terminating
** Last message in was {'EXIT',<0.574.70>,
                        {function_clause,
                         [{rabbit_stomp_reader,log_reason,
                           [{stomp_unexpected_cast,client_timeout},
                            {reader_state,#Port<0.430455>,
[...]
** Reason for termination ==
** {function_clause,
       [{rabbit_stomp_reader,log_reason,
            [{stomp_unexpected_cast,client_timeout},
[...]

Full error: https://gist.github.com/databus23/20459d571ad5922ab84a

I'm still trying to find a way to reproduce the error reliably so that I can get a tcpdump of the connection recorded but no luck so far.

I'm hoping somebody could give me a hint what this error means and what might be the problem.

In the meantime I'm trying to gather further information.

Unexpected char in header, : but its ok! read on

If I have a header variable called

selector:JMSCorrelationID='instance:1'

Because my variable is in quotes, it should be ok.

I think the STOMP interpreter should OK this, as it works in ActiveMQ and Apollo, and those servers don't spit the dummy by terminating the client.

Thoughts any wise men?
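
For reference, STOMP 1.1 and later define escape sequences for header names and values, with a colon written as \c, while STOMP 1.0 defines none. The report does not say which protocol version the client negotiated, so the Python sketch below is only a possible client-side workaround. The helper name escape_header_value is hypothetical, and the \r escape applies to STOMP 1.2 only.

```python
def escape_header_value(text: str) -> str:
    """Escape a header value using the STOMP 1.2 escape sequences."""
    return (
        text.replace("\\", "\\\\")  # backslash first, so later escapes survive
        .replace("\r", "\\r")
        .replace("\n", "\\n")
        .replace(":", "\\c")
    )


value = "JMSCorrelationID='instance:1'"
header_line = "selector:" + escape_header_value(value)
print(header_line)
```

The specification exempts CONNECT and CONNECTED frames from this escaping, so it would apply to frames such as SUBSCRIBE and SEND.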

Force heartbeat

I have the problem that some clients don't set heartbeat headers.
The result is hundreds of dead connections on my RabbitMQ server.

A feature for "forcing" a min/max heartbeat on the client would be great, so that I can set a default heartbeat (1 hour or whatever) and have the connections cleaned up after that.

Any other idea how to deal with that problem?
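
As background, the STOMP 1.1 and 1.2 specifications negotiate heart-beats from the heart-beat headers of the CONNECT and CONNECTED frames: in each direction the interval is the larger of the two advertised values, and a zero on either side disables heart-beats in that direction. The Python sketch below implements that rule. The function name negotiate and the sample values are illustrative.

```python
from typing import Tuple


def negotiate(client: Tuple[int, int], server: Tuple[int, int]) -> Tuple[int, int]:
    """Return (client_to_server_ms, server_to_client_ms); 0 means disabled.

    client is (cx, cy) from CONNECT, server is (sx, sy) from CONNECTED.
    """
    cx, cy = client
    sx, sy = server
    client_to_server = max(cx, sy) if cx and sy else 0
    server_to_client = max(sx, cy) if sx and cy else 0
    return client_to_server, server_to_client


print(negotiate((60500, 59500), (59000, 60000)))
print(negotiate((0, 0), (59000, 60000)))
```

The second call shows the situation described here: a client that advertises 0,0 (or sends no heart-beat header at all) ends up with heart-beats disabled in both directions, whatever the server offers.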

Stomp connections get terminated by server but not cleaned up

Background

We have been running a large mcollective installation and are having problems with client connections getting terminated but never disappearing from the RabbitMQ connection table, resulting in a lot of "zombie" connections which sooner or later bring down the RabbitMQ server due to resource exhaustion. This follows up the conversation started at:

Google Groups

After a longer investigation we figured out that the connections are not actually lost due to WAN link problems; they are terminated by the server but never cleaned up.

Current Setup

OS: RHEL 6.6

Erlang: 17.5-1.el6.x86_64

Rabbitmq: rabbitmq-server-3.5.1-1

Problematic Behaviour

When a STOMP message to the server results in the following:

STOMP detected network error on <0.11074.0> (10.97.6.171:56107 -> 10.67.76.191:61613):
{badmatch,<<"\n">>}

the connection is closed by the server but never cleaned out of the connection table, and the subscriptions from that client connection remain active.

Recorded Case

I tried to provide as much detail as I could from an example connection, so I ran tcpdump on the running system from connection setup until the error:

State after Connection

Log:

[email protected]:accepting STOMP connection <0.11074.0> (10.97.6.171:56107 -> 10.67.76.191:61613)

netstat -an:

tcp        0      0 ::ffff:10.67.76.191:61613   ::ffff:10.97.6.171:56107    ESTABLISHED 

rabbitmqctl report:

<[email protected]> 10.97.6.171:56107 -> 10.67.76.191:61613 (1) <[email protected]> 1   guest   /mcollective    false   false   15  0   0   0   0   0   0   running
<[email protected]> stomp-subscription-13Ls1dCVpk0y5QTBuf9DxQ   false   true    []  <[email protected]>             0   0   0   1   1.0 22200               running 0   0   0   0   0   0   0   0   0   0   0   [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]

erlang process state:

<8873.12155.0> amqp_gen_connection:       0     891    7088       0 gen_server:loop/6

Client Server Conversation:

Payload:

CONNECT
accept-version:1.1,1.0
host:/mcollective
heart-beat:60500,59500
login:guest
passcode:guest
content-length:0
content-type:text/plain; charset=UTF-8

CONNECTED
session:session-0meFIa1iLkL54Y9pV8VSag
heart-beat:59000,60000
server:RabbitMQ/3.5.1
version:1.1

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_terminal
destination:/exchange/mcollective_broadcast/terminal
id:mcollective_broadcast_terminal
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_package
destination:/exchange/mcollective_broadcast/package
id:mcollective_broadcast_package
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_service
destination:/exchange/mcollective_broadcast/service
id:mcollective_broadcast_service
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_firewall
destination:/exchange/mcollective_broadcast/firewall
id:mcollective_broadcast_firewall
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_agent
destination:/exchange/mcollective_broadcast/agent
id:mcollective_broadcast_agent
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_monsoonfact
destination:/exchange/mcollective_broadcast/monsoonfact
id:mcollective_broadcast_monsoonfact
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_chef
destination:/exchange/mcollective_broadcast/chef
id:mcollective_broadcast_chef
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_provision
destination:/exchange/mcollective_broadcast/provision
id:mcollective_broadcast_provision
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_user
destination:/exchange/mcollective_broadcast/user
id:mcollective_broadcast_user
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_git
destination:/exchange/mcollective_broadcast/git
id:mcollective_broadcast_git
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_sleep
destination:/exchange/mcollective_broadcast/sleep
id:mcollective_broadcast_sleep
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_rpcutil
destination:/exchange/mcollective_broadcast/rpcutil
id:mcollective_broadcast_rpcutil
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_discovery
destination:/exchange/mcollective_broadcast/discovery
id:mcollective_broadcast_discovery
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
reply-to:/temp-queue/mcollective_reply_mcollective
destination:/exchange/mcollective_broadcast/mcollective
id:mcollective_broadcast_mcollective
content-length:0
content-type:text/plain; charset=UTF-8

SUBSCRIBE
destination:/exchange/mcollective_directed/mo-9c6b47c77
id:mo-9c6b47c77_directed_to_identity
content-length:0
content-type:text/plain; charset=UTF-8

... SNIP Empty lines ....

SEND
reply-to:/temp-queue/mcollective_reply_registration
expiration:70000
destination:/exchange/mcollective_broadcast/registration
content-length:79754
content-type:text/plain; charset=UTF-8

{:  body"6-----BEGIN PKCS7-----
MILkyQYJKoZIhvcNAQcCoILkujCC5LYCAQExCzAJBgUrDgMCGgUAMILdsgYJKoZI
hvcNAQcBoILdowSC3Z8ECHsIOghtc2d7DDoOYWdlbnRsaXN0WxJJIg10ZXJtaW5h

... SNIP ...

VNViH7eHj8HiNqdnjC5TCUJ+LvKHJOd3Z+MRfGc6xO2gIv8AfQzb2KN3OXkxH4sX
6YXyJ3kWPXlcvyvk6g==
-----END PKCS7-----
senderidI"mo-9c6b47c77:ET:requestidI"%71fd062060b859838b7bafedb8a10ff0;F:
                                                                         filter{
cf_class;F[I"F[I"
compound;F[:collectiveI"mcollective;T:
callerid"g/CN=mo-9c6b47c77.zone1.mo.xxx.corp/O=xxx_it_hyperic/OU=monitoring_zone1_prod/L=internal-zone1/C=DEttliA:
                                                                                                                  msgtimel+�P!U

TCP Packets

1   0.000000    10.97.6.171 10.67.76.191    TCP 74  56107→61613 [SYN] Seq=0 Win=14600 Len=0 MSS=1460 SACK_PERM=1 TSval=301787849 TSecr=0 WS=128

2   0.000038    10.67.76.191    10.97.6.171 TCP 74  61613→56107 [SYN, ACK] Seq=0 Ack=1 Win=14480 Len=0 MSS=1460 SACK_PERM=1 TSval=2867615141 TSecr=301787849 WS=128

...

211 1249.492797 10.67.76.191    10.97.6.171 TCP 66  61613→56107 [FIN, ACK] Seq=130 Ack=83092 Win=64128 Len=0 TSval=2868864634 TSecr=303037342

212 1249.546370 10.97.6.171 10.67.76.191    TCP 66  56107→61613 [ACK] Seq=83092 Ack=131 Win=14720 Len=0 TSval=303037396 TSecr=2868864634

State after Connection Termination

Log:

=ERROR REPORT==== 5-Apr-2015::15:12:20 ===
STOMP detected network error on <0.11074.0> (10.97.6.171:56107 -> 10.67.76.191:61613):
{badmatch,<<"\n">>} 

netstat -an:

empty - No longer known to kernel 

rabbitmqctl report:

<[email protected]> 10.97.6.171:56107 -> 10.67.76.191:61613 (1) <[email protected]> 1   guest   /mcollective    false   false   15  0   0   0   0   0   0   running
<[email protected]> stomp-subscription-13Ls1dCVpk0y5QTBuf9DxQ   false   true    []  <[email protected]>             0   0   0   1   1.0 14304               running 0   0   0   0   0   0   0   0   0   0   0   [{q1,0}, {q2,0}, {delta,{delta,undefined,0,undefined}}, {q3,0}, {q4,0}, {len,0}, {target_ram_count,infinity}, {next_seq_id,0}, {avg_ingress_rate,0.0}, {avg_egress_rate,0.0}, {avg_ack_ingress_rate,0.0}, {avg_ack_egress_rate,0.0}]

erlang process state:

<8873.12155.0> amqp_gen_connection:       0       0    7088       0 gen_server:loop/6

Open questions

  • Why is RabbitMQ terminating the connection after the client published the message, and why did that result in an error?
  • Why is the connection not properly cleaned out of RabbitMQ's client list?

I can provide a raw tcpdump pcap file; please let me know if I should produce more debug output.
From what we can see, the server sends a proper FIN to the client, which gets ACK'd and should result in a connection cleanup.

Handle heartbeats similar to MQTT

Currently, heartbeats of STOMP connections are handled in rabbit_stomp_processor, while we need to control them in rabbit_stomp_reader for throttling purposes.
This change can break compatibility with web-stomp.

Request for enhancement: specifying consumer tags when subscribing

Our use case is that we find it very helpful to include some details about the environment in which a consumer is running in the text of the cTag when we subscribe via AMQP, but there currently isn't a way to specify the cTag that should be used when subscribing via Stomp (or webstomp). (Today we use this so we can quickly see from the RabbitMQ Admin UI the time at which a consumer started, the internal name of the consumer process, the internal IP address where the process is running, etc.)

It would be really nice if we could include an X-consumer-tag header to specify the name we would like to use when the plugin subscribes to the queue on behalf of the Stomp subscriber, much in the same way that we can use X-queue-name today.

In the case of a name collision with an existing cTag, treating it as a subscription error would be fine with us, though others may have other ideas.

Support queue naming when subscribing to topics/exchanges

When subscribing to a topic, it would be great if we could explicitly specify the queue name rather than using an autogenerated queue name.

Discussion can be found at this email thread:
https://groups.google.com/d/msg/rabbitmq-users/t9phDzQEP_Y/s0TyYn0RBQAJ

For our use case, the best thing would be for us to be able to explicitly specify what we'd like the queue name to be when we subscribe to the topic/exchange, since historically (from our AMQP connector) this has always been a known, predictable value that we can generate when we subscribe.

So if we could do something like:

SUBSCRIBE /topic/fooexchange
exclusive:false
auto_delete:true
x_queue_name:walter$0f3ec8405244482292ded091854796f4#foo

And then have rabbitmq-stomp create the queue name using our x-queue-name header instead of using the random, autogenerated name, it would give us exactly what we need.

In the case that the queue already existed and was already bound to the topic, it would be perfectly fine to reuse it. If it was bound to some other topic, it would be OK with us to call that an error condition.
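
To make the request concrete, here is a Python sketch of how such a SUBSCRIBE frame could look on the wire. The helper build_frame and the id value are hypothetical, the x-queue-name header is the one proposed in this request rather than a confirmed feature, and header escaping is left out for brevity.

```python
from typing import Dict


def build_frame(command: str, headers: Dict[str, str], body: bytes = b"") -> bytes:
    """Serialize a STOMP frame: command, header lines, blank line, body, NUL."""
    lines = [command] + [f"{name}:{value}" for name, value in headers.items()]
    return "\n".join(lines).encode("utf-8") + b"\n\n" + body + b"\x00"


frame = build_frame(
    "SUBSCRIBE",
    {
        "id": "sub-0",  # hypothetical subscription id
        "destination": "/topic/fooexchange",
        "exclusive": "false",
        "auto_delete": "true",
        # Proposed header from this request, not a confirmed feature:
        "x-queue-name": "walter$0f3ec8405244482292ded091854796f4#foo",
    },
)
print(frame)
```

Carrying the queue name in a header keeps the destination itself unchanged, so clients that do not send the header would continue to get an autogenerated name.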

Declaring queues with arbitrary properties (e.g. auto_delete)

We're using StompJS (with Spring Websockets providing broker access) to allow the browser to publish to exchanges and subscribe to queues. However, whenever we subscribe to a queue, the queue is created as auto_delete = false and no matter what we do we cannot seem to change that. However, if we subscribe to an exchange, we do get auto_delete exchanges created.

I've inspected the STOMP frames being created and verified there's no explicit setting whatsoever for auto_delete. I've inspected the StompJS source & haven't found anything that leads me to believe the library is responsible. I've also tried instrumenting the frames myself to include the setting (with keys of auto_delete, autoDelete, auto-delete, and pretty much anything else you can imagine) so I'm fairly confident I'm either doing something wrong or the "problem" (for lack of a better word) is on RabbitMQ's side.

Could anyone provide me guidance on subscribing to auto_delete queues?

(Build-system induced) server header change breaks Python tests

Pattern expected by the tests:
'CONNECTED\nserver:RabbitMQ/(.*)\nsession:(.*)\nheart-beat:0,0\nversion:1.0\n\n\x00'
Frame actually received:
'CONNECTED\nserver:rabbitmq_v3_6_6-23-g675cbaa/3.6.6+23.g675cbaa\nsession:session-AUOBuy8VQqhLyiZIpDmrOQ\nheart-beat:0,0\nversion:1.0\n\n\x00\n'

It would be great to keep this header backwards-compatible since there is no real reason to do away with the "RabbitMQ/" prefix.
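
The mismatch can be reproduced in a few lines of Python, assuming the tests apply the pattern with re.match (the report does not show how the comparison is made). The old_style string is constructed here purely for comparison.

```python
import re

# Pattern and frame copied from the report above.
expected = re.compile(
    "CONNECTED\nserver:RabbitMQ/(.*)\nsession:(.*)\nheart-beat:0,0\nversion:1.0\n\n\x00"
)
actual = (
    "CONNECTED\nserver:rabbitmq_v3_6_6-23-g675cbaa/3.6.6+23.g675cbaa\n"
    "session:session-AUOBuy8VQqhLyiZIpDmrOQ\nheart-beat:0,0\nversion:1.0\n\n\x00\n"
)

# The same frame with the former "RabbitMQ/" prefix restored.
old_style = actual.replace("rabbitmq_v3_6_6-23-g675cbaa/", "RabbitMQ/")

print(expected.match(actual) is None)
print(expected.match(old_style).group(1))
```

The pattern rejects the new server header but accepts the same frame once the "RabbitMQ/" prefix is back, which is why keeping the prefix would leave the existing tests untouched.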

Two subscriptions using a single durable queue

Messages don't seem to end up on the right subscription when I have two subscriptions using a single durable queue.

Here is my routing configuration:

  • One topic exchange: app
  • Two STOMP subscriptions against that exchange using the same durable queue "queue-test"
    • topic.a.*
    • topic.b.*

rabbitmqctl list_bindings:

  • exchange queue-test queue queue-test []
  • app exchange queue-test queue topic.a.* []
  • app exchange queue-test queue topic.b.* []

rabbitmqctl list_consumers:

rabbitmqctl list_queues:

  • queue-test 0

Message 1 with destination topic.a.1 sent.
Message received on the right subscription:

MESSAGE
    subscription : sub.ApMOzTQPNUlvwhmj
    destination : /exchange/app/topic.a.1
    message-id : T_sub.ApMOzTQPNUlvwhmj@@session-uqt3ppLSTqUnYXQBPb4-6w@@1
    redelivered : false
    ack : T_sub.ApMOzTQPNUlvwhmj@@session-uqt3ppLSTqUnYXQBPb4-6w@@1

Message 2 with destination topic.a.1 sent.
Message received on the wrong subscription:

MESSAGE
    subscription : sub.HNNZbjRINADxFbrp
    destination : /exchange/app/topic.a.1
    message-id : T_sub.HNNZbjRINADxFbrp@@session-uqt3ppLSTqUnYXQBPb4-6w@@2
    redelivered : false
    ack : T_sub.HNNZbjRINADxFbrp@@session-uqt3ppLSTqUnYXQBPb4-6w@@2

Both messages should have been received on the subscription: ApMOzTQPNUlvwhmj since both messages destination match topic.a.* and not topic.b.*
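
The output above is consistent with how a single queue behaves in the AMQP 0-9-1 model: bindings belong to the queue rather than to an individual consumer, and consumers on one queue share its messages in turn. The Python sketch below is a deliberately simplified model of that behavior. The class SharedQueue and the word-by-word matcher are illustrative, and real delivery order also depends on factors such as prefetch and acknowledgments.

```python
from itertools import cycle
from typing import List, Optional


def matches(pattern: str, key: str) -> bool:
    """Match a routing key word by word; '*' stands for exactly one word."""
    p_words, k_words = pattern.split("."), key.split(".")
    return len(p_words) == len(k_words) and all(
        p in ("*", k) for p, k in zip(p_words, k_words)
    )


class SharedQueue:
    """One queue, several bindings, several consumers (simplified model)."""

    def __init__(self, bindings: List[str], consumers: List[str]) -> None:
        self.bindings = bindings
        self._next_consumer = cycle(consumers)

    def publish(self, routing_key: str) -> Optional[str]:
        # Any matching binding routes the message into the queue ...
        if not any(matches(b, routing_key) for b in self.bindings):
            return None
        # ... and the queue hands it to whichever consumer is next.
        return next(self._next_consumer)


queue = SharedQueue(
    ["topic.a.*", "topic.b.*"],
    ["sub.ApMOzTQPNUlvwhmj", "sub.HNNZbjRINADxFbrp"],
)
first = queue.publish("topic.a.1")
second = queue.publish("topic.a.1")
print(first, second)
```

In this model the pattern given in each subscription only adds a binding to the shared queue; it does not filter what that particular subscription receives. Separate queues per subscription would be needed for per-pattern delivery.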

Only one subscriber receives message from destination starting '/topic/'

According to the STOMP documentation, a message sent to a /topic/ destination is delivered to all active subscribers:

Topic Destinations

For simple topic destinations which deliver a copy of each message to all active subscribers, destinations of the form /topic/ can be used. Topic destinations support all the routing patterns of AMQP topic exchanges.

Messages sent to a topic destination that has no active subscribers are simply discarded.

But the behavior in my application is NOT consistent with the statement above.

I have an app that uses spring-websocket over the STOMP protocol, with RabbitMQ (3.6.6) and its STOMP plugin as a full-featured broker.

Once the app receives a message from a user, it sends the message to the destination /topic/<route-key>. In my test scenario I opened the same page in two browsers, so two clients subscribe to the same /topic/<route-key>. After the server-side app sends a message to /topic/<route-key>, only one client receives it. If my app sends more messages, the two clients take turns receiving them.

With the SimpleBroker of spring-websocket, an in-memory broker implementation, instead of RabbitMQ, both clients receive the same message.

Am I misunderstanding the STOMP specification? If not, how can I debug this problem in RabbitMQ? If you need more information, please let me know.

Maximum size of a frame body

The STOMP specification says that a STOMP server MAY place a limit on the maximum size of a frame body. Can it be done with the RabbitMQ STOMP adapter? If not, what about the web-stomp server?

Can I send a message with mandatory=True?

If I understand the code correctly, the mandatory flag is hard-coded to False. Is there a reason for not supporting it?

I am mainly interested in knowing whether a message was published to a queue (for example, getting False back if the destination queue doesn't exist). Is there any way to achieve that with rabbitmq-stomp? Thanks!

Random failure during subscribing to /amq/queue.

It looks like a concurrency issue, as it happens when two subscriptions are made at the same time.
Here is the RabbitMQ log at the time of the error:

=ERROR REPORT==== 8-Jun-2015::18:00:33 ===
Channel error on connection <0.283.0> (127.0.0.1:43154 -> 127.0.0.1:5671, vhost: '/', user: 'vcac-server_26325'), channel 6:
{amqp_error,not_found,
            "no queue 'ebs.testSubscribeAsSystemAdmin.serviceTypeId1.eventTopic.one0.testSubscribeAsSystemAdmin0' in vhost '/'",
            'queue.declare'}

=INFO REPORT==== 8-Jun-2015::18:00:33 ===
accepting STOMP connection <0.1506.0> (127.0.0.1:40240 -> 127.0.0.1:61613)

=ERROR REPORT==== 8-Jun-2015::18:00:34 ===
connection <0.1509.0>, channel 1 - error:
{amqp_error,not_allowed,
            "attempt to reuse consumer tag 'T_testSubscribeAsSystemAdmin0'",
            'basic.consume'}

=WARNING REPORT==== 8-Jun-2015::18:00:34 ===
Connection (<0.1509.0>) closing: received hard error {server_initiated_close,
                                                      530,
                                                      "attempt to reuse consumer tag 'T_testSubscribeAsSystemAdmin0'"} from server

=ERROR REPORT==== 8-Jun-2015::18:00:34 ===
STOMP error frame sent:
Message: "Processing error"
Detail: "Processing error"
Server private detail: {{shutdown,
                            {connection_closing,
                                {server_initiated_close,530,
                                    "attempt to reuse consumer tag 'T_testSubscribeAsSystemAdmin0'"}}},
                        {gen_server,call,
                            [<0.1515.0>,
                             {subscribe,
                                 {'basic.consume',0,
                                     <<"ebs.testSubscribeAsSystemAdmin.serviceTypeId1.eventTopic.one0.testSubscribeAsSystemAdmin0">>,
                                     <<"T_testSubscribeAsSystemAdmin0">>,
                                     false,true,false,false,[]},
                                 <0.1505.0>},
                             infinity]}}

=ERROR REPORT==== 8-Jun-2015::18:00:34 ===
** Generic server <0.1505.0> terminating
** Last message in was {'EXIT',<0.1515.0>,
                           {shutdown,
                               {connection_closing,
                                   {server_initiated_close,530,
                                       "attempt to reuse consumer tag 'T_testSubscribeAsSystemAdmin0'"}}}}
** When Server state == {state,"session-pXXzcqyPuT4rZsCCbAUJ9Q",<0.1515.0>,
                         <0.1509.0>,
                         {dict,1,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],
                            [[<<"T_testSubscribeAsSystemAdmin0">>|
                              {subscription,
                               "/amq/queue/ebs.testSubscribeAsSystemAdmin.serviceTypeId1.eventTopic.one0.testSubscribeAsSystemAdmin0",
                               auto,false,
                               "id='testSubscribeAsSystemAdmin0'"}]],
                            [],[],[],[],[],[],[]}}},
                         "1.2",#Fun<rabbit_stomp_reader.1.50399846>,undefined,
                         {stomp_configuration,"guest","guest",false,false},
                         {set,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                         {dict,0,16,16,8,80,48,
                          {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                          {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
                         #Fun<rabbit_stomp_processor.5.63782375>,
                         {amqp_adapter_info,
                          {0,0,0,0,0,65535,32512,1},
                          61613,
                          {0,0,0,0,0,65535,32512,1},
                          40240,<<"127.0.0.1:40240 -> 127.0.0.1:61613">>,
                          {'STOMP',0},
                          [{ssl,false}]},
                         #Fun<rabbit_stomp_reader.0.22944995>,none,
                         {0,0,0,0,0,65535,32512,1}}
** Reason for termination == 
** {channel_died,
       {shutdown,
           {connection_closing,
               {server_initiated_close,530,
                   "attempt to reuse consumer tag 'T_testSubscribeAsSystemAdmin0'"}}}}

=INFO REPORT==== 8-Jun-2015::18:00:34 ===
closing STOMP connection <0.1506.0> (127.0.0.1:40240 -> 127.0.0.1:61613)

ACK/NACK message id parsing in STOMP 1.2

I am having trouble sending ACK/NACK frames.
The sequence of frames is below. Please correct me if I am doing anything wrong.
I am connecting from Spring WebSockets.

CONNECT
accept-version:1.2,1.1
--------------------------
CONNECTED
server:RabbitMQ/3.3.5
heart-beat:0,0
version:1.2
session:session-LqaSmsWbsrq4y1EkEDS
-----------------------------------------
SUBSCRIBE
destination:/queue/test123
ack:client-individual
id:141414
----------------------------------------------
MESSAGE
content-length:6
destination:/queue/test123
ack:T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1
message-id:T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1
subscription:141414
persistent:1
content-length:6

Hello
-------------------------------------------------

NACK
requeue:False
message-id:T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1
subscription:141414
id:141414
-----------------------------------------------------
ERROR
content-length:40
content-type:text/plain
message:Invalid header
version:1.0,1.1,1.2
content-length:40

"NACK" must include a valid "id" header

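In STOMP 1.2, the id header of an ACK or NACK frame has to match the ack header of the MESSAGE being acknowledged; the NACK above sends the subscription id (141414) instead. The following is a minimal Python sketch of that rule, not the plugin's or Spring's code. parse_frame and build_nack are hypothetical helper names, and header escaping, transactions and frame bodies are deliberately ignored.

```python
def parse_frame(raw: str) -> tuple:
    """Split the text of a STOMP frame into (command, headers)."""
    head = raw.split("\n\n", 1)[0]          # everything before the body
    lines = head.strip("\n").split("\n")
    command = lines[0]
    headers = {}
    for line in lines[1:]:
        name, _, value = line.partition(":")
        # STOMP 1.2: when a header is repeated, the first occurrence wins.
        headers.setdefault(name, value)
    return command, headers


def build_nack(message_headers: dict) -> str:
    """Build a STOMP 1.2 NACK whose id is the MESSAGE frame's ack value."""
    return "NACK\nid:" + message_headers["ack"] + "\n\n\x00"


# MESSAGE frame abbreviated from the trace above.
message = (
    "MESSAGE\n"
    "destination:/queue/test123\n"
    "ack:T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1\n"
    "subscription:141414\n"
    "\n"
    "Hello\n"
)
command, headers = parse_frame(message)
nack = build_nack(headers)
```

Here nack carries id:T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1 rather than id:141414, which is the value a STOMP 1.2 server would be expected to look up.
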
message-id format (Apache NMS STOMP)

Apache NMS STOMP cannot handle the format of the message-id field.
NMS expects a specific format, and because the message-id returned by the broker in the MESSAGE frame does not match that format, NMS throws an exception.

Is there a way to make the RabbitMQ STOMP plugin “compatible” with the NMS STOMP library?

NMS code that parses the message-id field:

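// messageKey stands for the value of the message-id header.
var messageKey = "message-id-field";
// Everything after the last ':' is expected to be a numeric sequence id.
var p = messageKey.LastIndexOf( ":" );
if ( p >= 0 )
    ProducerSequenceId = Int64.Parse( messageKey.Substring( p + 1 ) );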
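
To make the failure mode concrete, here is a rough Python transliteration of the snippet above. It is a sketch under assumptions, not NMS code: producer_sequence_id is a hypothetical name, and every sample value except the RabbitMQ message-id copied from the ACK/NACK report earlier on this page is invented for illustration.

```python
from typing import Optional


def producer_sequence_id(message_key: str) -> Optional[int]:
    """Parse whatever follows the last ':' as an integer, like the snippet."""
    p = message_key.rfind(":")
    if p >= 0:
        # int() raises ValueError on non-numeric text, much as
        # Int64.Parse throws in the C# original.
        return int(message_key[p + 1:])
    return None  # no ':' at all, so the snippet's if-branch is skipped


# Hypothetical id that ends in ":<number>", the shape the snippet expects.
expected_shape = producer_sequence_id("ID:host-1234:1:1:42")

# RabbitMQ-style id from this page: no ':' in it, so nothing is parsed.
rabbit_plain = producer_sequence_id(
    "T_141414@@session-LqaSmsWbsrq4y1EkEDS4RQ@@1"
)

# Hypothetical RabbitMQ-style id built from a subscription id containing ':'.
try:
    producer_sequence_id("T_sub:1@@session-x@@1")
    parse_failed = False
except ValueError:
    parse_failed = True
```

Under these assumptions the parse only throws when the message-id contains a colon followed by non-numeric text, which would happen if the subscription id embedded in the message-id itself contains a colon.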

Stomp plugin closes connection on "inequivalent arg 'x-dead-letter-exchange'" error

If the Stomp client tries to subscribe to a queue with a (mismatching) x-dead-letter-exchange set, the broker immediately closes the socket connection.

However, if the Stomp client tries to subscribe to a queue with a (mismatching) x-max-priority set, the broker sends a Stomp error frame back to the client with the error description, and does not close the connection.

I suggest treating the first error the same way, so that the client can see the error message in a Stomp error frame in both cases.

Here is a broker log which shows the different server reactions to the mismatching queue args:

=ERROR REPORT==== 12-Jun-2015::11:37:01 ===
Channel error on connection <0.4321.0> (127.0.0.1:49446 -> 127.0.0.1:61613, vhost: '/', user: 'guest'), channel 1:
{amqp_error,precondition_failed,
"inequivalent arg 'x-max-priority' for queue 'priority_queue' in vhost '/': received none but current is the value '9' of type 'long'",
'queue.declare'}

=ERROR REPORT==== 12-Jun-2015::11:37:01 ===
STOMP error frame sent:
Message: precondition_failed
Detail: "PRECONDITION_FAILED - inequivalent arg 'x-max-priority' for queue 'priority_queue' in vhost '/': received none but current is the value '9' of type 'long'\n"
Server private detail: none

=INFO REPORT==== 12-Jun-2015::11:37:01 ===
closing STOMP connection <0.4318.0> (127.0.0.1:49446 -> 127.0.0.1:61613)

=INFO REPORT==== 12-Jun-2015::11:37:13 ===
accepting STOMP connection <0.4338.0> (127.0.0.1:49447 -> 127.0.0.1:61613)

=ERROR REPORT==== 12-Jun-2015::11:37:13 ===
Channel error on connection <0.4341.0> (127.0.0.1:49447 -> 127.0.0.1:61613, vhost: '/', user: 'guest'), channel 1:
{amqp_error,precondition_failed,
"inequivalent arg 'x-dead-letter-exchange' for queue 'queue_with_dlx' in vhost '/': received none but current is the value 'dlx.exchange.name' of type 'longstr'",
'queue.declare'}

=ERROR REPORT==== 12-Jun-2015::11:37:13 ===
** Generic server <0.4337.0> terminating
** Last message in was {'EXIT',<0.4347.0>,
{shutdown,
{server_initiated_close,406,
<<"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'queue_with_dlx' in vhost '/': received none but current is the value 'dlx.exchange.name' of type 'longstr'">>}}}
** When Server state == {state,"session-gfjnKs0tYwZu-ZqDNSSYbw",<0.4347.0>,
<0.4341.0>,
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[]}}},
"1.2",#Fun<rabbit_stomp_reader.1.50399846>,
undefined,
{stomp_configuration,"guest","guest",false,false},
{set,1,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[]},
{{[],[],[],[],[],[],[],[],[],[],[],
[<<"queue_with_dlx">>],
[],[],[],[]}}},
{dict,0,16,16,8,80,48,
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[]},
{{[],[],[],[],[],[],[],[],[],[],[],[],[],[],
[],[]}}},
#Fun<rabbit_stomp_processor.5.63782375>,
{amqp_adapter_info,
{127,0,0,1},
61613,
{127,0,0,1},
49447,
<<"127.0.0.1:49447 -> 127.0.0.1:61613">>,
{'STOMP',0},
[{ssl,false}]},
#Fun<rabbit_stomp_reader.0.22944995>,none,
{127,0,0,1}}
** Reason for termination ==
** {channel_died,
{shutdown,
{server_initiated_close,406,
<<"PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'queue_with_dlx' in vhost '/': received none but current is the value 'dlx.exchange.name' of type 'longstr'">>}}}

=INFO REPORT==== 12-Jun-2015::11:37:13 ===
closing STOMP connection <0.4338.0> (127.0.0.1:49447 -> 127.0.0.1:61613)

Could not start rabbitmq-stomp

Something weird is going on with port 61613, I guess, but even if I change my rabbitmq.config file it throws the same error.

I've tried changing the config file to

[
  {rabbitmq_stomp, [{tcp_listeners, [{"127.0.0.1", 61613},
                                     {"::1",       61613}]}]}
].

but it still throws the same error.

Error description:
{could_not_start,rabbitmq_stomp,
{{undef,
[{rabbit_networking,tcp_listener_spec,
[rabbit_stomp_listener_sup,
{{127,0,0,1},61613,inet},
[{backlog,128},{nodelay,true}],
ranch_tcp,rabbit_stomp_client_sup,
{stomp_configuration,"guest","guest",false,false},
stomp,"STOMP TCP Listener"],
[]},
{rabbit_stomp_sup,'-listener_specs/3-lc$^1/1-1-',4,
[{file,"src/rabbit_stomp_sup.erl"},{line,44}]},
{rabbit_stomp_sup,init,1,
[{file,"src/rabbit_stomp_sup.erl"},{line,38}]},
{supervisor,init,1,[{file,"supervisor.erl"},{line,272}]},
{gen_server,init_it,6,[{file,"gen_server.erl"},{line,328}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,239}]}]},
{rabbit_stomp,start,[normal,[]]}}}
