我试图使用结构化流方法,使用基于DataFrame/DataSet API的Spark-Streaming来加载来自Kafka的数据流。
我使用:
|key|value|topic|partition|offset|timestamp|timestampType|
val columns = Array("column1", "column2") // column names
val rawKafkaDF = sparkSession.sqlContext.readStream
.format("kafka")
.option("kafka.bootstrap.servers","localhost:9092")
.option("subscribe",topic)
.load()
val columnsToSelect = columns.map( x => new Column("value." + x))
val kafkaDF = rawKafkaDF.select(columnsToSelect:_*)
// some analytics using stream dataframe kafkaDF
val query = kafkaDF.writeStream.format("console").start()
query.awaitTermination()
从Spark的角度来看,value
只是一个字节序列。它不知道序列化格式或内容。为了能够提取文件,您必须首先解析它。
如果将数据序列化为JSON字符串,则有两个选项。您可以将
value
强制转换为StringType
,并使用from_json
并提供一个模式:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions.from_json
val schema: StructType = StructType(Seq(
StructField("column1", ???),
StructField("column2", ???)
))
rawKafkaDF.select(from_json($"value".cast(StringType), schema))
或强制转换
到StringType
,使用get_json_object
按路径提取字段:
import org.apache.spark.sql.functions.get_json_object
val columns: Seq[String] = ???
val exprs = columns.map(c => get_json_object($"value", s"$$.$c"))
rawKafkaDF.select(exprs: _*)
我正在尝试从Kafka读取JSON消息并将它们存储在具有火花结构化流的HDFS中。 我遵循了下面的示例,当我的代码如下所示时: 然后我得到hdfs中具有二进制值的行。 这些行按预期连续写入,但采用二进制格式。 我发现了这个帖子: https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structure
我有一个 spark 2.0 应用程序,它使用火花流(使用火花流-kafka-0-10_2.11)从 kafka 读取消息。 结构化流看起来很酷,所以我想尝试迁移代码,但我不知道如何使用它。 在常规流中,我使用kafkaUtils创建Dstrean,在我传递的参数中,它是值deserializer。 在结构化流中,文档说我应该使用DataFrame函数进行反序列化,但我不知道这到底是什么意思。 我
我用的是Spark 2.1。 我正在尝试使用 Spark 结构化流从 Kafka 读取记录,反序列化它们并在之后应用聚合。 我有以下代码: 我想要的是将字段反序列化到我的对象中,而不是转换为。 我有一个自定义的反序列化程序。 我如何在Java中做到这一点? 我找到的唯一相关链接是这个 https://databricks.com/blog/2017/04/26/processing-data-in
我开发了一个Python Kafka生成器,它将多个json记录作为nd-json二进制字符串发送到一个Kafka主题。然后,我尝试用PySpark在Spark结构化流媒体中读取这些消息,如下所示:
批处理查询中似乎不支持“最新”。我想知道是否有可能用另一种方法做类似的事情(不直接处理偏移)
我是火花的新手。我使用结构化流从Kafka读取数据。 我可以在Scala中使用此代码读取数据: 我在值列中的数据是Thrift记录。Streaming api以二进制格式提供数据。我看到了将数据转换为string或json的示例,但我找不到任何关于如何将数据反序列化为Thrift的示例。 我如何才能实现这一点?