flume拦截mysql_为flume安装Mysql监控支持插件

高朝明
2023-12-01

为flume安装Mysql监控支持插件

问题汇总:flume的mysql插件仅仅支持id增量监控

执行flume指令后,只能从指定行开始同步,一直到指定列。除非不断重复调用flume的启动命令,否则很难实现增量实时同步flume,当然也可以使用定时任务。

这里不用flume了,换阿里的canal

单纯将指定列的数据同步到模板存储倒是还行

下载插件

flume原生不支持监控mysql,这里使用github上提供的一个插件

编译插件

首先安装mvn(安装mvn前先安装mvn的java依赖)

解压源码,在main目录(带pom.xml)下执行mv package 编译插件

在target目录下,会生成目标jar插件

部署插件:

在安装目录下新建文件夹

mkdir -p  $FLUME_HOME/plugins.d/sql-source/lib   $FLUME_HOME/plugins.d/sql-source/libext

cp  flume-ng-source-xxx.jar   $FLUME_HOME/plugins.d/sql-source/lib

为数据库安装连接工具

从mysql官网下载最近的mysql-connector-java插件

cp  mysql-connector-java-bin.jar  $FLUME_HOME/plugins.d/sql-source/libext

FLUME配置文件修订

环境配置:

在conf目录下,复制flume的环境配置模板,指定JAVA位置,根据情况配置JAVA虚拟机使用的内存

cd  $FLUME_HOME/conf

cp flume-env.sh.template  flume-env.sh

vi  flume-env.sh

export JAVA_HOME=/xxxx

定制FLUME监控MYSQL配置文件:

FLUME监控MSYQL模型

MYSQL    <>  KAFKA  ---------->>   SPARK   ---------->>  KAFKA   -------->> WEBGIC

vi  flume-conf.properties

lsl.channels = c1

lsl.sources = r1

lsl.sinks = k1

lsl.sources.r1.type = org.keedio.flume.source.SQLSource

lsl.sources.r1.hibernate.connection.url = jdbc:mysql://192.168.10.56:33060/fbd_kvdata         #指定数据库

lsl.sources.r1.hibernate.connection.user = root                                                                         #数据库账号密码

lsl.sources.r1.hibernate.connection.password = jmuser

lsl.sources.r1.hiberante.connection.autocommit = true

lsl.sources.r1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect

lsl.sources.r1.hibernate.connection.driver_class = com.mysql.jdbc.Driver                               #数据库驱动

lsl.sources.r1.run.query.delay = 5000                                                                                        #查询时间间隔

lsl.sources.r1.status.file.path = /var/flume

lsl.sources.r1.status.file.name = sqlSource.status                                                                     #记录mysql查询的最后一次状态未知

lsl.sources.r1.start.from = 1                                                                                                                                   #id的起始列,从这一行开始查

lsl.sources.r1.custom.query = select nid, uid,virus_name  from node_file_viruses where id > $@$                    #数据查询语句

lsl.sources.r1.batch.size = 1000                                                                                                                             #给查询语句添加limit限制条件一次1000

lsl.sources.r1.max.rows = 1000

lsl.sources.r1.hibernate.connection.provides_class = org.hibernate.connection.C3P0ConnectionProvider

lsl.sources.r1.hiberante.c3p0.min_size = 1

lsl.sources.r1.hiberante.c3p0.max_szie = 10

lsl.channels.c1.type = memory

lsl.channels.c1.capacity = 10000

lsl.channels.c1.transactionCapacity = 10000

lsl.channels.c1.byteCapacityBufferPercentage = 20

lsl.channels.c1.byteCapacity = 8000000

lsl.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

lsl.sinks.k1.topic = jmvirus                                                           #kafka的topic

lsl.sinks.k1.brokerList = 127.0.0.1:9092                                        #kafka的broker list

lsl.sinks.k1.requireAcks = 1

lsl.sinks.k1.batchSize = 20

lsl.sinks.k1.channel = c1                                                              #

lsl.sinks.k1.channel = c1

lsl.sources.r1.channels = c1

 类似资料: