当前位置: 首页 > 知识库问答 >
问题:

使用pyspark从apache kafka反序列化avro时为空列

暨高洁
2023-03-14

我正在用Kafka、星火和朱皮特笔记本做概念验证,我遇到了一个奇怪的问题。我正在试着阅读从Kafka到Pyspark的Avro记录。我正在使用汇合模式注册表获取模式以反序列化avro消息。反序列化spark dataframe中的avro消息后,结果列为空,没有任何错误。列应该包含数据,因为当强制转换为字符串时,某些avro字段是可读的。

我也尝试过在Scala中的spark-shell(没有jupyter)上这样做,我尝试过基于docker的spark以及spark的独立安装

jars = ["kafka-clients-2.0.0.jar", "spark-avro_2.11-2.4.3.jar", "spark-        
sql-kafka-0-10_2.11-2.4.3.jar"]
jar_paths = ",".join(["/home/jovyan/work/jars/{}".format(jar) for jar in 
jars])

conf = SparkConf()
conf.set("spark.jars", jar_paths)

spark_session = SparkSession \
    .builder \
    .config(conf=conf)\
    .appName("TestStream") \
    .getOrCreate()

def from_avro(col, jsonFormatSchema): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").from_avro
    return Column(f(_to_java_column(col), jsonFormatSchema)) 


def to_avro(col): 
    sc = SparkContext._active_spark_context 
    avro = sc._jvm.org.apache.spark.sql.avro
    f = getattr(getattr(avro, "package$"), "MODULE$").to_avro
    return Column(f(_to_java_column(col))) 

schema_registry_url = "http://schema-registry.org"
transaction_schema_name = "Transaction"

transaction_schema = requests.get(" 
{}/subjects/{}/versions/latest/schema".format(schema_registry_url, 
transaction_schema_name)).text


raw_df = spark_session.read.format("kafka") \
# SNIP
    .option("subscribe", "transaction") \
    .option("startingOffsets", "earliest").load()
raw_df = raw_df.limit(1000).cache()

extract_df = raw_df.select(
    raw_df["key"].cast("String"),
    from_avro(raw_df["value"], transaction_schema).alias("value")
)

# This shows data and fields
raw_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").show(3, truncate=False)

extract_df.show()
+---+-----+
|key|value|
+---+-----+
|...| [[]]|
|...| [[]]|
|...| [[]]|
|...| [[]]|

共有1个答案

庞安晏
2023-03-14

必须手动反序列化数据。到本文撰写之时,PySpark还没有正式支持合流模式注册表。您需要使用Confluent提供的KafkaAvroDeSerializer或第三方Spark avro库ABRiS。

Abris:https://github.com/absaoss/abris#using-abris-with-python-and-pyspark

KafKaavroDeserializer:将Spark结构化流与汇流模式注册表集成

(对不起,我不能发表评论。)

 类似资料:
  • 我是Avro和Kafka的新手,我花了几天时间来发送关于Kafka主题的序列化数据...不成功。 让我来解释一下我想要达到的目标: 在生产者方面,我通过SOAP接收数据并发送关于Kafka主题的内容。我正在使用CXF从WSDL生成POJO,并且编写了相应的模式。我正在尝试做的是序列化由CXF解封的对象,并在我的Kafka主题上发送它们。 在web上找到的大多数示例中,Avro记录都是使用已知的模式

  • 目前,我正在使用Avro1.8.0序列化/反序列化对象,但面临一些问题,特别是java.util.Map对象。不面临其他类型对象的问题。 这里的示例代码- 在deserialize方法中,我试图根据输入数据获取模式,但avro抛出错误- 多谢了。

  • 我从一个远程服务器接收到Python中的Kafka Avro消息(使用Confluent Kafka Python库的使用者),这些消息用json字典表示clickstream数据,其中包含用户代理、位置、url等字段。下面是消息的样子: 如何解码?我尝试了bson解码,但字符串没有被识别为UTF-8,因为我猜它是一种特定的Avro编码。我找到https://github.com/verisign

  • 主要目标是聚合两个Kafka主题,一个压缩慢速移动数据,另一个每秒接收一次的快速移动数据。 我已经能够在简单的场景中使用消息,例如KV(Long, String),使用如下内容: 但是,当您需要从 AVRO 反序列化时,这似乎不是方法。我有一个KV(字符串,AVRO),我需要消费。 我尝试从AVRO模式生成Java类,然后将它们包含在“应用”中,例如: 但这似乎不是正确的方法。 是否有任何文档/示

  • 我重构我的代码来使用kryo序列化。一切都很好,除了从某个类中反序列化一个土工属性。不会引发异常(我将“spark.kryo.registrationRequired”设置为true)。在调试时,我试图收集数据,但我看到geomtry中的数据只是空的。结果,我明白反序列化失败了。Geomtry来自-Any(scala)类型,因为它可能是一个复杂的属性。我的问题是为什么数据是空的,以及是否与属性的类