当前位置: 首页 > 知识库问答 >
问题:

如何检测处于僵尸状态的Kafka流应用程序

龚彬
2023-03-14

我们的一个Kafka Streams应用程序的StreamThread使用者在生成以下日志消息后进入了僵尸状态:

[Consumer Clientid=Notification-Processor-DB9AA8A3-6C3B-453B-B8C8-106BF2FA257D-StreamThread-1-Consumer,GroupID=Notification-Processor]成员notification-processor-db9aa8a3-6c3b-453b-b8c8-106bf2fa257d-StreamThread-1-consumer-b2b9eac3-c374-43e2-bbc3-d9ee514a3c16由于Consumer轮询超时将离开组请求发送到协调器****:9092(ID:2147483646机架:空)已过期。这意味着对poll()的后续调用之间的时间长于配置的max.poll.interval.ms,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加max.poll.interval.ms或通过减少poll()中返回的批的最大大小和max.poll.records来解决这一问题。

StreamThread的Kafka Consumer似乎已经离开了Consumer组,但Kafka Streams应用程序仍然处于运行状态,同时没有消耗任何新记录。

我想检测一个Kafka Streams应用程序已经进入了这样一个僵尸状态,所以它可以被关闭并替换为一个新的实例。通常情况下,我们通过Kubernetes健康检查来验证Kafka Streams应用程序是否处于运行或重新分区状态,但这在本例中不起作用。

因此,我有两个问题:

  1. 在没有活跃消费者的情况下,Kafka Streams应用程序仍然处于运行状态,这是不是可以预料的呢?如果是:为什么?
  2. 我们如何(以编程方式/通过度量)检测Kafka Streams应用程序是否已进入无活动使用者的僵尸状态?

共有1个答案

匡旭东
2023-03-14

在没有活跃消费者的情况下,Kafka Streams应用程序仍然处于运行状态,这是意料之中的吗?如果是:为什么?

这取决于版本。在旧版本(2.1.x及更旧版本)中,Kafka流确实会保持在运行状态,即使所有线程都死了。通过https://issues.apache.org/jira/browse/kafka-7657在V2.2.0中修复了这个问题。

我们如何(以编程方式/通过度量)检测到一个Kafka Streams应用程序已经进入了一个没有活跃消费者的僵尸状态?

即使在旧版本中,也可以在KafKastReams客户端上注册未捕获的异常处理程序。每次streamthreads死亡时都会调用此处理程序。

顺便提一下:在即将发布的2.6.0版本中,添加了一个新的度量alive-stream-threads来跟踪正在运行的线程数:https://issues.apache.org/jira/browse/kafka-9753

 类似资料:
  • 僵尸进程 当一个进程完成它的工作终止之后,它的父进程需要调用wait()或者waitpid()系统调用取得子进程的终止状态。 一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵尸进程。 理解了孤儿进程和僵尸进程,我们临时加了守护进程这一小节,守护进程就是后台进程吗?没那么简单。

  • 问题内容: 我在前台启动了我的程序(守护程序),然后用杀死了它,但剩下一个僵尸,无法用杀死它。如何杀死僵尸进程? 如果僵尸是一个死进程(已被杀死),我如何将其从输出中删除? 问题答案: 僵尸已经死了,所以您无法杀死它。要清理僵尸,必须等待其父级等待,因此杀死父级应该可以消除僵尸。(父对象死后,僵尸将被pid 1继承,而pid 1将等待该僵尸并清除其在进程表中的条目。)如果守护程序正在生成成为僵尸的

  • 最近重构公司项目的时候发现有部分代码貌似从没有被调用或者执行,但并没十分把握, 所以需要一个能够分析代码调用情况的工具。在 StackOverflow 上得到的回答是 使用 pylint 这样的静态代码分析工具,或者代码覆盖率工具 coverage.py, 由于动态语言不在运行时难以确定变量类型,因此静态分析工具的准确度并不可靠, Django 的应用注册方式就够它头疼的了,因此实际上可选方案只剩

  • 僵尸增量是一款放置游戏,你需要控制一群僵尸破坏小镇...

  • 问题内容: 我是模块的新手。我只是尝试创建以下内容:我有一个工作是从RabbitMQ获取消息并将其传递到内部队列()。然后,我想做的是:在收到新消息时生成一个进程。它可以工作,但是在完成工作后,它留下了一个僵尸进程,不会被其父进程终止。这是我的代码: 主要过程: 这是我的工人: 因此,在处理完所有消息之后,我可以看到带有命令的进程。但是我真的希望它们一旦完成就可以终止。谢谢。 问题答案: 有两件事

  • 我有两份工作。我的主要第一个jenkins项目使用“trigger/call builds on other project”插件触发另一个第二个项目。我的第二个项目是一种服务器,我首先使用触发器和我的主要第一个项目流程启动它。现在我想在我的第一个项目构建完成后停止我的第二个僵尸项目。 我找到了一些参考资料,如下所示:- 如何停止不可阻挡的僵尸工作Jenkins不重启服务器? 但我想停止我的僵尸工