当前位置: 首页 > 知识库问答 >
问题:

Scala:从spark结构化流媒体读取Kafka Avro消息时出错

宰父志新
2023-03-14

我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

我用python创建记录,记录的模式是:

{
    "type": "record",
    "namespace": "example",
    "name": "RawRecord",
    "fields": [
        {"name": "int_field","type": "int"},
        {"name": "string_field","type": "string"}
    ]
}

它们是这样产生的:

from time import sleep
from confluent_kafka.avro import AvroProducer, load, loads

def generate_records():
    avro_producer_settings = {
        'bootstrap.servers': "localhost:19092",
        'group.id': 'groupid',
        'schema.registry.url': "http://127.0.0.1:8081"
    }
    producer = AvroProducer(avro_producer_settings)
    key_schema = loads('"string"')
    value_schema = load("schema.avsc")
    i = 1
    while True:
        row = {"int_field": int(i), "string_field": str(i)}
        producer.produce(topic="avro_topic", key="key-{}".format(i), 
                         value=row, key_schema=key_schema, value_schema=value_schema)
        print(row)
        sleep(1)
        i+=1

spark structured streaming(在Scala中)的消耗如下:

import org.apache.spark.sql.{ Dataset, Row}
import org.apache.spark.sql.streaming.{ OutputMode, StreamingQuery}
import org.apache.spark.sql.avro._
...
        try {

            log.info("----- reading schema")
            val jsonFormatSchema = new String(Files.readAllBytes(
                                                    Paths.get("./src/main/resources/schema.avsc")))

            val ds:Dataset[Row] = sparkSession
                .readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", kafkaServers)
                .option("subscribe", topic)
                .load()

            val output:Dataset[Row] = ds
                .select(from_avro(ds.col("value"), jsonFormatSchema) as "record")
                .select("record.*")

            output.printSchema()

            var query: StreamingQuery = output.writeStream.format("console")
                .option("truncate", "false").outputMode(OutputMode.Append()).start();


            query.awaitTermination();

        } catch {
            case e: Exception => log.error("onApplicationEvent error: ", e)
            //case _: Throwable => log.error("onApplicationEvent error:")
        }
...

在spark中打印模式时,奇怪的是字段可以为空,尽管avro模式不允许这样做。Spark展示了这一点:

root
 |-- int_field: integer (nullable = true)
 |-- string_field: string (nullable = true)

我已经用python检查了另一个消费者的消息,这些消息很好,但与spark显示的消息内容无关。

+---------+------------+
|int_field|string_field|
+---------+------------+
|0        |            |
+---------+------------+

使用的主要依赖项有:

<properties>
    <spark.version>2.4.4</spark.version>
    <scala.version>2.11</scala.version>
</properties>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>

有人知道为什么会这样吗?

提前感谢。重现错误的代码在这里:

https://github.com/anigmo97/SimpleExamples/tree/master/Spark_streaming_kafka_avro_scala

问题是,我在使用python中的confluent_kafka库,在使用spark avro库的spark结构化流媒体中阅读avro消息。

Confluent_kafka库使用合流的avro格式,并使用标准avro格式激发avro读取。

不同之处在于,为了使用模式注册表,conFluent avro在消息前面加上四个字节,指示应该使用哪个模式。

资料来源:https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

为了能够使用confluent avro并从spark结构化流媒体中读取它,我为Abris替换了spark avro库(Abris允许将avro和confluent avro与spark集成)。https://github.com/AbsaOSS/ABRiS

共有1个答案

闾丘德业
2023-03-14

问题是,我在使用python中的confluent_kafka库,在使用spark avro库的spark结构化流媒体中阅读avro消息。

Confluent_kafka库使用合流的avro格式,并使用标准avro格式激发avro读取。

不同之处在于,为了使用模式注册表,conFluent avro在消息前面加上四个字节,指示应该使用哪个模式。

资料来源:https://www.confluent.io/blog/kafka-connect-tutorial-transfer-avro-schemas-across-schema-registry-clusters/

为了能够使用confluent avro并从spark结构化流媒体中读取它,我为Abris替换了spark avro库(Abris允许将avro和confluent avro与spark集成)。https://github.com/AbsaOSS/ABRiS

我的依赖关系发生了如下变化:

<properties>
        <spark.version>2.4.4</spark.version>
        <scala.version>2.11</scala.version>
</properties>
<!-- SPARK- AVRO -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-avro_${scala.version}</artifactId>
    <version>${spark.version}</version>
</dependency>
<!-- SPARK -AVRO AND CONFLUENT-AVRO -->
<dependency>
    <groupId>za.co.absa</groupId>
    <artifactId>abris_2.11</artifactId>
    <version>3.1.1</version>
</dependency>

在这里,您可以看到一个简单的示例,该示例获取消息并将其值反序列化为avro和汇合avro。

var input: Dataset[Row] = sparkSession.readStream
    //.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")
    .format("kafka")
    .option("kafka.bootstrap.servers", kafkaServers)
    .option("subscribe", topicConsumer)
    .option("failOnDataLoss", "false")
    // .option("startingOffsets", "latest")
    // .option("startingOffsets", "earliest")
    .load();


// READ WITH spark-avro library (standard avro)

val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("./src/main/resources/schema.avsc")))

var inputAvroDeserialized: Dataset[Row] = input
    .select(from_avro(functions.col("value"), jsonFormatSchema) as "record")
    .select("record.*")

//READ WITH Abris library (confuent avro) 

val schemaRegistryConfig = Map(
    SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://localhost:8081",
    SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> topicConsumer,
    SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME, // choose a subject name strategy
    SchemaManager.PARAM_VALUE_SCHEMA_ID -> "latest" // set to "latest" if you want the latest schema version to used
)

var inputConfluentAvroDeserialized: Dataset[Row] = inputConfluentAvroSerialized
    .select(from_confluent_avro(functions.col("value"), schemaRegistryConfig) as "record")
    .select("record.*")
 类似资料:
  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我试图从kafka主题获取数据并将其推送到hdfs位置。我面临以下问题。 在每条消息(kafka)之后,hdfs位置都会更新为带有.c000.csv格式的部分文件。我已经在HDFS位置的顶部创建了一个hive表,但是HIVE无法读取从火花结构化流写入的任何数据。 以下是spark结构化流媒体之后的文件格式 以下是我要插入的代码: 谁能帮帮我,为什么要创建这样的文件? 如果我执行dfs-cat/pa