We are running Redis server version 6.2.7 with hiredis 1.1.0 and hiredis-cluster 0.9.
1] In our existing load test we create a single async context (redisClusterAsyncContext *asyncCtx) and use it to perform SET/GET operations on a 3-node cluster. This test works well: we have completed multiple long-duration runs on more than 1 million keys without any issues.
2] For testing pub/sub, we have now created another connection with a new context, redisClusterAsyncContext *asyncCtxPS:
    asyncCtxPS = redisClusterAsyncContextInit();
    redisClusterSetOptionAddNodes(asyncCtxPS->cc, cluster_ip);
    redisClusterConnect2(asyncCtxPS->cc);
As connections with Redis Nodes get established as a result of the key operations performed in step 1 above, we trigger subscription in the connect callback:
    void connectCallback(const redisAsyncContext *ac, int status) {
        nodeIterator ni;
        cluster_node *clusterNode;
        ....
        ....
        initNodeIterator(&ni, asyncCtx->cc);
        while ((clusterNode = nodeNext(&ni)) != NULL) {
            if (ac == clusterNode->acon) {
                redisClusterAsyncCommandToNode(asyncCtx, clusterNode, configCb, NULL, "CONFIG SET notify-keyspace-events Eh");
                redisClusterAsyncCommandToNode(asyncCtxPS, clusterNode, eventCb, NULL, "PSUBSCRIBE __key*__:*");
                ....
                ....
At present, the above two pub/sub commands are invoked only on the first connected Node, not on all 3 Nodes.
    void eventCb(redisClusterAsyncContext *cc, void *r, void *privdata) {
        redisReply *reply = (redisReply *)r;
        if (r == NULL) {
            return;
        }
        if (reply->type == REDIS_REPLY_ARRAY) {
            for (int j = 0; j < reply->elements; j++) {
                printf("%u) %s\n", j, reply->element[j]->str);
            }
        }
    }
In response, we see the following in the callback:
0) psubscribe
1) __key*__:*
2) (null)
The (null) here doesn't seem correct. The subscription succeeds if we run the above PSUBSCRIBE directly in redis-cli. What could be the issue here?
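One possible explanation for the (null) line, offered as an assumption rather than a confirmed diagnosis: the third element of a PSUBSCRIBE confirmation is an integer (the current subscription count), and an integer reply element carries no string, so printing its str with %s shows (null) under glibc. The sketch below shows type-aware formatting of a single element. It uses hypothetical stand-in types (demo_type, demo_reply, demo_format) so that it compiles without hiredis; these are not the real hiredis definitions.

```c
#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for a reply element; NOT the real hiredis types. */
enum demo_type { DEMO_STRING, DEMO_INTEGER, DEMO_NIL };

struct demo_reply {
    enum demo_type type;
    long long integer;  /* meaningful only for DEMO_INTEGER */
    const char *str;    /* NULL unless the element carries text */
};

/* Format one element according to its type instead of always reading str.
 * Returns the value returned by snprintf. */
int demo_format(const struct demo_reply *e, char *buf, size_t len) {
    switch (e->type) {
    case DEMO_INTEGER:
        return snprintf(buf, len, "(integer) %lld", e->integer);
    case DEMO_STRING:
        /* Guard against a missing string rather than passing NULL to %s. */
        return snprintf(buf, len, "%s", e->str ? e->str : "");
    default:
        return snprintf(buf, len, "(nil)");
    }
}
```

In eventCb, the equivalent change would be to check reply->element[j]->type and print reply->element[j]->integer for integer elements. If the assumption holds, the subscription itself succeeded and only the printing is misleading.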
A couple of seconds later, we notice a crash:
#0 0x00007ffff7fa70d3 in redisClusterAsyncCallback (ac=0x5d94d0, r=0x0, privdata=0x5da940)
at /....../hircluster.c:3794
#1 0x00007ffff7fb45f2 in __redisRunCallback (ac=0x5d94d0, cb=0x5da9a0, reply=0x0)
at /....../async.c:311
#2 0x00007ffff7fb4979 in __redisAsyncFree (ac=0x5d94d0)
at /....../async.c:386
#3 0x00007ffff7fb4bbf in __redisAsyncDisconnect (ac=0x5d94d0)
at /....../async.c:450
#4 0x00007ffff7fb52b2 in redisProcessCallbacks (ac=0x5d94d0)
at /....../async.c:625
#5 0x00007ffff7fb560a in redisAsyncRead (ac=0x5d94d0)
at /....../async.c:717
#6 0x00007ffff7fb569b in redisAsyncHandleRead (ac=0x5d94d0)
at /....../async.c:738
#7 0x00000000004eb13d in redisLibmyeventHandlerRead (fd=<optimized out>, arg=0x5d7de0)
at /....../hiredis_libmyevent.h:27
Line 625 of async.c is in the following block:
605 if (__redisShiftCallback(&ac->replies,&cb) != REDIS_OK) {
606 /*
607 * A spontaneous reply in a not-subscribed context can be the error
608 * reply that is sent when a new connection exceeds the maximum
609 * number of allowed connections on the server side.
610 *
611 * This is seen as an error instead of a regular reply because the
612 * server closes the connection after sending it.
613 *
614 * To prevent the error from being overwritten by an EOF error the
615 * connection is closed here. See issue #43.
616 *
617 * Another possibility is that the server is loading its dataset.
618 * In this case we also want to close the connection, and have the
619 * user wait until the server is ready to take our request.
620 */
621 if (((redisReply*)reply)->type == REDIS_REPLY_ERROR) {
622 c->err = REDIS_ERR_OTHER;
623 snprintf(c->errstr,sizeof(c->errstr),"%s",((redisReply*)reply)->str);
624 c->reader->fn->freeObject(reply);
625 __redisAsyncDisconnect(ac);
626 return;
627 }
If we disable pub/sub in step 2 above, the test runs well again.
Please help us debug the issue. Let us know if any additional information is required.
Thank you.