
coworkers's Introduction

coworkers


Coworkers is a RabbitMQ microservice framework

Coworkers is a new microservice framework heavily inspired by Koa, which aims to be a simple, robust, and scalable foundation for creating RabbitMQ microservices. By leveraging generators, Coworkers lets you ditch callbacks and greatly improve error handling. Under the hood, Coworkers uses amqplib, a battle-tested AMQP client, to communicate with RabbitMQ, and it has best practices baked in. Finally, Coworkers enables easy scalability by optionally running each queue consumer in its own process through node clustering.

Installation

npm install --save coworkers

Note: amqplib is a peer dependency. This gives you the flexibility to use any compatible version you please. npm@^3 does not install peer dependencies automatically, so you will have to install amqplib yourself.
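For example, you can install both packages together (any amqplib version compatible with your coworkers version should work):

npm install --save coworkers amqplib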

Usage

Quick Example

const coworkers = require('coworkers')
const app = coworkers()
// shared middlewares
app.use(function * (next) {
  // all consumers will run this logic...
  yield next
})
// queue consumers w/ middlewares
app.queue('foo-queue', function * () {
  // consumer specific logic
  this.ack = true // acknowledge message later, see `Context` documentation below
})
// middleware error handler
app.on('error', function (err) {
  console.error(err.stack)
})
// connect to rabbitmq and begin consuming
app.connect()

Documentation

Application

The Coworkers Application class is the center of a RabbitMQ microservice. It keeps track of which queues to consume and of the generator middlewares. Coworkers' middleware system should feel familiar, as it is similar to that of many other frameworks such as Ruby's Rack and Node's Connect, and is actually powered by the internals of Koa. This middleware system uses generators to make handling asynchronous behavior a breeze. Coworkers is also powered by amqplib, a battle-tested RabbitMQ client library, and has many best practices built in.

Methods (documented below):

  • use
  • prefetch
  • queue
  • connect
  • close
Simple consumer example:
const coworkers = require('coworkers')
const app = coworkers()

app.queue('foo-queue', function * () {
  this.ack = true
})
app.on('error', function (err, channel, context) {
  console.error(err)
  if (channel) {
    channel.nack(context.message)
  }
})

app.connect()

app.use(middleware)

Use the given middleware for all queues consumed by the app.

/**
 * @param  {GeneratorFunction} middleware
 * @return {Application} app
 */

See "Cascading middleware" section (below) for a full example

app.prefetch(count, [global])

Set up prefetch options for the application's consumer channel (consumerChannel). Equivalent to channel.prefetch; see http://www.squaremobius.net/amqp.node/channel_api.html#channel_prefetch.

/**
 * @param  {Number} count  the maximum number of messages sent over the channel that can be awaiting ack
 * @param  {Boolean} [global] flag specifying if prefetch is global (true) or per-channel (false), default: false
 */

app.prefetch example

const coworkers = require('coworkers')
const app = coworkers()

app.prefetch(100, false) // `global` is optional, and defaults to false
app.queue('foo-queue', function * () {
  this.ack = true
})
app.on('error', /* ... */)

app.connect()

app.queue(queueName, [queueOpts], [consumeOpts], ...middlewares)

Set up a queue consumer w/ options and middleware. Queues will be asserted and consumed with the given options in app.connect.

/**
 * @param  {String} queueName queue name for which the middleware will be used
 * @param  {Object} [queueOpts] assert queue options, don't use w/ schema
 * @param  {Object} [consumeOpts] consume options
 * @param  {GeneratorFunction} ...middlewares one middleware is required
 * @return {Application} app
 */
app.queue example
const coworkers = require('coworkers')
const app = coworkers()
// add required error handler
app.on('error', function (err) {
  console.error(err.stack)
})
// setup a queue w/ options and a consumer middleware
const queueOpts = {/* queue options */}
const consumeOpts = {/* consume options */}
app.queue('queue0', queueOpts, consumeOpts, function * () {
  this.ack = true
})
app.queue example when using rabbitmq-schema
const coworkers = require('coworkers')
const RabbitSchema = require('rabbitmq-schema')

const schema = new RabbitSchema({
  exchange: 'exchange0',
  type: 'direct',
  options: {},
  bindings: {
    routingPattern: 'foo.bar.key',
    destination: {
      queue: 'queue0',
      messageSchema: {/* message json-schema */},
      options: {/* queue options */}
    },
    args: {}
  }
})

// using optional schema
const app = coworkers({ schema: schema })
// add required error handler
app.on('error', function (err) {
  console.error(err.stack)
})
// setup a queue
const consumeOpts = {/* consume options */}
// correct usage: (note that consumeOpts becomes the second arg)
app.queue('queue0', consumeOpts, function * () {})
// errors
const queueOpts = {/* queue options */}
app.queue('queue0', queueOpts, consumeOpts, function * () {})
// Error: 'app.queue() cannot use "queueOpts" when using a schema'
// (It will use the queueOpts from the schema)
app.queue('queue1', consumeOpts, function * () {})
// Error: 'app.queue() requires "queue1" queue to exist in schema'

See "Cascading middleware" section (below) for a full example

Cascading Middleware

Coworkers' middleware cascades in the more traditional way you may be used to with similar tools; this was previously difficult to make user-friendly with node's use of callbacks. With generators, however, we can achieve "true" middleware. In contrast with Connect's implementation, which simply passes control through a series of functions until one returns, Coworkers yields "downstream", then control flows back "upstream", just like Koa.

The following example acks all foo-queue messages. First the message flows through the trace and parse-content middlewares to record when handling started and to parse the content, then yields control through to the foo-queue consumer middleware. When a middleware invokes yield next, the function suspends and passes control to the next middleware defined. After there are no more middlewares to execute "downstream", the stack unwinds and each middleware is resumed to perform its "upstream" behavior (post yield, in reverse order). Note: "shared middlewares" (use) always run before "consumer middlewares" (queue), regardless of attachment order.

Note: if the message reaches the end of the middlewares without an ack (and the consumer is not noAck), a special NoAckError will be thrown.

Cascading example:
const app = require('coworkers')()

/* shared middlewares */

// "trace" middleware
app.use(function * (next) {
  this.id = require('crypto').randomBytes(12)
  // save consumer start time
  const startTime = Date.now()
  // move on to next middleware
  yield next
  // all middlewares have finished
  const elapsed = Date.now() - startTime
  console.log(`coworkers-trace:${this.id}:${elapsed}`)
})

// "parse-content" middleware
app.use(function * (next) {
  this.message.content = JSON.parse(this.message.content)
  yield next
})

/* queue consumers w/ middlewares */

// "foo-queue" consumer middleware
app.queue('foo-queue', function * () {
  this.ack = true // checkout `Context` documentation for ack, nack, and more
})

app.connect()

By default, app handles all errors by logging them and nacking messages

Middleware Error Handling

A coworkers application will not start w/out an error handler. Middleware errors are emitted on the app. To set up error-handling logic, such as centralized logging, add an "error" event listener.

Simple error handler example:
app.on('error', function (err, context) {
  log.error(`${context.queueName} consumer error`, err)
})

For special error-handling behavior, make use of the properties available on the context. Also, make sure to handle errors that can occur in the error handler itself (they will not be caught).

Robust error handler example:
app.on('error', function (err, context) {
  log.error(`${context.queueName} consumer error`, err)

  // check that the message has not been acked,
  // make sure the message is not being consumed on a `noAck` queue,
  // and check that the queue is using a dead letter exchange
  const hasDlx = Boolean(context.queueOpts.deadLetterExchange)
  if (!context.messageAcked && hasDlx) {
    let channel = context.consumerChannel // amqplib promise api: http://www.squaremobius.net/amqp.node/channel_api.html#channel
    let message = context.message
    let requeue = false
    // nack the message
    channel.nack(message, false, requeue)
  }
})

app.context

The recommended namespace to extend with information that's useful throughout the lifetime of your application, as opposed to a per-message basis.

app.context.db = db();
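Anything attached to app.context is then reachable from the context in every middleware. A minimal sketch, assuming the context inherits from app.context as in Koa (the db object and its find method are placeholder names):

app.context.db = db()

app.use(function * (next) {
  // `this.db` is the object attached above (hypothetical promise-returning query)
  const user = yield this.db.find('some-id')
  this.state.user = user // pass it along to later middlewares
  yield next
})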

app.connect(...)

Connect to RabbitMQ, create channels, and consume queues

    1. Creates a connection to rabbitmq
    2. Creates a consumer channel and a publisher channel
    3. Begins consuming queues
  • Note on clustering: if clustering is enabled and the process isMaster, connect will create all the workers and wait for them to connect to RabbitMQ. If any of the workers fail to connect to RabbitMQ, they will cause the master's connect to error.
/**
 * @param {String} [url] rabbitmq connection url, default: 'amqp://127.0.0.1:5672'
 * @param {Object} [socketOptions] socket options
 * @param {Function} [cb] callback, not required if using promises
 * @return {Promise} promise, if no callback is supplied
 */
Successful connect examples:
const app = require('coworkers')()

app.queue('foo-queue', function * () {/*...*/})

// promise api
app.connect() // connects to 'amqp://127.0.0.1:5672' by default, returns promise
  .then(...)
  .catch(...)
// - or -
app.connect('amqp://127.0.0.1:8000') // returns promise
  .then(...)
  .catch(...)
// - or -
const socketOptions = {} // see http://www.squaremobius.net/amqp.node/channel_api.html#connect
app.connect('amqp://127.0.0.1:8000', socketOptions) // returns promise
  .then(...)
  .catch(...)

// callback api
app.connect(callback) // connects to 'amqp://127.0.0.1:5672' by default
// - or -
app.connect('amqp://127.0.0.1:8000', callback)
// - or -
const socketOptions = {} // see http://www.squaremobius.net/amqp.node/channel_api.html#connect
app.connect('amqp://127.0.0.1:8000', socketOptions, callback)
// callback
function callback (err) {
  // ...
}
Failed connect examples:
const app = require('coworkers')()

app.use(function * () {/*...*/})

app.connect(function (err) {
/*
 Error: App requires consumers, please use "consume" before calling connect
 */
})

app.close(...)

Close channels and disconnect from RabbitMQ

Close examples:
// promise api
app.close()
  .then(...)
  .catch(...)

// callback api
app.close(callback)

Context

A Coworkers Context encapsulates a RabbitMQ consumer's message and channels into a single object. This provides easy access to methods and data frequently used in RabbitMQ microservice development.

A Context is created per message, and is referenced in middleware as the receiver, i.e. the this identifier.

Context example:

app.use(function * () {
  this // is the Context
  this.queueName // is the name of the queue where the message originated
  this.message // is the incoming rabbitmq message
  this.consumerChannel // is the channel which received the message
  this.publisherChannel // is an extra channel dedicated for publishing messages
})

Many of the context's accessors and methods simply delegate to their this.message, this.consumerChannel, or this.publisherChannel equivalents for convenience, and are otherwise identical. For example, this.deliveryTag and this.messageAcked delegate to the message, this.ack and this.nack delegate to consumerChannel, and this.publish(...) and this.sendToQueue(...) delegate to publisherChannel.
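A short sketch of that delegation in middleware ('other-queue' is a placeholder queue name):

app.use(function * (next) {
  // property delegation to the message
  this.deliveryTag // equivalent to this.message.fields.deliveryTag
  this.headers     // equivalent to this.message.properties.headers
  // method delegation to the publisher channel
  this.sendToQueue('other-queue', 'hi') // equivalent to this.publisherChannel.sendToQueue(...), content cast to a Buffer
  yield next
})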

Context Models

For the most part, the context models should not need to be used directly. The context accessors and methods should be more convenient, and they allow the message to properly flow "downstream" and "upstream".

  • this.app - the coworkers app
  • this.connection - amqplib rabbitmq connection, see "Connection" documentation below
  • this.consumerChannel - amqplib rabbitmq channel dedicated to consuming, see "Channel" documentation below
  • this.publisherChannel - amqplib rabbitmq channel dedicated to publishing, see "Channel" documentation below

Context Properties

  • this.queueName - name of the queue from which the message originated
  • this.message - the incoming rabbitmq message
  • this.content - message content buffer, message.content accessor
  • this.fields - message fields, message.fields getter
  • this.properties - message properties, message.properties getter
  • this.headers - message headers, message.properties.headers getter
  • this.exchange - exchange which the message was published to, message.fields.exchange accessor
  • this.routingKey - routingKey which the message was published with, message.fields.routingKey accessor
  • this.deliveryTag - delivery tag of the message, message.fields.deliveryTag getter
  • this.consumerTag - unique identifier of consumer, message.fields.consumerTag getter
  • this.redelivered - whether message was redelivered, message.fields.redelivered getter
  • this.queueOpts - queue options used to assert the queue
  • this.consumeOpts - queue's consume options
  • this.messageAcked - boolean, whether message has been acknowledged (ack, nack, reject)
  • this.state - recommended namespace for passing info between middlewares

Context Ack Properties

These special ack properties should be used in place of channel calls (except in the error handler). These properties allow the message to complete its flow "downstream" and back "upstream" before the message is acknowledged.

  • this.ack - set this property to ack the message at the end of the middlewares
  • this.nack - set this property to nack the message at the end of the middlewares
  • this.reject - set this property to reject the message at the end of the middlewares
Ack Example:
app.use(function * () {
  /* ack */
  this.ack = true // will ack w/ default options
  // - or -
  this.ack = { allUpTo: true } // specify custom options
})
Nack Example:
app.use(function * () {
  /* nack */
  this.nack = true // will nack w/ default options
  // - or -
  this.nack = { requeue: false, allUpTo: false } // specify custom options
})
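Reject Example (a sketch assuming this.reject accepts an options object analogous to this.ack and this.nack; the underlying channel.reject only takes a requeue flag):
app.use(function * () {
  /* reject */
  this.reject = true // will reject w/ default options
  // - or -
  this.reject = { requeue: false } // assumed custom options, mirroring nack
})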

Context Methods

  • this.publish(...) - publish a message to an exchange w/ a routing key on the publisherChannel
  • this.sendToQueue(...) - publish a message directly to a queue on the publisherChannel
  • this.request(...) - publish an rpc message and easily receive its reply; creates a new channel for publishing and consuming (uses amqplib-rpc)
  • this.reply(...) - reply to an rpc message on the publisherChannel (uses amqplib-rpc)
  • this.checkQueue(...) - check if a queue exists (creates its own channel to prevent any unexpected errors)
  • this.checkReplyQueue() - check if a reply-queue exists using message.properties.replyTo (creates its own channel to prevent any unexpected errors)
  • this.toJSON() - return a json version of the context (note: will not jsonify context.state if it includes non-primitives)
Publish example:
// `context.publish` jsdoc:
/**
 * Proxy method to publisherChannel.publish, publish a message to an exchange
 * @param  {String} exchange   exchange name to publish to
 * @param  {String} routingKey message routing key
 * @param  {Buffer|Object|Array|String} content    message content
 * @param  {Object} [options]    publish options
 */
// Example usage in middleware
app.use(function * (next) {
  // Works just like amqplib's channel publish.
  // But context's publish allows publishing of
  // objects and strings in addition to buffers.
  // Non-buffer content will be stringified and cast to a Buffer.
  const content = { foo: 1 }
  const opts = {} // optional
  this.publish('exchange-name', 'routing.key', content, opts)
  // ...
})
SendToQueue and CheckQueue example:
// `context.sendToQueue` jsdoc:
/**
 * Proxy method to publisherChannel.sendToQueue
 *   publish a message directly to a queue
 * @param  {String} queue   queue name to publish to
 * @param  {Buffer|Object|Array|String} content message content
 * @param  {Object} [options] publish options
 */
// `context.checkQueue` jsdoc:
/**
 * create a channel, check if the queue exists, and close the channel
 * @param  {String}   queue    queue name
 * @param  {Function} [cb]     callback, not required if using promises
 * @return {Promise}  if using promises
 */
// Example usage in middleware
app.use(function * (next) {
  // Works just like amqplib's channel sendToQueue.
  // But context's sendToQueue allows publishing of
  // objects and strings in addition to buffers.
  // Non-buffer content will be stringified and cast to a Buffer.
  const content = 'hello'
  const opts = {} // optional
  // check queue: in some cases it may be useful to check the existence of a queue before publishing to it
  var exists = yield this.checkQueue('queue-name')
  if (!exists) {
    // handle message: ack, nack, or etc
    return
  }
  // send the message to the queue
  this.sendToQueue('queue-name', content, opts)
  // ...
})
RPC (Request, Reply, CheckReplyQueue) example:

Client.js using context.request

// `context.request` jsdoc:
/**
 * Make an rpc request, publish a message to an rpc queue
 * @param  {String}   queue     name of rpc-queue to send the message to
 * @param  {Buffer}   content   message content
 * @param  {Object}   [sendOpts]  sendToQueue options
 * @param  {Object}   [queueOpts] assertQueue options for replyTo queue, queueOpts.exclusive defaults to true
 * @param  {Object}   [consumeOpts] consume options for replyTo queue, consumeOpts.noAck defaults to true
 * @param  {Function} [cb] callback, if using callback api
 * @return {Promise} returns a promise
 */
// Example usage in middleware
app.queue('client-queue', function * () {
  // request makes it easy to make an rpc-request from a queue
  const content = { a: 10, b: 20 }
  // the request function signature has a lot of optional arguments:
  // request(queueName, content, [sendOpts], [queueOpts], [consumeOpts])
  const replyMsg = yield this.request('multiply-queue', content)
  console.log(replyMsg.content.toString()) // 200
  // ...
  this.ack = true
})

Server.js using context.reply

// `context.reply` jsdoc:
/**
 * Reply to an rpc request, publish a message to replyTo queue
 * @param  {Buffer|Object|Array|String} content message content
 * @param  {Object} options publish options
 */
// `context.checkReplyQueue` jsdoc:
/**
 * create a channel, check if replyTo queue exists, and close the channel
 * @param  {Function} [cb]    not required if using promises
 * @return {Promise}  if using promises
 */
// Example usage in middleware
app.use(function * (next) {
  // parse the message content as json
  this.message.content = JSON.parse(this.message.content.toString())
  yield next
})
app.queue('multiply-queue', function * () {
  // check reply queue: in some cases it may be useful to check the existence of the reply queue before doing any work
  const exists = yield this.checkReplyQueue()
  if (!exists) {
    // handle message: ack, nack, or etc
    return
  }
  const content = this.message.content
  const a = content.a
  const b = content.b
  const c = a * b
  const opts = {} // optional
  // reply is sync and does not return a promise,
  //   uses publisherChannel.sendToQueue (see "Channel" documentation below)
  this.reply(new Buffer(String(c)), opts)
  // ...
})

Channel

see amqplib channel documentation http://www.squaremobius.net/amqp.node/channel_api.html#channel

Connection

see amqplib connection documentation http://www.squaremobius.net/amqp.node/channel_api.html#connect

Clustering / Process management

By default, coworkers will use clustering to give each queue consumer its own process. Clustering is optional; you can manage coworkers processes manually (see "Manual process management" below).

Clustering is opinionated; it makes the processes work as a unit:

  • If a worker fails to start up and connect to rabbitmq, it will kill all the workers
  • If a worker dies, coworkers will attempt to respawn it w/ exponential backoff
    • Use the following ENV variables to adjust the behavior: COWORKERS_RESPAWN_RETRY_ATTEMPTS, COWORKERS_RESPAWN_RETRY_MIN_TIMEOUT, COWORKERS_RESPAWN_RETRY_FACTOR
  • If a worker dies and repeatedly fails to respawn and connect to rabbitmq, it will crash the entire cluster
Clustering example:

When clustering is enabled, Coworkers will scale the number of processes to the number of cpus the server has. The example below will create four workers in total (to match the number of cpus): two "foo-queue" consumers and two "bar-queue" consumers. If the number of queues is greater than the number of cpus, coworkers will only create one consumer per queue. If you want to specify the number of workers per queue, you can do so with the environment variable COWORKERS_WORKERS_PER_QUEUE. If you have a problem w/ a particular worker process, you can close it by sending it a SIGINT signal; this gracefully shuts down the process and does not respawn a replacement (to restart the worker after stopping it, restart your coworkers app).

// app.js
const app = require('coworkers')()
require('os').cpus().length // 4

app.queue('foo-queue', ...)
app.queue('bar-queue', ...)

app.on('error', ...)

app.connect(function (err) {
  if (err) console.error(err.stack)
})
Manual process management:

Coworkers forces you to consume only a single queue per process, so that your consumers stay decoupled. If you want to manage your own processes w/out using clustering, all you have to do is specify three environment variables:

COWORKERS_CLUSTER="false" # {string-boolean} disables clustering
COWORKERS_QUEUE="foo-queue" # {string} specify the queue the process will consume
COWORKERS_QUEUE_WORKER_NUM=1 # {number} specify the queue worker number, optional, default: 1
  # if you create multiple processes per queue, this is a unique id per queue worker

Processes will send process messages so that you can determine their state:

  • messages include: 'coworkers:connect', 'coworkers:connect:error', 'coworkers:close', 'coworkers:close:error'
// app.js
// ...
app.queue('foo-queue', ...)
app.queue('bar-queue', ...)

module.exports = app
// process-manager.js
var app = require('./app')

// create one consumer process per queue
app.queueNames.forEach(function (queueName) {
  // create node process w/ env:
  // COWORKERS_CLUSTER="false"
  // COWORKERS_QUEUE=queueName
  // COWORKERS_QUEUE_WORKER_NUM=1
  // ...
})
//...
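A minimal runnable sketch of the same idea using child_process.fork. The worker.js entry file is an assumption (it would require ./app and call connect); the COWORKERS_* variables and 'coworkers:*' process messages are the ones documented above:

// worker.js (hypothetical per-queue entry point)
// require('./app').connect()

// process-manager.js
const fork = require('child_process').fork
const app = require('./app') // the coworkers app exported above

app.queueNames.forEach(function (queueName) {
  const worker = fork('./worker.js', [], {
    env: Object.assign({}, process.env, {
      COWORKERS_CLUSTER: 'false',     // disable clustering in the worker
      COWORKERS_QUEUE: queueName,     // queue this process will consume
      COWORKERS_QUEUE_WORKER_NUM: '1' // single worker per queue in this sketch
    })
  })
  // workers report state via process messages (see list above)
  worker.on('message', function (msg) {
    console.log(queueName, msg) // e.g. 'coworkers:connect' or 'coworkers:connect:error'
  })
})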

RPC utils

If you need to publish rpc messages from another application, you can use amqplib-rpc. Coworkers uses amqplib-rpc under the hood for its RPC methods, so the method signatures are nearly identical.
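For example, a plain (non-coworkers) node process can make an rpc request like this, a sketch based on the amqplib-rpc request call shown elsewhere in this document ('multiply-queue' is a placeholder queue name):

const amqplib = require('amqplib')
const request = require('amqplib-rpc').request

amqplib.connect().then(function (connection) {
  // request(connection, queueName, content) resolves with the reply message
  return request(connection, 'multiply-queue', { a: 10, b: 20 })
}).then(function (replyMessage) {
  console.log(replyMessage.content.toString())
}).catch(function (err) {
  console.error(err)
})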

Testing

Check out coworkers-test; it allows you to easily test a coworkers app's message-handling middlewares as a unit w/out requiring rabbitmq.

Changelog

CHANGELOG.md

License

MIT

coworkers's People

Contributors

giuseppe-santoro, jdrouet, jucrouzet, mwilliams, solsend2l, tjmehta


coworkers's Issues

What is the maintenance status of this project? Is this approach better than Seneca / Cote etc?

I am interested in developing node.js microservices using rabbitmq as the messaging framework. I stumbled upon this project, which seemed like the ideal approach to such a solution. However, I want to know if the project is being maintained, as the last commit was over a year ago.

Also, does this approach prove superior to other approaches such as Seneca or Cote when developing microservices? I appreciate the bare-metal nature of the rabbitmq approach, which to me provides a clearer model of the communication going on between microservices. Are there other considerations that need to be taken into account?

Disable forking/cluster completely

Hello!

How do I completely disable forking of the application?

Should this be enough?

const coworkers = require('coworkers')
const app = coworkers()

process.env.COWORKERS_CLUSTER = false
process.env.COWORKERS_QUEUE = 'foo-queue'
process.env.COWORKERS_QUEUE_WORKER_NUM = 1

// shared middlewares
app.use(function * (next) {
  // all consumers will run this logic...
  yield next
})

app.queue('foo-queue', function * () {
  // consumer specific logic
  this.ack = true // acknowledge message later, see `Context` documentation below
})

app.on('error', function (err) {
  console.error(err.stack)
})

app.connect('amqp://127.0.0.1:5672', function(error) { 
    if (error)
        console.log(error)
})

console.log('hi')

console.log gets triggered five times. Am I doing something wrong?

Nesting Ack in function

I'm loving the framework. I had a simple question about returning this.nack = true after my middleware consumes the message.

My Flow is similar to this:

  1. app.queue('sendEmail', function*()
  2. Call a function to email the user
  3. Email Callback has error or data.
  4. If successful this.ack = true;
  5. If error this.reject=true

Since I don't have access to the this object within the sendMail function, what is the best way to return the ack on success and the reject on error?

Channel locked when using RPC

I get the following error when trying to use the RPC implementation in coworkers:

Error: Channel closed by server: 405 (RESOURCE-LOCKED) with message "RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'amq.gen-_G_bE-7udUZ7m6c9seUx0A' in vhost '/'"
    at Channel.C.accept (/home/tom/Programming/AuthServer/node_modules/amqplib/lib/channel.js:406:17)
    at Connection.mainAccept [as accept] (/home/tom/Programming/AuthServer/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/home/tom/Programming/AuthServer/node_modules/amqplib/lib/connection.js:477:48)
    at emitNone (events.js:106:13)
    at Socket.emit (events.js:208:7)
    at emitReadable_ (_stream_readable.js:513:10)
    at emitReadable (_stream_readable.js:507:7)
    at addChunk (_stream_readable.js:274:7)
    at readableAddChunk (_stream_readable.js:250:11)
    at Socket.Readable.push (_stream_readable.js:208:10)
    at TCP.onread (net.js:594:20)
coworkers up and consuming messages
/home/tom/Programming/AuthServer/node_modules/throw-next-tick/index.js:3
    throw err;

This is my server side code:

const coworkers = require('coworkers');
const db = require('./model/database');
const {getBearerToken} = require('./auth/oAuth2');

const app = coworkers();
app.prefetch(100);
app.queue('bearer-token-request', function * (){
    this.ack = true
    const exists = yield this.checkReplyQueue();
    if (!exists){
        this.nack
        return 
    }
    const content = this.message.content;
    console.log(content);
    this.reply(new Buffer('this is the reply'))
    this.ack = true
});

app.on('error', (err)=>{
    console.log(err.stack)
});

db.connect((err)=>{
    if(err){
        console.log('Unable to connect to the database.');
        process.exit(1);
    } else {
        app.connect((err)=>{
            if (err) return console.log(err)
            console.log('coworkers up and consuming messages')
        });
    }
});

and I'm just testing it with the following in a node terminal:

var amqplib = require('amqplib')
var request = require('amqplib-rpc').request

amqplib.connect().then(function (connection) {
  return request(connection, 'bearer-token-request', { a: 10, b: 20 }).then(function (replyMessage) {
    console.log('res',replyMessage.content.toString()) // 200
  })
}).catch((err)=>{console.log('err',err)})

Creating & sending to queue

Hi,

I have been using this module for a long while. My whole workers app uses coworkers as its base. It is very useful and suffices my needs for creating queue consumers. But when it comes to being a publisher, it's a bit of a pain.

I am aware that I can use the context's publisher channel to publish a message to a queue. But the problem arises when I have a queue for which I don't want to create a consumer in this app but in another app (in other words, I want this app to act as a producer in some situations).

For example: I have two queues, startWorking and doneWorking. My node application will send a message to startWorking when it has a task. What I essentially want to do is put a message in doneWorking when a message in startWorking is processed by the consumer (my node app, i.e. the producer of messages in startWorking, will become the consumer of doneWorking).
For this I need to consume startWorking and also create the queue doneWorking, but not consume it.

The main aim of the above mechanism is a two-way communication channel.

The problem is that coworkers lacks info and documentation on acting as a publisher/producer. Can you please guide me on how this would be possible using coworkers?

I would be really grateful for your help and reply.

Thanks
Hannan

Opening exclusive queue per request and leaving them open

I've had this problem with the amqplib-rpc library. I was trying to use amqplib-rpc on my API Gateway and it would just keep creating new exclusive queues. I had to switch to node-bunnymq because it creates one exclusive queue for responses.

So if I run this code 4 times, I get 4 exclusive queues that don't delete themselves until the Coworkers process is killed. This follows the RPC example from the readme...

(Imgur screenshot)

Client

app.use(function * (next) {
  if (this.queueName === 'user.create') {
    const replyMsg = yield this.request('tenant.read', {tenant_id: this.message.content.tenant_id})
    // do something with replyMsg
  }
  yield next
})

Server

app.queue('tenant.read', function * () {
  // return the good stuff...
  this.reply(new Buffer(JSON.stringify(result)))
  this.ack = true
})

TypeError: app.consume is not a function

Hi,

I'm trying to get a consumer running, then publish to a queue and have the consumer process it.

I'm using the git repo for this tinkering code - not the npm module.

So trying to first fire up a consumer that consumes from the 'foo-queue'.

The consumer code is cut'n'paste from the README.

const app = require('./coworkers')()

/* shared middlewares */

// "trace" middleware
app.use(function * (next) {
  this.id = require('crypto').randomBytes(12)
  // save consumer start time
  const startTime = Date.now()
  // move on to next middleware
  yield next
  // all middlewares have finished
  const elapsed = Date.now() - startTime
  console.log(`coworkers-trace:${this.id}:${elapsed}`)
})

// "parse-content" middleware
app.use(function * (next) {
  this.message.content = JSON.parse(this.message.content)
  yield next
})

/* queue consumers w/ middlewares */

// "foo-queue" consumer middleware
app.consume('foo-queue', function * () {
  this.ack = true // checkout `Context` documentation for ack, nack, and more
})

app.connect()

Running this I get:

❯ node consumer.js
/projects/coworkers-demo/consumer.js:26
app.consume('foo-queue', function * () {
    ^

TypeError: app.consume is not a function
    at Object.<anonymous> (/projects/coworkers-demo/consumer.js:26:5)
    at Module._compile (module.js:397:26)
    at Object.Module._extensions..js (module.js:404:10)
    at Module.load (module.js:343:32)
    at Function.Module._load (module.js:300:12)
    at Function.Module.runMain (module.js:429:10)
    at startup (node.js:139:18)
    at node.js:999:3

I'm using:

node -v ⏎
v5.4.1
npm -v
3.3.12

Am I doing something wrong?

Thanks.

Clustering / Process management ( DISABLE )

Hello, I'm trying to use coworkers and rabbitmq-schema.
If for some reason I want to disable the workers and leave only ONE, but I have two or more queues, how do I properly use 1 worker (= 1 process)?

process.env.COWORKERS_CLUSTER = false;
process.env.COWORKERS_QUEUE = 'queue1'; <= should i define here 1 queue or more ?
//process.env.COWORKERS_QUEUE_WORKER_NUM = 1;

Running with (in) express app

Hi there,

here is a sample application, where I'm trying to run express and coworkers as one app, but it ends up with an error EADDRINUSE. What is the best strategy for this type of use case?

const RabbitSchema = require('rabbitmq-schema')
const coworkers = require('coworkers');
const express = require('express')

const qname = `coworkers.user.created.${process.env.QUEUE_ID}`;
const topic = new RabbitSchema({
    exchange: 'Cherry',
    type: 'topic',
    bindings: [{
        routingPattern: 'UserManagement.UserCreated',
        destination: {
            queue: qname,
            messageSchema: {
                type: 'object'
            }
        }
    }]
});

const rabbit = coworkers({schema: topic});
rabbit.queue(qname, function * () {
    console.log(this);
    this.ack = true;
});
rabbit.on('error', (err) => console.error(err.stack));
rabbit.connect('amqp://app:cherry@ubuntu');

const app = express();
app.get('/', (req, res) => res.send('Hello World!'));
app.listen(3000, () => console.log('Example app listening on port 3000!'));

Something wrong with connection promise.

My code (example):

app.connect('wrong-url')
  .then(function() {
    console.log('Connect to RabbitMQ success');
  })
  .catch(function(e) {
    console.log(`Connect to RabbitMQ filed with error: ${e}`);
  });

My console:

[2016-01-26T17:45:30.097Z] Connect to RabbitMQ success
[2016-01-26T17:45:30.554Z] Connect to RabbitMQ filed with error: Error: Expected amqp: or amqps: as the protocol; got null
[2016-01-26T17:45:30.562Z] Connect to RabbitMQ filed with error: Error: Expected amqp: or amqps: as the protocol; got null
[2016-01-26T17:45:30.565Z] Connect to RabbitMQ filed with error: Error: Expected amqp: or amqps: as the protocol; got null
[2016-01-26T17:45:30.567Z] Connect to RabbitMQ filed with error: Error: Expected amqp: or amqps: as the protocol; got null

I think it is because the "connect" method returns "undefined" instead of a promise when it starts the cluster, but I don't know how to fix it.

Question: Standalone Publishing

Hi,

I have a general type question on using coworkers.

I'm still a newb with this library, only spent and hour or so tinkering with it.

Is my assumption correct that coworkers is for consuming only?

I can see that while consuming, there is middleware for publishing, or send to queue - but that's part of the consuming process.

What about straight up publishing in its own process?

For example, from this gist, https://gist.github.com/rudijs/185fc162bdd667a91405, it will publish a message to RabbitMQ every 1 second.

Can I use coworkers to do the same thing? Or is coworkers more for just the consumer/processing side?

The gist example will create a new connection for each message (no channel).

Thanks

How should I work with exchanges?

My application example:

'use strict';

const coworkers = require('coworkers');
const RabbitSchema = require('rabbitmq-schema');

const schema = new RabbitSchema({
  exchange: 'my-existing-exchange',
  type: 'direct',
  bindings: [{
    routingPattern: 'foo.bar.key',
    destination: {
      queue: 'queue0',
      messageSchema: {}
    }
  }]
});

const app = coworkers({schema: schema});
app.queue('queue0', function * () {
  this.ack = true;
});

app.on('error', function (err) {
  console.error(err.stack)
});

app.connect('amqp://my-rabbitmq-url');

After launching the application, 'queue0' is created, but it does not get bound to the exchange:
(screenshot)
What am I doing wrong?

uncaughtException ECONNRESET

I'm starting an amq connection and exchange following this code:

const consumer = require('./consumer'); // contain all the logic for queue consuming/coworkers
const config = require('../config');
const logger = require('./utils/logger');

let amq;
let retryCount = 0;
function start() {
  amq = consumer.init();

  return amq.connect(config.amqUrl)
  .then(() => {
    amq.connection.on('error', (e) => {
      logger.error(`Error connection AMQ - ${e.message}`);
      start();
    });
    return amq.consumerChannel.assertExchange(config.consumeExchangeName, 'direct');
  })
  .then(() => {
    return amq.consumerChannel.bindQueue(config.consumeQueueName, config.consumeExchangeName,
      'direct');
  })
  .then(() => {
    return amq.publisherChannel.assertExchange(config.publishExchangeName, 'direct');
  })
  .then(() => {
    logger.info('AMQ Started');
  })
  .catch(err => {
    if (retryCount < 6) {
      retryCount++;
      logger.error(`Error connecting to AMQ - ${retryCount} attempt`);
      return setTimeout(start, retryCount * 1000);
    }
    logger.error(err);
    return process.exit(1);
  });
}

start();
/* then some code to handle gracefully SIGINT, SIGQUITL, and SIGTERM */
  • When AMQ server is available during the start, it works properly and output AMQ started.
  • When AMQ server is unavailable during the start, it will try 6 times before it exit the process.
  • But when AMQ server is available during the start, and we lose the connection after the consumer started, it outputs the following error:
error: uncaughtException: "app.connection" unexpectedly errored: read ECONNRESET date=Wed Sep 21 2016 23:41:30 GMT+0000 (UTC), pid=450, uid=0, gid=0, cwd=/app, execPath=/usr/local/bin/node, version=v4.4.7, argv=[/usr/local/bin/node, /app/index.js], rss=54706176, heapTotal=34252128, heapUsed=22584648, loadavg=[0.32421875, 0.15966796875, 0.111328125], uptime=23586, trace=[column=11, file=util.js, function=exports._errnoException, line=873, method=_errnoException, native=false, column=26, file=net.js, function=TCP.onread, line=557, method=onread, native=false], stack=[Error: "app.connection" unexpectedly errored: read ECONNRESET,     at exports._errnoException (util.js:873:11),     at TCP.onread (net.js:557:26)]

I looked at the documentation and it seems that the following lines would catch the exception:

    amq.connection.on('error', (e) => {
      logger.error(`Error connection AMQ - ${e.message}`);
      start();
    });

But it doesn't seem that this code is catching any error.

Any idea about what's happening?

What to do if a want to ack asynchronously?

I have a long async task that I want to ack/nack only after it finishes. How can I implement this with coworkers? As I see it, I can only do it with the context properties, but with RabbitMQ and JS this is almost always unusable.
Or did I misunderstand something?

Support functions that return a promise.

It would be nice to support functions that return a promise... To me this is a much cleaner abstraction.

Using async functions...

//consumer.js - async function
async function somethingUsingCoworker() {
  try {
    var message = { 'some': 'object' };
    var result = await app.work('queue-name', message); //pass all arguments after queueName to callee
    return result;
  } catch(err) {
    //err is the result of an error/fail from the coworker
    return null; //or rethrow
  }
}
/*****************************/
//worker.js - async function
app.worker('queue-name', async function(defer, message) { //all arguments after defer, from caller
  // reject the work, return error to caller/source
  throw new Error('...');

  //cannot process this work right now, re-queue
  return defer; // or throw defer, either should work

  //positive result that isn't defer, pass this result to caller/source
  return 'success'; 
}

Using raw promises...

//consumer.js - promise
function somethindUsingCoworker() {
  var message = {'some':'object'};
  return app.work('queue-name', message)
    .then(
      function(result) {
        //got result
        return result;
      },
      function(err) {
        //got error
        return null; //resolve, or throw err to rethrow, or return Promise.reject(err);
      }
    );
}
/*******************************/
//worker.js - promise
app.worker('queue-name', function(message, defer) {
  return new Promise(function(resolve, reject){
    //reject the work
    return reject(new Error(...));

    //defer/requeue the work
    return resolve(defer); // or return reject(defer); // either should work

    //positive result returned to calling process
    return resolve('success'); // anything that can be serialized as a response.
  });
})

NPM v3 install: UNMET DEPENDENCY amqplib@^0.4.0

Hi,

Using NPM v3 the peer dependency for amqplib doesn't install.

Currently I need to npm i amqplib after npm i coworkers.

❯ npm i coworkers
[email protected] /media/rudi/crypt2/projects/coworkers-demo
├── UNMET DEPENDENCY amqplib@^0.4.0

npm WARN EPEERINVALID [email protected] requires a peer of amqplib@>=0.2.1 but none was installed.
npm WARN EPACKAGEJSON [email protected] No repository field.

Documentation question

First thing, the concept of this package is awesome 💯, I hope you are going to keep maintaining it 👍. Second, I'm not sure if I can post a question here about the framework docs.

In case the application loses its connection to rabbit, is it going to try to reconnect? I can't find docs about that. I read that "If any of the workers fail to connect to Rabbitmq they will cause master's connect to error." (https://github.com/tjmehta/coworkers#appconnect), but I guess this applies when trying to connect the first time, or is it also when losing the connection after successfully connecting the first time?

How do you set a prefetch count for consumers?

I am using a round-robin method of processing where I want each consumer to have a specific prefetch to control memory usage per box. At the moment, if I have thousands of messages on a queue, my consumer will fetch all of them to process simultaneously. Obviously, this is causing severe memory issues on my consumers. I would like to set a prefetch count to control the flow at a finer level. I have done this in the past with the amqplib library, but cannot see exactly how to do it w/ coworkers. Is there currently a way to achieve this?
