当前位置: 首页 > 知识库问答 >
问题:

如何在Spring Cloud Stream Kafka Streams应用程序中使用StateStoreBuilder添加StateStore

柳修平
2023-03-14

原生 Kafka API 允许我们使用 StreamsBuilder 创建和添加状态存储

    final StreamsBuilder builder = new StreamsBuilder();
    ...
    final StoreBuilder<WindowStore<String, Long>> dedupStoreBuilder = Stores.windowStoreBuilder(
            Stores.persistentWindowStore(storeName,
                                         retentionPeriod,
                                         windowSize,
                                         false
            ),
            Serdes.String(),
            Serdes.Long());

    builder.addStateStore(dedupStoreBuilder);

我想使用Spring Cloud Streams做同样的事情,但无法弄清楚访问StreamsBuilder以添加商店的方法。

我已尝试检索文档中所述的StreamsBuilderFactoryBean,希望可以从中获取StreamsBuilder对象,但该bean似乎不可用:

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration(private val context: ApplicationContext) {

    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {

        val streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean::class.java)
        ...
        return xxx

    }

}

org.springframework.beans.factory.异常:没有名为流构建器进程的bean可用

无论如何,我什至不确定这是正确的方法。那么,我们如何以编程方式创建 StateStore

共有1个答案

吉玉石
2023-03-14

由于我的 Scs 版本(Fishtown SR3),我没有看到记录的过程,但好消息是,自日耳曼敦以来,可以以声明方式创建状态存储

const val DEDUP_STORE = "dedup-store"

@EnableBinding(KafkaStreamsProcessor::class)
class FraudKafkaStreamsConfiguration {

    @KafkaStreamsStateStore(name = DEDUP_STORE, type = KafkaStreamsStateStoreProperties.StoreType.KEYVALUE)
    @StreamListener
    @SendTo("output")
    fun process(@Input("input") input: KStream<String, TransferEmitted>): KStream<String, TransferEmitted> {
        return input.transform(TransformerSupplier { DeduplicationTransformer() }, DEDUP_STORE)

    }

}
 类似资料:
  • 我试图在Firebase中创建一个动态链接,当我选择android应用程序时,它显示一个错误,说“将SHA-1添加到这个android应用程序”,我已经添加了一个凭据,但我不确定我到底如何“将SHA-1添加到应用程序” 这是怎么做到的?

  • 我有一个java应用程序,需要部署在weblogic服务器中。我目前正在为该应用程序制作ear文件。我的ear文件中有一个ejb jar。我想将log4j2 jar添加到此应用程序中。所以我的文件夹结构是 目前,我已经将JAR放在APP-INF文件夹/lib和META-INF/application中。xml我把JAR放在了模块中。这是我的申请表。xml 但它不接受log4j罐子。有什么解决方案吗

  • 我正在使用一个清单中有以下内容的库。 但是,作为我用来包含库的应用程序,设置相反

  • 我安装了Tomcat6,并希望添加web应用程序来启动URL。我学习了如何在Tomcat7.0中设置web应用程序的上下文路径 我在/conf/catalina/localhost/path中创建了root.xml。在root.xml文件中有以下内容

  • 问题内容: 我想在AngularJS应用程序中添加一些实用程序功能。例如: 将它们添加为服务的最佳方法是吗?从我所阅读的内容中我可以做到,但是然后我想在HTML页面中使用它们,因此如果它们在服务中仍然可行吗?例如,我可以使用以下内容: 有人可以举例说明如何添加这些内容。我应该创建服务还是有其他实现方法。最重要的是,我希望将这些实用程序功能存储在文件中,而不要与主要设置的另一部分结合使用。 我了解有