当前位置: 首页 > 面试题库 >

行类型Spark数据集的编码器

朱通
2023-03-14
问题内容

我想为我正在执行的映射操作在DataSet中为Row类型编写一个编码器。本质上,我不了解如何编写编码器。

以下是地图操作的示例

In the example below, instead of returning Dataset<String>, I would like to return Dataset<Row>

Dataset<String> output = dataset1.flatMap(new FlatMapFunction<Row, String>() {
            @Override
            public Iterator<String> call(Row row) throws Exception {

                ArrayList<String> obj = //some map operation
                return obj.iterator();
            }
        },Encoders.STRING());

我知道,编码器需要编写如下字符串,而不是字符串:

    Encoder<Row> encoder = new Encoder<Row>() {
        @Override
        public StructType schema() {
            return join.schema();
            //return null;
        }

        @Override
        public ClassTag<Row> clsTag() {
            return null;
        }
    };

但是,我不了解编码器中的clsTag(),并且我试图找到一个可以演示类似内容的运行示例(即,用于行类型的编码器)

编辑-
这不是所提问题的副本:尝试将数据框行映射到更新行时出现编码器错误,因为答案涉及在Spark
2.x中使用Spark 1.x(我没有这样做),我也在寻找用于Row类的编码器,而不是解决错误。最后,我正在寻找Java而不是Scala中的解决方案。


问题答案:

答案是使用RowEncoder以及使用StructType的数据集的架构。

以下是使用数据集进行平面图操作的工作示例:

    StructType structType = new StructType();
    structType = structType.add("id1", DataTypes.LongType, false);
    structType = structType.add("id2", DataTypes.LongType, false);

    ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);

    Dataset<Row> output = join.flatMap(new FlatMapFunction<Row, Row>() {
        @Override
        public Iterator<Row> call(Row row) throws Exception {
            // a static map operation to demonstrate
            List<Object> data = new ArrayList<>();
            data.add(1l);
            data.add(2l);
            ArrayList<Row> list = new ArrayList<>();
            list.add(RowFactory.create(data.toArray()));
            return list.iterator();
        }
    }, encoder);


 类似资料:
  • 数字类型 ByteType:代表一个字节的整数。范围是-128到127 ShortType:代表两个字节的整数。范围是-32768到32767 IntegerType:代表4个字节的整数。范围是-2147483648到2147483647 LongType:代表8个字节的整数。范围是-9223372036854775808到9223372036854775807 FloatType:代表4字节的单

  • 错误:错误:(49,9)找不到存储在数据集中的类型的编码器。导入spark.implicits支持基元类型(Int、String等)和产品类型(case类)。_在以后的版本中将增加对序列化其他类型的支持。.map(msg=>{

  • 我喜欢Spark数据集,因为它们在编译时会给我带来分析错误和语法错误,还允许我使用getter而不是硬编码的名称/数字。大多数计算都可以通过DataSet的高级API完成。例如,通过访问Dataset类型对象的执行agg、select、sum、avg、map、filter或groupBy操作要比使用RDD行的数据字段简单得多。 但是其中缺少join操作,我读到我可以像这样执行join操作 我使用的

  • 问题内容: 我正在编写一个特定于美国的Web应用程序,因此其他国家/地区用于邮政编码的格式并不重要。我有一份我们要加载到包含以下内容的数据库表中的邮政编码列表: 5位美国邮政编码 纬度 经度 usps分类代码 状态码 城市 邮政编码是主键,因为这是我要查询的内容。我开始使用中型int 5,但是它会截断前导零的邮政编码。 我考虑使用char5,但担心对char变量建立索引会对性能造成影响。 所以我的

  • 我试图理解Dataset和data frame之间的区别,并找到了以下有用的链接,但我无法理解类型安全是什么意思? DataFrame(在Spark 2.0中,即DataSet[Row])与Spark中RDD之间的差异

  • 问题内容: 当我仔细观察时,我唯一提出的疑问是: 找不到适用于实际参数“ org.apache.spark.unsafe.types.UTF8String”的适用构造函数/方法;候选者为:“ public void sparkSQL.Tweet.setId(long)” 问题答案: 正如@ user9718686所写,id字段具有不同的类型:在json文件和类定义中。当您将其读入时,Spark会从