
seepng's People

Contributors

andrei-alpha, jamesdavidcarr, matthiasweidlich, mypijika, paulrw, pgaref, raulcf, wjciv


seepng's Issues

Make build dependencies optional

There is no point in including, for example, the Hadoop libraries for reading from HDFS if the deployment does not require them. Gradle handles optional dependencies well. We need to rethink and reorganize the build process so that dependencies are included only when necessary.

Binary input and output use different formats

Binary input requires a control byte and batch size information before a set of tuples, whereas the output just contains the tuples themselves. This means that the output cannot be used as input without first putting it through some sort of intermediate program to insert the batch information.

The solution is probably a flag in the WorkerConfig class which indicates whether batch information is present in an input file. We will need to alter the code in InputBuffer.readFrom to handle the case where this flag is set to false.
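A minimal sketch of the idea, assuming a hypothetical hasBatchInfo flag and simplified, size-prefixed tuple handling; the real WorkerConfig and InputBuffer.readFrom APIs will differ:

import java.nio.ByteBuffer;

// Sketch only: illustrates how a "batch info present" flag could change how
// tuples are read. Names (hasBatchInfo, readSingleTuple) are hypothetical.
public class BatchAwareReader {

    private final boolean hasBatchInfo;

    public BatchAwareReader(boolean hasBatchInfo) {
        this.hasBatchInfo = hasBatchInfo;
    }

    /** Reads one batch worth of tuples from the buffer. */
    public int readFrom(ByteBuffer src) {
        if (hasBatchInfo) {
            byte control = src.get();        // control byte precedes the batch
            int batchSize = src.getInt();    // number of tuples in this batch
            for (int i = 0; i < batchSize; i++) {
                readSingleTuple(src);
            }
            return batchSize;
        } else {
            // Output-style file: tuples only, no framing. Read until exhausted.
            int count = 0;
            while (src.hasRemaining()) {
                readSingleTuple(src);
                count++;
            }
            return count;
        }
    }

    private void readSingleTuple(ByteBuffer src) {
        int tupleSize = src.getInt();        // assume size-prefixed tuples
        src.position(src.position() + tupleSize);
    }
}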

More code reuse in collectors

The collectors seem to be doing the same type of work. Use composition to remove the redundancy that exists now.

Send data with api.sendToStreamId()

There seems to be some difficulty when an operator that has more than one downstream operator calls api.sendToStreamId() to selectively send data to a particular streamId.
In addition, WorkerShutdownHookWorker is triggered after calling the method.

(When the operator has only one downstream operator, api.send() works fine.)

Improve build pipeline

Create Gradle tasks to:

  • Create a query automatically with boilerplate files and directories.
  • Streamline the build and deploy process.

Web-UI

Opening issue to create a design for this.

InputAdapter abstraction

Consider a single InputAdapter per DataStore.
Then let the InputAdapter compose, internally, a mechanism to implement a particular DataConnectionType.

This is in contrast to having an InputAdapter per DataStore-DataConnectionType pair.

The outcome would be better code reuse and more concentrated functionality.

Scripts for distributed deployment

At the moment fabric is the preferred option. These scripts should allow not only a distributed deployment but also some form of local deployment for quick testing (via tmux scripting, for example). Consider whether to keep these together or separate them into two sets of scripts.

Optimize app-level-batching

Avoid heavy array copying, or minimise its overhead, by:

  • pooling structures (depends on the multicore processing engine implementation)
  • using plain arrays instead of higher-level data structures, with arrayCopy when necessary (see the sketch after this list)
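A minimal sketch of the second point: an append-only batch backed by a plain byte[] plus System.arraycopy, reset and reused instead of reallocated (a simple form of pooling). Sizes and names are illustrative only:

// Sketch only: append-only batch backed by a plain byte[], reset for reuse.
public class ByteBatch {

    private final byte[] data;
    private int writeIndex = 0;

    public ByteBatch(int batchSizeBytes) {
        this.data = new byte[batchSizeBytes];
    }

    /** Returns false if the tuple does not fit, so the caller can flush first. */
    public boolean append(byte[] tuple) {
        if (writeIndex + tuple.length > data.length) {
            return false;
        }
        System.arraycopy(tuple, 0, data, writeIndex, tuple.length);
        writeIndex += tuple.length;
        return true;
    }

    public int size() { return writeIndex; }

    /** Reset instead of reallocating, so the same batch can be pooled and reused. */
    public void reset() { writeIndex = 0; }
}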

Serializer Configuration [proposed Label] feature

Currently supported SEEP serializers are:

  • Java (default)
  • Kryo
  • Avro

First on the TODO list is letting the user select the preferred serializer. For this we need to add an extra configuration option to MasterConfig and WorkerConfig; we also need to check that the two are consistent when workers connect to the master(?)

Second: when users implement custom classes for their queries, there should be an easy way to register those classes with the serializer. Currently those classes have to be part of the seep-common project in order to be serializable by both the master and the workers. What is the easiest way to do that?
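For the second point, a minimal sketch of what user-facing registration could look like with Kryo; the SerializerRegistrar hook and the registered classes are hypothetical, not an existing SEEP API:

import com.esotericsoftware.kryo.Kryo;

// Sketch only: a hook that query code could implement so custom classes are
// registered with Kryo on both master and workers, instead of having to live
// in seep-common. The interface name is hypothetical.
public interface SerializerRegistrar {
    void registerClasses(Kryo kryo);
}

class MyQueryRegistrar implements SerializerRegistrar {
    @Override
    public void registerClasses(Kryo kryo) {
        // Registering with explicit ids keeps master and workers consistent,
        // as long as both sides run the same registrar.
        kryo.register(java.util.ArrayList.class, 10);
        // kryo.register(MyCustomPayload.class, 11);  // user query class
    }
}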

Variable sized tuples - populateOffsets misses buffer position

populateOffsets creates problems:

Exception in thread "SingleThreadProcessingEngine" java.lang.IllegalArgumentException
at java.nio.Buffer.position(Buffer.java:244)
at uk.ac.imperial.lsds.seep.api.data.ITuple.populateOffsets(ITuple.java:198)
at uk.ac.imperial.lsds.seep.api.data.ITuple.setData(ITuple.java:52)
at uk.ac.imperial.lsds.seepworker.core.input.NetworkDataStream.pullDataItem(NetworkDataStream.java:93)
at uk.ac.imperial.lsds.seepworker.core.SingleThreadProcessingEngine$Worker.run(SingleThreadProcessingEngine.java:93)
at java.lang.Thread.run(Thread.java:745)
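The IllegalArgumentException comes from Buffer.position being given a value outside [0, limit]. A minimal, generic sketch of the kind of bounds check that would surface the bad offset earlier, with a meaningful message; this is not the actual ITuple.populateOffsets code:

import java.nio.ByteBuffer;

// Sketch only: validate a computed variable-size field offset before moving
// the buffer position, so the failure reports the offending offset instead of
// a bare IllegalArgumentException inside java.nio.Buffer.
public class OffsetGuard {

    public static void moveTo(ByteBuffer buf, int offset) {
        if (offset < 0 || offset > buf.limit()) {
            throw new IllegalStateException(
                "Computed offset " + offset + " outside buffer limit " + buf.limit()
                + " (position=" + buf.position() + ")");
        }
        buf.position(offset);
    }
}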

Scheduled Source [proposed Label] feature

We need a SEEP Source implementation that is always available, meaning that it will never be scheduled. The Source receives data from the network, or reads from a file, and keeps everything in memory. It then sends the in-memory data to the next dependent stage whenever that stage is available (i.e. scheduled).

For the basic implementation we need:

  • A network Source (keeps data in memory until the next stage is available); a minimal sketch follows this list
  • A file Source, which could also read from HDFS
  • Modifications to the schedule description, which needs to be aware of the Sources
  • Probably changes to the DataReferenceManager class
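A minimal sketch of the first point: a network-fed source that buffers data in memory and drains it when the next stage is scheduled. The class and method names are illustrative, not the actual SEEP Source API:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch only: an always-available source that accumulates incoming data in
// memory and hands it to the next stage whenever that stage is scheduled.
public class InMemoryBufferingSource {

    private final BlockingQueue<byte[]> buffered = new LinkedBlockingQueue<>();

    /** Called by the network/file reader thread as data arrives. */
    public void onData(byte[] payload) {
        buffered.add(payload);
    }

    /** Called when the dependent stage has been scheduled and can consume data. */
    public void drainTo(java.util.function.Consumer<byte[]> nextStage) {
        byte[] item;
        while ((item = buffered.poll()) != null) {
            nextStage.accept(item);
        }
    }
}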

Request for comments: @raulcf @WJCIV

Mechanism to map task to node

For the static case this should be quite straightforward. Design something that is also useful for the "static scale-out" case.

Scaling out operators - InvalidLifecycleStatusException

It seems like the new type of SEEP query (uk.ac.imperial.lsds.seep.api.operator.SeepLogicalQuery) is not able to properly scale out operators. For example, the static-scaleout-pipeline-stateless-query example is not running with the current SEEP version as it triggers an InvalidLifecycleStatusException.

Looking further into this, I found that this event is triggered by lines 135-138 of function setInitialPhysicalInstancesForLogicalOperator in uk.ac.imperial.lsds.seep.api.QueryBuilder, as the given opId is expected "not to be" in usedIds. I would argue that this doesn't make sense: to scale out an operator, its id should already have been used, right? However, if I take out this comparison, a network exception arises in the scaled-out operator:

Exception in thread "Network-Reader-0" java.nio.BufferUnderflowException at java.nio.Buffer.nextGetIndex(Buffer.java:506) at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:361) at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.forwardTuples(InputBuffer.java:88) at uk.ac.imperial.lsds.seepworker.core.input.InputBuffer.readFrom(InputBuffer.java:64) at uk.ac.imperial.lsds.seepworker.comm.NetworkSelector$Reader.run(NetworkSelector.java:347) at java.lang.Thread.run(Thread.java:745)

Any clues about how this can be fixed?

Large Input causing OutputBuffer-BufferOverflowException

Just noticed today a rather interesting issue:
I was testing the simple File source example, reading String lines from a file.
Each of these lines could be rather big (hundreds of bytes; see below). When I used the MarkerSink (meaning the bytes would not have to go over the network) the example worked just fine.
On the other hand, when I tried to plug in a real Sink I faced the exception below.
I think the exception comes from the OutputBuffer class, where we allocate a static buffer size:

  • int headroomSize = this.BATCH_SIZE * 2;
  • buf = ByteBuffer.allocate(headroomSize);

My question here is how we want to handle this case. Split the input into smaller chunks that fit into the buffers? Dynamically extend the buffers (see the sketch after the log below)? I know some people before just increased the batch size to bypass it, but that is not really a solution, is it?

23:29:49 [SingleThreadProcessingEngine] INFO  SingleThreadProcessingEngine$Worker - Configuring SINGLETHREAD processing engine with 1 inputAdapters
23:29:49 [File-Reader] INFO  Config - FileConfig values: 
    file.path = /home/pg1712/jmeter.log
    character.set = UTF-8
    text.source = true
    serde.type = 0

23:29:49 [SingleThreadProcessingEngine] INFO  SingleThreadProcessingEngine$Worker - Configuring SINGLETHREAD processing engine with 1 outputBuffers
[Processor] data send Size: 81 => total Size: 81
[Processor] data send Size: 129 => total Size: 210
23:29:49 [File-Reader] INFO  FileSelector$Reader - Finished text File Reader worker: File-Reader
Exception in thread "SingleThreadProcessingEngine" java.nio.BufferOverflowException
    at java.nio.HeapByteBuffer.put(HeapByteBuffer.java:189)
    at java.nio.ByteBuffer.put(ByteBuffer.java:859)
    at uk.ac.imperial.lsds.seepworker.core.output.OutputBuffer.write(OutputBuffer.java:93)
    at uk.ac.imperial.lsds.seepworker.core.Collector.send(Collector.java:116)
    at Processor.processData(Processor.java:26)
    at uk.ac.imperial.lsds.seepworker.core.SingleThreadProcessingEngine$Worker.run(SingleThreadProcessingEngine.java:111)
    at java.lang.Thread.run(Thread.java:745)
^C23:30:41 [Thread-2] INFO  WorkerShutdownHookWorker - JVM is shutting down...
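A minimal sketch of the "dynamically extend the buffer" option, assuming growth is acceptable; the real OutputBuffer also has to coordinate with the network writer, which is not shown here:

import java.nio.ByteBuffer;

// Sketch only: grow the backing ByteBuffer when a write would overflow,
// instead of failing with BufferOverflowException on large tuples.
public class GrowableOutputBuffer {

    private ByteBuffer buf;

    public GrowableOutputBuffer(int initialSize) {
        this.buf = ByteBuffer.allocate(initialSize);
    }

    public void write(byte[] data) {
        if (buf.remaining() < data.length) {
            // Grow at least enough to fit the payload, copying existing content.
            int newCapacity = Math.max(buf.capacity() * 2, buf.position() + data.length);
            ByteBuffer bigger = ByteBuffer.allocate(newCapacity);
            buf.flip();
            bigger.put(buf);
            buf = bigger;
        }
        buf.put(data);
    }
}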

Better seep-master UI

Related to issue #5

There is some boilerplate code in the "ui" package in seep-master to define and start different UI front-ends. Right now we only have a very basic one, called SIMPLE_CONSOLE. Ideally we would like a better UI that would:

  • Facilitate the lifecycle management of both system and application, e.g. by allowing users to deploy, redeploy, start and stop queries.
  • Ideally not require direct access to the node where seep-master is running.
  • Serve different purposes, such as understanding the current infrastructure (nodes and their characteristics) and the current status of the system (monitoring metrics)...

The best option seems to be a web interface. We would need to define an interface that the master exposes, with information about the running system and queries as well as API calls to perform actions on the system. The web interface can then use that interface to retrieve information and manage the system.

We can take a look at the Hadoop and Spark web interfaces as models, to see what they offer and what we need.

Also, we probably don't want this UI to be another subproject. It should simply be another subpackage within the "ui" package.

Will be providing more details soon.

Running scheduled examples

It seems like none of the scheduled examples (simple-pipeline-scheduledef, simple-dag-schedule, simple-mdf-schedule) within SEEPng are working at the moment.

When running simple-pipeline-scheduledef and simple-dag-schedule, SEEPng throws an exception due to the call to ProtocolCommandFactory.buildScheduleStageCommand in lines 67-68 of class DataParallelWithInputDataLocalityLoadBalancingStrategy, which passes cdr.getRankedDatasetForNode(euId) as an input parameter. The cdr variable, of class ClusterDatasetRegistry, is asked to provide the ranked datasets for the given euId. This information is stored in a map (Map<Integer, List> rankedDatasetsPerNode), which according to the comments contains the datasets per node ordered by priority to live in memory. The call fails in line 38 (rankedDatasetsPerNode.get(euId)) because the rankedDatasetsPerNode map has not been initialized beforehand. To solve this, I think we need to initialize the map beforehand; however, as I'm not sure how this is supposed to behave, I don't know where we should initialize it (one possible defensive fix is sketched below).
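A minimal sketch of a defensive fix: tolerate a node id with no ranked datasets rather than failing on an unpopulated map. Whether an empty list is the right answer, or the map must instead be filled earlier, is exactly the open question; the class below is illustrative, not the actual ClusterDatasetRegistry:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: return an empty ranking for unknown nodes instead of a null
// that later triggers a failure.
public class RankedDatasetLookup {

    private final Map<Integer, List<Integer>> rankedDatasetsPerNode = new HashMap<>();

    public List<Integer> getRankedDatasetForNode(int euId) {
        return rankedDatasetsPerNode.getOrDefault(euId, Collections.emptyList());
    }
}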

In the case of simple-mdf-schedule, I assume the current version of this query is not supposed to run in its current state, as it is still work in progress, am I right? For example, in the Base class, evaluator1 (line 23) has the same operator id as adderTwo (line 22). When creating the choose operator (line 26), the newChooseOperator method invoked from QueryBuilder (lines 143 to 147) returns a null value, which I think is something that still needs to be implemented.

Exception Handling and seep-common Error package

I was recently looking at the ITuple class code and found some TODOs that need to throw proper exceptions. I was thinking of adding some code there, but I have two questions:

  • Should all the custom SEEP exceptions be part of the seep-common => uk.ac.imperial.lsds.seep.errors package? For example, SchemaException is currently implemented outside this package.
  • Should we have a generic Exception for all the related cases or create more specific ones? For example, in the ITuple class, would we throw a SchemaException or a fieldName/type exception?

Cheers,
Panagiotis

Extern Configs Validator

When people write queries for File, Kafka, HDFS, etc., they must pass their properties in a Properties object. Add a validator to the target Config object to make sure that the Properties object contains all the information necessary to create a valid instance of the target configuration class.
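A minimal sketch of such a validator; the required keys shown are illustrative for a file source (the "file.path" and "character.set" names are taken from the FileConfig log above):

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Sketch only: fail fast with a descriptive message when a required property
// is missing, before trying to build the target Config object.
public class ConfigValidator {

    public static void requireKeys(Properties props, List<String> requiredKeys) {
        for (String key : requiredKeys) {
            if (props.getProperty(key) == null) {
                throw new IllegalArgumentException(
                    "Missing required property '" + key + "' for this configuration");
            }
        }
    }

    public static void main(String[] args) {
        Properties p = new Properties();
        p.setProperty("file.path", "/tmp/input.txt");
        // "character.set" is missing, so this throws with a clear message.
        requireKeys(p, Arrays.asList("file.path", "character.set"));
    }
}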

Gradle task to create boilerplate structure for query.

Should be able to do something like:

./gradlew makeQuery

and get a directory, named after the query, with all the stuff required to develop, build and deploy a SEEP query. In particular, the generated directory should contain all required files and dependencies, a Gradle wrapper to build the query, and support for IDEs (Eclipse and IntelliJ).

Add methodDeclaratorVisitor to AnnotationAnalysis

AnnotationAnalysis analyzes field and variable declarations. We need another method to analyze method declarations, as there are annotations (e.g. Collection) that annotate methods.
This requires implementing a new method in AnnotationAnalysis and a new unit test.
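A minimal sketch of the shape of such a visitor, assuming a JavaParser 3.x style AST; AnnotationAnalysis may be built on a different parser, so treat this as an illustration rather than a drop-in addition:

import com.github.javaparser.ast.body.MethodDeclaration;
import com.github.javaparser.ast.visitor.VoidVisitorAdapter;

// Sketch only: visit method declarations and react to a method-level annotation.
public class MethodDeclarationVisitor extends VoidVisitorAdapter<Void> {

    @Override
    public void visit(MethodDeclaration md, Void arg) {
        super.visit(md, arg);
        md.getAnnotations().forEach(a -> {
            if (a.getNameAsString().equals("Collection")) {
                // Record that this method carries the @Collection annotation.
                System.out.println("Found @Collection on method " + md.getNameAsString());
            }
        });
    }
}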

OutputAdapter without buffers

Make the OutputAdapter independent of actual buffers, similar to the InputAdapter.
All buffers should be managed by the core or the processing engine, and be accessible by the different collectors, etc.

Flush outputBuffers on max latency heartbeat

In cases where the data rate is low and one wants to make sure that all tuples are processed with low latency, we'd need a heartbeat mechanism that flushes buffers. Otherwise, buffers will keep tuples until they are fully filled.
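A minimal sketch of such a heartbeat, flushing any pending output after a maximum-latency interval; the flush hook and interval are illustrative:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch only: periodically flush partially filled output buffers so tuples
// are not held back when the data rate is low.
public class FlushHeartbeat {

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    public void start(long maxLatencyMillis, Runnable flushAllBuffers) {
        scheduler.scheduleAtFixedRate(
                flushAllBuffers, maxLatencyMillis, maxLatencyMillis, TimeUnit.MILLISECONDS);
    }

    public void stop() {
        scheduler.shutdownNow();
    }
}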

Automate annotation parsing in AnalysisUtils

The method identifyAnnotation contains an ugly switch. Instead, modify the enum "SDGAnnotation" to contain logic that allows this to be done automatically. This would avoid having to modify the method manually after every change.
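A minimal sketch of the idea: each enum constant carries its own matching logic, so identification just iterates the values. The constant names are illustrative, not the real SDGAnnotation entries:

// Sketch only: enum constants know how to recognise themselves, replacing the
// central switch in identifyAnnotation.
public enum SdgAnnotationSketch {

    COLLECTION("Collection"),
    PARTITIONED("Partitioned");

    private final String annotationName;

    SdgAnnotationSketch(String annotationName) {
        this.annotationName = annotationName;
    }

    public boolean matches(String name) {
        return annotationName.equals(name);
    }

    /** Replaces the switch: adding a new constant automatically extends this. */
    public static SdgAnnotationSketch identify(String name) {
        for (SdgAnnotationSketch a : values()) {
            if (a.matches(name)) {
                return a;
            }
        }
        throw new IllegalArgumentException("Unknown annotation: " + name);
    }
}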

Implement app lifecycle in worker

The master now has a notion of the status of a query. Propagate the same notion to the worker nodes so that they are aware of it. Use this to start, stop, flush and clean resources accordingly.

Add support for multiple states per operator

Right now the API accepts one instance of stateful State. There is a LogicalState interface to allow precisely this, but it requires a bit more work and testing to make sure that it is easy to use and accomplishes the initial goal.

SchemaParser breaks serialization

SchemaParser contains a Charset attribute that breaks serialization when sent to workers. Non-serializable objects should be created lazily when needed, or through some initialization method when the class is created.
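A minimal sketch of the lazy-initialization option, marking the Charset as transient and recreating it on first use; whether SchemaParser should instead receive the Charset through an init method is the design choice to discuss:

import java.io.Serializable;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Sketch only: keep the non-serializable Charset out of the serialized state
// and recreate it lazily on the worker side.
public class SchemaParserSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // transient: not written during serialization, null after deserialization.
    private transient Charset charset;

    private Charset charset() {
        if (charset == null) {
            charset = StandardCharsets.UTF_8;   // or read from config
        }
        return charset;
    }

    public byte[] encode(String field) {
        return field.getBytes(charset());
    }
}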

Rename DataOrigin

DataOrigin can also be used as a destination after external software integration. Rename it to something more generic; some examples: DataEndpoint...

Revisit output buffer flushing

Output is currently going into buffers. We need to spend more time figuring out when to flush those buffers. Doing so after every write is inefficient and undermines the point of using a buffer in the first place, but not flushing often enough will add latency.

Remove .cached files during cleanup

If Datasets are read directly from disk they are never returned to memory, and the .cached file outlives the program. Since these files are stored on disk in the folder where the worker runs, a large number of files can accumulate quickly. We should be able to figure out which files belong to a worker during the shutdown hook and remove them.
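A minimal sketch of the cleanup, assuming the worker's .cached files can be identified by suffix in its working directory; in SEEP the hook would presumably live alongside WorkerShutdownHookWorker:

import java.io.File;

// Sketch only: remove leftover .cached files from the worker's working
// directory when the JVM shuts down.
public class CachedFileCleanup {

    public static void install(File workingDir) {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            File[] cached = workingDir.listFiles((dir, name) -> name.endsWith(".cached"));
            if (cached == null) {
                return;
            }
            for (File f : cached) {
                if (!f.delete()) {
                    System.err.println("Could not delete " + f.getAbsolutePath());
                }
            }
        }));
    }
}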

Initial comm Synchronization

There are cases where the system won't start after triggering "start". This is common when the send rate is high. One workaround is to wait some additional time (e.g. 2 seconds) before starting the system, to make sure that all communication pairs are properly configured. A proper solution, however, would require understanding exactly what is not yet configured and synchronising on it.
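A minimal sketch of the "synchronise on it" direction: hold the start signal until every communication pair has reported it is connected, instead of sleeping for a fixed time. The names are illustrative:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch only: block the start signal until all expected connections report
// ready, replacing the fixed-wait workaround.
public class StartBarrier {

    private final CountDownLatch pendingConnections;

    public StartBarrier(int expectedConnections) {
        this.pendingConnections = new CountDownLatch(expectedConnections);
    }

    /** Called by each communication pair once its channel is fully configured. */
    public void connectionReady() {
        pendingConnections.countDown();
    }

    /** Called before sending "start"; fails loudly if setup never completes. */
    public void awaitAllReady(long timeoutSeconds) throws InterruptedException {
        if (!pendingConnections.await(timeoutSeconds, TimeUnit.SECONDS)) {
            throw new IllegalStateException("Not all communication pairs became ready in time");
        }
    }
}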

Improve GenerateBinaryFile tool

In uk.ac.imperial.lsds.seep.tools there is a tool called GeneratedBinaryFile that can generate binary files from ints. Make it general so that we can create anything according to a given schema (the tool already accepts schemas).

Hide streamId in SEEP API

A rule is that an operator must use different streamIds for its upstream and downstream connections. This must currently be enforced by the programmer while writing the query. Instead, hide it in the API by default and keep the current methods for cases where this needs to be programmed explicitly (e.g. when writing the code generated from java2sdg).

Schema store

"A more involved solution would be to name schemas, and then make them available in a store that users can access."

Give schemas a name and make them available to user applications. Users can then find the schemas they need and use them to create OTuples without declaring them explicitly in the seeptask code.
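A minimal sketch of such a store as a named registry; the Schema type parameter stands in for the actual SEEP schema class, and the lookup API is illustrative:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: a named schema registry that user code can query instead of
// redeclaring schemas in the seeptask code. Schema is a placeholder type.
public class SchemaStore<Schema> {

    private final Map<String, Schema> schemasByName = new ConcurrentHashMap<>();

    public void register(String name, Schema schema) {
        Schema previous = schemasByName.putIfAbsent(name, schema);
        if (previous != null) {
            throw new IllegalStateException("Schema already registered: " + name);
        }
    }

    public Schema get(String name) {
        Schema schema = schemasByName.get(name);
        if (schema == null) {
            throw new IllegalArgumentException("Unknown schema: " + name);
        }
        return schema;
    }
}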
