当前位置: 首页 > 知识库问答 >
问题:

如何在Spark结构化流媒体中使用Scala Case类映射Kafka源

管炳
2023-03-14

我尝试在spark中使用结构化流媒体,因为它非常适合我的用例。然而,我似乎找不到将Kafka传入的数据映射到case类的方法。根据官方文件,我可以做到这一点。

import sparkSession.sqlContext.implicits._                          
val kafkaDF:DataFrame = sparkSession
                                          .readStream
                                          .format("kafka")
                                          .option("kafka.bootstrap.servers", bootstrapServers_CML)
                                          .option("subscribe", topics_ME)
                                          .option("startingOffsets", "latest")
                                          .load()
                                          .selectExpr("cast (value as string) as json") //Kakfa sends data in a specific schema (key, value, topic, offset, timestamp etc)    

val schema_ME = StructType(Seq(
    StructField("Parm1", StringType, true),
    StructField("Parm2", StringType, true),
    StructField("Parm3", TimestampType, true)))  

val mobEventDF:DataFrame = kafkaDF
                         .select(from_json($"json", schema_ME).as("mobEvent")) //Using a StructType to convert to application specific schema. Cant seem to use a case class for schema directly yet. Perhaps with later API??
                         .na.drop()

mobEventDF有这样一个模式

root
 |-- appEvent: struct (nullable = true)
 |    |-- Parm1: string (nullable = true)
 |    |-- Parm2: string (nullable = true)
 |    |-- Parm3: string (nullable = true)

有没有更好的方法?如何将其直接映射到下面的Scala Case类中?

case class ME(name: String, 
                 factory: String,
                 delay: Timestamp)

共有1个答案

谢阳曜
2023-03-14

选择并重命名所有字段,然后作为方法调用

kafkaDF.select($"mobEvent.*").toDF("name", "factory", "delay").as[ME]
 类似资料:
  • 我以前能够运行Kafka结构流编程。但是突然间,我所有的结构流python程序都失败了,出现了一个错误。我从Spark网站上拿了基本的Kafka结构流式编程,也以同样的错误失败。 spark-submit--packages org.apache.spark:spark-sql-kafka-0-102.11:2.2.0c:\users\ranjith.gangam\pycharmprojects\

  • 我设计了一个 Nifi 流,将以 Avro 格式序列化的 JSON 事件推送到 Kafka 主题中,然后我尝试在 Spark 结构化流式处理中使用它。 虽然Kafka部分工作正常,但Spark结构化流媒体无法读取Avro事件。它失败,错误如下。 火花代码 Spark中使用的模式 Kafka中的示例主题数据 以下是版本信息 感谢您的帮助。

  • 我第一次使用pyspark。Spark版本:2.3.0Kafka版本:2.2.0 我有一个Kafka制作人,它以avro格式发送嵌套数据,我正试图在pyspark中编写spark流/结构化流的代码,它将来自Kafka的avro反序列化为数据帧,并进行转换,将其以拼花格式写入s3。我在spark/scala中找到了avro转换器,但pyspark中的支持尚未添加。如何在pyspark中转换相同的值。

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我正在使用Spark 2.2上的Spark结构化流媒体将文件从HDFS目录流式传输到Kafka主题。我想为我写的主题数据捕捉Kafka偏移量。 我正在使用 给Kafka写信。 当我利用 为了捕获流的进度信息,检索到的信息与Kafka中创建的偏移量不相关。 我假设这是因为流提供的信息实际上是关于我正在使用的文件流的,而与Kafka中编写的内容无关。 有没有一种Spark Structure流式处理方

  • 问题内容: 我正在使用Maven 我添加了以下依赖项 我还在代码中添加了jar 它完全可以正常工作,没有任何错误,在通过spark-submit提交时出现以下错误,非常感谢您的帮助。谢谢你的时间。 线程“主要” java.lang.NoClassDefFoundError中的异常:sun.reflect处的KafkaSparkStreaming.sparkStreamingTest(KafkaSp