Syncing MySQL data to Kafka with Flume (a super-detailed beginner's guide)

申屠亦
2023-12-01

I. Preface

      I need to sync MySQL data to Kafka on a schedule. I first tried Canal, but for some reason it never worked: the process started fine, yet no data came through, and the logs showed no errors. So I switched to Flume, which turned out to work well and to be flexible.

II. How the sync works

     I won't go over Flume's own internals here; there are plenty of articles for that. What I will explain is how the connection to MySQL works. I'm using an open-source plugin from the web; after reading its logs I'd like to share how it works, and if anything is wrong please point it out. In essence, the source executes a SQL query against the database at a fixed, configurable interval and sinks the rows it fetches to Kafka. If the data volume is large, query a MySQL replica rather than the primary.
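For example (a sketch of the plugin's incremental mode; the column name id and the stored index value 1000 are assumptions for illustration), a configured query of

          select `id`, `name` from test1 where id > $@$ order by id asc

is executed at run time with $@$ replaced by the last index recorded in the status file:

          select `id`, `name` from test1 where id > 1000 order by id asc

After each run the stored index advances, so the next cycle fetches only new rows.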

III. Deployment

1. Version notes:

      Flume: 1.9.0

      MySQL: any version

2. Downloads

      Flume: apache-flume-1.9.0-bin.tar.gz

      Plugin: flume-ng-sql-source-json-1.0.jar

      MySQL JDBC driver jar (mysql-connector-java.jar)

      Apache Flume bundle: flume-mysql.zip

      Cloudera CDH Flume bundle: cloudera-flume-mysql.zip

      Note: if your Flume is the CDH build, importing the stock flume-ng-sql-source-json-1.0.jar will make Flume throw the following error when it reads MySQL:

java.lang.NoSuchMethodError: org.apache.flume.Context.getSubProperties(Ljava/lang/String;)Lcom/google/common/collect/ImmutableMap;

This is a binary incompatibility: the stock plugin was compiled against an Apache Flume API whose Context.getSubProperties returned Guava's ImmutableMap, while the CDH build changed that return type, so the JVM cannot find a method with the exact compiled signature. Download whichever bundle matches your Flume build.
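If you're not sure which build you have, Flume can report its version once it's unpacked (step 3 below):

          bin/flume-ng version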

3. Install Flume

1) Upload the files to the Linux server; I put them under /root.

2) Create the install directory

         mkdir -p /home/install/

3) Extract Flume into the install directory (every later path assumes it lives under /home/install)

          tar -zxvf /root/apache-flume-1.9.0-bin.tar.gz -C /home/install/

4) Move the downloaded plugin and the MySQL driver jar into the lib directory

          mv /root/flume-ng-sql-source-json-1.0.jar /home/install/apache-flume-1.9.0-bin/lib

         mv /root/mysql-connector-java.jar /home/install/apache-flume-1.9.0-bin/lib
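A quick sanity check that both jars actually landed in lib (a hypothetical one-liner, assuming the paths above):

          ls /home/install/apache-flume-1.9.0-bin/lib | grep -E 'sql-source|mysql-connector'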

5) Create Flume's properties file (under /home/install/apache-flume-1.9.0-bin/conf)

          cp /home/install/apache-flume-1.9.0-bin/conf/flume-conf.properties.template /home/install/apache-flume-1.9.0-bin/conf/flume-conf.properties

6) Configure flume-conf.properties:

 

agent.sources = s1
agent.channels = c1
agent.sinks = k1

###########sql source#################
# For each one of the sources, the type is defined
agent.sources.s1.type = org.keedio.flume.source.SQLSource
agent.sources.s1.hibernate.connection.url = jdbc:mysql://192.168.26.234:3306/test

# Hibernate Database connection properties
agent.sources.s1.hibernate.connection.user = root
agent.sources.s1.hibernate.connection.password = xloVR91vQ7bRMXcW
agent.sources.s1.hibernate.connection.autocommit = true
agent.sources.s1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
agent.sources.s1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
# Query interval in milliseconds. Note: a trailing # on a .properties line is not
# a comment, it becomes part of the value, so keep comments on their own line.
agent.sources.s1.run.query.delay = 5000
agent.sources.s1.status.file.path = /home/install/apache-flume-1.9.0-bin/status
agent.sources.s1.status.file.name = sqlSource.status

# Custom query
agent.sources.s1.start.from = 0
agent.sources.s1.custom.query = select `id`, `name`, "666" as tt from test1
# For incremental reads by id, append: where id > $@$ order by id asc
# ($@$ is replaced with the last index recorded in the status file)
agent.sources.s1.batch.size = 1000
agent.sources.s1.max.rows = 1000
agent.sources.s1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
agent.sources.s1.hibernate.c3p0.min_size=1
agent.sources.s1.hibernate.c3p0.max_size=10

################################################################
agent.channels.c1.type = memory
agent.channels.c1.capacity = 10000
agent.channels.c1.transactionCapacity = 10000
agent.channels.c1.byteCapacityBufferPercentage = 20
agent.channels.c1.byteCapacity = 800000

################################################################
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# Kafka topic that receives the queried rows
agent.sinks.k1.topic = test-flume
agent.sinks.k1.brokerList = 192.168.6.201:9092,192.168.6.202:9092,192.168.6.203:9092
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.batchSize = 20
agent.sinks.k1.channel = c1

agent.sources.s1.channels=c1
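Two small operational notes on this config (a sketch; whether the plugin creates the status directory by itself is an assumption worth verifying on your version):

          # create the directory referenced by status.file.path, in case the plugin does not
          mkdir -p /home/install/apache-flume-1.9.0-bin/status

          # to re-read from start.from, stop the agent and delete the recorded index
          rm -f /home/install/apache-flume-1.9.0-bin/status/sqlSource.status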

7) Create the Kafka topic

        Skipped in detail here; a minimal command is sketched below, or see any online tutorial.
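A minimal creation command (a sketch; run it from your Kafka install directory, the replication factor and partition count are placeholder choices, and kafka-topics.sh on Kafka versions before 2.2 takes --zookeeper <host:2181> instead of --bootstrap-server):

          bin/kafka-topics.sh --create --bootstrap-server 192.168.6.201:9092 --replication-factor 1 --partitions 3 --topic test-flume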

8) Start Flume

        Enter the Flume install directory: cd /home/install/apache-flume-1.9.0-bin

        Start it (the -n value must match the agent. prefix used in the config): bin/flume-ng agent -n agent -c conf -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console

9) That completes installation and configuration; you can test it yourself, for example as sketched below.
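A minimal end-to-end check (a sketch; the host, credentials, and broker list come from the config above, and the insert assumes the test1 table has id and name columns):

          # 1) insert a row on the MySQL side
          mysql -h 192.168.26.234 -u root -p -e "insert into test.test1 (id, name) values (1, 'hello');"

          # 2) from the Kafka install directory, watch the topic; the row should
          #    arrive as JSON within roughly run.query.delay milliseconds
          bin/kafka-console-consumer.sh --bootstrap-server 192.168.6.201:9092 --topic test-flume --from-beginning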

 

 
