Comments (5)
Thanks for the replies 😀
pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
	Stream: q.stream,
	Group:  q.streamGroup,
	Idle:   q.opt.ReservationTimeout,
	Start:  "-",
	End:    "+", // <--- I'm wondering if this should be time.Now().Add(-1 * q.opt.ReservationTimeout)?
	Count:  batchSize,
}).Result()
I imagined the scenario below; correct me if I've misunderstood anything:
- consumer1 makes an xreadgroup call: gets 10 msgs
- consumer2 makes an xreadgroup call: gets 10 other msgs
- consumer3 makes an xreadgroup call: gets 10 other msgs
So far the 30 msgs are all treated as pending. When consumer1 has successfully processed, say, the first msg, it makes an xdel call to delete that message (although the message is still pending, even after it is deleted). This is the behavior of taskq: it doesn't xack the msg but simply xdels it <-- this means no other group will be able to consume it again, which I find OK for use cases where a msg only needs to be processed once by consumers from one group.
Now say one of the consumers grabs the lock and runs schedulePending. If it makes an xpending call for the range - to + (with the default count of 100), it can get "old enough" msgs from dead consumers (this is good: msgs stuck with a dead consumer can now be rescheduled and reprocessed), but it can also get "very recent" msgs that were already given to consumers that are still alive (which is not good: it takes work from a live consumer that could have processed that msg).
Hence I wonder if the intention of schedulePending is only to schedule "old enough" messages, e.g., from - to now - 5min? Since the parameter is named ReservationTimeout and defaults to 5min, it looks like we would give that consumer (which may be alive, dead, or just too slow) at least 5 min to process the msg, and only reschedule it once the msg is at least 5 min old.
from taskq.
AFAIR it is supposed to release stuck messages, i.e. messages that were not processed in the expected time, so we assume that the consumer is dead and re-queue the messages.
func (q *Queue) schedulePending(ctx context.Context) (int, error) {
	tm := time.Now().Add(q.opt.ReservationTimeout)
	start := strconv.FormatInt(unixMs(tm), 10)
	pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: q.stream,
		Group:  q.streamGroup,
		Start:  start,
		End:    "+",
		Count:  batchSize,
	}).Result()
I'm not sure I understood this correctly, but @zaiyiguo's point is that the start param is in the future, so an xpending call starting at now+5min will never return the unprocessed messages.
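The effect of a start bound in the future can be checked with a small in-memory sketch. It does not talk to Redis; countInRange is a hypothetical stand-in for how an ID range restricts the pending entries, and the three entry timestamps are made up for illustration.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// countInRange counts IDs (millisecond timestamps) with start <= id <= end,
// mimicking how an ID range narrows down the pending entries.
func countInRange(ids []int64, start, end int64) int {
	n := 0
	for _, id := range ids {
		if id >= start && id <= end {
			n++
		}
	}
	return n
}

func main() {
	now := time.UnixMilli(1_700_000_600_000)
	timeout := 5 * time.Minute

	// Hypothetical pending entries added 10 min, 6 min and 1 min ago.
	ids := []int64{
		now.Add(-10 * time.Minute).UnixMilli(),
		now.Add(-6 * time.Minute).UnixMilli(),
		now.Add(-1 * time.Minute).UnixMilli(),
	}

	future := now.Add(timeout).UnixMilli()
	past := now.Add(-timeout).UnixMilli()

	// start = now+5min, end = "+": every existing ID is below the start.
	fmt.Println(countInRange(ids, future, math.MaxInt64)) // prints 0
	// start = "-", end = now-5min: only the two older entries match.
	fmt.Println(countInRange(ids, math.MinInt64, past)) // prints 2
}
```

Every entry already in the stream has an ID at or before the current time, so a range that begins after now cannot match any of them.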
Yes, it looks like you both are right. It probably should be
pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
	Stream: q.stream,
	Group:  q.streamGroup,
	Idle:   q.opt.ReservationTimeout,
	Start:  "-",
	End:    "+",
	Count:  batchSize,
}).Result()
Is it meant to requeue any pending messages reserved by any consumer in the same group (including other consumers in the group, not necessarily the "current consumer")? I ask because the xpending command doesn't specify the consumer's name.
The current consumer is presumably dead / restarting. I think we can't rely on it to come back.
That sounds correct. In SQS this is called visibility timeout and SQS does such cleanup automatically.
<--- I'm wondering if this should be time.Now().Add(-1 * q.opt.ReservationTimeout)?
I honestly don't know what works best with Redis, but you have the right idea about what to do :)