
mysqlsmom's Introduction


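Overview

A tool for syncing MySQL data to Elasticsearch. Its distinguishing features are real-time incremental sync by parsing the binlog, and support for writing custom logic that processes data before it is synced to ES.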

Written in pure Python. Running mysqlsmom has only three requirements:

  • python2.7
  • redis
  • MySQL configured with binlog-format=row
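Before starting an incremental sync, you may want to confirm the third requirement on the server. The sketch below assumes you run `SHOW VARIABLES LIKE 'binlog_format'` yourself (for example through a PyMySQL connection) and pass in the returned rows. `binlog_format_is_row` and `fetch_binlog_variables` are hypothetical helpers, not part of mysqlsmom.

```python
def fetch_binlog_variables(conn):
    """Run SHOW VARIABLES on an open DB-API connection (e.g. pymysql.connect(...))."""
    with conn.cursor() as cur:
        cur.execute("SHOW VARIABLES LIKE 'binlog_format'")
        return cur.fetchall()  # rows of (Variable_name, Value)


def binlog_format_is_row(variable_rows):
    """Return True if the rows report binlog_format = ROW (case-insensitive)."""
    settings = {name.lower(): str(value).upper() for name, value in variable_rows}
    return settings.get("binlog_format") == "ROW"


# Example with rows shaped like a PyMySQL tuple cursor's result:
print(binlog_format_is_row([("binlog_format", "ROW")]))        # True
print(binlog_format_is_row([("binlog_format", "STATEMENT")]))  # False
```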

Chinese documentation: https://mysqlsmom.readthedocs.io/en/latest/

Quick start

Start with a full sync.

Installation

pip install mysqlsmom

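Then pin the elasticsearch client version (2.4 is supported by default). For other versions, run the following, replacing 5.4 with the Elasticsearch version you need: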

pip install --upgrade elasticsearch==5.4
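The elasticsearch-py documentation recommends a client whose major version matches the server's major version. The small check below makes that comparison. It assumes you read the client version from `elasticsearch.__versionstr__` and the server version from the cluster's root endpoint (`es.info()["version"]["number"]`). `major_versions_match` is a hypothetical helper.

```python
def installed_client_version():
    """Version string of the installed elasticsearch-py package (requires it to be installed)."""
    import elasticsearch
    return elasticsearch.__versionstr__


def major_versions_match(client_version, server_version):
    """Compare only the major component of two dotted version strings."""
    client_major = int(client_version.split(".")[0])
    server_major = int(server_version.split(".")[0])
    return client_major == server_major


print(major_versions_match("5.4.0", "5.6.16"))  # True
print(major_versions_match("2.4.1", "7.4.0"))   # False
```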

Configure the sync

Create a full-sync configuration file

mom new test_mom/init_config.py -t init --force

Directory structure at this point:

└── test_mom
    └── init_config.py

Edit the sync configuration

vim ./test_mom/init_config.py  # edit the settings as the comments describe

Start the sync

mom run -c ./test_mom/init_config.py

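Wait for the sync to finish.

Note

A full sync does not automatically continue with incremental sync of data modified afterwards. For incremental sync, see the incremental sync section of the full documentation.

Upgrading

This release only adds support for pip install mysqlsmom and the command line; the core code is unchanged.

Users of older versions who run syncs via git clone and python mysqlsmom.py ./config/xxx.py do not need to update their code. Detailed upgrade instructions will be added later.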

mysqlsmom's People

Contributors

m358807551

mysqlsmom's Issues

Error when the id in MySQL is a string

Traceback (most recent call last):
  File "mysqlsmom.py", line 213, in <module>
    handle_init_stream(config_module)
  File "mysqlsmom.py", line 113, in handle_init_stream
    for row in query:
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3672, in iterator
    yield self.iterate(False)
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3660, in iterate
    result = self.process_row(row)
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 6348, in process_row
    result[attr] = converters[i](row[i])
  File "/data/anaconda3/lib/python3.6/site-packages/peewee.py", line 3896, in python_value
    return value if value is None else self.adapt(value)
ValueError: invalid literal for int() with base 10: 'stff01'
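The last frames of the traceback show peewee's `python_value` passing `'stff01'` to a field's `adapt()`, which calls `int()`. A likely explanation, to be verified, is that the peewee model used for the raw query has peewee's implicit integer `id` primary key. A selected column named `id` would then be converted to an integer. The plain-Python reproduction below models that situation. `integer_converter`, `convert_row` and the `doc_id` alias are all hypothetical. Whether aliasing the column in SQL (and pointing `set_id` at the alias) works end to end in mysqlsmom is an assumption to test.

```python
def integer_converter(value):
    # Same shape as the converter in the traceback: None passes through, anything else goes to int().
    return value if value is None else int(value)


def convert_row(row, converters):
    """Apply a per-column converter when one is registered, else keep the raw value."""
    return {col: converters[col](val) if col in converters else val for col, val in row.items()}


converters = {"id": integer_converter}  # hypothetical: "id" treated as an integer primary key

try:
    convert_row({"id": "stff01", "name": "test"}, converters)
except ValueError as exc:
    print(exc)  # invalid literal for int() with base 10: 'stff01'

# With the column aliased (e.g. SELECT id AS doc_id, ...), no converter matches it:
print(convert_row({"doc_id": "stff01", "name": "test"}, converters))
# {'doc_id': 'stff01', 'name': 'test'}
```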

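Request for a Python 3 version

The package has been removed from PyPI and can no longer be found. Cloning the source and running python setup.py install also fails with various Python 2.7-related errors. Could you develop a Python 3 version?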

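Segmenting a Chinese field with the IK plugin

Hello, where should I change the code or the mapping settings so that a particular Chinese field is indexed and searched with the IK analysis plugin?

For example, my question field needs the following settings:
"question": {
"type": "text",
"analyzer": "ik_smart",
"search_analyzer": "ik_max_word"
}

But I could not find where to add this in the code's directory structure.

Looking forward to your reply, thanks!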
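If the index does not exist yet, Elasticsearch creates it on the first write with a dynamically inferred mapping, and that mapping will not use IK. One approach is to create the index with the desired mapping before the first sync. This is an assumption, not something mysqlsmom documents.

The sketch builds the request body for a typeless Elasticsearch 7.x mapping. On 6.x, the `properties` block must sit under the type name used in the config's `"type"` setting. It also assumes the IK analysis plugin is installed on every node. `build_ik_mapping` and `create_index` are hypothetical helpers.

```python
def build_ik_mapping(text_fields):
    """Index-creation body that analyzes the given fields with IK (Elasticsearch 7.x, typeless)."""
    return {
        "mappings": {
            "properties": {
                field: {
                    "type": "text",
                    "analyzer": "ik_smart",
                    "search_analyzer": "ik_max_word",
                }
                for field in text_fields
            }
        }
    }


def create_index(es, index, text_fields):
    """Create `index` with the IK mapping; `es` is an elasticsearch.Elasticsearch client."""
    return es.indices.create(index=index, body=build_ik_mapping(text_fields))
```

For example, `create_index(Elasticsearch(["http://localhost:9200"]), "test_index", ["question"])` would run once before `mom run`, with the same index name as in the config.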

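Some suggestions

Hello, we plan to use this tool to sync MySQL and ES, and would like it to support the following:
1. Initial sync: range queries for large tables;
2. Incremental sync: instead of the last run time, use the maximum timestamp of the previous batch, and support changing "select id, name from person where update_time >= ?"
->"select id, name from person where update_time >= ? and _id > ?"

That is, record the maximum timestamp and primary key id of the previous incremental batch (in some of our scenarios, hundreds of thousands of rows are occasionally updated within the same microsecond). This way, even when a large amount of data is updated, each fetch can take a specified number of rows and record the timestamp and record id it reached.
3. Support for nested queries
4. Support for delete in incremental mode

Do you plan to implement these features? Alternatively, we could implement them and submit them to you.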
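Point 2 describes keyset (cursor) pagination on `(update_time, id)`. On its own, `update_time >= ? and id > ?` would skip rows whose timestamp is later but whose id is smaller than the last one seen. The usual condition is `update_time > ? or (update_time = ? and id > ?)` together with `order by update_time, id`.

The self-contained sketch below uses Python's built-in sqlite3 so it runs anywhere. The `person` table comes from the issue; the rows and batch sizes are made up for illustration. With MySQL through PyMySQL, the placeholders would be `%s` instead of `?`.

```python
import sqlite3

BATCH_SQL = (
    "select id, name, update_time from person "
    "where update_time > ? or (update_time = ? and id > ?) "
    "order by update_time, id limit ?"
)


def fetch_in_batches(conn, start_time, start_id, batch_size):
    """Yield batches ordered by (update_time, id), resuming after the last row seen."""
    last_time, last_id = start_time, start_id
    while True:
        rows = conn.execute(BATCH_SQL, (last_time, last_time, last_id, batch_size)).fetchall()
        if not rows:
            return
        yield rows
        # The cursor position is the (update_time, id) of the last row in this batch.
        last_id, last_time = rows[-1][0], rows[-1][2]


conn = sqlite3.connect(":memory:")
conn.execute("create table person (id integer primary key, name text, update_time text)")
# Five rows share one timestamp, as in the bulk-update case described above.
conn.executemany(
    "insert into person values (?, ?, ?)",
    [(i, "p%d" % i, "2020-01-01 00:00:00") for i in range(1, 6)]
    + [(6, "p6", "2020-01-01 00:00:01")],
)
seen = [row[0] for batch in fetch_in_batches(conn, "", 0, 2) for row in batch]
print(seen)  # [1, 2, 3, 4, 5, 6]
```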

Elasticsearch with authentication: how to configure it

When ES requires authentication, the following error is raised:
2019-12-10 16:54:19,241 elasticsearch WARNING Undecodable raw error response from server: No JSON object could be decoded: line 1 column 0 (char 0)
Traceback (most recent call last):
  File "/usr/local/bin/mom", line 11, in <module>
    sys.exit(cli())
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 697, in main
    rv = self.invoke(ctx)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 1066, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/usr/local/lib/python2.7/dist-packages/click/core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 416, in run
    handle_init_stream(config_)
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 193, in handle_init_stream
    to_dest.upload_docs()
  File "/usr/local/lib/python2.7/dist-packages/mysqlsmom/mysqlsmom.py", line 137, in upload_docs
    helpers.bulk(es, self.docs)
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 300, in bulk
    for ok, item in streaming_bulk(client, actions, *args, **kwargs):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 230, in streaming_bulk
    **kwargs
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/actions.py", line 116, in _process_bulk_chunk
    raise e
elasticsearch.exceptions.AuthenticationException: AuthenticationException(401, u'Unauthorized')

The template configuration is:
NODES = [{"host": "10.10.10.10", "port": 8080}]

With authentication, changing it to the following works:
NODES = ["http://user:password@10.10.10.10:8080"]

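Killed after running for a while with a valid configuration

Elasticsearch is the latest version, 7.4.
There was no option for it at install time, so I chose
pip install --upgrade elasticsearch==7.1.0
mom new test_mom/init_config.py -t init --force
vim ./test_mom/init_config.py # edit the settings as the comments describe
After running for a while (a few minutes), it prints Killed with no other information.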
[root@luyaodev local]# mom run -c ./test_mom/init_config.py
Killed
[root@luyaodev local]#
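A bare `Killed` with no Python traceback usually means the process was terminated by a signal from outside Python. Most often this is the Linux OOM killer, in which case `dmesg` typically contains an "Out of memory" message. If memory is the cause, the general remedy is to hold only a bounded number of rows at a time.

The sketch below shows that pattern generically; it is not mysqlsmom's code. It assumes rows arrive from an iterator such as a database cursor. `chunked` is a hypothetical helper.

```python
from itertools import islice


def chunked(rows, size):
    """Yield lists of at most `size` items from any iterable without materializing it."""
    iterator = iter(rows)
    while True:
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


# Each chunk would be sent with one bulk request and then discarded.
sizes = [len(chunk) for chunk in chunked(range(2500), 1000)]
print(sizes)  # [1000, 1000, 500]
```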

Problem syncing with ES 6.6

2019-07-08 17:31:42,149 root INFO {"status": "\u0001", "lang": -1, "update_time": "2019-07-08 17:23:37", "github": null, "star": 5, "descr": "api", "tags": "["api", "springboot"]", "favicon": "/favicon/spring.png", "title": "SpringBoot", "show": "\u0001", "gitee": null, "link": "https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/", "insert_time": "2016-11-28 16:14:01", "indexed": "springboot", "sorted": 1110, "_id": 423, "type": 100026, "id": 423}
2019-07-08 17:31:42,161 elasticsearch INFO POST http://localhost:9200/_bulk [status:200 request:0.011s]
2019-07-08 17:31:42,164 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:01:00], next run at: 2019-07-08 17:32:42 CST)" raised an exception
Traceback (most recent call last):
File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
retval = job.func(*job.args, **job.kwargs)
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 339, in do_one_task
to_dest.upload_docs()
File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 141, in upload_docs
raise e
BulkIndexError: (u'1 document(s) failed to index.', [{u'update': {u'status': 400, u'_type': u'open', u'_index': u'admin', u'error': {u'caused_by': {u'reason': u'Failed to parse value [\x01] as only [true] or [false] are allowed.', u'type': u'illegal_argument_exception'}, u'reason': u'failed to parse field [show] of type [boolean]', u'type': u'mapper_parsing_exception'}, u'_id': u'423', u'data': {'doc': {u'status': '\x01', u'lang': -1, u'update_time': datetime.datetime(2019, 7, 8, 17, 23, 37), u'github': None, u'star': 5, u'descr': u'api', u'tags': u'["api", "springboot"]', u'favicon': u'/favicon/spring.png', u'title': u'SpringBoot', u'show': '\x01', u'gitee': None, u'link': u'https://docs.spring.io/spring-boot/docs/1.5.9.RELEASE/api/', u'insert_time': datetime.datetime(2016, 11, 28, 16, 14, 1), u'indexed': u'springboot', u'sorted': 1110, u'type': 100026, u'id': 423}, 'doc_as_upsert': True}}}])
^C
Aborted!
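The rejected value is `\x01`. The `show` and `status` columns appear to be MySQL `BIT(1)` columns, which the driver returns as a one-byte string, while the index maps `show` as `boolean`. One fix is to convert such values to real booleans before upload.

The sketch covers only the conversion and assumes you can hook it into a custom processing step. `bit_to_bool` and `convert_bits` are hypothetical helpers, not part of mysqlsmom.

```python
def bit_to_bool(value):
    """Convert a MySQL BIT(1) value (b'\\x01', '\\x01', 1, None) to a bool, or None."""
    if value is None:
        return None
    if isinstance(value, str):
        value = value.encode("latin-1")  # a Python 2-style byte string arriving as str
    if isinstance(value, (bytes, bytearray)):
        return any(value)  # any non-zero byte means the bit is set
    return bool(value)


def convert_bits(doc, fields):
    """Return a copy of `doc` with the named BIT fields turned into booleans."""
    return {k: bit_to_bool(v) if k in fields else v for k, v in doc.items()}


doc = {"id": 423, "show": b"\x01", "status": b"\x00"}
print(convert_bits(doc, {"show", "status"}))  # {'id': 423, 'show': True, 'status': False}
```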

mysqlsmom thread error

[root@03AF15-A14 video]# mom run -c cron_config.py
2020-03-31 12:38:00,710 apscheduler.scheduler INFO Adding job tentatively -- it will be properly scheduled when the scheduler starts
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Added job "do_one_task" to job store "default"
2020-03-31 12:38:00,711 apscheduler.scheduler INFO Scheduler started
2020-03-31 12:38:00,712 apscheduler.executors.default INFO Running job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:00 CST)" (scheduled at 2020-03-31 12:38:00.617124+08:00)
2020-03-31 12:38:01,113 apscheduler.executors.default ERROR Job "do_one_task (trigger: interval[0:00:10], next run at: 2020-03-31 12:38:10 CST)" raised an exception
Traceback (most recent call last):
  File "/usr/lib/python2.7/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/usr/lib/python2.7/site-packages/mysqlsmom/mysqlsmom.py", line 321, in do_one_task
    query = MyModel.raw(task["stream"]["sql"].replace("?", "%s"), (last_start_time,)).dicts().iterator()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1637, in iterator
    return iter(self.execute(database).iterator())
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1560, in inner
    return method(self, database, *args, **kwargs)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1631, in execute
    return self._execute(database)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 1680, in _execute
    cursor = database.execute(self)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2638, in execute
    return self.execute_sql(sql, params, commit=commit)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2632, in execute_sql
    self.commit()
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2424, in __exit__
    reraise(new_type, new_type(*exc_args), traceback)
  File "/usr/lib/python2.7/site-packages/peewee.py", line 2625, in execute_sql
    cursor.execute(sql, params or ())
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 170, in execute
    result = self._query(query)
  File "/usr/lib/python2.7/site-packages/pymysql/cursors.py", line 328, in _query
    conn.query(q)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 516, in query
    self._affected_rows = self._read_query_result(unbuffered=unbuffered)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 727, in _read_query_result
    result.read()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1073, in read
    self._read_result_packet(first_packet)
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1143, in _read_result_packet
    self._read_rowdata_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 1177, in _read_rowdata_packet
    packet = self.connection._read_packet()
  File "/usr/lib/python2.7/site-packages/pymysql/connections.py", line 670, in _read_packet
    % (packet_number, self._next_seq_id))
InternalError: Packet sequence number wrong - got 1 expected 16
How can this be fixed?
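`Packet sequence number wrong` from PyMySQL can have several causes. A commonly reported one is the same connection being used by more than one thread at once, or a connection reused after an interrupted query. PyMySQL declares DB-API `threadsafety = 1`: threads may share the module but not connections. If jobs run on scheduler worker threads, one way to rule this cause out is to give each thread its own connection.

The sketch below does that with `threading.local`. `ThreadLocalConnections` is a hypothetical helper. `connect` stands for whatever opens a connection, e.g. `functools.partial(pymysql.connect, host=..., user=...)`.

```python
import threading


class ThreadLocalConnections:
    """Hand out one connection per thread, created lazily by `connect`."""

    def __init__(self, connect):
        self._connect = connect
        self._local = threading.local()

    def get(self):
        conn = getattr(self._local, "conn", None)
        if conn is None:
            conn = self._local.conn = self._connect()
        return conn


# Demonstration with a stand-in factory instead of pymysql.connect:
pool = ThreadLocalConnections(connect=object)
seen = []
threads = [threading.Thread(target=lambda: seen.append(pool.get())) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len({id(c) for c in seen}))  # 3: one connection per thread
print(pool.get() is pool.get())    # True: same thread, same connection
```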

Running python mysqlsmom.py config/example_cron.py fails

Error:

Traceback (most recent call last):
  File "mysqlsmom.py", line 357, in <module>
    handle_cron_stream(config_module)
  File "mysqlsmom.py", line 324, in handle_cron_stream
    scheduler = BlockingScheduler()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 83, in __init__
    self.configure(gconfig, **options)
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 122, in configure
    self._configure(config)
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/apscheduler/schedulers/base.py", line 691, in _configure
    self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 131, in get_localzone
    _cache_tz = _get_localzone()
  File "/data/containers/mysqlsmom/.env/lib/python2.7/site-packages/tzlocal/unix.py", line 56, in _get_localzone
    with open(tzpath, 'rb') as tzfile:
IOError: [Errno 21] Is a directory: '/etc/timezone'
Config:
# Database connection settings
CONNECTION = {
    'host': '127.0.0.1',
    'port': 3306,
    'user': 'root',
    'passwd': ''
}

# Redis stores the last sync time and other state
REDIS = {
    "host": "192.168.1.240",
    "port": 6379,
    "db": 0,
    "password": "",  # comment out or delete this line if no password is needed
}

# Sync BULK_SIZE rows to Elasticsearch per batch; defaults to 1 if not set
BULK_SIZE = 1

# Elasticsearch nodes
NODES = [{"host": "192.168.1.240", "port": 9200}]

TASKS = [
    {
        "stream": {
            "database": "welian_commodity",  # database in which the SQL statement runs
            "sql": "select * from commodity where modity_time >= ?",  # rows selected by this SQL are synced to Elasticsearch
            "seconds": 10,  # sync once every `seconds` seconds
            "init_time": "2018-08-29 15:00:00"  # only used by the first sync
        },
        "jobs": [
            {
                "pipeline": [
                    {"set_id": {"field": "id"}}  # by default, use the value of the id field as the Elasticsearch document id
                ],
                "dest": {
                    "es": {
                        "action": "upsert",
                        "index": "test_index",   # the index
                        "type": "test"          # the type
                    }
                }
            }
        ]
    }
]
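The traceback above shows where the error comes from. APScheduler only calls `tzlocal.get_localzone()` when no `timezone` option is given (`astimezone(config.pop('timezone', None)) or get_localzone()`). tzlocal then opens `/etc/timezone`, which on this machine is a directory; that is common when the path is bind-mounted into a container. Passing an explicit timezone to the scheduler skips that lookup.

The sketch assumes you can edit the line in `handle_cron_stream` that creates the scheduler. `Asia/Shanghai` is only an example value, and `pick_timezone` and `make_scheduler` are hypothetical helpers.

```python
import os


def pick_timezone(default="Asia/Shanghai", path="/etc/timezone"):
    """Return the name in `path` if it is a non-empty regular file, else `default`."""
    if os.path.isfile(path):  # False for the directory case in the traceback
        with open(path) as f:
            name = f.read().strip()
        if name:
            return name
    return default


def make_scheduler():
    # Requires APScheduler; with an explicit timezone, get_localzone() is never called.
    from apscheduler.schedulers.blocking import BlockingScheduler
    return BlockingScheduler(timezone=pick_timezone())
```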

MySQL update sync exception

Updating some of the fields of a row in a MySQL table makes mysqlsmom raise an exception, and it cannot continue.
SQL:
update tb_emp set message="update" where id=20190715101404;
Error:
elasticsearch.exceptions.RequestError: RequestError(400, u'action_request_validation_exception', u'Validation Failed: 1: id is missing;')
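`id is missing` means the document id came out empty for this update event. One possible cause, to be checked rather than a confirmed diagnosis, is MySQL's `binlog_row_image=MINIMAL`. In that mode the after-image of an UPDATE contains only the changed columns, so `id` is absent when only `message` changes, while the before-image still carries the primary key. `binlog_row_image=FULL` (the MySQL default) avoids this. Alternatively, the id can be taken from the before-image.

The sketch assumes rows shaped like python-mysql-replication's `UpdateRowsEvent.rows`, i.e. dicts with `before_values` and `after_values`. `update_doc_id` is hypothetical.

```python
def update_doc_id(row, field="id"):
    """Pick the document id for an UPDATE row, preferring the after-image."""
    after = row.get("after_values", {})
    before = row.get("before_values", {})
    value = after.get(field)
    if value is None:
        value = before.get(field)
    if value is None:
        raise KeyError("%r is in neither the before- nor the after-image" % field)
    return value


# With binlog_row_image=MINIMAL only the changed column is in the after-image:
row = {"before_values": {"id": 20190715101404}, "after_values": {"message": "update"}}
print(update_doc_id(row))  # 20190715101404
```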

BULK_SIZE set to 1000

If BULK_SIZE is set and the number of modified rows is smaller than BULK_SIZE, is there a way to sync the remainder as well?
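As the issue describes, documents wait until BULK_SIZE of them have accumulated, so a trailing partial batch is held back. A common pattern is to also flush when the oldest buffered document has waited longer than some limit, and once more on shutdown.

The sketch below implements that buffer. `send` is a hypothetical callback standing in for the bulk upload (mysqlsmom's `upload_docs` calls `helpers.bulk(es, self.docs)`), and the clock is injectable for testing. mysqlsmom's own buffering may work differently.

```python
import time


class BulkBuffer:
    """Collect documents and flush them when the batch is full or old enough."""

    def __init__(self, send, bulk_size=1000, max_wait=5.0, clock=time.monotonic):
        self.send = send          # callable that uploads a list of docs
        self.bulk_size = bulk_size
        self.max_wait = max_wait  # seconds a partial batch may wait
        self.clock = clock
        self.docs = []
        self.first_added = None

    def add(self, doc):
        if not self.docs:
            self.first_added = self.clock()
        self.docs.append(doc)
        if len(self.docs) >= self.bulk_size:
            self.flush()

    def tick(self):
        """Call periodically (or when the event stream is idle) to flush an old partial batch."""
        if self.docs and self.clock() - self.first_added >= self.max_wait:
            self.flush()

    def flush(self):
        if self.docs:
            self.send(self.docs)
            self.docs = []
            self.first_added = None


now = [0.0]
batches = []
buf = BulkBuffer(batches.append, bulk_size=3, max_wait=5.0, clock=lambda: now[0])
for i in range(4):
    buf.add(i)  # the third add triggers a size-based flush
buf.tick()      # remainder [3] is younger than max_wait, so it is kept
now[0] = 6.0
buf.tick()      # now old enough, so it is flushed
print(batches)  # [[0, 1, 2], [3]]
```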

TypeError: 'dict_items' object does not support indexing

Traceback (most recent call last):
  File "mysqlsmom.py", line 223, in <module>
    handle_init_stream(config_module)
  File "mysqlsmom.py", line 131, in handle_init_stream
    rows = do_pipeline(job["pipeline"], event["values"])
  File "mysqlsmom.py", line 65, in do_pipeline
    func_name, kwargs = line.items()[0]
TypeError: 'dict_items' object does not support indexing
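On Python 3, `dict.items()` returns a view that cannot be indexed, so `line.items()[0]` only works on Python 2.7, the stated requirement. A version-neutral way to unpack a single-key pipeline step such as `{"set_id": {"field": "id"}}` is `next(iter(line.items()))`.

The simplified sketch below shows that unpacking. The registry `STEPS` and the `set_id` step are hypothetical stand-ins for mysqlsmom's own pipeline functions. The real `do_pipeline` returns `rows` and may differ in other ways.

```python
def set_id(row, field):
    """Hypothetical step: copy `field` into the document id key `_id`."""
    row["_id"] = row[field]
    return row


STEPS = {"set_id": set_id}  # hypothetical registry; mysqlsmom defines its own steps


def do_pipeline(pipeline, row):
    for line in pipeline:
        # Works on Python 2 and 3: take the only (name, kwargs) pair of the step dict.
        func_name, kwargs = next(iter(line.items()))
        row = STEPS[func_name](row, **kwargs)
    return row


print(do_pipeline([{"set_id": {"field": "id"}}], {"id": 7, "name": "a"}))
# {'id': 7, 'name': 'a', '_id': 7}
```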

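The program stops reading the binlog

The recorded binlog position has been stuck at 100105408 for a long time. Nothing is logged and no error is reported, while the database's binlog position has already reached 126084862. Restarting the program changes nothing and still prints no logs. How can I troubleshoot this?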
