当前位置: 首页 > 知识库问答 >
问题:

Apache Spark:在Java中有效地使用mapPartitions

裴金鑫
2023-03-14

在目前早期发布的名为高性能Spark的教科书中,Spark的开发人员注意到:

为了使Spark能够灵活地将一些记录溢出到磁盘上,在MapPartitions中表示函数是很重要的,这样函数就不会强制将整个分区加载到内存中(例如隐式转换为列表)。迭代器有许多方法,我们可以在上面编写函数样式转换,或者您可以构造自己的自定义迭代器。当一个转换直接获取并返回一个迭代器而不强制它通过另一个集合时,我们称之为迭代器到迭代器转换。

def transformRows(iter: Iterator[Row]): Iterator[Row] = iter.map(transformRow)
sqlContext.createDataFrame(df.rdd.mapPartitions(transformRows), newSchema).show
JavaRDD<OutObj> collection = prevCollection.mapPartitions((Iterator<InObj> iter) -> {
    ArrayList<OutObj> out = new ArrayList<>();
    while(iter.hasNext()) {
        InObj current = iter.next();
        out.add(someChange(current));
    }
    return out.iterator();
});

注意:虽然使用MapPartitions的这8行代码可以写成带有MapFlatMap的一行代码,但我有意使用MapPartitions来利用这样一个事实,即它在每个分区上操作,而不是在RDD中的每个元素上操作。

有什么想法吗?

共有1个答案

吴胜涝
2023-03-14

防止强制“物化”整个分区的一种方法是将迭代器转换为流,然后使用Stream的函数API(例如map函数)。

如何将迭代器转换为流?建议了一些将迭代器转换为的好方法,因此采用这里建议的选项之一,我们可以得到以下结果:

rdd.mapPartitions((Iterator<InObj> iter) -> {
    Iterable<InObj> iterable = () -> iter;
    return StreamSupport.stream(iterable.spliterator(), false)
            .map(s -> transformRow(s)) // or whatever transformation
            .iterator();
});

这应该是一个“itrator-to-iterator”转换,因为所使用的所有中间API(iterablestream)都是缓慢计算的。

编辑:我自己还没有测试过它,但是OP评论说,我引用,“在列表上使用流并没有提高效率”。我不知道为什么会这样,我也不知道总的来说这是否是真的,但值得一提。

 类似资料:
  • 问题内容: 我一直在使用JSONObject和JSONReader,但理想情况下,我正在寻找一种混合动力:) 特别是,给定JSON对象流(任意长的JSON数组的一部分),是否有一个帮助程序/库,它一次生成迭代器样式的“ JSONObject”,而无需读取其中的所有内容或不必解析单个原始字段(JsonReader )? 假设API的示例: 上面,调用readObject解析一些复杂的JSON子树,并

  • 我有一个基于maven的scala/java混合应用程序,可以提交spar作业。我的应用程序jar“myapp.jar”在lib文件夹中有一些嵌套的jar。其中之一是“common.jar”。我在清单文件中定义了类路径属性,比如。Spark executor抛出在客户端模式下提交应用程序时出错。类(com/myapp/common/myclass.Class)和jar(common.jar)在那里

  • 我正在使用ApacheSpark和Scala的MLlib。我需要转换一组向量 在标签点中,为了应用MLLib算法,每个向量由0.0(假)或1.0(真)的双值组成。所有向量都保存在RDD中,因此最终的RDD是 因此,在RDD中,有一些向量是用 我如何从这个RDD(data_tmp)或行矩阵(data)创建一个使用MLLib算法的标签点集?例如,我需要在这里应用SVMs线性alghoritms

  • 问题内容: 我想知道是否有可能在SAS中使用proc sql从宽有效地转换成长有效。 我知道proc换位比我在下面建议的方法要快得多。但是我的目标之一是避免存储转置表。 例如,假设我有table1作为 我想把它变成 我可以做到这一点; 选择id,’A’作为col1,A作为 来自table1的col2 , 其中A〜=“” 联合选择id,’B’作为col1,B作为 来自table1的col2 , 其中

  • 产品用例——我们的产品有一个典型的用例,我们将有n个用户。每个用户将有n个工作流,每个工作流可以在任何时间运行(n次)。 我希望这是任何工作流产品的典型用例。 我可以使用域来区分用户吗(我的意思是说为每个用户创建一个域)? 我可以为每个用户创建一个WorkflowClient来服务他所有的工作流执行吗?或者对于每个请求,我需要创建一个工作流客户端吗?哪一个是推荐的方法? 创建工作对象以轮询任务列表

  • 我正在尝试创建一个spark应用程序,它对创建、读取、写入和更新MySQL数据非常有用。那么,有没有办法使用Spark创建一个MySQL表? 下面是在MySQL数据库中创建表的Scala JDBC代码。我怎样才能通过Spark做到这一点?