当前位置: 首页 > 知识库问答 >
问题:

ApacheSpark:成对的RDD示例

郭元凯
2023-03-14

我有一个项目的RDD,还有一个函数d:(项目,项目)=

因此,我想得到一个RDD的抽样项目对(我将从中计算距离)。例如,我想获得100万对的样本。给定采样对的RDD,然后计算平均值、直方图等,以了解距离分布。

以下是所有失败的初步尝试:

>

使用。RDD自身的笛卡尔,然后。示例。这会失败(内存不足),因为显然不应该以这种方式使用笛卡尔

收集RDD的两个小样本,然后。zip这两个数组。这很好,但无法扩展。

有什么想法吗?

谢谢

编辑:下面是如何压缩每个分区中具有不同项数的两个示例:

val r = ... // RDD[Item]
val s1 = r.sample(true, 0.1, 123)
val s2 = r.sample(true, 0.1, 456)
val zipper = (i1: Iterator[Item], i2: Iterator[Item]) => i1.zip(i2)
val pairs = r1.zipPartitions(r2)(zipper) // zip the RDDs and explicitly define how to zip the partitions

关键是,虽然RDD的. zip方法不接受大小不等的分区,但迭代器的. zip方法接受(并丢弃较长迭代器的剩余部分)。

共有1个答案

暨正真
2023-03-14

回答我自己的问题:

  1. 获取rdd样品(带替换件),

代码:

import org.apache.spark.mllib.rdd.RDDFunctions._ // for .sliding
val x = ... // RDD[Item]
val xSize = x.count
val n = 1000000.0 // (approximate) desired sample size
val pairs = x.sample(true, n/xSize).sliding(2)
val distances = pairs.map(arr => dist(arr(0), arr(1)))
 类似资料:
  • 给定一个包含以下格式数据的大文件(V1,V2,…,VN) 我正在尝试使用Spark获得一个类似于下面的配对列表 我尝试了针对一个较旧的问题所提到的建议,但我遇到了一些问题。例如, 我得到了错误, 有人能告诉我哪些地方我可能做得不对,或者有什么更好的方法可以达到同样的效果?非常感谢。

  • 我正在使用ApacheSpark和Scala的MLlib。我需要转换一组向量 在标签点中,为了应用MLLib算法,每个向量由0.0(假)或1.0(真)的双值组成。所有向量都保存在RDD中,因此最终的RDD是 因此,在RDD中,有一些向量是用 我如何从这个RDD(data_tmp)或行矩阵(data)创建一个使用MLLib算法的标签点集?例如,我需要在这里应用SVMs线性alghoritms

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我试图在火花笔记本的阿帕奇火花中做NLP。对于这个特定的例子,我正在使用库https://opennlp.apache.org创建一个块来提取名词短语。由于数据量的增加,我需要转向分布式计算。 问题是我无法广播我的chunker对象。通过阅读文档(只在board上投射数组等简单对象),我尝试了以下方法: 但这会引发以下错误: 如果我将chunker的初始化封装在函数中,然后在map方法中调用函数,

  • 译者:cangyunye 作者: Nathan Inkawhich 如果你正在阅读这篇文章,希望你能理解一些机器学习模型是多么有效。现在的研究正在不断推动ML模型变得更快、更准确和更高效。然而,在设计和训练模型中经常会忽视的是安全性和健壮性方面,特别是在面对欺骗模型的对手时。 本教程将提高您对ML模型安全漏洞的认识,并将深入探讨对抗性机器学习这一热门话题。您可能会惊讶地发现,在图像中添加细微的干扰

  • 我对斯卡拉和Spark都很陌生,所以如果我做错了,请原谅我。在接收csv文件,过滤和映射之后;我有一个RDD,它是一堆(字符串,双)对。 当我在RDD上使用.groupByKey()时, 得到一个有一堆(String,[Double])对的RDD。(我不知道CompactBuffer是什么意思,可能会导致我的问题?) 一旦他们被分组,我将尝试取平均值和标准偏差。我只想使用.mean()和.samp