在项目初期时, 为解决Mysql和Redis的数据同步问题, 选择了阿里开源的Canal方案, 然后压力测试时, 解决了Redis并发写入的问题后, 随之而来的瓶颈出现在了Canal client消费速度, 本机测试大约1.3w/s, 不光如此, 使用阿里的Canal还存在一些弊端:
外部程序, 资料不够详尽, 社区交流效率低下, JAVA源码(我不熟悉)且臃肿(大致瞥了一眼)等等因素, 导致遇到问题不好解决, 目前开发测试阶段遇到问题也没有得到即时答复.排查问题困难重重.
然后经过一番思索和查阅, 找到了一个比较好的解决办法, golang实现, 同步速度大约6.2w/s, 与阿里canal官方公布的10w/s还差点距离,但是比我实际使用canal的1.3w/s提升太多了,然后这里我主要是够用了, 就没继续优化了
当然还是得感谢MysqlToAll这个开源项目, 我的快速实现时站在他的肩膀上的.在搜索到这个MysqlToAll之前,我也是有思考过自己去实现过类似阿里Canal这种模拟Mysql从库拉取binlog数据的方案, 但是不知道有什么库可以用, 然后就看到了MysqlToAll使用的mysql官方canal库:
好东西啊!! 使用golang的同学完全可以抛弃阿里的canal了
引用官方文档介绍一下:
// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc... // MySQL must open row format for binlog
摘录一段核心代码:
func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
ev := e.Event.(*replication.RowsEvent)
// Caveat: table may be altered at runtime.
schema := string(ev.Table.Schema)
table := string(ev.Table.Table)
t, err := c.GetTable(schema, table)
if err != nil {
return err
}
var action string
switch e.Header.EventType {
case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
action = InsertAction
case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
action = DeleteAction
case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
action = UpdateAction
default:
return errors.Errorf("%s not supported now", e.Header.EventType)
}
events := newRowsEvent(t, action, ev.Rows, e.Header)
return c.eventHandler.OnRow(events)
}
这是Canal里每个连接接收到数据之后按照类型 处理row类型事件的接口, 经过加工之后再交给最终的handler(需要自己实现). 例如MysqlToAll的实现:
func (this *CommonEventHandler) OnRow(e *canal.RowsEvent) error {
log.Debug("OnRow, action:%s row's count:%d", e.Action, len(e.Rows))
entity := &common.RawLogEntity{}
entity.Action = e.Action
entity.Rows = e.Rows
entity.TableName = e.Table.Name
entity.Header = []string{}
entity.HeaderMap = map[string]int{}
entity.ValueMap = map[string]interface{}{}
for columnIndex, currColumn := range e.Table.Columns {
entity.Header = append(entity.Header, currColumn.Name)
entity.HeaderMap[currColumn.Name] = columnIndex
entity.ValueMap[currColumn.Name] = e.Rows[len(e.Rows)-1][columnIndex]
}
this.CurrOutput.Write(entity)
return nil
}
主要是OnRow,其他接口就不再赘述了.
这里强调一下, 关于上面这个OnRow里接受到rows条数,我实测会出现如下结果:
[debug ] OnRow, action:update row's count:2980
[debug ] OnRow, action:update row's count:2982
[debug ] OnRow, action:update row's count:2118
[debug ] OnRow, action:insert row's count:1
[debug ] OnRow, action:insert row's count:1
[debug ] OnRow, action:update row's count:2
[debug ] OnRow, action:insert row's count:1
很清楚了吧..然后附上库里RowsEvent 的定义和注释会更明了:
// RowsEvent is the event for row replication.
type RowsEvent struct {
Table *schema.Table
Action string
// changed row list
// binlog has three update event version, v0, v1 and v2.
// for v1 and v2, the rows number must be even.
// Two rows for one event, format is [before update row, after update row]
// for update v0, only one row for a event, and we don't support this version.
Rows [][]interface{}
// Header can be used to inspect the event
Header *replication.EventHeader
}
最后说一下MysqlToAll的一些不足之处, 也是我为什么没用它的原因:
1. rowdata和posdata没分离channal, 我分离后速度从6w/s提升到6.2w/s
2.OnRow回调接受到的rows并不只有一条数据, 他的使用方式会丢失数据
3.有很多为解决通用性而写的代码,会些许影响整体效率
2021.9.26补充:这里有一个开源的基于go-mysql/canal 的库wj596/go-mysql-transfer
就分享到这里吧, 如有错误和不足之处,欢迎留言讨论指正. 如果这篇分享对你有用,记得点赞哦!
不想重复造轮子的也可以留下邮箱地址