I. Prerequisites
Python 2.7, or Python 3.4, 3.5, or 3.6;
MySQL 5.5, 5.6, or 5.7.
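II. MySQL Configuration
1. Enable the binlog in MySQL
Check whether the binlog is enabled by running show variables like 'log_bin'; if Value is OFF, binary logging is not enabled.
In my.cnf, locate the [mysqld] section and add the following: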
[mysqld]
# binlog configuration
server-id = 1
# the directory must be writable by the MySQL server
log-bin = /var/lib/mysql-binlog/mysql-bin.log
expire-logs-days = 10
max-binlog-size = 100M
binlog-format = row
Restart MySQL and run show variables like 'log_bin' again; Value should now be ON.
To inspect the events recorded in the binlog, run show binlog events;
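The [mysqld] settings listed above can be sanity-checked offline before the server is restarted. The following is only a minimal sketch: the function name check_mysqld_section and the choice of required options are assumptions made for illustration, and the parsing relies on Python 3's standard configparser, which does not understand every my.cnf construct (lines starting with "!" such as !includedir are simply skipped here).

```python
import configparser

# Options this sketch expects in [mysqld] (an illustrative choice, not an official list).
REQUIRED = {"server_id", "log_bin", "binlog_format"}


def check_mysqld_section(cnf_text):
    """Return a list of problems found in the [mysqld] section of cnf_text."""
    # Drop '!include' style directives, which configparser cannot parse.
    lines = [ln for ln in cnf_text.splitlines() if not ln.lstrip().startswith("!")]
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string("\n".join(lines))
    if not parser.has_section("mysqld"):
        return ["no [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent.
    options = {k.replace("-", "_"): v for k, v in parser.items("mysqld")}
    problems = ["missing option: " + name for name in sorted(REQUIRED - set(options))]
    fmt = (options.get("binlog_format") or "").strip().lower()
    if "binlog_format" in options and fmt != "row":
        problems.append("binlog_format should be row, found " + repr(fmt))
    return problems


SAMPLE = """
[mysqld]
# binlog configuration
server-id = 1
log-bin = /var/lib/mysql-binlog/mysql-bin.log
expire-logs-days = 10
max-binlog-size = 100M
binlog-format = row
"""

if __name__ == "__main__":
    print(check_mysqld_section(SAMPLE))
```

An empty list means the three options were found and binlog-format is row. The check says nothing about whether the log directory is actually writable.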
The binlog must satisfy the following conditions:
MySQL> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)
MySQL> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
MySQL> show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| binlog_row_image | FULL  |
+------------------+-------+
1 row in set (0.00 sec)
MySQL> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      431 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
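The three variable checks above can also be scripted. This is a rough sketch, not part of the original setup: unmet_conditions and fetch_variables are hypothetical helper names, and fetch_variables assumes a DB-API connection whose cursor returns plain tuples (the default cursor of pymysql, for example). Note that binlog_row_image only exists from MySQL 5.6 onward, so on 5.5 the sketch reports it as unmet with a value of None.

```python
# Values the server variables are expected to have, taken from the output above.
EXPECTED = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def unmet_conditions(variables):
    """Compare a {name: value} mapping with EXPECTED.

    Returns a list of (name, wanted, actual) tuples for every mismatch.
    """
    unmet = []
    for name, wanted in EXPECTED.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != wanted:
            unmet.append((name, wanted, actual))
    return unmet


def fetch_variables(connection):
    """Read the variables in EXPECTED through a DB-API connection."""
    found = {}
    cursor = connection.cursor()
    try:
        for name in EXPECTED:
            cursor.execute("SHOW VARIABLES LIKE %s", (name,))
            row = cursor.fetchone()  # assumed to be a (Variable_name, Value) tuple
            if row:
                found[row[0]] = row[1]
    finally:
        cursor.close()
    return found
```

With a live connection the two pieces combine as unmet_conditions(fetch_variables(connection)); an empty list means all three conditions hold.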
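2. Configure the replication account
The client program poses as a slave, continuously pulling the binlog from the MySQL server and parsing it. It connects with a dedicated account that holds the replication privileges: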
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';
# reload the privilege tables
flush privileges;
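To confirm that the account ended up with the privileges named in the GRANT statement, the lines returned by SHOW GRANTS FOR 'replicator'@'%' can be inspected. The sketch below is an illustration only: missing_privileges is a hypothetical helper, it looks only at global (ON *.*) grants, and it ignores database-level or table-level grants that might also provide SELECT.

```python
import re

# Privileges requested by the GRANT statement above.
NEEDED = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}

# Matches a global grant line such as:
#   GRANT SELECT, REPLICATION SLAVE ON *.* TO 'replicator'@'%'
GLOBAL_GRANT = re.compile(r"^GRANT\s+(.+?)\s+ON\s+\*\.\*\s+TO\s", re.IGNORECASE)


def missing_privileges(grant_lines):
    """Return the subset of NEEDED that no global grant line provides."""
    granted = set()
    for line in grant_lines:
        match = GLOBAL_GRANT.match(line.strip())
        if not match:
            continue  # not a global grant; ignored in this sketch
        for privilege in match.group(1).split(","):
            granted.add(privilege.strip().upper())
    if "ALL PRIVILEGES" in granted or "ALL" in granted:
        return set()
    return NEEDED - granted


if __name__ == "__main__":
    lines = [
        "GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%'",
    ]
    print(missing_privileges(lines))
```

An empty set means nothing is missing at the global level.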
III. Python
Install the mysql-replication package:
pip install mysql-replication
Code example:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    # Connection settings for the replication account created above
    mysql_settings = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'replicator',
        'passwd': '123456'
    }
    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=101,  # must differ from the server-id of the MySQL server
        blocking=True,  # keep waiting for new events instead of exiting
        only_schemas=['zow'],  # schemas (databases) to watch
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        resume_stream=True,
        # File and Position as reported by "show master status;"
        log_file='mysql-bin.000001', log_pos=0)

    for binlogevent in stream:
        for row in binlogevent.rows:
            event = {"schema": binlogevent.schema,
                     "table": binlogevent.table,
                     "log_pos": binlogevent.packet.log_pos}
            if isinstance(binlogevent, DeleteRowsEvent):
                event["action"] = "delete"
                event["values"] = dict(row["values"].items())
            elif isinstance(binlogevent, UpdateRowsEvent):
                event["action"] = "update"
                event["before_values"] = dict(row["before_values"].items())
                event["after_values"] = dict(row["after_values"].items())
            elif isinstance(binlogevent, WriteRowsEvent):
                event["action"] = "insert"
                event["values"] = dict(row["values"].items())
            # One JSON document per changed row
            print(json.dumps(event))
            sys.stdout.flush()

    stream.close()


if __name__ == "__main__":
    main()
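One practical caveat with the example: json.dumps raises TypeError when a row contains values such as datetime or Decimal objects, which DATETIME and DECIMAL columns typically produce. A possible workaround, sketched here under the assumption that string forms are acceptable downstream, is a default hook. The names json_default and dump_event are hypothetical and not part of mysql-replication.

```python
import datetime
import decimal
import json


def json_default(value):
    """Convert values json.dumps cannot handle into JSON-friendly ones."""
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()
    if isinstance(value, decimal.Decimal):
        return str(value)  # keep the exact digits rather than rounding to float
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).decode("utf-8", errors="replace")
    raise TypeError("not JSON serializable: %r" % (value,))


def dump_event(event):
    """Serialize an event dict like the ones built in the loop above."""
    return json.dumps(event, default=json_default, sort_keys=True)


if __name__ == "__main__":
    sample = {
        "action": "insert",
        "values": {
            "id": 1,
            "created": datetime.datetime(2020, 1, 2, 3, 4, 5),
            "price": decimal.Decimal("9.90"),
        },
    }
    print(dump_event(sample))
```

In the loop, print(dump_event(event)) would take the place of print(json.dumps(event)).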
Output:
# python mysql-replication.py
{"table": "test4", "log_pos": 884, "action": "insert", "schema": "test", "values": {"id": 1, "data2": "World", "data": "Hello"}}
{"table": "test4", "after_values": {"id": 1, "data2": "Hello", "data": "World"}, "log_pos": 1111, "action": "update", "schema": "test", "before_values": {"id": 1, "data2": "World", "data": "Hello"}}
{"table": "test4", "log_pos": 1320, "action": "delete", "schema": "test", "values": {"id": 1, "data2": "Hello", "data": "World"}}
{"table": "test4", "log_pos": 1529, "action": "insert", "schema": "test", "values": {"id": 2, "data2": "World", "data": "Hello"}}
{"table": "test4", "log_pos": 3010, "action": "delete", "schema": "test", "values": {"id": 2, "data2": "Hello", "data": "World"}}
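Since every emitted record carries its log_pos, a consumer can remember the last position it handled and pass it back as log_file / log_pos on the next start. The following is a minimal sketch of such a checkpoint file. The helper names and the JSON file layout are assumptions for illustration, and it does not deal with resuming in the middle of a transaction.

```python
import json
import os
import tempfile


def save_checkpoint(path, log_file, log_pos):
    """Atomically record the last processed binlog coordinates."""
    directory = os.path.dirname(os.path.abspath(path))
    # Write to a temporary file first so a crash never leaves a half-written checkpoint.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "w") as handle:
        json.dump({"log_file": log_file, "log_pos": log_pos}, handle)
    os.replace(tmp_path, path)  # atomic rename, Python 3.3+


def load_checkpoint(path, default_file, default_pos):
    """Return (log_file, log_pos) from path, or the defaults if unavailable."""
    try:
        with open(path) as handle:
            saved = json.load(handle)
        return saved["log_file"], saved["log_pos"]
    except (OSError, ValueError, KeyError):
        return default_file, default_pos
```

At startup, load_checkpoint("binlog.ckpt", "mysql-bin.000001", 431) would supply the values for log_file and log_pos (431 being the Position shown by show master status earlier). Inside the loop, save_checkpoint would be called with binlogevent.packet.log_pos after each row is handled. The current file name is, to my understanding, available as stream.log_file, but treat that attribute as an assumption to verify against the installed version.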
Reference:
https://github.com/noplay/python-mysql-replication
Other languages:
Java: https://github.com/shyiko/mysql-binlog-connector-java
GO: https://github.com/siddontang/go-mysql
PHP (based on this project): https://github.com/krowinski/php-mysql-replication and https://github.com/fengxiangyun/mysql-replication