cascading-flink's Issues

Obtaining Flink plan with './bin/flink info' throws ClassCastException

Trying to get the Flink DAG as JSON using ./bin/flink info failed with this exception:

java.lang.ClassCastException: org.apache.flink.client.program.OptimizerPlanEnvironment cannot be cast to org.apache.flink.client.program.ContextEnvironment
at com.dataartisans.flink.cascading.planner.FlinkFlowStepJob.internalNonBlockingStart(FlinkFlowStepJob.java:164)
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:269)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:48)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
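
The cast happens in FlinkFlowStepJob.internalNonBlockingStart(). A type guard along these lines would avoid the hard failure (a sketch only; the variable names and the surrounding submission logic are assumptions):

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.client.program.ContextEnvironment;

    // Sketch: './bin/flink info' runs the program in an OptimizerPlanEnvironment,
    // not a ContextEnvironment, so an unconditional cast fails. Guard it instead:
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    if (env instanceof ContextEnvironment) {
        ContextEnvironment contextEnv = (ContextEnvironment) env;
        // ... regular cluster submission via contextEnv ...
    } else {
        throw new UnsupportedOperationException(
                "This execution environment only supports plan generation (e.g., 'flink info').");
    }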

exclude *TestCase.java from surefire configuration

As per Chris Wensel, this should be excluded:

make sure you ‘exclude’ *TestCase from your unit test pattern.
all Cascading tests are *PlatformTest and *Test. there are no tests in *TestCase

And without this exclusion you wind up getting an odd JUnit error.

junit.framework.AssertionFailedError: Class cascading.ComparePlatformsTest$CompareTestCase has no public constructor TestCase(String name) or TestCase()

So a simple change to the maven-surefire-plugin section of the pom.xml fixes this:

                    <excludes>
                        <exclude>${excluded.tests}</exclude>
                        <exclude>**/*TestCase.java</exclude>
                    </excludes>

Add tests with typed fields

The Cascading Flink connector uses different execution paths for pipes with typed and untyped fields. These paths are not fully covered by Cascading's platform tests because these tests are mostly based on untyped fields.
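
For reference, typed fields attach a Java type to each field while untyped fields omit it; a minimal sketch (field names are illustrative):

    import cascading.tuple.Fields;

    // Untyped fields: the connector must fall back to generic handling.
    Fields untyped = new Fields("word", "count");

    // Typed fields: each field carries a Java type, which lets the
    // connector use Flink's native serializers and comparators.
    Fields typed = new Fields("word", "count").applyTypes(String.class, Long.class);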

Make Flink execution backend configurable

Flink can be executed in

  • local mode (spinning up a local FlinkMiniCluster)
  • remote mode (going against a remote JobManager)
  • YARN mode (grabbing containers from YARN and starting Flink)

These options should be configurable in the FlinkConnector.
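
A sketch of how the choice could surface in the connector (the property names are hypothetical; YARN mode would additionally go through Flink's YARN client):

    import java.util.Properties;
    import org.apache.flink.api.java.ExecutionEnvironment;

    // Sketch: choose the execution backend from a (hypothetical) connector property.
    ExecutionEnvironment createEnvironment(Properties props) {
        String mode = props.getProperty("flink.executionMode", "local");
        if ("local".equals(mode)) {
            // local mode: spins up a local FlinkMiniCluster
            return ExecutionEnvironment.createLocalEnvironment();
        } else if ("remote".equals(mode)) {
            // remote mode: runs against a remote JobManager
            return ExecutionEnvironment.createRemoteEnvironment(
                    props.getProperty("flink.jobmanager.host"),
                    Integer.parseInt(props.getProperty("flink.jobmanager.port")),
                    props.getProperty("flink.jarFile"));
        } else {
            throw new IllegalArgumentException("Unsupported execution mode: " + mode);
        }
    }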

setNumSinkParts is currently ignored

Scheme.setNumSinkParts(1) should result in a single part-00000 file, and thus the upstream grouping should implicitly have a parallelism of 1. Or at least I assume that's how it would need to be handled, since I don't think DataSink is being used here, so you can't set the parallelism on that.

In the Javadocs for Cascading, this call is described as a suggestion (e.g. if your Flow only has maps then no such parallelism can be guaranteed) but it does wind up being relied upon by many workflows, when generating a small output file that has to be globally sorted.
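
For context, this is roughly how a workflow requests a single output part (the scheme and path are illustrative):

    import cascading.scheme.hadoop.TextDelimited;
    import cascading.tap.SinkMode;
    import cascading.tap.hadoop.Hfs;
    import cascading.tuple.Fields;

    // Ask Cascading for a single part-00000 file; on Flink this should
    // force the upstream grouping to a parallelism of 1.
    TextDelimited scheme = new TextDelimited(Fields.ALL, "\t");
    scheme.setNumSinkParts(1);
    Hfs outTap = new Hfs(scheme, "output/sorted", SinkMode.REPLACE);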

Add support to cancel running jobs

Implement FlinkFlowStepJob.internalBlockOnStop() to stop/cancel a running Flink job.

This feature is tested in CascadePlatformTest.testSimpleCascadeStop().
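
A minimal sketch, assuming the Client used for submission exposes cancel(JobID) (as Flink's 'flink cancel' CLI command suggests) and that the step job keeps the client and job ID around as fields:

    import java.io.IOException;

    // Sketch: cancel the submitted Flink job if it is still running.
    // 'client' and 'jobID' are assumed fields of FlinkFlowStepJob.
    @Override
    protected void internalBlockOnStop() throws IOException {
        try {
            if (jobID != null) {
                client.cancel(jobID); // assumed Client API, mirroring 'flink cancel'
            }
        } catch (Exception e) {
            throw new IOException("Could not cancel Flink job " + jobID, e);
        }
    }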

Improve serialization of Tuple[] for n-ary Joins and CoGroups

N-ary joins and CoGroups build a Tuple2<Tuple, Tuple[]> where the Tuple[] field is treated as an ObjectArrayTypeInfo. The corresponding serializer is inefficient for our case:

  • it serializes the length of the array
  • it serializes a boolean (1 byte) for each array field to flag null values
  • it uses Kryo to serialize the Tuple fields.

In our case, all arrays have the same (known) length, null values can be flagged with a bitmap, and a custom Tuple serializer can be used, as sketched below.
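
A sketch of the write path under those assumptions; arrayLength and tupleSerializer are fields the custom serializer would hold (the latter delegating to the connector's Cascading Tuple serializer):

    import java.io.IOException;

    import org.apache.flink.api.common.typeutils.TypeSerializer;
    import org.apache.flink.core.memory.DataOutputView;

    import cascading.tuple.Tuple;

    // Assumed fields of the custom serializer:
    //   int arrayLength                       - the fixed, known array length
    //   TypeSerializer<Tuple> tupleSerializer - serializer for Cascading Tuples
    public void serialize(Tuple[] array, DataOutputView target) throws IOException {
        // One bit per element for null flags instead of one byte each;
        // the array length itself is never written because it is known.
        byte[] nullMask = new byte[(arrayLength + 7) / 8];
        for (int i = 0; i < arrayLength; i++) {
            if (array[i] == null) {
                nullMask[i / 8] |= (byte) (1 << (i % 8));
            }
        }
        target.write(nullMask);
        for (int i = 0; i < arrayLength; i++) {
            if (array[i] != null) {
                tupleSerializer.serialize(array[i], target);
            }
        }
    }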

Customize CoGroup and Join cascades for n-ary Joins and CoGroups

Cascading executes an n-ary LeftJoin as follows:

Only the farthest right tuple stream will be used as the outer join. All preceding joins to the left will be inner joins.

RightJoins are treated correspondingly; MixedJoins use combinations of inner and outer joins. For example, a 3-ary LeftJoin of inputs A, B, and C is executed as (A inner join B) left outer join C.

Right now, HashJoin only supports InnerJoins and CoGroup is always executed as a FullOuterJoin. We can make the execution of n-ary joins more efficient if we respect Cascading's join semantics.

SortedValuesPlatformTest fails for DOP > 1

Since the data sink DOP is fixed to 1 (see #4), data is shuffled to the sink and is no longer ordered.
We might also need to add range partitioning for fully sorted output; we should check how this works in Hadoop/Tez.

Add support for online counters

Flink's counters (called Accumulators) are currently being extended to support online updates. As soon as these online counters are available from the client, they need to be integrated with Cascading's FlinkFlowStepStats.

Tests fail on master

Results :

Failed tests:
TestSuite$1.warning Class cascading.ComparePlatformsTest$CompareTestCase has no public constructor TestCase(String name) or TestCase()

compile error with current HEAD

Hi,

I tried installing it as explained, but I got the following Maven error:

[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:00 min
[INFO] Finished at: 2015-08-28T10:27:01+02:00
[INFO] Final Memory: 35M/304M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project cascading-flink: Compilation failure: Compilation failure:
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/groupBy/GroupByInGate.java:[125,24] com.dataArtisans.flinkCascading.runtime.groupBy.GroupByInGate.KeyPeekingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/hashJoin/JoinClosure.java:[112,17] com.dataArtisans.flinkCascading.runtime.hashJoin.JoinClosure.SingleTupleIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/regularJoin/CoGroupInGate.java:[132,24] com.dataArtisans.flinkCascading.runtime.coGroup.regularJoin.CoGroupInGate.JoinResultIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java:[121,24] com.dataArtisans.flinkCascading.runtime.coGroup.bufferJoin.CoGroupBufferInGate.KeyPeekingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/bufferJoin/CoGroupBufferClosure.java:[298,24] com.dataArtisans.flinkCascading.runtime.coGroup.bufferJoin.CoGroupBufferClosure.FlinkUnwrappingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
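
These errors come from the Java compile target: java.util.Iterator.remove() only became a default method in Java 8, so when compiling for Java 7 each of these inner iterator classes must implement it explicitly, e.g.:

    @Override
    public void remove() {
        // these iterators are read-only
        throw new UnsupportedOperationException("remove() is not supported");
    }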

Example WordCount program should specify SinkMode.REPLACE for outTap

It's not required, but it's considered standard practice for Cascading workflows, as then you can run the workflow multiple times with the same output directory, and Cascading takes care of getting rid of it so you don't run into the classic "directory already exists" error.

Tap outTap = new Hfs(new TextDelimited(false, "\n"), args[1], SinkMode.REPLACE);

Make BATCH the default execution mode and add a parameter to switch to PIPELINED

Flink features two modes to exchange data between parallel tasks over the network.

  • PIPELINED: forwards data over the network as soon as it is produced (modulo filling a network buffer). This provides very good performance but can have significant resource requirements in setups with high degrees of parallelism. PIPELINED shuffles can fail if not enough network buffers are available.
  • BATCH: collects all data locally and ships it once the producing task has finished. This mode increases latency but requires less memory.

This issue is about making the robust, yet slower, BATCH mode the default for Cascading jobs on Flink, and adding a parameter to switch back to PIPELINED mode.
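
Flink already exposes this switch on its ExecutionConfig, so the connector mainly needs to wire a parameter through to it; a minimal sketch:

    import org.apache.flink.api.common.ExecutionMode;
    import org.apache.flink.api.java.ExecutionEnvironment;

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Default to the robust BATCH shuffles; a connector parameter could
    // switch this back to ExecutionMode.PIPELINED.
    env.getConfig().setExecutionMode(ExecutionMode.BATCH);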

Add checks for Flink union when exact types are required.

If two or more pipes with different tuple schemas (i.e., field types) are to be unioned, we have to ensure that the type information of all pipes is identical. This should be done by comparing the types of all fields and unifying them, either by adding missing types or removing existing types.

Alternatively we can throw a meaningful exception.
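
A minimal sketch of that exception route (the helper and its call site are illustrative):

    import java.util.Arrays;
    import java.util.List;

    import cascading.flow.FlowException;
    import cascading.tuple.Fields;

    // Sketch: fail fast with a clear message if union inputs disagree on types.
    static void checkUnionTypes(List<Fields> inputs) {
        Fields first = inputs.get(0);
        for (Fields other : inputs.subList(1, inputs.size())) {
            if (!Arrays.equals(first.getTypes(), other.getTypes())) {
                throw new FlowException(
                        "Cannot union pipes with different field types: "
                                + first + " vs. " + other);
            }
        }
    }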

Handle Trap output files of previous runs correctly

In the case of SinkMode.REPLACE, traps should remove all existing files in their output directory.
Currently, files with matching names are overwritten, but an existing file that is not overwritten is not removed.

This is because traps are not handled as regular data sinks but are individually opened by the Flink functions that need to write to a trap. We need to add the InitializeOnMaster hook to regular Flink functions, similar to Flink OutputFormats. This hook can be used by Flink functions that include traps to initialize their output directories.
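
For reference, this is the hook Flink's OutputFormats already use; an analogous hook would be needed for rich functions:

    import java.io.IOException;

    // Flink's existing hook for OutputFormats
    // (org.apache.flink.api.common.io.InitializeOnMaster). An analogous hook
    // for rich functions would let trap-writing functions prepare their
    // output directories on the JobManager before any task starts.
    public interface InitializeOnMaster {
        void initializeGlobal(int parallelism) throws IOException;
    }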

Improve serialization of Cascading tuples

Cascading tuples are serialized with a boolean array that marks null-valued tuple fields. Each boolean value is serialized as a full byte.

This can be done more space-efficiently by packing the flags into a bit mask: for a 100-field tuple, the null flags shrink from 100 bytes to 13 ((100 + 7) / 8), using the same technique as in the Tuple[] sketch above.

Add support for CascadingDataSink with DOP > 1

Currently, running a CascadingDataSink with a DOP > 1 yields empty results because the job commit logic removes the result files for some reason. We had a similar issue in Flink's Hadoop compatibility wrappers for Hadoop OutputFormats and solved it by moving the committing to the JobManager via FinalizeOnMaster (see FLINK-1139).

Right now, the DOP of data sinks is fixed to 1 in FlinkFlowStep.translateSink().

translateHashJoin should allow Joiner subclasses

When running Scalding jobs on Flink (more info here: http://themodernlife.github.io/scala/hadoop/hdfs/sclading/flink/streaming/realtime/2015/12/20/running-scalding-jobs-on-apache-flink/) I noticed that the only join type that worked was

pipe.joinWithSmaller(...)

Unfortunately a lot of jobs are going to use something like

pipe.joinWithTiny(...)

Currently this fails on Flink with

Caused by: cascading.flow.planner.PlannerException: [net.themodernlife.Word...] could not build flow from assembly: [HashJoin does only support InnerJoin and LeftJoin.]
    at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:749)
    at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:209)
    at cascading.flow.FlowConnector.connect(FlowConnector.java:456)
    at com.dataartisans.flink.cascading.FlinkConnector.connect(FlinkConnector.java:86)
    at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:74)
    at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:165)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
    at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
    at scala.util.Success.flatMap(Try.scala:231)
    at com.twitter.scalding.Job.buildFlow(Job.scala:225)
    at com.twitter.scalding.Job.run(Job.scala:295)
    at com.twitter.scalding.Tool.start$1(Tool.scala:126)
    at com.twitter.scalding.Tool.run(Tool.scala:142)
    at com.twitter.scalding.Tool.run(Tool.scala:70)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
    at com.twitter.scalding.Tool$.main(Tool.scala:150)
    ... 12 more
Caused by: cascading.flow.FlowException: HashJoin does only support InnerJoin and LeftJoin.
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.translateHashJoin(FlinkFlowStep.java:882)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.buildFlinkProgram(FlinkFlowStep.java:309)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:126)
    at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:96)
    at cascading.flow.planner.BaseFlowStep.getCreateFlowStepJob(BaseFlowStep.java:913)
    at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1318)
    at cascading.flow.BaseFlow.initialize(BaseFlow.java:244)
    at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:203)
    ... 26 more

The reason is that the Scalding code for joinWithTiny does this:

def joinWithTiny(fs: (Fields, Fields), that: Pipe) = {
    val intersection = asSet(fs._1).intersect(asSet(fs._2))
    if (intersection.isEmpty) {
      new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, WrappedJoiner(new InnerJoin))
    } else {
      val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
      (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, WrappedJoiner(new InnerJoin)))
        .discard(temp)
    }
  }

Notice how the Joiner has been wrapped with WrappedJoiner? In Cascading Flink you have this:

    private DataSet<Tuple> translateHashJoin(List<DataSet<Tuple>> inputs, FlowNode node) {

        HashJoin hashJoin = (HashJoin) getCommonSuccessor(node.getSourceElements(), node);
        Joiner joiner = hashJoin.getJoiner();

        int numJoinInputs = hashJoin.isSelfJoin() ? hashJoin.getNumSelfJoins() + 1 : inputs.size();

        Fields[] inputFields = new Fields[numJoinInputs];
        Fields[] keyFields = new Fields[numJoinInputs];
        String[][] flinkKeys = new String[numJoinInputs][];

        List<DataSet<Tuple>> joinInputs = computeSpliceInputsFieldsKeys(hashJoin, node, inputs, inputFields, keyFields, flinkKeys);

        if(keyFields[0].isNone()) {
            // Cartesian product
            return translateInnerCrossProduct(node, joinInputs);
        }
        else if(joiner.getClass().equals(InnerJoin.class)) {
            // inner join with keys
            return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else if (joiner.getClass().equals(LeftJoin.class)) {
            return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else {
            throw new FlowException("HashJoin does only support InnerJoin and LeftJoin.");
        }
    }

I thought it would be as simple as replacing joiner.getClass().equals(InnerJoin.class) with joiner.getClass().isAssignableFrom(InnerJoin.class), but that's not the case, since WrappedJoiner extends Joiner and not InnerJoin (or LeftJoin).

Not sure if there is a simple solution, but I wanted to get the bug in, in case anyone has any ideas!
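
For plain Joiner subclasses, the usual fix inside translateHashJoin is instanceof dispatch, which accepts subclasses of InnerJoin and LeftJoin (a sketch only; it does not by itself help WrappedJoiner, which extends Joiner directly):

        if (joiner instanceof InnerJoin) {
            // accepts InnerJoin and any subclass of it
            return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else if (joiner instanceof LeftJoin) {
            return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
        }
        else {
            throw new FlowException("HashJoin only supports InnerJoin and LeftJoin.");
        }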

Add debug messages about Flink's type system

  • Tuples without schema (Fields.UNKNOWN)
  • Untyped fields (Kryo serialized)
  • Non-normalizable comparators

This information is important to tune the performance of a Cascading flow.
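
A sketch of what such a message could look like for the first case (the logger, pipeName, and call site are illustrative):

    // Sketch: warn when a pipe declares no schema, since Flink then treats
    // its tuples as generic types and serializes them with Kryo.
    if (fields.isUnknown()) {
        LOG.warn("Pipe '{}' declares Fields.UNKNOWN; its tuples will be "
                + "Kryo-serialized, which may hurt performance.", pipeName);
    }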

Test Cascading Flink on a cluster

Cascading-Flink has only been tested locally. We need to run some large-scale cluster tests to see whether it works and how it performs.

Compile CoGroup without GroupReduce operator

Currently, a Cascading CoGroup is compiled into a Flink CoGroup and a subsequent GroupReduce. We can remove the GroupReduce and do everything in a Flink CoGroup.
We can also decrease the Tuple[] size by 1.

Translate binary CoGroup to Flink CoGroup

At the moment, Cascading CoGroups are translated to a Flink Reduce, similar to how it is done in MapReduce. We can translate binary Cascading CoGroups to Flink CoGroups, which are more robust and probably more efficient.

Change Flink job submission to be detached

Right now, jobs are submitted via a LocalExecutionEnvironment with execute(), which is a blocking call. Job submission should go through Flink's Client class (or Executor) and be non-blocking.
This needs to be changed in FlinkFlowStepJob.internalNonBlockingStart().

Translate CoGroups to OuterJoins

Right now, Cascading CoGroups are translated to Flink CoGroups which perform a full outer join. Once Flink features a native outer join implementation, Cascading CoGroups should be translated to such.

Build failure on Mac

(Normally I'd post this to a mailing list, but I didn't see one listed in the docs)

git clone https://github.com/dataArtisans/cascading-flink.git
cd cascading-flink
mvn clean install -DskipTests

and got...

[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /Users/kenkrugler/git/cascading-flink/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java:[189,54] cannot find symbol
symbol: method run(org.apache.flink.runtime.jobgraph.JobGraph,boolean)
location: variable client of type org.apache.flink.client.program.Client

This is on Mac OS X 10.8.5, javac 1.7.0_55
Looks like a compatibility issue with Flink 0.10-SNAPSHOT, where Client.run() now has a different signature.

BTW, after running "mvn eclipse:clean eclipse:eclipse" I had to fix up a few other project settings: two files rely on the default Eclipse encoding being UTF-8, which should instead be set explicitly in the project definition, and the project was using my (ancient) Java 1.6 runtime, which meant a 1.6 compilation level and caused a problem with Collections.emptyIterator() not being defined.
