当前位置: 首页 > 知识库问答 >
问题:

Kafka流处理器-状态存储和输入主题分区

鲜于俊侠
2023-03-14
    null

谢谢你的澄清。

共有1个答案

东方和煦
2023-03-14

您必须区分输入分区和存储碎片/Changelog主题分区以获得完整的图片。另外,这取决于您是使用DSL还是处理器API,因为DSL会自动重新分区,而处理器API不会。因为DSL向下编译到处理器API,所以我将从这个开始。

如果您有一个带有4个分区的主题,并且您创建了一个使用该主题的有状态处理器,那么您将得到4个任务,每个任务运行一个维护存储区一个碎片的处理器实例。注意,整个状态被分成4个碎片,每个碎片基本上与其他碎片隔离开来。

从处理器API运行时的角度来看,输入主题分区和状态存储碎片(包括它们对应的changelog主题分区)是并行的一个单元。因此,使用4个分区创建存储区的changelog主题,changelog-topic-partition-X映射到input-topic-partition-x。注意,Kafka Streams在写入changelog主题时不使用基于哈希的分区,而是显式地提供分区号,以确保处理input-topic-partition-X的“处理器实例x”只从changelog-topic-partition-x读/写到changelog-topic-partition-x。

如果输入主题没有按键分区,则具有相同键的消息将由不同的任务处理。根据程序的不同,这可能是可以的(例如过滤),也可能不是(例如每个键的计数)。

类似于state:您可以将任何键放入状态存储,但该键对相应的碎片是“本地的”。其他任务,永远不会看到这个关键。因此,如果在存储区中对不同任务使用相同的键,它们将完全相互独立(就像它们是两个键一样)。

使用处理器API,根据所需的操作符语义,您有责任正确地划分输入数据并正确地使用存储区。

 类似资料:
  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 我们目前正在实现一个过程(使用Kafka处理器API),我们需要将来自一个主题的两个相关事件(消息)的信息合并,然后转发这些合并的信息。事件源于物联网设备,由于我们希望保持其有序,因此源主题使用设备标识符作为键。事件还包含相关ID: 钥匙 留言 我们的第一种方法是创建一个具有连接状态存储的处理器,该存储存储每条传入的消息,使用相关ID作为键。这使我们能够查询存储以获取传入消息的相关ID,如果存储中

  • 我用状态存储构建了一个kafka流媒体应用程序。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,Kafka会随机拆分分区和状态存储。 例如: Instance1获取:分区-0,分区-1 Instance2获取:partition-2,stateStore-repartition-0 Instance3获取:stateStore-重新分区-1,stateStore-重新分区-2

  • Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0 我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商