dataartisans / cascading-flink
Cascading on Apache Flink®
License: Apache License 2.0
Trying to get the Flink DAG as JSON using ./bin/flink info failed with this exception:
java.lang.ClassCastException: org.apache.flink.client.program.OptimizerPlanEnvironment cannot be cast to org.apache.flink.client.program.ContextEnvironment
at com.dataartisans.flink.cascading.planner.FlinkFlowStepJob.internalNonBlockingStart(FlinkFlowStepJob.java:164)
at cascading.flow.planner.FlowStepJob.blockOnJob(FlowStepJob.java:269)
at cascading.flow.planner.FlowStepJob.start(FlowStepJob.java:184)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:146)
at cascading.flow.planner.FlowStepJob.call(FlowStepJob.java:48)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
No JoinPrepareMap with Tuple[].
As per Chris Wensel, this should be excluded:
make sure you ‘exclude’ *TestCase from your unit test pattern.
all Cascading tests are *PlatformTest and *Test. there are no tests in *TestCase
And without this exclusion you wind up getting an odd JUnit error.
junit.framework.AssertionFailedError: Class cascading.ComparePlatformsTest$CompareTestCase has no public constructor TestCase(String name) or TestCase()
So a simple change to the maven-surefire-plugin section of the pom.xml file fixes this:
<excludes>
    <exclude>${excluded.tests}</exclude>
    <exclude>**/*TestCase.java</exclude>
</excludes>
The Cascading Flink connector uses different execution paths for pipes with typed and untyped fields. These paths are not fully covered by Cascading's platform tests because these tests are mostly based on untyped fields.
Flink can be executed in several modes. These options should be configurable in the FlinkConnector.
Scheme.setNumSinkParts(1)
should result in a single part-00000 file, and thus the upstream grouping should implicitly have a parallelism of 1. Or at least I assume that's how it would need to be handled, since I don't think DataSink is being used here, so you can't set the parallelism on that.
In the Javadocs for Cascading, this call is described as a suggestion (e.g. if your Flow only has maps, then no such parallelism can be guaranteed), but it does wind up being relied upon by many workflows when generating a small output file that has to be globally sorted.
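A sketch of how the translator could honor this on the Flink side, assuming it has the DataSet and the Cascading output format at hand (variable names hypothetical):

// Force a single sink task so that exactly one part-00000 file is written;
// upstream operators then implicitly feed a parallelism-1 sink.
data.output(cascadingOutputFormat).setParallelism(1);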
In DefinedTupleSerializer you are calling:
Object copy = fieldSers[i].copy(from.getObject(i));
But getObject() could return null, which is not supported by the serializers. A null check is necessary here.
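A minimal sketch of the guarded copy, reusing the names from the snippet above (the target tuple `to` is an assumption about the surrounding method):

// Guard against null field values, which the per-field serializers do not accept.
Object value = from.getObject(i);
Object copy = (value == null) ? null : fieldSers[i].copy(value);
to.set(i, copy);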
In the current implementation, BufferJoins might fail if the inputs do not have identical tuple schemas.
Implement FlinkFlowStepJob.internalBlockOnStop() to stop/cancel a running Flink job. This feature is tested in CascadePlatformTest.testSimpleCascadeStop().
N-ary joins and CoGroups build a Tuple2<Tuple, Tuple[]> where the Tuple[] field is treated as an ObjectArrayTypeInfo. The corresponding serializer is inefficient for our case: all arrays have the same (known) length, so we can flag null values using a bitmap and use a custom Tuple[] serializer.
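A minimal sketch of the bitmap idea as a standalone helper (not the actual serializer; method and parameter names are made up):

import java.io.IOException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataOutputView;
import cascading.tuple.Tuple;

// Writes a fixed-length Tuple[] with a leading null bitmap (one bit per slot)
// instead of the generic array serializer's length prefix and per-element
// null handling.
public static void writeTupleArray(Tuple[] array, TypeSerializer<Tuple> tupleSer, DataOutputView out) throws IOException {
    byte[] bitmap = new byte[(array.length + 7) / 8];
    for (int i = 0; i < array.length; i++) {
        if (array[i] == null) {
            bitmap[i / 8] |= (byte) (1 << (i % 8));  // mark null slots
        }
    }
    out.write(bitmap);  // the array length is known, so no length prefix is needed
    for (Tuple t : array) {
        if (t != null) {
            tupleSer.serialize(t, out);  // only non-null tuples are written
        }
    }
}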
Cascading executes an n-ary LeftJoin as follows: only the farthest-right tuple stream is used as the outer join; all preceding joins to the left are inner joins. RightJoins are treated correspondingly, and MixedJoins use combinations of inner and outer joins.
Right now, HashJoin only supports InnerJoins, and CoGroup is always executed as a FullOuterJoin. We can make the execution of n-ary joins a bit more efficient if we respect Cascading's join semantics.
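A conceptual sketch of the decomposition for three inputs a, b, c, using Flink Tuple2 data sets keyed on field 0 (assumes Flink's native outer joins; the combine logic is illustrative):

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// a JOIN b is an inner join; only the join with the rightmost input c is outer.
DataSet<Tuple2<String, Integer>> ab = a
        .join(b).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                return Tuple2.of(l.f0, l.f1 + r.f1);
            }
        });

DataSet<Tuple2<String, Integer>> abc = ab
        .leftOuterJoin(c).where(0).equalTo(0)
        .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                return Tuple2.of(l.f0, r == null ? l.f1 : l.f1 + r.f1);  // r is null for unmatched left rows
            }
        });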
HashJoin operators should have the same parallelism as the probe side input to avoid data shuffling.
Right now, every GroupBy is compiled to Partition -> SortPartition -> GroupReduce to support all possible combinations of sort inversion and group sorting.
We should add a shortcut that compiles the common cases of GroupBy to only a GroupReduce where possible, to avoid serialization overhead.
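A sketch of the shortcut on Flink's DataSet API, using a word-count-style grouping for illustration:

import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Common case: no sort inversion and no secondary sort. A single grouped
// reduce replaces the Partition -> SortPartition -> GroupReduce chain.
DataSet<Tuple2<String, Integer>> reduced = input
        .groupBy(0)  // Flink partitions and sorts by the grouping key internally
        .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public void reduce(Iterable<Tuple2<String, Integer>> values,
                               Collector<Tuple2<String, Integer>> out) {
                String key = null;
                int sum = 0;
                for (Tuple2<String, Integer> v : values) {
                    key = v.f0;
                    sum += v.f1;
                }
                out.collect(Tuple2.of(key, sum));
            }
        });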
Since the data sink DOP is fixed to 1 (see #4), data is shuffled to the sink and is no longer ordered.
We might also need to add range partitioning for fully sorted output. We need to check how this works in Hadoop/Tez.
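A sketch of fully sorted output via range partitioning, assuming a Flink version that offers partitionByRange (key positions illustrative):

import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

// Range-partition by the sort key, then sort each partition locally.
// Partition i only holds keys smaller than those of partition i+1, so the
// concatenated part files are globally ordered.
DataSet<Tuple2<String, Integer>> sorted = data
        .partitionByRange(0)
        .sortPartition(0, Order.ASCENDING);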
Flink does not provide access to the RuntimeContext in InputFormats and OutputFormats. Once FLINK-1819 is fixed, we need to access the proper RuntimeContext in CascadingInputFormat and CascadingOutputFormat and get rid of the FakeRuntimeContext.
The test JoinFieldedPipesPlatformTest.testJoinMergeGroupBy() is expected to fail (because FlinkTestPlatform.isDag() returns true) but does execute correctly on Flink.
Cascading tuples can have type information. We should use this information to register field type classes with the Kryo serializer.
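A sketch of the registration, assuming the ExecutionEnvironment env and a typed Cascading Fields instance are in scope (getTypesClasses() returns null for untyped fields):

Class<?>[] typeClasses = fields.getTypesClasses();  // null if the fields carry no types
if (typeClasses != null) {
    for (Class<?> cls : typeClasses) {
        env.registerType(cls);  // Kryo then writes a compact id instead of the full class name
    }
}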
The test TapPlatformTest.testMultiSourceIterator() is failing for a DOP > 1.
Flink's counters (called Accumulators) are currently being extended for online updates. As soon as these online counters are available from the client, they need to be integrated with Cascading's FlinkFlowStepStats.
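For illustration, a minimal Flink accumulator in a rich function; once the client can poll such counters while a job runs, FlinkFlowStepStats could map Cascading counters onto them (class and counter names are made up):

import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class CountingMapper extends RichMapFunction<String, String> {
    private final LongCounter processed = new LongCounter();

    @Override
    public void open(Configuration parameters) {
        // register the accumulator so its value is reported to the JobManager
        getRuntimeContext().addAccumulator("tuples-processed", processed);
    }

    @Override
    public String map(String value) {
        processed.add(1L);
        return value;
    }
}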
Results :
Failed tests:
TestSuite$1.warning Class cascading.ComparePlatformsTest$CompareTestCase has no public constructor TestCase(String name) or TestCase()
Hi,
I tried installing it like explained, but I get the following maven error:
[INFO] -------------------------------------------------------------
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 01:00 min
[INFO] Finished at: 2015-08-28T10:27:01+02:00
[INFO] Final Memory: 35M/304M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project cascading-flink: Compilation failure: Compilation failure:
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/groupBy/GroupByInGate.java:[125,24] com.dataArtisans.flinkCascading.runtime.groupBy.GroupByInGate.KeyPeekingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/hashJoin/JoinClosure.java:[112,17] com.dataArtisans.flinkCascading.runtime.hashJoin.JoinClosure.SingleTupleIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/regularJoin/CoGroupInGate.java:[132,24] com.dataArtisans.flinkCascading.runtime.coGroup.regularJoin.CoGroupInGate.JoinResultIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/bufferJoin/CoGroupBufferInGate.java:[121,24] com.dataArtisans.flinkCascading.runtime.coGroup.bufferJoin.CoGroupBufferInGate.KeyPeekingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] /Users/akelpe/code/thirdparty/cascading-flink/src/main/java/com/dataArtisans/flinkCascading/runtime/coGroup/bufferJoin/CoGroupBufferClosure.java:[298,24] com.dataArtisans.flinkCascading.runtime.coGroup.bufferJoin.CoGroupBufferClosure.FlinkUnwrappingIterator is not abstract and does not override abstract method remove() in java.util.Iterator
[ERROR] -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoFailureException
It's not required, but it's considered standard practice for Cascading workflows, as then you can run the workflow multiple times with the same output directory, and Cascading takes care of getting rid of it so you don't run into the classic "directory already exists" error.
Tap outTap = new Hfs(new TextDelimited(false, "\n"), args[1], SinkMode.REPLACE);
Flink features two modes to exchange data between parallel tasks over the network.
This issue is about making the robust, yet slower, BATCH mode the default mode for Cascading jobs on Flink. We add a parameter to switch back to PIPELINED mode.
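A sketch of the intended default, with a hypothetical connector property ("flink.executionMode") for the switch back to PIPELINED; props is an assumed java.util.Properties instance:

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.java.ExecutionEnvironment;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
boolean pipelined = "pipelined".equalsIgnoreCase(props.getProperty("flink.executionMode", "batch"));
env.getConfig().setExecutionMode(pipelined ? ExecutionMode.PIPELINED : ExecutionMode.BATCH);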
If two or more pipes with different tuple schemas (i.e., field types) should be unioned, we have to ensure that the type information of all pipes is identical. This should be done by comparing the types of all fields and unifying them either by adding missing types or removing existing types.
Alternatively we can throw a meaningful exception.
Traps do not reliably remove existing files in their output directory in case of SinkMode.REPLACE: files with matching names are overwritten, but an existing file is not removed if it is not overwritten.
This is because traps are not handled as regular data sinks but are individually opened by Flink functions that need to write to a trap. We need to add an InitializeOnMaster hook to regular Flink functions, similar to Flink's OutputFormats. This hook can be used by Flink functions that include traps to initialize their output directories.
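A sketch of the proposed hook, mirroring Flink's InitializeOnMaster for OutputFormats (the interface below is hypothetical; Flink defines the hook only for OutputFormats today):

import java.io.IOException;

// Functions that write to traps would implement this so the master can
// prepare (e.g. clear) trap output directories before any task starts.
public interface FunctionInitializeOnMaster {
    void initializeGlobal(int parallelism) throws IOException;
}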
I noticed that your package names are camel case. That is very unusual. Consider making them all lowercase.
Cascading tuples are serialized with a boolean array to mark null-valued tuple fields. Each boolean value is serialized as a full byte.
This can be done more efficiently by using a space-efficient bit mask.
At the moment, all joins are translated to a Flink Mapper with broadcast sets. Binary inner joins can be translated to more robust and native Flink joins.
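A sketch of a native translation, using Flink Tuple2 inputs keyed on field 0 for illustration (the broadcast hint tells Flink to replicate the small build side):

import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;

public static DataSet<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> nativeInnerJoin(
        DataSet<Tuple2<String, Integer>> probe,
        DataSet<Tuple2<String, Integer>> build) {
    // Broadcast the small build side to every probe-side task instead of
    // driving the join through a mapper with a broadcast set.
    return probe.join(build, JoinHint.BROADCAST_HASH_SECOND)
                .where(0).equalTo(0);
}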
Currently, running a CascadingDataSink with a DOP > 1 yields empty results because the job commit logic removes the result files for some reason. We had a similar issue in Flink's Hadoop compatibility wrappers for Hadoop OutputFormats and solved it by moving the committing to the JM via FinalizeOnMaster (see FLINK-1139).
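A compile-ready sketch of the fix (placeholder bodies; the real CascadingOutputFormat does the actual writing):

import java.io.IOException;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import cascading.tuple.Tuple;

public class CommittingOutputFormat implements OutputFormat<Tuple>, FinalizeOnMaster {

    @Override
    public void configure(Configuration parameters) {}

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        // write to a task-specific temporary file
    }

    @Override
    public void writeRecord(Tuple record) throws IOException {}

    @Override
    public void close() throws IOException {}

    @Override
    public void finalizeGlobal(int parallelism) throws IOException {
        // runs exactly once on the JobManager after all subtasks finished:
        // move temporary files to the final output location here, so no
        // subtask can delete another subtask's results during its own commit
    }
}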
Right now, the DOP of data sinks is fixed to 1 in FlinkFlowStep.translateSink().
When running Scalding jobs on Flink (more info here: http://themodernlife.github.io/scala/hadoop/hdfs/sclading/flink/streaming/realtime/2015/12/20/running-scalding-jobs-on-apache-flink/) I noticed that the only join type that works was
pipe.joinWithSmaller(...)
Unfortunately a lot of jobs are going to use something like
pipe.joinWithTiny(...)
Currently this fails on Flink with
Caused by: cascading.flow.planner.PlannerException: [net.themodernlife.Word...] could not build flow from assembly: [HashJoin does only support InnerJoin and LeftJoin.]
at cascading.flow.planner.FlowPlanner.handleExceptionDuringPlanning(FlowPlanner.java:749)
at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:209)
at cascading.flow.FlowConnector.connect(FlowConnector.java:456)
at com.dataartisans.flink.cascading.FlinkConnector.connect(FlinkConnector.java:86)
at com.twitter.scalding.ExecutionContext$class.buildFlow(ExecutionContext.scala:74)
at com.twitter.scalding.ExecutionContext$$anon$1.buildFlow(ExecutionContext.scala:165)
at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
at com.twitter.scalding.Job$$anonfun$buildFlow$1.apply(Job.scala:225)
at scala.util.Success.flatMap(Try.scala:231)
at com.twitter.scalding.Job.buildFlow(Job.scala:225)
at com.twitter.scalding.Job.run(Job.scala:295)
at com.twitter.scalding.Tool.start$1(Tool.scala:126)
at com.twitter.scalding.Tool.run(Tool.scala:142)
at com.twitter.scalding.Tool.run(Tool.scala:70)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at com.twitter.scalding.Tool$.main(Tool.scala:150)
... 12 more
Caused by: cascading.flow.FlowException: HashJoin does only support InnerJoin and LeftJoin.
at com.dataartisans.flink.cascading.planner.FlinkFlowStep.translateHashJoin(FlinkFlowStep.java:882)
at com.dataartisans.flink.cascading.planner.FlinkFlowStep.buildFlinkProgram(FlinkFlowStep.java:309)
at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:126)
at com.dataartisans.flink.cascading.planner.FlinkFlowStep.createFlowStepJob(FlinkFlowStep.java:96)
at cascading.flow.planner.BaseFlowStep.getCreateFlowStepJob(BaseFlowStep.java:913)
at cascading.flow.BaseFlow.initializeNewJobsMap(BaseFlow.java:1318)
at cascading.flow.BaseFlow.initialize(BaseFlow.java:244)
at cascading.flow.planner.FlowPlanner.buildFlow(FlowPlanner.java:203)
... 26 more
The reason is that in the Scalding code for joinWithTiny you have this:
def joinWithTiny(fs: (Fields, Fields), that: Pipe) = {
  val intersection = asSet(fs._1).intersect(asSet(fs._2))
  if (intersection.isEmpty) {
    new HashJoin(assignName(pipe), fs._1, assignName(that), fs._2, WrappedJoiner(new InnerJoin))
  } else {
    val (renamedThat, newJoinFields, temp) = renameCollidingFields(that, fs._2, intersection)
    (new HashJoin(assignName(pipe), fs._1, assignName(renamedThat), newJoinFields, WrappedJoiner(new InnerJoin)))
      .discard(temp)
  }
}
Notice how the Joiner has been wrapped with WrappedJoiner? In Cascading Flink you have this:
private DataSet<Tuple> translateHashJoin(List<DataSet<Tuple>> inputs, FlowNode node) {
    HashJoin hashJoin = (HashJoin) getCommonSuccessor(node.getSourceElements(), node);
    Joiner joiner = hashJoin.getJoiner();

    int numJoinInputs = hashJoin.isSelfJoin() ? hashJoin.getNumSelfJoins() + 1 : inputs.size();

    Fields[] inputFields = new Fields[numJoinInputs];
    Fields[] keyFields = new Fields[numJoinInputs];
    String[][] flinkKeys = new String[numJoinInputs][];

    List<DataSet<Tuple>> joinInputs = computeSpliceInputsFieldsKeys(hashJoin, node, inputs, inputFields, keyFields, flinkKeys);

    if (keyFields[0].isNone()) {
        // Cartesian product
        return translateInnerCrossProduct(node, joinInputs);
    }
    else if (joiner.getClass().equals(InnerJoin.class)) {
        // inner join with keys
        return translateInnerHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
    }
    else if (joiner.getClass().equals(LeftJoin.class)) {
        return translateLeftHashJoin(node, joinInputs, inputFields, keyFields, flinkKeys);
    }
    else {
        throw new FlowException("HashJoin does only support InnerJoin and LeftJoin.");
    }
}
I thought it would be as simple as replacing joiner.getClass().equals(InnerJoin.class) with joiner.getClass().isAssignableFrom(InnerJoin.class), but that's not the case, since WrappedJoiner extends Joiner and not InnerJoin (or LeftJoin).
Not sure if there is a simple solution, but I wanted to get the bug in, in case anyone has any ideas!
Cascading supports negative field indices, where -1 refers to the last tuple position. This currently does not work if the tuple schema/size of a pipe is not known (Fields.UNKNOWN).
Right now, Cascading on Flink uses Kryo to serialize tuples with unknown schema.
Wrapping Cascading's own serialization logic in Flink's serializer might improve performance.
The master branch is still on 0.1-SNAPSHOT, but the flink-1.0 branch was/is using 0.2-SNAPSHOT. So I've got all my projects switched to that version.
I would like to release cascading-flink 0.1 to maven central, pointing to Flink 0.10.0 and some cascading WIP.
I think with this commit, we have that available: 7ac7111
Are there any release blockers?
This information is important to tune the performance of a Cascading flow.
I will work on preparing our infrastructure to deploy cascading-flink to the Maven snapshots repository and Maven Central.
This issue looks similar to #4.
Temporary files are correctly written, but the commit deletes everything.
Cascading-Flink has only been tested locally. We need to run some large-scale cluster tests to see if it works and how it performs.
Currently, a Cascading CoGroup is compiled into a Flink CoGroup and a subsequent GroupReduce. We can remove the GroupReduce and do everything in a Flink CoGroup.
We can also decrease the Tuple[] size by 1.
At the moment, Cascading CoGroups are translated to a Flink Reduce, similar to MapReduce. We can translate binary Cascading CoGroups to Flink CoGroups, which are more robust and probably more efficient.
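A sketch of the direct translation, using Flink Tuple2 inputs keyed on field 0 for illustration:

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public static DataSet<Tuple2<String, Integer>> coGroupPipes(
        DataSet<Tuple2<String, Integer>> lhs,
        DataSet<Tuple2<String, Integer>> rhs) {
    return lhs.coGroup(rhs).where(0).equalTo(0)
              .with(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
                  @Override
                  public void coGroup(Iterable<Tuple2<String, Integer>> left,
                                      Iterable<Tuple2<String, Integer>> right,
                                      Collector<Tuple2<String, Integer>> out) {
                      // both sides of one key group arrive here; Cascading's
                      // joiner logic would run in this spot instead of in a
                      // separate GroupReduce
                  }
              });
}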
Right now, jobs are submitted via a LocalExecutionEnvironment with execute(), which is a blocking call. Job submission should go through Flink's Client class (or Executor) and be non-blocking. This needs to be changed in FlinkFlowStepJob.internalNonBlockingStart().
Right now, Cascading CoGroups are translated to Flink CoGroups which perform a full outer join. Once Flink features a native outer join implementation, Cascading CoGroups should be translated to it.
Also update the source code license header which licenses the code to the ASF.
Right now, we only use Flink native serializers for basic types (Java primitives, boxed primitives, String).
We should use Flink's native serializers and comparators for types that implement Writable and WritableComparable.
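A sketch using Flink's built-in Writable support (WritableTypeInfo ships with Flink; Hadoop's Text stands in for an arbitrary WritableComparable):

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.typeutils.WritableTypeInfo;
import org.apache.hadoop.io.Text;

// Use the native Writable serializer (and comparator) instead of falling back to Kryo.
TypeInformation<Text> typeInfo = new WritableTypeInfo<>(Text.class);
TypeSerializer<Text> serializer = typeInfo.createSerializer(new ExecutionConfig());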
Cascading allows users to add Jar and resource files via FlowDef.addToClassPath(). These resources must be available at runtime in Flink's usercode classloaders. This feature is tested in ClasspathPlatformTest.
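A sketch of shipping those resources as user-code jars; the host, port, and the FlowDef accessor getClassPath() are assumptions here:

import java.util.List;
import org.apache.flink.api.java.ExecutionEnvironment;

// Jars passed to a remote environment end up in the usercode classloader
// of all tasks.
List<String> classPath = flowDef.getClassPath();  // entries registered via addToClassPath()
ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host", 6123, classPath.toArray(new String[0]));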
The following tests in JoinFieldedPipesPlatformTest fail when the join is executed with a DOP > 1:
(Normally I'd post this to a mailing list, but I didn't see one listed in the docs)
git clone https://github.com/dataArtisans/cascading-flink.git
cd cascading-flink
mvn clean install -DskipTests
and got...
[ERROR] COMPILATION ERROR :
[INFO] -------------------------------------------------------------
[ERROR] /Users/kenkrugler/git/cascading-flink/src/main/java/com/dataartisans/flink/cascading/planner/FlinkFlowStepJob.java:[189,54] cannot find symbol
symbol: method run(org.apache.flink.runtime.jobgraph.JobGraph,boolean)
location: variable client of type org.apache.flink.client.program.Client
This is on Mac OS X 10.8.5, javac 1.7.0_55
Looks like a compatibility issue with Flink 0.10-SNAPSHOT, where Client.run() now has a different signature.
BTW, after running "mvn eclipse:clean eclipse:eclipse" I had to fix up a few other project settings: two files rely on the default Eclipse encoding being UTF-8, which should be set explicitly in the project definition, and the project was using my (ancient) Java 1.6 runtime, which meant a 1.6 compilation level and caused a problem with Collections.emptyIterator() not being defined.