https://www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html提到,“只要使用者定期发送心跳,它就被认为是活动的、良好的,并且正在处理来自其分区的消息。事实上,轮询消息的行为是导致使用者发送这些心跳的原因。如果使用者停止发送心跳的时间足够长,它的会话将超时,组协调器将认为它已死亡并触发重新平衡。”
类似地,https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/kafkaconsumer.html指定“代理将通过使用心跳机制自动检测测试组中失败的进程。使用者将定期自动ping集群,这让集群知道它是活动的。只要使用者能够做到这一点,它就被认为是活动的,并保留从分配给它的分区消耗的权利。如果它停止心跳的时间比session.timeout.ms长,那么它将被认为是死亡的,它的分区将被分配给另一个进程。”
在我的应用程序中,在调用另一个poll()之前,处理从上一个poll()接收的消息可能需要多达几个小时。注意:我禁用自动提交是因为我并不总是知道处理所有以前的消息需要多长时间。
a)这是否会导致小组协调人认为消费者已死亡或不活跃?
b)是否有其他方法向组协调器发送心跳消息以保持会话活动?
c)session.timeout.ms在这里对保持使用者活动/活动是否有任何影响?
a)是的,如果调用poll()
的时间超过session.timeout.ms
,Kafka认为使用者已死亡。
b)作为另一种选择,您可以在处理期间调用poll()
(即与处理交错)来触发心跳(并在每次“真正”轮询之前进行搜索)。使用额外的处理线程也是可能的,允许主线程定期轮询以发送心跳。但是,您需要确保检测到处理线程上的故障(正确执行此操作很棘手)!
c)您可以增加超时值,但是,这可能不是您想要的,就好像您的使用者失败了,这个失败很晚才被检测到。
你所描述的问题实际上是已知的,消费者的行为在未来可能会改变。已经有关于它的讨论了。详见KIP-62。
更新
由于Kafka0.10.1
使用者有两个配置参数:max.poll.interval.ms
和session.timeout.ms
。第一个是连续两次轮询之间的最大时间,而第二个是心跳超时。心跳在一个额外的线程中发送,因此现在与调用poll()
解耦。因此,增加max.poll.interval.ms
不会产生不能快速检测到整个客户端故障(无心跳)的负面影响。
这里给出了一个普通web应用程序的例子。 传统上,我们使用会话并设置超时=30分钟。如果会话到期,我们将重定向用户登录。(当用户/浏览器与Web应用程序交互时,过期时间将延长) 使用JWT,如何实现这一点? 我知道一些关于“令牌刷新”的东西,当短期令牌到期时,它会使用refresh-token刷新一个新的令牌。 但看起来它并不关心用户是否与网络应用程序交互。因此,只要刷新令牌有效,浏览器总是可以获
问题内容: 我将Node.js与MongoDB结合使用,也将Monk用于数据库访问。我有以下代码: 关于此代码,我有两个问题: 我看到执行时间,并且“文件已保存!” 首先输入字符串,然后在控制台中看到朋友的名字。这是为什么?我不应该先看名字然后再看执行时间吗?是否因为Node.js的异步特性? 名称在控制台中的打印速度非常慢,速度就像两秒钟内出现一个名称一样。为什么这么慢?有没有办法使过程更快?
我们正在spring boot(带有嵌入式tomcat)和spring Cloud上运行微服务。这意味着服务发现,定期的健康检查和响应这些健康检查的服务,……我们还有spring boot admin server用于监控,我们可以看到所有服务都运行正常。目前仅在测试环境中运行... 我们的一些微服务很少被调用(假设每两天调用一次),但是仍然有定期的健康检查。当这些服务的REST api在如此长的
我的Coroutine运行在主线程上,我在Coroutine上下文中指定了主线程: 以下是我的: 如果我的coroutine上下文运行在主线程上,为什么我仍然得到错误?
我正在尝试将excel文件转换为XSSFWorkbook,我有大约7000行和大约145列。将excel文件转换为第2行的XSSFWorkbook大约需要15分钟,代码如下:- 我不想向XFFSWorkbook添加7000行,只想在第2行转换时向XFFSWorkbook添加30行? 如果没有,如何减少将excel转换为XSSFWorkbook所需的时间?
我们有一个问题,有时调用‘轮询’方法的新KafkaConsumer挂起长达20到30分钟后,三个kafka经纪人中的一个得到重新启动! 我们使用的是3 broker kafka设置(0.9.0.1)。我们的消费者进程使用新的Java KafkaConsumer-API,并且我们将分配给特定的TopicPartition。 由于不同的原因我不能在这里展示真正的代码,但基本上我们的代码是这样工作的: