I'm having trouble disconnecting multiple consumers that I started with different groupIds.
Hopefully you can help me.
This is my code for disconnecting...
/**
* Service stopped lifecycle event handler
*/
async stopped() {
await Promise.all(this.subscriptions.map(async (subscription) => {
try {
await subscription.consumer.pause( [ { topic: subscription.topic } ]);
this.logger.info(`Consumer for subscription ${subscription.id} paused`);
} catch(err) {
this.logger.warn(`Pause consumer for subscription ${subscription.id} failed`);
}
Promise.resolve();
}));
await Promise.all(this.subscriptions.map(async (subscription) => {
try {
await subscription.consumer.disconnect();
this.logger.info(`Consumer for subscription ${subscription.id} disconnected`);
} catch(err) {
this.logger.warn(`Disconnecting consumer for subscription ${subscription.id} failed`);
}
Promise.resolve();
}));
this.logger.info(`All consumers disconnected`);
}
`
I'm getting the following log:
`
console.log lib\flow.subscriber.js:139
ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.954Z',
logger: 'kafkajs',
message: 'Pausing fetching from 1 topics',
topics: [ 'events' ] }
console.log lib\flow.subscriber.js:139
ConsumerGroup INFO Pausing fetching from 1 topics { timestamp: '2018-05-29T19:10:29.959Z',
logger: 'kafkajs',
message: 'Pausing fetching from 1 topics',
topics: [ 'events' ] }
console.log lib\flow.subscriber.js:139
Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.965Z',
logger: 'kafkajs',
message: 'stop consumer group',
groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51' }
console.log lib\flow.subscriber.js:139
Runner DEBUG stop consumer group { timestamp: '2018-05-29T19:10:29.981Z',
logger: 'kafkajs',
message: 'stop consumer group',
groupId: '6bb861e2-e250-4f79-af08-3e9acecd2d45',
memberId: 'flow.subscriber1527621028637-80fc7d53-6b5f-471c-b9de-504d750e8804' }
console.log lib\flow.subscriber.js:139
Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.983Z',
logger: 'kafkajs',
message: 'Request LeaveGroup(key: 13, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 12,
size: 147 }
console.log lib\flow.subscriber.js:139
Connection DEBUG Request LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:29.985Z',
logger: 'kafkajs',
message: 'Request LeaveGroup(key: 13, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 12,
size: 147 }
console.info node_modules\moleculer\src\logger.js:112
[2018-05-29T19:10:29.987Z] INFO nbtpt510-al-7796/FLOW.PUBLISHER: Producer disconnectied
console.log lib\flow.subscriber.js:139
Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.452Z',
logger: 'kafkajs',
message: 'Response Fetch(key: 1, version: 2)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 11,
size: 42,
data: '[filtered]' }
console.log lib\flow.subscriber.js:139
Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.481Z',
logger: 'kafkajs',
message: 'Request Heartbeat(key: 12, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 13,
size: 151 }
console.log lib\flow.subscriber.js:139
Connection DEBUG Response LeaveGroup(key: 13, version: 0) { timestamp: '2018-05-29T19:10:34.487Z',
logger: 'kafkajs',
message: 'Response LeaveGroup(key: 13, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 12,
size: 6,
data: { errorCode: 0 } }
console.log lib\flow.subscriber.js:139
Consumer DEBUG consumer has stopped, disconnecting { timestamp: '2018-05-29T19:10:34.491Z',
logger: 'kafkajs',
message: 'consumer has stopped, disconnecting',
groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }
console.log lib\flow.subscriber.js:139
Connection DEBUG disconnecting... { timestamp: '2018-05-29T19:10:34.495Z',
logger: 'kafkajs',
message: 'disconnecting...',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637' }
console.log lib\flow.subscriber.js:139
Connection DEBUG disconnected { timestamp: '2018-05-29T19:10:34.499Z',
logger: 'kafkajs',
message: 'disconnected',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637' }
console.log lib\flow.subscriber.js:139
Consumer INFO Stopped { timestamp: '2018-05-29T19:10:34.500Z',
logger: 'kafkajs',
message: 'Stopped',
groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db' }
console.info node_modules\moleculer\src\logger.js:112
[2018-05-29T19:10:34.502Z] INFO nbtpt510-al-7796/FLOW.SUBSCRIBER: Consumer for subscription f0c54a7b-1cbd-4248-8471-ae788c3566db disconnected
console.log lib\flow.subscriber.js:139
Connection DEBUG Response Fetch(key: 1, version: 2) { timestamp: '2018-05-29T19:10:34.503Z',
logger: 'kafkajs',
message: 'Response Fetch(key: 1, version: 2)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 11,
size: 42,
data: '[filtered]' }
console.log lib\flow.subscriber.js:139
Connection DEBUG Request Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.505Z',
logger: 'kafkajs',
message: 'Request Heartbeat(key: 12, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
correlationId: 13,
size: 151 }
`
Now something went wrong:
`
console.log lib\flow.subscriber.js:139
Connection ERROR Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.508Z',
logger: 'kafkajs',
message: 'Response Heartbeat(key: 12, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
error: 'The coordinator is not aware of this member',
correlationId: 13,
size: 6 }
console.log lib\flow.subscriber.js:139
Connection DEBUG Response Heartbeat(key: 12, version: 0) { timestamp: '2018-05-29T19:10:34.511Z',
logger: 'kafkajs',
message: 'Response Heartbeat(key: 12, version: 0)',
broker: '192.168.2.124:9092',
clientId: 'flow.subscriber1527621028637',
error: 'The coordinator is not aware of this member',
correlationId: 13,
payload: <Buffer 00 19> }
console.log lib\flow.subscriber.js:139
Runner ERROR The coordinator is not aware of this member, re-joining the group { timestamp: '2018-05-29T19:10:34.515Z',
logger: 'kafkajs',
message: 'The coordinator is not aware of this member, re-joining the group',
groupId: 'f0c54a7b-1cbd-4248-8471-ae788c3566db',
memberId: 'flow.subscriber1527621028637-9bef2c45-b84f-40f5-b670-c8b687240b51',
error: 'The coordinator is not aware of this member',
retryCount: 0,
retryTime: 51 }
`