当前位置: 首页 > 知识库问答 >
问题:

使用spool目录写入flume如何重命名文件

郭麒
2023-03-14

我正在使用Flume spool目录写入hdfs。这是我的代码

 #initialize agent's source, channel and sink
agent.sources = test
agent.channels = memoryChannel
agent.sinks = flumeHDFS

# Setting the source to spool directory where the file exists
agent.sources.test.type = spooldir
agent.sources.test.spoolDir = /johir
agent.sources.test.fileHeader = false
agent.sources.test.fileSuffix = .COMPLETED

# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# agent.channels.memoryChannel.batchSize = 15000
agent.channels.memoryChannel.transactioncapacity = 1000000

# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path =/user/root/
agent.sinks.flumeHDFS.hdfs.fileType = DataStream

# Write format can be text or writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text

# use a single csv file at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1

# rollover file based on maximum size of 10 MB
agent.sinks.flumeHDFS.hdfs.rollCount=0
agent.sinks.flumeHDFS.hdfs.rollInterval=0
agent.sinks.flumeHDFS.hdfs.rollSize = 1000000
agent.sinks.flumeHDFS.hdfs.batchSize =1000

# never rollover based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0

# rollover file based on max time of 1 min
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600

# Connect source and sink with channel
agent.sources.test.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

但问题是写入文件的数据被重命名为某个随机的tmp名称。如何将hdfs中的文件重命名为源目录中的原始文件名。例如,我有文件day1.txt、day2.txt和day3.txt。这是两天的数据。我想将它们保存在hdfs中,分别为day1.txt、day2.txt和day3.txt。但这三个文件被合并并存储在hdfs中,作为FlumeData.1464629158164.tmp文件。有什么办法可以做到这一点吗?

共有1个答案

葛胜泫
2023-03-14

如果要保留原始文件名,则应将文件名作为标头附加到每个事件。

  1. 将basenamehead er属性设置为true。这将创建一个带有basename键的标头,除非使用basenamehead erKey属性设置为其他内容。
  2. 使用hdfs.file前缀属性使用basenamehead er值设置文件名。

将以下属性添加到配置文件中。

#source properties
agent.sources.test.basenameHeader = true

#sink properties
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.filePrefix = %{basename}
 类似资料:
  • 我有一个水槽代理,它从假脱机目录源读取,并在一些转换后写入 hdfs。由于 flume 尝试将处理后的文件重命名为“.已完成“,我在假脱机目录中写入时遇到权限被拒绝异常。 我想知道向敏感数据授予写入权限的安全性如何。 是否有一轮关于水槽的解决方案来识别假脱机目录中已处理的文件

  • 前面小节介绍了文件和目录的创建、移动、删除、打包、压缩,本小节介绍如何对已有的文件或者重命名,好的文件命名规范将提升对文件和目录管理的效率。 1. 使用 mv 命令对文件名称 这里还是以 /home 目录的 alltxt.tar 文件为例,可以使用 mv 命令对其重命名: ls mv alltxt.tar newtxt.tar ls 执行结果如下图: 如上图所示相当于文件移动路径没变,名称变

  • 我正在使用Flume从我的本地文件系统写一些CSV文件到HDFS。 我想知道HDFS水槽的最佳配置是什么,这样本地系统上的每个文件都会在HDFS以CSV格式准确复制。我希望Flume处理的每个CSV文件都是单个事件,作为单个文件刷新和写入。尽可能多地,我希望文件是完全一样的,没有标题的东西等。 我需要在这些值上加什么来模拟我想要的行为? 如果还有其他Flume代理配置变量需要更改,请提供。 如果这

  • 问题内容: 我正在尝试使用以下Python脚本重命名目录中的多个文件: 运行此脚本时,出现以下错误: 这是为什么?我该如何解决这个问题? 谢谢。 问题答案: 重命名时您没有给出完整的路径,请按照以下步骤操作: 编辑 :感谢tavo,第一个解决方案将文件移动到当前目录,修复该问题。

  • 问题内容: 我有这样的事情: 我想将这些文件重命名为以下形式: 在同一目录中。 我想我可以使用,但是我不知道如何在文件夹和文件重命名的同时使用它。 问题答案: 可以使用bash for loop和: 请注意,如果目录名称包含空格,则上述解决方案将不起作用。相关链接。 另一种基于注释的解决方案(也适用于名称中也包含空格的目录):

  • 问题内容: 我想更改为。 问题答案: 用途