Giter Club home page Giter Club logo

node-amqp10's Introduction

amqp10

travis npm coverage npm gitter

NOTE: This repository is no longer being maintained. Please use rhea which is a fully compliant amqp 1.0 library that is actively maintained.

amqp10 is a promise-based, AMQP 1.0 compliant node.js client

Contents

Usage

The basic usage is to require the module, new up a client with the appropriate policy for the server you're connecting against, connect, and then send/receive as necessary. So a simple example for a local Apache Qpid server would look like:

var AMQPClient = require('amqp10').Client,
    Promise = require('bluebird');

var client = new AMQPClient(); // Uses PolicyBase default policy
client.connect('amqp://localhost')
  .then(function() {
    return Promise.all([
      client.createReceiver('amq.topic'),
      client.createSender('amq.topic')
    ]);
  })
  .spread(function(receiver, sender) {
    receiver.on('errorReceived', function(err) { /* Check for errors */ });
    receiver.on('message', function(message) {
      console.log('Rx message: ', message.body);
    });

    return sender.send({ key: "Value" });
  })
  .error(function(err) {
    console.log("error: ", err);
  });

By default send promises are resolved when a disposition frame is received from the remote link for the sent message, at this point the message is considered "settled". To tune this behavior, you can tweak the policy you give to AMQPClient on construction. For instance, to force send promises to be resolved immediately on successful sending of the payload, you would build AMQPClient like so:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.merge({
  senderLinkPolicy: {
    callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
  }
}, Policy.DefaultPolicy));

In addition to the above, you can also tune how message link credit is doled out (for throttling), as well as most other AMQP behaviors, all through policy overrides. See DefaultPolicy and the policy utilities for more details on altering various behaviors.

Flow Control and Message Dispositions

Flow control in AMQP occurs at both the Session and Link layers. Using our default policy, we start out with some sensible Session windows and Link credits, and renew those every time they get to the half-way point. In addition, receiver links start in "auto-settle" mode, which means that the sender side can consider the message "settled" as soon as it's sent. However, all of those settings are easily tune-able through Policy overrides (Policy.merge(<overrides>, <base policy>)).

For instance. we've provided a convenience helper for throttling your receiver links to only renew credits on messages they've "settled". To use this with Azure ServiceBus Queues for instance, it would look like:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusQueue));

Where the first number is the initial credit, and the second is the threshold - once remaining credit goes below that, we will give out more credit by the number of messages we've settled. In this case we're setting up the client for one-by-one message processing. Behind the scenes, this does the following:

  1. Sets the Link's creditQuantum to the first number (1), which you can do for yourself via the Policy mix-in { receiverLink: { creditQuantum: 1 } }

  2. Sets the Link to not auto-settle messages at the sender, which you can do for yourself via { receiverLink: { attach: { receiverSettleMode: 1 } } } Where did that magic "1" come from? Well, that's the value from the spec, but you could use the constant we've defined at require('amqp10').Constants.receiverSettleMode.settleOnDisposition

  3. Sets the Link's credit renewal policy to a custom method that renews only when the link credit is below the threshold and we've settled some messages. You can do this yourself by using your own custom method:

{
  receiverLink: {
    credit: function (link, options) {
      // If the receiver link was just connected, set the initial link credit to the quantum. Otherwise, give more credit for every message we've settled.
      var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
      if (creditQuantum > 0 && link.linkCredit < threshold) {
        link.addCredits(creditQuantum);
      }
    }
  }
}

Note that once you've set the policy to not auto-settle messages, you'll need to settle them yourself. We've tried to make that easy by providing methods on the receiver link for each of the possible "disposition states" that AMQP allows:

  • link.accept(message) will tell the sender that you've accepted and processed the message.
  • link.reject(message, [error]) will reject the message with the given error (if provided). The sender is free to re-deliver, so this can be used to indicate transient errors.
  • link.modify(message, [options]) will tell the sender to modify the message and re-deliver. You can tell it you can't accept the message by using link.modify(message, { undeliverableHere: true })
  • link.release(message) will tell the sender that you haven't processed the message and it's free to re-deliver - even back to you.

All of these methods accept an array of messages, allowing you to settle many at once.

Plugins

The amqp10 module now supports pluggable Client behaviors with the exported use method. Officially supported plugins include:

  • amqp10-link-cache - caches links with optional purging based on ttl
  • amqp10-rpc - an rpc server/client implementation on top of amqp10

Supported Servers

We are currently actively running integration tests against the following servers:

  1. Azure EventHubs
  2. Azure ServiceBus Queues and Topics
  3. Apache Qpid C++ broker (qpidd)

We have been tested against the following servers, but not exhaustively so issues may remain:

  1. ActiveMQ (open issue related to ActiveMQ ignoring the auto-settle setting and disposition frames may cause messages to re-deliver or stop sending after a certain period)
  2. RabbitMQ with the amqp 1.0 experimental extension
  3. Apache Qpid Java broker

If you find any issues, please report them via GitHub.

Todos and Known Issues

  1. Disposition support is incomplete in that we don't send proper "unsettled" information when re-attaching links.
  2. There are some AMQP types we don't process - notably the Decimal23/64/128 types. These are unused by the protocol, and no-one seems to be using them to convey information in messages, so ignoring them is likely safe.

Implementation Notes

  • Using node's built-in net/tls classes for communicating with the server.

  • Data from the server is written to a buffer-list based on Rod Vagg's BL.

  • Outgoing data is encoded using this buffer builder - streaming output won't really work since each outgoing payload needs to be prefixed with its encoded size, however we're working on converting to use as much streaming as possible.

  • The connection state is managed using Stately.js, with state transitions swapping which callback gets invoked on receipt of new data. (e.g. post-connection, we write the AMQP version header and then install a callback to ensure the correct version. Once incoming data is written to the circular buffer, this callback is invoked, and a comparison vs. the expected version triggers another transition).

  • Debug output is done via debug with the prefix amqp10:. The main client's debug name is amqp10:client so setting DEBUG=amqp10:client as an environment variable will get you all top-level debugging output.

    bash# export DEBUG=amqp*
    C:\> set DEBUG=amqp*
    [root@pinguino]# node simple_eventhub_test.js
      amqp10:client connecting to: amqps://xxxxxx:[email protected] +0ms
      amqp10:connection Connecting to xxxxxx-service-bus-001.servicebus.windows.net:5671 via TLS +72ms
      amqp10:connection Transitioning from DISCONNECTED to START due to connect +17ms
      amqp10:connection Sending Header 414d515003010000 +405ms
      amqp10:connection Transitioning from START to IN_SASL due to connected +6ms
      amqp10:connection Rx: 414d515003010000 +128ms
      amqp10:sasl Server SASL Version: 414d515003010000 vs 414d515003010000 +1ms
      amqp10:connection Rx: 0000003f02010000005340c03201e02f04b3000000074d535342434... +162ms
      amqp10:client Reading variable with prefix 0xc0 of length 52 +2ms
      amqp10:client Decoding 5340 +0ms
      [...]
  • Many thanks to Gordon Sim for inspiration on the type system, gleaned from his project rhea.

node-amqp10's People

Contributors

abhijeetahuja avatar amarzavery avatar bengreenier avatar danlangford avatar dnwe avatar energiz0r avatar gitter-badger avatar kurtb avatar mbroadst avatar noodlefrenzy avatar oferb1 avatar pierreca avatar prestona avatar princjef avatar robblovell avatar snobu avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

node-amqp10's Issues

Connection inactive for a long time

A listener inactive for a long time gives this error:

Error Connecting:
{ descriptor: [Int64 value:29 octets:00 00 00 00 00 00 00 1d],
value: undefined,
condition: { contents: 'amqp:connection:forced' },
description: 'The connection was inactive for more than the allowed period of time.',
errorInfo: null }

Is it possible to increase the allowed period of time for a certain connection?

Frames: Open Frame

Factory method on Frame to create an open frame for transmission, and another to process an incoming open frame. Appropriate modifications to Connection to support connection settings involved.

Frames: Detach Frame

Factory method on Frame to create a detach frame for transmission, and another to process an incoming detach frame. Modify Session as needed to manage link shutdown and handle reclamation. Should log any error in the incoming detach frame.

Receiver stops dequeueing messages in case of big queue length

It seems that receiver stops its receive activity when a lot of messages are pending inside queue. I noticed this during stress test, sending a message per second.

Is this probably the expected behavior when the amqp client detects possible memory leak?

Frames: End Frame

Factory method on Frame to create an end frame for transmission, and another to process an incoming end frame. Should tear down the session as necessary, and should log any error sent in the frame.

Frames: Flow Frame

Factory method on Frame to create a flow frame for transmission, and another to process an incoming flow frame. Should modify Link class (or appropriate) to manage flow control. Talk with EventHub folks to determine which flow-control mechanisms they use and prioritize support as appropriate.

Messages not received

Hi,
It seems that messages bigger than 350 bytes are sent but not received. Is there some size restriction on outgoing messages or I'm missing a particular policy in the receive method?

Thanks

Manage delivery semantics

AMQP 1.0 allows a wide range of delivery semantics, including at-most-once, at-least-once, and umm, neither. This encapsulates all work to manage that state for in-flight messages, and deal with settlement in-flow/out-flow through the processing and sending of disposition frames.

Frames: Disposition Frame

Factory method on Frame to create a disposition frame for transmission, and another to process an incoming disposition frame. This does not cover delivery semantics, which is factored into a separate task. It does, however, cover any invocations of methods for the class(es) involved to manage in-flight message state.

Copying buffers is slow

Copying buffers is slow in node (and maybe everywhere else), so I suggest using bl which does almost the same thing of CircularBuffer but instead maintains the original Buffer objects without copying them.

cc @teomurgi

Handle heartbeat/timeout issues

Connections have a timeout for both sides. We should handle sending the appropriate timeout on begin, closing the connection if that timeout expires without activity, and "heartbeating" by sending payload-free data frames on channel 0 to keep the connection open.

Frames: Begin Frame

Factory method on Frame to create a begin frame for transmission, and another to process an incoming begin frame. Modifications to Session as required to assign the channel number and manage handles. Should parse and log capabilities and properties at this point, although no need to process them.

Link-level flow-control

Need to track link-level flow-control primitives (link credit, primarily) and deal with violations.

Codec: Encode primitive P2 types

These are primitive types, but are less important for getting a V0.1 out the door. Examples include but are not limited to GUID.

Session-level flow control

Need to track session-level flow-control primitives (nextIncoming/outgoing/windows) and deal with violations appropriately.

npm install throws error

Hi,

I tried to install this module with npm but I have this error caused by CircularBuffer module.

npm install error

Codec: Decode all simple types

Process incoming data in message frames for all simple data types. These include all the various integral types as well as UTF8 strings both short and long. These do not include described types or composite types.

Reusing client connection to send messages

"node-amqp-1-0 version 0.0.6"

I initialize and store client connection at my program startup because I need to call client.send() quite often but after some minutes of inactivity I receive this error while performing send again:

"Cannot call method 'write' of null" (lib/connection.js - line 330)

Should I perform a connection for every send or I must set some hearthbeat?

Thanks

Frames not setting channel on encoding

In testing with ActiveMQ, looks like Begin frame was not getting channel set appropriately. Two issues: I didn't have a test to catch that, and the actual issue itself.

Codec: Encode all simple types

Process outgoing data for all simple data types, and deliver to outgoing message frames . These include all the various integral types as well as UTF8 strings both short and long. These do not include described types or composite types.

Frames: Transfer Frame

Factory method on Frame to create a transfer frame for transmission, and another to process an incoming transfer frame. This does not encompass the codec of message contents, that's a separate task. It also does not cover the delivery semantics, which is factored into another task.

Frames: Close Frame

Factory method on Frame to create a close frame for transmission, and another to process an incoming close frame.

Not able to send the message after successful connection to Jboss ActiveMQ via ampq

HI,
I have JBoss AMQ running locally with following connector configuration :
transportConnector name="amqp" uri="amqp://0.0.0.0:5672

I am trying to send a message using node-amqp-1-0 :+1 in nodeJs:
var client = new AMQPClient(); // Uses PolicyBase default policy
client.connect('amqp://admin:admin@localhost/sqrQueue', function(conn_err) {
// ... check for errors ...
client.send(JSON.stringify({ key: "Value" }), function (send_err) {
// ... check for errors ...
});
client.receive(function (rx_err, payload, annotations) {
// ... check for errors ...
console.log('Rx message: ');
console.log(JSON.parse(payload));
});
});

After doing this , i can see the producer in the JBOSS AMQ console but no messages.

Please correct me in case I missed anything or need more information,

Removing received messages from queue

When used with apache activemq, the amqp client does not seem to cause received messages to be removed from the queue, even when the policy sets the receiver settle mode to auto. I think you may have touched on this in one of your caveats on the project page, but is there a workaround for this at the moment?

Frames: Attach Frame

Factory method on Frame to create an attach frame for transmission, and another to process an incoming attach frame. Modify the Session as appropriate to manage link state, assign handles, etc.

JBoss ActiveMQ Topics not working

Hi, I tried using the topics but it always create the producers and consumers under queue in JBoss ActiveMQ.

Can you please have alook and confirm the exact usage ?

Codec: Decode primitive P2 types

These are primitive types, but are less important for getting a V0.1 out the door. Examples include but are not limited to GUID.

Expose Flow-control management

How do clients of the node module manage flow control? They need to be able to tweak windows, bump recv link credit. Should they be able to internally queue messages for sending even when there's no link credit/window left?

Not able to parse the response in receive

I can see my payload in the queue @ JBoss Active MQ i.e.
bytes:
5b 7b 22 69 64 22 3a 31 2c 22 74 69 74 6c 65 22 3a 22 74 69 74 6c 65 22 2c 22 63 72 65 64 69 74 73 22 3a 33 33 2c 22 63 6f 6e 74 65 6e 74 22 3a 22 6a 74 79 6a 22 2c 22 6c 65 76 65 6c 73 22 3a 31 2c 22 71 75 61 6c 69 66 69 63 61 74 69 6f 6e 54 79 70 65 22 3a 31 2c 22 65 6e 74 72 79 52 65 71 75 69 72 65 6d 65 6e 74 73 22 3a 22 6a 79 74 6a 22 2c 22 63 72 65 61 74 65 44 61 74 65 22 3a 22 30 32 2f 30 32 2f 32 30 30 31 22 2c 22 6e 65 78 74 52 65 76 69 65 77 44 61 74 65 22 3a 22 30 33 2f 30 33 2f 32 30 30 31 22 7d 5d

text:
[{"id":1,"title":"title","credits":33,"content":"jtyj","levels":1,"qualificationType":1,"entryRequirements":"jytj","createDate":"02/02/2001","nextReviewDate":"03/03/2001"}]

On UI got the exception : Malformed payload: Unknown code: 0x5b

Can you please have a look and let me know in case I have to do something different ?

AMQP 1.0 Mock server

Not a full mock server, just the skeletal pieces required to enable unit-testing.

Error: Out of available handles (Max = undefined)

Hi,
After 1 day of inactivity the client gave us this error while performing send again :

 [uncaughtException] ERROR: Error: Out of available handles (Max = undefined)
        at new OverCapacityError (/home/azureuser/EMS/backend/20150211114057/node_modules/node-amqp-1-0/lib/exceptions.js:71:11)
        at Session._nextHandle (/home/azureuser/EMS/backend/20150211114057/node_modules/node-amqp-1-0/lib/session.js:259:11)
        at Session.attachLink (/home/azureuser/EMS/backend/20150211114057/node_modules/node-amqp-1-0/lib/session.js:192:34)
        at attach (/home/azureuser/EMS/backend/20150211114057/node_modules/node-amqp-1-0/amqp_client.js:301:27)
        at AMQPClient.send (/home/azureuser/EMS/backend/20150211114057/node_modules/node-amqp-1-0/amqp_client.js:342:13)
        at /home/azureuser/EMS/backend/20150211114057/lib/ems_communication/amqpservicebus/index.js:259:45
        at fn (/home/azureuser/EMS/backend/20150211114057/node_modules/async/lib/async.js:582:34)
        at Object._onImmediate (/home/azureuser/EMS/backend/20150211114057/node_modules/async/lib/async.js:498:34)
        at processImmediate [as _immediateCallback] (timers.js:330:15)

cc @teomurgi

Session class controls links

Session class implemented, controls channel mapping and link setup/teardown. Specifically:

Handles Frames indicated by below:

Frame          Connection  Session  Link
========================================
open               H
begin              I          H
attach                        I      H
flow                          I      H
transfer                      I      H
disposition                   I      H
detach                        I      H
end                I          H
close              H
----------------------------------------

Is node-amqp-1-0 supports the exchanging of messages from Jboss Active MQ topic?

I am planning to implement the pub/sub messaging framework from client to Services via Jboss Active MQ.
Before heading to use topics for exchanging the messages , I just want to know the compatibility of Topics with node-amq-1-0. I already tried with queue , it is working fine but wants Topic now.

Please share the inputs.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.