rabbit.js's Introduction

Messaging in Node.JS with RabbitMQ

$ npm install rabbit.js

This library provides a simple, socket-oriented API* for messaging in Node.JS, using RabbitMQ as a backend.

var context = require('rabbit.js').createContext();
context.on('ready', function() {
  var pub = context.socket('PUB'), sub = context.socket('SUB');
  sub.pipe(process.stdout);
  sub.connect('events', function() {
    pub.connect('events', function() {
      pub.write(JSON.stringify({welcome: 'rabbit.js'}), 'utf8');
    });
  });
});

See the GitHub pages for documentation of the most recent release, and the branch gh-pages-next for provisional documentation of the next release (which usually corresponds to the code in the master branch).

Status

Still on major version 0, though in use in a number of places, I believe.

Version 0.3.0 and on are built on amqplib. Previous versions, of which version 0.2.2 was the last, used node-amqp.

Uses

This library is suitable for co-ordinating peers (e.g., Node.JS programs), acting as a gateway to other kinds of network (e.g., relaying to browsers via SockJS), and otherwise as a really easy way to use RabbitMQ.

rabbit.js's People

Contributors

christophermina, cpdigger, dvberkel, englercj, majek, manjbhachu, pledbrook, sinetbot, squaremo, timoxley, wolfeidau

rabbit.js's Issues

Disconnect a socket from a target

Worth having a way to unsubscribe a SUB (and the equivalent for other sockets?).
Among other things this would delete the appropriate resources (ifUnused).

True Asynchronous RPC

This seems to be a sticking point for me. At a protocol level, RPC over AMQP is supposedly synchronous, but that doesn't make sense to me. RabbitMQ (and I assume other brokers) don't care about replyTo or correlationId. It should therefore be possible to send multiple RPCs and wait for responses asynchronously.

I was trying to figure out how to get this to work in the context of rabbit.js, as it's a stream-oriented interface, but I'm not having much luck. The best I can figure out is to ensure that things coming out of the asynchronous REQ socket are ordered, which seems easy enough, but the async REP socket is more complicated, because you can't be sure that anybody will write in the correct order. I suppose it could become synchronous on the consumer's end, but this would reduce the efficiency boosts that come with asynchronicity.

The basic idea here is that you associate each outgoing message with a sequential correlationId, then match that correlationId back up to the promise/callback when you get a response on the replyTo queue. The back-end routes each incoming request to a handler--probably just the same function--which does some work. When the work is finished, the handler responds to the incoming request through the replyTo queue and keeps the correlationId.

The problem with this model, as stated above, is that streams inherently decouple the order in this instance, because the work takes an unknowable amount of time.

Could this fit into rabbit.js? Did I get this completely wrong?
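To make the correlationId idea above concrete, here is a minimal sketch of the bookkeeping on the requesting side. It is not rabbit.js API: `createPendingCalls`, `register` and `resolve` are hypothetical names, and the actual sending and receiving over the replyTo queue is left out. It only shows that replies can be matched to callbacks regardless of arrival order.

```javascript
// Hypothetical helper (not part of rabbit.js): tracks outstanding
// requests by correlationId so replies can arrive in any order.
function createPendingCalls() {
  var nextId = 0;
  var pending = new Map();

  return {
    // Register a callback; returns the correlationId to send with the request.
    register: function (callback) {
      var id = String(nextId++);
      pending.set(id, callback);
      return id;
    },
    // Call for each reply; returns false for unknown or duplicate replies.
    resolve: function (correlationId, body) {
      var callback = pending.get(correlationId);
      if (!callback) return false;
      pending.delete(correlationId);
      callback(null, body);
      return true;
    },
    // Number of requests still waiting for a reply.
    size: function () { return pending.size; }
  };
}

// Usage: the second request is answered before the first one.
var calls = createPendingCalls();
var results = [];
var idA = calls.register(function (err, body) { results.push('A:' + body); });
var idB = calls.register(function (err, body) { results.push('B:' + body); });
calls.resolve(idB, 'second finished first');
calls.resolve(idA, 'first finished last');
console.log(results);
```

With this kind of table the REP side would not need to write replies in request order, as long as each reply carries the correlationId it was given.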

Garbage collection of server objects

It's desirable that rabbit.js doesn't leave garbage around on the server. For instance, that a queue declared implicitly by sockets connecting is removed when all sockets are disconnected (well, perhaps after a grace period, so we don't lose messages accidentally).

When consuming from a queue, this is pretty easy: mark the queue as auto-delete (or give a queue a TTL), so when all sockets have gone, their consumer tags go, and the queue goes. Similarly exchanges and bindings.

From the publishing side it is not easy.

One scheme I have thought of is to represent a publisher's "lease" on an exchange by a queue bound to it, with its fate tied to the socket. We don't actually want messages in the queue, so it should have a queue-based TTL of 0, and no consumer, and a binding key unlikely to match routing keys.

So far so good. I can't adopt this tactic with queues, however. Probably the closest I can come is to declare a queue with a TTL, and occasionally redeclare it. Which is a bit gross, and will lead to a lot of redundant queue.declares, if there are a lot of sockets. (Also I have to think of what to do if the queue.declare fails!)

Trying to reconnect after a restart of RabbitMQ generates an Error

Hi guys,

I am trying to reconnect after a restart of the RabbitMQ server.

Example:
var context = require('rabbit.js').createContext("amqp://" + CONFIG.amqpHost + ":" + CONFIG.amqpPort);
pubStatSub = context.socket('PUSH');
pubStatSub.connect("statsub_queue");
pubStatSub.write(JSON.stringify(message), 'utf8');

 try {
        pubStatSub.write(JSON.stringify(message), 'utf8');
    } catch (err) {
       // I've restarted rabbitMQ
        context = require('rabbit.js').createContext("amqp://" + CONFIG.amqpHost + ":" + CONFIG.amqpPort);
        pubStatSub = context.socket('PUSH');
        pubStatSub.connect("statsub_queue");
        pubStatSub.write(JSON.stringify(message), 'utf8');
    }

But the connection generates the following error:

Message: Cannot set property '1' of undefined

Stacktrace:

TypeError: Cannot set property '1' of undefined
at Connection.queue (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/rabbit.js/node_modules/amqp/amqp.js:1190:26)
at Stream._advertise (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/rabbit.js/lib/sockets.js:177:24)
at Stream.connect (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/rabbit.js/lib/sockets.js:379:8)
at proxyMessage (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/server.js:251:24)
at SockJSConnection.<anonymous> (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/server.js:70:13)
at SockJSConnection.emit (events.js:67:17)
at Session.didMessage (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/sockjs/lib/transport.js:202:25)
at WebSocketReceiver.didMessage (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/sockjs/lib/trans-websocket.js:105:29)
at /Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/sockjs/lib/trans-websocket.js:82:22
at [object Object].<anonymous> (/Users/arnaud/Fivecool/StreamHub_Next/streamhub-collector/node_modules/sockjs/node_modules/faye-websocket/lib/faye/websocket/api/event_target.js:41:7)

Can't get resume_name to work

I'm trying to implement resume_name so that I don't risk losing messages.

I'm getting the error:

Error: Channel closed by server: 403 (ACCESS-REFUSED) with message "ACCESS_REFUSED - queue 'subs.*' in vhost '/' in exclusive use"

Any ideas what I'm doing wrong? Also seems that the pub never submits any message.

The code I run is:

var encoding = "utf8";
var context = require("rabbit.js").createContext(queueSocket); 

context.on("ready", function () {
    console.log(" [x] ready");

    var sub = context.socket('SUB', {resume_name: 'subs.*', persistent: true});
    sub.connect('events');
    sub.setsockopt('persistent', true);
    sub.setEncoding(encoding);

    sub.on('data', function (msg) {
        console.log("(1) MSG --> " + msg);
    }); 
    sub.close();


    var pub = context.socket('PUB');
    pub.connect('events');    
    pub.publish('subs.hello', 'test message', encoding);



    var sub2 = context.socket('SUB', {resume_name: 'subs.*', persistent: true});
    sub2.connect('events');
    sub2.setsockopt('persistent', true);
    sub2.setEncoding(encoding);

    sub2.on('data', function (msg) {
        console.log("(2) MSG --> " + msg);
    });
});

When is a socket ready?

var rabbitjs = require('./backend/node_modules/rabbit.js')

var context = rabbitjs.createContext('amqp://localhost');

var req = context.socket('REQ');

req.connect('backend');

req.on('data', function(data){
    var json = JSON.parse(data);

    console.log(json);
});

context.on('ready', function(){
    console.log('ready')

    req.write(JSON.stringify({
            method: 'users.login',
            info: {}
        }), 'utf8');
})

context.on('error', function(err){
    console.log(err);
})

This code does not work. "ready" does get logged to the console, but the message isn't written to the MQ.

var rabbitjs = require('./backend/node_modules/rabbit.js')

var context = rabbitjs.createContext('amqp://localhost');

var req = context.socket('REQ');

req.connect('backend');

req.on('data', function(data){
    var json = JSON.parse(data);

    console.log(json);
});

context.on('ready', function(){
    console.log('ready')

    setTimeout(function(){
        req.write(JSON.stringify({
                method: 'users.login',
                info: {}
            }), 'utf8');
    }, 1000)
})

context.on('error', function(err){
    console.log(err);
})

This code does work. The message is written to the MQ, and a response is logged to the console.

This makes me think that for some reason, the req stream isn't ready to have data written to it immediately. So, why is this? I tried listening to the req object's 'ready', 'readable', 'connected', 'connection', 'connect' events, but none seem to be emitted. Is there not an event that is emitted on the req stream object that tells me that the req object is ready to have data written to it?
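One thing that may be worth trying, going by the README example (where `connect` is given a callback and the write happens inside it), is to write only from the connect callback rather than from the context's 'ready' handler. The sketch below assumes the REQ socket's `connect` accepts a callback in the same way. `connectThenWrite` and the fake socket are illustrative stand-ins, not rabbit.js code.

```javascript
// Write only once the socket reports that its connect has completed.
function connectThenWrite(socket, address, message) {
  socket.connect(address, function () {
    socket.write(JSON.stringify(message), 'utf8');
  });
}

// Stand-in socket for illustration only; a real one would come from
// context.socket('REQ'). It records whether a write came before connect.
function createFakeSocket() {
  var connected = false;
  return {
    log: [],
    connect: function (address, callback) {
      connected = true;
      this.log.push('connect ' + address);
      callback();
    },
    write: function (chunk, encoding) {
      this.log.push((connected ? 'write ' : 'dropped ') + chunk);
    }
  };
}

var fake = createFakeSocket();
connectThenWrite(fake, 'backend', {method: 'users.login', info: {}});
console.log(fake.log);
```

If that works, it would explain why the one-second timeout helps: by then the connect has had time to finish.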

Array.prototype and for loop

Hi,
I'm using the Array.prototype mechanism to extend arrays with some functions in my project. This causes rabbit.js to crash.

Callstack:
/Users/cp/node_modules/rabbit.js/lib/sockets.js:168
if (ad.exchange.name == ex.name &&
^
TypeError: Cannot read property 'name' of undefined
at addToAdvertisements (/Users/cp/node_modules/rabbit.js/lib/sockets.js:168:24)
at Queue._openCallback (/Users/cp/node_modules/rabbit.js/lib/sockets.js:181:11)
at Queue._onMethod (/Users/cp/node_modules/amqp/amqp.js:1683:14)
at Queue._onChannelMethod (/Users/cp/node_modules/amqp/amqp.js:1348:14)
at Connection._onMethod (/Users/cp/node_modules/amqp/amqp.js:917:28)
at AMQPParser.onMethod (/Users/cp/node_modules/amqp/amqp.js:792:12)
at AMQPParser._parseMethodFrame (/Users/cp/node_modules/amqp/amqp.js:441:10)
at frameEnd (/Users/cp/node_modules/amqp/amqp.js:186:16)
at frame (/Users/cp/node_modules/amqp/amqp.js:171:14)
at AMQPParser.header [as parse]

Code to reproduce:

var context = require('rabbit.js').createContext('amqp://localhost:5672');

Array.prototype.example = function (func) {
}

context.on('ready', function() {
var push = context.socket('PUSH');
push.connect('items', function() {
});
});

I think that happens because you iterate arrays by: for(var i in array) instead of for(var i =0;i<length;i++).
This causes the loop to iterate the properties of the prototype too.
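The difference between the two loop styles can be shown in isolation. This is a standalone illustration of the language behaviour, not the actual code in sockets.js. The array name is made up.

```javascript
// An enumerable method added to Array.prototype, as in the report above.
Array.prototype.example = function (func) {};

var advertisements = ['a', 'b'];

// for...in visits enumerable keys, including inherited ones.
var seenByForIn = [];
for (var key in advertisements) {
  seenByForIn.push(key);
}

// An index loop only visits the array's own elements.
var seenByIndex = [];
for (var i = 0; i < advertisements.length; i++) {
  seenByIndex.push(String(i));
}

console.log(seenByForIn); // [ '0', '1', 'example' ]
console.log(seenByIndex); // [ '0', '1' ]

// Clean up so the rest of the program is unaffected.
delete Array.prototype.example;
```

In the for...in case the loop body would see `advertisements['example']`, which is a function rather than an advertisement, and reading a nested property of a missing field on it fails in the way the stack trace shows.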

Could you please try to reproduce our problem?
What solution for our problem do you propose?

Thanks for your reply.

I am getting COMMAND_INVALID - second 'channel.open' seen errors

Gday

Been working with rabbit.js quite a bit and started getting these errors on 0.10.12.

     Uncaught Error: COMMAND_INVALID - second 'channel.open' seen
      at Connection._onMethod (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:1155:15)
      at AMQPParser.parser.onMethod (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:897:12)
      at AMQPParser._parseMethodFrame (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:451:10)
      at frameEnd (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:192:16)
      at frame (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:177:14)
      at AMQPParser.header [as parse] (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:163:14)
      at AMQPParser.execute (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:236:21)
      at Connection.<anonymous> (/Users/markw/Code/Javascript/testproj/node_modules/rabbit.js/node_modules/amqp/amqp.js:935:14)
      at Connection.EventEmitter.emit (events.js:95:17)
      at Connection.<anonymous> (_stream_readable.js:736:14)
      at Connection.EventEmitter.emit (events.js:92:17)
      at emitReadable_ (_stream_readable.js:408:10)
      at emitReadable (_stream_readable.js:404:5)
      at readableAddChunk (_stream_readable.js:165:9)
      at Connection.Readable.push (_stream_readable.js:127:10)
      at TCP.onread (net.js:526:21)

I saw there were some issues a ways back and wonder if you had any ideas.

I have this thing I call a bus; it just hides all the messaging stuff behind a couple of simple functions.

var log = require('debug')('libs:bus');
var rabbit = require('rabbit.js');

var Bus = function(){
  log('connect', settings.rabbit_url);
  this.rabbitContext = rabbit.createContext(settings.rabbit_url);
}

Bus.prototype.subscribe = function(path, cb){
  log('subscribe', path)
  var subStream = this.rabbitContext.socket('SUB')
  subStream.connect(path);
  log('callback', 'invoked');
  cb(null, subStream);
}

Bus.prototype.publish = function(path, cb){
  log('publish', path)
  var pubStream = this.rabbitContext.socket('PUB')
  pubStream.connect(path);
  log('callback', 'invoked');
  cb(null, pubStream);
}

module.exports = new Bus();

And a test using mocha.

var should = require('should');
var log = require('debug')('test:bus')

var bus = require('../bus');

describe('Bus', function () {

  it('should open a publish stream', function(done){

    log('open')

    bus.publish('sometestpub', function(err, stream){
      log('stream', 'publish');
      stream.should.exist;
      stream.write("TEST TEST", 'utf8');
      done()
    })
  })

  it('should open a subscribe stream', function(done){

    log('open')

    bus.subscribe('sometestpub', function(err, stream){
      log('stream', 'subscribe');
      //err.should.not.exist;
      stream.should.exist;
      stream.pipe(process.stdout);
      done()
    })
  })
})

This works fine on 0.8.22 and fails on 0.10.12.

Ability for 'REP' to 'autoDelete'

We have various services which expose functionality via 'Req/Rep' messaging pattern. However, when the services come down, their queues remain. I would like to be able to specify the "autoDelete" flag.

For example:

this.rabbitContext.socket('REP', {expiration: constants.queue.timeout, autoDelete: true});

Any thoughts as to why this would not be a good idea?

Reconnect if RabbitMQ server restarts

I'm trying to figure out how to reconnect to the RabbitMQ server if it should restart.

Any ideas?

PS. I LOVE your lib!

My current code:

context.on("ready", function () {
    console.log(" [x] RabbitMQ is ready");

    sockjs.on('connection', function (conn) {
        var pub = context.socket('PUB');
        var sub = context.socket('SUB');

        sub.connect('cobrowsing');
        pub.connect('cobrowsing');
        sub.setEncoding(encoding);

        sub.on('data', function (msg) {
            console.log("MSG --> " + msg);
            conn.write(msg);
        });

        conn.on('data', function (message) {
            try {
                var obj = JSON.parse(message);

                if (obj.header == "register") {
                    sub.subscribe(BROWSE + '.' + obj.sid + '.' + obj.channel + '.*');
                }

            } catch (err) {
                console.log("ERROR ----> " + err.stack);
            }
        });

        conn.on('close', function () {
            try {

                sub.close();
                pub.close()
                conn.close();

            } catch (er) {
                console.log(":::::::: Exception ::::::::>>>>>>> " + er.stack);
            }
        });
    });
});

context.on("close", function () {

    console.log("RabbitMQ connection was closed");
    setInterval(function () {
        console.log("Rabbit trying to reconnect...");
        var context = require("rabbit.js").createContext(queueSocket);
    }, 1500);

});
context.on("error", function (e) {
    console.log("RabbitMQ connection was closed");
    setInterval(function () {
        console.log("Rabbit trying to reconnect...");
        var context = require("rabbit.js").createContext(queueSocket);
    }, 1500);
});
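A note on the handlers above: `setInterval` keeps creating new contexts every 1.5 seconds without ever being cleared, and the new context is a local variable with no 'ready', 'error' or 'close' handlers attached. A possible shape for a retry loop is sketched below. It assumes only what the code above already uses (a context that emits 'ready', 'error' and 'close'). `connectWithRetry` and its parameters are hypothetical names, and the fake contexts exist only so the sketch runs without a broker.

```javascript
var EventEmitter = require('events').EventEmitter;

// createContext: function returning a new context, e.g. a wrapper around
//   require('rabbit.js').createContext(url)
// onReady: called with each context once it is ready (set up sockets here)
// schedule: defaults to setTimeout; injectable for testing
function connectWithRetry(createContext, onReady, delayMs, schedule) {
  schedule = schedule || setTimeout;
  var retryPending = false;

  function retry() {
    if (retryPending) return; // 'error' and 'close' may both fire
    retryPending = true;
    schedule(function () {
      connectWithRetry(createContext, onReady, delayMs, schedule);
    }, delayMs);
  }

  var context = createContext();
  context.on('ready', function () { onReady(context); });
  context.on('error', retry);
  context.on('close', retry);
  return context;
}

// Usage with fake contexts and a scheduler that just queues the retry.
var created = [];
var queued = [];
var readyCount = 0;
function fakeCreate() {
  var c = new EventEmitter();
  created.push(c);
  return c;
}
connectWithRetry(fakeCreate, function () { readyCount++; }, 1500,
                 function (fn) { queued.push(fn); });
created[0].emit('ready');
created[0].emit('error', new Error('connection lost'));
created[0].emit('close');   // ignored, a retry is already scheduled
queued.shift()();           // run the scheduled retry
created[1].emit('ready');
console.log(created.length, readyCount, queued.length);
```

Sockets belong to the context that created them, so anything set up in the 'ready' handler (including the sockjs wiring) would have to be redone against the new context inside `onReady`.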

Fails to install

Hi. Trying to install rabbit.js with npm on node 0.8.3 gets me this:

tom-AOD255# npm install -g rabbit.js
npm http GET https://registry.npmjs.org/rabbit.js
npm http 200 https://registry.npmjs.org/rabbit.js
npm http GET https://registry.npmjs.org/rabbit.js/-/rabbit.js-0.2.0.tgz
npm http GET https://registry.npmjs.org/amqp
npm http 200 https://registry.npmjs.org/amqp
npm ERR! Error: No compatible version found: amqp@'>=0.1.0'
npm ERR! Valid install targets:
npm ERR! ["0.0.1","0.0.2","0.0.2-squaremo01.184010"]
npm ERR! at installTargetsError (/usr/local/lib/node_modules/npm/lib/cache.js:506:10)
npm ERR! at next_ (/usr/local/lib/node_modules/npm/lib/cache.js:452:17)
npm ERR! at next (/usr/local/lib/node_modules/npm/lib/cache.js:427:44)
npm ERR! at /usr/local/lib/node_modules/npm/lib/cache.js:419:5
npm ERR! at saved (/usr/local/lib/node_modules/npm/node_modules/npm-registry-client/lib/get.js:136:7)
npm ERR! at /usr/local/lib/node_modules/npm/node_modules/graceful-fs/graceful-fs.js:230:7
npm ERR! at Object.oncomplete (fs.js:297:15)
npm ERR! [Error: No compatible version found: amqp@'>=0.1.0'
npm ERR! Valid install targets:
npm ERR! ["0.0.1","0.0.2","0.0.2-squaremo01.184010"]]
npm ERR! You may report this log at:
npm ERR! http://github.com/isaacs/npm/issues
npm ERR! or email it to:
npm ERR! [email protected]

npm ERR! System Linux 3.2.0-23-generic
npm ERR! command "/usr/local/bin/node" "/usr/local/bin/npm" "install" "-g" "rabbit.js"
npm ERR! cwd /root
npm ERR! node -v v0.8.3
npm ERR! npm -v 1.1.32
npm ERR! message No compatible version found: amqp@'>=0.1.0'
npm ERR! message Valid install targets:
npm ERR! message ["0.0.1","0.0.2","0.0.2-squaremo01.184010"]
npm ERR!
npm ERR! Additional logging details can be found in:
npm ERR! /root/npm-debug.log
npm ERR! not ok code 0

However npm info gives me:
tom-AOD255# npm info amqp
npm http GET https://registry.npmjs.org/amqp
npm http 304 https://registry.npmjs.org/amqp

{ name: 'amqp',
description: 'AMQP driver for node',
'dist-tags': { latest: '0.1.3' },
...
engines: { node: '0.4 || 0.5 || 0.6' },
...

Might this be the error?

Durable topic exchange

I need to be able to publish to a durable topic exchange but at the moment this seems to be disallowed as per

PubSocket.prototype.connect = function(destination, callback) {
  var self = this, ch = this.ch;
  ch.assertExchange(destination,
                    this.options.routing || 'fanout',
                    {durable: false, autoDelete: false})
    .then(function(ok) {
      self.pubs.push(destination);
    }).then(callback);
};

When I connect to RabbitMQ I get the following error

Channel closed by server: 406 (PRECONDITION-FAILED) with message
"PRECONDITION_FAILED - cannot redeclare exchange 'ExchangeName'
in vhost '/' with different type, durable, internal or autodelete value"

It would be great if it were possible to do the following:

var pub = context.socket('PUBLISH', {routing: 'topic', persistent: true});
pub.connect(exchange);

I ended up updating the assertExchange code above to set durable to true but I'm not sure what the wider implications are of doing so.

I'm happy to submit a pull request if this is a feasible option.

Custom exchanges for REQ/REP sockets

I would like to create a REQ or REP socket on a particular exchange. I think rabbit.js just uses the default exchanges for these sockets. I'm not sure how to achieve this, though I believe it's exposed in amqplib. Is it as simple as adding another param to ReqSocket.connect and using assertExchange?

rabbit.js fails when messages > 100kb

Using :

vvo@think-vvo [06:54:03] [~] 
-> % sudo apt-cache show rabbitmq-server
Package: rabbitmq-server
Version: 2.8.2-2

And rabbit.js HEAD.

Trying req/rep examples from my fork : https://github.com/vvo/rabbit.js/commit/8154666f099ef9da560f7aff148aac10376cfdd5

I get :

buffer.js:479
  if (end < start) throw new Error('sourceEnd < sourceStart');
                         ^
Error: sourceEnd < sourceStart
    at Buffer.copy (buffer.js:479:26)
    at Message.<anonymous> (/x/rabbit.js/lib/sockets.js:294:11)
    at Message.emit (events.js:67:17)
    at Queue._onContent (/x/rabbit.js/node_modules/amqp/amqp.js:1740:23)
    at AMQPParser.onContent (/x/rabbit.js/node_modules/amqp/amqp.js:817:32)
    at AMQPParser.execute (/x/rabbit.js/node_modules/amqp/amqp.js:215:22)
    at Connection.<anonymous> (/x/rabbit.js/node_modules/amqp/amqp.js:851:12)
    at Connection.emit (events.js:67:17)
    at TCP.onread (net.js:367:14)

Lowering message size to 100kb solves it, seems to be a problem with the data event of the message.

Creating multiple contexts causes real problems

While investigating workarounds for issue #37 I tried to create multiple contexts pointing to the same server.

First I tried this:

var rabbitJS = require('rabbit.js');
var context0 = rabbitJS.createContext('amqp://localhost');
context0.on('ready', function() {
  var pub = context0.socket('PUB');
  pub.connect('events', function() {
    var context1 = rabbitJS.createContext('amqp://localhost');
    context1.on('ready', function() {
      var sub1 = context1.socket('SUB');
      sub1.setEncoding('utf8');
      sub1.on('data', function(data) {
        console.log('subscriber 1');
        console.log(data);
      });
      sub1.connect('events', function() {
        var context2 = rabbitJS.createContext('amqp://localhost');
        context2.on('ready', function() {
          var sub2 = context2.socket('SUB');
          sub2.setEncoding('utf8');
          sub2.on('data', function(data) {
            console.log('subscriber 2');
            console.log(data);
          });
          sub2.connect('events', function() {
            console.log('publish');
            pub.write(JSON.stringify({welcome: 'rabbit.js', source: 'pub'}), 'utf8');
          });
        });
      });
    });
  });
});

with the following result:

Unhandled connection error: COMMAND_INVALID - second 'channel.open' seen

events.js:72
        throw er; // Unhandled 'error' event
              ^
Error: COMMAND_INVALID - second 'channel.open' seen
    at Connection._onMethod (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:1155:15)
    at AMQPParser.parser.onMethod (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:897:12)
    at AMQPParser._parseMethodFrame (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:451:10)
    at frameEnd (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:192:16)
    at frame (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:177:14)
    at AMQPParser.header [as parse] (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:163:14)
    at AMQPParser.execute (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:236:21)
    at Connection.<anonymous> (/home/pghalliday/Development/testRabbitMQ/node_modules/rabbit.js/node_modules/amqp/amqp.js:935:14)
    at Connection.EventEmitter.emit (events.js:95:17)
    at Connection.<anonymous> (_stream_readable.js:710:14)

I'm guessing multiple contexts are not supported (although I think they may be useful/vital when I get round to creating unit and integration tests)

I then tried a similar experiment with just 2 contexts:

var rabbitJS = require('rabbit.js');
var context0 = rabbitJS.createContext('amqp://localhost');
context0.on('ready', function() {
  var pub = context0.socket('PUB');
  pub.connect('events', function() {
    var context1 = rabbitJS.createContext('amqp://localhost');
    context1.on('ready', function() {
      var sub = context1.socket('SUB');
      sub.setEncoding('utf8');
      sub.on('data', function(data) {
        console.log('subscriber 1');
        console.log(data);
      });
      sub.connect('events', function() {
        console.log('publish');
        pub.write(JSON.stringify({welcome: 'rabbit.js', source: 'pub'}), 'utf8');
      });
    });
  });
});

This time I got the following result:

publish
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
... etc forever ...

This was clearly an infinite loop. Again, am I misunderstanding something about how RabbitMQ works? This seems like a pretty serious issue too, but possibly not as bad as the last one, #37.

High Throughput Error

Hey guys,

I'm not familiar at all with the internals of Node or Rabbit.js, so let me know if you need more info.

Basically if I listen to a relatively high throughput stream (~3.6k rpm) over http, pushing data into rabbit for about 10 minutes or so, I'll get this error every time. Let me know if you have any clue.

The code is fairly simple...

context = require('rabbit.js').createContext('amqp://localhost:5672');

context.on('ready', function() {
  var express = require('express'),
  util = require('util'),
  fs = require('fs'),
  https = require("https"),
  querystring = require('querystring');

  var sub = context.socket('SUB');
  var pub = context.socket('PUB'); 
  sub.pipe(process.stdout);
  sub.connect('pre', function() {});

  var app = express(),
  port = (process.env.PORT || 9000);

// Allow to parse the body of the POST requests
app.use(express.bodyParser());

// POST /callback
app.post('/callback', function(request, response){
  // request.body is a JSON already parsed
  pub.connect('pre', function() {
    request.body.forEach(function(notificationOjb){
      pub.write(notificationOjb);    
    });
  });

  response.send();
});

// Run the app
app.listen(port, function(){
  console.log("Listening in port %d", port);
});

});

Unhandled connection error: CHANNEL_ERROR - unexpected method in connection state running

stream.js:81
throw er; // Unhandled stream error in pipe.
^
Error: CHANNEL_ERROR - unexpected method in connection state running

at Connection._onMethod (/node_modules/rabbit.js/node_modules/amqp/amqp.js:1119:15)

at AMQPParser.self.addListener.parser.onMethod (/node_modules/rabbit.js/node_modules/amqp/amqp.js:895:12)

at AMQPParser._parseMethodFrame (/node_modules/rabbit.js/node_modules/amqp/amqp.js:449:10)

at frameEnd (/node_modules/rabbit.js/node_modules/amqp/amqp.js:190:16)

at frame (/node_modules/rabbit.js/node_modules/amqp/amqp.js:175:14)

at AMQPParser.header [as parse] (/node_modules/rabbit.js/node_modules/amqp/amqp.js:162:14)

at AMQPParser.execute (/node_modules/rabbit.js/node_modules/amqp/amqp.js:234:21)

at Connection.<anonymous> (/node_modules/rabbit.js/node_modules/amqp/amqp.js:932:12)

at Connection.EventEmitter.emit (events.js:96:17)

at TCP.onread (net.js:397:14)

Unhandled 'error' event

Hi guys,

I want to handle this error when it is emitted, but I can't. Can you help me?

I'm using NodeJS v0.8.14 on a Windows machine.

I understand this error, but I'm creating a 'listener' dynamically, so I don't have any fixed exchange created.

Thanks,

Duarte Madueño

events.js:71
throw arguments[1]; // Unhandled 'error' event
^
Error: PRECONDITION_FAILED - cannot redeclare exchange 'testNODE' in vhost '/' with different type, durable, internal or autodelete value
at Exchange._onMethod - amqp.js:2091:15
at Exchange.Channel._onChannelMethod amqp.js:1533:14
at Connection._onMethod - amqp.js:1056:28
at AMQPParser.self.addListener.parser.onMethod - amqp.js:895:12
at AMQPParser._parseMethodFrame - amqp.js:449:10
at frameEnd - amqp.js:190:16
at frame - amqp.js:175:14
at AMQPParser.header [as parse] - amqp.js:162:14
at AMQPParser.execute - amqp.js:234:21
at Connection.<anonymous> - amqp.js:932:12
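The "Unhandled 'error' event" wording comes from Node's EventEmitter: emitting 'error' on an emitter with no 'error' listener throws. The sketch below shows that behaviour with plain emitters. Which rabbit.js object emits the error in this case is an assumption (other reports on this page attach the listener to the context with `context.on('error', ...)`).

```javascript
var EventEmitter = require('events').EventEmitter;

// Without an 'error' listener, emit('error') throws the error.
var withoutListener = new EventEmitter();
var thrown = null;
try {
  withoutListener.emit('error', new Error('PRECONDITION_FAILED'));
} catch (err) {
  thrown = err;
}

// With a listener, the error is delivered to it instead.
var withListener = new EventEmitter();
var handled = null;
withListener.on('error', function (err) { handled = err; });
withListener.emit('error', new Error('PRECONDITION_FAILED'));

console.log(thrown.message, handled.message);
```

Because the error is raised from an event rather than from the call that declared the exchange, wrapping that call in try/catch would not catch it.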

Performance of piping 2 REQ/REP

I'm new to messaging systems and I'm checking whether rabbit.js can be used in the following case:

1. server1 (expressjs) receives an HTTP request
2. to compute that request, server1 needs to send a request to server2 (on a REQ socket)
3. server2 receives the message and forwards it to a worker on a new REQ socket
4. the worker replies to server2 on its REP counterpart
5. server2 replies to server1 on its REP counterpart
6. server1 sends an HTTP response to the HTTP client

I know this approach seems basic, but are RabbitMQ and two pairs of REQ/REP sockets a good way to go?

Durable subscriptions

A feature that has come up is durable subscriptions (e.g., in issue 30). The idea is that, to avoid losing messages, a SUB socket can connect to a persistent queue (rather than a throwaway queue).

This requires a non-autodelete, non-exclusive queue, so it doesn't go away when the connection does; durable too, because why not. Probably it'd also want a queue TTL so it doesn't leak resources if you don't reconnect. And an exclusive consumer, to reduce the chance of accidentally having competing consumers.

How to name the queue? Could just punt this to the application (though will probably want to append a hash of the subscription details).
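As a rough sketch of the settings and naming scheme described above: the function below takes an application-chosen name, appends a short hash of the subscription details, and returns the queue and consumer options. Everything here is illustrative. `durableSubscription` is a made-up helper, the option names are assumed to follow amqplib's `assertQueue` and `consume` options, and the choice of SHA-1 truncated to 8 characters is arbitrary.

```javascript
var crypto = require('crypto');

// appName: prefix chosen by the application
// exchange, pattern: the subscription details that get hashed
// ttlMs: how long an unused queue may linger before the broker removes it
function durableSubscription(appName, exchange, pattern, ttlMs) {
  var hash = crypto.createHash('sha1')
    .update(exchange + '\n' + pattern)
    .digest('hex')
    .slice(0, 8);
  return {
    queue: appName + '.' + hash,
    // Survives the connection going away; expires if nobody comes back.
    queueOptions: {durable: true, autoDelete: false, exclusive: false, expires: ttlMs},
    // Exclusive consumer, to avoid accidental competing consumers.
    consumeOptions: {exclusive: true}
  };
}

var first = durableSubscription('billing', 'events', 'user.*', 60000);
var again = durableSubscription('billing', 'events', 'user.*', 60000);
var other = durableSubscription('billing', 'events', 'order.*', 60000);
console.log(first.queue, other.queue);
```

The same subscription details always map to the same queue name, so a reconnecting subscriber finds its old queue, while a different pattern gets a queue of its own.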

Using rabbit.js with node cluster

More of a question than an issue, but I've been racking my brain over this for the past day so I figured I'd ask here.

I'm trying to get rabbit.js working with node cluster. I have a master and a worker. The worker creates a connection to RabbitMQ using rabbit.js. When new code is pushed live, the worker runs worker.disconnect() and deals with any remaining connections, then exits. When the worker is disconnected, the master spawns a new worker, which accepts the new connections.

The problem is that when I disconnect the worker, it will never actually exit, because it still has a connection to RabbitMQ. However, if I immediately close the RabbitMQ connection, I risk having messages not getting sent to RabbitMQ from callbacks that are still on the event loop and still connected to the disconnected worker.

Is there a better way of doing this?
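One approach that might fit is to count the callbacks that can still publish, and close the RabbitMQ side only when that count reaches zero after the worker has been told to disconnect. The sketch below is plain JavaScript with hypothetical names (`createDrain`, `track`, `drain`). It assumes each tracked callback runs exactly once, and what "closing" means (e.g. calling `close()` on each socket) is left to the `onIdle` function.

```javascript
// onIdle is called once, when draining has started and no tracked
// callbacks remain outstanding.
function createDrain(onIdle) {
  var inFlight = 0;
  var draining = false;
  var finished = false;

  function check() {
    if (draining && inFlight === 0 && !finished) {
      finished = true;
      onIdle();
    }
  }

  return {
    // Wrap a callback that may still publish; it counts as in flight
    // until it has run.
    track: function (fn) {
      inFlight++;
      return function () {
        try {
          return fn.apply(this, arguments);
        } finally {
          inFlight--;
          check();
        }
      };
    },
    // Call when the worker is asked to disconnect.
    drain: function () {
      draining = true;
      check();
    }
  };
}

// Usage: one callback is still pending when the worker starts draining.
var closed = false;
var drain = createDrain(function () { closed = true; /* close sockets here */ });
var pendingCallback = drain.track(function () { /* publish a message */ });
drain.drain();
var closedBeforeCallback = closed;
pendingCallback();
console.log(closedBeforeCallback, closed);
```

Once the sockets are closed nothing should be holding the event loop open on the RabbitMQ side, though whether the worker then exits depends on what else it still has open.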

DEV REQUEST: add durability option for all socket types in V.03

Currently PUB/SUB sockets are not durable. This is a major issue, and I've seen other users requesting this. All socket types should have a durability option and let the application decide. By default it should be on. One of the main reasons for using RabbitMQ rather than ZeroMQ is its persistence, high availability, reliability, etc. in a production environment.

Suggested API:

var sub = context.socket('SUB');
sub.connect({address: 'users',
pattern: 'mikeb',
routing: 'direct',
{durable: true, autoDelete: false, exclusive: false } });

Also it would be nice to have a matching unsubscribe function to the subscribe one. Currently cancel does all the unsubscriptions under the hood.

proto._unsubscribe = function(q, callback)

example/sockjs.js doesn't work

I am not sure if I am not reading the doc carefully, but I couldn't get the example/sockjs.js to work.

2011-12-06 18:11:48 UTC [hayashis@soichi2:~/dev/eventbus]$ NODE_PATH=lib node example/sockjs.js
The "sys" module is now called "util". It should have a similar interface.
SockJS v0.1.2 bound to "[/]socks"
This type of response MUST NOT have a body. Ignoring write() calls.
This type of response MUST NOT have a body. Ignoring write() calls.

I am trying to build a simple node server that relays events published on RabbitMQ to client browsers through a comet connection made to the node server. I am not sure where I should begin. Is there any good sample code for this?

Thanks!
Soichi

First example doesn't work !

My code :
var context = require('rabbit.js').createContext();
var pub = context.socket('PUB'), sub = context.socket('SUB');
sub.pipe(process.stdout);
sub.connect('events');

pub.connect('events');
pub.write(JSON.stringify({welcome: 'rabbit.js'}), 'utf8');

The error:
TypeError: Cannot set property '1' of undefined
at Connection.queue (/node_modules/rabbit.js/node_modules/amqp/amqp.js:1190:26)
at createQueueBindAndConsume (/node_modules/rabbit.js/lib/sockets.js:268:31)
at Stream._consume (/node_modules/rabbit.js/lib/sockets.js:302:30)
at Stream.connect (/node_modules/rabbit.js/lib/sockets.js:361:8)
at Object.<anonymous> (/home/workspace/rabbit.js:1:581)
at Module._compile (module.js:441:26)
at Object..js (module.js:459:10)
at Module.load (module.js:348:31)
at Function._load (module.js:308:12)
at Array.0 (module.js:479:10)

examples not working

Hi, I'm taking a look at your lib to implement a push/pull scenario. It looks like the samples are currently not working, making it tough to evaluate. Any chance they can be fixed?

Thanks,

Matt

upgrade to readable-stream breaks 0.8.x

The readable-stream module used to define a global.setImmediate (if it didn't exist).

The new version does not do this, so line 11 of sockets.js is breaking on node 0.8.x:

var delay = global.setImmediate || nextTick;

(Shouldn't nextTick just be process.nextTick?)
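A sketch of the fallback the question points at, assuming the intent is "use setImmediate where the runtime has it, otherwise process.nextTick". It is an illustration, not the fix that went into sockets.js.

```javascript
// Prefer setImmediate where the runtime provides it; otherwise fall back to
// process.nextTick, which exists on node 0.8.x.
var delay = (typeof setImmediate === 'function') ? setImmediate : process.nextTick;

delay(function() {
  console.log('ran on a later turn of the event loop');
});
```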

DEV REQUEST: list of topics ID for SUB socket.

Not sure if this is currently allowed or possible, but it would be very useful when subscribing to different topics on the same exchange.

Current:
var sub = context.socket('SUB');
sub.connect({address: 'users',
pattern: 'A',
routing: 'direct'});
sub.connect({address: 'users',
pattern: 'B',
routing: 'direct'});
sub.connect({address: 'users',
pattern: 'C',
routing: 'direct'});

Requested:
sub.connect({address: 'users',
pattern: ['A', 'B', 'C'],
routing: 'direct'});
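Until something like this exists, the repetition can be wrapped in application code. The sketch below assumes `connect` accepts the object form shown under "Current" plus a callback; `connectAll` is a hypothetical helper, and a stand-in socket is used so the example runs without a broker.

```javascript
// Hypothetical helper, not part of rabbit.js: call connect() once per pattern
// and invoke `done` after every connect callback has fired.
function connectAll(sub, address, patterns, routing, done) {
  var remaining = patterns.length;
  if (remaining === 0) {
    if (done) done();
    return;
  }
  patterns.forEach(function(pattern) {
    sub.connect({address: address, pattern: pattern, routing: routing}, function() {
      remaining -= 1;
      if (remaining === 0 && done) done();
    });
  });
}

// Stand-in for a SUB socket that records the patterns it was asked to bind.
var seenPatterns = [];
var fakeSub = {
  connect: function(options, callback) {
    seenPatterns.push(options.pattern);
    callback();
  }
};

connectAll(fakeSub, 'users', ['A', 'B', 'C'], 'direct', function() {
  console.log('connected patterns:', seenPatterns.join(', '));
});
```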

Duplicate messages received by SUB

I wrote the following test while trying to understand the rabbit.js implementation and RabbitMQ, running on Ubuntu 12.10 desktop:

var rabbitJS = require('rabbit.js');
var context = rabbitJS.createContext('amqp://localhost');
context.on('ready', function() {
  var sub1 = context.socket('SUB');
  sub1.setEncoding('utf8');
  sub1.on('data', function(data) {
    console.log('subscriber 1');
    console.log(data);
  });
  sub1.connect('events', function() {
    var sub2 = context.socket('SUB');
    sub2.setEncoding('utf8');
    sub2.on('data', function(data) {
      console.log('subscriber 2');
      console.log(data);
    });
    sub2.connect('events', function() {
      var pub = context.socket('PUB');
      pub.connect('events', function() {
        console.log('publish');
        pub.write(JSON.stringify({welcome: 'rabbit.js', source: 'pub'}), 'utf8');
      });
    });
  });
});

I expected to get the message printed to the console twice, once from each subscriber. What I actually got was this:

publish
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 2
{"welcome":"rabbit.js","source":"pub"}
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 2
{"welcome":"rabbit.js","source":"pub"}

As you can see each subscriber got the message twice.

I then started experimenting and found that the following code works correctly:

var rabbitJS = require('rabbit.js');
var context = rabbitJS.createContext('amqp://localhost');
context.on('ready', function() {
  var pub = context.socket('PUB');
  pub.connect('events', function() {
    var sub1 = context.socket('SUB');
    sub1.setEncoding('utf8');
    sub1.on('data', function(data) {
      console.log('subscriber 1');
      console.log(data);
    });
    sub1.connect('events', function() {
      var sub2 = context.socket('SUB');
      sub2.setEncoding('utf8');
      sub2.on('data', function(data) {
        console.log('subscriber 2');
        console.log(data);
      });
      sub2.connect('events', function() {
        console.log('publish');
        pub.write(JSON.stringify({welcome: 'rabbit.js', source: 'pub'}), 'utf8');
      });
    });
  });
});

which gives the following output:

publish
subscriber 1
{"welcome":"rabbit.js","source":"pub"}
subscriber 2
{"welcome":"rabbit.js","source":"pub"}

The only difference is that in the second code snippet I connect the PUB socket first.

Am I misunderstanding something about how this works, because this looks like a bug to me?

callback method for completion of `write` or `publish`

I'd love to see a callback method available on completion of a write or publish method call.

At the moment, I'm doing this:

  var triggerSocket = this.ctx.socket("PUSH");

  console.log("Triggering");

  triggerSocket.connect(jobDest, function(){

    // make the call
    triggerSocket.write(JSON.stringify(trigger));

    // wait til the next tick before closing
    setImmediate(function(){
      triggerSocket.close();
      done();
    });
  });

it works well enough right now... but this makes me really really really really really nervous. i don't want to end up in a situation where this is somehow closing before the message completes... and that seems very likely.

I would prefer something with a callback:

    triggerSocket.write(JSON.stringify(trigger), function(){
      triggerSocket.close();
      done();
    });

helper methods instead of magic strings?

Just started using rabbit.js today, after spending a week trying to build my own bus. This is pretty awesome. :)

One thing I noticed, though, is there are a lot of magic strings in creating a socket type. Could we get a simple helper method for those?

For example:

var pub = context.socket("PUB", {
  routing: "topic"
});

Would be mildly better (reduce magic strings and provide semantically meaningful method name) as:

var pub = context.getPublisher({
  routing: "topic"
});

Or something similarly named. I'm just not a fan of magic strings, is all.
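Something along these lines can also be done in application code today. A minimal sketch, assuming `context.socket(type, options)` as used above; `withHelpers`, `getPublisher` and `getSubscriber` are hypothetical names, and the context here is a stand-in so the example runs on its own.

```javascript
// Hypothetical wrapper, not part of rabbit.js: named factory methods that
// delegate to context.socket(), keeping the type strings in one place.
function withHelpers(context) {
  return {
    getPublisher: function(options) { return context.socket('PUB', options); },
    getSubscriber: function(options) { return context.socket('SUB', options); }
  };
}

// Stand-in context that just reports what it was asked to create.
var fakeContext = {
  socket: function(type, options) { return {type: type, options: options}; }
};

var helpers = withHelpers(fakeContext);
var pub = helpers.getPublisher({routing: 'topic'});
console.log(pub.type, pub.options.routing);
```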

Doesn't work with node-v0.6.3

I just downloaded the latest version of node.js (0.6.3) and when I ran the code from the rabbit.js git repository, I encountered the following error message.

2011-12-01 20:12:00 UTC [root@soichi2:/home/hayashis/app/rabbitjs]# node example/socketserver.js

node.js:201
throw e; // process.nextTick error, or 'error' event on first tick
^
Error: require.paths is removed. Use node_modules folders, or the NODE_PATH environment variable instead.
at Function.<anonymous> (module.js:376:11)
at Object.<anonymous> (/usr/local/hayashis/app/rabbitjs/example/socketserver.js:1:70)
at Module._compile (module.js:432:26)
at Object..js (module.js:450:10)
at Module.load (module.js:351:31)
at Function._load (module.js:310:12)
at Array.0 (module.js:470:10)
at EventEmitter._tickCallback (node.js:192:40)

Invalid encoding type freezes the whole app

If you write:

sub.setEncoding('wrong');

instead of something like 'utf8', there are no messages about invalid encoding types - the app just mysteriously freezes up entirely.

I would expect an exception to be raised, with something like 'Unsupported Encoding Type: 'wrong''
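As a stopgap, the check can be done before calling `setEncoding`. The sketch below uses Node's `Buffer.isEncoding`; `checkedSetEncoding` is a hypothetical wrapper, and the socket is a stand-in object.

```javascript
// Hypothetical guard: reject unknown encodings up front instead of passing
// them through to the socket.
function checkedSetEncoding(socket, encoding) {
  if (!Buffer.isEncoding(encoding)) {
    throw new Error("Unsupported Encoding Type: '" + encoding + "'");
  }
  socket.setEncoding(encoding);
}

// Stand-in socket that remembers the encoding it was given.
var fakeSocket = {
  encoding: null,
  setEncoding: function(encoding) { this.encoding = encoding; }
};

checkedSetEncoding(fakeSocket, 'utf8');
try {
  checkedSetEncoding(fakeSocket, 'wrong');
} catch (err) {
  console.log(err.message);
}
```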

Server is running, but doesn't work and no response OR errors.

Guys, I've got this weird problem on my production server. On localhost everything worked fine.
I found out that my app didn't work because my messages didn't get around. To test, I set up these two bare bones files to test: listen.js and send.js

  1. send.js - sends random messages on channel foobar

var context = require('rabbit.js').createContext('amqp://localhost');
var abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'.split('');

var msg = function() {
  var str = '';
  var random = function(list) { return list[(Math.floor(Math.random() * list.length))]; };

  for (var i = 0; i < 10; i++) {
    str += random(abc);
  }

  return str;
};

context.on('ready', function() {
  var pub = context.socket('PUB');

  pub.connect('foobar', function() {
    function write() {
      var m = msg();
      console.log("sending: " + m);
      pub.write(JSON.stringify({data: m}), 'utf8');

      setTimeout(write, 300);
    }

    write();
  });
});

  2. listen.js - listens to the channels

var context = require('rabbit.js').createContext('amqp://localhost');
context.on('ready', function() {
  var sub = context.socket('SUB');

  sub.connect('foobar');
  sub.setEncoding('utf8');

  sub.on('data', function(data) {
    console.log("Received Message:");
    console.log(data + "\n");
  });
});

for some reason, listen.js just doesn't work :(

Rabbit MQ server is running. Status output:

Status of node rabbit@vps20841 ... [{pid,28561}, {running_applications,[{rabbit,"RabbitMQ","2.8.2"}, {os_mon,"CPO CXC 138 46","2.2.5"}, {sasl,"SASL CXC 138 11","2.1.9.3"}, {mnesia,"MNESIA CXC 138 12","4.4.17"}, {stdlib,"ERTS CXC 138 10","1.17.3"}, {kernel,"ERTS CXC 138 10","2.14.3"}]}, {os,{unix,linux}}, {erlang_version,"Erlang R14B02 (erts-5.8.3) [source] [64-bit] [rq:1] [async-threads:30] [kernel-poll:true]\n"}, {memory,[{total,209418432}, {processes,185275792}, {processes_used,185252272}, {system,24142640}, {atom,1136665}, {atom_used,1134002}, {binary,288656}, {code,11322346}, {ets,5200680}]}, {vm_memory_high_watermark,0.3999999988108007}, {vm_memory_limit,201816473}, {disk_free_limit,504541184}, {disk_free,5005692928}, {file_descriptors,[{total_limit,924}, {total_used,69}, {sockets_limit,829}, {sockets_used,67}]}, {processes,[{limit,1048576},{used,40090}]}, {run_queue,0}, {uptime,640754}] ...done.

What should I do? Thanks!

Multiple asynchronous writes to a socket fails

/**
 * Message Queue (RabbitMQ)
 */

global.mqContext = require('rabbit.js').createContext('amqp://localhost');
global.mqSockets = { // Cache for all the sockets
    'backend': 'REQ'
};

global.mqContext.on('ready', function(){
    for (var key in global.mqSockets)
    {
        if (global.mqSockets.hasOwnProperty(key));
        {
            if (['REQ', 'SUB'].indexOf(global.mqSockets[key]) !== -1)
            {
                var mqsock = global.mqContext.socket(global.mqSockets[key]);
                mqsock.connect(key, function(){
                    global.mqSockets[key] = mqsock;
                });
            }
        }
    }
})

global.MQ = function(queue){
    var mqsock = global.mqSockets[queue];

    function rtnFun(method, info, cb){
        if (typeof info === 'function')
        {
            var cb = info;
            var info = {};
        }

        mqsock.once('error', function(err){
            cb(err);
        })

        mqsock.once('data', function(data){
            var json = JSON.parse(data);

            if (json.error)
                return cb(json.error);

            cb(null, json);
        });

        var payload = JSON.stringify({
                method: method,
                info: info
            });

        mqsock.write(payload, 'utf8');

        return rtnFun;
    };

    return rtnFun;
}

Above is my code to construct a global MQ function that allows me to first get the exchange/queue/socket for use, then write to it. This is how I use the function:

var mqBackend = MQ('backend');

mqBackend('method.string', {info:'object'}, callback);

Everything works as intended. However, when I try to make multiple calls asynchronously, like so:

mqBackend('method1', {...}, method1Cb);
mqBackend('method2', {...}, method2Cb);

What happens is that the response for method1 is returned twice: once to method1Cb and once to method2Cb.

Is this an issue with rabbit.js, or an issue with how my code is written? If it is a problem with how my code is written, what am I doing wrong and how would you propose I fix it?
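One plausible reading, offered as an assumption rather than a diagnosis: both calls register a `once('data')` listener before any reply has arrived, so the first reply fires both listeners. A sketch that keeps a single `data` listener and a first-in, first-out list of callbacks follows. It assumes replies arrive in the order the requests were written; `makeRequester` is a hypothetical name, and an EventEmitter stands in for the REQ socket.

```javascript
var EventEmitter = require('events').EventEmitter;

// One 'data' listener per socket; each reply goes to the oldest waiting callback.
function makeRequester(sock) {
  var pending = [];

  sock.on('data', function(data) {
    var cb = pending.shift();
    if (!cb) return; // reply with nobody waiting for it
    var json;
    try {
      json = JSON.parse(data);
    } catch (err) {
      return cb(err);
    }
    if (json.error) return cb(json.error);
    cb(null, json);
  });

  return function request(method, info, cb) {
    if (typeof info === 'function') {
      cb = info;
      info = {};
    }
    pending.push(cb);
    sock.write(JSON.stringify({method: method, info: info}), 'utf8');
  };
}

// Stand-in socket: records writes; replies are emitted by hand below.
var fakeSock = new EventEmitter();
fakeSock.write = function() {};

var request = makeRequester(fakeSock);
var results = [];
request('method1', {}, function(err, res) { results.push(['method1Cb', res.reply]); });
request('method2', {}, function(err, res) { results.push(['method2Cb', res.reply]); });
fakeSock.emit('data', JSON.stringify({reply: 'one'}));
fakeSock.emit('data', JSON.stringify({reply: 'two'}));
console.log(results);
```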

Run demo failed

Hi,
We are using rabbit.js to transfer data from clients to RabbitMQ, but I have run into a serious problem.
I tested my demo with 1000 TCP connections, each sending a 2000-byte text packet, and the demo code crashes.
But it works when I send less than 100 bytes.
Could you help me find out what is wrong with my demo?
Thank you very much!

Here is the code and error information.

DEMO CODE:
var net = require('net');
var context = require('/usr/local/lib/node_modules/rabbit.js').createContext('amqp://xxx.xxx.x.xxx:xxxx');
var server = net.createServer(function(connection) {
  connection.setEncoding("utf8");
  var push = context.socket('PUSH');
  push.connect('chat', function() {
    connection.on('data', function(msg) {
      console.log(msg);
      push.write(msg);
    });
  });
  connection.on('close', function() {
    push.destroy();
    console.log('close');
  });
  connection.on('disconnect', function() {
    push.destroy();
    console.log('disconnect');
  });
});
server.listen(8080, function() {
  console.log("Listening for pub connections on 8080");
});

ERROR:
Unhandled connection error: UNEXPECTED_FRAME - expected method frame, got non method frame instead

node.js:201
throw e; // process.nextTick error, or 'error' event on first tick
^
Error: UNEXPECTED_FRAME - expected method frame, got non method frame instead
at Connection._onMethod (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:976:15)
at AMQPParser.onMethod (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:792:12)
at AMQPParser._parseMethodFrame (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:441:10)
at frameEnd (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:186:16)
at frame (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:171:14)
at AMQPParser.header as parse
at AMQPParser.execute (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:230:21)
at Connection.<anonymous> (/usr/local/lib/node_modules/rabbit.js/node_modules/amqp/amqp.js:832:12)
at Connection.emit (events.js:67:17)
at TCP.onread (net.js:347:14)

'routing' option is not implemented as it is documented.

https://github.com/squaremo/rabbit.js/blob/master/lib/sockets.js#L151

As you can see there is no 'routing' case in the switch statement. Therefore, you cannot do:

sock.setsockopt('routing', 'topic');

Which means you cannot even use topics and topic patterns.

Here's an example:

producer.js

var context = require('rabbit.js').createContext('amqp://localhost');

context.on('error', function(err){
    console.log(err);
})

var pub = context.socket('PUB');

pub.setsockopt('routing', 'topic');
pub.connect('foo');

setInterval(function(){
    pub.publish('one', "Message from topic 'one' producer!\n");
    pub.publish('two', "Message from topic 'two' producer!\n");
}, 1000)

consumer.js

var context = require('rabbit.js').createContext('amqp://localhost');

context.on('error', function(err){
    console.log(err);
})

var sub = context.socket('SUB');

sub.setsockopt('routing', 'topic');
sub.setsockopt('topic', 'two');
sub.connect('foo');

sub.pipe(process.stdout);

consumer.js' output log looks like this:

Message from topic 'one' producer!
Message from topic 'two' producer!
Message from topic 'one' producer!
Message from topic 'two' producer!
Message from topic 'one' producer!
Message from topic 'two' producer!
Message from topic 'one' producer!
Message from topic 'two' producer!
Message from topic 'one' producer!
Message from topic 'two' producer!

When there should only be messages from topic 'two'.

rg:declaring queues

hi

I'm using rabbit.js.
How do I declare a queue and publish messages to queues?

thanks in advance

'npm install bundle' in documentation

Your documentation specifies the installation of bundles as 'npm bundle install ...', but it seems you meant 'npm install bundle ...'. In any case, thanks for rabbit.js! I can't wait to get to using it.

specifying custom options/headers when sending messages

It would be nice if the rabbit.js API provided a way to specify custom AMQP headers when sending messages, i.e. to pass custom options to methods like sendToQueue().
Something like passing an additional message options parameter to write/publish.

don't swallow errors

Errors are often swallowed into oblivion, instead of bubbling their way up.

For example:

var rabbit = require("rabbit.js")
var rabbitCtx = rabbit.createContext(rabbitConnectionString);

rabbitCtx.on("error", function(err){
  console.log("AN ERROR!!!!!");
  console.log(err);
});

rabbitCtx.on("ready", function(){
  throw new Error("some test error message");
});

This code produces nothing on the console. The error is never logged through the on("error") handler.

There are other cases where throwing an error from within an error handler will swallow the error. In these cases, I have to put a setImmediate call in place.

For example (a very poor example... but illustrates the point):

rabbitCtx.on("error", function(err){
  throw err;
});

this produces nothing... i would expect it to error out the app at this point. Instead, I have to do this:

rabbitCtx.on("error", function(err){
  setImmediate(function(){
    throw err;
  });
});

Then it will bubble up to the global error handler, as expected.

Should the setImmediate (or whatever equivalent) be done inside the Context object and other objects (Sockets, etc.) so that I don't have to wrap my own error handler code with a setImmediate?

New version

Hello! And thanks for the great job, rabbit.js is really cool!
I've seen some interesting and very useful changes in the master branch and I'm wondering when they will be released, are you close to a new release? Or otherwise, would you say it's safe to point npm to the master branch directly in the meantime?
We are using rabbit.js for a big real world project so it would be great to know.
Thanks!

Socket Option 'expiration' and README

I have been having trouble with the socket option 'expiration' when going from the README. It kept giving me this error: Error: Unmatched field {"name":"expiration","domain":"shortstr"}
After some searching, I found that the test cases had it formatted differently. While the README says:
pub.setsockopt('expiration', 60000)
the tests had:
push.setsockopt('expiration', '10')
the difference being that in the tests the amount of time before expiration was a string instead of an int. After changing that in my code, the error stopped appearing. Is the error supposed to occur? Is setsockopt supposed to only take strings?
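The error text names the field's domain as `shortstr`, which suggests the value has to be a string by the time it is encoded. A small sketch that converts a millisecond count before it is handed to `setsockopt`; `expirationValue` is a hypothetical helper, not part of rabbit.js.

```javascript
// Hypothetical helper: turn a millisecond count into the decimal string form
// that a 'shortstr' field expects.
function expirationValue(ms) {
  if (typeof ms !== 'number' || !isFinite(ms) || ms < 0) {
    throw new TypeError('expiration must be a non-negative number of milliseconds');
  }
  return String(Math.floor(ms));
}

// Intended use (not executed here, needs a connected socket):
//   pub.setsockopt('expiration', expirationValue(60000));
console.log(JSON.stringify(expirationValue(60000)));
```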

Can you provide an example of simple Node.JS consumer

Can you provide an example of a simple consumer in Node.JS that consumes a message from a queue and replies with a confirmation to the replyTo queue specified in the message headers, or is this mainly intended for browsers?

pseudo code for consume:

receive message from queue
do something
print "got message"
acknowledge message
if message.replyToQueue
  exchange.publish(message.replyToQueue, { status: 'yeah-ok' })
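The pseudo code above could look roughly like this when written against an amqplib channel (the library rabbit.js 0.3.0 and later are built on). This is a sketch, not rabbit.js's own API: `handleMessage` is a hypothetical name, the message shape (`content`, `properties.replyTo`) follows amqplib, and a stand-in channel is used so the example runs without a broker.

```javascript
// Handle one delivered message: do the work, acknowledge it, then reply to the
// queue named in the replyTo property if there is one.
function handleMessage(channel, msg) {
  // "do something" with msg.content would go here
  console.log('got message');
  channel.ack(msg);

  var replyTo = msg.properties.replyTo;
  if (replyTo) {
    var body = Buffer.from(JSON.stringify({status: 'yeah-ok'}));
    channel.sendToQueue(replyTo, body, {
      correlationId: msg.properties.correlationId
    });
  }
}

// Stand-in channel that records what the handler did.
var fakeChannel = {
  acked: [],
  sent: [],
  ack: function(msg) { this.acked.push(msg); },
  sendToQueue: function(queue, content, options) {
    this.sent.push({queue: queue, body: content.toString(), options: options});
  }
};

handleMessage(fakeChannel, {
  content: Buffer.from('work item'),
  properties: {replyTo: 'replies', correlationId: 'abc'}
});
console.log(fakeChannel.sent[0].queue, fakeChannel.sent[0].body);
```

With a real connection, the handler would be passed to the channel's `consume` call for the work queue, skipping `null` messages (which amqplib delivers when the consumer is cancelled).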

hybrid sockets for topic routing and acknowledged messages?

I find myself wanting a hybrid between push/pub and worker/sub, so that i can do topic workers (topic based routing with acknowledged messages, optionally persistent/durable).

any chance the push socket can be updated to use an exchange with a routing type option, instead of pushing directly to a queue? the worker would also need to allow topic subscriptions. or would you prefer to see new socket types for these?

i might be able to take a shot at modifying or building these, in the next week or two... been looking for a way to contribute more than just issue tickets :)

rg:rabbit.js

hi

When I run node server.js, I get the following errors:

SockJS v0.3.0 bound to "[/]socks"

node.js:201
throw e; // process.nextTick error, or 'error' event on first tick
^
Error: connect ECONNREFUSED
at errnoException (net.js:670:11)
at Object.afterConnect as oncomplete
Can anyone tell me what the problem is?
thanks in advance

Worker abort not supported with #ack

Currently, the api for Worker#ack() does not support passing in a delivery tag from a specific message, requiring the application to maintain support for ordering. This is ok, and you described your rationale well, here: bc2bef1

The problem exists if the application code wants to abort processing of a single message and not acknowledge that message, but continue processing the queue. In the ack code, the following line is used, which requires acknowledgements to always be made in order by application code

var msg = this.unacked.shift();

There should be a public API method, such as #noAck(), that allows the application to skip a message and continue processing. #noAck() method would simply call this.unacked.shift().
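A sketch of the bookkeeping being proposed, detached from rabbit.js so it runs on its own. `UnackedQueue` is a hypothetical stand-in for the Worker socket's internal list, and `noAck` follows the name suggested above.

```javascript
// Hypothetical model of the Worker's in-order list of unacknowledged messages.
function UnackedQueue() {
  this.unacked = [];
}

// Record a delivered message; deliveries are kept in arrival order.
UnackedQueue.prototype.received = function(msg) {
  this.unacked.push(msg);
};

// Acknowledge the oldest outstanding message via the supplied function.
UnackedQueue.prototype.ack = function(sendAck) {
  var msg = this.unacked.shift();
  if (msg !== undefined) sendAck(msg);
  return msg;
};

// Skip the oldest outstanding message without acknowledging it.
UnackedQueue.prototype.noAck = function() {
  return this.unacked.shift();
};

var queue = new UnackedQueue();
var acked = [];
['m1', 'm2', 'm3'].forEach(function(m) { queue.received(m); });
queue.ack(function(m) { acked.push(m); });   // acknowledges m1
queue.noAck();                               // skips m2
queue.ack(function(m) { acked.push(m); });   // acknowledges m3
console.log(acked);
```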

REQ/REP Socket - Reply queue utilisation problem

Hi guys! I have a weird problem. I'm trying to write a simple test for REQ/REP sockets.

Here is client.js:

var rabbit = require('rabbit.js');

var context = rabbit.createContext();

context.on("ready", function() {
    for (var i = 0; i < 10; i++) {
        (function(i) {
            var request = context.socket("REQ");

            request.on("data", function(message) {
                console.log('%s-st Handler received message - %j', i, JSON.parse(message));
                request.close();
            });

            request.connect("my_app.user_profile.authenticate_user", function() {
                var data = { username: 'Alex-' + i};

                console.log('Queued message: %j', data);

                var message = JSON.stringify(data);
                request.write(message, "utf8");
            });

        })(i);
    }
});

Here is server.js:

var rabbit = require('rabbit.js');

var context = rabbit.createContext();

context.on("ready", function() {
    var reply = context.socket("REP");
    reply.connect("my_app.user_profile.authenticate_user", function() {
        reply.on("data", function(inMessage) {
            var inData = JSON.parse(inMessage);
            var outData = {
                success: true,
                username: inData.username
            };

            console.log('Processed message: %j', outData);

            var outMessage = JSON.stringify(outData);
            reply.write(outMessage, "utf8");
        });

    });
});

When I start client.js, I see that 10 messages are queued into the durable queue my_app.user_profile.authenticate_user. Then 10 temporary exclusive, autodelete=true queues (for replies) are created. That is fine.

Then I start server.js and see that it processes all 10 queued messages. Then client.js shows the replies from server.js. Very nice. But here is the problem: the 10 temporary queues are still alive. As I understand it, they will disappear when the REQ socket shuts down. That is why I'm trying to change client.js to shut down the REQ socket after it receives the reply from server.js. I found two methods on Socket: Socket#close() and Socket#end(). I inserted calls to these methods here:

client.js:

request.on("data", function(message) {
    console.log('%s-st Handler received message - %j', i, JSON.parse(message));
    request.end(); // or request.close();
});

In case of using request.close() or request.end() I'm getting this:

$ node client.test.js
Queued message: {"username":"Alex-0"}
Queued message: {"username":"Alex-3"}
Queued message: {"username":"Alex-1"}
Queued message: {"username":"Alex-4"}
Queued message: {"username":"Alex-2"}
Queued message: {"username":"Alex-5"}
Queued message: {"username":"Alex-9"}
Queued message: {"username":"Alex-8"}
Queued message: {"username":"Alex-7"}
Queued message: {"username":"Alex-6"}
0-st Handler received message - {"success":true,"username":"Alex-0"}

events.js:72
        throw er; // Unhandled 'error' event
              ^
IllegalOperationError: Channel closing
    at Channel.<anonymous> (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:143:11)
    at Channel.C.closeBecause (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:195:8)
    at Channel.C.closeWithError (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:209:8)
    at Channel.C.acceptMessageFrame (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:232:12)
    at Channel.C.accept (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/channel.js:377:17)
    at Connection.mainAccept [as accept] (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/Sources/test/node_modules/rabbit.js/node_modules/amqplib/lib/connection.js:448:48)
    at Socket.EventEmitter.emit (events.js:92:17)
    at emitReadable_ (_stream_readable.js:408:10)

If I wrap the call to request.close() in a setTimeout() like this:

request.on("data", function(message) {
    console.log('%s-st Handler received message - %j', i, JSON.parse(message));
    setTimeout(function() {
        request.close();
    }, 5000);
});

The temporary queues are then closed successfully without exceptions. But using setTimeout() does not seem like a good solution. So where is the correct place to call Socket#close() if I want to free resources properly after I have received the message from the server? Why does it throw an exception if I call it in the "data" event handler?

Is there something I'm doing wrong, or is there some other problem? Could you please explain what is wrong here?

AMQP and the Stream API

I wanted to take a moment to clarify your comment in sockets.js:

AMQP and the stream API don't really work well together here. I'm supposed to initiate reads when this method is called, then not push any more once I get false back from #push; but how do I do that with AMQP? (I guess I could use channel.flow, but that seems rather overwrought; or, I could use prefetch and start/stop acknowledging messages).

The _read method is advisory at best. It only applies to Streams which are pull-style. For streams like network streams, your _read implementation is usually a noop, and you simply push when you get data. You would use _read for lazily loading a file or something similar, and the bytes parameter would indicate how many more bytes to read from the file.

The docs have been fairly recently updated to reflect this:

The size argument is advisory. Implementations where a "read" is a single call that returns data can use this to know how much data to fetch. Implementations where that is not relevant, such as TCP or TLS, may ignore this argument, and simply provide data whenever it becomes available. There is no need, for example to "wait" until size bytes are available before calling stream.push(chunk).
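A minimal sketch of what that means for a push-style source: `_read` does nothing, and data is pushed whenever it arrives. `PushedMessages` and `deliver` are hypothetical names; in rabbit.js the call to `deliver` would sit wherever messages come in from the broker.

```javascript
var Readable = require('stream').Readable;
var util = require('util');

// Readable stream fed by an external, push-style source.
function PushedMessages() {
  Readable.call(this, {objectMode: true});
}
util.inherits(PushedMessages, Readable);

// Nothing to pull from: the source decides when data arrives.
PushedMessages.prototype._read = function() {};

// Called by whatever receives messages. The return value of push() is
// advisory: false means the internal buffer has reached its high-water mark.
PushedMessages.prototype.deliver = function(msg) {
  return this.push(msg);
};

var messages = new PushedMessages();
messages.deliver('first');
messages.deliver('second');
console.log(messages.read(), messages.read());
```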

This was a weird sticking point for me for a while when Streams2 came out. Maybe you've figured this out by now, but I wanted to put your mind at rest.

new messaging pattern

Can you add a new messaging pattern (pubSave/subSave)? So if a consumer is temporarily down, the messages are not lost...

something like this:

var amqp = require('amqp');
var sys = require('sys');
var EventEmitter = require('events').EventEmitter;

var debug = (process.env['DEBUG']) ?
    function(msg) { sys.debug(msg) } : function() {};

function pubSaveSocket(connection, client, exchangeName) {
    sys.log('pubSave socket opened');
    function publishTo(exchange) {
        client.on('message', function(msg) {
            exchange.publish('', msg);
        });
    }
    (exchangeName == '') ?
        connection.exchange('amq.fanout', {'passive': true}, publishTo) :
        connection.exchange(exchangeName, {'type': 'direct', 'durable': true}, publishTo);
}

function subSaveSocket(connection, client, exchangeName, queueName) {
    sys.log('subSave socket opened');
    function consume(exchange) {
        var queue = connection.queue(queueName, {durable:true, 'autoDelete': false}, function() {
            queue.subscribe(function(message) {
                debug('sub:'); debug(message);
                client.send(message.data.toString());
            });
            queue.bind(exchange.name, '');
        });
    };
    (exchangeName == '') ?
        connection.exchange('amq.fanout', {'passive': true}, consume) :
        connection.exchange(exchangeName, {'type': 'direct', 'durable': true}, consume);
}

function pubSocket(connection, client, exchangeName) {
    sys.log('pub socket opened');
    function publishTo(exchange) {
        client.on('message', function(msg) {
            exchange.publish('', msg);
        });
    }
    (exchangeName == '') ?
        connection.exchange('amq.fanout', {'passive': true}, publishTo) :
        connection.exchange(exchangeName, {'type': 'fanout'}, publishTo);
}

function subSocket(connection, client, exchangeName) {
    sys.log('sub socket opened');
    function consume(exchange) {
        var queue = connection.queue('', {durable:false}, function() {
            queue.subscribe(function(message) {
                debug('sub:'); debug(message);
                client.send(message.data.toString());
            });
            queue.bind(exchange.name, '');
        });
    };
    (exchangeName == '') ?
        connection.exchange('amq.fanout', {'passive': true}, consume) :
        connection.exchange(exchangeName, {'type': 'fanout'}, consume);
}

function pushSocket(connection, client, queueName) {
    sys.log('push socket opened');
    if (queueName == '') {
        client.send("Must send address for push");
        client.end();
        return;
    }
    var queue = connection.queue(queueName, {'autoDelete': false,
                                             'durable': true,
                                             'exclusive': false});
    client.on('message', function(msg) {
        debug('push:'); debug(msg);
        connection.publish(queueName, msg);
    });
}

function pullSocket(connection, client, queueName) {
    sys.debug('pull socket opened');
    if (queueName == '') {
        client.send("Must send address for pull");
        client.end();
        return;
    }
    var queue = connection.queue(
        queueName,
        {'autoDelete': false, 'durable': true, 'exclusive': false},
        function() {
            queue.subscribe(function(message) {
                debug('pull:'); debug(message);
                client.send(message.data.toString());
            });
        });
}

function reqSocket(connection, client, queueName) {
    sys.log("req socket opened");
    if (queueName == '') {
        client.send("Must send address for req");
        client.end();
        return;
    }
    connection.queue('',
        {'exclusive': true, 'autoDelete': true, 'durable': false},
        function(replyQueue) {
            replyQueue.subscribe(function(message) {
                debug('reply:'); debug(message);
                client.send(message.data.toString());
            });
            connection.queue(
                queueName, {'durable': true, 'autoDelete': false},
                function(queue) {
                    client.on('message', function(message) {
                        debug('request:'); debug(message);
                        connection.publish(queueName, message,
                                           {'replyTo': replyQueue.name});
                    });
                });
        });
}

function repSocket(connection, client, queueName) {
    sys.log("rep socket opened");
    if (queueName == '') {
        client.send("Must send address for rep");
        client.end();
        return;
    }
    connection.queue(
        queueName, {'durable': true, 'autoDelete': false},
        function(queue) {
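            // Address of the most recent requester; replies from the
            // client are routed to it until the next request arrives.
            var replyTo = '';
            client.on('message', function (message) {
                debug('reply to: ' + replyTo); debug(message);
                connection.publish(replyTo, message);
            });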
            queue.subscribe(function(message, _headers, properties) {
                replyTo = properties['replyTo'];
                debug('request:'); debug(message);
                client.send(message.data.toString());
            });
        });
}

function Pipe() {
    var fore = this.fore = new EventEmitter(); // client --> server
    var aft = this.aft = new EventEmitter();   // server --> client
    aft.send = function (msg) {
        debug('aft send:'); debug(msg);
        fore.emit('message', msg);
    };
    fore.send = function (msg) {
        debug('fore send:'); debug(msg);
        aft.emit('message', msg);
    };
    aft.end = function() {
        aft.emit('close');
    };
    fore.end = function() {
        fore.emit('close');
    };
    fore.on('close', function () {
        debug('fore close');
    });
    aft.on('close', function () {
        debug('aft close');
    });
}

function PipeServer() {
    EventEmitter.call(this);
}

(function(S) {
    var P = S.prototype = new EventEmitter();

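    // Create a new in-process pipe: the server side receives the 'aft'
    // end via the 'connection' event, the caller gets the 'fore' end.
    P.connect = function() {
        var p = new Pipe();
        this.emit('connection', p.aft);
        return p.fore;
    };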

})(PipeServer);

exports.Server = PipeServer;
exports.Pipe = Pipe;

function listen(server, options /* , callback */) {
    var url = options && options.url || 'amqp://localhost';
    var allowed = options && options.allowed;

    var connection = amqp.createConnection({'url': url});
    var callback = (arguments.length > 2) ? arguments[2] : null;
    connection.on('ready', function () {
        server.on('connection', function (client) {
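            // The first message on a connection is the rendezvous,
            // "<type> <address>"; later messages go to the socket
            // handler chosen below.
            function dispatch(msg) {
                client.removeListener('message', dispatch);
                msg = msg.toString();
                var i = msg.indexOf(' ');
                var type = (i > -1) ? msg.substring(0, i) : msg;
                var addr = (i > -1) ? msg.substring(i + 1) : '';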
                if (check_rendezvous(type, addr, allowed)) {
                    switch (type) {
                    case 'pubSave':
                        pubSaveSocket(connection, client, addr);
                        break;
                    case 'subSave':
                        // options may be omitted, so guard before reading it
                        var queueName = (options && options.queueName != null) ? options.queueName : '';
                        subSaveSocket(connection, client, addr, queueName);
                        break;
                    case 'pub':
                        pubSocket(connection, client, addr);
                        break;
                    case 'sub':
                        subSocket(connection, client, addr);
                        break;
                    case 'push':
                        pushSocket(connection, client, addr);
                        break;
                    case 'pull':
                        pullSocket(connection, client, addr);
                        break;
                    case 'req':
                        reqSocket(connection, client, addr);
                        break;
                    case 'rep':
                        repSocket(connection, client, addr);
                        break;
                    default:
                        client.send("Unknown socket type");
                        client.end();
                        sys.log("Unknown socket type in: " + msg);
                    }
                }
                else {
                    client.send("Unauthorised rendezvous");
                    client.end();
                    sys.log("Access denied: " + type + " to " + addr);
                }
            }
            client.on('message', dispatch);
        });
        if (callback) callback();
    });
}

function check_rendezvous(type, addr, allowed) {
    if (!allowed) return true; // no explicit list = everything goes
    var socks = allowed[addr];
    return !!socks && socks.indexOf(type) > -1;
}

exports.listen = listen;
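
The rendezvous handling in `dispatch` and `check_rendezvous` can be tried out without a broker. The following is a minimal sketch, not part of the module: `parseRendezvous` and `checkRendezvous` are hypothetical stand-alone helpers that mirror the logic above, and the `allowed` table is an invented policy, assuming each address maps to an array of permitted socket types.

```javascript
// Sketch only: parseRendezvous is a hypothetical helper, not part of the module.
// It mirrors how dispatch() splits the first message into a type and an address.
function parseRendezvous(msg) {
    msg = msg.toString();
    var i = msg.indexOf(' ');
    return {
        type: (i > -1) ? msg.substring(0, i) : msg,
        addr: (i > -1) ? msg.substring(i + 1) : ''
    };
}

// Same rule as check_rendezvous: no explicit list means everything is allowed.
function checkRendezvous(type, addr, allowed) {
    if (!allowed) return true;
    var socks = allowed[addr];
    return !!socks && socks.indexOf(type) > -1;
}

// Invented policy for illustration: 'events' accepts pub and sub sockets,
// 'jobs' accepts push sockets only.
var allowed = {events: ['pub', 'sub'], jobs: ['push']};

var first = parseRendezvous('sub events');
console.log(first.type, first.addr,
            checkRendezvous(first.type, first.addr, allowed));

var second = parseRendezvous('pull jobs');
console.log(second.type, second.addr,
            checkRendezvous(second.type, second.addr, allowed));
```

With this policy the first rendezvous would be accepted and the second refused, which in `listen` corresponds to the "Unauthorised rendezvous" branch.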
