pwwang / pipen
A pipeline framework for Python
Home Page: https://pwwang.github.io/pipen/
License: Apache License 2.0
Will you support private keys for ssh runners?
Can we hide some trivial processes in the flowchart?
If not, remove them.
Can we have an option to halt (stop submitting new jobs) when any of the running jobs fails?
It is useful when debugging: if a process has many jobs, we don't have to wait until all jobs have been submitted.
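The halt behavior could be sketched as a submission loop that stops at the first failure. This is a minimal sketch; `run_job` and its boolean return convention are hypothetical stand-ins, not PyPPL's API:

```python
def submit_all(jobs, run_job):
    """Submit jobs one by one; stop submitting new ones
    as soon as any job fails (run_job returns False)."""
    submitted = []
    for job in jobs:
        submitted.append(job)
        if not run_job(job):
            break  # halt: don't submit the remaining jobs
    return submitted
```

With this shape, a failing job leaves the rest of the queue untouched, which is exactly what helps when debugging a process with many jobs.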
c = Channel.create()
c1 = c.insert(0, '')
# expect c1 == [('', )]
# but c1 == []
params.loadFile("config.json")
renders:
u'whatever'
Proposed settings for p.echo:
# echo all
p.echo = True
# don't echo anything
p.echo = False
# just echo stdout
p.echo = 'stdout'
# just echo stderr
p.echo = 'stderr'
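Under these settings, the decision of whether to forward a stream could look like this minimal sketch (`should_echo` is a hypothetical helper, not part of PyPPL):

```python
def should_echo(echo_setting, stream):
    """Decide whether to forward a job's output stream,
    following the proposed p.echo settings above."""
    if echo_setting is True:
        return True   # echo both stdout and stderr
    if echo_setting is False:
        return False  # echo nothing
    return echo_setting == stream  # 'stdout' or 'stderr' only
```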
Either standalone or html file using pandoc.
Currently those files will be removed if a job retries.
Support exporting output files gzipped.
If it is a directory, use tar.gz; otherwise use .gz.
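A minimal sketch of such an export using the stdlib tarfile/gzip modules (`export_gzipped` is a hypothetical helper, not PyPPL's exporter):

```python
import gzip
import os
import shutil
import tarfile

def export_gzipped(path, exdir):
    """Export `path` into `exdir`: directories as .tar.gz, files as .gz."""
    name = os.path.basename(path)
    if os.path.isdir(path):
        dest = os.path.join(exdir, name + '.tar.gz')
        with tarfile.open(dest, 'w:gz') as tar:
            tar.add(path, arcname=name)  # archive the whole directory
    else:
        dest = os.path.join(exdir, name + '.gz')
        with open(path, 'rb') as fin, gzip.open(dest, 'wb') as fout:
            shutil.copyfileobj(fin, fout)  # stream-compress the file
    return dest
```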
Add better test for #39
see: https://shopify.github.io/liquid/
Try to separate it into an independent Python package (work in progress):
https://github.com/pwwang/liquid.py
stdout and stderr should not be cleared if a process is cached.
Now the cache mode is either True, False or "export":

Caching method (p.cache=?) | How
---|---
True | A signature* of the input files, script and output files of a job is cached in /<job.index>/job.cache; the signature is compared before the job starts to run.
False | Disable caching; always run the jobs.
"export" | First try to find the signatures; if that fails, try to restore the files that exist (or were exported previously to p.exdir).
Can we have a forced cache mode, using the results in the current output directory?
This would be super helpful if I could run the job.script
separately/independently while debugging.
Is there a way to track the running time of each job? Like saving it in job.time
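One way this could work is wrapping a job's run and recording wall-clock time; `job.time` here is a hypothetical attribute, not an existing PyPPL field:

```python
import time

class TimedJob:
    """Wrap a job's callable and record its wall-clock duration
    in `self.time` (hypothetical attribute, not PyPPL API)."""
    def __init__(self, func):
        self.func = func
        self.time = None
    def run(self, *args, **kwargs):
        start = time.time()
        try:
            return self.func(*args, **kwargs)
        finally:
            # record duration even if the job raises
            self.time = time.time() - start
```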
Is it possible to have a function like this that you can set:
p = proc()
# ...
p.expect = "grep <some string expected> {{outfile}}"
Sometimes, even if it returns 0 and the output file is generated, the results could still be unexpected.
With this, you can set the expectation and tell pyppl
whether the job finished as we expected.
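The expectation check could be sketched as running the rendered command and requiring a zero exit code (`check_expect` is a hypothetical helper; the actual implementation may differ):

```python
import subprocess

def check_expect(expect_cmd, job_ok):
    """After a job exits successfully, run the rendered p.expect
    command; the job only counts as done if that command also
    succeeds (exit code 0)."""
    if not job_ok:
        return False  # the job itself already failed
    result = subprocess.run(expect_cmd, shell=True)
    return result.returncode == 0
```

For example, `check_expect("grep expected_string outfile.txt", True)` would fail the job when the string is missing even though the script returned 0.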
If the job is submitted and killed before it starts, the main thread will wait forever, because it waits for the rcfile to be generated; however, that file is generated by the trap command.
To fix it: use qstat to check the status.
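A sketch of such a check, assuming `qstat -j <jobid>` exits non-zero once the job is no longer known to the scheduler (behavior varies across SGE variants, so treat this as an assumption):

```python
import subprocess

def job_alive(jobid, qstat='qstat'):
    """Ask the scheduler whether a job is still known to it,
    instead of waiting forever for the rcfile."""
    result = subprocess.run(
        [qstat, '-j', str(jobid)],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    return result.returncode == 0  # non-zero: job is gone
```

When the job disappears without ever writing the rcfile, the main thread could then mark it as failed rather than blocking.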
A Slurm runner has been added in the slurm branch, which won't be merged until it is tested.
Before using it, you have to prepare it in either of these ways: copy pyppl/runners/runner_slurm.py to your script directory and then, in your script, from runner_slurm import runner_slurm; or from pyppl.runners.runner_slurm import runner_slurm
Either way, before you run the pipeline, you have to register the runner yourself: proc.registerRunner(runner_slurm)
Where to configure it:
For single process:
p.slurmRunner = {...}
For pipeline:
config = {
    "proc": {
        ...  # other configurations
        "runner": "slurm",  # all processes run with slurm
        "slurmRunner": {
            ...
        }
    },
    # or you can also create a profile
    "runWithSlurm": {
        ...  # other configurations
        "runner": "slurm",
        "slurmRunner": {
            ...
        }
    }
}
pyppl(config).starts(...).run() # uses configurations of 'proc'
# for profile:
# pyppl(config).starts(...).run('runWithSlurm')
The full configuration:
"slurmRunner": {
"preScript": "export PATH=$PATH:/path/to/add", // default: ''
"postScript": "# some cleanup", // default: ''
// commands (some slurm systems have variants of commands)
"sbatch": "yhbatch", // default: sbatch
"srun": "yhrun", // default: srun
"squeue": "yhqueue", // default: squeue
// the prefix add to command you want to run
// i.e "srun -n8 hostname"
// it defaults to the command you specified to slurmRunner['srun']
// In this case: "yhrun"
"cmdPrefix": "srun -n8", // default: slurmRunner['srun']
// sbatch options (with prefix "slurm."):
"slurm.p": "normal",
"slurm.mem": "1GB",
// other options
// ......
// Note that job name (slurm.J), stdout (slurm.o), stderr file (slurm.e) is calculated by the runner.
// Although you can, you are not recommended to set them here.
}
a = Aggr(pSort)
a.pSort.runner = 'sge'
But a.pSort is not running on sge.
Current issues:
Using config file /home/pwwang/PyPPL/.pylintrc
************* Module pyppl.parameters
C: 1, 0: Too many lines in module (1248/1000) (too-many-lines)
W:427, 2: Attribute 'value' defined outside __init__ (attribute-defined-outside-init)
E:627,10: Super of 'Parameters' has no '__getattr__' member (no-member)
W:742,11: Use of eval (eval-used)
R:721, 1: Too many return statements (11/6) (too-many-return-statements)
R:721, 1: Too many branches (22/12) (too-many-branches)
R:780, 1: Too many arguments (6/5) (too-many-arguments)
R:780, 1: Either all return statements in a function should return an expression, or none of them should. (inconsistent-return-statements)
R:820, 1: Too many branches (24/12) (too-many-branches)
R:898, 1: Too many local variables (18/15) (too-many-locals)
C:923,13: Consider iterating the dictionary directly instead of calling .keys() (consider-iterating-dictionary)
R:898, 1: Too many branches (23/12) (too-many-branches)
R:898, 1: Too many statements (54/50) (too-many-statements)
C:1001,18: More than one statement on a single line (multiple-statements)
C:1009,22: More than one statement on a single line (multiple-statements)
C:1059,30: More than one statement on a single line (multiple-statements)
W:531, 2: Attribute '_assembler' defined outside __init__ (attribute-defined-outside-init)
E:1128,10: Super of 'Commands' has no '__getattr__' member (no-member)
C: 11, 0: third party import "from colorama import Fore, Back, Style" should be placed before "from .utils import Box, string_types, ConfigParser, jsonLoads" (wrong-import-order)
************* Module pyppl.proc
C: 1, 0: Too many lines in module (1026/1000) (too-many-lines)
W: 73,21: Redefining built-in 'id' (redefined-builtin)
R:209, 1: Too many branches (18/12) (too-many-branches)
W:302,17: Redefining built-in 'id' (redefined-builtin)
R:302, 1: Too many branches (21/12) (too-many-branches)
C:374,18: More than one statement on a single line (multiple-statements)
R:428, 1: Too many branches (13/12) (too-many-branches)
R:535, 1: Too many branches (28/12) (too-many-branches)
R:535, 1: Too many statements (73/50) (too-many-statements)
R:663, 1: Too many local variables (19/15) (too-many-locals)
R:663, 1: Too many branches (20/12) (too-many-branches)
R:663, 1: Too many statements (57/50) (too-many-statements)
E:854, 4: Possible unbalanced tuple unpacking with sequence defined at line 155 of pyppl.utils: left side has 2 label(s), right side has 0 value(s) (unbalanced-tuple-unpacking)
R:820, 1: Too many branches (13/12) (too-many-branches)
C: 10, 0: third party import "import yaml" should be placed before "from box import Box" (wrong-import-order)
C: 11, 0: third party import "import filelock" should be placed before "from box import Box" (wrong-import-order)
************* Module pyppl.proctree
C:274, 0: Trailing whitespace (trailing-whitespace)
C: 70,21: More than one statement on a single line (multiple-statements)
C:133,19: More than one statement on a single line (multiple-statements)
C:214,19: More than one statement on a single line (multiple-statements)
C:252,22: More than one statement on a single line (multiple-statements)
R:250, 2: Too many nested blocks (6/5) (too-many-nested-blocks)
R:239, 1: Too many branches (13/12) (too-many-branches)
************* Module pyppl.runners
C: 29, 1: Missing method docstring (missing-docstring)
R: 29, 1: Too many arguments (7/5) (too-many-arguments)
C: 71, 1: Missing method docstring (missing-docstring)
W: 33, 2: Attribute 'script' defined outside __init__ (attribute-defined-outside-init)
R:178, 1: Too many local variables (16/15) (too-many-locals)
C:336,33: More than one statement on a single line (multiple-statements)
C:419,35: More than one statement on a single line (multiple-statements)
C: 8, 0: third party import "from psutil import pid_exists" should be placed before "from box import Box" (wrong-import-order)
C: 9, 0: standard import "from multiprocessing import Lock" should be placed before "from psutil import pid_exists" (wrong-import-order)
************* Module pyppl.template
C: 6, 0: Multiple imports on one line (json, inspect) (multiple-imports)
C: 21, 1: Missing method docstring (missing-docstring)
C: 26, 1: Missing method docstring (missing-docstring)
C: 37, 1: Missing method docstring (missing-docstring)
C: 63, 1: Missing method docstring (missing-docstring)
C: 67, 1: Missing method docstring (missing-docstring)
R: 67, 1: Too many return statements (13/6) (too-many-return-statements)
C:102, 1: Missing method docstring (missing-docstring)
************* Module pyppl.utils
W: 35, 1: Redefining built-in 'reduce' (redefined-builtin)
W: 35, 1: Redefining built-in 'map' (redefined-builtin)
W: 35, 1: Redefining built-in 'filter' (redefined-builtin)
W:129, 0: Redefining built-in 'range' (redefined-builtin)
E: 15, 9: cmdy is not callable (not-callable)
W: 45, 1: Statement seems to have no effect (pointless-statement)
W: 46,14: Redefining built-in 'input' (redefined-builtin)
R: 47, 2: Unnecessary "else" after "return" (no-else-return)
E: 96, 0: function already defined line 35 (function-redefined)
E:107, 0: function already defined line 35 (function-redefined)
E:118, 0: function already defined line 35 (function-redefined)
C:178,13: More than one statement on a single line (multiple-statements)
C:184,10: More than one statement on a single line (multiple-statements)
C:364, 0: Missing function docstring (missing-docstring)
E:397, 8: Catching an exception which doesn't inherit from Exception: ChmodError (catching-non-exception)
E:402, 9: Catching an exception which doesn't inherit from Exception: ChmodError (catching-non-exception)
W:375,21: Unused argument 'filetype' (unused-argument)
R:471, 1: Too many arguments (6/5) (too-many-arguments)
W:536, 1: Parameters differ from overridden 'put' method (arguments-differ)
W:545, 1: Parameters differ from overridden 'put_nowait' method (arguments-differ)
W: 19, 1: Unused Queue imported from Queue (unused-import)
W: 19, 1: Unused Empty imported from Queue as QueueEmpty (unused-import)
W: 29, 1: Unused ConfigParser imported from ConfigParser (unused-import)
C: 12, 0: third party import "import psutil" should be placed before "import cmdy" (wrong-import-order)
R: 1, 0: Similar lines in 2 files
==pyppl.parameters:566
==pyppl.proc:291
def __hash__(self):
    return id(self)
def __eq__(self, other):
    return id(self) == id(other)
def __ne__(self, other):
    return not self.__eq__(other)
(duplicate-code)
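One way to remove the duplication flagged above is a small mixin shared by both classes; the mixin name is hypothetical:

```python
class IdentityEqMixin:
    """Identity-based equality, shared so that pyppl.parameters and
    pyppl.proc no longer carry duplicate __hash__/__eq__/__ne__."""
    def __hash__(self):
        return id(self)
    def __eq__(self, other):
        return id(self) == id(other)
    def __ne__(self, other):
        return not self.__eq__(other)
```

Both classes could then inherit from the mixin, which also silences the duplicate-code warning.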
To implement the reporting system.
If you have this pipeline:
# proc definition ...
p.runner = 'local'
# run the pipeline ...
p will be cached, and cache files are generated at ./workdir/PyPPL.pp.notag.xxxxxx/<job.index>/job.cache
Then switch the runner to dry:
p.runner = 'dry'
./workdir/PyPPL.pp.notag.xxxxxx/<job.index>/job.cache will not be touched, but the output files/directories will be replaced by empty ones.
Then, if you switch back to the local runner:
p.runner = 'local'
As the cache files are still there and the input files haven't changed, the jobs are cached and the script won't run. But the output files/directories are empty.
That should be a bug. Cache files should be removed by the dry runner.
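The fix could be sketched as removing the job's cache file in the dry runner, following the workdir layout described above (`clear_cache` is a hypothetical helper):

```python
import os

def clear_cache(workdir, job_index):
    """Remove a job's cache file so a later real run cannot be
    wrongly marked cached against dry-run (empty) outputs."""
    cache = os.path.join(workdir, str(job_index), 'job.cache')
    if os.path.exists(cache):
        os.remove(cache)
        return True   # cache file removed
    return False      # nothing to remove
```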
You can do this now:
python awesome-pipeline.py
# later you want to use python3
python3 awesome-pipeline.py
# processes will be cached...
# switch back to python2
python awesome-pipeline.py
# processes are still cached...
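One way to bust the cache on an interpreter switch is to include the interpreter in the cached signature; this is a sketch of the idea, not PyPPL's actual signature scheme:

```python
import hashlib
import sys

def script_signature(script_text, interpreter=None):
    """Compute a cache signature that covers both the script text
    and the interpreter, so switching python <-> python3
    invalidates the cache."""
    interpreter = interpreter or sys.executable
    h = hashlib.sha1()
    h.update(interpreter.encode())
    h.update(script_text.encode())
    return h.hexdigest()
```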
If the pipeline failed at one of the processes, it should be possible to resume it in the next run.
Uh... the workdir suffix may be difficult to determine in order to find the cached workdir. Maybe we can determine it from the process's dependent processes.
A scenario should be like:
params.resume.setType(list)
p1 = proc()
p2 = proc()
p3 = proc()
p3.depends = [p1, p2]
p4 = proc()
p4.depends = p3
p5 = proc()
p5.depends = p4
resumes = []
for p in [p1, p2, p3, p4, p5]:
    if p.id + '.' + p.tag in params.resume.value:
        resumes.append(p)
ppl = pyppl().starts(p1)
if resumes:
    ppl = ppl.resume(*resumes)
ppl.run()
python pipeline.py --param-resume p3.notag
A warning should be raised when it falls back to the local runner because the profile or runner is not found.
Pipeline won't run if I don't have an optional package (e.g. graphviz) installed.
Like, I just want to export part of the output files.
For example:
p.output = "out1:file:somefile, out2:file:someotherfile"
I just want to export out1.
The bring files do change results, but they are changed if you generate them in the script, which will change the computed script.
Allow the {{#}} placeholder in output.
Currently it's only allowed in script.
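Rendering {{#}} in the output spec could be as simple as substituting the job index; a minimal sketch (`render_output` is a hypothetical helper):

```python
def render_output(spec, job_index):
    """Render the {{#}} placeholder (job index) in an output
    spec, mirroring what script already supports."""
    return spec.replace('{{#}}', str(job_index))
```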