ververica / flink-sql-gateway Goto Github PK
View Code? Open in Web Editor NEWLicense: Apache License 2.0
License: Apache License 2.0
Exception Msg:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
Reason for error :
ExecutionContext.createCatalog and FactoryUtil.createCatalog are implemented differently.
It works fine after I replace ExecutionContext.createCatalog with FactoryUtil.createCatalog.
Hi,
I'm running the following setup:
Hue: 4.9
Flink: 1.12
Flink Sql Gateway: compiled from source based on https://github.com/ververica/flink-sql-gateway/releases/tag/flink-1.12.0
Flink running as a Yarn Session using below command:
flink-yarn-session -n 2 -s 4 -tm 16GB -d
When I submit a sql query from Hue I'm able to see the job running under Flink Dashboard and also shows correct records sent in output back to sql gateway sink but then nothing appears on Hue UI.
I'm running this without passing execution.target: yarn-per-job parameter in $FLINK_HOME/conf/flink-conf.yaml
If I pass that parameter and start flink sql gateway and submit queries I get the following error:
There was a mention about passing application id of flink job on Yarn in flink-conf.yaml but didn't see it documented anywhere. Could you share the details if that is causing this issue or is it something else?
21:55:24.555 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.operation.SelectOperation - Session: 678f4ec903c4b996b7fe6eadacd3a06e. Error running SQL job.
java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:86) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:84) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
... 47 more
21:55:24.559 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
java.lang.RuntimeException: Error running SQL job.
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:249) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:86) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
... 46 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:84) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
Add trigger savepoint handler to support user trigger savepoint,Also store the location of the savepoint so that it can be restored from the savepoint next time
Investigate why this statement works
SELECT * FROM `mypg`.`postgres`.`public.link`
but this does not:
DESCRIBE `mypg`.`postgres`.`public.link`
i want run it on hive3.X
RT
ERROR com.ververica.flink.table.gateway.rest.session.SessionManager - Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.handler.SessionHeartbeatHandler - Exception occurred in REST handler: Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.session.SessionManager - Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.handler.SessionHeartbeatHandler - Exception occurred in REST handler: Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.session.Session - Session: 705917caa407a1c237e3f5fb9171c722, Failed to parse statement: USE impala::sddl_ext.c_cons
ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Exception occurred in REST handler: Failed to parse statement.
Currently, only the application_id parameter of the configuration file can be read when starting the sqlgateway server, which causes all users to use the same yarn-session, which causes a single application_id to carry too many jobs, and it is inconvenient to distinguish and manage different users. between jobs.I have already implemented this part of the function, I don't know when the community will discuss it sometimes
Currently, sql gateway can not support write the olap query result to hive, but to return the result to sql gateway. sometimes user want to persist the query result to hive table for later query.
Sorry if I missed it, but I only found https://docs.google.com/document/d/1DKpFdov1o_ObvrCmU-5xi-VrT6nR2gxq-BbswSSI9j8/edit#.
For example the Livy docs are pretty handy:
catalogs:
the error below happens,help please.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
hive-conf-dir=/opt/software/flink-sql-gateway-0.1-SNAPSHOT/conf
type=hive
The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at com.ververica.flink.table.gateway.context.ExecutionContext.createCatalog(ExecutionContext.java:364)
at com.ververica.flink.table.gateway.context.ExecutionContext.lambda$null$4(ExecutionContext.java:543)
at java.util.HashMap.forEach(HashMap.java:1288)
at com.ververica.flink.table.gateway.context.ExecutionContext.lambda$initializeCatalogs$5(ExecutionContext.java:542)
at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:239)
at com.ververica.flink.table.gateway.context.ExecutionContext.initializeCatalogs(ExecutionContext.java:541)
at com.ververica.flink.table.gateway.context.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:490)
at com.ververica.flink.table.gateway.context.ExecutionContext.(ExecutionContext.java:159)
at com.ververica.flink.table.gateway.context.ExecutionContext.(ExecutionContext.java:118)
at com.ververica.flink.table.gateway.context.ExecutionContext$Builder.build(ExecutionContext.java:751)
... 46 more
This is super useful but is supported only in Flink >= 1.11
failed to submit a sql job for kafka stream table, exception: Cannot generate a valid execution plan for the given query. but i tried the same sql in Flink Client, it works well.
When executing this query, we get an error from the gateway:
SELECT area, SUM(psgSum)/24.0 AS avgPsgLeaving FROM (SELECT toAreaId(lon, lat) AS area, TUMBLE_END(rideTime ,INTERVAL '1' HOUR) AS t, SUM(psgCnt) AS psgSum FROM Rides WHERE isStart AND isInNYC(lon, lat) GROUP BY toAreaId(lon, lat), TUMBLE(rideTime, INTERVAL '1' HOUR)) GROUP BY area;
['Internal server error.', "<Exception on server side:\ncom.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query.\n\tat com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253)\n\tat com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:89)\n\tat com.ververica.flink.table.gateway.Session.r..... .... org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.flink.table.api.ValidationException: Type DECIMAL(17, 6) of table field 'avgPsgLeaving' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'avgPsgLeaving' field of the TableSink consumed type.\n\tat org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)\n\tat ....
Now it is spported by Flink: https://issues.apache.org/jira/browse/FLINK-17357.
Another question: why do we have our own parser? Can't we just use the Flink SQL client to submit SQL to Flink (and thus reuse all the parsing logic)?
I recall seeing a backlog item in Apache Livy JIRA about being able to use their API for more than just Spark.
Any chance y'all could put that on the roadmap?
Caused by: java.lang.RuntimeException: Could not execute program. at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] ... 46 more Caused by: java.lang.IllegalStateException at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61) ~[flink-dist_2.12-1.11.1.jar:1.11.1] at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?] at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
The program will thorw exception if we don't set yarn.application.id
option in $FLINK_HOME/conf/flink-conf.yaml.
From README
Flink SQL gateway stores the session properties in memory now. If the service is stopped or crashed, all properties are lost.
Do you have any workaround or suggestions for this case to get previous running job's status or cancel it ? Thanks in advance.
Just another one: "dropping a table with a Watermark" column fails:
"Could not drop a table from statement"
"Caused by: org.apache.flink.table.api.TableException: Watermark can not be defined for a processing time attribute column."
DROP table hue_logs
log STRING,
proctime AS PROCTIME(),
WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'hue_logs',
'properties.bootstrap.servers' = 'kafka:9094',
'format' = 'json'
);```
```10:25:48.057 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
com.ververica.flink.table.gateway.utils.SqlExecutionException: Could not drop a table from statement: DROP table hue_logs
.
at com.ververica.flink.table.gateway.operation.DDLOperation.execute(DDLOperation.java:54) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:177) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.0.jar:1.12.0]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.table.api.TableException: Watermark can not be defined for a processing time attribute column.
at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:90) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:760) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:725) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:751) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
at com.ververica.flink.table.gateway.operation.DDLOperation.lambda$execute$0(DDLOperation.java:49) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:170) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.DDLOperation.execute(DDLOperation.java:48) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
... 45 more
ERROR com.ververica.flink.table.gateway.rest.session.Session - Session: 705917caa407a1c237e3f5fb9171c722, Failed to parse statement: USE impala::sddl_ext.c_cons
ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Exception occurred in REST handler: Failed to parse statement.
Hi all,
We can set HADOOP_USER_NAME for each job when to submit job in yarn per-job mode via flink command. But how to specify different user for each job via REST API in yarn per-job mode? Thanks.
There are some new features in Flink 1.13, such as BEGIN STATEMENTSET
.
Shall we merge those features into flink sql gateway
?
SQL example:
CREATE VIEW v_inout_flow AS
SELECT tb.park_code,
tb.park_name,
tb.city_code,
tb.city_name,
ta.status,
ta.event_time
FROM ods_inout_flow AS ta
JOIN dim_park FOR SYSTEM_TIME AS OF ta.proctime AS tb
ON ta.org_code = tb.org_code;
The exception is:
Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "`dim_park`" at line 3, column 20.
Was expecting one of:
"TABLE" ...
"(" ...
at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.ververica.flink.table.gateway.operation.CreateViewOperation.lambda$execute$0(CreateViewOperation.java:57)
at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:204)
at com.ververica.flink.table.gateway.operation.CreateViewOperation.execute(CreateViewOperation.java:56)
... 45 more
Hello,
Currently jobs keep running even after reaching the SQL LIMIT. What would be a way to detect the end of a query? (when all the results have been sent)
SELECT * FROM default_database.Fares LIMIT 3
{"results":[{"columns":[{"name":"rideId","type":"BIGINT"},{"name":"payTime","type":"TIMESTAMP(3)"},{"name":"payMethod","type":"STRING"},{"name":"tip","type":"FLOAT"},{"name":"toll","type":"FLOAT"},{"name":"fare","type":"FLOAT"}],"data":[[65,"2013-01-01T00:00:36","CSH",0.0,0.0,3.5],[137,"2013-01-01T00:01:00","CSH",0.0,0.0,3.5],[77,"2013-01-01T00:01:22","CSH",0.0,0.0,4.0]],"change_flags":[true,true,true]}],"next_result_uri":"/v1/sessions/0e3930af55b0a3730ad605c18166e9d7/jobs/6baf6aa22f35c1dae4a07d5f762d3019/result/2"}`
{"status":"RUNNING"}`
and next call to fetch results have data":[]
(which makes sense)
Thanks!
$FLINK_HOME/conf/flink-conf.yaml
execution.target: yarn-session
An exception occurred when executing the SELECT script using SQL-Gateway
-INFO-2021/01/13 20:23:09,166-org.apache.flink.yarn.YarnClusterDescriptor.getLocalFlinkDistPath(YarnClusterDescriptor.java:196)-196-No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
-ERROR-2021/01/13 20:23:09,176-com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:249)-249-Session: 2c436c304a0b7a697c1e005ad34c32bc. Error running SQL job.
java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83)
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247)
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87)
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106)
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61)
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81)
... 47 more
-ERROR-2021/01/13 20:23:09,183-com.ververica.flink.table.gateway.rest.handler.AbstractHandler.handleException(AbstractHandler.java:224)-224-Unhandled exception.
java.lang.RuntimeException: Error running SQL job.
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:250)
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87)
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106)
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83)
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247)
... 46 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61)
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81)
... 47 more
Ask timed out on [Actor[akka.tcp://flink@jssz-bigdata-datanode-1316:15841/user/taskmanager_0#-1168369699]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation].
the flink job throws a exception like this.if i add some limit in the query sentence, no exception will be found.
I have also see the akka.framesize related errors.
can sql-gateway support submit job through 'select * from xx_view'
sql statement is :
CREATE TABLE trade_datasource (
userId VARCHAR,
) WITH (
'connector.type' = 'kafka',
xx
);
CREATE TABLE pvuv_sink (
user_id VARCHAR,
) WITH (
'connector.type' = 'jdbc',
xx
);
--create VIEW from source table
create view trade_datasource_view as
select *
from trade_datasource
where userId> 100;
--submit job through VIEW
insert into pvuv_sink
select * from trade_datasource_view;
I got the following error:
020-04-23 03:15:01,929 INFO org.apache.flink.table.module.ModuleManager - Got FunctionDefinition proctime from module core
2020-04-23 03:15:01,930 ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.ververica.flink.table.gateway.operation.CreateViewOperation.execute(CreateViewOperation.java:55)
at com.ververica.flink.table.gateway.Session.runStatement(Session.java:84)
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.zookeeper.connect=noneed
connector.startup-mode=latest-offset
....
schema.watermark.0.rowtime=requestDateTime
schema.watermark.0.strategy.data-type=TIMESTAMP(6)
schema.watermark.0.strategy.expr=requestDateTime
- INTERVAL '5' SECOND
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 66 more
2020-04-23 03:31:01,559 INFO com.ververica.flink.table.gateway.SessionManager - Start to remove expired session, current session count: 8
2020-04-23 03:31:01,560 INFO com.ververica.flink.table.gateway.SessionManager - Session: ddcff4b29d51f04a01cc0aa85e07a953 is expired, close it...
2020-04-23 03:31:01,560 INFO com.ververica.flink.table.gateway.SessionManager - Session: ddcff4b29d51f04a01cc0aa85e07a953 is closed.
Currently, flink-sql-gateway can only submit job with a specified executor which defined in flink-conf.yaml。But if someone wants to submit a job to yarn-session, and others want to do it with standalone cluster, then flink-sql-gateway can't satisfy this right now. So could flink-sql-gateway support this feature future。
And I also has a roughly idea that how flink-sql-gateway supports this feature. we can introduce a new component which I called ExecutionManager
.ExecutionManager
is like SessionManager
, but it's main function is to creating a new Execution
, each Execution
represents an deploy mode(yarn-seesion, yarn-per-job, standalone, local, etc). The Execution
will create a new SessionManager
and will use user specified execution.target
to instance a DefaultContext
. By this way, every new session
created by SessionManager
can submit job to different cluster.
Furthermore, flink-sql-gateway do not need to create SessionManager
at start time but instead of ExecutionManager
, and the SessionManager
will be instance by ExecutionManager
.
And, we'll must implement a new netty handler that used to create/delete/get Execution
.
Typically DB APIs provide a way to fetch only N max rows in one call.
This helps avoid sending way too many rows to the clients.
Bit related to #49
Through the rest api "http://127.0.0.1:8083/v1/sessions/ "+ sessionId +"/statements" to submit stream task failed.
submit sql stream task to sql-gateway failed through rest api "http://127.0.0.1:8083/v1/sessions/ "+ sessionId +"/statements"
Steps:
The request body is:
{
"session_name": "test_session",
"planner": "blink",
"execution_type": "streaming"
}
CREATE TABLE trade_datasource (
userId VARCHAR,
itemId VARCHAR,
categoryId VARCHAR,
behavior VARCHAR,
timestamp
BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_trade',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json'
);
CREATE TABLE pvuv_sink (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts
BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_sql_platform',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
);
INSERT INTO pvuv_sink
SELECT
userId as user_id,
itemId as item_id,
categoryId as category_id,
behavior,
timestamp
as ts
FROM trade_datasource where behavior = 'pv';
These three statements are executed correctly in Flink SQL client embedded. Execute one SQL at a time, with three requests. The response to the request is as follows. What is the problem?
{"statement":"CREATE TABLE trade_datasource (\n\tuserId VARCHAR,\n itemId VARCHAR,\n categoryId VARCHAR,\n behavior VARCHAR,\n timestamp
BIGINT\n) WITH (\n 'connector.type' = 'kafka', \n 'connector.version' = 'universal', \n 'connector.topic' = 'user_trade', \n 'connector.startup-mode' = 'earliest-offset', \n 'connector.properties.zookeeper.connect' = 'localhost:2181', \n 'connector.properties.bootstrap.servers' = 'localhost:9092', \n 'format.type' = 'json'\n)"}
{"results":[{"columns":[{"name":"affected_row_count","type":"BIGINT NOT NULL"}],"data":[[0]]}],"statement_types":["CREATE_TABLE"]}
{"statement":"CREATE TABLE pvuv_sink (\n user_id VARCHAR,\n item_id VARCHAR,\n category_id VARCHAR,\n behavior VARCHAR,\n ts
BIGINT\n) WITH (\n'connector.type' = 'jdbc',\n'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_sql_platform',\n'connector.table' = 'pvuv_sink',\n'connector.username' = 'root',\n'connector.password' = '123456',\n'connector.write.flush.max-rows' = '1'\n)"}
{"results":[{"columns":[{"name":"affected_row_count","type":"BIGINT NOT NULL"}],"data":[[0]]}],"statement_types":["CREATE_TABLE"]}
{"statement":"INSERT INTO pvuv_sink \nSELECT\nuserId as user_id,\nitemId as item_id,\ncategoryId as category_id,\nbehavior,\ntimestamp
as ts\nFROM trade_datasource where behavior = 'pv'"}
{"errors":["Internal server error.","<Exception on server side:\ncom.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL update statement.\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:157)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.execute(InsertOperation.java:80)\n\tat com.ververica.flink.table.gateway.Session.runStatement(Session.java:95)\n\tat com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)\n\tat org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)\n\tat org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)\n\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)\n\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)\n\tat org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)\n\tat org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)\n\tat org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:331)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.lambda$executeUpdateInternal$1(InsertOperation.java:148)\n\tat com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:230)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:145)\n\t... 46 more\nCaused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in\nthe classpath.\n\nReason: Required context properties mismatch.\n\nThe following properties are requested:\nconnector.properties.bootstrap.servers=localhost:9092\nconnector.properties.zookeeper.connect=localhost:2181\nconnector.startup-mode=earliest-offset\nconnector.topic=user_trade\nconnector.type=kafka\nconnector.version=universal\nformat.type=json\nschema.0.data-type=VARCHAR(2147483647)\nschema.0.name=userId\nschema.1.data-type=VARCHAR(2147483647)\nschema.1.name=itemId\nschema.2.data-type=VARCHAR(2147483647)\nschema.2.name=categoryId\nschema.3.data-type=VARCHAR(2147483647)\nschema.3.name=behavior\nschema.4.data-type=BIGINT\nschema.4.name=timestamp\n\nThe following factories have been considered:\norg.apache.flink.table.sources.CsvBatchTableSourceFactory\norg.apache.flink.table.sources.CsvAppendTableSourceFactory\n\tat org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)\n\tat org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)\n\tat org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)\n\tat org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)\n\t... 72 more\n\nEnd of exception on server side>"]}
hi godfrey he, will the flink sql gateway support this feature in the future
which you can see here https://issues.apache.org/jira/browse/FLINK-17326
ca41046 Updates Flink version to 1.13 without adjustment to the interfaces changes in 1.13, so the master branch doesn't compile.
For example you need to export FLINK_HOME (as usual) and install jq to make them run
sudo apt install jq
i deploy gateway on flink-1.11.1,but create table doesn't support flink1.11.1's features
eg:
fllink1.11.1 like this ok:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
flink1.10 & gateway like this ok:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector.type' = 'kafka',
'connector.topic' = 'user_behavior',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = 'json',
'format.fail-on-missing-field' = 'false'
)
gateway is same to flink1.10.
any suggest? thanks @godfreyhe @tsreaper
important: gateway doesn't support 'json.ignore-parse-errors' = 'true' by json
Flink 1.11 introduces the Application Mode as a deployment option, does the flink sql gateway support?
Currently, job operations return result kind SUCCESS_WITH_CONTENT
with Flink job id as data. This may mix up with results from select operations (e.g. select from a table with job_id
column).
Maybe we can add a new result type SUCCESS_WITH_JOB
to avoid confusion.
Currently, users can only use a random port as the gateway port to support multiple select statements, but it's vital to limit the port range in a production environment.
hi, guys...I want upgrade flink dependencies version to 1.12, what risks do i attention to? or will it upgrade 1.12 recently?
Currently, when I add a JDBC catalog to the YAML file I need to add the last '/' to the url.
The following config works:
catalogs:
- name: mypg
type: jdbc
default-database: postgres
username: postgres
password: mysecretpassword
base-url: jdbc:postgresql://localhost:5432/
but this doesn't:
base-url: jdbc:postgresql://localhost:5432
I tested yarn per-job execution mode, submitted job successfully via http://host1:8083/v1/sessions/ec4f81cee3fe64f6a484844958531c4d/statements
.
However, it does not work for job status and cancel API.
According to debugging code, I found the key point is invalid yarn application id in AbstractJobOperation#bridgeClientRequest
. I commit PR, please help to review it, thanks.
For our use case, it made sense to create an openapi 3.0.0 spec to autogenerate a typescript client. This spec is based on the current rest interface.
We would like to contribute back to the community for others.
e.g. Trying to point it to https://github.com/ververica/sql-training/wiki/Setting-up-the-Training-Environment:
./bin/sql-gateway.sh
FLINK_HOME is not found in environment variable.
Configures the FLINK_HOME environment variable using the following command: export FLINK_HOME=<flink-install-dir>
export FLINK_HOME=.
./bin/sql-gateway.sh
./bin/sql-gateway.sh: line 85: ./config.sh: No such file or directory
./bin/sql-gateway.sh: line 91: constructFlinkClassPath: command not found
./bin/sql-gateway.sh: line 102: manglePathList: command not found
./bin/sql-gateway.sh: line 102: manglePath: command not found
./bin/sql-gateway.sh: line 102: exec: -D: invalid option
exec: usage: exec [-cl] [-a name] [command [arguments ...]] [redirection ...]
hi,I run CreateTableOperationTest.java with my insert code in test source,but occur exception,Looking forward to your reply
below is my code and exception
code:
public class CreateTableOperationTest extends OperationTestBase {
@Test
public void testCreateTable() {
final String ddlTemplate = "create table %s(\n" +
" a int,\n" +
" b bigint,\n" +
" c varchar\n" +
") with (\n" +
" 'connector.type'='filesystem',\n" +
" 'format.type'='csv',\n" +
" 'connector.path'='file:///tmp/MyTable1'\n" +
")\n";
CreateTableOperation operation = new CreateTableOperation(context, String.format(ddlTemplate, "MyTable1"));
ResultSet resultSet = operation.execute();
assertEquals(OperationUtil.AFFECTED_ROW_COUNT0, resultSet);
String[] tables = context.getExecutionContext().getTableEnvironment().listTables();
assertArrayEquals(new String[] { "MyTable1" }, tables);
InsertOperation insertOperation = new InsertOperation(context, "INSERT INTO MyTable1 select 1,1,'test'");
ResultSet execute = insertOperation.execute();
System.out.println(execute);
}
}
Exception:
java.lang.RuntimeException: Error running SQL job.
at com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:201)
at com.ververica.flink.table.gateway.operation.InsertOperation.execute(InsertOperation.java:84)
at com.ververica.flink.table.gateway.operation.CreateTableOperationTest.testCreateTable(CreateTableOperationTest.java:52)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.ProgramDeployer.deploy(ProgramDeployer.java:83)
at com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:198)
... 25 more
Caused by: java.lang.RuntimeException: Couldn't retrieve standalone cluster
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
at com.ververica.flink.table.gateway.ProgramDeployer.deploy(ProgramDeployer.java:81)
... 26 more
Caused by: java.lang.NullPointerException: rest.address must be set
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.(RestClusterClient.java:161)
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
... 28 more
we are going to build our data computing system base on flink sql.
for now, with flink 1.11.0, we had achived a milestone: consuming from kafka, then select from dynamic table, and write results to mysql.
but, when we test the exactly once(end to end), we found problem.
official documentation about flink sql do ous no favor. I need help
sql file:
-- source, 使用计算列,uuid()在线生成uuid
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
uuid as uuid()
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'test-consumer-group12',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
-- sink
CREATE TABLE pvuv_sink (
uuid varchar,
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
'connector.table' = 'pvuv_sink13',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1',
'connector.sink.semantic' = 'exactly-once'
);
INSERT INTO pvuv_sink
SELECT
uuid,
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;
sql parse and concat file:
/**
这是进行命令解析和提交的程序,整个工程入口
*/
public class SqlSubmit {
public static void main(String[] args) throws Exception {
// 解析命令行参数
final CliOptions options = CliOptionsParser.parseClient(args);
// 将解析好的命令行参数传递给SqlSubmit
SqlSubmit submit = new SqlSubmit(options);
// 运行程序
submit.run();
}
// --------------------------------------------------------------------------------------------
private String sqlFilePath;
private TableEnvironment tEnv;
// 获取到sql执行文件的路径
private SqlSubmit(CliOptions options) {
this.sqlFilePath = options.getSqlFilePath();
}
private void run() throws Exception {
// 创建flink执行的上下文对象
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
this.tEnv = StreamTableEnvironment.create(environment,
EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());
// 获取所有的sql文件行内容,转为字符串list
List<String> sql = Files.readAllLines(Paths.get(sqlFilePath));
List<SqlCommandParser.SqlCommandCall> calls = SqlCommandParser.parse(sql);
if (calls.size() == 0) {
//no sql to execute
throw new RuntimeException("There is no sql statement to execute,please check your sql file: " + sqlFilePath);
}
for (SqlCommandParser.SqlCommandCall call : calls) {
// System.out.println(call.command.toString());
callCommand(call);
}
}
// --------------------------------------------------------------------------------------------
private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
switch (cmdCall.command) {
case SET:
callSet(cmdCall);
break;
case CREATE_TABLE:
callCreateTable(cmdCall);
break;
case INSERT_INTO:
callInsertInto(cmdCall);
break;
default:
throw new RuntimeException("Unsupported command: " + cmdCall.command);
}
}
private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
String key = cmdCall.operands[0];
String value = cmdCall.operands[1];
tEnv.getConfig().getConfiguration().setString(key, value);
System.out.println("设置 " + key + "-->" + value + " 成功");
}
private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
String ddl = cmdCall.operands[0];
try {
tEnv.executeSql(ddl);
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + ddl + "\n", e);
}
String tableName = ddl.split("\\s+")[2];
System.out.println("创建表 " + tableName + " 成功");
}
private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
String dml = cmdCall.operands[0];
Optional<JobClient> jobClient;
try {
TableResult result = tEnv.executeSql(dml);
jobClient = result.getJobClient();
} catch (SqlParserException e) {
throw new RuntimeException("SQL parse failed:\n" + dml + "\n", e);
}
if (jobClient.isPresent()) {
JobID jobID = jobClient.get().getJobID();
System.out.println("任务提交成功,JobId: " + jobID);
}
}
}
shell to submit a job:
#!/bin/bash
export FLINK_HOME=/Users/hulc/developEnv/flink-1.11.0
sql_file=$2
# flink home检查
if [ -z "$FLINK_HOME" ];then
echo "请指定FLINK_HOME 或者在该配置文件中配置"
exit 1
fi
# 参数数量检查
if [ $# -lt 2 ];then
echo "命令格式为 ./sql-submit.sh -f "
exit 1
fi
# 要依赖的jar包,这里名字是写死的,后去可以使用传入参数
# SQL_JAR=./flink-sql-submit-1.0-SNAPSHOT.jar
SQL_JAR=./target/flink-test1-1.0-SNAPSHOT.jar
# 检查是否正确加载这个jar包
if [ -f $SQL_JAR ];then
echo "date +%Y-%m-%d" "%H:%M:%S
load jars from ${SQL_JAR}"
else
echo "failed to load dependent jars for sql-submit.sh,please specify it"
exit 1
fi
# 检查是否指定sql文件
if [ ! -f $sql_file ];then
echo "sql文件 $sql_file 不存在,请检查文件路径"
exit 1
fi
#提交命令, 注意这里的提交参数也是写死的,并行度5 main主类全名, 工程打出的jar包
# $1 就是 -f,也就是制定需要执行的文件参数
# $sql_file 就是制定需要执行的sql文件
if [ $1 = "-f" ];then
$FLINK_HOME/bin/flink run -p 1 -c SqlSubmit /Users/hulc/develop/flink-test1/target/flink-test1-1.0-SNAPSHOT.jar $1 $sql_file
else
echo "命令格式为 ./sql-submit.sh -f "
exit 1
fi
There is a flink-sql-gateway in flink docs, it says, "In the future, the community plans to extend its functionality by providing a REST-based SQL Client Gateway, see more in FLIP-24 and FLIP-91".
In my opinion, the intention of flink-sql-gateway and flink-sql-client-gateway are the same.
How to consider the future development of this project?
On:
https://gethue.com/blog/tutorial-query-live-data-stream-with-flink-sql/
And the integration keeps getting better!
The error happens when I submit a query job using beeline:
2020-02-28 16:41:18,504 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, getresult for job: 72e7d9d448deecae9d9d98efbaa5d263, token: 24, maxFetchSize: 0
2020-02-28 16:41:18,545 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, getresult for job: 72e7d9d448deecae9d9d98efbaa5d263, token: 25, maxFetchSize: 0
2020-02-28 16:41:18,561 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, cancel job: 72e7d9d448deecae9d9d98efbaa5d263
2020-02-28 16:41:18,561 INFO com.ververica.flink.table.gateway.operation.SelectOperation - Session: 98099e3b3467395cffae698f9d3cbb51. Start to cancel job 72e7d9d448deecae9d9d98efbaa5d263 and result retrieval.
2020-02-28 16:41:18,577 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-02-28 16:41:18,579 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm123
2020-02-28 16:41:18,583 ERROR com.ververica.flink.table.gateway.operation.SelectOperation - Session: 98099e3b3467395cffae698f9d3cbb51, job: 72e7d9d448deecae9d9d98efbaa5d263. Could not retrieve or create a cluster.
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve Yarn cluster
at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:366)
at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:123)
at com.ververica.flink.table.gateway.operation.SelectOperation.cancelQueryInternal(SelectOperation.java:299)
at com.ververica.flink.table.gateway.operation.SelectOperation.cancelJob(SelectOperation.java:132)
at com.ververica.flink.table.gateway.Session.cancelJob(Session.java:101)
at com.ververica.flink.table.gateway.rest.handler.JobCancelHandler.handleRequest(JobCancelHandler.java:68)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(Ab
A declarative, efficient, and flexible JavaScript library for building user interfaces.
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
An Open Source Machine Learning Framework for Everyone
The Web framework for perfectionists with deadlines.
A PHP framework for web artisans
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
Some thing interesting about web. New door for the world.
A server is a program made to process requests and deliver data to clients.
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
Some thing interesting about visualization, use data art
Some thing interesting about game, make everyone happy.
We are working to build community through open source technology. NB: members must have two-factor auth.
Open source projects and samples from Microsoft.
Google ❤️ Open Source for everyone.
Alibaba Open Source for everyone
Data-Driven Documents codes.
China tencent open source team.