Distributed Data Collection: Flume Principles and Applications

胡新
2023-12-01

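Part 1: Background

Common open-source data collection systems

  • Unstructured log (data) collection
    • Flume
  • Structured log (data) collection
    • Sqoop: full import
    • Canal (Alibaba): incremental import
    • Databus (LinkedIn): incremental import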

Part 2: Introduction to Flume (NG)

- Event

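  • Flume transports data in units called events
  • An event consists of a header and a byte array that carries the payload
  • The header is a dictionary (key-value map) that can be extended and used for contextual routing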
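
To make the event structure concrete, here is a minimal Python sketch of an event as a header dictionary plus a byte-array body, together with a toy router that picks a channel from a header value. The class `Event`, the function `route`, the header key `type`, and the channel names are all hypothetical illustrations; this is not Flume's Java API.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Event:
    """Toy stand-in for a Flume event: string headers plus an opaque byte body."""
    body: bytes
    headers: Dict[str, str] = field(default_factory=dict)


def route(event: Event, mapping: Dict[str, str], default: str) -> str:
    """Pick a channel name from the (hypothetical) 'type' header."""
    return mapping.get(event.headers.get("type", ""), default)


# Hypothetical channel names, used only for illustration.
mapping = {"error": "errorChannel", "access": "accessChannel"}

e1 = Event(body=b"disk full", headers={"type": "error", "host": "hadoopa"})
e2 = Event(body=b"plain line")  # no headers at all

print(route(e1, mapping, default="defaultChannel"))
print(route(e2, mapping, default="defaultChannel"))
```

The body stays opaque to the router; only the headers are inspected. This is similar in spirit to header-driven routing such as Flume's multiplexing channel selector.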

- Client

  • A client is an entity that wraps raw log data into events and sends them to one or more agents
  • A client is optional

- Agent

  • source

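    1. Receives or generates events and delivers them in batches to one or more channels
    2. Types of source

      • Sources that integrate with existing systems: Syslog, NetCat
      • Sources that generate events automatically: Exec
      • Sources that watch a directory for file changes: Spooling Directory Source, Taildir Source
      • IPC sources for agent-to-agent communication: Avro, Thrift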
  • channel

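    1. Sits between the source and the sink and buffers events
    2. Supports transactions
    3. Types of channel
      • Memory Channel: volatile (buffered events are lost if the agent process stops)
      • File Channel: durable, backed by files on local disk
      • JDBC Channel: durable, backed by a database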
  • sink
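    1. Delivers events to the next hop or to the final destination; once delivery succeeds, the events are removed from the channel
    2. Types of sink
      • Sinks that store events in a final destination: HDFS, HBase
      • Sink that simply discards the events it takes from the channel: Null Sink
      • IPC sink for agent-to-agent communication: Avro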
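
The interaction between channel and sink described above (events leave the channel only after a successful delivery) can be sketched as follows. This is a deliberately simplified, single-threaded model; `ToyChannel`, `put_batch`, `take_batch`, and `failing_sink` are hypothetical names and do not correspond to Flume's actual transaction API.

```python
from collections import deque
from typing import Callable, Deque, List


class ToyChannel:
    """In-memory queue with a very simplified take transaction."""

    def __init__(self) -> None:
        self._queue: Deque[bytes] = deque()

    def put_batch(self, events: List[bytes]) -> None:
        # Source side: the whole batch is appended to the buffer.
        self._queue.extend(events)

    def take_batch(self, size: int, deliver: Callable[[List[bytes]], None]) -> bool:
        """Take up to `size` events and hand them to `deliver`.

        Events stay removed only if `deliver` returns normally (commit);
        on any exception they are put back in their original order (rollback).
        """
        batch = [self._queue.popleft() for _ in range(min(size, len(self._queue)))]
        try:
            deliver(batch)
        except Exception:
            self._queue.extendleft(reversed(batch))  # rollback, preserving order
            return False
        return True

    def __len__(self) -> int:
        return len(self._queue)


delivered: List[bytes] = []


def failing_sink(batch: List[bytes]) -> None:
    # Simulates a destination that is temporarily down.
    raise IOError("destination unavailable")


channel = ToyChannel()
channel.put_batch([b"line-1", b"line-2", b"line-3"])

ok_first = channel.take_batch(2, failing_sink)       # rolled back, nothing lost
ok_second = channel.take_batch(2, delivered.extend)  # committed
print(ok_first, ok_second, delivered, len(channel))
```

After the failed attempt the channel still holds all three events, so the second attempt delivers the same first two events in the same order.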

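Part 3: Introduction to Sqoop

  • A bridge between traditional relational databases and Hadoop
    • Imports data from relational databases into Hadoop
    • Exports data from Hadoop into relational databases
  • Uses MapReduce to speed up data transfer
  • Transfers data in batch mode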
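
One way to picture how MapReduce speeds up an import: the key range of a split column is divided into sub-ranges, and each map task reads one sub-range in parallel. The sketch below shows an even split of an integer range. The function `split_ranges` and the column name `id` are hypothetical, and Sqoop's real splitter handles more data types and differs in detail.

```python
from typing import List, Tuple


def split_ranges(lo: int, hi: int, num_mappers: int) -> List[Tuple[int, int]]:
    """Divide the inclusive key range [lo, hi] into at most num_mappers inclusive sub-ranges."""
    if num_mappers < 1 or hi < lo:
        raise ValueError("need num_mappers >= 1 and lo <= hi")
    total = hi - lo + 1
    parts = min(num_mappers, total)      # never create empty sub-ranges
    base, extra = divmod(total, parts)   # the first `extra` parts get one more key
    ranges = []
    start = lo
    for i in range(parts):
        size = base + (1 if i < extra else 0)
        ranges.append((start, start + size - 1))
        start += size
    return ranges


# Each sub-range would become the predicate of one map task's query.
for low, high in split_ranges(1, 10, 4):
    print(f"WHERE id >= {low} AND id <= {high}")
```

An even split works well only when the key values are spread fairly uniformly; a skewed key leaves some map tasks with much more data than others.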

Part 4: Introduction to CDC (Change Data Capture)

  1. Canal
  2. Databus

Part 5: Hands-on Project

I. Project description

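Collect the content that is continuously appended to a file named record.list and store it in the Hadoop cluster. The source is an exec source, the channel is a file channel, and the sink is an HDFS sink.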
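
To try the pipeline without a real application, something has to keep appending lines to record.list. The following is a minimal sketch of such a generator. The function `append_records` and the record layout (epoch milliseconds plus a sequence label) are assumptions made for illustration; the actual format of record.list is not specified here.

```python
import os
import tempfile
import time


def append_records(path: str, count: int, delay: float = 0.0) -> int:
    """Append `count` lines to `path`, flushing after each so `tail -F` sees them promptly."""
    written = 0
    with open(path, "a", encoding="utf-8") as f:
        for i in range(count):
            # Hypothetical record layout: epoch milliseconds and a sequence label.
            f.write(f"{int(time.time() * 1000)},record-{i}\n")
            f.flush()
            written += 1
            if delay:
                time.sleep(delay)
    return written


# Demo against a temporary file. On the real host the target would be
# /home/hadoop/hadooptraining/datasource/record.list
demo_path = os.path.join(tempfile.mkdtemp(), "record.list")
append_records(demo_path, 3)
append_records(demo_path, 2)
with open(demo_path, encoding="utf-8") as f:
    lines = f.read().splitlines()
print(len(lines))
```

Opening the file in append mode matters: truncating and rewriting the file would make a tailing reader re-read or miss data.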

II. Install Flume

[hadoop@hadoopa ~]$ tar -zxvf apache-flume-1.7.0-bin.tar.gz

III. Configure Flume

  1. Configure flume-conf-logAnalysis.properties
[hadoop@hadoopa conf]$ pwd
/home/hadoop/apache-flume-1.7.0-bin/conf
[hadoop@hadoopa conf]$ vi flume-conf-logAnalysis.properties 

[hadoop@hadoopa conf]$ cat flume-conf-logAnalysis.properties
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#  http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'logAgent'

logAgent.sources = logSource
logAgent.channels = fileChannel
logAgent.sinks = hdfsSink

# For each one of the sources, the type is defined
logAgent.sources.logSource.type = exec
logAgent.sources.logSource.command = tail -F /home/hadoop/hadooptraining/datasource/record.list

# The channel can be defined as follows.
logAgent.sources.logSource.channels = fileChannel

# Each sink's type must be defined
logAgent.sinks.hdfsSink.type = hdfs
logAgent.sinks.hdfsSink.hdfs.path = hdfs://hadoopA:8020/flume/record/%Y-%m-%d/%H%M
logAgent.sinks.hdfsSink.hdfs.filePrefix = transaction_log
logAgent.sinks.hdfsSink.hdfs.rollInterval = 600
logAgent.sinks.hdfsSink.hdfs.rollCount = 10000
logAgent.sinks.hdfsSink.hdfs.rollSize = 0
logAgent.sinks.hdfsSink.hdfs.round = true
logAgent.sinks.hdfsSink.hdfs.roundValue = 10
logAgent.sinks.hdfsSink.hdfs.roundUnit = minute
logAgent.sinks.hdfsSink.hdfs.fileType = DataStream
logAgent.sinks.hdfsSink.hdfs.useLocalTimeStamp = true
#Specify the channel the sink should use
logAgent.sinks.hdfsSink.channel = fileChannel

# Each channel's type is defined.
logAgent.channels.fileChannel.type = file
logAgent.channels.fileChannel.checkpointDir = /home/hadoop/apache-flume-1.7.0-bin/dataCheckpointDir
logAgent.channels.fileChannel.dataDirs = /home/hadoop/apache-flume-1.7.0-bin/dataDir

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# (no further channel-specific values are set in this example)
  2. Configure flume-env.sh
[hadoop@hadoopa conf]$ pwd
/home/hadoop/apache-flume-1.7.0-bin/conf
[hadoop@hadoopa conf]$ vi flume-env.sh
[hadoop@hadoopa conf]$ cat flume-env.sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


# Give Flume more memory and pre-allocate, enable remote monitoring via JMX
export JAVA_OPTS="-Xms100m -Xmx200m -Dcom.sun.management.jmxremote"

# Let Flume write raw event data and configuration information to its log files for debugging
# purposes. Enabling these flags is not recommended in production,
# as it may result in logging sensitive user information or encryption secrets.
# export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "

# Foll. classpath will be included in Flume's classpath.
# Note that the Flume conf directory is always included in the classpath.
FLUME_CLASSPATH="$HADOOP_HOME/share/hadoop/common/hadoop-common-2.7.3.jar"   # Example:  "path1;path2;path3"
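
The HDFS sink settings in flume-conf-logAnalysis.properties combine a time-based path (`%Y-%m-%d/%H%M`) with `hdfs.round = true`, `hdfs.roundValue = 10`, and `hdfs.roundUnit = minute`, so the timestamp is rounded down to a 10-minute boundary before the path is expanded. The sketch below reproduces that bucketing in Python for illustration. The function `bucket_path` is hypothetical, and it assumes the rounding value divides 60 evenly.

```python
from datetime import datetime


def bucket_path(ts: datetime, round_minutes: int = 10) -> str:
    """Round ts down to a multiple of round_minutes within the hour, then format the directory."""
    rounded = ts.replace(
        minute=ts.minute - ts.minute % round_minutes,
        second=0,
        microsecond=0,
    )
    # Same escape sequences as hdfs.path: %Y-%m-%d/%H%M
    return "/flume/record/" + rounded.strftime("%Y-%m-%d/%H%M")


print(bucket_path(datetime(2023, 12, 1, 14, 37, 52)))
# prints /flume/record/2023-12-01/1430
```

With this configuration, all events that arrive between 14:30:00 and 14:39:59 land under the same directory, which keeps the number of directories per day at 144 at most.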

IV. Run Flume

[hadoop@hadoopa conf]$ flume-ng agent --conf /home/hadoop/apache-flume-1.7.0-bin/conf --conf-file /home/hadoop/apache-flume-1.7.0-bin/conf/flume-conf-logAnalysis.properties --name logAgent -Dflume.root.logger=DEBUG,console -Dflume.monitoring.type=http -Dflume.monitoring.port=34545
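
Because the agent is started with `-Dflume.monitoring.type=http -Dflume.monitoring.port=34545`, it serves its counters as JSON over HTTP under `/metrics`. The sketch below reads that document and extracts the channel fill percentage. The host `localhost`, the helper names `fetch_metrics` and `channel_fill`, and the sample payload are assumptions made for illustration; the sample numbers are invented and are not real output.

```python
import json
from typing import Dict
from urllib.request import urlopen


def fetch_metrics(url: str = "http://localhost:34545/metrics") -> Dict[str, Dict[str, str]]:
    """Download the JSON metrics document from a running agent (needs network access)."""
    with urlopen(url, timeout=5) as resp:
        return json.loads(resp.read().decode("utf-8"))


def channel_fill(metrics: Dict[str, Dict[str, str]]) -> Dict[str, float]:
    """Map each CHANNEL component to its ChannelFillPercentage as a float."""
    return {
        name: float(values["ChannelFillPercentage"])
        for name, values in metrics.items()
        if values.get("Type") == "CHANNEL" and "ChannelFillPercentage" in values
    }


# Hand-written sample payload in the expected shape; the numbers are made up.
sample = json.loads("""
{
  "CHANNEL.fileChannel": {"Type": "CHANNEL", "ChannelFillPercentage": "0.25",
                          "EventPutSuccessCount": "1200", "EventTakeSuccessCount": "1100"},
  "SINK.hdfsSink": {"Type": "SINK", "EventDrainSuccessCount": "1100"}
}
""")
print(channel_fill(sample))
```

On the agent host, `channel_fill(fetch_metrics())` would apply the same extraction to live data. A fill percentage that keeps rising suggests the sink is draining more slowly than the source is producing.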

V. Verify the results

Browse HDFS through the NameNode web UI:

http://192.168.1.201:50070/explorer.html#/flume/record/
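
The same check can be scripted instead of done in a browser. The sketch below uses the WebHDFS REST API (`op=LISTSTATUS`) on the NameNode HTTP port shown above, assuming WebHDFS is enabled on the cluster. The helper names `liststatus_url`, `entry_names`, and `list_dir` are hypothetical, and the sample response is hand-written rather than real output.

```python
import json
from typing import List
from urllib.request import urlopen


def liststatus_url(namenode: str, path: str) -> str:
    """Build a WebHDFS LISTSTATUS URL for an absolute HDFS path."""
    return f"http://{namenode}/webhdfs/v1{path}?op=LISTSTATUS"


def entry_names(payload: dict) -> List[str]:
    """Extract the entry names from a LISTSTATUS response body."""
    return [status["pathSuffix"] for status in payload["FileStatuses"]["FileStatus"]]


def list_dir(namenode: str, path: str) -> List[str]:
    """Query the NameNode (requires network access to the cluster)."""
    with urlopen(liststatus_url(namenode, path), timeout=5) as resp:
        return entry_names(json.loads(resp.read().decode("utf-8")))


url = liststatus_url("192.168.1.201:50070", "/flume/record")
print(url)

# Hand-written sample response; the directory name is made up.
sample = {"FileStatuses": {"FileStatus": [{"pathSuffix": "2023-12-01", "type": "DIRECTORY"}]}}
print(entry_names(sample))
```

If the pipeline is working, `list_dir("192.168.1.201:50070", "/flume/record")` should return one entry per day for which events were written.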