当前位置: 首页 > 知识库问答 >
问题:

水槽HDFS接收器:从文件名中删除时间戳

端木鹏
2023-03-14

我已经为我的应用程序配置了水槽代理,其中源是Spooldir,接收器是HDFS

我能够在hdfs中收集文件。

代理配置为:

agent.sources = src-1
agent.channels = c1
agent.sinks = k1

agent.sources.src-1.type = spooldir
agent.sources.src-1.channels = c1
agent.sources.src-1.spoolDir = /home/Documents/id/
agent.sources.src-1.deserializer=org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent.sources.src-1.fileHeader=true
agent.channels.c1.type = file
agent.sources.src-1.basenameHeader=true
agent.sources.src-1.basenameHeaderKey=basename

agent.sinks.k1.type = hdfs
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.path =hdfs://localhost:8020/user/flume/events/
agent.sinks.k1.hdfs.filePrefix = %{basename}
agent.sinks.k1.hdfs.fileHeader = true
agent.sinks.k1.hdfs.fileType = DataStream

我有以下格式的hdfs文件:

/flume/events/file1.txt。1411543838171/水槽/事件/文件2.txt.1411544272696

我想知道我可以删除时间戳(1411543838171) /唯一号码,这是自动生成的每个事件的文件名?

共有1个答案

唐修能
2023-03-14

仅通过配置似乎无法删除时间戳。如果您查看HDFS Sink的工作原理,您会发现以下内容:

long counter = fileExtensionCounter.incrementAndGet();
String fullFileName = fileName + "." + counter;

其中fileExtension计数器fileExtension计数器=新原子长(clock.currentTimeMillis());

你可以在这里和这里为作家检查水槽的代码。

如果你想做的是把更多的事件放在一个文件中,那么你可以看看接收器的属性

  • rollTime
  • roll大小
  • 滚动计数
  • 批量大小
 类似资料:
  • 我已将Flume源配置为Spooldir类型。我有很多CSV文件,.xl3和.xls,我希望我的Flume代理将所有文件从假脱机程序加载到HDFS接收器。然而,水槽代理返回异常 这是我对水槽源的配置: 和我的HDFS接收器:

  • 我正在使用flume将本地文件源到HDFS接收器,下面是我的conf: 我使用用户“flume”来执行这个conf文件。 但它显示我找不到本地文件,权限被拒绝 如何解决这个问题?

  • 我想使用 flume 将数据从 hdfs 目录传输到 hdfs 中的目录,在此传输中,我想应用处理形态线。 例如:我的来源是 我的水槽是 有水槽可能吗? 如果是,源水槽的类型是什么?

  • 我正在尝试实现一个简单的Flume HDFS接收器,它将从Kafka通道获取事件,并将它们作为文本文件写入HDFS。 建筑非常简单。这些事件从twitter流式传输到kafka主题,flume hdfs sink确实会将这些事件写入hdfs。这是Kafka-制片人斯塔科弗洛问题的第二部分。 当我执行这个命令时没有出现错误,看起来运行得很好,但是我看不到hdfs中的文本文件。我无法调试或调查,因为在

  • 我尝试每5分钟用其他事件刷新. tmp文件,我的源代码很慢,需要30分钟才能在我的hdfs接收器中获取128MB文件。 flume hdfs接收器中是否有任何属性,我可以在将.tmp文件滚动到hdfs之前控制该文件的刷新率。 我需要它使用. tmp文件中的hive表查看HDFS中的数据。 目前我正在查看来自的数据。tmp文件,但是。由于卷大小为128MB,tmp文件长时间不刷新。

  • 我正在尝试通过水槽从kafka将数据放入hdfs中。kafka_producer每10秒发送一条消息。我想在hdfs上的一个文件中收集所有消息。这是我使用的水槽配置,但它在hdfs上存储了许多文件(一个用于消息): 附言我从一个文件开始.csv。kafka 生产者获取文件并选择一些感兴趣的字段,然后每 10 秒发送一个条目。Flume将条目存储在Hadoophdfs上,但存储在许多文件中(1个条目