elasticsearch-cdc's Introduction

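The elasticsearch-cdc plugin asynchronously syncs Elasticsearch change data capture (CDC) events to Kafka.

elasticsearch-cdc plugin design

To achieve higher throughput, messages are sent to Kafka asynchronously.

Installation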

1. Build the package
mvn clean package
2. Remove the previous version (if one was installed)
${elasticsearch_home}/bin/elasticsearch-plugin remove elasticsearch-cdc
3. Install the plugin
${elasticsearch_home}/bin/elasticsearch-plugin install file:///${elasticsearch_cdc_home_dir}/target/releases/elasticsearch-cdc-1.0-SNAPSHOT.zip
4. Configure the javax-related permissions for Elasticsearch
Add the following Java option to /etc/elasticsearch/jvm.options:
-Djava.security.policy=/your_elasticsearch_home/plugins/elasticsearch-cdc/plugin-security.policy
5. Restart Elasticsearch
service elasticsearch restart

Note: in a distributed Elasticsearch cluster, the plugin must be installed on every node.
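One way to verify that every node has the plugin is to compare the node list with the output of `GET _cat/plugins?format=json`, which returns one row per node and plugin with `name` (the node name) and `component` (the plugin name). The sketch below is only an illustration: the node names and the sample rows are hypothetical stand-ins for real API responses, and it assumes the plugin is reported under the name `elasticsearch-cdc` used by the remove command above.

```python
import json

PLUGIN_NAME = "elasticsearch-cdc"  # assumed to match the installed plugin name


def nodes_missing_plugin(node_names, cat_plugins_rows, plugin=PLUGIN_NAME):
    """Return the sorted node names that do not report the plugin."""
    have = {row["name"] for row in cat_plugins_rows
            if row.get("component") == plugin}
    return sorted(set(node_names) - have)


# Hypothetical sample data standing in for real API responses.
nodes = ["es-node-1", "es-node-2", "es-node-3"]
rows = json.loads("""[
  {"name": "es-node-1", "component": "elasticsearch-cdc", "version": "1.0-SNAPSHOT"},
  {"name": "es-node-3", "component": "elasticsearch-cdc", "version": "1.0-SNAPSHOT"}
]""")

missing = nodes_missing_plugin(nodes, rows)
print(missing)
```

Any node in the returned list still needs the install and restart steps.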

Usage

Set the cluster properties

PUT _cluster/settings
{
  "persistent": {
    "indices.cdc.request.timeout.ms": 30000,
    "indices.cdc.send.buffer.bytes": 131072,
    "indices.cdc.acks": "all",
    "indices.cdc.compression.type": "none",
    "indices.cdc.receive.buffer.bytes": 32768,
    "indices.cdc.batch.size": 16384,
    "indices.cdc.linger.ms": 1000,
    "indices.cdc.buffer.memory": 33554432,
    "indices.cdc.bootstrap.servers": "your_kafka_server1:9092,your_kafka_server2:9092,your_kafka_server3:9092",
    "indices.cdc.max.request.size": 1048576,
    "indices.cdc.max.in.flight.requests.per.connection": 10,
    "indices.cdc.retry.backoff.ms": 100,
    "indices.cdc.retries": 100,
    "indices.cdc.max.block.ms": 86400000
  }
}
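The request body above is a set of Kafka producer properties with the `indices.cdc.` prefix added to each key. A minimal sketch of generating such a body from a plain producer-property dictionary follows; the helper name `cdc_cluster_settings` is hypothetical and not part of the plugin, and the broker address is the placeholder from the example above.

```python
import json

PREFIX = "indices.cdc."


def cdc_cluster_settings(producer_props):
    """Wrap Kafka producer properties in a persistent cluster-settings body."""
    return {"persistent": {PREFIX + key: value
                           for key, value in producer_props.items()}}


body = cdc_cluster_settings({
    "bootstrap.servers": "your_kafka_server1:9092",  # placeholder broker
    "acks": "all",
    "linger.ms": 1000,
})
print(json.dumps(body, indent=2))
```

The printed JSON can be sent as the body of `PUT _cluster/settings`.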

When creating an index, enable CDC and initialize the related properties

PUT /index
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 1,
    "index.cdc.enabled": true,
    "index.cdc.topic": "cdc_test",
    "index.cdc.pk.column": "doc_id",
    "index.refresh_interval": "100s"
  },
  "mappings": {
    "properties": {
      "content": {
        "type": "text"
      },
      "doc_id": {
        "type": "integer"
      },
      "age": {
        "type": "integer"
      },
      "address": {
        "type": "keyword"
      },
      "keywords": {
        "type": "nested",
        "properties": {
          "keyword": {
            "type": "keyword"
          },
          "frequency": {
            "type": "integer"
          }
        }
      }
    }
  }
}

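Index-level properties

| Property | Default | Description |
| --- | --- | --- |
| index.cdc.enabled | false | Whether CDC listening is enabled for the index |
| index.cdc.topic | "" | Name of the Kafka topic that CDC data is written to |
| index.cdc.pk.column | "" | Primary key column |
| index.cdc.exclude.columns | "" | Columns to exclude; separate multiple columns with commas |
| index.cdc.alias | "" | Alias of the index; when set, CDC uses this alias |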

Cluster-level configuration properties

| Property | Default | Description |
| --- | --- | --- |
| indices.cdc.bootstrap.servers | "" | Same as producer bootstrap.servers |
| indices.cdc.batch.size | 16384 | Same as producer batch.size |
| indices.cdc.acks | all | Same as producer acks |
| indices.cdc.buffer.memory | 32 * 1024 * 1024 | Same as producer buffer.memory |
| indices.cdc.compression.type | none | Same as producer compression.type |
| indices.cdc.request.timeout.ms | 30 * 1000 | Same as producer request.timeout.ms |
| indices.cdc.max.request.size | 1024 * 1024 | Same as producer max.request.size |
| indices.cdc.retries | 0 | Same as producer retries |
| indices.cdc.retry.backoff.ms | 100 | Same as producer retry.backoff.ms |
| indices.cdc.send.buffer.bytes | 131072 | Same as producer send.buffer.bytes |
| indices.cdc.receive.buffer.bytes | 32768 | Same as producer receive.buffer.bytes |
| indices.cdc.linger.ms | 0 | Same as producer linger.ms |
| indices.cdc.max.in.flight.requests.per.connection | 5 | Same as producer max.in.flight.requests.per.connection |
| indices.cdc.max.block.ms | 0 | Same as producer max.block.ms |
| indices.cdc.producer.nums | Number of cores on the ES server | Number of Kafka producer threads per node |
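Since each cluster-level property mirrors a Kafka producer property, the effective producer configuration can be reasoned about as the documented defaults overlaid with whatever was set on the cluster. The sketch below illustrates that idea only; it is not the plugin's code, the function name is hypothetical, and it assumes a one-to-one mapping obtained by stripping the `indices.cdc.` prefix.

```python
PREFIX = "indices.cdc."

# Defaults copied from the cluster-level property table.
DEFAULTS = {
    "bootstrap.servers": "",
    "batch.size": 16384,
    "acks": "all",
    "buffer.memory": 32 * 1024 * 1024,
    "compression.type": "none",
    "request.timeout.ms": 30 * 1000,
    "max.request.size": 1024 * 1024,
    "retries": 0,
    "retry.backoff.ms": 100,
    "send.buffer.bytes": 131072,
    "receive.buffer.bytes": 32768,
    "linger.ms": 0,
    "max.in.flight.requests.per.connection": 5,
    "max.block.ms": 0,
}

# Plugin-specific properties that have no Kafka producer counterpart.
PLUGIN_ONLY = {"producer.nums"}


def effective_producer_config(persistent_settings):
    """Overlay cluster settings on the documented defaults."""
    config = dict(DEFAULTS)
    for key, value in persistent_settings.items():
        if not key.startswith(PREFIX):
            continue  # unrelated cluster setting
        name = key[len(PREFIX):]
        if name not in PLUGIN_ONLY:
            config[name] = value
    return config


cfg = effective_producer_config({
    "indices.cdc.retries": 100,
    "indices.cdc.linger.ms": 1000,
    "indices.cdc.producer.nums": 4,
})
print(cfg["retries"], cfg["linger.ms"], cfg["batch.size"])
```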

Note

Action values (the "op" field of each message): delete = 0, insert = 1, update = 2
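A consumer can turn the numeric action code into a readable name when it parses a message value. The following is a minimal sketch using only the standard library; the `Op` enum and the `decode` helper are illustrative names, and the sample value is the delete message from the test section of this document.

```python
import json
from enum import IntEnum


class Op(IntEnum):
    """Action codes carried in the "op" field of a CDC message."""
    DELETE = 0
    INSERT = 1
    UPDATE = 2


def decode(value):
    """Parse a CDC message value into (op, index, content, ts)."""
    event = json.loads(value)
    return Op(event["op"]), event["index"], event["content"], event["ts"]


sample = '{"op":0,"index":"index","content":{"doc_id":"1"},"ts":1629531859497}'
op, index, content, ts = decode(sample)
print(op.name, index, content, ts)
```

An unknown code raises `ValueError` from the enum constructor, which makes unexpected messages easy to spot.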

Tests

Each case lists the operation, the request, and the resulting Kafka key and Kafka value.

Operation: insert with POST
Request:
POST /index/_create/1
{"doc_id":1,"content":"peace and hope","age":12,"address":"bj","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":5}]}
Kafka key: 1
Kafka value:
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":5}],"doc_id":1,"content":"peace and hope","age":12},"ts":1629530725380}

Operation: insert with PUT
Request:
PUT /index/_doc/2
{"doc_id": 2,"content":"peace and hope 2", "age": 12, "address": "bj", "keywords": [{"keyword": "peace", "frequency": 1}, {"keyword": "hope", "frequency": 1}]}
Kafka key: 2
Kafka value:
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":1},{"keyword":"hope","frequency":1}],"doc_id":2,"content":"peace and hope 2","age":12},"ts":1629531058056}

Operation: bulk insert
Request:
PUT /_bulk
{"index":{"_index":"index","_id":"3"}}
{"doc_id": 3,"content":"peace and hope 3", "age": 13, "address": "bj", "keywords": [{"keyword": "peace", "frequency": 1}, {"keyword": "hope", "frequency": 1}]}
{"index":{"_index":"index","_id":"4"}}
{"doc_id": 4,"content":"peace and hope 4", "age": 14, "address": "bj", "keywords": [{"keyword": "peace", "frequency": 1}, {"keyword": "hope", "frequency": 1}]}
{"index":{"_index":"index","_id":"5"}}
{"doc_id": 5,"content":"peace and hope 5", "age": 15, "address": "bj", "keywords": [{"keyword": "peace", "frequency": 1}, {"keyword": "hope", "frequency": 1}]}
Kafka key: the three records have keys 5, 4 and 3
Kafka value (in the same order as the keys):
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":1},{"keyword":"hope","frequency":1}],"doc_id":5,"content":"peace and hope 5","age":15},"ts":1629531206042}
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":1},{"keyword":"hope","frequency":1}],"doc_id":4,"content":"peace and hope 4","age":14},"ts":1629531205887}
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":1},{"keyword":"hope","frequency":1}],"doc_id":3,"content":"peace and hope 3","age":13},"ts":1629531205885}

Operation: simple update
Request:
POST /index/_update/1
{"doc":{"doc_id":1,"address":"sjz"}}
Kafka key: 1
Kafka value:
{"op":2,"index":"index","content":{"address":"sjz","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":5}],"doc_id":1,"content":"peace and hope","age":12},"ts":1629531453903}

Operation: update by query (non-nested field only)
Request:
POST /index/_update_by_query
{"script":{"source":"ctx._source.age++","lang":"painless"},"query":{"term":{"doc_id":1}}}
Kafka key: 1
Kafka value:
{"op":2,"index":"index","content":{"address":"sjz","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":5}],"doc_id":1,"content":"peace and hope","age":13},"ts":1629531523775}

Operation: update with script (update a nested field)
Request:
POST /index/_update/1
{"script":{"source":"for(e in ctx._source.keywords){if (e.frequency == 5) {e.frequency = 10;}}"}}
Kafka key: 1
Kafka value:
{"op":2,"index":"index","content":{"address":"sjz","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":10}],"doc_id":1,"content":"peace and hope","age":13},"ts":1629531711169}

Operation: update with script (delete nested entries)
Request:
POST /index/_update/1
{"script":{"source":"ctx._source.keywords.removeIf(it -> it.frequency == 10);"}}
Kafka key: 1
Kafka value:
{"op":2,"index":"index","content":{"address":"sjz","keywords":[],"doc_id":1,"content":"peace and hope","age":13},"ts":1629531763492}

Operation: simple delete
Request:
DELETE /index/_doc/1
Kafka key: 1
Kafka value:
{"op":0,"index":"index","content":{"doc_id":"1"},"ts":1629531859497}

Operation: delete by query
Request:
POST /index/_delete_by_query
{"query":{"match":{"doc_id":2}}}
Kafka key: 2
Kafka value:
{"op":0,"index":"index","content":{"doc_id":"2"},"ts":1629531944820}
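The messages above suggest a simple way for a downstream consumer to rebuild the index state: insert and update messages carry the full document, and delete messages carry only the primary key, so a replica keyed by the Kafka key can upsert on op 1 or 2 and remove on op 0. The sketch below is an illustration under that assumption, not part of the plugin; it uses an in-memory dictionary instead of a real Kafka consumer, and the events are shortened versions of the messages shown in the tests.

```python
import json

DELETE, INSERT, UPDATE = 0, 1, 2


def apply_event(replica, key, value):
    """Apply one CDC message (Kafka key and value) to an in-memory replica."""
    event = json.loads(value)
    if event["op"] == DELETE:
        replica.pop(key, None)           # tolerate deletes of unknown keys
    else:
        replica[key] = event["content"]  # insert and update carry the full doc
    return replica


# Shortened versions of the test messages, in the order they were produced.
events = [
    ("1", '{"op":1,"index":"index","content":{"doc_id":1,"address":"bj","age":12},"ts":1629530725380}'),
    ("2", '{"op":1,"index":"index","content":{"doc_id":2,"address":"bj","age":12},"ts":1629531058056}'),
    ("1", '{"op":2,"index":"index","content":{"doc_id":1,"address":"sjz","age":12},"ts":1629531453903}'),
    ("2", '{"op":0,"index":"index","content":{"doc_id":"2"},"ts":1629531944820}'),
]

replica = {}
for key, value in events:
    apply_event(replica, key, value)
print(replica)
```

The replica is keyed by the Kafka key rather than by `content.doc_id` because the delete messages shown above carry `doc_id` as a string while the inserts carry it as an integer.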

Testing a change to the CDC configuration

PUT index/_settings
{
  "index.cdc.enabled":false
}

PUT index/_settings
{
  "index.cdc.enabled":true,
  "index.cdc.exclude.columns": "age,content"
}

POST /index/_create/6
{"doc_id": 6,"content":"peace and hope", "age": 12, "address": "bj", "keywords": [{"keyword": "peace", "frequency": 10}, {"keyword": "hope", "frequency": 5}]}


The generated CDC message appears in the cdc_test topic as follows:
{"op":1,"index":"index","content":{"address":"bj","keywords":[{"keyword":"peace","frequency":10},{"keyword":"hope","frequency":5}],"doc_id":6},"ts":1629532594468}
The content and age fields have been excluded.
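The effect of `index.cdc.exclude.columns` on the message content can be reproduced with a small filter. This sketch only mimics the behaviour observed above; it is not the plugin's implementation, the function name is hypothetical, and it assumes that exclusion applies to top-level fields and that the setting is a comma-separated list.

```python
def exclude_columns(source, setting):
    """Drop the top-level fields named in a comma-separated setting value."""
    excluded = {name.strip() for name in setting.split(",") if name.strip()}
    return {key: value for key, value in source.items() if key not in excluded}


doc = {
    "doc_id": 6,
    "content": "peace and hope",
    "age": 12,
    "address": "bj",
    "keywords": [{"keyword": "peace", "frequency": 10},
                 {"keyword": "hope", "frequency": 5}],
}

filtered = exclude_columns(doc, "age,content")
print(sorted(filtered))
```

With the setting `"age,content"`, the remaining fields are `address`, `doc_id` and `keywords`, matching the message shown above.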

elasticsearch-cdc's People

Contributors

zhendongbai

elasticsearch-cdc's Issues

PUT _cluster/settings returns an error

Adding the settings to the cluster fails with an error; the ES version is 7.16.

{"error":{"root_cause":[{"type":"illegal_argument_exception","reason":"persistent setting [indices.cdc.request.timeout.ms], not recognized"}],"type":"illegal_argument_exception","reason":"persistent setting [indices.cdc.request.timeout.ms], not recognized"},"status":400}
