flink-sql-gateway's Issues

[BUG] CreateSession reports an error after configuring a HiveCatalog

Exception message:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in

Reason for the error:
ExecutionContext.createCatalog and FactoryUtil.createCatalog are implemented differently.
The gateway works fine after I replace ExecutionContext.createCatalog with FactoryUtil.createCatalog, as sketched below.
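
For illustration, a minimal sketch of the replacement, assuming a Flink version that provides FactoryUtil.createCatalog(name, options, config, classLoader); the wrapper class and method below are hypothetical, not gateway code:

```
// Hypothetical wrapper illustrating the proposed fix: use FactoryUtil's
// discovery instead of the legacy TableFactoryService lookup performed by
// ExecutionContext.createCatalog, so catalog factories implemented against
// the newer factory stack (such as the Hive catalog) are found as well.
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

public final class CatalogCreation {
    static Catalog create(String name, Map<String, String> options, ClassLoader cl) {
        // 'options' holds the catalog properties from the session config,
        // e.g. type=hive, hive-conf-dir=...
        return FactoryUtil.createCatalog(name, options, new Configuration(), cl);
    }
}
```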

Results not visible in the Hue UI even though the job completed successfully on the Flink cluster

Hi,

I'm running the following setup:
Hue: 4.9
Flink: 1.12
Flink Sql Gateway: compiled from source based on https://github.com/ververica/flink-sql-gateway/releases/tag/flink-1.12.0

Flink is running as a YARN session, started with the command below:

flink-yarn-session -n 2 -s 4 -tm 16GB -d

When I submit a SQL query from Hue, I can see the job running in the Flink Dashboard, and it shows the correct number of records sent back to the SQL gateway sink, but nothing appears in the Hue UI.
I'm running this without setting the execution.target: yarn-per-job parameter in $FLINK_HOME/conf/flink-conf.yaml.

If I set that parameter, start the Flink SQL gateway, and submit queries, I get the error below. There was a mention of passing the application id of the Flink job on YARN in flink-conf.yaml, but I didn't see it documented anywhere. Could you share whether that is what's causing this issue, or is it something else?

21:55:24.555 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.operation.SelectOperation - Session: 678f4ec903c4b996b7fe6eadacd3a06e. Error running SQL job.
java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:86) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:84) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
... 47 more
21:55:24.559 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
java.lang.RuntimeException: Error running SQL job.
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:249) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.1.jar:1.12.1]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_282]
Caused by: java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:86) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
... 46 more
Caused by: java.lang.IllegalStateException
at org.apache.flink.util.Preconditions.checkState(Preconditions.java:177) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:72) ~[flink-dist_2.11-1.12.1.jar:1.12.1]
at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:84) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:246) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
[screenshot: flink_sql_hue]

Add Postgres tests

Investigate why this statement works

SELECT * FROM `mypg`.`postgres`.`public.link` 

but this does not:

DESCRIBE `mypg`.`postgres`.`public.link` 

session does not exist

ERROR com.ververica.flink.table.gateway.rest.session.SessionManager - Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.handler.SessionHeartbeatHandler - Exception occurred in REST handler: Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.session.SessionManager - Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.handler.SessionHeartbeatHandler - Exception occurred in REST handler: Session: 65c5b8be4e2e2352a537130dd1029240 does not exist.
ERROR com.ververica.flink.table.gateway.rest.session.Session - Session: 705917caa407a1c237e3f5fb9171c722, Failed to parse statement: USE impala::sddl_ext.c_cons
ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Exception occurred in REST handler: Failed to parse statement.

[Feature] Create sessions with an application_id parameter when using yarn-session mode

Currently, the application_id can only be read from the configuration file when the SQL gateway server starts, so all users share the same yarn-session. A single application_id then carries too many jobs, and it is inconvenient to distinguish and manage jobs from different users. I have already implemented this feature; when could the community discuss it? A sketch of the intended usage follows.
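
A hedged sketch of what the proposed usage could look like, assuming the gateway's POST /v1/sessions endpoint accepted a per-session yarn.application.id inside the properties map; the host, port, and application id are placeholders, and whether the gateway honors this key is exactly the feature being proposed:

```
// Hypothetical client call: create a gateway session bound to a specific
// yarn-session application via the session properties.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class CreateSessionWithAppId {
    public static void main(String[] args) throws Exception {
        String body = "{"
                + "\"planner\": \"blink\","
                + "\"execution_type\": \"streaming\","
                + "\"properties\": {\"yarn.application.id\": \"application_1600000000000_0001\"}"
                + "}";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/v1/sessions"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // e.g. {"session_id": "..."}
    }
}
```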

Support writing query results to a Hive table

Currently, the SQL gateway cannot write an OLAP query result to Hive; it can only return the result to the gateway itself. Sometimes users want to persist the query result to a Hive table for later querying, for example with a statement like the sketch below.
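
As a sketch of the desired behavior, a user would submit something like the following through the gateway; the catalog, database, and table names here are hypothetical:

```
-- hypothetical example: persist an OLAP query result into a Hive table
INSERT INTO myhive.warehouse.area_stats
SELECT area, COUNT(*) AS cnt
FROM Rides
GROUP BY area;
```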

conf error

When I add catalogs to sql-gateway-defaults.yaml:

catalogs:
  - name: catalog_hive
    type: hive
    hive-conf-dir: /opt/software/flink-sql-gateway-0.1-SNAPSHOT/conf  # contains hive-site.xml

the error below happens. Help, please.

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.CatalogFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
hive-conf-dir=/opt/software/flink-sql-gateway-0.1-SNAPSHOT/conf
type=hive

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
at com.ververica.flink.table.gateway.context.ExecutionContext.createCatalog(ExecutionContext.java:364)
at com.ververica.flink.table.gateway.context.ExecutionContext.lambda$null$4(ExecutionContext.java:543)
at java.util.HashMap.forEach(HashMap.java:1288)
at com.ververica.flink.table.gateway.context.ExecutionContext.lambda$initializeCatalogs$5(ExecutionContext.java:542)
at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:239)
at com.ververica.flink.table.gateway.context.ExecutionContext.initializeCatalogs(ExecutionContext.java:541)
at com.ververica.flink.table.gateway.context.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:490)
at com.ververica.flink.table.gateway.context.ExecutionContext.<init>(ExecutionContext.java:159)
at com.ververica.flink.table.gateway.context.ExecutionContext.<init>(ExecutionContext.java:118)
at com.ververica.flink.table.gateway.context.ExecutionContext$Builder.build(ExecutionContext.java:751)
... 46 more

Table field 'X' does not match with the physical type error on query

When executing this query, we get an error from the gateway:

https://github.com/ververica/sql-training/wiki/Querying-Dynamic-Tables-with-SQL#average-number-of-persons-leaving-an-area-per-hour

SELECT area, SUM(psgSum)/24.0 AS avgPsgLeaving
FROM (
  SELECT toAreaId(lon, lat) AS area,
         TUMBLE_END(rideTime, INTERVAL '1' HOUR) AS t,
         SUM(psgCnt) AS psgSum
  FROM Rides
  WHERE isStart AND isInNYC(lon, lat)
  GROUP BY toAreaId(lon, lat), TUMBLE(rideTime, INTERVAL '1' HOUR)
) GROUP BY area;

['Internal server error.', "<Exception on server side:
com.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL query.
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:253)
	at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:89)
	at com.ververica.flink.table.gateway.Session.r.....
	....
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Type DECIMAL(17, 6) of table field 'avgPsgLeaving' does not match with the physical type LEGACY('DECIMAL', 'DECIMAL') of the 'avgPsgLeaving' field of the TableSink consumed type.
	at org.apache.flink.table.utils.TypeMappingUtils.lambda$checkPhysicalLogicalTypeCompatible$4(TypeMappingUtils.java:164)
	at ....

Contribute to Apache Livy?

I recall seeing a backlog item in Apache Livy JIRA about being able to use their API for more than just Spark.

Any chance y'all could put that on the roadmap?

Update the README.md to remind users of the correct way to use the SQL gateway with session cluster executors

Caused by: java.lang.RuntimeException: Could not execute program.
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
	... 46 more
Caused by: java.lang.IllegalStateException
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61) ~[flink-dist_2.12-1.11.1.jar:1.11.1]
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247) ~[flink-sql-gateway-0.2-SNAPSHOT.jar:?]

The program will throw this exception if we don't set the yarn.application.id option in $FLINK_HOME/conf/flink-conf.yaml; a sketch of the relevant configuration follows.
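
For reference, a minimal sketch of the relevant lines in $FLINK_HOME/conf/flink-conf.yaml; the application id value is a placeholder for the id of the running yarn-session:

```
execution.target: yarn-session
yarn.application.id: application_1600000000000_0001
```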

How to recover job in new session?

From README

Flink SQL gateway stores the session properties in memory now. If the service is stopped or crashed, all properties are lost.

Do you have any workaround or suggestions for this case, to get a previously running job's status or to cancel it? One possibility is sketched below. Thanks in advance.
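
One possible workaround, not a gateway feature: if you saved the Flink job id when the job was submitted, you can query or cancel the job directly against the Flink cluster's own REST API, independently of any gateway session. The host, port, and job id below are placeholders:

```
// Hedged sketch: inspect and terminate a job via the Flink cluster REST API.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RecoverJob {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://jobmanager-host:8081"; // placeholder address
        String jobId = "00000000000000000000000000000000"; // placeholder job id

        // Fetch the job's current status from the cluster (GET /jobs/:jobid).
        HttpResponse<String> status = client.send(
                HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId)).GET().build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(status.body());

        // Cancel it via the standard REST endpoint (PATCH /jobs/:jobid).
        HttpResponse<String> cancel = client.send(
                HttpRequest.newBuilder(URI.create(base + "/jobs/" + jobId))
                        .method("PATCH", HttpRequest.BodyPublishers.noBody())
                        .build(),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(cancel.statusCode());
    }
}
```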

Dropping a table with a Watermark column can fail

Just another one: dropping a table with a Watermark column fails:

"Could not drop a table from statement"
"Caused by: org.apache.flink.table.api.TableException: Watermark can not be defined for a processing time attribute column."

The failing statement:

```
DROP table hue_logs
```

The table was defined as:

```
CREATE TABLE hue_logs (
  log STRING,
  proctime AS PROCTIME(),
  WATERMARK FOR proctime AS proctime - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'hue_logs',
  'properties.bootstrap.servers' = 'kafka:9094',
  'format' = 'json'
);
```

```
10:25:48.057 [flink-rest-server-netty-worker-thread-1] ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
com.ververica.flink.table.gateway.utils.SqlExecutionException: Could not drop a table from statement: DROP table hue_logs
.
	at com.ververica.flink.table.gateway.operation.DDLOperation.execute(DDLOperation.java:54) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75) [flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:177) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [flink-dist_2.11-1.12.0.jar:1.12.0]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_275]
Caused by: org.apache.flink.table.api.TableException: Watermark can not be defined for a processing time attribute column.
	at org.apache.flink.table.api.internal.CatalogTableSchemaResolver.resolve(CatalogTableSchemaResolver.java:90) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.catalog.CatalogManager.resolveTableSchema(CatalogManager.java:380) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.catalog.CatalogManager.dropTableInternal(CatalogManager.java:760) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.catalog.CatalogManager.dropTable(CatalogManager.java:725) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:781) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:751) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
	at com.ververica.flink.table.gateway.operation.DDLOperation.lambda$execute$0(DDLOperation.java:49) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:170) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	at com.ververica.flink.table.gateway.operation.DDLOperation.execute(DDLOperation.java:48) ~[flink-sql-gateway-0.3-SNAPSHOT.jar:?]
	... 45 more
```

Failed to parse statement

ERROR com.ververica.flink.table.gateway.rest.session.Session - Session: 705917caa407a1c237e3f5fb9171c722, Failed to parse statement: USE impala::sddl_ext.c_cons
ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Exception occurred in REST handler: Failed to parse statement.

Temporal table join is not supported

SQL example:

CREATE VIEW v_inout_flow AS
SELECT tb.park_code,
       tb.park_name,
       tb.city_code,
       tb.city_name,
       ta.status,
       ta.event_time
  FROM ods_inout_flow AS ta
  JOIN dim_park FOR SYSTEM_TIME AS OF ta.proctime AS tb
    ON ta.org_code = tb.org_code;

The exception is:

Caused by: org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "`dim_park`" at line 3, column 20.
Was expecting one of:
    "TABLE" ...
    "(" ...

	at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:50)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:64)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
	at com.ververica.flink.table.gateway.operation.CreateViewOperation.lambda$execute$0(CreateViewOperation.java:57)
	at com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:204)
	at com.ververica.flink.table.gateway.operation.CreateViewOperation.execute(CreateViewOperation.java:56)
	... 45 more
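
For context, in Flink versions where the parser rejects FOR SYSTEM_TIME AS OF on a regular table, processing-time temporal joins were typically written against a temporal table function instead. A hedged sketch, assuming dim_park has been registered (via the Table API) as a temporal table function; the name dim_park_func is hypothetical:

```
SELECT tb.park_code,
       tb.park_name,
       ta.status,
       ta.event_time
  FROM ods_inout_flow AS ta,
       LATERAL TABLE (dim_park_func(ta.proctime)) AS tb
 WHERE ta.org_code = tb.org_code;
```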

Job is still running and has a next_result_uri after reaching LIMIT

Hello,

Currently, jobs keep running even after reaching the SQL LIMIT. What would be a way to detect the end of a query (i.e., when all the results have been sent)?

SELECT * FROM default_database.Fares LIMIT 3

GET http://200.240.3.7:8083/v1/sessions/0e3930af55b0a3730ad605c18166e9d7/jobs/6baf6aa22f35c1dae4a07d5f762d3019/result/1

{"results":[{"columns":[{"name":"rideId","type":"BIGINT"},{"name":"payTime","type":"TIMESTAMP(3)"},{"name":"payMethod","type":"STRING"},{"name":"tip","type":"FLOAT"},{"name":"toll","type":"FLOAT"},{"name":"fare","type":"FLOAT"}],"data":[[65,"2013-01-01T00:00:36","CSH",0.0,0.0,3.5],[137,"2013-01-01T00:01:00","CSH",0.0,0.0,3.5],[77,"2013-01-01T00:01:22","CSH",0.0,0.0,4.0]],"change_flags":[true,true,true]}],"next_result_uri":"/v1/sessions/0e3930af55b0a3730ad605c18166e9d7/jobs/6baf6aa22f35c1dae4a07d5f762d3019/result/2"}

GET http://200.240.3.7:8083/v1/sessions/0e3930af55b0a3730ad605c18166e9d7/jobs/6baf6aa22f35c1dae4a07d5f762d3019/status

{"status":"RUNNING"}

The next call to fetch results has "data":[] (which makes sense).

Thanks!
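
For reference, a client-side sketch of the intended protocol: keep following next_result_uri until a page arrives without one, which, as far as the README describes, signals that all results have been sent. The bug reported here is that the RUNNING status and next_result_uri persist even after the LIMIT is reached, so a client may also need to consult the status endpoint. Host and ids are taken from the example above:

```
// Hedged sketch: page through gateway results until a response carries no
// "next_result_uri" field. The naive string scan stands in for real JSON
// parsing; it would loop indefinitely while the reported bug persists.
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class FetchAllResults {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        String base = "http://200.240.3.7:8083";
        String next = "/v1/sessions/0e3930af55b0a3730ad605c18166e9d7"
                + "/jobs/6baf6aa22f35c1dae4a07d5f762d3019/result/1";
        while (next != null) {
            HttpResponse<String> resp = client.send(
                    HttpRequest.newBuilder(URI.create(base + next)).GET().build(),
                    HttpResponse.BodyHandlers.ofString());
            String body = resp.body();
            System.out.println(body);
            int i = body.indexOf("\"next_result_uri\":\"");
            next = (i < 0) ? null
                    : body.substring(i + 19, body.indexOf('"', i + 19));
        }
    }
}
```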

An exception occurred when executing a SELECT statement via the SQL gateway

$FLINK_HOME/conf/flink-conf.yaml:
execution.target: yarn-session

The following exception occurred:

-INFO-2021/01/13 20:23:09,166-org.apache.flink.yarn.YarnClusterDescriptor.getLocalFlinkDistPath(YarnClusterDescriptor.java:196)-196-No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
-ERROR-2021/01/13 20:23:09,176-com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:249)-249-Session: 2c436c304a0b7a697c1e005ad34c32bc. Error running SQL job.
java.lang.RuntimeException: Could not execute program.
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83)
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247)
	at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87)
	at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106)
	at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
	at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61)
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81)
	... 47 more
-ERROR-2021/01/13 20:23:09,183-com.ververica.flink.table.gateway.rest.handler.AbstractHandler.handleException(AbstractHandler.java:224)-224-Unhandled exception.
java.lang.RuntimeException: Error running SQL job.
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:250)
	at com.ververica.flink.table.gateway.operation.SelectOperation.execute(SelectOperation.java:87)
	at com.ververica.flink.table.gateway.rest.session.Session.runStatement(Session.java:106)
	at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
	at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
	at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Could not execute program.
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:83)
	at com.ververica.flink.table.gateway.operation.SelectOperation.executeQueryInternal(SelectOperation.java:247)
	... 46 more
Caused by: java.lang.IllegalStateException
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:179)
	at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:61)
	at com.ververica.flink.table.gateway.deployment.ProgramDeployer.deploy(ProgramDeployer.java:81)
	... 47 more

RPC calls can time out when returning large offline (batch) results

Ask timed out on [Actor[akka.tcp://flink@jssz-bigdata-datanode-1316:15841/user/taskmanager_0#-1168369699]] after [10000 ms]. Message of type [org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation].

The Flink job throws an exception like this. If I add a LIMIT to the query, no exception occurs.

I have also seen akka.framesize-related errors; a possible mitigation is sketched below.
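
For reference, one common mitigation (a hedged sketch, not a confirmed fix for this case) is to raise the Akka frame size in $FLINK_HOME/conf/flink-conf.yaml; the default is 10485760b, and the value below is an illustrative 20 MB:

```
akka.framesize: 20971520b
```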

Can the sql-gateway support submitting jobs from a VIEW?

Can the sql-gateway support submitting a job through 'select * from xx_view'?

The SQL statements are:

CREATE TABLE trade_datasource (
  userId VARCHAR
) WITH (
  'connector.type' = 'kafka',
  xx
);

CREATE TABLE pvuv_sink (
  user_id VARCHAR
) WITH (
  'connector.type' = 'jdbc',
  xx
);

-- create a VIEW from the source table
CREATE VIEW trade_datasource_view AS
SELECT *
FROM trade_datasource
WHERE userId > 100;

-- submit a job through the VIEW
INSERT INTO pvuv_sink
SELECT * FROM trade_datasource_view;

I got the following error:

2020-04-23 03:15:01,929 INFO org.apache.flink.table.module.ModuleManager - Got FunctionDefinition proctime from module core
2020-04-23 03:15:01,930 ERROR com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler - Unhandled exception.
org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:464)
at com.ververica.flink.table.gateway.operation.CreateViewOperation.execute(CreateViewOperation.java:55)
at com.ververica.flink.table.gateway.Session.runStatement(Session.java:84)
at com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)
at com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)
at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)
at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)
at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)
at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)
at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)
at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)
at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)
at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)
at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.zookeeper.connect=noneed
connector.startup-mode=latest-offset

....
schema.watermark.0.rowtime=requestDateTime
schema.watermark.0.strategy.data-type=TIMESTAMP(6)
schema.watermark.0.strategy.expr=requestDateTime - INTERVAL '5' SECOND

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)
... 66 more
2020-04-23 03:31:01,559 INFO com.ververica.flink.table.gateway.SessionManager - Start to remove expired session, current session count: 8
2020-04-23 03:31:01,560 INFO com.ververica.flink.table.gateway.SessionManager - Session: ddcff4b29d51f04a01cc0aa85e07a953 is expired, close it...
2020-04-23 03:31:01,560 INFO com.ververica.flink.table.gateway.SessionManager - Session: ddcff4b29d51f04a01cc0aa85e07a953 is closed.
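
(For what it's worth: when the "factories have been considered" list contains only the CSV factories, it usually means the Kafka connector jar is not on the gateway's classpath. Placing a flink-sql-connector-kafka jar matching your Flink version into $FLINK_HOME/lib and restarting the gateway typically resolves this kind of NoMatchingTableFactoryException; this is a general observation, not a verified diagnosis of this exact report.)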

Support submitting jobs with different executors at runtime

Currently, flink-sql-gateway can only submit jobs with the single executor defined in flink-conf.yaml. But if someone wants to submit a job to a yarn-session while others want to use a standalone cluster, flink-sql-gateway can't satisfy this right now. Could flink-sql-gateway support this feature in the future?

I also have a rough idea of how flink-sql-gateway could support this feature: we can introduce a new component I call ExecutionManager. ExecutionManager is like SessionManager, but its main job is to create a new Execution, where each Execution represents a deployment mode (yarn-session, yarn-per-job, standalone, local, etc.). Each Execution creates a new SessionManager and uses the user-specified execution.target to instantiate a DefaultContext. This way, every new session created by a SessionManager can submit jobs to a different cluster.

Furthermore, flink-sql-gateway would no longer create a SessionManager at start time but an ExecutionManager instead, and the SessionManagers would be instantiated by the ExecutionManager.

We must also implement a new netty handler to create/delete/get Executions, as sketched below.
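
A minimal sketch of the proposed ExecutionManager, under the assumptions above (the SessionManager stand-in and its constructor are hypothetical; the real wiring of DefaultContext and SessionManager may differ):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Sketch of the proposed ExecutionManager: one Execution per deployment mode. */
public class ExecutionManager {

    /** Stand-in for the gateway's SessionManager (this constructor is an assumption). */
    public static class SessionManager {
        private final String executionTarget;

        public SessionManager(String executionTarget) {
            // the real code would build a DefaultContext from execution.target here
            this.executionTarget = executionTarget;
        }
    }

    /** Wraps one deployment mode (yarn-session, yarn-per-job, standalone, local, ...). */
    public static class Execution {
        private final SessionManager sessionManager;

        Execution(String executionTarget) {
            this.sessionManager = new SessionManager(executionTarget);
        }

        public SessionManager getSessionManager() {
            return sessionManager;
        }
    }

    // keyed by the user-specified execution.target
    private final Map<String, Execution> executions = new ConcurrentHashMap<>();

    public Execution getOrCreateExecution(String executionTarget) {
        return executions.computeIfAbsent(executionTarget, Execution::new);
    }

    public Execution getExecution(String executionTarget) {
        return executions.get(executionTarget);
    }

    public void deleteExecution(String executionTarget) {
        executions.remove(executionTarget);
    }
}

The new netty handlers would then route create/delete/get requests to these three methods, mirroring how the existing session handlers delegate to SessionManager.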

Submitting a SQL stream task through the REST API failed

Submitting a stream task through the REST API "http://127.0.0.1:8083/v1/sessions/" + sessionId + "/statements" failed.

Steps:

  1. Create a session through the API http://127.0.0.1:8083/v1/sessions

The request body is:

{
    "session_name": "test_session",
    "planner": "blink", 
    "execution_type": "streaming"
}
  2. Execute the statements through the API "http://127.0.0.1:8083/v1/sessions/" + sessionId + "/statements".
    The statements are:

CREATE TABLE trade_datasource (
userId VARCHAR,
itemId VARCHAR,
categoryId VARCHAR,
behavior VARCHAR,
timestamp BIGINT
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_trade',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = 'localhost:2181',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'format.type' = 'json'
);

CREATE TABLE pvuv_sink (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_sql_platform',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
);

INSERT INTO pvuv_sink
SELECT
userId as user_id,
itemId as item_id,
categoryId as category_id,
behavior,
timestamp as ts
FROM trade_datasource where behavior = 'pv';

These three statements execute correctly in the embedded Flink SQL client. I execute one SQL statement at a time, in three requests. The requests and responses are as follows (a reproduction sketch follows the responses below). What is the problem?

{"statement":"CREATE TABLE trade_datasource (\n\tuserId VARCHAR,\n itemId VARCHAR,\n categoryId VARCHAR,\n behavior VARCHAR,\n timestamp BIGINT\n) WITH (\n 'connector.type' = 'kafka', \n 'connector.version' = 'universal', \n 'connector.topic' = 'user_trade', \n 'connector.startup-mode' = 'earliest-offset', \n 'connector.properties.zookeeper.connect' = 'localhost:2181', \n 'connector.properties.bootstrap.servers' = 'localhost:9092', \n 'format.type' = 'json'\n)"}

{"results":[{"columns":[{"name":"affected_row_count","type":"BIGINT NOT NULL"}],"data":[[0]]}],"statement_types":["CREATE_TABLE"]}

{"statement":"CREATE TABLE pvuv_sink (\n user_id VARCHAR,\n item_id VARCHAR,\n category_id VARCHAR,\n behavior VARCHAR,\n ts BIGINT\n) WITH (\n'connector.type' = 'jdbc',\n'connector.url' = 'jdbc:mysql://127.0.0.1:3306/flink_sql_platform',\n'connector.table' = 'pvuv_sink',\n'connector.username' = 'root',\n'connector.password' = '123456',\n'connector.write.flush.max-rows' = '1'\n)"}

{"results":[{"columns":[{"name":"affected_row_count","type":"BIGINT NOT NULL"}],"data":[[0]]}],"statement_types":["CREATE_TABLE"]}

{"statement":"INSERT INTO pvuv_sink \nSELECT\nuserId as user_id,\nitemId as item_id,\ncategoryId as category_id,\nbehavior,\ntimestamp as ts\nFROM trade_datasource where behavior = 'pv'"}

{"errors":["Internal server error.","<Exception on server side:\ncom.ververica.flink.table.gateway.SqlExecutionException: Invalid SQL update statement.\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:157)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.execute(InsertOperation.java:80)\n\tat com.ververica.flink.table.gateway.Session.runStatement(Session.java:95)\n\tat com.ververica.flink.table.gateway.rest.handler.StatementExecuteHandler.handleRequest(StatementExecuteHandler.java:81)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:77)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:178)\n\tat com.ververica.flink.table.gateway.rest.handler.AbstractHandler.channelRead0(AbstractHandler.java:75)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:110)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:89)\n\tat org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:54)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:174)\n\tat org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:68)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat 
org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:328)\n\tat org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:302)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1421)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:697)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:632)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:549)\n\tat org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:511)\n\tat org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)\n\tat org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat java.lang.Thread.run(Thread.java:748)\nCaused by: org.apache.flink.table.api.TableException: findAndCreateTableSource failed.\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:55)\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:92)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.findAndCreateTableSource(CatalogSourceTable.scala:156)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource$lzycompute(CatalogSourceTable.scala:65)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.tableSource(CatalogSourceTable.scala:65)\n\tat org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:76)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3328)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2357)\n\tat 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2005)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:646)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:627)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3181)\n\tat org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:563)\n\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:148)\n\tat org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:135)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:522)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:436)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:154)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:342)\n\tat org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:142)\n\tat org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)\n\tat org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484)\n\tat org.apache.flink.table.api.java.internal.StreamTableEnvironmentImpl.sqlUpdate(StreamTableEnvironmentImpl.java:331)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.lambda$executeUpdateInternal$1(InsertOperation.java:148)\n\tat com.ververica.flink.table.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:230)\n\tat com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:145)\n\t... 
46 more\nCaused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in\nthe classpath.\n\nReason: Required context properties mismatch.\n\nThe following properties are requested:\nconnector.properties.bootstrap.servers=localhost:9092\nconnector.properties.zookeeper.connect=localhost:2181\nconnector.startup-mode=earliest-offset\nconnector.topic=user_trade\nconnector.type=kafka\nconnector.version=universal\nformat.type=json\nschema.0.data-type=VARCHAR(2147483647)\nschema.0.name=userId\nschema.1.data-type=VARCHAR(2147483647)\nschema.1.name=itemId\nschema.2.data-type=VARCHAR(2147483647)\nschema.2.name=categoryId\nschema.3.data-type=VARCHAR(2147483647)\nschema.3.name=behavior\nschema.4.data-type=BIGINT\nschema.4.name=timestamp\n\nThe following factories have been considered:\norg.apache.flink.table.sources.CsvBatchTableSourceFactory\norg.apache.flink.table.sources.CsvAppendTableSourceFactory\n\tat org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)\n\tat org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)\n\tat org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)\n\tat org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)\n\tat org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:52)\n\t... 72 more\n\nEnd of exception on server side>"]}
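
For reference, here is a minimal sketch (Java 11+, java.net.http) of driving the same two endpoints; the session_id field name in the create-session response and the naive JSON handling are assumptions of the sketch:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class GatewaySmokeTest {

    private static final String GATEWAY = "http://127.0.0.1:8083/v1";

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // 1. create a session with the body quoted above
        String sessionBody =
            "{\"session_name\":\"test_session\",\"planner\":\"blink\",\"execution_type\":\"streaming\"}";
        HttpResponse<String> created =
            client.send(post(GATEWAY + "/sessions", sessionBody), HttpResponse.BodyHandlers.ofString());
        String sessionId = extractSessionId(created.body());

        // 2. submit one statement per request (repeat for each DDL/DML statement)
        String statementBody = "{\"statement\":\"CREATE TABLE trade_datasource (...)\"}";
        HttpResponse<String> result =
            client.send(post(GATEWAY + "/sessions/" + sessionId + "/statements", statementBody),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(result.body());
    }

    private static HttpRequest post(String url, String json) {
        return HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(json))
            .build();
    }

    private static String extractSessionId(String json) {
        // naive extraction for the sketch; use a real JSON parser in practice
        int key = json.indexOf("session_id");
        int colon = json.indexOf(':', key);
        int start = colon + 2; // skip ':' and the opening quote
        return json.substring(start, json.indexOf('"', start));
    }
}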

Document e2e tests

For example, you need to export FLINK_HOME (as usual) and install jq to make them run:

sudo apt install jq

It doesn't support Flink 1.11.1? It doesn't seem to work well on Flink 1.11.1

I deployed the gateway on Flink 1.11.1, but CREATE TABLE doesn't support Flink 1.11.1's features.
e.g.:
On Flink 1.11.1 this works:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
On Flink 1.10 with the gateway this works:
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3)
) WITH (
'connector.type' = 'kafka',
'connector.topic' = 'user_behavior',
'connector.properties.bootstrap.servers' = 'localhost:9092',
'connector.properties.group.id' = 'testGroup',
'format.type' = 'json',
'format.fail-on-missing-field' = 'false'
)

The gateway behaves the same as Flink 1.10.
Any suggestions? Thanks @godfreyhe @tsreaper
Important: the gateway doesn't support 'json.ignore-parse-errors' = 'true' for the json format.
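
(For what it's worth: the new 'connector' = 'kafka' style options go through the dynamic table factories introduced in Flink 1.11, while a gateway built against Flink 1.10 only understands the legacy connector.* properties, which would also explain why 'json.ignore-parse-errors' is unknown to it. A gateway build targeting the Flink 1.11 release line should be required; this is an inference from the behaviour above, not a verified diagnosis.)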

Add new result type to indicate job operations

Currently, job operations return the result kind SUCCESS_WITH_CONTENT with the Flink job id as data. This may be mixed up with results from select operations (e.g. selecting from a table with a job_id column).

Maybe we can add a new result type SUCCESS_WITH_JOB to avoid confusion.
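
A minimal sketch of the proposal, modeled on Flink's org.apache.flink.table.api.ResultKind (only SUCCESS and SUCCESS_WITH_CONTENT exist there today; SUCCESS_WITH_JOB is the new member being proposed):

/** Sketch: result kinds the gateway could return. */
public enum GatewayResultKind {
    /** The statement succeeded with nothing to return (e.g. a DDL statement). */
    SUCCESS,
    /** The statement produced rows to return (e.g. SELECT, SHOW, DESCRIBE). */
    SUCCESS_WITH_CONTENT,
    /** Proposed: the statement submitted a Flink job; the payload is the job id. */
    SUCCESS_WITH_JOB
}

Clients could then branch on the kind instead of guessing whether a single-column result named job_id is really a job handle.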

Support setting ranges for gateway-port

Currently, users can only use a random port as the gateway port to support multiple select statements, but it's vital to limit the port range in a production environment.
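
A minimal sketch of what range support could look like, assuming a range string such as "8083-8093" (the parsing and binding strategy here are illustrative; Flink's own NetUtils also ships a port-range parser that an implementation could reuse):

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

public class PortRangeBinder {

    /** Tries each port in an inclusive "low-high" range and returns the first socket that binds. */
    public static ServerSocket bindInRange(String range) throws IOException {
        String[] parts = range.split("-", 2);
        int low = Integer.parseInt(parts[0].trim());
        int high = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : low;
        for (int port = low; port <= high; port++) {
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(port));
                return socket; // the caller owns the bound socket
            } catch (IOException takenOrForbidden) {
                socket.close(); // port in use, try the next one
            }
        }
        throw new IOException("No free port in range " + range);
    }
}

Taking the first bindable port keeps startup deterministic for operators who open exactly that range in their firewalls.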

JDBC base URL must handle trailing slash ('/')

Currently, when I add a JDBC catalog to the YAML file I need to add a trailing '/' to the URL.

The following config works:

catalogs:
   - name: mypg
     type: jdbc
     default-database: postgres
     username: postgres
     password: mysecretpassword
     base-url: jdbc:postgresql://localhost:5432/

but this doesn't:

     base-url: jdbc:postgresql://localhost:5432
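
A defensive normalization sketch; where exactly the gateway assembles the final JDBC URL is an assumption here, the point is only to append the slash before concatenating the database name:

/** Ensures the JDBC base URL ends with '/' so a database name can be appended safely. */
static String normalizeBaseUrl(String baseUrl) {
    return baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
}

With this in place, both base-url variants above would resolve to the same catalog URL.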

Support job status and cancel API for yarn per-job execution mode

I tested the yarn per-job execution mode and submitted a job successfully via http://host1:8083/v1/sessions/ec4f81cee3fe64f6a484844958531c4d/statements.

However, the job status and cancel APIs do not work.

By debugging the code, I found that the key point is an invalid YARN application id in AbstractJobOperation#bridgeClientRequest. I have committed a PR; please help to review it, thanks.

Is there a way to point the gateway to a Flink in docker container?

e.g. Trying to point it to https://github.com/ververica/sql-training/wiki/Setting-up-the-Training-Environment:

./bin/sql-gateway.sh 
FLINK_HOME is not found in environment variable.
Configures the FLINK_HOME environment variable using the following command: export FLINK_HOME=<flink-install-dir>
export FLINK_HOME=.
./bin/sql-gateway.sh 
./bin/sql-gateway.sh: line 85: ./config.sh: No such file or directory
./bin/sql-gateway.sh: line 91: constructFlinkClassPath: command not found
./bin/sql-gateway.sh: line 102: manglePathList: command not found
./bin/sql-gateway.sh: line 102: manglePath: command not found
./bin/sql-gateway.sh: line 102: exec: -D: invalid option
exec: usage: exec [-cl] [-a name] [command [arguments ...]] [redirection ...]
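
(For what it's worth: the errors above only show that FLINK_HOME points at a directory that is not a full Flink distribution; bin/config.sh cannot be found because FLINK_HOME=. was used. The gateway needs a local Flink distribution for its scripts and client jars; whether it then reaches a Flink running in Docker is a matter of pointing rest.address / rest.port in that distribution's flink-conf.yaml at the container's exposed REST port. This is an inference from the transcript, not a tested recipe.)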

rest.address must be set

Hi, I ran CreateTableOperationTest.java with my insert code added to the test sources, but an exception occurred. Looking forward to your reply.
Below are my code and the exception.

Code:

public class CreateTableOperationTest extends OperationTestBase {

    @Test
    public void testCreateTable() {
        final String ddlTemplate = "create table %s(\n" +
            "  a int,\n" +
            "  b bigint,\n" +
            "  c varchar\n" +
            ") with (\n" +
            "  'connector.type'='filesystem',\n" +
            "  'format.type'='csv',\n" +
            "  'connector.path'='file:///tmp/MyTable1'\n" +
            ")\n";
        CreateTableOperation operation = new CreateTableOperation(context, String.format(ddlTemplate, "MyTable1"));
        ResultSet resultSet = operation.execute();
        assertEquals(OperationUtil.AFFECTED_ROW_COUNT0, resultSet);

        String[] tables = context.getExecutionContext().getTableEnvironment().listTables();
        assertArrayEquals(new String[] { "MyTable1" }, tables);

        InsertOperation insertOperation = new InsertOperation(context, "INSERT INTO MyTable1 select 1,1,'test'");
        ResultSet execute = insertOperation.execute();
        System.out.println(execute);
    }
}

Exception:

java.lang.RuntimeException: Error running SQL job.

at com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:201)
at com.ververica.flink.table.gateway.operation.InsertOperation.execute(InsertOperation.java:84)
at com.ververica.flink.table.gateway.operation.CreateTableOperationTest.testCreateTable(CreateTableOperationTest.java:52)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:564)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)

Caused by: java.lang.RuntimeException: Could not execute program.
at com.ververica.flink.table.gateway.ProgramDeployer.deploy(ProgramDeployer.java:83)
at com.ververica.flink.table.gateway.operation.InsertOperation.executeUpdateInternal(InsertOperation.java:198)
... 25 more
Caused by: java.lang.RuntimeException: Couldn't retrieve standalone cluster
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:53)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:64)
at com.ververica.flink.table.gateway.ProgramDeployer.deploy(ProgramDeployer.java:81)
... 26 more
Caused by: java.lang.NullPointerException: rest.address must be set
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.getWebMonitorAddress(HighAvailabilityServicesUtils.java:196)
at org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils.createClientHAService(HighAvailabilityServicesUtils.java:146)
at org.apache.flink.client.program.rest.RestClusterClient.<init>(RestClusterClient.java:161)
at org.apache.flink.client.deployment.StandaloneClusterDescriptor.lambda$retrieve$0(StandaloneClusterDescriptor.java:51)
... 28 more
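
For what it's worth, the trace shows the InsertOperation trying to retrieve a standalone cluster without knowing its REST endpoint. A minimal sketch of supplying one (how OperationTestBase wires a Configuration into the ExecutionContext is an assumption, and a standalone cluster must actually be listening on this address):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;

static Configuration clusterClientConfig() {
    Configuration flinkConfig = new Configuration();
    flinkConfig.setString(RestOptions.ADDRESS, "localhost"); // rest.address
    flinkConfig.setInteger(RestOptions.PORT, 8081);          // rest.port
    return flinkConfig;
}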

Does this framework support exactly-once semantics?

We are going to build our data computing system based on Flink SQL.
For now, with Flink 1.11.0, we have reached a milestone: consuming from Kafka, selecting from a dynamic table, and writing the results to MySQL.

But when we tested exactly-once (end to end), we found a problem.

The official Flink SQL documentation does us no favors here. I need help.

SQL file:
-- source: uses a computed column; uuid() generates a UUID at runtime
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP(3),
uuid as uuid()
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'zookeeper.connect',
'connector.properties.0.value' = 'localhost:2181',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.properties.2.key' = 'group.id',
'connector.properties.2.value' = 'test-consumer-group12',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);

-- sink
CREATE TABLE pvuv_sink (
uuid varchar,
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink_test',
'connector.table' = 'pvuv_sink13',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1',
'connector.sink.semantic' = 'exactly-once'
);

INSERT INTO pvuv_sink
SELECT
uuid,
DATE_FORMAT(ts, 'yyyy-MM-dd HH:00') dt,
COUNT(*) AS pv,
COUNT(DISTINCT user_id) AS uv
FROM user_log
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd HH:00'), uuid;

SQL parsing and submission code:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;

import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlParserException;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Parses the command line and submits the SQL file; the entry point of the whole project.
 */
public class SqlSubmit {

    public static void main(String[] args) throws Exception {
        // parse command line arguments
        final CliOptions options = CliOptionsParser.parseClient(args);

        // hand the parsed options to SqlSubmit
        SqlSubmit submit = new SqlSubmit(options);

        // run the program
        submit.run();
    }

    // --------------------------------------------------------------------------------------------
    private String sqlFilePath;
    private TableEnvironment tEnv;

    // take the path of the SQL file to execute
    private SqlSubmit(CliOptions options) {
        this.sqlFilePath = options.getSqlFilePath();
    }

    private void run() throws Exception {
        // create the Flink execution environment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        this.tEnv = StreamTableEnvironment.create(environment,
            EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build());

        // read all lines of the SQL file and parse them into command calls
        List<String> sql = Files.readAllLines(Paths.get(sqlFilePath));
        List<SqlCommandParser.SqlCommandCall> calls = SqlCommandParser.parse(sql);
        if (calls.size() == 0) {
            // no sql to execute
            throw new RuntimeException("There is no sql statement to execute, please check your sql file: " + sqlFilePath);
        }
        for (SqlCommandParser.SqlCommandCall call : calls) {
            // System.out.println(call.command.toString());
            callCommand(call);
        }
    }

    // --------------------------------------------------------------------------------------------

    private void callCommand(SqlCommandParser.SqlCommandCall cmdCall) {
        switch (cmdCall.command) {
            case SET:
                callSet(cmdCall);
                break;
            case CREATE_TABLE:
                callCreateTable(cmdCall);
                break;
            case INSERT_INTO:
                callInsertInto(cmdCall);
                break;
            default:
                throw new RuntimeException("Unsupported command: " + cmdCall.command);
        }
    }

    private void callSet(SqlCommandParser.SqlCommandCall cmdCall) {
        String key = cmdCall.operands[0];
        String value = cmdCall.operands[1];
        tEnv.getConfig().getConfiguration().setString(key, value);
        System.out.println("Set " + key + " --> " + value + " successfully");
    }

    private void callCreateTable(SqlCommandParser.SqlCommandCall cmdCall) {
        String ddl = cmdCall.operands[0];
        try {
            tEnv.executeSql(ddl);
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + ddl + "\n", e);
        }
        String tableName = ddl.split("\\s+")[2];
        System.out.println("Created table " + tableName + " successfully");
    }

    private void callInsertInto(SqlCommandParser.SqlCommandCall cmdCall) {
        String dml = cmdCall.operands[0];
        Optional<JobClient> jobClient;
        try {
            TableResult result = tEnv.executeSql(dml);
            jobClient = result.getJobClient();
        } catch (SqlParserException e) {
            throw new RuntimeException("SQL parse failed:\n" + dml + "\n", e);
        }

        if (jobClient.isPresent()) {
            JobID jobID = jobClient.get().getJobID();
            System.out.println("Job submitted successfully, JobId: " + jobID);
        }
    }
}

Shell script to submit a job:
#!/bin/bash
export FLINK_HOME=/Users/hulc/developEnv/flink-1.11.0

sql_file=$2
# check FLINK_HOME
if [ -z "$FLINK_HOME" ];then
    echo "Please set FLINK_HOME or configure it in this script"
    exit 1
fi

# check the number of arguments
if [ $# -lt 2 ];then
    echo "Usage: ./sql-submit.sh -f <sql-file>"
    exit 1
fi

# the dependency jar; the name is hard-coded here and could later be passed in as an argument
# SQL_JAR=./flink-sql-submit-1.0-SNAPSHOT.jar
SQL_JAR=./target/flink-test1-1.0-SNAPSHOT.jar
# check that the jar can be found
if [ -f $SQL_JAR ];then
    echo "$(date +'%Y-%m-%d %H:%M:%S') load jars from ${SQL_JAR}"
else
    echo "failed to load dependent jars for sql-submit.sh, please specify it"
    exit 1
fi

# check that the SQL file exists
if [ ! -f $sql_file ];then
    echo "SQL file $sql_file does not exist, please check the path"
    exit 1
fi

# submit command; note the parameters are hard-coded here: the parallelism,
# the fully-qualified main class, and the project jar
# $1 is -f, i.e. the flag that specifies the file to execute
# $sql_file is the SQL file to execute
if [ $1 = "-f" ];then
    $FLINK_HOME/bin/flink run -p 1 -c SqlSubmit /Users/hulc/develop/flink-test1/target/flink-test1-1.0-SNAPSHOT.jar $1 $sql_file
else
    echo "Usage: ./sql-submit.sh -f <sql-file>"
    exit 1
fi
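
One observation on the exactly-once question: end-to-end exactly-once in Flink requires checkpointing, and the SqlSubmit code above never enables it; whether the JDBC sink can honor 'connector.sink.semantic' = 'exactly-once' additionally depends on the connector version. A minimal sketch of enabling checkpointing (the interval is an arbitrary choice):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpoint every 60 seconds in exactly-once mode; without checkpoints,
// transactional sinks can never commit and exactly-once cannot hold end to end
environment.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);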

yarn session mode, query fails

The error happens when I submit a query job using beeline:


2020-02-28 16:41:18,504 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, getresult for job: 72e7d9d448deecae9d9d98efbaa5d263, token: 24, maxFetchSize: 0
2020-02-28 16:41:18,545 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, getresult for job: 72e7d9d448deecae9d9d98efbaa5d263, token: 25, maxFetchSize: 0
2020-02-28 16:41:18,561 INFO com.ververica.flink.table.gateway.Session - Session: 98099e3b3467395cffae698f9d3cbb51, cancel job: 72e7d9d448deecae9d9d98efbaa5d263
2020-02-28 16:41:18,561 INFO com.ververica.flink.table.gateway.operation.SelectOperation - Session: 98099e3b3467395cffae698f9d3cbb51. Start to cancel job 72e7d9d448deecae9d9d98efbaa5d263 and result retrieval.
2020-02-28 16:41:18,577 INFO org.apache.flink.yarn.YarnClusterDescriptor - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-02-28 16:41:18,579 INFO org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider - Failing over to rm123
2020-02-28 16:41:18,583 ERROR com.ververica.flink.table.gateway.operation.SelectOperation - Session: 98099e3b3467395cffae698f9d3cbb51, job: 72e7d9d448deecae9d9d98efbaa5d263. Could not retrieve or create a cluster.
org.apache.flink.client.deployment.ClusterRetrieveException: Couldn't retrieve Yarn cluster
at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:366)
at org.apache.flink.yarn.YarnClusterDescriptor.retrieve(YarnClusterDescriptor.java:123)
at com.ververica.flink.table.gateway.operation.SelectOperation.cancelQueryInternal(SelectOperation.java:299)
at com.ververica.flink.table.gateway.operation.SelectOperation.cancelJob(SelectOperation.java:132)
at com.ververica.flink.table.gateway.Session.cancelJob(Session.java:101)
at com.ververica.flink.table.gateway.rest.handler.JobCancelHandler.handleRequest(JobCancelHandler.java:68)
at com.ververica.flink.table.gateway.rest.handler.AbstractRestHandler.respondToRequest(Ab
