当前位置: 首页 > 知识库问答 >
问题:

Spring Cloud Stream Kafka Streams入站KTable可预测的内部状态存储主题名称

黄俊智
2023-03-14

我们正在使用带有Spring Cloud Stream函数的Kafka Streams。我们有一个典型的示例应用程序,它将用户单击kstream与用户区域ktable结合在一起。

我们知道,在定义拓扑结构时,我们可以通过使用适当的方法来强制内部变更日志或重新分区主题的自定义名称,这些方法接受物化存储的名称:

  @Bean
  public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> bifunctionktable() {
    return (userClicksStream, userRegionsTable) -> userClicksStream
        .leftJoin(userRegionsTable,
            (clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks),
            Joined.with(Serdes.String(), Serdes.Long(), null, "bifunctionktable-leftjoin"))
        .map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Long()).withName("bifunctionktable-groupbykey"))
        .reduce((firstClicks, secondClicks) -> firstClicks + secondClicks, Materialized.as("bifunctionktable-reduce"))
        .toStream();
  }

但是对于输入的KTable,我们不能更改它的状态存储内部主题名称,我们总是得到这个主题名称:myapp-id-user-区域-STATE-STORE-0000000001-变更日志

如果我们完全按照代码创建拓扑,那么我们确实有builder。表(最终字符串主题,最终物化)


共有1个答案

怀飞扬
2023-03-14

您可以使用以下属性为传入的KTable添加自定义名称:

spring.cloud.stream.kafka.streams.bindings.bifunctionktable-in-1.consumer.materializedAs: <Your-custom-store-name>

这在参考文件的本节中有记录。

 类似资料:
  • Java:OpenJdk 11Kafka:2.2.0Kafka流库:2.3.0 我试图在docker容器中部署我的Kafka streams应用程序,但在尝试创建带有TopicAuthorizationException的内部状态存储时失败。它在本地运行良好。本地和服务器上的主要区别在于,它连接到部署了Kafka的服务器,并使用常见的Kerberos身份验证进行身份验证。我无法理解身份验证与本地商

  • 我们有以下高级DSL处理拓扑: 简而言之,我们在上面做的是: null 其思想是创建窗口化事件计数,并将这些窗口化键用于联接和聚合操作(在KTable的情况下,这类操作没有窗口) 问题是:join和aggregate操作的状态存储没有保留机制,并导致磁盘(RocksDB)中的空间爆炸。 更具体地说:(跳跃)窗口会在键上产生笛卡尔积,并且没有删除旧窗口的机制。 请注意,支持table1和table2

  • 我有一个使用处理器api更新状态存储的拓扑,配置为复制因子3,ACKS=ALL 我试图找出这个changelog主题滞后的根本原因,因为我没有在这个处理器中发出任何外部请求。有对rocksdb状态存储的调用,但这些数据存储都是本地的,检索速度应该很快。 我的问题是这个变更日志主题的使用者到底是什么?

  • 我试图更好地理解如何设置我的集群来运行我的Kafka-Stream应用程序。我正试图更好地了解将涉及的数据量。 在这方面,虽然我可以很快地看到一个KTable需要一个状态存储,但我想知道从一个主题创建一个Kstream是否意味着立即将该主题的所有日志以一种仅追加的方式复制到状态存储中。也就是说,特别是如果我们要公开流以供查询? 当源主题是Kstream时,当它们在源主题中移动时,Kafka是否自动

  • 我正在开发使用Spring Cloud Stream构建的Kafka Streams应用程序。在这个应用程序中,我需要: 使用可在以后检索的连续消息流。 保留与某些条件匹配的邮件 ID 列表。 在单独的线程中,运行一个计划程序,该计划程序定期读出消息 ID,检索与这些 ID 匹配的相应消息,并对这些消息执行操作。 从列表中删除已处理的消息 ID,以便不重复工作。 我已考虑如下实施: < li >将