使用go-mysql-elasticsearch开源工具同步数据到ES
go-mysql-elasticsearch是用于同步mysql数据到ES集群的一个开源工具,
项目github地址:https://github.com/siddontang/go-mysql-elasticsearch
go-mysql-elasticsearch的基本原理是:如果是第一次启动该程序,首先使用mysqldump工具对源mysql数据库进行一次全量同步,通过elasticsearch client执行操作写入数据到ES;然后实现了一个mysql client,作为slave连接到源mysql,源mysql作为master会将所有数据的更新操作通过binlog event同步给slave, 通过解析binlog event就可以获取到数据的更新内容,之后写入到ES.
另外,该工具还提供了操作统计的功能,每当有数据增删改操作时,会将对应操作的计数加1,程序启动时会开启一个http服务,通过调用http接口可以查看增删改操作的次数。
使用事项:
1. mysql binlog必须是ROW模式
2. 要同步的mysql数据表必须包含主键,否则直接忽略,这是因为如果数据表没有主键,UPDATE和DELETE操作就会因为在ES中找不到对应的document而无法进行同步
3. 不支持程序运行过程中修改表结构
4. 要赋予用于连接mysql的账户RELOAD权限以及REPLICATION权限, SUPER权限:
GRANT REPLICATION SLAVE ON *.* TO 'elastic'@'172.16.32.44';
GRANT RELOAD ON *.* TO 'elastic'@'172.16.32.44';
UPDATE mysql.user SET Super_Priv='Y' WHERE user='elastic' AND host='172.16.32.44';
使用方法
git clone https://github.com/siddontang/go-mysql-elasticsearch
cd go-mysql-elasticsearch/src/github.com/siddontang/go-mysql-elasticsearch
vi etc/river.toml, 修改配置文件,同步172.16.0.101:3306数据库中的webservice.building表到ES集群172.16.32.64:9200的building index(更详细的配置文件说明可以参考项目文档)
# MySQL address, user and password
# user must have replication privilege in MySQL.
my_addr = "172.16.0.101:3306"
my_user = "bellen"
my_pass = "Elastic_123"
my_charset = "utf8"
# Set true when elasticsearch use https
#es_https = false
# Elasticsearch address
es_addr = "172.16.32.64:9200"
# Elasticsearch user and password, maybe set by shield, nginx, or x-pack
es_user = ""
es_pass = ""
# Path to store data, like master.info, if not set or empty,
# we must use this to support breakpoint resume syncing.
# TODO: support other storage, like etcd.
data_dir = "./var"
# Inner Http status address
stat_addr = "127.0.0.1:12800"
# pseudo server id like a slave
server_id = 1001
# mysql or mariadb
flavor = "mariadb"
# mysqldump execution path
# if not set or empty, ignore mysqldump.
mysqldump = "mysqldump"
# if we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false
# minimal items to be inserted in one bulk
bulk_size = 128
# force flush the pending requests if we don't have enough items >= bulk_size
flush_bulk_time = "200ms"
# Ignore table without primary key
skip_no_pk_table = false
# MySQL data source
[[source]]
schema = "webservice"
tables = ["building"]
[[rule]]
schema = "webservice"
table = "building"
index = "building"
type = "buildingtype"
在ES集群中创建building index, 因为该工具并没有使用ES的auto create index功能,如果index不存在会报错
执行命令:./bin/go-mysql-elasticsearch -config=./etc/river.toml