Flink Connector for Nebula Graph
In NebulaBatchOutputFormat.commit there is:
numPendingRow.compareAndSet(executionOptions.getBatch(), 0);
Suppose we set executionOptions.setBatch = 100, but we only have 50 rows in the DataSource. Then numPendingRow (= 50) will never equal executionOptions.getBatch() (= 100), so numPendingRow is never reset to 0, the while loop in flush() keeps running forever, and it blocks the Flink checkpoint.
So I think we should change the first parameter of numPendingRow.compareAndSet. It should be:
long pendingRow = numPendingRow.get();
numPendingRow.compareAndSet(pendingRow, 0);
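For reference, a minimal sketch of the proposed change (the field and option names follow the issue; the surrounding class is a simplification, not the actual NebulaBatchOutputFormat):

import java.util.concurrent.atomic.AtomicLong;

public class CommitSketch {
    private final AtomicLong numPendingRow = new AtomicLong(0);

    // Reset the pending-row counter after a successful commit.
    public void commit() {
        // ... execute the batched statement here ...

        // Instead of numPendingRow.compareAndSet(executionOptions.getBatch(), 0), which only
        // resets the counter when the pending count exactly equals the batch size, reset from
        // whatever the current count is, so a final partial batch (e.g. 50 rows with batch = 100)
        // also clears the counter and flush() can terminate.
        long pendingRow = numPendingRow.get();
        numPendingRow.compareAndSet(pendingRow, 0);
    }
}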
Can Flink support nebula-java 2.0.0 now?
Split discussion from #38
Hi, when I used the 3.0-SNAPSHOT nebula-flink-connector to connect to NebulaGraph 3.0.0, I got the following error. Could you tell me whether this is because 3.0-SNAPSHOT does not yet support NebulaGraph 3.0.0?
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1046)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:449)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.lang.RuntimeException: An error occurred in NebulaSink.
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.checkErrorAndRethrow(NebulaSinkFunction.java:68)
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.invoke(NebulaSinkFunction.java:50)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:57)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:75)
at org.apache.flink.streaming.runtime.tasks.BroadcastingOutputCollector.collect(BroadcastingOutputCollector.java:32)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:191)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:204)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:174)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:412)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:199)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:637)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:601)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:765)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.io.IOException: get graph session error
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.renewSession(NebulaBatchOutputFormat.java:125)
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.open(NebulaBatchOutputFormat.java:76)
at org.apache.flink.connector.nebula.sink.NebulaSinkFunction.open(NebulaSinkFunction.java:37)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:105)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:49)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:428)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:562)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:552)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:592)
... 3 more
Caused by: com.vesoft.nebula.client.graph.exception.NotValidConnectionException: No extra connection: All servers are broken.
at com.vesoft.nebula.client.graph.net.NebulaPool.getConnection(NebulaPool.java:237)
at com.vesoft.nebula.client.graph.net.NebulaPool.getSession(NebulaPool.java:159)
at org.apache.flink.connector.nebula.sink.NebulaBatchOutputFormat.renewSession(NebulaBatchOutputFormat.java:120)
... 13 more
Process finished with exit code -1
add delete write mode for nebula:
I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything works, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the change log and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?
Looking at the source code, it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.
Thanks a lot!
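For context, a rough sketch of the pipeline described above, written against the Flink Table API (TableEnvironment.executeSql). The topic, addresses, space, and column names are placeholders, the Nebula WITH options mirror the CREATE TABLE example shown later on this page, and whether deletes are actually propagated is exactly what this issue asks:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CdcToNebulaSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Dynamic table A: change-log stream of the upstream MySQL table via Kafka + Debezium.
        tEnv.executeSql(
            "CREATE TABLE person_cdc (vid STRING, name STRING, age INT) WITH ("
            + " 'connector' = 'kafka',"
            + " 'topic' = 'mysql.person',"
            + " 'properties.bootstrap.servers' = 'localhost:9092',"
            + " 'scan.startup.mode' = 'earliest-offset',"
            + " 'format' = 'debezium-json')");

        // Dynamic table B: Nebula sink, options as in the connector's SQL example on this page.
        tEnv.executeSql(
            "CREATE TABLE person_nebula (vid STRING, name STRING, age INT) WITH ("
            + " 'connector' = 'nebula',"
            + " 'meta-address' = '127.0.0.1:9559',"
            + " 'graph-address' = '127.0.0.1:9669',"
            + " 'username' = 'root',"
            + " 'password' = 'nebula',"
            + " 'graph-space' = 'flinkSink',"
            + " 'label-name' = 'person')");

        // Replicate the change log from A into B.
        tEnv.executeSql("INSERT INTO person_nebula SELECT vid, name, age FROM person_cdc");
    }
}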
Default: 0, no delay. If a value is set, the pending records are sent once the interval elapses, even if the batch size has not been reached.
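A rough sketch of the requested behavior (the names here are illustrative, not the connector's actual API): a timer that flushes the buffered rows every flushIntervalMs even when the batch size has not been reached, with 0 keeping the current behavior of flushing only on batch size:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FlushIntervalSketch {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // flush is whatever sends the currently buffered rows to Nebula.
    public void start(long flushIntervalMs, Runnable flush) {
        if (flushIntervalMs > 0) { // 0 (the default) means: flush only when the batch size is reached
            scheduler.scheduleAtFixedRate(flush, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() {
        scheduler.shutdown();
    }
}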
Add an UPDATE write mode for Nebula. When the data volume is large, the export takes a long time, and re-exporting everything after an outage is too slow.
The NebulaDeserializationConverter converts Nebula's data to Flink data types, but we should check whether the Nebula value is NULL for every Flink data type.
For example: when the Flink data type is BIGINT, the code just converts the Nebula value to Long, and when the Nebula value is NULL it triggers the exception below:
Caused by: com.vesoft.nebula.client.graph.exception.InvalidValueException: Cannot get field long because value's type is NULL
at com.vesoft.nebula.client.graph.data.ValueWrapper.asLong(ValueWrapper.java:332)
at org.apache.flink.connector.nebula.table.NebulaRowDataConverter.convert(NebulaRowDataConverter.java:69)
at org.apache.flink.connector.nebula.table.NebulaRowDataConverter.convert(NebulaRowDataConverter.java:39)
at org.apache.flink.connector.nebula.source.NebulaInputFormat.nextRecord(NebulaInputFormat.java:164)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:90)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
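A minimal sketch of the null-guarded conversion being requested, assuming the nebula-java ValueWrapper API (isNull()/asLong()); the rest of the NebulaRowDataConverter plumbing is omitted:

import com.vesoft.nebula.client.graph.data.ValueWrapper;

public class NullSafeConvertSketch {
    // Convert a Nebula value to a Flink BIGINT (Long), returning null for NULL values.
    public static Long toBigint(ValueWrapper value) throws Exception {
        if (value == null || value.isNull()) {
            // Without this guard, asLong() throws InvalidValueException:
            // "Cannot get field long because value's type is NULL".
            return null;
        }
        return value.asLong();
    }
}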
Should we mark the Flink dependencies as <provided> in pom.xml so that they are not included in the jar? (BTW, do we have a plan to publish artifacts compatible with each recent Flink version? For example, for the Kafka connector I can pick flink-sql-connector-kafka-1.16.0.jar, where 1.16.0 is the Flink version I need.) And should the bundled dependencies be relocated to a package such as org.apache.flink.connector.nebula.shaded? I've seen the above two issues taken care of in other official connectors (e.g. the Flink Elasticsearch connector). I have the corresponding code changes locally, and I'm happy to submit a PR if the idea makes sense.
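For the <provided> half of the question, a sketch of what that could look like in pom.xml; the artifact shown is only an illustration of the pattern used by other connectors, not necessarily this project's actual dependency list:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
    <!-- provided: the Flink runtime supplies this at run time, so it is not bundled into the connector jar -->
    <scope>provided</scope>
</dependency>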
In example/FlinkConnectorExample.java and example/FlinkConnectorSourceExample.java, the CREATE SPACE statement in the comment is:
CREATE SPACE `flinkSink` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default
When I execute the above statement using nebula-console, it reports an error (see below). The version of Nebula is 3.1.0 and it is deployed with Docker. Also, I can't find the corresponding parameter described in the docs.
Should we remove ON default? It seems to work fine after that, just as below:
(root@nebula) [(none)]> CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default
[ERROR (-1009)]: SemanticError: Create space with zone is unsupported
Mon, 11 Jul 2022 18:26:25 CST
(root@nebula) [(none)]> CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false)
Execution succeeded (time spent 19188/19153 us)
Mon, 11 Jul 2022 18:26:29 CST
(root@nebula) [(none)]> SHOW CREATE SPACE `tt`
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| Space | Create Space |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| "tt" | "CREATE SPACE `tt` (partition_num = 100, replica_factor = 3, charset = utf8, collate = utf8_bin, vid_type = INT64, atomic_edge = false) ON default_zone_172.28.2.1_9779,default_zone_172.28.2.2_9779,default_zone_172.28.2.3_9779" |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
Got 1 rows (time spent 732/1513 us)
Mon, 11 Jul 2022 18:26:46 CST
Split discussion from #38
After the session expires, a new session is not obtained automatically, which causes the sink to fail. It would be good to support automatically acquiring a valid session.
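A rough sketch of the requested behavior, assuming the nebula-java client API (NebulaPool.getSession and Session.ping); the actual NebulaBatchOutputFormat wiring is omitted:

import com.vesoft.nebula.client.graph.net.NebulaPool;
import com.vesoft.nebula.client.graph.net.Session;

public class SessionRenewSketch {
    private final NebulaPool pool;
    private Session session;

    public SessionRenewSketch(NebulaPool pool) {
        this.pool = pool;
    }

    // Return a usable session, re-acquiring one from the pool when the current one is expired or dead.
    public Session getOrRenewSession(String user, String password) throws Exception {
        if (session == null || !session.ping()) {
            session = pool.getSession(user, password, true);
        }
        return session;
    }
}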
Using Flink SQL now makes it much faster to support business development; will support for it be provided later?
as title
In the function NebulaCatalog.listTables:
for (TagItem tag : metaClient.getTags(graphSpace)) {
    tables.add("VERTEX" + NebulaConstant.POINT + tag.tag_name);
}
Here the byte array is concatenated into the string directly, since tag_name is declared as:
public byte[] tag_name;
So listTables shows:
[VERTEX.[B@3b0143d3, EDGE.[B@7b49cea0]
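A minimal sketch of the fix: decode the byte[] into a String before concatenation ("." stands in for NebulaConstant.POINT here, and the TagItem plumbing is reduced to a plain list of byte arrays):

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ListTablesSketch {
    public static List<String> vertexTables(List<byte[]> tagNames) {
        List<String> tables = new ArrayList<>();
        for (byte[] tagName : tagNames) {
            // Decoding yields e.g. "VERTEX.player" instead of "VERTEX.[B@3b0143d3".
            tables.add("VERTEX" + "." + new String(tagName, StandardCharsets.UTF_8));
        }
        return tables;
    }
}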
General Question
For the current implementation of Flink SQL with Nebula as a sink (#57), I think the table created by Flink SQL is a temporary table whose full name is default_catalog.default_database.table_name, and the table name in the Flink SQL CREATE statement has to be the same as the vertex/edge name in Nebula. If that's true, how can we create two tables in Flink SQL, with the same name, that map to different Nebula graph spaces?
For example, in the code below we can see that the table name must equal the vertex name in Nebula:
CREATE TABLE person ...
setTag(context.getObjectIdentifier().getObjectName())
Should we add a parameter in the WITH clause to increase flexibility?
At the same time, I noticed the listTables function in NebulaCatalog.java; its comment says: Tag and Edge, tag starts with VERTEX. and edge starts with EDGE. Should we stay compatible with this table-name design if we want to use our own catalog?
as title.
Introduction
Contents
Related work
General Question
I have compiled this nebula-flink-connector (v3.5) into a jar package by following the "Build Nebula Flink Connector" guide, and put it into [flink_server_path]/lib.
Then I tried to execute SQL code from flink/bin/sql-client.sh.
[In Nebula], I have a schema as follows:
> desc tag user;
+---------------+---------------------+-------+----------+------------------------------+
| Field         | Type                | Null  | Default  | Comment                      |
+---------------+---------------------+-------+----------+------------------------------+
| "name"        | "fixed_string(100)" | "NO"  |          | "姓名"                       |
| "sex"         | "int8"              | "YES" | __NULL__ | "性别 0男;1女"               |
| "auth_flag"   | "int8"              | "NO"  | 0        | "是否注册认证 1认证;0非认证" |
| "wx_open_id"  | "fixed_string(32)"  | "YES" | __NULL__ | "微信openID"                 |
+---------------+---------------------+-------+----------+------------------------------+
[In Flink], I tried to connect to the Nebula server via the following code:
CREATE TABLE t_user_info (
`name` VARCHAR(100),
`auth_flag` INT,
`wx_open_id` VARCHAR(32)
) WITH (
'connector' = 'nebula',
'meta-address' = '127.0.0.1:9559',
'graph-address' = '127.0.0.1:9669',
'username' = 'root',
'password' = 'nebula',
'graph-space' = 'jmt',
'label-name' = 'user'
);
However, when I tried to select the data, it raised a NullPointerException:
Flink SQL> select * from t_user_info;
[ERROR] Could not execute SQL statement. Reason:
java.lang.NullPointerException
By the way, how can I insert data into Nebula? I have tried the approach from #57, but write-mode is not supported.
So, is there any guide or example? For instance, how can I access the vertices or edges? And does the id(vertex) attribute affect the results?
The maintainer doesn't have the bandwidth for this now; hopefully someone can help/volunteer on it.
NebulaGraph environment
Bug description
Caused by: java.lang.LinkageError: loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "com/vesoft/nebula/client/graph/net/Session"
at java.lang.ClassLoader.defineClass1(Native Method) ~[?:1.8.0_211]
at java.lang.ClassLoader.defineClass(ClassLoader.java:763) ~[?:1.8.0_211]
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) ~[?:1.8.0_211]
at java.net.URLClassLoader.defineClass(URLClassLoader.java:468) ~[?:1.8.0_211]
at java.net.URLClassLoader.access$100(URLClassLoader.java:74) ~[?:1.8.0_211]
at java.net.URLClassLoader$1.run(URLClassLoader.java:369) ~[?:1.8.0_211]
at java.net.URLClassLoader$1.run(URLClassLoader.java:363) ~[?:1.8.0_211]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
at java.net.URLClassLoader.findClass(URLClassLoader.java:362) ~[?:1.8.0_211]
at java.lang.ClassLoader.loadClass(ClassLoader.java:424) ~[?:1.8.0_211]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349) ~[?:1.8.0_211]
at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ~[?:1.8.0_211]
at java.lang.Class.getDeclaredFields0(Native Method) ~[?:1.8.0_211]
at java.lang.Class.privateGetDeclaredFields(Class.java:2583) ~[?:1.8.0_211]
at java.lang.Class.getDeclaredField(Class.java:2068) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1857) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79) ~[?:1.8.0_211]
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506) ~[?:1.8.0_211]
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494) ~[?:1.8.0_211]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:490) ~[?:1.8.0_211]
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391) ~[?:1.8.0_211]
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134) ~[?:1.8.0_211]
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) ~[?:1.8.0_211]
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) ~[flink-core-1.16.2.jar:1.16.2]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:143) ~[flink-core-1.16.2.jar:1.16.2]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:132) ~[flink-core-1.16.2.jar:1.16.2]
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:69) ~[flink-core-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:2308) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:202) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1242) ~[flink-streaming-java-1.16.2.jar:1.16.2]
at com.XXX.jobs.portraits.XXXJober.initSink(XXXJober.java:82) ~[xxx-23.1114.1746.jar:?]
at com.XXX.jobs.portraits.XXXJober.arrange(XXXJober.java:42) ~[xxx-23.1114.1746.jar:?]
at com.XXX.common.launcher.IBdJober$Builder.lambda$execute$8(IBdJober.java:143) ~[xxx-23.1114.1746.jar:?]
at java.util.HashMap$EntrySet.forEach(HashMap.java:1044) ~[?:1.8.0_211]
at com.XXX.common.launcher.IBdJober$Builder.execute(IBdJober.java:137) ~[xxx-23.1114.1746.jar:?]
at com.XXX.BigDataMain.main(BigDataMain.java:31) ~[xxx-23.1114.1746.jar:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_211]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_211]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_211]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_211]
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355) ~[flink-clients-1.16.2.jar:1.16.2]
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222) ~[flink-clients-1.16.2.jar:1.16.2]
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:98) ~[flink-clients-1.16.2.jar:1.16.2]
at org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap.runApplicationEntryPoint(ApplicationDispatcherBootstrap.java:301) ~[flink-clients-1.16.2.jar:1.16.2]
... 12 more
The Java class com.vesoft.nebula.client.graph.net.Session is defined both in com.vesoft:nebula-flink-connector:3.5.0.jar and in the com.vesoft:client:3.5.0.jar it depends on.
Reading data from Flink fails in the unit tests. The only way I can figure out is to run the UTs in the same Docker network environment; however, that makes it impossible to debug the code.
@Nicole00 Would you please enlighten us here?