Comments (5)

zaiyiguo commented on June 18, 2024

Thanks for the replies 😀

	pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: q.stream,
		Group:  q.streamGroup,
		Idle:   q.opt.ReservationTimeout,
		Start:  "-",
		End:    "+", // <--- I'm wondering if this should be time.Now().Add(-1 * q.opt.ReservationTimeout)?
		Count:  batchSize,
	}).Result()

I imagine a scenario like the one below; correct me if I've misunderstood anything:

  • consumer1 makes xreadgroup call: get 10 msgs
  • consumer2 makes xreadgroup call: get 10 other msgs
  • consumer3 makes xreadgroup call: get 10 other msgs

So far, all 30 msgs are treated as pending. When consumer1 has successfully processed, say, the first msg, it makes an xdel call to delete that message (although the message still counts as pending even after it is deleted). This is how taskq behaves: it doesn't xack the msg but simply xdels it, which means no other group will be able to consume it again. I find that OK for use cases where a msg only needs to be processed once by consumers from one group.

Now say one of the consumers grabs the lock and runs schedulePending. If it makes an xpending call for the range - to + (with the default count of 100), it can get "old enough" msgs from dead consumers (this is good: msgs stuck with a dead consumer can now be rescheduled and reprocessed), but it can also get "very recent" msgs that were already given to consumers that are still alive (this is not good: it takes work away from a live consumer that could have processed that msg).

Hence I wonder whether the intention of schedulePending is to schedule only "old enough" messages, e.g. from - to now - 5min? Since the parameter is named ReservationTimeout and defaults to 5min, it looks like we should give the consumer (which may be alive, dead, or just too slow) at least 5min to process a msg, and only reschedule the msg if it is at least 5min old.

from taskq.

vmihailenco commented on June 18, 2024

AFAIR it is supposed to release stuck messages, i.e. messages that were not processed in expected time so we assume that the consumer is dead and re-queue the messages.

lilien1010 commented on June 18, 2024

func (q *Queue) schedulePending(ctx context.Context) (int, error) {
	tm := time.Now().Add(q.opt.ReservationTimeout)
	start := strconv.FormatInt(unixMs(tm), 10)

	pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: q.stream,
		Group:  q.streamGroup,
		Start:  start,
		End:    "+",
		Count:  batchSize,
	}).Result()

I don't know if I got this correctly, but @zaiyiguo's point is that the start param is in the future, so an xpending call that starts at now+5min will never return the unprocessed messages.

vmihailenco commented on June 18, 2024

Yes, it looks like you both are right. It probably should be

	pending, err := q.redis.XPendingExt(ctx, &redis.XPendingExtArgs{
		Stream: q.stream,
		Group:  q.streamGroup,
		Idle:   q.opt.ReservationTimeout,
		Start:  "-",
		End:    "+",
		Count:  batchSize,
	}).Result()

Is it meant to requeue any pending messages reserved by any consumers in the same group (including other consumers in the group, not necessarily the "current consumer")? Because the xpending command doesn't specify the consumer's name.

The current consumer is presumably dead / restarting. I think we can't rely on it to come back.

vmihailenco commented on June 18, 2024

That sounds correct. In SQS this is called visibility timeout and SQS does such cleanup automatically.

<--- I'm wondering if this should be time.Now().Add(-1 * q.opt.ReservationTimeout)?

I honestly don't know what works best with Redis, but you have the right idea about what to do :)
