我正在使用spark-sql-2.4。1v和Java 8。我有一个用例,如下所示,
Dataset<Row> ds = //a Dataset<Row> read from DB
我需要根据另一个数据集的条目进行一些操作。
List<String> codesList = Array.asList("code1","code2")
Dataset<Row> codes = sc.createDataSet(codesList , Encoders.bean(String.class))
我需要并行处理所有代码。做同样的事情,我正在尝试如下:
Dataset<Row> ds_res = codes.map( x_cod -> //map throwing an error
calcFunction(sparkSession, filePath, ds ,x_cod );
}).reduce(new Function2<Dataset<Row> df1,Dataset<Row> df2) => df1.union(df2))
ds_res .write().path(filePath).mode("append").save();
public static Dataset<Row> calcFunction(sparkSession, filePath, ds ,x_cod ){
//some complex calculation based on x_cod
return ds_res ; // return ds_res for further processing
}
如何在集群上并行工作?
将列表编码到数据集比编码更可行。如果您计划使用bean类,您可以将其编码为该类型的Dataset
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import java.util.Arrays;
import java.util.List;
public class ParallelizeArray {
public static void main(String[] args) {
final SparkSession sparkSess = Constant.getSparkSess();
List<String> codesList = Arrays.asList("code1", "code2");
final Dataset<String> dataFrame = sparkSess.createDataset(codesList, Encoders.STRING());
dataFrame.write().mode(SaveMode.Append).csv("src/main/resources/paraArray");
}
}
或者使用
final Encoder<Dataset> bean = Encoders.bean(Dataset.class);
Dataset<Row> ds_res = codes.map((MapFunction<String, Dataset>) x_cod -> calcFunction(sparkSess, filePath, ds ,x_cod),bean)
.reduce((ReduceFunction<Dataset>) (df1, df2) -> df1.union(df2));
public static Dataset<Row> calcFunction(SparkSession sparkSession, String filePath, Dataset<Row> ds ,String x_cod ){
Dataset<Row> ds_res = null;//some complex calculation based on x_cod
return ds_res ; // return ds_res for further processing
}
问题内容: 我正在尝试检索JSON对象(我已验证的文件格式正确)中的数据,并将数据输出到firebug控制台中。我使用JSONLint(http://jsonlint.com/)验证了JSON,并且知道数据没有在JSON对象中返回,因为当我记录它时,它是以文本而不是对象的形式记录的。当我查看ajax帖子时,有一个JSON选项卡,其中显示了该对象,出于某种原因,我只是无法检索它。 我的ajax电话是
我想发送这个示例数组 作为二进制数据发送到我的websocket服务器。在服务器端,我希望将二进制数据解码回数组,进行更改并将二进制数据发送回客户端。最后在客户端,如何将二进制数据解码回数组? 示例截图我的意思是: 这是我的实际代码: 服务器端代码: 我现在可以将消息作为二进制帧发送到websocket服务器。我找到了将字符串转换为二进制类型并将其发送到ws-server的函数。 现在我有问题了。
我有以下两个场景共享的前奏代码: 现在,我想将df转换为pyspark数据帧(
我正在使用Flink表API,使用Java将数据集转换为数据流....以下是我的代码: ExpressionException:JavaStreamingTranslator的根无效:Root(ArraySeq((related_value,Double),(ref_id,String)))。您尝试将基于数据集的表转换为数据流吗?我想知道我们如何使用Flink表API将DataSet转换为Data
I一种方法,它获取一个数字列表(例如,ArrayList),并将其转换为一个排序集(例如,TreeSet),然后返回它。我写了代码,但我有一些问题。 我的问题主要是:
scikit-learn 提供了一个用于转换数据集的库, 它也许会 clean(清理)(请参阅 预处理数据), reduce(减少)(请参阅 无监督降维), expand(扩展)(请参阅 内核近似)或 generate(生成)(请参阅 特征提取) feature representations(特征表示). 像其它预估计一样, 它们由具有 fit 方法的类来表示, 该方法从训练集学习模型参数(例如