Blitzen

Blitzen is a Python framework built on top of Python's multiprocessing module. It is intended to maximize processor utilization on individual computers and clusters.

Installation

pip install blitzen

Usage:

Blitzen uses a dispatcher to provide concurrent processing features. Below are the available dispatchers and their use cases.

MulticoreDispatcher

MulticoreDispatcher is similar to multiprocessing.Pool in that you initialize the dispatcher and then pass it tasks to complete concurrently.

The notable differences are:

  1. Workers you create with a dispatcher remain open until you shut down the dispatcher.
  2. You can queue multiple tasks, and the dispatcher will complete all of them using the specified number of processes.
  3. Workers are exception resistant: if a worker subprocess crashes, the dispatcher shuts down that process and initializes a new one, so the specified number of workers is always actively monitoring incoming tasks.
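
The first two points can be illustrated with the standard library alone. The following is a minimal sketch, not blitzen's implementation: worker_loop, square, and fail are hypothetical names, and the sketch only catches exceptions raised by a task (returning them as the result) rather than respawning a crashed process as the dispatcher is described as doing.

```python
import multiprocessing as mp

def worker_loop(tasks, results):
    """Pull (task_id, func, args) items until a None sentinel arrives."""
    while True:
        item = tasks.get()
        if item is None:
            break  # shutdown signal
        task_id, func, args = item
        try:
            results.put((task_id, func(*args)))
        except Exception as exc:
            # A failing task does not stop the worker; the error becomes its result.
            results.put((task_id, exc))

def square(x):
    return x * x

def fail(x):
    raise ValueError('bad input: %r' % x)

if __name__ == '__main__':
    tasks, results = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker_loop, args=(tasks, results)) for _ in range(2)]
    for w in workers:
        w.start()
    jobs = [(0, square, (3,)), (1, fail, (4,)), (2, square, (5,))]
    for job in jobs:
        tasks.put(job)  # more tasks than workers: the extras wait in the queue
    collected = dict(results.get() for _ in jobs)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    for w in workers:
        w.join()
    print([collected[i] for i in range(len(jobs))])
```

The workers stay alive between tasks and only exit when they receive the sentinel, which mirrors the idea of workers remaining open until the dispatcher is shut down.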

Example:

import time
import random
from blitzen import MulticoreDispatcher

def f1():
  delay = random.randint(3,5)
  time.sleep(delay)
  print('Finished after', delay, 'secs.')
  return delay

def f2(delay):
  time.sleep(delay)
  raise ValueError('Throwing error')


if __name__ == '__main__':
  dispatcher = MulticoreDispatcher(workers=4)
  task_id = dispatcher.run(f2, 4)
  task_ids = [
    dispatcher.run(f1)
    for _ in range(5)
  ]

All tasks are either started or queued immediately upon running the dispatcher.run() call.

dispatcher.run() returns the dispatcher's task_id for the task you just passed it. Use this id to fetch the results of specific tasks, or to fetch task results in a specific order.

Getting results from the dispatcher:

print('Fetching Results.')
results = dispatcher.get_results(task_ids)
print('Received results from dispatcher:', results)

The results are always returned in the order of task_ids.

Eventually you will have to shut down the dispatcher to close the underlying worker processes. You can do this in two ways.

  1. dispatcher.shutdown() terminates all worker subprocesses and deallocates memory resources.
  2. dispatcher.join() returns all results in the order their tasks were passed to the dispatcher, then calls dispatcher.shutdown().

print('Fetching Results.')
results = dispatcher.join()
print('Received results from dispatcher:', results)

Critical Note: Once you request a result from the dispatcher, it is removed from the dispatcher's memory by default. To override this behavior, use dispatcher.get_results(clear=False).
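
The ordering and clearing behavior described above can be pictured with a small in-memory store. This is only a sketch under assumptions: ResultStore and its add method are hypothetical, it does not wait for unfinished tasks, and it is not how blitzen stores results internally.

```python
class ResultStore:
    """Holds finished task results keyed by task id (illustrative only)."""

    def __init__(self):
        self._results = {}

    def add(self, task_id, value):
        self._results[task_id] = value

    def get_results(self, task_ids=None, clear=True):
        # Default to every stored result, in insertion order.
        if task_ids is None:
            task_ids = list(self._results)
        # Output order follows task_ids, not the order tasks finished in.
        values = [self._results[t] for t in task_ids]
        if clear:
            for t in task_ids:
                self._results.pop(t, None)
        return values

if __name__ == '__main__':
    store = ResultStore()
    store.add(2, 'c')  # tasks may finish out of order
    store.add(0, 'a')
    store.add(1, 'b')
    print(store.get_results([0, 1, 2], clear=False))  # ['a', 'b', 'c'], nothing removed
    print(store.get_results([2, 0]))                  # ['c', 'a'], both entries removed
    print(store.get_results())                        # ['b']
```

With clear left at its default, a second request for the same task id would fail in this sketch because the entry is gone, which is the situation the note warns about.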

A full example is visible here.

DistributedDispatcher

DistributedDispatcher lets you utilize a cluster for concurrent computing. It will handle packet synchronization between clients, servers, and drivers. It will also log all activity on the clients and servers.

Define some common functions

The clients and servers need access to the same functions, so it is a good idea to put any functions that are not built into Python in a common file that every machine can import.

#common.py
import time

def time_consuming_function(delay):
  time.sleep(delay)
  return delay

Initialize your server

from blitzen.utils import get_local_ip
from blitzen.distributed import DistributedDispatcher
from common import time_consuming_function

if __name__ == '__main__':
  ip = get_local_ip()
  dispatcher = DistributedDispatcher(server_ip=ip)
  dispatcher.spawn_server(duration=30) #Run server for 30 seconds

Initialize your clients

from blitzen.utils import get_local_ip
from blitzen.distributed import DistributedDispatcher
from common import time_consuming_function

if __name__ == '__main__':
  ip = '192.168.1.2' #Server IP
  dispatcher = DistributedDispatcher(server_ip=ip)
  dispatcher.spawn_client(workers=6)

Run your driver code to be executed by the clients

from blitzen.utils import get_local_ip
from blitzen.distributed import DistributedDispatcher
from common import time_consuming_function

if __name__ == '__main__':
  ip = '192.168.1.2' #Server IP
  dispatcher = DistributedDispatcher(server_ip=ip)

  #With DistributedDispatcher you can specify an amount
  #of time each client has to finish their task
  #with the `timeout` keyword
  task_ids = [
    dispatcher.run(time_consuming_function, i+5, timeout=7) 
    for i in range(5)
  ]

  print('Requesting results.')
  results = dispatcher.join()
  print(results)
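
The source does not say how a timed-out task is reported, so the following is only a standard-library sketch of the general idea of a per-task time limit. run_with_timeout is a hypothetical helper, not part of blitzen, and it uses a local thread rather than a remote client.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def time_consuming_function(delay):
    time.sleep(delay)
    return delay

def run_with_timeout(func, *args, timeout):
    """Return ('ok', value), or ('timeout', None) if no result arrives in time."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(func, *args)
    try:
        return ('ok', future.result(timeout=timeout))
    except TimeoutError:
        return ('timeout', None)
    finally:
        # Do not block on the still-running call; the thread finishes on its own.
        pool.shutdown(wait=False)

if __name__ == '__main__':
    # Delays are scaled down from seconds to fractions of a second for a quick demo.
    for delay in (0.1, 0.3):
        print(delay, run_with_timeout(time_consuming_function, delay, timeout=0.2))
```

In the driver example above, the tasks sleep for 5 to 9 seconds with timeout=7, so the longer ones would be expected to run past their limit.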

Logging

DistributedDispatcher provides logging through Python's logging module so you can monitor your cluster. By default, the logger reports all messages at level logging.INFO and above. If that is too verbose, blitzen provides some functions to control the logger.

You can get the logger using logger = blitzen.logging.get_logger()
This logger is used by all blitzen submodules.

The other blitzen.logging functions are:

  • set_logfile(filename) lets you set a file to log out to.
  • set_loglevel(level) lets you set the logging level using the same convention as python's logging module.
  • disable() disables the logger.
  • enable() enables the logger.
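
For readers who know the standard logging module, the controls listed above map onto familiar operations. The sketch below shows plausible standard-library equivalents; the logger name 'blitzen_sketch' and the function bodies are assumptions made for illustration, not blitzen's actual code.

```python
import logging

# Hypothetical logger name; blitzen's real logger comes from blitzen.logging.get_logger().
logger = logging.getLogger('blitzen_sketch')
logger.setLevel(logging.INFO)

def set_logfile(filename):
    """Attach a file handler so records are also written to filename."""
    handler = logging.FileHandler(filename)
    handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s %(message)s'))
    logger.addHandler(handler)
    return handler

def set_loglevel(level):
    """Accepts the same level constants as logging, e.g. logging.WARNING."""
    logger.setLevel(level)

def disable():
    logger.disabled = True

def enable():
    logger.disabled = False
```

For example, calling set_loglevel(logging.WARNING) in this sketch would silence INFO messages while still reporting warnings and errors.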

A full example is visible here.

Contributors

blakeerichey

blitzen's Issues

get_results(verbose=True) to show the real-time status of the dispatcher delivering results

This could be implemented using tqdm and the following code snippet inside dispatcher.get_results:

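# Fragment intended for the body of dispatcher.get_results.
# Requires: import tqdm; from collections import deque
if task_ids is None:
  task_ids = list(self.tasks.keys())

completed = deque()
task_ids = deque(task_ids)
with tqdm.trange(len(task_ids)) as pbar:
  pbar.set_description('Fetching Task Results')
  # Assumes self.tasks is updated elsewhere while this loop waits.
  while len(task_ids):
    # Always check the oldest pending task so results stay in request order.
    task_id = task_ids[0]
    if self.tasks[task_id]['completed']:
      completed.append(task_ids.popleft())
      pbar.update(1)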
