当前位置: 首页 > 知识库问答 >
问题:

kafka流实例上状态存储和分区的混合

尉迟京
2023-03-14

我用状态存储构建了一个kafka流媒体应用程序。现在我正在尝试扩展这个应用程序。当在三个不同的服务器上运行应用程序时,Kafka会随机拆分分区和状态存储。

例如:

Instance1获取:分区-0,分区-1

Instance2获取:partition-2,stateStore-repartition-0

Instance3获取:stateStore-重新分区-1,stateStore-重新分区-2

我想为每个实例分配一个stateStore和一个分区。我做错了什么?

我的KafkaStreams配置

final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS_CONFIG);

try {
     properties.setProperty(StreamsConfig.STATE_DIR_CONFIG,
           Files.createTempDirectory(stateStoreName).toAbsolutePath().toString());
} catch (final IOException e) {
         // use the default one
}

我的流是:

stream.groupByKey()
       .windowedBy(TimeWindows.of(timeWindowDuration))
       .<TradeStats>aggregate(
           () -> new TradeStats(),
           (k, v, tradestats) -> tradestats.add(v),
           Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as(stateStoreName)
        .withValueSerde(new TradeStatsSerde()))
        .toStream();

共有1个答案

沃念
2023-03-14

据我目前所知(正如我对你的问题的评论中提到的,请分享你的状态存储定义),一切都很好,我怀疑你方对这个问题有一点误解

我做错了什么?

基本上,什么都没有。:-)

对于问题的分区部分:它们根据配置的html" target="_blank">赋值器分布在消费者周围(请参阅https://kafka.apache.org/26/javadoc/index.html?org/apache/kafka/clients/consumer/CooperativeStickyAssignor.html或相邻接口)。

对于您问题的状态存储部分:这里可能存在一些关于(内存中)状态存储如何工作的误解:它们通常由一个Kafka主题支持,该主题不位于应用程序主机上,而是位于Kafka集群本身。更准确地说,整个状态存储的一部分存在于每个应用程序主机上的(RocksDB)内存键/值存储中,正如您在问题中的状态存储分配中所示。然而,这些只是Kafka集群中维护的完整状态存储的一部分或一部分。

简而言之:一切都很好,让Kafka完成任务,只有在你有真正特殊的用例或很好的理由的情况下才干预Kafka还确保在应用程序主机中断的情况下正确冗余和重新平衡所有分区。

如果您仍然想自己分配一些东西,用例将有兴趣获得进一步的帮助。

 类似资料:
  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将

  • 定义了一个自定义存储,用于自定义变压器(参考下面)。 https://github.com/apache/kafka/blob/trunk/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java 我得到以下例外。不确定,为什么内部主题“test_01

  • 我有5台不同的机器,每个机器都有使用kafka-streams应用程序的缩放的5个Spring Boot实例。我正在使用50个分区压缩主题与不同的2-3个主题,我的每个实例有10个并发。我正在使用docker swarm和docker Volume。使用这些主题KTable或KStream对我的kafka streams应用程序执行一些flatMap、map和join操作。 如果一切正常,在我的应

  • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!

  • 在kafka中的全局状态存储是否总是一个分区,或者我们可以为全局状态存储更改日志主题设置多个分区? 我找不到任何关于这方面的明确文件。