thoas / bokchoy
Simple job queues for Go backed by Redis
License: MIT License
It needs fine tuning from the README
Commit 071d83d introduced a change that makes IDs predictable and no longer random.
This needs to be fixed: either replace ulid with a global counter or use a random entropy source.
Noticed some issues with the queue processing new jobs if redis connection drops for a bit and then comes back.
Timeline is something like:
Any idea what's going wrong here? Any ideas for a different config? At the moment I'm just providing the credentials and the redis URL.
Everything works fine locally with the single/client mode.
But when I run the code on production with cluster mode, the consumer never receives anything from the queue.
Hello!
My focus at the moment is on instrumentation. I wrote my logger and my tracer using zap. I think it is more or less working, but I do not understand the difference between the two. Looking at the Tracer and how it is used in queue.go, it logs and that's it. I would like to produce traces with something such as OpenTracing or OpenTelemetry. Do you have any suggestions? I think I can do a mix of middleware and events, but I would like to get a direction from you.
Thanks a lot
Hello!
Thanks a lot for your work. I have a question, I would like to delay the execution of a task when something inside the handlers fails.
Let's assume I am calling an API that rate-limits requests for a day once 1000 calls are reached.
When I catch a failed request to that API caused by rate limiting, I would like to re-enqueue the task with a delay of a couple of hours.
I saw that a task has a TTL.
I should for sure increase that, but what else can I do to delay the next execution? I am also not sure whether a modification made to a task inside a handler gets stored or not.
Thanks!
Imagine the following scenario: a task with ID 1234 is being processed, and a second task with ID 1234 is submitted to the same queue.
What will happen in this scenario?
Will it process both tasks or will it disregard the second one (as it's already processing)?
I can publish and process a task using the example you provide.
When I tried to publish using my application, it appears not to be written to my DB.
This is the log entry I am seeing: logging/logging.go:129 Task published {"logger": "bokchoy", "component": "queue", "queue": {"name": "suspend", "consumers_count": 2}, "duration": "6.290392ms", "task": {"id": "01DV2D28B0MQJHBF4QX1EFD6Y3", "name": "suspend", "status": "waiting", "payload": "{crn:v1:staging:public:is:us-south-1:a/78d00cc88d254fba849cdd8a8e4f1618::instance:dc4b93ef-2c02-4e4b-bd4c-35deb6c39c6e}", "max_retries": 3, "ttl": "3m0s", "timeout": "3m0s", "published_at": "2019-12-02T04:14:30.240Z", "retry_intervals": "30s, 30s, 30s"}}
Doing a get with the task ID does not return anything.
I am intermittently seeing this error: NOSCRIPT No matching script. Please use EVAL.
Is there a recovery?
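For context: Redis keeps its script cache in memory, so a restart or SCRIPT FLUSH empties it and a later EVALSHA answers NOSCRIPT. A common recovery is to fall back to EVAL with the full script source, which also caches the script again. The sketch below shows that pattern against a hypothetical scripter interface and a fake server; it is not bokchoy's code, and whether bokchoy can be configured to do this is not something this page confirms.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// scripter is a hypothetical, minimal view of a Redis client.
type scripter interface {
	EvalSha(sha string, keys []string) (string, error)
	Eval(src string, keys []string) (string, error)
}

// runScript tries the cached script first and falls back to EVAL when the
// server answers NOSCRIPT, for example after a restart.
func runScript(c scripter, sha, src string, keys []string) (string, error) {
	res, err := c.EvalSha(sha, keys)
	if err != nil && strings.HasPrefix(err.Error(), "NOSCRIPT") {
		return c.Eval(src, keys)
	}
	return res, err
}

// fakeRedis simulates a server whose script cache was wiped.
type fakeRedis struct{ cached bool }

func (f *fakeRedis) EvalSha(sha string, keys []string) (string, error) {
	if !f.cached {
		return "", errors.New("NOSCRIPT No matching script. Please use EVAL.")
	}
	return "ok", nil
}

func (f *fakeRedis) Eval(src string, keys []string) (string, error) {
	f.cached = true // EVAL caches the script on the server as a side effect
	return "ok", nil
}

func main() {
	r := &fakeRedis{}
	res, err := runScript(r, "sha1", "return 1", []string{"delay"})
	fmt.Println(res, err, r.cached)
}
```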
I have been actively implementing bokchoy to replace our existing queue library. There is an issue with tasks being retried when there are connectivity issues.
For example, I create a task that always fails and is retried every 5s.
Then I have network connectivity issues with Redis and do a hard stop of my application.
When a task is retried there is a key in Redis, delay, containing the task ID:
Type: Sorted Set (1 Members)
# | Score | Value
-- | -- | --
0 | 1566470199 | 01DJWE7F5WWCQ16BMYGHSK4CAX
However, if Redis connectivity issues happen to coincide with me stopping the app, the delay key is removed and the task remains in the queue forever with Failed status, MaxRetries >= 0 and some ETA.
To fix this we need either to do some kind of "transaction" on the delay key so it is never lost, or to create a cleanup goroutine to garbage collect Failed tasks having MaxRetries >= 0 and re-queue them.
Also it would be great to have a method to tell a task to be re-processed if the "max_retries" field is removed (no more attempts left) or in some other case like the one above. I know I can modify MaxRetries, ETA etc., but I need to put the task ID into the delay set anyway.
Thanks!
Hi
How can I process tasks in queues created after the Engine has started? All these tasks have Status "waiting", and the Queue "start" method is unexported.
I am using the Publish method like this
    task, err := c.engine.Queue(queueName).Publish(context.Background(), entry)
    if err != nil {
        c.errs <- err
        return
    }
but eventually, I always encounter this error
panic: runtime error: index out of range [-2]
goroutine 9634 [running]:
math/rand.(*rngSource).Uint64(...)
/usr/local/go/src/math/rand/rng.go:249
math/rand.(*rngSource).Int63(0xc0000af500, 0x290b9eb2ba0ec300)
/usr/local/go/src/math/rand/rng.go:234 +0x93
math/rand.(*Rand).Int63(...)
/usr/local/go/src/math/rand/rand.go:85
math/rand.read(0xc0030a3116, 0xa, 0xa, 0xc0058956b0, 0xc000098d70, 0xc000098d78, 0x10, 0xc0030a3110, 0xc000161400)
/usr/local/go/src/math/rand/rand.go:272 +0x64
math/rand.(*Rand).Read(0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xc005895720, 0x100e258, 0x10)
/usr/local/go/src/math/rand/rand.go:264 +0x106
io.ReadAtLeast(0x143b6a0, 0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xa, 0x16b8620, 0x4b8f430, 0xed5645a4e)
/usr/local/go/src/io/io.go:310 +0x87
io.ReadFull(...)
/usr/local/go/src/io/io.go:329
github.com/oklog/ulid.New(0x16e7dd3e8ff, 0x143b6a0, 0xc000098d50, 0xc002ce5a40, 0xc00564c440, 0xc0030a30f0, 0xc0030a30e0)
/Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:96 +0xc9
github.com/oklog/ulid.MustNew(...)
/Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:105
github.com/thoas/bokchoy.id(0xc002ce5a40, 0x1000000010d75c7)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/utils.go:16 +0x53
github.com/thoas/bokchoy.NewTask(0x13c59ad, 0xd, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x10)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/task.go:51 +0x6d
github.com/thoas/bokchoy.(*Queue).NewTask(0xc00012e000, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x1)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:427 +0xd3
github.com/thoas/bokchoy.(*Queue).Publish(0xc00012e000, 0x14411c0, 0xc00001a078, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0xc004c83f40, 0x0, ...)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:445 +0x67
github.com/TimSatke/crawler/crawler.(*Crawler).Enqueue(0xc000098f30, 0xc002cecd80, 0x29)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:123 +0xd2
github.com/TimSatke/crawler/crawler.(*Crawler).processReference(0xc000098f30, 0xc0030ca680)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:235 +0x49
github.com/TimSatke/crawler/crawler.(*Crawler).processAnchorNode(0xc000098f30, 0xc003012c80, 0xc002072e70)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:230 +0x13b
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e70)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:199 +0x9e
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e00)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002059030)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16700)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16690)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16000)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c15a40)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c157a0)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abea0)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abdc0)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTML(0xc000098f30, 0xc003012c80, 0x1b5fdd8, 0xc003e7ae70, 0xc003e7ae70, 0x0)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:193 +0x8f
github.com/TimSatke/crawler/crawler.(*Crawler).process(0xc000098f30, 0xc007073080, 0x22, 0x0, 0x0)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:184 +0x29d
github.com/TimSatke/crawler/crawler.(*Crawler).Worker(0xc000098f30, 0xc0005ebc80, 0x20, 0x20)
/Users/tsatke/Desktop/crawler/crawler/crawler.go:149 +0xb8
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc000124040, 0xc0005ebc80, 0x100d9e6, 0xc0001f7e68)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleTask.func1.2(0xc0005ebc80, 0xc0001dc001, 0xc005227340)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:73 +0x64
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227320, 0xc0005ebc80, 0xc0001f7ec0, 0x12df49e)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleRequest.func1(0xc0005ebc80, 0xc005227340, 0xc005227320)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:162 +0x5c
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227340, 0xc0005ebc80, 0x1, 0x143bf60)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleTask.func1(0xc000b71c80, 0xc002484900, 0xc000130120, 0xc0005ebc80)
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:74 +0x1b8
created by github.com/thoas/bokchoy.(*consumer).handleTask
/Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:56 +0x13c
Why is that?
Hello,
When a task is not found in Redis, it returns an error with the message "cannot cast id: Attribute error".
Looking into the code, I see there is actually a check to tell whether the task is not found in Redis:
Line 228 in 0421bea
But I think that check will always be false because of
Line 325 in 0421bea
Started using v0.2.0.
The example https://github.com/thoas/bokchoy/blob/master/examples/custom-broker/main.go stopped working because there are private fields which have to be initialized.
My code:
    broker, err = bokchoy.New(ctx, bokchoy.Config{},
        bokchoy.WithBroker(&bokchoy.RedisBroker{Client: myRedisClient, ClientType: "client"}),
    )
type RedisBroker requires the following fields to be initialized:
mu *sync.Mutex
queues map[string]struct{}
and it's only done in newRedisBroker() which is private.
Without those 2 being initialized there are 2 panics:
1.
panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x13c7912]
goroutine 79 [running]:
sync.(*Mutex).Lock(...)
/usr/local/Cellar/go/1.13.7/libexec/src/sync/mutex.go:74
github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc000545440, 0x1d0ec60, 0xc0000d0038, 0xc0003ceda0, 0x1d, 0x3b9aca00)
/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:183 +0x42
2.
panic: assignment to entry in nil map
goroutine 52 [running]:
github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc0005bf0e0, 0x1d0ec20, 0xc0000380c8, 0xc0003882e0, 0x1d, 0x3b9aca00)
/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:228 +0x1fe
So this makes it impossible to use a custom broker (a custom Redis client in my case).
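Besides exporting the constructor, one general Go pattern for this kind of problem is to make the zero value usable: store the mutex by value and allocate the map lazily. The sketch below uses a hypothetical Broker type to show the pattern; it is not bokchoy's RedisBroker and does not claim to match how the library resolved the issue.

```go
package main

import (
	"fmt"
	"sync"
)

// Broker is a hypothetical stand-in for a type with private state.
type Broker struct {
	Client string // stands in for the exported, user-supplied client

	mu     sync.Mutex          // a value, not a pointer: the zero value is usable
	queues map[string]struct{} // allocated lazily under mu
}

// register records a queue name and reports whether it was new.
func (b *Broker) register(name string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.queues == nil {
		b.queues = make(map[string]struct{})
	}
	if _, ok := b.queues[name]; ok {
		return false
	}
	b.queues[name] = struct{}{}
	return true
}

func main() {
	// Built from a struct literal, with no constructor involved.
	b := &Broker{Client: "custom"}
	fmt.Println(b.register("delay"), b.register("delay"))
}
```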
Thanks!
I'm running a processor using Run(ctx). Everything works great until I restart the Redis server. It's not uncommon for our systems to have hiccups, so I would hope that the processor picks up where it left off. Instead it simply stops working, and I don't see anything unless I turn on development logging. Then I see the following:
ERROR logging/logging.go:123 Received error when retrieving delayed tasks {"error": "unable to ZPOPBYSCORE PrefixQueue:delay: NOSCRIPT No matching script. Please use EVAL.", "errorVerbose": "NOSCRIPT No matching script. Please use EVAL.\nunable to ZPOPBYSCORE PrefixQueue:delay\ngithub.com/thoas/bokchoy.(*redisBroker).Consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:198\ngithub.com/thoas/bokchoy.(*Queue).consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:308\ngithub.com/thoas/bokchoy.(*Queue).ConsumeDelayed\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:299\ngithub.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:150\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1337"}
github.com/thoas/bokchoy/logging.wrapLogger.Error
/userdir/go/pkg/mod/github.com/thoas/[email protected]/logging/logging.go:123
github.com/thoas/bokchoy.loggerTracer.Log
/userdir/go/pkg/mod/github.com/thoas/[email protected]/tracer.go:32
github.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1
/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:152
Hello,
It would be great if we could bypass the serializer so that we can (manually) unmarshal a specific struct, and not a generic map.
Or, provide the struct as payload so the serializer could use this instance.
For simplicity, I would prefer the second option.
For flexibility, the first one: we could receive a payload that is schema-less (a webhook for example) and, depending on its type (defined by a keyword or composite information), unmarshal a specific struct.
If you have further questions, don't hesitate to ask.
Cheers,
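Until something like this exists, one workaround, assuming the handler receives the payload as a generic map as described above, is to round-trip it through encoding/json into the target struct. Webhook and decodeInto are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Webhook is a hypothetical payload type.
type Webhook struct {
	Kind string `json:"kind"`
	URL  string `json:"url"`
}

// decodeInto converts a generic decoded payload (for example a
// map[string]interface{}) into a typed value by re-encoding it as JSON
// and decoding it into out, which must be a pointer.
func decodeInto(payload interface{}, out interface{}) error {
	raw, err := json.Marshal(payload)
	if err != nil {
		return fmt.Errorf("re-encoding payload: %w", err)
	}
	if err := json.Unmarshal(raw, out); err != nil {
		return fmt.Errorf("decoding payload: %w", err)
	}
	return nil
}

func main() {
	payload := map[string]interface{}{"kind": "push", "url": "https://example.com/hook"}
	var w Webhook
	if err := decodeInto(payload, &w); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", w)
}
```

For the schema-less case, the handler could first read a discriminating key such as "kind" from the map and then pick the struct to decode into.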