redisqueue's Issues

Add ability to set a TTL for stream messages with XTRIM MINID

This issue is a feature request. Redisqueue can already limit the size of a stream through the StreamMaxLength field of ProducerOptions:

// StreamMaxLength sets the MAXLEN option when calling XADD. This creates a
// capped stream to prevent the stream from taking up memory indefinitely.
// It's important to note though that this isn't the maximum number of
// completed messages, but the maximum number of total messages. This
// means that if all consumers are down, but producers are still enqueuing,
// and the maximum is reached, unprocessed message will start to be dropped.
// So ideally, you'll set this number to be as high as you can make it.
// More info here: https://redis.io/commands/xadd#capped-streams.
StreamMaxLength int64

It works well, but it offers no way to limit a stream by time. The only option is a hard cap on the number of messages, and a cap high enough to serve many consumers or a heavy load can still produce a very large stream when the messages themselves are big.

Redis Streams messages cannot be given a TTL with the EXPIRE command (https://redis.io/commands/expire), but there is a thread with a related feature request for Redis: redis/redis#4450 (comment). We could emulate a TTL for Redis Streams messages with this logic:

  • the library exposes a method that takes a stream name, a time string that defines the TTL cutoff for stream messages (e.g. "-7 days" to keep only the messages from the last 7 days), and an error callback
  • the method has access to a Redis client
  • it calls XINFO GROUPS, which returns the last delivered ID for each consumer group
  • it compares those IDs with the TTL cutoff to check whether the stream still holds undelivered messages older than the cutoff
  • if all of those messages have been delivered to the consumers, it runs XTRIM MINID to remove the old messages (this requires Redis 6.2 or later: https://redis.io/commands/xtrim#history)
  • if some consumers have not yet received messages that are about to be deleted, it calls the error callback, which accepts empty interfaces so the developer can handle the situation (logging, alerting, etc.)

We will soon need this functionality in our own services, and rather than reinvent the wheel, I think it would be great to have this code inside Redisqueue.

Support creating groups from end of stream, not just beginning

Because the message ID we're specifying when creating the group is always 0, consumers will need to process all messages in the stream before getting to the latest message. For a system I'm building, I'm looking to have the semantics be more lossy when a new consumer comes to life.

Would it be possible to extend the API with a way to specify whether we want all messages or only new messages?

Here is the relevant line of code:

err := c.redis.XGroupCreateMkStream(stream, c.options.GroupName, "0").Err()
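
One possible shape for such an option, as a sketch only: the GroupStart type and its constants are hypothetical names, not part of Redisqueue. The only Redis fact it relies on is that XGROUP CREATE accepts "$" as the ID to mean "deliver only messages added after the group is created", while "0" means "deliver everything already in the stream".

```go
package main

import "fmt"

// GroupStart is a hypothetical consumer option that selects where a newly
// created consumer group starts reading.
type GroupStart int

const (
	// StartFromBeginning delivers every message already in the stream.
	StartFromBeginning GroupStart = iota
	// StartFromEnd delivers only messages added after the group is created.
	StartFromEnd
)

// streamID returns the ID argument to use when creating the group.
func (s GroupStart) streamID() string {
	if s == StartFromEnd {
		return "$"
	}
	return "0"
}

func main() {
	// The value would replace the hard-coded "0" in the call quoted above:
	//   c.redis.XGroupCreateMkStream(stream, c.options.GroupName, start.streamID())
	fmt.Println(StartFromBeginning.streamID(), StartFromEnd.streamID())
}
```

Keeping StartFromBeginning as the zero value would preserve the current behavior for callers that do not set the option.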

Add ability to inject *redis.Client instead of *redis.Options

I'd like to continually health check the Redis client by doing a simple SET / GET transaction every n seconds, and if it fails trigger a process restart.

Unfortunately, I have no way to use the same Redis pool as the publisher/consumer which puts extra load on the Redis cluster and doesn't fully test the ability to use Redis.

I was wondering if there'd be a strong objection to adding a *redis.Client to both ConsumerOptions and ProducerOptions. I'm thinking of retaining the *redis.Options for now, and to only use that if *redis.Client is nil.
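
The fallback described above could look roughly like this. To keep the sketch compilable on its own, Options and Client are local stand-ins for redis.Options and redis.Client, and the field names RedisClient and RedisOptions as well as the resolveClient helper are assumptions for illustration, not the library's actual API.

```go
package main

import "fmt"

// Options and Client are stand-ins for redis.Options and redis.Client so
// that this sketch builds without the go-redis dependency.
type Options struct{ Addr string }

type Client struct{ opts *Options }

// NewClient mimics constructing a client from connection options.
func NewClient(o *Options) *Client { return &Client{opts: o} }

// ConsumerOptions shows only the two fields relevant to the proposal.
type ConsumerOptions struct {
	RedisClient  *Client  // hypothetical new field: a caller-supplied client
	RedisOptions *Options // existing style: options used to build a client
}

// resolveClient prefers the injected client and only builds a new one from
// the options when no client was supplied.
func resolveClient(o ConsumerOptions) *Client {
	if o.RedisClient != nil {
		return o.RedisClient
	}
	return NewClient(o.RedisOptions)
}

func main() {
	shared := NewClient(&Options{Addr: "localhost:6379"})

	// The health check and the consumer can now share one client and pool.
	c := resolveClient(ConsumerOptions{RedisClient: shared})
	fmt.Println("reuses injected client:", c == shared)
}
```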

When multiple listeners are registered, an XREADGROUP splicing error occurs

// .....

queue.Register(global.OrderEvent, event.OrderHandle)
queue.Register(global.ImMessageEvent, event.ImMessageHandle)
queue.Register(global.PlayListEvent, event.PlaylistHandle)
go func() {
	for err := range queue.Errors {
		log.Println("Error occurred:", err)
	}
}()
go queue.Run()

...................

With the code above, the following error sometimes occurs:

error reading redis stream: ERR Invalid stream ID specified as stream command argument

Watching the commands that reach Redis from the Redis client shows: xreadgroup group redisqueue DESKTOP-TMASQJ5 count 100 block 5000 streams im_message_event_queue order_event_queue playlist_event_queue order_event_queue > > > im_message_event_queue playlist_event_queue > > >

The correct command should be: xreadgroup group redisqueue DESKTOP-TMASQJ5 count 200 block 5000 streams order_event_queue im_message_event_queue playlist_event_queue > > >
