我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。
结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。
在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。
在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。
我看了一些例子,比如这个例子,但是我在Kafka中的Avro对象非常复杂,不能像例子中的字符串那样简单地转换..
到目前为止,我尝试了这种代码(我在另一个问题中看到):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
我得到“数据类型不匹配:无法将BinaryType转换为StructType(StructField(..”
如何反序列化值?
事实上,我的公司里有人帮我解决了这个问题,所以我会在这里发布给未来的读者。。
基本上,在miguno建议的基础上,我错过了解码部分:
def decodeMessages(iter: Iterator[KafkaMessage], schemaRegistryUrl: String) : Iterator[<YourObject>] = {
val decoder = AvroTo<YourObject>Decoder.getDecoder(schemaRegistryUrl)
iter.map(message => {
val record = decoder.fromBytes(message.value).asInstanceOf[GenericData.Record]
val field1 = record.get("field1Name").asInstanceOf[GenericData.Record]
val field2 = record.get("field1Name").asInstanceOf[GenericData.String]
...
//create an object with the fields extracted from genericRecord
})
}
现在你可以阅读Kafka的信息,并像这样解码:
val ds = spark
.readStream
.format(config.getString(ConfigUtil.inputFormat))
.option("kafka.bootstrap.servers", config.getString(ConfigUtil.kafkaBootstrapServers))
.option("subscribe", config.getString(ConfigUtil.subscribeTopic))
.load()
.as[KafkaMessage]
val decodedDs = ds.mapPartitions(decodeMessages(_, schemaRegistryUrl))
*Kafka消息
只是一个case类,它包含从Kafka读取时获得的通用对象(键、值、主题、分区、偏移量、时间戳)。
<代码>AvroTo
例如,使用Confluent的<code>Kafkaavroderializer<code>和模式注册表。
val kafkaProps = Map("schema.registry.url" -> schemaRegistryUrl)
val client = new CachedSchemaRegistryClient(schemaRegistryUrl, 20)
// If you have Avro encoded keys
val keyDeserializer = new KafkaAvroDeserializer(client)
keyDeserializer.configure(kafkaProps.asJava, true) //isKey = true
// Avro encoded values
valueDeserializer = new KafkaAvroDeserializer(client)
valueDeserializer.configure(kafkaProps.asJava, false) //isKey = false
从中调用<code>。反序列化(主题名称,字节)。asInstanceOf[GenericRecord]以获取avro对象。
希望这能帮到某人
我还不太熟悉Spark的序列化如何与新的/实验性的结构化流结合使用,但下面的方法确实有效——尽管我不确定这是不是最好的方法(我觉得这种方法看起来有点笨拙)。
我将尝试以自定义数据类型的示例(此处:Foo
case类)而不是专门的Avro来回答您的问题,但我希望它无论如何都会对您有所帮助。这个想法是使用Kryo序列化来序列化/反序列化您的自定义类型,请参阅调整:Spark留档中的数据序列化。
注意:Spark通过内置(隐式)编码器支持case类的序列化,您可以通过< code > import Spark . implicits . _ 导入这些编码器。但是为了这个例子,让我们忽略这个功能。
假设您已将以下<code>Foo</code>case类定义为自定义类型(TL;DR提示:为了防止遇到奇怪的Spark序列化投诉/错误,您应该将代码放入单独的<code>Foo.scala</code>文件中):
// This could also be your auto-generated Avro class/type
case class Foo(s: String)
现在您有了以下结构化流代码来从Kafka读取数据,其中输入主题包含消息值为二进制编码String
的Kafka消息,您的目标是创建Foo
实例基于这些消息值(即类似于您将二进制数据反序列化为Avro类的实例的方式):
val messages: DataFrame = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092")
.option("subscribe", "my-input-topic")
.load()
现在我们正在将值反序列化为自定义Foo
类型的实例,为此我们首先需要定义一个隐式Encoder[Foo]
:
implicit val myFooEncoder: Encoder[Foo] = org.apache.spark.sql.Encoders.kryo[Foo]
val foos: Dataset[Foo] = messages.map(row => Foo(new String(row.getAs[Array[Byte]]("value")))
回到你的Avro问题,你需要做的是:
编码器
。Foo(新字符串(row.getAs[数组[字节]](“值”))
替换为代码,以将二进制编码的 Avro 数据反序列化为 Avro POJO,即从消息值(row.getAs[数组[字节]](“值”))
中获取二进制编码的 Avro 数据并返回 Avro 通用记录
或您在其他地方定义的任何特定自定义对象
。如果有人知道更简洁/更好/…回答塔尔问题的方法,我洗耳恭听。:-)
另请参阅:
如上所述,从Spark 2.1.0开始,批处理阅读器支持avro,但SparkSession.readStream()不支持avro。下面是我如何根据其他响应在Scala中工作的。为了简洁,我简化了模式。
package com.sevone.sparkscala.mypackage
import org.apache.spark.sql._
import org.apache.avro.io.DecoderFactory
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
object MyMain {
// Create avro schema and reader
case class KafkaMessage (
deviceId: Int,
deviceName: String
)
val schemaString = """{
"fields": [
{ "name": "deviceId", "type": "int"},
{ "name": "deviceName", "type": "string"},
],
"name": "kafkamsg",
"type": "record"
}"""
val messageSchema = new Schema.Parser().parse(schemaString)
val reader = new GenericDatumReader[GenericRecord](messageSchema)
// Factory to deserialize binary avro data
val avroDecoderFactory = DecoderFactory.get()
// Register implicit encoder for map operation
implicit val encoder: Encoder[GenericRecord] = org.apache.spark.sql.Encoders.kryo[GenericRecord]
def main(args: Array[String]) {
val KafkaBroker = args(0);
val InTopic = args(1);
val OutTopic = args(2);
// Get Spark session
val session = SparkSession
.builder
.master("local[*]")
.appName("myapp")
.getOrCreate()
// Load streaming data
import session.implicits._
val data = session
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", KafkaBroker)
.option("subscribe", InTopic)
.load()
.select($"value".as[Array[Byte]])
.map(d => {
val rec = reader.read(null, avroDecoderFactory.binaryDecoder(d, null))
val deviceId = rec.get("deviceId").asInstanceOf[Int]
val deviceName = rec.get("deviceName").asInstanceOf[org.apache.avro.util.Utf8].toString
new KafkaMessage(deviceId, deviceName)
})
我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:
我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。
我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\
我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。
问题内容: 我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。 spark-avro外部模块可以为读取avro文件提供以下解决方案: 但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。 是否有其他模块支持读取从Kafka流式传输的Avro消息? 问题答案: 您可以包括spark-avro软件包,例如使用(调整版本