Maxwell = MySQL + Kafka. It was designed to capture MySQL data into Kafka in real time; it acts as a Kafka producer.
Feature | Canal (server) | Maxwell (server + client)
---|---|---
Language | Java | Java
Activity | Active | Active
HA | Supported | Requires customization, but supports resuming from a saved binlog position
Data sink | Requires customization | Lands in Kafka
Partitioning | Supported | Supported
bootstrap (initial load) | Not supported | Supported
Data format | Free-form | JSON (fixed format)
Documentation | Fairly detailed | Fairly detailed
Random reads | Supported | Supported
# Extract the archive
[hadoop@spark000 software]$ tar -zxvf maxwell-1.24.0.tar.gz -C ~/app
[hadoop@spark000 maxwell-1.24.0]$ cp config.properties.example config.properties
[root@spark000 ~]# su - mysqladmin
[mysqladmin@spark000 ~]$ service mysql stop
# Edit /etc/my.cnf (no change needed if the value below is already set)
[mysqladmin@spark000 ~]$ vim /etc/my.cnf
[mysqld]
binlog_format = ROW
[mysqladmin@spark000 ~]$ service mysql start
[mysqladmin@spark000 ~]$ mysql -uroot -p
# Check that binlog_format is set to ROW
mysql> show variables like '%binlog%';
# Create the Maxwell database and user
mysql> create database maxwell;
mysql> GRANT ALL on maxwell.* to 'maxwell'@'%' identified by 'wujidata';
mysql> GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE on *.* to 'maxwell'@'%';
mysql> flush privileges;
# Start Maxwell with the stdout producer
[root@spark000 ~]# su - hadoop
[hadoop@spark000 ~]$ cd ~/app/maxwell-1.24.0/
[hadoop@spark000 maxwell-1.24.0]$ bin/maxwell --user='maxwell' --password='wujidata' --host='127.0.0.1' --producer=stdout
# Run insert/update/delete statements in the MySQL client and watch the console output
mysql> insert into dept values(50,'Bigdata','Hangzhou');
mysql> update dept set loc = 'Beijing' where deptno = 50;
mysql> delete from dept where deptno = 50;
[hadoop@spark000 maxwell-1.24.0]$ bin/maxwell --user='maxwell' --password='wujidata' --host='127.0.0.1' --producer=stdout
Using kafka version: 1.0.0
15:31:07,021 WARN MaxwellMetrics - Metrics will not be exposed: metricsReportingType not configured.
15:31:07,279 INFO SchemaStoreSchema - Creating maxwell database
15:31:07,411 INFO Maxwell - Maxwell v1.24.0 is booting (StdoutProducer), starting at Position[BinlogPosition[mysql-bin.000010:4633], lastHeartbeat=0]
15:31:07,630 INFO AbstractSchemaStore - Maxwell is capturing initial schema
15:31:08,053 INFO BinlogConnectorReplicator - Setting initial binlog pos to: mysql-bin.000010:4633
15:31:08,145 INFO BinaryLogClient - Connected to 127.0.0.1:3306 at mysql-bin.000010/4633 (sid:6379, cid:12)
15:31:08,145 INFO BinlogConnectorLifecycleListener - Binlog connected.
{"database":"wujidata_ba","table":"dept","type":"insert","ts":1577691180,"xid":365,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Hangzhou"}}
{"database":"wujidata_ba","table":"dept","type":"update","ts":1577691371,"xid":617,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"},"old":{"loc":"Hangzhou"}}
{"database":"wujidata_ba","table":"dept","type":"delete","ts":1577691481,"xid":761,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"}}
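Each output line is a self-contained JSON document, so a downstream consumer can pick it apart with any JSON parser. A minimal Python sketch, using only the fields visible in the output above (`type`, `table`, `data`, and the `old` field that updates carry):

```python
import json

def parse_event(line):
    """Parse one Maxwell JSON line into (type, table, row data, changed columns)."""
    evt = json.loads(line)
    # "old" is present only on updates; it holds the previous values
    # of the columns that changed.
    return evt["type"], evt["table"], evt["data"], evt.get("old")

line = '{"database":"wujidata_ba","table":"dept","type":"update","ts":1577691371,"xid":617,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"},"old":{"loc":"Hangzhou"}}'
etype, table, data, old = parse_event(line)
print(etype, table, data["loc"], old["loc"])  # update dept Beijing Hangzhou
```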
1. Start ZooKeeper
[hadoop@spark000 app]$ cd ~/app/zookeeper
[hadoop@spark000 zookeeper]$ bin/zkServer.sh start
2. Start Kafka
[hadoop@spark000 kafka_2.11-2.2.0_1]$ bin/kafka-server-start.sh -daemon config/server.properties
3. Create the maxwell topic
[hadoop@spark000 kafka_2.11-2.2.0_1]$ bin/kafka-topics.sh --create --zookeeper spark000:2181/kafka --replication-factor 1 --partitions 1 --topic maxwell
Created topic maxwell.
4. Start Maxwell with the Kafka producer
[hadoop@spark000 maxwell-1.24.0]$ bin/maxwell --user='maxwell' --password='wujidata' --host='spark000' --producer=kafka --kafka.bootstrap.servers=spark000:9092 --kafka_topic=maxwell
5. Start a Kafka console consumer
[hadoop@spark000 kafka_2.11-2.2.0_1]$ bin/kafka-console-consumer.sh --bootstrap-server spark000:9092 --topic maxwell
{"database":"ruozedata_ba","table":"dept","type":"insert","ts":1577695733,"xid":6053,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Hangzhou"}}
{"database":"ruozedata_ba","table":"dept","type":"update","ts":1577695894,"xid":6264,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"},"old":{"loc":"Hangzhou"}}
{"database":"ruozedata_ba","table":"dept","type":"delete","ts":1577695900,"xid":6273,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"}}
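A consumer of this topic can replay the events to maintain an in-memory copy of the table. A rough sketch, assuming `deptno` is the primary key (that assumption comes from the table data above, not from Maxwell itself):

```python
import json

def apply_event(state, line):
    """Apply one Maxwell event to an in-memory copy of the table,
    keyed by primary key (assumed here to be deptno)."""
    evt = json.loads(line)
    key = evt["data"]["deptno"]
    if evt["type"] in ("insert", "update"):
        state[key] = evt["data"]   # upsert the full row image
    elif evt["type"] == "delete":
        state.pop(key, None)       # drop the row if present
    return state

events = [
    '{"database":"ruozedata_ba","table":"dept","type":"insert","ts":1577695733,"xid":6053,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Hangzhou"}}',
    '{"database":"ruozedata_ba","table":"dept","type":"update","ts":1577695894,"xid":6264,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"},"old":{"loc":"Hangzhou"}}',
    '{"database":"ruozedata_ba","table":"dept","type":"delete","ts":1577695900,"xid":6273,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"}}',
]
state = {}
for e in events:
    apply_event(state, e)
print(state)  # {} -- the row was inserted, updated, then deleted
```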
Note: I am running Kafka 2.2.0. When starting Maxwell I did not specify a Kafka version, so it used the default 1.0.0 client, but the Kafka consumer still received messages normally.
6. Start with an explicit Kafka client version
[hadoop@spark000 libs]$ cp kafka-clients-2.2.0.jar /home/hadoop/app/maxwell-1.24.0/lib/kafka-clients
[hadoop@spark000 maxwell-1.24.0]$ bin/maxwell --user='maxwell' --password='wujidata' --host='spark000' --producer=kafka --kafka.bootstrap.servers=spark000:9092 --kafka_topic=maxwell --kafka_version=2.2.0
Using kafka version: 2.2.0
Note: kafka-clients-2.2.0.jar must first be copied into the maxwell-1.24.0/lib/kafka-clients directory; consumption worked normally in testing.
7. Filtering specific tables with Maxwell
[hadoop@spark000 maxwell-1.24.0]$ bin/maxwell --user='maxwell' --password='wujidata' --host='spark000' --filter 'exclude: *.*, include:wujidata_ba.dept1' --producer=kafka --kafka_version=2.2.0 --kafka.bootstrap.servers=spark000:9092 --kafka_topic=maxwell
# Test
mysql> update dept set loc = 'Beijing' where deptno = 50;
mysql> update dept1 set loc = 'Beijing' where deptno = 50;
mysql> delete from dept where deptno = 50;
mysql> delete from dept1 where deptno = 50;
# Messages received by the Kafka consumer
{"database":"wujidata_ba","table":"dept1","type":"update","ts":1577700196,"xid":11892,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"},"old":{"loc":"Hangzhou"}}
{"database":"wujidata_ba","table":"dept1","type":"delete","ts":1577700223,"xid":11934,"commit":true,"data":{"deptno":50,"dname":"Bigdata","loc":"Beijing"}}
Note: `--filter 'exclude: *.*, include:wujidata_ba.dept1'` means only changes to the wujidata_ba.dept1 table are captured; everything else is ignored.
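The filter behavior can be approximated with a small matcher. This sketch reflects my reading of the rule semantics (rules evaluated left to right, last matching rule wins); it is an illustration, not Maxwell's actual implementation:

```python
import fnmatch

def included(filter_rules, database, table):
    """Approximate Maxwell filter evaluation: rules are applied in order,
    and the last rule whose db.table pattern matches decides the outcome."""
    decision = True  # default: everything is captured
    for action, pattern in filter_rules:
        db_pat, tbl_pat = pattern.split(".")
        if fnmatch.fnmatch(database, db_pat) and fnmatch.fnmatch(table, tbl_pat):
            decision = (action == "include")
    return decision

# The rules from the --filter option above
rules = [("exclude", "*.*"), ("include", "wujidata_ba.dept1")]
print(included(rules, "wujidata_ba", "dept1"))  # True
print(included(rules, "wujidata_ba", "dept"))   # False
```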
8. Maxwell bootstrap
mysql> insert into maxwell.bootstrap (database_name, table_name) values ("ruozedata_ba", "dept");
mysql> select * from bootstrap;
# On the Kafka consumer side
{"database":"maxwell","table":"bootstrap","type":"insert","ts":1577761004,"xid":87673,"commit":true,"data":{"id":1,"database_name":"ruozedata_ba","table_name":"dept","where_clause":null,"is_complete":0,"inserted_rows":0,"total_rows":0,"created_at":null,"started_at":null,"completed_at":null,"binlog_file":null,"binlog_position":0,"client_id":"maxwell","comment":null}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-start","ts":1577761004,"data":{}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data": {"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data":{"deptno":30,"dname":"SALES","loc":"CHICAGO"}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data":{"deptno":40,"dname":"OPERATIONS","loc":"BOSTON"}}
{"database":"ruozedata_ba","table":"dept","type":"bootstrap-complete","ts":1577761004,"data":{}}
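A consumer can reconstruct the initial snapshot by collecting the rows emitted between the bootstrap-start and bootstrap-complete markers shown above. A minimal sketch:

```python
import json

def collect_bootstrap(lines):
    """Collect row images emitted between bootstrap-start and bootstrap-complete."""
    rows, in_bootstrap = [], False
    for line in lines:
        evt = json.loads(line)
        if evt["type"] == "bootstrap-start":
            in_bootstrap = True
        elif evt["type"] == "bootstrap-complete":
            in_bootstrap = False
        elif evt["type"] == "bootstrap-insert" and in_bootstrap:
            rows.append(evt["data"])
    return rows

lines = [
    '{"database":"ruozedata_ba","table":"dept","type":"bootstrap-start","ts":1577761004,"data":{}}',
    '{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data":{"deptno":10,"dname":"ACCOUNTING","loc":"NEW YORK"}}',
    '{"database":"ruozedata_ba","table":"dept","type":"bootstrap-insert","ts":1577761004,"data":{"deptno":20,"dname":"RESEARCH","loc":"DALLAS"}}',
    '{"database":"ruozedata_ba","table":"dept","type":"bootstrap-complete","ts":1577761004,"data":{}}',
]
print(len(collect_bootstrap(lines)))  # 2
```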