当前位置: 首页 > 知识库问答 >
问题:

Apache Flume:kafka.consumer.消费者时间异常

柴泰平
2023-03-14

我正在尝试使用Apache Flume构建管道:Spooldir-

事件毫无问题地进入kafka主题,我可以使用kafkacat请求看到它们。但是kafka通道无法通过接收器将文件写入hdfs。错误是:

等待来自 Kafka 的数据时超时

完整日志:

2016-02-26 18:25:17,125 (SinkRunner-PollingRunner-DefaultSinkProcessor-SendThread(zoo02:2181)) [DEBUG - org.apache.zookeeper.ClientCnxn$SendThread.readResponse(ClientCnxn.java:717)] 收到会话 id 的 ping 响应:0ms 后0x2524a81676d02aa

2016-02-26 18:25:19127(SinkRunner PollingRunner DefaultSinkProcessor SendThread(zoo02:2181))[DEBUG-org.apache.zookeeper.ClientCnxn$SendThad.readResponse(ClientCnxn.java:717)]在1ms后获得会话ID:0x2524a81676d02aa的ping响应

2016-02-26 18:25:21,129(sink runner-polling runner-DefaultSinkProcessor-send thread(zoo 02:2181))[DEBUG-org . Apache . zookeeper . client cnxn $ send thread . read response(client cnxn . Java:717)]0 ms后得到sessionid: 0x2524a81676d02aa的ping响应

2016-02-26 18:25:21,775 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:327)] 在等待数据来自 Kafka 时超时 kafka.consumer.ConsumerTimeoutException at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:69) at kafka.consumer.ConsumerIterator.makeNext(ConsumerIterator.scala:33) at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66) at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58) at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:306) at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113) at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95) at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:374) atorg.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) at java.lang.Thread.run(Thread.java:745)

我的FlUME配置是:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c2

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/alex/spoolFlume

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =  hdfs://10.12.0.1:54310/logs/flumetest/
a1.sinks.k1.hdfs.filePrefix = flume-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.channels.c2.type   = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 1000
a1.channels.c2.brokerList=kafka10:9092,kafka11:9092,kafka12:9092
a1.channels.c2.topic=flume_test_001
a1.channels.c2.zookeeperConnect=zoo00:2181,zoo01:2181,zoo02:2181

# Bind the source and sink to the channel
a1.sources.r1.channels = c2
a1.sinks.k1.channel = c2

使用内存通道而不是Kafka通道一切正常。

提前感谢大家的任何想法!

共有3个答案

公羊渝
2023-03-14

我阅读了flume的源代码,发现flume读取“consumer.timeout.ms”的键“timeout”的值。

因此,您可以像这样配置“consumer.timeout.ms”的值:

agent1.channels.kafka_channel.超时=-1

刘乐童
2023-03-14

Kafka的消费者配置类具有“consumer.timeout.ms”配置属性,Kafka默认将其设置为-1。任何新的Kafka消费者都应该使用合适的值覆盖该属性。

以下是Kafka文件的参考:

consumer.timeout.ms     -1  
By default, this value is -1 and a consumer blocks indefinitely if no new message is available for consumption. By setting the value to a positive integer, a timeout exception is thrown to the consumer if no message is available for consumption after the specified timeout value.

当Flume创建Kafka通道时,它正在设置超时。ms值设置为100,如信息级Flume日志所示。这解释了为什么我们会看到大量这些ConsumerTimeoutException。

 level: INFO Post-validation flume configuration contains configuration for agents: [agent]
 level: INFO Creating channels
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Creating instance of channel c1 type org.apache.flume.channel.kafka.KafkaChannel
 level: DEBUG Channel type org.apache.flume.channel.kafka.KafkaChannel is a custom type
 level: INFO Group ID was not specified. Using flume as the group id.
 level: INFO {metadata.broker.list=kafka:9092, request.required.acks=-1, group.id=flume, 
              zookeeper.connect=zookeeper:2181, **consumer.timeout.ms=100**, auto.commit.enable=false}
 level: INFO Created channel c1

按照 Kafka 频道设置上的 Flume 用户指南,我试图通过指定以下内容来覆盖此值,但这似乎不起作用:

agent.channels.c1.kafka.consumer.timeout.ms=5000

此外,我们做了一个负载测试,不断地通过通道冲击数据,在测试过程中没有出现这种异常。

荆树
2023-03-14

ConsumerTimeoutException意味着长时间没有新消息,并不意味着Kafka的连接超时。

http://kafka.apache.org/documentation.html

消费者超时。ms-1如果在指定的时间间隔之后没有消息可供使用,则向使用者抛出超时异常

 类似资料:
  • 在使用Spring Kafka Consumer时,我有时会收到以下错误消息。如代码片段所示,我至少实现了一次语义 1)我的疑问是,我是否错过了来自消费者的任何信息? 2) 我需要处理这个错误吗。由于 org.apache.kafka.clients.consumer.提交失败异常:无法完成偏移提交,因为消费者不是自动分区分配的活动组的一部分;消费者很可能被踢出组。 我的SpringKafka消费

  • 我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移

  • 我是Spring-Kafka的新手,在使用Spring Kafka RetryTemplate处理kafka消息期间,尝试在失败或任何异常的情况下实现重试。 我使用了以下代码: //这是KafkaListenerContainerFactory: 重试模板 这是消费者工厂 当任何异常发生时,它会按照重试策略按预期重试。一旦max重试耗尽,它就会调用恢复回调方法。但很快,它会给出“java.lang