当前位置: 首页 > 知识库问答 >
问题:

带statestore的Kafka有状态流处理器:幕后

韶景曜
2023-03-14

我正在尝试理解有状态流处理器

据我所知,在这种类型的流处理器中,它使用状态存储来维护某种状态。

我开始知道,实现State Store方法之一是使用RocksDB。假设以下拓扑(并且只有一个处理器是stateful

A-

假设sp只监听一个Kafka主题,比如带有10个分区的topic-1

我观察到,当应用程序启动时(在不同的物理机器上有2个实例,并且num.stream.threads=5),然后对于状态存储,它会创建目录结构,其内容如下:

0_0,0_1,0_2......0_9(每台机器有5个分区)。

我正在浏览一些在线资料,其中说我们应该创建一个StoreBuilder,并使用addStateStore()将其连接到拓扑结构,而不是在处理器中创建状态存储。

喜欢:

topology.addStateStore(storeBuilder,"processorName")

Ref also: org.apache.kafka.streams.state.Store

我不明白将storeBuilder附加到拓扑与在处理器中实际创建statestore有什么区别。它们之间有什么区别?

第二部分:为statestore创建目录,如:0\u 0、0\u 1等。谁以及如何创建它?其中Kafk:1是为某个目录创建的映射,该目录是?

共有1个答案

林亦
2023-03-14

我不明白将storeBuilder附加到拓扑与在处理器中实际创建statestore有什么区别。它们之间有什么区别?

为了让Kafka Streams为您管理存储(容错、迁移),Kafka Streams需要了解存储。因此,您给Kafka Streams一个StoreBuilder,Kafka Streams为您创建和管理存储。

如果您只是在处理器中创建一个存储,Kafka Streams不知道该存储,并且该存储不会容错。

对于statestore,它创建目录,如:0_0、0_1等。它是谁以及如何创建的?kafka主题(sp正在监听)和为State Store创建的目录数量之间是否存在某种1:1映射?

是的,有一个地图。该存储区以输入主题分区的数量为基础进行共享。每个分区也有一个“任务”,任务目录是namey_z,其中y是子拓扑编号,z是分区编号。对于简单拓扑,只有一个子拓扑指向所有具有相同前缀的目录。

因此,逻辑存储有10个物理碎片。当相应的输入主题分区被分配给不同的实例时,这种分片允许Kafka流进入Mirregate状态。总的来说,您最多可以运行10个实例,每个实例将处理一个分区,并托管一个存储碎片。

 类似资料:
  • 我正在使用kafka处理器API做一些自定义计算。由于某些复杂的处理,DSL并不是最佳的选择。流代码如下所示。 我需要清除一些项目从状态存储基于一个事件来在一个单独的主题。我无法找到正确的方法来使用Processor API连接另一个流,或者通过其他方法来侦听另一个主题中的事件,从而能够触发CustomProcessor类中的清理代码。有没有一种方法可以在处理器API中获取另一个主题中的事件?或者

  • 我是Spark结构化流处理的新手,目前正在处理一个用例,其中结构化流应用程序将从Azure IoT中心-事件中心(例如每20秒)获取事件。 任务是使用这些事件并实时处理。为此,我在下面用Spark Java编写了Spark结构化流媒体程序。 以下是要点 目前我已经应用了10分钟间隔和5分钟滑动窗口的窗口操作。 水印被应用在以10分钟间隔的eventDate参数上。 目前,我没有执行任何其他操作,只

  • 对于我的一个Kafka streams应用程序,我需要同时使用DSL和处理器API的特性。我的流媒体应用程序流是 聚合之后,我需要向接收器发送单个聚合消息。因此我定义拓扑如下 知道这里出了什么问题吗?

  • 我正在Kafka流中的处理器节点上工作。对于一个简单的代码,我编写如下代码只是为了过滤用户ID,这是在kafka流中处理处理器节点的正确方法吗? 但是,下面的代码没有编译,抛出了一个错误:

  • 我有一个kafka流应用程序,它在其中使用stateStore(由RocksDB支持)。 stream thread所做的只是从kafka主题获取数据并将数据放入State Store。(还有其他线程从statestore读取数据并进行业务逻辑处理)。 我观察到它创造了一个新的Kafka主题“变化日志”,因为Statestore。 但我没有明白“变化”Kafka话题有什么用处? 为什么需要它(更改