MySQL 数据变更捕获(CDC)已经有很多开源实现,比如 canal、debezium、maxwell 等。alibaba/canal 自行实现了 MySQL 连接协议;debezium 和 maxwell 等则借助开源库 mysql-binlog-connector-java 连接 MySQL 数据源并获取 binlog 日志。本文介绍如何通过引入 mysql-binlog-connector-java 依赖,以在线(连接数据源,实时获取)和离线(读取离线 binlog 文件)两种方式获取并解析 binlog 日志(利用 mysql-binlog-connector-java 的事件解析出数据表的增删改操作,即 DML)。
话不多说,直接上代码。
BinLogConstants类主要提供目标数据库的连接信息,也包括解析模式选择(在线or离线)。
package com.binlog.parser;
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 监听配置信息
*
* @author lgq
* @since 2022/7/27
**/
@Data
@Component
public class BinLogConstants {
@Value("${binlog.datasource.host}")
private String host;
@Value("${binlog.datasource.port}")
private int port;
@Value("${binlog.datasource.username}")
private String username;
@Value("${binlog.datasource.passwd}")
private String passwd;
@Value("${binlog.db}")
private String db;
@Value("${binlog.table}")
private String table;
@Value("${binlog.isOnline}")
private boolean isOnline;
public static final int consumerThreads = 4;
public static final int fileScanThreads = 4;
public static final long queueSleep = 1000;
}
BinLogListener类注册消费监听器,提供事件解析方法,并启动消费线程。
package com.binlog.parser;
import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import static com.github.shyiko.mysql.binlog.event.EventType.QUERY;
import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;
/**
* binlog日志监听器
*
* @author lgq
* @since 2022/7/21
**/
public abstract class BinLogListener {
private final int consumerThreads = BinLogConstants.consumerThreads;
private final BlockingQueue<BinLogItem> binLogItemQueue;
private final ExecutorService consumer;
// 存放每张数据表对应的listener
private final Multimap<String, BinLogConsumerListener> listeners;
private final MysqlConnConf mysqlConnConf;
private final Map<String, Map<String, ColumnInfo>> dbTableCols;
private String dbTable;
/**
* 监听器初始化
*
* @param conf
*/
public BinLogListener(MysqlConnConf conf) {
this.binLogItemQueue = new ArrayBlockingQueue<>(1024);
this.mysqlConnConf = conf;
this.listeners = ArrayListMultimap.create();
this.dbTableCols = new ConcurrentHashMap<>();
this.consumer = Executors.newFixedThreadPool(consumerThreads);
}
/**
* 注册消费监听
*
* @param db 数据库
* @param table 操作表
* @param listener 监听器
* @throws Exception
*/
public void regConsumerListener(String db, String table, BinLogConsumerListener listener) throws Exception {
String dbTable = BinLogUtils.getDBTable(db, table);
// 获取字段集合
Map<String, ColumnInfo> cols = BinLogUtils.getColMap(mysqlConnConf, db, table);
// 保存字段信息
dbTableCols.put(dbTable, cols);
// 保存当前注册的listener
listeners.put(dbTable, listener);
}
/**
* 读取日志解析并启动多线程消费
*
*/
public void consume() {
consumer.submit(() -> {
while (true) {
if (binLogItemQueue.size() > 0) {
try {
BinLogItem item = binLogItemQueue.take();
String dbTable = item.getDbTable();
listeners.get(dbTable).forEach(binLogListener -> binLogListener.onEvent(item));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Thread.sleep(BinLogConstants.queueSleep);
}
});
}
public void parseEvent(Event event) {
EventType eventType = event.getHeader().getEventType();
if (eventType == QUERY) {
QueryEventData data = event.getData();
System.out.println(data.getSql());
}
if (eventType == EventType.TABLE_MAP) {
TableMapEventData tableData = event.getData();
String db = tableData.getDatabase();
String table = tableData.getTable();
dbTable = BinLogUtils.getDBTable(db, table);
}
// 只处理添加删除更新三种操作
if (isWrite(eventType) || isUpdate(eventType) || isDelete(eventType)) {
if (isWrite(eventType)) {
WriteRowsEventData data = event.getData();
for (Serializable[] row : data.getRows()) {
if (dbTableCols.containsKey(dbTable)) {
BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable),
eventType, event.getHeader().getTimestamp());
item.setDbTable(dbTable);
binLogItemQueue.add(item);
}
}
}
if (isUpdate(eventType)) {
UpdateRowsEventData data = event.getData();
for (Map.Entry<Serializable[], Serializable[]> row : data.getRows()) {
if (dbTableCols.containsKey(dbTable)) {
BinLogItem item = BinLogItem.itemFromUpdate(row, dbTableCols.get(dbTable),
eventType, event.getHeader().getTimestamp());
item.setDbTable(dbTable);
binLogItemQueue.add(item);
}
}
}
if (isDelete(eventType)) {
DeleteRowsEventData data = event.getData();
for (Serializable[] row : data.getRows()) {
if (dbTableCols.containsKey(dbTable)) {
BinLogItem item = BinLogItem.itemFromInsertOrDeleted(row, dbTableCols.get(dbTable),
eventType, event.getHeader().getTimestamp());
item.setDbTable(dbTable);
binLogItemQueue.add(item);
}
}
}
}
}
public abstract void startParseBinLog();
}
OnlineBinLogListener类通过BinaryLogClient开启连接远程实时拉取binlog日志。
package com.binlog.parser;
import java.io.IOException;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import lombok.extern.slf4j.Slf4j;
/**
* 数据库监听器
*
* @author lgq
* @since 2022/7/21
**/
@Slf4j
public class OnlineBinLogListener extends BinLogListener implements BinaryLogClient.EventListener {
private final BinaryLogClient parseClient;
/**
* 监听器初始化
*
* @param conf
*/
public OnlineBinLogListener(MysqlConnConf conf) {
super(conf);
BinaryLogClient client = new BinaryLogClient(conf.getHost(), conf.getPort(), conf.getUsername(), conf.getPasswd());
EventDeserializer eventDeserializer = new EventDeserializer();
/*
eventDeserializer.setCompatibilityMode( //序列化
EventDeserializer.CompatibilityMode.DATE_AND_TIME_AS_LONG,
EventDeserializer.CompatibilityMode.CHAR_AND_BINARY_AS_BYTE_ARRAY
);
*/
client.setEventDeserializer(eventDeserializer);
this.parseClient = client;
}
/**
* 监听处理
*
* @param event
*/
@Override
public void onEvent(Event event) {
parseEvent(event);
}
@Override
public void startParseBinLog() {
// 先启动消费线程
consume();
parseClient.registerEventListener(this);
// 连接数据库,开始拉取binlog日志(没有过滤库,表信息)
try {
parseClient.connect();
} catch (IOException e) {
e.printStackTrace();
}
}
}
OfflineBinLogListener实时获取指定目录中新增的完整的binlog文件(默认为500m)进行离线解析。
package com.binlog.parser;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import javax.annotation.Resource;
import com.github.shyiko.mysql.binlog.BinaryLogFileReader;
import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.binlog.config.FileProperties;
import com.binlog.util.SpringUtil;
import lombok.extern.slf4j.Slf4j;
/**
* 数据库监听器
*
* @author lgq
* @since 2022/7/21
**/
@Slf4j
public class OfflineBinLogListener extends BinLogListener {
private MysqlBinLogReader mysqlBinLogReader;
private FileProperties fileProperties;
/**
* 监听器初始化
*
* @param conf
*/
public OfflineBinLogListener(MysqlConnConf conf) {
super(conf);
this.mysqlBinLogReader = (MysqlBinLogReader)SpringUtil.getApplicationContext().getBean("mysqlBinLogReader");
this.fileProperties = (FileProperties)SpringUtil.getApplicationContext().getBean("fileProperties");
}
public OfflineBinLogListener(MysqlConnConf conf, MysqlBinLogReader mysqlBinLogReader,
FileProperties fileProperties) {
super(conf);
this.mysqlBinLogReader = mysqlBinLogReader;
this.fileProperties = fileProperties;
}
@Override
public void startParseBinLog() {
// 先启动消费线程
consume();
BlockingQueue<String> fileNameQueue = mysqlBinLogReader.getFileNameQueue();
ExecutorService offlineFileProcess = mysqlBinLogReader.getOfflineFileProcess();
offlineFileProcess.submit(() -> {
Thread.currentThread().setName("scanBinLogFileConsumer");
while (true) {
if (!fileNameQueue.isEmpty()) {
String fileName;
try {
fileName = fileNameQueue.take();
File binlogFile = new File(fileProperties.getLocalBackupDir(), fileName);
EventDeserializer eventDeserializer = new EventDeserializer();
BinaryLogFileReader reader = new BinaryLogFileReader(binlogFile, eventDeserializer);
for (Event event; (event = reader.readEvent()) != null; ) {
parseEvent(event);
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
}
}
BinLogListenerStarter监听启动器注册指定的库,表监听器,并启动日志解析和消费线程。
package com.binlog.parser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;
import java.util.Objects;
/**
* 测试监听器
* SpringBoot启动成功后的执行业务线程操作
* CommandLineRunner去实现此操作
* 在有多个可被执行的业务时,通过使用 @Order 注解,设置各个线程的启动顺序(value值由小到大表示启动顺序)。
* 多个实现CommandLineRunner接口的类必须要设置启动顺序,不然程序启动会报错!
*
* @author lgq
* @since 2022/7/21
**/
@Slf4j
@Component
@Order(value = 1)
public class BinLogListenerStarter implements CommandLineRunner {
@Resource
private BinLogConstants binLogConstants;
@Override
public void run(String... args) throws Exception {
log.info("初始化配置信息:" + binLogConstants.toString());
// 初始化配置信息
MysqlConnConf conf = new MysqlConnConf(binLogConstants.getHost(), binLogConstants.getPort(),
binLogConstants.getUsername(), binLogConstants.getPasswd());
// 初始化监听器
BinLogListener binLogListener;
if (binLogConstants.isOnline()) {
binLogListener = new OnlineBinLogListener(conf);
} else {
binLogListener = new OfflineBinLogListener(conf);
}
// 获取table集合
List<String> tableList = BinLogUtils.getListByStr(binLogConstants.getTable());
if (Objects.isNull(tableList) || tableList.size() == 0) {
return;
}
// 注册监听
tableList.forEach(table -> {
log.info("注册监听信息,注册DB:{}, 注册表:{}", binLogConstants.getDb(), table);
try {
binLogListener.regConsumerListener(binLogConstants.getDb(), table, item -> {
log.info("监听逻辑处理:打印一下数据变更信息: {}", item.toString());
});
} catch (Exception e) {
log.error("BinLog监听异常:{}", e);
}
});
// 开启日志获取和解析业务线程
binLogListener.startParseBinLog();
}
}
ColumnInfo存储表字段信息。
package com.binlog.parser;
import lombok.Data;
/**
 * Metadata of a single table column.
 *
 * @author lgq
 * @since 2022/7/21
 **/
@Data
public class ColumnInfo {
    /** Zero-based position of the column inside a row array. */
    public int inx;
    /** Column name. */
    public String colName;
    /** Data type of the column. */
    public String dataType;
    /** Database (schema) the table belongs to. */
    public String schema;
    /** Table the column belongs to. */
    public String table;
    /** Whether the column is part of the primary key. */
    public boolean isPKC;

    /**
     * @param schema   database name
     * @param table    table name
     * @param idx      position of the column, stored in {@link #inx}
     * @param colName  column name
     * @param dataType data type of the column
     * @param isPKC    {@code true} if the column is a primary key column
     */
    public ColumnInfo(String schema, String table, int idx, String colName, String dataType, boolean isPKC) {
        this.inx = idx;
        this.colName = colName;
        this.dataType = dataType;
        this.schema = schema;
        this.table = table;
        this.isPKC = isPKC;
    }
}
BinLogItem存储解析日志获得的结构化数据。
package com.binlog.parser;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Maps;
import lombok.Data;
import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;
/**
 * Structured form of one row change read from the binlog.
 *
 * @author lgq
 * @since 2021/7/26
 **/
@Data
public class BinLogItem implements Serializable {
    private static final long serialVersionUID = 1L;
    private String dbTable;
    private EventType eventType;
    private Long timestamp;
    private Long serverId;
    // column name -> value before / after the change
    private Map<String, Serializable> before;
    private Map<String, Serializable> after;
    // column name -> column metadata
    private Map<String, ColumnInfo> columnInfoMap;
    // change description rendered by toString()
    private Map<String, Object> messageJson = new HashMap<>();

    /**
     * Builds the item for an insert or a delete. The row values go to {@code after} for a write
     * event and to {@code before} for a delete event.
     *
     * @param row           column values of the affected row
     * @param columnInfoMap column metadata keyed by column name
     * @param eventType     type of the row event
     * @param timestamp     timestamp taken from the event header
     * @return the item, or {@code null} when {@code row} or {@code columnInfoMap} is {@code null}
     */
    public static BinLogItem itemFromInsertOrDeleted(Serializable[] row, Map<String, ColumnInfo> columnInfoMap,
                                                     EventType eventType, Long timestamp) {
        if (row == null || columnInfoMap == null) {
            return null;
        }
        BinLogItem result = newItem(columnInfoMap, eventType, timestamp);
        Map<String, Serializable> image = new HashMap<>();
        for (Map.Entry<String, ColumnInfo> column : columnInfoMap.entrySet()) {
            String name = column.getKey();
            ColumnInfo info = column.getValue();
            image.put(name, row[info.inx]);
            if (info.isPKC) {
                // record the primary key column
                result.messageJson.put("pkc", name);
            }
        }
        if (isWrite(eventType)) {
            result.after = image;
        }
        if (isDelete(eventType)) {
            result.before = image;
        }
        result.describeChange();
        return result;
    }

    /**
     * Builds the item for an update from the pair of row images.
     *
     * @param mapEntry      row values before (key) and after (value) the update
     * @param columnInfoMap column metadata keyed by column name
     * @param eventType     type of the row event
     * @param timestamp     timestamp taken from the event header
     * @return the item, or {@code null} when {@code mapEntry} or {@code columnInfoMap} is {@code null}
     */
    public static BinLogItem itemFromUpdate(Map.Entry<Serializable[], Serializable[]> mapEntry, Map<String, ColumnInfo> columnInfoMap,
                                            EventType eventType, Long timestamp) {
        if (mapEntry == null || columnInfoMap == null) {
            return null;
        }
        BinLogItem result = newItem(columnInfoMap, eventType, timestamp);
        Map<String, Serializable> oldValues = new HashMap<>();
        Map<String, Serializable> newValues = new HashMap<>();
        for (Map.Entry<String, ColumnInfo> column : columnInfoMap.entrySet()) {
            String name = column.getKey();
            ColumnInfo info = column.getValue();
            oldValues.put(name, mapEntry.getKey()[info.inx]);
            newValues.put(name, mapEntry.getValue()[info.inx]);
            if (info.isPKC) {
                // record the primary key column
                result.messageJson.put("pkc", name);
            }
        }
        result.before = oldValues;
        result.after = newValues;
        result.describeChange();
        return result;
    }

    /** Renders the change description map. */
    @Override
    public String toString() {
        return messageJson.toString();
    }

    /** Creates an item with empty row images and the primary key entry preset to {@code null}. */
    private static BinLogItem newItem(Map<String, ColumnInfo> columnInfoMap, EventType eventType, Long timestamp) {
        BinLogItem created = new BinLogItem();
        created.eventType = eventType;
        created.columnInfoMap = columnInfoMap;
        created.before = new HashMap<>();
        created.after = new HashMap<>();
        created.timestamp = timestamp;
        created.messageJson.put("pkc", null);
        return created;
    }

    /** Copies operation, timestamp and both row images into the change description. */
    private void describeChange() {
        messageJson.put("op", Operator.valueOf(eventType));
        messageJson.put("ts", timestamp);
        messageJson.put("before", before);
        messageJson.put("after", after);
    }
}
BinLogUtils提供一些工具类方法供解析时使用。
package com.binlog.parser;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.io.Serializable;
import java.sql.*;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import static com.github.shyiko.mysql.binlog.event.EventType.isDelete;
import static com.github.shyiko.mysql.binlog.event.EventType.isUpdate;
import static com.github.shyiko.mysql.binlog.event.EventType.isWrite;
/**
 * Helper methods used while registering listeners and parsing binlog events.
 *
 * @author lgq
 * @since 2021/7/27
 **/
@Slf4j
@Component
public class BinLogUtils {
    private static BinLogUtils binLogUtils;

    /** Stores the Spring-managed instance in the static field. */
    @PostConstruct
    public void init() {
        binLogUtils = this;
    }

    /**
     * Builds the "db-table" key used throughout the parser.
     *
     * @param db    database name
     * @param table table name
     * @return {@code db + "-" + table}
     */
    public static String getDBTable(String db, String table) {
        return db + "-" + table;
    }

    /**
     * Loads the column metadata of a table from {@code INFORMATION_SCHEMA.COLUMNS}.
     * The connection, statement and result set are closed before the method returns.
     *
     * @param conf  connection settings
     * @param db    database name
     * @param table table name
     * @return column metadata keyed by column name (empty when no column is found),
     *         or {@code null} when the query fails with an {@link SQLException}
     * @throws ClassNotFoundException if the MySQL JDBC driver class is not on the classpath
     */
    public static Map<String, ColumnInfo> getColMap(MysqlConnConf conf, String db, String table) throws ClassNotFoundException {
        Class.forName("com.mysql.jdbc.Driver");
        String url = "jdbc:mysql://" + conf.getHost() + ":" + conf.getPort();
        String preSql = "SELECT TABLE_SCHEMA, TABLE_NAME, COLUMN_NAME, " +
                "DATA_TYPE, ORDINAL_POSITION, case when COLUMN_KEY = 'PRI' THEN 'Y' ELSE 'N' END IS_PKC" +
                " FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = ? and TABLE_NAME = ?";
        try (Connection connection = DriverManager.getConnection(url, conf.getUsername(), conf.getPasswd());
             PreparedStatement ps = connection.prepareStatement(preSql)) {
            ps.setString(1, db);
            ps.setString(2, table);
            try (ResultSet rs = ps.executeQuery()) {
                Map<String, ColumnInfo> map = new HashMap<>();
                while (rs.next()) {
                    String schema = rs.getString("TABLE_SCHEMA");
                    String tableName = rs.getString("TABLE_NAME");
                    String column = rs.getString("COLUMN_NAME");
                    int idx = rs.getInt("ORDINAL_POSITION");
                    String dataType = rs.getString("DATA_TYPE");
                    String isPKC = rs.getString("IS_PKC");
                    if (column != null && idx >= 1) {
                        // ORDINAL_POSITION starts at 1, row arrays start at 0
                        map.put(column, new ColumnInfo(schema, tableName, idx - 1, column, dataType,
                                "Y".equals(isPKC)));
                    }
                }
                return map;
            }
        } catch (SQLException e) {
            log.error("load db conf error, db_table={}:{} ", db, table, e);
            return null;
        }
    }

    /**
     * Extracts the table name from a "db-table" key.
     *
     * @param dbTable key built by {@link #getDBTable(String, String)}
     * @return the table name, or an empty string when the key is {@code null} or does not
     *         consist of exactly two parts separated by "-"
     */
    public static String getTable(String dbTable) {
        if (Objects.isNull(dbTable)) {
            return "";
        }
        String[] split = dbTable.split("-");
        if (split.length == 2) {
            return split[1];
        }
        return "";
    }

    /**
     * Splits a comma-separated string into a list. Entries are trimmed and blank entries are
     * dropped, so {@code "a, b,"} yields {@code [a, b]}.
     *
     * @param str comma-separated values, may be {@code null}
     * @return mutable list of the non-blank entries; empty for {@code null} or blank input
     */
    public static List<String> getListByStr(String str) {
        List<String> result = Lists.newArrayList();
        if (Objects.isNull(str)) {
            return result;
        }
        for (String part : str.split(",")) {
            String value = part.trim();
            if (!value.isEmpty()) {
                result.add(value);
            }
        }
        return result;
    }

    /**
     * Returns the row image that carries the current data of a change.
     *
     * @param binLogItem parsed change
     * @return the "after" image for insert and update, the "before" image for delete,
     *         {@code null} for any other event type
     */
    public static Map<String, Serializable> getOptMap(BinLogItem binLogItem) {
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType) || isUpdate(eventType)) {
            return binLogItem.getAfter();
        }
        if (isDelete(eventType)) {
            return binLogItem.getBefore();
        }
        return null;
    }

    /**
     * Maps the event type of a change to a numeric operation code.
     *
     * @param binLogItem parsed change
     * @return 1 for insert, 2 for update, 3 for delete, {@code null} for any other event type
     */
    public static Integer getOptType(BinLogItem binLogItem) {
        EventType eventType = binLogItem.getEventType();
        if (isWrite(eventType)) {
            return 1;
        }
        if (isUpdate(eventType)) {
            return 2;
        }
        if (isDelete(eventType)) {
            return 3;
        }
        return null;
    }
}
MysqlBinLogReader初始化并启动扫描指定目录下binlog日志的线程。
package com.binlog.parser;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import com.binlog.config.FileProperties;
import com.binlog.util.FileInfoUtil;
import lombok.Data;
import org.springframework.stereotype.Component;
/**
* @author lgq
* @since 2022/7/21
*/
@Component
@Data
public class MysqlBinLogReader {
private final int fileScanThreads = 4;
private final ExecutorService offlineFileProcess;
private final BlockingQueue<String> fileNameQueue;
@Resource
private FileProperties fileProperties;
public MysqlBinLogReader(){
this.offlineFileProcess = Executors.newFixedThreadPool(fileScanThreads);
this.fileNameQueue = new ArrayBlockingQueue<>(1024);
}
public MysqlBinLogReader(FileProperties fileProperties){
this.offlineFileProcess = Executors.newFixedThreadPool(fileScanThreads);
this.fileNameQueue = new ArrayBlockingQueue<>(1024);
this.fileProperties = fileProperties;
}
public void scanBinLogFile() {
offlineFileProcess.submit(() -> {
Thread.currentThread().setName("scanBinLogFileProducer");
while (true) {
TreeMap<Long, String> newBinLogFilesPath = FileInfoUtil.getNewBinLogFilesPath(
fileProperties.getLocalBackupDir(), fileProperties.getBackupLogName(),
fileProperties.getLastFileName());
newBinLogFilesPath.forEach((createTime, fileName) -> {
fileNameQueue.add(fileName);
});
if (newBinLogFilesPath.size() > 0) {
fileProperties.setLastFileName(newBinLogFilesPath.lastEntry().getValue());
}
}
});
}
}
FileInfoUtil提供一些文件扫描的工具类,比如获取指定目录下新增的binlog日志文件(该文件确定已经写入完成,后面mysql的binlog日志是写入新的日志文件)。
package com.binlog.util;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
/**
 * File scanning helpers for the offline binlog mode.
 */
public class FileInfoUtil {
    /**
     * Lists the files of a directory that were created after a reference file.
     *
     * <p>The result is ordered by creation time, oldest first. Files sharing the same creation
     * millisecond are ordered by name and get consecutive keys so that none of them is lost;
     * a key can therefore be slightly larger than the real creation time of its file.
     *
     * @param localBackupDir directory that holds the binlog files
     * @param backupLogName  name of the log file inside the directory, which is skipped
     * @param lastLogPath    reference file: an absolute path, or a name that is looked up inside
     *                       {@code localBackupDir} first; {@code null} or blank selects all files
     * @return creation-time key to file name; empty when the directory cannot be listed
     * @throws IOException if file attributes cannot be read, e.g. the reference file is missing
     */
    public static TreeMap<Long, String> getNewBinLogFilesPath(String localBackupDir,
                                                              String backupLogName,
                                                              String lastLogPath) throws IOException {
        // Long keys sort numerically, so the oldest file comes first
        TreeMap<Long, String> newBinLogFileMap = new TreeMap<>();
        File dir = new File(localBackupDir);
        String[] files = dir.list();
        if (files == null) {
            // not a directory, or it cannot be read
            return newBinLogFileMap;
        }

        long lastFileCreateTime = 0L;
        if (lastLogPath != null && !lastLogPath.trim().isEmpty()) {
            File lastFile = new File(lastLogPath);
            if (!lastFile.isAbsolute()) {
                // the scanner remembers the bare file name, which lives in the backup directory
                File inBackupDir = new File(dir, lastLogPath);
                if (inBackupDir.exists()) {
                    lastFile = inBackupDir;
                }
            }
            lastFileCreateTime = creationMillis(lastFile);
        }

        // sort the names first so that files with equal creation time are ordered by name
        Arrays.sort(files);
        TreeMap<Long, List<String>> namesByCreateTime = new TreeMap<>();
        for (String name : files) {
            // skip the log file
            if (name.equals(backupLogName)) {
                continue;
            }
            File file = new File(dir, name);
            // skip directories
            if (file.isDirectory()) {
                continue;
            }
            long fileCreateTime = creationMillis(file);
            if (fileCreateTime > lastFileCreateTime) {
                namesByCreateTime.computeIfAbsent(fileCreateTime, key -> new ArrayList<>()).add(name);
            }
        }

        // hand out strictly increasing keys so that equal creation times do not overwrite each other
        long nextFreeKey = Long.MIN_VALUE;
        for (Map.Entry<Long, List<String>> group : namesByCreateTime.entrySet()) {
            long key = Math.max(group.getKey(), nextFreeKey);
            for (String name : group.getValue()) {
                newBinLogFileMap.put(key, name);
                key++;
            }
            nextFreeKey = key;
        }
        return newBinLogFileMap;
    }

    /** Reads the creation time of a file in epoch milliseconds. */
    private static long creationMillis(File file) throws IOException {
        return Files.readAttributes(file.toPath(), BasicFileAttributes.class).creationTime().toMillis();
    }
}
FilesScanStarter开启文件扫描线程,捕获新增的日志文件。
package com.binlog.parser;
import javax.annotation.Resource;
import com.binlog.config.FileProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * Startup hook that starts the scan for new binlog files. It is a {@link CommandLineRunner}
 * with {@code @Order(0)}.
 *
 * @author lgq
 * @since 2022/7/21
 */
@Slf4j
@Order(value = 0)
@Component
public class FilesScanStarter implements CommandLineRunner {
    @Resource
    private MysqlBinLogReader mysqlBinLogReader;
    @Resource
    private BinLogConstants binLogConstants;

    /**
     * Starts the directory scan in offline mode. In online mode nothing reads the file name
     * queue, so the scan is skipped.
     *
     * @param args command line arguments (unused)
     */
    @Override
    public void run(String... args) {
        if (binLogConstants.isOnline()) {
            log.info("online mode, binlog file scan is not started");
            return;
        }
        mysqlBinLogReader.scanBinLogFile();
    }
}
其他的配置类和配置文件如下。
package com.binlog.parser;
import lombok.AllArgsConstructor;
import lombok.Data;
/**
* 数据库配置
*
* @author lgq
* @since 2022/7/21
**/
@Data
@AllArgsConstructor
public class MysqlConnConf {
private String host;
private int port;
private String username;
private String passwd;
}
//application.yml文件
/*
server:
port: 8080
binlog:
localBackupDir: E:\backup\binlog
backupLog: E:\backup\binlog\backuplog.txt
backupLogName: backuplog.txt
lastFileName:
table: abc
db: db1
isOnline: false
datasource:
host: 1.1.1.1
port: 3306
username: abc
passwd: 123456
*/
离线方式下,指定目录中的 binlog 文件有很多获取方式,比如用 scp、ftp 等命令拷贝,或者直接通过 MySQL 自带的 mysqlbinlog 命令实时拉取。最后给出用 mysqlbinlog 远程实时拉取 binlog 日志的脚本。
#!/bin/sh
# Mirrors the binlog files of a remote MySQL server into LOCAL_BACKUP_DIR with
# `mysqlbinlog --raw --read-from-remote-server --stop-never` and reconnects after a failure.
BACKUP_BIN=/usr/bin/mysqlbinlog
LOCAL_BACKUP_DIR=/backup/binlog/
BACKUP_LOG=/backup/binlog/backuplog
REMOTE_HOST=1.1.1.1
REMOTE_PORT=3306
REMOTE_USER=abc
# NOTE(review): a password passed on the command line is visible in the process list;
# consider a client option file instead.
REMOTE_PASS=123456
FIRST_BINLOG=binlog.000001
# time to wait before reconnecting after failure
SLEEP_SECONDS=10
# create local_backup_dir if necessary; the binlog files are written to the current directory
mkdir -p "${LOCAL_BACKUP_DIR}"
cd "${LOCAL_BACKUP_DIR}" || exit 1
# loop forever: after the connection drops, wait and connect again
while :
do
    # resume from the newest binlog file already present (the log file is not a binlog);
    # fall back to the first binlog when no binlog file has been fetched yet
    LAST_FILE=$(ls -1 "${LOCAL_BACKUP_DIR}" | grep -v backuplog | tail -n 1)
    if [ -z "${LAST_FILE}" ]; then
        LAST_FILE=${FIRST_BINLOG}
    fi
    "${BACKUP_BIN}" --raw --read-from-remote-server --stop-never --host="${REMOTE_HOST}" --port="${REMOTE_PORT}" --user="${REMOTE_USER}" --password="${REMOTE_PASS}" "${LAST_FILE}"
    # capture the exit code immediately; a later command substitution would overwrite $?
    RC=$?
    echo "$(date +"%Y/%m/%d %H:%M:%S") mysqlbinlog停止,返回代码:${RC}" | tee -a "${BACKUP_LOG}"
    echo "${SLEEP_SECONDS}秒后再次连接并继续备份" | tee -a "${BACKUP_LOG}"
    sleep "${SLEEP_SECONDS}"
done
本文介绍的解析过程(尤其是离线方式)有一个问题:当数据表结构变更(DDL)频繁时,可能出现解析错误。原因是离线数据存在滞后,binlog 日志中记录的数据表结构可能与当前实际的数据表结构不一致,从而导致解析失败或解析出错误的数据。解决这个问题可以参考开源软件源代码的处理逻辑,比如 maxwell。