当前位置: 首页 > 知识库问答 >
问题:

使用架构将带有Spark的AVRO消息转换为DataFrame

刘海
2023-03-14
{
  "fields": [
    { "name": "firstName", "type": "string" },
    { "name": "lastName", "type": "string" }
  ],
  "name": "user",
  "type": "record"
}
object Injection {
  val parser = new Schema.Parser()
  val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
  val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}

...

messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

  df.show()
})

case class User(firstName: String, lastName: String)

不知何故,除了使用case类将AVRO消息转换为DataFrame之外,我找不到其他方法。是否有可能改用模式?我使用的是Spark 1.6.2Kafka 0.10

完整的代码,如果你感兴趣的话。

import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}

object ReadMessagesFromKafka {
  object Injection {
    val parser = new Schema.Parser()
    val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
    val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
  }

  def main(args: Array[String]) {
    val brokers = "127.0.0.1:9092"
    val topics = "test"

    // Create context with 2 second batch interval
    val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
  ssc, kafkaParams, topicsSet)

    messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      val df = rdd.map(message => Injection.injection.invert(message._2).get)
    .map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()

      df.show()
    })

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
  @transient  private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}

共有1个答案

傅朝
2023-03-14

OP可能解决了这个问题,但为了将来的参考,我很普遍地解决了这个问题,所以我认为在这里发帖可能会有帮助。

因此,一般来说,您应该将Avro模式转换为spark StructType,并将RDD中的对象转换为row[Any],然后使用:

spark.createDataFrame(<RDD[obj] mapped to RDD[Row}>,<schema as StructType>

为了转换Avro模式,我使用了spark-avro,如下所示:

SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
rdd.map(obj=>{
    val seq = (obj.getName(),obj.getAge()
    Row.fromSeq(seq))
    })

OP提出的方法也适用于某些类型的对象,但很难适用于复杂对象(不是原语或类)

另一个技巧是,如果在类中有一个类,则应该将该类转换为行,这样包装类就可以转换为以下内容:

Row(Any,Any,Any,Row,...)

您还可以查看我前面提到的如何将对象转换为行的spark-avro项目。我自己也用了一些逻辑

 类似资料:
  • 这实际上与我之前的问题相同,但使用Avro而不是JSON作为数据格式。 我正在使用一个Spark数据框架,它可以从几个不同的模式版本之一加载数据: 我正在使用Spark Avro加载数据。 它可能是版本一文件或版本二文件。但是我希望能够以相同的方式处理它,将未知值设置为“null”。我之前的问题中的建议是设置模式,但是我不想重复自己在文件中编写模式,也不想重复自己在和朋友中编写模式。如何将avro

  • 因此,我们计划使用Avro在融合的Kafka生态系统上进行交流。我目前对Avro的理解是,每条消息都有自己的模式。如果是这样的话,我们需要模式注册表来解决版本更新吗? 我问,因为在每条消息中携带模式可以防止需要像模式注册表这样的东西来将消息ID映射到模式。还是我在这里错过了什么?

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 我有一个问题,源发送GenericMessage[payload=xxxxx,...]而接收器接收消息作为10,120,120,120,120,120。 这个问题发生在我设置Avro消息转换器之后。如果我删除Avro消息转换器并使用StreamListener来处理消息转换,它会正常工作。 源应用程序。属性 水槽应用 消息转换器 应用程序类 我是否缺少配置?谢谢

  • 我正在尝试将单个输入消息转换为多个消息。我有一个带有以下签名的方法: 类类似于: 对于中的每个,我想创建一个的实例。我如何做到这一点并处理

  • 我想将xml文件转换为avro。数据将采用xml格式,并将首先触及Kafka主题。然后,我可以使用flume或spark streaming来摄取xml并将其转换为avro,然后将文件放在hdfs中。我有一个cloudera环境。 当avro文件到达hdfs时,我希望能够稍后将它们读入hive表。 我想知道做这件事最好的方法是什么?我尝试过自动模式转换,比如spark avro(这没有spark流