当前位置: 首页 > 知识库问答 >
问题:

只有一个文件从kafka到hdfs与水槽

龙焱
2023-03-14

我正在尝试通过水槽从kafka将数据放入hdfs中。kafka_producer每10秒发送一条消息。我想在hdfs上的一个文件中收集所有消息。这是我使用的水槽配置,但它在hdfs上存储了许多文件(一个用于消息):

agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = prova
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/input
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink

附言我从一个文件开始.csv。kafka 生产者获取文件并选择一些感兴趣的字段,然后每 10 秒发送一个条目。Flume将条目存储在Hadoophdfs上,但存储在许多文件中(1个条目=1个文件)。我希望所有条目都在一个文件中。如何更改水槽的配置?

共有2个答案

秦城
2023-03-14

将rollInterval设置为0,因为您不希望根据时间生成不同的文件。如果您想根据数字条目s或事件进行计算,请更改rollCount值。例如,如果要在一个文件中保存10个事件或条目:

agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 10
子车修平
2023-03-14

看来,flume目前确实被设置为在HDFS上为每个输入文件创建一个文件。

正如这里所建议的,您可以通过编写一个周期性的猪(或map减少)作业来处理这个问题,该作业获取所有输入文件并将它们组合起来。

减少文件数的另一个选项可能是减少入站文件的频率。

 类似资料:
  • 我正在尝试实现一个简单的Flume HDFS接收器,它将从Kafka通道获取事件,并将它们作为文本文件写入HDFS。 建筑非常简单。这些事件从twitter流式传输到kafka主题,flume hdfs sink确实会将这些事件写入hdfs。这是Kafka-制片人斯塔科弗洛问题的第二部分。 当我执行这个命令时没有出现错误,看起来运行得很好,但是我看不到hdfs中的文本文件。我无法调试或调查,因为在

  • 我正在使用水槽从kafka主题HDFS文件夹加载消息。所以, 我创建了一个主题 TT 我通过Kafka控制台制作人向 TT 发送了消息 我配置了水槽代理 FF 运行 flume agent flume-ng agent -n FF -c conf -f flume.conf - Dflume.root.logger=INFO,console 代码执行停止,没有错误,并且不会向 HDFS 写入任何内

  • 我有 kafka 集群,它从生产者那里接收 avro 事件。 我想使用flume来消费这些事件并将它们作为avro文件放在HDFS中 水槽可以吗? 有没有人有一个配置文件的例子来演示如何做? Yosi

  • 是否有一种已知的方法使用Hadoop api/spark scala在Hdfs上将文件从一个目录复制到另一个目录? 我尝试使用copyFromLocalFile,但没有帮助

  • 实际上我有两个问题,我的第一个问题是:在整个文件被水槽代理刷新后,如何使HDFS关闭文件(例如。123456789.tmp)。事实上,直到我强制水槽代理停止,文件才会关闭。我相信有一种使用以下4个参数的方法: 我的第二个问题是,我的代理flume从SFTP服务器接收文件,而我需要将每个文件名保存在hdfs中。它适用于spooldir类型,但不适用于SFTP!!有什么想法吗? 我的水槽代理配置文件如

  • 我正在为我公司的 POC 实施一个小型 hadoop 集群。我正在尝试使用Flume将文件导入HDFS。每个文件都包含如下 JSON 对象(每个文件 1 个“长”行): “objectType”是数组中对象的类型(例如:事件、用户…)。 这些文件稍后将由多个任务根据“对象类型”进行处理。 我正在使用spoolDir源和HDFS接收器。 我的问题是: > 当flume写入HDFS时,是否可以保留源文