当前位置: 首页 > 知识库问答 >
问题:

在Spark-Scala中进行迭代连接有其他方法吗

毕宇
2023-03-14

用例是在给定列中找到最多n行(可以是n列数),一旦有了n个键,就可以将其连接回原始数据集,以获得所需的所有行

val v1 = actors.join(df, Seq("id"), "inner")
val v2 =actors.join(df, Seq("firstname"), "inner")
val v3 =actors.join(df, Seq("lastname"), "inner")
val output = v1.union(v2).union(v3)

共有1个答案

宣星光
2023-03-14

@Chlebek解决方案应该工作得很好,这是在您希望复制初始逻辑的情况下的另一种方法:

val cols = Seq("id", "firstname", "lastname")

val final_df = cols.map{
     df.join(actors, Seq(_), "inner") 
}
.reduce(_ union _)

首先,我们为每列生成一个内部联接,然后将它们联合起来。

 类似资料:
  • 嗨,我是Spark/Scala的新手,我一直在尝试-AKA失败,根据特定的递归公式在火花数据帧中创建一列: 这里是伪代码。 为了深入了解更多细节,这里是我的出发点:这个数据帧是在和个人级别上聚合的结果。 所有进一步的计算都必须针对特定的,并且必须考虑到前一周发生的事情。 为了说明这一点,我将这些值简化为0和1,删除了乘法器和,并将初始化为零。 到目前为止我所尝试的与所期望的 有没有办法做到这一点与

  • 我已经编写了以下代码,运行良好。但是我想连接UDF,这样代码可以压缩成几行。请建议我怎么做。下面是我编写的代码。

  • null 一些示例输出数据: *编辑:工作的scala代码行:

  • 迭代是什么意思? 我首先使用时间戳对dstream进行排序,假设数据是以单调递增的时间戳到达的(没有乱序)。 我需要一个全局HashMap X,我希望使用时间戳为“T1”的值更新它,然后使用“T1+1”的值更新它。由于X本身的状态会影响计算,所以它需要是一个线性运算。因此,在“t1+1”处的操作取决于HashMap X,而HashMap X取决于在“t1”处和之前的数据。 当一个人试图更新一个模型

  • 下面是Scala中的代码。我正在使用spark sql从hadoop中提取数据,对结果执行一些分组,序列化它,然后将消息写给Kafka。 我已经写了代码--但我想用函数的方式来写。我是否应该创建一个具有“get categories”函数的新类来从Hadoop中获取类别?我不知道如何处理这件事。 这是代码 提前谢谢你,苏约格

  • 迭代器不是集合,而是逐个访问集合元素的方法。 iterator it上的两个基本操作是next和hasNext 。 对it.next()调用将返回迭代器的下一个元素并提升迭代器的状态。 您可以使用Iterator的it.hasNext方法找出是否有更多元素要返回。 “逐步”迭代器返回的所有元素的最简单方法是使用while循环。 让我们按照以下示例程序进行操作。 例子 (Example) objec