当前位置: 首页 > 知识库问答 >
问题:

如何使用from_json与 Kafka 连接 0.10 和 Spark 结构化流?

寿子轩
2023-03-14

我试图从[Database ricks][1]中复制示例并将其应用于Kafka的新连接器并引发结构化流,但是我无法使用Spark中的开箱即用方法正确解析JSON…

注:题目以JSON格式写入Kafka。

val ds1 = spark
          .readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", IP + ":9092")
          .option("zookeeper.connect", IP + ":2181")
          .option("subscribe", TOPIC)
          .option("startingOffsets", "earliest")
          .option("max.poll.records", 10)
          .option("failOnDataLoss", false)
          .load()

下面的代码不行,我相信那是因为列json是字符串,和方法from_json签名不匹配...

    val df = ds1.select($"value" cast "string" as "json")
                .select(from_json("json") as "data")
                .select("data.*")

有什么建议吗?

[更新]示例工作:https://github.com/katsou55/kafka-spark-structured-streaming-example/blob/master/src/main/scala-2.11/Main.scala

共有1个答案

阴元青
2023-03-14

首先,您需要为JSON消息定义架构。例如

val schema = new StructType()
  .add($"id".string)
  .add($"name".string)

现在,您可以在< code>from_json方法中使用该模式,如下所示。

val df = ds1.select($"value" cast "string" as "json")
            .select(from_json($"json", schema) as "data")
            .select("data.*")
 类似资料:
  • 场景与经典的流连接略有不同 交易流: transTS, userid, productid,... streamB:创建的新产品流:productid、productname、createTS等) 我想加入与产品的交易,但我找不到水印/加入条件的组合来实现这一点。 结果为空。 我做错了什么?

  • 我使用结构化流媒体(Spark 2.0.2)来消费Kafka消息。使用scalapb,protobuf中的消息。我得到以下错误。请帮助。。 线程“main”scala中的异常。ScalaRefltionException:不是一个术语org.apache.spark.sql.catalyst.符号$SymbolApi$9.apply术语(Seflection.scala:592)org.apach

  • 我有一个Kafka2.1消息代理,希望在Spark2.4中对消息的数据进行一些处理。我想使用齐柏林0.8.1笔记本快速原型。 我下载了结构化流所必需的spark-streaming-kafka-0-102.11.jar(http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html),并将其作为“dep

  • 在过去的几个月里,我已经使用了相当多的结构化流来实现流作业(在大量使用Kafka之后)。在阅读了《Stream Processing with Apache Spark》一书之后,我有这样一个问题:有没有什么观点或用例可以让我使用Spark Streaming而不是Structured Streaming?如果我投入一些时间来研究它,或者由于im已经使用了Spark结构化流,我应该坚持使用它,而之

  • 我有一个关于Kafka流上的Spark结构化流的问题。 我有一个模式类型: 我从Kafka主题引导我的流,如下所示: 接下来转换为字符串,字符串类型: 现在我想将value字段(这是一个JSON)转换为之前转换的模式,这将使SQL查询更容易: 看来Spark 2.3.1不知道函数? 这是我的进口: 有没有办法解决这个问题?请注意,我不是在寻找Scala解决方案,而是一个纯粹的基于Java的解决方案

  • 我有一个以XML形式出现的数据集,其中一个节点包含JSON。Spark将其作为StringType读入,因此我尝试使用from_json()将json转换为数据帧。 我可以将字符串转换为JSON,但如何编写模式来处理数组? 没有数组的字符串-工作得很好 带数组的字符串 - 无法弄清楚这个