当前位置: 首页 > 知识库问答 >
问题:

如何在Spring-Cloud-Stream中使用kafka流程拓扑内的交互查询?

鄢朝斑
2023-03-14

是否可以在Spring Cloud中使用@EnableBinding注释的类流或在方法中使用@StreamListener使用交互式查询(InteractiveQueryService)?我尝试在提供的KStreamMusicSampleApplication类和process方法中实例化ReadOnlyKeyValueStore,但它始终为空。

我的@StreamListener方法正在监听一组KTables和KStreams,在进程拓扑(例如过滤)期间,我必须检查来自KStream的密钥是否已经存在于特定的KTable中。

我试图弄清楚如何扫描传入的KTable来检查密钥是否已经存在,但没有运气。然后我遇到了InteractiveQueryService,它的get()方法可以用于检查KTable中的状态存储materializedAs中是否存在键。问题是我无法使用进程拓扑(@enablebinding或@streamlistener)访问它。只能从这些注释(例如RestController)外部访问它。

有没有一种方法可以扫描传入的KTable来检查键或值的存在?如果不能,那么我们可以访问流程拓扑中的InteractiveQueryService吗?

共有1个答案

单于骁
2023-03-14

Spring Cloud Stream中的InteractiveQueryService无法在StreamListener的实际拓扑中使用。正如您提到的,它应该在主拓扑之外使用。但是,使用您描述的用例,您仍然可以使用主流中的状态存储。例如,如果您有一个传入的KStream和一个被物化为状态存储的KTable,那么您可以调用KStream上的Process并以这种方式访问状态存储。下面是一个实现这一点的粗略代码。您需要将其转换为适合您的特定用例,但这里是想法。

ReadOnlyKeyValueStore<Object, String> store;

 input.process(() -> new Processor<Object, Product>() {

                @Override
                public void init(ProcessorContext processorContext) {
                    store = (ReadOnlyKeyValueStore) processorContext.getStateStore("my-store");


                }

                @Override
                public void process(Object key, Object value) {
                    //find the key
                    store.get(key);
                }

                @Override
                public void close() {
                    if (state != null) {
                        state.close();
                    }
                }
            }, "my-store");
 类似资料:
  • 我们有一个kafka streams Spring Boot应用程序(使用spring-kafka),这个应用程序目前从上游主题读取消息,应用一些转换,并将它们写入下游主题,它不做任何聚合或联接或任何高级kafka streams功能。 代码当前类似于

  • 在准备拓扑优化时,我偶然发现了以下几点: 目前,Kafka Streams在启用时会执行两种优化: 1-源KTable将源主题重新用作变更日志主题。 2-如果可能,Kafka流会将多个重新分区主题压缩为单个重新分区主题。 这个问题是关于第一点的。我不完全明白这里发生了什么。只是为了确保我没有在这里做任何假设。有人能解释一下,以前是什么状态吗: 1-KTable使用内部变更日志主题吗?如果是,有人能

  • 在Kafka Streams Spring Boot应用程序中,配置(提供主机和端口信息)和访问交互式查询的“惯用”方式是什么? 访问KafkaStreams实例以访问状态存储的正确方法是什么? 我知道spring cloud stream中的InteractiveQueriesService,但我并不是只在spring Boot中使用spring kafka库。 谢谢你

  • 我正在学习QuarkusKafka流教程,不太明白如何启动管道。 在本教程中,用于构建。构建拓扑的方法用<code>注释为products<code>。在本备忘单中,描述了这足以运行Kafka Streams管道。在本教程中,还公开了httpendpoint。这在我目前正在实现的服务中不是必需的。此外,在本示例中,从未显式调用提供程序方法。当我在没有endpoint的情况下启动应用程序时,管道不会

  • 我试图实现Kafka流,这将把单一主题流作为全局数据库与互动查询可能。所以我想拥有: > 记录的全局存储区(GlobalKTable、KeyValueStore)