当前位置: 首页 > 面试题库 >

Pyspark 2.4.0,使用读取流从kafka读取avro-Python

贡威
2023-03-14
问题内容

我正在尝试使用PySpark 2.4.0从Kafka读取avro消息。

spark-avro外部模块可以为读取avro文件提供以下解决方案:

df = spark.read.format("avro").load("examples/src/main/resources/users.avro") 
df.select("name", "favorite_color").write.format("avro").save("namesAndFavColors.avro")

但是,我需要阅读流式Avro消息。库文档建议使用 from_avro() 函数,该函数仅适用于Scala和Java。

是否有其他模块支持读取从Kafka流式传输的Avro消息?


问题答案:

您可以包括spark-avro软件包,例如使用--packages(调整版本以匹配spark安装):

bin/pyspark --packages org.apache.spark:spark-avro_2.11:2.4.0

并提供您自己的包装器:

from pyspark.sql.column import Column, _to_java_column

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema))


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col)))

用法示例(从官方测试套件中采用):

from pyspark.sql.functions import col, struct


avro_type_struct = """
{
  "type": "record",
  "name": "struct",
  "fields": [
    {"name": "col1", "type": "long"},
    {"name": "col2", "type": "string"}
  ]
}"""


df = spark.range(10).select(struct(
    col("id"),
    col("id").cast("string").alias("id2")
).alias("struct"))
avro_struct_df = df.select(to_avro(col("struct")).alias("avro"))
avro_struct_df.show(3)



+----------+
|      avro|
+----------+
|[00 02 30]|
|[02 02 31]|
|[04 02 32]|
+----------+
only showing top 3 rows



avro_struct_df.select(from_avro("avro", avro_type_struct)).show(3)



+------------------------------------------------+
|from_avro(avro, struct<col1:bigint,col2:string>)|
+------------------------------------------------+
|                                          [0, 0]|
|                                          [1, 1]|
|                                          [2, 2]|
+------------------------------------------------+
only showing top 3 rows


 类似资料:
  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我正在迁移一个Kafka Streams实现,它使用纯Kafka apis来使用sping-kafka,因为它被合并在sping-引导应用程序中。 一切都很好Stream,GlobalKtable,分支,我所有的工作都非常好,但我很难合并ReadOnlyKeyValueStore。基于这里的sping-kafka留档:https://docs.spring.io/spring-kafka/docs

  • 有没有解决这个问题的方法???我无法读取KAFKA-AVRO架构消息。我正在尝试将消息从logstash发送到KAFKA到hdfs。 以下是技术堆栈: LogStash 2.3-当前生产版本 汇流3.0。 插件:A。Logstash-kafka-Output插件B。logstash-codec-avro。 动物园管理员:3.4.6 Kafka:0.10.0.0 Logstash配置文件如下所示:

  • 我正在尝试使用Apache Avro数据源指南中描述的< code>spark-avro包。 当我提交以下命令时: 我收到一个错误: 我已经尝试了不同版本的包(2.4.0、2.4.1和2.4.2),我目前使用Spark 2.4.1,但都不工作。 我使用以下命令启动spark shell:

  • Spark是否有任何最佳实践来处理在Avro中使用模式注册表序列化的kafka流?尤其是对于Spark结构化流? 我在https://github.com/ScalaConsultants/spark-kafka-avro/blob/master/src/main/scala/io/scalac/spark/AvroConsumer.scala找到了一个例子。但是我无法加载类。我在mvnrepos

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https: