我想为我正在执行的映射操作在DataSet中为Row类型编写一个编码器。本质上,我不了解如何编写编码器。
以下是地图操作的示例:
In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>
Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
@Override
public Iterator<String> call(Row row) throws Exception {
ArrayList<String> obj = //some map operation
return obj.iterator();
}
},Encoders.STRING());
我知道,编码器需要编写如下字符串,而不是字符串:
Encoder<Row> encoder = new Encoder<Row>() {
@Override
public StructType schema() {
return join.schema();
//return null;
}
@Override
public ClassTag<Row> clsTag() {
return null;
}
};
但是,我不了解编码器中的clsTag(),并且我试图找到一个可以演示类似内容的运行示例(即,用于行类型的编码器)
编辑-
这不是所提问题的副本:尝试将数据框行映射到更新行时出现编码器错误,因为答案涉及在Spark
2.x中使用Spark 1.x(我没有这样做),我也在寻找用于Row类的编码器,而不是解决错误。最后,我正在寻找Java而不是Scala中的解决方案。
答案是使用RowEncoder以及使用StructType的数据集的架构。
以下是使用数据集进行平面图操作的工作示例:
StructType structType = new StructType();
structType = structType.add("id1", DataTypes.LongType, false);
structType = structType.add("id2", DataTypes.LongType, false);
ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
@Override
public Iterator<Row> call(Row row) throws Exception {
// a static map operation to demonstrate
List<Object> data = new ArrayList<>();
data.add(1l);
data.add(2l);
ArrayList<Row> list = new ArrayList<>();
list.add(RowFactory.create(data.toArray()));
return list.iterator();
}
}, encoder);
数字类型 ByteType:代表一个字节的整数。范围是-128到127 ShortType:代表两个字节的整数。范围是-32768到32767 IntegerType:代表4个字节的整数。范围是-2147483648到2147483647 LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807 FloatType:代表4字节的单
错误:错误:(49,9)找不到存储在数据集中的类型的编码器。导入spark.implicits支持基元类型(Int、String等)和产品类型(case类)。_在以后的版本中将增加对序列化其他类型的支持。.map(msg=>{
我喜欢Spark数据集,因为它们在编译时会给我带来分析错误和语法错误,还允许我使用getter而不是硬编码的名称/数字。大多数计算都可以通过DataSet的高级API完成。例如,通过访问Dataset类型对象的执行agg、select、sum、avg、map、filter或groupBy操作要比使用RDD行的数据字段简单得多。 但是其中缺少join操作,我读到我可以像这样执行join操作 我使用的
问题内容: 我正在编写一个特定于美国的Web应用程序,因此其他国家/地区用于邮政编码的格式并不重要。我有一份我们要加载到包含以下内容的数据库表中的邮政编码列表: 5位美国邮政编码 纬度 经度 usps分类代码 状态码 城市 邮政编码是主键,因为这是我要查询的内容。我开始使用中型int 5,但是它会截断前导零的邮政编码。 我考虑使用char5,但担心对char变量建立索引会对性能造成影响。 所以我的
我试图理解Dataset和data frame之间的区别,并找到了以下有用的链接,但我无法理解类型安全是什么意思? DataFrame(在Spark 2.0中,即DataSet[Row])与Spark中RDD之间的差异
问题内容: 当我仔细观察时,我唯一提出的疑问是: 找不到适用于实际参数“ org.apache.spark.unsafe.types.UTF8String”的适用构造函数/方法;候选者为:“ public void sparkSQL.Tweet.setId(long)” 问题答案: 正如@ user9718686所写,id字段具有不同的类型:在json文件和类定义中。当您将其读入时,Spark会从