Taskflow

An advanced yet simple system to run your background tasks and workflows.

Features

  • Recurring tasks (aka jobs) and workflows (a series of dependent tasks) with CRON-like scheduling

  • Workflow dependencies - tasks execute in order and/or in parallel, depending on the dependency chain.

  • Two types of workers that execute tasks

    • Pull workers - Pull tasks directly off the database queue and execute them.
    • Push workers - Tasks are pushed to a remote work management system, such as AWS Batch, Kubernetes, or Apache Mesos. Container friendly.

Motivation

Other background task and workflow management solutions exist out there, such as Celery for tasks/jobs or Airflow and Luigi for workflows. Taskflow is designed to have a small footprint, maintain its state in a readily queryable SQL database, have predictable CRON-like scheduling behavior, and GTFO of the way when you need it to.

Concepts

Task

A Task represents some runnable code or command, for example extracting a table from a database or pushing to an API. Anything that can run in Python or be executed in Bash.

Task Instances

A Task Instance is a specific run of a Task. Task instances can be created programmatically on-demand or automatically using a recurring schedule attached to a Task.

Workflows

A Workflow represents a series of dependent tasks represented as a graph. Workflows are associated with the Tasks they run.
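To make the graph idea concrete, here is a minimal sketch of how a dependency graph can be turned into stages of tasks that are safe to run together. It uses Python's standard `graphlib` module and is not Taskflow's own implementation; the task names are invented for illustration.

```python
from graphlib import TopologicalSorter

# Hypothetical workflow: each key is a task name, each value is the set of
# task names it depends on. These names are assumptions for illustration.
dependencies = {
    "extract_accounts": set(),
    "extract_contacts": set(),
    "load_warehouse": {"extract_accounts", "extract_contacts"},
    "push_report": {"load_warehouse"},
}

def execution_stages(deps):
    """Group tasks into stages; tasks in one stage have all dependencies met."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()  # raises graphlib.CycleError if the graph contains a cycle
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks that could run in parallel
        stages.append(ready)
        sorter.done(*ready)  # mark the whole stage as finished
    return stages

stages = execution_stages(dependencies)
```

Each inner list is a group of tasks with no unmet dependencies, which mirrors the "in order and/or in parallel" behavior described in the feature list.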

Workflow Instances

A Workflow Instance is a specific run of a Workflow. A Workflow Instance creates Task Instances as needed during a run. Like a Task Instance, Workflow Instances can be created programmatically on-demand or automatically using a recurring schedule attached to a Workflow.

Scheduler

The scheduler always runs as a single instance at a time. It schedules recurring Tasks and recurring Workflows. It also advances running Workflow Instances, scheduling Task Instances as needed.
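As a rough illustration of CRON-like scheduling, the sketch below finds the next minute that matches a five-field pattern, optionally bounded by a start and end datetime. It is a simplified stand-in, not Taskflow's scheduler: it only understands `*`, `*/n`, and comma-separated integers, and it requires every field to match (standard cron treats day-of-month and day-of-week differently). The function names are hypothetical.

```python
from datetime import datetime, timedelta

def field_matches(field, value):
    """Match one cron field supporting '*', '*/n', and comma-separated integers."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            if value % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False

def cron_matches(pattern, moment):
    """Return True if `moment` matches all five fields of `pattern`."""
    minute, hour, day, month, weekday = pattern.split()
    return (
        field_matches(minute, moment.minute)
        and field_matches(hour, moment.hour)
        and field_matches(day, moment.day)
        and field_matches(month, moment.month)
        and field_matches(weekday, moment.isoweekday() % 7)  # cron uses 0 for Sunday
    )

def next_run(pattern, after, start_date=None, end_date=None):
    """Return the first matching minute strictly after `after`, or None."""
    candidate = after.replace(second=0, microsecond=0) + timedelta(minutes=1)
    if start_date is not None and candidate < start_date:
        candidate = start_date.replace(second=0, microsecond=0)
        if candidate < start_date:
            candidate += timedelta(minutes=1)
    limit = candidate + timedelta(days=366)  # give up after roughly a year
    while candidate <= limit:
        if end_date is not None and candidate > end_date:
            return None
        if cron_matches(pattern, candidate):
            return candidate
        candidate += timedelta(minutes=1)
    return None
```

For example, `next_run("*/15 * * * *", datetime(2024, 1, 1, 10, 7))` walks forward minute by minute until it reaches a minute divisible by 15.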

Pusher

The Pusher is usually run within the same process as the scheduler. The Pusher pulls tasks destined for a push worker off the task_instances table and pushes them to the push destination, for example AWS Batch. The Pusher also syncs the state of the currently pushed tasks with the push destination. Multiple push destinations can be used at the same time; for example, one task could go to AWS Batch while another goes to Kubernetes.
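The routing to multiple push destinations could look roughly like the sketch below. It assumes a registry keyed by destination name and a `push` method on each destination; both are hypothetical and only illustrate the idea, with dictionaries standing in for task instances.

```python
class RecordingDestination:
    """Stand-in for a real push destination; it only records what it receives."""

    def __init__(self):
        self.pushed = []

    def push(self, task_instance):
        self.pushed.append(task_instance["id"])

def push_queued(task_instances, destinations):
    """Send each task instance to the destination named by its push_destination.

    Returns the ids of instances whose destination is not registered.
    """
    unroutable = []
    for task_instance in task_instances:
        destination = destinations.get(task_instance["push_destination"])
        if destination is None:
            unroutable.append(task_instance["id"])
            continue
        destination.push(task_instance)
    return unroutable

# Hypothetical registry with two destinations active at the same time.
destinations = {
    "aws_batch": RecordingDestination(),
    "kubernetes": RecordingDestination(),
}
queued = [
    {"id": 1, "push_destination": "aws_batch"},
    {"id": 2, "push_destination": "kubernetes"},
    {"id": 3, "push_destination": "mesos"},
]
unroutable = push_queued(queued, destinations)
```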

Pull Worker

A pull worker is a process that directly pulls tasks off the queue and executes them.
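The sketch below shows one way a worker might claim the next queued row from a database table without two workers taking the same task. The `task_instances` table name comes from the text above; the columns, status values, and use of SQLite are assumptions made so the example is self-contained.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """CREATE TABLE task_instances (
           id INTEGER PRIMARY KEY,
           task_name TEXT,
           status TEXT,
           worker_id TEXT
       )"""
)
conn.executemany(
    "INSERT INTO task_instances (task_name, status) VALUES (?, 'queued')",
    [("push_new_accounts",), ("salesforce_extract",)],
)
conn.commit()

def claim_next(conn, worker_id):
    """Claim the oldest queued task instance, or return None if the queue is empty."""
    with conn:  # commits on success, rolls back if an exception is raised
        row = conn.execute(
            "SELECT id, task_name FROM task_instances "
            "WHERE status = 'queued' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None
        # The status guard means a row already claimed elsewhere is not taken twice.
        updated = conn.execute(
            "UPDATE task_instances SET status = 'running', worker_id = ? "
            "WHERE id = ? AND status = 'queued'",
            (worker_id, row[0]),
        )
        if updated.rowcount != 1:
            return None
        return row
```

On a server database with many concurrent workers, row locking would typically replace the guard used here (PostgreSQL, for instance, offers `SELECT ... FOR UPDATE SKIP LOCKED`).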

API

Task

Creates a new Task definition that can be used to schedule tasks and create task instances.

task_foo = Task(
	name=None, # Required - name of the task, like 'push_new_accounts'
	workflow=None, # workflow the task is associated with
	active=False, # whether the task is active; if False, it will not be scheduled
	title=None, # human friendly title of the task, used for the UI
	description=None, # human friendly description of the task, used for the UI
	concurrency=1, # maximum number of TaskInstances of this Task that can be running at a time
	sla=None, # number of seconds within which the scheduled Task should start
	schedule=None, # CRON pattern to schedule the Task, if the task is to be scheduled
	default_priority='normal', # default priority used for the TaskInstances of this Task
	start_date=None, # start datetime for scheduling this Task, the task will not be scheduled before this time
	end_date=None, # end datetime for scheduling this Task, the task will not be scheduled after this time
	retries=0, # number of times to retry the task if it fails
	timeout=300, # if the task has not completed within this many seconds of its locked_at datetime, retry or fail the task
	retry_delay=300, # number of seconds between retries
	params={}, # parameters of the task, these are available to the executors and pushers
	push_destination=None) # for Tasks that need to be pushed, where to push them, for example 'aws_batch'
	
task_foo.depends_on(task_bar) # Task.depends_on tells Taskflow that one Task is dependent on another. Both tasks need to be in the same Workflow
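The `retries`, `timeout`, and `retry_delay` parameters above suggest a decision like the following when a task instance fails or stops responding. This is a sketch of one plausible reading of those comments, not Taskflow's actual logic; the function names and the `attempts` counter are hypothetical.

```python
from datetime import datetime, timedelta

def is_timed_out(locked_at, timeout, now):
    """True if more than `timeout` seconds have passed since the task was locked."""
    return now > locked_at + timedelta(seconds=timeout)

def resolve_attempt(attempts, retries, retry_delay, now):
    """Decide what happens after a failed or timed-out attempt.

    `attempts` counts runs so far, including the one that just failed.
    Returns ("retry", run_at) or ("failed", None).
    """
    if attempts <= retries:
        return ("retry", now + timedelta(seconds=retry_delay))
    return ("failed", None)

now = datetime(2024, 1, 1, 12, 0, 0)
locked_at = datetime(2024, 1, 1, 11, 50, 0)
timed_out = is_timed_out(locked_at, timeout=300, now=now)
outcome = resolve_attempt(attempts=1, retries=2, retry_delay=300, now=now)
```

With the default `retries=0`, the first failure is final under this reading.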

To create a task with executable Python code, subclass Task and override the execute method.

class MyTask(Task):
	def execute(self, task_instance):
		print('Any sort of python I want')

Workflow

Creates a new Workflow definition that can be used to schedule workflows and create workflow instances.

workflow_foo = Workflow(
	name=None, # Required. Name of the workflow like 'salesforce_etl'
	active=False, # whether the workflow is active; if False, it will not be scheduled
	title=None, # human friendly title of the workflow, used for the UI
	description=None, # human friendly description of the workflow, used for the UI
	concurrency=1, # maximum number of WorkflowInstances of this Workflow that can be running at a time
	sla=None, # number of seconds within which the scheduled Workflow should start
	schedule=None, # CRON pattern to schedule the Workflow, if the workflow is to be scheduled
	default_priority='normal', # default priority used for the WorkflowInstances of this Workflow
	start_date=None, # start datetime for scheduling this Workflow, the workflow will not be scheduled before this time
	end_date=None) # end datetime for scheduling this Workflow, the workflow will not be scheduled after this time

Taskflow

The Taskflow class is used to create an instance of Taskflow and associate your Tasks and Workflows with it.

taskflow = Taskflow()

# These functions take lists of Workflows and Tasks (not associated with a Workflow) and add them to Taskflow
taskflow.add_workflows(workflows)
taskflow.add_tasks(tasks)

The above code is usually all you need to set up Taskflow. See the example directory for an example implementation.

CLI

The CLI is used to manage Taskflow, manage the database, run the scheduler, run the pull worker, and execute task instances.

Usage: main.py [OPTIONS] COMMAND [ARGS]...

Options:
  --taskflow TEXT
  --help           Show this message and exit.

Commands:
  api_server
  init_db
  migrate_db
  pull_worker
  queue_task
  queue_workflow
  run_task
  scheduler

taskflow's People

Contributors

awm33, timwis

Forkers

zgd716

taskflow's Issues

REST API

A RESTful API with username/password session authorization, for use in the frontend.

Monitoring

Support heartbeat, count, and rate metrics.

Proposed implementation - Create a base class like BaseMonitor with functions like heartbeat_scheduler() and task_failure(). Implementation classes like CloudWatchMonitor or DataDogMonitor would take values from the function arguments and publish them to their respective service.
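A minimal sketch of the proposed shape might look like this. `BaseMonitor`, `heartbeat_scheduler()`, and `task_failure()` are the names from the proposal; the `task_name` argument and the in-memory implementation are assumptions, standing in for a class that would publish to CloudWatch or DataDog.

```python
from abc import ABC, abstractmethod

class BaseMonitor(ABC):
    """Interface the scheduler and workers would call; subclasses publish the values."""

    @abstractmethod
    def heartbeat_scheduler(self):
        """Record that the scheduler is alive."""

    @abstractmethod
    def task_failure(self, task_name):
        """Record that a task instance of `task_name` failed."""

class InMemoryMonitor(BaseMonitor):
    """Hypothetical implementation that keeps metrics in a list, useful for tests."""

    def __init__(self):
        self.metrics = []

    def heartbeat_scheduler(self):
        self.metrics.append(("scheduler.heartbeat", 1))

    def task_failure(self, task_name):
        self.metrics.append(("task.failure", task_name))

monitor = InMemoryMonitor()
monitor.heartbeat_scheduler()
monitor.task_failure("push_new_accounts")
```

Keeping the base class abstract means a service-specific monitor cannot be instantiated until it implements every hook.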

Web UI

A single-page web interface, ideally built with Vue.js. Requires #1

Features

  • Latest recurring workflow and task runs/instances, highlighting state (running, success, fail, etc.)
  • List Workflow instances
    • Filter by state
    • Filter by workflow
    • Sorting
  • List Task instances
    • Filter by state
    • Filter by workflow
    • Sorting

For the instance tables filtering/sorting, both endpoints use restful-ben, so there is opportunity for some reusable table filter components.

Advanced Authorization

Full-blown RBAC, or even just user permissions based on tags or on individual tasks and workflows.

Timeout AWS Batch Jobs

AWS Batch does not support job timeouts. After a timeout period, Taskflow should terminate the job in AWS Batch.
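One way to approach this is a periodic check that compares each pushed job's start time against its timeout and asks the destination to terminate the overdue ones. The sketch below keeps the termination call injectable so it runs without AWS; the job dictionary keys and function name are hypothetical. In a real deployment the callable might wrap the boto3 Batch client's `terminate_job(jobId=..., reason=...)`.

```python
from datetime import datetime, timedelta

def terminate_timed_out(jobs, now, terminate):
    """Terminate every job that has run longer than its timeout.

    `jobs` is a list of dicts with hypothetical keys job_id, started_at, timeout.
    `terminate` is a callable taking (job_id, reason).
    Returns the ids of the jobs that were terminated.
    """
    terminated = []
    for job in jobs:
        deadline = job["started_at"] + timedelta(seconds=job["timeout"])
        if now > deadline:
            terminate(job["job_id"], "Taskflow timeout exceeded")
            terminated.append(job["job_id"])
    return terminated

calls = []
jobs = [
    {"job_id": "job-a", "started_at": datetime(2024, 1, 1, 12, 0), "timeout": 300},
    {"job_id": "job-b", "started_at": datetime(2024, 1, 1, 12, 8), "timeout": 300},
]
terminated = terminate_timed_out(
    jobs,
    now=datetime(2024, 1, 1, 12, 10),
    terminate=lambda job_id, reason: calls.append((job_id, reason)),
)
```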

Implement SLA

The parameter is already in place on Tasks and Workflows, but currently does nothing. Log SLA misses and potentially send alerts. Would also be good to plug SLAs into #6

Taskflow Events

There's an events table, but it's currently not in use. Having high level events recorded for each workflow and task would be useful, particularly in the UI.

Tags

Be able to add tags to Tasks and Workflows.

Concurrency not enforced

The concurrency property on Tasks and Workflows is not currently enforced. The scheduler will not run more than one at a time, but enforcement is needed for on-demand use.
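Enforcement could be as simple as counting running instances per task before starting queued ones. The sketch below is one possible approach, not the project's implementation; the data shapes and function name are assumptions, and the fallback limit of 1 mirrors the documented `concurrency=1` default.

```python
from collections import Counter

def startable(queued, running, limits):
    """Pick queued instances that can start without exceeding concurrency limits.

    queued:  list of (instance_id, task_name) in queue order
    running: list of task names that currently have a running instance
    limits:  mapping of task_name -> concurrency (defaults to 1 if missing)
    """
    active = Counter(running)
    chosen = []
    for instance_id, task_name in queued:
        if active[task_name] < limits.get(task_name, 1):
            active[task_name] += 1  # count this instance against the limit
            chosen.append(instance_id)
    return chosen

chosen = startable(
    queued=[(1, "extract"), (2, "extract"), (3, "load"), (4, "extract")],
    running=["load"],
    limits={"extract": 2},
)
```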

Stage Field

Workflow stage field - dev, test, prod

dev and test would skip monitoring and potentially use their own environment, depending on config.

REST API shows different structure for one endpoint

/tasks, /workflows and /workflow-instances return a structure like this:

{
    "total_pages": 1,
    "page": 1,
    "count": 26,
    "data": [
        { ... }
    ]
}

But /workflow-instances/recurring-latest doesn't have that structure and just returns the data array:

[
     { ... }
]

Not sure if this is intentional or not. One way I've seen around returning pagination details in the body (and having to wrap the actual body) is to do it in the response headers.
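To illustrate the header-based alternative, the sketch below splits the wrapped structure into a bare data array plus pagination headers. The header names are assumptions chosen for the example, not something the API currently returns.

```python
def paginate_with_headers(envelope):
    """Split a paginated envelope into (body, headers).

    The body becomes the bare data array; pagination details move to
    response headers. Header names here are hypothetical.
    """
    headers = {
        "X-Total-Pages": str(envelope["total_pages"]),
        "X-Page": str(envelope["page"]),
        "X-Total-Count": str(envelope["count"]),
    }
    return envelope["data"], headers

body, headers = paginate_with_headers(
    {"total_pages": 1, "page": 1, "count": 2, "data": [{"id": 1}, {"id": 2}]}
)
```

With this shape every list endpoint, including /workflow-instances/recurring-latest, could return a plain array.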
