Hello, which article is org.apache.flink.graph.streaming.example.BroadcastTriangleCount based on?

Support multiple ways to process GraphStreams

There are multiple ways to create or process graphs based on a stream. All of these should eventually be supported.

  • Regular stream -- updated on each event
  • Window graph streams
    • Sliding windows
    • Tumbling windows
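
To make the difference between the two window kinds concrete, here is a minimal sketch in plain Java (no Flink dependency). The class and method names are hypothetical and only illustrate how an event timestamp maps to one tumbling window but possibly several sliding windows; windows are assumed to be half-open intervals [start, start + size).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of gelly-streaming or Flink.
final class WindowAssignment {
    /** Start of the single tumbling window of length `size` that contains `ts`. */
    static long tumblingStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Starts of all sliding windows (length `size`, step `slide`) that contain `ts`. */
    static List<Long> slidingStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long last = ts - Math.floorMod(ts, slide);
        // Walk backwards while the window [s, s + size) still covers ts.
        for (long s = last; s > ts - size; s -= slide) {
            starts.add(s);
        }
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingStart(7, 5));      // 5
        System.out.println(slidingStarts(7, 10, 5));  // [5, 0]
    }
}
```

An edge arriving at time 7 therefore belongs to one tumbling window of size 5, but to two sliding windows of size 10 and slide 5, so a sliding window graph stream has to account for the same edge more than once.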

Implement semi-streaming geometric centralities

According to this paper, closeness and geometric centralities can be efficiently approximated in the semi-streaming model. We should examine whether these algorithms (or variations) can be applied to unbounded streams and, if so, implement them and compare.

Connected components merger issue

The reduce function sets the wrong component ID in some cases.

After folding we get:
{2=[2, 4]}{3=[3, 5]}{1=[1, 3]}

Running reduce on this data gives:
{2=[2, 4], 5=[1, 3, 5]}

Expected output:
{2=[2, 4], 1=[1, 3, 5]}
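
For reference, a minimal sketch of a merge that produces the expected output. It is not the project's DisjointSet code; it is a standalone union-find written for illustration, and it assumes that the component ID should be the smallest vertex ID in the component (which matches the expected output above). The class name is hypothetical.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical standalone sketch, not the gelly-streaming DisjointSet.
final class ComponentMerge {
    // Follows parent links until it reaches a vertex that is its own parent.
    private static int find(Map<Integer, Integer> parent, int v) {
        int root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        return root;
    }

    // Always links the larger root under the smaller one,
    // so every root is the minimum vertex ID of its component.
    private static void union(Map<Integer, Integer> parent, int a, int b) {
        int ra = find(parent, a);
        int rb = find(parent, b);
        if (ra != rb) {
            parent.put(Math.max(ra, rb), Math.min(ra, rb));
        }
    }

    /** Merges partial components; the result is keyed by the smallest member. */
    static Map<Integer, SortedSet<Integer>> merge(List<? extends Collection<Integer>> groups) {
        Map<Integer, Integer> parent = new HashMap<>();
        for (Collection<Integer> group : groups) {
            Integer first = null;
            for (Integer v : group) {
                parent.putIfAbsent(v, v);
                if (first == null) {
                    first = v;
                } else {
                    union(parent, first, v);
                }
            }
        }
        Map<Integer, SortedSet<Integer>> result = new TreeMap<>();
        for (Integer v : parent.keySet()) {
            result.computeIfAbsent(find(parent, v), k -> new TreeSet<>()).add(v);
        }
        return result;
    }

    public static void main(String[] args) {
        // The three partial components from the folding step.
        System.out.println(merge(Arrays.asList(
                Arrays.asList(2, 4), Arrays.asList(3, 5), Arrays.asList(1, 3))));
        // {1=[1, 3, 5], 2=[2, 4]}
    }
}
```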

Clarification of gelly-stream features

Hi there.

I read in the presentation on gelly-stream that it only supports single-pass graph computations, and only with approximate results. Is that (still) correct?

I'm just getting started with Flink and have not yet found a way to get gelly-streaming running. Do you have step-by-step instructions somewhere that can help me get, say, ConnectedComponents running?

Exact global and local triangle count

We can use a modified version of the Triest-Base algorithm from this paper to compute exact global and local triangle counts. The algorithm requires storing the complete graph in memory plus the counters and it requires one neighborhood set intersection per edge.
We should look into:

  • efficient neighborhood representations
  • algorithms for efficient neighborhood intersection, e.g. keep neighborhoods sorted and use binary search if one of them is much smaller than the other
  • a version of the algorithm using windows to batch together multiple intersections.
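
As a starting point for the discussion, here is a rough sketch of the per-edge intersection idea in plain Java. It is not the Triest-Base implementation from the paper and not existing project code: it simply keeps the full graph as hash-set neighborhoods and, for each new edge, iterates over the smaller neighborhood and probes the larger one, which is the hash-based analogue of the sorted/binary-search idea. The class name is hypothetical, and self-loops and duplicate edges are assumed to be ignorable.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of exact global/local triangle counting on an edge stream.
final class ExactTriangles {
    private final Map<Integer, Set<Integer>> neighbors = new HashMap<>();
    private final Map<Integer, Long> local = new HashMap<>();
    private long global = 0;

    /** Processes one undirected edge; performs one neighborhood intersection. */
    void addEdge(int u, int v) {
        if (u == v) {
            return; // ignore self-loops
        }
        Set<Integer> nu = neighbors.computeIfAbsent(u, k -> new HashSet<>());
        Set<Integer> nv = neighbors.computeIfAbsent(v, k -> new HashSet<>());
        if (nu.contains(v)) {
            return; // ignore duplicate edges
        }
        // Iterate over the smaller neighborhood, probe the larger one.
        Set<Integer> small = nu.size() <= nv.size() ? nu : nv;
        Set<Integer> large = small == nu ? nv : nu;
        for (int w : small) {
            if (large.contains(w)) {
                // The new edge closes the triangle (u, v, w).
                global++;
                local.merge(u, 1L, Long::sum);
                local.merge(v, 1L, Long::sum);
                local.merge(w, 1L, Long::sum);
            }
        }
        nu.add(v);
        nv.add(u);
    }

    long globalCount() {
        return global;
    }

    long localCount(int vertex) {
        return local.getOrDefault(vertex, 0L);
    }

    public static void main(String[] args) {
        ExactTriangles t = new ExactTriangles();
        int[][] edges = {{1, 2}, {2, 3}, {1, 3}, {3, 4}, {2, 4}};
        for (int[] e : edges) {
            t.addEdge(e[0], e[1]);
        }
        System.out.println(t.globalCount()); // 2 (triangles 1-2-3 and 2-3-4)
        System.out.println(t.localCount(2)); // 2
    }
}
```

The counts are exact regardless of edge arrival order, because each triangle is counted once, at the moment its last edge arrives.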

Add tests for the examples

Currently, only the window triangles example is covered by a test. We should add tests for connected components and the bipartiteness check, so we don't let issues like #11 happen again :)

How to run GellyStream on Flink

I want to run the GellyStream examples on Flink. The Flink version is 1.2.0, Java is Java 8 and the operating system is RedHat, but the run fails with an error in both local mode and cluster mode.
The error looks as if Flink cannot run with Java 8:

The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'org.apache.flink.graph.streaming.example.ExactTriangleCount' could not be loaded due to a linkage failure.
at org.apache.flink.client.program.PackagedProgram.loadMainClass(
at org.apache.flink.client.program.PackagedProgram.(
at org.apache.flink.client.CliFrontend.buildProgram(
at org.apache.flink.client.CliFrontend.parseParameters(
at org.apache.flink.client.CliFrontend$
at org.apache.flink.client.CliFrontend$
at Method)
at org.apache.flink.client.CliFrontend.main(
Caused by: java.lang.UnsupportedClassVersionError: org/apache/flink/graph/streaming/example/ExactTriangleCount : Unsupported major.minor version 52.0
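
A note on reading this error: class file major version 52 corresponds to Java 8, so an UnsupportedClassVersionError for 52.0 generally means the JVM that loaded the class is older than Java 8, even if a Java 8 installation exists on the machine. The small sketch below (the class name is hypothetical) maps a major version to a Java release and prints the highest class version the running JVM accepts, which can help check which JVM the Flink client actually uses.

```java
// Hypothetical diagnostic helper, not part of Flink or gelly-streaming.
final class ClassFileVersion {
    /** Maps a class file major version to the Java release (valid for Java 5 and later). */
    static int javaRelease(int major) {
        if (major < 49) {
            throw new IllegalArgumentException("only major versions >= 49 are handled");
        }
        return major - 44;
    }

    public static void main(String[] args) {
        System.out.println("major 52 -> Java " + javaRelease(52));
        // Standard system property: highest class file version this JVM supports.
        System.out.println("this JVM supports up to class version "
                + System.getProperty("java.class.version"));
    }
}
```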

Why does running IterativeConnectedComponents end up in an endless loop?

I tried to run IterativeConnectedComponents with a socket stream. With the input 1,3 1,4,,1,5 the result is OK, but with the input "5,6" the result prints "6,1" a great many times:

Process finished with exit code -1
Could you tell me how to run this algorithm? Thank you very much.

Port Examples to Library Window Aggregation Functions

It would be nice to offer ConnectedComponents, TriangleCount etc as library methods instead of plain examples.

We can keep the examples to demonstrate how to use these aggregation functions, and add unit tests for them.

Degree distribution example

Add a degree distribution algorithm/example that ingests a fully dynamic stream of edges (both additions and deletions) and emits a stream of (degree, count). We can initially implement this with a custom stream of Tuple3<SrcID, TrgID, EventType> and later add a fully dynamic GraphStream type.
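
A rough sketch of the bookkeeping this would need, in plain Java without Flink types. The class and enum names are hypothetical stand-ins for the Tuple3<SrcID, TrgID, EventType> events, and the sketch assumes a well-formed stream (no duplicate additions, no deletion of absent edges).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of a fully dynamic degree distribution.
final class DegreeDistribution {
    enum EventType { ADD, DELETE }

    private final Map<Long, Integer> degrees = new HashMap<>();  // vertex -> degree
    private final Map<Integer, Long> counts = new TreeMap<>();   // degree -> #vertices

    /** Applies one edge event to both endpoints. */
    void onEvent(long src, long trg, EventType type) {
        int delta = type == EventType.ADD ? 1 : -1;
        update(src, delta);
        update(trg, delta);
    }

    private void update(long vertex, int delta) {
        int oldDegree = degrees.getOrDefault(vertex, 0);
        int newDegree = oldDegree + delta;
        if (oldDegree > 0) {
            // One vertex leaves the old degree bucket; drop empty buckets.
            counts.computeIfPresent(oldDegree, (d, c) -> c == 1 ? null : c - 1);
        }
        if (newDegree > 0) {
            counts.merge(newDegree, 1L, Long::sum);
            degrees.put(vertex, newDegree);
        } else {
            degrees.remove(vertex);
        }
    }

    /** Snapshot of the current (degree, count) pairs, ordered by degree. */
    Map<Integer, Long> distribution() {
        return new TreeMap<>(counts);
    }

    public static void main(String[] args) {
        DegreeDistribution d = new DegreeDistribution();
        d.onEvent(1, 2, EventType.ADD);
        d.onEvent(1, 3, EventType.ADD);
        d.onEvent(2, 3, EventType.ADD);
        System.out.println(d.distribution()); // {2=3}
        d.onEvent(1, 2, EventType.DELETE);
        System.out.println(d.distribution()); // {1=2, 2=1}
    }
}
```

In a streaming job, each call to update would emit the changed (degree, count) pairs downstream instead of exposing a snapshot.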

Adding data-parallel window graph aggregation

@vasia I added a windowAssigner-agnostic slice method to GraphStream / SimpleEdgeStream, hoping to generalize slice for both use cases (see here), but much of the existing logic revolves around neighborhoods, and is apparently spread across a couple classes. I'll open another issue to discuss refactoring it.

I started over and hacked together a non-reducing connected components sliding window example in a unit test, using only the DisjointSet implementation from gelly-streaming (see here). This essentially reproduces most of WindowGraphAggregation with a No-Op reduce step.

I now (again) think PR #25 is the best way forward for these sorts of non-reducing window folds. What do you think?

As an aside, I think there's probably a more powerful abstraction underneath WindowGraphAggregation (other than GraphAggregation), but I'm not sure what to call it. EdgeStream -> distribute edges uniformly -> apply an arbitrary window -> local fold on each graph partition-window. For global connected components aggregation, add a reduce step at the end.

Port Gelly Graph methods to GraphStreams

All relevant Gelly methods should be ported. Some of these are:

  • Graph creation
    • fromRandomEdgeStream()
    • fromIncidenceStream()
  • Graph stream properties
    • getVertices(), getEdges()
    • getVertexIds(), getEdgeIds()
    • inDegrees(), outDegrees(), getDegrees()
    • numberOfVertices(), numberOfEdges()
  • Graph stream transformations
    • mapVertices(), mapEdges()
    • filterOnVertices(), filterOnEdges()
    • subgraph()
    • reverse()
    • undirected()
    • union()
  • Neighborhood methods

Each of these has to be considered for both normal streams and (tumbling?) windows.

The result of ExactTriangleCount

I want to test ExactTriangleCount with the following test data in local mode in IntelliJ IDEA, but I think the result is not right:
new Edge<>(1, 2, NullValue.getInstance()),
new Edge<>(2, 3, NullValue.getInstance()),
new Edge<>(2, 6, NullValue.getInstance()),
new Edge<>(5, 6, NullValue.getInstance()),
new Edge<>(1, 4, NullValue.getInstance()),
new Edge<>(5, 3, NullValue.getInstance()),
new Edge<>(3, 4, NullValue.getInstance()),
new Edge<>(3, 6, NullValue.getInstance()),
new Edge<>(1, 3, NullValue.getInstance())),

In addition, the results are not the same for each run. One time the result looked like this:
1> (-1,1)
3> (6,1)
8> (2,1)
2> (3,1)
7> (1,1)
2> (3,2)
8> (4,1)
1> (-1,2)

For vertexID=3 the triangle count should be 4, not 1. How should I run this algorithm?

Thank you very much.

Add single pass aggregation support for global graph properties

A preliminary aggregation scheme for global property computation can be the following:

  1. EdgeStream keyBy partitionID
  2. Parallel window fold for incremental local partial aggregation (e.g. bipartition groups, connected components etc)
  3. (Optional) combiner for partial states
  4. (Optional) merge function that converts partial state to final aggregation state
  5. (Optional) flag for transient/persistent state across windows
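
The first three steps can be sketched in plain Java as follows. This is only an illustration of the scheme, not a proposed API: the class and method names are hypothetical, one in-memory list stands in for one window, and steps 4 and 5 (final merge function and transient/persistent state) are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;

// Hypothetical sketch: partition edges, fold locally, combine partial states.
final class PartitionedAggregation {
    static <S> S aggregate(List<int[]> edges, int partitions, Supplier<S> initial,
                           BiFunction<S, int[], S> fold, BinaryOperator<S> combine) {
        // One partial state per partition (step 2 runs in parallel in a real job).
        List<S> partials = new ArrayList<>();
        for (int p = 0; p < partitions; p++) {
            partials.add(initial.get());
        }
        for (int[] e : edges) {
            // Step 1: key each edge by a partition ID.
            int p = Math.floorMod(Arrays.hashCode(e), partitions);
            // Step 2: incremental local fold.
            partials.set(p, fold.apply(partials.get(p), e));
        }
        // Step 3: combine the partial states.
        S result = partials.get(0);
        for (int p = 1; p < partitions; p++) {
            result = combine.apply(result, partials.get(p));
        }
        return result;
    }

    /** Example global property: the set of distinct vertices seen in the window. */
    static Set<Integer> distinctVertices(List<int[]> edges, int partitions) {
        return PartitionedAggregation.<Set<Integer>>aggregate(edges, partitions, TreeSet::new,
                (s, e) -> { s.add(e[0]); s.add(e[1]); return s; },
                (a, b) -> { a.addAll(b); return a; });
    }

    public static void main(String[] args) {
        List<int[]> edges = Arrays.asList(new int[]{1, 2}, new int[]{2, 3}, new int[]{4, 5});
        System.out.println(distinctVertices(edges, 3)); // [1, 2, 3, 4, 5]
    }
}
```

The result does not depend on how edges are spread over partitions as long as the combiner is associative and commutative, which is the property the bipartiteness and connected components states would also need.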

Scala version

I couldn't find a checkpointing mechanism, or Scala and Python versions of Gelly streaming. What are the main obstacles to implementing these in this library for production use?

Implement GraphWindowStream

GraphWindowStream should provide access to the same transformations and properties as GraphStream. Moreover, it should support neighborhood aggregation, reduceOnNeighbors, which takes a user-defined neighborhood function and a reduce function to apply it per vertex neighborhood.
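
One possible shape for reduceOnNeighbors, sketched in plain Java over the edges of a single window. The signature is an assumption made for illustration, not the final API: the neighborhood function is applied to each (vertex, neighbor) pair and the reduce function folds the results per source vertex, so only out-neighbors are considered here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiFunction;
import java.util.function.BinaryOperator;

// Hypothetical sketch of a per-vertex neighborhood reduce over one window.
final class NeighborhoodReduce {
    static <T> Map<Integer, T> reduceOnNeighbors(List<int[]> windowEdges,
                                                 BiFunction<Integer, Integer, T> neighborFn,
                                                 BinaryOperator<T> reduceFn) {
        Map<Integer, T> result = new TreeMap<>();
        for (int[] e : windowEdges) {
            // Map the (source, target) pair, then fold it into the source's value.
            result.merge(e[0], neighborFn.apply(e[0], e[1]), reduceFn);
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> window = Arrays.asList(new int[]{1, 2}, new int[]{1, 3}, new int[]{2, 3});
        // Sum of neighbor IDs per source vertex.
        System.out.println(NeighborhoodReduce.<Integer>reduceOnNeighbors(
                window, (v, n) -> n, Integer::sum)); // {1=5, 2=3}
    }
}
```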

Allow initialState() to access the vertex context

The current API of initialState() assumes that all vertices will initially have the same state. This might not be true for some algorithms, e.g. in SSSP the source has an initial value of 0 and all other vertices have infinity. The issue can be bypassed by setting the initial state in preCompute() instead.
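
To illustrate what access to the vertex context would allow, here is a tiny sketch for SSSP, where the source starts at distance 0 and every other vertex at infinity. The signature is hypothetical: passing the vertex ID (and the source ID) into initialState is exactly the change this issue asks for, not something the current API offers.

```java
// Hypothetical sketch: an initialState that depends on the vertex ID.
final class SsspInitialState {
    /** Initial SSSP distance: 0 for the source, infinity for every other vertex. */
    static double initialState(long vertexId, long sourceId) {
        return vertexId == sourceId ? 0.0 : Double.POSITIVE_INFINITY;
    }

    public static void main(String[] args) {
        System.out.println(initialState(1L, 1L)); // 0.0
        System.out.println(initialState(2L, 1L)); // Infinity
    }
}
```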

[Discuss] Decouple SimpleEdgeStream and GraphWindowStream from neighbor stream keying

Right now, GraphWindowStream claims to operate over vertex neighborhoods, but that's only the case because SimpleGraphStream instantiates GraphWindowStreams with streams keyed by a NeighborKeySelector.

SimpleGraphStream#slice would be more valuable if it did not exclusively key streams in this way. For example, I think the aggregating Connected Components example could benefit from GraphWindowStream#slice if it allowed keying the stream arbitrarily (i.e. by subtask index).

GraphWindowStreams do not necessarily operate on neighborhoods, only coincidentally. One option is to rename this class and alter it to control the stream keying itself, to enforce its neighbor operation claims.

How to use GellyStream API in Scala code

I want to use the GellyStream algorithm API in my Scala project, but after translating the code to Scala it fails with this error:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple2>
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(
at org.apache.flink.api.common.operators.Keys$ExpressionKeys.(
at org.apache.flink.streaming.api.datastream.DataStream.keyBy(
at com.huawei.flink_test.gelly_stream_test.ConnectedComponent$.main(ConnectedComponent.scala:120)
at com.huawei.flink_test.gelly_stream_test.ConnectedComponent.main(ConnectedComponent.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(
at sun.reflect.DelegatingMethodAccessorImpl.invoke(
at java.lang.reflect.Method.invoke(
at com.intellij.rt.execution.application.AppMain.main(

My code :
val edges: GraphStream[lang.Long, NullValue, NullValue] = getGraphStream(socketStream,split,env)
val init_val: (lang.Long, lang.Long) = (0l,0l)

val cc_new: ConnectedComponents[lang.Long, NullValue] = new ConnectedComponents[java.lang.Long, NullValue](mergeWindowTime)
val cc = edges.aggregate(cc_new)
 val vertexInfo = cc.flatMap(new FlatMapFunction[DisjointSet[lang.Long], (lang.Long, lang.Long)] {
   override def flatMap(set:DisjointSet[lang.Long], out: Collector[Tuple2[lang.Long, lang.Long]]) = {
          val matches= set.getMatches

val keyValuePair = vertexInfo.keyBy(0).timeWindow(Time.of(printWindowTime, TimeUnit.MILLISECONDS))

Create a GraphStream class

This class should support multiple ways to stream graph data:

  • Random edge streams (with or without duplicates)
  • Incidence streams
  • Mutation events -- edge addition, deletion or value modification
  • Others (?)

