moscajs / aedes-persistence-redis
Aedes persistence, backed by Redis
License: MIT License
I noticed that there are quite a few dated issues that are still open.
My suggestion would be to create a workflow action that automatically closes dated issues after a certain number of days of inactivity.
I use one myself, based on GitHub's https://github.com/actions/stale action.
E.g.:
https://github.com/seriousme/fastify-openapi-glue/blob/master/.github/workflows/closeStale.yml
An example of this flow in action can be seen in: seriousme/fastify-openapi-glue#319
Hope this helps,
Hans
Changes made regarding #35 & moscajs/aedes-cached-persistence#5
new API aedes-persistence#outgoingEnqueue(subs, packet, cb)
with a default implementation in aedes-cached-persistence (PR will be linked here) to let the persistence decide how to handle the bulk of subscriptions. Aedes will also get a PR that uses the new outgoingEnqueue instead of the old one, removing the parallel usage,
as discussed in moscajs/aedes-cached-persistence#5
outgoingEnqueue will store the packet once
SET packet:brokerId:brokerCounter packetPayload
SET expected:brokerId:brokerCounter offlineSubsCount
and for each client subscription:
RPUSH outgoing:clientId packet:brokerId:brokerCounter
outgoingUpdate uses an in-memory mapping of messageId -> packet key
and won't touch the persistence for QoS 1; for QoS 2 it updates the client's offline packet list with a pubrel.
outgoingClearMessageId will fetch the original packet, remove its index from the client's offline list, and decrement expected:brokerId:brokerCounter offlineSubsCount;
when the count reaches zero, it also removes the original packet.
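The enqueue flow described above can be sketched with a tiny in-memory stand-in for the Redis commands; all helper names here are illustrative, not the module's actual internals:

```javascript
// Sketch of the single-copy enqueue schema, using plain Maps as a
// stand-in for Redis SET/RPUSH. Helper names are illustrative.
const store = new Map() // simulates string keys (SET)
const lists = new Map() // simulates lists (RPUSH)

function packetKey (brokerId, brokerCounter) {
  return 'packet:' + brokerId + ':' + brokerCounter
}

function outgoingEnqueue (subs, packet) {
  const key = packetKey(packet.brokerId, packet.brokerCounter)
  // SET packet:brokerId:brokerCounter packetPayload -- stored once
  store.set(key, JSON.stringify(packet))
  // SET expected:brokerId:brokerCounter offlineSubsCount
  store.set('expected:' + packet.brokerId + ':' + packet.brokerCounter, subs.length)
  // RPUSH outgoing:clientId packet:brokerId:brokerCounter -- per subscriber,
  // only a reference is pushed, never a clone of the packet
  for (const sub of subs) {
    const listKey = 'outgoing:' + sub.clientId
    if (!lists.has(listKey)) lists.set(listKey, [])
    lists.get(listKey).push(key)
  }
}
```

With a million subscribers this stores one payload plus a million small list entries, instead of a million packet clones.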
The only relaxation added to the abstract persistence tests is:
QoS 1 packets streamed from outgoingStream won't contain any messageId,
which does no harm.
(PR link on aedes-persistence)
This way we are not storing a clone of the original packet for each client, which makes QoS 1 easy to handle with minimal overhead. For QoS 2 we store a pubrel for each client.
This reduced a wildcard publish to a subscription list of a million clients from 63 seconds (swapping parallel for fastq in the current implementation) to 30 seconds (this PR's implementation).
Redis memory usage drops greatly for wildcard publishes (since we no longer re-save the same packet for every subscriber), so this scales far better.
The QoS 1 delivery flow (forward-puback) touches Redis less.
I haven't tested QoS 2 delivery performance, but it should also improve, judging by the Redis commands used in this PR versus the current implementation.
Hi,
we use Aedes and aedes-persistence-redis here https://github.com/mainflux/mainflux/blob/master/mqtt/mqtt.js and we seem to have a problem when trying to publish more than 20 messages with mosquitto_pub using QoS 1 or QoS 2.
After 20 messages we start getting errors like "Error: write EPIPE\n at WriteWrap.afterWrite [as oncomplete] (net.js:788:14)".
I guess the connection gets closed somewhere, but I'm not sure where or why exactly.
QoS 0 works fine, probably because the Redis persistence is not used.
Can you please tell us whether this may be an aedes-persistence-redis problem?
Thank you!
I am using nginx-plus as a reverse proxy. Here is the code:
var aedes = require('aedes')({
  mq: require('mqemitter-redis')(),
  persistence: require('aedes-persistence-redis')({
    port: 6379,
    host: '127.0.0.1',
    family: 4,
    db: 0,
    maxSessionDelivery: 100
  })
})
var server = require('net').createServer(aedes.handle)
var ws = require('websocket-stream')
var mqtt = require('mqtt')
var portAedes = 1883

server.listen(portAedes, function () {
  console.log('server listening on port', portAedes) // was `port`, which is undefined here
})

var ports = [8000, 8001, 8002, 8003]
var servers = []
ports.forEach(function (port) {
  var httpServer = require('http').createServer()
  ws.createServer({ server: httpServer }, aedes.handle) // required for 'ws' streams
  httpServer.listen(port, function () {
    console.log('websocket server listening on port', port)
  })
  servers.push(httpServer)
})

setInterval(function () {
  console.log('connected clients:', aedes.connectedClients)
}, 2000)

setInterval(function () {
  for (var i = 0; i < 500; i++) {
    mqtt.connect('ws://localhost/broker/')
  }
}, 5000)
/////////////////////////////////////////////////////////////////
Here the connected-client count never gets above 2037.
Hi mcollina,
I am facing issues in the test cases: data stored in Redis by one test case leaks into the next, causing test cases to fail.
I realised that instance.destroy() is called in each test case, but the destroy method only clears the aedes-cached-persistence data and disconnects Redis.
The issue is solved if I use db.flushdb() in the destroy() method. This makes each test case independent of the others, avoiding data clashes.
Would you mind accepting a PR?
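A minimal teardown helper along those lines, assuming the persistence's ioredis client is available as `db` (both names are assumptions, not the module's API):

```javascript
// Hypothetical teardown helper: flush the test database before destroying
// the persistence, so the next test case starts from a clean slate.
// `db` is assumed to be the ioredis client used by the persistence.
function cleanDestroy (instance, db, cb) {
  db.flushdb(function (err) { // FLUSHDB clears only the currently selected DB
    if (err) return cb(err)
    instance.destroy(cb)
  })
}
```

Running the suite against a dedicated DB index keeps flushdb from touching unrelated data.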
System Information
Describe the bug
The outgoingUpdate operation clears the packet's TTL, which may result in a dangling entry in Redis.
To Reproduce
Steps to reproduce the behavior: trigger an outgoingUpdate operation.
Expected behavior
The packet should expire according to its TTL.
Additional context
PR with proposed fix.
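A hedged sketch of the kind of fix proposed: read the remaining TTL before rewriting the key, so the SET does not silently drop it (PTTL and the PX option are real Redis commands; the helper itself is hypothetical):

```javascript
// Rewrite a packet key while preserving whatever TTL it had left.
// On Redis >= 6 this could also be done in one step with SET ... KEEPTTL.
function updatePreservingTtl (db, key, value, cb) {
  db.pttl(key, function (err, ttl) {
    if (err) return cb(err)
    if (ttl > 0) return db.set(key, value, 'PX', ttl, cb) // keep remaining TTL (ms)
    db.set(key, value, cb) // no TTL was set, or key already gone
  })
}
```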
When aedes-persistence-redis is used as the persistence with Aedes, client connections are very slow: 20k client connections take around 10 minutes. Without persistence configured, connections are fast: around 50k connections in under a minute.
Please find below the code used to run the aedes server.
var redis = require('mqemitter-redis')
var aedesPersistenceRedis = require('aedes-persistence-redis')

var mq = redis({
  port: 6379,
  host: '172.31.38.96',
  db: 12
})
var persistence = aedesPersistenceRedis({
  port: 6379,
  host: '172.31.38.96'
})

var aedes = require('aedes')({
  mq: mq,
  persistence: persistence
})
var server = require('net').createServer(aedes.handle)
var port = 1883

server.listen(port, function () {
  console.log('server listening on port', port)
})
Hi, @mcollina @GavinDmello @behrad
Redis memory keeps growing; it's now over 1.2GB (I've limited its size to 2GB and set it to evict least-recently-used keys), but I have seen it grow even to 4GB and crash afterwards.
I tried to find out what's taking so much memory with
redis-cli --scan --pattern '*'
and saw this:
packet:rkpZMxyj-:11659005
packet:rkHVnY-s-:792969
packet:rkHVnY-s-:2327008:offlineCount
packet:rkpZMxyj-:10411304:offlineCount
packet:rkpZMxyj-:10518955
packet:rkHVnY-s-:1945435:offlineCount
packet:rkpZMxyj-:9699899:offlineCount
packet:rkHVnY-s-:1619395
packet:rkpZMxyj-:11104535:offlineCount
packet:rkHVnY-s-:88430
packet:rkpZMxyj-:11263721:offlineCount
packet:rkpZMxyj-:10992846:offlineCount
packet:rkHVnY-s-:1451292
packet:rkHVnY-s-:574558
packet:rkpZMxyj-:11723536
packet:rkHVnY-s-:2329539:offlineCount
packet:rkpZMxyj-:11196258
packet:rkHVnY-s-:2485588:offlineCount
packet:rkpZMxyj-:11762183:offlineCount
packet:rkpZMxyj-:11899677
packet:rkpZMxyj-:10020830:offlineCount
packet:rkHVnY-s-:1497004
packet:B1sE1tfjb:9712
packet:rkpZMxyj-:10350638:offlineCount
packet:rkHVnY-s-:546894:offlineCount
packet:rkHVnY-s-:1488548:offlineCount
packet:rkHVnY-s-:2181625:offlineCount
packet:rkpZMxyj-:11011856:offlineCount
packet:rkHVnY-s-:395142
packet:rkpZMxyj-:10538602:offlineCount
packet:rkHVnY-s-:436031
packet:rkpZMxyj-:10964108
packet:rkHVnY-s-:1398650
packet:rkHVnY-s-:1318657:offlineCount
packet:rkHVnY-s-:2740963
packet:rkHVnY-s-:2470460
packet:rkpZMxyj-:10730990:offlineCount
packet:rkpZMxyj-:11480655
packet:rkpZMxyj-:10232509:offlineCount
packet:rkHVnY-s-:1133929:offlineCount
packet:HJaFBtGj-:7516:offlineCount
packet:rkHVnY-s-:1972923
packet:rkpZMxyj-:10942961:offlineCount
packet:rkpZMxyj-:10423989
packet:rkHVnY-s-:727552
packet:rkHVnY-s-:2953210
packet:rkpZMxyj-:11786280
packet:rkpZMxyj-:11718671
packet:rkHVnY-s-:130524:offlineCount
packet:rkHVnY-s-:438247:offlineCount
packet:rkHVnY-s-:2291956
packet:rkHVnY-s-:1102555:offlineCount
packet:rkpZMxyj-:11599252:offlineCount
packet:rkHVnY-s-:149147:offlineCount
packet:rkpZMxyj-:10977916:offlineCount
packet:rkpZMxyj-:10653935
packet:rkpZMxyj-:10649496
packet:rkpZMxyj-:9943336
packet:rkpZMxyj-:10094030:offlineCount
packet:rkHVnY-s-:2434276:offlineCount
packet:rkHVnY-s-:1697471:offlineCount
packet:rkHVnY-s-:1301733
packet:rkHVnY-s-:690065:offlineCount
packet:rkHVnY-s-:958007:offlineCount
packet:rkHVnY-s-:1360575:offlineCount
... and many, many more lines.
I'm using MQTT with QoS 1. What are all these messages, and why are they not deleted? How can I investigate further?
Could this be because I'm using a random client id on each connection, so when a client disconnects aedes holds its undelivered messages forever? If so, how can I flush such messages?
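One mitigation, assuming it was not already configured on these brokers, is the module's packetTTL option, which lets Redis expire stored packet keys on its own; the TTL value below is only an example:

```javascript
// Let Redis expire stored packets instead of keeping them forever.
// Tune the returned TTL (seconds) to your expected delivery window.
const persistence = require('aedes-persistence-redis')({
  port: 6379,
  packetTTL: function (packet) {
    return 3600 * 24 // one day, as an example
  }
})
```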
I noticed we are storing QoS 0 subscriptions. Why?
There is also a contradiction in the abstract tests:
this test checks that QoS 0 subscriptions aren't counted, but this & this check that QoS 0 subscriptions are stored!
What is the correct behavior per the spec @mcollina ?
System Information
Describe the bug
Wills are never sent out after a server restart when using Redis persistence on a single aedes instance.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
I would expect this will to be published, although perhaps by the letter of the spec the will only goes out because of client-side issues.
Additional context
I suppose this will would be published in a multi-process/server environment where one server stops; otherwise, the way I understand the code, it will never be sent, and my testing bears this out.
System Information
I'm using a translator
Describe the bug
An outgoing:undefined key is generated when the broker is restarted with a qos=1 client.
When a client goes offline and back online, the broker no longer provides a buffer for the client.
To Reproduce
Steps to reproduce the behavior:
1. Connect and subscribe with qos=1.
2. Restart the broker; the client reconnects with qos=1.
3. An outgoing:undefined key has been created in Redis, and qos=1 messages are no longer buffered.
System Information
Describe the bug
aedes-persistence-redis crashes when trying to send a retained message with QoS 2.
To Reproduce
I ran a broker with the aedes-persistence-redis persistence layer. I used the mqtt-cli package for it, with the following config:
module.exports = {
  protos: ['tcp', 'ws'],
  host: '127.0.0.1',
  wsPort: 6423,
  brokerId: 'aedes-cli',
  persistence: {
    name: 'redis',
    options: {
      url: 'redis://127.0.0.1'
    }
  },
  key: null,
  cert: null,
  rejectUnauthorized: true,
  verbose: false,
  veryVerbose: false,
  noPretty: false
}
1. Connect client C1 (non-clean session) and send a message to any topic with {retain: true, qos: 2}.
2. Disconnect C1.
3. Connect client C2 (non-clean session) and subscribe to the topic used above with {qos: 2}.
4. aedes-persistence-redis crashes with error ERROR: unknown key.
Expected behavior
Should not crash
Additional context
After debugging a little, it seems that it crashes when trying to send the PubRel packet of the retained message. Aedes invokes RedisPersistence.prototype.outgoingUpdate, which calls augmentWithBrokerData. Inside this function, var key = that.messageIdCache.get(messageIdKey) returns undefined, which causes the crash.
Is there any plan to add support for TS in the near future?
The current implementation for QoS 1 messages is as below:
SET out:clientId:brokerId
RPUSH clientList out:clientId:brokerId
SET out:clientId:brokerId includeMsgId
SET outId:clientId:msgId packetClone
GET outId:clientId:msgId
DEL outId:clientId:msgId
DEL out:clientId:brokerId
LREM clientList out:clientId:brokerId
I want to propose a lighter schema, but let me ask some questions first @mcollina @GavinDmello
When does this path run? I mean, in which cases do we have no brokerId, so that augmentWithBrokerData gets involved?
When delivering the packet, why do we store outId:clientId:msgId? Couldn't it be an in-memory mapping of msgId -> brokerId,Counter? Why would we need persistence between the forward and the PUBACK? A single broker process is involved, and we don't need resilience across process restarts.
When doing outgoingUpdate, why is the original packet updated, i.e. SET out:clientId:brokerId includeMsgId? Should it really be updated to contain the messageId?
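The in-memory mapping suggested in the second question could be as simple as a Map keyed by client and messageId; everything below is an illustrative sketch, not the module's internals:

```javascript
// Hypothetical in-memory mapping of (clientId, messageId) -> packet key,
// avoiding the extra SET/GET/DEL round trips between forward and PUBACK.
const messageIdCache = new Map()

function cacheKey (clientId, messageId) {
  return clientId + ':' + messageId
}

function rememberDelivery (clientId, messageId, brokerId, brokerCounter) {
  messageIdCache.set(cacheKey(clientId, messageId),
    'packet:' + brokerId + ':' + brokerCounter)
}

function lookupDelivery (clientId, messageId) {
  // undefined means this messageId was never forwarded (or the process restarted)
  return messageIdCache.get(cacheKey(clientId, messageId))
}
```

The trade-off, as noted above, is that the mapping does not survive a process restart.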
Is your feature request related to a problem? Please describe.
When I load test aedes with Redis persistence, I found that only one of the six nodes gets very high CPU (100%) while the others stay under 5% during the load test. The reason is that retained records are all stored on that node: digging into the code I see that all retained records are stored under a single 'retained' key and not something like 'retained:344545'. This explains why the load is not equally distributed.
Describe the solution you'd like
We could store retained records in the format retained:344545
Describe alternatives you've considered
Additional context
MQTT 5 allows session expiration, so at some point if someone decides to move ahead with implementation, the persistence modules will need some work done. At present I'm considering incorporating it into a MQTT 5 server I've been working on.
What would the roadmap be for this to happen? Would the work have to start with aedes-cached-persistence, or with the individual persistences?
As an alternative, I suppose one could write a plugin similar to the way the aedes-stats plugin works, but that may be problematic if the internals of the persistences change.
The way I'd see it, it could be separate api calls, or integrated, but essentially I'd be thinking something like:
sessionStart(clientId)
sessionRefresh(clientId) // Reset the session expiration
expiredSessions() // the broker would clear expired sessions based on returned items (client and client:* keys)
For redis I'm thinking about using a sorted set with value = clientId, score = expiration.
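The sorted-set idea can be modeled with an in-memory stand-in for ZADD/ZRANGEBYSCORE; all names are illustrative, not an aedes API:

```javascript
// In-memory model of the proposed schema: a sorted set with
// value = clientId and score = expiration timestamp (ms).
const sessions = new Map() // clientId -> expiration

function sessionStart (clientId, ttlMs, now) {
  sessions.set(clientId, now + ttlMs) // ZADD sessions <expiry> <clientId>
}

function sessionRefresh (clientId, ttlMs, now) {
  // reset the expiration only for sessions we know about
  if (sessions.has(clientId)) sessions.set(clientId, now + ttlMs)
}

function expiredSessions (now) {
  // ZRANGEBYSCORE sessions -inf <now>
  return [...sessions]
    .filter(([, expiry]) => expiry <= now)
    .map(([clientId]) => clientId)
}
```

With a real sorted set, the broker's sweep would be a single ZRANGEBYSCORE plus a ZREM per expired client.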
What about a caching option to cache the retained hash key in memory @mcollina ?
This way createRetainedStream won't touch Redis on each CONNECT; I believe this would really help in a crowded environment. What do you think @GavinDmello ?
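A minimal sketch of that cache, with invalidation on every retained write; the `db` calls stand in for the real ioredis client, and none of these names are the module's actual internals:

```javascript
// Cache the `retained` hash in memory so CONNECT-time reads skip Redis;
// any write invalidates the cache. Illustrative sketch only.
let retainedCache = null

function getRetained (db, cb) {
  if (retainedCache) return cb(null, retainedCache) // served from memory
  db.hgetall('retained', function (err, hash) {
    if (err) return cb(err)
    retainedCache = hash
    cb(null, hash)
  })
}

function storeRetained (db, topic, payload, cb) {
  retainedCache = null // invalidate before writing
  db.hset('retained', topic, payload, cb)
}
```

In a multi-broker setup the cache would also need cross-process invalidation (e.g. via a pub/sub channel), which is the hard part of this proposal.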
How do I test this?
david@david-Latitude-E6440:~/aedes-persistence-redis$ npm test
> [email protected] test /home/david/aedes-persistence-redis
> standard && tape test.js | faucet
✓ store and look up retained messages
✓ look up retained messages with a # pattern
✓ look up retained messages with a + pattern
✓ look up retained messages with multiple patterns
✓ remove retained message
✓ storing twice a retained message should keep only the last
✓ store and look up subscriptions by client
✓ remove subscriptions by client
✓ store and look up subscriptions by topic
✓ get client list after subscriptions
✓ get client list after an unsubscribe
✓ get subscriptions list after an unsubscribe
✓ QoS 0 subscriptions, restored but not matched
✓ clean subscriptions
✓ clean subscriptions with no active subscriptions
✓ store and count subscriptions
✓ add duplicate subs to qlobber for qos > 0
✓ add duplicate subs to persistence for qos 0
✓ add outgoing packet and stream it
✓ add outgoing packet for multiple subs and stream to all
✓ add outgoing packet as a string and stream
✓ add outgoing packet and stream it twice
✓ add outgoing packet and update messageId
✓ add 2 outgoing packet and clear messageId
✓ update to pubrel
✓ add incoming packet, get it, and clear with messageId
✓ store, fetch and delete will message
✓ stream all will messages
✓ stream all will message for unknown brokers
✓ delete wills from dead brokers
✓ do not error if unkown messageId in outoingClearMessageId
# multiple persistences
/home/david/aedes-persistence-redis/node_modules/bluebird/js/release/async.js:61
fn = function () { throw arg; };
^
Error: Connection is closed.
at close (/home/david/aedes-persistence-redis/node_modules/ioredis/lib/redis/event_handler.js:101:21)
at Socket.<anonymous> (/home/david/aedes-persistence-redis/node_modules/ioredis/lib/redis/event_handler.js:76:14)
at Object.onceWrapper (events.js:316:30)
at emitOne (events.js:115:13)
at Socket.emit (events.js:210:7)
✓ multiple persistences
... [as _onclose] (net.js:549:12)
not ok 146 no plan found
⨯ fail 1
npm ERR! Test failed. See above for more details.
Is there a way to use Redis Sentinel?
I use Redis Sentinel and would like to connect Aedes to it.
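Since aedes-persistence-redis builds its connection with ioredis, the usual ioredis Sentinel settings should apply; the hosts and master name below are examples:

```javascript
// Connect the persistence through Redis Sentinel using ioredis options.
const persistence = require('aedes-persistence-redis')({
  sentinels: [
    { host: '127.0.0.1', port: 26379 },
    { host: '127.0.0.1', port: 26380 }
  ],
  name: 'mymaster' // master group name from sentinel.conf
})
```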
Add a custom, non-standard configurable option to redis-persistence to support TTL on packet storage and subscription storage.
Configuration can be both instance-wide and per message/subscription. For the latter, the caller can pass ttlForMessage and ttlForSubscription function hooks which synchronously return the TTL value for the input message/subscription argument.
Client keys are also stored in a separate list; when keys expire, that list gets out of sync, and it can be purged on the next restart (_setup).
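Under this proposal, configuration might look like the following; the option names come from the proposal itself and are hypothetical, not an existing API:

```javascript
// Hypothetical per-item TTL hooks, returning seconds synchronously.
const persistence = require('aedes-persistence-redis')({
  ttlForMessage: function (packet) {
    return packet.qos > 0 ? 3600 : 60 // keep QoS>0 packets longer
  },
  ttlForSubscription: function (sub) {
    return 86400 // one day
  }
})
```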
Hello,
I read issue #45 and realized that many things have improved since then, but I'm still facing unexpected memory usage and I'm really not sure if this is simply because of real usage data or a bug of some kind.
I have an aedes broker running on an EC2 instance and configured to use a Redis cache.t2.small instance (~1GB)
As you can see in the graph, I doubled it in size after reaching the max memory usage.
I don't understand why this is happening, since the number of devices connected and the size of the published retained data should not be that big after doing some calculations.
Could this be an issue? Is there an option to delete old data when there is no more free space?
My config is the following:
redisPersistence({
  port: process.env.REDIS_PORT,
  host: process.env.REDIS_HOST,
  connectTimeout: 10000,
  packetTTL: function () {
    return 3600 * 24 * 30 // 1 month
  }
})
Thanks!
The error occurs here. Steps to reproduce: the broker sends a PUBLISH message to the client, and the client answers with PUBREC.
I tried writing a simple snippet with MQTT.js to reproduce this error, but I was not able to reproduce it with that client library, which makes me think the problem is in the MQTT client we use (https://github.com/eclipse/paho.mqtt.golang). Anyway, client errors should not crash the broker, and a simple fix such as:
if (!key) {
  return cb(new Error('Unknown key'))
}
is sufficient. I can't tell whether other scenarios can lead to this error, but this fix covers all cases where the key can't be found in the cache (for whatever reason). I'm willing to send a PR, but I didn't want to do it on my own without opening this issue first.
I'm running the master version of Aedes and v6.0.0 of aedes-persistence-redis natively, along with Redis 5.0.6.
Hi mcollina,
I need some help implementing Redis persistence (QoS 2). I am using the following versions of aedes and the Redis persistence:
"aedes": "^0.12.4",
"aedes-persistence-redis": "^1.0.3",
I am getting the following error while subscribing with clean: false and QoS 2.
_2016-02-02T10:02:39.277Z - info: new client andr_766a0ee9f38e6061
2016-02-02T10:02:39.277Z - info: message published from broker to $SYS/EJaKawFYe/new/clients 0
2016-02-02T10:02:39.583Z - error: Error: no such packet
at project/node_modules/aedes-persistence-redis/persistence.js:465:27
at tryCatcher (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/util.js:26:23)
at Promise.successAdapter (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/nodeify.js:23:30)
at Promise._settlePromiseAt (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/promise.js:579:21)
at Promise._settlePromises (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/promise.js:697:14)
at Async._drainQueue (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/async.js:123:16)
at Async._drainQueues (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/async.js:133:10)
at Async.drainQueues (project/node_modules/aedes-persistence-redis/node_modules/ioredis/node_modules/bluebird/js/main/async.js:15:14)
at process.tickCallback (node.js:448:13)
2016-02-02T10:02:39.595Z - error: client error andr_766a0ee9f38e6061 no such packet
2016-02-02T10:02:39.595Z - info: Client disconnected - andr_766a0ee9f38e6061
2016-02-02T10:03:16.335Z - info: message published from broker to $SYS/EJaKawFYe/heartbeat 0
Please let me know if you can spot something wrong in what I am doing, or possible steps to rectify the issue.
Thanks
I would like to subscribe to / observe how aedes interacts with Redis, i.e. which operations are being called.
Why I would like to do that: Azure Application Insights can track standard dependencies like redis, so the Azure cloud can get insights across microservices and other services. aedes-persistence-redis uses ioredis, which Azure Application Insights can't track yet. I could manually report ioredis calls to Azure; I just need a way to track the interaction between Aedes and Redis.
@mcollina Why?
The modules I have configured are shown below.
3 aedes Node Server(port : 1883) pm2 Cluster
1 Nginx Server(port : 40379) Load Balancer
3 Redis Master(port : 41379, port : 42379, port : 43379)
Process : Aedes -> Nginx -> Redis
Case 1 : Load Balancer Direct Connect
var aedesOption = {
  concurrency: 1000,
  connectTimeout: 60000,
  heartbeatInterval: 600000
}
aedesOption.persistence = require('aedes-persistence-redis')({ port: 60379, host: '127.0.0.1' })
aedesOption.mq = require('mqemitter-redis')({ port: 60379, host: '127.0.0.1' })
aedes = require('aedes')(aedesOption)
Error Log
E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\async.js:61
fn = function () { throw arg; };
^
TypeError: Cannot convert undefined or null to object
at returnSubsForClient (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\aedes-persistence-redis\persistence.js:245:24)
at returnSubs (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\aedes-persistence-redis\persistence.js:239:20)
at tryCatcher (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\util.js:16:23)
at Promise.errorAdapter [as _rejectionHandler0] (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\nodeify.js:35:34)
at Promise._settlePromise (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\promise.js:564:21)
at Promise._settlePromise0 (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\promise.js:612:10)
at Promise._settlePromises (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\promise.js:687:18)
at Async._drainQueue (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\async.js:133:16)
at Async._drainQueues (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\async.js:143:10)
at Immediate.Async.drainQueues (E:\jskim\eclipse-workspace_mars\workspace_mars\aedes-sample\node_modules\bluebird\js\release\async.js:17:14)
at runCallback (timers.js:637:20)
at tryOnImmediate (timers.js:610:5)
at processImmediate [as _immediateCallback] (timers.js:582:5)
Case 2 : Cluster Connect
var aedesOption = {
  concurrency: 1000,
  connectTimeout: 60000,
  heartbeatInterval: 600000
}
aedesOption.persistence = require('aedes-persistence-redis')([
  { port: 61379, host: '127.0.0.1' },
  { port: 62379, host: '127.0.0.1' },
  { port: 63379, host: '127.0.0.1' }
])
aedesOption.mq = require('mqemitter-redis')([
  { port: 61379, host: '127.0.0.1' },
  { port: 62379, host: '127.0.0.1' },
  { port: 63379, host: '127.0.0.1' }
])
aedes = require('aedes')(aedesOption)
Hi mcollina,
As per the documentation, I tried using
var redisPersistence = require('aedes-persistence-redis')(6379, '192.168.1.1');
but I was not able to connect to redis.
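The constructor takes a single options object rather than positional (port, host) arguments, which would explain the failed connection; a working equivalent of the snippet above:

```javascript
// Pass connection settings as one options object.
var redisPersistence = require('aedes-persistence-redis')({
  port: 6379,
  host: '192.168.1.1'
})
```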
I think this is a typo and should be raw = all[key]
.
I can also fix this in my PR if approved.
Should this be an option?
aedes-persistence-redis/persistence.js
Line 37 in bd60484