我在试图通过Kafka流实现以下目标时遇到了一些困难:
我的方法如下:
val builder = new StreamsBuilderS()
val store = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store"), kSerde, vSerde)
)
builder.addStateStore(store)
val loaderStreamer = new LoaderStreamer(store).startStream()
[...] // I wait a few seconds until the loading is complete and the stream os running
val map = instance.store("store", QueryableStoreTypes.keyValueStore[K, V]()) // !!!!!!!! ERROR HERE !!!!!!!!
builder
.stream("another-topic")(Consumed.`with`(kSerde, vSerde))
.doMyAggregationsAndgetFromTheMapAbove
.transform(() => new StoreTransformer[K, V]("store"), "store")
.to("alpha")(Produced.`with`(kSerde, vSerde))
装载机Treamer(store):
[...]
val builders = new StreamsBuilderS()
builder.addStateStore(store)
builder
.table("alpha")(Consumed.`with`(kSerde, vSerde))
builder.build
[...]
存储变压器
:
[...]
override def init(context: ProcessorContext): Unit = {
this.context = context
this.store =
context.getStateStore(store).asInstanceOf[KeyValueStore[K, V]]
}
override def transform(key: K, value: V): (K, V) = {
store.put(key, value)
(key, value)
}
[...]
...但是我得到的是:
Caused by: org.apache.kafka.streams.errors.InvalidStateStoreException:
The state store, store, may have migrated to another instance.
试图获取存储处理程序时。
你知道如何做到这一点吗?
非常感谢。
您不能在两个Kafka Streams应用程序之间共享状态存储。
根据文件:https://docs.confluent.io/current/streams/faq.html#interactive-查询上述异常可能有两个原因:
>
本地KafkaStreams实例尚未准备好,因此还无法查询其本地状态存储。
本地KafkaStreams实例已准备就绪,但特定的状态存储刚刚迁移到幕后的另一个实例。
最简单的处理方法是等待状态存储可查询:
public static <T> T waitUntilStoreIsQueryable(final String storeName,
final QueryableStoreType<T> queryableStoreType,
final KafkaStreams streams) throws InterruptedException {
while (true) {
try {
return streams.store(storeName, queryableStoreType);
} catch (InvalidStateStoreException ignored) {
// store not yet ready for querying
Thread.sleep(100);
}
}
}
整个示例可以在confluent github上找到。
我试着把我的头缠绕在Kafka的溪流和一些根本的问题,我似乎无法解决,我自己。我理解和Kafka状态存储的概念,但我很难决定如何实现它。我还在使用Spring Cloud Streams,这在此基础上增加了另一个层次的复杂性。 我的用例: 一些有状态规则如下所示: 我当前的实现将所有这些信息存储在内存中,以便能够执行分析。由于显而易见的原因,它不容易扩展。所以我想我会坚持到Kafka的州立商店。
我已经阅读了有状态流处理概述,如果理解正确的话,RocksDB被用作键值存储的默认实现的主要原因之一是这样一个事实,即与内存中的集合不同,它可以处理大于可用内存的数据,因为它可以刷新到磁盘。这两种类型的存储都可以在应用程序重新启动时幸存下来,因为数据是作为Kafka主题备份的。 但还有其他不同吗?例如,我注意到我的持久状态存储为每个主题分区创建了一些。log文件,但它们都是空的。 简而言之,我想知
我试着加入两个Kafka的话题。一个是KStream,另一个是Ktable。左联接抱怨处理器的状态存储不存在。我确实查看了kafka、GitHub和其他地方的许多代码示例,其中StateStore不是由KStream客户机代码显式创建的。请告知以下代码中缺少什么。 应用程序流与users表保持连接,以发出app和user一起的记录。应用程序的所有者是用户。 版本:1.1.0
问题内容: 摘要 我修改了基本的令牌发行Corda Bootcamp应用程序来演示此问题。我想在TokenStates与TokenChildren之间建立一对多关系的双向映射。 持久保存分层数据的最佳实践是什么? 是否可以在状态模式中使用JPA注释来实现此目的? 我有一个状态- ,其中包含一些任意数据以及带有类的对象。该列表的目的是促进H2中的记录之间的一对多关系。该州的关联架构具有相应的JPA批
什么是 Volume Volume 就是在一个或者多个容器里有特殊用途的目录。它绕过了容器内部的文件系统为持久化数据、共享数据提供了下面这些有用的特性: 容器可以通过把数据写在 Volume 上来实现数据持久化 Volume 可以在不同的容器之间共享和重用数据 容器数据的备份、恢复和迁移都可以通过 Volume 实现 通过 Volume 实现多容器共享数据,从而实现应用的横向扩展 在 DaoClo
我有一个像下面这样的用例。对于每个传入的事件,我希望查看某个字段,看看它的状态是否从a变为B,如果是,则将其发送到输出主题。流程是这样的:一个带有键“xyz”的事件以状态A进入,一段时间后另一个带有键“xyz”的事件以状态B进入。 有没有更好的方法使用DSL来编写这个逻辑? 上面代码中关于聚合创建的状态存储的两个问题。 null 提前道谢!