当前位置: 首页 > 知识库问答 >
问题:

javaAPI中是否有将数据集转换为map()并返回数据集?

单于承
2023-03-14

我正在使用spark-sql-2.4。1v和Java 8。我有一个用例,如下所示,

Dataset<Row> ds = //a Dataset<Row> read from DB

我需要根据另一个数据集的条目进行一些操作。

List<String> codesList = Array.asList("code1","code2")
Dataset<Row> codes = sc.createDataSet(codesList , Encoders.bean(String.class))

我需要并行处理所有代码。做同样的事情,我正在尝试如下:

 Dataset<Row> ds_res =  codes.map( x_cod ->   //map throwing an error
        calcFunction(sparkSession, filePath, ds ,x_cod );
    }).reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))

 ds_res .write().path(filePath).mode("append").save();

    public static Dataset<Row> calcFunction(sparkSession, filePath, ds ,x_cod ){
         //some complex calculation based on x_cod 

        return ds_res ; // return ds_res  for further processing
    }

如何在集群上并行工作?

共有1个答案

李振国
2023-03-14

将列表编码到数据集比编码更可行。如果您计划使用bean类,您可以将其编码为该类型的Dataset

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import java.util.Arrays;
import java.util.List;

public class ParallelizeArray {

    public static void main(String[] args) {
        final SparkSession sparkSess = Constant.getSparkSess();
        List<String> codesList = Arrays.asList("code1", "code2");
        final Dataset<String> dataFrame = sparkSess.createDataset(codesList, Encoders.STRING());
        dataFrame.write().mode(SaveMode.Append).csv("src/main/resources/paraArray");
    }
}

或者使用

final Encoder<Dataset> bean = Encoders.bean(Dataset.class);
Dataset<Row> ds_res = codes.map((MapFunction<String, Dataset>) x_cod -> calcFunction(sparkSess, filePath, ds ,x_cod),bean)
                .reduce((ReduceFunction<Dataset>) (df1, df2) -> df1.union(df2));



    public static Dataset<Row> calcFunction(SparkSession sparkSession, String filePath, Dataset<Row> ds ,String x_cod ){
        Dataset<Row> ds_res = null;//some complex calculation based on x_cod
        return ds_res ; // return ds_res  for further processing
    }

 类似资料:
  • 问题内容: 我正在尝试检索JSON对象(我已验证的文件格式正确)中的数据,并将数据输出到firebug控制台中。我使用JSONLint(http://jsonlint.com/)验证了JSON,并且知道数据没有在JSON对象中返回,因为当我记录它时,它是以文本而不是对象的形式记录的。当我查看ajax帖子时,有一个JSON选项卡,其中显示了该对象,出于某种原因,我只是无法检索它。 我的ajax电话是

  • 我想发送这个示例数组 作为二进制数据发送到我的websocket服务器。在服务器端,我希望将二进制数据解码回数组,进行更改并将二进制数据发送回客户端。最后在客户端,如何将二进制数据解码回数组? 示例截图我的意思是: 这是我的实际代码: 服务器端代码: 我现在可以将消息作为二进制帧发送到websocket服务器。我找到了将字符串转换为二进制类型并将其发送到ws-server的函数。 现在我有问题了。

  • 我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(

  • 我正在使用Flink表API,使用Java将数据集转换为数据流....以下是我的代码: ExpressionException:JavaStreamingTranslator的根无效:Root(ArraySeq((related_value,Double),(ref_id,String)))。您尝试将基于数据集的表转换为数据流吗?我想知道我们如何使用Flink表API将DataSet转换为Data

  • I一种方法,它获取一个数字列表(例如,ArrayList),并将其转换为一个排序集(例如,TreeSet),然后返回它。我写了代码,但我有一些问题。 我的问题主要是:

  • scikit-learn 提供了一个用于转换数据集的库, 它也许会 clean(清理)(请参阅 预处理数据), reduce(减少)(请参阅 无监督降维), expand(扩展)(请参阅 内核近似)或 generate(生成)(请参阅 特征提取) feature representations(特征表示). 像其它预估计一样, 它们由具有 fit 方法的类来表示, 该方法从训练集学习模型参数(例如