是否可以在Spring Cloud中使用@EnableBinding注释的类流或在方法中使用@StreamListener使用交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和process方法中实例化ReadOnlyKeyValueStore,但它始终为空。
我的@StreamListener方法正在监听一组KTables和KStreams,在进程拓扑(例如过滤)期间,我必须检查来自KStream的密钥是否已经存在于特定的KTable中。
我试图弄清楚如何扫描传入的KTable来检查密钥是否已经存在,但没有运气。然后我遇到了InteractiveQueryService,它的get()方法可以用于检查KTable中的状态存储materializedAs中是否存在键。问题是我无法使用进程拓扑(@enablebinding或@streamlistener)访问它。只能从这些注释(例如RestController)外部访问它。
有没有一种方法可以扫描传入的KTable来检查键或值的存在?如果不能,那么我们可以访问流程拓扑中的InteractiveQueryService吗?
Spring Cloud Stream中的InteractiveQueryService
无法在StreamListener
的实际拓扑中使用。正如您提到的,它应该在主拓扑之外使用。但是,使用您描述的用例,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的KStream
和一个被物化为状态存储的KTable
,那么您可以调用KStream
上的Process
并以这种方式访问状态存储。下面是一个实现这一点的粗略代码。您需要将其转换为适合您的特定用例,但这里是想法。
ReadOnlyKeyValueStore<Object, String> store;
input.process(() -> new Processor<Object, Product>() {
@Override
public void init(ProcessorContext processorContext) {
store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");
}
@Override
public void process(Object key, Object value) {
//find the key
store.get(key);
}
@Override
public void close() {
if (state != null) {
state.close();
}
}
}, "my-store");
我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于
在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能
在Kafka Streams Spring Boot应用程序中,配置(提供主机和端口信息)和访问交互式查询的“惯用”方式是什么? 访问KafkaStreams实例以访问状态存储的正确方法是什么? 我知道spring cloud stream中的InteractiveQueriesService,但我并不是只在spring Boot中使用spring kafka库。 谢谢你
我正在学习QuarkusKafka流教程,不太明白如何启动管道。 在本教程中,用于构建。构建拓扑的方法用<code>注释为products<code>。在本备忘单中,描述了这足以运行Kafka Streams管道。在本教程中,还公开了httpendpoint。这在我目前正在实现的服务中不是必需的。此外,在本示例中,从未显式调用提供程序方法。当我在没有endpoint的情况下启动应用程序时,管道不会
我试图实现Kafka流,这将把单一主题流作为全局数据库与互动查询可能。所以我想拥有: > 记录的全局存储区(GlobalKTable、KeyValueStore)