
Flink clickhouse sink

  • Simple and efficient, with an at-least-once guarantee
  • Flink 1.8 is currently supported; the code can also serve as a reference for later Flink versions
  • Writes through ClickHouse's HTTP interface directly instead of JDBC
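To illustrate what writing through the HTTP interface means at the protocol level, here is a minimal sketch. It is not the sink's actual client code: the object and method names are hypothetical. It sends a complete INSERT statement as the body of a POST request to the instance's HTTP port (8123 by default) and passes credentials in ClickHouse's `X-ClickHouse-User` and `X-ClickHouse-Key` headers.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Hypothetical sketch of ClickHouse's HTTP protocol; not the sink's own client.
object ClickhouseHttpSketch {

  // The whole SQL statement is POSTed to the root path of the instance.
  def endpoint(instance: String): URL = new URL(s"http://$instance/")

  // ClickHouse reads credentials from these two request headers.
  def authHeaders(user: String, password: String): Map[String, String] =
    Map("X-ClickHouse-User" -> user, "X-ClickHouse-Key" -> password)

  // Sends one statement and returns the HTTP status code (200 on success).
  def post(instance: String, user: String, password: String, sql: String): Int = {
    val conn = endpoint(instance).openConnection().asInstanceOf[HttpURLConnection]
    try {
      conn.setRequestMethod("POST")
      conn.setDoOutput(true)
      authHeaders(user, password).foreach { case (k, v) => conn.setRequestProperty(k, v) }
      val out = conn.getOutputStream
      try out.write(sql.getBytes(StandardCharsets.UTF_8)) finally out.close()
      conn.getResponseCode
    } finally conn.disconnect()
  }
}
```

For example, `ClickhouseHttpSketch.post("localhost:8123", "default", "", "INSERT INTO db.table VALUES ('a','b')")` would return 200 if the server accepted the rows. Any other status code carries ClickHouse's error message in the response body.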

Why I created this tool

At first I used https://github.com/ivi-ru/flink-clickhouse-sink, which is linked from the official ClickHouse website. However, I found that it could lose data, that Flink slots could not be released normally when the ClickHouse server returned an abnormal response, and that its latest version also ran into 'Out of memory' errors. So I rewrote it for people who want a simple ClickHouse sink.

It has been well tested by [email protected]. Have fun!

Sponsorship

hongshen chenglong

Thank you for your sponsorship and support

Build

mvn clean package

Usage

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import scala.beans.BeanProperty

import com.alibaba.fastjson.JSON
// ClickhouseConstants is assumed to live in the same package as ClickhouseSink.
import tech.hongshen.clickhouse.{ClickhouseConstants, ClickhouseSink}
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

// Hypothetical event type: field names and types are assumed to match the Kafka JSON payload.
// fastjson needs a no-arg constructor and setters, which @BeanProperty vars provide.
class Data {
  @BeanProperty var name: String = _
  @BeanProperty var city: String = _
  @BeanProperty var dateT: Long = _
  @BeanProperty var ts: Long = _
  @BeanProperty var num: Int = _
}

/**
 * @author hongshen
 * @since 2020/12/24
 */
object SaveToClickhouseJob {

  def main(args: Array[String]): Unit = {
    val parameterTool = ParameterTool.fromArgs(args)
    val topic = parameterTool.get("kafka.topic.name", "hongshen")
    val env = StreamExecutionEnvironment.createLocalEnvironment()

    // Sink settings: target table and number of records per INSERT
    val ckSinkerProps = new Properties
    ckSinkerProps.put(ClickhouseConstants.TARGET_TABLE_NAME, "db.table")
    ckSinkerProps.put(ClickhouseConstants.BATCH_SIZE, "20000")

    // ClickHouse HTTP endpoint, credentials and flush interval
    ckSinkerProps.put(ClickhouseConstants.INSTANCES, "localhost:8123")
    ckSinkerProps.put(ClickhouseConstants.USERNAME, "default")
    ckSinkerProps.put(ClickhouseConstants.PASSWORD, "")
    ckSinkerProps.put(ClickhouseConstants.FLUSH_INTERVAL, "2")

    val kafkaProps = new Properties()
    kafkaProps.setProperty("bootstrap.servers", "localhost:9092")
    kafkaProps.setProperty("group.id", "hongshen")

    val myConsumer = new FlinkKafkaConsumer011[String](topic, new SimpleStringSchema(), kafkaProps)

    // Read the topic from the beginning
    myConsumer.setStartFromEarliest()

    // HH (24-hour clock) matches ClickHouse's DateTime text format; hh would be 12-hour
    val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

    // Convert each JSON event into a "('v1','v2',...)" tuple, the format the sink expects
    val records = env.addSource(myConsumer).map(s => {
      val data = JSON.parseObject(s, classOf[Data])
      s"('${data.name}','${data.city}','${sdf.format(new Date(data.dateT))}','${data.ts}','${data.num}')"
    })

    records.addSink(new ClickhouseSink(ckSinkerProps)).setParallelism(2)

    env.execute("kafka2clickhouse")
  }
}
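The example sets both BATCH_SIZE and FLUSH_INTERVAL. A minimal sketch of how a buffering sink can combine the two: records accumulate until the batch is full or the interval has elapsed, and are then written in one INSERT. `BatchBuffer` is hypothetical, the unit of FLUSH_INTERVAL is not documented here (the sketch uses milliseconds), and a real sink would typically also flush from a timer and before a checkpoint completes, which is what an at-least-once guarantee relies on.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical size-or-time buffer; `flush` stands in for the HTTP INSERT.
final class BatchBuffer(batchSize: Int, flushIntervalMs: Long, startMs: Long)(flush: Seq[String] => Unit) {
  private val buffer = ArrayBuffer.empty[String]
  private var lastFlushMs = startMs

  // nowMs is passed in so the logic can be tested without a real clock.
  // This sketch only checks the interval when a record arrives.
  def add(record: String, nowMs: Long): Unit = {
    buffer += record
    if (buffer.size >= batchSize || nowMs - lastFlushMs >= flushIntervalMs) flushNow(nowMs)
  }

  // Writes everything buffered so far; a sink would also call this on checkpoint and close.
  def flushNow(nowMs: Long): Unit = {
    if (buffer.nonEmpty) flush(buffer.toList)
    buffer.clear()
    lastFlushMs = nowMs
  }
}
```

With `batchSize = 2`, two records added in quick succession are flushed together, while a single record arriving after the interval has passed is flushed on its own.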

Notice

Each record must be a comma-separated list of values wrapped in parentheses, for example ('a','b'), because the sink generates the INSERT statement as follows:

String.format("INSERT INTO %s VALUES %s", tableName, csv)

So you need to convert each event in your DataStream to that format; see the example above.
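Building tuples by plain string interpolation, as in the usage example, breaks as soon as a value contains a single quote. A minimal sketch of a hypothetical helper (not part of the sink's API) that escapes values for ClickHouse string literals before wrapping them; joining the tuples of a batch with commas is an assumption about how the sink fills the `%s` placeholder.

```scala
// Hypothetical helpers for producing the "('v1','v2')" tuples the sink expects.
object ValuesFormat {
  // ClickHouse string literals use backslash escapes: escape '\' first, then the quote.
  def quote(value: String): String =
    "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

  // One record as a parenthesized, comma-separated tuple.
  def tuple(values: Seq[String]): String = values.map(quote).mkString("(", ",", ")")

  // Same statement template as above, with the batch joined by commas.
  def insert(tableName: String, tuples: Seq[String]): String =
    String.format("INSERT INTO %s VALUES %s", tableName, tuples.mkString(","))
}
```

In the usage example, the `map` body could then return `ValuesFormat.tuple(Seq(data.name, data.city, sdf.format(new Date(data.dateT)), data.ts.toString, data.num.toString))`.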

Contributors

dongbin86

Issues

ClosedChannelException

Hello, my Flink job often fails with "Caused by: com.xxx.clickhouse.ClickhouseException: java.nio.channels.ClosedChannelException". What causes this?

Selective field insert

Inserting only selected fields does not seem to be supported. For example, my table has three fields, name, age and sex, and I only want to insert name and sex. How can I do this? I'm looking forward to your reply. Thank you.
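One possible workaround, assuming the sink substitutes TARGET_TABLE_NAME verbatim into the `INSERT INTO %s VALUES %s` template from the Notice section and uses it nowhere else (not verified against the sink's source): append a column list to the table name. ClickHouse fills omitted columns, here `age`, with their default values. The helper below is hypothetical.

```scala
// Hypothetical helper, not part of the sink's API.
object SelectiveInsert {
  // Appends an explicit column list, e.g. "db.table (name, sex)".
  def tableWithColumns(table: String, columns: Seq[String]): String =
    s"$table (${columns.mkString(", ")})"

  // The statement template quoted in the Notice section.
  def statement(tableName: String, csv: String): String =
    String.format("INSERT INTO %s VALUES %s", tableName, csv)
}
```

Under that assumption, the job would set `ckSinkerProps.put(ClickhouseConstants.TARGET_TABLE_NAME, "db.table (name, sex)")` and emit two-value tuples such as `('tom','m')`.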
