将mysql的binlog以json的形式输出到kafka,它的常见用例包括ETL,缓存建立/过期,指标收集,搜索索引和服务间通信
json示例:
mysql> update test.maxwell set daemon = 'firebus! firebus!' where id = 1;
maxwell: {
"database": "test",
"table": "maxwell",
"type": "update",
"ts": 1449786341,
"xid": 940786,
"commit": true,
"data": {"id":1, "daemon": "Firebus! Firebus!"},
"old": {"daemon": "Stanislaw Lem"}
}
brew install maxwell
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
权限: Maxwell需要权限来在schema_database选项指定的数据库中存储状态(默认:maxwell)
mysql> CREATE USER 'maxwell'@'localhost' IDENTIFIED BY 'XXXXXX';
mysql> GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'localhost';
To stdout
bin/maxwell --user='maxwell' --password='xxx' --host='127.0.0.1' --producer=stdout
To Kafka
bin/maxwell --user='maxwell' --password='xxx' --host='127.0.0.1' --producer=kafka --kafka.bootstrap.servers=localhost:9092 --kafka_topic=maxwell(--kafka_topic=namespace_%{database}_%{table})
$ vi my.cnf
[mysqld]
server_id=1
log-bin=master
binlog_format=row
gtid-mode=ON
log-slave-updates=ON
enforce-gtid-consistency=true
当处于GTID模式时,Maxwell将在进行母版更改后透明地选择一个新的复制位置。请注意,您仍然必须将maxwell重新指向新的母版。目前,Maxwell对GTID的支持被认为是beta质量的。值得注意的是,Maxwell无法透明地从传统复制方案升级到GTID复制方案。当前,当您启用gtid模式时,Maxwell将从“主机所在的位置”重新捕获架构和GTID位置
kafka分区,根据hashcode进行分区存储,默认的是对数据库进行hashcode来进行分区,还可以对数据库,表,primary_key,transaction_id,列等进行分区。
eg1:可以将Maxwell配置为从特定表中过滤掉更新,这由 --filter 命令行标志控制
--filter = 'exclude: foodb.*, include: foodb.tbl, include: foodb./table_\d+/'
This example tells Maxwell to suppress all updates that happen on foodb except for updates to tbl and any table in foodb matching the regexp /table \d+/
eg2:Filter options are evaluated in the order specified, so in this example we suppress everything except updates in the db1 database.
--filter = 'exclude: *.*, include: db1.*'
Maxwell can also include/exclude based on column values:
--filter = 'exclude: db.tbl.col = reject'
过滤掉db.tbl.col的值为‘reject‘的更新
(will reject any row in db.tbl that contains col and where the stringified value of "col" is "reject". Column filters are ignored if the specified column is not present, so --filter = 'exclude: *.*.col_a = *' will exclude updates to any table that contains col_a, but include every other table.)
--filter = 'blacklist: bad_db.*'
--javascript FILE
function process_row(row) {
if ( row.database == "test" && row.table == "bar" ) {
var username = row.data.get("username");
if ( username == "osheroff" )
row.suppress();
row.data.put("username", username.toUpperCase());
}
}
--encrypt=[none|data|all]
none : 不加密; data: 对值加密;all: 对所有数据加密