我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON…
注:题目以JSON格式写入Kafka。
val ds1 = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", IP + ":9092")
.option("zookeeper.connect", IP + ":2181")
.option("subscribe", TOPIC)
.option("startingOffsets", "earliest")
.option("max.poll.records", 10)
.option("failOnDataLoss", false)
.load()
下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配...
val df = ds1.select($"value" cast "string" as "json")
.select(from_json("json") as "data")
.select("data.*")
有什么建议吗?
[更新]示例工作:https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala
首先,您需要为JSON消息定义架构。例如
val schema = new StructType()
.add($"id".string)
.add($"name".string)
现在,您可以在< code>from_json方法中使用该模式,如下所示。
val df = ds1.select($"value" cast "string" as "json")
.select(from_json($"json", schema) as "data")
.select("data.*")
场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?
我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach
我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep
在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之
我有一个关于Kafka流上的Spark结构化流的问题。 我有一个模式类型: 我从Kafka主题引导我的流,如下所示: 接下来转换为字符串,字符串类型: 现在我想将value字段(这是一个JSON)转换为之前转换的模式,这将使SQL查询更容易: 看来Spark 2.3.1不知道函数? 这是我的进口: 有没有办法解决这个问题?请注意,我不是在寻找Scala解决方案,而是一个纯粹的基于Java的解决方案
我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。 我可以将字符串转换为JSON,但如何编写模式来处理数组? 没有数组的字符串-工作得很好 带数组的字符串 - 无法弄清楚这个