当前位置: 首页 > 知识库问答 >
问题:

Kafka流式处理器消耗更改日志主题和初始化状态存储需要很长时间

秦才
2023-03-14

共有1个答案

罗诚
2023-03-14

出现这种情况有多种原因:

  1. 您的代理性能,即您的KStream应用程序可以从每个代理获取多少数据
  2. 您的KStream性能
  3. 序列化格式(如果使用Avro,数据大小将小得多)

避免昂贵的重新启动的解决方案是拥有一个持久的本地状态存储。例如,您可以将默认状态存储文件夹(/tmp/kafka-streams)映射到某种持久卷

 类似资料:
  • 我有一个Python进程(或者更确切地说,在一个使用者组中并行运行的一组进程),它根据来自某个主题的Kafka消息输入来处理数据。通常每条消息的处理都很快,但有时,取决于消息的内容,可能需要很长时间(几分钟)。在这种情况下,Kafka broker断开客户端与组的连接,并启动重新平衡。我可以将设置为一个非常大的值,但它可能会超过10分钟,这意味着如果客户机死亡,集群在10分钟内无法正确地重新平衡。

  • 一个与主题压缩有关的问题。在压缩主题中,当日志清理器在清理特定键的以前偏移量(3,4,5)时出现延迟(假设5是最新的偏移量),而作为使用者使用这些偏移量时,即使3和4还没有压缩,我会只看到该键的最新偏移量(5)吗?还是使用者将按照该顺序获得(3,4,5)?

  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 我试着加入两个Kafka的话题。一个是KStream,另一个是Ktable。左联接抱怨处理器的状态存储不存在。我确实查看了kafka、GitHub和其他地方的许多代码示例,其中StateStore不是由KStream客户机代码显式创建的。请告知以下代码中缺少什么。 应用程序流与users表保持连接,以发出app和user一起的记录。应用程序的所有者是用户。 版本:1.1.0

  • 我正在使用Apache Kafka streaming对从Kafka主题中消耗的数据进行聚合。然后,聚合被序列化到另一个主题,它本身被使用,结果存储在一个DB中。我想是很经典的用例吧。 聚合调用的结果是创建一个由Kafka变更日志“主题”备份的KTable。 这实际上是很好的/必要的,因为这避免了当将来的事件带有相同的键时丢失我的聚合状态。 然而,从长远来看,这意味着这个变更日志将永远增长(随着更