Giter Club home page Giter Club logo

bokchoy's People

Contributors

adamrt avatar celant avatar dependabot[bot] avatar kefu-reddit avatar roman-vynar avatar thoas avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

bokchoy's Issues

How to handle errors?

It needs fine tuning from the README

  • What's happening when an error is catched?
  • When an task is returning an explicit error?
  • When a task is panicking?

ids no longer unique

071d83d Introduced a change that makes ids predictable and no longer random.
This needs to be fixed, either replace ulid by a global counter or generate a random entropy source.

Having issues if redis drops and comes back

Noticed some issues with the queue processing new jobs if redis connection drops for a bit and then comes back.

Timeline is something like:

  • All is good
  • Redis connection drops
  • Get unable to publish error
  • Redis connection comes back after a bit
  • Able to publish again
  • Consumer not consuming the queued jobs
  • Restart the whole app
  • Everything working fine again

Any idea what's going wrong here? Any ideas for a different config? At the moment I'm just providing the credentials and the redis URL.

Redis cluster mode doesn't work properly

Everything works fine locally with the single/client mode.
But when I run the code on production with cluster mode, the consumer never receives anything from the queue.

Question: Tracer vs logger

Hello!

My focus atm is on instrumentation, I wrote my logger and my tracer using zap. It is kind of working I think, but I do not understand the difference. Looking at the Tracer and how it is used in queue.go it logs and that's it. I would like to do a trace such as OpenTracing, OpenTeletry. Do you have any suggestions? I think I can do a mix of middleware/event but I would like to get a direction from you.

Thanks a lot

Delay task when it fails

Hello!
Thanks a lot for your work. I have a question, I would like to delay the execution of a task when something inside the handlers fails.
Let's assume I am calling an API that rates limit requests for a day when 1000 calls are reached.
When I catch a failed request to that API caused by a rate-limiting I would like to re-enqueue the task with a couple of hours delay.
I saw that task as TTL I should for sure increase that, but what else can I do to delay the next execution? I am also not sure if the modification to a task inside a handler gets stored or not.
Thanks!

Are Task IDs unique

Imagine the following scenario:

  • A certain queue contains is processing a task with the ID 1234
  • While that item is being processed, another task with the ID 1234 is submitted to the same queue

What will happen in this scenario?

Will it process both tasks or will it disregard the second one (as it's already processing)?

Task not written to data base.

I can publish and process a task using the example you providie.

When . I tried to publish using my application it appears not to be written to my DB.

This is the log entry I am seeing: logging/logging.go:129 Task published {"logger": "bokchoy", "component": "queue", "queue": {"name": "suspend", "consumers_count": 2}, "duration": "6.290392ms", "task": {"id": "01DV2D28B0MQJHBF4QX1EFD6Y3", "name": "suspend", "status": "waiting", "payload": "{crn:v1:staging:public:is:us-south-1:a/78d00cc88d254fba849cdd8a8e4f1618::instance:dc4b93ef-2c02-4e4b-bd4c-35deb6c39c6e}", "max_retries": 3, "ttl": "3m0s", "timeout": "3m0s", "published_at": "2019-12-02T04:14:30.240Z", "retry_intervals": "30s, 30s, 30s"}}

Doing a get with the task ID does not return anything.

Task may stop retrying on a specific condition

I have been implementing bokchoy actively to replace our existing queue library. There is an issue with tasks being retried and when there are connectivity issues.

For example, I create a task which always fails and being retried every 5s.
Then I have the network connectivity issues to Redis and do hard-stop of my application.

When a task is retried there is a key in Redis delay containing task ID:

Type: Sorted Set (1 Members)

# | Score | Value
-- | -- | --
0 | 1566470199 | 01DJWE7F5WWCQ16BMYGHSK4CAX

However, if it happens to be a coincidence of redis connectivity issues and I stop an app, delay key is removed and the task remains always in queue with Failed status, MaxRetries >=0, some ETA.

To fix this we need whether to do some "transaction" on delay key so it is never lost or to create a cleanup goroutine to garbage collect Failed tasks having MaxRetries >=0 and re-queue them.

Also it would be great to have a method to tell a task to be re-processed if "max_retries" field is removed (no more attempts left) or some other case like above. I know I can modify MaxRetries, ETA etc. but need to put a task id into delay set anyway.

Thanks!

Queues created afrer Engine started

Hi
How can I process tasks in queues created after Engine started? All this tasks have Status "waiting" and Queue "start" method is unexported

math.Rand doesn't support concurrency

I am using the Publish method like this

task, err := c.engine.Queue(queueName).Publish(context.Background(), entry)
if err != nil {
	c.errs <- err
	return
}

but eventually, I always encounter this error

panic: runtime error: index out of range [-2]

goroutine 9634 [running]:
math/rand.(*rngSource).Uint64(...)
        /usr/local/go/src/math/rand/rng.go:249
math/rand.(*rngSource).Int63(0xc0000af500, 0x290b9eb2ba0ec300)
        /usr/local/go/src/math/rand/rng.go:234 +0x93
math/rand.(*Rand).Int63(...)
        /usr/local/go/src/math/rand/rand.go:85
math/rand.read(0xc0030a3116, 0xa, 0xa, 0xc0058956b0, 0xc000098d70, 0xc000098d78, 0x10, 0xc0030a3110, 0xc000161400)
        /usr/local/go/src/math/rand/rand.go:272 +0x64
math/rand.(*Rand).Read(0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xc005895720, 0x100e258, 0x10)
        /usr/local/go/src/math/rand/rand.go:264 +0x106
io.ReadAtLeast(0x143b6a0, 0xc000098d50, 0xc0030a3116, 0xa, 0xa, 0xa, 0x16b8620, 0x4b8f430, 0xed5645a4e)
        /usr/local/go/src/io/io.go:310 +0x87
io.ReadFull(...)
        /usr/local/go/src/io/io.go:329
github.com/oklog/ulid.New(0x16e7dd3e8ff, 0x143b6a0, 0xc000098d50, 0xc002ce5a40, 0xc00564c440, 0xc0030a30f0, 0xc0030a30e0)
        /Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:96 +0xc9
github.com/oklog/ulid.MustNew(...)
        /Users/tsatke/go/pkg/mod/github.com/oklog/[email protected]/ulid.go:105
github.com/thoas/bokchoy.id(0xc002ce5a40, 0x1000000010d75c7)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/utils.go:16 +0x53
github.com/thoas/bokchoy.NewTask(0x13c59ad, 0xd, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x10)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/task.go:51 +0x6d
github.com/thoas/bokchoy.(*Queue).NewTask(0xc00012e000, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0x1)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:427 +0xd3
github.com/thoas/bokchoy.(*Queue).Publish(0xc00012e000, 0x14411c0, 0xc00001a078, 0x13596a0, 0xc0030d86c0, 0x0, 0x0, 0x0, 0xc004c83f40, 0x0, ...)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/queue.go:445 +0x67
github.com/TimSatke/crawler/crawler.(*Crawler).Enqueue(0xc000098f30, 0xc002cecd80, 0x29)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:123 +0xd2
github.com/TimSatke/crawler/crawler.(*Crawler).processReference(0xc000098f30, 0xc0030ca680)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:235 +0x49
github.com/TimSatke/crawler/crawler.(*Crawler).processAnchorNode(0xc000098f30, 0xc003012c80, 0xc002072e70)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:230 +0x13b
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e70)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:199 +0x9e
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002072e00)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc002059030)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16700)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16690)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c16000)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c15a40)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc001c157a0)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abea0)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTMLInternal(0xc000098f30, 0xc003012c80, 0xc0065abdc0)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:202 +0x69
github.com/TimSatke/crawler/crawler.(*Crawler).processHTML(0xc000098f30, 0xc003012c80, 0x1b5fdd8, 0xc003e7ae70, 0xc003e7ae70, 0x0)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:193 +0x8f
github.com/TimSatke/crawler/crawler.(*Crawler).process(0xc000098f30, 0xc007073080, 0x22, 0x0, 0x0)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:184 +0x29d
github.com/TimSatke/crawler/crawler.(*Crawler).Worker(0xc000098f30, 0xc0005ebc80, 0x20, 0x20)
        /Users/tsatke/Desktop/crawler/crawler/crawler.go:149 +0xb8
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc000124040, 0xc0005ebc80, 0x100d9e6, 0xc0001f7e68)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleTask.func1.2(0xc0005ebc80, 0xc0001dc001, 0xc005227340)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:73 +0x64
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227320, 0xc0005ebc80, 0xc0001f7ec0, 0x12df49e)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleRequest.func1(0xc0005ebc80, 0xc005227340, 0xc005227320)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:162 +0x5c
github.com/thoas/bokchoy.HandlerFunc.Handle(0xc005227340, 0xc0005ebc80, 0x1, 0x143bf60)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/handler.go:8 +0x30
github.com/thoas/bokchoy.(*consumer).handleTask.func1(0xc000b71c80, 0xc002484900, 0xc000130120, 0xc0005ebc80)
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:74 +0x1b8
created by github.com/thoas/bokchoy.(*consumer).handleTask
        /Users/tsatke/go/pkg/mod/github.com/thoas/[email protected]/consumer.go:56 +0x13c

Why is that?

Priority tasks

Ability to add priorities to tasks, example:

<Task id=1 priority=5>
<Task id=2 priority=0>
<Task id=3 priority=6>

In this case, the process will be: task3, task1, task2.

BLPOP supports multiple list keys, it might be a solution.

Suggested by @novln

2 panics when using custom redis broker

Started using v0.2.0.
The example https://github.com/thoas/bokchoy/blob/master/examples/custom-broker/main.go stopped working because there are private fields which have to be initialized.
My code:

broker, err = bokchoy.New(ctx, bokchoy.Config{},
	bokchoy.WithBroker(&bokchoy.RedisBroker{Client: myRedisClient, ClientType: "client"}),

type RedisBroker requires the following fields to be initialized:

mu         *sync.Mutex
queues     map[string]struct{}

and it's only done in newRedisBroker() which is private.

Without those 2 being initialized there are 2 panics:
1.

panic: runtime error: invalid memory address or nil pointer dereference
[signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x13c7912]

goroutine 79 [running]:
sync.(*Mutex).Lock(...)
	/usr/local/Cellar/go/1.13.7/libexec/src/sync/mutex.go:74
github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc000545440, 0x1d0ec60, 0xc0000d0038, 0xc0003ceda0, 0x1d, 0x3b9aca00)
	/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:183 +0x42
panic: assignment to entry in nil map

goroutine 52 [running]:
github.com/thoas/bokchoy.(*RedisBroker).consumeDelayed(0xc0005bf0e0, 0x1d0ec20, 0xc0000380c8, 0xc0003882e0, 0x1d, 0x3b9aca00)
	/Users/weber/gopath/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:228 +0x1fe

So this makes impossible to use a custom broker (custom redis client in my case).

Thanks!

Restarting Redis causes error retrieving delayed tasks

I'm running a processor using Run(ctx). Everything works great until I restart redis server. It's not uncommon for our systems to have hiccups, so I would hope that the server picks up where it left off. It simply stops working and I don't see anything unless I turn on development logging. Then I see the following:

ERROR	logging/logging.go:123	Received error when retrieving delayed tasks	{"error": "unable to ZPOPBYSCORE PrefixQueue:delay: NOSCRIPT No matching script. Please use EVAL.", "errorVerbose": "NOSCRIPT No matching script. Please use EVAL.\nunable to ZPOPBYSCORE PrefixQueue:delay\ngithub.com/thoas/bokchoy.(*redisBroker).Consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/broker_redis.go:198\ngithub.com/thoas/bokchoy.(*Queue).consume\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:308\ngithub.com/thoas/bokchoy.(*Queue).ConsumeDelayed\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:299\ngithub.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1\n\t/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:150\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1337"}
github.com/thoas/bokchoy/logging.wrapLogger.Error
	/userdir/go/pkg/mod/github.com/thoas/[email protected]/logging/logging.go:123
github.com/thoas/bokchoy.loggerTracer.Log
	/userdir/go/pkg/mod/github.com/thoas/[email protected]/tracer.go:32
github.com/thoas/bokchoy.(*Queue).consumeDelayedTasks.func1
	/userdir/go/pkg/mod/github.com/thoas/[email protected]/queue.go:152

By-pass serializer or pass struct to be serialized

Hello ๐Ÿ‘‹,

It would be great it we could by-pass the serializer so we can unmarshal (manually) a specific struct, and not a generic map.
Or, provide the struct as payload so the serializer could use this instance.

For simplicity, I would prefer the second option.
For flexibility, the first one: we could receive a payload that is schema-less (a webhook for example) and depending of its type (defined by a keyword or composite information), unmarshal a specific struct.

If you have further question, don't hesitate to ask.

Cheers,

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.