Maxwell 的使用
1.Maxwell介绍
Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。
2.Maxwell 和canal工具对比
➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。
➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
➢Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。
➢Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
➢Maxwell比Canal更加轻量级。
3.安装
安装Maxwell
[root@hadoop01 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/3.4使用前准备工作
➢在数据库中建立一个maxwell库用于存储Maxwell的元数据[root@hadoop01 module]$ mysql -uroot -p123456mysql> CREATE DATABASE maxwell ;
➢分配一个账号可以操作该数据库mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;
➢分配这个账号可以监控其他数据库的权限mysql> GRANT SELECT,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@’%’;
4.使用Maxwell监控抓取MySQL数据
[[root@hadoop01 maxwell-1.25.0]$ cp config.properties.example config.properties
➢修改配置文件
producer=kafka
kafka.bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
#需要添加
kafka_topic=user_info_log
#mysql login info
host=hadoop01
user=maxwell
password=123456
#需要添加后续初始化会用
client_id=maxwell_1
注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打乱binlog的顺序如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性可选值producer_partition_by=database|table|primary_key|random|column
➢在/home/atguigu/bin目录下编写maxwell.sh启动脚本
[root@hadoop01 maxwell-1.25.0]$ vim/home/root/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
➢授予执行权限
[root@hadoop01 maxwell-1.25.0]$ sudo chmod +x /home/root/bin/maxwell.sh
➢运行启动程序
[atguigu@hadoop202 maxwell-1.25.0]$ maxwell.sh
➢启动Kafka消费客户端,观察结果
[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic user_info_log
5.Maxwell版本的ODS层处理
5.1执行不同操作,Maxwell和canal数据格式对比
➢执行insert测试语句
INSERT INTO z_user_info VALUES(30,‘zhang3’,‘13810001010’),(31,‘li4’,‘1389999999’);
canal
{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“li4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:"",“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}
maxwell
{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}
{“database”:“gmall202004”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“li4”,“tel”:“1389999999”}}
5.2总结数据特点
➢日志结构
canal 每一条SQL会产生一条日志,如果该条Sql影响了多行数据,则已经会通过集合的方式归集在这条日志中。(即使是一条数据也会是数组结构)maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果想知道这些日志是否是通过某一条sql产生的可以通过xid进行判断,相同的xid的日志来自同一sql。
➢数字类型
当原始数据是数字类型时,maxwell会尊重原始数据的类型不增加双引,变为字符串。canal一律转换为字符串。
➢带原始数据字段定义
canal数据中会带入表结构。maxwell更简洁。
5.3SparkStreaming对Topic分流业务代码
// An highlighted block
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}import com.atguigu.gmall.realtime.utils.{MyKafkaSink, MyKafkaUtil, OffsetManagerUtil}i
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Desc: 基于Maxwell从Kafka中读取业务数据,进行分流*/
object BaseDBMaxwellApp {
def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = newSparkConf().setMaster("local[4]").setAppName("BaseDBCanalApp")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val topic = "ods_order_info"
val groupId = "base_db_maxwell_group"//从Redis中读取偏移量
var recoredDStream: InputDStream[ConsumerRecord[String, String]] = null
val kafkaOffsetMap:Map[TopicPartition,Long=OffsetManagerUtil.getOffset(topic,groupId)
if(kafkaOffsetMap!=null &&kafkaOffsetMap.size >0){
recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId)
}
else{recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,groupId)
}
//获取当前采集周期中处理的数据对应的分区已经偏移量
var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
val offsetDStream: DStream[ConsumerRecord[String, String]] = recoredDStream.transform {
rdd => {offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd
}
}
//将从kafka中读取到的recore数据进行封装为json对象
val jsonObjDStream: DStream[JSONObject] = offsetDStream.map {
record => {//获取value部分的json字符串
val jsonStr: String = record.value()//将json格式字符串转换为json对象
val jsonObject: JSONObject = JSON.parseObject(jsonStr)jsonObject
}
}//从json对象中获取table和data,发送到不同的kafka主题
jsonObjDStream.foreachRDD{
rdd=>{
rdd.foreach{
jsonObj=>{
val opType: String = jsonObj.getString("type")
val dataJsonObj: JSONObject = jsonObj.getJSONObject("data")
if(dataJsonObj!=null && !dataJsonObj.isEmpty && !"delete".equals(opType)){
//获取更新的表名
val tableName: String = jsonObj.getString("table")//拼接发送的主题
var sendTopic = "ods_" + tableName//向kafka发送消息MyKafkaSink.send(sendTopic,dataJsonObj.toString())
}
}
} //修改Redis中Kafka的偏移量OffsetManagerUtil.saveOffset(topic,groupId,offsetRanges)
}
}
ssc.start()
ssc.awaitTermination()
}
}
5.4测试
➢启动Redis
➢启动Maxwel
l➢运行BaseDBMaxwellApp程序
➢查看kafka下的主题
[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic ods_order_info
6.生产数据 查看情况