Reading the MySQL binlog with Python mysql-replication

逑阳泽
2023-12-01

I. Prerequisites

Python 2.7, 3.4, 3.5 or 3.6;

MySQL 5.5, 5.6 or 5.7.

II. MySQL configuration

1. Enable the binlog in MySQL

Check whether the binlog is enabled by running show variables like 'log_bin'. If Value is OFF, binary logging is not enabled.

Find the [mysqld] section in my.cnf and add the following:

[mysqld]
# binlog configuration
server-id = 1
# the directory must be writable by the MySQL server
log-bin = /var/lib/mysql-binlog/mysql-bin.log
expire-logs-days = 10
max-binlog-size = 100M
binlog-format = row
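Before restarting the server, the fragment above can be checked mechanically. The following is a minimal sketch, not part of the original setup: it parses a my.cnf text with Python's standard configparser and reports which of the options used in this article are missing or have an unexpected value. The names REQUIRED_OPTIONS, read_mysqld_options and find_problems are made up for illustration, and the sketch assumes a simple file without !include or !includedir directives, which configparser does not understand.

```python
import configparser

# Options this article relies on; None means "any value, it only has to be set".
# (Hypothetical helper names, chosen for this sketch.)
REQUIRED_OPTIONS = {"server_id": None, "log_bin": None, "binlog_format": "row"}


def read_mysqld_options(cnf_text):
    """Return the [mysqld] options of a my.cnf text, with '_' in option names."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None)
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return {}
    # MySQL accepts '-' and '_' interchangeably in option names, so normalize.
    return {name.replace("-", "_"): value
            for name, value in parser.items("mysqld")}


def find_problems(options):
    """Return human-readable problems, sorted by option name."""
    problems = []
    for name, expected in sorted(REQUIRED_OPTIONS.items()):
        if name not in options:
            problems.append("missing option: %s" % name)
        elif expected is not None and (options[name] or "").lower() != expected:
            problems.append(
                "%s should be %s, found %s" % (name, expected, options[name]))
    return problems


SAMPLE = """
[mysqld]
# binlog configuration
server-id = 1
log-bin = /var/lib/mysql-binlog/mysql-bin.log
expire-logs-days = 10
max-binlog-size = 100M
binlog-format = row
"""

print(find_problems(read_mysqld_options(SAMPLE)))
```

An empty list means the three options are present and binlog-format is row. The check only looks at the file, so the show variables queries are still needed to confirm what the running server actually uses.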

After restarting MySQL, show variables like 'log_bin' should report Value as ON.

To list the events recorded in the binlog, run show binlog events;

The binlog must satisfy the following conditions:

MySQL> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.01 sec)

MySQL> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

MySQL> show variables like 'binlog_row_image';
+------------------+-------+
| Variable_name    | Value |
+------------------+-------+
| binlog_row_image | FULL  |
+------------------+-------+
1 row in set (0.00 sec)

MySQL> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      431 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
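The three variable checks above can also be scripted. The sketch below is an illustration under assumptions, not something from the original article: check_binlog_variables is a pure function that compares a dict of variables with the values shown above, and fetch_binlog_variables reads them over a connection made with PyMySQL (the driver that mysql-replication itself depends on). Both function names are made up for this example. Note that binlog_row_image only exists from MySQL 5.6 on, so on 5.5 it would be reported as missing.

```python
# Values the article requires (hypothetical constant name for this sketch).
REQUIRED_VARIABLES = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def check_binlog_variables(variables):
    """Return (name, expected, actual) for every variable that does not match.

    `variables` maps variable names to values, e.g. built from SHOW VARIABLES.
    A variable that is absent is reported with actual=None.
    """
    mismatches = []
    for name, expected in sorted(REQUIRED_VARIABLES.items()):
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            mismatches.append((name, expected, actual))
    return mismatches


def fetch_binlog_variables(connection_settings):
    """Read the three variables from a live server (needs PyMySQL installed).

    `connection_settings` is passed to pymysql.connect, for example
    {"host": ..., "port": 3306, "user": ..., "password": ...}.
    """
    import pymysql  # imported lazily so the pure check works without it

    connection = pymysql.connect(**connection_settings)
    try:
        with connection.cursor() as cursor:
            cursor.execute(
                "SHOW VARIABLES WHERE Variable_name IN "
                "('log_bin', 'binlog_format', 'binlog_row_image')")
            return dict(cursor.fetchall())
    finally:
        connection.close()


print(check_binlog_variables(
    {"log_bin": "ON", "binlog_format": "MIXED", "binlog_row_image": "FULL"}))
```

Keeping the comparison separate from the database access makes it easy to try the check without a server; with a server at hand, check_binlog_variables(fetch_binlog_variables(settings)) combines the two.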

2. Configure a replication account

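The reader program poses as a replica (slave): it continuously fetches the binlog from the MySQL server and parses it, so it needs an account with replication privileges.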

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'replicator'@'%' IDENTIFIED BY '123456';

# Reload the privileges

flush privileges;
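To confirm that the account really has the three privileges granted above, the output of SHOW GRANTS FOR 'replicator'@'%' can be inspected. The following sketch is an assumption-laden illustration: it takes the GRANT statements as plain strings (however they were obtained) and reports which required global privileges are missing. The function name missing_privileges and the regular expression are made up for this example, and only grants on *.* are considered.

```python
import re

# Privileges granted in the article's GRANT statement.
REQUIRED_PRIVILEGES = {"SELECT", "REPLICATION SLAVE", "REPLICATION CLIENT"}

# Matches global grants such as: GRANT SELECT, REPLICATION SLAVE ON *.* TO ...
GLOBAL_GRANT = re.compile(r"^GRANT (.+?) ON \*\.\* TO ", re.IGNORECASE)


def missing_privileges(grant_statements):
    """Return the required global privileges not found in the statements."""
    granted = set()
    for statement in grant_statements:
        match = GLOBAL_GRANT.match(statement.strip())
        if not match:
            continue  # database- or table-level grant: ignored in this sketch
        for privilege in match.group(1).split(","):
            granted.add(privilege.strip().upper())
    if "ALL PRIVILEGES" in granted:
        return set()
    return REQUIRED_PRIVILEGES - granted


grants = [
    "GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* "
    "TO 'replicator'@'%'",
]
print(sorted(missing_privileges(grants)))
```

An empty result means the account has everything the reader needs at the global level.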

III. Python

Install the mysql-replication package:

pip install mysql-replication

Example code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import json
import sys

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)


def main():
    mysql_settings = {
        'host': '127.0.0.1',
        'port': 3306,
        'user': 'replicator',
        'passwd': '123456'
    }

    stream = BinLogStreamReader(
        connection_settings=mysql_settings,
        server_id=101,  # must differ from the server-id of the MySQL server and of any other replica
        blocking=True,  # keep waiting for new events instead of stopping at the end of the binlog
        only_schemas=['zow'],  # schemas (databases) to watch
        only_events=[DeleteRowsEvent, WriteRowsEvent, UpdateRowsEvent],
        resume_stream=True,  # start from log_file/log_pos below
        log_file='mysql-bin.000001',
        # 4 is the offset of the first event in a binlog file; to skip existing
        # events, use the Position reported by show master status instead.
        log_pos=4)

    try:
        for binlogevent in stream:
            for row in binlogevent.rows:
                event = {"schema": binlogevent.schema,
                         "table": binlogevent.table,
                         "log_pos": binlogevent.packet.log_pos}

                if isinstance(binlogevent, DeleteRowsEvent):
                    event["action"] = "delete"
                    event["values"] = dict(row["values"])
                elif isinstance(binlogevent, UpdateRowsEvent):
                    event["action"] = "update"
                    event["before_values"] = dict(row["before_values"])
                    event["after_values"] = dict(row["after_values"])
                elif isinstance(binlogevent, WriteRowsEvent):
                    event["action"] = "insert"
                    event["values"] = dict(row["values"])

                print(json.dumps(event))
                sys.stdout.flush()
    finally:
        # With blocking=True the loop only ends on an error or an interrupt,
        # so close the connection in a finally block.
        stream.close()


if __name__ == "__main__":
    main()
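One practical caveat with the example: json.dumps only handles basic types, and row values are not always basic. For temporal, DECIMAL or binary columns the reader may hand back objects such as datetime, Decimal or bytes, and json.dumps raises TypeError on those. A possible workaround, sketched here as an assumption rather than as the library's recommended approach, is to pass a default function. The name json_default and the chosen string formats are arbitrary, and bytes.hex() needs Python 3.5 or later.

```python
import datetime
import decimal
import json


def json_default(value):
    """Convert values that json.dumps cannot serialize on its own.

    The output formats (ISO 8601 strings, seconds, decimal strings, hex)
    are choices made for this sketch, not a fixed convention.
    """
    if isinstance(value, (datetime.datetime, datetime.date, datetime.time)):
        return value.isoformat()
    if isinstance(value, datetime.timedelta):
        return value.total_seconds()
    if isinstance(value, decimal.Decimal):
        return str(value)  # keep the exact decimal digits
    if isinstance(value, (bytes, bytearray)):
        return bytes(value).hex()
    raise TypeError("cannot serialize %r" % type(value))


row = {
    "created": datetime.datetime(2023, 12, 1, 10, 30, 0),
    "price": decimal.Decimal("9.90"),
    "blob": b"\x01\xff",
}
print(json.dumps(row, default=json_default, sort_keys=True))
```

In the script above this would mean calling json.dumps(event, default=json_default) instead of json.dumps(event).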

Output:

# python mysql-replication.py

{"table": "test4", "log_pos": 884, "action": "insert", "schema": "test", "values": {"id": 1, "data2": "World", "data": "Hello"}}

{"table": "test4", "after_values": {"id": 1, "data2": "Hello", "data": "World"}, "log_pos": 1111, "action": "update", "schema": "test", "before_values": {"id": 1, "data2": "World", "data": "Hello"}}

{"table": "test4", "log_pos": 1320, "action": "delete", "schema": "test", "values": {"id": 1, "data2": "Hello", "data": "World"}}

{"table": "test4", "log_pos": 1529, "action": "insert", "schema": "test", "values": {"id": 2, "data2": "World", "data": "Hello"}}

{"table": "test4", "log_pos": 3010, "action": "delete", "schema": "test", "values": {"id": 2, "data2": "Hello", "data": "World"}}
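Each output line is a self-contained JSON object, so a downstream consumer can rebuild table contents by replaying the lines in order. Here is a minimal sketch of such a consumer. It is an illustration only: apply_event and replay are made-up names, and the code assumes every table has a primary key column called id, as test4 does in the output above.

```python
import json


def apply_event(tables, event, key="id"):
    """Apply one insert/update/delete event to an in-memory copy of the tables.

    `tables` maps (schema, table) to a dict of rows keyed by primary key.
    The primary key column name is an assumption of this sketch.
    """
    rows = tables.setdefault((event["schema"], event["table"]), {})
    action = event["action"]
    if action == "insert":
        rows[event["values"][key]] = event["values"]
    elif action == "update":
        # Drop the old row first in case the primary key itself changed.
        rows.pop(event["before_values"][key], None)
        rows[event["after_values"][key]] = event["after_values"]
    elif action == "delete":
        rows.pop(event["values"][key], None)
    return tables


def replay(lines, key="id"):
    """Replay JSON lines as printed by the reader and return the final state."""
    tables = {}
    for line in lines:
        line = line.strip()
        if line:
            apply_event(tables, json.loads(line), key)
    return tables


sample = [
    '{"table": "test4", "log_pos": 884, "action": "insert", "schema": "test", '
    '"values": {"id": 1, "data2": "World", "data": "Hello"}}',
    '{"table": "test4", "after_values": {"id": 1, "data2": "Hello", "data": "World"}, '
    '"log_pos": 1111, "action": "update", "schema": "test", '
    '"before_values": {"id": 1, "data2": "World", "data": "Hello"}}',
]
print(replay(sample))
```

Replaying all five lines of the output above leaves test4 empty, since both inserted rows are deleted again.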

Reference:

https://github.com/noplay/python-mysql-replication

Other languages:

Java: https://github.com/shyiko/mysql-binlog-connector-java

GO: https://github.com/siddontang/go-mysql

PHP (based on this project): https://github.com/krowinski/php-mysql-replication and https://github.com/fengxiangyun/mysql-replication
