seepng's People
Forkers
paulrw andrei-alpha davidcmh pgaref lsds ftube matthiasweidlich wjciv huankaichen pk1983 semanticbeengseepng's Issues
Raise processing abstractions to the programming model
Make build dependencies optional
No point in including, for example, the hadoop libraries to read from HDFS if the deployment does not require it. Gradle excels at doing that. Necessary to rethink and organize the build process to include dependencies only when necessary.
Enforce automatically batch sizes bigger than minimum tuple size
Make sure this is robust. Throw errors when parsing configuration or prevent errors from happening with wrong batch sizes (e.g. by changing the value to a safe default).
Binary input and output use different formats
Binary input requires a control byte and batch size information before a set of tuples, whereas the output just contains the tuples themselves. This means that the output cannot be used as input without first putting it through some sort of intermediate program to insert the batch information.
The solution is probably a flag in the WorkerConfig class which indicates whether batch information is present in an input file. We will need to alter the code in InputBuffer.readFrom to handle the case where this flag is set to false.
More code reuse in collectors
Collectors seem to be doing the same type of work. Do some composition to avoid all the redundancy that one can find now.
Remove schema declaration from query definition of seep tasks
Send data with api.sendToStreamId()
There seem to be some difficulty when an operator who has more than one downstream operator tries to call api.sendToStreamId() to selectively send data to a particular streamId.
In addition, WorkerShutdownHookWorker is later triggered when calling the method.
(When the operator has only one downstream operator, api.send() works fine.)
Improve build pipeline
Create gradle tasks for:
- Create query automatically with boilerplate files and directories.
- Streamline build and deploy process.
Scripts for local deployment
Make a bash+tmux based alternative for local deployment
Web-UI
Opening issue to create a design for this.
InputAdapter abstraction
Consider 1 single InputAdapter per DataStore.
Then let the InputAdapter compose, internally, a mechanism to implement a particular DataConnectionType.
This is in contrast to have an InputAdapter per DataStore - DataConnectionType
The outcome would be better code reuse and concentrated functionality.
Scripts for distributed deployment
At the moment fabric is the preferred option. This scripts should actually allow not only to perform a distributed deployment but also some sort of local deployment for quick testing (via tmux scripting for example). Think about keeping this together or actually separating it into two.
Optimize app-level-batching
Avoid heavy array copying or minimise overhead by:
- pooling structures (depends on multicore processing engine implementation)
- arrays instead of datastructures and arrayCopy when necessary
Serializer Configuration [proposed Label] feature
Currently supported SEEP serializers are:
- Java (default)
- Kryo
- Avro
First in the TODO list is the user to be able to select the preferred serializer. For this reason, we need to add extra configuration option in the MasterConfig and WorkerConfig - we also need to check those two are consistent when workers connect to the master(?)
Second: when users implement custom classes for the queries there should be an easy way to register those classes to the serializer. Currently those classes should be part of the seep-common project in order to be serializable by both the master and the workers. Which is the easiest way to do that?
Variable sized tuples - populateOffsets misses buffer position
populateOffsets creates problems:
Exception in thread "SingleThreadProcessingEngine" java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at uk.ac.imperial.lsds.seep.api.data.ITuple.populateOffsets(ITuple.java:198)
at uk.ac.imperial.lsds.seep.api.data.ITuple.setData(ITuple.java:52)
at uk.ac.imperial.lsds.seepworker.core.input.NetworkDataStream.pullDataItem(NetworkDataStream.java:93)
at uk.ac.imperial.lsds.seepworker.core.SingleThreadProcessingEngine$Worker.run(SingleThreadProcessingEngine.java:93)
at java.lang.Thread.run(Thread.java:745)
Scheduled Source [proposed Label] feature
We need a SEEP-Source implementation that is always available - meaining that it will never be scheduled. The Source receives data from network / or reads from file and keeps everything in memory. It then sends memory-data to the next dependent stage whenever is available (being scheduled).
For the basic implementation we need:
- A network Source ( keep data in memory until next stage is available )
- A file Source which could also read from HDFS
- Modifications in the schedule description which needs to be aware of the Sources
- Probably changes in the DataReferenceManager class
Mechanism to map task to node
For the static case this should be quite straightforward. Design something that is also useful for the "static scale-out" case.
Implement shortString to allow tuples to be fixedSize yet supporting string
Scaling out operators - InvalidLifecycleStatusException
It seems like the new type of SEEP query (uk.ac.imperial.lsds.seep.api.operator.SeepLogicalQuery) is not able to properly scale out operators. For example, the static-scaleout-pipeline-stateless-query example is not running with the current SEEP version as it triggers an InvalidLifecycleStatusException.
Looking further into this I found that this event is triggered by lines 135-138 in function setInitialPhysicalInstancesForLogicalOperator in uk.ac.imperial.lsds.seep.api.QueryBuilder as the given opId is expected "not to be" in usedIds. I would rather think that this doesn't make any sense, to scale out an operator its id should have already been used, right? However, if it seems that if I take out this comparison, then a network exception arises in the scaled-out operator:
Exception in thread "Network-Reader-0" java.nio.BufferUnderflowException at java.nio.Buffer.nextGetIndex(Buffer.java:506) at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361) at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.forwardTuples(InputBuffer.java:88) at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.readFrom(InputBuffer.java:64) at uk.ac.imperial.lsds.seepworker.comm.NetworkSelector$Reader.run(NetworkSelector.java:347) at java.lang.Thread.run(Thread.java:745)
Any clues about how this can be fixed?
Large Input causing OutputBuffer-BufferOverflowException
Just noticed today a rather interesting issue:
I was testing the simple File source example reading String lines from a file.
Each of these line could be rather big (hundreds of bytes-see below). When I used the MarkerSink (meaning the bytes would not have to go over the network) the exampled worked just fine.
On the other hand when I tried to plug in a real Sink I faced the exception below.
I think the exception comes from the OutputBuffer class where we alocating static buffer size:
- int headroomSize = this.BATCH_SIZE * 2;
- buf = ByteBuffer.allocate(headroomSize);
My question here is how we want to handle this case? Split the input into smaller chunks that fit into the buffers? Dynamicly extend the buffers? I know some people before just increased the batch size to bypass it but this is not really a solution is it?
23:29:49 [SingleThreadProcessingEngine] INFO SingleThreadProcessingEngine$Worker - Configuring SINGLETHREAD processing engine with 1 inputAdapters
23:29:49 [File-Reader] INFO Config - FileConfig values:
file.path = /home/pg1712/jmeter.log
character.set = UTF-8
text.source = true
serde.type = 0
23:29:49 [SingleThreadProcessingEngine] INFO SingleThreadProcessingEngine$Worker - Configuring SINGLETHREAD processing engine with 1 outputBuffers
[Processor] data send Size: 81 => total Size: 81
[Processor] data send Size: 129 => total Size: 210
23:29:49 [File-Reader] INFO FileSelector$Reader - Finished text File Reader worker: File-Reader
Exception in thread "SingleThreadProcessingEngine" java.nio.BufferOverflowException
at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:189)
at java.nio.ByteBuffer.put(ByteBuffer.java:859)
at uk.ac.imperial.lsds.seepworker.core.output.OutputBuffer.write(OutputBuffer.java:93)
at uk.ac.imperial.lsds.seepworker.core.Collector.send(Collector.java:116)
at Processor.processData(Processor.java:26)
at uk.ac.imperial.lsds.seepworker.core.SingleThreadProcessingEngine$Worker.run(SingleThreadProcessingEngine.java:111)
at java.lang.Thread.run(Thread.java:745)
^C23:30:41 [Thread-2] INFO WorkerShutdownHookWorker - JVM is shutting down...
Better seep-master UI
Related to issue #5
There is some boilerplate code en the "ui" package in seep-master to define and start different UI front-ends. Right now we only have a very basic one, called SIMPLE_CONSOLE. Ideally we would like a better UI that would:
- Facilitate the lifecycle management of both system and application. E.g. by allowing to deploy, redeploy start and stop queries.
- Ideally would not require direct access to the node where seep-master is running.
- Would serve different purposes such as: understand the current infrastructure (nodes and characteristics), current status of the system (monitoring metrics)...
The better option seems to be a web interface. We would need to define an interface that master exposes with information about the running system and queries, as well as API calls to perform actions on the system. The web-interface, then, can use that interface to retrieve and manage the system.
We can take a look at the Hadoop and Spark web interface to use as a model, see what they offer and what we need.
Also, probably we don't want this UI to be another subproject. It should simply be another subpackage within the "ui" package.
Will be providing more details soon.
Running scheduled examples
It seems like none of the scheduled examples (simple-pipeline-scheduledef, simple-dag-schedule, simple-mdf-schedule) within SEEPng are working at the moment.
When running simple-pipeline-scheduledef & simple-dag-schedule, SEEPng launches an exception due to the call to function ProtocolCommandFactory.buildScheduleStageCommand in lines 67-68 of class DataParallelWithInputDataLocalityLoadBalancingStrategy, which calls cdr.getRankedDatasetForNode(euId) as an input parameter. The cdr variable from class ClusterDatasetRegistry is asked to provide the ranked datasets per node according to the given euId. It seems like this information is stored in a map (Map<Integer, List> rankedDatasetsPerNode), which according to the comments specifically contains datasets per node ordered by priority to live in memory. The call to this object fails in line 38 (rankedDatasetsPerNode.get(euId)) because the rankedDatasetsPerNode map has not been previously initialized. To solve this problem, I think we need to initialize this map before-hand, however, as I'm not sure how this should behave, I don't know where we should initialize this.
In the case of simple-mdf-schedule I assume the current version of this query is not supposed to run in its current state as it is still work in progress, am I right? For example, in the Base class, evaluator1 (line 23) has the same operator id as adderTwo (line 22). When creating the choose operator (line 26), the newChooseOperator method invoked from the QueryBuilder (lines 143 to 147) returns a null value, which I think is something that needs to be implemented.
Exception Handling and seep-common Error package
I was recently looking at the ITuple class code and found some TODOs that need to throw proper exceptions. I was thinking to add some code there but I have two questions:
- Should all the custom SEEP exceptions be part of seep-common => uk.ac.imperial.lsds.seep.errors class? Because for example SchemaException is implemented ouside this package.
- Should we have a generic Exception for all the related cases or create more specific ones? For example in the ITuple class we would throw a SchemaException or a fieldName/Type Exception ?
Cheers,
Panagiotis
Create local query debugger
SEEP as a library
Extern Configs Validator
When people write queries for File, Kafka, HDFS, etc, they must send their properties in a Properties object. Have a validator in the target Config object to make sure that the Properties object contains all the necessary information to create a valid instance of the target configuration object.
Gradle task to create boilerplate structure for query.
Should be able to do something like:
./gradlew makeQuery
and get a directory named with all the stuff required to develop, build and deploy a SEEP query. In particular, the repo should contain all files and dependencies required, a gradle wrapper to build the query and support for IDEs (eclipse and intellij).
Add methodDeclaratorVisitor to AnnotationAnalysis
AnnotationAnalysis analyzes field and variable declarations. We need another method to analyze methodDeclarations as there are annotations (Collection) that annotates that method.
This requires implementing a new method in AnnotationAnalysis and a new unit test.
Metrics and Monitoring system
Design and implement a basic module for collecting metrics and reporting them
OutputAdapter without buffers
Make outputAdapter independent of actual buffers. Similar to InputAdapter.
All buffer should be managed by core or processing engine, accessible by the different collectors, etc.
Further abstraction for EndPoint and Connection
At the moment these are intertwined, which is ok, as they only depend on each other. It is, however, ugly, and prone to error.
Flush outputBuffers on max latency heartbeat
In cases where the data rate is low and one wants to make sure that all tuples are processed with low latency, we'd need a heartbeat mechanism that flushes buffers. Otherwise, buffers will keep tuples until they are fully filled.
Refactor network and file input stream
Too much code repetition right now. Tie this issue with improvement of critical path, as there'll be changes that affect each other.
Automate annotation parsing in AnalysisUtils
The method identifyAnnotation contains an ugly switch. Instead, modify the enum "SDGAnnotation" to contain logic to allow to do this automatically. This would avoid the need to modify this method manually after every change.
Optimize data processing path
Emphasis in receiving side. E2E tests are needed as well.
examples as a new seep-examples subproject
Instead of an independent project. That way examples can work as integration tests and be validated when there are API changes.
Implement app lifecycle in worker
Master has now a notion of the status of a query. Propagate the same to the worker nodes so that they have an opinion. Use this to start, stop, flush clean resources accordingly.
Add support for multiple states per operator
Right now the API accepts one instance of Stateful State. There is a LogicalState interface to allow precisely this, but it requires a bit more of work and testing to make sure that it is easy to use and accomplishes the initial goal.
SchemaParser breaks serialization
SchemaParser contains a Charset attribute that breaks serialization when sent to workers. Non-serializable objects should be created lazily when needed, or through some initialization method when the class is created.
Fold in fixes from RestfulAPI branch in LSDS repository.
I've made a few fixes over there which should be pulled into here. I don't know how to cherrypick, and I don't want to take the time to switch branches on my dev machine at the moment. This issue serves as a placeholder to remind me to do this after I finish with the industry demo stuff.
Relevant changes (committed 25 July 2017 in https://github.com/lsds/SEEPng/commits/RESTful_API):
f74b8cc
a3f0153
2018a35
Implement application-level batching
And add a property to configure such value
Rename DataOrigin
DataOrigin can also be used as a destiny after external software integration. Rename it to something more generic. Some examples: DataEndpoint...
Revisit output buffer flushing
Output is currently going into buffers. We need to spend more time figuring out when to flush those buffers. Doing so after every write is inefficient and undermines the point of using a buffer in the first place, but not flushing often enough will add latency.
Remove .cached files during cleanup
If Datasets are read directly from disk they are never returned to memory, and the .cached file outlives the program. Since these files are stored on disk in the folder where the worker is run you can quickly run up a large number of files. We should be able to figure out which files belong to a worker during the shutdown hook and remove them.
Initial comm Synchronization
There are cases where the system won't start after triggering "start". This is common when the send rate is high. One workaround is to wait additional time (e.g. 2 seconds) before starting the system, to make sure that all communication pairs are properly configured. A proper solution, however, would require to understand what is exactly that is not configured and synchronise on it.
Revise query communication master-worker
Now it's sent serialised. Is this due to a legacy reason? Why otherwise?
Improve GenerateBinaryFile tool
In:
uk.ac.imperial.lsds.seep.tools
there is a tool called GeneratedBinaryFile that can generate binary files by inputting ints. Make this general so that we can create anything according to a given schema. (the tool already accepts schemas)
Hide streamId in SEEP API
A rule is that an operator must have different streamId for their upstream and downstream connections. This must be enforced by the programmer right now, while writing the query. Instead, hide it in the API by default and leave the current methods for cases where this needs to be programmed (as when writing the code from java2sdg).
Schema store
"A more involved solution would be to name schemas, and then make them available in a store that users can access."
Give schemas a name and made them available to user applications. Users can then find the schemas they need and use them to create OTuple without declaring them explicitly in the seeptask code.
Application and system lifecycle management
Control application (e.g. stop query).
Control system (e.g. stop and clean system).
Enumerate requirements for these.
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
๐ Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. ๐๐๐
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google โค๏ธ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.