我对Kafka流很陌生,遇到了一个问题。
我有两个表 - 一个用于长期数据(描述),另一个用于实时数据(实时)。他们有一个共同的ID。
这个想法是存储来自描述的数据(假设在KTable中,为每个id保存最新的描述),当新消息出现在live - join中时,使用来自相应id上的描述的数据,并进一步发送它。
为简单起见,我们只使所有类型都为 String。
所以基本的想法就像我看过的所有教程一样:
interface Processor {
@Input("live")
KStream<String, String> input();
@Input("descriptions")
KTable<String, String> input();
@Output("output")
KStream<String, String> output();
}
然后:
@StreamListener
@SendTo("output")
public KStream<String, String> process(
@Input("live") KStream<String, String> live,
@Input("descriptions") KTable<String, String> descriptions) {
// ...
}
问题是描述主题不适合KTable(空键,只有消息)。
所以我不能将其用作输入,也不能创建任何新的中间主题来存储此表中的有效流(基本上是只读的)。
我在寻找某种记忆中的绑定目的地,但没有结果。
我认为可能的方式是创建一个中间输出,该输出仅将 KTable 存储在内存中或其他东西中,然后将此中间输出用作实时处理中的输入。喜欢:
@StreamListener("descriptions")
@SendTo("intermediate")
public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
// ...
}
希望这种绑定语义是可能的。
我认为可以尝试通过引入初始处理器来引入一个用于存储键/值的中间主题。然后将该流用作常规处理器中输入的表。以下是一些模板。我正在使用SpringCloudStream中的新功能模型来编写这些处理器。
@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {
return descriptions ->
descriptions.map((key, value) -> {
Pojo p = parseIntoPojo(value);
return new KeyValue<>(p.getId(), value);
})
.groupByKey()
.reduce((v1, v2) -> v2)
.toStream();
}
@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {
return (live, description) -> {
}
}
第一个处理器接收描述
作为KStream
,然后用键丰富它,然后输出为KStream
。现在这个主题既有键又有值,我们可以在下一个处理器中使用它作为Ktable
。下一个处理器是java.util.function.Bi函数
,它接收两个输入并生成一个输出。输入分别是KStream
和Ktable
,输出是KStream
。
您可以在它们上设置目的地,如下所示:
spring.cloud.stream.function.definition=prorcessDescriptions;realStream
spring.cloud.stream.bindings.processDescriptions-in-0.destinaion=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-in-0.destinaion=live-topic
spring.cloud.stream.bindings.realStream-in-1.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destinaion=output
您也可以使用 StreamListener
方法获得相同的结果。
这种方法的缺点是,你需要在Kafka中维护一个额外的中间主题,但如果你真的希望它是一个< code>KTable并且底层信息是非键控的,我认为这里没有太多的选项。
如果您不需要顶级< code>KTable的描述,您可能能够以某种方式将它存储在一个状态存储中,然后在单个处理器中查询存储的所有内容。我还没有尝试过,所以你需要考虑一下这个想法。基本上,你得到两个流,现场和描述
(live, descriptions) -> Reduce key/value for descriptions and keep that in a state store.
Then, do the processing on live by joining with what is in the state store.
Kafka Streams允许各种方式来完成这样的事情。查看他们的参考文档以获取更多信息。
希望这有帮助。
我在oracle数据库中有一个包含客户数据的表。以下是一个简化的定义: 此表的主键是。 该表有许多行,其中为空。在数据库级别,没有问题,但是当我试图通过JPA实体访问这些行时,会导致一些问题: 1:使用
我有两张桌子: 用户(用户名、密码) 配置文件(profileId,gender,dateofbirding,...) 目前我正在使用这种方法:每个Profile记录都有一个名为“userid”的字段作为外键,它链接到用户表。当用户注册时,他的配置文件记录将自动创建。 我对我朋友的建议感到困惑:将“userid”字段作为外部和主键,并删除“profileid”字段。哪种方法更好?
问题内容: 我有一个以字典为参数的函数。我将传递给各种字典的字典,这些字典比函数内部使用的字典要多。另外,我想在函数定义中查看需要哪些键。所以我写 但是,该函数现在接受任何输入为。有没有聪明的写作方法 就像是 问题答案: 在python3.x中,您可以使用函数注释: 您甚至可以疯狂地使用现在被口译人员接受的文字 你可以从我的第一个例子中看到,注释不 执行 任何东西。您必须在函数本身中执行验证,尽管
我正在使用Hibernate在数据库中保存一个对象。我正在生成带有@GeneratedValue注释的主键。 这里是我试图将数据保存到数据库中的代码。 当我运行这个时,我得到以下错误
我已经读到,主题建模(从文本中提取可能的主题)最常用的技术是潜在的Dirichlet分配(LDA)。但最近我了解到另一款lda2vec。然而,我感兴趣的是尝试Word2Vec输出作为LDA的输入是否是一个好主意。 你认为为了一些研究而采用这种方法有意义吗?因为我正在做主题建模,所以需要一些新颖的方法。
我有一个我觉得很神秘的问题。我在Google和StackOverflow上搜索过,没有发现任何人有类似的问题。我尝试将持久化提供程序切换到Hibernate,但我们的代码过于依赖EclipseLink特性,因此无法将其作为调试的实际选项。如果这个问题仍然存在(哈,哈;Java EE双关语),我很可能会为Hibernate重写所有持久性代码,如果可能的话。 我的一个实体被正确持久化到数据库,并且它的