Giter Club home page Giter Club logo

canal-client's Introduction

Build Status

易用的canal 客户端 easy canal client

介绍

canal 是阿里巴巴mysql数据库binlog的增量订阅&消费组件
使用该客户端前请先了解canal,https://github.com/alibaba/canal
canal 自身提供了简单的客户端,如果要转换为数据库的实体对象,处理消费数据要每次进行对象转换。 该客户端直接将canal的数据原始类型转换为各个数据表的实体对象,并解耦数据的增删改操作,方便给业务使用。

要求

java8+

特性

  • 解耦单表增删操作
  • simple,cluster,zookeeper,kafka客户端支持
  • 同步异步处理支持
  • spring boot 开箱即用

如何使用

spring boot 方式 maven 依赖

  <dependency>
      <groupId>top.javatool</groupId>
      <artifactId>canal-spring-boot-starter</artifactId>
      <version>1.2.6-RELEASE</version>
  </dependency>

java 方式

 <dependency>
       <groupId>top.javatool</groupId>
       <artifactId>canal-client</artifactId>
       <version>1.2.6-RELEASE</version>
 </dependency>

配置说明

属性 描述 默认值
canal.mode canal 客户端类型 目前支持4种类型 simple,cluster,zk,kafka(kafka 目前支持flatMessage 格式) simple
canal.filter canal过滤的表名称,如配置则只订阅配置的表 ""
canal.batch-size 消息的数量,超过该次数将进行一次消费 1(个)
canal.timeout 消费的时间间隔(s) 1s
canal.server 服务地址,多个地址以,分隔 格式 ${host}:${port} null
canal.destination canal 的instance 名称,kafka模式为topic 名称 null
canal.user-name canal 的用户名 null
canal.password canal 的密码 null
canal.group-id kafka groupId 消费者订阅消息时可使用,kafka canal 客户端 null
canal.async 是否是异步消费,异步消费时,消费时异常将导致消息不会回滚,也不保证顺序性 true
canal.partition kafka partition null

订阅数据库的增删改操作

实现EntryHandler 接口,泛型为想要订阅的数据库表的实体对象, 该接口的方法为 java 8 的 default 方法,方法可以不实现,如果只要监听增加操作,只实现增加方法即可
下面以一个t_user表的user实体对象为例, 默认情况下,将使用实体对象的jpa 注解 @Table中的表名来转换为EntryHandler中的泛型对象,

public class UserHandler implements EntryHandler<User>{

}

如果实体类没有使用jpa @Table的注解,也可以使用@CanalTable 注解在EntryHandler来标记表名,例如

@CanalTable(value = "t_user")
@Component
public class UserHandler implements EntryHandler<User>{

   /**
   *  新增操作
   * @param user
    */
   @Override
    public void insert(User user) {
	   //你的逻辑
        log.info("新增 {}",user);
    }
    /**
    * 对于更新操作来讲,before 中的属性只包含变更的属性,after 包含所有属性,通过对比可发现那些属性更新了
   * @param before
   * @param after
    */
    @Override
    public void update(User before, User after) {
    	//你的逻辑
        log.info("更新 {} {}",before,after);
    }
    /**
    *  删除操作
    * @param user
    */
    @Override
    public void delete(User user) {
       //你的逻辑
        log.info("删除 {}",user); 
   }
}

另外也支持统一的处理@CanalTable(value="all"),这样除去存在EntryHandler的表以外,其他所有表的处理将通过该处理器,统一转为Map<String, String>对象

@CanalTable(value = "all")
@Component
public class DefaultEntryHandler implements EntryHandler<Map<String, String>> {
     @Override
        public void insert(Map<String, String> map) {
            logger.info("insert message  {}", map);
        }
    
        @Override
        public void update(Map<String, String> before, Map<String, String> after) {
            logger.info("update before {} ", before);
            logger.info("update after {}", after);
        }
    
        @Override
        public void delete(Map<String, String> map) {
            logger.info("delete  {}", map);
        }
}

如果你想获取除实体类信息外的其他信息,可以使用

CanalModel canal = CanalContext.getModel();

具体使用可以查询项目demo 示例
https://github.com/NormanGyllenhaal/canal-client/tree/master/canal-example

canal-client's People

Contributors

normangyllenhaal avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar

canal-client's Issues

事务问题

当mysql 开启事务的时候 。 写入多表 多数据 。这边能相应到 但是 是循环返回的 , 能不能一次性返回 。不做多次返回

Column中isNull取到的是true时,Java实体属性为Number类型时会转换异常

   @Override
    <R> R newInstance(Class<R> c, List<CanalEntry.Column> columns) throws Exception {
        R object = c.newInstance();
        Map<String, String> columnNames = EntryUtil.getFieldName(object.getClass());
        for (CanalEntry.Column column : columns) {
            String fieldName = columnNames.get(column.getName());
            if (StringUtils.isNotEmpty(fieldName)) {
                FieldUtil.setFieldValue(object, fieldName, column.getValue());
            }
        }
        return object;
    }

    static Object convertType(Class<?> type, String columnValue) {
        if(columnValue==null){
            return null;
        }else if (type.equals(Integer.class)) {
            return Integer.parseInt(columnValue);
        } else if (type.equals(Long.class)) {
            return Long.parseLong(columnValue);
        } else if (type.equals(Boolean.class)) {
            return convertToBoolean(columnValue);
        } else if (type.equals(BigDecimal.class)) {
            return new BigDecimal(columnValue);
        } else if (type.equals(Double.class)) {
            return Double.parseDouble(columnValue);
        } else if (type.equals(Float.class)) {
            return Float.parseFloat(columnValue);
        } else if (type.equals(Date.class)) {
            return parseDate(columnValue);
        } else if (type.equals(java.sql.Date.class)) {
            return parseDate(columnValue);
        } else {
            return columnValue;
        }
    }

一点建议

1.字段日期类型不全面,YYYY-MM-dd hh:mm:ss.SSS 等格式无法转换,目前只能使用string类型接收
2.canal.asyncs设置为false不生效,启动会报错,原因:同一个类中2个MessageHandler用同样的methodname,@bean注解应该只会按照方法名注入第一个bean. 换用不同方法名即可
3.版本1.2.6目前无法下载

启动后控制台一直在打log

INFO 5324 --- [l-client-thread] t.j.c.client.client.AbstractCanalClient : 获取消息 Message[id=9732,entries=[header {
version: 1
logfileName: "mysql-bin.000092"
logfileOffset: 40795484
serverId: 12345
serverenCode: "UTF-8"
executeTime: 1565505699000
sourceType: MYSQL
schemaName: "changgou_goods"
tableName: "tb_sku"
eventLength: 6592
eventType: UPDATE
props {
key: "rowsCount"
value: "4"
}
}
entryType: ROWDATA

@CanalTable(value = "") 遇到的问题

top.javatool.canal.client.annotation.CanalTable 此注解,在数据库表名不规范时监听不到
例:
错误 table : T_TxxxxxxInfo
正确 table : t_txxxx_info
以后使用可以注意下,或者作者可以优化下

多库支持的问题

是否支持多个数据库:
@CanalTable(value = "t_user")只支持单库,当多个库中都有t_user这个表的时候均会进入同一个EntryHandler,不知道有没有根据数据库和数据表一起做条件匹配EntryHandler的方法?

映射相关的问题

1、对非纯字母的字段值实体映射时,属性值为空。如包含"_"的字段,属性名为驼峰;
2、ORM实体属性不支持LocalDateTime;

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.