1,py-mysql-elasticsearch-sync《github 工具》
简介:
python版本的mysql导入es的插件
通过配置config.yaml文件,该插件可以将mysql的数据导入到es中。
使用:
es-sync config.xml
程序流程:
思路为:1分析mysqldump导出数据 - > 2分析binlog
该插件,首先会执行mysqldump指令,将mysql中的数据和数据表定义提取出来。然后使用xml解析出需要使用的部分。对比自定义的mapping规则,选择id和字段。最终批量上传给es服务器。
完成上一步操作后,程序会读取mysql的binlog,并从binlog第一行开始读取,直到binlog最后一行,同时将程序执行的binlog位置记录到binlog.info中,用于下次从此开始导入数据。
另外,若已经存在binlog.info。该插件不会执行第一步的操作,而是直接从binlog.info记录的位置开始导入数据。
以下操作说明:
如下操作是将数据库清空后,打开了数据库binlog,然后往数据库插入数据。数据插入完毕后,开始向es导数据。
因此遇到了409版本错误。 (因为mysqldump导出的数据就是binlog中对应的数据操作记录)
安装:
pip install py-mysql-elasticsearch-sync
注意可能会出现依赖包版本冲突 --- pip install --ignore-installed xxxx
添加开启binlog的mysql配置
在mysql配置为恩建中添加以下配置
[mysqld_safe]
log-bin=mysql-bin
binlog_format=mixed
max_binlog_size=500M
binlog-do-db=testdata_big
binlog-ignore-db=mysql
server-id=1
查看mysql的binlog记录位置
mysql> show variables like '%log_bin%';
+---------------------------------+-------------------------------------------------------+
| Variable_name | Value |
+---------------------------------+-------------------------------------------------------+
| log_bin | ON |
| log_bin_basename | /opt/lovelsl/MessageSoft/Mysql/data/mysql-bin |
| log_bin_index | /opt/lovelsl/MessageSoft/Mysql/data/mysql-bin.index |
| log_bin_trust_function_creators | OFF |
| log_bin_use_v1_row_events | OFF |
| sql_log_bin | ON |
+---------------------------------+-------------------------------------------------------+
6 rows in set (0.00 sec)
mysql的binlog处理
重置所有的binlog
reset master
删除某个binlog文件之前的所有binlog文件
purge binary logs to 'bin-log.000021';
查看日志
show binary logs;
在导入数据到es之前,先完成es索引的创建,以及es索引中mapping的定义
导入文件,自动生成索引,一般被禁用。
创建索引:
一般为了安全性,禁止提交数据时自动创建索引。
curl -XPUT -H "Content-Type: application/json" "localhost:9200/testdata"
创建索引的mapping:
curl -XPUT -H "Content-Type: application/json" "127.0.0.1:9200/testdata/_mapping/nodes" -d '
{
"properties": {
"id": {
"type": "integer"
},
"node_flag": {
"type": "integer"
},
"owner_uid": {
"type": "keyword"
},
"parent_uid": {
"type": "keyword"
},
"node_name": {
"type": "keyword"
},
"group_name": {
"type": "keyword"
},
"interface": {
"type": "keyword"
},
"custum_name": {
"type": "keyword"
},
"osname": {
"type": "keyword"
},
"osversion": {
"type": "keyword"
},
"update_time": {
"type": "keyword"
},
"last_active_time": {
"type": "keyword"
},
"last_login_time": {
"type": "keyword"
},
"mac": {
"type": "keyword"
},
"last_local_ip": {
"type": "keyword"
},
"last_local_ip6": {
"type": "keyword"
},
"create_time": {
"type": "keyword"
},
"create_ip": {
"type": "keyword"
},
"long_ip": {
"type": "integer"
}
}
}'
修改es 的mapping:一般而言,mapping一旦创建,只能增加字段,而不能修改已经存在的mapping字段,但是允许增加一个字段。但一般多采用重建索引的方式。
这里由于long_ip的type为integer不能存储ip地址对应的大整数,将修改为long
删除索引:
[root@jiangmin /]# curl -XDELETE "localhost:9200/testdata"
注意,不要带上type
重建索引:
curl -XPUT -H "Content-Type: application/json" "localhost:9200/testdata"
重建索引的mapping
同上,不做重复操作了