当前位置: 首页 > 面试题库 >

如何将嵌套的Avro GenericRecord转换为行

别峻
2023-03-14
问题内容

我有一个代码可以使用功能将我的avro记录转换为Row avroToRowConverter()

directKafkaStream.foreachRDD(rdd -> {
        JavaRDD<Row> newRDD= rdd.map(x->{

            Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(SchemaRegstryClient.getLatestSchema("poc2"));
            return avroToRowConverter(recordInjection.invert(x._2).get());
            });

此功能不适用于嵌套模式(TYPE= UNION)

private static Row avroToRowConverter(GenericRecord avroRecord) {
    if (null == avroRecord) {
        return null;
    }
    //GenericData
    Object[] objectArray = new Object[avroRecord.getSchema().getFields().size()];
    StructType structType = (StructType) SchemaConverters.toSqlType(avroRecord.getSchema()).dataType();
    for (Schema.Field field : avroRecord.getSchema().getFields()) {

        if(field.schema().getType().toString().equalsIgnoreCase("STRING") || field.schema().getType().toString().equalsIgnoreCase("ENUM")){
            objectArray[field.pos()] = ""+avroRecord.get(field.pos());
        }else {
            objectArray[field.pos()] = avroRecord.get(field.pos());
        }
    }

    return new GenericRowWithSchema(objectArray, structType);
}

谁能建议我如何将复杂的架构转换为ROW?


问题答案:

有,SchemaConverters.createConverterToSQL但是不幸的是私人的。有一些PR可以将其公开,但是它们从未合并:

  • https://github.com/databricks/spark-avro/pull/89
  • https://github.com/databricks/spark-avro/pull/132

尽管我们使用了一种解决方法。

您可以通过在com.databricks.spark.avro包中创建一个类来公开它:

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

然后,您可以在代码中使用它,如下所示:

final DataType myAvroType = SchemaConverters.toSqlType(MyAvroRecord.getClassSchema()).dataType();

final Function1<GenericRecord, Row> myAvroRecordConverter =
        MySchemaConversions.createConverterToSQL(MyAvroRecord.getClassSchema(), myAvroType);

Row[] convertAvroRecordsToRows(List<GenericRecord> records) {
    return records.stream().map(myAvroRecordConverter::apply).toArray(Row[]::new);
}

对于一条记录,您可以这样称呼它:

final Row row = myAvroRecordConverter.apply(record);


 类似资料:
  • 问题内容: 我是Hibernate和HQL的新手。我想在HQL中编写一个更新查询,其SQL等效项如下: 是的PK ,是的FK和PK 。有一对一的映射。 相应的Java类是Patient(患者)(具有lastName,firstName,doctorId字段)和Doctor(具有DoctorId字段)。 谁能告诉我上面的SQL查询的HQL等效项是什么? 非常感谢。 问题答案: 如果您检查规范,则可以

  • 当我试图使用ModelMapper将嵌套的java对象转换为嵌套的DTO时,我遇到了一个问题。在父DTO对象中,子DTO为null。以下是代码片段。 实体类: DTO的课程: 这是映射器代码: 输出: 输出用户DTO:UserDTO[名称=xyz,地址=null,产品=null] 在这里,我想将用户实体转换为UserDTO-dto。我得到了地址和产品DTO的空值。我在这里到底缺少什么?有人知道吗?

  • 问题内容: 我在Scala和Java之间遇到编译问题。 我的Java代码需要一个 我的scala代码有一个 我收到编译错误: 似乎scala.collection.JavaConversions不适用于嵌套集合,即使Vector可以隐式转换为Iterable。除了遍历scala集合并手动进行转换之外,我还能做些什么使这些类型起作用? 问题答案: 应该弃用恕我直言。您最好使用来明确说明转换的时间和地

  • 问题内容: 我是Python和Pandas的新手。我正在尝试将Pandas Dataframe转换为嵌套的JSON。函数.to_json()不能为我的目标提供足够的灵活性。 以下是数据框的一些数据点(在csv中,以逗号分隔): 有很多重复的信息,我想要一个这样的JSON: 我怎样才能做到这一点? 编辑: 再现数据帧的代码: 问题答案: 更新: 结果(格式化): 旧答案: 你可以用它做的,和方法:

  • 我遇到了一个问题,我的程序只在的一次迭代中结束,我不确定不合逻辑的代码在哪里。 全球给予: 我复杂的嵌套从这里开始,但我不确定如何修复它,以便它迭代通过和数字for-循环: 我得到以下输出: 我从来没有得到Susie的数据,我很早就回来了,但似乎不知道在哪里。对于问题所在(甚至重构)的任何帮助/指导都将不胜感激。提前谢谢!

  • 我认为将对象强制转换为相当简单,但是