moleculer-channels's People

Contributors

andremaz, dowster, icebob, lucduong, storm8719, ujwal-setlur

moleculer-channels's Issues

Losing subscriber connection to Kafka

We are using channels with Kafka as the message broker. From time to time (at least once a day), without any indication, warning or error, the subscriber connection to the Kafka topic gets dropped, while other code in the service node (such as action handlers) keeps working fine. We detect this by noticing that the messages in the topics are no longer read by the subscribers.

So far the only solution we’ve found is to restart the service node. On a fresh start the channels reconnect to the Kafka topics and everything works for a while, until the connection to the Kafka topic gets dropped again.

Is there a mechanism to reconnect to the message broker (Kafka) when it (or something else) drops the subscriber connection?

Or is there a way to get some sort of notification from the framework that we’re no longer connected?

Thank you!

durable_name NATS option being overwritten.

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

Specifying the durable_name consumer option for NATS is not possible as it gets overwritten with the name of the service. This is important, as defining ephemeral streams in NATS requires that this option be set to null.

Expected Behavior

The durable_name consumer option can be set and does not get overridden. Perhaps if not specified, the option can default to the name of the service.
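One way the expected behavior could look, as a minimal sketch rather than the adapter's actual code: keep a user-supplied durable_name (including an explicit null, for ephemeral consumers) and derive a name from the group only when the option is absent. The helper name resolveDurableName is hypothetical; the split/join sanitizing mirrors the adapter line quoted in this issue.

```javascript
// Hypothetical helper, not part of @moleculer/channels.
// Keeps a user-supplied durable_name and only derives one from the
// group name when the option was not specified at all.
function resolveDurableName(userConfig, group) {
	const specified = Object.prototype.hasOwnProperty.call(userConfig || {}, "durable_name");
	if (specified) {
		// An explicit null is preserved so that an ephemeral consumer can be requested.
		return userConfig.durable_name;
	}
	// Same sanitizing as the quoted adapter line: periods are replaced.
	return group.split(".").join("_");
}

console.log(resolveDurableName({ durable_name: "TESTING123" }, "my.group")); // TESTING123
console.log(resolveDurableName({ durable_name: null }, "my.group")); // null
console.log(resolveDurableName({}, "my.group")); // my_group
```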

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. Specify the durable_name option in the middleware construction
ChannelsMiddleware({
	adapter: {
		type: "NATS",
		options: {
			nats: {
				consumerOptions: {
					config: {
						durable_name: "TESTING123"
					}
				}
			},
		}
	}
}),
  2. Add a console.log / breakpoint in ./src/adapters/nats.js:212 to print the consumer options

E.g.

...
consumerOpts.queue = streamName;
consumerOpts.config.deliver_group = streamName;
// NATS Stream name does not support: spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited.
// More info: https://docs.nats.io/jetstream/administration/naming
consumerOpts.config.durable_name = chan.group.split(".").join("_");
consumerOpts.config.deliver_subject = chan.id;
consumerOpts.config.max_ack_pending = chan.maxInFlight;
consumerOpts.callbackFn = this.createConsumerHandler(chan);

console.log("OPTS", consumerOpts) // PRINT THE CONSUMER OPTIONS

// 3. Create a subscription
try {
	const sub = await this.client.subscribe(chan.name, consumerOpts);
	this.subscriptions.set(chan.id, sub);
} catch (err) {
...

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.0
  • NodeJS version: v18.12.1
  • Operating System: Windows 11

Thanks in advance!
Isaac

Bug or Feature? amqp: SendToChannel without Listener => Error 404 - no exchange

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [x ] I am running the latest version
  • [ x] I checked the documentation and found no answer
  • [ x] I checked to make sure that this issue has not already been filed
  • [ x] I'm reporting the issue to the correct repository

Current Behavior

With a fresh instance of RabbitMQ, triggering sendToChannel from Moleculer gives the following error:
[2022-05-10T07:30:40.939Z] ERROR my-client/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'something.done' in vhost '/'"

and the client exits with an unhandled Promise rejection.

As soon as I start a listener for this channel (something.done) once, it works as expected, even if I stop the listener again.

Expected Behavior

I don't know whether this is a bug or a feature, but I expected the channel to be created either when the first listener registers or when the first producer sends a message.

My idea was to send events with a TTL value, so that an event is deleted if nobody consumes it.
As soon as a consumer comes up, it can use the stored events.
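A minimal sketch of the "producer creates it" variant, assuming an amqplib-style channel object that provides assertExchange and publish. The helper name publishWithAssert, the "fanout" exchange type and the durable flag are assumptions for illustration, not the adapter's confirmed behavior.

```javascript
// Sketch only: `channel` is expected to look like an amqplib channel, i.e. to
// provide assertExchange(exchange, type, options) and
// publish(exchange, routingKey, content, options).
// The "fanout" type and the durable flag are assumptions for illustration.
async function publishWithAssert(channel, exchangeName, payload, publishOptions = {}) {
	// Declaring an exchange is idempotent as long as type and options match,
	// so it can be done before publishing even if a consumer already created it.
	await channel.assertExchange(exchangeName, "fanout", { durable: true });
	const content = Buffer.from(JSON.stringify(payload));
	return channel.publish(exchangeName, "", content, publishOptions);
}
```

Note that asserting the exchange alone does not store anything: a message published to an exchange that has no queue bound to it is discarded by the broker, so the TTL idea would also need a queue to exist before the first message is sent.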

Failure Information

[2022-05-10T07:30:40.939Z] ERROR my-client/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'something.done' in vhost '/'"
at Channel.C.accept (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/channel.js:422:17)
    at Connection.mainAccept [as accept] (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/connection.js:64:33)
    at Socket.go (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/connection.js:478:48)
    at Socket.emit (node:events:527:28)
    at Socket.emit (node:domain:475:12)
    at emitReadable_ (node:internal/streams/readable:601:12)
    at processTicksAndRejections (node:internal/process/task_queues:82:21) {
  code: 404,
  classId: 60,
  methodId: 40
}
[2022-05-10T07:30:40.941Z] ERROR my-client/CHANNELS: AMQP channel closed.

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. start a fresh instance of rabbitmq
  2. start producer that trigger sendToChannel("event.occured")
  3. => Error

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.21
  • NodeJS version: 17.9.0
  • Operating System: Ubuntu 22.04
  • "@moleculer/channels": "0.1.2",
  • "amqplib": "^0.8.0"

Failure Logs


Add metrics counters

Add following metrics:

  • moleculer.channels.messages.sent with labels: channel (sent messages via sendToChannel)
  • moleculer.channels.messages.total with labels: channel, group (total processed messages in channel handlers)
  • moleculer.channels.messages.active with labels: channel, group (total active processing messages in channel handlers, gauge)
  • moleculer.channels.messages.time with labels: channel, group (histogram for processing time of channel handlers)
  • moleculer.channels.messages.errors.total with labels: channel, group (total errored messages in channel handlers)
  • moleculer.channels.messages.retries.total with labels: channel, group (total retried messages in channel handlers)
  • moleculer.channels.messages.deadLettering.total with labels: channel, group (total dead-lettered messages in channel handlers)
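The list above could be written down as plain metric definitions, as in the following sketch. The field names (type, name, labelNames, description) are assumed to match what Moleculer's metric registry accepts, and treating every "total" metric as a counter is also an assumption; only the gauge and the histogram are named explicitly in the list.

```javascript
// Proposed metrics as plain data. Field names and the "counter" type for the
// *.total and *.sent metrics are assumptions, see the note above.
const CHANNEL_METRICS = [
	{ type: "counter", name: "moleculer.channels.messages.sent", labelNames: ["channel"], description: "Sent messages via sendToChannel" },
	{ type: "counter", name: "moleculer.channels.messages.total", labelNames: ["channel", "group"], description: "Total processed messages in channel handlers" },
	{ type: "gauge", name: "moleculer.channels.messages.active", labelNames: ["channel", "group"], description: "Total active processing messages in channel handlers" },
	{ type: "histogram", name: "moleculer.channels.messages.time", labelNames: ["channel", "group"], description: "Processing time of channel handlers" },
	{ type: "counter", name: "moleculer.channels.messages.errors.total", labelNames: ["channel", "group"], description: "Total errored messages in channel handlers" },
	{ type: "counter", name: "moleculer.channels.messages.retries.total", labelNames: ["channel", "group"], description: "Total retried messages in channel handlers" },
	{ type: "counter", name: "moleculer.channels.messages.deadLettering.total", labelNames: ["channel", "group"], description: "Total dead-lettered messages in channel handlers" }
];

// Print an overview; registering the definitions with a broker is left out here.
console.log(CHANNEL_METRICS.map((m) => `${m.type} ${m.name}`).join("\n"));
```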

sendToChannel with Context

By analogy with actions and events in Moleculer, pass the params (payload) inside the Context.
This solves the problem of tracing events across internal calls in handlers,
and it gives all adapters a single, uniform way of passing properties inside the Context.

I understand this will cause breaking changes, but the project is still WIP, so I don’t see anything critical in this.

Redis adapter does not auto-trim Redis stream data

Currently, we use Moleculer channels to publish and subscribe to events between our microservices, with Redis streams as the transport. However, there does not seem to be any method to trim the stream data, so our Redis memory usage keeps growing over time.

Is there a way to handle this, or do we have to do it ourselves?
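Until the question is answered, trimming can be done manually with the Redis XTRIM command. The sketch below assumes an ioredis-style client that exposes xtrim(); trimStream, startTrimmer, the length limit and the interval are hypothetical. Whether the adapter offers a built-in option is not stated in this issue.

```javascript
// Sketch only: `redis` is expected to be an ioredis-style client exposing xtrim().
// XTRIM <key> MAXLEN ~ <n> asks Redis to keep roughly the newest n entries.
async function trimStream(redis, streamName, maxLen = 10000) {
	// "~" allows approximate trimming, which is cheaper for Redis than an exact cut.
	return redis.xtrim(streamName, "MAXLEN", "~", maxLen);
}

// Hypothetical periodic trimmer; returns a function that stops it.
function startTrimmer(redis, streamNames, maxLen, intervalMs = 60000) {
	const timer = setInterval(() => {
		for (const name of streamNames) {
			trimStream(redis, name, maxLen).catch((err) => console.error("XTRIM failed", name, err));
		}
	}, intervalMs);
	timer.unref(); // do not keep the process alive just for trimming
	return () => clearInterval(timer);
}
```

Trimming by length also removes entries that a slow consumer group has not processed yet, so the limit should be chosen with the slowest consumer in mind.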

AMQP: DEAD_LETTER exchange needs to be asserted

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [X ] I am running the latest version
  • [ X] I checked the documentation and found no answer
  • [X ] I checked to make sure that this issue has not already been filed
  • [ X] I'm reporting the issue to the correct repository

Current Behavior

When a message is about to be moved to the dead-letter exchange/queue, the delivery fails and the channel closes because the exchange does not exist.

Expected Behavior

The dead-letter exchange/queue should be asserted into existence so that the message can be moved into the queue.
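A minimal sketch of what asserting the dead-letter target could involve, assuming an amqplib-style channel. This is not the patch from the announced PR; the helper name, the "fanout" type and the durable flags are assumptions.

```javascript
// Sketch only: `channel` is expected to provide the amqplib channel methods
// assertExchange, assertQueue and bindQueue.
async function assertDeadLetterTarget(channel, { exchangeName, queueName }) {
	// 1. Make sure the exchange exists, otherwise publishing to it closes the channel with a 404.
	await channel.assertExchange(exchangeName, "fanout", { durable: true });
	// 2. Make sure there is a queue that actually stores the dead-lettered messages.
	await channel.assertQueue(queueName, { durable: true });
	// 3. Bind the queue to the exchange. A fanout exchange ignores the routing pattern.
	await channel.bindQueue(queueName, exchangeName, "");
}
```

Calling such a helper once, before the first message is moved, would avoid the 404 shown in the failure information below for the DEAD_LETTER exchange.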

Failure Information

mol $ [2023-03-22T16:54:03.534Z] WARN  @txtsmarter/transformer-triloka-92/CHANNELS: AMQP message processing error in 'transformer.imessage' queue. Error: NOT ACCEPTED!
    at TransformerService.transformer.imessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/transformer.service.ts:67:11)
    at TransformerService.<anonymous> (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]_o6qzvejtzsnpaf7pu7pwc3mpbq/node_modules/moleculer/src/utils.js:212:22)
    at checkAuth (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.auth.middleware.ts:76:18)
    at emitESEvent (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.eventstore.middleware.ts:76:16)
    at logMessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.log.middleware.ts:168:16)
    at wrappedHandler (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/index.js:320:16)
    at Object.chan.handler (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/index.js:336:16)
    at /Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/adapters/amqp.js:365:16
    at Channel.BaseChannel.dispatchMessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:483:12)
    at Channel.BaseChannel.handleDelivery (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:492:15)
[2023-03-22T16:54:03.549Z] ERROR @txtsmarter/transformer-triloka-92/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'DEAD_LETTER' in vhost '/'"
    at Channel.C.accept (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:421:17)
    at Connection.mainAccept [as accept] (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/connection.js:63:33)
    at Socket.go (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/connection.js:486:48)
    at Socket.emit (node:events:513:28)
    at Socket.emit (node:domain:489:12)
    at emitReadable_ (node:internal/streams/readable:590:12)
    at processTicksAndRejections (node:internal/process/task_queues:81:21) {
  code: 404,
  classId: 60,
  methodId: 40
}
[2023-03-22T16:54:03.551Z] ERROR @txtsmarter/transformer-triloka-92/CHANNELS: AMQP channel closed.

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. set up moleculer channels to have 0 retries, and set up deadLettering:
  2. set up channel handler to throw an exception
  3. publish a message

Reproduce code snippet

broker config:

              maxRetries: 0,
              deadLettering: {
                enabled: true,
                queueName: 'DEAD_LETTER',
                exchangeName: 'DEAD_LETTER'
              },

channel handler:

  async 'transformer.imessage'(
    ctx: CTX<IMessageParams>
  ): Promise<IMessageResponse> {
    throw new Error('NOT ACCEPTED!');
  }

Context

What's even worse is the channel will be closed preventing any further publishing.

I will issue a PR for this soon.

Connectivity issue in Redis adapter

After #20, the following adapter settings don't work well with Redis. The default adapter options are lost.

	middlewares: [
		ChannelsMiddleware({
			adapter: {
				type: "Redis",
				options: {
					redis: "localhost:6379"
				}
			}
		})
	],

Store the failed consumer "group" in headers

Before we push the failed message to the dead-letter topic, we should store the failed "x-original-group" name into the message header, similar to "x-original-channel" name.
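As a sketch of the idea, the headers could be assembled like this before the message is moved. buildDeadLetterHeaders is a hypothetical helper, and the chan object with name and group fields is an assumption; the two header names are the ones mentioned above.

```javascript
// Hypothetical helper: builds the headers for a message that is about to be
// moved to the dead-letter topic, keeping any headers it already had.
function buildDeadLetterHeaders(originalHeaders, chan) {
	return {
		...originalHeaders,
		"x-original-channel": chan.name,
		"x-original-group": chan.group
	};
}

console.log(buildDeadLetterHeaders({ foo: "bar" }, { name: "order.created", group: "billing" }));
```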

Error using typescript

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

I try to compile my project, which uses @moleculer/channels, with tsc, but it returns two errors:
@moleculer/channels/types/src/adapters/base.d.ts:120:35 - error TS2304: Cannot find name 'Service'.
@moleculer/channels/types/src/index.d.ts:30:1 - error TS2309: An export assignment cannot be used in a module with other exported elements.

The first error appears as of version 0.1.3.

Expected Behavior

I can compile my project.

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

  1. Add package to package.json
  2. Use it in your project
  3. Try to compile typescript

Context

  • Moleculer version: 0.14.23
  • NodeJS version: 18 (failing with 16 as well)

Proposal: change export = _exports to export default _exports in types/src/index.d.ts,
and add an import of the Service type in types/src/adapters/base.d.ts (in the subscription function declaration).

Redis Cluster Configuration Issue

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

I'm currently running a Redis cluster and noticed that defining the "cluster" parameter at the top level per the Readme was not connecting successfully. After looking at the source code, I noticed that the "cluster" property needs to be defined under the "redis" property.

Expected Behavior

Cluster configuration per instructions in Readme should result in successful connection to cluster.

Failure Information

Steps to Reproduce

Reproduce code snippet

This does NOT work:

ChannelsMiddleware({
  adapter: {
    type: "Redis",
    options: {
      cluster: { nodes: [ ... ] },
      redis: {
        consumerOptions: { .... }
      }
    }
  }
})

This DOES work:

ChannelsMiddleware({
  adapter: {
    type: "Redis",
    options: {
      redis: {
        cluster: { nodes: [ ... ] },
        consumerOptions: { .... }
      }
    }
  }
})

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.23
  • NodeJS version: 14
  • Operating System: OSX

Failure Logs

N/A

Add benchmarks

Test suites with every adapter:

  • latency test with maxInFlight: 1
  • latency test with maxInFlight: null (no limit)
  • throughput test with 10k messages with maxInFlight: 1
  • throughput test with 10k messages with maxInFlight: null (no limit)

Others:

  • generate report with charts to markdown
  • add Github Action which runs benchmark test and push to repo. Triggered manually.
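The throughput items in the list above could be measured with a small harness like the following sketch. measureThroughput and its send callback are hypothetical names. It only times how fast messages can be handed to the adapter, not how fast the handlers process them, so a full benchmark would also need to count processed messages on the consumer side.

```javascript
// Sketch of a send-side throughput measurement. `send` is any async function
// that publishes one message, for example
// (msg) => broker.sendToChannel("bench.topic", msg).
async function measureThroughput(send, messageCount = 10000) {
	const start = process.hrtime.bigint();
	const pending = [];
	for (let i = 0; i < messageCount; i++) {
		pending.push(send({ id: i }));
	}
	// Wait until every publish call has settled successfully.
	await Promise.all(pending);
	const elapsedMs = Number(process.hrtime.bigint() - start) / 1e6;
	return {
		messageCount,
		elapsedMs,
		messagesPerSecond: elapsedMs > 0 ? (messageCount / elapsedMs) * 1000 : Infinity
	};
}
```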

Custom sendMethodName is not working, still defaulting to sendToChannel

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [ X] I am running the latest version
  • [X ] I checked the documentation and found no answer
  • [ X] I checked to make sure that this issue has not already been filed
  • [ X] I'm reporting the issue to the correct repository

Current Behavior

Specifying sendMethodName in the options is not working. There is no method with the specified name found. Instead, the method is still called the default sendToChannel.

Expected Behavior

The broker should have a method with the name specified in the sendMethodName option.

Failure Information

Error is:

broker.sendChannelMessage is not a function

Steps to Reproduce

Set up middleware:

const mw = ChannelMiddleware({
  adapter: {
    type: 'Fake',
    sendMethodName: 'sendChannelMessage'
  }
});

Send a message:

await broker.sendChannelMessage('typedService.channel-event-1');

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.28
  • NodeJS version: 18
  • Operating System: OS X 13

Failure Logs

broker.sendChannelMessage is not a function
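A possible cause, offered as an assumption rather than a confirmed diagnosis: in the reproduction above, sendMethodName sits inside adapter. The NATS wildcard issue on this page passes sendMethodName next to adapter, at the top level of the middleware options. A sketch of that placement:

```javascript
// Options object only; it would be passed to the Channels middleware factory.
const channelsOptions = {
	// Top-level option, placed next to `adapter` rather than inside it.
	sendMethodName: "sendChannelMessage",
	adapter: {
		type: "Fake"
	}
};

console.log(channelsOptions.sendMethodName); // sendChannelMessage
```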

benchmark sendToChannel vs emit

Do we have benchmarks of sendToChannel vs. emit? I see a huge performance penalty while running a load test: with emit I can ingest and emit around 14K messages/sec, while with sendToChannel I can send only about 300 messages/sec. I am using it with context. This seems to be a very drastic difference...

amqp failed to reconnect

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [ x] I am running the latest version
  • [ x] I checked the documentation and found no answer
  • [ x] I checked to make sure that this issue has not already been filed
  • [ x] I'm reporting the issue to the correct repository

Current Behavior

I start a service with the AMQP transport (RabbitMQ), and moleculer-channels listens to a given topic. If the connection to the AMQP server
breaks, moleculer-channels won't reconnect. Instead, it always logs the following error message:

[2022-12-02T13:22:49.483Z] INFO  my-service/CHANNELS: Reconnecting...
[2022-12-02T13:22:49.535Z] INFO  my-service/CHANNELS: AMQP is connected.
[2022-12-02T13:22:49.537Z] INFO  my-service/CHANNELS: AMQP channel created.
[2022-12-02T13:22:49.537Z] INFO  my-service/CHANNELS: Resubscribing to all channels...
[2022-12-02T13:22:49.540Z] ERROR my-service/CHANNELS: Unable to connect AMQP server. MoleculerError: Already tracking active messages of channel my-service.my-service.something.done
   at AmqpAdapter.initChannelActiveMessages (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/base.js:163:10)
   at AmqpAdapter.subscribe (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:314:9)
   at processTicksAndRejections (node:internal/process/task_queues:96:5)
   at async AmqpAdapter.resubscribeAllChannels (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:496:4)
   at async AmqpAdapter.tryConnect (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:215:4) {
 code: 500,
 type: undefined,
 data: undefined,
 retryable: false
}

Expected Behavior

As soon as the amqp-server is back online, the channels-module should reconnect to the server and start processing
the waiting messages.

Failure Information

Steps to Reproduce

I've created a minimal-repo here: https://github.com/me23/moleculer-channels-amqp-test.git

  1. clone the repo and change into its directory
  2. if you don't have RabbitMQ running on localhost, you can start one with Docker:
    docker-compose up
  3. start the service: node source/service.js
  4. restart RabbitMQ, e.g. with docker restart
  5. you get the error

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.26
  • NodeJS version: 17.9.0
  • Operating System: ubuntu 22.10

Redis adapter

Scenario: publish messages while the subscriber is offline, make the subscription handler throw exceptions, configure 100 retries, and disconnect the subscriber after the message has been sent 10 times.
When the subscriber reconnects, the subsequent retries are not received and the message is lost.

Error on reconnection with resubscribeAllChannels

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

When I'm running moleculer-channels and an error, a timeout or anything else causes a reconnection to AMQP, moleculer-channels calls the resubscribeAllChannels method while reconnecting. This method always fails with the error Already tracking active messages of channel XXXX, which is thrown in the initChannelActiveMessages method.

Expected Behavior

No errors, and reconnection works as usual.

Temporary solution

I put a try/catch inside resubscribeAllChannels:

async resubscribeAllChannels() {
	this.logger.info("Resubscribing to all channels...");
	for (const { chan } of Array.from(this.subscriptions.values())) {
		try {
			await this.subscribe(chan);
		} catch (e) {
			this.logger.error("[resubscribeAllChannels]", e);
		}
	}
}

Question

Is it necessary to call the resubscribeAllChannels method at all? I ran some tests with this line commented out and everything worked well.
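The error can be reproduced in a small model that is independent of the adapter. This is a sketch under the assumption that the adapter keeps a per-channel registry which rejects double registration, as the error message suggests; ActiveMessageTracker and resubscribeAll are hypothetical names, not the adapter's real code.

```javascript
// Self-contained model of the failure.
// The tracker refuses to register a channel twice, which is what the
// "Already tracking active messages" error reports.
class ActiveMessageTracker {
	constructor() {
		this.active = new Map();
	}

	init(channelID) {
		if (this.active.has(channelID)) {
			throw new Error(`Already tracking active messages of channel ${channelID}`);
		}
		this.active.set(channelID, []);
	}

	stop(channelID) {
		this.active.delete(channelID);
	}
}

// Hypothetical resubscribe: drop the stale tracking state first, then register again.
function resubscribeAll(tracker, channelIDs) {
	for (const id of channelIDs) {
		tracker.stop(id); // without this line, init() throws on every reconnect
		tracker.init(id);
	}
}
```

Compared with the try/catch workaround, this clears the stale state instead of swallowing the error. Dropping the tracking state forgets messages that were in flight on the old connection; with AMQP, unacknowledged messages are requeued by the broker when the channel closes, so they should be delivered again.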

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.17
  • NodeJS version: 12.22.6

The NATS Channel adapter breaks when using wildcard topics

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

The adapter currently ignores any specified subjects in options.nats.streamConfig.subjects. Instead, it will always use [nameOfStream] as the subject. This occurs here: https://github.com/moleculerjs/moleculer-channels/blob/master/src/adapters/nats.js#L182 . The adapter is trying to create a stream for each subscription it finds.

{
    name: 'sub',
    'streamOne': {
      'streamOneTopic.abc': {
        group: 'other',
        async handler (payload) {
          console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
        }
      },
      'streamOneTopic.xyz': {
        group: 'other',
        async handler (payload) {
          console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
        }
      }
    },
  }

The above will work. We end up with two streams: streamOneTopic_abc (using subject=streamOneTopic.abc) and streamOneTopic_xyz (using subject=streamOneTopic.xyz).

But the code snippet below, which listens for streamOneTopic.*, fails with Error: no stream matches subject, even though I specified options.nats.streamConfig.subjects = ["streamOneTopic.*"].

At a high level, the following is how the NATS adapter should operate:

  • Read options.nats.streamConfig and create one stream using the options specified there (e.g. name="streamOne" and subjects=["streamOneTopic.*"]).
  • For each subscription, create one consumer against the stream that filters by the specified topic (e.g. streamOneTopic.* should create a consumer where filter_subject="streamOneTopic.*").

The current logic is "almost" there. My team uses NATS (and we sponsor Moleculer) and we're happy to battle test this adapter further.
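To illustrate why a single stream with subjects ["streamOneTopic.*"] would cover both streamOneTopic.abc and streamOneTopic.xyz, here is a minimal matcher for NATS subject wildcards. It is a sketch for illustration, not code from the adapter or the NATS client; subjectMatches is a hypothetical name.

```javascript
// Minimal NATS-style subject matcher: "*" matches exactly one token,
// ">" matches one or more trailing tokens.
function subjectMatches(pattern, subject) {
	const p = pattern.split(".");
	const s = subject.split(".");
	for (let i = 0; i < p.length; i++) {
		if (p[i] === ">") {
			// ">" must be the last token and needs at least one remaining subject token.
			return i === p.length - 1 && s.length > i;
		}
		if (i >= s.length) return false;
		if (p[i] !== "*" && p[i] !== s[i]) return false;
	}
	return p.length === s.length;
}

console.log(subjectMatches("streamOneTopic.*", "streamOneTopic.abc")); // true
console.log(subjectMatches("streamOneTopic.*", "streamOneTopic.abc.def")); // false
console.log(subjectMatches("streamOneTopic.>", "streamOneTopic.abc.def")); // true
```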

Steps to Reproduce

Reproduce code snippet

const { ServiceBroker } = require('moleculer');
const ChannelsMiddleware = require('@moleculer/channels').Middleware;

(async () => {
  const broker = new ServiceBroker({
    nodeID: 'channelTest',
    logger: false,
    logLevel: 'debug',
    middlewares: [
      ChannelsMiddleware({
        schemaProperty: 'streamOne',
        adapterPropertyName: 'streamOneAdapter',
        sendMethodName: 'sendToStreamOneChannel',
        channelHandlerTrigger: 'emitStreamOneLocalChannelHandler',
        adapter: {
          type: 'NATS',
          options: {
            nats: {
              url: process.env.NATS_SERVER,
              connectionOptions: {
                debug: true,
                user: process.env.NATS_USER,
                pass: process.env.NATS_PASSWORD,
              },
              streamConfig: {
                name: 'streamOne',
                subjects: ['streamOneTopic.*'],
              },
              consumerOptions: {
                config: {
                  deliver_policy: 'new',
                  ack_policy: 'explicit',
                  max_ack_pending: 1,
                }
              }
            },
            maxInFlight: 10,
            maxRetries: 3,
            deadLettering: {
              enabled: false,
              queueName: 'DEAD_LETTER_REG',
            }
          }
        }
      })
    ]
  });
  
  
  await broker.createService({
    name: 'sub',
    'streamOne': {
      'streamOneTopic.*': {
        group: 'other',
        async handler (payload) {
          console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}`);
        }
      }
    },
  });
  
  await broker.start().delay(2000);

  const msg = {
    id: 1,
    name: 'John',
    age: 25
  };

  await broker.sendToStreamOneChannel('streamOneTopic.abc', msg);
  await broker.Promise.delay(200);
})();

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.27
  • NodeJS version: 16.14
  • Operating System: Mac OS Monterey

Failure Logs

> INFO {"server_id":"NDPRE7K73FVFP6EQFU5YBDZCPRRB3ZREJKQJ734ARDWUJ5PUTIZUIYQP","server_name":"hub-server","version":"2.9.10","proto":1,"git_commit":"4caf6aa","go":"go1.19.4","host":"0.0.0.0","port":4411,"headers":true,"auth_required":true,"max_payload":1048576,"jetstream":true,"client_id":4709,"client_ip":"172.19.0.1","domain":"hub"} ␍␊
< CONNECT {"protocol":1,"version":"2.9.0","lang":"nats.js","verbose":false,"pedantic":false,"user":"acc","pass":"acc","headers":true,"no_responders":true}␍␊
< PING␍␊
> PONG␍␊
< SUB _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.* 1␍␊PUB $JS.API.INFO _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 0␍␊␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 1 349␍␊{"type":"io.nats.jetstream.api.v1.account_info_response","memory":0,"storage":371561,"streams":33,"consumers":10,"limits":{"max_memory":-1,"max_storage":-1,"max_streams":-1,"max_consumers":-1,"max_ack_pending":-1,"memory_max_stream_bytes":-1,"storage_max_stream_bytes":-1,"max_bytes_required":false},"domain":"hub","api":{"total":8982,"errors":156}}␍␊
< PUB $JS.API.STREAM.NAMES _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 30␍␊{"subject":"streamOneTopic.*"}␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 1 145␍␊{"type":"io.nats.jetstream.api.v1.stream_names_response","total":2,"offset":0,"limit":1024,"streams":["streamOneTopic_abc","streamOneTopic_xyz"]}␍␊
/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80
                throw new Error("no stream matches subject");
                      ^

Error: no stream matches subject
    at JetStreamClientImpl.<anonymous> (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80:23)
    at Generator.next (<anonymous>)
    at fulfilled (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:19:58)
    at processTicksAndRejections (node:internal/process/task_queues:96:5)

Context should have channelName

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [X ] I am running the latest version
  • [ X] I checked the documentation and found no answer
  • [X ] I checked to make sure that this issue has not already been filed
  • [X ] I'm reporting the issue to the correct repository

Current Behavior

When context is used, there is no reference to channelName in the context.

Expected Behavior

The channelName should be embedded in the context, similar to eventName.

Documentation fails to mention required payload type with externally-published messages

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • I am running the latest version
  • I checked the documentation and found no answer
  • I checked to make sure that this issue has not already been filed
  • I'm reporting the issue to the correct repository

Current Behavior

Publishing messages from within the moleculer actions/methods works fine. For example:

Publishing messages with objects in payload ✅

await ctx.broker.sendToChannel('mystream.mysubject',  {'abc': 123})

Publishing messages with strings in payload ✅

await ctx.broker.sendToChannel('mystream.mysubject', 'TEST');

This behaviour changes when publishing messages externally (e.g. using NATS CLI, or from a python script connected to the same NATS instance).

Publishing messages with objects in payload ✅

# Python
await jetstream.publish('mystream.mysubject', str.encode('{"abc": 123}'))

Publishing messages with strings in payload 🚫

# Python
await jetstream.publish('mystream.mysubject', str.encode('TEST'))

This produces the following error:

ERROR: Message redelivered too many times (3). Drop message... 1

Needless to say, it took me a while to pinpoint this issue to the payload, rather than some problems with invalid delivery of messages. This plugin seems to be rejecting the messages / nack'ing them without any information that the messages were received in the first place.

Expected Behavior

Publishing string-based messages (or other types for that matter) works fine. Alternatively, there should be some mention in the documentation on the limitations of how this integrates with externally-published messages.
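The observed behavior is consistent with the receiving side decoding the raw bytes as JSON. That is an assumption, not something confirmed in this issue, but it can be illustrated without NATS: the bytes TEST are not valid JSON, while the bytes "TEST" (with quotes) are. tryDecode is a hypothetical helper.

```javascript
// Assumption: the consumer decodes the raw message bytes with JSON.parse.
function tryDecode(raw) {
	try {
		return { ok: true, value: JSON.parse(raw.toString()) };
	} catch (err) {
		return { ok: false, error: err.message };
	}
}

console.log(tryDecode(Buffer.from('{"abc": 123}')).ok); // true
console.log(tryDecode(Buffer.from("TEST")).ok); // false
console.log(tryDecode(Buffer.from('"TEST"')).value); // TEST
```

If that assumption holds, an external publisher would need to JSON-encode string payloads as well, for example by sending json.dumps('TEST').encode() from Python instead of str.encode('TEST').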

Group redis specific chan options

Before:

channels: {
    "test.failed_messages.topic": {
        group: "mygroup",
        maxRetries: 2,
        minIdleTime: 50,
        claimInterval: 50,
        processingAttemptsInterval: 10,
    }
}

After:

channels: {
    "test.failed_messages.topic": {
        group: "mygroup",
        maxRetries: 2,
        redis: {
            minIdleTime: 50,
            claimInterval: 50,
            processingAttemptsInterval: 10,
        }
    }
}

sendToChannel middleware hook is not firing

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [X ] I am running the latest version
  • [X ] I checked the documentation and found no answer
  • [X ] I checked to make sure that this issue has not already been filed
  • [X ] I'm reporting the issue to the correct repository

Current Behavior

I have a middleware with both localChannel and sendToChannel hooks. The localChannel hook is firing, but the sendToChannel hook is not.

Expected Behavior

The sendToChannel hook should fire.

Steps to Reproduce

Please provide detailed steps for reproducing the issue.

Create a middleware with localChannel and sendToChannel hooks. The middleware is loaded dynamically with broker.middlewares.add() after the broker starts.

Reproduce code snippet

    localChannel(next, chan) {
        return async (msg, raw) => {
            this.logger.info(kleur.magenta(`  Before localChannel for '${chan.name}'`), msg);
            await next(msg, raw);
            this.logger.info(kleur.magenta(`  After localChannel for '${chan.name}'`), msg);
        };
    },

    // Wrap the `broker.sendToChannel` method
    sendToChannel(next) {
        return async (channelName, payload, opts) => {
            this.logger.info(kleur.yellow(`Before sendToChannel for '${channelName}'`), payload);
            await next(channelName, payload, opts);
            this.logger.info(kleur.yellow(`After sendToChannel for '${channelName}'`), payload);
        };
    }

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.29
  • NodeJS version: 18.13.0
  • Operating System: OS X Ventura

Failure Logs

none

emitLocalChannelHandler error when channel is defined with options

Prerequisites

Please answer the following questions for yourself before submitting an issue.

  • [x] I am running the latest version
  • [x] I checked the documentation and found no answer
  • [x] I checked to make sure that this issue has not already been filed
  • [x] I'm reporting the issue to the correct repository

Current Behavior

When emitLocalChannelHandler is called on a service with a channel that is defined as an options object, such as:

channels: {
    "payment.processed": {
        // Using custom consumer-group
        group: "other",
        async handler(payload) {
            // Do something with the payload
            // You should throw error if you want to NACK the message processing.
        }
    }
}

then the emitLocalChannelHandler call fails with:

TypeError: svc.schema[mwOpts.schemaProperty][channelName].call is not a function

If the channel is defined as just a function, such as:

    channels: {
        async "order.created"(payload) {
            return payload
        }
    }

then the emitLocalChannelHandler call works.

Expected Behavior

emitLocalChannelHandler should work when channel is defined as an options object.

Failure Information

TypeError: svc.schema[mwOpts.schemaProperty][channelName].call is not a function

See #34 (comment)
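The error message suggests the handler lookup assumes the channel definition is itself a function. A minimal sketch of a lookup that accepts both shapes is shown here; resolveChannelHandler is a hypothetical helper written for illustration and is not taken from the @moleculer/channels source.

```javascript
// Hypothetical helper, not part of @moleculer/channels.
// Returns the handler for either channel definition style.
function resolveChannelHandler(definition) {
	// Shorthand style: the definition is the handler function itself.
	if (typeof definition === "function") return definition;
	// Options style: the handler lives under the `handler` key.
	if (definition && typeof definition.handler === "function") return definition.handler;
	throw new TypeError("Channel definition has no handler function");
}

// The two definition styles from the report.
const channels = {
	async "order.created"(payload) {
		return payload;
	},
	"payment.processed": {
		group: "other",
		async handler(payload) {
			return payload;
		}
	}
};

const shorthandHandler = resolveChannelHandler(channels["order.created"]);
const optionsHandler = resolveChannelHandler(channels["payment.processed"]);
```

With a lookup like this, calling the resolved function with .call(service, payload) would work for both styles.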

How to dynamically add middleware within code?

I want to add this middleware dynamically instead of defining it in the config. I usually add all my middleware with

broker.middlewares.add(...);

However, this does not work with channels middleware. This is what I tried:

const ChannelMiddleware = require('@moleculer/channels').Middleware;

broker.middlewares.add(ChannelMiddleware({adapter: 'Fake'}));

This fails with runtime errors.

What's the correct way to do it?

Feature Request: Batching by Time Window or Event Count

Would it be feasible to add a feature that buffers/queues incoming events for a period of time and/or up to a maximum event count before firing the channel handler? There's a similar feature in AWS Lambda, and it would be very useful for building data pipelines where bulk operations can insert/upsert data more efficiently. If I'm overthinking this and there's an easier workaround or solution, please let me know.

https://aws.amazon.com/about-aws/whats-new/2020/11/aws-lambda-now-supports-batch-windows-of-up-to-5-minutes-for-functions/
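As a rough illustration of the requested behaviour (not an existing feature of the library), the buffering logic could look like the sketch below. createBatcher and its options maxSize, maxWaitMs, and onFlush are hypothetical names chosen for this example.

```javascript
// Hypothetical helper: buffers items and flushes them as one batch when
// either maxSize items are collected or maxWaitMs has elapsed.
function createBatcher({ maxSize, maxWaitMs, onFlush }) {
	let buffer = [];
	let timer = null;

	function flush() {
		if (timer) {
			clearTimeout(timer);
			timer = null;
		}
		if (buffer.length === 0) return;
		const batch = buffer;
		buffer = [];
		onFlush(batch);
	}

	function add(item) {
		buffer.push(item);
		if (buffer.length >= maxSize) {
			// Count limit reached: flush immediately.
			flush();
		} else if (!timer) {
			// First item of a new batch: start the time window.
			timer = setTimeout(flush, maxWaitMs);
		}
	}

	return { add, flush };
}

// Usage: collect batches of up to two items.
const batches = [];
const batcher = createBatcher({
	maxSize: 2,
	maxWaitMs: 1000,
	onFlush: batch => batches.push(batch)
});
batcher.add(1);
batcher.add(2); // triggers a flush by count
batcher.add(3);
batcher.flush(); // manual flush of the remainder
```

If this were called from a channel handler, the handler would return before the batch is processed, so the message would presumably be acknowledged early; the sketch covers only the buffering part of the request.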

sendToChannel not found in ServiceBroker

The sendToChannel method does not exist on ServiceBroker, although this example calls it:

https://github.com/moleculerjs/moleculer-channels/blob/master/examples/simple/index.js#L120

Reproduce code snippet

"use strict";

const { ServiceBroker } = require("moleculer");
const { MoleculerError } = require("moleculer").Errors;

let c = 1;

// Create broker
const serviceBroker = new ServiceBroker({
	logLevel: {
		CHANNELS: "debug",
		"**": "info"
	},
	replCommands: [
		{
			command: "publish",
			alias: ["p"],
			async action(broker, args) {
				const { options } = args;
				//console.log(options);
				await broker.sendToChannel(
					"payment.processed",
					{
						id: 2,
						name: "Jane Doe",
						status: false,
						count: ++c,
						pid: process.pid
					}
				);
			}
		}
	]
});

serviceBroker.createService({
    name: "payments",

    actions: {
        /*...*/
    },

    channels: {
        "payment.processed": {
            async handler(payload) {
                console.log(`Payload is ${JSON.stringify(payload)}`);
                // Do something with the payload
                // You should throw error if you want to NACK the message processing.
            }
        }
    },

    methods: {
        /*...*/
    }
});

serviceBroker
	.start()
	.then(async () => {
		serviceBroker.repl();
 
        console.log("Publish 'payment.processed' message...");
        
        await serviceBroker.sendToChannel(
            "payment.processed",
            {
                id: 2,
                name: "Jane Doe",
                status: false,
                count: ++c,
                pid: process.pid
            }
        );
        await Promise.delay(5000);
    })
    .catch(err => {
		serviceBroker.logger.error(err);
		serviceBroker.stop();
	});

Context

Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.

  • Moleculer version: 0.14.18
  • NodeJS version: 16.11.1
  • Operating System: Linux

Failure Logs

TypeError: serviceBroker.sendToChannel is not a function
    at /home/alexandre/opnsource/artigos/moleculer/channels/service.js:65:29
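The snippet above creates the broker without a middlewares option, which would be consistent with this error: as far as the project's README describes, sendToChannel is added to the broker by the channels middleware. A minimal sketch of registering it follows; the Redis URL is an assumption for illustration and does not come from the report.

```javascript
const { ServiceBroker } = require("moleculer");
const ChannelsMiddleware = require("@moleculer/channels").Middleware;

const serviceBroker = new ServiceBroker({
	middlewares: [
		ChannelsMiddleware({
			// Assumed adapter address; replace with your own broker or queue.
			adapter: "redis://localhost:6379"
		})
	]
});
```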

amqplib version warning

There is a warning:
The installed amqplib library is not supported officially. Proper functionality cannot be guaranteed. Supported versions: ^0.8.0 || ^0.9.0

The package.json file refers to "amqplib": "^0.10.3"

So the warning always appears. Is this a bug, meaning amqplib only works up to 0.9, or does the version check in amqp.js need to be updated?
this.checkClientLibVersion("amqplib", "^0.8.0 || ^0.9.0");

Thanks, appreciate the fine work.

Ro
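For 0.x versions, a caret range pins the minor version, so ^0.9.0 means >=0.9.0 and <0.10.0, and the ^0.10.3 from package.json can never fall inside the supported range. The sketch below is a simplified check for this one range form, not a full semver implementation; satisfiesZeroMajorCaret and isSupported are hypothetical names.

```javascript
// Simplified caret check for 0.x ranges such as "^0.9.0" (minor > 0):
// the range means >=0.MINOR.PATCH and <0.(MINOR+1).0.
function satisfiesZeroMajorCaret(version, range) {
	const [vMajor, vMinor, vPatch] = version.split(".").map(Number);
	const [rMajor, rMinor, rPatch] = range.replace(/^\^/, "").split(".").map(Number);
	return vMajor === 0 && rMajor === 0 && vMinor === rMinor && vPatch >= rPatch;
}

// The supported ranges quoted in the warning.
const supported = ["^0.8.0", "^0.9.0"];
const isSupported = version =>
	supported.some(range => satisfiesZeroMajorCaret(version, range));

console.log(isSupported("0.9.2")); // true
console.log(isSupported("0.10.3")); // false
```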

Fix serialization issue in Redis adapter

The Redis adapter can't decode MsgPack-serialized messages. I've checked: the Buffer suffix works with all ioredis methods, e.g. xreadgroupBuffer. Please note that the id will be a buffer as well.

How to clear messages with the moleculer-channels Redis adapter?

With the Redis adapter, the amount of data stored in Redis keeps growing as messages are processed, which increases memory usage on the server.

How can I set up automatic message cleanup to reduce memory usage?

I did not find any relevant configuration instructions in the documentation.

Implement headers in Redis adapter

We should implement header support in the Redis adapter as well. Once that is done, every adapter will support headers, and we can use them to send Moleculer-specific properties such as tracing IDs. We can store the headers as a serialized string after the payload:

			const id = await clientPub.xadd(
				channelName, // Stream name
				"*", // Auto ID
				"payload", // Field name of the payload
				opts.raw ? payload : this.serializer.serialize(payload), // Actual payload
				"headers", // Field name of the headers
				this.serializer.serialize(headers) // Serialized headers
			);
