当前位置: 首页 > 知识库问答 >
问题:

如何在scala中将RDD[GenericRecord]转换为数据帧?

谷梁煌
2023-03-14

我用Avro(序列化器和反序列化器)收到Kafka主题的推文。然后,我创建了一个spark consumer,它在RDD[GenericRecord]的数据流中提取推文。现在,我想将每个rdd转换为数据帧,通过SQL分析这些推文。有什么解决方案可以将RDD[GenericRecord]转换为数据帧吗?

共有3个答案

范弘亮
2023-03-14

尽管这样的事情可能会对你有所帮助,

val stream = ...

val dfStream = stream.transform(rdd:RDD[GenericRecord]=>{
     val df = rdd.map(_.toSeq)
              .map(seq=> Row.fromSeq(seq))
              .toDF(col1,col2, ....)

     df
})

我想建议您一种替代方法。使用Spark 2. x,您可以跳过创建DStreams的整个过程。相反,您可以使用结构化流执行类似的操作,

val df = ss.readStream
  .format("com.databricks.spark.avro")
  .load("/path/to/files")

这将为您提供一个可以直接查询的单个数据框。在这里,ss是火花会话的实例。/path/to/files是从kafka转储所有avro文件的地方。

PS:您可能需要导入spark avro

libraryDependencies += "com.databricks" %% "spark-avro" % "4.0.0"

希望有帮助干杯

孔彭祖
2023-03-14

组合https://stackoverflow.com/a/48828303/5957143和https://stackoverflow.com/a/47267060/5957143适合我。

我使用以下内容创建MySchemaConversion

package com.databricks.spark.avro

import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.DataType

object MySchemaConversions {
  def createConverterToSQL(avroSchema: Schema, sparkSchema: DataType): (GenericRecord) => Row =
    SchemaConverters.createConverterToSQL(avroSchema, sparkSchema).asInstanceOf[(GenericRecord) => Row]
}

然后我用

val myAvroType = SchemaConverters.toSqlType(schema).dataType
val myAvroRecordConverter = MySchemaConversions.createConverterToSQL(schema, myAvroType)

//unionedResultRdd是unionRDD[通用记录]

var rowRDD = unionedResultRdd.map(record => MyObject.myConverter(record, myAvroRecordConverter))
 val df = sparkSession.createDataFrame(rowRDD , myAvroType.asInstanceOf[StructType])

对象MyObject中使用myConverter的优点是不会遇到序列化问题(java.io.NotSerializableException)。

object MyObject{
    def myConverter(record: GenericRecord,
        myAvroRecordConverter: (GenericRecord) => Row): Row =
            myAvroRecordConverter.apply(record)
}
辛承志
2023-03-14

我花了一些时间试图使这项工作(特别是如何正确地反序列化数据,但看起来你已经涵盖了这一点)。。。已更新

  //Define function to convert from GenericRecord to Row
  def genericRecordToRow(record: GenericRecord, sqlType : SchemaConverters.SchemaType): Row = {
    val objectArray = new Array[Any](record.asInstanceOf[GenericRecord].getSchema.getFields.size)
    import scala.collection.JavaConversions._
    for (field <- record.getSchema.getFields) {
      objectArray(field.pos) = record.get(field.pos)
    }

    new GenericRowWithSchema(objectArray, sqlType.dataType.asInstanceOf[StructType])
  }

//Inside your stream foreachRDD
val yourGenericRecordRDD = ... 
val schema = new Schema.Parser().parse(...) // your schema
val sqlType = SchemaConverters.toSqlType(new Schema.Parser().parse(strSchema))

var rowRDD = yourGeneircRecordRDD.map(record => genericRecordToRow(record, sqlType))
val df = sqlContext.createDataFrame(rowRDD , sqlType.dataType.asInstanceOf[StructType])

如您所见,我正在使用SchemaConverter从您用来反序列化的模式中获取数据帧结构(使用模式注册表可能会更痛苦)。为此,您需要以下依赖项

    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-avro_2.11</artifactId>
        <version>3.2.0</version>
    </dependency>

你将需要改变你的火花版本取决于你的。

更新:上面的代码仅适用于平面avro模式。

对于嵌套结构,我使用了一些不同的东西。您可以复制类SchemaConverters,它必须在com.databricks.spark.avro(它使用了一些来自datricks包的受保护的类)中,或者您可以尝试使用spark-bigQuery依赖项。默认情况下,该类是不可访问的,因此您需要在包com.databricks.spark.avro中创建一个类来访问工厂方法。

package com.databricks.spark.avro

import com.databricks.spark.avro.SchemaConverters.createConverterToSQL
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType

class SchemaConverterUtils {

  def converterSql(schema : Schema, sqlType : StructType) = {
    createConverterToSQL(schema, sqlType)
  }

}

之后,您应该能够转换数据,例如

val schema = .. // your schema
val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
....
//inside html" target="_blank">foreach RDD
var genericRecordRDD = deserializeAvroData(rdd)
/// 
var converter = SchemaConverterUtils.converterSql(schema, sqlType)
... 
val rowRdd = genericRecordRDD.flatMap(record => {
        Try(converter(record).asInstanceOf[Row]).toOption
      })
//To DataFrame
 val df = sqlContext.createDataFrame(rowRdd, sqlType)
 类似资料:
  • RDD是以数组[数组[字符串]的格式创建的,具有以下值: 我想用模式创建一个数据帧: 接下来的步骤: 给出以下错误:

  • 有人能分享一下如何将转换为吗?

  • 我有一个如下的CSV文件。 我想把这个转化成下面。 基本上,我想在输出数据帧中创建一个名为idx的新列,该列将填充与键=idx,value=“n”后面的行相同的值“n”。

  • 我有地图的RDD,我想把它转换成数据帧,这里是RDD的输入格式 有没有办法转换成数据帧像 df.show

  • 在从< code>RDD制作< code >数据帧时,我遇到了一个错误。 我收到以下错误: py spark . SQL . utils . parse exception:u " \ nmis matched input ' '应为{'SELECT ',' FROM ',' ADD ',' AS ',' ALL ',' DISTINCT ',' WHERE ',' GROUP ',' BY ',

  • 我正在尝试将RDD转换为数据帧,但失败并出现错误: org.apache.spark.SparkException:由于阶段失败而中止作业:阶段2.0中的任务0失败4次,最近一次失败:阶段2.0中丢失任务0.3(TID 11,10.139.64.5,执行器0) 这是我的代码: