
node-rabbitmq-client's Introduction


RabbitMQ Client

Node.js client library for RabbitMQ. Publish messages, declare rules for routing those messages into queues, consume messages from queues.

Why not amqplib?

  • No dependencies
  • Automatically re-connect, re-subscribe, or retry publishing
  • Optional higher-level Consumer/Publisher API for even more robustness
  • Written in typescript and published with heavily commented type definitions
  • See https://cody-greene.github.io/node-rabbitmq-client for full API documentation
  • Intuitive API with named parameters instead of positional
  • "x-arguments" like "x-message-ttl" don't have camelCase aliases

Performance

Performance is comparable to amqplib (see ./benchmark.ts).

Task Name                                         | ops/sec | Average Time (ns) | Margin | Samples
--------------------------------------------------|---------|-------------------|--------|--------
rabbitmq-client publish-confirm (null route)      |   2,611 |           382,919 | ±3.69% |    1306
amqplib publish-confirm (null route)              |   2,315 |           431,880 | ±4.89% |    1158
rabbitmq-client publish-confirm (transient queue) |     961 |         1,039,884 | ±1.07% |     481
amqplib publish-confirm (transient queue)         |   1,059 |           943,706 | ±1.34% |     530

Quick start

In addition to the lower-level RabbitMQ methods, this library exposes two main interfaces, a Consumer and a Publisher (which should cover 90% of use cases), as well as a third, RPCClient, for request-response communication.

import {Connection} from 'rabbitmq-client'

// Initialize:
const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('error', (err) => {
  console.log('RabbitMQ connection error', err)
})
rabbit.on('connection', () => {
  console.log('Connection successfully (re)established')
})

// Consume messages from a queue:
// See API docs for all options
const sub = rabbit.createConsumer({
  queue: 'user-events',
  queueOptions: {durable: true},
  // handle 2 messages at a time
  qos: {prefetchCount: 2},
  // Optionally ensure an exchange exists
  exchanges: [{exchange: 'my-events', type: 'topic'}],
  // With a "topic" exchange, messages matching this pattern are routed to the queue
  queueBindings: [{exchange: 'my-events', routingKey: 'users.*'}],
}, async (msg) => {
  console.log('received message (user-events)', msg)
  // The message is automatically acknowledged (BasicAck) when this function ends.
  // If this function throws an error, then msg is rejected (BasicNack) and
  // possibly requeued or sent to a dead-letter exchange. You can also return a
  // status code from this callback to control the ack/nack behavior
  // per-message.
})

sub.on('error', (err) => {
  // Maybe the consumer was cancelled, or the connection was reset before a
  // message could be acknowledged.
  console.log('consumer error (user-events)', err)
})

// Declare a publisher
// See API docs for all options
const pub = rabbit.createPublisher({
  // Enable publish confirmations, similar to consumer acknowledgements
  confirm: true,
  // Enable retries
  maxAttempts: 2,
  // Optionally ensure the existence of an exchange before we use it
  exchanges: [{exchange: 'my-events', type: 'topic'}]
})

// Publish a message to a custom exchange
await pub.send(
  {exchange: 'my-events', routingKey: 'users.visit'}, // metadata
  {id: 1, name: 'Alan Turing'}) // message content

// Or publish directly to a queue
await pub.send('user-events', {id: 1, name: 'Alan Turing'})

// Clean up when you receive a shutdown signal
async function onShutdown() {
  // Waits for pending confirmations and closes the underlying Channel
  await pub.close()
  // Stop consuming. Wait for any pending message handlers to settle.
  await sub.close()
  await rabbit.close()
}
process.on('SIGINT', onShutdown)
process.on('SIGTERM', onShutdown)

Connection.createConsumer() vs Channel.basicConsume()

The above Consumer & Publisher interfaces are recommended for most cases. These combine a few of the lower-level RabbitMQ methods (exposed on the Channel interface) and are much safer to use, since they can recover after connection loss or after a number of other edge-cases you may not have imagined. Consider the following list of scenarios (not exhaustive):

  • Connection lost due to a server restart, missed heartbeats (timeout), forced by the management UI, etc.
  • Channel closed as a result of publishing to an exchange which does not exist (or was deleted), or attempting to acknowledge an invalid deliveryTag
  • Consumer closed from the management UI, or because the queue was deleted, or because basicCancel() was called

In all of these cases you would need to create a new channel and re-declare any queues/exchanges/bindings before you can start publishing/consuming messages again. And you're probably publishing many messages concurrently, so you'd want to make sure this setup only runs once per connection. If a consumer is cancelled then you may be able to reuse the channel, but you still need to re-check the queue, and so on...

The Consumer & Publisher interfaces abstract all of that away by running the necessary setup as needed and handling all the edge-cases for you.

Managing queues & exchanges

A number of management methods are available on the Connection interface; you can create/delete queues, exchanges, or bindings between them. It's generally safer to do this declaratively with a Consumer or Publisher. But maybe you just want to do something once.

const rabbit = new Connection()

await rabbit.queueDeclare({queue: 'my-queue', exclusive: true})

await rabbit.exchangeDeclare({exchange: 'my-exchange', type: 'topic'})

await rabbit.queueBind({queue: 'my-queue', exchange: 'my-exchange'})

const {messageCount} = await rabbit.queueDeclare({queue: 'my-queue', passive: true})

And if you really want to, you can acquire a raw AMQP Channel, but this should be a last resort.

// Will wait for the connection to establish and then create a Channel
const ch = await rabbit.acquire()

// Channels can emit some events too (see documentation)
ch.on('close', () => {
  console.log('channel was closed')
})

const msg = await ch.basicGet('my-queue')
console.log(msg)

// It's your responsibility to close any acquired channels
await ch.close()

RPCClient: request-response communication between services

This will create a single "client" Channel on which you may publish messages and listen for direct responses. This can allow, for example, two micro-services to communicate with each other using RabbitMQ as the middleman instead of directly via HTTP.

// rpc-server.js
const rabbit = new Connection()

const rpcServer = rabbit.createConsumer({
  queue: 'my-rpc-queue'
}, async (req, reply) => {
  console.log('request:', req.body)
  await reply('pong')
})

process.on('SIGINT', async () => {
  await rpcServer.close()
  await rabbit.close()
})

// rpc-client.js
const rabbit = new Connection()

const rpcClient = rabbit.createRPCClient({confirm: true})

const res = await rpcClient.send('my-rpc-queue', 'ping')
console.log('response:', res.body) // pong

await rpcClient.close()
await rabbit.close()


node-rabbitmq-client's Issues

Status codes for ack/nack

Can someone please list all the status codes that I can return from a consumer callback function to ack or nack a message?
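
For reference, a minimal sketch of how I understand the exported ConsumerStatus codes (ACK, REQUEUE, DROP); double-check the type definitions before relying on the exact names. handleEvent and isTransient are hypothetical application helpers:

import {Connection, ConsumerStatus} from 'rabbitmq-client'

const rabbit = new Connection('amqp://guest:guest@localhost:5672')

const sub = rabbit.createConsumer({queue: 'user-events'}, async (msg) => {
  try {
    await handleEvent(msg.body)        // hypothetical application handler
    return ConsumerStatus.ACK          // explicit ack (same as returning nothing)
  } catch (err) {
    if (isTransient(err))              // hypothetical check
      return ConsumerStatus.REQUEUE    // BasicNack with requeue
    return ConsumerStatus.DROP         // BasicNack without requeue (dead-letter if configured)
  }
})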

Unable to capture connection errors

Steps to reproduce:

  • Create a new instance of Connection with an invalid configuration (e.g. invalid credentials) inside a try/catch block

Expected:

  • Error handled in catch block

Observed:

  • Exception thrown
  • Process terminated

How to mock connection on tests?

Hey there! Nice job man!
I am trying to use it, but I am having a problem mocking the connection while testing or generating documentation.
While I generate the documentation the connection is made.
I am trying to find a way to mock the connections during tests.
I already have a variable that tells me whether it is a test run; I just need a mock with
the same functions. Has anybody already created something like this?

Thanks bro!
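
Not an official answer, but one approach is to hide the library behind a small interface and swap in a hand-rolled stub during tests; a rough sketch (the stub only mirrors the handful of methods the application actually calls, it is not a full mock of the library):

// messaging.js — real implementation
import {Connection} from 'rabbitmq-client'

export function createMessaging(url) {
  const rabbit = new Connection(url)
  return {
    consume: (opts, cb) => rabbit.createConsumer(opts, cb),
    createPublisher: (opts) => rabbit.createPublisher(opts),
    close: () => rabbit.close(),
  }
}

// messaging.mock.js — used when the "is testing" variable is set
export function createMessagingMock() {
  const handlers = new Map()
  return {
    consume: (opts, cb) => {
      handlers.set(opts.queue, cb)
      return {close: async () => {}}
    },
    createPublisher: () => ({
      // deliver straight to a registered handler, no broker involved
      send: async (envelope, body) => {
        const queue = typeof envelope === 'string' ? envelope : envelope.routingKey
        const cb = handlers.get(queue)
        if (cb) await cb({body})
      },
      close: async () => {},
    }),
    close: async () => {},
  }
}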

control acknowledgement

I need to know how I can prevent auto-ack for messages when I use client.createConsumer. I need to control when the message is acked; I don't want it to be acked automatically.

Consumer handler `reply()` `envelope.routingKey` and `envelope.exchange` conflict

Problem

The behavior of the reply() callback passed to the ConsumerHandler has changed between v4.4.0 and v4.5.0.
A change was introduced to allow for overriding message envelope properties (correlationId to be specific).
This resulted in message replies being routed incorrectly, because both the routingKey and the exchange are now used to route the message (as with any normal publisher).

Cause

The code responsible for this issue can be found here v4.4.0 - v4.5.0 Diff

Diagnosis

In RabbitMQ, a message can be published to a queue either through an exchange that is bound to the queue with some routingKey, or directly, with a routingKey that matches the queue name exactly.

Previously, the exchange was set to an empty string (always overwritten explicitly), which made the routingKey the only property responsible for routing the message. Starting from v4.5.0 this property assignment was dropped, which resulted in the following side-effect:

  • If the reply() caller specifies an exchange, for whatever reason, the routing will be done based on both the exchange and the routingKey, which requires additional configuration (a new queue-exchange binding) for RabbitMQ to know where to deliver the message. This defeats the purpose of the reply() function, since it is solely a shortcut for RPC replies; overriding the exchange when replying makes no sense and shouldn't be allowed.

Fix

Revert the change made in v4.5.0 and add { exchange: '' } again so that the { routingKey: req.replyTo } is the only field used for routing.

If you don't have the time to fix the issue, I am more than happy to make a PR.


BTW, thanks for the package, I really enjoy working with it. You nailed the API design.

Thank You

Just wanted to drop by and say thank you for this 🙂

I've been looking for a RabbitMQ Node.js library that meets these criteria:

  • TypeScript support
  • Allows specifying multiple nodes in a cluster
  • Await-able components
  • Good docs and few dependencies

Your library meets all of those 💪. It should be added to this list in my humble opinion.

Cheers 🍹

Error: Class extends value undefined is not a constructor or null

Hey everyone, I have a RabbitMQ broker running in the background and a .NET Core backend publishing messages. I need to connect to the broker inside my React app I created using Vite. This is the code I use for connecting to the broker:

import { Connection } from 'rabbitmq-client';

  useEffect(() => {
    const rabbit = new Connection("amqp://guest:guest@localhost:5672");

    rabbit.on('connection', () => {
      console.log('connected');
    });
    rabbit.on('error', () => {
      console.log('error');
    });

    return () => {
      rabbit.close();
    };
  }, []);

But I get this error when the code snippet is called:

Uncaught TypeError: Class extends value undefined is not a constructor or null
    at node_modules/rabbitmq-client/lib/util.js (util.js:106:43)
    at __require2 (chunk-WQG2LZMB.js?v=fa4e550a:16:50)
    at node_modules/rabbitmq-client/lib/Connection.js (Connection.js:11:16)
    at __require2 (chunk-WQG2LZMB.js?v=fa4e550a:16:50)
    at node_modules/rabbitmq-client/lib/index.js (index.js:4:22)
    at __require2 (chunk-WQG2LZMB.js?v=fa4e550a:16:50)
    at index.js:12:126

Any idea what I'm doing wrong or what I can do to resolve this issue?

Side note, this warning also shows up in the browser console:

Module "node:stream" has been externalized for browser compatibility. Cannot access "node:stream.Writable" in client code. See http://vitejs.dev/guide/troubleshooting.html#module-externalized-for-browser-compatibility for more details.

[Question]: Single active consumer

RabbitMQ supports single active consumer, which is useful in my scenario where I run all my consumers in a Pod in Kubernetes with N replicas. For consumers where ordering is necessary, I set the appropriate headers to ensure only one consumer will be processing messages (with a concurrency of 1) from the queue, while the other consumers wait for the active consumer to disconnect, etc. This works nicely because for consumers where ordering does not matter, I can increase throughput by increasing the replica count.

I was wondering, if I were to use this library (and I really LOVE the consumer/publisher abstractions... the best I've seen so far!), can we set the single active consumer option, or is that how it works under the hood already?

I hope my question makes sense.
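
Not the maintainer, but since queueOptions.arguments are passed through to the broker, single active consumer should just be a queue argument; a sketch assuming the standard RabbitMQ x-single-active-consumer flag (queue name and handler are made up):

const sub = rabbit.createConsumer({
  queue: 'ordered-events',
  queueOptions: {
    durable: true,
    arguments: {
      // standard RabbitMQ queue argument: only one consumer on the queue is active
      // at a time, the others wait until it disconnects
      'x-single-active-consumer': true,
    },
  },
  qos: {prefetchCount: 1},
}, async (msg) => {
  await handleInOrder(msg) // hypothetical handler
})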

TypeError: class heritage node_stream_1.Writable is not an object or null

Hi there,

I am trying to connect my Angular app to the RabbitMQ server (it's a Raspberry Pi on the local network).
When I open the component that uses the API, I get the error in the title: TypeError: class heritage node_stream_1.Writable is not an object or null
The following is the API service:
import { Injectable } from '@angular/core';
import { Connection } from 'rabbitmq-client';

@Injectable({
  providedIn: 'root'
})
export class OfflineAmpelControllerApiService {

  constructor() {
    console.log("Try to connect...");
    let rabbit = new Connection('amqp://username:password@raspi:5672');

    rabbit.on('error', (err) => {
      console.log('RabbitMQ connection error', err)
    });
    rabbit.on('connection', () => {
      console.log('Connection successfully (re)established')
    });
  }
}

my angular.json:
{ "$schema": "./node_modules/@angular/cli/lib/config/schema.json", "version": 1, "newProjectRoot": "projects", "projects": { "AngDev": { "projectType": "application", "schematics": { "@schematics/angular:component": { "standalone": false }, "@schematics/angular:directive": { "standalone": false }, "@schematics/angular:pipe": { "standalone": false } }, "root": "", "sourceRoot": "src", "prefix": "app", "architect": { "build": { "builder": "@angular-devkit/build-angular:application", "options": { "outputPath": "dist/ang-dev", "index": "src/index.html", "browser": "src/main.ts", "polyfills": [ "zone.js" ], "tsConfig": "tsconfig.app.json", "assets": [ "src/favicon.ico", "src/assets", ], "styles": [ "@angular/material/prebuilt-themes/deeppurple-amber.css", "src/styles.css" ] "server": "src/main.server.ts", "prerender": false, "ssr": false }, "configurations": { "production": { "budgets": [ { "type": "initial", "maximumWarning": "500kb", "maximumError": "1mb" }, { "type": "anyComponentStyle", "maximumWarning": "2kb", "maximumError": "4kb" } ], "outputHashing": "all" }, "development": { "optimization": false, "extractLicenses": false, "sourceMap": true } }, "defaultConfiguration": "production" }, "serve": { "builder": "@angular-devkit/build-angular:dev-server", "configurations": { "production": { "buildTarget": "AngDev:build:production", "proxyConfig": "src/proxy.conf.json" }, "development": { "buildTarget": "AngDev:build:development", "proxyConfig": "src/proxy.conf.json" } }, "defaultConfiguration": "development" }, "extract-i18n": { "builder": "@angular-devkit/build-angular:extract-i18n", "options": { "buildTarget": "AngDev:build" } }, "test": { "builder": "@angular-devkit/build-angular:karma", "options": { "polyfills": [ "zone.js", "zone.js/testing" ], "tsConfig": "tsconfig.spec.json", "assets": [ "src/favicon.ico", "src/assets" ], "styles": [ "@angular/material/prebuilt-themes/deeppurple-amber.css", "src/styles.css" ] } } }, "i18n": { "sourceLocale": "de-DE" } } }, "cli": { "analytics": "58f39bb4-9245-41d2-9246-110bfd2ba6bd" } }

and package.json:
{ "name": "ang-dev", "version": "0.0.0", "scripts": { "ng": "ng", "start": "ng serve", "build": "ng build", "watch": "ng build --watch --configuration development", "test": "ng test", "serve:ssr:AngDev": "node dist/ang-dev/server/server.mjs" }, "private": true, "dependencies": { "@angular/animations": "^17.0.5", "@angular/cdk": "^17.0.5", "@angular/common": "^17.0.5", "@angular/compiler": "^17.0.5", "@angular/core": "^17.0.5", "@angular/forms": "^17.0.5", "@angular/material": "^17.0.5", "@angular/material-moment-adapter": "^17.2.0", "@angular/platform-browser": "^17.0.5", "@angular/platform-browser-dynamic": "^17.0.5", "@angular/platform-server": "^17.0.5", "@angular/router": "^17.0.5", "@angular/ssr": "^17.0.4", "cryptojs": "^2.5.3", "express": "^4.18.2", "jschardet": "^3.1.2", "localstorage-polyfill": "^1.0.1", "primeicons": "^6.0.1", "rabbitmq-client": "^4.6.0", "rxjs": "~7.8.0", "set-interval-async": "^3.0.3", "tslib": "^2.3.0", "zone.js": "~0.14.2" }, "devDependencies": { "@angular-devkit/build-angular": "^17.1.0", "@angular/cli": "^17.1.0", "@angular/compiler-cli": "^17.1.0", "@types/cryptojs": "^3.1.33", "@types/express": "^4.17.17", "@types/jasmine": "~5.1.0", "@types/mime-types": "^2.1.4", "@types/node": "^18.18.0", "jasmine-core": "~5.1.0", "karma": "~6.4.0", "karma-chrome-launcher": "~3.2.0", "karma-coverage": "~2.2.0", "karma-jasmine": "~5.1.0", "karma-jasmine-html-reporter": "~2.1.0", "typescript": "~5.2.2" } }

Can someone tell me how to get this working?

Thank you very much

Fipsi

Question: Multi Node RabbitMQ / Failover

In production we run RabbitMQ on Kubernetes with multiple nodes. I noticed that if a RabbitMQ instance dies, so does the process (which would be expected if there were only one RabbitMQ node), but normally it should just keep working and reconnect to the next RabbitMQ instance, since that one is still healthy.

I saw that amqplib doesn't support this either, which is why amqp-connection-manager exists, but I feel it would be a good addition to have it natively in this client, as it is so far the best option for the future in Node. I am not sure if this is a feature yet in this client, but if not, is it something that is planned?

If not, I may be able to help contribute it with a PR (if I get some allocation from work).

Thanks in advance

set timeout option for RPC request

Hi, I want to ask how timeouts work in the RPC client. This is my code right now:

this._rpcClient = this.rabbit.createRPCClient({
  confirm: true,
  maxAttempts: 2,
  timeout: 5000,
  exchanges: [{ exchange: "rpc", durable: true, type: "topic" }],
});

const res = await this._rpcClient?.send(
  {
    exchange: "rpc",
    routingKey: routeKey,
    expiration: "4000",
  },
  message
);

So do I need to set expiration in the send method, or is setting timeout in createRPCClient enough?
What happens to the message after it times out in the RPC client?

Constant disconnects

Hello!
First of all I would like to thank you for the amazing work you've done on the project. I recently migrated to this library from amqplib and for the most part everything is working great. There is a strange issue, however, with exclusive queues. Overall the connection is very stable for all of the services except the ones that declare one or more exclusive queues. The strange part is that they disconnect and reconnect (I use the Consumer object) every 3 minutes. The error that is received is not very descriptive:

{
  "level": "error",
  "time": "2023-03-29T05:58:31.599Z",
  "version": "1.0.853",
  "name": "clientbff",
  "trace_id": "2f0a4e39ee64e6862d929dd9c1359205",
  "span_id": "b19aec1f82141b19",
  "msg": "Received disconnect from RabbitMQ",
  "err": {
    "type": "Error",
    "message": "read ECONNRESET",
    "stack": "Error: read ECONNRESET\n    at __node_internal_captureLargerStackTrace (node:internal/errors:490:5)\n    at __node_internal_errnoException (node:internal/errors:620:12)\n    at TCP.onStreamRead (node:internal/stream_base_commons:217:20)\n    at TCP.callbackTrampoline (node:internal/async_hooks:130:17)",
    "errno": -104,
    "code": "ECONNRESET",
    "syscall": "read"
  }
}

And here is the disconnect reappearing every 3 minutes (screenshot omitted).

And as I've said, the other connections are very stable, except for the two services that use only exclusive queues (screenshot omitted).

Any guidance is appreciated

Update:

It seems this issue is not related to exclusive queues but to noAck mode; this is the parameter that triggers the disconnects.

define dead-letter exchange

Hi, I want to ask if there is any way to define a dead-letter exchange for each queue, so that if the consumer throws an error the message will be sent to the dead-letter exchange, and also whether there is any way to set x-queue-type to quorum.
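
A sketch of how this can be done today through queue arguments (x-dead-letter-exchange, x-dead-letter-routing-key and x-queue-type are standard RabbitMQ queue arguments passed via queueOptions.arguments; the exchange, queue names and handler here are made up):

const sub = rabbit.createConsumer({
  queue: 'work',
  queueOptions: {
    durable: true,
    arguments: {
      'x-queue-type': 'quorum',
      'x-dead-letter-exchange': 'my-dlx',        // where rejected messages are routed
      'x-dead-letter-routing-key': 'work.failed' // optional override of the routing key
    },
  },
  // declare the dead-letter exchange up front so nothing is lost
  exchanges: [{exchange: 'my-dlx', type: 'direct', durable: true}],
}, async (msg) => {
  // a thrown error nacks the message; whether it is requeued or dead-lettered
  // depends on the consumer's requeue setting / the returned status code
  await handleWork(msg) // hypothetical handler
})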

MaxListenersExceededWarning

9|index  | (Use `node --trace-warnings ...` to show where the warning was created)
9|index  | MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 drain listeners added to [Socket]. Use emitter.setMaxListeners() to increase limit
9|index  |     at _addListener (node:events:601:17)
9|index  |     at Socket.addListener (node:events:619:10)
9|index  |     at Readable.on (node:internal/streams/readable:887:35)
9|index  |     at new EncoderStream (/home/smsflash/smsflash__node/.build/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/util.js:113:13)
9|index  |     at new Channel (/home/smsflash/smsflash__node/.build/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Channel.js:90:21)
9|index  |     at Connection.acquire (/home/smsflash/smsflash__node/.build/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Connection.js:110:20)
9|index  |     at Consumer._setup (/home/smsflash/smsflash__node/.build/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Consumer.js:223:46) {
9|index  |   emitter: null,
9|index  |   type: 'drain',
9|index  |   count: 11
9|index  | } { hide_meta: true }

Why am I getting this warning? I have no idea.

`await Publisher.send()` hangs when reconnecting

This issue is very difficult to reproduce, in fact I am unable to provide a working snippet with the error.

I have deployed a Lambda function in AWS using this library.
Here's a somewhat similar version of the code:

const { Connection } = require("rabbitmq-client");

let rabbitConnection;
let rabbitPublisher;

exports.handler = async (event) => {
  await setup();

  let message = { "this is": "a message" };

  let outputBuffer = Buffer.from(JSON.stringify(message), "utf-8");

  await rabbitPublisher.send(
    { exchange: "my_exc", routingKey: "a.sample.topic" },
    outputBuffer
  );
  console.log("Message published to a.sample.topic");
};

async function setup() {
  if (!rabbitConnection) {
    console.log("Creating RabbitMQ connection");
    rabbitConnection = new Connection({
      connectionName: "a-name",
      url: "http://localhost:15672", // or whatever the default URL is, i'm using a real rabbitMQ broker deployed on aws
      connectionTimeout: 5000,
      acquireTimeout: 5000,
      retryLow: 300,
      retryHigh: 5100,
      noDelay: true,
    });
    rabbitConnection.on("error", (err) => {
      console.log("RabbitMQ connection error", err);
    });
    rabbitConnection.on("connection", () => {
      console.log("Connection successfully (re)established");
    });

    console.log("creating RabbitMQ publisher");
    rabbitPublisher = rabbitConnection.createPublisher({
      confirm: true,
      maxAttempts: 3,
      exchanges: [
        { exchange: "my_exc", type: "topic", durable: true },
      ],
    });
    rabbitPublisher.on("basic.return", (msg) => {
      console.log("RabbitMQ publisher basic return", msg);
    });
    rabbitPublisher.on("retry", (err, envelope, body) => {
      console.log("RabbitMQ publisher error", envelope, body, err);
    });
  }
}

Lambdas exist in a "lambda context", meaning AWS will execute the top-level code once, then call the handler function with the event that triggered the execution.
If a second execution is triggered within a certain time (variable and unspecified, ~2-10 minutes depending on many factors), the top-level code doesn't get re-run and instead only the handler function gets called with the new event. This is the reason I'm not closing the connection at the end of the handler function: I'm hoping to reuse it during subsequent executions.
Between executions the lambda context freezes, meaning the underlying Node.js event loop is stopped.

The problem I'm facing is the following:

  • execution A of the lambda function:
    • everything is fine, the connection gets established and the message is correctly sent to my broker
    • after the execution the lambda context freezes
  • 60 seconds pass (>2 rabbitMQ heartbeats)
  • execution B of the lambda function:
    • the context un-freezes
    • the execution reaches the line where I do await rabbitPublisher.send(...) (verified using console.log)
    • rabbitConnection realizes the connection has been closed by the broker (due to missed heartbeats)
    • the connection gets re-established
    • the promise returned by rabbitPublisher.send(...) never gets resolved and the code hangs waiting on it
    • the lambda function reaches its time limit and times out

Here is a screenshot of the logs (omitted).

There are a few lines I didn't include in the snippet (like retrieving the secret containing the credentials to access the broker), but those shouldn't be relevant.

As you can see the problem doesn't always show up, and this makes it even more difficult to debug.
I believe this could be caused by the reconnect mechanism itself:
I'm not good with JS (or TS, for that matter), but my feeling is that when the publisher awaits the underlying buffer to be consumed, sometimes the Node.js loop gives control back to the heartbeat logic, and when it detects that the connection is down it re-establishes the connection itself, the publisher and the queue (hence the buffer, I believe), but I'm still awaiting the old buffer's promise, which never gets resolved.
This could very well be complete bs; consider it a humble interpretation from a fellow Python dev.

Other important details:

  • the issue occurs (still, only sometimes) if I forcefully close the connection from the broker web interface while the lambda context is frozen
  • increasing the timeout of the lambda function does not help, it hangs anyway
  • the lambda runs on node.js 20
  • I'm using rabbitmq-client version 4.5.1
  • as a workaround I've configured the lambda to retry once before giving up, but I really don't like it

how nack in consumer

Hi, I want to ask how nack works with createConsumer. In the docs you said that if you throw an error in the handler it automatically returns a nack, but if I throw an error my app crashes. This is my code:

this._consumer = this.rabbit.createConsumer(
  {
    queue: process.env.SERVICE_NAME as string,
    queueOptions: {
      durable: true,
      arguments: {
        "x-queue-type": "quorum",
        "x-delivery-limit": 3,
      },
    },
    qos: { prefetchCount: 1 },
    exchanges: [
      { exchange: "router", type: "topic", durable: true },
      { exchange: "x-dead-letter-exchange", type: "direct", durable: true },
    ],
    queueBindings: [{ exchange: "router", routingKey: "transcoder" }],
  },
  async (msg) => {
    await this.transcoderConsumer.handleConsume(
      msg,
      this.sendMessage.bind(this)
    );
  }
);

So if I throw an exception in handleConsume my app crashes. And in general, wouldn't it be better to use a boolean here? For example, if this function returns true, ack the message, and if it returns false, nack it.
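
In case it helps: instead of letting the error propagate out of the handler, you can catch it yourself and return a status code, which keeps the nack behaviour per-message and avoids any unhandled-rejection path in your own code. A sketch, assuming the exported ConsumerStatus enum (check the type definitions for the exact names):

import {ConsumerStatus} from 'rabbitmq-client'

// drop-in replacement for the handler (second argument to createConsumer) above;
// transcoderConsumer / sendMessage are the same objects used in the snippet
const handler = async (msg) => {
  try {
    await transcoderConsumer.handleConsume(msg, sendMessage)
    // returning nothing acks the message
  } catch (err) {
    console.error('transcoder failed', err)
    // nack without crashing; with a quorum queue and x-delivery-limit, repeated
    // requeues will eventually be dropped or dead-lettered (if configured)
    return ConsumerStatus.REQUEUE
  }
}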

Publish doesn't await consumer ack

First of all, thanks for the library! Though I'm having some trouble with awaiting acknowledgements.

I have a consumer:

 const consumer = client.createConsumer(
   {
     queue,
     queueOptions: { exclusive: true },
     exchanges: [{ exchange, type: 'topic', autoDelete: true }],
     queueBindings: [{ exchange, routingKey }],
   },
   async (msg) => {
     await doSomeAsyncWork()
  });

And I have a publisher:

const publisher = client.createPublisher({
  confirm: true,
  exchanges: [{ exchange, type: 'topic', autoDelete: true }]
})
                                                                  
await publisher.publish({ exchange, routingKey }, "my-message")

The publish function resolves directly after sending the message to the queue, instead of waiting on the ack. Is there anything I'm missing? I've also tried to just acquire a channel and do await channel.confirmSelect(), but it resulted in the same behavior.

keep metrics

Hi,

We've been using this great lib for a while to read messages from a queue and call a REST API for each message and last week we had a first "grown up" moment (70k in 12 hours, 11k in 30 min).

Everything worked fine but the default configuration (concurrency 1 and prefetch 2) was a bit too slow and we got a bit of a backlog until we stopped and restarted with concurrency 4 + prefetch 8.

We would like to monitor how many messages were:

  • processed (ack)
  • requeued (nack+requeued)
  • rejected (nack+dead-letter)
  • waiting to be processed

The first 3 are simple counters and we could implement them in our client code using the library, but as this might be a common need, it might be worth implementing in the library itself?

For the latter, it seems there are multiple methods to see how many messages are waiting (https://www.rabbitmq.com/queues.html#queue-length); would it be possible to expose queue.declare-ok + messageCount?

And a more open question: in your experience, is it worth dynamically changing the concurrency + prefetch, or is there no downside to starting directly with the max concurrency + prefetch?
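
Until something lands in the library, a sketch of the three counters done in application code by wrapping the handler, plus a passive queueDeclare poll for the backlog (the passive declare + messageCount is the same call shown in the README; the ConsumerStatus mapping below is an assumption, check the type definitions):

import {ConsumerStatus} from 'rabbitmq-client'

const metrics = {ack: 0, requeue: 0, drop: 0}

function instrument(handler) {
  return async (msg) => {
    try {
      const status = await handler(msg)
      if (status === ConsumerStatus.REQUEUE) metrics.requeue++
      else if (status === ConsumerStatus.DROP) metrics.drop++
      else metrics.ack++ // undefined / ACK means the message was acknowledged
      return status
    } catch (err) {
      metrics.requeue++ // or drop, depending on your consumer's requeue setting
      throw err
    }
  }
}

// backlog: poll the queue without modifying it
setInterval(async () => {
  const {messageCount} = await rabbit.queueDeclare({queue: 'my-queue', passive: true})
  console.log({...metrics, waiting: messageCount})
}, 30_000).unref()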

connection.unsafeDestroy() and connection.close() hang forever

On Mac OS, my code works fine, including connection.close(); however, on Linux the two connection closing calls connection.unsafeDestroy() and connection.close() hang forever. Everything else in terms of dealing with the RabbitMQ queue seems to work just fine before that point.

The Linux code is embedded in a Vercel pkg executable, if that matters. Sometimes strange things can happen when packaging NodeJS code in pkg, electron etc.

I am not saying there's a bug in your library, just looking for pointers to get to the bottom of possible scenarios / deeper reasons that closing the socket to 127.0.0.1, port 5672 could hang indefinitely. Thanks!

jackrabbit

Hi,

i just noticed "pagerinc" in your profile.. i was wondering why there is another rabbitmq lib and what the reasoning behind this is ?

regards
Dennis

Publisher Confirms - How to use

Hi! I am creating an example with this lib for study and practice, but I have not found how to get the confirmation. I found only the function confirmSelect(); how do I get the confirmation on the publisher? I read the code and found confirm.select-ok, but didn't find how to use this method.
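
For what it's worth, as far as I understand the Publisher handles confirms for you when confirm: true is set: the promise returned by send() only resolves once the broker has confirmed the message, so there is no separate method to call. A minimal sketch:

const pub = rabbit.createPublisher({confirm: true})

// resolves after the broker acknowledges (confirms) this message,
// and rejects if the message could not be confirmed
await pub.send('my-queue', {hello: 'world'})
console.log('confirmed by the broker')

// close() also waits for any confirmations still pending
await pub.close()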

Consumer close deadlock

Hello! I believe the latest version is subject to deadlocks in consumer close. I am not yet able to provide a concise reproduction, but the approximate scenario in my integration tests is the following:

  1. Create a connection, use the lazy channel to declare an exchange
  2. Create a bunch of publishers and consumers
  3. Close the publishers and consumers - here the deadlock occurs while closing the underlying channel while executing the close callback. The loop inside _findNode never exits and the close promise is never resolved.

Changing the first step to avoid using the lazy channel and creating an explicit channel instead seems to solve the problem.

Cannot find module 'node:net' happened

I used a simple JS file like below:

const Connection = require('rabbitmq-client')

but an exception happened:
internal/modules/cjs/loader.js:883
throw err;
^

Error: Cannot find module 'node:net'
Require stack:

  • /Users/keyin/code/framwork/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Connection.js
  • /Users/keyin/code/framwork/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/index.js
  • /Users/keyin/code/framwork/sample.js
    at Function.Module._resolveFilename (internal/modules/cjs/loader.js:880:15)
    at Function.Module._load (internal/modules/cjs/loader.js:725:27)
    at Module.require (internal/modules/cjs/loader.js:952:19)
    at require (internal/modules/cjs/helpers.js:88:18)
    at Object. (/Users/keyin/code/framwork/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Connection.js:29:36)
    at Module._compile (internal/modules/cjs/loader.js:1063:30)
    at Object.Module._extensions..js (internal/modules/cjs/loader.js:1092:10)
    at Module.load (internal/modules/cjs/loader.js:928:32)
    at Function.Module._load (internal/modules/cjs/loader.js:769:14)
    at Module.require (internal/modules/cjs/loader.js:952:19) {
    code: 'MODULE_NOT_FOUND',
    requireStack: [
    '/Users/keyin/code/framwork/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Connection.js',
    '/Users/keyin/code/framwork/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/index.js',
    '/Users/keyin/code/framwork/sample.js'
    ]
    }

Connection gets closed when using Bun instead of node

Version: 4.5.2
RabbitMQ version: 3.13.0

RabbitMQ error:

2024-04-03 14:26:42.514858+00:00 [info] <0.785.0> accepting AMQP connection <0.785.0> (172.30.0.1:57434 -> 172.30.0.3:5672)
2024-04-03 14:26:45.524204+00:00 [error] <0.785.0> closing AMQP connection <0.785.0> (172.30.0.1:57434 -> 172.30.0.3:5672):
2024-04-03 14:26:45.524204+00:00 [error] <0.785.0> {handshake_error,starting,28255,
2024-04-03 14:26:45.524204+00:00 [error] <0.785.0>                  {amqp_error,frame_error,
2024-04-03 14:26:45.524204+00:00 [error] <0.785.0>                              "type 206, all octets = <<>>: {frame_too_large,1431529844,4088}",
2024-04-03 14:26:45.524204+00:00 [error] <0.785.0>                              none}}

Client side error:
{"code":"CONN_CLOSE","name":"AMQPConnectionError"}

onShutdown (documentation clarification)

What is the best practice to properly shut down? (I'm consuming messages.)

The aim is being able to handle SIGINT/SIGTERM and wait until the in-flight messages are processed (or, if the processing takes too long, to nack them and leave).

The readme contains an onShutdown function, but I didn't get how/where it's supposed to be used:

// Clean up when you receive a shutdown signal
async function onShutdown() {
  // Waits for pending confirmations and closes the underlying Channel
  await pub.close()
  // Stop consuming. Wait for any pending message handlers to settle.
  await sub.close()
  await rabbit.close()
}
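
For what it's worth, the README registers that same function as the handler for both signals; a sketch of the full wiring, with an optional hard timeout added on top (the timeout is my own addition, not part of the library):

async function onShutdown() {
  // give up after 30s if a message handler hangs (my own addition, not from the docs)
  setTimeout(() => process.exit(1), 30_000).unref()

  await pub.close()   // waits for pending publish confirmations
  await sub.close()   // stops consuming, waits for in-flight handlers to settle
  await rabbit.close()
}

process.on('SIGINT', onShutdown)
process.on('SIGTERM', onShutdown)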

consumer: set requeue on a per message basis

Hi,

we're using rabbitmq to call a rest API and we'd like to handle two different types of error:

  • short term: if the REST server is busy, it will return a "try later" error; we'd like to requeue the message, pause a bit and retry, possibly closing the Consumer and trying with a lower concurrency
  • long term: for whatever reason the message can't be processed; we would like to nack but not requeue, so the message goes into the dead-letter queue and other messages get the chance to be processed

I've seen you can do this when using basicNack, or set the default when setting up the Consumer. Is there a way to have the best of both worlds?

Bonus points if we can also do it based on the redelivered status of the message.
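
A sketch of per-message control done today by returning a different ConsumerStatus from the handler, using msg.redelivered for the bonus part (callRestApi and isTryLater are hypothetical; check the exported enum names in the type definitions):

import {ConsumerStatus} from 'rabbitmq-client'

const sub = rabbit.createConsumer({queue: 'rest-calls', qos: {prefetchCount: 4}}, async (msg) => {
  try {
    await callRestApi(msg.body) // hypothetical REST call
  } catch (err) {
    if (isTryLater(err) && !msg.redelivered) // hypothetical "server busy" check
      return ConsumerStatus.REQUEUE          // short term: put it back and retry
    return ConsumerStatus.DROP               // long term: nack without requeue -> dead-letter queue
  }
})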

Customizable reply routing in consumer

Hey there!
First of all, big thanks for your work! We really like your library, especially the higher-level Consumer/Publisher API.

I want to suggest an enhancement for the Consumer API: in our application we need to route replies to specific exchanges, and this is not possible due to the order of the Envelope properties in the destructuring at line 200 of Consumer. For now we use the monkey patch from the example below, which allows us to configure the reply function like the Publisher.send method while keeping the default behaviour from the documentation.

Do you think this change would be useful for the library?

const { Connection } = require('rabbitmq-client');
const { Consumer } = require('rabbitmq-client/lib/Consumer');

class ProcessingConsumer extends Consumer {
    _makeReplyfn(req) {
        return (body, envelope) => {
            if (!req.replyTo) {
                throw new Error('attempted to reply to a non-RPC message');
            }
            return this._ch.basicPublish({
                exchange: '',
                routingKey: req.replyTo,
                correlationId: req.correlationId,
                ...envelope,
            }, body);
        };
    }
}

class ProcessingConnection extends Connection {
    createConsumer(props, cb) {
        return new ProcessingConsumer(this, props, cb);
    }
}

exports.ProcessingConsumer = ProcessingConsumer;
exports.ProcessingConnection = ProcessingConnection;

Any future additions for batch consumption?

I have an application that receives several messages; with this library I need to query the database for each message one at a time and consequently insert/update one by one. Are you thinking about adding support for a batch consumer?

For example, we have prefetchCount at 100, it would take these 100 messages and pass them to the handler as an array of messages so we could work on them all at once.

It would also be nice if you could use this batching to double the value of prefetchCount, so you always have something hot in your hands.

Thank you for everything

[Feature Request] Lazy Consumer

It would be really helpful if a Consumer could be created without starting to consume messages immediately. In my use case I want to inject the Consumer via a dependency injection container into a service and start consuming messages only after some other dependencies are ready.

I think adding a lazy boolean flag to the options could control whether the connect method is called in the constructor.

I can work on a PR if you are open to such a feature.
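
Until a lazy flag exists, a workaround sketch: keep the consumer options and handler in your service and only call createConsumer() once the other dependencies are ready (the class and queue names here are made up for illustration):

class UserEventsSubscription {
  constructor(rabbit) {
    this.rabbit = rabbit
    this.consumer = null
  }

  // call this once the other dependencies are ready
  start() {
    if (this.consumer) return this.consumer
    this.consumer = this.rabbit.createConsumer(
      {queue: 'user-events', queueOptions: {durable: true}},
      (msg) => this.handle(msg))
    return this.consumer
  }

  async handle(msg) {
    // ...application logic
  }

  async stop() {
    if (this.consumer) await this.consumer.close()
  }
}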

Transient vs. Permanent problems

I'm moving a project from amqplib and I am enjoying node-rabbitmq-client very much so far, thank you for your work!

I'm not sure issues are the right place to discuss this; please point me to a better venue if there is a mailing list/forum/something I missed, and please indulge my rambling below.

One general issue we are trying to get right is how to distinguish between self-healing problems (say the network is down for 2 min; we recreate the channel, keep exchanging messages as if nothing happened, and are confident we didn't lose any) and those that need manual intervention (no connection for 4 hours; someone needs to check whether the DNS record is correct or something).

Of course, some of it will need to be handled on the software using your library, but I'm trying to identify what signals are available that already exists to make it more robust.

connection

  • retryLow + retryHigh
    Is there a way to be alerted that the connection has been down for more than retryHigh? I didn't find a connection.on('retry'); is there something to catch that problem? An on('error')?

channel

I'm not sure if there are cases where the connection is up but a channel is down; are there?

consumer/producer/queue

The queue seems mostly abstracted under the Consumer or Publisher (no top-level class): https://cody-greene.github.io/node-rabbitmq-client

It might already be handled by .on('error'), but I didn't find documentation on what types of errors each object can emit and what they mean.
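
For the "down for too long" signal, a sketch of what can be done in application code today with just the 'error' and 'connection' events (the threshold and alertOps() function are placeholders, and this assumes the 'error' event keeps firing while reconnect attempts fail):

const DOWN_ALERT_MS = 4 * 60 * 60 * 1000 // e.g. 4 hours
let disconnectedSince = null
let lastError = null

rabbit.on('error', (err) => {
  if (disconnectedSince === null) disconnectedSince = Date.now()
  lastError = err
})
rabbit.on('connection', () => { disconnectedSince = null })

setInterval(() => {
  if (disconnectedSince !== null && Date.now() - disconnectedSince > DOWN_ALERT_MS) {
    alertOps('RabbitMQ unreachable since ' + new Date(disconnectedSince).toISOString(), lastError) // placeholder
  }
}, 60_000).unref()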

New Feature Request: Add optional back-off option to consumer

Problem

For small queues, if a service that the consumer requires to function properly goes down and causes errors, or there is a temporary issue with a particular message, the message gets put back into the queue and almost immediately re-consumed, creating a tight cycle.

Proposed Solution

An optional back-off option on the consumer constructor which adds a non-blocking wait before requeuing the message.

const consumer = Rabbit.createConsumer(
  {
    queue: "tasks",
    queueOptions: {
      ...
    },
    exchanges: [...],
    backOff: 60, // when ConsumerStatus.REQUEUE, wait 60 seconds before actually requeuing
                 // the message; meanwhile continue processing the next message
    ...
  },
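
Until/unless this lands in the library, a sketch of a similar behaviour implemented inside the handler: delay before returning REQUEUE so the message isn't redelivered immediately (note this occupies one of the prefetch slots while waiting, so other messages only keep flowing if prefetchCount > 1; processTask is a hypothetical handler):

import {setTimeout as sleep} from 'node:timers/promises'
import {ConsumerStatus} from 'rabbitmq-client'

const consumer = Rabbit.createConsumer({queue: 'tasks', qos: {prefetchCount: 4}}, async (msg) => {
  try {
    await processTask(msg.body) // hypothetical
  } catch (err) {
    // poor man's back-off: hold this delivery for 60s before requeuing it
    await sleep(60_000)
    return ConsumerStatus.REQUEUE
  }
})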

problem to set correlationId

This is my code:

async sendRPCMessage(queue: string, messageData: string) {
  const rabbit = new Connection(rabbitmq)
  const rpcClient = rabbit.createRPCClient({ confirm: true })
  const res = await rpcClient.send({ routingKey: queue, correlationId: '123', expiration: '123' }, 'messageData');
  console.log('response:', res.body)
  await rpcClient.close()
  await rabbit.close()
}

And here is its output:

{
  consumerTag: 'amq.ctag--wGfKOLXxFHts5XzcobW_w',
  deliveryTag: 1,
  redelivered: false,
  exchange: '',
  routingKey: 'tmpTest',
  contentType: 'text/plain',
  deliveryMode: 1,
  correlationId: '1',
  replyTo: 'amq.rabbitmq.reply-to.g1h2AA5yZXBseUA1MDAzMzQxNAAAOUkAAAAmZTzZFA==.uEd6YdebP5/BEhi5KtJGaQ==',
  expiration: '30000',
  timestamp: 1701853044,
  durable: false,
  body: 'messageData'
}

As you can see, the correlationId field didn't change and it has a default value.

AMQP connection timeout

AMQPConnectionError: connection timed out
    at Timeout._onTimeout (/home/smsflash/smsflash__node/node_modules/.pnpm/[email protected]/node_modules/rabbitmq-client/lib/Connection.js:285:32)
    at listOnTimeout (node:internal/timers:573:17)
    at processTimers (node:internal/timers:514:7) {
  code: 'CONNECTION_TIMEOUT'

Getting this error since this morning. Tried restarting and did everything, but it still happens.

publisher.send throw uncaught exception in case of large payload in first call

The connection and publisher are taken from the sample. On the first .send call:

connection = new Connection( process.env.RABBIT_MQ_URL )

publisher = connection.createPublisher( {
	confirm     : true,
	maxAttempts : 2,
	exchanges   : [ { exchange: 'NAME', type: 'topic', durable: true } ]
} )

try {
	await publisher.send(
		{
			exchange   : 'NAME',
			routingKey : 'files.process',
			headers    : {
				refresh_token,
				ws // too large (a large object instead of a scalar value)
			}
		},
		{ uuid }
	)
}
catch ( e ) {
	// here receive 1 exception
	// code: 'FRAME_ERROR'
	// name: 'AMQPConnectionError'
	// message: 'undefined: FRAME_ERROR - type 2, all octets = <<>>: {frame_too_large,7477,4088}'
	logger.error( ( e as any ).message )	
}


process.on( 'uncaughtException', ( err, origin ) => {
	// here receive 2 exceptions but this is unwanted exceptions
	// code: 'FRAME_ERROR'
	// name: 'AMQPConnectionError'
	// message: 'undefined: FRAME_ERROR - type 2, all octets = <<>>: {frame_too_large,7477,4088}'	
	console.error( `err '${ err }', occurred in ${ origin }` )
} )

CURRENT
With a large payload, I receive 2 uncaughtExceptions and 1 exception in catch:
// code: 'FRAME_ERROR'
// name: 'AMQPConnectionError'
// message: 'undefined: FRAME_ERROR - type 2, all octets = <<>>: {frame_too_large,7477,4088}'

EXPECTED:
With a large payload, receive only 1 exception in catch:
// code: 'FRAME_ERROR'
// name: 'AMQPConnectionError'
// message: 'undefined: FRAME_ERROR - type 2, all octets = <<>>: {frame_too_large,7477,4088}'

Explicit way of setting up the connection

Hey, as I was setting up some tests in one of my projects I had some issues with the RabbitMQ server.
The problem, though, is with the logs my app would capture: they were just a generic Node AggregateError without any insightful stack.

Then I looked at what's happening, and noticed that in the constructor you're initiating a call that is handled asynchronously. So when my app is getting initialized and the error happens, this can throw off the whole Node process.

Would it be possible to extend the Connection class to include a static method? e.g.:

class Connection {
  static create(propsOrUrl?: string|ConnectionOptions): Promise<Connection> {
     ... initialization that waits for the connection to succeed
  }
}

It would really help with catching all errors in app setup.

Let me know if you're open to outside contributions, so I could help you a little bit with this amazing project 👍
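
Until something like that exists, a workaround sketch using only the documented 'connection' and 'error' events to get an awaitable startup step (whether the very first error should be treated as fatal is a judgment call, since the library keeps retrying on its own):

import {Connection} from 'rabbitmq-client'

function createConnection(urlOrOptions) {
  const rabbit = new Connection(urlOrOptions)
  // keep a long-lived listener so later errors never crash the process
  rabbit.on('error', (err) => console.error('RabbitMQ connection error', err))
  const ready = new Promise((resolve, reject) => {
    rabbit.once('connection', () => resolve(rabbit))
    rabbit.once('error', reject) // treat the first error as fatal for startup
  })
  return {rabbit, ready}
}

// usage
const {rabbit, ready} = createConnection('amqp://guest:guest@localhost:5672')
await ready // rejects (catchable) if the first attempt fails, e.g. bad credentials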

Streaming/batched publishers

In certain cases I'd like to rapidly publish a lot of messages at once, with confirms enabled. Currently this means collecting a bunch of promises and using Promise.all(...). It would be nice to have an API conforming to the node WritableStream interface (where too many unconfirmed messages create back-pressure), or maybe just a sendAll(messages) method.
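
For reference, a sketch of the Promise.all pattern mentioned above, with a cap on how many confirm-enabled sends are in flight at once (the queue name and batch size are arbitrary):

const pub = rabbit.createPublisher({confirm: true, maxAttempts: 2})

async function sendAll(messages, batchSize = 100) {
  for (let i = 0; i < messages.length; i += batchSize) {
    const batch = messages.slice(i, i + batchSize)
    // each send() resolves once the broker confirms that message
    await Promise.all(batch.map((body) => pub.send('bulk-queue', body)))
  }
}

await sendAll([{n: 1}, {n: 2}, {n: 3}])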

Connection is not a constructor

Hello dear,
I cannot run the sample.
I get an error on the first-line import:

const rabbit = new Connection();
           ^

TypeError: Connection is not a constructor
at file:///Users/kiax/xxx/src/index.mjs:1:16
at ModuleJob.run (node:internal/modules/esm/module_job:194:25)

My node version is v18.x.
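
For anyone else hitting this: the README imports Connection as a named export, so the usual cause of this error is importing the module itself (a default import) instead. A sketch of the working form:

// ESM, as in the README — note the braces (named export)
import {Connection} from 'rabbitmq-client'

const rabbit = new Connection('amqp://guest:guest@localhost:5672')
rabbit.on('connection', () => console.log('connected'))

// the CommonJS equivalent would be:
//   const {Connection} = require('rabbitmq-client')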

Re-connect on PRECONDITION_FAILED error

First of all, I want to thank you so much for building this superb RMQ client. ❤️
I've been using it for quite some time now and am pleased with the results.

There is only one situation I couldn't find a solution to.
Every now and then I get this error message:

PRECONDITION_FAILED - delivery acknowledgement on channel 2 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

Unfortunately, the client doesn't re-connect like usual and my worker is just stuck, unable to process any further jobs.

My question is, how can I recover from this error?
I found these 2 places in the code base and it seems like handling that error is not yet implemented:
https://github.com/cody-greene/node-rabbitmq-client/blob/master/src/Channel.ts#L297
https://github.com/cody-greene/node-rabbitmq-client/blob/master/src/Channel.ts#L315

This is a simplified version of my client bootstrapping:

  const rabbit = new Connection({
    url: rmqUrl,
    retryLow: 1_000,
    retryHigh: 30_000,
  });

  rabbit.on('error', (error) => {
    logger.error('MQ connection error');
  });

  rabbit.on('connection', () => {
    logger.log('MQ connection successfully (re)established');
  });

Is there any way I could re-connect manually from the error handler?
Or better yet, what would you recommend?

Thank you in advance 🙏
