swarthy / redis-semaphore
Distributed mutex and semaphore based on Redis
License: MIT License
I have about 32 workers running the following process:
const sema = new Semaphore(
  redisClient,
  'my-key',
  100,
  {
    acquireTimeout: 30 * 60 * 1000,
    lockTimeout: 10 * 60 * 1000,
    refreshInterval: 1 * 60 * 1000,
  },
);
const transferItem = async (key: string): Promise<void> => {
  try {
    await sema.acquire();
    return await asyncWorkTakingApprox100ms(key);
  } finally {
    void sema.release();
  }
}
// later, inside my SQS event consumer
const chunks = chunkIdentifiers(itemIdentifiers);
for await (const chunk of chunks) {
  await Promise.all(chunk.map((item) => transferItem(item)));
}
Whenever contention for the semaphore's lock gets high, all of the workers simultaneously crash with Lock Lost errors. I can't for the life of me figure out what's going on -- I increased the lock timeout a ton just to see if for some reason a transfer operation was taking forever, but this happens within seconds of receiving high loads, so there's no way that's it. I'm happy to provide more information, but I'm not sure what would be useful.
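One possible contributing factor, offered as a guess rather than a diagnosis: the snippet shares one sema instance across every concurrent transferItem call. The doSomething example elsewhere on this page creates a new Semaphore per critical section instead. A single instance appears to track one identifier and one refresh timer, so many overlapping acquire()/release() calls on it could plausibly interfere with each other. Separately, release() runs in finally even when acquire() itself failed. The sketch below shows a helper that avoids both problems. It is written against a hypothetical LockLike interface so it runs without Redis.

```typescript
// Hypothetical minimal interface: redis-semaphore's Semaphore and Mutex expose
// acquire() and release(), but this sketch does not depend on the library.
interface LockLike {
  acquire(): Promise<void>;
  release(): Promise<void>;
}

// Runs `work` while holding a lock created just for this call.
// acquire() happens before the try block, so release() only runs
// once the lock was actually obtained.
async function withLock<T>(
  createLock: () => LockLike,
  work: () => Promise<T>,
): Promise<T> {
  const lock = createLock(); // one instance per critical section
  await lock.acquire();
  try {
    return await work();
  } finally {
    await lock.release();
  }
}
```

With the names from the snippet, a call might look like withLock(() => new Semaphore(redisClient, 'my-key', 100, options), () => asyncWorkTakingApprox100ms(key)). Here, options is a placeholder for the same options object.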
I am getting 'Lost mutex for key xxx' when using 2 separate Mutexes on 2 separate services as part of my moleculer-based app. Just wondering what sort of thing I should be looking for that might cause this? I can't think right now how to construct a minimal example that replicates this behaviour, sorry. Just a few clues would be cool, especially since they are coming up as UnhandledPromiseRejectionWarning directly from RedisMutex._refresh with no stack trace.
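A clue about where these come from: the refresh runs on a timer, outside the promise chain of the code that called acquire(). A try/catch around the critical section therefore cannot catch a failure raised there, and it surfaces as an unhandled rejection. The library mentions an onLockLost callback (check the README for the version you run). The sketch below assumes such a callback is available and wires it to an AbortController so the critical section can stop cooperatively. createLossGuard and processItems are hypothetical names.

```typescript
// Hypothetical helper: an onLockLost-style callback plus a signal that the
// critical section can poll or pass to abortable APIs.
function createLossGuard() {
  const controller = new AbortController();
  return {
    signal: controller.signal,
    // Pass this as the lock's loss callback (assumed option name: onLockLost).
    onLockLost: (err: Error) => controller.abort(err),
  };
}

// Example critical section that checks the signal between steps.
async function processItems(items: string[], signal: AbortSignal): Promise<string[]> {
  const done: string[] = [];
  for (const item of items) {
    if (signal.aborted) {
      throw new Error(`lock lost after processing ${done.length} item(s)`);
    }
    await new Promise((resolve) => setTimeout(resolve, 5)); // stand-in for real work
    done.push(item);
  }
  return done;
}
```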
My code:
const { Semaphore, TimeoutError } = require('redis-semaphore');
const Redis = require('ioredis');
const redis = new Redis.Cluster(
  [
    {
      host: REDIS_HOST,
      port: 6379
    }
  ],
  {
    slotsRefreshTimeout: 10000,
    redisOptions: {
      password: REDIS_TOKEN,
      tls: false
    }
  }
)
async function doSomething() {
  const semaphore = new Semaphore(redis, 'lockingResource', 5)
  await semaphore.acquire()
  try {
    // maximum 5 simultaneous executions
  } finally {
    await semaphore.release()
  }
}
When running new Semaphore(redis, 'lockingResource', 5), the following error is thrown:
Error: "client" must be instance of ioredis client or cluster
at new RedisMutex (C:\workspace\node-test\node_modules\redis-semaphore\lib\RedisMutex.js:19:19)
at new RedisSemaphore (C:\workspace\node-test\node_modules\redis-semaphore\lib\RedisSemaphore.js:12:9)
Hello!
I'm using this library to control distributed mutexes in our application, and sometimes I get a connection timeout from Redis during the lock refresh. This is an infrastructure problem, of course, but the library currently doesn't offer me a way to handle this error. I'm using v5.50 of the lib and Node 20. Here's my error stack:
Error: Command timed out
at optimizedEval (/app/node_modules/redis-semaphore/lib/utils/createEval.js:24:34)
at refreshMutex (/app/node_modules/redis-semaphore/lib/mutex/refresh.js:26:55)
at RedisMutex._refresh (/app/node_modules/redis-semaphore/lib/RedisMutex.js:31:49)
at RedisMutex._processRefresh (/app/node_modules/redis-semaphore/lib/Lock.js:97:42)
at listOnTimeout (node:internal/timers:573:17)
at process.processTimers (node:internal/timers:514:7)
I think it'd be great if I could register a listener to handle those otherwise unhandled errors during refresh.
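The sketch below shows the kind of hook being asked for. It is a refresh loop that never lets a failed refresh escape as an unhandled rejection. Instead, it reports each failure to a caller-supplied onError callback, keeps retrying after transient errors, and stops once the lock is reported as lost. startRefreshLoop, RefreshFn and onError are hypothetical names, not redis-semaphore's API.

```typescript
// Resolves false if the lock is no longer ours; may reject on Redis errors.
type RefreshFn = () => Promise<boolean>;

// Starts a periodic refresh and returns a function that stops it.
function startRefreshLoop(
  refresh: RefreshFn,
  intervalMs: number,
  onError: (err: Error) => void,
): () => void {
  let stopped = false;
  let timer: ReturnType<typeof setTimeout> | undefined;

  const tick = async () => {
    try {
      const stillHeld = await refresh();
      if (!stillHeld) {
        stopped = true; // lost for good: stop refreshing
        onError(new Error("lock lost during refresh"));
        return;
      }
    } catch (err) {
      // e.g. "Command timed out": report it instead of throwing from a timer
      onError(err instanceof Error ? err : new Error(String(err)));
    }
    if (!stopped) timer = setTimeout(tick, intervalMs);
  };

  timer = setTimeout(tick, intervalMs);
  return () => {
    stopped = true;
    if (timer !== undefined) clearTimeout(timer);
  };
}
```

Whether a timed-out refresh should keep retrying or immediately count as a lost lock is a policy choice. This sketch retries, which is only safe while the remaining lock TTL exceeds the retry interval.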
When a mutex instance's scheduled lock refresh is delayed, the lock may expire. This leaves a gap of time for another mutex instance to acquire and release the lock before the first instance's refresh runs. In this case, the first mutex silently re-acquires the lock, hiding the fact that mutual exclusion was not in fact guaranteed during its lifetime.
If the lock is expired at the time of going to refresh the lock, the mutex should call the onLockLost callback, to signal that the mutex guarantee was not held.
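The proposed fix amounts to a conditional refresh. The refresh extends the expiry only if the key still holds this instance's identifier. Otherwise it reports the loss instead of writing the key again. Below is a minimal in-memory sketch of that check, using a hypothetical TtlStore as a stand-in for Redis. In Redis, the check and the extension would need to run atomically, for example inside a single Lua script.

```typescript
// In-memory stand-in for Redis string keys with millisecond TTLs.
class TtlStore {
  private data = new Map<string, { value: string; expiresAt: number }>();
  private now: () => number;

  constructor(now: () => number) {
    this.now = now;
  }

  get(key: string): string | null {
    const entry = this.data.get(key);
    if (!entry || entry.expiresAt <= this.now()) return null; // missing or expired
    return entry.value;
  }

  // Like SET key value NX PX ttl.
  setNx(key: string, value: string, ttlMs: number): boolean {
    if (this.get(key) !== null) return false;
    this.data.set(key, { value, expiresAt: this.now() + ttlMs });
    return true;
  }

  del(key: string): void {
    this.data.delete(key);
  }

  pexpire(key: string, ttlMs: number): void {
    const entry = this.data.get(key);
    if (entry) entry.expiresAt = this.now() + ttlMs;
  }
}

// Extends the lock only while we still own it; otherwise signals the loss
// instead of silently re-creating the key.
function refreshIfOwned(
  store: TtlStore,
  key: string,
  identifier: string,
  ttlMs: number,
  onLockLost: () => void,
): boolean {
  if (store.get(key) !== identifier) {
    onLockLost();
    return false;
  }
  store.pexpire(key, ttlMs);
  return true;
}
```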
Hi. I have noticed recently that it seems as though the mutexes in my app are no longer released when the process is paused or stopped. Perhaps / probably it is a misunderstanding on my part, but I wonder if you could clarify.
Is it the case that they ARE released, but only AFTER the lockTimeout has expired? (I am not explicitly setting refreshInterval, so that will remain the default)
I have quite long lockTimeouts, in some cases hours, maybe that is why I can't see the lock released.
Is there a way to make the lock release immediately on a process crash or stop?
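As far as I understand the library, yes to the second question. The lock key is written with a PX expiry of lockTimeout and extended by each refresh. A holder that disappears simply stops refreshing, so the key lingers until that expiry runs out, which with hour-long timeouts can mean hours. No client code can release on a hard crash (SIGKILL, power loss); that case is what the timeout exists for. A graceful stop, however, can release what the process holds. The sketch below assumes each held lock exposes release(); LockRegistry and installShutdownHooks are hypothetical names.

```typescript
interface Releasable {
  release(): Promise<void>;
}

// Tracks locks currently held by this process.
class LockRegistry {
  private held = new Set<Releasable>();

  add(lock: Releasable): void { this.held.add(lock); }
  delete(lock: Releasable): void { this.held.delete(lock); }
  get size(): number { return this.held.size; }

  // Releases everything still held and collects failures instead of stopping at the first.
  async releaseAll(): Promise<Error[]> {
    const locks = Array.from(this.held);
    this.held.clear();
    const results = await Promise.allSettled(locks.map((lock) => lock.release()));
    return results
      .filter((r): r is PromiseRejectedResult => r.status === "rejected")
      .map((r) => (r.reason instanceof Error ? r.reason : new Error(String(r.reason))));
  }
}

// Releases held locks on graceful termination signals. Does nothing for
// SIGKILL or crashes; those still rely on lockTimeout expiry.
function installShutdownHooks(registry: LockRegistry): void {
  for (const signal of ["SIGINT", "SIGTERM"] as const) {
    process.once(signal, () => {
      void registry.releaseAll().finally(() => process.exit(0));
    });
  }
}
```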
I'm having an issue with this on Lambda.
After doing some tests and reading the discussion in /issues/9, I still haven't managed to fix this issue.
Edit 1:
Btw, I'm not releasing the lock.
Hi me again!
I switched my semaphores to this lib, but I am still using the mutex/lock from node-redlock because it supports the Redlock algorithm. Basically it ensures that the slave instances are also pinged in the lock process, so even if there is a failover from the master there is no chance that a concurrent job can acquire a lock it wasn't supposed to.
It might be hard/time consuming to do, so this is really more a wish if/when you have time :)
Thanks!
If the mutex never acquired the lock in the first place, the rest of the release operation makes no sense, so it should return early without invoking any Lua scripts.
I can send a PR for this if this is a desirable change.
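Below is a minimal sketch of the proposed guard. It uses a hypothetical SketchMutex with an injected runReleaseScript in place of the real Lua call, so the early return can be observed without Redis.

```typescript
// Hypothetical stand-in for a lock whose release normally runs a Lua script.
class SketchMutex {
  private acquired = false;
  private runReleaseScript: () => Promise<void>;

  constructor(runReleaseScript: () => Promise<void>) {
    this.runReleaseScript = runReleaseScript;
  }

  get isAcquired(): boolean {
    return this.acquired;
  }

  // Placeholder for the real acquire logic.
  async acquire(): Promise<void> {
    this.acquired = true;
  }

  async release(): Promise<void> {
    // Never acquired (or already released): skip the round trip to Redis.
    if (!this.acquired) return;
    this.acquired = false;
    await this.runReleaseScript();
  }
}
```

A side effect of this design is that a double release() also becomes a no-op.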
Hi, thank you for your awesome lib!
I used to use the externallyAcquiredIdentifier option of Mutex to implement a reentrant distributed lock. That is, if multiple locks have the same identifier, they are treated as the same lock holder. This is convenient because in many complex scenarios I don't want to add a lock-state argument to every function involved and pass it through everywhere. Instead, I can use a transaction id as the identifier, so that if the mutex has already been acquired by an outer stack frame, the inner code just goes through. This is an example test case:
test('reentrant', async ({ assert }) => {
  const identifier = randomUUID();
  const lock1 = new Mutex(redis, key, {
    // ...,
    externallyAcquiredIdentifier: identifier
  });
  const lock2 = new Mutex(redis, key, {
    // ...,
    externallyAcquiredIdentifier: identifier
  });
  assert.isTrue(await lock1.tryAcquire());
  assert.isTrue(await lock2.tryAcquire());
});
But it was broken in version 5.3.1 by this commit: 3f70489#diff-f30ff58349492ea4098fb2a9d1e13e5ea42c34b003959dc4d1e8a008bfcdf313
Is there any alternative or suggestion?
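One possible alternative is to keep re-entrancy in the application. The outermost call acquires the Redis lock once per transaction id, and nested entries are only counted locally. Below is a minimal sketch with the distributed lock hidden behind a hypothetical createLock(key, identifier) factory. Unlike the old identifier-sharing behavior, it assumes all nested sections for one transaction run sequentially in the same process. It does not let two processes share a lock.

```typescript
interface BasicLock {
  acquire(): Promise<void>;
  release(): Promise<void>;
}

// Process-local re-entrancy on top of a non-reentrant distributed lock.
class ReentrantLocks {
  private holds = new Map<string, { lock: BasicLock; depth: number }>();
  private createLock: (key: string, identifier: string) => BasicLock;

  constructor(createLock: (key: string, identifier: string) => BasicLock) {
    this.createLock = createLock;
  }

  async enter(key: string, identifier: string): Promise<void> {
    const id = JSON.stringify([key, identifier]);
    const held = this.holds.get(id);
    if (held) {
      held.depth++; // already held by an outer frame: just count the entry
      return;
    }
    const lock = this.createLock(key, identifier);
    await lock.acquire(); // only the outermost entry talks to Redis
    this.holds.set(id, { lock, depth: 1 });
  }

  async exit(key: string, identifier: string): Promise<void> {
    const id = JSON.stringify([key, identifier]);
    const held = this.holds.get(id);
    if (!held) throw new Error(`not holding ${key} for ${identifier}`);
    held.depth--;
    if (held.depth === 0) {
      this.holds.delete(id); // the outermost exit releases the real lock
      await held.lock.release();
    }
  }
}
```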
It would be helpful to be able to import the TimeoutError class for instanceof checks.
Hello,
I am using a mutex to synchronize multiple Apollo servers (10 Docker containers on a single computer), and everything works as expected while the computer is up and running: only one Apollo server instance does the job at a time.
But when the computer wakes up after being in sleep mode, things go wrong.
I get the following error messages even with try/catch everywhere:
apollo_7 | (node:55) UnhandledPromiseRejectionWarning: Error: Lost mutex for key mutex:LogsCleanerMutex
apollo_7 | at RedisMutex._processRefresh (/usr/src/app/node_modules/redis-semaphore/src/Lock.ts:85:15)
apollo_7 | at processTicksAndRejections (internal/process/task_queues.js:97:5)
apollo_7 | (node:55) UnhandledPromiseRejectionWarning: Unhandled promise rejection. This error originated either by throwing inside of an async function without a catch block, or by rejecting a promise which was not handled with .catch(). To terminate the node process on unhandled promise rejection, use the CLI flag --unhandled-rejections=strict
(see https://nodejs.org/api/cli.html#cli_unhandled_rejections_mode). (rejection id: 2)
Multiple apollo server instances are doing the same job.
Versions in use:
Can you please help?
I get a weird error when I call .acquire() with more than 3999 permits.
const resource = 'x';
const maxPermits = 5000;
const permitsToAcquire = 4000;
const semaphore = new MultiSemaphore(redisClient, resource, maxPermits, permitsToAcquire);
await semaphore.acquire(); // Error
Error:
ReplyError: ERR Error running script (call to f_10c1c2e05e7c48aa5824d227b8c51abcde87b18e): @user_script:18: user_script:18: too many results to unpack
at parseError (/Users/severinbuhler/git/lnroute/ln-route-node/node_modules/redis-parser/lib/parser.js:179:12)
at parseType (/Users/severinbuhler/git/lnroute/ln-route-node/node_modules/redis-parser/lib/parser.js:302:14) {
command: {
name: 'evalsha',
args: [
'10c1c2e05e7c48aa5824d227b8c51abcde87b18e',
'1',
'semaphore:x',
'5000',
'4000',
'372c2438-ac42-4376-84d2-4327dd648684',
'20000',
'1625537113196'
]
}
}
At the moment, MultiSemaphore is not usable with 4000 or more permits. It would be great to get some help :)
Best regards
Severin
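A likely explanation, not confirmed against the script source, is that the Lua script adds one score/member pair per permit in a single unpack(...) call. With 4000 permits that means 8000 values. That is where Lua 5.1's unpack gives up, because its C stack limit (LUAI_MAXCSTACK) is 8000, while 3999 permits (7998 values) still fits. The usual workaround is to split the pairs into bounded batches with one ZADD per batch, done inside the script so the acquire stays atomic. The sketch below only models the batching arithmetic. It assumes a score, member argument layout with members named identifier_i; batchZaddArgs and that naming are hypothetical, and the batch size is arbitrary.

```typescript
// Builds ZADD argument batches of at most `maxPairs` score/member pairs,
// so no single unpack() has to spread thousands of values at once.
function batchZaddArgs(
  identifier: string,
  permits: number,
  score: number,
  maxPairs = 1000,
): string[][] {
  const batches: string[][] = [];
  for (let start = 0; start < permits; start += maxPairs) {
    const end = Math.min(start + maxPairs, permits);
    const batch: string[] = [];
    for (let i = start; i < end; i++) {
      batch.push(String(score), `${identifier}_${i}`);
    }
    batches.push(batch);
  }
  return batches;
}
```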
Hi,
I have a case in which I need to pass an ioredis-mock instance for testing but I get the following error:
"client" must be instance of ioredis client or cluster
Is there a solution or workaround for this case?
Thanks!
See https://martin.kleppmann.com/2016/02/08/how-to-do-distributed-locking.html for the explanation of a problem.
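The core of that argument is that a lock with a timeout cannot, on its own, stop a paused client from acting after its lease has expired. The article's remedy is a fencing token: a number that increases with every acquisition and that the protected resource checks. Below is a minimal in-memory sketch of both sides. FencedStorage and TokenIssuer are hypothetical. In a Redis setup, the token might come from something like an INCR performed together with the acquire; that is an assumption, not a redis-semaphore feature.

```typescript
// The protected resource remembers the highest token it has seen and
// rejects writes that carry an older one.
class FencedStorage {
  private highestToken = -1;
  private data = new Map<string, string>();

  write(token: number, key: string, value: string): boolean {
    if (token < this.highestToken) return false; // stale lock holder
    this.highestToken = token;
    this.data.set(key, value);
    return true;
  }

  read(key: string): string | undefined {
    return this.data.get(key);
  }
}

// Issues strictly increasing tokens, one per successful acquisition.
class TokenIssuer {
  private last = 0;
  issue(): number {
    this.last += 1;
    return this.last;
  }
}
```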
Is there any way to use this library with node-redis rather than ioredis?
Here's my simple test code:
const Redis = require('ioredis').default;
const { Semaphore } = require('redis-semaphore');
const REDIS = new Redis();
const sem = new Semaphore(REDIS, 'lockingres', 5, { lockTimeout: 60000 });
console.log('wait for connect...');
REDIS.on('connect', async () => {
  console.log('starting...');
  for (let i = 0; i < 20; i++) {
    await sem.acquire();
    console.log('got ', i);
    // note: semaphore NOT released
  }
})
The expected behavior is that the code will immediately print got 0 through got 4, then pause for 60s until the locks time out.
This is indeed what happens with versions before 3.1.0:
$ yarn add [email protected]
$ node test.js
wait for connect...
starting...
got 0
got 1
got 2
got 3
got 4
^C
However, for all versions from 3.1.0 onward, the limit doesn't seem to be respected, and all acquires succeed immediately:
$ yarn add [email protected]
$ node test.js
wait for connect...
starting...
got 0
got 1
got 2
got 3
got 4
got 5
got 6
got 7
...
Am I doing something wrong?
I want to avoid hitting a third-party API rate limit.
The way I'd like to achieve this is to call acquire from src/fair-semaphore/index.ts, call the API, and then call refresh without calling release, so that the lock is released lockTimeout milliseconds after the API call completes.
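What this describes is essentially a sliding-window rate limit built from leases. Each call takes a permit, nothing releases it, and it expires lockTimeout ms after the call finishes. As a result, no more than the semaphore limit of calls can be in flight or finished within the last window. Below is a minimal in-memory sketch of that idea, not the library's fair-semaphore code. ExpiringPermits is a hypothetical name, and the clock is passed in explicitly to keep it testable.

```typescript
class ExpiringPermits {
  private expiries = new Map<number, number>(); // permit id -> expiry time (ms)
  private nextId = 0;
  private limit: number;
  private windowMs: number;

  constructor(limit: number, windowMs: number) {
    this.limit = limit;
    this.windowMs = windowMs;
  }

  // Returns a permit id, or null if `limit` permits are still live at `now`.
  tryAcquire(now: number): number | null {
    for (const [id, expiresAt] of this.expiries) {
      if (expiresAt <= now) this.expiries.delete(id); // drop expired permits
    }
    if (this.expiries.size >= this.limit) return null;
    const id = this.nextId++;
    this.expiries.set(id, Number.POSITIVE_INFINITY); // in flight: no expiry yet
    return id;
  }

  // Called when the API call finishes: the permit lives on for `windowMs`.
  complete(id: number, now: number): void {
    if (this.expiries.has(id)) this.expiries.set(id, now + this.windowMs);
  }
}
```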
The problem is that for some reason I can't import items from this folder. I'm fairly new to the TypeScript world, so perhaps I'm missing something, but when I cloned the project and put the src folder inside my project it worked; it doesn't work with the package from npm.
Great library btw!
services:
  redis-node-1:
    container_name: redis-node-1
    image: bitnami/redis-cluster:7.2
    volumes:
      - redis-node-1:/data
    environment:
      - REDIS_PASSWORD=test
      - "REDIS_NODES=redis-node-1 redis-node-2 redis-node-3"
    ports:
      - "6379:6379"
    networks:
      - network-v2
  redis-node-2:
    container_name: redis-node-2
    image: bitnami/redis-cluster:7.2
    volumes:
      - redis-node-2:/data
    environment:
      - REDIS_PASSWORD=test
      - "REDIS_NODES=redis-node-1 redis-node-2 redis-node-3"
    ports:
      - "6380:6379"
    networks:
      - network-v2
  redis-node-3:
    image: bitnami/redis-cluster:7.2
    volumes:
      - redis-node-3:/data
    ports:
      - "6381:6379"
    environment:
      - "REDIS_PASSWORD=test"
      - "REDISCLI_AUTH=test"
      - "REDIS_CLUSTER_REPLICAS=0"
      - "REDIS_NODES=redis-node-1 redis-node-2 redis-node-3"
      - "REDIS_CLUSTER_CREATOR=yes"
    depends_on:
      - redis-node-1
      - redis-node-2
    networks:
      - network-v2
My Redis connection using ioredis is able to connect and write data to it.
import { scribe } from "@/logger/scribe";
import * as IORedis from "ioredis";
import { LockOptions, RedlockMutex, Semaphore } from "redis-semaphore";
import Config from "../../config";
// This class also maintains references to all the locks and semaphores
// And while going down, it will release all the locks and semaphores
// This will help us to run multiple replicas of the same service
class Redis {
  private static _instance: Redis;
  private readonly cluster: IORedis.Cluster;
  private locks: Map<string, RedlockMutex> = new Map();
  private semaphores: Map<string, Semaphore> = new Map();
  constructor() {
    // Config.Redis.ClusterNodes contains host and port array [{host:"",port:6789}...]
    this.cluster = new IORedis.Cluster(Config.Redis.ClusterNodes, {
      redisOptions: {
        password: Config.Redis.Password,
      },
    });
    this.cluster.on("ready", () => {
      scribe.info("Redis ready");
    });
    this.cluster.on("error", (err) => {
      scribe.error("Redis error", err);
    });
  }
  public async ping(): Promise<void> {
    await this.cluster.ping();
    await this.cluster.set("lastPing", new Date().toISOString());
  }
}
When I run ping(), I can see the corresponding key being set.
I initialize a mutex like this:
new RedlockMutex(cluster.nodes("master"), key, options)
It always results in:
Acquire redlock-semaphore semaphore:dev:xxxx:xxxxxx:xxxxx:0 timeout',
stack: 'Error: Acquire redlock-semaphore semaphore:dev:xxxx:xxxxxx:xxxxx:0 timeout\n' +
' at MaximSemaphore.acquire (/xxx/xxx/xxx/xxx/node_modules/redis-semaphore/src/Lock.ts:140:13)\n' +
' at RetryOperation.operation.attempt.timeout [as _fn] (webpack-internal:///(rsc)/./src/lib/services/xxx/xxx.ts:352:21)
import Redis from 'ioredis'

function createCluster() {
  const nodes = [
    { host: 'localhost', port: 6379 },
    { host: 'localhost', port: 6380 },
    { host: 'localhost', port: 6381 }
  ]
  console.log('-----', nodes)
  const client = new Redis.Cluster(nodes, {
    redisOptions: {
      password: 'test',
      lazyConnect: true,
      autoResendUnfulfilledCommands: false, // don't queue commands while server is offline (don't break test logic)
      maxRetriesPerRequest: 0 // don't retry, fail faster (default is 20)
      // https://github.com/luin/ioredis#auto-reconnect
      // retryStrategy is a function that will be called when the connection is lost.
      // The argument times means this is the nth reconnection being made and the return value represents how long (in ms) to wait to reconnect.
    },
    lazyConnect: true,
    enableOfflineQueue: false,
    clusterRetryStrategy: () => {
      return 100 // for tests we disable increasing timeout
    }
  })
  client.on('error', err => {
    console.log('Redis client error:', err.message)
  })
  return client
}
export const cluster = createCluster()
import { allClients, client1, client2, client3, cluster } from '../redisClient'
const timeoutOptions: TimeoutOptions = {
  lockTimeout: 300,
  acquireTimeout: 100,
  refreshInterval: 80,
  retryInterval: 10
}
async function expectGetAll(key: string, value: string | null) {
  await expect(
    Promise.all([client1.get(key), client2.get(key), client3.get(key)])
  ).to.become([value, value, value])
}
describe('RedlockMutex', () => {
  it('should acquire and release lock using cluster', async () => {
    const mutex = new RedlockMutex(cluster.nodes('master'), 'key')
    expect(mutex.isAcquired).to.be.false
    await mutex.acquire()
    expect(mutex.isAcquired).to.be.true
    await expectGetAll('mutex:key', mutex.identifier)
    await mutex.release()
    expect(mutex.isAcquired).to.be.false
    await expectGetAll('mutex:key', null)
  })
})
RedlockMutex
1) should acquire and release lock using cluster
0 passing (10s)
1 failing
1) RedlockMutex
should acquire and release lock using cluster:
Error: Acquire redlock-mutex mutex:key timeout
at RedlockMutex.acquire (src/Lock.ts:140:13)
at async Context.<anonymous> (test/src/RedlockMutex.test.ts:59:5)
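A likely explanation: Redlock is designed for N independent Redis masters that each store the same key. In a Redis Cluster, mutex:key hashes to a single slot owned by one master. The other masters answer a command for that key with a MOVED redirect instead of executing it. The per-node clients returned by cluster.nodes('master') are plain connections that, as far as I know, do not follow redirects the way the cluster client does (worth verifying against the ioredis docs). So at most one of the three nodes accepts the SET, the quorum of two is never reached, and acquire() times out. For the same reason, expectGetAll can never see the value on all three nodes. Below is a minimal simulation of that arithmetic with hypothetical node objects.

```typescript
interface SimNode {
  name: string;
  ownsSlot: boolean; // true only for the master that serves the key's slot
}

// Redlock needs a strict majority of nodes to accept the lock.
function quorum(nodeCount: number): number {
  return Math.floor(nodeCount / 2) + 1;
}

// A node that does not own the key's slot replies MOVED instead of storing it.
function trySetOnAll(nodes: SimNode[]): { accepted: number; errors: string[] } {
  let accepted = 0;
  const errors: string[] = [];
  for (const node of nodes) {
    if (node.ownsSlot) accepted++;
    else errors.push(`${node.name}: MOVED`);
  }
  return { accepted, errors };
}
```

Separately, with lazyConnect: true, cluster.nodes('master') may return an empty list until the cluster client has actually connected, which would also prevent any acquisition.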
Hello,
Thank you for writing this library. Our team wants to use this in production as we find it very valuable for our use case.
As we moved up our deployment chain, we went from single-instance Redis to a cluster. We ran into a problem, and looking at the code it seems that a cluster configuration is not supported. When passing in our ioredis Cluster client instance, we get the
'"client" must be instance of ioredis client'
error.
Is this by design?
Would you accept an MR that changes RedisMutex from this:
if (!(client instanceof Redis)) {
  throw new Error('"client" must be instance of ioredis client')
}
to this:
if (!(client instanceof Redis) && !(client instanceof Redis.Cluster)) {
  throw new Error('"client" must be instance of ioredis client')
}
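An alternative to extending the instanceof check is a structural check on the methods the library actually calls. It would also let test doubles such as ioredis-mock through. Below is a minimal sketch. The default method list (eval, evalsha) is an assumption about what the Lua-based locks need, not the library's actual requirement, and assertRedisLike is a hypothetical name.

```typescript
// Accepts any object exposing the listed methods as functions
// (own or inherited, so class-based clients pass too).
function assertRedisLike(
  client: unknown,
  requiredMethods: readonly string[] = ["eval", "evalsha"],
): void {
  if (typeof client !== "object" || client === null) {
    throw new Error('"client" must be a Redis client object');
  }
  const record = client as Record<string, unknown>;
  const missing = requiredMethods.filter((m) => typeof record[m] !== "function");
  if (missing.length > 0) {
    throw new Error(`"client" is missing method(s): ${missing.join(", ")}`);
  }
}
```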
In addition to max concurrent locks, provide the ability to limit the max number of locks that can be acquired per second. Similar to functionality of https://github.com/SGrondin/bottleneck.
Hi wonderful developers, I was wondering whether I can use the Cluster type from the ioredis library with Mutex?
Sometimes it is useful to be able to attempt to acquire a lock but back off immediately if it's already taken by someone else (e.g. you want to skip processing an item entirely if someone else is already doing it).
Implementing this with redis-semaphore currently requires setting the timeout to 0 and then catching and swallowing a timeout error, which is not ideal (throwing errors is an expensive operation).
Would you be open to either implementing a different type of lock for this scenario, or an alternative method for Mutex, e.g. tryAcquire, which returns either true or false based on whether lock acquisition was successful?
I can provide a PR for this, based on which solution you prefer.
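Below is a minimal sketch of the proposed tryAcquire semantics: a single SET key id NX PX ttl style attempt that reports the outcome as a boolean, with no retry loop and no exception on contention. (Later versions appear to expose a tryAcquire() method, as the reentrant test case on this page uses.) NxStore, tryAcquire and claimItems here are hypothetical in-memory stand-ins, not the library's implementation.

```typescript
// Hypothetical in-memory stand-in for SET key value NX PX ttl.
class NxStore {
  private entries = new Map<string, { value: string; expiresAt: number }>();

  setNxPx(key: string, value: string, ttlMs: number, now: number): boolean {
    const current = this.entries.get(key);
    if (current && current.expiresAt > now) return false; // held by someone else
    this.entries.set(key, { value, expiresAt: now + ttlMs });
    return true;
  }
}

// One attempt, no retries, no exceptions: true if we got the lock.
function tryAcquire(store: NxStore, key: string, id: string, ttlMs: number, now: number): boolean {
  return store.setNxPx(key, id, ttlMs, now);
}

// The scenario from the issue: skip items someone else is already processing.
function claimItems(store: NxStore, worker: string, items: string[], now: number): string[] {
  return items.filter((item) => tryAcquire(store, `mutex:${item}`, worker, 10_000, now));
}
```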
Could you kindly elaborate on why refreshInterval is useful? It looks like we need to disable it (we keep losing locks when using identifier), and I'm wondering what we'll be missing without it.
Thanks!
semaphore.acquire() does not return success/failure; the reply is undefined.
Hi there!
Currently the semaphore acquire method doesn't accept any parameter. This means that if a user requires more than one token/lock/resource, they need to call the method twice (and thus make two calls to Redis). It would be valuable if we could just pass it an integer and be done with it.
An example of this use case is an application making calls to an external system with a limit on concurrent connections (say 4). If I need to make either 2 concurrent connections to this service or none, I want to be able to acquire 2 tokens/locks from the semaphore or error out.
Thanks a lot!
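MultiSemaphore, which takes the number of permits per acquisition as a constructor argument (see the 4000-permit issue on this page), appears to cover this. The essential property is that the N permits are taken all or nothing in one atomic step. Below is a minimal in-memory sketch of that rule. CountingSemaphore is a hypothetical name, and in Redis the check-and-take would have to run inside one script.

```typescript
class CountingSemaphore {
  private holders = new Map<string, number>(); // holder id -> permits held
  private limit: number;

  constructor(limit: number) {
    this.limit = limit;
  }

  private inUse(): number {
    let total = 0;
    for (const n of this.holders.values()) total += n;
    return total;
  }

  // All or nothing: grants `permits` only if they all fit under the limit.
  tryAcquire(holder: string, permits: number): boolean {
    if (permits < 1 || permits > this.limit) throw new RangeError("invalid permit count");
    if (this.holders.has(holder)) return false; // one acquisition per holder in this sketch
    if (this.inUse() + permits > this.limit) return false;
    this.holders.set(holder, permits);
    return true;
  }

  release(holder: string): void {
    this.holders.delete(holder);
  }
}
```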
Just curious, I should be able to see the keys generated by Mutex in Redis by using
scan 0 MATCH *mutex:*
right? Because I can't.
I am definitely on the right Redis instance because when I stop that instance I get back ERROR: Error 10061 connecting to localhost:6379. No connection could be made because the target machine actively refused it.
What subtle/dumb thing have I done wrong....?
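Two things could explain an empty result. First, a mutex key only exists while the lock is held, and it carries a PX expiry, so it disappears once it is released or expires. Second, SCAN returns a cursor plus one page of matches. A single scan 0 ... call can legitimately return no keys along with a non-zero cursor, so you have to keep calling it until the cursor comes back as 0. Below is a sketch of that loop. It is written against a minimal client shape modelled on ioredis's scan(cursor, 'MATCH', pattern), which resolves to [nextCursor, keys]; the ScanClient interface itself is an assumption.

```typescript
interface ScanClient {
  scan(cursor: string, matchToken: "MATCH", pattern: string): Promise<[string, string[]]>;
}

// Follows the cursor until the server returns "0", collecting every match.
async function scanAll(client: ScanClient, pattern: string): Promise<string[]> {
  const found = new Set<string>(); // SCAN may return the same key more than once
  let cursor = "0";
  do {
    const [next, keys] = await client.scan(cursor, "MATCH", pattern);
    keys.forEach((k) => found.add(k));
    cursor = next;
  } while (cursor !== "0");
  return Array.from(found);
}
```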
Hi,
Our team found that there are breaking changes when upgrading from an old version (1.x) to 3.1.0+.
The CHANGELOG.md mentions a breaking change, but it only says to replace FairSemaphore with Semaphore. It doesn't warn that the acquire method no longer returns a boolean; instead, it throws when it cannot acquire and returns nothing otherwise.
I think it would be good to add this into the CHANGELOG.md entry, in case other are coming from an old version (like us).
Here's a PR: #136
Thank you!
Luis
Execution environment
Error contents
When I run a command that uses Lua, as shown below, it throws "NOSCRIPT No matching script. Please use EVAL."
const lock = new Semaphore(redisClient, redisKey, 1, lockOptions)
await lock.acquire()
Workaround
It seems that the error is not caught in createEval.ts line 32.
We do not know the cause, but if you fix line 31 as follows, the error is caught.
Before
return (await client.evalsha(sha1, keysCount, .... . args)) as Promise<Result>
After
const result = (await client.evalsha(sha1, keysCount, .... .args)) as Promise<Result>
return result
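For context, the usual pattern such a helper implements is to try EVALSHA first and fall back to EVAL when the server replies NOSCRIPT. That can happen after a restart or SCRIPT FLUSH, or on a cluster node that never loaded the script. The fallback only works if the rejection is observed inside the try block. A bare "return promise" hands back the promise before it settles, so a catch in the same function never sees the error, whereas "return await promise" does. Below is a minimal sketch against a hypothetical EvalClient interface, loosely modelled on ioredis's evalsha/eval.

```typescript
interface EvalClient {
  evalsha(sha: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>;
  eval(script: string, numKeys: number, ...args: (string | number)[]): Promise<unknown>;
}

async function evalWithFallback(
  client: EvalClient,
  script: string,
  sha1: string,
  keys: string[],
  args: (string | number)[],
): Promise<unknown> {
  try {
    // `return await` (not a bare `return`) so a rejection lands in the catch below
    return await client.evalsha(sha1, keys.length, ...keys, ...args);
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    if (!message.startsWith("NOSCRIPT")) throw err; // unrelated errors propagate
    // Script not cached on this server: send the full source (which also caches it)
    return await client.eval(script, keys.length, ...keys, ...args);
  }
}
```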