moleculerjs / moleculer-channels Goto Github PK
View Code? Open in Web Editor NEWReliable messages for Moleculer services via external queue/channel/topic.
License: MIT License
Reliable messages for Moleculer services via external queue/channel/topic.
License: MIT License
Before:
channels: {
"test.failed_messages.topic": {
group: "mygroup",
maxRetries: 2,
minIdleTime: 50,
claimInterval: 50,
processingAttemptsInterval: 10,
}
}
After:
channels: {
"test.failed_messages.topic": {
group: "mygroup",
maxRetries: 2,
redis: {
minIdleTime: 50,
claimInterval: 50,
processingAttemptsInterval: 10,
}
}
}
Please answer the following questions for yourself before submitting an issue.
I try to tsc my project using @moleculer/channels but it returns me two errors:
@moleculer/channels/types/src/adapters/base.d.ts:120:35 - error TS2304: Cannot find name 'Service'.
@moleculer/channels/types/src/index.d.ts:30:1 - error TS2309: An export assignment cannot be used in a module with other exported elements.
The first error coming from version 0.1.3
I can compile my project.
Please provide detailed steps for reproducing the issue.
Proposing to change export = _exports
to export default _exports
in types/src/index.d.ts.
And add import of type Service in types/src/adapters/base.d.ts (in subscription function declaration)
Please answer the following questions for yourself before submitting an issue.
When emitlLocalChannelHandler
is called on a service with a channel that is defined as an options object, such as:
channels: {
"payment.processed": {
// Using custom consumer-group
group: "other",
handler: async (payload) {
// Do something with the payload
// You should throw error if you want to NACK the message processing.
}
}
}
then the emitlLocalChannelHandler
call fails with:
TypeError: svc.schema[mwOpts.schemaProperty][channelName].call is not a function
If the channel is defined as just a function, such as:
channels: {
async "order.created"(payload) {
return payload
}
}
then the emitlLocalChannelHandler
call works.
emitLocalChannelHandler
should work when channel is defined as an options object.
TypeError: svc.schema[mwOpts.schemaProperty][channelName].call is not a function
See #34 (comment)
Currently, we use Moleculer channel to pub/sub event between our micro service and we use Redis to stream it. But It does not have any method or something like this to trim stream data, it makes our Redis memory large time by time
Do we have any method to handle it? Or we must to do it by ourselves?
Make by analogy with the action/event in the moleculer so that params (payload) are inside the Context
.
Thus, we solve the problem of event tracing with internal calls in handlers.
And we bring everything to a single view for all property transfer adapters inside the Context
.
I understand this will cause breaking changes, but this is WIP. I don’t see anything critical for the project in this.
The built-in moleculerjs events functionality supports wildcards. Is it possible to get this functionality in moleculer channels?
Use of wildcards (?, *, **) is available in event names.
@ https://moleculer.services/docs/0.14/events.html
We should implement header support in Redis adapter as well. After it, all adapters support it and we can use it to send moleculer specific properties like tracing IDs. We can store it as serialized string after the payload
:
const id = await clientPub.xadd(
channelName, // Stream name
"*", // Auto ID
"payload", // Entry
opts.raw ? payload : this.serializer.serialize(payload) // Actual payload
"headers",
this.serializer.serialize(headers)
);
After #20, the following adapter settings don't work well with Redis. The default adapter options are losses.
middlewares: [
ChannelsMiddleware({
adapter: {
type: "Redis",
options: {
redis: "localhost:6379"
}
}
})
],
Please answer the following questions for yourself before submitting an issue.
Specifying the durable_name
consumer option for NATS is not possible as it gets overwritten with the name of the service. This is important, as defining ephemeral streams in NATS requires that this option be set to null
.
The durable_name
consumer option can be set and does not get overridden. Perhaps if not specified, the option can default to the name of the service.
Please provide detailed steps for reproducing the issue.
durable_name
option in the middleware constructionChannelsMiddleware({
adapter: {
type: "NATS",
options: {
nats: {
consumerOptions: {
config: {
durable_name: "TESTING123"
}
}
},
}
}
}),
./src/adapters/nats.js:212
to print the consumer optionsE.g.
...
consumerOpts.queue = streamName;
consumerOpts.config.deliver_group = streamName;
// NATS Stream name does not support: spaces, tabs, period (.), greater than (>) or asterisk (*) are prohibited.
// More info: https://docs.nats.io/jetstream/administration/naming
consumerOpts.config.durable_name = chan.group.split(".").join("_");
consumerOpts.config.deliver_subject = chan.id;
consumerOpts.config.max_ack_pending = chan.maxInFlight;
consumerOpts.callbackFn = this.createConsumerHandler(chan);
console.log("OPTS", consumerOpts) // PRINT THE CONSUMER OPTIONS
// 3. Create a subscription
try {
const sub = await this.client.subscribe(chan.name, consumerOpts);
this.subscriptions.set(chan.id, sub);
} catch (err) {
...
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
Thanks in advance!
Isaac
Add following metrics:
moleculer.channels.messages.sent
with labels: channel
_sent messages via sendToChannel
moleculer.channels.messages.total
with labels: channel, group
_total processed messages in channel handlers`moleculer.channels.messages.active
with labels: channel, group
_total active processing messages in channel handlers (gauge)`moleculer.channels.messages.time
with labels: channel, group
_histogram for processing time of channel handlers`moleculer.channels.messages.errors.total
with labels: channel, group
_total errored messages in channel handlers`moleculer.channels.messages.retries.total
with labels: channel, group
_total retried messages in channel handlers`moleculer.channels.messages.deadLettering.total
with labels: channel, group
_total dead-lettered messages in channel handlers`Implement a Kafka adapter with kafkajs
Middleware hooks as sendToChannel
and localChannel
Please answer the following questions for yourself before submitting an issue.
The adapter currently ignores any specified subjects in options.nats.streamConfig.subjects
. Instead, it will always use [nameOfStream]
as the subject. This occurs here: https://github.com/moleculerjs/moleculer-channels/blob/master/src/adapters/nats.js#L182 . The adapter is trying to create a stream for each subscription it finds.
{
name: 'sub',
'streamOne': {
'streamOneTopic.abc': {
group: 'other',
async handler (payload) {
console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}}`);
}
},
'streamOneTopic.xyz': {
group: 'other',
async handler (payload) {
console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}}`);
}
}
},
}
The above will work. We end up with two streams: streamOneTopic_abc
(using subject=streamOneTopic.abc
) and streamOneTopic_xyz
(using subject=streamOneTopic.xyz
).
But when you take the code snippet below (that listens for streamOneTopic.*
, it fails with Error: no stream matches subject
, even thought in though I specified options.nats.streamConfig.subjects = ["streamOneTopic.*"]
.
At a high level, the following is how the NATS adapter should operate:
options.nats.streamConfig
and create one stream using the options specified there. (e.g. name="streamOne"
and subjects=["streamOneTopic.*"]
streamOneTopic.*
should create a consumer where filter_subject="streamOneTopic.*"
.The current logic is "almost" there. My team uses NATS (and we sponsor Moleculer) and we're happy to battle test this adapter further.
const { ServiceBroker } = require('moleculer');
const ChannelsMiddleware = require('@moleculer/channels').Middleware;
(async () => {
const broker = new ServiceBroker({
nodeID: 'channelTest',
logger: false,
logLevel: 'debug',
middlewares: [
ChannelsMiddleware({
schemaProperty: 'streamOne',
adapterPropertyName: 'streamOneAdapter',
sendMethodName: 'sendToStreamOneChannel',
channelHandlerTrigger: 'emitStreamOneLocalChannelHandler',
adapter: {
type: 'NATS',
options: {
nats: {
url: process.env.NATS_SERVER,
connectionOptions: {
debug: true,
user: process.env.NATS_USER,
pass: process.env.NATS_PASSWORD,
},
streamConfig: {
name: 'streamOne',
subjects: ['streamOneTopic.*'],
},
consumerOptions: {
config: {
deliver_policy: 'new',
ack_policy: 'explicit',
max_ack_pending: 1,
}
}
},
maxInFlight: 10,
maxRetries: 3,
deadLettering: {
enabled: false,
queueName: 'DEAD_LETTER_REG',
}
}
}
})
]
});
await broker.createService({
name: 'sub',
'streamOne': {
'streamOneTopic.*': {
group: 'other',
async handler (payload) {
console.log(`Processing streamOneTopic: ${JSON.stringify(payload)}}`);
}
}
},
});
await broker.start().delay(2000);
const msg = {
id: 1,
name: 'John',
age: 25
};
await broker.sendToStreamOneChannel('streamOneTopic.abc', msg);
await broker.Promise.delay(200);
})();
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
> INFO {"server_id":"NDPRE7K73FVFP6EQFU5YBDZCPRRB3ZREJKQJ734ARDWUJ5PUTIZUIYQP","server_name":"hub-server","version":"2.9.10","proto":1,"git_commit":"4caf6aa","go":"go1.19.4","host":"0.0.0.0","port":4411,"headers":true,"auth_required":true,"max_payload":1048576,"jetstream":true,"client_id":4709,"client_ip":"172.19.0.1","domain":"hub"} ␍␊
< CONNECT {"protocol":1,"version":"2.9.0","lang":"nats.js","verbose":false,"pedantic":false,"user":"acc","pass":"acc","headers":true,"no_responders":true}␍␊
< PING␍␊
> PONG␍␊
< SUB _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.* 1␍␊PUB $JS.API.INFO _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 0␍␊␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0RXR 1 349␍␊{"type":"io.nats.jetstream.api.v1.account_info_response","memory":0,"storage":371561,"streams":33,"consumers":10,"limits":{"max_memory":-1,"max_storage":-1,"max_streams":-1,"max_consumers":-1,"max_ack_pending":-1,"memory_max_stream_bytes":-1,"storage_max_stream_bytes":-1,"max_bytes_required":false},"domain":"hub","api":{"total":8982,"errors":156}}␍␊
< PUB $JS.API.STREAM.NAMES _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 30␍␊{"subject":"streamOneTopic.*"}␍␊
> MSG _INBOX.M4LCVGG6YT2SJ9E3EC0RZC.M4LCVGG6YT2SJ9E3EC0S0X 1 145␍␊{"type":"io.nats.jetstream.api.v1.stream_names_response","total":2,"offset":0,"limit":1024,"streams":["streamOneTopic_abc","streamOneTopic_xyz"]}␍␊
/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80
throw new Error("no stream matches subject");
^
Error: no stream matches subject
at JetStreamClientImpl.<anonymous> (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:80:23)
at Generator.next (<anonymous>)
at fulfilled (/Users/dwayne/Documents/conscia-dev/mesh-server/node_modules/nats/lib/nats-base-client/jsbaseclient_api.js:19:58)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
Please answer the following questions for yourself before submitting an issue.
Specifying sendMethodName
in the options is not working. There is no method with the specified name found. Instead, the method is still called the default sendToChannel
.
The broker should have a method with the name specified in the sendMethodName
option.
Error is:
broker.sendChannelMessage is not a function
Set up middleware:
const mw = ChannelMiddleware({
adapter: {
type: 'Fake',
sendMethodName: 'sendChannelMessage'
}
});
Send a message:
await broker.sendChannelMessage('typedService.channel-event-1');
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
broker.sendChannelMessage is not a function
Redis adapter can't decode the MsgPack serialized messages. I've checked, the Buffer
suffix works with all ioredis
methods,e.g xreadgroupBuffer
. Please note, the id will be a buffer, as well.
Publish offline messages, simulate subscription exceptions, set to retry 100 times, and disconnect the subscription end when sending 10 times,
When the subscriber reconnects, the subsequent retry message cannot be received and the message is lost
There is a warning:
The installed amqplib library is not supported officially. Proper functionality cannot be guaranteed. Supported versions: ^0.8.0 || ^0.9.0
The package.json file refers to "amqplib": "^0.10.3"
So there is always a warning. Is this a bug and the amqlib only works on 0.9 or does the warning in amqp.js needs to be fixed?
this.checkClientLibVersion("amqplib", "^0.8.0 || ^0.9.0");
Thanks, appreciate the fine work.
Ro
We are using channels (Kafka message broker) and from time to time (pretty much at least once a day) without any indications warnings or errors the subscriber connection to Kafka topic gets dropped while other code in service-node (like actions handlers) keep working fine. We detect this by seeing that the messages in topics are not read by subscribers.
So far the only solution we’ve found is to restart the service-node. On fresh start the channels reconnect to Kafka topics and for a while everything works until again the connection to Kafka topic gets dropped.
Is there a mechanism to reconnect to the message broker (Kafka) when it (or whatever) drops the subscriber connection?
Or get some sort notification from the framework that we’re not connected anymore?
Thank you!
Please answer the following questions for yourself before submitting an issue.
I'm currently running a Redis cluster and noticed that defining the "cluster" parameter at the top level per the Readme was not connecting successfully. After looking at the source code, I noticed that the "cluster" property needs to be defined under the "redis" property.
Cluster configuration per instructions in Readme should result in successful connection to cluster.
This does NOT work:
ChannelsMiddleware({
adapter: {
type: "Redis",
options: {
cluster: { nodes: [ ... ]},
redis: {
consumerOptions: { .... }
}
}
}
This DOES works:
ChannelsMiddleware({
adapter: {
type: "Redis",
options: {
redis: {
cluster: { nodes: [ ... ]},
consumerOptions: { .... }
}
}
}
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
N/A
moleculer-channels
redis adapter, After processing the message for a period of time, The increase in Redis data leads to an increase in server memory,
How to set up an automatic message recycling mechanism to reduce memory usage
I did not find any relevant configuration instructions in the document
Please answer the following questions for yourself before submitting an issue.
I'm getting repeated errors when dead lettering is enabled:
Error while moving messages of test.handler-completed to test.channels-dead-letter
I'm not sure why dead lettering is even getting triggered but at the very least it should add messages successfully.
I'm using the redis adapter.
Test suites with every adapter:
maxInFligth: 1
maxInFligth: null
(no limit)maxInFligth: 1
maxInFligth: null
(no limit)Others:
Do we have benchmarks of sendToChannel
vs emit
? I see a huge performance penalty while running a load test. with emit
I can ingest and emit around 14K messages/sec, while with sendToChannel
, I can send about 300 messages/sec. I am using with context. This seems to be a very drastic difference...
Please answer the following questions for yourself before submitting an issue.
When a message is tried to be moved to the dead-letter exchange/queue, that delivery fails and the channel closes because the exchange does not exist.
The dead-letter exchange/queue should be asserted into existence so that the message can be moved into the queue.
mol $ [2023-03-22T16:54:03.534Z] WARN @txtsmarter/transformer-triloka-92/CHANNELS: AMQP message processing error in 'transformer.imessage' queue. Error: NOT ACCEPTED!
at TransformerService.transformer.imessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/transformer.service.ts:67:11)
at TransformerService.<anonymous> (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]_o6qzvejtzsnpaf7pu7pwc3mpbq/node_modules/moleculer/src/utils.js:212:22)
at checkAuth (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.auth.middleware.ts:76:18)
at emitESEvent (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.eventstore.middleware.ts:76:16)
at logMessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/packages/services/transformer/src/middlewares/moleculer.log.middleware.ts:168:16)
at wrappedHandler (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/index.js:320:16)
at Object.chan.handler (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/index.js:336:16)
at /Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/github.com+moleculerjs+moleculer-channels@[email protected]/node_modules/@moleculer/channels/src/adapters/amqp.js:365:16
at Channel.BaseChannel.dispatchMessage (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:483:12)
at Channel.BaseChannel.handleDelivery (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:492:15)
[2023-03-22T16:54:03.549Z] ERROR @txtsmarter/transformer-triloka-92/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'DEAD_LETTER' in vhost '/'"
at Channel.C.accept (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/channel.js:421:17)
at Connection.mainAccept [as accept] (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/connection.js:63:33)
at Socket.go (/Users/ujwal/Work/txtsmarter/sw/repos/blackbird/common/temp/node_modules/.pnpm/[email protected]/node_modules/amqplib/lib/connection.js:486:48)
at Socket.emit (node:events:513:28)
at Socket.emit (node:domain:489:12)
at emitReadable_ (node:internal/streams/readable:590:12)
at processTicksAndRejections (node:internal/process/task_queues:81:21) {
code: 404,
classId: 60,
methodId: 40
}
[2023-03-22T16:54:03.551Z] ERROR @txtsmarter/transformer-triloka-92/CHANNELS: AMQP channel closed.
Please provide detailed steps for reproducing the issue.
broker config:
maxRetries: 0,
deadLettering: {
enabled: true,
queueName: 'DEAD_LETTER',
exchangeName: 'DEAD_LETTER'
},
channel handler:
async 'transformer.imessage'(
ctx: CTX<IMessageParams>
): Promise<IMessageResponse> {
throw new Error('NOT ACCEPTED!');
What's even worse is the channel will be closed preventing any further publishing.
I will issue a PR for this soon.
Just like events have a emitLocalEventHandler to trigger the handler we should implement a emitLocalChannelHandler
for channels. This will simplify (unit/integration) tests
It would be nice to be able to pause/resume processing of messages without unsubscribing from the channel.
Please answer the following questions for yourself before submitting an issue.
When I'm running moleculer-channels and I got an error, timeout o whatever (an error how cause a reconnection to amqp) when moleculer-channels try to reconnects calls method resubscribeAllChannels. This method always return an error Already tracking active messages of channel XXXX which is located in the method initChannelActiveMessages.
No errors and reconnectins as usual.
I put a try/catch inside resubscribeAllChannels
async resubscribeAllChannels() {
this.logger.info("Resubscribing to all channels...");
for (const { chan } of Array.from(this.subscriptions.values())) {
try {
await this.subscribe(chan);
} catch (e) {
this.logger.error('[resubscribeAllChannels]', e)
}
}
}
Is necessary call the method resubscribeAllChannels, because I make some tests commenting this line and all works well.
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
Please answer the following questions for yourself before submitting an issue.
I have a middleware where I have both localChannel
and sesndToChannel
hooks. The localChannel
hook is firing, but the sendToChannel
hook is not.
sendToChannelHook
should fire.
Please provide detailed steps for reproducing the issue.
Create middleware with localChannel
and sendToChannel
hooks. The middleware is dynamically loaded after broker starts with broker.middlewares.add()
localChannel(next, chan) {
return async (msg, raw) => {
this.logger.info(kleur.magenta(` Before localChannel for '${chan.name}'`), msg);
await next(msg, raw);
this.logger.info(kleur.magenta(` After localChannel for '${chan.name}'`), msg);
};
},
// Wrap the `broker.sendToChannel` method
sendToChannel(next) {
return async (channelName, payload, opts) => {
this.logger.info(kleur.yellow(`Before sendToChannel for '${channelName}'`), payload);
await next(channelName, payload, opts);
this.logger.info(kleur.yellow(`After sendToChannel for '${channelName}'`), payload);
};
}
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
none
Before we push the failed message to the dead-letter topic, we should store the failed "x-original-group" name into the message header, similar to "x-original-channel" name.
Please answer the following questions for yourself before submitting an issue.
When context is used, the channelName should be embedded in the context, similar to to eventName
No reference to channelName in context
It would be great to have a grafana dashboard with stats created by the moleculer-channels.
Would it be feasible to add a feature that allows to buffer/queue incoming events for a period of time and/or a maximum event count before firing the channel handler? There's a similar feature in AWS lambda and it would be very useful in building out data pipelines where bulk data operations can more efficiently insert/upsert data. If i'm overthinking this and there's an easier workaround or solution, please let me know.
I want to add this middleware dynamically instead of defining it in the config. I usually add all my middleware with
broker.middlewares.add(...);
However, this does not work with channels middleware. This is what I tried:
const ChannelMiddleware = require('@moleculer/channels).Middleware;
broker.middlewares.add(ChannelMiddleware({adapter: 'Fake'}));
However, this doesn't work. I get runtime errors.
What's the correct way to do it?
Would be nice to be able to support creating capped streams using MAXLEN ~
when calling clientPub.xaddBuffer(...args) method.
https://redis.io/docs/data-types/streams-tutorial/#capped-streams
Please answer the following questions for yourself before submitting an issue.
I start a service with amqp-transport (rabbitmq) and moleculer-channels listen to a given topic. If the amqp-server connection
breaks than, molceuler-channels won't reconnect. Instead it always logs the following error-message:
[2022-12-02T13:22:49.483Z] INFO my-service/CHANNELS: Reconnecting...
[2022-12-02T13:22:49.535Z] INFO my-service/CHANNELS: AMQP is connected.
[2022-12-02T13:22:49.537Z] INFO my-service/CHANNELS: AMQP channel created.
[2022-12-02T13:22:49.537Z] INFO my-service/CHANNELS: Resubscribing to all channels...
[2022-12-02T13:22:49.540Z]
ERROR my-service/CHANNELS: Unable to connect AMQP server. MoleculerError: Already tracking active messages of channel my-service.my-service.something.done
at AmqpAdapter.initChannelActiveMessages (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/base.js:163:10)
at AmqpAdapter.subscribe (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:314:9)
at processTicksAndRejections (node:internal/process/task_queues:96:5)
at async AmqpAdapter.resubscribeAllChannels (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:496:4)
at async AmqpAdapter.tryConnect (/home/me/public_html/moleculer-channels-amqp-test/node_modules/@moleculer/channels/src/adapters/amqp.js:215:4) {
code: 500,
type: undefined,
data: undefined,
retryable: false
}
As soon as the amqp-server is back online, the channels-module should reconnect to the server and start processing
the waiting messages.
I've created a minimal-repo here: https://github.com/me23/moleculer-channels-amqp-test.git
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
not exist method sendToChannel in ServiceBroker
https://github.com/moleculerjs/moleculer-channels/blob/master/examples/simple/index.js#L120
"use strict";
const { ServiceBroker } = require("moleculer");
const { MoleculerError } = require("moleculer").Errors;
let c = 1;
// Create broker
const serviceBroker = new ServiceBroker({
logLevel: {
CHANNELS: "debug",
"**": "info"
},
replCommands: [
{
command: "publish",
alias: ["p"],
async action(broker, args) {
const { options } = args;
//console.log(options);
await broker.sendToChannel(
"payment.processed",
{
id: 2,
name: "Jane Doe",
status: false,
count: ++c,
pid: process.pid
}
);
}
}
]
});
serviceBroker.createService({
name: "payments",
actions: {
/*...*/
},
channels: {
"payment.processed": {
async handler(payload) {
console.log(`Paylod is ${JSON.stringify(payload)}`);
// Do something with the payload
// You should throw error if you want to NACK the message processing.
}
}
},
methods: {
/*...*/
}
});
serviceBroker
.start()
.then(async () => {
serviceBroker.repl();
console.log("Publish 'payment.processed' message...");
await serviceBroker.sendToChannel(
"payment.processed",
{
id: 2,
name: "Jane Doe",
status: false,
count: ++c,
pid: process.pid
}
);
await Promise.delay(5000);
})
.catch(err => {
serviceBroker.logger.error(err);
serviceBroker.stop();
});
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
TypeError: serviceBroker.sendToChannel is not a function
at /home/alexandre/opnsource/artigos/moleculer/channels/service.js:65:29
Please answer the following questions for yourself before submitting an issue.
Starting with an fresh instance of rabbitmq and trigger sendToChannel from moleculer i got follow Error:
[2022-05-10T07:30:40.939Z] ERROR my-client/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'something.done' in vhost '/'"
and the client exits with unhandled Promise-rejection.
As soon if i start a listener for this channel (somthing.done) once, even if i stop the listener again, then it works like expected.
I don't know if its an bug or a feature, but i expected the channel to be created either if the first listener register or if the
first producer send some messages in.
My idea was to send some events with a ttl-value, so if nobody cares about it, it will be deleted.
As soon as one consumer came up, he can use the stored events.
no exchange
[2022-05-10T07:30:40.939Z] ERROR my-client/CHANNELS: AMQP channel error Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no exchange 'something.done' in vhost '/'"
at Channel.C.accept (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/channel.js:422:17)
at Connection.mainAccept [as accept] (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/connection.js:64:33)
at Socket.go (/home/me/public_html/moleculer-channel-test/node_modules/amqplib/lib/connection.js:478:48)
at Socket.emit (node:events:527:28)
at Socket.emit (node:domain:475:12)
at emitReadable_ (node:internal/streams/readable:601:12)
at processTicksAndRejections (node:internal/process/task_queues:82:21) {
code: 404,
classId: 60,
methodId: 40
}
[2022-05-10T07:30:40.941Z] ERROR my-client/CHANNELS: AMQP channel closed.
Please provide detailed steps for reproducing the issue.
Please provide any relevant information about your setup. This is important in case the issue is not reproducible except for under certain conditions.
Please answer the following questions for yourself before submitting an issue.
Publishing messages from within the moleculer actions/methods works fine. For example:
await ctx.broker.sendToChannel('mystream.mysubject', {'abc': 123})
await ctx.broker.sendToChannel('mystream.mysubject', 'TEST');
This behaviour changes when publishing messages externally (e.g. using NATS CLI, or from a python script connected to the same NATS instance).
# Python
await jetstream.publish('mystream.mysubject', str.encode('{"abc": 123}'))
# Python
await jetstream.publish('mystream.mysubject', str.encode('TEST'))
This produces the following error:
ERROR: Message redelivered too many times (3). Drop message... 1
Needless to say, it took me a while to pinpoint this issue to the payload, rather than some problems with invalid delivery of messages. This plugin seems to be rejecting the messages / nack'ing them without any information that the messages were received in the first place.
Publishing string-based messages (or other types for that matter) works fine. Alternatively, there should be some mention in the documentation on the limitations of how this integrates with externally-published messages.
Hi, thanks for this nice library.
Is there a reason that the type of exchange can not be changed as an option?
moleculer-channels/src/adapters/amqp.js
Line 289 in 024c251
I think channel can be defined in moleculer.config.js
likes this
module.export = {
// ...
channel: {
redis: {
// config
},
// more : {},
},
//...
}
because current way to use channel middleware, made it can not
overwrite broker’s deeply nested default options, which are not present in moleculer.config.js, via environment variables
https://moleculer.services/docs/0.14/runner.html#Configuration-loading-logic
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.