Giter Club home page Giter Club logo

seneca-amqp-transport's People

Contributors

disintegrator avatar ericnograles avatar greenkeeper[bot] avatar greenkeeperio-bot avatar idoshamun avatar mitchellparsons avatar nfantone avatar xrysanthos avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

seneca-amqp-transport's Issues

calling seneca.close throws an error

I have an integration script that used to use the default transport. Now I have migrated to seneca-amqp-transport, there is an error client side when I try to close the connection and exit the script. The channel is already closing.

IllegalOperationError: Connection closing

I won't post the whole thing, but the line(s) causing the problem starts on line 82 in client-hook.js

Exception when integrate to Express

Hello,

Currently, I am trying to integrate Seneca with Express.

It will work well if we act like below code
let seneca = require('seneca')()
.use('seneca-amqp-transport');
let client = seneca.client({
type: 'amqp',
url: '....',
pin: 'role:communicationservice'
});

client.act({ role: 'communicationservice', cmd: 'ping' }, (err, result) => {
if (err) {
console.log(err);
}
console.log(result);
});

However, when we include into express routing, it throw exception
let seneca = require('seneca')()
.use('seneca-amqp-transport');
let client = seneca.client({
type: 'amqp',
url: '.......'
pin: 'role:communicationservice'
});
let express = require('express');
let router = express.Router();

router.get('/ping', (req, res, next) => {
console.log('123456');
client.act({ role: 'communicationservice', cmd: 'ping' }, (err, result) => {
if (err) {
console.log(err);
res.statusCode(500).json(err);
}
console.log(result);
res.json(result);
});
})

module.exports = router;

2016-06-23T06:46:42.937Z q8vwu2cu607j/1466664390127/8160/- ERROR act amqp-transport OUT hook:client,role:transport,type:amqp 10518 {type:amqp,url:amqp://xxxx,pin:role:communicationservice,pg:role:communicatio ENTRY (izp9dgqiwqoe) - seneca: Action hook:client,role:transport,type:amqp failed: read ECONNRESET. act_execute {id:fwz0l2kbsucg/j11mkslegzpj,gate:false,ungate:true,desc:null,time:{start:1466664392401,end:1466664402914},mes Error: read ECONNRESET
at exports._errnoException (util.js:870:11)
at TCP.onread (net.js:552:26)
2016-06-23T06:46:42.937Z q8vwu2cu607j/1466664390127/8160/- FATAL plugin client$ sys seneca 2.1.0 q8vwu2cu607j/1466664390127/8160/- act_execute seneca: Action hook:client,role:transport,type:amqp failed: read ECONNRESET. {id:fwz0l2kbsucg/j11mkslegzpj,gate:false,ungate:true,desc:null,time:{start:1466664392401,end:1466664402914},mes all-errors-fatal
2016-06-23T06:46:43.519Z q8vwu2cu607j/1466664390127/8160/- ERROR act transport OUT cmd:client,role:transport 13168 {config:{type:amqp,url:amqp://.....,pin:role:communicationservice,pg:role:comm ENTRY (7bx5qybvvz5f) GATE seneca: Action cmd:client,role:transport failed: [TIMEOUT:9pevg91qhh3e/j11mkslegzpj:11111<1466664403510-1466664392398:undefined]. act_execute {id:9pevg91qhh3e/j11mkslegzpj,gate:true,ungate:true,desc:null,time:{start:1466664392398,end:1466664403510},mess Error: [TIMEOUT:9pevg91qhh3e/j11mkslegzpj:11111<1466664403510-1466664392398:undefined]
at null._onTimeout (d:\Research\Git\NodeJS\demo-micro-service\src\node_modules\seneca\node_modules\gate-executor\gate-executor.js:112:21)
at Timer.listOnTimeout (timers.js:92:15)
2016-06-23T06:46:43.520Z q8vwu2cu607j/1466664390127/8160/- FATAL plugin client$ sys seneca 2.1.0 q8vwu2cu607j/1466664390127/8160/- act_execute seneca: Action cmd:client,role:transport failed: [TIMEOUT:9pevg91qhh3e/j11mkslegzpj:11111<1466664403510-1466664392398:undefined].

       'tx$': 'j11mkslegzpj' },
    delegate: [Function],
    context: {},
    client: [Function],
    listen: [Function],
    makelogfuncs: [Function],
    log: 
     { [Function]
       debug: [Function],
       info: [Function],
       warn: [Function],
       error: [Function],
       fatal: [Function] },
    prior: [Function],
    good: [Function],
    bad: [Function] } },

fn: [Function],
time: { start: 1466664392401, end: 1466664402914 },
'orig$': { [Error: read ECONNRESET] code: 'ECONNRESET', errno: 'ECONNRESET', syscall: 'read' },
'message$': 'read ECONNRESET',
message: 'read ECONNRESET',
pattern: 'hook:client,role:transport,type:amqp',
instance: 'Seneca/2.1.0/q8vwu2cu607j/1466664390127/8160/-',
plugin: {} }
Stack:
at exports._errnoException (util.js:870:11)
at TCP.onread (net.js:552:26)

Could you let me know what is wrong?

Thanks,

Phuong Le

Idea: What about adding model:observe similar to mesh?

I would like to log some searches going on in the system but I don't need to reply back. In seneca-mesh there is the concept of model:consume and model:observe.

I added model:observe to the listen options and then made this minor change to lib/listener.js handleMessage function.

handleMessage(message, data) {
  return this.utils.handle_request(this.seneca, data, this.options, (out) => {
    if (!out) {
      return;
    }
    if (typeof this.options.model !== 'string' || this.options.model !== 'observe') {
        var outstr = this.utils.stringifyJSON(this.seneca, `listen-${this.options.type}`, out);
        this.transport.channel.sendToQueue(message.properties.replyTo, new Buffer(outstr), {
          correlationId: message.properties.correlationId
        });
    }
    this.transport.channel.ack(message);
  });
}

Based on your develop branch it would be pretty trivial to add it to the createReplyWith function

Mismatched queue names are used when pin values have more than one key-value pair

The example only uses a single pin, "role:create", which works fine. If you add more detail to the pin such as "role:create,type:random" communication will be impossible. The cause is that the queue names generated to match each pin are not generated correctly by the client resulting in the inability of rabbit to route messages.

I have attempted to fix this (pull request incoming) by extracting the queue name generation to a single common place and then calling in both listen and client code paths. I have also updated the the examples to include multiple value pins.

Random issue with the transport

Apologies for the terrible issue title, I did not know how to describe it.

So I'm running into an issue which (I assume based on the circumstances) is caused by this transport plugin.

Basically, I am building an API using seneca and I have three services for now, web-server, users-read and users-write. web-server is the express server, users-read performs only read operations for the API, and users-write only the writing ones.

The issue I am running into is that the call to a specific seneca patterns works only on my first request, and if I re-request it again (with nothing changed) it times out. Specifically, it's this line.

So what it does is calls the 'users-read' service to check if the username passed via post is already present in the database. This works great the first time, but the second time I just get a timeout here (and the service which is supposed to handle this request, defined here, never does).

Now comes the crazy part, if I remove the username validation and only validate email (which does the same thing, only with email (but the pattern is the same), it works every time and I get no timeout issues. The only difference is that in username validation request, the queryParams object sent has a username property, while when validating email it's the email property that the object has.

You can run my example via docker compose. I know it's a lot of code and I wouldn't be posting this here if I was not desparate. I think this is transport related because if I move the users.api.query-factory.js plugin to my users-write service (so that the pattern is matched locally and it doesn't have to look for it via transport), it works without issues.

Any help will be greatly appreciated, thanks!

P.S. I tried downgrading seneca to 1.3.0 and it did not help.

listen for "normal" messages

Hey there,
is there a way to listen to messages that were not made with a seneca-amqp-transport client?

Thanks in advance

japel

question: how to handle async processing

I want my message to be re-queued if my microservice dies while asynchronously processing the message. My consumer service's add function of seneca is doing some asynchronous processing. Something like this (typescript):

seneca()
    .use('seneca-amqp-transport')
    .add('cmd:indexLog', (msg, cb: Function) => {
        return Promise.resolve()
            .then(() =>
                doSomethingAsynchronous()
                .then(
                    () => cb(null, { ok: true, when: Date.now() }),
                    (err: Error) => cb(err)
                )
            );
    })
    .listen({
        type: 'amqp',
        pin: 'cmd:indexLog',
        url: config.rabbit.connectionString
    });

The problem is, if somewhere inside doSomethingAsynchronous encounters an error and my microservice dies, it seems like the message is still acked. Therefore on restart of my microservice, the message I just received is lost. It looks like this bit expects the processing of the message to be synchronous? Am I going about this the wrong way? Any info would be greatly appreciated. Thanks!

AMQP Listener Doesn't Work With dot values in the pin (e.g. role:twitter,version:v1.0,cmd:tweets,method:GET)

Hi there!

First off, fantastic work on Seneca and its ecosystem. It is a revelation, especially after having "rolled our own" microservice conventions and knowing how difficult it is to land on a proper pattern. The Seneca scheme is quite elegant.

Issue Overview

I think I ran into a weird edge case, or I missed something in the documentation. In short, it appears that Seneca AMQP services don't seem to understand a pin if the value has dots in it.

Sample Use Case

For example, let's say I have a client that fires off role:twitter,version:v1.0,cmd:tweets,method:GET to an AMQP listener. Even if the receiving service specifies "any" for the version, it still won't understand the v1.0 bit of the pin.

However, firing off role:twitter,version:v1,cmd:tweets,method:GET will actually work.

I haven't had a chance to dive into the source code of seneca-amqp-transport yet to pinpoint how things might work (will do that later today), but I wanted to fire up this issue to see if there was a quick answer.

Code for the AMQP listener below. Thanks for your help, everyone, and keep up the great work!

AMQP Listener Code

var fs = require('fs');

/**
 * A sample AMQP microservice for service bus communication via RabbitMQ with the seneca-service-api
 * @param options
 */
function twitter(options) {
  var log;

  this.add('init:twitter', init);
  this.add('role:twitter,version:*,cmd:*,method:*', noMatch);
  this.add('role:twitter,version:*,cmd:tweets,method:GET', tweets);

  function init(msg, respond) {
    // Note, all the code below is optional
    // log to a custom file
    fs.open(options.logfile, 'a', function (err, fd) {

      // cannot open for writing, so fail
      // this error is fatal to Seneca
      if (err) return respond(err);

      log = makeLog(fd);
      respond();
    });
  }

  function makeLog(fd) {
    // TODO: Tie this into something like Winston
    return function (entry) {
      fs.write(fd, '\n' + new Date().toISOString() + ' ' + entry, null, 'utf8', function (err) {
        if (err) return console.log(err);

        // ensure log entry is flushed
        fs.fsync(fd, function (err) {
          if (err) return console.log(err);
        })
      });
    }
  }

  function noMatch(payload, respond) {
    respond(null, {
      statusCode: 404,
      original: payload,
      error: {
        message: 'Invalid service command'
      }
    });
  }

  function tweets(payload, respond) {
    // TODO: Your service begins here
    log('RECEIVED: ' + payload);
    respond(null, {status: 'success', tweets: []});
  }
}

// Define queues
require('seneca')()
  .use(twitter, { logfile: './twitter.log'})
  .use('seneca-amqp-transport')
  .listen({
    type: 'amqp',
    pin: 'role:twitter,version:*,cmd:*,method:*',
    url: process.env.AMQP_URL || 'amqp://127.0.0.1'
  });

Pins with integer attributes not supported?

If I define a seneca instance to listen to a message that contains an integer attribute I get a fatal error. The same message, with a string value instead will work.
This is unfortunate because messages like remote:1 will get translated to {remote:1} and not {remote:'1'}

Examples:

Won't work:

const Seneca = require('seneca')
const seneca = Seneca()
seneca.use('amqp-transport')
seneca.listen({
  type: 'amqp',
  url: 'amqp://guest:guest@localhost:5672',
  pin: 'remote:1'
})
const Seneca = require('seneca')
const seneca = Seneca()
seneca.use('amqp-transport')
seneca.listen({
  type: 'amqp',
  url: 'amqp://guest:guest@localhost:5672',
  pin: {remote:1}
})

Will work

const Seneca = require('seneca')
const seneca = Seneca()
seneca.use('amqp-transport')
seneca.listen({
  type: 'amqp',
  url: 'amqp://guest:guest@localhost:5672',
  pin: {remote:'1'}
})
const Seneca = require('seneca')
const seneca = Seneca()
seneca.use('amqp-transport')
seneca.client({
  type: 'amqp',
  url: 'amqp://guest:guest@localhost:5672',
  pin: 'remote:1'
})

Error received

Seneca Fatal Error
==================

Message: seneca: value.replace is not a function

Code: transport_listen

Details: { type: 'amqp',
  url: 'amqp://guest:guest@localhost:5672',
  pin: 'remote:1',
  'orig$':
   { TypeError: value.replace is not a function
       at _.forOwn (/Users/paolochiodi/Projects/seneca/seneca-zipkin-tracer/node_modules/seneca-amqp-transport/lib/amqp-util.js:152:21)
       at /Users/paolochiodi/Projects/seneca/seneca-zipkin-tracer/node_modules/seneca-amqp-transport/node_modules/lodash/lodash.js:4917:15
       at baseForOwn (/Users/paolochiodi/Projects/seneca/seneca-zipkin-tracer/node_modules/seneca-amqp-transport/node_modules/lodash/lodash.js:2979:24)
       at Function.forOwn (/Users/paolochiodi/Projects/seneca/seneca-zipkin-tracer/node_modules/seneca-amqp-transport/node_modules/lodash/lodash.js:12926:24)
       at stringify (/Users/paolochiodi/Projects/seneca/seneca-zipkin-tracer/node_modules/seneca-amqp-transport/lib/amqp-util.js:147:5)

How to limit message consumption rate for services?

Hi @nfantone,

I ran into a scenario that I think warranted a little discussion before I attempted to attack it.

I've "rolled my own" microservice framework with my team before using a variety of libraries + amqplib, which is why Seneca was such a revelation to me. I certainly can appreciate the thought put into this framework and its plugins.

One situation I remember that we had to handle was limiting the consumption rate of services. In other words, the capability to configure a service to consume only, say, 500 messages at a time. We had to do this for several reasons:

  1. We had to handle massive load, upwards of 10,000 reqs/sec at peak from user and integration traffic
  2. In our microservice ecosystem, we'd sit have numerous instances of services (on Heroku) listening on a particular exchange and queue. If no limit was set, one (or a small subset) of services would monopolize all message handling and we couldn't really achieve true load balancing.
  3. As a side effect to the above, when the one (or subset) of services gets overwhelmed with requests, users experience service degradation. The users that were lucky enough to have their messages processed by the non-pegged services would experience good throughput while the ones unfortunate enough to have their messages handled by the pegged services would see major latency
  4. Avoid having services run out of memory due to processing too many messages at a time

I know that RabbitMQ has a maxLength property you can set on queues, but that wouldn't necessarily be the best solution. I know, natively on AMQP, you can do something like setting basic.qos, but that's on the channel level.

My next instinct was then to bake it into the service implementation itself. Perhaps something internally that keeps track of the number of messages a service is currently processing, but I was unclear as to how to tell the service instance to then not handle a message it received and kick it back to the queue for another service instance to handle.

Anyway, what would your suggestion be to attack this problem within the context of Seneca and the AMQP transport?

Thanks!

Eric

Unit tests and coverage

Before releasing 1.0.0, it would be nice to have basic functionality tested using Mocha and a coverage report

  • Write unit test for amqp-util.js
  • Mock out amqplib stuff.
  • Write unit test for listener.js.
  • Write unit test for client.js.

@disintegrator Could you give a hand here, if you are available?

Question about mq type transports

Hi,

I'm quite new to Seneca. So far, I'm loving it but got confused on message queue type transports.

Assume I got two services, an api service and a recommendation engine service, communicating via AMQ transport. In a traditional scenario, when a user makes a request to api service, api service sends the action to recommendation engine, recommendation engine finds the product and sends it back to the api service, api service receives the recommended product, and sends it back to the user.

This requires a bi-directional transport layer. How does seneca message queue plugins achieve this bi-directional communication? Does the recommendation service create a response message, which in turn received by the api service?

In my previous projects, I used message/job queues uni-directional. The main server creates a job, sends it to the message queue and moves on without waiting for the job workers. On a separate machine, the workers fetches the job and completes, without informing the main server. Is it possible to achieve this with Seneca mq transports?

incompatible version of `amqpuri`

Recently I had an absurd error on seneca-amqp-transport on the service that had been run on production for a while, the error says:

Seneca Fatal Error
==================

Message: seneca: use-plugin: Could not load plugin seneca-amqp-transport defined in seneca-amqp-transport due to syntax error: Unexpected token {. See STDERR for details.

Code: plugin_syntax_error

Details: { 'orig$':
   { [Error: use-plugin: Could not load plugin seneca-amqp-transport defined in seneca-amqp-transport due to syntax error: Unexpected token {. See STDERR for details.]
     eraro: true,
     orig: null,
     code: 'syntax_error',
     'use-plugin': true,
     package: 'use-plugin',
     msg: 'use-plugin: Could not load plugin seneca-amqp-transport defined in seneca-amqp-transport due to syntax error: Unexpected token {. See STDERR for details.',
     details:
      { options: {},
        callback: undefined,
        history:
         [ { module: '/build-tmp/node_modules/seneca/seneca.js',
             path: 'seneca-amqp-transport' } ],
        name: 'seneca-amqp-transport',
        search:
         [ { type: 'normal', name: 'seneca-amqp-transport' },
           { type: 'normal', name: 'seneca-seneca-amqp-transport' },
           { type: 'normal', name: './seneca-amqp-transport' },
           { type: 'normal', name: './seneca-seneca-amqp-transport' } ],
        err: [SyntaxError: Unexpected token {],
        found: { type: 'normal', name: 'seneca-amqp-transport' },
        found_name: 'seneca-amqp-transport',
        err_msg: 'Unexpected token {' },
     callpoint: 'at Seneca.api_use [as use] (/build-tmp/node_modules/seneca/seneca.js:967:29)' },
  'message$': 'use-plugin: Could not load plugin seneca-amqp-transport defined in seneca-amqp-transport due to syntax error: Unexpected token {. See STDERR for details.' }

Stack:
    at errormaker (/build-tmp/node_modules/eraro/eraro.js:94:15)
    at handle_load_error (/build-tmp/node_modules/use-plugin/use.js:248:12)
    at loadplugin (/build-tmp/node_modules/use-plugin/use.js:205:13)
    at Object.use (/build-tmp/node_modules/use-plugin/use.js:87:7)
    at Seneca.api_use [as use] (/build-tmp/node_modules/seneca/seneca.js:967:29)
    at Object.<anonymous> (/build-tmp/src/index.js:185:8)
    at Module._compile (module.js:413:34)
    at Object.Module._extensions..js (module.js:422:10)
    at Module.load (module.js:357:32)
    at Function.Module._load (module.js:314:12)
    at Function.Module.runMain (module.js:447:10)
    at startup (node.js:148:18)
    at node.js:405:3

Instance: Seneca/s8u6l5litvid/1493724375490/18/3.3.0/-
    at Seneca.api_use [as use] (/build-tmp/node_modules/seneca/seneca.js:970:12)

When: 2017-05-02T11:26:17.863Z

Log: {kind:null,plugin:seneca,tag:3.3.0,id:s8u6l5litvid/1493724375490/18/3.3.0/-,code:plugin_syntax_error,notice:sen

Node:
  { http_parser: '2.7.0', node: '5.12.0', v8: '4.6.85.32', uv: '1.8.0', zlib: '1.2.8', ares: '1.10.1-DEV', modules: '47', openssl: '1.0.2h' },
  { debug: false, uv: true, ipv6: true, tls_npn: true, tls_alpn: true, tls_sni: true, tls_ocsp: true, tls: true },
  [ 'Binding contextify', 'Binding natives', 'NativeModule events', 'NativeModule buffer', 'Binding buffer', 'NativeModule internal/util', 'Binding util', 'NativeModule timers', 'Binding timer_wrap', 'NativeModule internal/linkedlist', 'NativeModule assert', 'NativeModule util', 'Binding uv', 'NativeModule internal/process', 'NativeModule internal/process/next_tick', 'NativeModule internal/process/promises', 'NativeModule internal/process/stdio', 'NativeModule path', 'NativeModule module', 'NativeModule internal/module', 'NativeModule vm', 'NativeModule fs', 'Binding fs', 'NativeModule constants', 'Binding constants', 'NativeModule stream', 'NativeModule _stream_readable', 'NativeModule _stream_writable', 'NativeModule _stream_duplex', 'NativeModule _stream_transform', 'NativeModule _stream_passthrough', 'Binding fs_event_wrap', 'NativeModule console', 'Binding tty_wrap', 'NativeModule tty', 'NativeModule net', 'NativeModule internal/net', 'Binding cares_wrap', 'Binding tcp_wrap', 'Binding pipe_wrap', 'Binding stream_wrap', 'Binding signal_wrap', 'NativeModule os', 'Binding os', 'NativeModule http', 'NativeModule _http_incoming', 'NativeModule _http_common', 'NativeModule internal/freelist', 'Binding http_parser', 'NativeModule _http_outgoing', 'NativeModule _http_server', 'NativeModule _http_agent', 'NativeModule _http_client', 'NativeModule url', 'NativeModule punycode', 'NativeModule querystring', 'NativeModule https', 'NativeModule tls', 'Binding crypto', 'NativeModule _tls_common', 'NativeModule _tls_wrap', 'NativeModule crypto', 'NativeModule internal/streams/lazy_transform', 'NativeModule string_decoder', 'NativeModule _stream_wrap', 'Binding js_stream', 'Binding tls_wrap', 'NativeModule _tls_legacy', 'NativeModule zlib', 'Binding zlib', 'NativeModule dns' ]

Process:
  pid=18, arch=x64, platform=linux,
  path=/usr/bin/node,
  argv=[ '/usr/bin/node', '/build-tmp/build/index.js' ],
  env={ npm_config_cache_lock_stale: '60000',  npm_package_devDependencies_babel_plugin_transform_object_rest_spread: '^6.8.0',  npm_package_dependencies_mongoose_validator: '^1.2.5',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9300_TCP_PORT: '9300',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9200_TCP_PROTO: 'tcp',  ES_PORT_9300_TCP: 'tcp://es:9300',  ESB_RABBITMQ_1_PORT_5671_TCP_PORT: '5671',  ESB_RABBITMQ_1_PORT_4369_TCP_ADDR: 'rabbitmq',  ES_ENV_CA_CERTIFICATES_JAVA_VERSION: '20161107~bpo8+1',  ESB_RABBITMQ_1_ENV_RABBITMQ_SASL_LOGS: '-',  npm_config_sign_git_tag: '',  npm_config_legacy_bundling: '',  npm_package_devDependencies_babel_core: '^6.14.0',  npm_package_dependencies_mongodb_migrate: '^2.0.1',  npm_package_dependencies_bluebird: '^3.4.6',  ESB_RABBITMQ_1_PORT_5672_TCP_PORT: '5672',  ESB_RABBITMQ_1_PORT: 'tcp://rabbitmq:15672',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9300_TCP_PROTO: 'tcp',  ESB_RABBITMQ_1_PORT_5671_TCP_PROTO: 'tcp',  ES_ENV_JAVA_VERSION: '8u121',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_JAVA_DEBIAN_VERSION: '8u121-b13-1~bpo8+1',  npm_config_always_auth: '',  npm_config_user_agent: 'npm/3.10.3 node/v5.12.0 linux x64',  npm_package_dependencies_xml2js: '^0.4.17',  ESB_RABBITMQ_1_PORT_5672_TCP_PROTO: 'tcp',  ES_NAME: '/cattle/es',  ESB_RABBITMQ_1_ENV_RABBITMQ_DEBIAN_VERSION: '3.6.6-1',  npm_config_key: '',  npm_config_bin_links: 'true',  npm_package_dependencies_extend: '^3.0.0',  ES_ENV_ELASTICSEARCH_VERSION: '2.4.4',  ESB_RABBITMQ_1_PORT_4369_TCP_PORT: '4369',  npm_node_execpath: '/usr/bin/node',  npm_config_user: '',  npm_config_init_version: '1.0.0',  npm_config_if_present: '',  npm_config_heading: 'npm',  npm_config_fetch_retries: '2',  npm_config_description: 'true',  npm_package_devDependencies_nodemon: '^1.10.2',  npm_package_dependencies_node_fetch: '^1.6.3',  RABBITMQ_PORT_15671_TCP: 'tcp://rabbitmq:15671',  ESB_RABBITMQ_1_PORT_4369_TCP_PROTO: 'tcp',  ESB_RABBITMQ_1_NAME: '/cattle/esb-rabbitmq-1',  AMQP_URL: 'amqp://guest:guest@rabbitmq:5672',  HOSTNAME: 'b08968a69097',  SHLVL: '1',  npm_package_devDependencies_babel_preset_es2015: '^6.14.0',  npm_package_devDependencies_babel_cli: '^6.14.0',  npm_package_dependencies_seneca_amqp_transport: '^1.0.1',  RABBITMQ_PORT_15672_TCP: 'tcp://rabbitmq:15672',  npm_config_force: '',  npm_package_devDependencies_mocha: '^3.0.2',  npm_package_dependencies_moment_timezone: '^0.5.11',  npm_package_scripts_test_api_win: 'mocha --grep REST --require source-map-support/register --compilers js:babel-core/register --recursive',  npm_package_scripts_test_api: 'NODE_ENV=test node_modules/.bin/mocha --grep /RenterApi/v1/Exam/Retrieve --require source-map-support/register --compilers js:babel-core/register --recursive',  HOME: '/root',  RABBITMQ_PORT_25672_TCP: 'tcp://rabbitmq:25672',  IS_MAIL_AGENT_APPLICATION: 'false',  npm_config_only: '',  npm_package_dependencies_ls: '^0.2.1',  ESB_RABBITMQ_1_PORT_15671_TCP_ADDR: 'rabbitmq',  DB_NAME: '/cattle/db',  npm_config_init_license: 'ISC',  npm_config_cache_min: '10',  npm_package_devDependencies_chai: '^3.5.0',  npm_package_devDependencies_babel_preset_latest: '^6.14.0',  ES_ENV_JAVA_DEBIAN_VERSION: '8u121-b13-1~bpo8+1',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9200_TCP: 'tcp://es:9200',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT: 'tcp://es:9200',  ESB_RABBITMQ_1_PORT_15672_TCP_ADDR: 'rabbitmq',  npm_config_tag_version_prefix: 'v',  npm_config_rollback: 'true',  npm_config_editor: 'vi',  npm_package_dependencies_validator: '^6.2.0',  ESB_RABBITMQ_1_PORT_5671_TCP: 'tcp://rabbitmq:5671',  ESB_RABBITMQ_1_PORT_25672_TCP_ADDR: 'rabbitmq',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_CA_CERTIFICATES_JAVA_VERSION: '20161107~bpo8+1',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9300_TCP: 'tcp://es:9300',  npm_config_userconfig: '/root/.npmrc',  npm_config_cache_max: 'Infinity',  npm_package_scripts_serve: 'NODE_ENV=production node ./build/index.js',  ESB_RABBITMQ_1_PORT_5672_TCP: 'tcp://rabbitmq:5672',  ESB_RABBITMQ_1_PORT_15671_TCP_PORT: '15671',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_JAVA_VERSION: '8u121',  APPLICATION_SERVICE_APPLICATION_MONGODB_APPLICATION_MONGODB_DATA_1_NAME: '/cattle/application-service-application-mongodb-application-mongodb-data-1',  TRANSUNION_PRIVATE_KEY: 'dSCDfCXrgeXqNn9/7VUKzGqiN6smu/1rEwi9zTkNOqj1hbwTk5L73wIlESYuy6y4px6/iV2gkGEcR2lB11B1jQ==',  npm_config_tmp: '/tmp',  npm_config_init_author_url: '',  npm_config_init_author_name: '',  npm_config_engine_strict: '',  npm_package_dependencies_moment: '^2.15.1',  ESB_RABBITMQ_1_PORT_15672_TCP_PORT: '15672',  ESB_RABBITMQ_1_PORT_15671_TCP_PROTO: 'tcp',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_NAME: '/cattle/rancher-elastic-mongo-elasticsearch-1',  npm_config_usage: '',  npm_config_save_dev: '',  npm_config_depth: 'Infinity',  npm_package_devDependencies_eslint_plugin_async_await: '0.0.0',  npm_package_description: '',  ESB_RABBITMQ_1_PORT_15672_TCP_PROTO: 'tcp',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_LANG: 'C.UTF-8',  ESB_RABBITMQ_1_PORT_25672_TCP_PORT: '25672',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_ELASTICSEARCH_VERSION: '2.4.4',  ESB_RABBITMQ_1_PORT_4369_TCP: 'tcp://rabbitmq:4369',  RABBITMQ_ENV_RABBITMQ_LOGS: '-',  'TRANSUNION_BASE_URL-prod': 'https://www.mysmgateway.com',  MONGO_DB: 'db',  npm_config_https_proxy: '',  npm_config_progress: 'true',  npm_config_cafile: '',  npm_package_devDependencies_babel_plugin_transform_decorators_legacy: '^1.3.4',  npm_package_devDependencies_babel_istanbul: '^0.11.0',  ESB_RABBITMQ_1_PORT_25672_TCP_PROTO: 'tcp',  TMPDIR: '/build-tmp',  npm_config_onload_script: '',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_JAVA_HOME: '/usr/lib/jvm/java-8-openjdk-amd64/jre',  npm_config_shell: 'bash',  npm_config_save_bundle: '',  npm_config_rebuild_bundle: 'true',  VERSION: 'v5.12.0',  npm_config_prefix: '/usr',  npm_config_dry_run: '',  npm_package_dependencies_stately_js: '^1.3.0',  npm_package_dependencies_express: '^4.14.0',  APPLICATION_SERVICE_APPLICATION_MONGODB_1_NAME: '/cattle/application-service-application-mongodb-1',  RABBITMQ_ENV_GOSU_VERSION: '1.7',  IS_SEND_REQUEST_REMINDER: 'true',  npm_config_versions: '',  npm_config_searchopts: '',  npm_config_scope: '',  npm_config_save_optional: '',  npm_config_registry: 'https://registry.npmjs.org/',  npm_config_cache_lock_wait: '10000',  npm_config_browser: '',  npm_package_devDependencies_babel_plugin_transform_regenerator: '^6.14.0',  npm_package_dependencies_seneca: '^3.0.0',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_JAVA_DEBIAN_VERSION: '8u121-b13-1~bpo8+1',  PROD_ELASTIC: 'http://es:9200',  npm_config_proxy: '',  npm_config_cache: '/root/.npm',  npm_package_devDependencies_expect: '^1.20.2',  npm_package_dependencies_inflect: '^0.3.0',  TERM: 'xterm',  npm_config_version: '',  npm_config_searchsort: 'name',  npm_config_ignore_scripts: '',  npm_config_global_style: '',  npm_package_devDependencies_babel_plugin_transform_class_properties: '^6.16.0',  npm_package_dependencies_mongodb: '^2.2.11',  npm_package_scripts_start_win: 'nodemon src/index.js --watch src --exec babel-node',  npm_package_scripts_start: 'nodemon src/index.js --watch src --exec \'node_modules/.bin/eslint src && node_modules/.bin/babel-node\'',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9200_TCP_ADDR: 'es',  ESB_RABBITMQ_1_PORT_15671_TCP: 'tcp://rabbitmq:15671',  RABBITMQ_ENV_RABBITMQ_VERSION: '3.6.6',  ES_ENV_LANG: 'C.UTF-8',  npm_config_viewer: 'man',  npm_config_local_address: '',  npm_package_devDependencies_mongoose_mock: '^0.4.0',  npm_package_devDependencies_babel_plugin_transform_async_to_generator: '^6.8.0',  npm_package_dependencies_axios: '^0.14.0',  ESB_RABBITMQ_1_PORT_15672_TCP: 'tcp://rabbitmq:15672',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9300_TCP_ADDR: 'es',  NODE: '/usr/bin/node',  npm_package_name: 'application-service',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_ELASTICSEARCH_DEB_VERSION: '2.4.4',  RABBITMQ_PORT_5671_TCP_ADDR: 'rabbitmq',  ESB_RABBITMQ_1_PORT_25672_TCP: 'tcp://rabbitmq:25672',  ES_ENV_JAVA_HOME: '/usr/lib/jvm/java-8-openjdk-amd64/jre',  PATH: '/usr/lib/node_modules/npm/bin/node-gyp-bin:/build-tmp/node_modules/.bin:/usr/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin',  npm_config_color: 'true',  npm_package_devDependencies_babel_polyfill: '^6.20.0',  npm_package_dependencies_babel_runtime: '^6.11.6',  NPM_VERSION: '3',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_GOSU_VERSION: '1.7',  RABBITMQ_PORT_5672_TCP_ADDR: 'rabbitmq',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9200_TCP_PORT: '9200',  npm_config_maxsockets: '50',  npm_config_fetch_retry_mintimeout: '10000',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9300_TCP_PORT: '9300',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9200_TCP_PROTO: 'tcp',  npm_config_umask: '0022',  npm_package_devDependencies_pow_mongoose_fixtures: '^0.3.0',  npm_package_devDependencies_eslint: '^3.4.0',  npm_package_dependencies_elasticsearch: '^11.0.1',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9300_TCP_PROTO: 'tcp',  RABBITMQ_PORT_4369_TCP_ADDR: 'rabbitmq',  RABBITMQ_ENV_RABBITMQ_SASL_LOGS: '-',  RABBITMQ_PORT_5671_TCP_PORT: '5671',  npm_lifecycle_script: 'NODE_ENV=production node ./build/index.js',  npm_config_message: '%s',  npm_config_loglevel: 'warn',  npm_config_fetch_retry_maxtimeout: '60000',  npm_package_devDependencies_babel_plugin_transform_builtin_extend: '^1.1.0',  npm_package_main: 'index.js',  RABBITMQ_PORT_5671_TCP_PROTO: 'tcp',  RABBITMQ_PORT: 'tcp://rabbitmq:15672',  RABBITMQ_PORT_5672_TCP_PORT: '5672',  npm_config_link: '',  npm_config_global: '',  npm_config_cert: '',  npm_config_ca: '',  npm_package_scripts_test_win: 'mocha --require source-map-support/register --compilers js:babel-core/register --recursive',  npm_package_scripts_test: 'NODE_ENV=test node_modules/.bin/mocha --require source-map-support/register --compilers js:babel-core/register --recursive',  ES_PORT_9200_TCP_ADDR: 'es',  RABBITMQ_ENV_RABBITMQ_DEBIAN_VERSION: '3.6.6-1',  RABBITMQ_PORT_5672_TCP_PROTO: 'tcp',  npm_lifecycle_event: 'serve',  npm_config_unicode: '',  npm_config_save: '',  npm_config_also: '',  npm_config_access: '',  npm_package_devDependencies_assert: '^1.4.1',  npm_package_version: '1.0.0',  RABBITMQ_PORT_4369_TCP_PORT: '4369',  ESB_RABBITMQ_1_ENV_RABBITMQ_LOGS: '-',  ES_PORT_9300_TCP_ADDR: 'es',  npm_config_unsafe_perm: '',  npm_config_production: '',  npm_config_long: '',  npm_config_argv: '{"remain":[],"cooked":["run","serve"],"original":["run","serve"]}',  npm_package_scripts_build: 'rm -rf build/** && node_modules/.bin/babel src/ -d build -s',  ES_ENV_ELASTICSEARCH_DEB_VERSION: '2.4.4',  RABBITMQ_NAME: '/cattle/rabbitmq',  RABBITMQ_PORT_4369_TCP_PROTO: 'tcp',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_LANG: 'C.UTF-8',  npm_config_tag: 'latest',  npm_config_node_version: '5.12.0',  npm_package_devDependencies_proxyquire: '^1.7.10',  npm_package_devDependencies_chai_as_promised: '^5.3.0',  npm_package_dependencies_lodash: '^4.15.0',  npm_package_author: '',  ES_ENV_GOSU_VERSION: '1.7',  ES_PORT_9200_TCP_PORT: '9200',  npm_config_shrinkwrap: 'true',  npm_config_git_tag_version: 'true',  npm_package_devDependencies_events: '^1.1.1',  npm_package_devDependencies_babel_preset_stage_0: '^6.5.0',  npm_package_dependencies_source_map_support: '^0.4.11',  ES_PORT_9200_TCP_PROTO: 'tcp',  ES_PORT_9300_TCP_PORT: '9300',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_JAVA_HOME: '/usr/lib/jvm/java-8-openjdk-amd64/jre',  npm_config_strict_ssl: 'true',  npm_config_save_exact: '',  npm_config_proprietary_attribs: 'true',  npm_config_npat: '',  npm_config_fetch_retry_factor: '10',  npm_package_dependencies_mongoose: '^4.6.0',  npm_package_license: 'ISC',  ES_PORT_9300_TCP_PROTO: 'tcp',  RABBITMQ_PORT_15671_TCP_ADDR: 'rabbitmq',  ESB_RABBITMQ_1_ENV_GOSU_VERSION: '1.7',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9200_TCP: 'tcp://es:9200',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT: 'tcp://es:9200',  npm_config_parseable: '',  npm_config_init_module: '/root/.npm-init.js',  npm_config_globalconfig: '/usr/etc/npmrc',  npm_config_dev: '',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_PORT_9300_TCP: 'tcp://es:9300',  RABBITMQ_PORT_15672_TCP_ADDR: 'rabbitmq',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_CA_CERTIFICATES_JAVA_VERSION: '20161107~bpo8+1',  PWD: '/build-tmp',  npm_execpath: '/usr/lib/node_modules/npm/bin/npm-cli.js',  npm_config_globalignorefile: '/usr/etc/npmignore',  npm_package_dependencies_base64_url: '^1.3.3',  RABBITMQ_PORT_5671_TCP: 'tcp://rabbitmq:5671',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_JAVA_VERSION: '8u121',  RABBITMQ_PORT_25672_TCP_ADDR: 'rabbitmq',  npm_config_cache_lock_retries: '10',  RABBITMQ_PORT_5672_TCP: 'tcp://rabbitmq:5672',  RABBITMQ_PORT_15671_TCP_PORT: '15671',  ESB_RABBITMQ_1_ENV_RABBITMQ_VERSION: '3.6.6',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_NAME: '/cattle/rancher-elastic-mongo-elasticsearch-elasticsearch-data-1',  UAT: 'true',  npm_config_save_prefix: '^',  npm_package_scripts_test_scope: 'NODE_ENV=test node_modules/.bin/mocha --grep RequestedSubmission_Service --require source-map-support/register --compilers js:babel-core/register --recursive',  RABBITMQ_PORT_15672_TCP_PORT: '15672',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9200_TCP_ADDR: 'es',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_ELASTICSEARCH_DATA_1_ENV_ELASTICSEARCH_VERSION: '2.4.4',  RABBITMQ_PORT_15671_TCP_PROTO: 'tcp',  npm_config_searchexclude: '',  npm_config_init_author_email: '',  npm_config_group: '',  npm_package_devDependencies_babel_plugin_transform_runtime: '^6.15.0',  npm_package_devDependencies_babel_eslint: '^6.1.2',  RABBITMQ_PORT_15672_TCP_PROTO: 'tcp',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9300_TCP_ADDR: 'es',  RABBITMQ_PORT_4369_TCP: 'tcp://rabbitmq:4369',  RABBITMQ_PORT_25672_TCP_PORT: '25672',  ESB_RABBITMQ_1_PORT_5671_TCP_ADDR: 'rabbitmq',  TRANSUNION_PARTNER_ID: '151',  NODE_ENV: 'production',  npm_config_optional: 'true',  npm_config_git: 'git',  npm_package_devDependencies_sinon: '^1.17.6',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_ELASTICSEARCH_DEB_VERSION: '2.4.4',  ESB_RABBITMQ_1_PORT_5672_TCP_ADDR: 'rabbitmq',  RABBITMQ_PORT_25672_TCP_PROTO: 'tcp',  npm_config_json: '',  npm_package_devDependencies_babel_plugin_module_resolver: '^2.2.0',  npm_package_dependencies_morgan: '^1.7.0',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_ENV_GOSU_VERSION: '1.7',  ES_PORT_9200_TCP: 'tcp://es:9200',  RANCHER_ELASTIC_MONGO_ELASTICSEARCH_1_PORT_9200_TCP_PORT: '9200',  ES_PORT: 'tcp://es:9200' }




SENECA TERMINATED (on timeout) at 2017-05-02T11:26:28.984Z.



npm ERR! Linux 4.4.0-72-generic
npm ERR! argv "/usr/bin/node" "/usr/bin/npm" "run" "serve"
npm ERR! node v5.12.0
npm ERR! npm  v3.10.3
npm ERR! code ELIFECYCLE
npm ERR! [email protected] serve: `NODE_ENV=production node ./build/index.js`
npm ERR! Exit status 2
npm ERR!
npm ERR! Failed at the [email protected] serve script 'NODE_ENV=production node ./build/index.js'.
npm ERR! Make sure you have the latest version of node.js and npm installed.
npm ERR! If you do, this is most likely a problem with the application-service package,
npm ERR! not with npm itself.
npm ERR! Tell the author that this fails on your system:
npm ERR!     NODE_ENV=production node ./build/index.js
npm ERR! You can get information on how to open an issue for this project with:
npm ERR!     npm bugs application-service
npm ERR! Or if that isn't available, you can get their info via:
npm ERR!     npm owner ls application-service
npm ERR! There is likely additional logging output above.

npm ERR! Please include the following file with any support request:
npm ERR!     /build-tmp/npm-debug.log

Because I dockertized all my services and there're only newly build containers show this error, I assume it must be the incompatible version of dependencies, this is the dependencies on the working container:

and this is, for the not-working-version;

notice the version of amqpuri dependency,

I think there should be the dependency lock or some kind??

right now, my fix was that I need to use npm shrinkwrap to create a dependency lock file

npm-shrinkwrap.json in your project directory,

{
  "name": "you-app-name",
  "version": "1.0.0",
  "dependencies": {
    "seneca-amqp-transport": {
      "version": "1.1.1",
      "from": "seneca-amqp-transport@>=1.0.1 <2.0.0",
      "resolved": "https://registry.npmjs.org/seneca-amqp-transport/-/seneca-amqp-transport-1.1.1.tgz",
      "dependencies": {
        "amqpuri": {
          "from": "[email protected]",
          "version": "1.0.3"
        }
      }
    }
  }
}

after a clean npm install the amqp transport will continue to work

Multiple Clients on for same pin

This is my set up. I have three microservices. One listener and two clients.

I am trying to create two clients to listen to the same pin 'register:*' . When one client is active, running another client throws an error.

Same as the example files:

When I run one instance of listener and one instance of client. It works fine.
But when I try to run another instance of client, it throws an error.

Document proper usage and API

Snippets found on README.md show examples of usage for a particular setup and doesn't showcase configuration options. Also, they don't work out of the box.

Another problem with the examples is that they somewhat contradict the following from the Getting Started guide on senecajs.org:

You export the plugin definition function and then call seneca.use with the name of the file. You can either require it in or if you like to be terse, let Seneca make the require call.

This doesn't hold true for this plugin as it expects the exported function of the module to be bound to the seneca instance:

module.exports = function(opts) {
    var seneca = this;
    var plugin = 'amqp-transport';
    var so = seneca.options();
    // ...

So, actually, you cannot require the module manually and must let seneca do that.

Anyway, this:

require('seneca')()
    .use(require('..'))
    .add({generate: 'id'}, function(message, done) {
        done(null, {pid: process.pid, id: '' + Math.random()});
    })
    .listen({type: 'amqp'});

Should really be something like:

require('seneca')()
      .use('seneca-amqp-transport', {
    amqp: {
      url: 'amqp://<username>:<password>@<hostname>:<port>'
    })
    .add({generate: 'id'}, function(message, done) {
        done(null, {pid: process.pid, id: '' + Math.random()});
    })
    .listen({type: 'amqp'});

Typo in the npm module uri parsing code

Hello,

I noticed that the development branch fixes the issue with the url parsing (join('://') instead of join('//')), was wondering if you plan on merging it to master soon?

Thank you for the plugin!

pin vs. pins

@disintegrator @geek Is there any difference between defining multiple clients declaring a pin and just a single client with a pins array? Concretely, I am comparing the following scenarios:

Multiple clients

var seneca = require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    pin: 'name:contact'
  })
  .client({
    type: 'amqp',
    pin: 'role:email'
  });

Single client with an array of pins

var seneca = require('seneca')()
  .use('seneca-amqp-transport')
  .client({
    type: 'amqp',
    pins: ['name:contact', 'role:email']
  });

I'm asking this, because, currently, the plugin doesn't show the same behavior in those scenarios. First one will create two clients, each one with its own res queue; while the second one, creates only one.

Same question goes for listeners, actually. What about those?

Remove the need for pins

@rjrodger and @mcdonnelldean I've been looking for a way to avoid the need for users to set pins on clients and/or listeners (I believe your plans include deprecating them in a not-so-distant future?). Currently, they don't really serve much purpose other than feeding the exported transport/utils functions, which make pins mandatory.

So, here's my conceptual idea: instead of relying on pins for generating queues and routing keys in the AMQP world, I'd like to use the actual action patterns.

For instance, in a simple scenario with a microservice containing:

# listener.js
require('seneca')()
  .use('seneca-amqp-transport')
  .add('cmd:salute', function(req, done) {
    return done(null, { message: `${Hello req.name}!` });
  });
  .add('cmd:sum', function(req, done) {
    return done(null, { sum: req.a + req.b });
  })
  .add('cmd:log,level:*', function(req, done) {
    console[req.level](req.message);
    return done(null, { ok: true });
  });

I'd like to generate one AMQP queue bound to some topic exchange with routing keys cmd.salute, cmd.sum, cmd.log.level.*. The thing is, I wasn't able to find a way to obtain all those patterns at plugin initialization. I know seneca.list() returns an array of all declared topics, but that also includes all "internal" patterns (like role:transport,hook:listen or role:seneca,cmd:close) - I'm only interested in "user defined" actions, and AFAIK there's no way to distinguish those from others. There's also no event emitted when adding an action to the actmap I could potentially listen to.

One approach I could think of is defining some actmeta.plugin_name when declaring actions, such as:

require('seneca')()
  .use('seneca-amqp-transport')
  .add('cmd:salute', function(req, done) {
    return done(null, { message: `${Hello req.name}!` });
  }, { plugin_name: 'amqp-transport' });

And then leverage private$.stats.actmap to query for acts containing that name. But it's (very) far from an ideal solution.

Any thoughts on this? It'd be much appreciated.

Dev branch

When are you expecting to push the refactor?

Every queue has a consume rate of at best 25 messages per second. How can I increase this?

Hello,

I am using SenecaJS with this plugin and RabbitMQ. When I did some benchmarking to understand why at best I had 25 messages per second handled on each queue, I found out that it takes 40ms for a message to go and come back.
For exemple, a microservice ask for an object to another microservice (Basically a mongodb query).
The act callback is called 40ms after when Mongodb query only takes 2ms to complete.
So I now understand why on RabbitMQ admin interface, I have at best 25 messages handled per second per queue: 40ms * 25 = 1 second.

I use the default config for Amqp plugin and SenecaJS. All microservices and RabbiMQ are on the same server.
Each microservice has only one instance running.

Is this a normal behaviour?
Is it possible to only increase the rate by modifying the parameters of Seneca AMQP Transport, SenacaJS and/or RabbitMQ ?

Handling Error in Listener Callback

Hi.

I am sure that there is a lot of discussion happening around error handling at senecajs/seneca#398

But I wanted to know if the below scenario is inherent to amqp-transport.

So a microservice

seneca.add('otp:verify', function (args, cb) {

  redis.pttl(`otp:${args.id}`).then(function (time) {
    console.log(typeof time, time);
    if (time <= 0) {
      return cb({success:false,errorCode:1002,message:'OTP EXPIRED'});
    }

  });

});

Calling the above throws an error and kills the process.

Seneca Fatal Error
==================

Message: seneca: Action otp:verify failed: gate-executor: action-error.

Code: act_execute

Details: { id: 'np6rnxa4j9zy/gbn0aq2wlxid',
  gate: false,
  ungate: true,
  desc: 'otp:verify',
  cb: [Function],
  plugin: { name: 'root$', tag: undefined },
  fn: [Function],
  time: { start: 1461393871342, end: 1461393871344 },
  message: 'gate-executor: action-error',
  pattern: 'otp:verify',
  instance: 'Seneca/0.7.2/xslhf5tt28gk/1461393809134/3107/-',
  'orig$': 
   { [Error: gate-executor: action-error]
     eraro: true,
     orig: null,
     code: 'unknown',
     'gate-executor': true,
     package: 'gate-executor',
     msg: 'gate-executor: action-error',
     details: 
      { id: 'np6rnxa4j9zy/gbn0aq2wlxid',
        gate: false,
        ungate: true,
        desc: 'otp:verify',
        cb: [Function],
        plugin: { name: 'root$', tag: undefined },
        fn: [Function],
        time: { start: 1461393871342, end: 1461393871344 } },
     callpoint: 'at /home/zapstitch/IdeaProjects/plento_ecospace/otp/node_modules/seneca/node_modules/gate-executor/gate-executor.js:122:23' },
  'message$': 'gate-executor: action-error' }

On the client side, I get the seneca timeout error.

Now I do the below changes

  seneca.add('otp:verify', function (args, cb) {

  redis.pttl(`otp:${args.id}`).then(function (time) {
    console.log(typeof time, time);
    if (time <= 0) {

      var error = new Error('OTP EXPIRED');
      return cb(error);
    }


  });

});

Also it throws an error and kills the process.

Seneca Fatal Error
==================

Message: seneca: Action otp:verify failed: OTP EXPIRED.

Code: act_execute

Details: { id: 'np6rnxa4j9zy/gbn0aq2wlxid',
  gate: false,
  ungate: true,
  desc: 'otp:verify',
  cb: [Function],
  plugin: { name: 'root$', tag: undefined },
  fn: [Function],
  time: { start: 1461394294407, end: 1461394294408 },
  'orig$': [Error: OTP EXPIRED],
  'message$': 'OTP EXPIRED',
  message: 'OTP EXPIRED',
  pattern: 'otp:verify',
  instance: 'Seneca/0.7.2/6pgvd4zmdfta/1461394293874/3510/-' }

Stack: 
    at /home/zapstitch/IdeaProjects/ecospace/otp/modules/generate_otp/services/generate_otp.js:24:19
    at tryCatcher (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/util.js:26:23)
    at Promise._settlePromiseFromHandler (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/promise.js:507:31)
    at Promise._settlePromiseAt (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/promise.js:581:18)
    at Promise._settlePromises (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/promise.js:697:14)
    at Async._drainQueue (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/async.js:123:16)
    at Async._drainQueues (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/async.js:133:10)
    at Immediate.Async.drainQueues [as _onImmediate] (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/ioredis/node_modules/bluebird/js/main/async.js:15:14)
    at processImmediate [as _immediateCallback] (timers.js:383:17)

Instance: Seneca/0.7.2/6pgvd4zmdfta/1461394293874/3510/-
  ALL ERRORS FATAL: action called with argument fatal$:true (probably a plugin init error, or using a plugin seneca instance, see senecajs.org/fatal.html)
    at act_done (/home/zapstitch/IdeaProjects/ecospace/otp/node_modules/seneca/seneca.js:1529:29)

When: 2016-04-23T06:51:34.417Z

Log: [sys,seneca,0.7.2,6pgvd4zmdfta/1461394293874/3510/-,act_execute,seneca: Action otp:verify failed: OTP EXPIRED.,

What is the right approach here?

RabbitMQ: Unknown delivery tag 1

Hello,

I just came across this error, the reason why I'm opening an issue is because it seems to have some sort of relation to the AMQP transport plugin, even though I'm not sure why it happened only now, since I've been using it for quite some time in the same way, and never had this happen.

Here is the error log from the console:

api-server_1             | 23:29:34 api-server-0 (node:16) Warning: a promise was created in a handler at /home/service/api-server/node_modules/amqplib/lib/channel.js:316:9 but was not returned from it, see http://goo.gl/rRqMUw
api-server_1             | 23:29:34 api-server-0     at ret (eval at <anonymous> (/home/service/api-server/node_modules/bluebird/js/release/promisify.js:184:12), <anonymous>:8:21)
rabbitmq-api_1           | 
rabbitmq-api_1           | =ERROR REPORT==== 21-Sep-2016::23:29:34 ===
rabbitmq-api_1           | Channel error on connection <0.757.0> (172.18.0.13:33284 -> 172.18.0.4:5672, vhost: '/', user: 'guest'), channel 1:
rabbitmq-api_1           | operation basic.ack caused a channel exception precondition_failed: "unknown delivery tag 1"
api-server_1             | 23:29:34 api-server-0 ["client","unknown_message_id",{"pin":"role:api,path:tokens","type":"amqp","url":"amqp://rabbitmq-api","pg":"path:tokens,role:api","id":"pg:path:tokens,role:api,pin:role:api,path:tokens,type:amqp,url:amqp://rabbitmq-api","role":"transport","hook":"client","exchange":{"type":"topic","name":"seneca.topic","options":{"durable":true,"autoDelete":false}},"queues":{"action":{"prefix":"seneca","separator":".","options":{"durable":true}},"response":{"prefix":"seneca.res","separator":".","options":{"autoDelete":true,"exclusive":true}}}},{"id":"6hbvam631og7/q04ynyzdhmyg","kind":"res","origin":"i12msb2n0ocm/1474500567450/16/3.0.0/-","accept":"5nbd5oa1lyjg/1474500567307/16/3.1.0/-","track":["i12msb2n0ocm/1474500567450/16/3.0.0/-"],"time":{"client_sent":1474500574358,"listen_recv":1474500574363,"listen_sent":1474500574405,"client_recv":1474500574439},"sync":true,"res":{"data":{"id":"19fb83a0-891c-4248-ba0a-d8fc04de1a43","type":"password:update","userId":"e3bce0ea-b203-4776-a204-5003021e55ea"}}}]
api-tokens_1             | 23:29:34 0|api-toke | {"notice":"seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\".","code":"act_execute","err":{"code":"act_execute","eraro":true,"orig":{"code":406},"seneca":true,"package":"seneca","msg":"seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\".","details":{"message":"Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\"","pattern":"hook:listen,role:transport,type:amqp","instance":"Seneca/5nbd5oa1lyjg/1474500567307/16/3.1.0/-","orig$":{"code":406},"message$":"Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\"","plugin":{}},"callpoint":"at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)"},"actid":"kyk7t76f8hkz/a4te2ck5gegj","msg":{"pin":"role:api,path:tokens","type":"amqp","url":"amqp://rabbitmq-api","msgprefix":"seneca_","callmax":111111,"msgidlen":12,"role":"transport","hook":"listen","plugin$":{"name":"transport","tag":"-"},"tx$":"a4te2ck5gegj","meta$":{"id":"m7m1nlbrunu5/a4te2ck5gegj","tx":"a4te2ck5gegj","start":1474500567742,"pattern":"hook:listen,role:transport,type:amqp","action":"(eftl2hmek99h)","entry":true,"chain":[],"sync":true,"plugin_name":"amqp-transport","plugin_tag":"-"}},"entry":true,"prior":[],"meta":{"plugin_name":"amqp-transport","plugin_tag":"-","plugin_fullname":"amqp-transport","raw":{"role":"transport","hook":"listen","type":"amqp"},"sub":false,"client":false,"args":{"role":"transport","hook":"listen","type":"amqp"},"rules":{},"id":"(eftl2hmek99h)","pattern":"hook:listen,role:transport,type:amqp","msgcanon":{"hook":"listen","role":"transport","type":"amqp"},"priorpath":""},"client":false,"listen":false,"transport":{},"kind":"act","case":"ERR","duration":6698,"level":"error","plugin_name":"transport","plugin_tag":"-","pattern":"cmd:listen,role:transport","seneca":"5nbd5oa1lyjg/1474500567307/16/3.1.0/-","when":1474500574459}
api-tokens_1             | 23:29:34 0|api-toke | {"notice":"seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\".","code":"act_execute","err":{"code":"act_execute","eraro":true,"orig":{"code":406},"seneca":true,"package":"seneca","msg":"seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\".","details":{"message":"Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\"","pattern":"hook:listen,role:transport,type:amqp","instance":"Seneca/5nbd5oa1lyjg/1474500567307/16/3.1.0/-","orig$":{"code":406},"message$":"Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\"","plugin":{}},"callpoint":"at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)"},"actid":"kyk7t76f8hkz/a4te2ck5gegj","msg":{"config":{"pin":"role:api,path:tokens","type":"amqp","url":"amqp://rabbitmq-api","msgprefix":"seneca_","callmax":111111,"msgidlen":12},"gate$":true,"role":"transport","cmd":"listen","meta$":{"id":"kyk7t76f8hkz/a4te2ck5gegj","tx":"a4te2ck5gegj","start":1474500567741,"pattern":"cmd:listen,role:transport","action":"(a2ye4muxbpk8)","entry":true,"chain":[],"sync":true,"plugin_name":"transport","plugin_tag":"-"},"plugin$":{"name":"transport","tag":"-"},"tx$":"a4te2ck5gegj"},"entry":true,"prior":[],"gate":true,"meta":{"plugin_name":"transport","plugin_tag":"-","plugin_fullname":"transport","raw":{"role":"transport","cmd":"listen"},"sub":false,"client":false,"args":{"role":"transport","cmd":"listen"},"rules":{},"id":"(a2ye4muxbpk8)","pattern":"cmd:listen,role:transport","msgcanon":{"cmd":"listen","role":"transport"},"priorpath":""},"client":false,"listen":false,"transport":{},"kind":"act","case":"ERR","duration":6719,"level":"error","seneca":"5nbd5oa1lyjg/1474500567307/16/3.1.0/-","when":1474500574460}
api-tokens_1             | 23:29:34 0|api-toke | []
api-tokens_1             | 23:29:34 0|api-toke | ["Channel failed to close",{"message":"Channel closed","stack":"IllegalOperationError: Channel closed\n    at Channel.<anonymous> (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:149:11)\n    at Channel.C.closeBecause (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:201:8)\n    at Channel.C.close (/home/service/api-tokens/node_modules/amqplib/lib/channel_model.js:81:8)\n    at Seneca.<anonymous> (/home/service/api-tokens/node_modules/seneca-amqp-transport/lib/hook.js:18:29)\n    at Array.done (/home/service/api-tokens/node_modules/seneca/seneca.js:950:22)\n    at validate_action_message (/home/service/api-tokens/node_modules/seneca/seneca.js:1398:19)\n    at execute_action (/home/service/api-tokens/node_modules/seneca/seneca.js:880:7)\n    at Object.execspec.fn [as orig_fn] (/home/service/api-tokens/node_modules/seneca/seneca.js:1084:11)\n    at Object.timeout_fn [as fn] (/home/service/api-tokens/node_modules/gate-executor/gate-executor.js:194:12)\n    at next (/home/service/api-tokens/node_modules/gate-executor/gate-executor.js:126:14)\n    at process (/home/service/api-tokens/node_modules/gate-executor/gate-executor.js:175:12)\n    at Immediate.<anonymous> (/home/service/api-tokens/node_modules/gate-executor/gate-executor.js:165:13)\n    at runCallback (timers.js:574:20)\n    at tryOnImmediate (timers.js:554:5)\n    at processImmediate [as _immediateCallback] (timers.js:533:5)","stackAtStateChange":"Stack capture: Channel closed by server: 406 (PRECONDITION-FAILED) with message \"PRECONDITION_FAILED - unknown delivery tag 1\"\n    at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:410:13)\n    at Connection.mainAccept [as accept] (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:63:33)\n    at Socket.go (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:476:48)\n    at emitNone (events.js:86:13)\n    at Socket.emit (events.js:185:7)\n    at emitReadable_ (_stream_readable.js:433:10)\n    at emitReadable (_stream_readable.js:427:7)\n    at readableAddChunk (_stream_readable.js:188:13)\n    at Socket.Readable.push (_stream_readable.js:135:10)\n    at TCP.onread (net.js:542:20)"}]
api-tokens_1             | 23:29:34 0|api-toke | Seneca Fatal Error
api-tokens_1             | 23:29:34 0|api-toke | ==================
api-tokens_1             | 23:29:34 0|api-toke | Message: seneca: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"
api-tokens_1             | 23:29:34 0|api-toke | Code: transport_listen
api-tokens_1             | 23:29:34 0|api-toke | Details: { pin: 'role:api,path:tokens',
api-tokens_1             | 23:29:34 0|api-toke |   type: 'amqp',
api-tokens_1             | 23:29:34 0|api-toke |   url: 'amqp://rabbitmq-api',
api-tokens_1             | 23:29:34 0|api-toke |   msgprefix: 'seneca_',
api-tokens_1             | 23:29:34 0|api-toke |   callmax: 111111,
api-tokens_1             | 23:29:34 0|api-toke |   msgidlen: 12,
api-tokens_1             | 23:29:34 0|api-toke |   'orig$': 
api-tokens_1             | 23:29:34 0|api-toke |    { Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"
api-tokens_1             | 23:29:34 0|api-toke |        at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)
api-tokens_1             | 23:29:34 0|api-toke |        at Connection.mainAccept [as accept] (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:63:33)
api-tokens_1             | 23:29:34 0|api-toke |        at Socket.go (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:476:48)
api-tokens_1             | 23:29:34 0|api-toke |        at emitNone (events.js:86:13)
api-tokens_1             | 23:29:34 0|api-toke |        at Socket.emit (events.js:185:7)
api-tokens_1             | 23:29:34 0|api-toke |        at emitReadable_ (_stream_readable.js:433:10)
api-tokens_1             | 23:29:34 0|api-toke |        at emitReadable (_stream_readable.js:427:7)
api-tokens_1             | 23:29:34 0|api-toke |        at readableAddChunk (_stream_readable.js:188:13)
api-tokens_1             | 23:29:34 0|api-toke |        at Socket.Readable.push (_stream_readable.js:135:10)
api-tokens_1             | 23:29:34 0|api-toke |        at TCP.onread (net.js:542:20)
api-tokens_1             | 23:29:34 0|api-toke |      code: 'act_execute',
api-tokens_1             | 23:29:34 0|api-toke |      eraro: true,
api-tokens_1             | 23:29:34 0|api-toke |      orig: 
api-tokens_1             | 23:29:34 0|api-toke |       { Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"
api-tokens_1             | 23:29:34 0|api-toke |           at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)
api-tokens_1             | 23:29:34 0|api-toke |           at Connection.mainAccept [as accept] (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:63:33)
rabbitmq-api_1           | 
rabbitmq-api_1           | =WARNING REPORT==== 21-Sep-2016::23:29:34 ===
rabbitmq-api_1           | closing AMQP connection <0.757.0> (172.18.0.13:33284 -> 172.18.0.4:5672):
rabbitmq-api_1           | client unexpectedly closed TCP connection
api-tokens_1             | 23:29:34 0|api-toke |           at Socket.go (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:476:48)
api-tokens_1             | 23:29:34 0|api-toke |           at emitNone (events.js:86:13)
api-tokens_1             | 23:29:34 0|api-toke |           at Socket.emit (events.js:185:7)
api-tokens_1             | 23:29:34 0|api-toke |           at emitReadable_ (_stream_readable.js:433:10)
api-tokens_1             | 23:29:34 0|api-toke |           at emitReadable (_stream_readable.js:427:7)
api-tokens_1             | 23:29:34 0|api-toke |           at readableAddChunk (_stream_readable.js:188:13)
api-tokens_1             | 23:29:34 0|api-toke |           at Socket.Readable.push (_stream_readable.js:135:10)
api-tokens_1             | 23:29:34 0|api-toke |           at TCP.onread (net.js:542:20) code: 406 },
api-tokens_1             | 23:29:34 0|api-toke |      seneca: true,
api-tokens_1             | 23:29:34 0|api-toke |      package: 'seneca',
api-tokens_1             | 23:29:34 0|api-toke |      msg: 'seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1".',
api-tokens_1             | 23:29:34 0|api-toke |      details: 
api-tokens_1             | 23:29:34 0|api-toke |       { message: 'Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"',
api-tokens_1             | 23:29:34 0|api-toke |         pattern: 'hook:listen,role:transport,type:amqp',
api-tokens_1             | 23:29:34 0|api-toke |         fn: [Function],
api-tokens_1             | 23:29:34 0|api-toke |         cb: 
api-tokens_1             | 23:29:34 0|api-toke |          { [Function: bound ]
api-tokens_1             | 23:29:34 0|api-toke |            seneca: 
api-tokens_1             | 23:29:34 0|api-toke |             Seneca {
api-tokens_1             | 23:29:34 0|api-toke |               'private$': {},
api-tokens_1             | 23:29:34 0|api-toke |               did: '(p7ip5527o5xr)',
api-tokens_1             | 23:29:34 0|api-toke |               toString: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               fixedargs: 
api-tokens_1             | 23:29:34 0|api-toke |                { 'plugin$': { name: 'transport', tag: '-' },
api-tokens_1             | 23:29:34 0|api-toke |                  'tx$': 'a4te2ck5gegj' },
api-tokens_1             | 23:29:34 0|api-toke |               delegate: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               context: {},
api-tokens_1             | 23:29:34 0|api-toke |               client: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               listen: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               log: 
api-tokens_1             | 23:29:34 0|api-toke |                { [Function: prepare_log_data]
api-tokens_1             | 23:29:34 0|api-toke |                  debug: [Function: prepare_log_data],
api-tokens_1             | 23:29:34 0|api-toke |                  info: [Function: prepare_log_data],
api-tokens_1             | 23:29:34 0|api-toke |                  warn: [Function: prepare_log_data],
api-tokens_1             | 23:29:34 0|api-toke |                  error: [Function: prepare_log_data],
api-tokens_1             | 23:29:34 0|api-toke |                  fatal: [Function: prepare_log_data] },
api-tokens_1             | 23:29:34 0|api-toke |               prior: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               good: [Function],
api-tokens_1             | 23:29:34 0|api-toke |               bad: [Function] } },
api-tokens_1             | 23:29:34 0|api-toke |         instance: 'Seneca/5nbd5oa1lyjg/1474500567307/16/3.1.0/-',
api-tokens_1             | 23:29:34 0|api-toke |         'orig$': 
api-tokens_1             | 23:29:34 0|api-toke |          { Error: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"
api-tokens_1             | 23:29:34 0|api-toke |              at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)
api-tokens_1             | 23:29:34 0|api-toke |              at Connection.mainAccept [as accept] (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:63:33)
api-tokens_1             | 23:29:34 0|api-toke |              at Socket.go (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:476:48)
api-tokens_1             | 23:29:34 0|api-toke |              at emitNone (events.js:86:13)
api-tokens_1             | 23:29:34 0|api-toke |              at Socket.emit (events.js:185:7)
api-tokens_1             | 23:29:34 0|api-toke |              at emitReadable_ (_stream_readable.js:433:10)
api-tokens_1             | 23:29:34 0|api-toke |              at emitReadable (_stream_readable.js:427:7)
api-tokens_1             | 23:29:34 0|api-toke |              at readableAddChunk (_stream_readable.js:188:13)
api-tokens_1             | 23:29:34 0|api-toke |              at Socket.Readable.push (_stream_readable.js:135:10)
api-tokens_1             | 23:29:34 0|api-toke |              at TCP.onread (net.js:542:20) code: 406 },
api-tokens_1             | 23:29:34 0|api-toke |         'message$': 'Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1"',
api-tokens_1             | 23:29:34 0|api-toke |         plugin: {} },
api-tokens_1             | 23:29:34 0|api-toke |      callpoint: 'at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)' },
api-tokens_1             | 23:29:34 0|api-toke |   'message$': 'seneca: Action hook:listen,role:transport,type:amqp failed: Channel closed by server: 406 (PRECONDITION-FAILED) with message "PRECONDITION_FAILED - unknown delivery tag 1".' }
api-tokens_1             | 23:29:34 0|api-toke | Stack: 
api-tokens_1             | 23:29:34 0|api-toke |     at Channel.C.accept (/home/service/api-tokens/node_modules/amqplib/lib/channel.js:406:17)
api-tokens_1             | 23:29:34 0|api-toke |     at Connection.mainAccept [as accept] (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:63:33)
api-tokens_1             | 23:29:34 0|api-toke |     at Socket.go (/home/service/api-tokens/node_modules/amqplib/lib/connection.js:476:48)
api-tokens_1             | 23:29:34 0|api-toke |     at emitNone (events.js:86:13)
api-tokens_1             | 23:29:34 0|api-toke |     at Socket.emit (events.js:185:7)
api-tokens_1             | 23:29:34 0|api-toke |     at emitReadable_ (_stream_readable.js:433:10)
api-tokens_1             | 23:29:34 0|api-toke |     at emitReadable (_stream_readable.js:427:7)
api-tokens_1             | 23:29:34 0|api-toke |     at readableAddChunk (_stream_readable.js:188:13)
api-tokens_1             | 23:29:34 0|api-toke |     at Socket.Readable.push (_stream_readable.js:135:10)
api-tokens_1             | 23:29:34 0|api-toke |     at TCP.onread (net.js:542:20)
api-tokens_1             | 23:29:34 0|api-toke | Instance: Seneca/5nbd5oa1lyjg/1474500567307/16/3.1.0/-
api-tokens_1             | 23:29:34 0|api-toke |     at Seneca.<anonymous> (/home/service/api-tokens/node_modules/seneca/lib/transport.js:46:23)
api-tokens_1             | 23:29:34 0|api-toke | When: 2016-09-21T23:29:34.467Z
api-tokens_1             | 23:29:34 0|api-toke | Log: {kind:null,plugin:seneca,tag:3.1.0,id:5nbd5oa1lyjg/1474500567307/16/3.1.0/-,code:transport_listen,notice:seneca
api-tokens_1             | 23:29:34 0|api-toke | Node:
api-tokens_1             | 23:29:34 0|api-toke |   { http_parser: '2.7.0', node: '6.4.0', v8: '5.0.71.60', uv: '1.9.1', zlib: '1.2.8', ares: '1.10.1-DEV', icu: '57.1', modules: '48', openssl: '1.0.2h' },
api-tokens_1             | 23:29:34 0|api-toke |   { debug: false, uv: true, ipv6: true, tls_npn: true, tls_alpn: true, tls_sni: true, tls_ocsp: true, tls: true },
api-tokens_1             | 23:29:34 0|api-toke |   [ 'Binding contextify', 'Binding natives', 'NativeModule events', 'NativeModule util', 'Binding uv', 'NativeModule buffer', 'Binding buffer', 'Binding util', 'NativeModule internal/util', 'NativeModule timers', 'Binding timer_wrap', 'NativeModule internal/linkedlist', 'NativeModule assert', 'NativeModule internal/process', 'Binding config', 'NativeModule internal/process/warning', 'NativeModule internal/process/next_tick', 'NativeModule internal/process/promises', 'NativeModule internal/process/stdio', 'Binding constants', 'NativeModule child_process', 'Binding spawn_sync', 'Binding pipe_wrap', 'NativeModule internal/child_process', 'NativeModule string_decoder', 'NativeModule net', 'NativeModule stream', 'NativeModule _stream_readable', 'NativeModule internal/streams/BufferList', 'NativeModule _stream_writable', 'NativeModule _stream_duplex', 'NativeModule _stream_transform', 'NativeModule _stream_passthrough', 'NativeModule internal/net', 'Binding cares_wrap', 'Binding tty_wrap', 'Binding tcp_wrap', 'Binding stream_wrap', 'NativeModule dgram', 'Binding udp_wrap', 'Binding process_wrap', 'NativeModule internal/socket_list', 'NativeModule cluster', 'NativeModule path', 'NativeModule module', 'NativeModule internal/module', 'NativeModule vm', 'NativeModule fs', 'Binding fs', 'Binding fs_event_wrap', 'NativeModule tty', 'NativeModule domain', 'NativeModule os', 'Binding os', 'Binding signal_wrap', 'NativeModule console', 'NativeModule http', 'NativeModule _http_incoming', 'NativeModule _http_common', 'Binding http_parser', 'NativeModule internal/freelist', 'NativeModule _http_outgoing', 'NativeModule _http_server', 'NativeModule _http_agent', 'NativeModule _http_client', 'NativeModule url', 'Binding icu', 'NativeModule querystring', 'NativeModule https', 'NativeModule tls', 'Binding crypto', 'NativeModule _tls_common', 'NativeModule _tls_wrap', 'NativeModule crypto', 'NativeModule internal/streams/lazy_transform', 'NativeModule _stream_wrap', 'Binding js_stream', 'Binding tls_wrap', 'NativeModule _tls_legacy', 'NativeModule dns' ]
api-tokens_1             | 23:29:34 0|api-toke | Process: 
api-tokens_1             | 23:29:34 0|api-toke |   pid=16, arch=x64, platform=linux,
api-tokens_1             | 23:29:34 0|api-toke |   path=/usr/local/bin/node,
  '--no-daemon' ],node_modules/pm2/lib/ProcessContainer.js',usr/local/bin/node',
  _pm2_version: '2.0.15' }51',]',2/pids/api-tokens-0.pid',or-0.log',n',rvice/api-tokens',
api-tokens_1             | 23:29:34 0|api-toke | SENECA TERMINATED at 2016-09-21T23:29:34.477Z. See above for error report.
api-tokens_1             | 2016-09-21 23:29:34: App name:api-tokens id:0 disconnected
api-tokens_1             | 2016-09-21 23:29:34: App [api-tokens] with id [0] and pid [16], exited with code [1] via signal [SIGINT]
api-tokens_1             | 23:29:34 PM2        | App name:api-tokens id:0 disconnected
api-tokens_1             | 23:29:34 PM2        | App [api-tokens] with id [0] and pid [16], exited with code [1] via signal [SIGINT]

I did some research and it seems to have something to do with acks option when setting up channels, but that's beyond my lvl, so was wondering if you have any immediate ideas?

Thanks again for your work on this!

Support dead lettering of messages

A message created from an .act that didn't reach its destination (eg.: no listener was active or a pattern mismatch) should be routed to a dead letter exchange instead of being silently discarded and lost. Dead lettering should be configurable by the user.

Can't set durable: false or can't specify message TTL

I'm trying to set the queue's durability to false with this:

require('seneca')()
    .use('seneca-amqp-transport', {
        amqp: {
            client: {
                queues: {
                    options: {
                        durable: false
                    }
                }
            }
        }
    });

However, this doesn't do anything. I am also trying to set a message-ttl. Is this even possible?

timeout error not happening in node v6

I tried installing the latest version of seneca on node v6.

I am using seneca-amqp-transport. When I try calling a pattern that does not even exist, the seneca instance does not even throw a timeout error.

However, it works as expected in seneca v2.0.1.

Explicitly setting the timeout as below does not help either.

var seneca = Seneca({
  tag: 'some-name',
  timeout: 500,
  log: 'silent',
  debug: {
    undead: false,
    short_logs: false,
  },
  strict: {
    result: true,
    add: false,
    find: true,
    maxloop: 11
  }
})

seneca-amqp-transport is fragile for production that causes to denial-of-service

there are 2 services set up on different site, both connect to the same rabbitmq server I make the situation roughly here

// service.js
seneca.add({role:'foo',cmd:'bar'},function(args,done){
   done(null,{message:"bar"});
});

seneca.add({role:'foo',cmd:'baz'},function(args,done){
     // done will not be call accidentally in this service
     ....
});

....
// client.js

var app = express();

app.get('/api/bar',function(req,res){
   seneca.act({role:'foo',cmd:'bar'},function(err,resp){
     res.send(resp);
   })
});

app.get('/api/baz',function(req,res){
   seneca.act({role:'foo',cmd:'baz'},function(err,resp){
     res.send(resp);
   })
});

app.listen('3000',function(){
   console.log('listening 3000');
});

....

when I call a service /api/bar It's work well , but when I call /api/baz it's stuck because done was not called It's fine but I expect that when I call /api/bar again it should be work
but it's stuck too and a message in Rabbitmq of /api/bar have not been consumed yet ,calling/api/baz causes a whole service definitely die I have to restart server to go on.

the situation above happens in seneca-amqp-transport but not in http and tcp
Is there any suggestion to handle it? I think timeout is one of solution and important feature

amqp connection timeout

Hi,
I am getting a time out exception when i try to connect a service running locally to rabbit mq instance which is running on a docker container. I could access the uri on the browser through. http://127.0.0.1:15672 works good and brings up the admin screen. am i not setting something?

08:52:14 karmasoc-user-0 Seneca Fatal Error
08:52:14 karmasoc-user-0 ==================
08:52:14 karmasoc-user-0 Message: seneca: gate-executor: [TIMEOUT:cskfllwwm4bi/qniojukx66dg:11111<1469969534677-1469969523565:undefined]
08:52:14 karmasoc-user-0 Code: transport_listen
08:52:14 karmasoc-user-0 Details: { type: 'amqp',
08:52:14 karmasoc-user-0 url: 'amqp://guest:[email protected]:15672',
08:52:14 karmasoc-user-0 pin: 'role:user',
08:52:14 karmasoc-user-0 'orig$':
08:52:14 karmasoc-user-0 { Error: [TIMEOUT:cskfllwwm4bi/qniojukx66dg:11111<1469969534677-1469969523565:undefined]
08:52:14 karmasoc-user-0 at Timeout._onTimeout (/Users/bhaskarferrari/Documents/karmasoc/karmasoc-user/node_modules/gate-executor/gate-executor.js:112:21)
08:52:14 karmasoc-user-0 at tryOnTimeout (timers.js:224:11)

Login Credential

Hi there,

Thanks for the great work! After I updated the RabbitMQ version on our server, it seems that a login credential is required for all connections. I couldn't find a place to add user and password into options, and I am wondering if you have the time to do this small fix? If not, I can still just use the url to send the login credential for creating the connection, but I am afraid that would not be as clear as having the username and password in the options.

Thanks for the great support!

Best,
Richard

Production usage

Are you using this in any of your projects right now? I'm just getting started on Seneca and we use RabbitMQ heavily in production right now. I would love to use this vs standing up and maintaining Kafka and wondering if you have an suggestions or recommendations on setup.

node 8 fails

[email protected] test /home/travis/build/senecajs/seneca-core-plugins-next/seneca-amqp-transport
gulp test
/home/travis/build/senecajs/seneca-core-plugins-next/seneca-amqp-transport/node_modules/require-dir/index.js:93
if (!require.extensions.hasOwnProperty(ext)) {
^
TypeError: require.extensions.hasOwnProperty is not a function
at requireDir (/home/travis/build/senecajs/seneca-core-plugins-next/seneca-amqp-transport/node_modules/require-dir/index.js:93:37)
at Object. (/home/travis/build/senecajs/seneca-core-plugins-next/seneca-amqp-transport/node_modules/gulp-git/index.js:4:18)
at Module._compile (module.js:569:30)
at Object.Module._extensions..js (module.js:580:10)
at Module.load (module.js:503:32)
at tryModuleLoad (module.js:466:12)
at Function.Module._load (module.js:458:3)
at Module.require (module.js:513:17)
at require (internal/module.js:11:18)
at Object. (/home/travis/build/senecajs/seneca-core-plugins-next/seneca-amqp-transport/node_modules/gulp-release/lib/release.js:11:13)

How to make every seneca service (using amqp transport) to have separate queue?

I have noticed that whatever i specify in options only 1 queue is created "seneca_any_act".
At the same time different exchanges are created (routed the queue)

I want to launch multiple seneca services each in separate process. And make them communicate via amqp. That probably assumes that each process should have separate queue.

Is there any way to specify it?

AMQP Transport manage seneca host wrongly with multiple client

Consider this code

var clienta = require('seneca')()
    .use('seneca-amqp-transport')
    .client({
        type: 'amqp',
        url: 'amqp://gues:guest@localhost:5672',
        pin: 'module:a'
    });


var clientb = require('seneca')()
    .use('seneca-amqp-transport')
    .client({
        type: 'amqp',
        url: 'amqp://gues:guest@localhost:5672',
        pin: 'module:b'
    });

for (var i = 0; i < 100; i++) {
    var t1 = Date.now();
    clienta.act('module:a,role:create,foo:1,zed:' + i + ",t1:" + t1, function (err, ret) {
        var when = Date.now();
        var dt = when - ret.when;
        var dt2 = when - ret.t1;
        console.log("server a, with i = " + ret.zed + "; dt1 = " + dt + "; dt2 = " + dt2 + " ;  t1 = " + ret.when + " ; t2= " + when);
    });

    clientb.act('module:b,role:create,foo:1,zed:' + i + ",t1:" + t1, function (err, ret) {
        var when = Date.now();
        var dt = when - ret.when;
        var dt2 = when - ret.t1;
        console.log("server b, with i = " + ret.zed + "; dt1 = " + dt + "; dt2 = " + dt2 + " ;  t1 = " + ret.when + " ; t2= " + when);
    });
}

When running this code, you will see warning invalid_origin. All response from Server A to Client A will be invalid (invalid_origin). The issue is because, when initialize seneca-amqp-transport, maybe in somewhere, the plugin keep seneca host in wrongly way. I guess the issue is in client-hook.

That why only the last client will be alive, all other will be lost.

Support for overlapping and subset Routes

Currently, each service can only provide a fixed set of pins to properly cluster around routes.

Should a listener pin a different subset of the routes, the queues will be distinct, and both respond.

Let's say I have listeners pinned ['foo:1,bar:1','foo:1,bar:2'] and ['foo:1,bar:2','foo:1,bar:3']... If my client is pinned ['foo:1,bar:1','foo:1,bar:2',"foo:1,bar:3'], my message will be received by one of EACH listener. There appears to be no way to support subset routes other than have a different listener for each individual route or guaranteed subset of routes.

I feel the expected behavior should be that the listener should have the option to share queues by action type (if necessary, a unique queue per .add() ), without several calls to .listen() with different subsets.

How to handle connection loss?

What is the best way to recover from a connection loss to the RabbitMq server from a client or from a listener? Currently if I lose the connection, then seneca calls root.die which terminates my application. I need to be able to try to re-establish the connection instead of exiting the application.

Thanks

How can I separate client queues by environment?

Hello,

I have different environments on a same server using the same RabbitMQ instance.
I am using prefix option to prefix queue name for each micro services.

Issue is on the client side. When I add clients to a micro service, from what I understood, the queue name is based on pins. Pins are the same on each environment, so I have conflicts.
Response queue is the same across environment, if I am correct.

I though about adding an option in my micro service config files to also prefix pins.
What's the best practice here?

AMQPS Support

I'm trying to connect to a hosted amqp provider that mandates AMQPS be used. Does this module support AMPQS and if so, is there an example of a working connection?

Debug/test messages?

Wonder if someone could help me understand how I can test the transport by sending messages to rabbitMQ from the outside?

For example, if I have the following, what would I need in the payload to send a Seneca "cmd":

rabbitmqadmin publish exchange='seneca.topic' routing_key='role.demo' payload=?

Support more than 2 arguments for .add / .act callbacks

Example:

// listener
require('seneca')().use('seneca-amqp-transport')
.add({a:1, b:2}, function (msg, reply) {
    reply(null, {z: msg.z}, 123) // <--- 3 arguments here
}).listen({type: 'amqp', pin: 'a:1,b:2', url: 'amqp://guest:guest@localhost:5672/'});

// client
require('seneca')().use('seneca-amqp-transport')
.client({type: 'amqp', pin: 'a:1,b:2', url: 'amqp://guest:guest@localhost:5672/'})
.act('a:1,b:2,z:9', console.log); // should log all arguments

Expected output:

null { z: 9 } 123

Actual output:

null { z: 9 } { id: 'wmkrp7ebp8yd/tjrznrcu10hm',
  accept: 'tkxxb46prhmh/1487592318189/4167/3.3.0/-',
  track: [ '2j0b5xk44e4z/1487592318564/4167/3.3.0/-' ],
  time:
   { client_sent: 1487592318854,
     listen_recv: 1487592318885,
     listen_sent: 1487592318893,
     client_recv: 1487592318896 } }

Seneca supports more than 2 arguments in act callbacks since #340.

Does this allow for communication across docker-compose containers?

I'm trying to set up a few microservices that will sit on different docker containers. My thought is that I can have a rabbitMQ container running and then have each microservice on other containers make actions against and listen (with pins) to the rabbitMQ container. Each container has one seneca instance exported to be used by mircoservices within the container. Each instance is using the same amqp url associated with the rabbitMQ container

I'm able to get a container to make a successful action against itself but all other actions timeout. Each of my containers start their instances of seneca and act with a message to a system container. The system container is able to send and receive its own action but all actions sent to it timeout from other containers timeout.
screen shot 2016-09-07 at 12 53 28 pm
screen shot 2016-09-07 at 12 53 49 pm

Has anyone successfully used the amqp transport across docker-containers?

Allow for fanout/broadcast type behaviour

The current implementation does RPC style pretty well. I have a use case however for a broadcast / fire&forget situation that is really easy to pull of just with RabbitMQ and straight node AMPQ.

  • one exchange of the fanout type

  • many queues

  • the 'act' publishes to the fanout exchange

  • services use the 'add' to registers a queue with the exchange

    Is there a way the plugin currently supports this? Otherwise it would be a handy future addition

Question about scaling/loadbalancing

Hello,

I was wondering what is the best way to scale the microservices system that is using amqp for the transport?

Say I have a high load on my application and I want to introduce more services of the same type and share load between them, as far as I understand this will be problematic because (at least by default) the queues are exclusive so I can't have two services listening on the same pin. Is the proper solution to spin another rabbitmq server?

I see in options there is an "exclusive" property for the response, is changing that to false the proper way to do it? If so, why is it exclusive by the default?

Thank you in advance!

"Round Robin" load balancing

Hi,

I see there's another question on queue broadcast, but what if I just want to "randomly" distribute requests to multiple, equal, micro services? It seems that this is supported in the queue-world, but from what I understand in your documentation, every seneca listener will create its own queue.

I would still want the result of the operation back in my callback, so it's not "fire and forget".

Does this make sense with seneca, and/or is it already possible?

Multiple processors would make sense from a scaling and performance perspective.

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.