当前位置: 首页 > 知识库问答 >
问题:

水槽内存香奈儿到HDFS水槽

古明煦
2023-03-14

我遇到了Flume的问题(Cloudera CDH 5.3上的1.5):

spoolDir source -> memory channel -> HDFS sink

我想做的是:每5分钟,大约20个文件被推送到假脱机目录(从远程存储中抓取)。每个文件包含多行,每行是一个日志(在JSON中)。文件大小在10KB到1MB之间。

当我启动代理时,所有文件都被成功推送到HDFS。1分钟后(这是我在flume.conf中设置的),文件被滚动(删除. tmp后缀并关闭)。

但是,当在假脱机目录中找到新文件时,我会收到以下消息:

org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds

在尝试了许多不同的配置都没有成功(增加/减少通道事务容量和容量,增加/减少batchSize等)之后,我请求您的帮助。

这是我最新的水槽配置:

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
sebanalytics.sources.spooldir-source.batchSize = 10
sebanalytics.sources.spooldir-source.deletePolicy = immediate
# Max blob size: 1.5Go
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder
# Define event's headers. basenameHeader must be the same than source.basenameHeaderKey (defaults is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.mem-channel-1.type = memory
sebanalytics.channels.mem-channel-1.capacity = 1000000
sebanalytics.channels.mem-channel-1.transactionCapacity = 10

# sink definition
sebanalytics.sinks.hdfs-sink-1.type = hdfs
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid}
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = mem-channel-1
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1

共有1个答案

端木震博
2023-03-14

满通道意味着:通道无法从源接收更多的事件,因为接收器消耗这些事件的速度比源慢。

增加信道容量只会解决问题。可能的解决方案

    < li >改善水槽的处理...如果接收器是自定义的(改善/避免循环,使用更有效的后端API等)。在这种情况下,这似乎是不可能的,因为您使用的是默认的HDFS接收器。 < li >降低数据发送到源的频率。尽管如此,我猜你不想/不能这样做,因为你的加工要求。 < li >添加更多并行工作的接收器。我不确定这一点,但我可以想象Flume的设计师决定在一个单独的线程中运行每个水槽。如果这是真的,那么你可以尝试多个平行的HDFS水槽。为了将数据分成几个接收器,您必须使用不同于默认复制选择器的多路复用选择器。

哼!

 类似资料:
  • 我想使用 flume 将数据从 hdfs 目录传输到 hdfs 中的目录,在此传输中,我想应用处理形态线。 例如:我的来源是 我的水槽是 有水槽可能吗? 如果是,源水槽的类型是什么?

  • 我正在尝试使用hdfs水槽运行水槽。hdfs在不同的机器上正常运行,我甚至可以与水槽机器上的hdfs交互,但是当我运行水槽并向其发送事件时,我收到以下错误: 同样,一致性不是问题,因为我可以使用hadoop命令行与hdfs交互(水槽机不是datanode)。最奇怪的是,在杀死水槽后,我可以看到tmp文件是在hdfs中创建的,但它是空的(扩展名仍然是. tmp)。 关于为什么会发生这种情况的任何想法

  • 我们可以为HDFS Sink添加分隔符吗?写入文件时,我们如何添加记录分隔符? 以下是配置:-

  • https://cwiki.apache.org/confluence/display/FLUME/Getting 开始的页面说 HDFS sink 支持追加,但我无法找到有关如何启用它的任何信息,每个示例都在滚动文件上。因此,如果可能的话,我将不胜感激有关如何将水槽附加到现有文件的任何信息) 使现代化 可以将所有滚动属性设置为0,这将使flume写入单个文件,但它不会关闭文件,新记录对其他进程不

  • 我是使用Flume和Hadoop的新手,所以我试图尽可能设置一个最简单的(但有些帮助/现实的)例子。我在虚拟机客户端中使用HortonWorks沙盒。在完成了教程12(包括设置和使用Flume)之后,一切看起来都正常了。 所以我建立了自己的flume.conf 从apache访问日志中读取 使用内存通道 写入HDFS 够简单吧?这是我的会议文件 我见过几个人在给HDFS写信时遇到问题,大多数情况下

  • 我试图配置水槽与HDFS作为汇。 这是我的flume.conf文件: 我的hadoop版本是: 水槽版本是: 我已将这两个jar文件放在flume/lib目录中 我将hadoop common jar放在那里,因为在启动flume代理时出现以下错误: 现在代理开始了。这是启动日志: 但是当一些事件发生时,下面的错误出现在水槽日志中,并且没有任何东西被写入hdfs。 我缺少一些配置或jar文件?