当前位置: 首页 > 知识库问答 >
问题:

Flume HDFS接收器仅使用netcat源存储一行数据源

何麻雀
2023-03-14

我尝试使用Flume 1.7将数据加载到HDFS中。我创建了以下配置:

# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf
# Naming the components on the current agent
Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = LoggerSink hdfs-sink LocalOut

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  

# Describing/Configuring the sink 
Agent.sinks.LoggerSink.type = logger  

# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0

# Schreibt input into local Filesystem
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
Agent.sinks.LocalOut.type = file_roll  
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0  


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel

之后,我使用 netcat 将以下文件发送到源:

cat textfile.csv | nc <IP of flume agent> 56565

该文件包含以下元素:

Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492

我面临的问题是,没有任何错误,flume正在写入hdfs,但只有一行传输的文件。如果您开始使用nectat多次将文件推送到源文件,那么有时flume会将多个文件写入hdfs,包括多个行。但很少是所有行。

我试图改变hdfs参数roll大小,批量大小和其他,但它并没有真正改变的行为。

也配置了本地文件的接收器工作正常。

有人知道如何配置它以确保所有条目都写入hdfs而不会丢失条目吗?

谢谢你的帮助。

更新1.12.2016

我删除了除HDFS接收器外的所有接收器,并更改了一些参数。在此之后,HDFS接收器按其应有的方式运行。

这里的配置:

# Naming the components on the current agent
Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = hdfs-sink 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  


# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel

有人知道为什么它在这种配置下工作,但在两个或更多的水槽下就不再工作了吗?

共有1个答案

丘华翰
2023-03-14

我自己找到了解决方案。据我所知,我为两个接收器使用了相同的通道。因此,更快的接收器接管了所有条目,只有部分条目被传递到hdfs接收器。

使用不同的通道并使用参数为源包括扇动后

Agent.sources.Netcat.selector.type = replicating

Flume 按预期写入本地文件和 hdfs。

 类似资料:
  • 我需要你的帮助。有一个Vue应用程序,我使用vuex商店和Vue路由器。问题是,我从API调用中获取存储数据,如果我从其他地方导航到页面,用户数据已经存储在存储中,并且在组件中,我可以使用getter接收到的数据。但是,如果我在create或mount方法中重新加载页面,则getter不包含任何数据。但如果在Vuex开发工具中查看,我可以看到fetchedd数据。如何修复此行为?

  • 我正在使用Kafka源和接收器连接器创建一个数据管道。源连接器从SQL数据库消费并发布到主题,而接收器连接器订阅主题并放入其他SQL数据库。表有16 GB的数据。现在的问题是,数据不能从一个数据库传输到另一个数据库。但是,如果表的大小很小,比如1000行,那么数据可以成功传输。 源连接器配置: 源连接器日志: 有人能指导我如何调整Kafka源连接器以传输大数据吗?

  • 我正在使用从Kafka到HDFS的Flink bucketing水槽。Flink的版本是1.4.2 我发现每次重新启动作业时都会有一些数据丢失,即使使用保存点也是如此 我发现如果我设置writer SequenceFile,这个问题可以解决。挤压型。记录而不是序列文件。挤压型。块当Flink试图保存检查点时,有效长度似乎与实际长度不同,实际长度应该包括压缩数据 但如果不能使用压缩类型,则可能会出现

  • 我用C编写了一个服务器应用程序,它从客户机应用程序(我自己没有编写)接收数据包,并将数据打印到控制台。问题是,当我试图一次接收并存储整个包体时,数据存储不正确,但是当使用recv()的多次调用接收并存储包体时,它确实正确存储。 关于endianness,客户端和服务器都运行在一个小端机器上,客户端作为小端发送数据,服务器读取数据而不需要转换。 这是客户端应用程序发送给服务器应用程序的数据包: 下面

  • 本文向大家介绍android使用SharedPreferences进行数据存储,包括了android使用SharedPreferences进行数据存储的使用技巧和注意事项,需要的朋友参考一下 很多时候我们开发的软件需要向用户提供软件参数设置功能,例如我们常用的QQ,用户可以设置是否允许陌生人添加自己为好友。对于软件配置参数的保存,如果是window软件通常我们会采用ini文件进行保存,如果是j2s

  • 求你了,我需要你的帮助。 我正在使用TCP连接在java服务器和android应用程序客户端之间建立TCP连接。假设我将发送一个序列化对象,但是每次在客户端,代码都会在中的Obj=(Person)处被阻塞。readObject;其中in是数据对象InputStream,Person是序列化对象。 然而,如果我发送的是字符串或整数,并且我使用Obj=in,代码就可以工作。readObject;直接地