irc-sphere / hyperstream
HyperStream
Home Page: https://irc-sphere.github.io/HyperStream/
License: MIT License
This would make it easier to check for errors, and would remove special cases such as alignment_node and splitting_stream. However, it would require all tools to be rewritten.
logging output:
======================================================================
ERROR: test_new_api_overlapping_plates (tests.test_workflows.HyperStreamWorkflowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_workflows.py", line 151, in test_new_api_overlapping_plates
w.execute(time_interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/workflow/workflow.py", line 89, in execute
factor.execute(time_interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 172, in execute
alignment_stream=self.get_alignment_stream(None, None))
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/tool/tool.py", line 78, in execute
sources=sources, alignment_stream=alignment_stream, interval=interval):
File "plugins/example/tools/dummy/2017-06-16_v1.0.0.py", line 36, in _execute
for d in data:
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/stream/stream_view.py", line 58, in __iter__
self.stream.parent_node.factor.execute(interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 167, in execute
raise NotImplementedError
NotImplementedError:
Test coverage is no longer working, most likely due to changes in the codeclimate API. See here for details.
These are both out of date:
Dependency | Resolved | Newest |
---|---|---|
mongoengine | 0.13.0 | 0.14.3 |
pymongo | 3.4.0 | 3.5.1 |
An upgrade and testing are recommended.
logging output:
WARNING:root:Stream session: [uuid=254ac60a-70bb-11e7-9956-42010a0a149f] not available for time interval (0001-01-01 00:00:00+00:00, 2017-07-24 21:57:54.507000+00:00]. Perhaps upstream calculations haven't been performed
At the moment, all of the nodes have to be created up front (w.create_node(stream_name, channel_id, plate_ids)), and since we also store a dict of stream_name -> node at the same time (N), they can then be accessed during workflow creation using N['whatever']. I'd actually like to get rid of this, so that you can use notation like the following:
X = hs.plate_manager.plates["X"]
with hs.create_workflow(**kwargs) as w:
for x in X:
my_node[x] = hs.factors.my_tool(w, sources=[])
Of course this can't work, because my_node hasn't yet been defined. To define it, we need to know what channel it belongs in, and to take a reference to the workflow. One way to do this would be to put references to the channels in the workflow, so the last line would become:
w.channels.memory.my_node[x] = hs.factors.my_tool(w, sources=[])
although I'm not sure if that's going to work either (well, it can, but it will be ugly). Actually, there are two references to the workflow on that line (one for the node, and one for the factor), so perhaps the old way of referencing the channel could be used:
M['my_node'][x] = hs.factors.my_tool(w, sources=[])
or another alternative would be to move the channel specification to the right, e.g.:
w.nodes[x] = hs.factors.my_tool(w, sources=[], channel='memory')
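One way the proposed notation could be made to work is sketched below; all names here (NodeContainer, the stub workflow, create_node's signature) are illustrative assumptions rather than the current HyperStream API. The idea is a dict-like container whose __setitem__ creates the node lazily on first assignment:

```python
class NodeContainer(object):
    """Dict-like container that creates nodes on first assignment."""

    def __init__(self, workflow, channel='memory'):
        self._workflow = workflow
        self._channel = channel
        self._nodes = {}

    def __setitem__(self, key, factor):
        # Lazily create the node the first time it is assigned to,
        # so nodes no longer have to be declared up front
        if key not in self._nodes:
            self._nodes[key] = self._workflow.create_node(key, self._channel)
        self._nodes[key]['factor'] = factor

    def __getitem__(self, key):
        return self._nodes[key]


class StubWorkflow(object):
    """Stand-in for a real workflow, just enough to exercise the container."""

    def __init__(self):
        self.created = []

    def create_node(self, name, channel):
        node = {'name': name, 'channel': channel, 'factor': None}
        self.created.append(node)
        return node


w = StubWorkflow()
w.nodes = NodeContainer(w, channel='memory')
w.nodes['my_node'] = 'my_tool_factor'   # node is created on first assignment
w.nodes['my_node'] = 'another_factor'   # reuses the existing node
print(len(w.created))                   # → 1
```

With this shape, the channel specification naturally moves onto the container rather than the assignment site.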
Logging output:
root: INFO: New workflow created with id test_new_api_overlapping_plates
root: INFO: Added workflow test_new_api_overlapping_plates to workflow manager
root: INFO: Added node with id ticker containing 1 streams
root: INFO: Added node with id rr1 containing 4 streams
root: INFO: Added node with id rr2 containing 4 streams
root: INFO: Added node with id rp containing 16 streams
root: INFO: Added node with id rpc containing 16 streams
root: INFO: Added factor with tool Clock(first=datetime.datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=UTC, stride=1.0)
root: INFO: Added factor with tool Random(seed=1)
root: INFO: Added factor with tool Random(seed=2)
root: INFO: Added factor with tool Product()
root: INFO: Added factor with tool Dummy()
root: INFO: Dummy with sink node rpc running from 2016-04-28 20:00:00+00:00 to 2016-04-28 20:01:00+00:00
root: INFO: Product with sink node rp running from 2016-04-28 20:00:00+00:00 to 2016-04-28 20:01:00+00:00
--------------------- >> end captured logging << ---------------------
mongoengine/base/fields.py RuntimeError: dictionary changed size during iteration
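This RuntimeError is the generic CPython error raised whenever a dict is mutated while being iterated. The snippet below reproduces the error class (not the mongoengine internals) and shows the usual fix of iterating over a snapshot of the keys:

```python
d = {'a': 1, 'b': 2, 'c': 3}

try:
    for k in d:
        if d[k] > 1:
            del d[k]          # mutating during iteration raises RuntimeError
except RuntimeError as e:
    print(e)                  # dictionary changed size during iteration

# The usual fix is to iterate over a snapshot of the keys:
d = {'a': 1, 'b': 2, 'c': 3}
for k in list(d):
    if d[k] > 1:
        del d[k]
print(d)                      # → {'a': 1}
```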
It might be nice to have some helpers for creating plugins and/or tools. Ideally these would just be small Python scripts (they could also be bash scripts, but that would exclude non-bash environments). The create-plugin helper would create a Python package as in tutorial 2:
|- one_plugin/
| |- __init__.py
| |- tools/
| |- __init__.py
and the create-tool helper would do the final part: creating a file with the appropriate name (defaulting to today's date and v0.0.1):
| |- my_fancy_tool
| |- __init__.py
| |- 2017-06-20_v0.0.1.py
with some default contents, e.g.:
from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count


class MyFancyTool(Tool):
    def __init__(self):
        super(MyFancyTool, self).__init__()

    @check_input_stream_count(0)
    def _execute(self, sources, alignment_stream, interval):
        raise NotImplementedError()
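As a rough illustration of what the create-tool helper could do, here is a sketch; the CamelCase conversion, the default version string, and the function name are all assumptions, not an existing HyperStream utility:

```python
import os
from datetime import date

# Default file contents, as in the example above
TEMPLATE = '''\
from hyperstream import Tool, StreamInstance
from hyperstream.utils import check_input_stream_count


class {class_name}(Tool):
    def __init__(self):
        super({class_name}, self).__init__()

    @check_input_stream_count(0)
    def _execute(self, sources, alignment_stream, interval):
        raise NotImplementedError()
'''


def create_tool(tools_dir, tool_name, version='v0.0.1'):
    """Scaffold a new tool package inside a plugin's tools/ directory."""
    tool_dir = os.path.join(tools_dir, tool_name)
    os.makedirs(tool_dir)
    open(os.path.join(tool_dir, '__init__.py'), 'w').close()
    # my_fancy_tool -> MyFancyTool
    class_name = ''.join(part.title() for part in tool_name.split('_'))
    filename = '{}_{}.py'.format(date.today().isoformat(), version)
    path = os.path.join(tool_dir, filename)
    with open(path, 'w') as f:
        f.write(TEMPLATE.format(class_name=class_name))
    return path
```

Calling create_tool('one_plugin/tools', 'my_fancy_tool') would then produce the layout shown above.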
I think it would be nice to be able to print all the elements involved in a composition of streams. If we use Graphviz, it would be possible to create the directed graph, given that we know all the information about the nodes. The graph could then be output in any of the formats supported by Graphviz, such as bitmaps, vector images, JSON, and plain text, including direct compatibility with Jupyter Notebook.
Here is an example of a portion of the tutorial that I am writing. In this case, being able to visualize the graph would help in understanding how it works.
According to the example in plate_manager.py, the format to read plate definitions should be:
{'H1': [(('house', '1'),)]}
that is: identifier[tag:data].
However, the current implementation produces: tag[identifier:data]
(in container.py: '{0}[{1}:{2}]'.format(display_value, self[nid].identifier, str(self[nid].data))).
So I just wonder whether the former makes more sense.
For instance, for tutorial 5:
country[country_NZ:NZ]
country[country_Australia:Australia]
We can change it to:
country_NZ[country:NZ]
country_Australia[country:Australia]
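To make the two candidate orderings concrete, here is a small sketch using the tutorial 5 values. Both helper functions are illustrative only; current_format mirrors the container.py line quoted above:

```python
def current_format(display_value, identifier, data):
    # tag[identifier:data] — what container.py produces today
    return '{0}[{1}:{2}]'.format(display_value, identifier, data)


def proposed_format(display_value, identifier, data):
    # identifier[tag:data] — the ordering suggested by plate_manager.py
    return '{0}[{1}:{2}]'.format(identifier, display_value, data)


print(current_format('country', 'country_NZ', 'NZ'))    # country[country_NZ:NZ]
print(proposed_format('country', 'country_NZ', 'NZ'))   # country_NZ[country:NZ]
```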
We need more unit tests: at the moment coverage is down at 50%. In particular:
Logging output
INFO:root:New workflow created with id test_save_workflow
INFO:root:Added node with id ticker containing 1 streams
INFO:root:Added factor with tool Clock(first=datetime.datetime(1, 1, 1, 0, 0, 0, 0, tzinfo=UTC, stride=1.0)
INFO:root:Added workflow test_save_workflow to workflow manager
INFO:root:Plate T1 not found for deletion
INFO:root:Meta data test_3.nested_test_A deleted
INFO:root:Meta data test_2.nested_test_D deleted
INFO:root:Meta data test_2.nested_test_C deleted
INFO:root:Meta data test_2.nested_test_B deleted
INFO:root:Meta data test_2.nested_test_A deleted
INFO:root:Meta data test_1.nested_test_D deleted
INFO:root:Meta data test_1.nested_test_C deleted
INFO:root:Meta data test_0.nested_test_C deleted
INFO:root:Meta data test_0.nested_test_B deleted
INFO:root:Meta data test_0.nested_test_A deleted
INFO:root:Plate T.U deleted
INFO:root:Meta data test_3 deleted
INFO:root:Meta data test_2 deleted
INFO:root:Meta data test_1 deleted
INFO:root:Meta data test_0 deleted
INFO:root:Plate T deleted
INFO:root:Meta data test_overlap_3 deleted
INFO:root:Meta data test_overlap_2 deleted
INFO:root:Meta data test_overlap_1 deleted
INFO:root:Meta data test_overlap_0 deleted
INFO:root:Plate V deleted
======================================================================
ERROR: test_new_api_overlapping_plates (tests.test_workflows.HyperStreamWorkflowTests)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_workflows.py", line 151, in test_new_api_overlapping_plates
w.execute(time_interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/workflow/workflow.py", line 89, in execute
factor.execute(time_interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 172, in execute
alignment_stream=self.get_alignment_stream(None, None))
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/tool/tool.py", line 78, in execute
sources=sources, alignment_stream=alignment_stream, interval=interval):
File "plugins/example/tools/dummy/2017-06-16_v1.0.0.py", line 36, in _execute
for d in data:
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/stream/stream_view.py", line 58, in __iter__
self.stream.parent_node.factor.execute(interval)
File "/home/travis/build/IRC-SPHERE/HyperStream/hyperstream/factor/factor.py", line 156, in execute
if list(sorted(itertools.chain(*[s.plates for s in self.sources]))) == self.plates:
TypeError: '<' not supported between instances of 'Plate' and 'Plate'
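This TypeError is characteristic of Python 3, which, unlike Python 2, refuses to order objects that define no comparison methods. Two possible fixes are sketched below with a stand-in Plate class (the real Plate's attributes may differ): give Plate an ordering, or sort with an explicit key.

```python
from functools import total_ordering


@total_ordering
class Plate(object):
    """Stand-in for hyperstream's Plate, reduced to an identifier."""

    def __init__(self, plate_id):
        self.plate_id = plate_id

    def __eq__(self, other):
        return self.plate_id == other.plate_id

    def __lt__(self, other):
        # Order plates by their identifier so sorted() works on Python 3
        return self.plate_id < other.plate_id


plates = [Plate('T'), Plate('H'), Plate('V')]
print([p.plate_id for p in sorted(plates)])   # → ['H', 'T', 'V']

# Alternatively, leave Plate alone and sort with an explicit key:
print([p.plate_id for p in sorted(plates, key=lambda p: p.plate_id)])
```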
logging output (I would expect the Product and Dummy tools to be created only once):
INFO:root:Added factor with tool Product()
DEBUG:root:Defining a Dummy tool
INFO:root:Added factor with tool Dummy()
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Product tool
DEBUG:root:Defining a Dummy tool
As I had just reinstalled my Ubuntu, I tried to run Tutorial 1 again as a test.
The issue occurs in the "Executing a new interval" section, and I think it is caused by the precision of utcnow() as defined in hyperstream/utils/time_utils.py.
Interestingly, I didn't have this issue in the past...
For instance:
interval.end = datetime.datetime(2017, 8, 18, 9, 41, 14, 262064, tzinfo=<UTC>)
utcnow() = datetime.datetime(2017, 8, 18, 9, 41, 14, 262000, tzinfo=<UTC>)
137 if interval.end > utcnow():
138 print [interval.end, utcnow()]
--> 139 raise ValueError("Calculated intervals should not be in the future")
140
141 self._calculated_intervals = value
ValueError: Calculated intervals should not be in the future
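The mismatch is visible in the microseconds above (262064 vs 262000): interval.end carries full microsecond precision while the comparison clock appears truncated to milliseconds. The sketch below reproduces the effect with plain datetimes; the truncation itself is an assumption about what time_utils.utcnow() does:

```python
from datetime import datetime


def truncated_utcnow(now):
    # Drop sub-millisecond precision, mimicking the suspected behaviour of
    # hyperstream's utcnow()
    return now.replace(microsecond=(now.microsecond // 1000) * 1000)


end = datetime(2017, 8, 18, 9, 41, 14, 262064)
now = truncated_utcnow(end)   # 2017-08-18 09:41:14.262000
print(end > now)              # → True: "in the future" by 64 microseconds
```

Any interval end captured with full precision within the same millisecond will therefore compare as being in the future.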
The tools can now be accessed using shortcut methods, e.g. given a HyperStream instance hs:
hs.tools.clock()
and plugins are accessed in a slightly more convoluted way:
hs.plugins.data_generators.tools.random()
This could be changed to make plugins a bit easier to access, at the cost of the core tools, like so:
hs.tools.core.clock()
hs.tools.data_generators.random()
Is this better or should we stick with what we've got?
Now that the documentation is automatically populated on readthedocs, it would be a good time to improve the documentation. Some tasks:
It is not clear to me what the role of MQTT is in a general framework such as HyperStream.
I understand that the original reason is our internal use in an Internet of Things environment where communication is driven by MQTT, but this requirement may be a barrier for the general user.
Do you think it should be made optional, or changed into a plugin?
If it really is a required part of HyperStream, then we should advertise it in this context.
On the master branch, I do not manage to pass test_loggers at these lines.
I am not sure what the problem is, as Travis succeeded when I pushed my master branch.
======================================================================
FAIL: Test the MQTT logger using the standard format
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/hyperstream/sphere-hyperstream/hyperstream_core/tests/test_loggers.py", line 64, in test_mqtt_logger
assert(str(client.last_messages["topics/test"].decode("utf8")[24:]) == '[MON ] 1234567890')
AssertionError:
-------------------- >> begin captured stdout << ---------------------
Connected with result code 0
topics/test 2018-02-21 17:44:43.232000+00:00 ABC
{u'topics/test': '2018-02-21 17:44:43.232000+00:00 ABC'}
00+00:00 ABC
We currently have a long list of (poorly documented) tools:
aggregate
aggregate_into_dict_and_apply
aggregate_plate
aligned_merge
aligning_window
apply
asset_plate_generator
asset_splitter
asset_writer
clock
component
component_filter
component_set
dict_argmax
dict_values_to_const
histogram_from_list
histograms_to_csv
index_of
index_of_by_stream
jsonify
list_dict_mean
list_dict_sum
list_length
list_mean
list_sum
meta_instance
meta_instance_from_list
percentiles_from_list
percentiles_to_csv
plate_sink
product
relative_apply
relative_apply2
relative_window
sink
slice
sliding_apply
sliding_listify
sliding_sink
sliding_window
splitter
splitter_from_stream
splitter_of_dict
splitter_of_list
splitter_time_aware
splitter_time_aware_from_stream
splitter_with_unlist
stream_broadcaster
stream_broadcaster_from_stream
We should attach usage numbers to each of these for sphere-hyperstream, and determine whether any should be pruned or renamed. For those that remain, the documentation should be improved.
Should we create a self-contained test-runner script, maybe using Docker containers? Something like this runtest.sh:
#!/bin/bash
echo "Activating virtual environment"
. ./venv/bin/activate
echo "Starting MongoDB server"
sudo service mongodb start
#
#MONGODB_CONTAINER=`docker run --name hs_mongodb -p 27017:27017 -d library/mongo`
## Wait for the MONGODB port to be available
#until nc -z $(docker inspect --format='{{.NetworkSettings.IPAddress}}' $MONGODB_CONTAINER) 27017
#do
# echo "waiting for docker MongoDB container..."
# sleep 0.5
#done
echo "Starting docker MQTT container"
MQTT_CONTAINER=`docker run --name hs_mqtt -d -ti -p 1883:1883 -p 9001:9001 toke/mosquitto`
# Wait for the MQTT port to be available
until nc -z $(docker inspect --format='{{.NetworkSettings.IPAddress}}' $MQTT_CONTAINER) 9001
do
echo "waiting for docker MQTT container..."
sleep 0.5
done
nosetests --with-coverage
echo "Stopping docker MQTT container"
docker stop hs_mqtt
echo "Removing docker MQTT container"
docker rm hs_mqtt
echo "Stopping MongoDB server"
sudo service mongodb stop
I had to comment out the MongoDB container and run my own version of MongoDB, because I don't have enough space in my root partition.
Began work on python3 compatibility (see the python3 branch). This requires extensive testing.
Some of the tools currently have confusing names. One idea would be to align the tool names with common names in Python. For example, the apply tool applies a function to the values in a stream, which is what Python's map does, so perhaps this tool should just be called map. Note that the actual tool class would be Map, so there's no conflict with builtins.
N.B. in this case one could argue that it should actually be map_values (and MapValues), since it only applies the function to the values, and there could be corresponding map_timestamps and map_instances tools.
This is problematic for deployed HyperStream instances, since workflows that depended on those tools would be broken. However, this is probably the right time to do it, before there are too many deployed instances.
What are the implications of using Streams of data with a timestamp in the future (or at the present moment)?
At the moment, if you ask for a Stream to be computed for the current time, it will raise the following exception:
File "some_path/site-packages/hyperstream/stream/stream_instance.py", line 39, in __new__
raise ValueError("Timestamp {} should not be in the future!".format(timestamp))
ValueError: Timestamp 2017-08-22 14:05:00.308326+00:00 should not be in the future!
I can think of two cases where it could be interesting to allow Streams in the future:
See e.g. here
Logging output:
======================================================================
FAIL: test_data_importers (tests.test_tools.TestTools)
----------------------------------------------------------------------
Traceback (most recent call last):
File "/home/travis/build/IRC-SPHERE/HyperStream/tests/test_tools.py", line 388, in test_data_importers
assert_all_close(sea_ice_sums.window().values(), list(map(sum, sea_ice.window().values())), 1e-5)
File "/home/travis/build/IRC-SPHERE/HyperStream/tests/helpers.py", line 203, in assert_all_close
.format(i, x, y, prec=prec))
AssertionError: Elements not equal at location 0. a = 15.48000, b = 0.00000
-------------------- >> begin captured logging << --------------------
root: DEBUG: creating session: [uuid=49c40ada-726b-11e7-a3c3-42010a0a140e]
root: DEBUG: Defining a CsvReader tool with parameters {'filename': 'plugins/data_importers/data/sea_ice.csv'}
root: DEBUG: creating sea_ice
root: DEBUG: creating sea_ice_sums
root: DEBUG: Defining a ListSum tool
root: DEBUG: set calculated intervals
--------------------- >> end captured logging << ---------------------
I can't really understand why b = 0.00000 there. The same test runs fine on OS X, also with Python 3.6.1. For now I've commented out the offending line (the last line of the test).
Contents of the test:
def test_data_importers(self):
    with HyperStream(file_logger=False, console_logger=False, mqtt_logger=None) as hs:
        reader = hs.plugins.data_importers.tools.csv_reader('plugins/data_importers/data/sea_ice.csv')

        # noinspection PyTypeChecker
        ti = TimeInterval(datetime(1990, 1, 1).replace(tzinfo=UTC), datetime(2011, 4, 1).replace(tzinfo=UTC))

        # TODO: More complicated tests, including headers, different delimiters, messy data etc etc.
        sea_ice = hs.channel_manager.memory.get_or_create_stream("sea_ice")
        reader.execute(sources=[], sink=sea_ice, interval=ti)

        sea_ice_sums = hs.channel_manager.mongo.get_or_create_stream("sea_ice_sums")
        hs.tools.list_sum().execute(sources=[sea_ice], sink=sea_ice_sums, interval=ti)

        # print(sea_ice_sums.window().values())
        # TODO: the below assertion is causing travis to fail - why?
        assert_all_close(sea_ice_sums.window().values(), list(map(sum, sea_ice.window().values())), 1e-5)
The split function of the TimeIntervals class raises an exception when passed a list of about 990 points.
@profile
def split(self, points):
    if len(points) == 0:
        return
    p = points[-1]
    for i in range(len(self.intervals)):
        if (self.intervals[i].start < p) and (self.intervals[i].end > p):
            self.intervals = self.intervals[:i] \
                + [TimeInterval(self.intervals[i].start, p), TimeInterval(p, self.intervals[i].end)] \
                + self.intervals[(i + 1):]
    self.split(points[:-1])
This is because Python limits recursion depth to avoid possible stack overflows.
We should change the recursive call into a for loop.
Before I make the modification, do you see any reason why it was recursive in the first place that I should take into account?
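For what it's worth, here is one possible iterative rewrite, sketched against minimal stand-ins for TimeInterval/TimeIntervals. The splitting logic follows the recursive original (consuming points from last to first), with an added break, since a point can fall strictly inside at most one interval:

```python
from collections import namedtuple

# Minimal stand-in for hyperstream's TimeInterval
TimeInterval = namedtuple('TimeInterval', ['start', 'end'])


class TimeIntervals(object):
    def __init__(self, intervals):
        self.intervals = list(intervals)

    def split(self, points):
        # Same order as the recursive version: last point first
        for p in reversed(points):
            for i in range(len(self.intervals)):
                if self.intervals[i].start < p < self.intervals[i].end:
                    self.intervals = (
                        self.intervals[:i]
                        + [TimeInterval(self.intervals[i].start, p),
                           TimeInterval(p, self.intervals[i].end)]
                        + self.intervals[(i + 1):])
                    break


# Splitting at a few thousand points no longer hits the recursion limit:
tis = TimeIntervals([TimeInterval(0, 10000)])
tis.split(list(range(1, 5000)))
print(len(tis.intervals))   # → 5000
```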