当前位置: 首页 > 知识库问答 >
问题:

如何使用结构化流从Kafka读取JSON格式的记录?

谢学名
2023-03-14

我试图使用结构化流方法,使用基于DataFrame/DataSet API的Spark-Streaming来加载来自Kafka的数据流。

我使用:

  • 火花2.10
  • Kafka0.10
  • SPARK-SQL-KAFKA-0-10
|key|value|topic|partition|offset|timestamp|timestampType|
 val columns = Array("column1", "column2") // column names
 val rawKafkaDF = sparkSession.sqlContext.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers","localhost:9092")
  .option("subscribe",topic)
  .load()
  val columnsToSelect = columns.map( x => new Column("value." + x))
  val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)

  // some analytics using stream dataframe kafkaDF

  val query = kafkaDF.writeStream.format("console").start()
  query.awaitTermination()

共有1个答案

井高峯
2023-03-14

从Spark的角度来看,value只是一个字节序列。它不知道序列化格式或内容。为了能够提取文件,您必须首先解析它。

如果将数据序列化为JSON字符串,则有两个选项。您可以value强制转换为StringType,并使用from_json并提供一个模式:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json

val schema: StructType = StructType(Seq(
  StructField("column1", ???),
  StructField("column2", ???)
))

rawKafkaDF.select(from_json($"value".cast(StringType), schema))

强制转换StringType,使用get_json_object按路径提取字段:

import org.apache.spark.sql.functions.get_json_object

val columns: Seq[String] = ???

val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))

rawKafkaDF.select(exprs: _*)
 类似资料:
  • 我正在尝试从Kafka读取JSON消息并将它们存储在具有火花结构化流的HDFS中。 我遵循了下面的示例,当我的代码如下所示时: 然后我得到hdfs中具有二进制值的行。 这些行按预期连续写入,但采用二进制格式。 我发现了这个帖子: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structure

  • 我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我

  • 我用的是Spark 2.1。 我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。 我有以下代码: 我想要的是将字段反序列化到我的对象中,而不是转换为。 我有一个自定义的反序列化程序。 我如何在Java中做到这一点? 我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in

  • 我开发了一个Python Kafka生成器,它将多个json记录作为nd-json二进制字符串发送到一个Kafka主题。然后,我尝试用PySpark在Spark结构化流媒体中读取这些消息,如下所示:

  • 批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)

  • 我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?