当前位置: 首页 > 知识库问答 >
问题:

组织。阿帕奇。Kafka。常见的错误。超时异常

贺俊楚
2023-03-14

我有两个代理1.0.0Kafka集群,我正在针对这个Kafka运行1.0.0Kafka流API应用程序。我增加了制片人的要求。暂停。毫秒到5分钟来修复生产者超时异常。

目前,在运行一段时间后,我发现以下两种类型的异常。我试图按照ApacheKafka中的建议修复这些异常:TimeoutException,然后什么都不起作用‏ 但不完整的解决方案就在这里。建议使用此解决方案(减少生产批量)。请帮忙。例外1

2017-12-08 13:11:55,129 ERROR o.a.k.s.p.i.RecordCollectorImpl [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.; No more records will be sent and no more offsets will be recorded for this task.
2017-12-08 13:11:55,131 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] stream-thread [sample-app-0.0.1-156ec0d4-6d7c-40b0-a493-370f8d9a092c-StreamThread-1] Failed to process stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=2_0, processor=KSTREAM-SOURCE-0000000004, topic=Sample-Event, partition=0, offset=508417
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:232)
        at org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:403)
        at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:317)
        at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:942)
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:822)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@54a6900d timestamp 1512536799387) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms..
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:819)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:760)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:100)
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:78)
        at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:87)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.kstream.internals.KStreamTransform$KStreamTransformProcessor.process(KStreamTransform.java:56)
        at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
        at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
        at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
        at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
        at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
        ... 6 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 60000 ms.

例外2

2017-12-11 11:08:35,257 ERROR o.a.k.s.p.i.RecordCollectorImpl [kafka-producer-network-thread | sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1-producer] task [2_0] Error sending record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append; No more records will be sent and no more offsets will be recorded for this task.
2017-12-11 11:08:56,001 ERROR o.a.k.s.p.i.AssignedTasks [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] stream-thread [sample-app-0.0.1-030b5133-df00-4abd-a3de-8bfab114f626-StreamThread-1] Failed to commit stream task 2_0 due to the following error: org.apache.kafka.streams.errors.StreamsException: task [2_0] Abort sending since an error caught with a previous record (key 5a12c529e532af0b84f5d937 value com.kafka.streams.SampleEvent@1758de61 timestamp 1512795449471) to topic abc due to org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append.
        at org.apache.kafka.streams.processor.internals.RecordCollectorImpl$1.onCompletion(RecordCollectorImpl.java:118)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187)
        at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627)
        at org.apache.kafka.clients.producer.internals.Sender.sendProducerData(Sender.java:287)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:238)
        at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 14 record(s) for abc-0: 122597 ms has passed since last append

共有3个答案

松锐藻
2023-03-14

第一个问题是由于这个原因:(生产者发送连续的心跳,它将等待60000毫秒(默认值)的元数据。如果元数据在指定的时间内不存在,则会引发streams timeout异常。要解决这个问题,请将kafka producer config(ProducerConfig.MAX_BLOCK_MS_config)添加到一个60000ms的值过滤器中。这将解决问题。

慕容渊
2023-03-14

这看起来是在预期主题尚未创建时经常发生的事情。尝试进一步查看日志文件。

您还可以显式地使用管理客户端来检查存在哪些主题。

戎亦
2023-03-14

我们遇到了类似的问题,我们解决了第一个问题:max.block.ms高于当前配置的值。

第二期作者:增加批次。尺寸和减少逗留时间。Kafka制作人端的ms(可能会增加延迟)。增加批次。size将发送更多批次,每个批次中的消息更少

 类似资料:
  • 我试图从JMS源读取数据,并将它们推送到KAFKA主题中,几个小时后,我观察到推送到KAFKA主题的频率几乎为零,经过一些初步分析,我在FLUME日志中发现以下异常。 my flume显示max.request的当前设置值(在日志中)。尺寸为1048576,明显小于1399305,增加了此最大要求。大小可能会消除这些异常,但我无法找到更新该值的正确位置。 我的水槽。配置, 任何帮助都将不胜感激!!

  • 我的应用程序使用一台机器上运行的Kafka服务器上的消息,然后将它们转发给另一台在其他实例上运行的远程Kafka服务器。在我将应用程序部署到Cloud Foundry并向第一台Kafka服务器发送消息后,应用程序按预期工作。消息被消费并转发到远程Kafka。 然而,在这之后,我在Cloud Foundry(以及在我的本地机器上以较慢的速度)中得到了下面的无限循环异常: StackTrace: 我的

  • 我正试图找出这两种设置之间的区别。大小和缓冲区。Kafka制作人的记忆。 据我所知。大小:这是可以发送的批次的最大大小。 文档描述了缓冲区。memory as:生产者可以用来缓冲等待发送的记录的内存字节。 我不明白这两者之间的区别。有人能解释一下吗? 谢啦

  • 我正在编写一个Flink-Kafka集成程序,如下所示,但Kafka出现超时错误: 从终端我可以看到Kafka和zookeeper正在运行,但当我从Intellij运行上面的程序时,它显示了这个错误: 组织。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败。2017年12月15日14:42:50作业执行切换到失败状态。[错误](run-main-0

  • 我们需要的是直接的API来设置和使用集群消息队列。我们最初的计划是使用Camel在集群JMS或ActiveMQ队列上进行消费/生产。Kafka如何使这项任务变得更容易?在任何一种情况下,应用程序本身都将在WebLogic服务器上运行。 消息传递将是点对点类型,其中有多个相同服务的实例在运行,但根据负载平衡策略,只有一个实例应该处理消息并发出结果。消息队列也是群集的,因此服务实例或队列实例的失败都不

  • 我有两个Spring Boot服务A和B。还有一个外部服务C。这是请求路径: 网络浏览器 外部服务正在返回一个返回前端的资源。为了在A、B和C之间进行通信,我使用了Rest模板。进入Web应用程序时一切都很好,但是一旦我运行并行运行的BDD测试(9个线程),我就会在调用外部服务C时在服务B中获得NoHttp响应异常。 这是我的Rest模板配置: 我已经尝试调用但没有帮助。 让我补充一点,从服务B到