当前位置: 首页 > 知识库问答 >
问题:

如何在FTP源Flume代理中保留文件的原始基名

林玮
2023-03-14

我配置了一个flume代理,它从一个FTP服务器读取数据,并将文件发送到hdfs接收器。我最大的问题是,我想用它们的原始文件名在hdfs中存储文件。我试着用Spooldir source,它工作得很好,可以用它们的basename在hdfs中存储文件,但是flume agent crush:

1) 如果文件在放入假脱机目录后被写入,Flume将在其日志文件中打印错误并停止处理。

2) 如果稍后重用文件名,Flume将在其日志文件中打印错误并停止处理。

事实上,spoldir-Source不适合我的用例。那么,有没有办法让ftp源代码保留文件名,然后,hdfs根据文件名单独存储文件。

这是我的代理:

agent.sources = r1
agent.channels = c1
agent.sinks = k

#configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = pwd
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/flume_ftp_source
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
agent.sources.r1.flushlines = true

#configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path =  hdfs://hostname:8020/user/admin/DATA/import_flume/agents/agent1/%Y/%m/%d/%H
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize =    1000000
agent.sinks.k.hdfs.fileType = DataStream

agent.channels.c1.type = memory
agent.channels.c1.capacity =  1000000
agent.channels.c1.transactionCapacity =   1000000

agent.sources.r1.channels = c1
agent.sinks.k.channel = c1

共有2个答案

和斌
2023-03-14

正如我所说,根据以下代码更新:修复了Flume FTP源,这是我如何使用%{basename}变量的方法:

##############
# COMPONENTS #
##############
myPrj.sources = source_01
myPrj.channels = channel_01
myPrj.sinks = sink_01

############
# BINDINGS #
############
myPrj.sources.source_01.channels = channel_01
myPrj.sinks.sink_01.channel = channel_01

###########
# CHANNEL #
###########
myPrj.channels.channel_01.type = memory
myPrj.channels.channel_01.capacity = 10000
myPrj.channels.channel_01.transactionCapacity = 10000

##########
# SOURCE #
##########
myPrj.sources.source_01.type = org.keedio.flume.source.ftp.source.Source

myPrj.sources.source_01.client.source = ftp
myPrj.sources.source_01.name.server = 127.0.0.1
myPrj.sources.source_01.user = myPrj
myPrj.sources.source_01.password = myPrj
myPrj.sources.source_01.port = 21

#myPrj.sources.source_01.security.enabled = true
#myPrj.sources.source_01.security.cipher = TLS
#myPrj.sources.source_01.security.certificate.enabled = true
#myPrj.sources.source_01.path.keystore = /paht/to/keystore
#myPrj.sources.source_01.store.pass = the_keyStore_password 

myPrj.sources.source_01.run.discover.delay = 5000
myPrj.sources.source_01.flushlines = false
myPrj.sources.source_01.chunk.size = 33554432

myPrj.sources.source_01.folder = /home/foo/app-flume-ftp-hdfs
myPrj.sources.source_01.file.name = flume-ftp-hdfs.ser

myPrj.sources.source_01.fileHeader = true
myPrj.sources.source_01.basenameHeader = true

# Deserializer
myPrj.sources.source_01.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
myPrj.sources.source_01.deserializer.maxBlobLength = 33554432

########
# SINK #
########
#myPrj.sinks.sink_01.type = logger
myPrj.sinks.sink_01.type = hdfs
myPrj.sinks.sink_01.hdfs.path = /user/foo/ftp_source/%Y/%m/%d/%H/%M/%{basename}
myPrj.sinks.sink_01.hdfs.filePrefix = FTP_SOURCE
myPrj.sinks.sink_01.hdfs.useLocalTimeStamp = true
myPrj.sinks.sink_01.hdfs.rollCount = 0
myPrj.sinks.sink_01.hdfs.rollInterval = 0
myPrj.sinks.sink_01.hdfs.batchSize = 100

# Data compressed
#myPrj.sinks.sink_01.hdfs.rollSize = 33554432
#myPrj.sinks.sink_01.hdfs.codeC = gzip
#myPrj.sinks.sink_01.hdfs.fileType = CompressedStream

# Data no compressed
myPrj.sinks.sink_01.hdfs.rollSize = 33554432
myPrj.sinks.sink_01.hdfs.fileType = DataStream
公西英叡
2023-03-14

我刚刚给flume ftp github项目推了一个解决方案:

菲利普·KR

关于如何修复属性%{basename}丢失的事实,有什么窍门吗?

 类似资料:
  • 我刚开始使用Apache Flume,我很难理解它到底是如何工作的。为了解释我的问题,我解释了我的需要和我做了什么。 我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。 我发现“假脱机目录”源和HDFS接收器是我所需要的。给我这个flume.conf文件 结果是输入文件被重命名为“.”。我本地文件系统上的“complete ”,数据以一个新名字上传到HDFS

  • 我想下载一个文件,同时保留文件的文件名。 我有: 我可以下载文件,但我下载的文件名总是“downloadFile”。pdf或下载文件。巴布亚新几内亚'。 如何保留原始文件名?谢谢

  • 在Apache Camel上工作时,每当使用camel-zip压缩文件或通过canel-ftp复制文件时,Camel会将文件移动到.Camel文件夹中,并在处理后不将其保留在源文件夹中。我希望我的文件在处理完成后留在源文件夹中。请建议如何实现相同的。 CamelContext context=new DefaultCamelContext();context.addRoutes(new Route

  • 我一直在使用本教程对图片进行人脸检测。问题是当我获取java上使用的文件路径时 我怎么能在android Studio上翻译。我试着把我的lbpcascade_frontalface.xml放在原始资源上。Cascade分类器是opencv库提供的一个类。唯一的问题是他们只加载字符串路径(在xmlfile上)。这是我的代码。 我翻译成这样的方法。 我得到opencv的一个片段错误,它告诉我文件为空

  • 问题内容: 我只想创建一个像这样的File对象 但是它给了我 FileNotFoundException的 例外我 如何在我的原始文件夹中引用一个文件 编辑: 其实我想做这样的事情 问题答案: 这是2个功能。一种是从RAW读取的,另一种是从资产读取的 并从资产文件夹

  • 问题内容: 我是Hudson / Jenkins的新手,我想知道是否有一种方法可以将Hudson的配置文件检入到源代码管理中。 理想情况下,我希望能够单击UI中的“保存配置”按钮,并将Hudson配置文件签入源代码管理。 问题答案: 有一个名为SCM Sync配置插件的插件。 原始答案 看看我对类似问题的回答。基本思想是使用filesystem-scm- plugin 来检测对xml文件的更改。您