vasia / gelly-streaming
An experimental Graph Streaming API for Apache Flink
License: Apache License 2.0
For the bipartite detection algorithm and the top-k degrees.
@Zainab-Abbas has developed several streaming partitioning algorithms in her fork. We should port the best performing ones here.
Hello, which article is org.apache.flink.graph.streaming.example.BroadcastTriangleCount based on?
slice() will return a GraphWindowStream, i.e. a discretized GraphStream.
We can add a flag to this method or simply call graphStream.distinct().getDegrees() to get the current behavior.
There are also multiple ways to create or process graphs based on a stream. All of these should eventually be supported.
According to this paper, closeness and geometric centralities can be efficiently approximated in the semi-streaming model. We should examine whether these algorithms (or variations) can be applied to unbounded streams and if yes, implement them and compare.
The reduce function sets the wrong component ID in some cases.
Example:
After folding, we get:
{2=[2, 4]}{3=[3, 5]}{1=[1, 3]}
Reducing this data gives:
{2=[2, 4], 5=[1, 3, 5]}
Expected output:
{2=[2, 4], 1=[1, 3, 5]}
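The expected output suggests two rules for the reduce step: partial components that share a vertex (here {3, 5} and {1, 3}, via vertex 3) must be unioned, and the merged component should be identified by its smallest vertex ID. A minimal plain-Java sketch of such a reduce, assuming partial results are plain Map<Long, Set<Long>> values (component ID to members) rather than gelly-streaming's actual DisjointSet, and using a hypothetical ComponentMerge class:

```java
import java.util.*;

// Hypothetical helper; not gelly-streaming's DisjointSet API.
class ComponentMerge {
    // Union-find over vertex IDs; each root is the smallest ID in its set.
    private final Map<Long, Long> parent = new HashMap<>();

    private long find(long v) {
        parent.putIfAbsent(v, v);
        long p = parent.get(v);
        if (p == v) return v;
        long root = find(p);
        parent.put(v, root); // path compression
        return root;
    }

    private void union(long a, long b) {
        long ra = find(a), rb = find(b);
        // Hang the larger root under the smaller one so the minimum ID wins.
        if (ra != rb) parent.put(Math.max(ra, rb), Math.min(ra, rb));
    }

    /** Reduces partial components (component ID -> members) into global ones keyed by min vertex ID. */
    static Map<Long, Set<Long>> merge(List<Map<Long, Set<Long>>> partials) {
        ComponentMerge uf = new ComponentMerge();
        for (Map<Long, Set<Long>> partial : partials) {
            for (Set<Long> members : partial.values()) {
                Long first = null;
                for (long m : members) {
                    uf.find(m); // registers singletons as well
                    if (first == null) first = m; else uf.union(first, m);
                }
            }
        }
        Map<Long, Set<Long>> result = new TreeMap<>();
        for (long v : new ArrayList<>(uf.parent.keySet())) {
            result.computeIfAbsent(uf.find(v), k -> new TreeSet<>()).add(v);
        }
        return result;
    }

    static Set<Long> setOf(Long... ids) {
        return new HashSet<>(Arrays.asList(ids));
    }

    public static void main(String[] args) {
        List<Map<Long, Set<Long>>> partials = Arrays.asList(
                Collections.singletonMap(2L, setOf(2L, 4L)),
                Collections.singletonMap(3L, setOf(3L, 5L)),
                Collections.singletonMap(1L, setOf(1L, 3L)));
        System.out.println(merge(partials)); // {1=[1, 3, 5], 2=[2, 4]}
    }
}
```

Keeping the minimum ID as the root makes the result independent of the order in which partial results arrive at the reducer.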
Hi there.
I read in the presentation on gelly-stream (https://www.slideshare.net/vkalavri/gellystream-singlepass-graph-streaming-analytics-with-apache-flink) that it only supports single-pass graph computations, and moreover with only approximate results. Is that (still) correct?
I'm just getting started with Flink and have not yet found a way to get gelly-streaming running. Do you have step-by-step instructions somewhere that could help me get, say, ConnectedComponents running?
We can use a modified version of the Triest-Base algorithm from this paper to compute exact global and local triangle counts. The algorithm requires storing the complete graph in memory plus the counters and it requires one neighborhood set intersection per edge.
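A plain-Java sketch of the exact, insert-only counting described above (not the Flink implementation): it keeps every edge in an in-memory adjacency map and, for each new undirected edge (u, v), intersects the two neighborhoods once. Each common neighbor w closes one new triangle, which increments the global counter and the local counters of u, v and w. The class name ExactTriangleCounter is hypothetical, and self-loops and duplicate edges are assumed to be safely ignorable.

```java
import java.util.*;

// Hypothetical plain-Java sketch of exact, insert-only triangle counting.
class ExactTriangleCounter {
    private final Map<Long, Set<Long>> adj = new HashMap<>(); // complete graph kept in memory
    private final Map<Long, Long> local = new HashMap<>();    // per-vertex triangle counts
    private long global = 0;

    /** Processes one undirected edge; self-loops and duplicate edges are ignored. */
    void addEdge(long u, long v) {
        if (u == v) return;
        Set<Long> nu = adj.computeIfAbsent(u, k -> new HashSet<>());
        Set<Long> nv = adj.computeIfAbsent(v, k -> new HashSet<>());
        if (!nu.add(v)) return; // edge already seen
        nv.add(u);
        // One neighborhood intersection per edge: every common neighbor closes a new triangle.
        Set<Long> small = nu.size() <= nv.size() ? nu : nv;
        Set<Long> large = (small == nu) ? nv : nu;
        for (long w : small) {
            if (large.contains(w)) {
                global++;
                local.merge(u, 1L, Long::sum);
                local.merge(v, 1L, Long::sum);
                local.merge(w, 1L, Long::sum);
            }
        }
    }

    long globalCount() {
        return global;
    }

    long localCount(long vertex) {
        return local.getOrDefault(vertex, 0L);
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {2, 3}, {2, 6}, {5, 6}, {1, 4}, {5, 3}, {3, 4}, {3, 6}, {1, 3}};
        ExactTriangleCounter counter = new ExactTriangleCounter();
        for (long[] e : edges) counter.addEdge(e[0], e[1]);
        System.out.println(counter.globalCount() + " " + counter.localCount(3)); // 4 4
    }
}
```

Iterating over the smaller of the two neighbor sets keeps each intersection at O(min(deg(u), deg(v))) expected time with hash sets.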
We should look into:
Create an interface AggregationState that user-defined summaries must implement.
Currently, only the window triangles example is covered by a test. We should add tests for connected components and the bipartiteness check, so we don't let issues like #11 happen again :)
Hi
I want to test the GellyStream examples in a Flink environment (Flink 1.2.0, Java 8, Red Hat), but they fail with the error below, in both local mode and cluster mode.
The error looks as if Flink cannot run with Java 8:
The program finished with the following exception:
org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'org.apache.flink.graph.streaming.example.ExactTriangleCount' could not be loaded due to a linkage failure.
at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:625)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:198)
at org.apache.flink.client.CliFrontend.buildProgram(CliFrontend.java:890)
at org.apache.flink.client.CliFrontend.run(CliFrontend.java:228)
at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:1073)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1120)
at org.apache.flink.client.CliFrontend$2.call(CliFrontend.java:1117)
at org.apache.flink.runtime.security.HadoopSecurityContext$1.run(HadoopSecurityContext.java:43)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:40)
at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1116)
Caused by: java.lang.UnsupportedClassVersionError: org/apache/flink/graph/streaming/example/ExactTriangleCount : Unsupported major.minor version 52.0
Hi,
I am trying to run IterativeConnectedComponents with a socket stream. When I input 1,3, 1,4 and 1,5, the result is OK, but when I input "5,6", "(6,1)" is printed many times:
(1,1)
(3,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,1)
(4,4)
(5,4)
(4,1)
(5,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
(6,1)
Process finished with exit code -1
Could you tell me how to run this algorithm? Thank you very much.
It would be nice to offer ConnectedComponents, TriangleCount, etc. as library methods instead of plain examples.
We can keep the examples to demonstrate how to use these aggregation functions, and add unit tests for them.
Hi!
I am wondering how stable this library is for running Connected Components in production. Looking at https://github.com/vasia/gelly-streaming/blob/master/src/main/java/org/apache/flink/graph/streaming/SummaryBulkAggregation.java#L78, I am wondering why it is hardcoded to 0. Is there no parallelism here?
New triangle estimation algorithms that look like a good fit for gelly-stream
http://www.kdd.org/kdd2016/papers/files/rfp0465-de-stefaniA.pdf
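The simplest variant in that paper, TRIÈST-BASE, keeps a fixed-size reservoir sample of M edges and counts triangles only within the sample. The sketch below is our reading of it, limited to the global estimate (the paper's local counters are omitted) and assuming an insert-only stream with no duplicate edges or self-loops; TriestBase and its methods are hypothetical names, not gelly-stream API.

```java
import java.util.*;

// Hypothetical sketch of TRIEST-BASE (global estimate only), assuming an insert-only
// stream without duplicate edges or self-loops.
class TriestBase {
    private final int capacity;                               // M, the sample size
    private final Random random;
    private final List<long[]> sample = new ArrayList<>();    // sampled edges
    private final Map<Long, Set<Long>> adj = new HashMap<>(); // adjacency of the sample
    private long t = 0;                                       // edges seen so far
    private long tau = 0;                                     // triangles in the sample

    TriestBase(int capacity, long seed) {
        if (capacity < 3) throw new IllegalArgumentException("capacity must be at least 3");
        this.capacity = capacity;
        this.random = new Random(seed);
    }

    void addEdge(long u, long v) {
        t++;
        if (t <= capacity) {
            insert(u, v);
        } else if (random.nextDouble() < (double) capacity / t) {
            // Reservoir sampling: evict a uniformly random sampled edge first.
            int i = random.nextInt(sample.size());
            long[] evicted = sample.get(i);
            sample.set(i, sample.get(sample.size() - 1));
            sample.remove(sample.size() - 1);
            adj.get(evicted[0]).remove(evicted[1]);
            adj.get(evicted[1]).remove(evicted[0]);
            updateCounters(evicted[0], evicted[1], -1);
            insert(u, v);
        }
    }

    private void insert(long u, long v) {
        sample.add(new long[] {u, v});
        adj.computeIfAbsent(u, k -> new HashSet<>()).add(v);
        adj.computeIfAbsent(v, k -> new HashSet<>()).add(u);
        updateCounters(u, v, +1);
    }

    // Each common sampled neighbor c of u and v forms a sampled triangle {u, v, c}.
    private void updateCounters(long u, long v, int sign) {
        Set<Long> nv = adj.getOrDefault(v, Collections.emptySet());
        for (long c : adj.getOrDefault(u, Collections.emptySet())) {
            if (nv.contains(c)) tau += sign;
        }
    }

    /** Global estimate xi(t) * tau, with xi(t) = max(1, t(t-1)(t-2) / (M(M-1)(M-2))). */
    double estimate() {
        double m = capacity;
        double xi = Math.max(1.0, t * (t - 1.0) * (t - 2.0) / (m * (m - 1) * (m - 2)));
        return xi * tau;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {2, 3}, {2, 6}, {5, 6}, {1, 4}, {5, 3}, {3, 4}, {3, 6}, {1, 3}};
        TriestBase exact = new TriestBase(100, 7L); // M >= stream length: nothing is evicted
        TriestBase sampled = new TriestBase(5, 7L); // M < stream length: estimate only
        for (long[] e : edges) {
            exact.addEdge(e[0], e[1]);
            sampled.addEdge(e[0], e[1]);
        }
        // The first value is 4.0, since t <= M means no sampling happened.
        System.out.println(exact.estimate() + " " + sampled.estimate());
    }
}
```

While t <= M every edge is kept and xi(t) = 1, so the estimate coincides with the exact count; memory stays bounded by M edges afterwards.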
Add a degree distribution algorithm/example that ingests a fully dynamic stream of edges (both additions and deletions) and emits a stream of (degree, count). We can initially implement this with a custom stream of Tuple3<SrcID, TrgID, EventType> and later add a fully dynamic GraphStream type.
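A minimal plain-Java sketch of the per-event logic, assuming hypothetical EventType values EDGE_ADDITION and EDGE_DELETION, a simple undirected graph, and that every deletion refers to an edge that is currently present. Each event changes the degree of both endpoints by one, so only the histogram entries for their old and new degrees change, and only those (degree, count) pairs are emitted. In a Flink job this state would have to live inside an operator (for example a flatMap with parallelism 1); that wiring is not shown.

```java
import java.util.*;

// Hypothetical sketch: degree distribution over a fully dynamic edge stream whose events
// mirror Tuple3<SrcID, TrgID, EventType>. Assumes a simple undirected graph and that
// every deletion refers to an edge that is currently present.
class DynamicDegreeDistribution {
    enum EventType { EDGE_ADDITION, EDGE_DELETION }

    private final Map<Long, Long> degrees = new HashMap<>();   // vertex -> current degree
    private final Map<Long, Long> histogram = new HashMap<>(); // degree -> number of vertices

    /** Applies one event and returns the (degree, count) pairs whose count changed. */
    List<long[]> process(long src, long trg, EventType type) {
        long delta = type == EventType.EDGE_ADDITION ? 1 : -1;
        List<long[]> changes = new ArrayList<>();
        updateVertex(src, delta, changes);
        updateVertex(trg, delta, changes);
        return changes;
    }

    private void updateVertex(long vertex, long delta, List<long[]> changes) {
        long oldDegree = degrees.getOrDefault(vertex, 0L);
        long newDegree = oldDegree + delta;
        if (newDegree == 0) degrees.remove(vertex); else degrees.put(vertex, newDegree);
        // Degree 0 is not reported: a vertex whose last edge is deleted leaves the distribution.
        if (oldDegree > 0) changes.add(new long[] {oldDegree, adjust(oldDegree, -1)});
        if (newDegree > 0) changes.add(new long[] {newDegree, adjust(newDegree, 1)});
    }

    private long adjust(long degree, long by) {
        long count = histogram.getOrDefault(degree, 0L) + by;
        if (count == 0) histogram.remove(degree); else histogram.put(degree, count);
        return count;
    }

    long count(long degree) {
        return histogram.getOrDefault(degree, 0L);
    }

    public static void main(String[] args) {
        DynamicDegreeDistribution dd = new DynamicDegreeDistribution();
        dd.process(1, 2, EventType.EDGE_ADDITION);
        dd.process(1, 3, EventType.EDGE_ADDITION);
        dd.process(2, 3, EventType.EDGE_ADDITION);
        dd.process(1, 3, EventType.EDGE_DELETION);
        for (long[] c : dd.process(3, 4, EventType.EDGE_ADDITION)) {
            System.out.println("(" + c[0] + "," + c[1] + ")");
        }
    }
}
```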
@vasia I added a windowAssigner-agnostic slice method to GraphStream / SimpleEdgeStream, hoping to generalize slice for both use cases (see here), but much of the existing logic revolves around neighborhoods and is apparently spread across a couple of classes. I'll open another issue to discuss refactoring it.
I started over and hacked together a non-reducing connected components sliding window example in a unit test, using only the DisjointSet implementation from gelly-streaming (see here). This essentially reproduces most of WindowGraphAggregation with a No-Op reduce step.
I now (again) think PR #25 is the best way forward for these sorts of non-reducing window folds. What do you think?
As an aside, I think there's probably a more powerful abstraction underneath WindowGraphAggregation (other than GraphAggregation), but I'm not sure what to call it. EdgeStream -> distribute edges uniformly -> apply an arbitrary window -> local fold on each graph partition-window. For global connected components aggregation, add a reduce step at the end.
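The pipeline described above can be sketched in plain Java for the edges of a single window, with a minimal union-find standing in for gelly-streaming's DisjointSet (all names here are hypothetical, not its API). Edges are distributed round-robin, each partition folds its own edges locally, and the reduce step merges the partial disjoint sets; skipping step 3 and emitting the partial results instead corresponds to the non-reducing fold discussed above.

```java
import java.util.*;

// Hypothetical sketch: distribute edges -> local fold per partition -> global reduce.
class PartitionFoldReduce {
    /** Minimal union-find standing in for gelly-streaming's DisjointSet (not its API). */
    static final class UnionFind {
        final Map<Long, Long> parent = new HashMap<>();

        long find(long v) {
            parent.putIfAbsent(v, v);
            long p = parent.get(v);
            if (p == v) return v;
            long root = find(p);
            parent.put(v, root); // path compression
            return root;
        }

        void union(long a, long b) {
            long ra = find(a), rb = find(b);
            if (ra != rb) parent.put(Math.max(ra, rb), Math.min(ra, rb));
        }

        /** Reduce step: absorbs another partition's components. */
        void merge(UnionFind other) {
            for (long v : new ArrayList<>(other.parent.keySet())) union(v, other.find(v));
        }

        Map<Long, Set<Long>> components() {
            Map<Long, Set<Long>> out = new TreeMap<>();
            for (long v : new ArrayList<>(parent.keySet())) {
                out.computeIfAbsent(find(v), k -> new TreeSet<>()).add(v);
            }
            return out;
        }
    }

    /** Connected components of one window's edges, using the given number of partitions. */
    static Map<Long, Set<Long>> connectedComponents(List<long[]> windowEdges, int partitions) {
        List<UnionFind> partial = new ArrayList<>();
        for (int i = 0; i < partitions; i++) partial.add(new UnionFind());
        // 1. Distribute edges uniformly (round-robin) and 2. fold each partition locally.
        for (int i = 0; i < windowEdges.size(); i++) {
            long[] e = windowEdges.get(i);
            partial.get(i % partitions).union(e[0], e[1]);
        }
        // 3. Global reduce: merge all partial disjoint sets.
        UnionFind global = new UnionFind();
        for (UnionFind p : partial) global.merge(p);
        return global.components();
    }

    public static void main(String[] args) {
        List<long[]> window = Arrays.asList(
                new long[] {1, 2}, new long[] {3, 4}, new long[] {2, 3}, new long[] {5, 6});
        System.out.println(connectedComponents(window, 2)); // {1=[1, 2, 3, 4], 5=[5, 6]}
    }
}
```

Because union-find merging is associative and commutative, the result does not depend on how many partitions are used or on how edges are assigned to them.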
All relevant Gelly methods should be ported. Some of these are:
Each of these has to be considered for both normal streams and (tumbling?) windows.
Hi,
I want to test ExactTriangleCount with the following test data in IDEA local mode, but I think the result is not right:
new Edge<>(1, 2, NullValue.getInstance()),
new Edge<>(2, 3, NullValue.getInstance()),
new Edge<>(2, 6, NullValue.getInstance()),
new Edge<>(5, 6, NullValue.getInstance()),
new Edge<>(1, 4, NullValue.getInstance()),
new Edge<>(5, 3, NullValue.getInstance()),
new Edge<>(3, 4, NullValue.getInstance()),
new Edge<>(3, 6, NullValue.getInstance()),
new Edge<>(1, 3, NullValue.getInstance())),
In addition, the results are not the same across runs; one run produced the following:
1> (-1,1)
3> (6,1)
8> (2,1)
2> (3,1)
7> (1,1)
2> (3,2)
8> (4,1)
1> (-1,2)
For vertexID=3, the triangle count should be 4, not 1. So how should I run this algorithm?
Thank you very much.
A preliminary aggregation scheme for global property computation can be the following:
Hi
I want to develop a real-time incremental PageRank algorithm, but I do not know how to implement iterative computations. Do you have any ideas that could help me? Thank you.
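One simple approach (not something gelly-streaming provides, as far as this thread shows) is to keep the graph in state and, after each batch of new edges, re-run PageRank power iteration starting from the previous ranks instead of a uniform vector. When only a few edges change, the old ranks are usually close to the new fixed point, so fewer iterations tend to be needed. A plain-Java sketch under those assumptions, with hypothetical names, a damping factor of 0.85 chosen for illustration, and the rank of dangling vertices spread uniformly:

```java
import java.util.*;

// Hypothetical sketch: warm-started power iteration after each batch of edge additions.
class WarmStartPageRank {
    private final Map<Long, Set<Long>> out = new HashMap<>();
    private Map<Long, Double> ranks = new HashMap<>();
    private final double damping;
    private final double tolerance;

    WarmStartPageRank(double damping, double tolerance) {
        this.damping = damping;
        this.tolerance = tolerance;
    }

    void addEdge(long src, long trg) {
        out.computeIfAbsent(src, k -> new HashSet<>()).add(trg);
        out.computeIfAbsent(trg, k -> new HashSet<>());
    }

    /** Recomputes ranks starting from the previous ones; returns the iterations used. */
    int update(int maxIterations) {
        int n = out.size();
        if (n == 0) return 0;
        // Warm start: reuse old ranks, give new vertices 1/n, then renormalize to sum 1.
        Map<Long, Double> current = new HashMap<>();
        double total = 0;
        for (long v : out.keySet()) {
            double r = ranks.getOrDefault(v, 1.0 / n);
            current.put(v, r);
            total += r;
        }
        for (Map.Entry<Long, Double> e : current.entrySet()) e.setValue(e.getValue() / total);

        int iterations = 0;
        while (iterations < maxIterations) {
            iterations++;
            // Rank held by dangling vertices (no out-edges) is spread uniformly.
            double dangling = 0;
            for (Map.Entry<Long, Set<Long>> e : out.entrySet()) {
                if (e.getValue().isEmpty()) dangling += current.get(e.getKey());
            }
            double base = (1 - damping) / n + damping * dangling / n;
            Map<Long, Double> next = new HashMap<>();
            for (long v : out.keySet()) next.put(v, base);
            for (Map.Entry<Long, Set<Long>> e : out.entrySet()) {
                Set<Long> targets = e.getValue();
                if (targets.isEmpty()) continue;
                double share = damping * current.get(e.getKey()) / targets.size();
                for (long trg : targets) next.put(trg, next.get(trg) + share);
            }
            double diff = 0;
            for (long v : out.keySet()) diff += Math.abs(next.get(v) - current.get(v));
            current = next;
            if (diff < tolerance) break;
        }
        ranks = current;
        return iterations;
    }

    double rank(long v) {
        return ranks.getOrDefault(v, 0.0);
    }

    public static void main(String[] args) {
        WarmStartPageRank pr = new WarmStartPageRank(0.85, 1e-10);
        pr.addEdge(1, 2);
        pr.addEdge(2, 3);
        pr.addEdge(3, 1);
        pr.update(1000);                  // symmetric cycle: every rank is 1/3
        pr.addEdge(1, 3);                 // a new edge arrives on the stream
        int iterations = pr.update(1000); // warm-started from the previous ranks
        System.out.println(iterations + " " + pr.rank(1) + " " + pr.rank(2) + " " + pr.rank(3));
    }
}
```

This recomputes over the whole graph, so it only amortizes well when updates are batched (e.g. per window); truly incremental PageRank variants avoid touching unaffected vertices.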
I couldn't find a checkpointing mechanism, or Scala and Python versions of Gelly streaming. What are the main problems with implementing these in this library for production usage?
For both iterateFor and iterateFixpoint, and even if the source emits a Long.MaxValue watermark at the end.
GraphWindowStream should provide access to the same transformations and properties as GraphStream. Moreover, it should support neighborhood aggregation, reduceOnNeighbors, which takes a user-defined neighborhood function and a reduce function to apply it per vertex neighborhood.
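To make the intended semantics concrete, here is a plain-Java sketch of what reduceOnNeighbors could compute for the edges of one window, assuming edges are treated as undirected: a neighborhood function maps each (vertex, incident edge) pair to a value, and a reduce function combines those values per vertex. The Edge class and the method signature are hypothetical, not the GraphWindowStream API.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical semantics of reduceOnNeighbors over the edges of a single window.
class NeighborhoodReduce {
    /** Minimal edge type standing in for Gelly's Edge. */
    static final class Edge<K, EV> {
        final K src;
        final K trg;
        final EV value;

        Edge(K src, K trg, EV value) {
            this.src = src;
            this.trg = trg;
            this.value = value;
        }
    }

    /**
     * For every vertex in the window, applies neighborFunction to each incident edge
     * (edges treated as undirected) and combines the results with reduceFunction.
     */
    static <K, EV, R> Map<K, R> reduceOnNeighbors(List<Edge<K, EV>> window,
                                                  BiFunction<K, Edge<K, EV>, R> neighborFunction,
                                                  BinaryOperator<R> reduceFunction) {
        Map<K, R> result = new HashMap<>();
        for (Edge<K, EV> e : window) {
            result.merge(e.src, neighborFunction.apply(e.src, e), reduceFunction);
            result.merge(e.trg, neighborFunction.apply(e.trg, e), reduceFunction);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge<Long, Double>> window = Arrays.asList(
                new Edge<>(1L, 2L, 5.0), new Edge<>(1L, 3L, 2.0), new Edge<>(2L, 3L, 7.0));
        // Per-vertex degree within the window.
        Map<Long, Integer> degrees = reduceOnNeighbors(window, (v, e) -> 1, Integer::sum);
        // Heaviest incident edge per vertex.
        Map<Long, Double> maxWeight =
                reduceOnNeighbors(window, (v, e) -> e.value, (a, b) -> Math.max(a, b));
        System.out.println(degrees + " " + maxWeight);
    }
}
```

Requiring the reduce function to be associative and commutative would let such an operator pre-aggregate incrementally as edges enter the window rather than buffering them.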
The current API of initialState() assumes that all vertices will initially have the same state. This might not be true for some algorithms, e.g. in SSSP the source has an initial value of 0 and all other vertices have infinity. The issue can be bypassed by setting the initial state in preCompute() instead.
(Potentially related to #16 ?)
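One way to lift this restriction would be an initial-state function that receives the vertex ID. The sketch below is hypothetical and independent of gelly-streaming's API: it passes such a function (a java.util.function.Function<Long, Double>) to a streaming SSSP over an insert-only stream of directed edges with non-negative weights, so the source starts at 0 and every other vertex at infinity, and each new edge triggers Bellman-Ford-style relaxations.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical sketch: per-vertex initial state supplied as a function of the vertex ID,
// used by streaming SSSP over an insert-only stream of non-negatively weighted edges.
class StreamingSssp {
    private final Function<Long, Double> initialState; // per-vertex initial value
    private final Map<Long, Double> dist = new HashMap<>();
    private final Map<Long, Map<Long, Double>> out = new HashMap<>();

    StreamingSssp(Function<Long, Double> initialState) {
        this.initialState = initialState;
    }

    private double state(long v) {
        return dist.computeIfAbsent(v, initialState); // lazily initialized per vertex
    }

    /** Adds a directed edge with non-negative weight and propagates any improvement. */
    void addEdge(long u, long v, double w) {
        out.computeIfAbsent(u, k -> new HashMap<>()).merge(v, w, (a, b) -> Math.min(a, b));
        Deque<Long> queue = new ArrayDeque<>();
        relax(u, v, w, queue);
        while (!queue.isEmpty()) {
            long x = queue.poll();
            for (Map.Entry<Long, Double> e : out.getOrDefault(x, Collections.emptyMap()).entrySet()) {
                relax(x, e.getKey(), e.getValue(), queue);
            }
        }
    }

    private void relax(long u, long v, double w, Deque<Long> queue) {
        double candidate = state(u) + w;
        if (candidate < state(v)) {
            dist.put(v, candidate);
            queue.add(v);
        }
    }

    double distance(long v) {
        return state(v);
    }

    public static void main(String[] args) {
        // Source vertex 1 starts at 0; every other vertex starts at infinity.
        StreamingSssp sssp = new StreamingSssp(v -> v == 1L ? 0.0 : Double.POSITIVE_INFINITY);
        sssp.addEdge(2, 3, 1.0);
        sssp.addEdge(1, 2, 4.0);
        sssp.addEdge(1, 3, 10.0);
        sssp.addEdge(1, 2, 1.0);
        System.out.println(sssp.distance(3)); // 2.0
    }
}
```

Because the initial value is computed lazily per vertex, vertices first seen late in the stream still start from the correct value without a separate preCompute() step.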
Hi,
I just cloned this repo and ran the unit tests using mvn (recent master @ f0cc8e1). One test failed with the error below:
Running org.apache.flink.graph.streaming.test.operations.TestSlice
FoldFunction can not be a RichFunction. Please use apply(FoldFunction, WindowFunction) instead.
java.lang.UnsupportedOperationException: FoldFunction can not be a RichFunction. Please use apply(FoldFunction, WindowFunction) instead.
at org.apache.flink.streaming.api.datastream.WindowedStream.fold(WindowedStream.java:183)
at org.apache.flink.graph.streaming.GraphWindowStream.foldNeighbors(GraphWindowStream.java:63)
at org.apache.flink.graph.streaming.test.operations.TestSlice.testFoldNeighborsDefault(TestSlice.java:77)
at org.apache.flink.graph.streaming.test.operations.TestSlice.testProgram(TestSlice.java:59)
at org.apache.flink.streaming.util.StreamingProgramTestBase.testJob(StreamingProgramTestBase.java:85)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 0.296 sec <<< FAILURE!
Results :
Failed tests: testJob(org.apache.flink.graph.streaming.test.operations.TestSlice): Error while calling the test program: FoldFunction can not be a RichFunction. Please use apply(FoldFunction, WindowFunction) instead.
Java version:
java version "1.8.0_91"
Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.91-b14, mixed mode)
OS:
Distributor ID: Ubuntu
Description: Ubuntu 12.04 LTS
Release: 12.04
Codename: precise
Right now, GraphWindowStream claims to operate over vertex neighborhoods, but that's only the case because SimpleGraphStream instantiates GraphWindowStreams with streams keyed by a NeighborKeySelector.
SimpleGraphStream#slice would be more valuable if it did not exclusively key streams in this way. For example, I think the aggregating Connected Components example could benefit from GraphWindowStream#slice if it allowed keying the stream arbitrarily (i.e. by subtask index).
GraphWindowStreams do not necessarily operate on neighborhoods, only coincidentally. One option is to rename this class and alter it to control the stream keying itself, to enforce its neighbor operation claims.
Hello,
I want to use the GellyStream algorithm API in my Scala project, but after translating the ConnectedComponentsExample.java code into Scala, running it fails with the following error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple2>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:232)
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:223)
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:257)
at com.huawei.flink_test.gelly_stream_test.ConnectedComponent$.main(ConnectedComponent.scala:120)
at com.huawei.flink_test.gelly_stream_test.ConnectedComponent.main(ConnectedComponent.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
My code:
val edges: GraphStream[lang.Long, NullValue, NullValue] = getGraphStream(socketStream,split,env)
val init_val: (lang.Long, lang.Long) = (0l,0l)
val cc_new: ConnectedComponents[lang.Long, NullValue] = new ConnectedComponents[java.lang.Long, NullValue](mergeWindowTime)
val cc = edges.aggregate(cc_new)
val vertexInfo = cc.flatMap(new FlatMapFunction[DisjointSet[lang.Long], (lang.Long, lang.Long)] {
override def flatMap(set:DisjointSet[lang.Long], out: Collector[Tuple2[lang.Long, lang.Long]]) = {
val matches= set.getMatches
wrapAsScala.mapAsScalaMap(matches).foreach(out.collect)
}
})
val keyValuePair = vertexInfo.keyBy(0).timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))
The following exception is thrown when we debug the code after updating the pom file to the latest commit:
FoldFunction can not be a RichFunction, Please use apply(FoldFunction, WindowFunction) instead.
at
or define GraphStream as an abstract type which will be implemented by EdgeOnlyStream.
Error:
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'run(WindowGraphAggregation.java:59)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:211)
at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:139)
at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:575)
at org.apache.flink.graph.streaming.WindowGraphAggregation.run(WindowGraphAggregation.java:59)
at org.apache.flink.graph.streaming.SimpleEdgeStream.aggregate(SimpleEdgeStream.java:105)
at org.apache.flink.graph.streaming.example.ConnectedComponents.main(ConnectedComponents.java:62)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The types of the interface org.apache.flink.api.common.functions.FoldFunction could not be inferred. Support for synthetic interfaces, lambdas, and generic or raw types is limited at this point
at org.apache.flink.api.java.typeutils.TypeExtractor.getParameterType(TypeExtractor.java:708)
at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:366)
at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfo(TypeExtractor.java:346)
at org.apache.flink.graph.streaming.WindowGraphAggregation$PartialAgg.getProducedType(WindowGraphAggregation.java:99)
at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:278)
at org.apache.flink.api.java.typeutils.TypeExtractor.getFoldReturnTypes(TypeExtractor.java:131)
at org.apache.flink.streaming.api.datastream.WindowedStream.fold(WindowedStream.java:193)
... 8 more
This class should support multiple ways to stream graph data: