hyperstream's Issues

sporadic failure of python 2 unit test (always on travis)

logging output:

======================================================================
ERROR: test_new_api_overlapping_plates (tests.test_workflows.HyperStreamWorkflowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_workflows.py", line 151, in test_new_api_overlapping_plates
    w.execute(time_interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/workflow/workflow.py", line 89, in execute
    factor.execute(time_interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 172, in execute
    alignment_stream=self.get_alignment_stream(None, None))
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/tool/tool.py", line 78, in execute
    sources=sources, alignment_stream=alignment_stream, interval=interval):
  File "plugins/example/tools/dummy/2017-06-16_v1.0.0.py", line 36, in _execute
    for d in data:
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/stream/stream_view.py", line 58, in __iter__
    self.stream.parent_node.factor.execute(interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 167, in execute
    raise NotImplementedError
NotImplementedError: 

Upgrade pymongo and mongoengine

These are both out of date:

Dependency    Resolved    Newest
mongoengine   0.13.0      0.14.3
pymongo       3.4.0       3.5.1

An upgrade and subsequent testing are recommended.

warning in unit test for sessions

logging output:

WARNING:root:Stream session: [uuid=254ac60a-70bb-11e7-9956-42010a0a149f] not available for time interval (0001-01-01 00:00:00+00:00, 2017-07-24 21:57:54.507000+00:00]. Perhaps upstream calculations haven't been performed

Node creation syntax

At the moment, all of the nodes have to be created up front (w.create_node(stream_name, channel_id, plate_ids)), and since we also store a dict of stream_name -> node at the same time (N), they can then be accessed during workflow creation using N['whatever']. I'd actually like to get rid of this, so that you can use notation like the following:

X = hs.plate_manager.plates["X"]

with hs.create_workflow(**kwargs) as w:
    for x in X:
        my_node[x] = hs.factors.my_tool(w, sources=[])

Of course this can't work because my_node hasn't yet been defined. To define it, we need to know what channel it belongs in, and take a reference to the workflow. One way to do this would be to put references to the channels in the workflow, so the last line would become:

        w.channels.memory.my_node[x] = hs.factors.my_tool(w, sources=[])

although I'm not sure that's going to work either (well, it could, but it would be ugly). Actually, there are two references to the workflow on that line (one for the node and one for the factor), so perhaps the old way of referencing the channel could be used:

     M['my_node'][x] = hs.factors.my_tool(w, sources=[])

or another alternative would be to move the channel specification to the right, e.g.:

    w.nodes[x] = hs.factors.my_tool(w, sources=[], channel='memory')
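As a thought experiment, here is a hedged sketch of how that last variant could be made to work: hs.factors.my_tool(...) would return a deferred factor spec, and assignment into w.nodes would create the node in the named channel on first use. Every name below (FactorSpec, NodeMapping, the stub workflow) is illustrative, not existing HyperStream API:

class FactorSpec(object):
    """Deferred description of a factor, as returned by hs.factors.my_tool(...)."""
    def __init__(self, tool_name, sources, channel):
        self.tool_name = tool_name
        self.sources = sources
        self.channel = channel


class NodeMapping(object):
    """Workflow-owned mapping: assigning w.nodes[x] = spec creates the node lazily."""
    def __init__(self, workflow):
        self.workflow = workflow
        self._created = {}

    def __setitem__(self, plate_value, spec):
        # Create the sink node once, in the channel named by the spec,
        # then attach a factor for this particular plate value.
        if spec.tool_name not in self._created:
            self._created[spec.tool_name] = self.workflow.create_node(
                spec.tool_name, spec.channel, [plate_value])
        self.workflow.create_factor(spec, self._created[spec.tool_name])


class StubWorkflow(object):
    """Minimal stand-in for a workflow, just to make the sketch runnable."""
    def create_node(self, name, channel_id, plate_values):
        print("create_node({!r}, {!r}, {!r})".format(name, channel_id, plate_values))
        return name

    def create_factor(self, spec, sink):
        print("create_factor({!r} -> {!r})".format(spec.tool_name, sink))


w = StubWorkflow()
nodes = NodeMapping(w)
for x in ["house_1", "house_2"]:
    nodes[x] = FactorSpec("my_tool", sources=[], channel="memory")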

unit test failing on travis ci

Logging output:

root: INFO: New workflow created with id test_new_api_overlapping_plates
root: INFO: Added workflow test_new_api_overlapping_plates to workflow manager
root: INFO: Added node with id ticker containing 1 streams
root: INFO: Added node with id rr1 containing 4 streams
root: INFO: Added node with id rr2 containing 4 streams
root: INFO: Added node with id rp containing 16 streams
root: INFO: Added node with id rpc containing 16 streams
root: INFO: Added factor with tool Clock(first=datetime.datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=UTC, stride=1.0) 
root: INFO: Added factor with tool Random(seed=1) 
root: INFO: Added factor with tool Random(seed=2) 
root: INFO: Added factor with tool Product() 
root: INFO: Added factor with tool Dummy() 
root: INFO: Dummy with sink node rpc running from 2016-04-28 20:00:00+00:00 to 2016-04-28 20:01:00+00:00
root: INFO: Product with sink node rp running from 2016-04-28 20:00:00+00:00 to 2016-04-28 20:01:00+00:00
--------------------- >> end captured logging << ---------------------
mongoengine/base/fields.py   RuntimeError: dictionary changed size during iteration

Plugin/Tool helpers

It might be nice to have some helpers for creating plugins and/or tools. Ideally these would just be small Python scripts (they could also be bash scripts, but that would exclude non-bash environments). The create-plugin helper would create a Python package as in Tutorial 2:

|- one_plugin/
|   |- __init__.py
|   |- tools/
|       |- __init__.py

And the create-tool helper would do the final part of creating a file with the appropriate name (defaulting to today's date and v0.0.1):

|       |- my_fancy_tool
|           |- __init__.py
|           |- 2017-06-20_v0.0.1.py

with some default contents, e.g.:

from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count

class MyFancyTool(Tool):
    def __init__(self):
        super(MyFancyTool, self).__init__()

    @check_input_stream_count(0)
    def _execute(self, sources, alignment_stream, interval):
        raise NotImplementedError()
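For instance, the create-tool helper could be a short script along these lines (a sketch; the camel-casing rule and the use of the template above are my assumptions about sensible defaults):

import datetime
import os
import sys

TEMPLATE = '''\
from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count


class {class_name}(Tool):
    def __init__(self):
        super({class_name}, self).__init__()

    @check_input_stream_count(0)
    def _execute(self, sources, alignment_stream, interval):
        raise NotImplementedError()
'''


def create_tool(plugin_dir, tool_name, version="v0.0.1"):
    """Create tools/<tool_name>/<date>_<version>.py inside an existing plugin package."""
    tool_dir = os.path.join(plugin_dir, "tools", tool_name)
    if not os.path.exists(tool_dir):
        os.makedirs(tool_dir)
    open(os.path.join(tool_dir, "__init__.py"), "a").close()
    # my_fancy_tool -> MyFancyTool
    class_name = "".join(part.title() for part in tool_name.split("_"))
    filename = "{}_{}.py".format(datetime.date.today().isoformat(), version)
    with open(os.path.join(tool_dir, filename), "w") as f:
        f.write(TEMPLATE.format(class_name=class_name))


if __name__ == "__main__":
    create_tool(sys.argv[1], sys.argv[2])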

Graphviz visualization of streams and factors

I think it would be nice to be able to print all the elements involved in a composition of streams. If we use Graphviz, it would be possible to create the directed graph, given that we know all the information about the nodes. The graph could then be output in a variety of formats supported by Graphviz, such as bitmaps, vector images, JSON and plain text, with direct compatibility with Jupyter Notebook.

Here is an example from a portion of the tutorial that I am writing. In this case, being able to visualize the graph would be beneficial for understanding how it works.

[Screenshot: graphviz_hyperstream - example rendering of the stream composition graph]
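As a starting point, something like the following could walk a workflow and emit a Graphviz graph (a sketch using the graphviz Python package; the tool_name/sources/sink attributes are assumptions about how factors could be exposed, not HyperStream's actual attribute names):

from graphviz import Digraph


def workflow_to_dot(factors):
    """Render an iterable of factors (each with tool_name, sources, sink) as a Digraph."""
    dot = Digraph(comment="HyperStream workflow")
    for i, factor in enumerate(factors):
        # Tools as boxes, streams/nodes as ellipses
        tool_id = "tool_{}_{}".format(i, factor.tool_name)
        dot.node(tool_id, factor.tool_name, shape="box")
        for source in factor.sources:
            dot.node(source, source, shape="ellipse")
            dot.edge(source, tool_id)
        dot.node(factor.sink, factor.sink, shape="ellipse")
        dot.edge(tool_id, factor.sink)
    return dot

# In a Jupyter notebook, returning the Digraph displays it inline;
# workflow_to_dot(factors).render("workflow", format="png") writes a bitmap.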

Format for printing plate definition

According to the example in plate_manager.py, the format to read plate definitions should be:

{'H1': [(('house', '1'),)]}

that is: identifier[tag:data].

However, for the current implementation, it is: tag[identifier:data].

(in container.py: '{0}[{1}:{2}]'.format(display_value, self[nid].identifier, str(self[nid].data)))

So I just wonder whether the first form makes more sense.

For instance, for tutorial 5:

country[country_NZ:NZ]
country[country_Australia:Australia]

We can change it to:

country_NZ[country:NZ]
country_Australia[country:Australia]
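If the identifier[tag:data] form is adopted, the fix in container.py would be to swap the first two arguments of the format call quoted above (a sketch):

# current output, tag[identifier:data]:
'{0}[{1}:{2}]'.format(display_value, self[nid].identifier, str(self[nid].data))

# proposed output, identifier[tag:data]:
'{0}[{1}:{2}]'.format(self[nid].identifier, display_value, str(self[nid].data))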

unit tests

We need more unit tests: at the moment coverage is down at 50%. In particular:

  • tests for every channel (a minimal sketch follows this list)
  • tests for every (core) tool
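As a concrete example for the channel tests, a round trip through the memory channel using the core clock tool could look like this (a sketch composed from the patterns used in other tests on this page; the imports and the 60-tick assertion, which assumes the clock's default one-second stride, are my assumptions):

from datetime import datetime

from hyperstream import HyperStream, TimeInterval
from hyperstream.utils import UTC


def test_memory_channel_with_clock():
    """Run the core clock tool into a memory stream and check the output count."""
    with HyperStream(file_logger=False, console_logger=False, mqtt_logger=None) as hs:
        ticker = hs.channel_manager.memory.get_or_create_stream("ticker")
        ti = TimeInterval(datetime(2017, 1, 1, 0, 0, tzinfo=UTC),
                          datetime(2017, 1, 1, 0, 1, tzinfo=UTC))
        hs.tools.clock().execute(sources=[], sink=ticker, interval=ti)
        # One tick per second over a one-minute interval
        assert len(ticker.window(ti).values()) == 60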

python 3 unit test failing on travis (2)

Logging output

INFO:root:New workflow created with id test_save_workflow
INFO:root:Added node with id ticker containing 1 streams
INFO:root:Added factor with tool Clock(first=datetime.datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=UTC, stride=1.0) 
INFO:root:Added workflow test_save_workflow to workflow manager
INFO:root:Plate T1 not found for deletion
INFO:root:Meta data test_3.nested_test_A deleted
INFO:root:Meta data test_2.nested_test_D deleted
INFO:root:Meta data test_2.nested_test_C deleted
INFO:root:Meta data test_2.nested_test_B deleted
INFO:root:Meta data test_2.nested_test_A deleted
INFO:root:Meta data test_1.nested_test_D deleted
INFO:root:Meta data test_1.nested_test_C deleted
INFO:root:Meta data test_0.nested_test_C deleted
INFO:root:Meta data test_0.nested_test_B deleted
INFO:root:Meta data test_0.nested_test_A deleted
INFO:root:Plate T.U deleted
INFO:root:Meta data test_3 deleted
INFO:root:Meta data test_2 deleted
INFO:root:Meta data test_1 deleted
INFO:root:Meta data test_0 deleted
INFO:root:Plate T deleted
INFO:root:Meta data test_overlap_3 deleted
INFO:root:Meta data test_overlap_2 deleted
INFO:root:Meta data test_overlap_1 deleted
INFO:root:Meta data test_overlap_0 deleted
INFO:root:Plate V deleted
======================================================================
ERROR: test_new_api_overlapping_plates (tests.test_workflows.HyperStreamWorkflowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_workflows.py", line 151, in test_new_api_overlapping_plates
    w.execute(time_interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/workflow/workflow.py", line 89, in execute
    factor.execute(time_interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 172, in execute
    alignment_stream=self.get_alignment_stream(None, None))
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/tool/tool.py", line 78, in execute
    sources=sources, alignment_stream=alignment_stream, interval=interval):
  File "plugins/example/tools/dummy/2017-06-16_v1.0.0.py", line 36, in _execute
    for d in data:
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/stream/stream_view.py", line 58, in __iter__
    self.stream.parent_node.factor.execute(interval)
  File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 156, in execute
    if list(sorted(itertools.chain(*[s.plates for s in self.sources]))) == self.plates:
TypeError: '<' not supported between instances of 'Plate' and 'Plate'

plate cache not working correctly?

logging output (we would expect the Product and Dummy tools to be created only once):

INFO:root:Added factor with tool Product() 
DEBUG:root:Defining a Dummy tool
INFO:root:Added factor with tool Dummy() 
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool

ti.end in the future issue with Tutorial 1 (related to the precision defined in utils/time_utils.py)

As I had just reinstalled my Ubuntu, I ran Tutorial 1 again as a test.

The issue occurs in the "Executing a new interval" section, and I think it is caused by the precision of utcnow() defined in hyperstream/utils/time_utils.py.

Interestingly, I didn't have this issue in the past...

For instance:

interval.end = datetime.datetime(2017, 8, 18, 9, 41, 14, 262064, tzinfo=<UTC>)
utcnow() = datetime.datetime(2017, 8, 18, 9, 41, 14, 262000, tzinfo=<UTC>)

    137             if interval.end > utcnow():
    138                 print [interval.end, utcnow()]
--> 139                 raise ValueError("Calculated intervals should not be in the future")
    140
    141         self._calculated_intervals = value

ValueError: Calculated intervals should not be in the future
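A minimal standalone illustration of the mismatch (my own sketch, assuming utcnow() floors to millisecond precision while the stored interval end keeps full microseconds):

from datetime import datetime, timezone


def truncated_utcnow():
    # Stand-in for hyperstream's utcnow(): floor to millisecond precision
    now = datetime.now(timezone.utc)
    return now.replace(microsecond=(now.microsecond // 1000) * 1000)


interval_end = datetime.now(timezone.utc)  # full microsecond precision
now = truncated_utcnow()

# If both calls land within the same millisecond, flooring makes "now"
# compare as earlier than interval_end, so the future-check fires even
# though interval_end is not actually in the future.
print(interval_end > now)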

Shortcut methods for tools

The tools can now be accessed using shortcut methods, e.g. given a HyperStream instance hs:

hs.tools.clock()

and plugins are accessed in a slightly more convoluted way:

hs.plugins.data_generators.tools.random()

This could be changed to make plugins a bit easier to access, at the cost of a slightly longer path for the core tools, like so:

hs.tools.core.clock()
hs.tools.data_generators.random()

Is this better or should we stick with what we've got?
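A hedged sketch of how the flattened hs.tools.<plugin>.<tool> access could be implemented with a lazy attribute proxy (the loader callable and all names here are illustrative, not HyperStream internals):

class ToolNamespace(object):
    """Lazily resolves attribute access to tool factories."""
    def __init__(self, loader, prefix):
        self._loader = loader  # callable: (prefix, name) -> tool factory
        self._prefix = prefix  # e.g. "core" or a plugin name like "data_generators"

    def __getattr__(self, name):
        # Only called for attributes not already set; resolve once and cache.
        factory = self._loader(self._prefix, name)
        setattr(self, name, factory)
        return factory


def load_tool(prefix, name):
    """Hypothetical loader standing in for HyperStream's tool discovery."""
    def factory(**params):
        return "{}/{} tool with {}".format(prefix, name, params)
    return factory


core = ToolNamespace(load_tool, "core")
print(core.clock())  # -> "core/clock tool with {}"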

Improve documentation

Now that the documentation is automatically populated on readthedocs, it would be a good time to improve the documentation. Some tasks:

  • introductory documentation
  • documentation for each package/module (rather than just classes and methods)
  • fix bugs such as missing spaces between text and parameters

Should MQTT be a strict requirement of HyperStream?

It is not clear to me what the role of MQTT is in a general framework such as HyperStream.
I understand that the original reason is our internal use in an Internet of Things environment where communication is driven by MQTT, but this requirement may be a barrier for the general user.

Do you think it should be optionally activated, or changed into a plugin?

If it really is a required part of HyperStream, then we should advertise it in this context.

Cannot pass test_loggers unit test on local machine

On the master branch, I do not manage to pass test_loggers at these lines.

I am not sure what the problem is, as Travis succeeded when I pushed my master branch.

======================================================================
FAIL: Test the MQTT logger using the standard format
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/hyperstream/sphere-hyperstream/hyperstream_core/tests/test_loggers.py", line 64, in test_mqtt_logger
    assert(str(client.last_messages["topics/test"].decode("utf8")[24:]) == '[MON  ]  1234567890')
AssertionError: 
-------------------- >> begin captured stdout << ---------------------
Connected with result code 0
topics/test 2018-02-21 17:44:43.232000+00:00 ABC
{u'topics/test': '2018-02-21 17:44:43.232000+00:00 ABC'}
00+00:00 ABC

Tool Audit

We currently have a long list of (poorly documented) tools:

aggregate
aggregate_into_dict_and_apply
aggregate_plate
aligned_merge
aligning_window
apply
asset_plate_generator
asset_splitter
asset_writer
clock
component
component_filter
component_set
dict_argmax
dict_values_to_const
histogram_from_list
histograms_to_csv
index_of
index_of_by_stream
jsonify
list_dict_mean
list_dict_sum
list_length
list_mean
list_sum
meta_instance
meta_instance_from_list
percentiles_from_list
percentiles_to_csv
plate_sink
product
relative_apply
relative_apply2
relative_window
sink
slice
sliding_apply
sliding_listify
sliding_sink
sliding_window
splitter
splitter_from_stream
splitter_of_dict
splitter_of_list
splitter_time_aware
splitter_time_aware_from_stream
splitter_with_unlist
stream_broadcaster
stream_broadcaster_from_stream

We should attach usage numbers to each of these for sphere-hyperstream, and determine whether any should be pruned or renamed. For those that remain, the documentation should be improved.
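A quick way to gather the usage numbers would be a small script that counts whole-word occurrences of each tool name in a checkout (a sketch; TOOLS is abbreviated here, and the path is whichever sphere-hyperstream deployment you point it at):

import os
import re
from collections import Counter

TOOLS = ["aggregate", "apply", "clock", "product", "sliding_window"]  # extend with the full list above


def count_tool_usage(root):
    """Count whole-word occurrences of each tool name in .py files under root."""
    counts = Counter({tool: 0 for tool in TOOLS})
    for dirpath, _, filenames in os.walk(root):
        for fn in filenames:
            if not fn.endswith(".py"):
                continue
            with open(os.path.join(dirpath, fn), encoding="utf-8", errors="ignore") as f:
                text = f.read()
            for tool in TOOLS:
                # Word-boundary match to avoid counting substrings
                counts[tool] += len(re.findall(r"\b{}\b".format(re.escape(tool)), text))
    return counts


if __name__ == "__main__":
    for tool, n in sorted(count_tool_usage(".").items(), key=lambda kv: -kv[1]):
        print("{:40s} {}".format(tool, n))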

Add runtest script with all the dependencies

Should we create a self-contained runtest script, maybe using Docker containers? Something like this runtest.sh:

#!/bin/bash

echo "Activating virtual environment"
. ./venv/bin/activate

echo "Starting MongoDB server"
sudo service mongodb start
#
#MONGODB_CONTAINER=`docker run --name hs_mongodb -p 27017:27017 -d library/mongo`
## Wait for the MONGODB port to be available
#until nc -z $(docker inspect --format='{{.NetworkSettings.IPAddress}}' $MONGODB_CONTAINER) 27017
#do
#    echo "waiting for docker MongoDB container..."
#    sleep 0.5
#done

echo "Starting docker MQTT container"
MQTT_CONTAINER=`docker run --name hs_mqtt -d -ti -p 1883:1883 -p 9001:9001 toke/mosquitto`
# Wait for the MQTT port to be available
until nc -z $(docker inspect --format='{{.NetworkSettings.IPAddress}}' $MQTT_CONTAINER) 9001
do
    echo "waiting for docker MQTT container..."
    sleep 0.5
done
nosetests --with-coverage

echo "Stopping docker MQTT container"
docker stop hs_mqtt
echo "Removing docker MQTT container"
docker rm hs_mqtt
echo "Stopping MongoDB server"
sudo service mongodb stop

I had to comment out the MongoDB container section and run my own MongoDB instance because I don't have enough space in my root partition.

python3 support

Work has begun on Python 3 compatibility (see the python3 branch). This requires extensive testing.

Better naming of tools

Some of the tools currently have confusing names. One idea would be to align the tool names with common names in Python. For example, the apply tool applies a function to the values in a stream, which is what Python's map does, so perhaps this tool should just be called map. Note that the actual tool class would be Map, so there's no conflict with builtins.

N.B. in this case one could argue that this should actually be map_values (and MapValues) since it's only applying the function to the values, and there could be corresponding map_timestamps and map_instances tools.

This is problematic for deployed HyperStream instances, since workflows that depend on those tools would be broken. However, this is probably the right time to do it, before there are too many deployed instances.
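To make the proposal concrete, here is a hedged sketch of what a map_values tool might look like, following the tool template shown earlier on this page (the parameter handling in __init__ and the window iteration pattern are my assumptions, not an agreed design):

from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count


class MapValues(Tool):
    """Applies a function to the value of every instance in a single source
    stream, leaving timestamps untouched (analogous to Python's map)."""

    def __init__(self, func):
        super(MapValues, self).__init__(func=func)
        self.func = func

    @check_input_stream_count(1)
    def _execute(self, sources, alignment_stream, interval):
        # Yield one transformed instance per source instance
        for timestamp, value in sources[0].window(interval, force_calculation=True):
            yield StreamInstance(timestamp, self.func(value))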

Limitation of Streams in the past [General discussion]

What are the implications of using Streams of data with a timestamp in the future (or at the present moment)?

At the moment, if you ask for a Stream to be computed for the current time, it will raise the following exception:

File "some_path/site-packages/hyperstream/stream/stream_instance.py", line 39, in __new__
    raise ValueError("Timestamp {} should not be in the future!".format(timestamp))
ValueError: Timestamp 2017-08-22 14:05:00.308326+00:00 should not be in the future!

I can think of a few cases where it could be interesting to allow Streams in the future:

  1. Asking a classifier tool to train from now until one hour in the future.
    • It would be nice to be able to tell a classifier, given the data from a specific source stream, to keep training until the specified time.
    • E.g. some real-time feed from a stock exchange that arrives in a real-time stream and keeps yielding data. The model could take this data whenever it becomes available and train.
  2. Some dataset where the timestamps are in the future.
    • I am not sure how plausible this scenario is.
    • But I can imagine someone wanting to use a stream that outputs data for some particular future time.
  3. Asking a tool for predictions in the future.
    • A model that makes predictions for the future, given that it has already been trained on past data.

test_data_importers fails on travis python 3.6.1 only

See e.g. here

Logging output:

======================================================================
FAIL: test_data_importers (tests.test_tools.TestTools)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_tools.py", line 388, in test_data_importers
    assert_all_close(sea_ice_sums.window().values(), list(map(sum, sea_ice.window().values())), 1e-5)
  File "/home/travis/build/IRC-SPHERE/HyperStream/tests/helpers.py", line 203, in assert_all_close
    .format(i, x, y, prec=prec))
AssertionError: Elements not equal at location 0. a = 15.48000, b = 0.00000
-------------------- >> begin captured logging << --------------------
root: DEBUG: creating session: [uuid=49c40ada-726b-11e7-a3c3-42010a0a140e]
root: DEBUG: Defining a CsvReader tool with parameters {'filename': 'plugins/data_importers/data/sea_ice.csv'}
root: DEBUG: creating sea_ice
root: DEBUG: creating sea_ice_sums
root: DEBUG: Defining a ListSum tool
root: DEBUG: set calculated intervals
--------------------- >> end captured logging << ---------------------

I can't really understand why b = 0.00000 there. The same test runs fine on OS X, also with Python 3.6.1. For now I've commented out the offending line (the last line of the test).

Contents of the test:

    def test_data_importers(self):
        with HyperStream(file_logger=False, console_logger=False, mqtt_logger=None) as hs:
            reader = hs.plugins.data_importers.tools.csv_reader('plugins/data_importers/data/sea_ice.csv')
            # noinspection PyTypeChecker
            ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 4, 1).replace(tzinfo=UTC))

            # TODO: More complicated tests, including headers, different delimiters, messy data etc etc.
            sea_ice = hs.channel_manager.memory.get_or_create_stream("sea_ice")

            reader.execute(sources=[], sink=sea_ice, interval=ti)

            sea_ice_sums = hs.channel_manager.mongo.get_or_create_stream("sea_ice_sums")
            hs.tools.list_sum().execute(sources=[sea_ice], sink=sea_ice_sums, interval=ti)

            # print(sea_ice_sums.window().values())

            # TODO: the below assertion is causing travis to fail - why?
            assert_all_close(sea_ice_sums.window().values(), list(map(sum, sea_ice.window().values())), 1e-5)

time_interval split recursion limit

The split function of the TimeIntervals class raises an exception when passed a list of about 990 points.

    @profile
    def split(self, points):
        if len(points) == 0:
            return
        p = points[-1]
        for i in range(len(self.intervals)):
            if (self.intervals[i].start < p) and (self.intervals[i].end > p):
                self.intervals = self.intervals[:i] \
                                 + [TimeInterval(self.intervals[i].start, p), TimeInterval(p, self.intervals[i].end)] \
                                 + self.intervals[(i + 1):]
        self.split(points[:-1])

This is because Python limits recursion depth to avoid possible stack overflows.

We should change the recursive call to a loop.

Before I make the modification, do you see any reason why it was recursive in the first place that I should consider?
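For reference, a sketch of the iterative rewrite (same point order as the recursion, last point first, but without growing the call stack; it assumes the surrounding TimeIntervals class and the TimeInterval import):

    def split(self, points):
        # Process points from last to first, mirroring the original recursion order
        for p in reversed(points):
            for i in range(len(self.intervals)):
                if (self.intervals[i].start < p) and (self.intervals[i].end > p):
                    self.intervals = (self.intervals[:i]
                                      + [TimeInterval(self.intervals[i].start, p),
                                         TimeInterval(p, self.intervals[i].end)]
                                      + self.intervals[(i + 1):])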
