nytlabs / streamtools
tools for working with streams of data
Home Page: http://nytlabs.github.io/streamtools/
License: Apache License 2.0
...this results in the command line never becoming available again, because count and fromNSQ are both created but none of the required rules are set.
One nice solution would be for the return message (i.e., {"daemon":"BLOCK_CREATED"}) to include text describing which rules must be set.
Another approach is to create defaults for all blocks, but this encourages errors and might reinforce bad habits IMO.
Some ideas for the standardization of blocks, heavily inspired by #34:
Block Routine
A Block Routine is the func that is called as a go routine and contains only logic. It has a simplejson.Json in chan, a simplejson.Json out chan, or both. Block Routines are wrapped in Blocks. Block Routines reside as part of the /streamtools library.
Block Function
Block Functions are dependencies for Block Routines. They are not run as go routines and are located in the /streamtools library. It may be a good idea to divorce Block Routines from Block Functions in /streamtools.
Blocks
Blocks provide an NSQ wrapper for Block Routines. Blocks take care of wiring up go channels to NSQ and are responsible for initializing and running Block Routines. Typically, there will be 2-3 go routines per block: an NSQ reader, NSQ publisher, and a Block Routine.
A block should act as a standalone executable and be able to interface to a standard mode of execution and introspection.
one Block Routine per Block
All logic for a block should be contained in a single function (the Block Routine). All Block Routine state should be maintained within that single go routine. All messages in and out of that block should be managed by that single go routine. A Block Routine should not share state with any other go routines, unless it is through a channel.
Block Routines allow introspection
Block Routines should have a chan of some kind that allows reporting on what is currently happening in the routine. A health chan may also be nice, to divorce technical stats (in flight, backed-up queues, processing time, number of messages processed) from stats that come from the block's logic (distribution of X, etc).
online setting of rules
We have yet to standardize how a Block Routine is initialized with rules that govern its logic. I propose that Block Routines should have a rules chan that initializes the logic and signals it to start processing. This would help us avoid any kind of flag soup, and would potentially allow for run-time rule changes.
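To make the proposal concrete, here is a minimal sketch of a Block Routine with an in chan, an out chan, and a rules chan; the name MaskRoutine and the mask helper are made up for illustration, not settled API:

package streamtools

import simplejson "github.com/bitly/go-simplejson"

// MaskRoutine is a hypothetical Block Routine: pure logic, wired only through
// channels. It does no processing until the first rule arrives, and it accepts
// rule changes at run time.
func MaskRoutine(in chan *simplejson.Json, out chan *simplejson.Json, rules chan *simplejson.Json) {
    rule := <-rules // block until the initial rule is set
    for {
        select {
        case r := <-rules:
            rule = r // run-time rule change
        case msg := <-in:
            out <- mask(msg, rule)
        }
    }
}

// mask stands in for the block's actual logic (hypothetical helper).
func mask(msg, rule *simplejson.Json) *simplejson.Json {
    return msg
}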
Our current architecture is limited in that we cannot use the HTTP server created by the daemon to handle websockets or HTTP streaming on a per-block basis. This means that each block that needs to stream over HTTP or use websockets has to stand up another HTTP server on a different port.
Ideally we would write our own handler in the future that gives blocks more basic control over the kinds of HTTP traffic they can handle.
emits a force-directed network vis vega object based on the incoming message
... but I can't remember what it was. This issue is a placeholder for whatever that was.
all our blocks use different methods to grab values from a key specified in the rule. Many of the blocks lack the ability to grab nested keys. We should have some standard way of grabbing keys in streamtools/utils that accepts '.'-delimited key paths.
filter value by set membership accepts a nested key in the format key_a.key_b.key_c; I think other blocks should probably follow suit
terribly inconsistent!
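A minimal sketch of what such a shared helper in streamtools/utils could look like, assuming go-simplejson's GetPath; the name getKeyPath is made up:

package utils

import (
    "strings"

    simplejson "github.com/bitly/go-simplejson"
)

// getKeyPath resolves a '.'-delimited key path like "key_a.key_b.key_c"
// against a message (hypothetical helper, not existing streamtools API).
func getKeyPath(msg *simplejson.Json, path string) *simplejson.Json {
    return msg.GetPath(strings.Split(path, ".")...)
}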
I think a 'long poll' has more subtleties to it than I thought...
So that they don't share a channel when reading from the same topic.
a new message contributes a new node if we've not seen that node type before, and increments an edge weight.
store in a standard json graph format (d3 must have specified one) and keep in mind this will be backed by some graph database one day.
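As a sketch of that storage format, assuming the nodes/links shape that d3's force layout examples use (the field names are a guess, not a decided schema):

package streamtools

// GraphNode, GraphLink, and Graph sketch a node-link record in the style of
// d3's force layout JSON: nodes indexed by position, links carrying a weight
// that is incremented each time the edge is seen.
type GraphNode struct {
    Name string `json:"name"`
}

type GraphLink struct {
    Source int     `json:"source"` // index into Nodes
    Target int     `json:"target"` // index into Nodes
    Weight float64 `json:"weight"`
}

type Graph struct {
    Nodes []GraphNode `json:"nodes"`
    Links []GraphLink `json:"links"`
}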
Right now one MUST encode one's POSTs as application/x-www-form-urlencoded, which is terrible given we're only posting JSON.
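A sketch of accepting a plain JSON body in a daemon handler instead (the handler name and response are illustrative):

package main

import (
    "encoding/json"
    "net/http"
)

// createHandler decodes the request body as JSON regardless of Content-Type,
// so a curl -d '{"blockType":"filter"}' works without form encoding.
func createHandler(w http.ResponseWriter, r *http.Request) {
    var req map[string]interface{}
    if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
        http.Error(w, "could not parse JSON body: "+err.Error(), http.StatusBadRequest)
        return
    }
    // ... create the block from req ...
    w.Write([]byte(`{"daemon":"BLOCK_CREATED"}` + "\n"))
}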
there need to be some updated example scripts that demonstrate simple data processing cases with ST, for example:
br, _ := bucket.GetReader(v.Key)
needs to handle the error and break when one occurs.
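Something along these lines, assuming the call sits inside the loop over the bucket's contents:

br, err := bucket.GetReader(v.Key)
if err != nil {
    log.Println("GetReader failed for", v.Key, ":", err)
    break
}
// ... read from br, then br.Close() ...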
emits a line chart vega object based on the incoming message
all blocks should be doing SOMETHING while waiting for their initial rule set. They should be queryable, connectable, etc.
poll an HTTP endpoint
ideas?
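A rough sketch of what a polling source could look like (the function name pollHTTP is made up):

package main

import (
    "io/ioutil"
    "log"
    "net/http"
    "time"
)

// pollHTTP hits a URL on a fixed interval and pushes each response body into
// streamtools as a message.
func pollHTTP(url string, every time.Duration, out chan []byte) {
    for range time.Tick(every) {
        resp, err := http.Get(url)
        if err != nil {
            log.Println("poll failed:", err)
            continue
        }
        body, err := ioutil.ReadAll(resp.Body)
        resp.Body.Close()
        if err != nil {
            log.Println("read failed:", err)
            continue
        }
        out <- body
    }
}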
It would be super useful for a single streamtools app to be able to use multiple computers. For this, though, we need to make sure each block is using the same lookupd!
write messages into mongo
I don't know if this is a bug, but I sort of feel like newlines at the end of the http responses would be nice.
some of the scripts in /st don't work anymore and they are named inconsistently.
A connection should have a cheap model of the time between events. If time since the last event is unlikely under that model, then begin alerting.
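One cheap model, as a sketch: keep an exponential moving average of the gap between events and alert once the current silence grows well beyond it (the smoothing factor and threshold here are arbitrary):

package streamtools

import (
    "log"
    "time"
)

// gapMonitor keeps an exponentially weighted average of the time between
// events on a connection and flags unusually long silences.
type gapMonitor struct {
    last   time.Time
    avgGap float64 // seconds, exponentially weighted moving average
}

func (g *gapMonitor) observe(now time.Time) {
    if !g.last.IsZero() {
        gap := now.Sub(g.last).Seconds()
        const alpha = 0.1
        g.avgGap = alpha*gap + (1-alpha)*g.avgGap
    }
    g.last = now
}

func (g *gapMonitor) check(now time.Time) {
    if g.avgGap > 0 && now.Sub(g.last).Seconds() > 5*g.avgGap {
        log.Println("connection looks stalled: no event for", now.Sub(g.last))
    }
}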
write messages to S3
Consider an example flow:
The import from NSQ takes a stream from a non-local NSQ and puts it in the local NSQ. The Filter then reads from the local NSQ and publishes back to the local NSQ. The Synchronizer reads from the local NSQ and publishes back to the local NSQ, and finally, the Export reads from the local NSQ.
This works fine for smaller streams, but putting messages on and off the local NSQ at every step creates a bunch of redundant load. The thing is that all of the filtering/synchronizing/export logic is still super useful; the only problem is that the logic that could drastically speed up the architecture is locked away in binaries that include NSQ readers/publishers.
I propose an architecture for the design of megablocks, something like this:
/streamtools just contains the structs that we use to deal with Go chan messages.
/blocks contains basically everything that is in the root right now (w/ NSQ stuff)
a file in streamtools would look something like this:
package streamtools

type Filter struct {
    in      chan []byte
    out     chan []byte
    pattern string
}

func NewFilter(in chan []byte, out chan []byte, pattern string) *Filter {
    this := &Filter{
        in:      in,
        out:     out,
        pattern: pattern,
    }
    go this.run()
    return this
}

func (this *Filter) run() {
    for {
        select {
        case msg := <-this.in:
            // do filter stuff here, then forward the message
            this.out <- msg
        }
    }
}
This way, all of the logic in streamtools becomes agnostic as to how it is wired up. You could use it as part of the streamtools suite, or if you are just handling Go msgs in your own application you can import from /streamtools and use it without NSQ. Or you could chain blocks together to make megablocks.
/blocks would be full of NSQ-ready binaries, with really simple code that is basically an NSQ wrapper around the streamtools logic. a filter would look something like this:
import "github.com/nytlabs/stream_tools"
func main(){
streamtools.NewNSQReader(params, channel A)
streamtools.NewFilter(pattern, channel A, channel B)
streamtools.NewNSQPublisher(params, channel B)
}
and this also means you could do something like
import "github.com/nytlabs/stream_tools"
func main(){
streamtools.NewNSQReader(params, channel A)
streamtools.NewFilter(pattern, channel A, channel B)
streamtools.NewSynchronizer(channel B, channel C)
streamtools.NewNSQPublisher(params, channel C)
}
basically, it allows streamtools core to be a library that we use in the making of the NSQ-based block binaries. It's also good for when we want to start sharing util functions, like flatten, map, etc.
eh?
we should probably spot bad JSON on the way in
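A minimal sketch of such a gatekeeper (the helper name is made up):

package streamtools

import "encoding/json"

// isValidJSON reports whether a message parses as JSON; messages that fail
// could be dropped and logged before they reach any block.
func isValidJSON(msg []byte) bool {
    var v interface{}
    return json.Unmarshal(msg, &v) == nil
}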
if you create a block with id="" it should complain
sometimes you want to mimic a time-accurate stream.
Many of our blocks are missing documentation. Every block should have a basic description in the file that contains the block routine. This should include how the block works, what kind of data can be expected from the block, whether or not the block has outputs, and what kind of parameters can be sent to it.
emits a bar chart vega object based on the incoming message
Currently there is a bunch of duplicate header code in daemon.go that should be put in some kind of wrapper so that we don't have to add it every time we have a new type of HTTP response. blech.
it would be nice if we could tell which block was logging. We should include id, possibly blocktype.
should read through a file, line by line, and emit into streamtools
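A sketch of such a block's core loop (the name fromFile is made up):

package main

import (
    "bufio"
    "log"
    "os"
)

// fromFile reads a file line by line and emits each line into streamtools
// as a message.
func fromFile(path string, out chan []byte) {
    f, err := os.Open(path)
    if err != nil {
        log.Fatal(err)
    }
    defer f.Close()
    scanner := bufio.NewScanner(f)
    for scanner.Scan() {
        line := make([]byte, len(scanner.Bytes()))
        copy(line, scanner.Bytes()) // the scanner reuses its buffer, so copy
        out <- line
    }
    if err := scanner.Err(); err != nil {
        log.Println("read error:", err)
    }
}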
We need to be passing pointers to JSON around, not the values themselves. This helps simplejson work, as well as massively reducing the copying of data that we are doing currently.
it leaves a lot of logs like
cannot perform an equals operation on this type
make sure each connection has a copy of the last message it saw so we can interrogate it.
Often we are reading from streams that have never seen any messages, especially at startup. It would be nice if we could 'touch' a topic when we're reading from it, in order to quell the lookupd errors and to spot mismatched names!
a kernel density estimate that is updated with each new message.
Are there online methods for this? Gotta be...
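One possibility, sketched under the assumption that evaluating the density on a fixed grid is acceptable: fold each new value into a running mean of kernel contributions, so nothing beyond the grid needs to be stored.

package streamtools

import "math"

// streamingKDE keeps a kernel density estimate over a fixed grid of
// evaluation points, updated incrementally as each new value x arrives:
// f_n(g) = ((n-1)*f_{n-1}(g) + K_h(g - x)) / n
type streamingKDE struct {
    grid      []float64 // fixed evaluation points
    density   []float64 // current estimate at each grid point
    bandwidth float64
    n         float64
}

func (k *streamingKDE) update(x float64) {
    k.n++
    for i, g := range k.grid {
        u := (g - x) / k.bandwidth
        kern := math.Exp(-0.5*u*u) / (k.bandwidth * math.Sqrt(2*math.Pi))
        k.density[i] = ((k.n-1)*k.density[i] + kern) / k.n
    }
}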
I like the idea that the sources are called 'from' or 'poll' and the sinks are all 'to'
If I forget quotes around my connect curl it only gets a "from" parameter and pukes.
Post dial tcp 127.0.0.1:7070: connection refused
causes postValue to fail dramatically
... like
jqblock -command="'if .data.stories[0].url == \"http://www.nytimes.com/\" then .data.stories[1] else .data.stories[0] end'" -read-topic="top_stories_by_tweet" -write-topic="top_story_by_tweet" -name="extract_top_story"
emits a scatter chart vega object based on the incoming message
So we have a ton of blocks now, and some experience of using this in prod. Let's do one more big architecture review before all this gets serious. Suggestions should go in the comments below!
it may be convenient/nice if any block could add a simple time.Now() timestamp to a processed message/saved state/etc.
collect messages from Amazon SNS
doesn't have to be complicated
every block should write its settings to the log files for sanity checking!
collect data from SQS
make GET requests by converting specific message keys, or all of the message key/values, into URL params
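A sketch of the key-to-params conversion, assuming flat values and go-simplejson (the helper name buildGetURL is made up):

package streamtools

import (
    "fmt"
    "net/url"

    simplejson "github.com/bitly/go-simplejson"
)

// buildGetURL turns the top-level key/values of a message into URL query
// parameters for an outgoing GET request.
func buildGetURL(base string, msg *simplejson.Json) (string, error) {
    m, err := msg.Map()
    if err != nil {
        return "", err
    }
    params := url.Values{}
    for k, v := range m {
        params.Set(k, fmt.Sprint(v))
    }
    return base + "?" + params.Encode(), nil
}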