当前位置: 首页 > 知识库问答 >
问题:

基于公共字段的Kstream连接

秦德海
2023-03-14

我们希望基于公共字段(主键)执行Kstream Kstream连接。目前,使用下面的代码,我们得到的结果是只合并了两个流,没有任何主键约束。

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)


userRegions.join(regionMetrics)(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

您能建议如何根据公共字段/列连接2个流吗。

共有1个答案

满子实
2023-03-14

为了加入基于公共字段/列的2个流,您可以使用selectKey()函数,我将为您提供一个片段,可以帮助您。

val userRegions: KStream[String, String] = builder.stream(inputTopic1)
val regionMetrics: KStream[String, String] = builder.stream(inputTopic2)

// New code
val userRegionsWithKeys = userRegions.selectKey(new ValueMapper (String key, String Value) {
   // create your key here and return it
   // Please the syntax for Scala
    @override
    void apply (String key, String value) {
       return "key that you want" 
    }
});

val regionMetricsWithKeys = regionMetrics.selectKey(new ValueMapper (String key, String Value) {
   // create your key here and return it
   // Please the syntax for Scala
    @override
    void apply (String key, String value) {
       return "key that you want" 
    }
});


userRegionsWithKeys .join(regionMetricsWithKeys )(
  ((regionValue, metricValue) => regionValue + "/" + metricValue),
  JoinWindows.of(Duration.ofMinutes(5L))
).to(outputTopicName)

我希望这个解决方案能帮助你。

非常感谢。

 类似资料:
  • 我正在尝试执行kstream-kstream之间的内部连接。我注意到,当来自两个KStreams的消息都具有复合键(例如,具有许多属性的java pojo)时,即使用作复合键的pojo都实现了hashCode()和equals(Object o)方法,联接也不起作用。 uniqueidKey.java 当两个KStreams都有带有简单基元键(例如String、int、double)的消息时,内部

  • 教学应用案例 公共基础课 胡涛——国家精品课《化学与社会》 链接 :https://www.icourse163.org/live/view/480000001951319.htm "慕课堂帮助胡涛教授量化学生过程性评价,激励学生内在学习动机,极大提高课程通过率,挂科率一度跌至0。" 传统的师讲生听“讲座式”教学模式理论上是一种比较高效的知识传达方式。但是,它是不是一个高效的学生学习知识的方式呢?

  • 我正在尝试从Java切换到Kotlin。但我有很多遗留代码和第三方库。我看到,Java类中经常存在没有getter和setter的公共字段,这些字段必须从其他类访问。如果没有Kotlin代码中的getter,我如何访问Java类的公共字段?

  • null 我所尝试的:(我想是愚蠢的) 我有一个类,该对象应该作为共享,这样我就可以将其作为全局使用,而无需实例化它,例如. 这不起作用,我无法将在一个线程中接收到的数据发送到另一个线程。 我知道有一个明显的锁定问题,因为如果一个线程正在写一个对象,其他线程不能访问它,直到第一个线程完成了写。 和类。 我认为,我创建动态线程来为每个连接的客户机服务的方式(共享相同的数据源)不可能使用Global

  • 我有一个Angular应用程序,有一个API,它连接并调用一个在服务器上建立的数据库,该数据库具有一个公共IP。 第二,你有什么想法为什么会发生这种情况吗? 谢谢。

  • 问题内容: 在我多年的编程工作中,我经常创建一些类,这些类仅将一些变量与其设置器和获取器组合在一起。我已经看到了这些类型的对象,这些对象称为值对象,域对象或模型对象,具体取决于使用它们的上下文。通用用法最合适的术语似乎是数据传输对象(DTO)。这描述了仅包含访问器和更改器的POJO。 我刚刚编写了一个这样的对象,其中包含大约五十个用于在图表上设置主题参数的字段。现在,我想知道是否应该将这些字段声明