当前位置: 首页 > 工具软件 > go-mysql > 使用案例 >

MySQL Redis一致性方案 推荐go-mysql/canal

申思远
2023-12-01

在项目初期时, 为解决Mysql和Redis的数据同步问题, 选择了阿里开源的Canal方案, 然后压力测试时, 解决了Redis并发写入的问题后, 随之而来的瓶颈出现在了Canal client消费速度, 本机测试大约1.3w/s, 不光如此, 使用阿里的Canal还存在一些弊端:

外部程序, 资料不够详尽, 社区交流效率低下, JAVA源码(我不熟悉)且臃肿(大致瞥了一眼)等等因素, 导致遇到问题不好解决, 目前开发测试阶段遇到问题也没有得到即时答复.排查问题困难重重.

然后经过一番思索和查阅, 找到了一个比较好的解决办法, golang实现, 同步速度大约6.2w/s, 与阿里canal官方公布的10w/s还差点距离,但是比我实际使用canal的1.3w/s提升太多了,然后这里我主要是够用了, 就没继续优化了

 当然还是得感谢MysqlToAll这个开源项目, 我的快速实现时站在他的肩膀上的.在搜索到这个MysqlToAll之前,我也是有思考过自己去实现过类似阿里Canal这种模拟Mysql从库拉取binlog数据的方案, 但是不知道有什么库可以用, 然后就看到了MysqlToAll使用的mysql官方canal库:

github.com/go-mysql-org/go-mysql/canal

好东西啊!! 使用golang的同学完全可以抛弃阿里的canal了

引用官方文档介绍一下:

// Canal can sync your MySQL data into everywhere, like Elasticsearch, Redis, etc...
// MySQL must open row format for binlog

摘录一段核心代码:

func (c *Canal) handleRowsEvent(e *replication.BinlogEvent) error {
	ev := e.Event.(*replication.RowsEvent)

	// Caveat: table may be altered at runtime.
	schema := string(ev.Table.Schema)
	table := string(ev.Table.Table)

	t, err := c.GetTable(schema, table)
	if err != nil {
		return err
	}
	var action string
	switch e.Header.EventType {
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		action = InsertAction
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		action = DeleteAction
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		action = UpdateAction
	default:
		return errors.Errorf("%s not supported now", e.Header.EventType)
	}
	events := newRowsEvent(t, action, ev.Rows, e.Header)
	return c.eventHandler.OnRow(events)
}

这是Canal里每个连接接收到数据之后按照类型 处理row类型事件的接口, 经过加工之后再交给最终的handler(需要自己实现). 例如MysqlToAll的实现:

func (this *CommonEventHandler) OnRow(e *canal.RowsEvent) error {
	log.Debug("OnRow, action:%s row's count:%d", e.Action, len(e.Rows))
	entity := &common.RawLogEntity{}
	entity.Action = e.Action
	entity.Rows = e.Rows
	entity.TableName = e.Table.Name
	entity.Header = []string{}
	entity.HeaderMap = map[string]int{}
	entity.ValueMap = map[string]interface{}{}

	for columnIndex, currColumn := range e.Table.Columns {
		entity.Header = append(entity.Header, currColumn.Name)
		entity.HeaderMap[currColumn.Name] = columnIndex
		entity.ValueMap[currColumn.Name] = e.Rows[len(e.Rows)-1][columnIndex]
	}
	this.CurrOutput.Write(entity)

	return nil
}

主要是OnRow,其他接口就不再赘述了.

这里强调一下, 关于上面这个OnRow里接受到rows条数,我实测会出现如下结果:

 [debug  ] OnRow, action:update row's count:2980
 [debug  ] OnRow, action:update row's count:2982
 [debug  ] OnRow, action:update row's count:2118
 [debug  ] OnRow, action:insert row's count:1
 [debug  ] OnRow, action:insert row's count:1
 [debug  ] OnRow, action:update row's count:2
 [debug  ] OnRow, action:insert row's count:1

很清楚了吧..然后附上库里RowsEvent 的定义和注释会更明了:

// RowsEvent is the event for row replication.
type RowsEvent struct {
	Table  *schema.Table
	Action string
	// changed row list
	// binlog has three update event version, v0, v1 and v2.
	// for v1 and v2, the rows number must be even.
	// Two rows for one event, format is [before update row, after update row]
	// for update v0, only one row for a event, and we don't support this version.
	Rows [][]interface{}
	// Header can be used to inspect the event
	Header *replication.EventHeader
}

最后说一下MysqlToAll的一些不足之处, 也是我为什么没用它的原因:

1. rowdata和posdata没分离channal, 我分离后速度从6w/s提升到6.2w/s

2.OnRow回调接受到的rows并不只有一条数据, 他的使用方式会丢失数据

3.有很多为解决通用性而写的代码,会些许影响整体效率

2021.9.26补充:这里有一个开源的基于go-mysql/canal 的库wj596/go-mysql-transfer

就分享到这里吧, 如有错误和不足之处,欢迎留言讨论指正. 如果这篇分享对你有用,记得点赞哦!

不想重复造轮子的也可以留下邮箱地址

 类似资料: