当前位置: 首页 > 知识库问答 >
问题:

火花行到JSON

太叔英卫
2023-03-14

我想从Spark v.1.6(使用scala)数据帧创建一个JSON。我知道有一个简单的解决方案,就是做df.toJSON

但是,我的问题看起来有点不同。例如,考虑具有以下列的数据帧:

|  A  |     B     |  C1  |  C2  |    C3   |
-------------------------------------------
|  1  | test      |  ab  |  22  |  TRUE   |
|  2  | mytest    |  gh  |  17  |  FALSE  |

我想在最后有一个数据帧

|  A  |     B     |                        C                   |
----------------------------------------------------------------
|  1  | test      | { "c1" : "ab", "c2" : 22, "c3" : TRUE }    |
|  2  | mytest    | { "c1" : "gh", "c2" : 17, "c3" : FALSE }   |

其中C是包含C1C2C3的JSON。不幸的是,我在编译时不知道数据框是什么样子的(除了始终“固定”的列AB)。

至于我需要这个的原因:我使用Protobuf发送结果。不幸的是,我的数据帧有时有比预期更多的列,我仍然会通过Protobuf发送这些列,但我不想在定义中指定所有列。

我怎样才能做到这一点?

共有3个答案

季小云
2023-03-14

这里没有JSON解析器,它适应您的架构:

import org.apache.spark.sql.functions.{col, concat, concat_ws, lit}

df.select(
  col(df.columns(0)),
  col(df.columns(1)),
  concat(
    lit("{"), 
    concat_ws(",",df.dtypes.slice(2, df.dtypes.length).map(dt => {
      val c = dt._1;
      val t = dt._2;
      concat(
        lit("\"" + c + "\":" + (if (t == "StringType") "\""; else "")  ),
        col(c),
        lit(if(t=="StringType") "\""; else "") 
      )
    }):_*), 
    lit("}")
  ) as "C"
).collect()
南门飞扬
2023-03-14

我使用此命令来解决to_json问题:

output_df = (df.select(to_json(struct(col("*"))).alias("content")))
康烨伟
2023-03-14

Spark 2.1应该有对这个用例的本地支持(参见#15354)。

import org.apache.spark.sql.functions.to_json
df.select(to_json(struct($"c1", $"c2", $"c3")))
 类似资料:
  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 我有一个Spark集群运行在hdfs之上的纱线模式。我启动了一个带有2个内核和2G内存的worker。然后我提交了一个具有3个核心的1个执行器动态配置的作业。不过,我的工作还能运转。有人能解释启动worker的内核数量和为执行者请求的内核数量之间的差异吗。我的理解是,由于执行者在工人内部运行,他们无法获得比工人可用的资源更多的资源。

  • 我想通过分区迭代一个dataframe,对于每个分区,迭代它的所有行,并创建一个deleteList,它将包含HBase的每一行的delete对象。我将Spark和HBase与Java一起使用,并使用以下代码创建了一个行对象: 但它无法工作,因为我无法正确访问行的值。而df有一个名为“hbase_key”的列。

  • 我刚从Spark开始。我已经用Spark安装了CDH5。然而,当我尝试使用sparkcontext时,它给出了如下错误 我对此进行了研究,发现了错误:未找到:值sc 并试图启动火花上下文。/Spark-shell。它给错误

  • 我有一个Spark Spark集群,其中主节点也是工作节点。我无法从驱动程序代码节点到达主程序,并得到错误: driver-code节点中的SparkContext配置为: 我可以成功地,但不能成功地。意味着机器可以到达,但端口不能到达。 会有什么问题?我已经为主节点和驱动程序代码运行的节点(客户端)禁用了Ubuntu的防火墙。

  • 我正试图设置一个小型Spark集群进行测试。该集群由3名工人和一名师傅组成。我在每个节点上设置了Java、scala和Spark。配置文件如下:spark-defaults.conf: Spark-env.sh