
starrocks-connector-for-apache-flink's Introduction

Download | Docs | Benchmarks | Demo


StarRocks, a Linux Foundation project, is the next-generation data platform designed to make data-intensive real-time analytics fast and easy. It delivers query speeds 5 to 10 times faster than other popular solutions. StarRocks performs real-time analytics well while updating historical records, and it can easily enrich real-time analytics with historical data from data lakes. With StarRocks, you can get rid of denormalized tables and still get the best performance and flexibility.

Learn more 👉🏻 What Is StarRocks: Features and Use Cases



Features

  • 🚀 Native vectorized SQL engine: StarRocks adopts vectorization technology to make full use of the parallel computing power of CPU, achieving sub-second query returns in multi-dimensional analyses, which is 5 to 10 times faster than previous systems.
  • 📊 Standard SQL: StarRocks supports ANSI SQL syntax (with full support for TPC-H and TPC-DS). It is also compatible with the MySQL protocol, so various clients and BI tools can be used to access StarRocks.
  • 💡 Smart query optimization: StarRocks can optimize complex queries through CBO (Cost Based Optimizer). With a better execution plan, the data analysis efficiency will be greatly improved.
  • ⚡ Real-time update: The update model of StarRocks can perform upsert/delete operations according to the primary key, and keeps queries efficient while data is being concurrently updated.
  • 🪟 Intelligent materialized view: StarRocks materialized views are automatically refreshed during data import and automatically selected when a query is executed.
  • ✨ Querying data in data lakes directly: StarRocks allows direct access to data from Apache Hive™, Apache Iceberg™, Delta Lake™ and Apache Hudi™ without importing.
  • 🎛️ Resource management: This feature allows StarRocks to limit resource consumption for queries and implement isolation and efficient use of resources among tenants in the same cluster.
  • 💠 Easy to maintain: Simple architecture makes StarRocks easy to deploy, maintain and scale out. StarRocks tunes its query plan agilely, balances the resources when the cluster is scaled in or out, and recovers the data replica under node failure automatically.

Architecture Overview

StarRocks’s streamlined architecture is mainly composed of two modules: Frontend (FE) and Backend (BE). The entire system eliminates single points of failure through seamless and horizontal scaling of FE and BE, as well as replication of metadata and data.

Starting from version 3.0, StarRocks supports a new shared-data architecture, which can provide better scalability and lower costs.


Resources

📚 Read the docs

Section      | Description
Quick Starts | How-tos and tutorials.
Deploy       | Learn how to run and configure StarRocks.
Docs         | Full documentation.
Blogs        | StarRocks deep dives and user stories.

❓ Get support


Contributing to StarRocks

We welcome all kinds of contributions from the community, individuals and partners. We owe our success to your active involvement.

  1. See Contributing.md to get started.
  2. Set up your StarRocks development environment.
  3. Understand our GitHub workflow for opening a pull request; use this PR Template when submitting a pull request.
  4. Pick a good first issue and start contributing.

📝 License: StarRocks is licensed under Apache License 2.0.

👥 Community Membership: Learn more about different contributor roles in StarRocks community.


Used By

This project is used by the following companies. Learn more about their use cases:

starrocks-connector-for-apache-flink's People

Contributors

baisui1981, banmoy, bigdata-kuxingseng, chaplinthink, chenhaifengkeda, danroscigno, dixingxing0, emsnap, ggke, hehuiyuan, hellolilyliuyi, hffariel, imay, jameswangchen, jin-h, lyang77, qingdongzeng3, shouweikun, szza, waittttting, xlfjcg, yuchengxin, zaorangyang


starrocks-connector-for-apache-flink's Issues

Flink Connector 1.2: garbled characters when querying StarRocks

1. The script is as follows: (screenshot)
2. Start a YARN session and the SQL client:
./yarn-session.sh -jm 1024m -tm 4096m
./sql-client.sh embedded -i etl_starrocks2kafka_assassin_employee_post.sql
3. Run the query. (screenshot)
4. The result is as follows: (screenshot)

Reason: there are 280 rows couldn't find a partition

  The Flink job that consumes data from Kafka into StarRocks failed because some rows could not find a partition. We cannot always guarantee that every row matches a partition, so I hope the Flink job can keep running normally even when an exception like the following occurs:
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:

{"Status":"Fail","BeginTxnTimeMs":1,"Message":"too many filtered rows","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"94952a3a-32ce-44bd-a9ef-97392411fa45","LoadBytes":2867274,"StreamLoadPutTimeMs":4,"NumberTotalRows":1725,"WriteDataTimeMs":2879,"TxnId":360,"LoadTimeMs":2885,"ErrorURL":"http://192.168.xx.xxx:8040/api/_load_error_log?file=__shard_34/error_log_insert_stmt_3d41ccd3-cc28-acf2-faa0-71e2679d8fb6_3d41ccd3cc28acf2_faa071e2679d8fb6","ReadDataTimeMs":15,"NumberLoadedRows":1445,"NumberFilteredRows":280}
{"streamLoadErrorLog":"Reason: there are 280 rows couldn't find a partition. src line: [[2021-12-22 15:37:00, '17de1122ff5307-04554ab9f1db16-5373e62-1395396-17de1122ff6950', 'c0cbf7ba22784439b38199145b791998', '1.0.0', 'xxxxx', 0]]; \n"}

Q: What can I do, other than making sure every row finds its own partition?
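One mitigation that may help here (a sketch, not a guaranteed fix): Stream Load can be told to tolerate a bounded fraction of filtered rows via max_filter_ratio, which the connector forwards through its sink.properties.* pass-through options. Rows rejected this way are silently dropped, so this trades completeness for keeping the job alive.

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        // ... jdbc-url / load-url / database-name / table-name / username / password ...
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.properties.strip_outer_array", "true")
        // allow up to 20% of rows per load to be filtered (e.g. rows matching no partition)
        // before the whole load is failed; tune to your tolerance for dropped data
        .withProperty("sink.properties.max_filter_ratio", "0.2")
        .build();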

flink cdc + starrocks connector: importing data does not support updates to StarRocks primary key columns

Scenario:
1. In the MySQL table, the date column is not part of the primary key.
2. In the StarRocks table, the date column is part of the primary key.
Data is imported by writing Flink SQL with flink-cdc-mysql plus the starrocks connector.

After updating the date column of a MySQL row, the StarRocks table shows:
1. The row whose primary key is built from the old value is not deleted.
2. A new row whose primary key is built from the new value is inserted into StarRocks.
In other words, both the old and the new record exist at the same time.

The flink cdc source splits an UPDATE record into two independent records, UPDATE_BEFORE and UPDATE_AFTER,
and the starrocks connector source handles them as follows:

public enum StarRocksSinkOP {
    UPSERT, DELETE;

    public static final String COLUMN_KEY = "__op";

    static StarRocksSinkOP parse(RowKind kind) {
        if (RowKind.INSERT.equals(kind) || RowKind.UPDATE_AFTER.equals(kind)) {
            // converted to UPSERT
            return UPSERT;
        }
        if (RowKind.DELETE.equals(kind) || RowKind.UPDATE_BEFORE.equals(kind)) {
            return DELETE;
        }
        throw new RuntimeException("Unsupported row kind.");
    }
}

In theory, inserts, updates, and deletes in MySQL should therefore be synchronized to StarRocks correctly.

Get the table name from Kafka and sink the data into the corresponding table. Can this method be realized by modifying the source code? What needs to be done in the invoke method? I tried to initialize sinkoptions and sinkmanager in the invoke method, but it didn't work.


Examples:
kafka DataStream<Tuple2<String, String>>: ("{\"score\": \"99\", \"name\": \"stephen\"}", "tmp_sr_test_api_2")

StarRocksSink : StarRocksSinkOptions.builder()
.withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
.withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
.withProperty("username", ConfigCommon.STARROCKS_USER)
.withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
.withProperty("table-name", "tmp_sr_test_api_1")
.withProperty("database-name", ConfigCommon.DATABASE_NAME)
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build()

Finally, I want to add the data to the "tmp_sr_test_api_2" table.
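One way to get this routing without modifying the connector source, sketched below under the assumption that the set of target tables is known up front: split the stream on the table name in Tuple2.f1 and attach one StarRocks sink per table.

List<String> targetTables = Arrays.asList("tmp_sr_test_api_1", "tmp_sr_test_api_2");
for (String table : targetTables) {
    // source is the DataStream<Tuple2<String, String>> of (jsonRow, tableName) described above
    source.filter(t -> table.equals(t.f1))
          .map(t -> t.f0)
          .returns(Types.STRING)   // keep the lambda's output type explicit
          .addSink(StarRocksSink.sink(
              StarRocksSinkOptions.builder()
                  .withProperty("jdbc-url", ConfigCommon.FLINK_STARROCKS_JDBC_URL)
                  .withProperty("load-url", ConfigCommon.FLINK_STARROCKS_LOAD_URL)
                  .withProperty("username", ConfigCommon.STARROCKS_USER)
                  .withProperty("password", ConfigCommon.STARROCKS_PASSWORD)
                  .withProperty("database-name", ConfigCommon.DATABASE_NAME)
                  .withProperty("table-name", table)   // one sink per target table
                  .withProperty("sink.properties.format", "json")
                  .withProperty("sink.properties.strip_outer_array", "true")
                  .build()));
}

For a truly dynamic table set this does not help, and the connector itself would need a routing-aware sink.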

The data format of time types

I have built a real-time synchronization channel from a MySQL table to StarRocks:
mysql table DDL:

CREATE TABLE `base` (
  `base_id` int(11) NOT NULL,
  `start_time` datetime DEFAULT NULL,
  `update_date` date DEFAULT NULL,
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`base_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

and StarRocks table DDL

CREATE TABLE `base` (
  `base_id` int(11) NULL COMMENT "",
  `start_time` datetime NULL COMMENT "",
  `update_date` date NULL COMMENT "",
  `update_time` datetime NULL COMMENT ""
) ENGINE=OLAP
UNIQUE KEY(`base_id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`base_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT"

Then I make some modifications to existing rows,

insert into base(base_id,start_time,update_date,update_time) values(2,now(),now(),now());

and received the row sent by the Flink CDC MySQL connector, for example:

start_time:1639831884000,update_time:2021-12-18T04:51:24Z,base_id:3,update_date:18979

As a result, the values of start_time and update_date are NULL:

select * from base;

+---------+------------+-------------+---------------------+
| base_id | start_time | update_date | update_time         |
+---------+------------+-------------+---------------------+
|       1 | NULL       | NULL        | 2021-12-17 09:21:20 |
+---------+------------+-------------+---------------------+

My question is: must date or datetime values be given in a format like '2021-12-17' or '2021-12-17 09:21:20'?
Can a date not be given as an integer offset from 1970-01-01?
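For reference, those numbers are the Debezium-style encodings: epoch milliseconds for datetime and days since 1970-01-01 for date. A small helper along these lines, written under the assumption that the sink expects formatted strings rather than epoch integers, can be applied before handing rows to the connector:

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class CdcTimeConverter {
    private static final DateTimeFormatter DATETIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // e.g. 1639831884000 with ZoneOffset.UTC -> "2021-12-18 12:51:24"
    public static String datetimeFromEpochMillis(long epochMillis, ZoneOffset zone) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), zone).format(DATETIME);
    }

    // e.g. 18979 (days since 1970-01-01) -> "2021-12-18"
    public static String dateFromEpochDay(long epochDay) {
        return LocalDate.ofEpochDay(epochDay).toString();
    }
}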

Can we change the scope of the Flink-related dependencies to provided?

As you can see, the Flink-related dependencies have compile scope.

This causes unnecessary trouble: when I add the flink-connector-starrocks dependency to my pom.xml,
I have to exclude the Flink dependencies first.

So, can we change the scope of the Flink-related dependencies to provided?

Why does version 1.2.1 shade many jars, such as Jackson?

I updated the connector from 1.1.16 to 1.2.1 and now see errors like these in my code:
exception com.fasterxml.jackson.core.JsonProcessingException is never thrown in the body of the corresponding try statement
unreported exception com.starrocks.shade.com.fasterxml.jackson.core.JsonProcessingException; must be caught or declared to be thrown
This happens because pom.xml now shades many jars. Why is this done?

StarRocksStreamLoadFailedException: all partitions have no load data

Precondition:
1. Checkpointing is enabled.
2. sink.semantic = EXACTLY_ONCE

Exception:
Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response: {"Status":"Fail","BeginTxnTimeMs":0,"Message":"all partitions have no load data","NumberUnselectedRows":1,"CommitAndPublishTimeMs":0,"Label":"2da8b57b-bdad-4f73-b79c-b8c4664b3ad6","LoadBytes":59,"StreamLoadPutTimeMs":1,"NumberTotalRows":1,"WriteDataTimeMs":2,"TxnId":248101,"LoadTimeMs":4,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

Reason:
When a checkpoint is triggered, the StarRocksDynamicSinkFunction#snapshotState method is executed, which adds a Tuple2 to the checkpointedState object and waits for the next checkpoint to write it to StarRocks. If the buffer is empty, a StarRocksStreamLoadFailedException is thrown.

Suggestion:
So I think the buffer should be checked for emptiness before it is added to checkpointedState.

Consider supporting schema-less writes to StarRocks in Flink SQL mode

Scenario

The schema of the upstream data changes frequently. To avoid restarting the job all the time, we want to rely on StarRocks' ability to parse JSON according to the table schema: the Flink job outputs a JSON payload, and on receipt StarRocks parses it into table rows using the table's current schema. As long as the StarRocks table schema is kept in sync with the upstream data, downstream consumers can pick up newly added columns without touching the Flink job.

Current state

The low-level API can already do this, but it cannot be called in SQL mode. (screenshot)

Suggestion

In SQL mode, let the user choose between schema mode and schema-less mode through a configuration option, for example:
create table sr_sink (
  biz_type int,
  vehicle_id string,
  tags array
) with (
  'connector' = 'starrocks',
  'jdbc-url' = '',
  'load-url' = '',
  'database-name' = 'test',
  'table-name' = 'test_with_schema',
  'username' = '****',
  'password' = '*****',
  'sink.with_no_schema' = 'false',
  'sink.parallelism' = '2'
);

create table sr_sink (
  jsonString string
) with (
  'connector' = 'starrocks',
  'jdbc-url' = '',
  'load-url' = '',
  'database-name' = 'test',
  'table-name' = 'test_with_no_schema',
  'username' = '****',
  'password' = '*****',
  'sink.with_no_schema' = 'true',
  'sink.parallelism' = '2'
);

ClassNotFoundException: org.apache.flink.calcite.shaded.com.google.common.base.Preconditions

Caused by: java.lang.ClassNotFoundException: org.apache.flink.calcite.shaded.com.google.common.base.Preconditions
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 7 more

Limit a Stream Load JSON batch to at most 100 MB

The StarRocks server limits a single JSON-format load to 100 MB, and this parameter cannot be changed. When the JSON data exceeds 100 MB, the load fails. I hope the connector can cap the size of a JSON batch loaded in EXACTLY_ONCE mode at 100 MB.
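A partial mitigation (a sketch, not a connector guarantee) is to cap the connector-side flush size well below the server limit; note that under EXACTLY_ONCE the whole checkpoint buffer is still flushed as one batch, which is exactly the gap this issue asks to close.

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        // ... jdbc-url / load-url / database-name / table-name / username / password ...
        .withProperty("sink.properties.format", "json")
        .withProperty("sink.semantic", "at-least-once")
        // flush once the buffered batch approaches 64 MB, staying under the 100 MB server limit
        .withProperty("sink.buffer-flush.max-bytes", String.valueOf(64L * 1024 * 1024))
        .build();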

A Flink job restart breaks exactly-once writes

Symptom:
Flink restarted because of an unrelated problem (for example, checkpoint persistence failed), and afterwards the connector's writes to SR showed duplicated and missing data at the same time.
As shown in the screenshots below:

(screenshot)

(screenshot)

Problem with the connector logic:
With exactly-once, the data is not actually written out within the checkpoint that collected it. In theory, when a checkpoint is triggered, all of that checkpoint's data should be written out, and only after the write succeeds should the checkpoint be considered successful; that is what true exactly-once requires.

Flink job blocked by snapshotState

Environment:

flink : 1.12
flink-connector-starrocks : 1.1.4

Description:

Our user reported an issue about flink job blocked:

  1. Can not consume any data from kafka
  2. Back pressures are high
  3. Checkpoint keep failing

After digging, we found a subtask blocked in StarRocksDynamicSinkFunction#snapshotState: (screenshot)

Checking the async flush thread instances: (screenshot)

So it seems one async flush thread was gone (176 StarRocksSinkManager instances but only 175 async flush threads), and the put operation blocked because the blocking queue capacity is 1: flushQueue.put(new Tuple3<>("", 0l, null));

Checking JVM and GC: around the time the async flush thread disappeared, an Old Gen GC occurred on the TM. I think this may be what caused the async flush thread to disappear: (screenshot)

I think we can do two things to improve this:

  1. Add a configuration option sink.buffer-offer.timeout-ms, and use flushQueue.offer(tuple3, timeout, unit) instead of flushQueue.put(tuple3); if the offer fails, throw an Exception so that Flink recovers the job automatically (see the sketch after this list).
  2. Improve exception handle for async flush thread.

I will try to submit a PR.
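A minimal, self-contained sketch of suggestion 1 (the queue element type and the surrounding wiring are assumptions; only the flushQueue.put(new Tuple3<>("", 0l, null)) call comes from the connector code quoted above):

import java.util.ArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple3;

public class BoundedFlushSignal {
    public static void offerFlushSignal(
            LinkedBlockingQueue<Tuple3<String, Long, ArrayList<byte[]>>> flushQueue,
            long timeoutMs) throws InterruptedException {
        // put() blocks forever if the async flush thread has died; a bounded offer()
        // turns that into a detectable failure so Flink can restart the job.
        Tuple3<String, Long, ArrayList<byte[]>> signal = new Tuple3<>("", 0L, null);
        if (!flushQueue.offer(signal, timeoutMs, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException(
                    "Timed out offering flush signal; the async flush thread may be gone.");
        }
    }
}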

Multiple Scala versions

  1. The pom currently pins the Scala version to 2.12. It would be nice to publish builds for the different Flink and Scala version combinations, e.g. 1.1.13_flink-1.13.0_2.11, 1.1.13_flink-1.13.0_2.12, 1.1.13_flink-1.13.1_2.11, 1.1.13_flink-1.13.1_2.12, distinguishing both the Flink version and the Scala version, and then upload them to the ** repository, similar to the connector jars Flink itself provides.
  2. When packaging, only the connector's own sources should go into the jar, not third-party jars, so that users can resolve dependency version conflicts more easily. Below are the changes I made to the pom for my own use: the Scala version is pinned to 2.12 and only the project's own classes are packaged.

<artifactId>flink-connector-starrocks_2.12</artifactId>
<version>1.1.13_flink-1.13.3</version>
<properties>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <file_encoding>UTF-8</file_encoding>
    <maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
    <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
    <flink.version>1.13.3</flink.version>
    <scala.version>2.12</scala.version>
</properties>
<build>
    <plugins>
        <!-- Test runner plugin; skips all conventionally named test classes before packaging -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-surefire-plugin</artifactId>
            <version>2.22.2</version>
            <configuration>
                <skipTests>true</skipTests>
            </configuration>
        </plugin>
        <!-- Packaging plugin; different properties control how the artifact is packaged -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-dependency-plugin</artifactId>
            <version>3.1.1</version>
            <executions>
                <execution>
                    <id>copy</id>
                    <phase>package</phase>
                    <goals>
                        <goal>copy-dependencies</goal>
                    </goals>
                    <configuration>
                        <outputDirectory>${project.build.directory}/lib</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Java compiler plugin -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <executions>
                <execution>
                    <phase>compile</phase>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <!-- Scala compiler plugin -->
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
            <configuration>
                <scalaVersion>${scala.version}</scalaVersion>
                <args>
                    <arg>-target:jvm-1.8</arg>
                </args>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-source-plugin</artifactId>
            <version>2.2.1</version>
            <executions>
                <execution>
                    <id>attach-sources</id>
                    <goals>
                        <goal>jar-no-fork</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
    <resources>
        <!-- Package resource files -->
        <resource>
            <directory>src/main/resources</directory>
            <includes>
                <include>**</include>
            </includes>
            <filtering>false</filtering>
        </resource>
    </resources>
</build>

Can you provide a build for Flink 1.14?

The Flink cluster I have installed is 1.14.x. With flink-connector-starrocks 1.1.14_flink-1.13_2.12 the job cannot run, because package paths and classes have changed in Flink 1.14. Thank you.

The Stream Load default timeout overrides the system's Stream Load timeout setting

(screenshots)

The connector's request carries a default timeout of 60 s, and when handling the request the FE prefers the request's timeout over the system-configured Stream Load timeout. As a result, any sink flush that takes longer than one minute is aborted and fails; the logic here conflicts with the FE configuration.
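If the FE really does prefer the per-request timeout, one possible workaround until the default is changed (an assumption, not verified here) is to pass a larger Stream Load timeout, in seconds, through the sink.properties.* pass-through:

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        // ... jdbc-url / load-url / database-name / table-name / username / password ...
        // forwarded as the Stream Load "timeout" header; overrides the connector's 60 s default
        .withProperty("sink.properties.timeout", "600")
        .build();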

The connector cannot find the scan context on some BE node (and the node is random)

When exporting data from SR with the Flink connector, the program runs fine but suddenly dies at some point with an error like Failed to get next from be -> ip:[10.151.217.3] NOT_FOUND msg:[context_id: be9a2c1f-2119-4331-ad6b-23dd0698f555 not found]. The IP is random and differs on every run; the data itself is exported correctly.
(screenshot)

The format is JSON, and the dynamic column update is specified. Why not add columns in the code by default like CSV?

Version:
flink-connector-starrocks:1.2.3_flink-1.15
flink:1.15.0

environment:
idea

Question:
When dynamic column update is specified with CSV, there is no need to set sink.properties.columns: the columns header is built from the field names of the Flink SQL table inside the flink-connector-starrocks code. With JSON, however, it is not. Is this because JSON needs to be handled differently from CSV, or was the columns header simply forgotten in the JSON path of the flink-connector-starrocks code?

Relevant source code:

//CLASS:StarRocksStreamLoadVisitor.java 
//METHOD:doHttpPut
        try (CloseableHttpClient httpclient = httpClientBuilder.build()) {
            HttpPut httpPut = new HttpPut(loadUrl);
            Map<String, String> props = sinkOptions.getSinkStreamLoadProperties();
            for (Map.Entry<String,String> entry : props.entrySet()) {
                httpPut.setHeader(entry.getKey(), entry.getValue());
            }
            if (!props.containsKey("columns") && ((sinkOptions.supportUpsertDelete() && !__opAutoProjectionInJson) || StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat()))) {
                String cols = String.join(",", Arrays.asList(fieldNames).stream().map(f -> String.format("`%s`", f.trim().replace("`", ""))).collect(Collectors.toList()));
                if (cols.length() > 0 && sinkOptions.supportUpsertDelete()) {
                    cols += String.format(",%s", StarRocksSinkOP.COLUMN_KEY); //COLUMN_KEY="__op"
                }
                httpPut.setHeader("columns", cols);
            }

Lastly:

  1. If the logic for JSON was simply omitted from the code, I can submit a PR to add it.
  2. If it is designed this way on purpose, please explain the reasoning behind the design.
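Based on the check in the code above (the header is only auto-generated when the user properties do not already contain "columns"), a workaround is to set the columns header explicitly when using JSON. The column list below is purely illustrative:

StarRocksSinkOptions options = StarRocksSinkOptions.builder()
        // ... jdbc-url / load-url / database-name / table-name / username / password ...
        .withProperty("sink.properties.format", "json")
        // hypothetical column list for a table with columns id and name, plus the __op marker
        .withProperty("sink.properties.columns", "`id`,`name`,`__op`")
        .build();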

About Flink version support

1. flink-1.11.0 is not compatible with connector v1.2.1. Is compatibility with flink-1.11.0 planned (a source connector is needed; the current flink-1.11 branch has no source)?
2. Is support for flink-1.15 (about to be released) planned?

Current running txns on db 11027 is 100, larger than limit 100?

Caused by: com.starrocks.connector.flink.manager.StarRocksStreamLoadFailedException: Failed to flush data to StarRocks, Error response:
{"Status":"Fail","BeginTxnTimeMs":0,"Message":"current running txns on db 11027 is 100, larger than limit 100","NumberUnselectedRows":0,"CommitAndPublishTimeMs":0,"Label":"a848b543-0040-4f29-839d-fb9dbb7bd61c","LoadBytes":0,"StreamLoadPutTimeMs":0,"NumberTotalRows":0,"WriteDataTimeMs":0,"TxnId":-1,"LoadTimeMs":0,"ReadDataTimeMs":0,"NumberLoadedRows":0,"NumberFilteredRows":0}

Implementation of exactly-once semantic in the sink function is totally wrong!

If we refer to the official doc about the exactly-once semantics of the StarRocks sink connector, we see the following description:

The overall process is as follows:

  1. Save data and its label at each checkpoint that is completed at a specific checkpoint interval.
  2. After data and labels are saved, block the flushing of data cached in the state at the first invoke after each checkpoint is completed.

Unfortunately, the current implementation of this semantic in the StarRocks connector is wrong. It uses Flink's operator state to hold the buffered entities during the synchronous phase of the snapshot, and flushes them to StarRocks on the next #invoke call. However, this cannot handle the case where the overall checkpoint fails. If checkpoint-42 requires all 3 subtasks to complete, and subtask-0 succeeds while subtask-1 fails, checkpoint-42 fails as a whole. Yet subtask-0 will still send its data to StarRocks in the #invoke call after the failed checkpoint-42. So once the job fails and restores from the last successful checkpoint-41, some data from subtask-0 will be stored twice.

We should introduce CheckpointListener with notifyCheckpointComplete to implement this semantic; better yet, we could leverage two-phase-commit semantics to achieve exactly-once with better performance.
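A minimal, self-contained sketch of that direction (not the connector's actual implementation; state restore, load labels, and retries are omitted): buffer rows per checkpoint and only ship them to StarRocks once the checkpoint as a whole is known to have completed.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class CheckpointAwareStarRocksSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> buffer = new ArrayList<>();
    // batches sealed at a checkpoint, keyed by the checkpoint id that sealed them
    private final ConcurrentSkipListMap<Long, List<String>> pending = new ConcurrentSkipListMap<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // Seal the current buffer under this checkpoint id. A real implementation
        // would also persist it in operator state here (omitted in this sketch).
        if (!buffer.isEmpty()) {
            pending.put(ctx.getCheckpointId(), new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Flush every batch sealed at or before the completed checkpoint. A deterministic
        // label per (checkpointId, subtask) would let StarRocks deduplicate retried loads.
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        ready.values().forEach(this::streamLoad);
        ready.clear();
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // A real implementation would restore sealed-but-unflushed batches from
        // operator state here (omitted).
    }

    private void streamLoad(List<String> rows) {
        // Send the rows to StarRocks via Stream Load (omitted).
    }
}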

package failed

package failed, what could be the problem?
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.5:sign (sign-artifacts) on project flink-connector-starrocks: Exit code: 127 -> [Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute goal org.apache.maven.plugins:maven-gpg-plugin:1.5:sign (sign-artifacts) on project flink-connector-starrocks: Exit code: 127
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:216)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
    at org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call(MultiThreadedBuilder.java:188)
    at org.apache.maven.lifecycle.internal.builder.multithreaded.MultiThreadedBuilder$1.call(MultiThreadedBuilder.java:184)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.maven.plugin.MojoExecutionException: Exit code: 127
    at org.apache.maven.plugin.gpg.GpgSigner.generateSignatureForFile(GpgSigner.java:168)
    at org.apache.maven.plugin.gpg.AbstractGpgSigner.generateSignatureForArtifact(AbstractGpgSigner.java:205)
    at org.apache.maven.plugin.gpg.GpgSignAttachedMojo.execute(GpgSignAttachedMojo.java:140)
    at org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
    at org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
    ... 11 more
[ERROR]
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/MojoExecutionException

Predicate pushdown causes a SQL parse error when reading a StarRocks table

The SQL:

insert into sinkTable
select item_key,
       vehicle_id,
      item_type,item_value,modify_time
from HTWSource 
where CHAR_LENGTH(vehicle_id) < 10 ; 

When the job starts, the error is:

Caused by: java.lang.RuntimeException: Request of get query plan failed with code 500 {"exception":"The Sql is invalid","status":500}
    at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryPlan(StarRocksQueryPlanVisitor.java:134) ~[?:?]
    at com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor.getQueryInfo(StarRocksQueryPlanVisitor.java:64) ~[?:?]
    at com.starrocks.connector.flink.table.source.StarRocksSourceCommonFunc.getQueryInfo(StarRocksSourceCommonFunc.java:207) ~[?:?]
    at com.starrocks.connector.flink.table.source.StarRocksDynamicSourceFunction.<init>(StarRocksDynamicSourceFunction.java:83) ~[?:?]

The logs show that the SQL was parsed as:

21:06:09,066 INFO  com.starrocks.connector.flink.manager.StarRocksQueryPlanVisitor [] - query sql [select * from `bh_vehicle_iot_stream`.`htw_profile_item_info_sr` where (10)]

Consider resetting `flushException` to null in `checkFlushException`

We should consider resetting flushException to null in checkFlushException, as we came across a scenario like this:

  1. Our user developed a UDTF and put collect(Row.of(key, value)) in an exception catch block, where they simply logged and swallowed the exception.
  2. The collect(row) triggered writeRecord and a flush operation (which offers the batch into the flushQueue).
  3. The asyncFlush operation raised an exception (in our case: "the length of input is too long than schema."), and the flush thread stored it in the flushException field.
  4. The following collect(row) calls in the UDTF triggered writeRecord and checkFlushException; since the exception was swallowed by user code and flushException was never reset to null, every collect(row) hit the same exception.
  5. The user found the error log and increased the column length (which would fix the issue), but Flink kept printing the original exception message anyway.

In this case, the Flink job cannot surface the flushException, and it keeps running even though it cannot write any data into StarRocks.
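A minimal sketch of the proposed change (the field and method names mirror the issue description; everything else is assumed):

public class FlushExceptionHolder {
    private volatile Exception flushException;

    // called by the async flush thread when a Stream Load fails
    public void setFlushException(Exception e) {
        this.flushException = e;
    }

    // called on the write path: rethrow the stored failure once, then clear it, so a
    // swallowed exception does not pin the sink in a permanent error state
    public void checkFlushException() {
        Exception e = flushException;
        if (e != null) {
            flushException = null;
            throw new RuntimeException("Writing records to StarRocks failed.", e);
        }
    }
}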

There is an error "parse auth info failed" in FE when using "SinkFunction<String> sink(StarRocksSinkOptions sinkOptions)"


[BaseAction.getAuthorizationInfo():315] parse auth info failed, Authorization header null, url /api/xxxxxxxx fail to process url: /api/xxxxxxxxxxxxx
com.starrocks.http.UnauthorizedException: Need auth information

But when I debug the Flink process, the HTTP PUT does carry an Authorization header.

And when I implement a sample sink myself using HttpClients with the "Authorization" header, the stream load succeeds.

The version of "flink-connector-starrocks" is "1.1.10_flink-1.11"

Stream Load error: Failed to parse json as array. error: A string is opened, but never closed.

I am using StarRocks 2.2. When stream loading a JSON file, the load succeeds if the JSON array contains only a few rows, but if the array contains too many rows it returns the following result:
{
"TxnId": 342,
"Label": "3576b838-6185-4dbc-9ada-b36729d80ee1",
"Status": "Fail",
"Message": "Failed to parse json as array. error: A string is opened, but never closed.",
"NumberTotalRows": 0,
"NumberLoadedRows": 0,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 60632,
"LoadTimeMs": 17,
"BeginTxnTimeMs": 0,
"StreamLoadPutTimeMs": 1,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 13,
"CommitAndPublishTimeMs": 0
}
Why is that? Is something wrong with my StarRocks configuration?

PS: the load command:
curl -v --location-trusted -u 'name:password'
-H "format: json" -H "jsonpaths: ["$.score", "$.grade", "$.name", "$.id", "$.age"]"
-H "strip_outer_array: true"
-T b.json
http://.../api/test/student/_stream_load

Part of the data in b.json:
[{"score":"124","grade":1,"name":"张三","id":109,"age":"18"},{"score":"941","grade":1,"name":"张三","id":926,"age":"18"},{"score":"942","grade":1,"name":"张三","id":927,"age":"18"},{"score":"943","grade":1,"name":"张三","id":928,"age":"18"},{"score":"944","grade":1,"name":"张三","id":929,"age":"18"},{"score":"945","grade":1,"name":"张三","id":930,"age":"18"},{"score":"946","grade":1,"name":"张三","id":931,"age":"18"},{"score":"947","grade":1,"name":"张三","id":932,"age":"18"},{"score":"948","grade":1,"name":"张三","id":933,"age":"18"},{"score":"949","grade":1,"name":"张三","id":934,"age":"18"},{"score":"950","grade":1,"name":"张三","id":935,"age":"18"},{"score":"951","grade":1,"name":"张三","id":936,"age":"18"},{"score":"952","grade":1,"name":"张三","id":937,"age":"18"},{"score":"953","grade":1,"name":"张三","id":938,"age":"18"},{"score":"954","grade":1,"name":"张三","id":939,"age":"18"},{"score":"968","grade":1,"name":"张三","id":953,"age":"18"},{"score":"981","grade":1,"name":"张三","id":966,"age":"18"},{"score":"982","grade":1,"name":"张三","id":967,"age":"18"},{"score":"983","grade":1,"name":"张三","id":968,"age":"18"},{"score":"984","grade":1,"name":"张三","id":969,"age":"18"},{"score":"985","grade":1,"name":"张三","id":970,"age":"18"},{"score":"986","grade":1,"name":"张三","id":971,"age":"18"},{"score":"987","grade":1,"name":"张三","id":972,"age":"18"},{"score":"988","grade":1,"name":"张三","id":973,"age":"18"},{"score":"989","grade":1,"name":"张三","id":974,"age":"18"},{"score":"990","grade":1,"name":"张三","id":975,"age":"18"},{"score":"991","grade":1,"name":"张三","id":976,"age":"18"},{"score":"1009","grade":1,"name":"张三","id":994,"age":"18"},{"score":"1010","grade":1,"name":"张三","id":995,"age":"18"},{"score":"1011","grade":1,"name":"张三","id":996,"age":"18"},{"score":"1012","grade":1,"name":"张三","id":997,"age":"18"},{"score":"1013","grade":1,"name":"张三","id":998,"age":"18"},{"score":"1014","grade":1,"name":"张三","id":999,"age":"18"}]

Unable to insert into StarRocks with a date field using flink-connector-starrocks

When I use flink-connector-starrocks to insert records into StarRocks, inserting works as long as there is no date field. When I add a date field, the following error occurs:
com.starrocks.connector.flink.table.StarRocksDynamicSinkFunction [] - Unsupported row data invoked: [%s]
This is my code:

import java.util.Date;
public class RequestData {

/**
 * Current date (used for StarRocks partitioning and bucketing)
 */
private Date curDate;

}

SinkFunction starRocksSink = StarRocksSink.sink(TableSchema.builder()
.field("cur_date", DataTypes.DATE())
.field("id", DataTypes.STRING())
.build(),
StarRocksSinkOptions.builder()
.withProperty("jdbc-url", "jdbc:mysql://XXX:9030,XXX:9030,XXX:9030")
.withProperty("load-url", "XXX:8030;XXX:8030;XXX:8030")
.withProperty("username", "root")
.withProperty("password", "root")
.withProperty("database-name", "test")
.withProperty("table-name", "logData_url")
.withProperty("sink.properties.format", "json")
.withProperty("sink.properties.strip_outer_array", "true")
.build(),
(slots, requestLog) -> {
slots[0] = requestLog.getCurDate();
slots[1] = UUID.randomUUID().toString();
}
);

my Table structure:

CREATE TABLE test.logData_url
(
cur_date DATE,
id STRING
)
ENGINE=olap
COMMENT "log data"
DISTRIBUTED BY HASH(id) BUCKETS 32
PROPERTIES ("storage_type"="column");

I print out the inserted parameters:

{"id":"8bb09e5e110845f39sf00df668ef3e80","curDate":1643126400000}

I guess there is a problem with the format of curDate

How should I deal with this problem? Thank you.
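One thing to try (an assumption, not a confirmed fix): the row transformer may simply not recognize java.util.Date, so convert the value to java.sql.Date, or to a pre-formatted "yyyy-MM-dd" string, before assigning the slot:

(slots, requestLog) -> {
    // option 1: hand the sink a java.sql.Date instead of java.util.Date
    slots[0] = new java.sql.Date(requestLog.getCurDate().getTime());
    // option 2: a pre-formatted string, e.g.
    // slots[0] = new java.text.SimpleDateFormat("yyyy-MM-dd").format(requestLog.getCurDate());
    slots[1] = UUID.randomUUID().toString();
}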

Improve StarRocks sink metrics

I think we can add some metrics for StarRocks sink, such as 'totalFlushCount', 'totalFlushErrorCount', 'WriteDataTimeMs', etc.
I'll try to submit a PR.

Thread does not shut down correctly after flink failure recovery

Environment:
flink : 1.12
flink-connector-starrocks : 1.1.4
TM slot number : 8
flink job recovered 10 times.

Description:

  1. Since the slot number is 8, each TM should have 8 async flush threads, but after 10 recoveries the thread count increased to 176, so we should add a 'shutdown' operation for the thread.
    (screenshot)

  2. starrocks-interval-sink-thread does not shut down correctly; I will dig further and try to fix it.
    (screenshot)

Project compilation error

Are there documented compilation and packaging steps? Could you share them? These classes cannot be found:
import com.starrocks.shade.org.apache.thrift.TException;
import com.starrocks.shade.org.apache.thrift.protocol.TBinaryProtocol;
import com.starrocks.shade.org.apache.thrift.protocol.TProtocol;
import com.starrocks.shade.org.apache.thrift.transport.TSocket;
import com.starrocks.shade.org.apache.thrift.transport.TTransportException;

Flink SQL table sink writes to the SR table have no effect

  1. Versions
    1. flink: 1.13.3
    2. kafka: 2.x
    3. SR: 2.0.1
    4. connector: 1.2.1-flink_1.13_2.12
    5. Exactly-once semantics are used for writing; checkpoints are triggered every 30 seconds.
  2. Problem description
    1. Exactly-once
      1. The job reads Kafka data upstream and writes it directly to an SR table with exactly-once semantics. The Flink SQL job runs in streaming mode. While the job is running there are no errors at all and every checkpoint succeeds, but no data is written to the SR table. The Flink TM logs also show no errors.
      2. If the Flink job is stopped, the latest data shows up in the table about 2 minutes later.
      3. While the job is running, SR receives no stream load requests at all; the moment the job stops, it receives a large burst of stream load requests.
      4. All data produced while the job was running is lost.
    2. At-least-once
      1. After switching from exactly-once to at-least-once and configuring the flush parameters, writes work normally, but Compaction Cumulate increases noticeably; other metrics look normal.
  3. Other
    1. We run 16 similar Flink SQL jobs in total, all reading Kafka upstream and writing to SR tables downstream. 3 of them showed this behavior; two of those have an upstream Kafka rate of 35k/s, and one of 1.5k/s.
    2. Some strange log lines:
    Label [f95c2db9-6345-4b25-800c-f8a3e0755de7] has already been used.

Using canal-json, cannot insert into StarRocks

tableEnv.executeSql("CREATE TABLE tb_order(\n" +
" id INT,\n" +
" order_num STRING,\n" +
" counter_code STRING,\n" +
" price INT,\n" +
" status INT \n" +
" ) WITH (\n" +
"'connector' = 'kafka',\n" +
"'topic' = 'tb_order',\n" +
"'scan.startup.mode' = 'latest-offset',\n" +
"'properties.bootstrap.servers' = 'hadoop001.test.com:9092,hadoop002.test.com:9092,hadoop003.test.com:9092 ', \n" +
"'format' = 'canal-json', \n" +
"'properties.group.id' = 'liqitest',\n" +
// "'json.fail-on-missing-field' = 'false', \n" +
"'canal-json.ignore-parse-errors' = 'true' \n "+
")");

    tableEnv.executeSql(
            "CREATE TABLE mytest03 (" +
                    "id INT," +
                    "order_num STRING," +
                    "counter_code STRING," +
                    "price INT ," +
                    "status INT " +
                    //" PRIMARY KEY (id) NOT ENFORCED"+
                    ") WITH ( " +
                    "'connector' = 'starrocks'," +
                    "'jdbc-url'='jdbc:mysql://fat-starrocks.xxx.com:9030?characterEncoding=utf-8&useSSL=false'," +
                    "'load-url'='fat-xx.ppdapi.com:8030'," +
                    "'database-name' = 'gouya_test'," +
                    "'table-name' = 'mytest03'," +
                    "'username' = 'xxxx'," +
                    "'password' = 'xxx'," +
                    "'sink.buffer-flush.max-rows' = '65000'," +
                    "'sink.buffer-flush.max-bytes' = '67108864'," +
                    "'sink.buffer-flush.interval-ms' = '1000'," +
                    "'sink.properties.column_separator' = '\\x01'," +
                    "'sink.properties.row_delimiter' = '\\x02'," +
                    "'sink.max-retries' = '3'" +
                    ")"

    );

tableEnv.executeSql("INSERT INTO mytest03 " +
"SELECT id, order_num,counter_code, price,status FROM tb_order ").print();
}

can not insert into starrocks
