Acquisition packages for the XPD-28 beamline at BNL.
Fork and clone this repo, then run the following command in a terminal:
bash install.sh <name-for-the-conda-environment>
This will create a new conda environment and install xpdacq into it.
Extensions for streamz
License: Other
We need a future friendly filter. The main issue with filter is that it makes decisions based on the value of the data. This is not possible when running in future mode as the value of the data is unknown until the data is gathered. One approach for this would be to wrap the filter functions so that their futures either evaluate to a dedicated null or the incoming value. All other nodes would wrap their functions to understand the null and not compute and pass the null down. When the null gets to a gather node then the gather node does not emit the null, just as if the data was filtered.
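A minimal synchronous sketch of this null-sentinel idea. All names here (NULL, wrap_filter, wrap_map, gather) are hypothetical illustrations, not streamz API:

```python
# Hypothetical sketch of the null-sentinel approach described above.
# NULL stands in for "this value was filtered out"; it propagates
# through the pipeline until a gather-like node drops it.

NULL = object()  # dedicated sentinel for filtered-out values

def wrap_filter(predicate):
    """Turn a boolean predicate into a function returning the value or NULL."""
    def inner(x):
        return x if predicate(x) else NULL
    return inner

def wrap_map(func):
    """Map functions skip computation and pass NULL through unchanged."""
    def inner(x):
        return NULL if x is NULL else func(x)
    return inner

def gather(values):
    """A gather-like step simply drops NULLs, as if the data was filtered."""
    return [v for v in values if v is not NULL]

# Usage: keep even numbers, then square them.
keep_even = wrap_filter(lambda x: x % 2 == 0)
square = wrap_map(lambda x: x * x)
results = gather([square(keep_even(x)) for x in range(5)])
# results == [0, 4, 16]
```

In future mode the same wrappers would be submitted to the scheduler, so the decision happens where the data lives rather than on the client.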
This looks interesting, but I'm not totally sure what it does (beyond working with streamz somehow). Some rough ideas and a few copy-paste examples would help. :)
@CJ-Wright ... making progress, but now I cannot import plot_network from grave, even though grave is installed:
```
$ python live_plot.py
Traceback (most recent call last):
  File "live_plot.py", line 2, in <module>
    from streamz_ext.graph import node_style, run_vis
  File "c:\users\simon\dev\streamz_ext\streamz_ext\graph.py", line 5, in <module>
    from grave import plot_network
ImportError: cannot import name 'plot_network'
```
When trying to visualize a pipeline built with the streamz_ext package, for example tof_source.visualize('../total_mystream.png', source_node=True), it fails with the following error:
```
Traceback (most recent call last):
  File "/Users/Dragon/Documents/Billinge/dev/sidewinder-spec/sidewinder_spec/nomad/parser.py", line 147, in <module>
    from nomad.pipeline import source, filename_name_nodes
  File "/Users/Dragon/Documents/Billinge/dev/nomad/nomad/pipeline.py", line 5, in <module>
    from nomad.raw_pipeline import (length_source, tth_source, composition,
  File "/Users/Dragon/Documents/Billinge/dev/nomad/nomad/raw_pipeline.py", line 45, in <module>
    tof_source.visualize('../total_mystream.png')
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/streamz_ext/core.py", line 370, in visualize
    return visualize(self, filename, source_node=source_node, **kwargs)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/streamz/graph.py", line 140, in visualize
    data = g.pipe(format=format)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/files.py", line 96, in pipe
    data = text_type(self.source).encode(self._encoding)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/dot.py", line 91, in __str__
    return '\n'.join(self)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/dot.py", line 82, in __iter__
    yield '\t%s%s' % (kw, self.attributes(None, attr))
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 95, in attributes
    for k, v in tools.mapping_items(kwargs) if v is not None]
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 95, in <listcomp>
    for k, v in tools.mapping_items(kwargs) if v is not None]
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 44, in quote
    if html(identifier):
TypeError: expected string or bytes-like object
```
Packages used:
```
graphviz    2.38.0 7   conda-forge
graphviz    0.5.1      <pip>
pygraphviz  1.3.1      <pip>
```
It might be ok for filter to be greedy if it is greedy on the dask side so it occupies a dask process.
It may be nice to expose a graph object which actually expresses the entire graph as a single entity. This could also make adding/removing nodes easier as we would have networkx methods.
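A rough sketch of how such a graph object might be extracted. pipeline_edges is a hypothetical helper that only assumes each node exposes a downstreams iterable, as streamz nodes do; the resulting edge set can be handed to networkx (e.g. nx.DiGraph(edges)) to get add/remove methods for free:

```python
# Hypothetical sketch: collect the pipeline's edges so the whole graph
# can be expressed as a single entity (e.g. networkx.DiGraph(edges)).

def pipeline_edges(source):
    """Walk downstream from `source`, assuming each node exposes a
    `downstreams` iterable (as streamz nodes do).
    Returns a set of (parent, child) edges."""
    edges = set()
    stack = [source]
    while stack:
        node = stack.pop()
        for child in getattr(node, "downstreams", []):
            if (node, child) not in edges:  # also guards against cycles
                edges.add((node, child))
                stack.append(child)
    return edges
```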
We need a parallel-friendly unique node.
Something like:
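A hedged sketch of what such a node might look like. Unique, NO_CHANGE, and the key argument are all hypothetical names: the idea is to compare a cheap key (e.g. a future's token) instead of the gathered value, and emit a sentinel on repeats so downstream nodes can skip work:

```python
NO_CHANGE = object()  # sentinel: input was not unique, skip downstream work

class Unique:
    """Sketch of a parallel-friendly unique node. Rather than comparing
    gathered values (impossible in future mode), compare a cheap key and
    emit a sentinel when the input repeats."""

    def __init__(self, key=lambda x: x):
        self.key = key
        self._last = object()  # never equal to any real key

    def update(self, x):
        k = self.key(x)
        if k == self._last:
            return NO_CHANGE
        self._last = k
        return x
```

Usage: feeding [1, 1, 2] through update yields 1, NO_CHANGE, 2.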
I can currently see two classes of use cases for this:
For "map"-like nodes we need the sentinel to tell us whether we should bother computing the outcome. For "join"-like nodes we need the sentinel to report the most recent unique future. Note that we can't have the "map"-like nodes resolve to a null-compute, since this would cause issues if map and join nodes were in the same branch. This could also be problematic for zip joins, since those will potentially need to pass null-computes downstream, because they are picky about when things come in.
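A toy illustration of why a zip-like join has to pass sentinels downstream rather than drop them: dropping one side's sentinel would shift the pairing. NULL, zip_pairs, and gather are hypothetical names:

```python
NULL = object()  # hypothetical sentinel for a skipped computation

def zip_pairs(left, right):
    """A zip-like join pairs items positionally, so a NULL cannot simply
    be dropped from one side; it is paired and passed downstream, and
    only a later gather-like step filters the pair out."""
    return list(zip(left, right))

def gather(pairs):
    # Drop any pair that contains a skipped computation.
    return [p for p in pairs if NULL not in p]

left = [1, NULL, 3]    # middle element was filtered upstream
right = ['a', 'b', 'c']
aligned = zip_pairs(left, right)   # NULL kept so 3 still pairs with 'c'
result = gather(aligned)           # [(1, 'a'), (3, 'c')]
```

Had the zip dropped the NULL on entry, 3 would have been paired with 'b' instead of 'c'.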
It might be nice to walk through the graph starting at a given node, and color the nodes by their execution order. This would allow us to find the slowest/critical path of the pipeline.
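One way to sketch that critical-path search, assuming we already have the pipeline's edges and measured per-node execution times. critical_path and every name below are hypothetical:

```python
# Hypothetical sketch: find the slowest (critical) path through a
# pipeline DAG, given measured per-node execution times.

def critical_path(edges, times):
    """edges: list of (parent, child); times: {node: seconds}.
    Returns the root-to-leaf path with the largest total time."""
    children = {}
    nodes = set(times)
    for a, b in edges:
        children.setdefault(a, []).append(b)
        nodes.update((a, b))
    best = {}  # memo: node -> (total_time, path)

    def walk(n):
        if n in best:
            return best[n]
        t = times.get(n, 0.0)
        tails = [walk(c) for c in children.get(n, [])]
        if tails:
            sub_t, sub_p = max(tails)
            best[n] = (t + sub_t, [n] + sub_p)
        else:
            best[n] = (t, [n])
        return best[n]

    roots = nodes - {b for _, b in edges}
    return max(walk(r) for r in roots)[1]

# Usage: diamond pipeline where the lower branch is slower.
edges = [("src", "fast"), ("src", "slow"), ("fast", "sink"), ("slow", "sink")]
times = {"src": 0.1, "fast": 0.2, "slow": 1.0, "sink": 0.1}
path = critical_path(edges, times)  # ['src', 'slow', 'sink']
```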
It would be great to visualize data's progress through the pipeline.
This could be accomplished by wrapping the update methods on all the nodes with a function decorator. This decorator would turn the node yellow when data goes into the update and green when the data comes out. It may be possible to have it turn red if the data causes an exception.

```python
import functools

def visualization_decorator(func):
    @functools.wraps(func)
    def inner(self, data):
        self.v.turn_yellow()     # data has entered the update
        try:
            result = func(self, data)
        except Exception:
            self.v.turn_red()    # the data caused an exception
            raise
        else:
            self.v.turn_green()  # data came out successfully
            return result
    return inner
```
Hi @CJ-Wright,
We are trying to build a pipeline from the live_plot.py example in streamz_ext but running into tricky issues: it won't import node_style and run_vis, even though we can find and import the module in an ipython session in the conda env. Any help appreciated.
S
Currently we handle sub-pipeline linking by connecting nodes with the same name. This causes a bunch of issues, mostly because connect is a little bit broken due to how our stream type inheritance works (dask vs. standard backend).
A better approach may be to explicitly request certain incoming nodes and then inherit from those. This way we don't need to use connect, and the nodes will properly inherit their type at pipeline creation time. This also simplifies the linker's implementation: it only requires a mapping of arg/kwarg names to input streams and a list of functions which take those args and kwargs and return dicts.
```python
def linker(initial_inputs, list_sub_funcs):
    # Copy so we don't mutate the caller's mapping
    namespace = dict(initial_inputs)
    for sub_func in list_sub_funcs:
        # Each sub-pipeline function returns a dict of the nodes it created
        namespace.update(sub_func(**namespace))
    return namespace
```
Note that this requires all sub_funcs to allow for arbitrary kwargs. We could also inspect the signatures, but that seems more brittle.
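A hypothetical usage of this linker pattern, with plain values standing in for streams; make_doubled and make_total are invented sub-pipeline functions, each accepting arbitrary kwargs as required:

```python
# Hypothetical usage of the linker pattern described above.

def linker(initial_inputs, list_sub_funcs):
    namespace = dict(initial_inputs)  # copy: don't mutate the caller's dict
    for sub_func in list_sub_funcs:
        namespace.update(sub_func(**namespace))
    return namespace

def make_doubled(raw, **kwargs):
    # A sub-pipeline consuming the 'raw' input and exposing 'doubled'
    return {"doubled": raw * 2}

def make_total(raw, doubled, **kwargs):
    # A later sub-pipeline consuming nodes created upstream
    return {"total": raw + doubled}

ns = linker({"raw": 3}, [make_doubled, make_total])
# ns == {'raw': 3, 'doubled': 6, 'total': 9}
```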
Once we have thread, proc, and dask backends it might be nice to not need a separate scatter for each one. The new signature would be scatter(backend='dask'), where backend is a string which goes to a lookup table of backends. Note that with this default we mirror current streamz behavior.
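A sketch of what that lookup table might look like. The backend functions here are placeholders standing in for the real scatter implementations, which do not exist under these names:

```python
# Hypothetical string-keyed backend dispatch for scatter().
# Each _*_scatter below is a placeholder, not a real streamz backend.

def _thread_scatter(data):
    return ("thread", data)

def _proc_scatter(data):
    return ("proc", data)

def _dask_scatter(data):
    return ("dask", data)

BACKENDS = {
    "thread": _thread_scatter,
    "proc": _proc_scatter,
    "dask": _dask_scatter,
}

def scatter(data, backend="dask"):
    """Default to 'dask' to mirror current streamz behavior."""
    try:
        return BACKENDS[backend](data)
    except KeyError:
        raise ValueError(
            f"unknown backend {backend!r}; choose from {sorted(BACKENDS)}"
        ) from None
```

New backends then only need one registry entry rather than a new scatter variant.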