
work's Introduction

gocraft/work GoDoc

gocraft/work lets you enqueue and process background jobs in Go. Jobs are durable and backed by Redis. Very similar to Sidekiq, but for Go.

  • Fast and efficient. Faster than jrallison/go-workers, benmanns/goworker, and albrow/jobs. See below for benchmarks.
  • Reliable - don't lose jobs even if your process crashes.
  • Middleware on jobs -- good for metrics instrumentation, logging, etc.
  • If a job fails, it will be retried a specified number of times.
  • Schedule jobs to happen in the future.
  • Enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once.
  • Web UI to manage failed jobs and observe the system.
  • Periodically enqueue jobs on a cron-like schedule.
  • Pause / unpause jobs and control concurrency within and across processes.

Enqueue new jobs

To enqueue jobs, you need to make an Enqueuer with a redis namespace and a redigo pool. Each enqueued job has a name and can take optional arguments. Arguments are k/v pairs (serialized as JSON internally).

package main

import (
	"log"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

// Make an enqueuer with a particular namespace
var enqueuer = work.NewEnqueuer("my_app_namespace", redisPool)

func main() {
	// Enqueue a job named "send_email" with the specified parameters.
	_, err := enqueuer.Enqueue("send_email", work.Q{"address": "test@example.com", "subject": "hello world", "customer_id": 4})
	if err != nil {
		log.Fatal(err)
	}
}

Process jobs

In order to process jobs, you'll need to make a WorkerPool. Add middleware and jobs to the pool, and start the pool.

package main

import (
	"fmt"
	"os"
	"os/signal"

	"github.com/gocraft/work"
	"github.com/gomodule/redigo/redis"
)

// Make a redis pool
var redisPool = &redis.Pool{
	MaxActive: 5,
	MaxIdle: 5,
	Wait: true,
	Dial: func() (redis.Conn, error) {
		return redis.Dial("tcp", ":6379")
	},
}

type Context struct {
	customerID int64
}

func main() {
	// Make a new pool. Arguments:
	// Context{} is a struct that will be the context for the request.
	// 10 is the max concurrency
	// "my_app_namespace" is the Redis namespace
	// redisPool is a Redis pool
	pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)

	// Add middleware that will be executed for each job
	pool.Middleware((*Context).Log)
	pool.Middleware((*Context).FindCustomer)

	// Map the name of jobs to handler functions
	pool.Job("send_email", (*Context).SendEmail)

	// Customize options:
	pool.JobWithOptions("export", work.JobOptions{Priority: 10, MaxFails: 1}, (*Context).Export)

	// Start processing jobs
	pool.Start()

	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan

	// Stop the pool
	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	fmt.Println("Starting job: ", job.Name)
	return next()
}

func (c *Context) FindCustomer(job *work.Job, next work.NextMiddlewareFunc) error {
	// If there's a customer_id param, set it in the context for future middleware and handlers to use.
	if _, ok := job.Args["customer_id"]; ok {
		c.customerID = job.ArgInt64("customer_id")
		if err := job.ArgError(); err != nil {
			return err
		}
	}

	return next()
}

func (c *Context) SendEmail(job *work.Job) error {
	// Extract arguments:
	addr := job.ArgString("address")
	subject := job.ArgString("subject")
	if err := job.ArgError(); err != nil {
		return err
	}

	// Go ahead and send the email...
	// sendEmailTo(addr, subject)

	return nil
}

func (c *Context) Export(job *work.Job) error {
	return nil
}

Redis Cluster

If you're attempting to use gocraft/work on a Redis Cluster deployment, you may encounter a CROSSSLOT Keys in request don't hash to the same slot error during the execution of the various Lua scripts used to manage job data (see Issue 93). The current workaround is to force all of the keys for a given worker pool's namespace onto a single node in the cluster using Redis Hash Tags. Using the example above:

func main() {
	// Make a new pool. Arguments:
	// Context{} is a struct that will be the context for the request.
	// 10 is the max concurrency
	// "{my_app_namespace}" is the Redis namespace; the {} braces force all of the keys onto a single node
	// redisPool is a Redis pool
	pool := work.NewWorkerPool(Context{}, 10, "{my_app_namespace}", redisPool)
	// ... register jobs and start the pool as in the example above ...
}

Note this is not an issue for Redis Sentinel deployments.

Special Features

Contexts

Just like in gocraft/web, gocraft/work lets you use your own contexts. Your context can be empty or it can have various fields in it. The fields can be whatever you want - it's your type! When a new job is processed by a worker, we'll allocate an instance of this struct and pass it to your middleware and handlers. This allows you to pass information from one middleware function to the next, and onto your handlers.

Custom contexts aren't really needed for trivial example applications, but are very important for production apps. For instance, one field in your context can be your tagged logger. Your tagged logger augments your log statements with a job-id. This lets you filter your logs by that job-id.
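
For example, here's a minimal sketch of that tagged-logger idea. The Logger field and TagLogger middleware are illustrative names, not part of gocraft/work; it assumes the standard library log and os packages plus the pool setup from the "Process jobs" example above.

type Context struct {
	Logger *log.Logger // illustrative: one tagged logger per job
}

// Middleware: build a job-scoped logger whose prefix carries the job ID,
// so every later log line from this job can be filtered by that ID.
func (c *Context) TagLogger(job *work.Job, next work.NextMiddlewareFunc) error {
	c.Logger = log.New(os.Stdout, "job_id="+job.ID+" ", log.LstdFlags)
	return next()
}

// Handlers and later middleware can then use the tagged logger.
func (c *Context) SendEmail(job *work.Job) error {
	c.Logger.Println("sending email to", job.ArgString("address"))
	return job.ArgError()
}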

Check-ins

Since this is a background job processing library, it's fairly common to have jobs that take a long time to execute. Imagine you have a job that takes an hour to run. It can often be hard to know whether it's hung, about to finish, or has 30 more minutes to go.

To solve this, you can instrument your jobs to "checkin" every so often with a string message. This checkin status will show up in the web UI. For instance, your job could look like this:

func (c *Context) Export(job *work.Job) error {
	rowsToExport := getRows()
	for i, row := range rowsToExport {
		exportRow(row)
		if i%1000 == 0 {
			job.Checkin("i=" + fmt.Sprint(i)) // Here's the magic! This tells gocraft/work our status
		}
	}
	return nil
}

Then in the web UI, you'll see the status of the worker:

Name    Arguments            Started At           Check-in At          Check-in
export  {"account_id": 123}  2016/07/09 04:16:51  2016/07/09 05:03:13  i=335000

Scheduled Jobs

You can schedule jobs to be executed in the future. To do so, make a new Enqueuer and call its EnqueueIn method:

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
secondsInTheFuture := int64(300)
_, err := enqueuer.EnqueueIn("send_welcome_email", secondsInTheFuture, work.Q{"address": "test@example.com"})

Unique Jobs

You can enqueue unique jobs so that only one job with a given name/arguments exists in the queue at once. For instance, you might have a worker that expires the cache of an object. It doesn't make sense for multiple such jobs to exist at once. Also note that unique jobs are supported for normal enqueues as well as scheduled enqueues.

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job returned
job, err = enqueuer.EnqueueUnique("clear_cache", work.Q{"object_id_": "123"}) // job == nil -- this duplicate job isn't enqueued.
job, err = enqueuer.EnqueueUniqueIn("clear_cache", 300, work.Q{"object_id_": "789"}) // job != nil (diff id)

Alternatively, you can provide your own key for making a job unique. When another job is enqueued with the same key as a job already in the queue, it will simply update the arguments.

enqueuer := work.NewEnqueuer("my_app_namespace", redisPool)
job, err := enqueuer.EnqueueUniqueByKey("clear_cache", work.Q{"object_id_": "123"}, map[string]interface{}{"my_key": "586"})
job, err = enqueuer.EnqueueUniqueInByKey("clear_cache", 300, work.Q{"object_id_": "789"}, map[string]interface{}{"my_key": "586"})

For information on how this map is serialized to form a unique key, see https://golang.org/pkg/encoding/json/#Marshal.
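
As a small illustration (plain encoding/json, not gocraft/work internals; assumes the standard library encoding/json and fmt packages): json.Marshal sorts map keys, so the same key map always serializes to the same bytes, which is what lets it act as a stable uniqueness key.

keyMap := map[string]interface{}{"b": 2, "a": 1}
raw, _ := json.Marshal(keyMap)
fmt.Println(string(raw)) // {"a":1,"b":2} -- map keys are sorted, so the output is deterministic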

Periodic Enqueueing (Cron)

You can periodically enqueue jobs on your gocraft/work cluster using your worker pool. The scheduling specification uses a cron syntax where the fields represent seconds, minutes, hours, day of the month, month, and day of the week, respectively. Even if you have multiple worker pools on different machines, they'll all coordinate and only enqueue your job once.

pool := work.NewWorkerPool(Context{}, 10, "my_app_namespace", redisPool)
pool.PeriodicallyEnqueue("0 0 * * * *", "calculate_caches") // This will enqueue a "calculate_caches" job every hour
pool.Job("calculate_caches", (*Context).CalculateCaches) // Still need to register a handler for this job separately

Job concurrency

You can control job concurrency using JobOptions{MaxConcurrency: <num>}. Unlike the WorkerPool concurrency, this limits the number of jobs of a given type that can be active at one time within a single Redis instance. This works by putting a precondition on the enqueuing function, meaning a new job will not be scheduled if we are at or over a job's MaxConcurrency limit. A Redis key (see redis.go::redisKeyJobsLock) is used as a counting semaphore in order to track job concurrency per job type. The default value is 0, which means "no limit on job concurrency".

Note: if you want to run jobs "single threaded" then you can set the MaxConcurrency accordingly:

      worker_pool.JobWithOptions(jobName, JobOptions{MaxConcurrency: 1}, (*Context).WorkFxn)

Run the Web UI

The web UI provides a way to view the state of your gocraft/work cluster, inspect queued jobs, and retry or delete dead jobs.

Building and installing the binary:

go get github.com/gocraft/work/cmd/workwebui
go install github.com/gocraft/work/cmd/workwebui

Then, you can run it:

workwebui -redis="redis:6379" -ns="work" -listen=":5040"

Navigate to http://localhost:5040/.

You'll see a view that looks like this:

Web UI Screenshot

Design and concepts

Enqueueing jobs

  • When jobs are enqueued, they're serialized with JSON and added to a simple Redis list with LPUSH (a rough sketch of the raw Redis call follows this list).
  • Jobs are added to a list with the same name as the job. Each job name gets its own queue. Whereas with other job systems you have to design which jobs go on which queues, there's no need for that here.
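
A rough sketch of the equivalent raw Redis call (illustrative only: the payload shape and key format shown here are guesses, the real ones are built inside gocraft/work; assumes the redisPool from the enqueue example plus the encoding/json and log packages):

conn := redisPool.Get()
defer conn.Close()

// Serialize an illustrative job payload and LPUSH it onto the queue named after the job.
payload, _ := json.Marshal(map[string]interface{}{
	"name": "send_email",
	"args": map[string]interface{}{"address": "test@example.com"},
})
if _, err := conn.Do("LPUSH", "my_app_namespace:jobs:send_email", payload); err != nil {
	log.Println("enqueue sketch failed:", err)
}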

Scheduling algorithm

  • Each job lives in a list-based queue with the same name as the job.
  • Each of these queues can have an associated priority. The priority is a number from 1 to 100000.
  • Each time a worker pulls a job, it needs to choose a queue. It chooses a queue probabilistically based on its relative priority (see the sketch after this list).
  • If the sum of priorities among all queues is 1000, and one queue has priority 100, jobs will be pulled from that queue 10% of the time.
  • Obviously if a queue is empty, it won't be considered.
  • The semantics of "always process X jobs before Y jobs" can be accurately approximated by giving X a large number (like 10000) and Y a small number (like 1).
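
A standalone sketch of that weighted pick (not the library's actual implementation; assumes the math/rand package):

// pickQueue returns an index chosen with probability proportional to its priority.
func pickQueue(priorities []int) int {
	total := 0
	for _, p := range priorities {
		total += p
	}
	if total <= 0 {
		return 0
	}
	r := rand.Intn(total)
	for i, p := range priorities {
		if r < p {
			return i // e.g. priority 100 out of a total of 1000 wins ~10% of the time
		}
		r -= p
	}
	return len(priorities) - 1 // unreachable, but keeps the compiler happy
}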

Processing a job

  • To process a job, a worker will execute a Lua script to atomically move a job from its queue to an in-progress queue.
    • A job is dequeued and moved to in-progress only if the job queue is not paused and the number of active jobs does not exceed the concurrency limit for the job type.
  • The worker will then run the job and increment the job lock. The job will either finish successfully or result in an error or panic.
    • If the process completely crashes, the reaper will eventually find it in its in-progress queue and requeue it.
  • If the job is successful, we'll simply remove the job from the in-progress queue.
  • If the job returns an error or panic, we'll see how many retries a job has left. If it doesn't have any, we'll move it to the dead queue. If it has retries left, we'll consume a retry and add the job to the retry queue.
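
Sketched as code, the failure path looks roughly like this (illustrative key and field names only, not gocraft/work's actual identifiers; assumes the redigo, work, and time imports from the examples above):

// Sketch of the retry-or-dead decision after a job returns an error or panics.
func afterFailure(conn redis.Conn, job *work.Job, maxFails int64, rawJSON []byte) error {
	now := time.Now().Unix()
	if job.Fails >= maxFails {
		// Out of retries: park the job on the dead z-set, scored by failure time.
		_, err := conn.Do("ZADD", "my_app_namespace:dead", now, rawJSON)
		return err
	}
	// Retries remain: put it on the retry z-set; the real code also applies a backoff.
	_, err := conn.Do("ZADD", "my_app_namespace:retry", now+30, rawJSON)
	return err
}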

Workers and WorkerPools

  • WorkerPools provide the public API of gocraft/work.
    • You can attach jobs and middleware to them.
    • You can start and stop them.
    • Based on their concurrency setting, they'll spin up N worker goroutines.
  • Each worker is run in a goroutine. It will get a job from redis, run it, get the next job, etc.
    • Each worker is independent. They are not dispatched work -- they get their own work.

Retry job, scheduled jobs, and the requeuer

  • In addition to the normal list-based queues that normal jobs live in, there are two other types of queues: the retry queue and the scheduled job queue.
  • Both of these are implemented as Redis z-sets. The score is the unix timestamp when the job should be run. The value is the bytes of the job.
  • The requeuer will occasionally look for jobs in these queues that should be run now. If they should be, they'll be atomically moved to the normal list-based queue and eventually processed.
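
A rough sketch of the "find jobs that should run now" step (illustrative key names; the real requeuer moves jobs atomically with a Lua script; assumes the redigo and time imports from the examples above):

// Sketch: pull everything scored at or before "now" from the scheduled z-set
// and push it back onto a normal job queue.
func requeueDue(conn redis.Conn) error {
	now := time.Now().Unix()
	due, err := redis.ByteSlices(conn.Do("ZRANGEBYSCORE", "my_app_namespace:scheduled", "-inf", now))
	if err != nil {
		return err
	}
	for _, rawJSON := range due {
		conn.Send("LPUSH", "my_app_namespace:jobs:send_email", rawJSON) // the real code derives the queue from the job name in the payload
		conn.Send("ZREM", "my_app_namespace:scheduled", rawJSON)
	}
	return conn.Flush()
}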

Dead jobs

  • After a job has failed a specified number of times, it will be added to the dead job queue.
  • The dead job queue is just a Redis z-set. The score is the timestamp it failed and the value is the job.
  • To retry failed jobs, use the UI or the Client API.

The reaper

  • If a process crashes hard (e.g., the power on the server turns off or the kernel freezes), some jobs may be in progress, and we don't want to lose them. They're safe in their in-progress queue.
  • The reaper will look for worker pools without a heartbeat. It will scan their in-progress queues and requeue anything it finds.

Unique jobs

  • You can enqueue unique jobs such that only one job with a given name/arguments can be on the queue at once.
  • Both normal queues and the scheduled queue are considered.
  • When a unique job is enqueued, we'll atomically set a redis key that includes the job name and arguments and enqueue the job.
  • When the job is processed, we'll delete that key to permit another job to be enqueued.

Periodic jobs

  • You can tell a worker pool to enqueue jobs periodically using a cron schedule.
  • Each worker pool will wake up every 2 minutes, and if jobs haven't been scheduled yet, it will schedule all the jobs that would be executed in the next five minutes.
  • Each periodic job that runs at a given time has a predictable byte pattern. Since jobs are scheduled on the scheduled job queue (a Redis z-set), if the same job is scheduled twice for a given time, it can only exist in the z-set once.

Paused jobs

  • You can pause jobs from being processed from a specific queue by setting a "paused" Redis key (see redisKeyJobsPaused).
  • Conversely, jobs in the queue will resume being processed once the paused Redis key is removed.
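
For illustration, pausing and unpausing boils down to setting and deleting that key. The exact key format is defined by redis.go::redisKeyJobsPaused; the literal used below is a guess, and the snippet assumes the redisPool from the examples above.

conn := redisPool.Get()
defer conn.Close()

// Pause processing of the "send_email" queue:
conn.Do("SET", "my_app_namespace:jobs:send_email:paused", "1")

// ...and resume it by removing the key:
conn.Do("DEL", "my_app_namespace:jobs:send_email:paused")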

Terminology reference

  • "worker pool" - a pool of workers
  • "worker" - an individual worker in a single goroutine. Gets a job from redis, does job, gets next job...
  • "heartbeater" or "worker pool heartbeater" - goroutine owned by worker pool that runs concurrently with workers. Writes the worker pool's config/status (aka "heartbeat") every 5 seconds.
  • "heartbeat" - the status written by the heartbeater.
  • "observer" or "worker observer" - observes a worker. Writes stats. makes "observations".
  • "worker observation" - A snapshot made by an observer of what a worker is working on.
  • "periodic enqueuer" - A process that runs with a worker pool that periodically enqueues new jobs based on cron schedules.
  • "job" - the actual bundle of data that constitutes one job
  • "job name" - each job has a name, like "create_watch"
  • "job type" - backend/private nomenclature for the handler+options for processing a job
  • "queue" - each job creates a queue with the same name as the job. only jobs named X go into the X queue.
  • "retry jobs" - if a job fails and needs to be retried, it will be put on this queue.
  • "scheduled jobs" - jobs enqueued to be run in th future will be put on a scheduled job queue.
  • "dead jobs" - if a job exceeds its MaxFails count, it will be put on the dead job queue.
  • "paused jobs" - if paused key is present for a queue, then no jobs from that queue will be processed by any workers until that queue's paused key is removed
  • "job concurrency" - the number of jobs being actively processed of a particular type across worker pool processes but within a single redis instance

Benchmarks

The benches folder contains various benchmark code. In each case, we enqueue 100k jobs across 5 queues. The jobs are almost no-op jobs: they simply increment an atomic counter. We then measure the rate of change of the counter to obtain our measurement.
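
The benchmark jobs are essentially of this shape (a sketch, not the exact code in benches; assumes the sync/atomic package and the Context type from the examples above):

var counter int64 // read periodically by the measuring goroutine

func (c *Context) BenchJob(job *work.Job) error {
	atomic.AddInt64(&counter, 1) // near no-op: just bump the counter
	return nil
}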

Library               Speed
gocraft/work          20944 jobs/s
jrallison/go-workers  19945 jobs/s
benmanns/goworker     10328.5 jobs/s
albrow/jobs           40 jobs/s

gocraft

gocraft offers a toolkit for building web apps. Currently these packages are available:

  • gocraft/web - Go Router + Middleware. Your Contexts.
  • gocraft/dbr - Additions to Go's database/sql for super fast performance and convenience.
  • gocraft/health - Instrument your web apps with logging and metrics.
  • gocraft/work - Process background jobs in Go.

These packages were developed by the engineering team at UserVoice and currently power much of its infrastructure and tech stack.

Authors

work's People

Contributors

aprimadi, austintaylor, beornf, caldempsey, chambo-e, cypriss, davars, igorwwwwwwwwwwwwwwwwwwww, jnovak-stripe, mitchrodrigues, peterhellberg, shdunning, sohymg, taylorchu, tyler-smith, tylerb, vaporz


work's Issues

Please create a release with latest changes

The current release, v0.5.0, doesn't include MaxConcurrency in JobOptions. It would be helpful if there were a release which includes this, so that package managers like Glide can use it to lock down to a version.

Periodic jobs don't run as expected

PeriodicallyEnqueue("@every 1200s", key)

I found that periodic jobs don't record their last execution time, therefore the following piece of code wouldn't be run.

for t := pj.schedule.Next(nowTime); t.Before(horizon); t = pj.schedule.Next(t) {
	epoch := t.Unix()
	id := makeUniquePeriodicID(pj.jobName, pj.spec, epoch)

	job := &Job{
		Name: pj.jobName,
		ID:   id,

		// This is technically wrong, but this lets the bytes be identical for the same periodic job instance. If we don't do this, we'd need to use a different approach -- probably giving each periodic job its own history of the past 100 periodic jobs, and only scheduling a job if it's not in the history.
		EnqueuedAt: epoch,
		Args:       nil,
	}

	rawJSON, err := job.serialize()
	if err != nil {
		return err
	}

	_, err = conn.Do("ZADD", redisKeyScheduled(pe.namespace), epoch, rawJSON)
	if err != nil {
		return err
	}
}

Support redis sentinel

We use Redis Sentinel. Have you thought about adding Sentinel support, or is it already supported? Some redigo forks seem to support Sentinel, but perhaps not properly.

Thank you.

Log stack trace for panic

Hi, currently the log message is not very helpful for tracing the error if the job results in a panic. Is it possible to log the stack trace as well? This would be useful in general for any logging of errors.

ERROR: runJob.panic - some error

// https://github.com/gocraft/work/blob/master/log.go#L8
func logError(key string, err error) {
	fmt.Printf("ERROR: %s - %+v\n", key, err)
}

Usage Example

Hi,

I'm following along with the "Process jobs" example in the README and trying to get it to work.

In main.go, I'm calling StartWorkers(), which is equivalent to the main func of the example. Later, I'm enqueuing a job after an incoming HTTP request.

Now it seems like StartWorkers() blocks the HTTP server, as no connections are accepted after calling StartWorkers().

I'm sure this is a beginner question and that there is a trivial mistake, but I can't figure it out.

Sidekiq compatibility and enqueuers in other languages, e.g. PHP

Would someone mind confirming whether this will process Sidekiq-compatible jobs - i.e. can I use a Sidekiq client library to push jobs to Redis, then use this library to process them?

If not, a secondary question: if I wanted to push jobs using PHP, I presume I would just need to port enqueue.go. From this line of code, the main use case looks pretty straightforward - I can use phpredis to LPUSH JSON, per this line of Go:

conn.Do("LPUSH", e.queuePrefix+jobName, rawJSON)

Does that look right? Is it really that straightforward?

Allow *webui.Server to be mounted.

It would be nice to have webui.Server implement http.Handler so it can be mounted in an existing app without having to run another process.

You could also just make the web.Router public or embedded.

Pass defaults to context

Hi,

I would like to pass my db connector via the context to all requests, but it looks like the context is cleared each time. I have found a similar PR for "web": gocraft/web#6

Is there a workaround, or am I missing something?

Regards,
Riaan

Periodic Job

First of all, keep up the good work! Awesome! By the way, just out of curiosity, is there any chance to stop/delete a periodic job? Unfortunately I can't see any way, am I right? It seems to be implemented only for scheduled single-shot jobs, and for dead or retry jobs as well (maybe because a ZSCORE range is needed). Furthermore, I can't see any public pause/unpause method on the client (only a private method in the worker_test file). Regards

How to know the job is successfully done?

The 'WorkerObservations' call will get the in-progress status of the running job; once the job is done, the status info is deleted. Then my question is: how can I know whether my job finished successfully?

key := redisKeyWorkerObservation(o.namespace, o.workerID)

if obv == nil {
	if _, err := conn.Do("DEL", key); err != nil {
		return err
	}
}

@shdunning ?

Proposal: Add context.Context support

context.Context is the de facto idiomatic way to inject dependencies.

Jobs and middleware should receive a context as their first parameter, and middleware should forward the context through the next function.

Schedule jobs

Hi,
When will scheduled jobs be available?
Do you have any idea?

Thanks,

Giorgio

Jobs can never be properly reaped on dirty shutdown

The reaper, which starts during the startup of WorkerPools, will never reap dead jobs if we encounter a dirty shutdown (SIGKILL or other early exit where we don't wait for all workers to finish).

The heartbeat key expires after 60 seconds, which isn't long enough to leave it around for the 2nd reap cycle (10 min) to discover a dead job (job is considered dead if it hasn't received a heartbeat in 5 min).

If I stop the worker and start it later the periodic job runs hundreds of times

I have a periodic job that runs every second. If I stop the worker for 5 minutes (usually during development) and then start it again, the job will run 300 times in a row before re-entering its cycle of one per second.

	beego.Debug("Starting worker")
	pool := work.NewWorkerPool(Context{}, 1, "typely", redisPool)
	
	// Add middleware that will be executed for each job
	pool.Middleware((*Context).Log)
	
	pool.PeriodicallyEnqueue("*/1 * * * * *", "create_invoices")
	pool.Job("create_invoices", (*Context).CreateInvoices)
	
	// Start processing jobs
	pool.Start()
	
	// Wait for a signal to quit:
	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, os.Kill)
	<-signalChan
	
	// Stop the pool
	pool.Stop()
}

func (c *Context) Log(job *work.Job, next work.NextMiddlewareFunc) error {
	beego.Debug("Starting job: ", job.Name)
	return next()
}

func (c *Context) CreateInvoices(job *work.Job) error {
	return CreateInvoices()
}```

During downtime, if I change the cron to run every hour, when I restart it, it still makes up for the lost seconds running 300 times and only then it enters the new cycle of 1/h.

Is this desired behavior?

MaxConcurrency does not work

This is going to be vague, as I'm currently trying to debug what's going on, but the MaxConcurrency option seems to be broken.

According to my current understanding:

  • For example, the :lock key for a job, under load, should be the same as the value of MaxConcurrency, but mine stays at 0.
  • The :lock_info key should have individual lock counts, per worker, that add up to the total of :lock for that job. However, the contents of that hash vary wildly between -4 and 4. I am testing with a MaxConcurrency of 1.

It seems there is a race condition in the increment/decrement logic. I have a worker pool size of 4, so the -4 and 4 variance could come from there. Perhaps the system is attempting to pull 4 jobs at once, and all 4 succeed in incrementing/decrementing?

Wait for job to complete

Hi,

I'm starting jobs with:

_, err := enqueuer.Enqueue("handle_tika", work.Q{"pdf": bundle[0], "xml": bundle[1]})
if err != nil {
    log.Fatal(err)
}

this is called multiple times. I'd like to enqueue the jobs but let each of them wait for the running job. How can I achieve that?

SummitDB support

Just wondering, would you like to have another backend? I was working on an internal service that needs a queue system. gocraft/work is a great choice, but I want to get rid of the dependency on Redis, so I combined tidwall/summitdb and gocraft/work. It works well.

If you like the idea, I'd like to open an MR.

Is this repo still being maintained?

The last commit was merged months ago and several bug-fix PRs are pending, even though they have been approved. I'm wondering whether this repo is still actively developed.

ERROR: requeuer.process.dead - no job name

After enqueuing a job to run after waiting 5 seconds, I get this error and my job doesn't run:
ERROR: requeuer.process.dead - no job name

What is the best way to troubleshoot this?

500 errors on /retry_jobs and all other fetch/xhr requests

I'm getting 500 errors on https://worker.palcomp3.com/retry_jobs?page=1

{"error": "dial tcp [::1]:6379: getsockopt: connection refused"}

Start command:

workwebui -redis="redis_hostname:6379" -ns="work" -database="11" -listen=":5040"

I'm able to connect to redis_hostname:6379 through telnet, so the problem probably isn't related to the Redis setup.

Replacing Redis with MemSQL

Would it be possible to switch out redis for memsql?

For those who don't know, MemSQL is an in-memory row store with on-disk columnar storage, which includes the capacity for high availability.

Since I am going to be using MemSQL myself, I'd rather not introduce another similar DB component, and instead simplify my stack.

Is it a case of switching out one in-memory db for another or does it need to be a pubsub aware db?

Thanks

How to return a result to the caller in the job worker?

Is there any function like this? Example:

//-----------------
// the caller:

job, err := enqueuer.Enqueue("add", work.Q{"a": 1, "b": 2})
...
result := job.GetResult()

//---------------------------
// and the worker:

pool.Job("add", (*Context).Add)
...
func (c *Context) Add(job *work.Job) error {
	a := job.ArgInt64("a")
	b := job.ArgInt64("b")

	resultAplusB := a + b
	// how to return this value of resultAplusB?

	return nil
}

Thanks.

Webui doesn't work in Safari

Hi,

when trying to access the web UI in Safari on my Mac, it doesn't load at all. In the console, we get the following message:

[Error] ReferenceError: Can't find variable: fetch
	value (work.js:5:18353)
	performInitialMount (work.js:4:2382)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	mountChildren (work.js:3:28158)
	_createInitialChildren (work.js:3:6598)
	mountComponent (work.js:3:4759)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	performInitialMount (work.js:4:2660)
	mountComponent (work.js:4:1554)
	mountComponent (work.js:2:15978)
	a (work.js:5:10792)
	perform (work.js:2:18190)
	i (work.js:5:11004)
	perform (work.js:2:18190)
	batchedUpdates (work.js:4:15372)
	l (work.js:2:12826)
	_renderNewRootComponent (work.js:5:12205)
	_renderSubtreeIntoContainer (work.js:5:13208)
	(anonymous function) (work.js:1:2656)
	t (work.js:1:112)
	(anonymous function) (work.js:1:196)
	Global Code (work.js:1:200)

In Chrome, it seems to work just fine...

Is the project alive?

It has been 28 days since bug reports and two PRs were opened, with no response from the team. I would like to know if the team is alive and willing to support the project, or should I just go ahead and fork it completely?

Maintained?

Is this project still maintained? Quite a few issues and PRs are unanswered.

Periodic Job with Retries

Hi, this is more of a question actually. Does a periodic job have a retry option? What happens if there are no retries or the retries max out? Does it continue to run at the next scheduled interval?

Thanks in advance.

Feature request: Allow parameters for periodic jobs

Currently periodic jobs cannot have parameters. It would be very useful to be able to do something like enqueuer.PeriodicallyEnqueue(name, params)

Also, it's a bit inconsistent that periodical enqueues are a part of the WorkerPool, but non-periodical functions are in Enqueuer.

Preserve int64 job arguments with json.Number

All JSON numbers are unmarshalled into float64 by default. Hence large int64 numbers with more than 16 digits can not be passed as an argument. The boundary check is done here https://github.com/gocraft/work/blob/master/job.go#L106

By using json.Number these arguments could be preserved:

dec := json.NewDecoder(bytes.NewReader(rawJSON))
dec.UseNumber()
dec.Decode(&job)

Converting json.Number to primitive types in helper methods can be done simply with:

strconv.ParseInt(string(v), 10, 64)
strconv.ParseFloat(string(v), 64)

This could be added as a UseNumber job option for backwards compatibility.

Drain functionality

With Node.js, I had an async.queue with a drain function, which allowed me to push lots of items into the queue and, upon completion (or complete drainage of the queue), the drain function would execute.

var q = async.queue(function (task, callback) {
	...
});

q.drain = function() {
	...
};

for (let item of items) {
	var task = {
		...
	};
	q.push(task);
}

I would like to be able to group jobs when they are queued and then have another process that watches that group empty out and upon empty, then executes a job.

One use-case that I have is that I am screenshotting a page, but with many different user agents and resolutions. I'm queuing about 40 unique jobs and, within my UI, I'm showing a "loading" notification. It would be nice to fire off a "completed" job, which could contain a websocket call that tells the UI to hide the loading alert.

Graceful shutdown

When stopping the worker, I'd like to be able to tell the jobs that they should stop, and re-queue them on other workers.

In my use-case I've implemented a method to signal that the job needs to stop; however, there's no way to stop the job without either marking it as completed or as failed.

If I re-queue the job manually, I lose the MaxFails options; if I fail the job by returning an error, work still cannot honor the MaxFails value.

How would one go about doing this? I suggest that if a job returns a work.ErrSkip error, MaxFails should not be increased.

How to functionally pass variables to worker? (e.g. db conn)

If gocraft/work initializes the context struct for me, how do I "pass" a variable, say db, to a worker?

func main() {
    // ...
    db, _ := sql.Open(...)
    pool := configureBackgroundProcessing(db, redisPool)
    go pool.Start()
    // monitor for sigterm
}

func configureBackgroundProcessing(db *sql.DB, redisPool *redis.Pool) *work.WorkerPool {
    concurrency := 10
    namespace := "gocraft_work"
    pool := work.NewWorkerPool(jobs.Context{}, concurrency, namespace, redisPool)

    // a middleware..?

    pool.Job("get_followers", (*jobs.Context).TwitterGetFollowers)

    return pool
}

This is radically different from other libs, like jrallison/go-workers, with which my configureBackgroundProcessing func looks something like this:

followersWorker := &FollowersWorker{db: db}
workers.Process("get_followers", followersWorker.Perform, 10)

Stopping worker pool keeps running jobs

For some reason, I'm unable to stop a worker pool when there are jobs in the queue. I expected calling pool.Stop() to finish up whatever jobs are being run, and the workers to exit while Redis holds whatever else is in the queue... but that doesn't seem to be the case. Below is a small test case that illustrates the problem, mostly adapted from the README:

package main

import (
    "fmt"
    "github.com/garyburd/redigo/redis"
    "github.com/gocraft/work"
    "time"
)

var redisPool = &redis.Pool{
    MaxActive: 5,
    MaxIdle:   5,
    Wait:      true,
    Dial: func() (redis.Conn, error) {
        return redis.Dial("tcp", ":6379")
    },
}

type Context struct{}

var enqueuer = work.NewEnqueuer("app", redisPool)

func (c *Context) SampleJob(job *work.Job) error {
    fmt.Printf("Sample job %v running\n", job.ID)

    time.Sleep(10 * time.Second)

    fmt.Printf("Sample job %v done\n", job.ID)
    return nil
}

func main() {
    pool := work.NewWorkerPool(Context{}, 2, "app", redisPool)

    pool.Job("sample_job", (*Context).SampleJob)

    for i := 1; i <= 30; i++ {
        enqueuer.Enqueue("sample_job", work.Q{})
    }

    pool.Start()

    time.Sleep(30 * time.Second)
    fmt.Println("30 seconds is up! Stopping workers...")

    pool.Stop()
}

Rather than stopping after 30 seconds (plus up to 2 * 10 seconds for the running jobs to continue) the program continues and workers keep consuming jobs from the queue until it's empty:

go run main.go
Sample job 4b6859807fe26e8c344cc302 running
Sample job 6f6f30cd645cd328552983a8 running
Sample job 4b6859807fe26e8c344cc302 done
Sample job 6f6f30cd645cd328552983a8 done
Sample job d17378fcb8ab7e2b42b71ccf running
Sample job 5ad6c131d7faab927db749a2 running
Sample job d17378fcb8ab7e2b42b71ccf done
Sample job 5ad6c131d7faab927db749a2 done
Sample job e7f7e2aaeecf5e16d13ad5bc running
Sample job 5a09255570fabb8813eee575 running
30 seconds is up! Stopping workers...
Sample job 5a09255570fabb8813eee575 done
Sample job e7f7e2aaeecf5e16d13ad5bc done
Sample job f25bcdacb746c94348430143 running
Sample job e399bd4c8d3da1de109310c4 running
Sample job f25bcdacb746c94348430143 done
Sample job e399bd4c8d3da1de109310c4 done
Sample job 27a8ff38c160c64f9a1a52d4 running
Sample job fa0e939272c71d2d927bd032 running
Sample job fa0e939272c71d2d927bd032 done
Sample job 27a8ff38c160c64f9a1a52d4 done
Sample job d189cd37e00f1482aa898550 running
Sample job 40761b0c9f8ecaab192c9b79 running
Sample job d189cd37e00f1482aa898550 done
[...snipped! but this keeps going on for awhile...]

Am I doing something wrong in the setup or is this the desired behavior?

"Extra" args which do not matter for job uniqueness

Hi @cypriss,

Thanks for the work on work.

Case in point, in my current setup, I'm loading twitter followers with the job args:

{"twitter_id": 123456, "next_cursor_str": "-1", "ids": []}

If I'm rate limited by twitter API, I schedule a new job from within the current job with the args:

{"twitter_id": 123456, "next_cursor_str": "something", "ids": [1,2,3,4,5]}

(I have to make a query for uniqueness at this time)

If I were to use work for background processing, it seems like embedding partial args like above will have to go unless:

a) There's support to include "extra" args, which do not count towards job uniqueness; or
b) I use a reference to external cache. This sounds reasonably good for my case since the ids part can get pretty large.

With (a) it would be guaranteed that there's only one followers checking job for a user with twitter_id 123456. The next_cursor_str and ids can move into the extra/cache args. What do you think? Are there breaking concerns about how work is implemented, or is this simple enough to do?

I'm willing to take a stab at this with some guidance on what and how this would be possible.

Ability to display job status by ID

I'm looking at using this for long-lived web requests where the initial response to the client is a 202, and the client will check back to see the status. Is there a way to look up a job's status given only the job ID?
