I. Preface:
We needed to sync data from MySQL to Kafka on a schedule. I tried Canal first, but for reasons I never pinned down it didn't work: the process started fine, yet no data came through and the logs showed no errors. So I switched to Flume, which turned out to work well and to be quite flexible.
II. How the sync works:
I won't rehash how Flume itself works; there are plenty of articles on that. What follows is how the MySQL side works. I use an open-source plugin, and this is my reading of its log output, so corrections and discussion are welcome. In short: the source executes a SQL query at a fixed, configurable interval, and the rows it fetches are sinked to Kafka. If the data volume is large, point the query at a MySQL replica rather than the primary.
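As a rough emulation of one polling cycle (an illustration of the idea only, not the plugin's actual code; it assumes the mysql client is installed and that test1 has an auto-increment id column):

LAST=0
while true; do
  ROWS=$(mysql -h 192.168.26.234 -u root -p'xloVR91vQ7bRMXcW' test -N -B \
    -e "select id, name from test1 where id > ${LAST} order by id asc")
  if [ -n "$ROWS" ]; then
    echo "$ROWS"                                        # the plugin forwards these rows to Kafka
    LAST=$(echo "$ROWS" | tail -n 1 | awk '{print $1}') # remember the last id, like the status file
  fi
  sleep 5                                               # run.query.delay
done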
III. Deployment
1. Versions:
Flume: 1.9.0
MySQL: any version
2. Downloads:
Flume: apache-flume-1.9.0-bin.tar.gz
Plugin: flume-ng-sql-source-json-1.0.jar
MySQL JDBC driver jar
Apache Flume bundle: flume-mysql.zip
Cloudera CDH Flume bundle: cloudera-flume-mysql.zip
Note: if your Flume is a CDH build and you load the stock flume-ng-sql-source-json-1.0.jar, Flume will fail while reading from MySQL with:
java.lang.NoSuchMethodError: org.apache.flume.Context.getSubProperties(Ljava/lang/String;)Lcom/google/common/collect/ImmutableMap;
So download the bundle that matches your distribution.
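If you are unsure which build you are running, the version subcommand (run from the install directory once Flume is unpacked) usually makes it obvious; CDH builds carry a CDH suffix in the version string:

bin/flume-ng version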
3. Install Flume
1) Upload the files to the Linux server; I put them under /root.
2) Create the install directory:
mkdir /home/install/
3) Unpack Flume into the install directory:
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /home/install/
4) Move the plugin and the MySQL driver jar into Flume's lib directory:
mv /root/flume-ng-sql-source-json-1.0.jar /home/install/apache-flume-1.9.0-bin/lib
mv /root/mysql-connector-java.jar /home/install/apache-flume-1.9.0-bin/lib
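A quick sanity check that both jars actually landed in lib (the grep pattern is just an example; adjust it to your jar names):

ls /home/install/apache-flume-1.9.0-bin/lib | grep -E 'sql-source|mysql-connector'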
5) Create the Flume properties file (under /home/install/apache-flume-1.9.0-bin/conf):
cp /home/install/apache-flume-1.9.0-bin/conf/flume-conf.properties.template /home/install/apache-flume-1.9.0-bin/conf/flume-conf.properties
6) Configure flume-conf.properties:
agent.sources = s1
agent.channels = c1
agent.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
agent.sources.s1.type = org.keedio.flume.source.SQLSource
agent.sources.s1.hibernate.connection.url = jdbc:mysql://192.168.26.234:3306/test
# Hibernate Database connection properties
agent.sources.s1.hibernate.connection.user = root
agent.sources.s1.hibernate.connection.password = xloVR91vQ7bRMXcW
agent.sources.s1.hibernate.connection.autocommit = true
agent.sources.s1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.s1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# how often to run the query, in milliseconds
agent.sources.s1.run.query.delay = 5000
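# the status file records the last processed index ($@$) so a restart resumes
# where it left off; make sure the directory below exists before starting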
agent.sources.s1.status.file.path = /home/install/apache-flume-1.9.0-bin/status
agent.sources.s1.status.file.name = sqlSource.status
# Custom query
agent.sources.s1.start.from = 0
agent.sources.s1.custom.query = select `id`, `name`, "666" as tt from test1
# append "where id > $@$ order by id asc" to read incrementally by id
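# for example, a fully incremental variant of the query above would be
# (assumption: id is an auto-increment primary key):
# agent.sources.s1.custom.query = select `id`, `name` from test1 where id > $@$ order by id asc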
agent.sources.s1.batch.size = 1000
agent.sources.s1.max.rows = 1000
agent.sources.s1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.s1.hibernate.c3p0.min_size=1
agent.sources.s1.hibernate.c3p0.max_size=10
################################################################
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 800000
################################################################
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# Kafka topic that stores the queried rows
agent.sinks.k1.topic = test-flume
agent.sinks.k1.brokerList = 192.168.6.201:9092,192.168.6.202:9092,192.168.6.203:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 20
agent.sinks.k1.channel = c1
agent.sources.s1.channels=c1
7) Create the Kafka topic
I'll skip the details; any Kafka tutorial covers this. For reference, a sketch follows.
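A minimal sketch, assuming Kafka 2.2 or newer (older releases take --zookeeper <zk-host:2181> instead of --bootstrap-server) and that 3 partitions / 3 replicas suit your cluster; the topic name matches agent.sinks.k1.topic above:

kafka-topics.sh --create \
  --bootstrap-server 192.168.6.201:9092 \
  --replication-factor 3 --partitions 3 \
  --topic test-flume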
8) Start Flume
Go to the Flume install directory: cd /home/install/apache-flume-1.9.0-bin
Start the agent: bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console
9) That completes the installation and configuration; a quick end-to-end test is sketched below.
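One way to test (a sketch; it assumes the Kafka CLI tools on a broker, the mysql client somewhere that can reach the database, and that test1's id column is auto-incremented so the insert needs no explicit id):

# terminal 1: watch the topic
kafka-console-consumer.sh --bootstrap-server 192.168.6.201:9092 \
  --topic test-flume --from-beginning

# terminal 2: insert a row; it should show up on the consumer
# within run.query.delay (5 s)
mysql -h 192.168.26.234 -u root -p \
  -e "insert into test.test1 (name) values ('hello-flume');"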