当前位置: 首页 > 知识库问答 >
问题:

KT可作为具有空键的主题的输入

赵明亮
2023-03-14

我对Kafka流很陌生,遇到了一个问题。

我有两个表 - 一个用于长期数据(描述),另一个用于实时数据(实时)。他们有一个共同的ID。

这个想法是存储来自描述的数据(假设在KTable中,为每个id保存最新的描述),当新消息出现在live - join中时,使用来自相应id上的描述的数据,并进一步发送它。

为简单起见,我们只使所有类型都为 String。

所以基本的想法就像我看过的所有教程一样:

interface Processor {

        @Input("live")
        KStream<String, String> input();

        @Input("descriptions")
        KTable<String, String> input();

        @Output("output")
        KStream<String, String> output();
    }

然后:

    @StreamListener
    @SendTo("output")
    public KStream<String, String> process(
            @Input("live") KStream<String, String> live,
            @Input("descriptions") KTable<String, String> descriptions) {
        // ...
    }

问题是描述主题不适合KTable(空键,只有消息)。

所以我不能将其用作输入,也不能创建任何新的中间主题来存储此表中的有效流(基本上是只读的)。

我在寻找某种记忆中的绑定目的地,但没有结果。

我认为可能的方式是创建一个中间输出,该输出仅将 KTable 存储在内存中或其他东西中,然后将此中间输出用作实时处理中的输入。喜欢:

    @StreamListener("descriptions")
    @SendTo("intermediate")
    public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
        // ...
    }

希望这种绑定语义是可能的。

共有1个答案

裴俊能
2023-03-14

我认为可以尝试通过引入初始处理器来引入一个用于存储键/值的中间主题。然后将该流用作常规处理器中输入的表。以下是一些模板。我正在使用SpringCloudStream中的新功能模型来编写这些处理器。

@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {

        return descriptions -> 
            descriptions.map((key, value) -> {
                Pojo p = parseIntoPojo(value);
                return new KeyValue<>(p.getId(), value);
            })
            .groupByKey()
            .reduce((v1, v2) -> v2)
            .toStream();
}

@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {

    return (live, description) -> {

    }

}       

第一个处理器接收描述作为KStream,然后用键丰富它,然后输出为KStream。现在这个主题既有键又有值,我们可以在下一个处理器中使用它作为Ktable。下一个处理器是java.util.function.Bi函数,它接收两个输入并生成一个输出。输入分别是KStreamKtable,输出是KStream

您可以在它们上设置目的地,如下所示:

spring.cloud.stream.function.definition=prorcessDescriptions;realStream

spring.cloud.stream.bindings.processDescriptions-in-0.destinaion=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destinaion=description-table-topic

spring.cloud.stream.bindings.realStream-in-0.destinaion=live-topic
spring.cloud.stream.bindings.realStream-in-1.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destinaion=output

您也可以使用 StreamListener 方法获得相同的结果。

这种方法的缺点是,你需要在Kafka中维护一个额外的中间主题,但如果你真的希望它是一个< code>KTable并且底层信息是非键控的,我认为这里没有太多的选项。

如果您不需要顶级< code>KTable的描述,您可能能够以某种方式将它存储在一个状态存储中,然后在单个处理器中查询存储的所有内容。我还没有尝试过,所以你需要考虑一下这个想法。基本上,你得到两个流,现场和描述

(live, descriptions) -> Reduce key/value for descriptions and keep that in a state store. 
Then, do the processing on live by joining with what is in the state store. 

Kafka Streams允许各种方式来完成这样的事情。查看他们的参考文档以获取更多信息。

希望这有帮助。

 类似资料:
  • 我在oracle数据库中有一个包含客户数据的表。以下是一个简化的定义: 此表的主键是。 该表有许多行,其中为空。在数据库级别,没有问题,但是当我试图通过JPA实体访问这些行时,会导致一些问题: 1:使用

  • 我有两张桌子: 用户(用户名、密码) 配置文件(profileId,gender,dateofbirding,...) 目前我正在使用这种方法:每个Profile记录都有一个名为“userid”的字段作为外键,它链接到用户表。当用户注册时,他的配置文件记录将自动创建。 我对我朋友的建议感到困惑:将“userid”字段作为外部和主键,并删除“profileid”字段。哪种方法更好?

  • 问题内容: 我有一个以字典为参数的函数。我将传递给各种字典的字典,这些字典比函数内部使用的字典要多。另外,我想在函数定义中查看需要哪些键。所以我写 但是,该函数现在接受任何输入为。有没有聪明的写作方法 就像是 问题答案: 在python3.x中,您可以使用函数注释: 您甚至可以疯狂地使用现在被口译人员接受的文字 你可以从我的第一个例子中看到,注释不 执行 任何东西。您必须在函数本身中执行验证,尽管

  • 我正在使用Hibernate在数据库中保存一个对象。我正在生成带有@GeneratedValue注释的主键。 这里是我试图将数据保存到数据库中的代码。 当我运行这个时,我得到以下错误

  • 我已经读到,主题建模(从文本中提取可能的主题)最常用的技术是潜在的Dirichlet分配(LDA)。但最近我了解到另一款lda2vec。然而,我感兴趣的是尝试Word2Vec输出作为LDA的输入是否是一个好主意。 你认为为了一些研究而采用这种方法有意义吗?因为我正在做主题建模,所以需要一些新颖的方法。

  • 我有一个我觉得很神秘的问题。我在Google和StackOverflow上搜索过,没有发现任何人有类似的问题。我尝试将持久化提供程序切换到Hibernate,但我们的代码过于依赖EclipseLink特性,因此无法将其作为调试的实际选项。如果这个问题仍然存在(哈,哈;Java EE双关语),我很可能会为Hibernate重写所有持久性代码,如果可能的话。 我的一个实体被正确持久化到数据库,并且它的