大数据—数据收集系统介绍(Flume,Sqoop)

弘康安
2023-12-01

概述

大数据,数据收集是非常重要的一块知识体系。数据收集,一般会对不同的数据,拥有不同的手机方式,那么常见的数据来源有什么呢?

  1. 非结构化数据。一般有用户访问日志、图片、视屏、网页等信息。
  2. 半结构化数据。一般类似xml、json之类的数据。
  3. 结构化数据。一般是传统关系型数据库(MySQL、Oracle)等存储的数据。

针对结构化数据,导入到大数据系统Hadoop中,有两种导入方式,一种是全量导入,一种是增量导入。

对于不同类型的数据,大数据提供了不同的收集方法。
非结构化的数据收集:Flume
结构化的数据收集:Sqoop、Canal、Databus

其中,Sqoop是一种全量导入数据的方式。
Canal是一种增量导入数据的方式。

Flume

它是一种由Cloudera公司开源的数据收集系统。主要针对非结构化的数据收集。

HDFS是大数据技术体系中的数据存储方案。而Flume是客户端同HDFS之间的连接桥梁。

Flume是由一个个的Agent组成,这些Agent的两端对接这数据的生产方Client以及数据的存储方HDFS。

每一个Agent由三部分构成:Source、Channel、Sink。

1、Event

它是Flume进行数据传输的基本单元,Flume以事件的方式进行数据传递,之所以使用事件的方式,是因为数据的生产方和数据的消费方,吞吐能力是不一样的,事件的方式,可以把接收的数据暂存到Channel中,让数据的消费方根据自己的能力来慢慢消费数据。

Event是由两部分组成:header以及数据体构成。header是一个key-value字符串对的无序集合。

2、Client

将原始数据进行包装,组成Flume的Event的形式进行数据传输,它存在的意义就是,将Flume的系统同数据源系统剥离出来。

3、Agent

它是整个Flume中的核心部分,Source进行数据接收,并且把Client产生的数据持续的添加到Channel中,而Channel本质上是一个队列,在队列的尾部添加数据,在队列的头部消费数据,Sink就是数据的接收方,它对Event类型的数据进行解析,然后注入HDFS系统中。

Flume在大数据的体系中,处于一个基础工具的位置,一般要求是会写基本的Souce配置以及Sink的配置等等。

3.1、Source

Source,一般用于监听一个文件夹下新产生的文件,并且读取内容,Source的数据一般发送给至少一个Channel。

Source:Spooling Directory Source
使用的时候,对于已经产生的文件不会进行任意更改,一般在使用的时候,会把文件写入一个临时目录下,之后move到监听目录之下。

配置:

a1.channels = ch-1
a1.sources = src-1

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true

Source: TailDir
这是一种,一旦写入数据,就会被监听读取的方式
配置也是脱离不了type、channels等,同时新增了positionFile,用于存储偏移量的文件。

也有其他的类型:Exec、avro等

3.2、channel

关于Memory的类型的channel的使用

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
......
3.3、Sink

Sink的类型很多,有HDFS、HBase、Hive等等

##Sqoop全量导入
它是链接数据库和Hadoop的桥梁
使用MapReduce加快数据传输速度
批处理的方式进行数据传输

aqoop import \
 --connect jdbc:mysql://mysql.example.com/sqoop \
 --username sqoop \
 --password sqoop \
 --table cities

其中 --connect的意思就是:指定JDBC URL

–username、password:表示mysql数据库的用户名

–table:要读取的数据库表

sqoop export \
	--connect jdbc:mysql://mysql.example.com/sqoop \
	--username sqoop \
	--password sqoop \
	--table cities \
	--export-dir cities

其中–export-dir代表数据在HDFS上的存放目录

Flume的Java Api的介绍

我们可能常用的类有这么几个。

  1. RpcClinet
  2. RpcClintFactory
  3. Event
  4. EventDeliveryException
  5. EventBuilder

在项目中,我们可能要在代码中体现这么几个步骤:

第一步,实例化一个Flume的client。

clinet = RpcClientFactory.getDefaultInstance(hostname, port);

第二步,组建我们需要发送的数据的Event。

Event event = EventBuilder.withBody(msg, Charset.forName("UTF-8"));

第三步,把Event当作一个数据追加到clint上完成发送。

client.append(event);

第四步,数据发送完成,关闭client

clinet.close();

Flume的配置文件介绍

在flume的安装目录下,启动一个flume的命令执行语句:

bin / flume-ng agent --conf conf --conf-file logagent.properties -- name LogAgent -Dflume.root.logger=DEBUG,console

启动命令的说明:
–name是指定这个flume的名称
–conf-file是指指定这个flume的配置文件

比如这个Flume的代理服务叫LogAgent

LogAgent.sources = mysource
LogAgent.channels = mychannel
LogAgent.sinks = mysink
LogAgent.sources.mysource.type = spooldir
LogAgent.sources.mysource.channel = mychannel
LogAgent.sources.mysource.spoolDir = /tmp/logs
LogAgent.sinks.mysink.channel = mychannel
LogAgent.sinks.mysink.type = hdfs
// 下面是设置hdfs相关的东西
LogAgent.sinks.mysink.hdfs.path = hdfs://hadoop:9000/data/newlogs/%Y/%m/%d/%H/
LogAgent.sinks.hdfs.batchSize = 1000
LogAgent.sinks.hdfs.rollSize = 0
LogAgent.sinks.hdfs.rollCount = 10000
LogAgent.sinks.hdfs.useLocalTimeStamp = true
LogAgent.channels.mychannel.type = memory
LogAgent.channels.mychannel.capacity = 10000
LogAgent.channels.mychannel.transactionCapacity = 100

Canal、DataBus增量导入(Change Data Captrue)

 类似资料: