当前位置: 首页 > 知识库问答 >
问题:

如何在两个Kafka流之间使用持久化状态存储

班经亘
2023-03-14

我在试图通过Kafka流实现以下目标时遇到了一些困难:

  • 在应用程序启动时,(压缩的)主题α被加载到键值StateStore中
  • Kafka流从另一个主题中消费,使用上面的映射(get),并最终在主题alpha中生成一个新记录
  • 结果是,即使拖缆重新启动,内存中的映射也应与底层主题对齐

我的方法如下:

val builder = new StreamsBuilderS()

val store = Stores.keyValueStoreBuilder(
   Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)

builder.addStateStore(store)

val loaderStreamer = new LoaderStreamer(store).startStream()

[...] // I wait a few seconds until the loading is complete and the stream os running

val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!

builder
  .stream("another-topic")(Consumed.`with`(kSerde, vSerde))
  .doMyAggregationsAndgetFromTheMapAbove
  .transform(() => new StoreTransformer[K, V]("store"), "store")
  .to("alpha")(Produced.`with`(kSerde, vSerde))

装载机Treamer(store):

[...]
val builders = new StreamsBuilderS()

builder.addStateStore(store)

builder
  .table("alpha")(Consumed.`with`(kSerde, vSerde))

builder.build
[...]

存储变压器

[...]
override def init(context: ProcessorContext): Unit = {
  this.context = context
  this.store = 
    context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}

override def transform(key: K, value: V): (K, V) = {
  store.put(key, value)
  (key, value)
}
[...]

...但是我得到的是:

Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.

试图获取存储处理程序时。

你知道如何做到这一点吗?

非常感谢。

共有1个答案

翟俊
2023-03-14

您不能在两个Kafka Streams应用程序之间共享状态存储。

根据文件:https://docs.confluent.io/current/streams/faq.html#interactive-查询上述异常可能有两个原因:

>

  • 本地KafkaStreams实例尚未准备好,因此还无法查询其本地状态存储。

    本地KafkaStreams实例已准备就绪,但特定的状态存储刚刚迁移到幕后的另一个实例。

    最简单的处理方法是等待状态存储可查询:

    public static <T> T waitUntilStoreIsQueryable(final String storeName,
                                                  final QueryableStoreType<T> queryableStoreType,
                                                  final KafkaStreams streams) throws InterruptedException {
      while (true) {
        try {
          return streams.store(storeName, queryableStoreType);
        } catch (InvalidStateStoreException ignored) {
          // store not yet ready for querying
          Thread.sleep(100);
        }
      }
    }
    

    整个示例可以在confluent github上找到。

  •  类似资料:
    • 我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。

    • 我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知

    • 我试着加入两个Kafka的话题。一个是KStream,另一个是Ktable。左联接抱怨处理器的状态存储不存在。我确实查看了kafka、GitHub和其他地方的许多代码示例,其中StateStore不是由KStream客户机代码显式创建的。请告知以下代码中缺少什么。 应用程序流与users表保持连接,以发出app和user一起的记录。应用程序的所有者是用户。 版本:1.1.0

    • 问题内容: 摘要 我修改了基本的令牌发行Corda Bootcamp应用程序来演示此问题。我想在TokenStates与TokenChildren之间建立一对多关系的双向映射。 持久保存分层数据的最佳实践是什么? 是否可以在状态模式中使用JPA注释来实现此目的? 我有一个状态- ,其中包含一些任意数据以及带有类的对象。该列表的目的是促进H2中的记录之间的一对多关系。该州的关联架构具有相应的JPA批

    • 什么是 Volume Volume 就是在一个或者多个容器里有特殊用途的目录。它绕过了容器内部的文件系统为持久化数据、共享数据提供了下面这些有用的特性: 容器可以通过把数据写在 Volume 上来实现数据持久化 Volume 可以在不同的容器之间共享和重用数据 容器数据的备份、恢复和迁移都可以通过 Volume 实现 通过 Volume 实现多容器共享数据,从而实现应用的横向扩展 在 DaoClo

    • 我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!