Workers

Workers is a Ruby gem for performing work in background threads. Design goals include high performance, low latency, a simple API, customizability, and a multi-layered architecture. It provides a number of simple-to-use classes that solve a wide range of concurrency problems. It is used by Tribe to implement event-driven actors.

Installation

Add this line to your application's Gemfile:

gem 'workers'

And then execute:

$ bundle

Or install it yourself as:

$ gem install workers

Parallel Map

Parallel map is the simplest way to get started with the Workers gem. It is similar to Ruby's built-in Array#map method except each element is mapped in parallel.

Workers.map([1, 2, 3, 4, 5]) { |i| i * i }

Any exception raised while mapping will cause the entire map method to fail. If your block is prone to temporary failures (exceptions), you can retry it.

Workers.map([1, 2, 3, 4, 5], :max_tries => 100) do |i|
  if rand <= 0.5
    puts "Sometimes I like to fail while computing #{i} * #{i}."
    raise 'sad face'
  end

  i * i
end

Tasks

Tasks and task groups provide more flexibility than parallel map. The main benefit is that you get to decide how you want to handle exceptions.

# Create a task group (it contains a pool of worker threads).
group = Workers::TaskGroup.new

# Add tasks to the group.
10.times do |i|
  10.times do |j|
    group.add do
      group.synchronize { puts "Computing #{i} * #{j}..." }
      i * j # Last statement executed is the result of the task.
    end
  end
end

# Execute the tasks (blocks until the tasks complete).
# Returns true if all tasks succeed, false if any fail.
group.run

# Return an array of all the tasks.
group.tasks

# Return an array of the successful tasks.
group.successes

# Return an array of the failed tasks (raised an exception).
group.failures

# Review the results.
group.tasks.each do |t|
  t.succeeded? # True or false (false if an exception occurred).
  t.failed?    # True or false (true if an exception occurred).
  t.input      # Input value.
  t.result     # Output value (the result of i * j in this example).
  t.exception  # The exception if one exists.
  t.max_tries  # Maximum number of attempts.
  t.tries      # Actual number of attempts.
end

Note that instances of TaskGroup provide a 'synchronize' method. This method uses a mutex so you can serialize portions of your tasks that aren't thread safe.
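For example, here is a minimal sketch of using 'synchronize' to serialize writes to shared state (the results array is purely illustrative):

# Shared state that is not thread safe on its own.
results = []

group = Workers::TaskGroup.new

10.times do |i|
  group.add do
    value = i * i
    group.synchronize { results << value } # Serialize the shared write.
    value
  end
end

group.run
puts results.sort.inspect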

Options

group = Workers::TaskGroup.new(
  :logger => nil,                   # Ruby logger instance.
  :pool => Workers.pool             # The worker pool used to execute the tasks.
)

task = Workers::Task.new(
  :logger => nil,                   # Ruby logger instance.
  :perform => proc {},              # Required option.  Block of code to run.
  :args => [],                      # Array of arguments passed to the 'perform' block.
  :finished => nil,                 # Callback to execute after attempting to run the task.
  :max_tries => 1,                  # Number of times to try completing the task (without an exception).
)
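
For illustration, here is a small sketch of a standalone task. Calling 'run' directly and the argument passing shown are assumptions based on the option descriptions above (TaskGroup drives tasks this way internally), not documented API:

task = Workers::Task.new(
  :perform => proc { |a, b| a + b },            # Block of code to run.
  :args => [1, 2],                              # Passed to the 'perform' block.
  :finished => proc { puts 'Attempt done.' },   # Runs after the task is attempted.
  :max_tries => 3
)

task.run         # Assumption: executes the task in the calling thread.
puts task.result # => 3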

Workers

Basic

The main purpose of the Worker class is to add an event system on top of Ruby's built-in Thread class. This greatly simplifies inter-thread communication. Workers are fairly low level and don't handle exceptions for you. This way you can decide how you want to handle exceptions (or not handle them). Failing to handle exceptions will result in dead workers, so in most cases you will want to handle them.

# Initialize a worker pool.
pool = Workers::Pool.new

# Perform some work in the background.
100.times do
  pool.perform do
    begin
      sleep(rand(3))
      puts "Hello world from thread #{Thread.current.object_id}"
    rescue Exception => e
      puts "Oh no, my hello world failed!"
    end
  end
end

# Tell the workers to shutdown.
pool.shutdown do
  puts "Worker thread #{Thread.current.object_id} is shutting down."
end

# Wait for the workers to shutdown.
pool.join

Advanced

The Worker class is designed to be customized through inheritance and its event system:

# Create a subclass that handles custom events.
class CustomWorker < Workers::Worker
  private
  def process_event(event)
    case event.command
    when :my_custom
      begin
        puts "Worker received custom event: #{event.data}"
        sleep(1)
      rescue Exception => e
        puts "This is a very sad program."
      end
    end
  end
end

# Create a pool that uses your custom worker class.
pool = Workers::Pool.new(:worker_class => CustomWorker)

# Tell the workers to do some work using custom events.
100.times do |i|
  pool.enqueue(:my_custom, i)
end

# Tell the workers to shutdown.
pool.shutdown do
  puts "Worker thread #{Thread.current.object_id} is shutting down."
end

# Wait for the workers to shutdown.
pool.join

Note that you can use custom workers without a pool. This effectively gives you direct access to a single event-driven thread.
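
As a rough sketch, assuming a standalone worker exposes the same 'enqueue', 'shutdown', and 'join' methods used on pools above:

# Create a single event-driven worker (no pool).
worker = CustomWorker.new

# Send it some custom events.
10.times { |i| worker.enqueue(:my_custom, i) }

# Shut it down and wait for it to finish (assumed to mirror the pool API).
worker.shutdown
worker.join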

Options

worker = Workers::Worker.new(
  :logger => nil,                   # Ruby Logger instance.
  :input_queue => nil               # Ruby Queue used for input events.
)

Pools

Pools allow a group of workers to share a work queue. The Workers gem has a default pool (Workers.pool) with 20 workers, so in most cases you won't need to create your own. Pools can be adjusted using the methods below:

# Create a pool.
pool = Workers::Pool.new

# Return the number of workers in the pool.
pool.size

# Increase the number of workers in the pool.
pool.expand(5)

# Decrease the number of workers in the pool.
pool.contract(5)

# Resize the pool size to a specific value.
pool.resize(20)

Options

pool = Workers::Pool.new(
  :size => 20,                      # Number of threads to create.
  :logger => nil,                   # Ruby Logger instance.
  :worker_class => Workers::Worker  # Class of worker to use for this pool.
)

Timers

Timers provide a way to execute code in the future. You can easily use them to tell a Worker or its higher-level classes (Task, TaskGroup, etc.) to perform work in the future.

# Create a one-shot timer that executes in 1.5 seconds.
timer = Workers::Timer.new(1.5) do
  puts 'Hello world'
end

# Create a periodic timer that loops infinitely or until 'cancel' is called.
timer = Workers::PeriodicTimer.new(1) do
  puts 'Hello world many times'
end

# Let the timer print some lines.
sleep(5)

# Shutdown the timer.
timer.cancel

Options

timer = Workers::Timer.new(1,
  :logger => nil,                   # Ruby logger instance.
  :repeat => false,                 # Repeat the timer until 'cancel' is called.
  :scheduler => Workers.scheduler,  # The scheduler that manages execution.
  :callback => nil                  # The proc to execute (provide this or a block, but not both).
)

timer = Workers::PeriodicTimer.new(1,
  :logger => nil,                   # Ruby logger instance.
  :scheduler => Workers.scheduler,  # The scheduler that manages execution.
  :callback => nil                  # The proc to execute (provide this or a block, but not both).
)

Schedulers

Schedulers are what trigger Timers to fire. The Workers gem has a default scheduler (Workers.scheduler) so in most cases you won't need to create your own. Schedulers execute timers using a pool of workers so make sure your timer block is thread safe. You can create additional schedulers as necessary:

# Create a workers pool with a larger than default thread count (optional).
pool = Workers::Pool.new(:size => 100)

# Create a scheduler.
scheduler = Workers::Scheduler.new(:pool => pool)

# Create a timer that uses the above scheduler.
timer = Workers::Timer.new(1, :scheduler => scheduler) do
  puts 'Hello world'
end

# Wait for the timer to fire.
sleep(5)

# Shutdown the scheduler.
scheduler.dispose

Options

scheduler = Workers::Scheduler.new(
  :logger => nil,                   # Ruby logger instance.
  :pool => Workers::Pool.new        # The workers pool used to execute timer callbacks.
)

Concurrency in MRI and JRuby

This gem has been tested with MRI Ruby 1.9.3 and JRuby 1.7. Due to MRI's Global Interpreter Lock (GIL), only one thread can execute Ruby code at any given point in time, so threads give you concurrency but not true parallelism. It is highly recommended that you use JRuby if you are concerned about multi-core CPU performance and concurrency.
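
As a rough way to see this for yourself (an illustrative sketch, not a rigorous benchmark), you can time CPU-bound work run serially and through a parallel map, then compare the numbers on MRI and JRuby:

require 'benchmark'
require 'workers'

# A CPU-bound unit of work.
work = proc { 1_000_000.times.reduce(:+) }

serial   = Benchmark.realtime { 8.times { work.call } }
parallel = Benchmark.realtime { Workers.map((1..8).to_a) { work.call } }

# On MRI the GIL serializes Ruby execution, so expect similar numbers.
# On JRuby the parallel version should be noticeably faster on a multi-core CPU.
puts "serial: #{serial.round(2)}s, parallel: #{parallel.round(2)}s"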

Contributing

  1. Fork it
  2. Create your feature branch (git checkout -b my-new-feature)
  3. Commit your changes (git commit -am 'Add some feature')
  4. Push to the branch (git push origin my-new-feature)
  5. Create new Pull Request
