当前位置: 首页 > 知识库问答 >
问题:

Kafka streams 1.0:以高max.poll.interval.ms和session.timeout.ms处理超时

司马念
2023-03-14

我使用的是一个使用Kafka Streams1.0和Kafka Broker1.0.1的无状态处理器

session.timeout.ms=15000

heartbeat.interval.ms=3000//将其设置为1/3会话。超时

max.poll.interval.ms=integer.max_value//使其更大,因为我正在进行密集的计算操作,处理1条kafka消息(NLP操作)可能需要10分钟

尽管有这样的配置和我对kafka超时配置工作原理的理解,但我看到消费者每隔几秒钟就会重新平衡一次。

我已经阅读了下面的文章和其他stackoverflow问题。关于如何调优长时间操作,避免会话超时太长导致故障检测太晚,然而,我仍然看到意想不到的行为,除非我误解了什么。

KIP-62

对于消费者环境设置,我有8台机器,每台16个代码,从一个主题消耗100个分区,我正在遵循这里的汇合文档推荐的实践。

有什么线索吗?

共有1个答案

凌昕
2023-03-14

我想出来了。在对kafka streams客户端和代理进行大量调试并启用详细的日志记录之后,结果是两件事:

  1. 在streams 1.1.0(这里)中有一个严重的bug,所以我将客户端版本从1.1.0升级到1.0.1
  2. 我将使用者属性default.deserialization.exception.handler的值从org.apache.kafka.streams.errors.LogandFailExceptionHandler更新为org.apache.kafka.streams.errors.LogandContinueExceptionHandler

在以上2个改变后,一切都是如此完美,没有重启,我正在使用格拉法纳监控重启,在过去的48小时,没有一个重启发生。

我可能会做更多的故障排除,以确保以上两个项目中的哪一个使真正的修复,但我正在赶时间部署到生产,所以如果有人想从那里开始,继续,否则,一旦我有时间将做进一步的分析和更新答案!

很高兴能把它修好!!!

 类似资料:
  • 我不清楚为什么我们同时需要和以及我们何时使用其中之一或两者?这两个设置似乎都指出了协调器在假定消费者已死亡之前等待从消费者获取心跳的时间上限。 另外,它在基于KIP-62的0.10.1.0+版本中的表现如何?

  • 我试图理解以下两种合流的消费者配置的默认值是如何一起工作的。 max.poll.interval.ms-根据汇流文档,默认值为300,000毫秒 session.timeout.ms-根据汇合文档,默认值为10,000 ms 例如,假设消费者每3,000 ms发送一次心跳,我的第一次轮询发生在时间戳t1,然后第二次轮询发生在t1+20,00 ms。那么是否会因为超出“session.timeout

  • 我试图做一个简单的poc与Spring启动与版本(2.3.7发布)的SpringKafka,以实现消费者批处理的工作原理,以及如何再平衡工作,如果消费者需要更多的流转时长,因为我是全新的这个消息系统。 现在我看到kafka重新平衡单个消费者(不允许并发)的问题。 这些是我设置的max.poll.interval属性。ms=50000和factory.getContanerProperties。se

  • 我正在开发一个windows应用程序,它以600Hz的频率从传感器接收数据。在五分之二的情况下,我的IO线程成功地从传感器读取4字节的数据,并将其传递给GUI线程。 问题是五次中有三次,QSerialPort有无法解释的超时,其中QSerialPort的waitForReadyRead()返回false和serial。errorString()有超时错误。在这种情况下,它将永远不会读取数据。如果我

  • 本文向大家介绍Java  HttpURLConnection超时和IO异常处理,包括了Java  HttpURLConnection超时和IO异常处理的使用技巧和注意事项,需要的朋友参考一下 最近同步数据的时候发现了一个问题,我本身后台插入数据后给其他部门后台做同步。说简单一点其实就是调用对方提供的接口,进行HTTP请求调用。然后后面发现问题了。HTTP请求的话,有可能请求超时,中断失败,IO异常

  • 问题内容: 我试图用一个批量一些 KTable 值,并送他们。似乎30秒钟超出了使用者超时间隔,在此间隔之后,Kafka认为该使用者已失效并释放了分区。 我尝试提高 轮询 和 提交间隔 的频率来避免这种情况: 不幸的是,这些错误仍在发生: (很多) 其次是: 显然,我需要更频繁地将心跳发送回服务器。怎么样? 我的拓扑是: 该 KTable 是关键,每30秒分组值。在 Processor.init(