当前位置: 首页 > 知识库问答 >
问题:

在spark流媒体/结构化流媒体中阅读来自Kafka的avro消息

陆阳曜
2023-03-14

我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0

我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。谢谢

共有1个答案

胡昊
2023-03-14

正如您所提到的,阅读来自Kafka的Avro消息并通过pyspark进行解析,并没有针对相同内容的直接库。但我们可以通过编写小包装器来读取/解析Avro消息,并在pyspark流代码中将该函数作为UDF调用,如下所示。

参考:Pyspark 2.4.0,使用读取流从kafka读取avro-Python

注意:Avro是自Spark 2.4以来内置但外部的数据源模块。请按照“Apache Avro数据源指南”的部署部分部署应用程序。

参考资料:https://spark-test.github.io/pyspark-coverage-site/pyspark_sql_avro_functions_py.html

Spark提交:

[调整软件包版本以匹配基于spark/avro版本的安装]

/usr/hdp/2.6.1.0-129/spark2/bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.3 --conf spark.ui.port=4064

Pyspark流代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
from pyspark.sql.column import Column, _to_java_column
from pyspark.sql.functions import col, struct
from pyspark.sql.functions import udf
import json
import csv
import time
import os

#  Spark Streaming context :

spark = SparkSession.builder.appName('streamingdata').getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, 20)

#  Kafka Topic Details :

KAFKA_TOPIC_NAME_CONS = "topicname"
KAFKA_OUTPUT_TOPIC_NAME_CONS = "topic_to_hdfs"
KAFKA_BOOTSTRAP_SERVERS_CONS = 'localhost.com:9093'

#  Creating  readstream DataFrame :

df = spark.readStream \
     .format("kafka") \
     .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS_CONS) \
     .option("subscribe", KAFKA_TOPIC_NAME_CONS) \
     .option("startingOffsets", "latest") \
     .option("failOnDataLoss" ,"false")\
     .option("kafka.security.protocol","SASL_SSL")\
     .option("kafka.client.id" ,"MCI-CIL")\
     .option("kafka.sasl.kerberos.service.name","kafka")\
     .option("kafka.ssl.truststore.location", "/path/kafka_trust.jks") \
     .option("kafka.ssl.truststore.password", "changeit") \
     .option("kafka.sasl.kerberos.keytab","/path/bdpda.headless.keytab") \
     .option("kafka.sasl.kerberos.principal","bdpda") \
     .load()


df1 = df.selectExpr( "CAST(value AS STRING)")

df1.registerTempTable("test")


# Deserilzing the Avro code function

from pyspark.sql.column import Column, _to_java_column 
def from_avro(col): 
     jsonFormatSchema = """
                    {
                     "type": "record",
                     "name": "struct",
                     "fields": [
                       {"name": "col1", "type": "long"},
                       {"name": "col2", "type": "string"}
                                ]
                     }"""
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))


spark.udf.register("JsonformatterWithPython", from_avro)

squared_udf = udf(from_avro)
df1 = spark.table("test")
df2 = df1.select(squared_udf("value"))

#  Declaring the Readstream Schema DataFrame :

df2.coalesce(1).writeStream \
   .format("parquet") \
   .option("checkpointLocation","/path/chk31") \
   .outputMode("append") \
   .start("/path/stream/tgt31")


ssc.awaitTermination()
 类似资料:
  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp