当前位置: 首页 > 知识库问答 >
问题:

如何使水槽加载文件到HDFS,HDFS从不关闭文件.tmp并按名称重命名文件。

王叶五
2023-03-14

实际上我有两个问题,我的第一个问题是:在整个文件被水槽代理刷新后,如何使HDFS关闭文件(例如。123456789.tmp)。事实上,直到我强制水槽代理停止,文件才会关闭。我相信有一种使用以下4个参数的方法

hdfs.rollSize = 0 
hdfs.rollCount =0
hdfs.rollInterval = 0
hdfs.batchsize =    1000000

我的第二个问题是,我的代理flume从SFTP服务器接收文件,而我需要将每个文件名保存在hdfs中。它适用于spooldir类型,但不适用于SFTP!!有什么想法吗?

我的水槽代理配置文件如下:

agent.sources = r1 
agent.channels = c1
agent.sinks = k

configure ftp source

agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = secret
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/test/flumrFTP
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
#agent.sources.r1.batchSize = 1000
agent.sources.r1.flushlines = true

configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path =  hdfs://hostname:8000/user/admin/DATA/import_flume/
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize =    1000000
agent.sinks.k.hdfs.fileType = DataStream

Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity =  1000000
agent.channels.c1.transactionCapacity =   1000000

agent.sources.r1.channels = c1
agent.sinks.k.channel = c1

共有1个答案

闾丘朗
2023-03-14

尝试设置变量

hdfs.rollInterval滚动当前文件之前等待的秒数

此设置会在您设置的秒数后关闭文件。我将我的设置为 200 秒,并且正在加载较小的文件

 类似资料:
  • 我已将Flume源配置为Spooldir类型。我有很多CSV文件,.xl3和.xls,我希望我的Flume代理将所有文件从假脱机程序加载到HDFS接收器。然而,水槽代理返回异常 这是我对水槽源的配置: 和我的HDFS接收器:

  • 问题内容: 我正在将大量数据存储到hdfs中。我需要将文件从一个文件夹移动到另一个文件夹。 请问一般来说,文件系统重命名方法的成本是多少?假设我必须移动TB的数据。 非常感谢你。 问题答案: 在HDFS或任何文件系统(如果实施得当)中移动文件涉及对名称空间的更改,而不涉及实际数据的移动。遍历代码仅完成“名称”节点中名称空间(内存和编辑日志)的更改。 从NameNode.java类 NameNode

  • 我正在尝试实现一个简单的Flume HDFS接收器,它将从Kafka通道获取事件,并将它们作为文本文件写入HDFS。 建筑非常简单。这些事件从twitter流式传输到kafka主题,flume hdfs sink确实会将这些事件写入hdfs。这是Kafka-制片人斯塔科弗洛问题的第二部分。 当我执行这个命令时没有出现错误,看起来运行得很好,但是我看不到hdfs中的文本文件。我无法调试或调查,因为在

  • 我正在尝试通过水槽从kafka将数据放入hdfs中。kafka_producer每10秒发送一条消息。我想在hdfs上的一个文件中收集所有消息。这是我使用的水槽配置,但它在hdfs上存储了许多文件(一个用于消息): 附言我从一个文件开始.csv。kafka 生产者获取文件并选择一些感兴趣的字段,然后每 10 秒发送一个条目。Flume将条目存储在Hadoophdfs上,但存储在许多文件中(1个条目

  • 我已经为我的应用程序配置了水槽代理,其中源是Spooldir,接收器是HDFS 我能够在hdfs中收集文件。 代理配置为: 我有以下格式的hdfs文件: /flume/events/file1.txt。1411543838171/水槽/事件/文件2.txt.1411544272696 我想知道我可以删除时间戳(1411543838171) /唯一号码,这是自动生成的每个事件的文件名?

  • 在HDFS的上下文中,我们有Namenode和Datanode,说Namenode存储了文件系统名称空间是什么意思? 还有,我们为datanode指定的目录(在hdfs-core.xml中)是唯一可以存储数据的地方,还是我们可以指定任何其他目录来保存数据?