在Apache Flink中,如果我在一个主键上连接两个数据集,我会得到一个元组2,其中包含每个数据集中相应的数据集条目。
问题是,当将map()
方法应用于即将到来的tuple 2数据集时,它看起来并不漂亮,尤其是如果两个数据集的条目都具有大量功能。
在两个输入数据集中使用元组会给我一些这样的代码:
var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */
val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
.map(join => (join._1._1, join._1._2, join._1._3,
join._1._4, join._1._5, join._2._4))
我不介意使用POJO或case类,但我不明白这会如何使它变得更好。
问题1:有没有一个很好的方法来扁平化元组2?例如使用另一个运营商。
问题 2:如何处理同一键上 3 个数据集的联接?这将使示例源更加混乱。
谢谢你的帮助。
您可以在每对连接的元素上直接应用连接函数,例如
val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
val joined: DataSet[(String, Int, Int)] = leftData
.join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) ) }
为了回答第二个问题,Flink 只处理二进制联接。但是,Flink 的优化器可以避免执行不必要的随机操作,前提是您给出有关函数行为的提示。Forward Field 批注告诉优化程序,某些字段(如连接键)尚未被连接函数修改,并允许重用现有的分区和排序。
安装(下载 这是Flink的默认配置。 关于这里发生了什么事,有什么建议吗?
问题内容: 和之间有什么区别? 交叉加入: 内部联接: 哪一种更好,为什么我要使用其中一种呢? 问题答案: 交叉联接不会合并行,如果每个表中有100行且1对1匹配,您将得到10.000个结果,Innerjoin在相同情况下将仅返回100行。 这两个示例将返回相同的结果: 交叉联接 内部联接 使用最后一种方法
我想在Apache Flink中实现以下场景: 给定一个具有4个分区的Kafka主题,我想使用不同的逻辑在Flink中独立处理分区内数据,具体取决于事件的类型。 特别是,假设输入Kafka主题包含前面图像中描述的事件。每个事件具有不同的结构:分区1具有字段“a”作为关键字,分区2具有字段“b”作为关键字,等等。在Flink中,我希望根据事件应用不同的业务逻辑,所以我认为我应该以某种方式分割流。为了
我有2个使用kafka主题创建的流,我正在使用DataStream API加入它们。我希望将连接(应用)的结果发布到另一个kafka主题。我在外部主题中看不到连接的结果。 我确认我向两个源主题发布了正确的数据。不确定哪里出了问题。下面是代码片段, 创建的流如下所示。 流连接使用等于的连接执行,如下所示。 如下所述, 有什么线索吗,哪里出了问题?我可以在拓扑中看到可用的消息,谢谢
问题内容: 我创建了在9个表上使用INNER JOIN的SQL命令,无论如何,此命令将花费很长时间(超过五分钟)。所以我的同事建议我将INNER JOIN更改为LEFT JOIN,因为尽管我知道,但LEFT JOIN的性能更好。更改后,查询速度得到了显着提高。 我想知道为什么LEFT JOIN比INNER JOIN快? 我的SQL命令看起来象下面这样: 等 更新: 这是我的架构的简要介绍。 问题答
动机:我正在设计一个access数据库来跟踪我员工的培训状态。每个员工都有基于其角色的培训要求(例如,行政助理必须接受记录管理培训和电话礼仪,而电工必须接受危险安全培训)。我希望能够生成一份报告,显示每个人所需的所有培训,以及完成的培训课程的完成日期。 模式I有以下表格: 人员——列出员工信息,例如姓名、电话、电子邮件;键: 我创建了一个查询来匹配到他们当前的: 这是正确的,并告诉我每个人当前的培