Giter Club home page Giter Club logo

rabbitmq-recent-history-exchange's Introduction

RabbitMQ Recent History Exchange

This repository has been moved to the main unified RabbitMQ "monorepo", including all open issues. You can find the source under /deps/rabbitmq_recent_history_exchange. All issues have been transferred.

Overview

Keeps track of the last 20 messages that passed through the exchange. Every time a queue is bound to the exchange it delivers that last 20 messages to them. This is useful for implementing a very simple Chat History where clients that join the conversation can get the latest messages.

Exchange Type: x-recent-history

Installation

RabbitMQ 3.6.0 or later

As of RabbitMQ 3.6.0 this plugin is included into the RabbitMQ distribution.

Enable it with the following command:

rabbitmq-plugins enable rabbitmq_recent_history_exchange

With Earlier Versions

Install the corresponding .ez files from our Community Plugins archive..

Then run the following command:

rabbitmq-plugins enable rabbitmq_recent_history_exchange

Building from Source

Please see RabbitMQ Plugin Development guide.

To build the plugin:

git clone git://github.com/rabbitmq/rabbitmq-recent-history-exchange.git
cd rabbitmq-recent-history-exchange
make

Then copy all the *.ez files inside the plugins folder to the RabbitMQ plugins directory and enable the plugin:

[sudo] rabbitmq-plugins enable rabbitmq_recent_history_exchange

Usage

Creating an exchange

To create a recent history exchange, just declare an exchange providing the type "x-recent-history".

channel.exchangeDeclare("logs", "x-recent-history");

Providing a custom history length

Typically this exchange will store the latest 20 messages sent over the exchange. If you want to set a different cache length, then you can pass a "x-recent-history-length" argument to exchange.declare. The argument must be an integer greater or equal to zero.

For example in Java:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-recent-history-length", 60);
channel.exchangeDeclare("rh", "x-recent-history", false, false, args);

Preventing some messages from being stored

In case you would like to not store certain messages, just add the header "x-recent-history-no-store" with the value true to the message.

Disabling the Plugin

A future version of RabbitMQ will allow users to disable plugins. When you disable this plugin, it will delete all the cached messages.

License

See LICENSE.

rabbitmq-recent-history-exchange's People

Contributors

acogoluegnes avatar carlhoerberg avatar dcorbacho avatar dumbbell avatar fenollp avatar gerhard avatar hyperthunk avatar kjnilsson avatar legoscia avatar lukebakken avatar michaelklishin avatar mweibel avatar spring-operator avatar videlalvaro avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

rabbitmq-recent-history-exchange's Issues

Couldn't bind from topic exchange to x-history-exchange

Hi,
we are trying to use this plugin to bind from topic exchange to x-history-exchange and the code bombs.
Knowing nothing about erlang, I'm at a bit off a loss but looking around this line it appears that you are only allowing bindings to queues, and not from exchanges (though I'm probably reading this all wrong):

https://github.com/videlalvaro/rabbitmq-recent-history-exchange/blob/master/src/rabbit_exchange_type_recent_history.erl#L75

Is this plugin supposed to bind only to queues? Are we doing something wrong?

Cheers

rabbitmq_recent_history_exchange-1.0.1-rmq3.2.4.ez is broken

I've recently tried to install that binary and it fails, rabbitmq says:

=ERROR REPORT==== 9-Apr-2015::11:05:13 ===
Problem reading some plugins: [{"/usr/lib/rabbitmq/lib/rabbitmq_server-3.2.4/sbin/../plugins/rabbitmq_recent_history_exchange-1.0.1-rmq3.2.4.ez",
{invalid_ez,einval}}]
1.0.0 is also broken

$ uname -a
Linux ip-172-30-1-180 3.13.0-44-generic #73-Ubuntu SMP Tue Dec 16 00:22:43 UTC 2014 x86_64 x86_64 x86_64 GNU/Linux

$ cat /etc/os-release
NAME="Ubuntu"
VERSION="14.04.2 LTS, Trusty Tahr"
ID=ubuntu
ID_LIKE=debian
PRETTY_NAME="Ubuntu 14.04.2 LTS"
VERSION_ID="14.04"
HOME_URL="http://www.ubuntu.com/"
SUPPORT_URL="http://help.ubuntu.com/"
BUG_REPORT_URL="http://bugs.launchpad.net/ubuntu/"

$ dpkg -s rabbitmq-server
Package: rabbitmq-server
Status: install ok installed
Priority: extra
Section: net
Installed-Size: 4753
Maintainer: Ubuntu Developers [email protected]
Architecture: all
Version: 3.2.4-1
Depends: erlang-nox (>= 1:13.b.3) | esl-erlang, adduser, logrotate
Conffiles:
/etc/logrotate.d/rabbitmq-server ddaa633357e5f6ff9d7970c6d0a9acd8
/etc/default/rabbitmq-server 7cc5216f193f6be1cb32c45021197b39
/etc/init.d/rabbitmq-server 5d49d9839503d8912b13dc1123498257
Description: AMQP server written in Erlang
RabbitMQ is an implementation of AMQP, the emerging standard for high
performance enterprise messaging. The RabbitMQ server is a robust and
scalable implementation of an AMQP broker.
Original-Maintainer: RabbitMQ Team [email protected]
Homepage: http://www.rabbitmq.com/

Only new queues receive exchange history

This might 1) be by design or 2) because I'm doing something wrong (probably the case...), but I'm only seeing the message history delivered to new queues.

Here's my code (I'm using the Bunny Ruby client):

@conn = Bunny.new(ENV['CLOUDAMQP_URL'])
@conn.start
@ch = @conn.create_channel
@x = @ch.exchange("video-exchange",
                  type: "x-recent-history",
                  arguments: { 'x-recent-history-length' => 1 })

# will only receive history payload once
queue_name = "queue-1"
@ch.queue(queue_name).bind(@x).subscribe do |_, _, payload|
  puts "!!!!!!!!! '#{payload}' on #{queue_name}"
end

# will always receive history payload
queue_name = "queue-#{SecureRandom.hex}"
@ch.queue(queue_name).bind(@x).subscribe do |_, _, payload|
  puts "!!!!!!!!! '#{payload}' on #{queue_name}"
end

High availability cluster support for plugin

Hi

We wanted to use this plugin in a HA environment, however I'm running into a bit of a problem
with the x-recent-history exchanges with the following setup;

  • 2 nodes setup in a cluster ( rabbitmq 3.3.5 )
  • A policy set with the parameters "ha-mode: all"

This replicates the queues and exchanges across both machines.

When both nodes are up, we can bind a queue to one of the recent history exchanges from either of the nodes.

When Node 1 where the x-recent-history exchange was created is shut down, then trying to bind a queue to the exchange on Node 2 gives the following error:

** Reason for termination ==
** {{error,{throw,{error,{no_exists,rh_exchange_table}}}},
[{rabbit_misc,execute_mnesia_transaction,1},
{rabbit_misc,execute_mnesia_tx_with_tail,1},
{rabbit_channel,binding_action,9},
{rabbit_channel,handle_cast,2},
{gen_server2,handle_msg,2},
{proc_lib,init_p_do_apply,3}]}

Unfortunately my erlang is not the best, however from what I can see it appears the in memory table that is setup for the exchange which holds the history events is only created on the node that made the exchange. So when that node is shut down the replica cannot access the table and fails.

Are there any special requirements for creating the exchanges on clustered nodes or is there a way for this table to replicate between nodes?

Thanks
Oliver

for RabbitMQ 3.4.1

Hi,

This Plugin is looks very nice. Please provide me binary for RabbitMQ 3.4.1.

Thanks
memorycraft

History size in config file

As mentioned in the the title, it would be nice to be able to change the history/replay size in the RabbitMQ configuration file.

Cache messages based on time limit instead of a fixed size

This plugin would be more useful if it also supported a method of storing (and redelivering) messages younger than N seconds.

My use case involves clients connecting to exclusive queues to receive data updates. When a client connects, its data may be up to a minute out of date. Limiting the cache to a fixed number of messages doesn't work well when the frequency at which messages are pushed is highly variable. If the message limit is small, pushing many messages in a short period of time may cause messages to not be delivered to new queues. On the other hand, if the message limit is large, there will be messages in the queue that can be hours old that the client will need to handle.

error on queue binding

hi, I'm trying to get your plugin working on rabbitmq 3.0.4 (erlang R16B), but unfortunately every time the queue binds to exchange of "x-recent-history" type, following error is thrown:

01:10:57.731 [error] Connection (<0.76.0>) closing: received hard error {'connection.close',541,<<"INTERNAL_ERROR">>,0,0} from server

plugin itself seem to be recognized and exchange is correctly created:

-- plugins running
rabbitmq_recent_history_exchange  0.1.0-rmq0.0.0

Any hints what may be the cause of this problem?

By the way, how does this plugin route incoming messages? Does it work like a fanout?
Thanks,
Michał

exchange broke after 200000 pubulished msg.

Setting the queue with 'x-recent-history-length: 1000000'

args={'x-recent-history-length':1000000}
channel.exchange_declare(exchange='buffer', exchange_type='x-recent-history', arguments =args)

for i in range(1000000):
    message = datetime.now().strftime("%H:%M:%S") + '.' + str(i)
    channel.basic_publish(exchange='buffer',
                          routing_key='',
                          body=message)

At first it publish 10000 per second. after 100000 msgs , it starts to slow down , and then after 200000, it broke.

Subscribe to x-recent-history exchange with topic

Hi,

Thanks for making this plugin, it looks like it could be handy for a usecase I need to address. I'm trying to set up the following RabbitMQ pub/sub system:

  • Publishers write messages with various topics to a main exchange of type topic.
  • The main exchange is bound to a second exchange, of type x-recent-history. All messages are routed between the two (routing_key == '#').
  • The subscribers connect to the second exchange.

I would like that upon connection, the subscribers only receive the last N messages related to their topic. Right now they receive N messages with any topic. I see that the topic argument is ignored for the recent history queue. I'm wondering if this is by design or if it's a bug?

Thanks!
Radu

Add "howto" information

I've installed and enabled this plugin, but still can't figure how to use it. Could you add some basic information about using this plugin?

consumer stuck after receiving 100000 msg

publisher publish 1000000 msg. consumer starts later to receive the msg. it hangs after 100000 msgs. Code shown below:

Publisher

args={'x-recent-history-length':1000000}
channel.exchange_declare(exchange='buffer', exchange_type='x-recent-history', arguments =args)
for i in range(1000000):
    message = datetime.now().strftime("%H:%M:%S") + '.' + str(i)
    channel.basic_publish(exchange='buffer',
                          routing_key='',
                          body=message)

Consumer

channel.exchange_declare(exchange='buffer', exchange_type='x-recent-history')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='buffer',
                   queue=queue_name)
print(' [*] Waiting for buffer. To exit press CTRL+C')

def callback(ch, method, properties, body):
    msg_id = int (body.split('.')[1])
    if msg_id % 10000 == 0:
        print(" [x] %s:  %r" % (datetime.now().strftime("%H:%M:%S"),body))

Note: this might be related to issue #27

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.