Disc
Disc fills the gap between your Ruby service objects and antirez's wonderful Disque backend.
Usage
- Install the gem
$ gem install disc
- Write your jobs
require 'disc'
class CreateGameGrid
  include Disc::Job
  disc queue: 'urgent'

  def perform(type)
    # perform rather lengthy operations here.
  end
end
- Enqueue them to perform them asynchronously
CreateGameGrid.enqueue('light_cycle')
- Or enqueue them to be performed at some time in the future, or on a queue other than its default.
CreateGameGrid.enqueue(
  'disc_arena',
  at: DateTime.new(2015, 12, 31),
  queue: 'not_so_important'
)
- Create a file that requires anything needed for your jobs to run
# disc_init.rb
require 'ohm'
Dir['./jobs/**/*.rb'].each { |job| require job }
- Run as many Disc Worker processes as you wish, requiring your disc_init.rb file
$ QUEUES=urgent,default disc -r ./disc_init.rb
Notes about Jobs
Jobs are fairly straightforward Ruby classes. Internally, Disc serializes them to MessagePack so they can be stored in Disque, which has a few implications:
- Don't enqueue complex objects! Instead of user, enqueue user.id!
- If your job takes multiple arguments, you'll want to pass all those arguments in the first parameter of #enqueue as an array.
Example:
class ComplexJob
  include Disc::Job
  disc queue: 'urgent'

  def perform(first_parameter, second_parameter)
    # do things...
  end
end
ComplexJob.enqueue(['first argument', { second: 'argument' }])
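To illustrate why only simple values should be enqueued, here is a minimal sketch of what a serialization round trip does to the arguments before they reach perform. It is not Disc's actual code: round_trip and ComplexJobSketch are hypothetical names, and JSON from the standard library stands in for MessagePack so the example runs without extra gems. It also assumes the worker splats the arguments array into perform, as the example above suggests.

```ruby
require 'json'

# Hypothetical stand-in for the serialize/deserialize step.
# Disc uses MessagePack; JSON is swapped in here only for illustration.
def round_trip(arguments)
  JSON.parse(JSON.generate(arguments))
end

# Plain class mirroring ComplexJob, without the Disc::Job mixin.
class ComplexJobSketch
  def perform(first_parameter, second_parameter)
    [first_parameter, second_parameter]
  end
end

# The array handed to enqueue is what gets serialized...
arguments = round_trip(['first argument', { second: 'argument' }])

# ...and (by assumption) it is splatted into perform on the worker side.
result = ComplexJobSketch.new.perform(*arguments)

# The symbol key comes back as a string after the round trip.
p result
```

In this sketch the hash arrives with a string key instead of a symbol key, which is the kind of surprise that makes passing ids and other primitives the safer choice.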
Settings
Disc takes its configuration from environment variables.
ENV Variable | Default Value | Description |
---|---|---|
QUEUES | 'default' | The list of queues that Disc::Worker will listen to. It can be a single queue name or a comma-separated list of queues. |
DISC_CONCURRENCY | '25' | Number of threads to spawn when Celluloid is available. |
DISQUE_NODES | 'localhost:7711' | The list of Disque servers to connect to. It can be a single node or a comma-separated list of nodes. |
DISQUE_AUTH | '' | Authorization credentials for Disque. |
DISQUE_TIMEOUT | '100' | Time in milliseconds that the client will wait for the Disque server to acknowledge and replicate a job. |
DISQUE_CYCLE | '1000' | The client keeps track of which nodes are providing more jobs. After the number of operations specified in cycle, it tries to connect to the preferred node. |
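As a rough sketch of how the table above maps to values in Ruby, the snippet below reads each variable with its documented default. The disc_settings helper and its hash keys are hypothetical names made up for this example; Disc's own internals may read the environment differently.

```ruby
# Hypothetical helper: reads the documented variables, falling back to the
# defaults listed in the table. `env` can be ENV or any Hash-like object.
def disc_settings(env = ENV)
  {
    queues:      env.fetch('QUEUES', 'default').split(','),
    concurrency: Integer(env.fetch('DISC_CONCURRENCY', '25')),
    nodes:       env.fetch('DISQUE_NODES', 'localhost:7711').split(','),
    auth:        env.fetch('DISQUE_AUTH', ''),
    timeout:     Integer(env.fetch('DISQUE_TIMEOUT', '100')), # milliseconds
    cycle:       Integer(env.fetch('DISQUE_CYCLE', '1000'))
  }
end

settings = disc_settings('QUEUES' => 'urgent,default')
p settings[:queues]
p settings[:nodes]
```

Passing a plain Hash instead of ENV keeps the sketch easy to try in irb without touching the real environment.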
Error handling
When a job raises an exception, Disc.on_error is invoked with the error and the job data. By default, this method prints the error to standard error, but you can override it to report the error to your favorite error aggregator.
# On disc_init.rb
def Disc.on_error(exception, job)
  # ... report the error
end
Dir["./jobs/**/*.rb"].each { |job| require job }
Job Definition
The error handler receives the data of the current job as a Hash with the following schema:
'class' | (String) The Job class. |
'arguments' | (Array) The arguments passed to perform. |
'queue' | (String) The queue from which this job was picked up. |
'id' | (String) Disque's job ID. |
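The job Hash described above can be turned into a readable report line. This is only a sketch: describe_failed_job is a hypothetical helper, and the sample job data is invented for illustration. The commented lines show one way it might be wired into Disc.on_error in disc_init.rb.

```ruby
# Hypothetical helper that formats the error and the job Hash
# ('class', 'arguments', 'queue', 'id') into a single line.
def describe_failed_job(exception, job)
  arguments = job['arguments'].map(&:inspect).join(', ')

  "#{job['class']}(#{arguments}) failed on queue '#{job['queue']}' " \
    "[#{job['id']}]: #{exception.class}: #{exception.message}"
end

# In disc_init.rb this could be hooked up as:
#
#   def Disc.on_error(exception, job)
#     $stderr.puts describe_failed_job(exception, job)
#   end

# Invented sample data, shaped like the schema above.
sample_job = {
  'class'     => 'CreateGameGrid',
  'arguments' => ['light_cycle'],
  'queue'     => 'urgent',
  'id'        => 'sample-job-id'
}

puts describe_failed_job(RuntimeError.new('grid offline'), sample_job)
```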
[Optional] Celluloid integration
Disc workers run just fine on their own, but if you happen to be using Celluloid you might want Disc to take advantage of it and spawn multiple worker threads per process. Doing this is trivial! Just require Celluloid before your init file:
$ QUEUES=urgent,default disc -r celluloid/current -r ./disc_init.rb
Whenever Disc detects that Celluloid is available it will use it to spawn a number of threads equal to the DISC_CONCURRENCY environment variable, or 25 by default.
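The general pattern of checking for an optional dependency at runtime can be sketched as follows. This is an assumption about the approach, not Disc's actual detection code: worker_thread_count is a hypothetical helper, and the single-thread fallback is inferred from the description above.

```ruby
# Hypothetical helper: one thread unless Celluloid has been required,
# otherwise DISC_CONCURRENCY (25 when the variable is unset).
def worker_thread_count(env = ENV)
  # defined? returns nil when the constant has not been loaded.
  return 1 unless defined?(::Celluloid)

  Integer(env.fetch('DISC_CONCURRENCY', '25'))
end

puts worker_thread_count
```

Because the check happens at call time, requiring Celluloid before the init file (as in the command above) is enough to change the result.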
[Optional] Rails and ActiveJob integration
You can use Disc easily in Rails without any more hassle, but if you'd like to use it via ActiveJob you can use the adapter included in this gem.
# Gemfile
gem 'disc'
# config/application.rb
module YourApp
  class Application < Rails::Application
    require 'active_job/queue_adapters/disc_adapter'
    config.active_job.queue_adapter = :disc
  end
end
# app/jobs/clu_job.rb
class CluJob < ActiveJob::Base
  queue_as :urgent

  def perform(*args)
    # Try to take over The Grid here...
  end
end
# disc_init.rb
require ::File.expand_path('../config/environment', __FILE__)
# Wherever you want
CluJob.perform_later(a_bunch_of_arguments)
Disc is run in the exact same way; for this example it'd be:
$ QUEUES=urgent disc -r ./disc_init.rb
A note on stability
The version of Disque at the time of this writing is 0.0.1. It is a wonderful project; I know of people running it in production, and I expect it will only get better with time, but please do not expect Disc to be more production-ready than Disque is.
Similar Projects
If you want to use Disque but Disc isn't cutting it for you, take a look at Havanna, a project by my friend @djanowski.
License
The code is released under an MIT license. See the LICENSE file for more information.
Acknowledgements
- To @foca for helping me ship a quality thing and putting up with my constant whining.
- To @antirez for Redis, Disque, and his refreshing way of programming wonderful tools.
- To @soveran for pushing me to work on this and publishing gems that keep me enjoying Ruby.
- To all contributors
Sponsorship
This open source tool is proudly sponsored by 13Floor