当前位置: 首页 > 知识库问答 >
问题:

使用Spark 2.0.2(结构化流媒体)从Kafka读取Avro消息

诸俊才
2023-03-14

我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。

结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。

在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。

在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。

我看了一些例子,比如这个例子,但是我在Kafka中的Avro对象非常复杂,不能像例子中的字符串那样简单地转换..

到目前为止,我尝试了这种代码(我在另一个问题中看到):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

我得到“数据类型不匹配:无法将BinaryType转换为StructType(StructField(..”

如何反序列化值?

共有3个答案

澹台华采
2023-03-14

事实上,我的公司里有人帮我解决了这个问题,所以我会在这里发布给未来的读者。。

基本上,在miguno建议的基础上,我错过了解码部分:

def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<YourObject>] = {
val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
iter.map(message => {
  val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
  val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
  val field2 = record.get("field1Name").asInstanceOf[GenericData.String]
        ...
  //create an object with the fields extracted from genericRecord
  })
}

现在你可以阅读Kafka的信息,并像这样解码:

val ds = spark
  .readStream
  .format(config.getString(ConfigUtil.inputFormat))
  .option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
  .option("subscribe", config.getString(ConfigUtil.subscribeTopic))
  .load()
  .as[KafkaMessage]

val decodedDs  = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))

*Kafka消息只是一个case类,它包含从Kafka读取时获得的通用对象(键、值、主题、分区、偏移量、时间戳)。

<代码>AvroTo

例如,使用Confluent的<code>Kafkaavroderializer<code>和模式注册表。

val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)

// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) //isKey = true

// Avro encoded values
valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) //isKey = false

从中调用<code>。反序列化(主题名称,字节)。asInstanceOf[GenericRecord]以获取avro对象。

希望这能帮到某人

濮君植
2023-03-14

我还不太熟悉Spark的序列化如何与新的/实验性的结构化流结合使用,但下面的方法确实有效——尽管我不确定这是不是最好的方法(我觉得这种方法看起来有点笨拙)。

我将尝试以自定义数据类型的示例(此处:Foocase类)而不是专门的Avro来回答您的问题,但我希望它无论如何都会对您有所帮助。这个想法是使用Kryo序列化来序列化/反序列化您的自定义类型,请参阅调整:Spark留档中的数据序列化。

注意:Spark通过内置(隐式)编码器支持case类的序列化,您可以通过< code > import Spark . implicits . _ 导入这些编码器。但是为了这个例子,让我们忽略这个功能。

假设您已将以下<code>Foo</code>case类定义为自定义类型(TL;DR提示:为了防止遇到奇怪的Spark序列化投诉/错误,您应该将代码放入单独的<code>Foo.scala</code>文件中):

// This could also be your auto-generated Avro class/type
case class Foo(s: String)

现在您有了以下结构化流代码来从Kafka读取数据,其中输入主题包含消息值为二进制编码String的Kafka消息,您的目标是创建Foo实例基于这些消息值(即类似于您将二进制数据反序列化为Avro类的实例的方式):

val messages: DataFrame = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
    .option("subscribe", "my-input-topic")
    .load()

现在我们正在将值反序列化为自定义Foo类型的实例,为此我们首先需要定义一个隐式Encoder[Foo]

implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))

回到你的Avro问题,你需要做的是:

  1. 根据您的需求创建合适的编码器
  2. Foo(新字符串(row.getAs[数组[字节]](“值”))替换为代码,以将二进制编码的 Avro 数据反序列化为 Avro POJO,即从消息值(row.getAs[数组[字节]](“值”))中获取二进制编码的 Avro 数据并返回 Avro 通用记录或您在其他地方定义的任何特定自定义对象

如果有人知道更简洁/更好/…回答塔尔问题的方法,我洗耳恭听。:-)

另请参阅:

  • 如何在数据集中存储自定义对象?
  • 尝试将数据框行映射到更新的行时出现编码器错误
周翰池
2023-03-14

如上所述,从Spark 2.1.0开始,批处理阅读器支持avro,但SparkSession.readStream()不支持avro。下面是我如何根据其他响应在Scala中工作的。为了简洁,我简化了模式。

package com.sevone.sparkscala.mypackage

import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

object MyMain {

    // Create avro schema and reader
    case class KafkaMessage (
        deviceId: Int,
        deviceName: String
    )
    val schemaString = """{
        "fields": [
            { "name":  "deviceId",      "type": "int"},
            { "name":  "deviceName",    "type": "string"},
        ],
        "name": "kafkamsg",
        "type": "record"
    }"""
    val messageSchema = new Schema.Parser().parse(schemaString)
    val reader = new GenericDatumReader[GenericRecord](messageSchema)
    // Factory to deserialize binary avro data
    val avroDecoderFactory = DecoderFactory.get()
    // Register implicit encoder for map operation
    implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]

    def main(args: Array[String]) {

        val KafkaBroker =  args(0);
        val InTopic = args(1);
        val OutTopic = args(2);

        // Get Spark session
        val session = SparkSession
                .builder
                .master("local[*]")
                .appName("myapp")
                .getOrCreate()

        // Load streaming data
        import session.implicits._
        val data = session
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", KafkaBroker)
                .option("subscribe", InTopic)
                .load()
                .select($"value".as[Array[Byte]])
                .map(d => {
                    val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
                    val deviceId = rec.get("deviceId").asInstanceOf[Int]
                    val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
                    new KafkaMessage(deviceId, deviceName)
                })
 类似资料:
  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本