streamz_ext's Introduction

xpdAcq

Acquisition packages for XPD-28 beamline at BNL

Installation

Fork and clone this repo, then run the following command in a terminal:

bash install.sh <give a name to the conda environment>

This creates a new conda environment and installs xpdacq in it.

streamz_ext's Issues

Future friendly filter

We need a future-friendly filter. The main issue with filter is that it makes decisions based on the value of the data, which is not possible in future mode because the value is unknown until the data is gathered. One approach would be to wrap the filter functions so that their futures evaluate to either a dedicated null or the incoming value. All other nodes would wrap their functions to recognize the null, skip the computation, and pass the null downstream. When the null reaches a gather node, the gather node does not emit it, just as if the data had been filtered.
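The idea above could be sketched roughly as follows. This is only an illustration using the standard library's `concurrent.futures` in place of dask; the names `NULL_COMPUTE`, `is_null`, `future_filter`, `null_aware`, `chain`, and `gather` are hypothetical and are not part of streamz or streamz_ext.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sentinel meaning "this value was filtered out upstream".
NULL_COMPUTE = "~~NULL_COMPUTE~~"


def is_null(value):
    """Return True if value is the null sentinel."""
    return isinstance(value, str) and value == NULL_COMPUTE


def future_filter(predicate):
    """Wrap a predicate so it resolves to the value or to the sentinel."""
    def inner(value):
        if is_null(value) or not predicate(value):
            return NULL_COMPUTE
        return value
    return inner


def null_aware(func):
    """Wrap a node function so it skips work and forwards the sentinel."""
    def inner(value):
        if is_null(value):
            return NULL_COMPUTE
        return func(value)
    return inner


def chain(func, future):
    """Apply func to the resolved value of an upstream future."""
    return func(future.result())


def gather(futures):
    """Resolve futures in order and drop any that hold the sentinel."""
    resolved = (fut.result() for fut in futures)
    return [value for value in resolved if not is_null(value)]


def run_pipeline(data):
    keep_even = future_filter(lambda x: x % 2 == 0)
    double = null_aware(lambda x: 2 * x)
    with ThreadPoolExecutor(max_workers=2) as pool:
        filtered = [pool.submit(keep_even, x) for x in data]
        doubled = [pool.submit(chain, double, fut) for fut in filtered]
        return gather(doubled)


print(run_pipeline(range(6)))  # [0, 4, 8]
```

Only the gather step ever inspects a resolved value on the caller's side, so the filter decision stays on the worker side until the data is collected.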

Readme? ;)

This looks interesting, but not totally sure what it does (beyond work with streamz somehow). Some rough ideas and a few copy-paste examples would help. :)

cannot import ``plot_network`` from grave

@CJ-Wright ... making progress, but now cannot import plot_network from grave

grave is installed

$ python live_plot.py
Traceback (most recent call last):
  File "live_plot.py", line 2, in <module>
    from streamz_ext.graph import node_style, run_vis
  File "c:\users\simon\dev\streamz_ext\streamz_ext\graph.py", line 5, in <module>
    from grave import plot_network
ImportError: cannot import name 'plot_network'

visualize pipeline failed on macOS

Visualizing a pipeline built with the streamz_ext package fails, for example with tof_source.visualize('../total_mystream.png', source_node=True).
It produces the following error:

Traceback (most recent call last):
  File "/Users/Dragon/Documents/Billinge/dev/sidewinder-spec/sidewinder_spec/nomad/parser.py", line 147, in <module>
    from nomad.pipeline import source, filename_name_nodes
  File "/Users/Dragon/Documents/Billinge/dev/nomad/nomad/pipeline.py", line 5, in <module>
    from nomad.raw_pipeline import (length_source, tth_source, composition,
  File "/Users/Dragon/Documents/Billinge/dev/nomad/nomad/raw_pipeline.py", line 45, in <module>
    tof_source.visualize('../total_mystream.png')
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/streamz_ext/core.py", line 370, in visualize
    return visualize(self, filename, source_node=source_node, **kwargs)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/streamz/graph.py", line 140, in visualize
    data = g.pipe(format=format)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/files.py", line 96, in pipe
    data = text_type(self.source).encode(self._encoding)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/dot.py", line 91, in __str__
    return '\n'.join(self)
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/dot.py", line 82, in __iter__
    yield '\t%s%s' % (kw, self.attributes(None, attr))
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 95, in attributes
    for k, v in tools.mapping_items(kwargs) if v is not None]
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 95, in <listcomp>
    for k, v in tools.mapping_items(kwargs) if v is not None]
  File "/Users/Dragon/anaconda/envs/pipelinePy3/lib/python3.6/site-packages/graphviz/lang.py", line 44, in quote
    if html(identifier):
TypeError: expected string or bytes-like object

Package used:

graphviz                  2.38.0                        7    conda-forge
graphviz                  0.5.1                     <pip>
pygraphviz                1.3.1                     <pip>

Greedy filter

It might be ok for filter to be greedy if it is greedy on the dask side so it occupies a dask process.

expose graph object

It may be nice to expose a graph object which actually expresses the entire graph as a single entity. This could also make adding/removing nodes easier as we would have networkx methods.

parallel unique

We need a parallel friendly unique node.

Something like:

  1. data0 comes in
  2. node caches future (dataC) and emits future
  3. data1 comes in
  4. node submits a function on data1 and dataC to the client. The function evaluates the values of the two futures and returns either the output of data1 or a not-unique sentinel.
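The steps above might look something like the sketch below. It uses a thread pool from `concurrent.futures` as a stand-in for the dask client, compares each value only against the immediately preceding one, and the names `NOT_UNIQUE`, `ParallelUnique`, and `run_unique` are hypothetical. With dask, futures passed as arguments are resolved by the scheduler, so the explicit `.result()` calls inside the worker function would not be needed.

```python
from concurrent.futures import ThreadPoolExecutor

# Hypothetical sentinel returned when the new value matches the cached one.
NOT_UNIQUE = "~~NOT_UNIQUE~~"


def _unique_or_sentinel(new_future, cached_future):
    """Runs on a worker: compare the two resolved values."""
    new = new_future.result()
    return new if new != cached_future.result() else NOT_UNIQUE


class ParallelUnique:
    """Unique node that never resolves futures on the caller's side."""

    def __init__(self, pool):
        self.pool = pool
        self.cached = None  # future of the previously seen data

    def update(self, future):
        # Swap the cache first; the comparison uses the previous future.
        cached, self.cached = self.cached, future
        if cached is None:
            # First datum: cache the future and emit it unchanged.
            return future
        # Later data: let a worker decide between value and sentinel.
        return self.pool.submit(_unique_or_sentinel, future, cached)


def run_unique(values):
    with ThreadPoolExecutor(max_workers=2) as pool:
        node = ParallelUnique(pool)
        emitted = [node.update(pool.submit(lambda v=v: v)) for v in values]
        return [fut.result() for fut in emitted]


print(run_unique([1, 1, 2, 2, 3]))
```

Because an equal value is interchangeable with the cached one, caching the newest incoming future is enough here to keep track of the latest unique value.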

I can currently see two classes of use cases for this:

  1. The "map" use case. In this use case we don't want to operate on non-unique data (maybe because the operation is expensive). Therefore the not-unique sentinel acts like a null-compute sentinel.
  2. The "join" use case. In this use case we want to join new data with data which is unique. We can't just pass a null-compute here since we would then cause all downstream nodes to run a null-compute. Instead we want to join with the latest outcome which was unique. Note that this is different than a null-compute join, which does exist: for instance if the prior node was a filter node.

For "map" like nodes we need the sentinel to tell us if we should bother computing the outcome. For "join" like nodes we need the sentinel to report the most recent unique future. Note that we can't have the "map" like nodes resolve to a null-compute since this would cause issues if a map and join nodes were in the same branch.
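One possible way to serve both cases is a sentinel that carries the most recent unique value along with it. The sketch below works on plain values rather than futures to keep it short; `NotUnique`, `MapNode`, and `join_like` are hypothetical names, and having the map node re-point the sentinel at its own last output is an assumption about how a map and a join could share a branch, not a settled design.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class NotUnique:
    """Hypothetical sentinel carrying the most recent unique value."""
    latest: Any


class MapNode:
    """Map-like node: skips the computation for non-unique input."""

    def __init__(self, func):
        self.func = func
        self.last = None  # last output computed from unique input

    def update(self, value):
        if isinstance(value, NotUnique):
            # Do not recompute; point the sentinel at our last output so
            # that a downstream join still has something to join with.
            return NotUnique(self.last)
        self.last = self.func(value)
        return self.last


def join_like(left, right):
    """Join-like node: unwraps the sentinel to the latest unique value."""
    if isinstance(right, NotUnique):
        right = right.latest
    return (left, right)


node = MapNode(lambda x: x * 10)
first = node.update(2)               # computed
second = node.update(NotUnique(2))   # skipped
print(first, second, join_like("a", second))
```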

This could also be problematic for zip joins, since those will need to potentially pass down null-computes since they are picky about when things come in.

visualize the critical path

It might be nice to walk through the graph starting at a given node, and color the nodes by their execution order. This would allow us to find the slowest/critical path of the pipeline.
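A minimal sketch of the walk, assuming the pipeline is available as a plain adjacency mapping from each node to its downstream nodes and that data propagates depth first (each node pushes to its first downstream before moving to the next). The function name `execution_order` and the example graph are hypothetical; the resulting index could then be fed to a colormap when drawing the graph.

```python
def execution_order(graph, start):
    """Return a mapping of node -> visit index for a depth-first walk.

    graph maps each node name to a list of its downstream node names.
    Each node is numbered the first time it is reached.
    """
    order = {}

    def visit(node):
        if node in order:
            return
        order[node] = len(order)
        for child in graph.get(node, ()):
            visit(child)

    visit(start)
    return order


graph = {
    "source": ["map", "filter"],
    "map": ["sink"],
    "filter": ["sink"],
}
print(execution_order(graph, "source"))
# {'source': 0, 'map': 1, 'sink': 2, 'filter': 3}
```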

Live pipeline visualization

It would be great to visualize data's progress through the pipeline.

This could be accomplished by:

  1. Building a live updating graph plot
  2. Monkey patching the update methods on all the nodes with a function decorator. This decorator would turn the node yellow when data goes into the update and green when data comes out. It may be possible to have it turn red if the data causes an exception.

import functools

def visualization_decorator(func):
    @functools.wraps(func)
    def inner(self, data):
        # self.v is the node's handle in the live plot
        self.v.turn_yellow()
        try:
            result = func(self, data)
        except Exception:
            self.v.turn_red()
            raise
        else:
            self.v.turn_green()
            return result
    return inner

trying to run plot example but problems

Hi @CJ-Wright

We are trying to build a pipeline from the live_plot.py example in streamz_ext but are running into tricky issues: it won't import node_style and run_viz, even though it can find and import the module in an ipython session in the conda env. Any help appreciated.

S

explicit linking

Currently we handle sub-pipeline linking by connecting nodes with the same name. This causes a bunch of issues, mostly because connect is a little bit broken due to how our stream type inheritance works (dask vs standard backend).

A better approach may be to explicitly request certain incoming nodes and then inherit from those. This way we don't need to use connect, and the nodes will properly inherit their type at pipeline creation time. This also simplifies the linker's implementation: it only requires a mapping of arg/kwarg names to input streams and a list of functions which take those args and kwargs and return dicts.

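def linker(initial_inputs, list_sub_funcs):
    # Copy so the caller's mapping is not mutated
    namespace = dict(initial_inputs)
    for sub_func in list_sub_funcs:
        # Each sub_func takes what it needs and returns a dict of new entries
        namespace.update(sub_func(**namespace))
    return namespace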

Note that this requires all sub_funcs to allow for arbitrary kwargs. We could also inspect the signature, but that seems more brittle.
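As a rough usage example, the sketch below runs the linker over two sub-pipeline functions. Plain numbers stand in for stream nodes so the example is self-contained; the function names `background_subtraction` and `normalization` and the keys `raw`, `dark`, and `monitor` are hypothetical.

```python
def linker(initial_inputs, list_sub_funcs):
    # Copy so the caller's mapping is not mutated
    namespace = dict(initial_inputs)
    for sub_func in list_sub_funcs:
        namespace.update(sub_func(**namespace))
    return namespace


# Each sub-pipeline names the inputs it needs and swallows the rest
# with **kwargs, then returns a dict of the nodes it created.
def background_subtraction(raw, dark, **kwargs):
    return {"corrected": raw - dark}


def normalization(corrected, monitor, **kwargs):
    return {"normalized": corrected / monitor}


result = linker(
    {"raw": 10.0, "dark": 2.0, "monitor": 4.0},
    [background_subtraction, normalization],
)
print(result["normalized"])  # 2.0
```

Order matters in this scheme: a sub-pipeline can only request names that the initial inputs or an earlier sub-pipeline have already placed in the namespace.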

Change scatter signature

Once we have thread, proc, and dask backends then it might be nice to not need separate scatters for each one. The new signature would be scatter(backend='dask') where backend is a string which goes to a lookup table of backends. Note that with this default behavior we mirror current streamz.
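The lookup table could be as simple as a dict of factories. The sketch below is an assumption about how such a table might look, not the existing scatter implementation: `BACKENDS` and `get_executor` are hypothetical names, and the dask entry imports `distributed.Client` lazily so the thread and process backends work without dask installed.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def _dask_client():
    # Imported lazily so that dask is only required when requested.
    from distributed import Client
    return Client()


# Hypothetical lookup table: backend name -> zero-argument factory.
BACKENDS = {
    "thread": ThreadPoolExecutor,
    "proc": ProcessPoolExecutor,
    "dask": _dask_client,
}


def get_executor(backend="dask"):
    """Return a new executor-like object for the named backend."""
    try:
        factory = BACKENDS[backend]
    except KeyError:
        known = ", ".join(sorted(BACKENDS))
        raise ValueError(
            "unknown backend %r, expected one of: %s" % (backend, known)
        ) from None
    return factory()
```

All three objects expose a `submit` method, so a single scatter node could hold whichever one the lookup returns.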
