当前位置: 首页 > 知识库问答 >
问题:

如何使用 Flume 在源代码上执行预处理并在 hdfs sink 中保留真实文件名

贺奕
2023-03-14

我刚开始使用Apache Flume,我很难理解它到底是如何工作的。为了解释我的问题,我解释了我的需要和我做了什么。

我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。

我发现“假脱机目录”源和HDFS接收器是我所需要的。给我这个flume.conf文件

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink

# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data

# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel

# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream

agent.sinks.hdfsSink.hdfs.writeFormat=Text    

#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel

# Each channel's type is defined.
agent.channels.memoryChannel.type = memory

# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

结果是输入文件被重命名为“.”。我本地文件系统上的“complete ”,数据以一个新名字上传到HDFS,我猜这个名字是唯一的,由Flume生成。

这几乎是我需要的。

但是在上传之前,我想做一些文件特定的操作(删除标题,转义逗号..).不知道怎么做,考虑用拦截器。但是,当数据在flume中时,它在事件中被转换并流动。在他看来,没有关于文件的知识。

否则,原始时间事件将写入文件名,因此我希望此时间与我的事件关联,而不是当前日期。

我还想保留hdfs中的原始文件名(其中有一些有用的信息)。

有人有建议来帮助我吗?

共有1个答案

郭洋
2023-03-14

如果指定,则可以将原始文件名保留为标头

agent.sources.seqGenSrc.fileHeader=true 

然后可以在水槽中检索。

如果要操作文件中的数据,请使用拦截器。您应该知道,事件基本上是假脱机目录中文件中的一行。

最后但同样重要的是,您需要使用filehead er属性将事件通过管道传输回正确的文件。这可以通过指定接收器中的路径来实现,如下所示:

agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data/%{file}

您可以使用前缀和后缀进一步配置文件名:

hdfs.filePrefix FlumeData   Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix –   Suffix to append to file (eg .avro - NOTE: period is not automatically added)
 类似资料:
  • 我配置了一个flume代理,它从一个FTP服务器读取数据,并将文件发送到hdfs接收器。我最大的问题是,我想用它们的原始文件名在hdfs中存储文件。我试着用Spooldir source,它工作得很好,可以用它们的basename在hdfs中存储文件,但是flume agent crush: 1) 如果文件在放入假脱机目录后被写入,Flume将在其日志文件中打印错误并停止处理。 2) 如果稍后重用

  • 使用GitHub,我们可以在线存储代码,而使用Jupyter笔记本,我们只能执行Python代码的一部分。我想一起使用它们。我可以用电脑上储存的Jupyter笔记本编辑代码。但是,我无法找到运行存储在GitHub上的代码的方法。那么,你知道怎么做吗。 以下是一些例子:https://github.com/biolab/ipynb/blob/master/2015-bi/lcs.ipynb http

  • 我是Storm新手。我在用它做一个大学项目。 我创建了我的拓扑,有一个连接到MySql数据库的喷口和两个螺栓。连接到喷口的第一螺栓准备和移除元组中不需要的信息;第二,对元组进行过滤。 我在本地模式下工作。 我希望我的问题很清楚。提前感谢!

  • 我想用C重命名并保留源文件。我用这个来重命名文件。 例如: 重命名(source_file.txt,destination_file.txt); 在这里,我想保留source_file.txt.默认情况下,这个函数会删除source_file并将其保存为destination_file。

  • 问题内容: 我是Hudson / Jenkins的新手,我想知道是否有一种方法可以将Hudson的配置文件检入到源代码管理中。 理想情况下,我希望能够单击UI中的“保存配置”按钮,并将Hudson配置文件签入源代码管理。 问题答案: 有一个名为SCM Sync配置插件的插件。 原始答案 看看我对类似问题的回答。基本思想是使用filesystem-scm- plugin 来检测对xml文件的更改。您

  • 我想执行一个批处理文件 D:\apache-tomcat-6.0。20\apache-tomcat-7.0。30\bin\shutdown。球棒 它位于我的服务器上。 我应该如何编写我的文件?