lsstdesc / ceci
Experimental pipeline prototype software
License: BSD 3-Clause "New" or "Revised" License
I've proposed a new read-only property called StageParameter.msg, which matches the parameter name in the class constructor.
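A minimal sketch of what I have in mind (assuming the constructor argument is stored privately; the internal attribute names here are guesses):

class StageParameter:
    def __init__(self, dtype=None, default=None, msg=""):
        self._dtype = dtype
        self._default = default
        self._msg = msg  # stored privately, exposed read-only below

    @property
    def msg(self):
        # Read-only access, matching the constructor argument name.
        return self._msg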
This looks like it works, because the stages are correctly ordered, but it fails when several stages depend on a single input.
The open_input method currently fails when you have an alias defined, whether you call it with the original tag or the alias. One of them fails on get_input_type and the other on get_input. Something similar happens with open_output.
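A possible fix, sketched under the assumption that the body otherwise mirrors the current open_input: resolve the alias once at the top, so the original tag and the alias hit the same entry.

def open_input(self, tag, wrapper=False, **kwargs):
    # Normalize first: get_aliased_tag maps a tag to its alias, so both
    # spellings resolve to the same input entry (sketch, not tested).
    aliased_tag = self.get_aliased_tag(tag)
    path = self.get_input(aliased_tag)
    input_class = self.get_input_type(aliased_tag)
    obj = input_class(path, "r", **kwargs)
    return obj if wrapper else obj.file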
I'm trying to use a custom filename for the output of a stage, by specifying the output in make_stage:

pzflex = FZBoost.make_stage(other arguments, output="custom_filename.hdf5")

Even if print_io() displays the custom file name, once the pipeline is run, it uses the default file name. This can be solved by modifying the function find_outputs in stage.py so that it uses the dictionary _outputs, like the following:
def find_outputs(self, outdir):
    """Find and return all the outputs associated with this stage.

    These are returned as a dictionary of tag : path pairs.
    """
    ret_dict = {}
    for tag, ftype in self.outputs_():
        aliased_tag = self.get_aliased_tag(tag)
        if aliased_tag not in self._outputs:
            self._outputs[aliased_tag] = ftype.make_name(aliased_tag)
        ret_dict[aliased_tag] = f"{outdir}/{self._outputs[aliased_tag]}"
    return ret_dict
In all my projects I use setuptools_scm, which automatically retrieves the version of the package from git, so you never have to update version numbers again. Unless someone has any objections, I'll make a PR to switch to setuptools_scm.
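For reference, the switch is tiny; a minimal setup.py sketch (the rest of the existing arguments stay unchanged):

from setuptools import setup

setup(
    name="ceci",
    use_scm_version=True,               # version is derived from git tags
    setup_requires=["setuptools_scm"],  # needed at build time only
)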
If I have a stage and call help(stage) to look at the docstring, it just looks like this:

config_options = {'param1': <ceci.config.StageParameter object>,...

It would be great if it would print something like:

- param1: type, default=...
  param1 is the...
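A rough sketch of how that could be generated, assuming each StageParameter exposes dtype, default, and msg attributes (the attribute names are assumptions based on the discussion above):

def format_config_help(config_options):
    # Render config_options as readable lines instead of raw reprs.
    from ceci.config import StageParameter
    lines = []
    for name, param in config_options.items():
        if not isinstance(param, StageParameter):
            # Plain defaults (ints, strings, ...) carry no metadata.
            lines.append(f"- {name}: default={param!r}")
            continue
        dtype = getattr(param, "dtype", None)
        dtype_name = dtype.__name__ if isinstance(dtype, type) else "any"
        default = getattr(param, "default", None)
        lines.append(f"- {name}: {dtype_name}, default={default!r}")
        lines.append(f"  {name} is {getattr(param, 'msg', '')}")
    return "\n".join(lines)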
In TXPipe we want to use another repo, FlexZPipe, for some pipeline stages. Right now you have to add these to PYTHONPATH manually. It would be nice to change this.
e.g. ceci test.yml resume=False ends up with "False" as a text value, which evaluates to True as a boolean.
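One cheap fix: run each override value through the YAML parser, so command-line values get the same typing rules as the config file. A sketch:

import yaml

def parse_override(value):
    # yaml.safe_load maps "False" -> False, "3" -> 3, "1.5" -> 1.5,
    # and leaves ordinary strings as strings.
    return yaml.safe_load(value)

assert parse_override("False") is False
assert parse_override("3") == 3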
Given the heterogeneous nature of the DESC pipelines, I anticipate that we will soon run into a major hurdle keeping track of all the software dependencies of all the pipeline stages. One option is to execute the pipeline in Docker containers, leaving it to the pipeline designers to provide a container that can execute their codes. The benefits of this approach are the following:
After talking with @yadudoc today, it sounds like parsl will conveniently support both docker on local machines and shifter at nersc. One way of setting this up is to provide the container environment in the definition of the site; then all the apps running on that site will run in that container. For nersc, to handle shifter, it sounds like all we need is to set up the site with a slurm script header which provides the shifter parameters (like which image to use). So this is technically doable :-).
Here is a proposal for how containers could be used in the ceci framework: we could ask that every ceci module, like TXpipe, provide a Dockerfile which can be used to execute all the stages defined in that module. That Dockerfile should extend a given base image, common to all modules, for instance a given version of the LSST stack. This way, we can reuse the same site to run various stages in parallel when we can. When combining different ceci modules, like photoz, shape measurement, and txpipe, each module will have its own docker image, and since they are all based on the same base, we can hope that one could still install the complete environment locally if they needed to.
@joezuntz let me know if that sounds good to you or if you anticipate more complicated patterns where this would fail.
As it is now, if the default for a configuration parameter is not set (i.e., None) and the user tries to set it to None, the code thinks that it has a missing parameter. We need a way to distinguish these cases. At some level the issue is that the mechanism of defining configuration parameters through config_options class data doesn't distinguish them.
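The standard trick for distinguishing "no default was given" from "the default is None" is a module-level sentinel; a sketch of how the parameter definition could use it:

_UNSET = object()  # unique marker that can never equal a real value

class StageParameter:
    def __init__(self, dtype=None, default=_UNSET, msg=""):
        self._default = default

    @property
    def required(self):
        # Only a genuinely missing default makes the parameter required;
        # an explicit default=None is a real, optional value.
        return self._default is _UNSET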
Investigate pulling the interactive tooling from RAIL into ceci.
This includes:
Need to return an error status if the pipeline fails.
Currently ceci keeps the install requirements in setup.py to a minimum, so that the user doesn't have to install the requirements for runners that are not being used. I suggest modifying setup.py so that we can do the following:

$ pip install ceci[parsl]
$ pip install ceci

with the latter option just using the simplest set of requirements.
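In setup.py that is an extras_require entry; a sketch (the exact dependency lists are placeholders):

from setuptools import setup

setup(
    name="ceci",
    install_requires=["pyyaml"],  # minimal core dependencies (placeholder)
    extras_require={
        # pulled in only by `pip install ceci[parsl]`
        "parsl": ["parsl"],
    },
)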
--export-cwl EXPORT_CWL    Exports pipeline in CWL format to provided path and exits
and what exists? My mind is running wild with possibilities.
We'd like to be able to robustly resume a partially run pipeline, skipping stages that were completed the first time.
The current resume option only checks whether the file exists, which does not work if the file was only partially written before the pipeline stopped.
@yadudoc (sorry, don't know Kyle's github name to include him too) - does Parsl have any specific features (or planned features) for re-running pipelines? Or can you suggest approaches to this?
It may be difficult to scale the very large jobs involved in complex pipeline stages using only MPI. Instead it may be more scalable to have a concept of nested workflow so that a given stage can be farmed as a multitude of smaller parsl tasks.
This is the scatter/gather concept (https://www.commonwl.org/v1.0/Workflow.html#WorkflowStep) that already exists in some workflow managers; we just need to figure out whether this can be implemented with parsl.
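For reference, the bare scatter/gather pattern is already easy to express with parsl apps; the open question is wiring it into a ceci stage. A standalone sketch using parsl's local-threads config:

import parsl
from parsl import python_app
from parsl.configs.local_threads import config

parsl.load(config)

@python_app
def process_chunk(chunk):
    # Stand-in for one small task farmed out of a larger stage.
    return sum(chunk)

# Scatter: one task per chunk. Gather: block on all the futures.
chunks = [[1, 2], [3, 4], [5, 6]]
futures = [process_chunk(c) for c in chunks]
total = sum(f.result() for f in futures)
print(total)  # 21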
Hi, I followed the instructions here. I ran

cookiecutter https://github.com/LSSTDESC/pipeline-package-template

and then I did ceci test/test.yml, but it gives this error message:
/global/homes/z/zdu863/.local/cori/3.6-anaconda-4.4/lib/python3.6/site-packages/ceci-0.0.7-py3.6.egg/ceci/main.py:26: YAMLLoadWarning: calling yaml.load() without Loader=... is deprecated, as the default Loader is unsafe. Please read https://msg.pyyaml.org/load for full details.
pipe_config = yaml.load(config_text)
fatal: Not a git repository: '/global/homes/z/zdu863/.local/cori/3.6-anaconda-4.4/lib/python3.6/site-packages/parsl-0.5.2-py3.6.egg/.git'
Pipeline queuing stage LSSPipeStage1 with 1 processes
Compiling command:
python3 -m lsspipe LSSPipeStage1 --some_input_tag=test/input_data.txt --raw_data_dir=/global/cscratch1/sd/zdu863/objcat_sgclass_all --config=test/config.yml --some_output_tag=./outputs/some_output_tag.txt
If your pipeline tries to use multiple processes on cori login nodes, the code triggers a complaint. This should not happen when using the dry-run pipeline.
Doing some testing running `ceci ./test/test.yml` I was getting:
File "/Users/walter/.conda/envs/3x2pt/lib/python3.8/site-packages/ceci/main.py", line 37, in run
os.makedirs(os.path.split(log_file)[0], exist_ok=True)
File "/Users/walter/.conda/envs/3x2pt/lib/python3.8/os.py", line 223, in makedirs
mkdir(name, mode)
FileNotFoundError: [Errno 2] No such file or directory: ''
This is because test.yml contains

# Put the log for the overall pipeline infrastructure in this file:
pipeline_log: log.txt

Changing it to './log.txt' fixes the error, but the parser should probably be more robust.
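A one-line guard in main.py would cover the bare-filename case; a sketch:

import os

log_file = "log.txt"
# os.path.split("log.txt")[0] is "", which makedirs rejects;
# fall back to the current directory instead.
log_dir = os.path.split(log_file)[0] or "."
os.makedirs(log_dir, exist_ok=True)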
We just released the new Parsl v0.6.1, which has a few non-backward-compatible changes that I suspect require some updates in Ceci. Most changes will go to the launchers module. As a solution for the immediate problem I would recommend adding a specific requirement for an older parsl release, and adding a separate PR for updated configs.
We need to rationalize the provision of configuration options to stages somewhat - right now it is a bit confused: the pipeline both reads the yaml config file, passes all the options in it on the command line, and passes the file itself. It's useful to be able to override parameters, but the current design is just a bit confusing.
Here are a few suggested principles:
The config yaml file is required for every stage in the pipeline. It should therefore be an implicit input parameter for each one, and not need to be specified manually.
A pipeline run should not specify parameters on the command line but instead just pass the yaml file
If (when testing/debugging) the user wants to pass parameters on the command line then they should override the config file as they do now.
@EiffL do these bullet points make sense to you?
This would be helpful for doing studies in rail, by making it easier to re-run particular stages with e.g., different ML models, or different ML training parameters.
PipelineStage.make_stage() has no fixed signature. If the method is called with an unknown stage parameter (e.g. by mistyping the name), it does not throw an exception but instead silently discards the value, because it cannot be mapped to any of the defined stage parameters.
Suggestion: add a check in StageConfig.set_config() for any keys that are not present in its underlying stage parameter dictionary, as sketched below.
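A sketch of the check (written as a free function here; in StageConfig.set_config it would compare against whatever attribute holds the parameter dictionary):

def check_unknown_keys(provided, declared):
    # provided: the kwargs passed to make_stage
    # declared: the stage's parameter dictionary
    unknown = set(provided) - set(declared)
    if unknown:
        raise ValueError(f"Unknown stage parameters: {sorted(unknown)}")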
Goals:
To require only minimal changes to existing TXPipe-based pipelines. I think with the changes I am suggesting we might not need any changes to the various TXPipe classes, except maybe to the DataFile stuff.
To enable creating interactive 'pipelines' with the following properties:
Plans.
Eric C. will get a version of ceci with these changes working in a fork and then pull that onto a branch in this repo, then we can move to full testing, etc...
Larger design change wish list:
Switch emphasis to using object instances instead of classes in pipelines. I.e., configuration could be stored in an instance of a class, rather than as a separate object. This will also make it easy to configure/run individual stages, and will make it much easier to build pipelines via the python interface (by creating a bunch of analysis objects and adding them to a pipeline). My suggestion would be to have the instance live in the StageExecutionConfig objects, and then become the primary source of configuration information. Also, this will require some changes to when configuration is loaded/latched. This includes moving methods in pipeline that take 'stage' as an argument to Stage or StageExecutionConfig.
Switch from keying on the class member 'name' to something like 'instance_name', which, to maintain backward compatibility, can default to the class name, but can be overridden for individual objects. This will make it easier to define multiple objects of the same class for data exploration.
Add something like a 'SubPipeline', which is a PipelineStage that is actually a Pipeline in its own right.
Consider adding something like a 'DataHandle' class that allows access to data either in memory or off disk. This will make it easier to pass data between stages in memory, which could be particularly useful when defining a SubPipeline that tacks together a few stages that just add or modify one or two columns of a table. A rough sketch follows this list.
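A very rough sketch of the DataHandle idea (all names hypothetical):

class DataHandle:
    # Holds a stage output either in memory or as a path on disk.
    def __init__(self, tag, data=None, path=None):
        self.tag = tag
        self.data = data   # in-memory object, if already produced
        self.path = path   # file location, if written out

    def read(self, force=False):
        # Prefer the in-memory copy; fall back to reading from disk.
        if self.data is not None and not force:
            return self.data
        if self.path is None:
            raise ValueError(f"No data or path for handle '{self.tag}'")
        with open(self.path) as f:  # real version would dispatch on file type
            self.data = f.read()
        return self.data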
Interface wish list, presented as pseudo-code
---------------- PipelineStage interface -----------------
largely the same as before, except:
class PipelineStage:
parallel = True
dask_parallel = False
config_options = {}
doc = ""
@classmethod
def _parse_command_line(cls, cmd=None) -> namespace <unchanged>
<build parser, parse command line arguments and return namespace>
@classmethod
def main(cls) -> int
args = cls._parse_command_line()
cls.execute(args)
@classmethod
def execute(cls, args, comm=None)
stage = cls(args, comm)
<mpi stuff>
stage.run()
@classmethod
def main(cls) -> int
stage_class = <get stage class from factory>
args = stage_class._parse_command_line()
stage_class.execute(args)
def __init__(self, args, comm=None): # probably actually want MPI stuff not to be in __init__
self._configs = {}
self._parallel = SERIAL
self._comm = None
self._size = 1
self._rank = 0
self._instance_name = self.name <can be overridden in config>
self.load_config(args)
self.setup_mpi(comm)
@classmethod
def clone(cls, orig, cloneName, **kwargs)
args = orig.config.copy()
args.update(**kwargs)
args['name'] = cloneName
return cls(args, orig.comm)
@property
def instance_name(self) -> str
@abstractmethod
def run(self)
def config_and_run(self, **kwargs)
self.load_configs(kwargs)
self.run()
def load_configs(self, args)
<fills self._configs from args and yaml files referred to by args>
def setup_mpi(self, comm)
<fill data members based on comm>
def get_inputs(self) -> dict, (str, handle)
<fills dict w/ all input handles>
def get_input(self, tag) -> handle
<as now, but returns handle instead of path>
def get_outputs(self) -> dict, (str, handle)
<fills dict w/ all output handles>
def get_output(self, tag, final_name) -> handle
<as now, but returns handle instead of path>
def add_to_config_dict(self, config_dict)
<for taking snapshots>
config_dict[self.instance_name] = copy(self.config)
--------------- new methods, replacing parts of pipeline ---------------
def find_inputs(self, pipeline_files : dict, (str, handle)) -> dict, (str, handle)
<does lookup in pipeline_files, and fills dict with input handles>
def find_outputs(self, outdir) -> dict, (str, handle)
<prepends outdir to all files, and fills dict with output handles>
def should_skip(self, run_config) -> bool
<checks if outputs exist, and skip if they do and run_config is set to 'resume'>
def already_finished(self, run_info)
<prints message>
--------------- unchanged parts of interface -----------
@classmethod
def usage(cls) <unchanged>
@classmethod
def get_module(cls) -> module <unchanged>
@classmethod
def get_stage(cls, name) -> class <unchanged>
@classmethod
def generate_command(cls, inputs, config, outputs):
@classmethod
def generate_cwl(cls, log_dir=None) -> cwlgen.CommandLineTool <unchanged>
@classmethod
def output_tags(cls) -> list, (str) <unchanged>
@classmethod
def input_tags(cls) -> list, (str) <unchanged>
@property
def config(self) -> dict <unchanged>
@property
def rank(self) -> int <unchanged>
@property
def size(self) -> int <unchanged>
@property
def comm(self) -> communicator <unchanged>
def is_parallel(self) -> bool <unchanged>
def is_mpi(self) -> bool <unchanged>
def is_dask(self) -> bool <unchanged>
def start_dask(self) <unchanged>
def stop_dask(self) <unchanged>
def split_tasks_by_rank(self, tasks) <unchanged>
def data_ranges_by_rank(self, n_rows, chunk_rows, parallel=True) <unchanged>
def read_config(self, args) <unchanged>
def open_input(self, tag, wrapper=False, **kwargs) <unchanged>
def open_output(self, tag, wrapper=False, final_name=False, **kwargs) <unchanged>
def get_input_type(self, tag) -> class <unchanged>
def get_output_type(self, tag) -> class <unchanged>
---------------- StageExecutionConfig interface -----------------
Extended this a bit, to:
class StageExecutionConfig:
def __init__(self, info):
<mostly unchanged>
self.stage_class = PipelineStage.get_stage(self.name)
self.stage_obj = None
def build_stage_object(self, args):
self.stage_obj = self.stage_class(args)
def clear_stage_object(self)
self.stage_obj = None
def generate_full_command(self, inputs, outputs, config, run_config):
core = self.stage_class.generate_command(inputs, config, outputs)
return self.site.command(core, self)
---------------- Pipeline interface -----------------
This one has the most changes.
Pipeline
@staticmethod
def create(pipe_config)
launcher_config = pipe_config.get("launcher")
launcher_name = launcher_config["name"]
stages_config = pipe_config["config"]
stages = pipe_config["stages"]
inputs = pipe_config["inputs"]
run_config = {"output_dir": pipe_config["output_dir"],
"log_dir": pipe_config["log_dir"],
"resume": pipe_config["resume"]}
<get pipeline_class>
p = pipeline_class(stages, launcher_config)
p.load(inputs, run_config, stages_config)
return p
@staticmethod
def build_config(pipeline_config_filename, extra_config=None, dry_run=False) -> dict
@staticmethod
def read(pipeline_config_filename, extra_config=None, dry_run=False)
pipe_config = build_config(pipeline_config_filename, extra_config, dry_run)
return create(pipe_config)
def __init__(self, stages, launcher_config):
self.launcher_config = launcher_config
self.stage_execution_config = {}
self.stage_names = []
---- deferred data members -----
---- current ----
self.run_info = None # currently filled by run, type varies
self.pipeline_outputs = self.find_all_outputs(stages, run_config) # filled by run,
---- suggested ----
self.run_info = None # filled by self.load_configs()
self.run_configs = None # filled by self.load_configs()
self.stages = None # list, (class) filled by self.ordered_stages() in self.load_configs()
self.pipeline_outputs = None # dict, (str, handle) filled by self.load_configs()
def load_configs(self, overall_inputs, run_config, stages_config)
<fills self.run_info, self.run_configs, self.stages, self.pipeline_outputs>
def run(self)
<uses self.run_info, self.run_configs, self.stages>
def add_stage(self, stage_info : dict, stage : Stage)
<can be called using either a stage info or a stage>
def remove_stage(self, name : str)
def __get__(self, name) -> Stage
def ordered_stages(self, overall_inputs : dict, (str, url) ) -> list, (class)
def find_all_outputs(self) -> dict, (str, handle)
def write(self, yamlfile)
@abstractmethod
def initiate_run(self, overall_inputs)
return []
return [] # list of futures
return {}, []
return dict, (str, cwl_stuff)
@abstractmethod
def enqueue_job(self, stage, pipeline_files)
return dict, (str, handle) # and cmd to run_info
return dict, (str, handle) # add Parsl.app to run_info
return dict, (str, handle) # insert minirunner.Job to run_info[0]
return dict, (str, step.id) # insert step into run_info['workflow']
@abstractmethod
def run_jobs(self)
return 0 # print(cmd)
return 0 or 1 # future.result()
return 0 or 1 # runner.run(interval)
return status # os.system("cwltool ....")
-------- Moved to other classes -------------------
def find_inputs(self, stage : class or instance, pipeline_files : dict, (str, url), run_config : ?) -> dict, (str, handle)
<remove, move to Stage>
def find_outputs(self, stage, run_config) -> dict, (str, handle)
<remove, move to Stage>
def generate_full_command(self,
stage : class,
inputs : dict, (str, url),
outputs : dict, (str, url),
config : str,
run_config : ?,
missing_inputs_in_outdir=False) -> str
<remove, move to StageExecutionConfig>
def should_skip_stage(self, stage, run_config):
<remove, move to stage>
def already_finished_job(self, stage, run_info)
<remove, move to stage>
Ziang pointed out that reloading python packages with ceci stages runs into a problem because of the check on the naming.
It would be nice to have a way to set an option that lets you do this.
I'll push some changes to a branch that let you do it.
For example, if you have 1 node available but a job requiring 2 nodes, it will stay in that state forever.
The current setup handles parallelism alright on a test machine, but not when submitting to clusters or supercomputers via batch systems.
As I understand it we currently need to dynamically create a new site configuration for each size of MPI job, but this could be wrong.
If two stages create the same output, which is then used by a subsequent stage, the error is pretty opaque. We should check for this case, e.g. as sketched below.
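A cheap check at pipeline build time; a sketch assuming each stage class exposes an output_tags() classmethod and a name attribute, as elsewhere in ceci:

def check_duplicate_outputs(stages):
    # Raise early if two stages declare the same output tag.
    seen = {}
    for stage in stages:
        for tag in stage.output_tags():
            if tag in seen:
                raise ValueError(
                    f"Output '{tag}' is produced by both "
                    f"{seen[tag]} and {stage.name}"
                )
            seen[tag] = stage.name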
Having stuff like:

stage_name:
    input: None

is overriding stuff from the aliases and configuration inputs when the input files are being read. The easiest fix is to not write keys that are "None". I'm not quite sure why these are getting set to "None" instead of None, but I was just going to be pragmatic and not write the 'None' inputs.
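Pragmatically, that is a one-line filter before the inputs are written back out; a sketch:

inputs = {"input": "None", "catalog": "data/cat.hdf5"}
# Drop entries whose value is the string "None" (or real None), so they
# cannot clobber aliases or configuration inputs on the next read.
inputs = {k: v for k, v in inputs.items() if v not in (None, "None")}
print(inputs)  # {'catalog': 'data/cat.hdf5'}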
Shall we adopt the Black code style? If nobody complains, I'll convert the code and add documentation for code maintainers and contributors.
So, ceci is installed at nersc, so we should be okay for hack week, but I wanted to test out installing ceci locally. I've done the following:
(1) python (or python3) setup.py install (instructions on the read the docs) gives an error:

Running parsl-0.5.2/setup.py -q bdist_egg --dist-dir /tmp/easy_install-dufhqeco/parsl-0.5.2/egg-dist-tmp-kzi8121i
error: [Errno 2] No such file or directory: 'requirements.txt'

(2) Also, if I pip install git+git://github.com/EiffL/python-cwlgen.git (instructions on the ceci readme), I get:

ceci 0.0.5 requires parsl<0.6.0, which is not installed.

(3) After I pip install parsl, I get:

cwlgen 0.2.6 has requirement ruamel.yaml==0.13.13, but you'll have ruamel-yaml -VERSION which is incompatible.
ceci 0.0.5 has requirement parsl<0.6.0, but you'll have parsl 0.6.1 which is incompatible.
If I try to set the data_path option on the bpz_lite stage on the command line by adding --data_path=/path/to/somewhere, then the parameter ends up with a broken value, looking like a string representation of the class, like '<class StageParameter ...>' or something like that. I guess the wrong thing is being str'd somewhere.
Installing ceci through pip is failing on my machine. The error message is
Using cached https://files.pythonhosted.org/packages/d9/2d/f1773b60c7a78a832f6068761167907302927cb6c7279275a0dcee102e05/ruamel.yaml-0.13.13.tar.gz
Complete output from command python setup.py egg_info:
/var/folders/5g/5ynbyz2528vfyfpjw7spc5fc0000gn/T/tmp_ruamel_q8ni772l/test_ruamel_yaml.c:6:8: warning: explicitly assigning value of variable of type 'yaml_parser_t' (aka 'struct yaml_parser_s') to itself [-Wself-assign]
parser = parser; /* prevent warning */
~~~~~~ ^ ~~~~~~
/var/folders/5g/5ynbyz2528vfyfpjw7spc5fc0000gn/T/tmp_ruamel_q8ni772l/test_ruamel_yaml.c:6:10: warning: variable 'parser' is uninitialized when used here [-Wuninitialized]
parser = parser; /* prevent warning */
^~~~~~
/var/folders/5g/5ynbyz2528vfyfpjw7spc5fc0000gn/T/tmp_ruamel_q8ni772l/test_ruamel_yaml.c:5:1: note: variable 'parser' is declared here
yaml_parser_t parser;
^
2 warnings generated.
Warning: 'keywords' should be a list, got type 'NoneType'
sys.argv ['-c', 'egg_info', '--egg-base', 'pip-egg-info']
test compiling test_ruamel_yaml
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/private/var/folders/5g/5ynbyz2528vfyfpjw7spc5fc0000gn/T/pip-install-wnsl8fb8/ruamel.yaml/setup.py", line 854, in <module>
main()
File "/private/var/folders/5g/5ynbyz2528vfyfpjw7spc5fc0000gn/T/pip-install-wnsl8fb8/ruamel.yaml/setup.py", line 843, in main
setup(**kw)
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/site-packages/setuptools/__init__.py", line 129, in setup
return distutils.core.setup(**attrs)
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/distutils/core.py", line 108, in setup
_setup_distribution = dist = klass(attrs)
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/site-packages/setuptools/dist.py", line 370, in __init__
k: v for k, v in attrs.items()
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/distutils/dist.py", line 267, in __init__
getattr(self.metadata, "set_" + key)(val)
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/distutils/dist.py", line 1203, in set_keywords
self.keywords = _ensure_list(value, 'keywords')
File "/Users/yooken/Codes/miniconda/envs/tjpcosmo/lib/python3.7/distutils/dist.py", line 40, in _ensure_list
value = list(value)
TypeError: 'NoneType' object is not iterable
I've tried a number of different pyyaml installations (3.13, 4.2) but this doesn't seem to have an effect. Python version is 3.7.
Installing from master with python setup.py install fails as well, at the ruamel.yaml step.
setup.py says MIT, LICENSE says BSD, and the source files don't say anything ^^'
Do we have a preferred license? If so, I'll harmonize everything.