lightflow's Introduction

Lightflow - a lightweight, distributed workflow system

Lightflow is a Python 3.5+ library and command-line tool for executing workflows, composed of individual tasks, in a distributed fashion. It is based on Celery and provides task dependencies, data exchange between tasks and an intuitive description of workflows.

Dependencies

Python

Lightflow is written in Python 3 and requires Python 3.5 or higher.

Operating system

Lightflow is being developed and tested on Linux, with Debian and RedHat being the main platforms.

redis

The redis database is required by Lightflow as a communication broker between tasks. It is also used as the default broker for the Celery queuing system, but could be replaced with any other supported Celery broker.

MongoDB

Lightflow makes use of MongoDB for storing persistent data during a workflow run that can be accessed by all tasks.

Getting started

The following getting started guide assumes a redis database running on localhost and port 6379 as well as a MongoDB database running on localhost and port 27017.
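Before continuing, it can help to confirm that both services are actually listening. The following is a minimal sketch using only the Python standard library; the helper name port_is_open is illustrative and not part of Lightflow, and a successful TCP connection only shows that something is listening on the port, not that it is a correctly configured redis or MongoDB instance.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to (host, port) succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False


if __name__ == "__main__":
    # Default ports assumed by the getting started guide.
    for name, port in (("redis", 6379), ("MongoDB", 27017)):
        state = "reachable" if port_is_open("localhost", port) else "not reachable"
        print("{} on localhost:{} is {}".format(name, port, state))
```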

Install Lightflow from PyPI:

pip install lightflow

Create a default configuration file and copy the provided example workflows to a local directory of your choice:

lightflow config default .
lightflow config examples .

If you like, list all available example workflows:

lightflow workflow list

In order to execute a workflow, start a worker that consumes jobs from the workflow, dag and task queues. Then start a workflow from the list of available examples. The following example starts the workflow simple:

lightflow worker start
lightflow workflow start simple

lightflow's People

Contributors

ivorblockley, jrmlhermitte, malramsay64, robbieclarken, stmudie

lightflow's Issues

add `signal.is_completed(dag_name)`?

Hi,
I have a DAG that generates many other DAGs. I would like it to generate at most N running DAGs.
Is there a way to check within a PythonTask whether a DAG is complete or not?
I think something like signal.is_complete(dag_name) would be sufficient.

Note that I don't want to call signal.join_dags(dag_names), since I don't know which DAGs will complete first.
Thanks!

Lightflow not compatible with celery 5

Following the installation instructions, when I attempt to start a worker with lightflow worker start, I get the following error:

  File "/home/anderson/.local/lib/python3.10/site-packages/lightflow/queue/jobs.py", line 16, in <module>
    @celery.task(bind=True)
  File "/home/anderson/.local/lib/python3.10/site-packages/celery/local.py", line 478, in __getattr__
    return ModuleType.__getattribute__(self, name)
AttributeError: module 'celery' has no attribute 'task'. Did you mean: 'Task'?

I suspect it is because my system (Ubuntu 22.04) is using celery 5.2.7 rather than celery 4 as defined in lightflow's requirements. But I'm not a celery expert so cannot confirm.
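To check which Celery version a given environment actually has, a small sketch along the following lines may help. It only reports the installed version and does not confirm that the version is the cause of the error; the function names are illustrative, and importlib.metadata requires Python 3.8 or newer.

```python
from importlib import metadata  # available in Python 3.8+


def major_version(version_string):
    """Extract the leading integer from a version string such as '5.2.7'."""
    return int(version_string.split(".")[0])


def installed_celery_version():
    """Return the installed Celery version string, or None if not installed."""
    try:
        return metadata.version("celery")
    except metadata.PackageNotFoundError:
        return None


if __name__ == "__main__":
    version = installed_celery_version()
    if version is None:
        print("celery is not installed in this environment")
    elif major_version(version) >= 5:
        # The traceback in this report was produced with celery 5.2.7.
        print("celery {} found (major version 5 or newer)".format(version))
    else:
        print("celery {} found".format(version))
```

If the suspicion in this report holds, trying a Celery 4 release inside a separate virtual environment would be one way to test it.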

Error On Windows

I have followed the tutorial to execute the simple workflow, but it doesn't work properly: it never enters the callback method of the PythonTask and doesn't print anything.

OS: Windows

How to import custom modules in workflow?

When I import a custom module in the workflow callback function, the execution fails with an ImportError:

[12/01/2018 13:21:32][ERROR] ForkPoolWorker-7 | Task lightflow.queue.jobs.execute_task[6cd45833-e7dd-4753-abe2-3b5a1b72886f] raised unexpected: ImportError("No module named 'lib'",)
Traceback (most recent call last):
  File "/usr/local/python3/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/python3/lib/python3.5/site-packages/celery/app/trace.py", line 629, in protected_call
    return self.run(*args, **kwargs)
  File "/usr/local/python3/lib/python3.5/site-packages/lightflow/queue/jobs.py", line 214, in execute_task
    event_type=JobEventName.Aborted))
  File "/usr/local/python3/lib/python3.5/site-packages/lightflow/models/task.py", line 245, in _run
    result = self.run(data, store, signal, context)
  File "/usr/local/python3/lib/python3.5/site-packages/lightflow/tasks/python_task.py", line 64, in run
    result = self._callback(data, store, signal, context, **kwargs)
  File "./tasks/filter_task.py", line 14, in inc_number
    from lib.filter import HtmlFilter
ImportError: No module named 'lib'
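The traceback shows that the worker process cannot find a top-level package named lib on its import path. One generic Python workaround, sketched below, is to add the directory that contains lib to sys.path before the import runs. The directory layout in the comments is an assumption (the report does not say where lib lives), and ensure_on_path is an illustrative helper, not a Lightflow function.

```python
import os
import sys


def ensure_on_path(directory):
    """Prepend an absolute directory to sys.path if it is not already there."""
    directory = os.path.abspath(directory)
    if directory not in sys.path:
        sys.path.insert(0, directory)
    return directory


# Hypothetical layout: <project>/lib/filter.py and <project>/tasks/filter_task.py.
# In filter_task.py one could then call, before the import that fails:
#
#     here = os.path.dirname(os.path.abspath(__file__))
#     ensure_on_path(os.path.join(here, ".."))
#     from lib.filter import HtmlFilter
```

Setting the PYTHONPATH environment variable to the project directory before starting the worker is another standard Python option that avoids touching the task code.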

Strange AttributeError

I'm running a fairly complicated pipeline but it works surprisingly well.
(this DAG calls this DAG, which calls this DAG)

However, I sometimes encounter the error below on the DAG. All the tasks, however, complete just fine.

Any idea what this may come from? Have you seen this before? Any tips on debugging? I will also keep searching myself but I thought I'd flag this.

Traceback (most recent call last):
  File "/home/xf11bm/miniconda3/envs/lightflow-pipeline/lib/python3.6/site-packages/kombu/serialization.py", line 50, in _reraise_errors
    yield
  File "/home/xf11bm/miniconda3/envs/lightflow-pipeline/lib/python3.6/site-packages/kombu/serialization.py", line 263, in loads
    return decode(data)
  File "/home/xf11bm/projects/lightflow/lightflow/queue/pickle.py", line 15, in cloudpickle_loads
    return load(BytesIO(s))
AttributeError: type object 'int64' has no attribute '__index__'

how to start workflow using API

Currently:
    lightflow workflow start <simple>

Is there a way to start a workflow like this?
    import lightflow
    import simple
    lightflow.start_workflow(simple)
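Whether Lightflow exposes a Python function for this is not settled in this thread. As a stopgap, the documented command line can be driven from Python with the standard library. The sketch below assumes the lightflow executable is on PATH and that the command is run from a directory containing the configuration file; build_start_command and start_workflow_via_cli are hypothetical helper names, not part of Lightflow.

```python
import subprocess


def build_start_command(workflow_name):
    """Build the CLI invocation shown in the getting started guide."""
    return ["lightflow", "workflow", "start", workflow_name]


def start_workflow_via_cli(workflow_name):
    """Run the CLI and return its exit code (requires lightflow on PATH)."""
    completed = subprocess.run(build_start_command(workflow_name))
    return completed.returncode


# Example (not executed here, since it needs a working Lightflow setup):
#     exit_code = start_workflow_via_cli("simple")
```

Passing the arguments as a list avoids shell quoting issues if a workflow name ever contains spaces or special characters.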

Incomplete Docs

The docs are blank for the majority of the documentation links.

How to support multiple outputs from a function

Hey,
I am trying to use this project to define some workflows for my daily needs as a developer.
I can see that a function can support multiple inputs using aliases. How about multiple outputs?
Is there any way to do that, or any lead to make it possible?

database error

Hi, I have installed Lightflow as described here: https://lightflow.readthedocs.io/en/latest/installation.html (using the Ubuntu package manager to install MongoDB and redis). When I start a workflow, I get the following errors:

[26/04/2023 15:54:14][ERROR] ForkPoolWorker-1 | Task lightflow.queue.jobs.execute_dag[12e80c66-4754-41a6-83f7-1ce4bed9b70b] raised unexpected: TypeError('database must be an instance of Database')
Traceback (most recent call last):
  File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 650, in __protected_call__
    return self.run(*args, **kwargs)
  File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/queue/jobs.py", line 117, in execute_dag
    store_doc = DataStore(**self.app.user_options['config'].data_store,
  File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/datastore.py", line 223, in get
    fs = GridFSProxy(GridFS(db.unproxied_object))
  File "/exports/scratch/ana3/lib/python3.9/site-packages/gridfs/__init__.py", line 90, in __init__
    raise TypeError("database must be an instance of Database")
TypeError: database must be an instance of Database
[26/04/2023 15:54:14][ERROR] ForkPoolWorker-8 | Task lightflow.queue.jobs.execute_workflow[feb66360-ae4e-42a9-a7b2-51a04b45f06c] raised unexpected: TypeError('database must be an instance of Database')
Traceback (most recent call last):
  File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 385, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/exports/scratch/ana3/lib/python3.9/site-packages/celery/app/trace.py", line 650, in __protected_call__
    return self.run(*args, **kwargs)
  File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/queue/jobs.py", line 69, in execute_workflow
    workflow.run(config=self.app.user_options['config'],
  File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/workflow.py", line 228, in run
    data_store.remove(self._workflow_id)
  File "/exports/scratch/ana3/lib/python3.9/site-packages/lightflow/models/datastore.py", line 197, in remove
    fs = GridFSProxy(GridFS(db.unproxied_object))
  File "/exports/scratch/ana3/lib/python3.9/site-packages/gridfs/__init__.py", line 90, in __init__
    raise TypeError("database must be an instance of Database")
TypeError: database must be an instance of Database

Is any initialization needed for MongoDB? When I go to http://localhost:27017/ I get:
"It looks like you are trying to access MongoDB over HTTP on the native driver port."
