gocraft / work
Process background jobs in Go
License: MIT License
Hi @cypriss,
Thanks for the work on work.
Case in point, in my current setup, I'm loading twitter followers with the job args:
{"twitter_id": 123456, "next_cursor_str": "-1", "ids": []}
If I'm rate limited by twitter API, I schedule a new job from within the current job with the args:
{"twitter_id": 123456, "next_cursor_str": "something", "ids": [1,2,3,4,5]}
(I have to make a query for uniqueness at this time)
If I were to use work for background processing, it seems like embedding partial args like above will have to go unless:
a) There's support to include "extra" args, which do not count towards job uniqueness; or
b) I use a reference to an external cache. This sounds reasonably good for my case since the ids part can get pretty large.
With (a) it would be guaranteed that there's only one followers-checking job for a user with twitter_id 123456. The next_cursor_str and ids can move into the extra/cache args. What do you think? Are there breaking concerns about how work is implemented, or is this simple enough to do?
I'm willing to take a stab at this with some guidance on what and how this would be possible.
It would be nice to have webui.Server implement http.Handler so it can be mounted in an existing app without having to run another process.
You could also just make the web.Router public or embedded.
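To make the request concrete, here is a minimal sketch of what mounting could look like if the web UI were available as an http.Handler. It uses only the standard library; uiHandler is a hypothetical stand-in and not part of the webui package, and the /work/ prefix is an arbitrary choice.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
)

// uiHandler is a hypothetical stand-in for a web UI that implements
// http.Handler. It only echoes the path it was asked to serve.
func uiHandler() http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintf(w, "ui path: %s", r.URL.Path)
	})
}

// newAppMux mounts the UI under /work/ next to the app's own routes.
func newAppMux(ui http.Handler) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprint(w, "app")
	})
	// StripPrefix lets the UI keep routing from "/" as if it ran standalone.
	mux.Handle("/work/", http.StripPrefix("/work", ui))
	return mux
}

// get performs an in-memory request against h and returns the response body.
func get(h http.Handler, path string) string {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest("GET", path, nil))
	body, _ := io.ReadAll(rec.Result().Body)
	return string(body)
}

func main() {
	mux := newAppMux(uiHandler())
	fmt.Println(get(mux, "/work/queues")) // served by the mounted UI
	fmt.Println(get(mux, "/other"))       // served by the app itself
}
```

With this shape the existing app keeps a single listener and process, which is the point of the request.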
First of all, keep up the good work! Awesome! By the way, just out of curiosity, is there any way to stop or delete a periodic job? Unfortunately I can't see any; am I right? Deletion seems to be implemented only for one-shot scheduled jobs and for dead and retry jobs (maybe because a range zscore is needed). Furthermore, I can't see any public pause/unpause method on the client (only a private method in the worker_test file). Regards
Hi,
When trying to access the webui in Safari on my Mac, it doesn't load at all. In the console, we get the following message:
[Error] ReferenceError: Can't find variable: fetch
value (work.js:5:18353)
performInitialMount (work.js:4:2382)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
performInitialMount (work.js:4:2660)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
mountChildren (work.js:3:28158)
_createInitialChildren (work.js:3:6598)
mountComponent (work.js:3:4759)
mountComponent (work.js:2:15978)
mountChildren (work.js:3:28158)
_createInitialChildren (work.js:3:6598)
mountComponent (work.js:3:4759)
mountComponent (work.js:2:15978)
mountChildren (work.js:3:28158)
_createInitialChildren (work.js:3:6598)
mountComponent (work.js:3:4759)
mountComponent (work.js:2:15978)
performInitialMount (work.js:4:2660)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
performInitialMount (work.js:4:2660)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
performInitialMount (work.js:4:2660)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
performInitialMount (work.js:4:2660)
mountComponent (work.js:4:1554)
mountComponent (work.js:2:15978)
a (work.js:5:10792)
perform (work.js:2:18190)
i (work.js:5:11004)
perform (work.js:2:18190)
batchedUpdates (work.js:4:15372)
l (work.js:2:12826)
_renderNewRootComponent (work.js:5:12205)
_renderSubtreeIntoContainer (work.js:5:13208)
(anonymous function) (work.js:1:2656)
t (work.js:1:112)
(anonymous function) (work.js:1:196)
Global Code (work.js:1:200)
In Chrome, it seems to work just fine...
Let's say I have enqueued a task to run after 30 minutes using EnqueueIn. Is there a way to delete it?
The reaper, which starts during the startup of WorkerPools, will never reap dead jobs if we encounter a dirty shutdown (SIGKILL or other early exit where we don't wait for all workers to finish).
The heartbeat key expires after 60 seconds, which isn't long enough to leave it around for the 2nd reap cycle (10 min) to discover a dead job (job is considered dead if it hasn't received a heartbeat in 5 min).
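The timing argument can be checked with a small model. This is only a sketch of the reasoning in the report, not the library's reaper: it assumes the last heartbeat was written at the moment of the dirty shutdown and uses the three timings quoted above. All names are made up for the illustration.

```go
package main

import "fmt"

// Timings quoted in the report above, in seconds.
const (
	heartbeatTTL = 60      // the heartbeat key expires this long after the last beat
	deadAfter    = 5 * 60  // a job counts as dead after this long without a heartbeat
	reapInterval = 10 * 60 // gap between two reap cycles
)

// canReap models one reap cycle that runs sinceCrash seconds after the last
// heartbeat. The cycle can only act if the key still exists AND is stale.
func canReap(sinceCrash int) bool {
	keyStillExists := sinceCrash < heartbeatTTL
	heartbeatStale := sinceCrash >= deadAfter
	return keyStillExists && heartbeatStale
}

// firstReapableCycle returns the index of the first cycle that could reap,
// or -1 if none of the first `cycles` cycles can.
func firstReapableCycle(firstCycleAt, cycles int) int {
	for i := 0; i < cycles; i++ {
		if canReap(firstCycleAt + i*reapInterval) {
			return i
		}
	}
	return -1
}

func main() {
	// A pool restarted 5 seconds after a SIGKILL never finds a reapable job.
	fmt.Println(firstReapableCycle(5, 100)) // prints -1
}
```

Because the key's lifetime (60 s) is shorter than the staleness threshold (5 min), the two conditions can never hold at the same time in this model.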
I just spent a while debugging retry job timing and was misled by the WebUI labelling the timestamp it shows "Retry At" when in fact it is showing the job's t, the time it was enqueued.
I expected this https://github.com/gocraft/work/blob/master/webui/internal/assets/src/RetryJobs.js#L64 to be job.retry_at rather than job.t.
There is an argument to say showing both times would be useful.
All JSON numbers are unmarshalled into float64 by default. Hence large int64 numbers with more than 16 digits can not be passed as an argument. The boundary check is done here https://github.com/gocraft/work/blob/master/job.go#L106
By using json.Number these arguments could be preserved:
dec := json.NewDecoder(bytes.NewReader(rawJSON))
dec.UseNumber()
dec.Decode(&job)
Converting json.Number to primitive types in helper methods can be done simply with:
strconv.ParseInt(string(v), 10, 64)
strconv.ParseFloat(string(v), 64)
This could be added as a UseNumber job option for backwards compatibility.
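A self-contained sketch of the difference, using only encoding/json. The helper names (decodeDefault, decodeWithNumber, argInt64) are hypothetical and not part of the work package; the value 9007199254740993 is 2^53 + 1, the smallest positive integer a float64 cannot represent exactly.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

// decodeDefault decodes args the default way: every number becomes a float64.
func decodeDefault(raw []byte) (map[string]interface{}, error) {
	var args map[string]interface{}
	err := json.Unmarshal(raw, &args)
	return args, err
}

// decodeWithNumber keeps numbers as json.Number (their original text form).
func decodeWithNumber(raw []byte) (map[string]interface{}, error) {
	var args map[string]interface{}
	dec := json.NewDecoder(bytes.NewReader(raw))
	dec.UseNumber()
	err := dec.Decode(&args)
	return args, err
}

// argInt64 is a hypothetical helper that reads an int64 out of
// json.Number-decoded args.
func argInt64(args map[string]interface{}, key string) (int64, error) {
	n, ok := args[key].(json.Number)
	if !ok {
		return 0, fmt.Errorf("arg %q is not a json.Number", key)
	}
	return n.Int64()
}

func main() {
	raw := []byte(`{"twitter_id": 9007199254740993}`)

	lossy, _ := decodeDefault(raw)
	fmt.Println(int64(lossy["twitter_id"].(float64))) // rounded by float64

	exact, _ := decodeWithNumber(raw)
	id, err := argInt64(exact, "twitter_id")
	fmt.Println(id, err) // original value preserved
}
```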
Would it be possible to switch out redis for memsql?
For those that don't know, memsql is an in-memory row store with on-disk columnar storage, and it supports high availability.
Since I am going to be using memsql myself, I'd rather not introduce another similar db component and simplify my stack.
Is it a case of switching out one in-memory db for another or does it need to be a pubsub aware db?
Thanks
This is going to be vague, as I'm currently trying to debug what's going on, but the MaxConcurrency option seems to be broken.
According to my current understanding:
For example, the :lock key for a job, under load, should be the same as the value of MaxConcurrency, but mine stays at 0.
The :lock_info key should have individual lock counts, per worker, that add up to the total of :lock for that job. However, the contents of that hash vary wildly between -4 and 4. I am testing with a MaxConcurrency of 1.
It seems there is a race condition in the increment/decrement logic. I have a worker pool size of 4, so the -4 and 4 variance could come from there. Perhaps the system is attempting to pull 4 jobs at once, and all 4 succeed in incrementing/decrementing?
The last commit was merged months ago and several bug-fix PRs are still pending, even though they have been approved. I'm wondering whether this repo is still actively developed?
like -redisurl = "redis://:${pwd}@${ip}:${port}/1"
Hi,
When are scheduled jobs going to be available?
Do you have any idea?
Thanks,
Giorgio
Just wondering: would you like to have another backend? I was working on an internal service that needs a queue system, and gocraft/work is a great choice. But I want to get rid of the dependency on Redis, so I combined tidwall/summitdb and gocraft/work. It works well.
If you like the idea, I'd like to open an MR.
For some reason, I'm unable to stop a worker pool when there are jobs in the queue. I expected calling pool.Stop() to finish up whatever jobs are being run, and the workers to exit while Redis holds whatever else is in the queue... but that doesn't seem to be the case. Below is a small test case that illustrates the problem, mostly adapted from the README:
package main

import (
	"fmt"
	"time"

	"github.com/garyburd/redigo/redis"
	"github.com/gocraft/work"
)

var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle:   5,
	Wait:      true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

type Context struct{}

var enqueuer = work.NewEnqueuer("app", redisPool)

// SampleJob simulates ten seconds of work.
func (c *Context) SampleJob(job *work.Job) error {
	fmt.Printf("Sample job %v running\n", job.ID)
	time.Sleep(10 * time.Second)
	fmt.Printf("Sample job %v done\n", job.ID)
	return nil
}

func main() {
	pool := work.NewWorkerPool(Context{}, 2, "app", redisPool)
	pool.Job("sample_job", (*Context).SampleJob)
	// Enqueue 30 jobs before starting the two workers.
	for i := 1; i <= 30; i++ {
		enqueuer.Enqueue("sample_job", work.Q{})
	}
	pool.Start()
	time.Sleep(30 * time.Second)
	fmt.Println("30 seconds is up! Stopping workers...")
	pool.Stop()
}
Rather than stopping after 30 seconds (plus up to 2 * 10 seconds for the running jobs to continue), the program continues and workers keep consuming jobs from the queue until it's empty:
go run main.go
Sample job 4b6859807fe26e8c344cc302 running
Sample job 6f6f30cd645cd328552983a8 running
Sample job 4b6859807fe26e8c344cc302 done
Sample job 6f6f30cd645cd328552983a8 done
Sample job d17378fcb8ab7e2b42b71ccf running
Sample job 5ad6c131d7faab927db749a2 running
Sample job d17378fcb8ab7e2b42b71ccf done
Sample job 5ad6c131d7faab927db749a2 done
Sample job e7f7e2aaeecf5e16d13ad5bc running
Sample job 5a09255570fabb8813eee575 running
30 seconds is up! Stopping workers...
Sample job 5a09255570fabb8813eee575 done
Sample job e7f7e2aaeecf5e16d13ad5bc done
Sample job f25bcdacb746c94348430143 running
Sample job e399bd4c8d3da1de109310c4 running
Sample job f25bcdacb746c94348430143 done
Sample job e399bd4c8d3da1de109310c4 done
Sample job 27a8ff38c160c64f9a1a52d4 running
Sample job fa0e939272c71d2d927bd032 running
Sample job fa0e939272c71d2d927bd032 done
Sample job 27a8ff38c160c64f9a1a52d4 done
Sample job d189cd37e00f1482aa898550 running
Sample job 40761b0c9f8ecaab192c9b79 running
Sample job d189cd37e00f1482aa898550 done
[...snipped! but this keeps going on for awhile...]
Am I doing something wrong in the setup or is this the desired behavior?
28 days since bug reports and two PRs have been opened with no response from the team. I would like to know if the team is alive and willing to support the project or should I just go ahead and fork it completely?
Hi,
I'm starting jobs with:
_, err := enqueuer.Enqueue("handle_tika", work.Q{"pdf": bundle[0], "xml": bundle[1]})
if err != nil {
log.Fatal(err)
}
This is called multiple times. I'd like to enqueue the jobs but have each of them wait for the running job to finish. How can I achieve that?
Hi,
I'm following along with the "process jobs" example in the README and trying to get it to work.
In main.go, I'm calling StartWorkers(), which is equivalent to the main func of the example. Later, I'm enqueuing a job after an incoming HTTP request.
Now it seems like StartWorkers() blocks the HTTP server, as no connections are accepted after calling StartWorkers().
I'm sure this is a beginner question and that there is a trivial mistake, but I can't figure it out.
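One possible cause, assuming StartWorkers() copies the README's main func including its wait for a quit signal, is that the call never returns, so the code that starts the HTTP server is never reached. A minimal sketch of the usual way around that, with a hypothetical startWorkers stand-in and an in-memory request instead of a real listener:

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// startWorkers is a hypothetical stand-in for a function that starts a pool
// and then blocks until shutdown, as a wait on a signal channel would.
func startWorkers(stop <-chan struct{}, stopped chan<- struct{}) {
	<-stop // blocks here until asked to stop
	close(stopped)
}

// handleWhileWorking runs the blocking function in its own goroutine, so the
// calling goroutine stays free to serve a request, then shuts down cleanly.
func handleWhileWorking() int {
	stop := make(chan struct{})
	stopped := make(chan struct{})
	go startWorkers(stop, stopped)

	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// A real handler would enqueue a job here.
		w.WriteHeader(http.StatusAccepted)
	})
	rec := httptest.NewRecorder()
	handler.ServeHTTP(rec, httptest.NewRequest("POST", "/enqueue", nil))

	close(stop)
	<-stopped
	return rec.Code
}

func main() {
	fmt.Println(handleWhileWorking()) // status code written by the handler
}
```

The alternative is to keep only the non-blocking setup in StartWorkers() and move the signal wait to the end of main, after the HTTP server has been started.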
Have you considered doing a benchmark with machinery similar to what you've done for go-workers, goworker and jobs?
PeriodicallyEnqueue("@every 1200s", key)
I found that periodic jobs don't record their last execution time, so the following piece of code wouldn't be run.
for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
	epoch := t.Unix()
	id := makeUniquePeriodicID(pj.jobName, pj.spec, epoch)
	job := &Job{
		Name: pj.jobName,
		ID:   id,
		// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
		EnqueuedAt: epoch,
		Args:       nil,
	}
	rawJSON, err := job.serialize()
	if err != nil {
		return err
	}
	_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
	if err != nil {
		return err
	}
}
I'm looking at using this for long-lived web requests where the initial response to the client is a 202, and the client will check back to see the status. Is there a way to look up a job's status given only the job ID?
Hi,
I would like to pass my db connector via the context to all requests, but it looks like the context is cleared each time. I have found a similar PR for "web": gocraft/web#6
Is there a workaround, or am I missing something?
Regards,
Riaan
Everything is missing versions except istanbul, which I can only guess has a version because the latest version did not work.
https://github.com/gocraft/work/blob/master/webui/internal/assets/package.json#L11,L31
Pass the job ID as an argument so the async job can be controlled. We may want to delete the job by ID, and for that we need to keep the ID in another DB, so passing the ID as an arg would be useful.
With nodejs, I had an async.queue with a drain function, which allowed me to push lots of items into the queue; upon completion (complete drainage of the queue), the drain function would execute.
var q = async.queue(function (task, callback) {
    ...
});
q.drain = function () {
    ...
};
for (let item of items) {
    var task = {
        ...
    };
    q.push(task);
}
I would like to be able to group jobs when they are queued and then have another process that watches that group empty out and upon empty, then executes a job.
One use case that I have is that I am screenshotting a page, but with many different user agents and resolutions. I'm queuing about 40 unique jobs and, within my UI, I'm showing a "loading" notification. It would be nice to fire off a "completed" job, which could contain a websocket function, and have the UI hide the loading alert.
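A rough in-process sketch of the drain idea: a counter starts at the number of jobs in the group, every finished job decrements it, and whichever job brings it to zero fires the completion callback. The group type and its methods are hypothetical and not part of work; with several worker processes the counter would have to live in shared storage (for example an atomic decrement in Redis), which is an assumption not shown here.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// group is a hypothetical helper that fires onDrain once, when the last of
// its jobs reports completion.
type group struct {
	remaining int64
	onDrain   func()
}

func newGroup(size int, onDrain func()) *group {
	return &group{remaining: int64(size), onDrain: onDrain}
}

// done is called by each job when it finishes. Only the call that moves the
// counter to zero triggers the drain callback.
func (g *group) done() {
	if atomic.AddInt64(&g.remaining, -1) == 0 {
		g.onDrain()
	}
}

// runAll simulates n concurrent jobs belonging to one group and reports how
// many jobs completed and how many times the drain callback fired.
func runAll(n int) (completed, drains int64) {
	g := newGroup(n, func() { atomic.AddInt64(&drains, 1) })
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			atomic.AddInt64(&completed, 1) // the job's real work would go here
			g.done()
		}()
	}
	wg.Wait()
	return completed, drains
}

func main() {
	completed, drains := runAll(40)
	fmt.Println(completed, drains) // 40 jobs, one drain callback
}
```

In the screenshot use case, the drain callback is where the "completed" job would be enqueued.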
Would someone mind confirming whether this will process sidekiq compatible jobs - ie. can I use a sidekiq client library to push jobs to redis, then use this library to process?
If not, a secondary question: If I wanted to push jobs using PHP - I presume I would just need to port enqueue.go. From this line of code, the main use case looks pretty straight forward - I can use phpredis to LPUSH json, per this line of go:
conn.Do("LPUSH", e.queuePrefix+jobName, rawJSON)
Does that look right? Is it really that straightforward?
Currently periodic jobs cannot have parameters. It would be very useful to be able to do something like enqueuer.PeriodicallyEnqueue(name, params)
Also, it's a bit inconsistent that periodical enqueues are a part of the WorkerPool, but non-periodical functions are in Enqueuer.
context.Context is the de facto idiomatic way to inject dependencies.
Jobs and middlewares should receive a context as their first parameter, and middlewares should forward the context through the next function.
I'm running into runtime error: invalid memory address or nil pointer dereference and I think it's because Enqueue doesn't check to see if the connection it receives from the Redis pool is of type errorConnection, for example here:
https://github.com/gocraft/work/blob/master/enqueue.go#L46
I can submit a PR to add this check.
When stopping the worker, I'd like to be able to tell the jobs that they should stop. And re-queue them on other workers.
In my use-case I've implemented the method to signal that the job needs to stop, however, there's no way to stop the job without either marking it as completed or as failed.
If I re-queue the job manually, I lose the MaxFails option; if I fail the job by returning an error, work still cannot honor the MaxFails value.
How would one go about doing this? I suggest that if a job returns a work.ErrSkip error, the failure count checked against MaxFails should not be increased.
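A sketch of the suggested rule, using a sentinel error and errors.Is. Everything here is hypothetical: ErrSkip does not exist in work according to this request, and nextFails only illustrates how a failure counter could ignore that error.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrSkip is the hypothetical sentinel proposed above: the job asks to be
// re-queued without the attempt counting as a failure.
var ErrSkip = errors.New("skip: re-queue without counting a failure")

// errBoom stands for any ordinary job failure.
var errBoom = errors.New("boom")

// nextFails returns the failure count after a job run returned err.
// Success and ErrSkip (even when wrapped) leave the count untouched.
func nextFails(fails int, err error) int {
	if err == nil || errors.Is(err, ErrSkip) {
		return fails
	}
	return fails + 1
}

// wrapped shows that a job may add context to ErrSkip and still be recognized.
func wrapped() error {
	return fmt.Errorf("worker shutting down: %w", ErrSkip)
}

func main() {
	fmt.Println(nextFails(2, errBoom))   // 3: a real failure counts
	fmt.Println(nextFails(2, ErrSkip))   // 2: skipped
	fmt.Println(nextFails(2, wrapped())) // 2: wrapped skip is also ignored
}
```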
It would be good to see the actual target time; not the time when the job was enqueued.
To replicate: use pool.EnqueueIn("job_name", 60*10, work.Q{}) and observe the webui and scheduled jobs.
Alternatively, change the "Scheduled For" title in webui to "Queued At" or something.
Hi, currently the log message is not very helpful in tracing the error if the job results in a panic. Is it possible to log with the stack trace? This will be useful in general for any logging of errors.
ERROR: runJob.panic - some error
// https://github.com/gocraft/work/blob/master/log.go#L8
func logError(key string, err error) {
	fmt.Printf("ERROR: %s - %+v\n", key, err)
}
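As an illustration of what is being asked for, the sketch below recovers a panic and attaches the stack from runtime/debug to the resulting error, so whatever logs the error also prints the trace. runJobSafely and hasStack are hypothetical names; this is not how work currently handles panics.

```go
package main

import (
	"fmt"
	"runtime/debug"
	"strings"
)

// runJobSafely runs fn and converts a panic into an error that carries the
// stack captured at recovery time.
func runJobSafely(fn func() error) (err error) {
	defer func() {
		if r := recover(); r != nil {
			// debug.Stack is called while the panicking frames are still on
			// the stack, so the trace includes where the panic came from.
			err = fmt.Errorf("panic: %v\n%s", r, debug.Stack())
		}
	}()
	return fn()
}

// hasStack reports whether err looks like it carries a goroutine stack dump.
func hasStack(err error) bool {
	return err != nil && strings.Contains(err.Error(), "goroutine")
}

func main() {
	err := runJobSafely(func() error {
		panic("some error")
	})
	fmt.Printf("ERROR: %s - %v\n", "runJob.panic", err)
}
```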
I want to start two processes reading from one Redis instance, to ensure there is always at least one process that can handle the queue. I wonder if this could cause one job to be run twice, in different processes?
Thanks!
I have a periodic job that runs every second. If I stop the worker for 5 minutes (usually during development) and then start it again, the job will run 300 times in a row before re-entering its cycle of 1 per second.
func main() {
	beego.Debug("Starting worker")
	pool := work.NewWorkerPool(Context{}, 1, "typely", redisPool)
	// Add middleware that will be executed for each job
	pool.Middleware((*Context).Log)
	pool.PeriodicallyEnqueue("*/1 * * * * *", "create_invoices")
	pool.Job("create_invoices", (*Context).CreateInvoices)
	// Start processing jobs
	pool.Start()
	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan
	// Stop the pool
	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	beego.Debug("Starting job: ", job.Name)
	return next()
}

func (c *Context) CreateInvoices(job *work.Job) error {
	return CreateInvoices()
}
During downtime, if I change the cron to run every hour, when I restart it, it still makes up for the lost seconds running 300 times and only then it enters the new cycle of 1/h.
Is this desired behavior?
I'm getting a 500 on https://worker.palcomp3.com/retry_jobs?page=1
{"error": "dial tcp [::1]:6379: getsockopt: connection refused"}
Start command:
workwebui -redis="redis_hostname:6379" -ns="work" -database="11" -listen=":5040"
I am able to connect to redis_hostname:6379 through telnet, so the problem probably isn't related to the Redis setup.
Currently the worker does (numfails^4 + random constant). The 4th power grows extremely quickly for some use cases, and I'd like to have basically my own function to handle the backoff period.
A job for JobOptions perhaps?
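To show how fast the described formula grows and what a user-supplied alternative might look like, here is a small sketch. quarticBackoff follows the description above (fails^4 plus a jitter term, whose exact range is not given in this request, so it is a parameter here); cappedBackoff is a hypothetical replacement that doubles per failure up to a ceiling. Neither function is taken from the library.

```go
package main

import "fmt"

// quarticBackoff returns the delay in seconds as described above:
// the fourth power of the failure count plus a jitter term.
func quarticBackoff(fails, jitter int64) int64 {
	return fails*fails*fails*fails + jitter
}

// cappedBackoff is a hypothetical custom policy: start at base seconds,
// double on every further failure, and never exceed max seconds.
func cappedBackoff(fails, base, max int64) int64 {
	d := base
	for i := int64(1); i < fails; i++ {
		d *= 2
		if d >= max {
			return max
		}
	}
	if d > max {
		return max
	}
	return d
}

func main() {
	for _, fails := range []int64{1, 3, 5, 10} {
		fmt.Printf("fails=%d quartic=%ds capped=%ds\n",
			fails, quarticBackoff(fails, 0), cappedBackoff(fails, 1, 60))
	}
}
```

A per-job option holding a function of this shape would let each job pick its own policy.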
Is this project still maintained? Quite a few issues and PRs are unanswered.
https://github.com/go-redis/redis
This is a personal request as I prefer GoRedis over redigo. For the time being I have the two connection clients.
Is there any function like this? Example:
//-----------------
// the caller:
job, err := enqueuer.Enqueue("add", work.Q{"a": 1, "b": 2})
...
result := job.GetResult()
//---------------------------
// and the worker:
pool.Job("add", (*Context).Add)
...
func (c *Context) Add(job *work.Job) error {
	a := job.ArgInt("a")
	b := job.ArgInt("b")
	resultAplusB := a + b
	// how to return this value of resultAplusB
	return nil
}
Thanks.
Middleware in enqueue to send customer_id as default to all jobs
The 'WorkerObservations' call returns the in-progress status of the running job; once the job is done, the status info is deleted. So my question is: how can I find out that my job finished successfully?
key := redisKeyWorkerObservation(o.namespace, o.workerID)
if obv == nil {
	if _, err := conn.Do("DEL", key); err != nil {
		return err
	}
}
After enqueuing a job to run after waiting 5 seconds, I get this error and my job doesn't run:
ERROR: requeuer.process.dead - no job name
What is the best way to troubleshoot this?
If gocraft/work initializes the context struct for me, how do I "pass" a variable, say db, to a worker?
func main() {
	// ...
	db, _ := sql.Open(...)
	pool := configureBackgroundProcessing(db, redisPool)
	go pool.Start()
	// monitor for sigterm
}

func configureBackgroundProcessing(db *sql.DB, redisPool *redis.Pool) *work.WorkerPool {
	concurrency := 10
	namespace := "gocraft_work"
	pool := work.NewWorkerPool(jobs.Context{}, concurrency, namespace, redisPool)
	// a middleware..?
	pool.Job("get_followers", (*jobs.Context).TwitterGetFollowers)
	return pool
}
This is radically different from other libs, like jrallison/go-workers, with which my configureBackgroundProcessing func looks something like:
followersWorker := &FollowersWorker{db: db}
workers.Process("get_followers", followersWorker.Perform, 10)
We use Redis Sentinel. Have you thought about adding Sentinel support, or is it already supported? Some redigo forks seem to support Sentinel, but they may not do so properly.
Thank you.
It seems that there is no API to achieve this
Hey,
any plans on adding workflow features like job groups or chains? If not what would be the best way to implement them with the current work version?
Thanks
The current release v0.5.0 doesn't include MaxConcurrency with JobOptions. It would be helpful if there were a release which includes this, so package managers like glide can use the release to lock down to a version.
Hi, this is more of a question actually. Do periodic jobs have a retry option? What happens if there are no retries or the retries max out? Does the job continue to run at the next scheduled interval?
Thanks in advance.