
pipen's Introduction

A pipeline framework for python



Documentation | ChangeLog | Examples | API

Features

  • Easy to use
  • Nearly zero-configuration
  • Nice logging
  • Highly extendable

Installation

pip install -U pipen

Quickstart

example.py

from pipen import Proc, Pipen

class P1(Proc):
    """Sort input file"""
    input = "infile"
    input_data = ["/tmp/data.txt"]
    output = "outfile:file:intermediate.txt"
    script = "cat {{in.infile}} | sort > {{out.outfile}}"

class P2(Proc):
    """Paste line number"""
    requires = P1
    input = "infile"
    output = "outfile:file:result.txt"
    script = "paste <(seq 1 3) {{in.infile}} > {{out.outfile}}"

class MyPipeline(Pipen):
    starts = P1

if __name__ == "__main__":
    MyPipeline().run()
> echo -e "3\n2\n1" > /tmp/data.txt
> python example.py
06-09 23:15:29 I core                  _____________________________________   __
06-09 23:15:29 I core                  ___  __ \___  _/__  __ \__  ____/__  | / /
06-09 23:15:29 I core                  __  /_/ /__  / __  /_/ /_  __/  __   |/ /
06-09 23:15:29 I core                  _  ____/__/ /  _  ____/_  /___  _  /|  /
06-09 23:15:29 I core                  /_/     /___/  /_/     /_____/  /_/ |_/
06-09 23:15:29 I core
06-09 23:15:29 I core                              version: 0.14.5
06-09 23:15:29 I core
06-09 23:15:29 I core    ╔════════════════════════════════════════════════════╗
06-09 23:15:29 I core    ║                     MYPIPELINE                     ║
06-09 23:15:29 I core    ╚════════════════════════════════════════════════════╝
06-09 23:15:29 I core    plugins         : verbose v0.11.0
06-09 23:15:29 I core    # procs         : 2
06-09 23:15:29 I core    profile         : default
06-09 23:15:29 I core    outdir          : /home/pwwang/github/pipen/MyPipeline-output
06-09 23:15:29 I core    cache           : True
06-09 23:15:29 I core    dirsig          : 1
06-09 23:15:29 I core    error_strategy  : ignore
06-09 23:15:29 I core    forks           : 1
06-09 23:15:29 I core    lang            : bash
06-09 23:15:29 I core    loglevel        : info
06-09 23:15:29 I core    num_retries     : 3
06-09 23:15:29 I core    scheduler       : local
06-09 23:15:29 I core    submission_batch: 8
06-09 23:15:29 I core    template        : liquid
06-09 23:15:29 I core    workdir         : /home/pwwang/github/pipen/.pipen/MyPipeline
06-09 23:15:29 I core    plugin_opts     :
06-09 23:15:29 I core    template_opts   :
06-09 23:15:31 I core
06-09 23:15:31 I core    ╭──────────────────────── P1 ────────────────────────╮
06-09 23:15:31 I core    │ Sort input file                                    │
06-09 23:15:31 I core    ╰────────────────────────────────────────────────────╯
06-09 23:15:31 I core    P1: Workdir: '/home/pwwang/github/pipen/.pipen/MyPipeline/P1'
06-09 23:15:31 I core    P1: <<< [START]
06-09 23:15:31 I core    P1: >>> ['P2']
06-09 23:15:31 I verbose P1: size: 1
06-09 23:15:31 I verbose P1: [0/0] in.infile: /tmp/data.txt
06-09 23:15:31 I verbose P1: [0/0] out.outfile:
                 /home/pwwang/github/pipen/.pipen/MyPipeline/P1/0/output/intermediate.txt
06-09 23:15:33 I verbose P1: Time elapsed: 00:00:02.018s
06-09 23:15:33 I core
06-09 23:15:33 I core    ╭──────────────────────── P2 ────────────────────────╮
06-09 23:15:33 I core    │ Paste line number                                  │
06-09 23:15:33 I core    ╰────────────────────────────────────────────────────╯
06-09 23:15:33 I core    P2: Workdir: '/home/pwwang/github/pipen/.pipen/MyPipeline/P2'
06-09 23:15:33 I core    P2: <<< ['P1']
06-09 23:15:33 I core    P2: >>> [END]
06-09 23:15:33 I verbose P2: size: 1
06-09 23:15:33 I verbose P2: [0/0] in.infile:
                 /home/pwwang/github/pipen/.pipen/MyPipeline/P1/0/output/intermediate.txt
06-09 23:15:33 I verbose P2: [0/0] out.outfile:
                 /home/pwwang/github/pipen/MyPipeline-output/P2/result.txt
06-09 23:15:35 I verbose P2: Time elapsed: 00:00:02.009s
06-09 23:15:35 I core

              MYPIPELINE: 100%|█████████████████████████████| 2/2 [00:06<00:00, 0.36 procs/s]
> cat ./MyPipeline-output/P2/result.txt
1       1
2       2
3       3

Examples

See more examples at examples/ and a more real-world example at: https://github.com/pwwang/pipen-report/tree/master/example

Plugin gallery

Plugins make pipen even better.

pipen's People

Contributors

codacy-badger, gitbook-bot, marchon, osdaf, pwwang


pipen's Issues

slurm runner

The Slurm runner has been added in the slurm branch, which won't be merged until it is tested.

Before using it, you have to prepare it in either of the following ways:

  1. Download pyppl/runners/runner_slurm.py to your script directory, and then in your script: from runner_slurm import runner_slurm; or
  2. Install the slurm branch, and then in your script: from pyppl.runners.runner_slurm import runner_slurm

Either way, before you run the pipeline, you have to register the runner yourself: proc.registerRunner(runner_slurm)

Where to configure it:
For single process:

p.slurmRunner = {...}

For pipeline:

config = {
  "proc": {
    ... # other configurations
    "runner": "slurm", # all processes run with slurm
    "slurmRunner": {
       ...
    }
  }, # or you can also create a profile
  "runWithSlurm": {
    ... # other configurations
    "runner": "slurm", 
    "slurmRunner": {
       ...
    }
  }
}
pyppl(config).starts(...).run() # uses configurations of 'proc'
# for profile:
# pyppl(config).starts(...).run('runWithSlurm')

The full configuration:

"slurmRunner": {
  "preScript": "export PATH=$PATH:/path/to/add", // default: ''
  "postScript": "# some cleanup",                // default: ''
  // commands (some slurm systems have variants of commands)
  "sbatch": "yhbatch",                           // default: sbatch
  "srun": "yhrun",                               // default: srun
  "squeue": "yhqueue",                           // default: squeue
  // the prefix add to command you want to run
  // i.e "srun -n8 hostname"
  // it defaults to the command you specified to slurmRunner['srun']
  // In this case: "yhrun"
  "cmdPrefix": "srun -n8",                       // default: slurmRunner['srun']
  // sbatch options (with prefix "slurm."):
  "slurm.p": "normal",
  "slurm.mem": "1GB",
  // other options
  // ......
  // Note that the job name (slurm.J), stdout file (slurm.o) and stderr file (slurm.e) are calculated by the runner.
  // You can set them here, but it is not recommended.
}

Can we do partial export?

Say I just want to export only some of the output files.
For example:

p.output = "out1:file:somefile, out2:file:someotherfile"

I just want to export out1

Fix all linting issues according to `.pylintrc`

Current issues:

Using config file /home/pwwang/PyPPL/.pylintrc
************* Module pyppl.parameters
C:  1, 0: Too many lines in module (1248/1000) (too-many-lines)
W:427, 2: Attribute 'value' defined outside __init__ (attribute-defined-outside-init)
E:627,10: Super of 'Parameters' has no '__getattr__' member (no-member)
W:742,11: Use of eval (eval-used)
R:721, 1: Too many return statements (11/6) (too-many-return-statements)
R:721, 1: Too many branches (22/12) (too-many-branches)
R:780, 1: Too many arguments (6/5) (too-many-arguments)
R:780, 1: Either all return statements in a function should return an expression, or none of them should. (inconsistent-return-statements)
R:820, 1: Too many branches (24/12) (too-many-branches)
R:898, 1: Too many local variables (18/15) (too-many-locals)
C:923,13: Consider iterating the dictionary directly instead of calling .keys() (consider-iterating-dictionary)
R:898, 1: Too many branches (23/12) (too-many-branches)
R:898, 1: Too many statements (54/50) (too-many-statements)
C:1001,18: More than one statement on a single line (multiple-statements)
C:1009,22: More than one statement on a single line (multiple-statements)
C:1059,30: More than one statement on a single line (multiple-statements)
W:531, 2: Attribute '_assembler' defined outside __init__ (attribute-defined-outside-init)
E:1128,10: Super of 'Commands' has no '__getattr__' member (no-member)
C: 11, 0: third party import "from colorama import Fore, Back, Style" should be placed before "from .utils import Box, string_types, ConfigParser, jsonLoads" (wrong-import-order)
************* Module pyppl.proc
C:  1, 0: Too many lines in module (1026/1000) (too-many-lines)
W: 73,21: Redefining built-in 'id' (redefined-builtin)
R:209, 1: Too many branches (18/12) (too-many-branches)
W:302,17: Redefining built-in 'id' (redefined-builtin)
R:302, 1: Too many branches (21/12) (too-many-branches)
C:374,18: More than one statement on a single line (multiple-statements)
R:428, 1: Too many branches (13/12) (too-many-branches)
R:535, 1: Too many branches (28/12) (too-many-branches)
R:535, 1: Too many statements (73/50) (too-many-statements)
R:663, 1: Too many local variables (19/15) (too-many-locals)
R:663, 1: Too many branches (20/12) (too-many-branches)
R:663, 1: Too many statements (57/50) (too-many-statements)
E:854, 4: Possible unbalanced tuple unpacking with sequence defined at line 155 of pyppl.utils: left side has 2 label(s), right side has 0 value(s) (unbalanced-tuple-unpacking)
R:820, 1: Too many branches (13/12) (too-many-branches)
C: 10, 0: third party import "import yaml" should be placed before "from box import Box" (wrong-import-order)
C: 11, 0: third party import "import filelock" should be placed before "from box import Box" (wrong-import-order)
************* Module pyppl.proctree
C:274, 0: Trailing whitespace (trailing-whitespace)
C: 70,21: More than one statement on a single line (multiple-statements)
C:133,19: More than one statement on a single line (multiple-statements)
C:214,19: More than one statement on a single line (multiple-statements)
C:252,22: More than one statement on a single line (multiple-statements)
R:250, 2: Too many nested blocks (6/5) (too-many-nested-blocks)
R:239, 1: Too many branches (13/12) (too-many-branches)
************* Module pyppl.runners
C: 29, 1: Missing method docstring (missing-docstring)
R: 29, 1: Too many arguments (7/5) (too-many-arguments)
C: 71, 1: Missing method docstring (missing-docstring)
W: 33, 2: Attribute 'script' defined outside __init__ (attribute-defined-outside-init)
R:178, 1: Too many local variables (16/15) (too-many-locals)
C:336,33: More than one statement on a single line (multiple-statements)
C:419,35: More than one statement on a single line (multiple-statements)
C:  8, 0: third party import "from psutil import pid_exists" should be placed before "from box import Box" (wrong-import-order)
C:  9, 0: standard import "from multiprocessing import Lock" should be placed before "from psutil import pid_exists" (wrong-import-order)
************* Module pyppl.template
C:  6, 0: Multiple imports on one line (json, inspect) (multiple-imports)
C: 21, 1: Missing method docstring (missing-docstring)
C: 26, 1: Missing method docstring (missing-docstring)
C: 37, 1: Missing method docstring (missing-docstring)
C: 63, 1: Missing method docstring (missing-docstring)
C: 67, 1: Missing method docstring (missing-docstring)
R: 67, 1: Too many return statements (13/6) (too-many-return-statements)
C:102, 1: Missing method docstring (missing-docstring)
************* Module pyppl.utils
W: 35, 1: Redefining built-in 'reduce' (redefined-builtin)
W: 35, 1: Redefining built-in 'map' (redefined-builtin)
W: 35, 1: Redefining built-in 'filter' (redefined-builtin)
W:129, 0: Redefining built-in 'range' (redefined-builtin)
E: 15, 9: cmdy is not callable (not-callable)
W: 45, 1: Statement seems to have no effect (pointless-statement)
W: 46,14: Redefining built-in 'input' (redefined-builtin)
R: 47, 2: Unnecessary "else" after "return" (no-else-return)
E: 96, 0: function already defined line 35 (function-redefined)
E:107, 0: function already defined line 35 (function-redefined)
E:118, 0: function already defined line 35 (function-redefined)
C:178,13: More than one statement on a single line (multiple-statements)
C:184,10: More than one statement on a single line (multiple-statements)
C:364, 0: Missing function docstring (missing-docstring)
E:397, 8: Catching an exception which doesn't inherit from Exception: ChmodError (catching-non-exception)
E:402, 9: Catching an exception which doesn't inherit from Exception: ChmodError (catching-non-exception)
W:375,21: Unused argument 'filetype' (unused-argument)
R:471, 1: Too many arguments (6/5) (too-many-arguments)
W:536, 1: Parameters differ from overridden 'put' method (arguments-differ)
W:545, 1: Parameters differ from overridden 'put_nowait' method (arguments-differ)
W: 19, 1: Unused Queue imported from Queue (unused-import)
W: 19, 1: Unused Empty imported from Queue as QueueEmpty (unused-import)
W: 29, 1: Unused ConfigParser imported from ConfigParser (unused-import)
C: 12, 0: third party import "import psutil" should be placed before "import cmdy" (wrong-import-order)
R:  1, 0: Similar lines in 2 files
==pyppl.parameters:566
==pyppl.proc:291
        def __hash__(self):
                return id(self)

        def __eq__(self, other):
                return id(self) == id(other)

        def __ne__(self, other):
                return not self.__eq__(other)
 (duplicate-code)

Is it possible to add a function like proc.expect?

Is it possible to have a setting like this:

p = proc()
# ... 
p.expect = "grep <some string expected> {{outfile}}"

Sometimes, even if the job returned 0 and the output file was generated, the results could still be unexpected.
With this, you could set an expectation that tells pyppl whether the job finished as expected.

Forced cache mode

Now the cache mode is either True, False or export:

Caching method (p.cache=?) | How
True      | A signature* of a job's input files, script and output files is cached in /<job.index>/job.cache; the signature is compared before the job starts to run.
False     | Disable caching; always run jobs.
"export"  | First try to find the signature; if that fails, try to restore the files that already exist (or were exported previously to p.exdir).
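The signature mentioned above could be approximated like this (a minimal sketch; the hashing scheme and function name are assumptions for illustration, not pyppl's actual implementation):

```python
import hashlib
import os

def job_signature(input_files, script, output_files):
    """Hypothetical sketch: derive a cache signature from the input
    files, the script text and the output file paths, mixing in each
    existing file's mtime so a changed file invalidates the cache."""
    h = hashlib.sha256()
    h.update(script.encode())
    for path in sorted(input_files) + sorted(output_files):
        h.update(path.encode())
        if os.path.exists(path):
            h.update(str(os.path.getmtime(path)).encode())
    return h.hexdigest()
```

Comparing the stored signature with a freshly computed one before a job runs is what lets cached jobs be skipped.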

Can we add a forced cache mode, using the results currently in the output directory?
This would be super helpful if I could run the job.script separately/independently while debugging.

Resume pipeline

If the pipeline fails at one of the processes, we should be able to resume it on the next run.
Uh... the suffix may be difficult to determine when locating the cached workdir. Maybe we can determine it from the process's dependent processes.

A scenario should be like:

params.resume.setType(list)

p1 = proc()
p2 = proc()
p3 = proc()
p3.depends = [p1, p2]
p4 = proc()
p4.depends = p3
p5 = proc()
p5.depends = p4

resumes = []
for p in [p1, p2, p3, p4, p5]:
  if p.id + '.' + p.tag in params.resume.value:
    resumes.append(p)

ppl = pyppl().starts(p1)
if resumes:
  ppl = ppl.resume(*resumes)
ppl.run()
python pipeline.py --param-resume p3.notag

Known issue about SGE runner

If the job is submitted and then killed before it starts, the main thread will wait forever,
because it waits for the rcfile to be generated; however, the rcfile is generated by the trap command.
To fix it: use qstat to check the job status.
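The proposed fix could be sketched as a small status probe (a sketch only; `job_alive` is a hypothetical helper, and real SGE status handling needs to parse qstat output rather than rely on the exit code alone):

```python
import subprocess

def job_alive(job_id, qstat="qstat"):
    """Return True if the job still appears known to the scheduler.
    Relies on `qstat -j <id>` exiting non-zero for unknown/finished
    jobs; a missing or hanging qstat counts as not alive."""
    try:
        out = subprocess.run(
            [qstat, "-j", str(job_id)],
            capture_output=True, text=True, timeout=30,
        )
    except (OSError, subprocess.TimeoutExpired):
        return False
    return out.returncode == 0
```

Polling this instead of waiting for the rcfile would let the main thread notice a killed job and give up.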

Dry-runner should remove job cache file

If you have this pipeline:

# proc definition ...
p.runner = 'local'
# run the pipeline ...

p will be cached, cache files are generated at ./workdir/PyPPL.pp.notag.xxxxxx/<job.index>/job.cache
Then switch the runner to dry

p.runner = 'dry'

./workdir/PyPPL.pp.notag.xxxxxx/<job.index>/job.cache will not be touched, but the output files/directories will be replaced by empty ones.

Then if you switch back to local runner:

p.runner = 'local'

As the cache files are still there and the input files haven't changed, the jobs are considered cached and the scripts won't run. But the output files/directories are empty.

That is a bug.
Cache files should be removed by the dry runner.
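The fix could be as simple as deleting job.cache in the dry runner's finish step (a sketch; `clear_job_cache` is a hypothetical helper following the workdir layout described above):

```python
import os

def clear_job_cache(job_dir):
    """Remove the job.cache file inside a job directory, if present,
    so a later real run is not falsely considered cached."""
    cache_file = os.path.join(job_dir, "job.cache")
    if os.path.isfile(cache_file):
        os.remove(cache_file)
        return True
    return False
```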

TODO: add gzip export

Support exporting output files gzipped.
If the output is a directory, use .tar.gz; otherwise use .gz.
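The proposed behavior can be sketched with the standard library (`export_gzipped` is a hypothetical helper, not an existing pipen/pyppl API):

```python
import gzip
import os
import shutil
import tarfile

def export_gzipped(path, exdir):
    """Export `path` into `exdir`: directories become <name>.tar.gz,
    regular files become <name>.gz. Returns the target path."""
    os.makedirs(exdir, exist_ok=True)
    name = os.path.basename(path.rstrip(os.sep))
    if os.path.isdir(path):
        target = os.path.join(exdir, name + ".tar.gz")
        with tarfile.open(target, "w:gz") as tar:
            tar.add(path, arcname=name)
    else:
        target = os.path.join(exdir, name + ".gz")
        with open(path, "rb") as fin, gzip.open(target, "wb") as fout:
            shutil.copyfileobj(fin, fout)
    return target
```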

Partial echo

Proposed settings for p.echo:

# echo all
p.echo = True
# don't echo anything
p.echo = False
# just echo stdout
p.echo = 'stdout'
# just echo stderr
p.echo = 'stderr'

Caches are consistent between python2 and python3

0332ec7

You can do this now:

python awesome-pipeline.py
# later you want to use python3
python3 awesome-pipeline.py
# processes will be cached...
# switch back to python2
python awesome-pipeline.py
# processes are still cached...

Halt on error

Can we have an option to halt (stop submitting new jobs) when any of the running jobs fails?
It is useful when debugging a process with many jobs; we don't have to wait until all the jobs have been submitted.
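The requested behavior can be modeled generically with a shared threading.Event checked by the submission loop (a sketch of the idea, not an existing pyppl option; `run_job` and `submit` are hypothetical names):

```python
import threading

halt = threading.Event()  # shared flag: set once any job fails

def run_job(job, submit):
    """Submit a job unless a previous failure has raised the halt flag.
    `submit` is a hypothetical callable returning the job's return code."""
    if halt.is_set():
        return None          # skip: the pipeline is halting
    rc = submit(job)
    if rc != 0:              # non-zero return code means failure
        halt.set()           # stop submitting further jobs
    return rc
```

With this, jobs queued after the first failure are skipped instead of submitted.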
