moiot / gravity
A Data Replication Center
License: Apache License 2.0
The argument to the current_timestamp() function is lost after parsing. Waiting for a response on pingcap/parser#186.
The link describing how MySQL needs to be configured is broken here:
https://github.com/moiot/gravity/blob/master/docs/2.0/01-quick-start-en.md
Can you let me know what it should point to, so I can get started?
As titled, thx
In the [[input.config.table-configs]] and [[output.config.routes]] sections, when database order_1 has many tables (e.g. a through g) and I only want to synchronize tables a, b, c, and d, I currently need to write four route sections. I'd like the configuration simplified to something like this:
[[output.config.routes]]
match-schema = "order_1"
match-table = ["a","b","c","d"]
target-schema = "OrderDB"
target-table = ["a","b","c","d"]
The mysql_replace engine's performance on huge transactions full of deletions (e.g. DELETE FROM big_table) is bad: on our sandbox TiDB cluster, about 5k rows/sec with 200 connections.
Add a new SQL engine that supports INSERT IGNORE INTO; it's very useful for fixing data.
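A minimal sketch of the statement such an engine would generate (this is illustrative, not gravity's actual engine code): rows whose key already exists are silently skipped instead of replaced, which is what makes it useful for repairing data.

```go
package main

import (
	"fmt"
	"strings"
)

// buildInsertIgnore builds a multi-row INSERT IGNORE statement with
// placeholders. It is a sketch of what a new SQL engine could emit.
func buildInsertIgnore(table string, cols []string, nRows int) string {
	// One "(?,?,...)" group per row.
	ph := "(" + strings.TrimRight(strings.Repeat("?,", len(cols)), ",") + ")"
	rows := make([]string, nRows)
	for i := range rows {
		rows[i] = ph
	}
	return fmt.Sprintf("INSERT IGNORE INTO `%s` (`%s`) VALUES %s",
		table, strings.Join(cols, "`,`"), strings.Join(rows, ","))
}

func main() {
	fmt.Println(buildInsertIgnore("t", []string{"id", "name"}, 2))
	// INSERT IGNORE INTO `t` (`id`,`name`) VALUES (?,?),(?,?)
}
```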
Create _gravity.gravity_txn_tags in the input plugin.
When a unique key changes, we should guarantee the event sequence.
When doing bi-directional replication, there will be conflicts when the primary key is auto-increment (generated by MySQL).
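A standard MySQL-level mitigation (not gravity-specific) is to give each side disjoint auto-increment sequences, so the two nodes can never generate the same key:

```sql
-- On node A: generate 1, 3, 5, ...
SET GLOBAL auto_increment_increment = 2, auto_increment_offset = 1;
-- On node B: generate 2, 4, 6, ...
SET GLOBAL auto_increment_increment = 2, auto_increment_offset = 2;
```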
There is a WAL archiving tool for PostgreSQL, written in Go, here:
https://github.com/wal-g/wal-g
It does not require any add-ons to PostgreSQL, but it needs a little work to integrate. Is this something the team is interested in? I can work on the integration.
Refactor metrics according to our plugin architecture. The framework should collect TPS and latency for input, filter, scheduler, output, and end-to-end.
Use English as the default documentation language, and use the binary distribution directly in the quick start @ming-relax
Push the Helm chart into a public repo, so that other people can install a gravity cluster using a single helm command @Ryan-Git
Push the Grafana JSON dashboard definition into the repo @ming-relax
If I stop gravity for a while and then restart replication, the following error appears:
ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.
According to http://mysql.taobao.org/monthly/2016/01/08/, this may be the cause; does gravity handle this internally?
Because MySQL-to-TiDB replication is asynchronous, gravity may go down or lag for other reasons, and in some cases I want to know when the data was written to TiDB, e.g. to query the incremental data.
So I need a special column on every table in the target database to record that time. In the current version, if I restart gravity it cannot synchronize the data. I suggest adding a special column to every table during synchronization, e.g. _loadtime:
alter table t add column _loadtime datetime(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6);
Then every table in the target database would automatically gain a column such as _loadtime recording when the data reached the target, and replication would work normally after restarting gravity. The best way to handle this, I think, is a dedicated configuration option: turn it on when you need it, off otherwise, with OFF as the default.
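Such a switch might look like this in the pipeline config (the option name is hypothetical, not an existing gravity setting):

```toml
[output.config]
# Hypothetical option: when true, add a _loadtime column to every target
# table that records when each row arrived in the target database.
add-load-time-column = false
```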
Gravity version:
./gravity -V
Release Version (gravity): 0.0.1+git.
Git Commit Hash:
Git Branch:
UTC Build Time: 2019-03-12 12:21:13
The version may be 0.9.27; I cloned the master branch and compiled it myself.
Error log:
{"file":"mysql_table_scanner.go","level":"info","line":185,"msg":"[LoopInBatch] prepare current: [{Value:{String:1467178125170318497 Valid:true} Type: Column:id}]","pipeline":"crm_mysql2tidb","time":"2019/03/14 11:20:32.051"}
{"file":"cache.go","level":"fatal","line":63,"msg":"[defaultPositionCache] ticker flush failed: mysqlbatch.BatchPositionValueV1.TableStates: mysqlbatch.TableStatsV1.Done: ScannedCount: EstimatedRowCount: Current: Min: []mysqlbatch.TablePosition: [MapString] unknown type: int32, column: Id\n/usr/local/go/src/github.com/moiot/gravity/pkg/position_store/cache.go:229: \n/usr/local/go/src/github.com/moiot/gravity/pkg/position_store/cache.go:169: ","pipeline":"crm_mysql2tidb","time":"2019/03/14 11:20:32.694"}
The configuration file looks like this:
# MySQL to TiDB full + incremental replication
name="crm_mysql2tidb"
[input]
type = "mysql"
mode = "replication"
[input.config]
nr-scanner = 10
table-scan-batch = 10000
batch-per-second-limit = 1
max-full-dump-count = 10000
[input.config.source]
host = "172.16.1.86"
username = "_gravity"
password = "gravity"
port = 3310
[input.config.source-slave]
host = "172.16.1.87"
username = "_gravity"
password = "gravity"
port = 3310
# Tables to scan
[[input.config.table-configs]]
schema = "yjp_broker"
table = "*"
[[input.config.table-configs]]
schema = "yjp_office"
table = "*"
[[input.config.table-configs]]
schema = "yjp_czbank"
table = "*"
# Target databases/tables:
[output]
type = "mysql"
[output.config.target]
host = "172.17.14.19"
username = "root"
password = "yijiupi"
port = 4000
[output.config]
enable-ddl=true
[[output.config.routes]]
match-schema = "yjp_broker"
match-table = ""
target-schema = "BrokerDB"
target-table = ""
[[output.config.routes]]
match-schema = "yjp_office"
match-table = ""
target-schema = "OfficeDB"
target-table = ""
[[output.config.routes]]
match-schema = "yjp_czbank"
match-table = ""
target-schema = "BankDB"
target-table = ""
On TiDB I just created the databases, like this:
create database BrokerDB;
create database OfficeDB;
create database BankDB;
Then I start gravity:
/usr/local/gravity/gravity -config=/usr/local/gravity/scm_mysql2tidb.toml -http-addr=172.17.14.22:8091 -log-file=/usr/local/gravity/scm.log -L=info
and the table is defined like this:
mysql> show create table brokerregionordersync\G
*************************** 1. row ***************************
       Table: brokerregionordersync
Create Table: CREATE TABLE `brokerregionordersync` (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `orderid` bigint(20) NOT NULL COMMENT '订单ID',
  `orderitemid` bigint(20) NOT NULL COMMENT '订单项ID',
  `city_id` int(11) NOT NULL COMMENT '城市id',
  `state` tinyint(4) NOT NULL COMMENT '订单同步状态 1 下单 2 完成 3 取消 4 退货',
  `regionbrokerid` int(11) DEFAULT NULL COMMENT '专区经纪人id',
  `crmchannelid` bigint(20) DEFAULT NULL COMMENT 'CRM频道id',
  `salaryratio` decimal(4,2) DEFAULT NULL COMMENT '业绩系数',
  `returnorderid` bigint(20) DEFAULT NULL COMMENT '退货订单ID',
  `returnorderitemid` bigint(20) DEFAULT NULL COMMENT '退货订单项ID',
  `createtime` datetime NOT NULL COMMENT '创建时间',
  `lastmodifytime` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '最后修改时间',
  `salarybrokerid` int(11) DEFAULT NULL COMMENT '业绩经纪人id',
  PRIMARY KEY (`id`),
  KEY `ix_orderid` (`orderid`) USING BTREE COMMENT '订单id',
  KEY `ix_regionbrokerid` (`regionbrokerid`) USING BTREE COMMENT '经纪人id',
  KEY `ix_city_id` (`city_id`) USING BTREE COMMENT '城市id',
  KEY `ix_orderitemid` (`orderitemid`) USING BTREE COMMENT '订单项id',
  KEY `ix_returnorderitemid` (`returnorderitemid`) USING BTREE COMMENT '退货单项id'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci COMMENT='经纪人多专区订单同步表'
1 row in set (0.00 sec)
Conclusion:
I guess the MySQL data type (bigint unsigned) is not mapped correctly in the gravity source code, hence the error: unknown type: int32, column: Id.
Please check.
Initial design: https://github.com/moiot/gravity/blob/master/docs/rfc_schema_storage.md
We can consider using https://github.com/bytewatch/ddl-executor
Currently one working set is used for all table buffers. Since messages in different table buffers cannot conflict, each table buffer should have its own working set.
MySQL decimal(M,N) values replicated to Kafka come out unreadable.
The original data:
ExactPayAmount: 142.00
OrderAmount: 142.00
PayableAmount: 142.00
ReduceAmount: 0.00
UseCouponAmount: 0.00
GiveBonusAmount: 0.00
After gravity writes it to Kafka:
{
"version": "0.1",
"database": "order_3",
"table": "orders",
"type": "insert",
"ts": 1553479471,
"time_zone": "Asia/Shanghai",
"host": "",
"data": {
"Address_Id": "1293184",
"City": "太原市",
"City_Id": "704",
"Contact": "郭生",
"CostWineScore": "[48 46 48 48]",
"County": "小店区",
"CreateTime": "2018-10-10 15:24:41",
"CreateUserId": "408501",
"DeliveryMode": "0",
"DetailAddress": "御龙庭底*****",
"ExactPayAmount": "[49 52 50 46 48 48]",
"GiveBonusAmount": "[48 46 48 48]",
"GiveCouponAmount": "[48 46 48 48]",
"GiveWineScore": "[48 46 48 48]",
"HasPayment": "0",
"Id": "704118101015312437",
"IsTestOrder": "0",
"LastModifyTime": "2019-01-27 00:05:00.1",
"LastUpdateTime": "2018-10-12 10:19:00",
"LastUpdateUserId": "891",
"OrderAmount": "[49 52 50 46 48 48]",
"OrderAttachedGiftRule_Id": null,
"OrderAuditTime": null,
"OrderBatch_Id": "704418101015312436",
"OrderCancelTime": "2018-10-12 10:19:00",
"OrderClassify": "0",
"OrderCompleteTime": null,
"OrderConfirmTime": "2018-10-10 15:24:41",
"OrderCreateTime": "2018-10-10 15:24:41",
"OrderDeliverTime": null,
"OrderNo": "704828300340",
"OrderPrintTime": null,
"OrderReducationRule_Id": null,
"OrderSyncTime": null,
"OrderType": "0",
"PayType": "0",
"PayableAmount": "[49 52 50 46 48 48]",
"PaymentState": null,
"Phone": "1873457****",
"ProductReduceAmount": "[48 46 48 48]",
"Province": "山西省",
"QrCodeFileId": null,
"ReduceAmount": "[48 46 48 48]",
"RemarkService": null,
"RemarkUser": "",
"SalesmanId": "762",
"State": "8",
"StateUser": "7",
"Street": "平阳路西二巷",
"SysRemark": "",
"UseAwardBonusAmount": null,
"UseBonusAmount": "[48 46 48 48]",
"UseCouponAmount": "[48 46 48 48]",
"UseCouponCodeAmount": "[48 46 48 48]",
"UseRewardBonusAmount": "[48 46 48 48]",
"UserCancelReason": null,
"UserCompanyName": "生鲜超市****",
"UserId": "408501",
"UserMobileNo": "1873457****",
"UserName": "郭*生",
"Verison": "1.0.0",
"ZipCode": null,
"area_id": null,
"onlineSalesmanDeptId": null,
"onlineSalesmanId": null,
"orderItemCount": "1",
"pickup_warehouse_Id": "7041",
"salesmanDeptId": "2",
"selfPickUpReduceAmount": "[48 46 48 48]",
"shop_id": null
},
"old": {},
"pks": {
"Id": "704118101015312437"
}
}
The unreadable values:
"ExactPayAmount": "[49 52 50 46 48 48]",
"GiveBonusAmount": "[48 46 48 48]",
"GiveCouponAmount": "[48 46 48 48]",
"GiveWineScore": "[48 46 48 48]",
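Those arrays look like a Go []byte marshaled to JSON as an array of numbers: each element is the ASCII code of one character of the decimal string. A minimal decoding sketch:

```go
package main

import "fmt"

func main() {
	// [49 52 50 46 48 48] are the ASCII codes of '1' '4' '2' '.' '0' '0':
	// the decimal was serialized as raw bytes instead of as a string.
	raw := []byte{49, 52, 50, 46, 48, 48}
	fmt.Println(string(raw)) // prints "142.00"
}
```

So the fix is presumably to convert decimal values to strings before JSON-encoding the Kafka message.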
We should consider starting a snapshot-consistent read.
Friends told me that gravity has a front-end for configuring tasks visually.
But I couldn't find steps in the repo on how to use it. I'd like to check whether gravity has such a front-end and, if it exists, how to deploy and use it.
I just want to synchronize specific databases and tables out of thousands of tables, like this:
# Tables to scan (required)
[[input.config.table-configs]]
schema = "yjp_productsku_1"
table = "productsku,productinfo,ProductSkuSalePolicy"
[[input.config.table-configs]]
schema = "yjp_productsku_2"
table = "productsku,ProductSkuSalePolicy"
[[input.config.table-configs]]
schema = "yjp_prodcutsku_3"
table = "productsku,ProductSkuSalePolicy"
[output]
type = "mysql"
[output.config.target]
host = "172.17.14.19"
username = "root"
password = "xyz"
port = 4000
[output.config]
enable-ddl=true
# Merge sharded databases/tables
[[output.config.routes]]
match-schema = "jp_productsku_1"
match-table = "ProductSku"
target-schema = "ProductDB"
target-table = ""
[[output.config.routes]]
match-schema = "jp_productsku_1"
match-table = "ProductSkuSalePolicy"
target-schema = "ProductDB"
target-table = ""
[[output.config.routes]]
match-schema = "jp_productsku_2"
match-table = "ProductSku"
target-schema = "ProductDB"
target-table = ""
[[output.config.routes]]
match-schema = "jp_productsku_2"
match-table = "ProductSkuSalePolicy"
target-schema = "ProductDB"
target-table = ""
[[output.config.routes]]
match-schema = "jp_productsku_3"
match-table = "ProductSku"
target-schema = "ProductDB"
target-table = ""
[[output.config.routes]]
match-schema = "jp_productsku_3"
match-table = "ProductSkuSalePolicy"
target-schema = "ProductDB"
target-table = ""
# Global tables
[[output.config.routes]]
match-schema = "jp_productsku_1"
match-table = "ProductInfo"
target-schema = "ProductDB"
target-table = "ProductInfo"
However, the logs and monitoring show that it scans all of the database's tables.
How should I write the configuration file so that only the databases and tables I want are synchronized, for both historical and incremental data, and the others are not?
When replicating MongoDB to TiDB, I need to use the value of the _id field of every MongoDB collection as the primary key; in TiDB I can define it as VARCHAR(32) to store it. However, in the latest version it appears like:
\kԸ���
or:
output exec error: Error 1366: incorrect utf8 value 5c9452256c6aac201560b38d(\\ufffdR%lj\ufffd \u0015`\ufffd\ufffd) for column _id\n
I guess that when reading data from MongoDB, gravity does not stringify the _id field; it just passes the raw value of:
ObjectId("5c949127edbfd32f36eea4cd")
instead of 5c949127edbfd32f36eea4cd.
Compatible with old configuration
Update documentation
Batch mode should exit on success
gravity-operator should create a Job when in batch mode
Support mongo to tidb
First of all, thanks to the author for open-sourcing this; I have a question here. I want to use gravity in a data-migration scenario.
Do I have to create the same databases and tables on the target as on the source before replication can run?
Could you also share the WeChat group number, to make later learning and discussion easier?
Thanks.
I want to replicate from MongoDB to TiDB/MySQL, but I don't know how to configure it. Could you provide an example?
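A sketch of what such a pipeline config might look like, modeled on the MySQL examples elsewhere on this page; the mongo input option names here are assumptions, so check them against the docs:

```toml
name = "mongo2tidb"

[input]
type = "mongo"        # assumption: name of the mongo input plugin
mode = "replication"

[input.config.source]
host = "127.0.0.1"
port = 27017
username = "_gravity"
password = "gravity"

[output]
type = "mysql"

[output.config.target]
host = "127.0.0.1"
port = 4000
username = "root"
password = ""
```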
Match specific DDL statement
Use a hint in SQL to tag gravity's internal DDL, so that DDL can be synchronized bidirectionally.
Requirement 1: with two data sources A and B, DDL on A must be replicated to B, and DDL on B to A. For now this can be worked around with a filter:
[[filters]]
type = "reject"
match-dml-op = ["insert", "update", "delete"]
But the write-loop problem remains: DDL generated on A is replicated to B via the binlog, but the binlog that B itself writes as a result is replicated back to A, and the B->A link then fails with conflicts.
Yubai's suggestion is to tag the SQL that gravity executes with a hint; when the B->A link processes B's binlog, it filters on that hint and skips SQL executed by gravity, breaking the write loop.
Requirement 2: distinguish DDL types at a finer granularity: CREATE TABLE, ALTER TABLE ADD, ALTER TABLE DROP, ALTER TABLE MODIFY, and so on. Provide a matcher that filters replication to only the specified DDL types.
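For requirement 2, such a matcher might look like this (match-ddl-op is hypothetical, named by analogy with the existing match-dml-op, not an actual gravity option):

```toml
[[filters]]
type = "reject"
# Hypothetical: reject only these DDL statement types on this link.
match-ddl-op = ["alter-table-drop", "alter-table-modify"]
```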
Problem:
There is a Go library, written by the Go team, that lets Go code use different message queues and stores interchangeably.
It would make it easier for gravity to support different queues and stores, which is exactly its intent.
Message queues:
https://github.com/google/go-cloud/blob/master/pubsub/pubsub.go
There is no Kafka driver yet, but it has NATS.
Blob stores (like S3):
https://github.com/google/go-cloud/tree/master/blob
I would like to add NATS and MinIO.
I think this is easy to do with the existing code base, and it is 100% Go.
NATS is easy.
MinIO is also easy and has SQL support.
Do you also use Benthos for the transformations that customize the data?
Do you have a plan to support transferring data from MySQL and TiDB to Elasticsearch?
Elasticsearch is much better suited than MySQL or TiDB for full-text indexing.
I would like to add Oracle DB as a source.
Is the team ok with this ?
As of 2019-02-27, gravity does not support the DROP TABLE DDL statement. I wish it were supported; since it is rarely used, it could be disabled by default.
First of all, thanks to the author for open-sourcing this. I want to use gravity in a migration scenario.
According to the manual, the target must be created with the same databases and table structures as the source; is there a way to avoid doing this?
If convenient, please share a WeChat or QQ group for learning and discussion.
Thanks, ^_^
If deployed in cluster mode, how can I distribute different toml configs to different instances?
E.g., if I have 10 different mysql2mysql.toml config files and deploy a cluster with 10 instances, how can each instance serve exactly one toml, so there is no duplicate sync?
Replication did not succeed; the messages are as follows:
[mysql] 2019/01/05 17:26:05 packets.go:36: read tcp 10.0.1.4:46838->10.0.1.5:3380: i/o timeout
[mysql] 2019/01/05 17:26:37 packets.go:36: read tcp 10.0.1.4:47064->10.0.1.5:3380: i/o timeout
gravity version:
./gravity -V
Release Version (gravity): 0.0.1+git.dc0de6e
Git Commit Hash: dc0de6e
Git Branch: master
UTC Build Time: 2019-03-18 02:49:05
error message:
{"file":"batch_table_scheduler.go","level":"fatal","line":211,"msg":"[batchScheduler] output exec error: Error 1406: Data too long for column 'AuditRemark' at row 1\n/usr/local/go/src/github.com/moiot/gravity/pkg/sql_execution_engine/mysql_replace_engine.go:101: \n/usr/local/go/src/github.com/moiot/gravity/pkg/outputs/mysql/mysql.go:336: ","pipeline":"crm_mysql2tidb","time":"2019/03/18 10:59:01.054"}
The AuditRemark column's data type is varchar(1024).
desc yjp_broker.brokerdisplaycontractcyclerecord;
+-------------------+---------------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------------+---------------+------+-----+---------+----------------+
| Id | int(11) | NO | PRI | NULL | auto_increment |
| CycleId | int(11) | NO | | NULL | |
| Location | varchar(255) | YES | | NULL | |
| Longitude | double(13,10) | YES | | NULL | |
| Latitude | double(13,10) | YES | | NULL | |
| ImgId | varchar(128) | YES | | NULL | |
| UploadTime | datetime | YES | | NULL | |
| State | tinyint(4) | YES | | NULL | |
| CheckTime | datetime | YES | | NULL | |
| CreateTime | datetime | NO | | NULL | |
| CreateUser_Id | int(11) | NO | | NULL | |
| LastUpdateTime | datetime | YES | | NULL | |
| LastUpdateUser_Id | int(11) | YES | | NULL | |
| AuditRemark | varchar(1024) | YES | | NULL | |
+-------------------+---------------+------+-----+---------+----------------+
14 rows in set (0.01 sec)
mysql> create database test;
Query OK, 1 row affected (0.00 sec)
mysql> use test
Database changed
mysql> CREATE TABLE t(id BIGINT,remark VARCHAR(255));
Query OK, 0 rows affected (0.00 sec)
mysql> insert into t(id,remark)values(1,REPEAT('汉',255));
Query OK, 1 row affected (0.00 sec)
mysql> select CHARACTER_LENGTH(remark),length(remark) from t;
+--------------------------+----------------+
| CHARACTER_LENGTH(remark) | length(remark) |
+--------------------------+----------------+
| 255 | 765 |
+--------------------------+----------------+
1 row in set (0.00 sec)