当前位置: 首页 > 工具软件 > Maxwell > 使用案例 >

Maxwell 的使用

宋高谊
2023-12-01

Maxwell 的使用
1.Maxwell介绍
Maxwell介绍Maxwell是由美国zendesk开源,用java编写的Mysql实时抓取软件,其抓取的原理也是基于binlog。
2.Maxwell 和canal工具对比
➢Maxwell没有canal那种server+client模式,只有一个server把数据发送到消息队列或redis。如果需要多个实例,通过指定不同配置文件启动多个进程。
➢Maxwell有一个亮点功能,就是canal只能抓取最新数据,对已存在的历史数据没有办法处理。而Maxwell有一个bootstrap功能,可以直接引导出完整的历史数据用于初始化,非常好用。
➢Maxwell不能直接支持HA,但是它支持断点还原,即错误解决后重启继续上次点儿读取数据。
➢Maxwell只支持json格式,而Canal如果用Server+client模式的话,可以自定义格式。
➢Maxwell比Canal更加轻量级。
3.安装
安装Maxwell

[root@hadoop01 module]$ tar -zxvf /opt/software/maxwell-1.25.0.tar.gz -C /opt/module/3.4使用前准备工作
➢在数据库中建立一个maxwell库用于存储Maxwell的元数据[root@hadoop01 module]$ mysql -uroot -p123456mysql> CREATE DATABASE maxwell ;
➢分配一个账号可以操作该数据库mysql> GRANT ALL ON maxwell.* TO ‘maxwell’@’%’ IDENTIFIED BY ‘123456’;
➢分配这个账号可以监控其他数据库的权限mysql> GRANT SELECT,REPLICATION SLAVE , REPLICATION CLIENT ON . TO maxwell@’%’;

4.使用Maxwell监控抓取MySQL数据
[[root@hadoop01 maxwell-1.25.0]$ cp config.properties.example config.properties
➢修改配置文件

producer=kafka
kafka.bootstrap.servers=hadoop01:9092,hadoop02:9092,hadoop03:9092
#需要添加
kafka_topic=user_info_log
#mysql login info
host=hadoop01
user=maxwell
password=123456
#需要添加后续初始化会用
client_id=maxwell_1

注意:默认还是输出到指定Kafka主题的一个kafka分区,因为多个分区并行可能会打乱binlog的顺序如果要提高并行度,首先设置kafka的分区数>1,然后设置producer_partition_by属性可选值producer_partition_by=database|table|primary_key|random|column

➢在/home/atguigu/bin目录下编写maxwell.sh启动脚本

[root@hadoop01 maxwell-1.25.0]$ vim/home/root/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

➢授予执行权限

[root@hadoop01 maxwell-1.25.0]$ sudo chmod +x /home/root/bin/maxwell.sh

➢运行启动程序

[atguigu@hadoop202 maxwell-1.25.0]$ maxwell.sh

➢启动Kafka消费客户端,观察结果

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic user_info_log

5.Maxwell版本的ODS层处理
5.1执行不同操作,Maxwell和canal数据格式对比
➢执行insert测试语句

INSERT INTO z_user_info VALUES(30,‘zhang3’,‘13810001010’),(31,‘li4’,‘1389999999’);

  canal  

{“data”:[{“id”:“30”,“user_name”:“zhang3”,“tel”:“13810001010”},{“id”:“31”,“user_name”:“li4”,“tel”:“1389999999”}],“database”:“gmall-2020-04”,“es”:1589385314000,“id”:2,“isDdl”:false,“mysqlType”:{“id”:“bigint(20)”,“user_name”:“varchar(20)”,“tel”:“varchar(20)”},“old”:null,“pkNames”:[“id”],“sql”:"",“sqlType”:{“id”:-5,“user_name”:12,“tel”:12},“table”:“z_user_info”,“ts”:1589385314116,“type”:“INSERT”}

  maxwell

{“database”:“gmall-2020-04”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“xoffset”:0,“data”:{“id”:30,“user_name”:“zhang3”,“tel”:“13810001010”}}
{“database”:“gmall202004”,“table”:“z_user_info”,“type”:“insert”,“ts”:1589385314,“xid”:82982,“commit”:true,“data”:{“id”:31,“user_name”:“li4”,“tel”:“1389999999”}}

5.2总结数据特点
➢日志结构
canal 每一条SQL会产生一条日志,如果该条Sql影响了多行数据,则已经会通过集合的方式归集在这条日志中。(即使是一条数据也会是数组结构)maxwell 以影响的数据为单位产生日志,即每影响一条数据就会产生一条日志。如果想知道这些日志是否是通过某一条sql产生的可以通过xid进行判断,相同的xid的日志来自同一sql。
➢数字类型
当原始数据是数字类型时,maxwell会尊重原始数据的类型不增加双引,变为字符串。canal一律转换为字符串。
➢带原始数据字段定义
canal数据中会带入表结构。maxwell更简洁。

5.3SparkStreaming对Topic分流业务代码

// An highlighted block
import com.alibaba.fastjson.{JSON, JSONArray, JSONObject}import com.atguigu.gmall.realtime.utils.{MyKafkaSink, MyKafkaUtil, OffsetManagerUtil}i
import  org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.spark.SparkConfimport org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{HasOffsetRanges, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/** Desc: 基于Maxwell从Kafka中读取业务数据,进行分流*/
object BaseDBMaxwellApp {
def main(args: Array[String]): Unit = {
	val sparkConf: SparkConf = newSparkConf().setMaster("local[4]").setAppName("BaseDBCanalApp")
	val ssc = new StreamingContext(sparkConf, Seconds(5))
	val topic = "ods_order_info"
	val groupId = "base_db_maxwell_group"//从Redis中读取偏移量
	var recoredDStream: InputDStream[ConsumerRecord[String, String]] = null
			val kafkaOffsetMap:Map[TopicPartition,Long=OffsetManagerUtil.getOffset(topic,groupId)
	if(kafkaOffsetMap!=null &&kafkaOffsetMap.size >0){
	recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,kafkaOffsetMap,groupId)
	}
		else{recoredDStream = MyKafkaUtil.getKafkaStream(topic,ssc,groupId)
		}
//获取当前采集周期中处理的数据对应的分区已经偏移量
	var offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]
	val offsetDStream: DStream[ConsumerRecord[String, String]] = 										recoredDStream.transform {
			rdd => {offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRangesrdd
	}
}
//将从kafka中读取到的recore数据进行封装为json对象
val jsonObjDStream: DStream[JSONObject] = offsetDStream.map {
		record => {//获取value部分的json字符串
			val jsonStr: String = record.value()//将json格式字符串转换为json对象
			val jsonObject: JSONObject = JSON.parseObject(jsonStr)jsonObject
	}
}//从json对象中获取table和data,发送到不同的kafka主题
jsonObjDStream.foreachRDD{
		rdd=>{
			rdd.foreach{
				jsonObj=>{
	val opType: String = jsonObj.getString("type")
	val dataJsonObj: JSONObject = jsonObj.getJSONObject("data")
	if(dataJsonObj!=null && !dataJsonObj.isEmpty && !"delete".equals(opType)){
	//获取更新的表名
	val tableName: String = jsonObj.getString("table")//拼接发送的主题
	var sendTopic = "ods_" + tableName//向kafka发送消息MyKafkaSink.send(sendTopic,dataJsonObj.toString())
					}
			}
	}	//修改Redis中Kafka的偏移量OffsetManagerUtil.saveOffset(topic,groupId,offsetRanges)
		}
	}
	ssc.start()
	ssc.awaitTermination()
		}
	}

5.4测试
➢启动Redis
➢启动Maxwel
l➢运行BaseDBMaxwellApp程序
➢查看kafka下的主题

[root@hadoop01 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop01:9092 --topic ods_order_info

6.生产数据 查看情况

 类似资料: