当前位置: 首页 > 知识库问答 >
问题:

如何在流批处理流连接中定义连接条件?

公孙霖
2023-03-14

我使用的是SPARK-SQL-2.4.1V和Java1.8。和Kafka版本SPARK-SQL-KAFKA-0-10_2.11_2.4.3。

 Dataset<Row> streamingDs  = //read from kafka topic
 Dataset<Row> staticDf=  //read from oracle meta-data table.


Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
                      "c.code = i.industry_code"
                      );
Dataset<Row> joinDf = streamingDs.as("c").join(staticDf.as("i") ,
                      "c.code = i.industry_code",
                      "inner"
                      );

这会产生以下错误:

类型Dataset中的方法join(Dataset,String)不适用于参数(Dataset,String,String)

共有1个答案

洪子晋
2023-03-14

tl;drc.code=i.industry_code被认为是要联接的列的名称(而不是联接表达式)。

将代码更改为:

streamingDs.as("c").join(staticDf.as("i")) // INNER JOIN is the default
  .where("c.code = i.industry_code")
 类似资料:
  • 我有两个Kafka主题-和。第一个主题包含由唯一Id(称为)键入的recommendations对象。每个产品都有一个用户可以单击的URL。 主题获取通过单击推荐给用户的产品URL生成的消息。它是如此设置的,这些单击消息也由键控。 请注意 > 每个单击对象都会有一个相应的推荐对象。 click对象的时间戳将晚于Recommensions对象。 建议和相应的点击之间的间隔可能是几秒钟到几天(最多7天

  • 例如,我想在单个中组合和的流,因此结果应该是:。换句话说:如果第一个源已耗尽-从第二个源获取元素。我最近的尝试是: 也对datetime进行了类似的尝试,但结果相同。

  • 我正在尝试实现一个流,该流在其实现中使用自身的另一个实例。流前面有几个常量元素(使用IntStream.concat),所以只要连接的流懒散地创建非常量部分,这就可以工作。我想使用StreamSupport。intStream重载使用intStream获取供应商。concat(它“创建一个懒散连接的流”)应该足够懒惰,只在需要元素时创建第二个拆分器,但即使创建流(而不是计算流)也会溢出堆栈。我如何

  • 我想连接两个主题流(左连接),并在连接的流上进行基于窗口的聚合。然而,聚合将某些消息计数两倍,因为在连接期间,根据正确主题中的延迟,某些消息将发出两倍。以下是POC的代码。 它是否可以修复以避免因连接而重复?

  • 下面的代码片段是从JoinedStreams的javadoc复制的 这两个流仅基于一个键(通过< code>t =计算)进行连接 我会问我如何基于多个键进行连接,例如,one.a = two.a和

  • 我想加入一个 kstream:从主题创建,该主题具有JSON值。我使用值中的两个属性来重新键控流。示例值(json的片段)。我创建了自定义pojo类并使用自定义SERDES。 键映射为: 我查看了KStream并打印了键和我使用的属性。看起来都很好。 null 现在,当我执行内部连接并对主题进行窥视或通过/时,我看到键和值不匹配。Join似乎不起作用, 我有完全相同的东西通过ksql工作,但想做我