当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud 3.1关于如何在Spring boot应用程序中使用KTable的文档

陈夜洛
2023-03-14

我很难找到任何可以使用Spring Cloud Streams的文档,这些文档将Kafka主题放入KTable中。已在此处查找文档,例如https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RC1/reference/html/spring-cloud-stream-binder-kafka.html#_materializing_ktable_as_a_state_store在Spring boot中使用注释来实现这一点,没有什么是非常具体的。我希望我可以在我的应用程序中使用KStream创建一个简单的KTable。我有这样的属性:spring。云流动绑定。0中的进程。目的地:我的主题

然后在我的配置中,我希望我能做这样的事情

@Bean
public Consumer<KStream<String, String>> process() {
    return input -> input.toTable(Materialized.as("my-store"))
}

请告知我遗漏了什么?


共有1个答案

梁丘逸仙
2023-03-14

如果您只想将来自Kafka主题的数据作为KTable使用,那么您可以按以下方式执行此操作。

@Bean
public Consumer<KTable<String, String>> process() {
    return input -> {
        
    };
}

如果要将表具体化为命名存储,则可以将其添加到配置中。

spring.cloud.stream.kafka.streams.bindings.process_in_0.consumer.materializedAs: my-store

您还可以执行问题中的操作,即将其作为KStream接收,然后转换为KTable。但是,如果这就是您需要做的全部,您可能更愿意首先将其作为KTable接收,就像我在这里建议的那样。

 类似资料:
  • 我有Kafka Streams java应用程序启动并运行。我试图使用KSQL创建简单的查询,并使用Kafka流来实现复杂的解决方案。我希望将KSQL和Kafka流作为Java应用程序运行。 我打算通过https://github.com/confluentinc/ksql/blob/master/ksqldb-examples/src/main/java/io/confluent/ksql/em

  • 我是kubernetes的新手,需要在openshift平台上使用k8s confimap将springboot应用程序的属性文件外部化。我已将属性文件保存在git repo中,作为“greeter.message=Spring Bootmyapplication.properties已在库伯内特斯上挂载为卷!”并使用“oc create confimap myconfig--from-file=

  • 我有一个Kafka流应用程序,有两个数据源:事件和用户。 我有4个主题:事件,用户,用户2,用户事件 Users2与Users相同,用于演示GlobalKTable。 Events主题使用时间戳模式,因此当到达时间戳字段日期时,KStream将接收事件记录。 此时,我想为用户KTable中的每个用户ID以及新的事件ID创建一个用户事件记录;但我不知道如何遍历GlobalKTable或KTable来

  • 要获取请求URL,可以在堆栈溢出中找到以下方法。 第一种方法: 第二种方法: 第三种方法: 我不知道在spring boot应用程序中使用哪一个来获取请求URL。 如果我使用第三种方法,那么我是否需要在配置类中创建RequestContextListener的bean,如下所示?

  • 我正在编写一个带有ExecutorService的单例类的SDK。它看起来像这样: 此SDK类用于在整个应用程序中运行任务/可运行程序,doSomething()函数用于在单个线程中排队并运行所有可运行程序。 但有一件事我搞不清楚,那就是什么时候给ExecutorService打电话。shutdown()方法。如果我这样称呼它: 它会破坏使用一个Thread的目的,因为如果在第二次调用doThin

  • 我已经为Postgresql启用了复制,并且正在使用PGPool进行负载平衡。 我在使用HikariCP甚至Apache DBCP连接到Postgres时遇到了问题。 在SpringBoot应用程序中有没有使用PGPool的方法? 请查找堆栈跟踪: 2018-08-10 10:20:19.124信息37879----[main]com.zaxxer.hikari.hikaridatasource: