当前位置: 首页 > 知识库问答 >
问题:

Kafka Streams应用程序在一段时间没有读取消息后停止工作

濮阳翔
2023-03-14

我注意到我的Kafka Streams应用程序在一段时间没有读取来自Kafka主题的新消息时停止工作。这是我第三次看到这种情况发生。

自5天以来,没有向主题发送任何消息。我的Kafka Streams应用程序也托管了一个spark java Web服务器,它仍然具有响应能力。然而,Kafka Streams不再阅读我向Kafka主题发出的消息。当我重新启动应用程序时,所有消息都将从代理获取。

如何使我的Kafka Streams应用程序对这种场景更持久?它觉得Kafka Streams有一个内部“超时”,之后当没有收到消息时,它会关闭与Kafka代理的连接。我在文档中找不到这样的设置。

我使用Kafka 1.1.0和Kafka Streams 1.0.0

共有1个答案

万承志
2023-03-14

Kafka Streams没有内部超时来控制何时永久关闭与Kafka代理的连接;另一方面,Kafka代理确实有一些超时值来关闭来自客户端的空闲连接。但是一旦Streams有一些处理过的结果数据准备好发送给代理,它就会继续尝试重新连接。所以我怀疑你观察到的问题来自其他一些原因。

您能否分享您的应用程序拓扑草图和您使用的配置属性,以便我更好地了解您的问题?

 类似资料:
  • 更新06/04这里是消费者出厂设置。它是Spring-Kafka-1.3.1。Kafka经纪人合流版 注意:容器工厂已将自动启动设置为false。这是在加载大文件时手动启动/停止使用者。 运行大约1小时后(时间不同),使用者停止使用来自其主题的消息,即使该主题有许多可用消息。Consumer方法中有一个log语句,用于停止在日志中打印。 我们如何设计Spring-Kafka消费者,使其在停止消费的

  • 在我的Spring启动应用程序中,我有kafka消费者类,每当主题中有可用消息时,它会频繁读取消息。我想限制消费者每隔2小时消费一次消息。就像阅读完一条消息后,消费者将暂停2小时,然后再消费另一条消息。这是我的消费者配置方法:- 然后我创建了这个容器方法,在其中我设置了kafka配置的其余部分 使用此代码分区每2小时重新平衡一次,但它根本没有读取消息。我的kafka消费者方法:-

  • 我有一个KafkaStream应用程序,以前工作得很好。现在无论我用新的应用程序id重新启动它多少次,它都不会开始消耗主题,我收到了这个日志: 当我将“日志级别”设置为“调试”时,应用程序会生成以下日志: 没有分配任务,如日志所示: 这是日志中显示领导和成员信息的部分: 我有很多应用程序正在运行,他们现在都经历了这种情况。我试着制作新的连接器,但仍然没有成功。但我可以通过kafka console

  • 我正在运行kafka2.11-0.9.0.0和一个基于Java的生产者/消费者。与消息~70 KB一切工作良好。但是,在生产者将一个更大的70 MB消息排入队列之后,kafka似乎停止将消息传递给消费者。即。不仅大的消息没有传递,后续的小消息也没有传递。我知道制作人成功了,因为我使用了kafka回调进行确认,我可以在kafka消息日志中看到消息。 kafka配置自定义更改: 使用者配置:

  • 问题内容: 这可能是一个重复的问题,但我没有找到想要的东西。我在UI活动中调用AsyncTask, 在doInBackground中调用需要时间的方法。如果一段时间后没有返回数据,我想中断该线程。以下是我尝试执行此操作的代码。 但这并不能在30秒后停止任务,事实上,这花费了更多时间。我也尝试过,但这也不起作用。 谁能告诉我该怎么做或如何在doInBackground中使用isCancelled()