
nebula-flink-connector's Introduction

nebula-flink-connector

Flink Connector for Nebula Graph


Nebula-Flink-Connector 2.0/3.0 is a connector that helps Flink users easily access NebulaGraph 2.0/3.0. If you want to access NebulaGraph 1.x with Flink, please refer to Nebula-Flink-Connector 1.0.

Quick start

Prerequisites

To use Nebula Flink Connector, make sure your environment meets the requirements below: a NebulaGraph deployment compatible with your connector version (see the Version match table) and Flink 1.11.x (see the Note).

Use in Maven

Add the dependency to your pom.xml.

<dependency>
    <groupId>com.vesoft</groupId>
    <artifactId>nebula-flink-connector</artifactId>
    <version>3.0-SNAPSHOT</version>
</dependency>

Example

To write data into NebulaGraph using Flink:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Connection options for the graph service (writes) and the meta service (schema lookup).
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
                .setGraphAddress("127.0.0.1:9669")
                .setMetaAddress("127.0.0.1:9559")
                .build();
NebulaGraphConnectionProvider graphConnectionProvider = new NebulaGraphConnectionProvider(nebulaClientOptions);
NebulaMetaConnectionProvider metaConnectionProvider = new NebulaMetaConnectionProvider(nebulaClientOptions);

// Map each Row onto the "player" tag: field 0 is the vertex id, fields 1 and 2
// hold the "name" and "age" properties; rows are flushed in batches of 2.
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
                .setGraphSpace("flinkSink")
                .setTag("player")
                .setIdIndex(0)
                .setFields(Arrays.asList("name", "age"))
                .setPositions(Arrays.asList(1, 2))
                .setBatchSize(2)
                .build();

NebulaVertexBatchOutputFormat outputFormat = new NebulaVertexBatchOutputFormat(
                graphConnectionProvider, metaConnectionProvider, executionOptions);
NebulaSinkFunction<Row> nebulaSinkFunction = new NebulaSinkFunction<>(outputFormat);
// playerSource is an existing DataStream of records to write; convert each record to a Flink Row.
DataStream<Row> dataStream = playerSource.map(row -> {
            Row record = new org.apache.flink.types.Row(row.size());
            for (int i = 0; i < row.size(); i++) {
                record.setField(i, row.get(i));
            }
            return record;
        });
dataStream.addSink(nebulaSinkFunction);
env.execute("write nebula");

To read data from NebulaGraph using Flink:

NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
        .setMetaAddress("127.0.0.1:9559")
        .build();
NebulaStorageConnectionProvider storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Scan up to 100 "person" vertices from the "flinkSource" space; an empty field
// list together with setNoColumn(false) returns all properties of the tag.
VertexExecutionOptions vertexExecutionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSource")
        .setTag("person")
        .setNoColumn(false)
        .setFields(Arrays.asList())
        .setLimit(100)
        .build();
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
        .setExecutionOptions(vertexExecutionOptions);
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
// Convert each BaseTableRow into a Flink Row, field by field.
dataStreamSource.map(row -> {
    List<ValueWrapper> values = row.getValues();
    Row record = new Row(15);
    record.setField(0, values.get(0).asLong());
    record.setField(1, values.get(1).asString());
    record.setField(2, values.get(2).asString());
    record.setField(3, values.get(3).asLong());
    record.setField(4, values.get(4).asLong());
    record.setField(5, values.get(5).asLong());
    record.setField(6, values.get(6).asLong());
    record.setField(7, values.get(7).asDate());
    record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
    record.setField(9, values.get(9).asLong());
    record.setField(10, values.get(10).asBoolean());
    record.setField(11, values.get(11).asDouble());
    record.setField(12, values.get(12).asDouble());
    record.setField(13, values.get(13).asTime().getUTCTimeStr());
    record.setField(14, values.get(14).asGeography());
    return record;
}).print();
env.execute("NebulaStreamSource");

To operate schema and data using Flink SQL:

  1. Create a graph space
        NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
                "NebulaCatalog",
                "default",
                "root",
                "nebula",
                "127.0.0.1:9559",
                "127.0.0.1:9669");

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // CATALOG_NAME is the name the catalog is registered under, e.g. "NebulaCatalog" as created above.
        tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
        tableEnv.useCatalog(CATALOG_NAME);

        String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
                + " COMMENT 'space 1'"
                + " WITH ("
                + " 'partition_num' = '100',"
                + " 'replica_factor' = '3',"
                + " 'vid_type' = 'FIXED_STRING(10)'"
                + ")";
        tableEnv.executeSql(createDataBase);
  2. Create a tag
        tableEnvironment.executeSql("CREATE TABLE `person` ("
                + " vid BIGINT,"
                + " col1 STRING,"
                + " col2 STRING,"
                + " col3 BIGINT,"
                + " col4 BIGINT,"
                + " col5 BIGINT,"
                + " col6 BIGINT,"
                + " col7 DATE,"
                + " col8 TIMESTAMP,"
                + " col9 BIGINT,"
                + " col10 BOOLEAN,"
                + " col11 DOUBLE,"
                + " col12 DOUBLE,"
                + " col13 TIME,"
                + " col14 STRING"
                + ") WITH ("
                + " 'connector' = 'nebula',"
                + " 'meta-address' = '127.0.0.1:9559',"
                + " 'graph-address' = '127.0.0.1:9669',"
                + " 'username' = 'root',"
                + " 'password' = 'nebula',"
                + " 'data-type' = 'vertex',"
                + " 'graph-space' = 'flink_test',"
                + " 'label-name' = 'person'"
                + ")"
        );
  3. Create an edge
        tableEnvironment.executeSql("CREATE TABLE `friend` ("
                + " sid BIGINT,"
                + " did BIGINT,"
                + " rid BIGINT,"
                + " col1 STRING,"
                + " col2 STRING,"
                + " col3 BIGINT,"
                + " col4 BIGINT,"
                + " col5 BIGINT,"
                + " col6 BIGINT,"
                + " col7 DATE,"
                + " col8 TIMESTAMP,"
                + " col9 BIGINT,"
                + " col10 BOOLEAN,"
                + " col11 DOUBLE,"
                + " col12 DOUBLE,"
                + " col13 TIME,"
                + " col14 STRING"
                + ") WITH ("
                + " 'connector' = 'nebula',"
                + " 'meta-address' = '127.0.0.1:9559',"
                + " 'graph-address' = '127.0.0.1:9669',"
                + " 'username' = 'root',"
                + " 'password' = 'nebula',"
                + " 'graph-space' = 'flink_test',"
                + " 'label-name' = 'friend',"
                + " 'data-type'='edge',"
                + " 'src-id-index'='0',"
                + " 'dst-id-index'='1',"
                + " 'rank-id-index'='2'"
                + ")"
        );
  4. Query edge data and insert it into another edge type
        // friend_sink is another edge table, created the same way as `friend` above.
        Table table = tableEnv.sqlQuery("SELECT * FROM `friend`");
        table.executeInsert("`friend_sink`").await();

Version match

The following table shows the version correspondence between Nebula Flink Connector and NebulaGraph:

Nebula Flink Connector Version | NebulaGraph Version
2.0.0                          | 2.0.0, 2.0.1
2.5.0                          | 2.5.0, 2.5.1
2.6.0                          | 2.6.0, 2.6.1
2.6.1                          | 2.6.0, 2.6.1
3.0.0                          | 3.x.x
3.3.0                          | 3.x.x
3.5.0                          | 3.x.x
3.0-SNAPSHOT                   | nightly

Note

Flink version requirement: 1.11.x

nebula-flink-connector's People

Contributors

dutor, harrischu, linhr, liuxiaocs7, nicole00, sophie-xie, spike-liu, strangerofdawah, tonyandfriday, whitewum, yixinglu, ystaticy


nebula-flink-connector's Issues

flink-connector: Support update write mode

Add an update write mode for Nebula:

  1. Add a WriteMode config to executionOptions.
  2. The WriteMode should be an enum class, and it should contain an UPDATE mode.
  3. Construct the batch update nGQL statement for UPDATE mode (see the sketch below).
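
A minimal sketch of what this proposal could look like; the WriteMode enum and the setWriteMode builder method are hypothetical names taken from this issue, not the released API:

// Hypothetical enum from this proposal; INSERT reflects the existing default behavior.
public enum WriteMode {
    INSERT,
    UPDATE
}

// Hypothetical usage on the execution options builder:
VertexExecutionOptions executionOptions = new VertexExecutionOptions.ExecutionOptionBuilder()
        .setGraphSpace("flinkSink")
        .setTag("player")
        .setWriteMode(WriteMode.UPDATE) // assumption: proposed setter for the new config
        .build();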

NebulaBatchOutputFormat.commit does not return when the source data amount is small

In NebulaBatchOutputFormat.commit there is:

numPendingRow.compareAndSet(executionOptions.getBatch(), 0);

When we set executionOptions.setBatch(100) but the data source only contains 50 rows,
numPendingRow (= 50) will never equal executionOptions.getBatch() (= 100),
so numPendingRow is never reset to 0.

As a result, the while loop in flush() runs forever, which blocks the Flink checkpoint.

So I think we should change the first parameter of numPendingRow.compareAndSet. It should be:

long pendingRow = numPendingRow.get();
numPendingRow.compareAndSet(pendingRow, 0);
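
Equivalently, since the goal is an unconditional reset, a single atomic call would also work (a sketch, assuming numPendingRow is an AtomicLong or AtomicInteger, as the compareAndSet call suggests):

numPendingRow.getAndSet(0); // atomically read the pending count and reset it to 0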

maven-shade-plugin relocates org.slf4j, which breaks log output

Please check the FAQ documentation before raising an issue

Describe the bug (required)
Because the org.slf4j package is relocated by the shade plugin, no logs are emitted, which makes troubleshooting difficult.


Your Environments (required)

  • OS: CentOS
  • Commit id: 3.5


Session not existed!

After the session expires, a new session is not obtained automatically, which causes the sink to fail. It would be good to support automatically re-acquiring a valid session.
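
A rough sketch of the kind of renewal logic being requested; the session and pool calls follow the NebulaGraph Java client API, but where this check would live inside the connector is an assumption:

// Before writing, verify the session is still alive; if not, release the
// stale session and fetch a fresh one from the pool (error handling simplified).
if (session == null || !session.ping()) {
    if (session != null) {
        session.release();
    }
    session = nebulaPool.getSession(username, password, true);
}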

CREATE SPACE error in example comment

In example/FlinkConnectorExample.java and example/FlinkConnectorSourceExample.java, the CREATE SPACE statement in the comments is:

CREATE SPACE `flinkSink` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default

When I execute the above statement using the Nebula console, the result is:

[ERROR (-1009)]: SemanticError: Create space with zone is unsupported

The version of Nebula is 3.1.0, deployed with Docker.
Also, I can't find the corresponding parameter description in the docs.

Should we remove ON default? It seems to work fine after that, just as below:

(root@nebula) [(none)]> CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default
[ERROR (-1009)]: SemanticError: Create space with zone is unsupported

Mon, 11 Jul 2022 18:26:25 CST

(root@nebula) [(none)]> CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false)
Execution succeeded (time spent 19188/19153 us)

Mon, 11 Jul 2022 18:26:29 CST

(root@nebula) [(none)]> SHOW CREATE SPACE `tt`
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Space | Create Space
                                                          |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "tt"  | "CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default_zone_172.28.2.1_9779,default_zone_172.28.2.2_9779,default_zone_172.28.2.3_9779" |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 732/1513 us)

Mon, 11 Jul 2022 18:26:46 CST

Deserialization bug for Nebula NULL data

The NebulaDeserializationConverter converts Nebula data to Flink data types, but we should check whether the Nebula value is NULL for every Flink data type.

For example, when the Flink data type is BIGINT, the code just converts the Nebula value to Long; when the Nebula value is NULL, it triggers the exception below:

Caused by: com.vesoft.nebula.client.graph.exception.InvalidValueException: Cannot get field long because value's type is NULL
	at com.vesoft.nebula.client.graph.data.ValueWrapper.asLong(ValueWrapper.java:332)
	at org.apache.flink.connector.nebula.table.NebulaRowDataConverter.convert(NebulaRowDataConverter.java:69)
	at org.apache.flink.connector.nebula.table.NebulaRowDataConverter.convert(NebulaRowDataConverter.java:39)
	at org.apache.flink.connector.nebula.source.NebulaInputFormat.nextRecord(NebulaInputFormat.java:164)
	at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)

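A hedged sketch of the missing null check; ValueWrapper.isNull() is part of the NebulaGraph Java client, but the surrounding converter structure here is simplified and assumed, not the actual NebulaRowDataConverter code:

// Check for a NULL value before calling a typed accessor such as asLong().
ValueWrapper value = values.get(i);
if (value.isNull()) {
    record.setField(i, null);
} else {
    record.setField(i, value.asLong()); // BIGINT case; other types are analogous
}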

Is there any example or a guide for Flink SQL?

General Question

I have compiled this nebula-flink-connector (v3.5) into a jar package following the "Compile Nebula Flink Connector" documentation, and put it into [flink_server_path]/lib.
Then I try to execute SQL from flink/bin/sql-client.sh.

In Nebula, I have a schema as follows:

> desc tag user;
| Field        | Type                | Null  | Default  | Comment                                  |
| "name"       | "fixed_string(100)" | "NO"  |          | "name"                                   |
| "sex"        | "int8"              | "YES" | __NULL__ | "sex: 0 = male; 1 = female"              |
| "auth_flag"  | "int8"              | "NO"  | 0        | "registered and verified: 1 = yes; 0 = no" |
| "wx_open_id" | "fixed_string(32)"  | "YES" | __NULL__ | "WeChat openID"                          |

In Flink, I tried to connect to the Nebula server via the following statement:

CREATE TABLE t_user_info (
	`name` VARCHAR(100),
	`auth_flag` INT,
	`wx_open_id` VARCHAR(32)
  ) WITH (
    'connector' = 'nebula',
    'meta-address' = '127.0.0.1:9559',
    'graph-address' = '127.0.0.1:9669',
    'username' = 'root',
    'password' = 'nebula',
    'graph-space' = 'jmt',
    'label-name' = 'user'
  );

However, when I tried to select the data, it raised a NullPointerException:

Flink SQL> select * from  t_user_info; 
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException

By the way, how can I insert data into Nebula? I have tried the approach in #57, but write-mode is not supported.

So, is there any guide or example covering, e.g.:

how to access vertices or edges, and whether the vertex id attribute affects the results?
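
For the insert question, a minimal hedged sketch in the Table API form used earlier in this README; it assumes the t_user_info table above is registered in the current session, and the values are purely illustrative:

// Assumes t_user_info was created with the CREATE TABLE statement above.
tableEnv.executeSql(
        "INSERT INTO t_user_info VALUES ('Alice', 1, 'wx-open-id-demo')");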

Local UT would fail because the IP used by the Nebula Java client is not accessible

  • The Nebula Java client gets the storage server IP that is defined in the Docker network environment.
  • Local development is isolated from the Docker network environment.

Reading data from Flink would therefore fail in UT. The only way I can figure out is to run the UT inside the same Docker network environment; however, that would make debugging the code impossible.

@Nicole00 Would you please enlighten us here?

Does 3.0-SNAPSHOT support NebulaGraph 3.0.0?

Hello, when I use the 3.0-SNAPSHOT nebula-flink-connector to connect to NebulaGraph 3.0.0, I get the error below. Could this be because 3.0-SNAPSHOT does not support NebulaGraph 3.0.0 yet?
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.RuntimeException: An error occurred in NebulaSink.
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.checkErrorAndRethrow(NebulaSinkFunction.java:68)
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:412)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:199)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:637)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:601)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: get graph session error
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.renewSession(NebulaBatchOutputFormat.java:125)
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:76)
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:49)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:552)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:592)
... 3 more
Caused by: com.vesoft.nebula.client.graph.exception.NotValidConnectionException: No extra connection: All servers are broken.
at com.vesoft.nebula.client.graph.net.NebulaPool.getConnection(NebulaPool.java:237)
at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:159)
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.renewSession(NebulaBatchOutputFormat.java:120)
... 13 more

Process finished with exit code -1

Remove the Flink library and shade dependencies from the fat jar

  1. I noticed that the Nebula Flink connector library available in the Maven central repository is large (over 100 MB). It seems that the fat jar contains the entire Flink library as dependencies. Shall we mark the Flink dependencies as <provided> in pom.xml so that they are not included in the jar? (BTW, do we have a plan to publish artifacts compatible with each recent Flink version? For example, for the Kafka connector I can pick flink-sql-connector-kafka-1.16.0.jar, where 1.16.0 is the Flink version I need.)
  2. It seems that third-party dependencies are not shaded in the fat jar, so there may be version conflicts with other libraries at runtime. Shall we shade all third-party dependencies under e.g. org.apache.flink.connector.nebula.shaded?

I've seen the above two issues taken care of in other official connectors (e.g. the Flink Elasticsearch connector). I have the corresponding code changes locally, and I'm happy to submit a PR if the idea makes sense.

Full insert/update/delete support for dynamic table sinks

I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything is working, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the changelog and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?

I was looking at the source code and it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.

Thanks a lot!
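
For reference, a minimal sketch of the pipeline described above in Table API form; the Kafka/Debezium table, the user_vertex sink table, and all names here are illustrative assumptions, and whether update/delete changelog events are honored is exactly the open question of this issue:

// Dynamic table A: CDC events from the upstream MySQL database, via Kafka
// with the Debezium format (standard Flink Kafka connector options).
tableEnv.executeSql("CREATE TABLE mysql_users ("
        + " vid BIGINT,"
        + " name STRING"
        + ") WITH ("
        + " 'connector' = 'kafka',"
        + " 'topic' = 'mysql.users',"
        + " 'properties.bootstrap.servers' = '127.0.0.1:9092',"
        + " 'properties.group.id' = 'cdc-demo',"
        + " 'scan.startup.mode' = 'earliest-offset',"
        + " 'format' = 'debezium-json'"
        + ")");

// Dynamic table B: a Nebula vertex sink, declared as in the README examples above.
tableEnv.executeSql("CREATE TABLE user_vertex ("
        + " vid BIGINT,"
        + " name STRING"
        + ") WITH ("
        + " 'connector' = 'nebula',"
        + " 'meta-address' = '127.0.0.1:9559',"
        + " 'graph-address' = '127.0.0.1:9669',"
        + " 'username' = 'root',"
        + " 'password' = 'nebula',"
        + " 'data-type' = 'vertex',"
        + " 'graph-space' = 'flink_test',"
        + " 'label-name' = 'user'"
        + ")");

// Every change event on table A drives a write to table B.
tableEnv.executeSql("INSERT INTO user_vertex SELECT vid, name FROM mysql_users");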

Add Support for Flink 1.13

The maintainer doesn't have the bandwidth for this right now; hopefully someone can help or volunteer.

listTables shows byte-array addresses instead of strings

In the function NebulaCatalog.listTables:

for (TagItem tag : metaClient.getTags(graphSpace)) {
    tables.add("VERTEX" + NebulaConstant.POINT + tag.tag_name);
}

The string is concatenated directly with the byte array, so Java falls back to the array's default toString():

public byte[] tag_name;

And listTables shows:

[VERTEX.[B@3b0143d3, EDGE.[B@7b49cea0]
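
A hedged sketch of the fix: decode the Thrift byte[] field into a String before concatenating (UTF-8 is an assumption about how the client encodes names):

// Decoding the raw bytes prints the tag name instead of "[B@...".
for (TagItem tag : metaClient.getTags(graphSpace)) {
    tables.add("VERTEX" + NebulaConstant.POINT
            + new String(tag.tag_name, StandardCharsets.UTF_8));
}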

Table name design for the Flink SQL connector

For the current implementation of Flink SQL with Nebula as a sink (#57), I think the table created by Flink SQL is a temporary table for now; the table name is default_catalog.default_database.table_name, and the table name in the Flink SQL CREATE statement is the same as the vertex/edge name in Nebula. If that's true, how can we create two tables in Flink SQL from different Nebula graph spaces with the same name?

For example, in the code below we can see that, right now, the table name must equal the vertex name in Nebula:

  • create statement: CREATE TABLE person ...
  • nebula execution options setTag(context.getObjectIdentifier().getObjectName())

Should we add a parameter in the WITH clause to increase flexibility?

At the same time, I noticed the listTables function in NebulaCatalog.java, whose comments say that tag names start with VERTEX. and edge names start with EDGE.; should we stay compatible with this table name design if we want to use our own catalog?

The Flink job using nebula-flink-connector-3.5.0 cannot run after being submitted to Yarn due to class name conflicts.

NebulaGraph environment

  • Version: v3.5.0
  • OS: CentOS 7.6
  • Hadoop version: v3.3.4
  • JDK version: v1.8

Bug description

  • Symptom: a Flink job using nebula-flink-connector-3.5.0 cannot run after being submitted to Yarn; the error is as follows:
Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "com/vesoft/nebula/client/graph/net/Session"
	at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_211]
	at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_211]
	at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_211]
	at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) ~[?:1.8.0_211]
	at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_211]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_211]
	at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_211]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
	at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_211]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_211]
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_211]
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_211]
	at java.lang.Class.getDeclaredFields0(Native Method) ~[?:1.8.0_211]
	at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[?:1.8.0_211]
	at java.lang.Class.getDeclaredField(Class.java:2068) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494) ~[?:1.8.0_211]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:490) ~[?:1.8.0_211]
	at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) ~[?:1.8.0_211]
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_211]
	at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) ~[flink-core-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2308) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1242) ~[flink-streaming-java-1.16.2.jar:1.16.2]
	at com.XXX.jobs.portraits.XXXJober.initSink(XXXJober.java:82) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.jobs.portraits.XXXJober.arrange(XXXJober.java:42) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.common.launcher.IBdJober$Builder.lambda$execute$8(IBdJober.java:143) ~[xxx-23.1114.1746.jar:?]
	at java.util.HashMap$EntrySet.forEach(HashMap.java:1044) ~[?:1.8.0_211]
	at com.XXX.common.launcher.IBdJober$Builder.execute(IBdJober.java:137) ~[xxx-23.1114.1746.jar:?]
	at com.XXX.BigDataMain.main(BigDataMain.java:31) ~[xxx-23.1114.1746.jar:?]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_211]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_211]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-clients-1.16.2.jar:1.16.2]
	at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-clients-1.16.2.jar:1.16.2]
	... 12 more
  • Preliminary root cause:
    The com.vesoft.nebula.client.graph.net.Session class is defined both in com.vesoft:nebula-flink-connector:3.5.0.jar and in its dependency com.vesoft:client:3.5.0.jar.
