
go-workers2's Introduction


Sidekiq-compatible background workers in Go.

  • reliable queueing for all queues using BRPOPLPUSH
  • handles retries
  • supports custom middleware
  • customizable concurrency per queue
  • responds to Unix signals to safely wait for jobs to finish before exiting
  • provides stats on which jobs are currently running
  • Redis Sentinel support
  • well tested

Example usage:

package main

import (
  "context"
  "fmt"

  workers "github.com/digitalocean/go-workers2"
)

func myJob(message *workers.Msg) error {
  // do something with your message
  // message.Jid()
  // message.Args() is a wrapper around go-simplejson (http://godoc.org/github.com/bitly/go-simplejson)
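  // A hedged sketch of reading arguments (assumes this job was enqueued
  // with args like []int{1, 2}; Args() wraps go-simplejson, so its
  // accessors apply):
  //   first, err := message.Args().GetIndex(0).Int()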
  return nil
}

func myMiddleware(queue string, mgr *workers.Manager, next workers.JobFunc) workers.JobFunc {
  return func(message *workers.Msg) (err error) {
    // do something before each message is processed
    err = next(message)
    // do something after each message is processed
    return
  }
}

func main() {
  // Create a manager, which manages workers
  manager, err := workers.NewManager(workers.Options{
    // location of redis instance
    ServerAddr: "localhost:6379",
    // instance of the database
    Database:   0,
    // number of connections to keep open with redis
    PoolSize:   30,
    // unique process ID for this instance of workers (for proper recovery of in-progress jobs on crash)
    ProcessID:  "1",
  })

  if err != nil {
    fmt.Println(err)
    return
  }

  // create a middleware chain with the default middlewares, and append myMiddleware
  mids := workers.DefaultMiddlewares().Append(myMiddleware)

  // pull messages from "myqueue" with concurrency of 10
  // this worker will not run myMiddleware, but will run the default middlewares
  manager.AddWorker("myqueue", 10, myJob)

  // pull messages from "myqueue2" with concurrency of 20
  // this worker will run the default middlewares and myMiddleware
  manager.AddWorker("myqueue2", 20, myJob, mids...)

  // pull messages from "myqueue3" with concurrency of 20
  // this worker will only run myMiddleware
  manager.AddWorker("myqueue3", 20, myJob, myMiddleware)

  // If you already have a manager and want to enqueue
  // to the same place:
  producer := manager.Producer()

  // Alternatively, if you want to create a producer to enqueue messages
  // producer, err := workers.NewProducer(workers.Options{
  //   // location of redis instance
  //   ServerAddr: "localhost:6379",
  //   // instance of the database
  //   Database:   0,
  //   // number of connections to keep open with redis
  //   PoolSize:   30,
  //   // unique process ID for this instance of workers (for proper recovery of in-progress jobs on crash)
  //   ProcessID:  "1",
  // })

  // Add a job to a queue
  producer.Enqueue("myqueue3", "Add", []int{1, 2})

  // Add a job to a queue with retry
  producer.EnqueueWithOptions("myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // Add a job to a queue, passing a context through to redis
  producer.EnqueueWithContext(context.Background(), "myqueue3", "Add", []int{1, 2}, workers.EnqueueOptions{Retry: true})

  // stats will be available at http://localhost:8080/stats
  go workers.StartAPIServer(8080)

  // Blocks until the process is told to exit via a Unix signal
  manager.Run()
}

Running the example above produces output like the following at localhost:8080/stats:

[
  {
    "manager_name": "",
    "processed": 5,
    "failed": 57,
    "jobs": {
      "myqueue": null,
      "myqueue2": null,
      "myqueue3": null
    },
    "enqueued": {
      "myqueue": 0,
      "myqueue2": 0,
      "myqueue3": 0
    },
    "retry_count": 4
  }
]
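
A minimal sketch of reading those stats from Go (this assumes the API server from the example is running locally on port 8080; the struct mirrors a subset of the JSON above):

package main

import (
  "encoding/json"
  "fmt"
  "net/http"
)

// managerStats mirrors a subset of the fields in the stats payload.
type managerStats struct {
  ManagerName string         `json:"manager_name"`
  Processed   int            `json:"processed"`
  Failed      int            `json:"failed"`
  Enqueued    map[string]int `json:"enqueued"`
  RetryCount  int            `json:"retry_count"`
}

func main() {
  resp, err := http.Get("http://localhost:8080/stats")
  if err != nil {
    fmt.Println(err)
    return
  }
  defer resp.Body.Close()

  // The endpoint returns a JSON array with one entry per manager.
  var stats []managerStats
  if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
    fmt.Println(err)
    return
  }
  for _, s := range stats {
    fmt.Printf("processed=%d failed=%d retries=%d\n", s.Processed, s.Failed, s.RetryCount)
  }
}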

Development sponsored by DigitalOcean. Forked from github.com/jrallison/go-workers; initial development sponsored by Customer.io.


go-workers2's Issues

Ability to get Redis connection

In the original go-workers it's possible to access the underlying Redis pool simply via workers.Config.Pool, which is an instance of *redis.Pool.

In this project it's hidden behind unexported fields.

We use it to run fetches for statistics and other minor things that (IMO) don't justify setting up a separate connection pool.

So an idea would be a public GetRedisPool method, or perhaps just exposing it as a public field?
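
A sketch of what the proposed accessor could look like (hypothetical: the method name and the unexported field are illustrative only, and the concrete client type depends on the Redis driver the library uses):

// GetRedisClient would expose the client the manager already holds, so
// callers could run ad-hoc queries (e.g. stats fetches) without setting
// up a second connection pool.
func (m *Manager) GetRedisClient() *redis.Client {
  return m.client // hypothetical unexported field
}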

Unique jobs queue option

Hey, I was just tracing around and found this hard fork. I originally hacked these two features in on top of your work in the soft fork of the go-workers lib, since I prefer handling errors to recovering from panics (which seems to be a common Ruby/Rails idiom).

I am planning to put proper tests around these two features, but was wondering about getting an informal review of my implementation details.

If you are open to this (and honestly I am not sure it's okay, because the original developer charges money for these features in his Ruby gem), am I infringing on some kind of fuzzy line here?

Moving on: are you willing to give it a look, and if so, should I open a tentative MR into your soft fork, or rebase my changes onto this repo and open an MR here for review?

Set up basic CLI

Create the foundation for a CLI to connect to stats.go data.

Logging `err: nil` to stdout

Something is causing this output every second when the queue is empty:

workers: 2018/12/13 13:54:44.163985 ERR:  queue:pusher redis: nil
workers: 2018/12/13 13:54:44.163989 ERR:  queue:hub_store redis: nil
workers: 2018/12/13 13:54:44.163993 ERR:  queue:post_to_elastic redis: nil
workers: 2018/12/13 13:54:46.023171 ERR:  queue:posts redis: nil
workers: 2018/12/13 13:54:46.023171 ERR:  queue:post_contents redis: nil
workers: 2018/12/13 13:54:46.023180 ERR:  queue:websub_hub redis: nil
workers: 2018/12/13 13:54:47.028561 ERR:  queue:hub_store redis: nil
workers: 2018/12/13 13:54:47.028575 ERR:  queue:post_to_elastic redis: nil
workers: 2018/12/13 13:54:47.028584 ERR:  queue:pusher redis: nil
workers: 2018/12/13 13:54:48.032880 ERR:  queue:websub_hub redis: nil
workers: 2018/12/13 13:54:48.032895 ERR:  queue:post_contents redis: nil
workers: 2018/12/13 13:54:48.032904 ERR:  queue:posts redis: nil
workers: 2018/12/13 13:54:49.035437 ERR:  queue:post_to_elastic redis: nil
workers: 2018/12/13 13:54:50.038053 ERR:  queue:pusher redis: nil
workers: 2018/12/13 13:54:50.038075 ERR:  queue:hub_store redis: nil
workers: 2018/12/13 13:54:50.038076 ERR:  queue:websub_hub redis: nil
workers: 2018/12/13 13:54:51.039688 ERR:  queue:posts redis: nil
workers: 2018/12/13 13:54:51.039702 ERR:  queue:post_contents redis: nil
workers: 2018/12/13 13:54:51.039707 ERR:  queue:post_to_elastic redis: nil

How can I remove it?

Not enough arguments in call to r.client.cmdable.BRPopLPush

Receiving this error from storage/redis.go line 33:

message, err := r.client.BRPopLPush(r.getQueueName(queue), r.getQueueName(inprogressQueue), timeout).Result()

Not enough arguments in call to r.client.cmdable.BRPopLPush. It looks like it requires a context now. Any ideas?
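
A likely fix, assuming the underlying go-redis client moved to v8, where every command takes a context.Context as its first argument (a sketch under that assumption, not the repository's actual patch):

ctx := context.Background()
message, err := r.client.BRPopLPush(ctx, r.getQueueName(queue), r.getQueueName(inprogressQueue), timeout).Result()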
