当前位置: 首页 > 知识库问答 >
问题:

在Spark结构化流媒体中使用来自Kafka的Avro事件

戚正业
2023-03-14

我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。

虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。

[Stage 0:>                                                          (0 + 1) / 1]2019-07-19 16:56:57 ERROR Utils:91 - Aborting task
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -62
        at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
        at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
        at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
        at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)

火花代码

import org.apache.spark.sql.types.{ StructField, StructType }
import org.apache.spark.sql.types.{ DecimalType, LongType, ByteType, StringType }
import org.apache.spark.sql.types.DataType._
import scala.collection.Seq
import org.apache.spark._
import spark.implicits._
import org.apache.spark.streaming._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
import org.apache.spark.sql.avro._
import java.nio.file.{Files, Path, Paths}

val spark = SparkSession.builder.appName("Spark-Kafka-Integration").master("local").getOrCreate()
val jsonFormatSchema = new String(Files.readAllBytes(Paths.get("schema.avsc")))
val df = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "host:port").option("subscribe", "topic_name").load()
val df1 = df.select(from_avro(col("value"),jsonFormatSchema).as("data")).select("data.*")
df1.writeStream.format("console").option("truncate","false").start()
))

Spark中使用的模式

{
 "type": "record",
 "name": "kafka_demo_new",
 "fields": [
  {
   "name": "host",
   "type": "string"
  },
  {
   "name": "event",
   "type": "string"
  },
  {
   "name": "connectiontype",
   "type": "string"
  },
  {
   "name": "user",
   "type": "string"
  },
  {
   "name": "eventtimestamp",
   "type": "string"
  }
 ]
}

Kafka中的示例主题数据

{"host":"localhost","event":"Qradar_Demo","connectiontype":"tcp/ip","user":"user","eventtimestamp":"2018-05-24 23:15:07"}

以下是版本信息

HDP - 3.1.0
Kafka - 2.0.0
Spark - 2.4.0

感谢您的帮助。

共有1个答案

郎聪
2023-03-14

遇到了类似的问题,发现Kafka / KSQL有一个不同版本的AVRO,这使得其他组件抱怨。

这也可能是你的情况:看看:https://github.com/confluentinc/ksql/issues/1742

 类似资料:
  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我一直在用Scala 2.11阅读spark structured streaming(2.4.4)中Kafka的avro序列化消息。为此,我使用了spark avro(下面的dependency)。我使用合流Kafka库从python生成Kafka消息。Spark streaming能够使用模式来使用消息,但无法正确读取字段的值。我准备了一个简单的例子来说明这个问题,代码在这里可用:https:

  • 我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。 mobEventDF有这样一个模式 有没有更好的方法?如何将其直接映射到下面的Scala Case类中?

  • 我是Kafka流媒体的新手。我使用python设置了一个twitter监听器,它运行在localhost:9092kafka服务器中。我可以使用kafka客户端工具(conduktor)并使用命令“bin/kafka-console-consumer.sh--bootstrap-server localhost:9092-topic twitter--from-begind”来使用侦听器生成的流,