canal_mysql_elasticsearch_sync's Introduction

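canal_mysql_elasticsearch_sync: if you find it useful, please star ✨

QQ discussion group: 733688083

Since v1.1.2, canal itself supports automatic synchronization to Elasticsearch. Kudos to canal! (Link: canal)

A Java web service for real-time MySQL-to-Elasticsearch synchronization, based on canal.
canal is Alibaba's component for incremental subscription to and consumption of the MySQL binlog.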

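How it works

Full synchronization

The service exposes an HTTP endpoint (defined in the wiki). Once it is called, a background thread starts and synchronizes the data of the specified database to Elasticsearch in batches, by primary key.

Reading from the database takes a read lock
The primary key must be a numeric type

Process

  1. First, look up the largest primary key value, max_id, using the given primary key column;
  2. Set pk = min_id (by default, the smallest primary key value in the database);
  3. Acquire the read lock 🔐 and fetch the rows with primary keys from pk to pk+stepSize (stepSize defaults to 500);
  4. Insert them into Elasticsearch;
  5. Release the read lock 🔐, add stepSize to pk, and repeat from step 3 until pk > max_id.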
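
The batching in steps 1 to 5 can be sketched as below. This is a minimal illustration, not the project's code: the class `PkBatchPlanner`, its `Range` type, and the choice of half-open ranges `[pk, pk+stepSize)` are assumptions made for the example, and the locking, database read, and Elasticsearch write are left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: plans the primary-key windows a full sync would walk through.
final class PkBatchPlanner {

    /** Half-open primary-key range [from, to). The half-open choice is an assumption. */
    static final class Range {
        final long from;
        final long to;

        Range(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    /** Starts at minId and advances by stepSize until pk > maxId, as in the steps above. */
    static List<Range> plan(long minId, long maxId, int stepSize) {
        if (stepSize <= 0) {
            throw new IllegalArgumentException("stepSize must be positive");
        }
        List<Range> ranges = new ArrayList<>();
        long pk = minId;
        while (pk <= maxId) {
            // addExact throws instead of silently wrapping around on overflow
            long next = Math.addExact(pk, (long) stepSize);
            ranges.add(new Range(pk, next));
            pk = next;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Prints [1, 501), [501, 1001), [1001, 1501), one per line
        for (Range r : plan(1, 1200, 500)) {
            System.out.println(r);
        }
    }
}
```

Each planned range would correspond to one lock, read, insert, unlock cycle. The last window may extend past max_id, which is harmless because no rows exist there at planning time.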

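Incremental synchronization

The service loops, listening for the events that canal delivers from the binlog, and applies the matching Elasticsearch operation depending on whether the event is an insert, an update, or a delete.

Only insert, update, and delete are parsed at present; other database operations are ignored.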
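
The dispatch described above might look roughly like the following. It is a sketch under stated assumptions: `EventType` here is a local stand-in enum rather than canal's own type, and the action names returned by `toEsAction` are illustrative labels, not necessarily the calls the project makes.

```java
// Hypothetical sketch of routing binlog event types to Elasticsearch actions.
final class EventDispatchSketch {

    /** Local stand-in for the event types a binlog event can carry. */
    enum EventType { INSERT, UPDATE, DELETE, CREATE, ALTER, ERASE }

    /** Returns the Elasticsearch action for an event, or null when the event is ignored. */
    static String toEsAction(EventType type) {
        switch (type) {
            case INSERT:
                return "index";
            case UPDATE:
                return "update";
            case DELETE:
                return "delete";
            default:
                // DDL such as CREATE, ALTER, or ERASE falls through and is skipped
                return null;
        }
    }

    public static void main(String[] args) {
        for (EventType t : EventType.values()) {
            String action = toEsAction(t);
            System.out.println(t + " -> " + (action == null ? "ignored" : action));
        }
    }
}
```

Keeping the ignored cases in a single `default` branch mirrors the behaviour stated above: anything other than insert, update, and delete is dropped without an error.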

Default field mappings

MySQL column type    Elasticsearch type
char       {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
text       {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
blob       {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
int        {"type": "long"}
date       {"type": "date"}
time       {"type": "date"}
float      {"type": "float"}
double     {"type": "float"}
decimal    {"type": "float"}
other      {"type": "text", "fields": {"keyword": {"type": "keyword", "ignore_above": 256}}}
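
As an illustration, the table can be expressed as a simple lookup. This is a sketch only: the class and method names are hypothetical, and it matches type names exactly (case-insensitively), which is an assumption. The project may instead match on substrings so that, for example, `bigint` is treated as `int`.

```java
import java.util.Locale;

// Hypothetical lookup from a MySQL column type name to a default Elasticsearch mapping.
final class DefaultMappingSketch {

    static final String TEXT_WITH_KEYWORD =
            "{\"type\": \"text\", \"fields\": {\"keyword\": {\"type\": \"keyword\", \"ignore_above\": 256}}}";

    static String esMappingFor(String mysqlType) {
        switch (mysqlType.toLowerCase(Locale.ROOT)) {
            case "int":
                return "{\"type\": \"long\"}";
            case "date":
            case "time":
                return "{\"type\": \"date\"}";
            case "float":
            case "double":
            case "decimal":
                return "{\"type\": \"float\"}";
            default:
                // char, text, blob, and every other type fall back to text plus a keyword sub-field
                return TEXT_WITH_KEYWORD;
        }
    }

    public static void main(String[] args) {
        System.out.println(esMappingFor("DECIMAL"));
        System.out.println(esMappingFor("char"));
    }
}
```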

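Notes

  • The MySQL binlog format must be ROW
  • Because row locks are used, MySQL tables must use the InnoDB storage engine
  • Because incremental synchronization relies on the binlog, as database primary/replica replication does, some replication lag is unavoidable; how much depends on data-center network conditions, machine load, data volume, and so on
  • The supported Elasticsearch version is 5.x
  • The tested canal version is v1.0.24; please test other versions yourself
  • Incremental synchronization only listens for INSERT, UPDATE, and DELETE; other statements such as creating or dropping tables are not yet supported
  • Creating the Elasticsearch mapping by hand is recommended, because the default mapping is not guaranteed to meet business requirements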

Related documentation

Contact

If anything here seems unreasonable, corrections are welcome.

  • QQ group: 733688083

If this helps, remember to star ✨

canal_mysql_elasticsearch_sync's People

Contributors

starcwang

canal_mysql_elasticsearch_sync's Issues

Application fails on startup

Error starting ApplicationContext. To display the auto-configuration report re-run your application with 'debug' enabled.
[2024-02-22 21:10:37.076] [ERROR] [main] [o.s.boot.SpringApplication] --- Application startup failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'canalScheduling': Injection of resource dependencies failed; nested exception is org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getCanalConnector' defined in class path resource [com/star/sync/elasticsearch/client/CanalClient.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.alibaba.otter.canal.client.CanalConnector]: Factory method 'getCanalConnector' threw exception; nested exception is com.alibaba.otter.canal.protocol.exception.CanalClientException: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.postProcessPropertyValues(CommonAnnotationBeanPostProcessor.java:321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.populateBean(AbstractAutowireCapableBeanFactory.java:1264)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:553)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:197)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:761)
at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:867)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:543)
at org.springframework.boot.context.embedded.EmbeddedWebApplicationContext.refresh(EmbeddedWebApplicationContext.java:122)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:693)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:360)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:303)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1118)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1107)
at com.star.sync.elasticsearch.CanalMysqlElasticsearchSyncApplication.main(CanalMysqlElasticsearchSyncApplication.java:20)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'getCanalConnector' defined in class path resource [com/star/sync/elasticsearch/client/CanalClient.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.alibaba.otter.canal.client.CanalConnector]: Factory method 'getCanalConnector' threw exception; nested exception is com.alibaba.otter.canal.protocol.exception.CanalClientException: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:599)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1173)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1067)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:513)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:483)
at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:306)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:230)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:302)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:208)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1138)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1066)
at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.autowireResource(CommonAnnotationBeanPostProcessor.java:518)
at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.getResource(CommonAnnotationBeanPostProcessor.java:496)
at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor$ResourceElement.getResourceToInject(CommonAnnotationBeanPostProcessor.java:627)
at org.springframework.beans.factory.annotation.InjectionMetadata$InjectedElement.inject(InjectionMetadata.java:169)
at org.springframework.beans.factory.annotation.InjectionMetadata.inject(InjectionMetadata.java:88)
at org.springframework.context.annotation.CommonAnnotationBeanPostProcessor.postProcessPropertyValues(CommonAnnotationBeanPostProcessor.java:318)
... 17 common frames omitted
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.alibaba.otter.canal.client.CanalConnector]: Factory method 'getCanalConnector' threw exception; nested exception is com.alibaba.otter.canal.protocol.exception.CanalClientException: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:189)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:588)
... 34 common frames omitted
Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:73)
at com.star.sync.elasticsearch.client.CanalClient.getCanalConnector(CanalClient.java:39)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:162)
... 35 common frames omitted
Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.net.ConnectException: Connection refused: connect
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:171)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:97)
at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:63)
... 41 common frames omitted
Caused by: java.net.ConnectException: Connection refused: connect
at sun.nio.ch.Net.connect0(Native Method)
at sun.nio.ch.Net.connect(Net.java:482)
at sun.nio.ch.Net.connect(Net.java:474)
at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:647)
at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:132)
... 43 common frames omitted
Disconnected from the target VM, address: '127.0.0.1:64466', transport: 'socket'

Process finished with exit code 1

Insert and delete operations are not captured; update operations are captured, but no update is performed

com.star.sync.elasticsearch.scheduling.CanalScheduling:
if (batchId != -1 && entries.size() > 0) {
    entries.forEach(entry -> {
        // Only UPDATE events ever show up here
        logger.info("entrytype=" + entry.getHeader().getEventType());
        // entry.
        if (entry.getEntryType() == EntryType.ROWDATA) {
            publishCanalEvent(entry);
        }
    });
}
com.star.sync.elasticsearch.listener.UpdateCanalListener:
doSync is never executed

What is going on here?

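syncByTable issue

maxPK issue: maxPK often does not equal totalCount. Using maxPK here makes the progress calculation inaccurate.

Maximum primary key for the first synchronization

Hello. The data in the database is constantly changing. For the first run I set an end primary key: for example, the largest primary key is currently 1000, but since the database keeps running I was worried about problems and set the maximum primary key to 1200, although the actual data has not reached 1200. From which primary key will the next synchronization start? Also, after the first manual full synchronization, will data be synchronized automatically afterwards?

How to trigger incremental import into ES

At present I can start the Spring Boot application and call the exposed API to do a full import into ES, but after changing data in the database I found that it was not synchronized to ES, and the console shows nothing.

Data consistency when switching between full and incremental synchronization

While incremental synchronization is running, does it pause when a full synchronization is triggered manually? If so, when the full synchronization finishes and incremental synchronization starts again, will it receive the old data-change events?

Thanks for sharing

I have been looking for a solution in this area, and this one looks solid. Thanks for sharing.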
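
To make the point above concrete, here is a small sketch comparing the two ways of estimating progress. The class and method names are hypothetical and not taken from the project; it only illustrates why a key-range estimate can drift from a row-count estimate when primary keys are sparse.

```java
// Hypothetical comparison of two progress estimates for a primary-key-driven full sync.
final class ProgressSketch {

    /** Progress measured by how far pk has advanced through [minPk, maxPk]. */
    static double byKeyRange(long minPk, long currentPk, long maxPk) {
        if (maxPk <= minPk) {
            return 1.0;
        }
        return Math.min(1.0, (double) (currentPk - minPk) / (maxPk - minPk));
    }

    /** Progress measured by rows actually synchronized out of the table's row count. */
    static double byRowCount(long rowsSynced, long totalCount) {
        if (totalCount <= 0) {
            return 1.0;
        }
        return Math.min(1.0, (double) rowsSynced / totalCount);
    }

    public static void main(String[] args) {
        // Suppose keys span 1..1001 but all 100 rows sit in the upper half of that range.
        // After pk has advanced to 501, no row has been synchronized yet.
        System.out.println("by key range: " + byKeyRange(1, 501, 1001));
        System.out.println("by row count: " + byRowCount(0, 100));
    }
}
```

In this made-up case the key-range estimate reports half done while no row has been copied, which is the kind of mismatch the issue describes.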
