当前位置: 首页 > 工具软件 > TOML-To-Go > 使用案例 >

go-mysql-elasticsearch 使用

澹台俊晖
2023-12-01

文档

github 链接:GitHub - go-mysql-org/go-mysql-elasticsearch: Sync MySQL data into elasticsearch

参考博客

注意事项

  1. go-mysql-elasticsearch 启动以后不能修改表结构
  2. 使用 go-mysql-elasticsearch 的必要前提是先开启 MySQL 的 binlog
  3. 记得先创建索引
  4. 提前先同步数据,不然binlog日志之前的数据无法同步。
  5. 可以支持es7 和mysql8版本 不知道为啥官方文档没写可以用

优点

  • 无需三方工具直接监听 mysql binlog 即可同步数据到 es

  • 所占内存小 cpu 水位不会飙升(相对使用logstash)

监控

服务器配置 2 核 4G,MySQL 和 ES 在同一台服务器上

开启时间 11:00 后

测试数据1万

安装

 

git clone https://github.com/go-mysql-org/go-mysql-elasticsearch.git
[root@hecs-202871 go-mysql-elasticsearch]# pwd
/home/golang/gopath/go-mysql-elasticsearch
# install
go install
# build the binary (output goes to ./bin)
make
[root@hecs-202871 go-mysql-elasticsearch]# ls
bin              cmd         elastic  go.mod  LICENSE   README.md  var
clear_vendor.sh  Dockerfile  etc      go.sum  Makefile  river
# edit the configuration — this is the important part, explained in detail below
vim /home/golang/gopath/go-mysql-elasticsearch/etc/river.toml
# start the sync service
./bin/go-mysql-elasticsearch -config=./etc/river.toml

配置river

# MySQL connection settings.
# The specified user must have the REPLICATION privilege.
my_addr = "127.0.0.1:3306"
my_user = "canal"
my_pass = "123456"
my_charset = "utf8"

# Elasticsearch connection settings.
es_addr = "127.0.0.1:9200"
es_user = ""
es_pass = ""

# Inner HTTP status address — startup reportedly fails if these are omitted.
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# Data-source settings.
# Works in slave mode, so server_id must not clash with any real MySQL server id.
server_id = 10001
# mysql or mariadb
flavor = "mysql"

# Path to mysqldump; if empty or unset, the initial dump step is skipped.
mysqldump = "mysqldump"
bulk_size = 128
flush_bulk_time = "200ms"
skip_no_pk_table = false

[[source]]
# Database name.
schema = "canal"
# Tables to sync; wildcards are supported.
tables = ["uuid"]

# Rule definition.
[[rule]]
# Database name.
schema = "canal"
# Table this rule applies to; wildcards are supported.
table = "uuid"
# Target Elasticsearch index.
index = "uuid"
# Document type generated in Elasticsearch for this rule.
type = "_doc"

其他参数

# 规则对应的数据表,支持通配符 table = "uuid*"

多表配置

# MySQL address, user and password.
# The user must have the replication privilege in MySQL.
my_addr = "127.0.0.1:3306"
my_user = "root"
my_pass = ""
my_charset = "utf8"

# Set true when Elasticsearch uses https.
#es_https = false
# Elasticsearch address.
es_addr = "127.0.0.1:9200"
# Elasticsearch user and password; may be enforced by shield, nginx, or x-pack.
es_user = ""
es_pass = ""

# Path to store replication state (e.g. master.info).
# Required for breakpoint/resume syncing; if not set or empty, resume is unavailable.
# TODO: support other storage, like etcd.
data_dir = "./var"

# Inner HTTP status address.
stat_addr = "127.0.0.1:12800"
stat_path = "/metrics"

# Pseudo server id, as if this process were a replication slave.
server_id = 1001

# mysql or mariadb
flavor = "mysql"

# mysqldump execution path.
# If not set or empty, the initial mysqldump step is skipped.
mysqldump = "mysqldump"

# If we have no privilege to use mysqldump with --master-data,
# we must skip it.
#skip_master_data = false

# Minimal number of items to be inserted in one bulk request.
bulk_size = 128

# Force-flush pending requests if fewer than bulk_size items have accumulated.
flush_bulk_time = "200ms"

# Ignore tables without a primary key.
skip_no_pk_table = false

# MySQL data source.
[[source]]
schema = "test"

# Only the tables below will be synced into Elasticsearch.
# "t_[0-9]{4}" is a wildcard table format, useful for many sub-tables, like table_0000 - table_1023.
# It is usually unnecessary to sync every table in a database.
tables = ["t", "t_[0-9]{4}", "tfield", "tfilter"]

# Below is for special rule mapping.

# Very simple example
#
# desc t;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
#
# The table `t` will be synced to ES index `test` and type `t`.
[[rule]]
schema = "test"
table = "t"
index = "test"
type = "t"

# Wildcard table rule; the wildcard pattern must be listed in the source tables.
# All tables matching the wildcard format will be synced to ES index `test` and type `t`.
# In this example, all matched tables must have the same schema as table `t` above.
[[rule]]
schema = "test"
table = "t_[0-9]{4}"
index = "test"
type = "t"

# Simple field rule: rename columns and/or change their type on the ES side.
#
# desc tfield;
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tags     | varchar(256) | YES  |     | NULL    |       |
# | keywords | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfield"
index = "test"
type = "tfield"

[rule.field]
# Map column `id` to ES field `es_id`.
id = "es_id"
# Map column `tags` to ES field `es_tags` with array (list) type.
tags = "es_tags,list"
# Map column `keywords` to ES with array (list) type, keeping the column name.
keywords = ",list"

# Filter rule: sync only a subset of a table's columns.
#
# desc tfilter;
# +-------+--------------+------+-----+---------+-------+
# | Field | Type         | Null | Key | Default | Extra |
# +-------+--------------+------+-----+---------+-------+
# | id    | int(11)      | NO   | PRI | NULL    |       |
# | c1    | int(11)      | YES  |     | 0       |       |
# | c2    | int(11)      | YES  |     | 0       |       |
# | name  | varchar(256) | YES  |     | NULL    |       |
# +-------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tfilter"
index = "test"
type = "tfilter"

# Only sync the following columns; c1 and c2 are ignored.
filter = ["id", "name"]

# id rule: build the ES document id from multiple columns.
#
# desc tid_[0-9]{4};
# +----------+--------------+------+-----+---------+-------+
# | Field    | Type         | Null | Key | Default | Extra |
# +----------+--------------+------+-----+---------+-------+
# | id       | int(11)      | NO   | PRI | NULL    |       |
# | tag      | varchar(256) | YES  |     | NULL    |       |
# | desc     | varchar(256) | YES  |     | NULL    |       |
# +----------+--------------+------+-----+---------+-------+
#
[[rule]]
schema = "test"
table = "tid_[0-9]{4}"
index = "test"
type = "t"
# The ES doc's id will be `id`:`tag`.
# Useful for merging multiple tables into one type when these tables share the same PK.
id = ["id", "tag"]

Docker 部署

待续

 类似资料: