当前位置: 首页 > 知识库问答 >
问题:

水槽以不一致的方式下沉数据

令狐翰
2023-03-14

我有一个问题。我使用apache flume从txt文件中读取日志,并将其存储到hdfs中。不知何故,一些记录在阅读时被跳过了。我正在使用fileChannel,请检查以下配置。

agent2.sources = file_server
agent2.sources.file_server.type=exec
agent2.sources.file_server.command = tail -F /home/datafile/error.log
agent2.sources.file_server.channels = fileChannel


agent2.channels = fileChannel
agent2.channels.fileChannel.type=file
agent2.channels.fileChannel.capacity = 12000
agent2.channels.fileChannel.transactionCapacity = 10000
agent2.channels.fileChannel.checkpointDir=/home/data/flume/checkpoint
agent2.channels.fileChannel.dataDirs=/home/data/flume/data


# Agent2 sinks
agent2.sinks = hadooper loged
agent2.sinks.hadooper.type = hdfs
agent2.sinks.loged.type=logger
agent2.sinks.hadooper.hdfs.path = hdfs://localhost:8020/flume/data/file
agent2.sinks.hadooper.hdfs.fileType = DataStream
agent1.sinks.hadooper.hdfs.writeFormat = Text
agent2.sinks.hadooper.hdfs.writeFormat = Text
agent2.sinks.hadooper.hdfs.rollInterval = 600
agent2.sinks.hadooper.hdfs.rollCount = 0
agent2.sinks.hadooper.hdfs.rollSize = 67108864
agent2.sinks.hadooper.hdfs.batchSize = 10
agent2.sinks.hadooper.hdfs.idleTimeout=0
agent2.sinks.hadooper.channel = fileChannel
agent2.sinks.loged.channel = fileChannel
agent2.sinks.hdfs.threadsPoolSize = 20

请帮帮忙。

共有1个答案

吕钧
2023-03-14

我认为问题是你使用了两个接收器,从一个通道读取两个接收器;在这种情况下,两个接收器之一读取的Flume事件不会被另一个接收器读取,反之亦然。

如果您希望两个接收器都接收相同Flume事件的副本,则需要为每个接收器创建一个专用通道。创建这些通道后,默认的通道选择器(ReplicatingChannelSelector)将在每个通道中创建一个副本。

 类似资料:
  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?

  • 我使用flume将服务器日志中的数据传输到hdfs中。但是当数据流入hdfs时,它首先创建。tmp文件。在配置中有没有一种方法?可以隐藏tmp文件,或者通过在前面附加一个.来改变其名称。我的收款代理文件看起来像- 任何帮助都将不胜感激。

  • 我的项目有一个要求。我必须使用水槽收集日志数据,并且必须将数据输入到hive表中。 在这里,我需要将放置在文件夹中的文件收集到hdfs中,我正在使用Spooldir进行。在此之后,我需要处理这些文件并将输出放在hive文件夹中,以便立即查询数据。 我是否可以使用 sink 处理源文件,使放置在 hdfs 中的数据已经处理为所需的格式。? 谢了,萨希

  • 我有一个假脱机目录,所有json文件都在其中,每秒钟都会有传入的文件被添加到这个目录中,我必须反序列化传入的json文件,获取requires字段并将其附加到HDFS目录中。 我所做的是我创建了一个 flume conf 文件,其中将假脱机目录中的文件作为源,并使用 1 个接收器将 json 文件直接放入 HDFS 中。 我必须在Sink之前将这个json转换成结构化格式,并将其放入HDFS。最重

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 当hdfs不可用时,是否有方法确保数据安全?场景是:kafka源,flume内存通道,hdfs接收器。如果水槽服务关闭了,它是否可以存储主题分区的偏移量,并在恢复后从正确的位置消费?