当前位置: 首页 > 知识库问答 >
问题:

超过内存事务容量后,Flume无法恢复

翟丰茂
2023-03-14

我正在创建一个 Flume 代理的概念验证,该代理将缓冲事件,并在接收器不可用时停止使用源中的事件。仅当接收器再次可用时,才应处理缓冲的事件,然后源重新启动消耗。

为此,我创建了一个简单的代理,它从SpoolDir读取并写入文件。为了模拟接收器服务关闭,我更改了文件权限,以便Flume无法写入。然后我启动Flume,一些事件被缓冲在内存通道中,当通道容量已满时,它将停止消耗事件,正如预期的那样。一旦文件变为可写,接收器就能够处理事件,Flume就会恢复。然而,只有在没有超过事务容量时,这才有效。一旦超过事务容量,Flume将永远无法恢复,并不断写入以下错误:

2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - 
org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to 
deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, 
capacity 4 full, consider committing more frequently, increasing capacity, or 
increasing thread count
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
    ... 3 more

一旦内存中缓冲的事件数超过事务容量 (4),就会发生此错误。我不明白为什么,因为文件输出的 batchSize 是 1,所以它应该一个接一个地取出事件。

这是我正在使用的配置:

agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout

agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1

agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4

agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1

我已经使用不同的通道容量值测试了此配置

共有1个答案

鲜于雨石
2023-03-14

在水槽邮件列表上,有人告诉我可能是这个错误影响了我的概念验证。该错误需要批量大小为100,即使它在配置中的指定不同。我用源代码重新运行了测试

 类似资料:
  • 和我的CSS:

  • 我试图使用jdbctemplate调用java存储过程,但大多数时候我都遇到了以下异常。 在org.springframework.jdbc.support.sqlstatesqlexceptiontranslator.doTranslator(sqlstatesqlexceptiontranslator.java:98) 在org.springframework.jdbc.support.abs

  • 问题内容: 我有一个从MS-SQL2005 DB调用存储过程的Web服务。我的Web服务在对我拥有的一个存储过程的调用上超时(该过程已经生产了几个月,没有超时),因此我尝试在Query Analyzer中运行查询,该查询也超时。我决定删除并重新创建存储过程,而无需更改代码,它又开始执行。 问题: 这通常是我的存储过程的TSQL中的错误吗? -或者- 有没有人看到这并发现它是由存储过程的编译中的某些

  • 我正在尝试使用Log4J Flume appender通过Flume将事件从Log4J 1x获取到HDFS。创建了两个附加器FILE和水槽。它适用于文件附加器,但使用水槽附加器,程序只是挂在Eclipse中。Flume工作正常,我能够使用avro客户端向avro源发送消息并在HDFS中查看消息。但是,它没有与Log4J 1x集成。 我没有看到任何异常,除了下面在log.out中。 从水槽控制台 如

  • 我试图从SSRS服务器读取报告,问题是我的内存流不能超过65536字节。 到目前为止,我已经尝试过使用内存流,但尚未成功设置其容量,然后再阅读报告本身 上面的MemoryStream必须在我读取文件之前增加它的容量。 我试过在我的应用程序中玩。配置,但我不知道从哪里开始设置内存流的字节容量

  • 我正在试图了解K8S吊舱内存分配的行为,但到目前为止,我在互联网上读到的材料没有什么好运气。 我的问题是,如果我有一个用以下值定义的内存POD模板 假设我的应用程序突然需要更多内存,POD分配了4Gi(从2Gi初始内存)来完成任务。POD是否会将其获取的额外2Gi返还给底层操作系统,并在任务完成后再次成为2Gi POD,或者在任务完成后将作为具有4Gi内存的POD使用。 我的应用程序是一个在Apa