我需要展平一个数据帧,以便将其与Spark(Scala)中的另一个数据帧连接起来。
基本上,我的2个数据帧有以下模式:
数据流1
root
|-- field1: string (nullable = true)
|-- field2: long (nullable = true)
|-- field3: long (nullable = true)
|-- field4: long (nullable = true)
|-- field5: integer (nullable = true)
|-- field6: timestamp (nullable = true)
|-- field7: long (nullable = true)
|-- field8: long (nullable = true)
|-- field9: long (nullable = true)
|-- field10: integer (nullable = true)
DF2
root
|-- field1: long (nullable = true)
|-- field2: long (nullable = true)
|-- field3: string (nullable = true)
|-- field4: integer (nullable = true)
|-- field5: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- field6: long (nullable = true)
| | |-- field7: integer (nullable = true)
| | |-- field8: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- field9: string (nullable = true)
| | | | |-- field10: integer (nullable = true)
|-- field11: timestamp (nullable = true)
老实说,我不知道如何使DF2变平。最后,我需要连接DF.field4 = DF2.field9上的2个数据帧
我用的是2.1.0
我的第一个想法是使用爆炸,但在Spark 2.1.0中已经被否决了,有人能给我一点提示吗?
我的错误分解功能在Spark 2.1.0中org.apache.spark.sql包中的functions.explode下仍然可用
谢谢
您可以在下面找到代码:
val DF2Exploded1 = DF2.select(DF2("*"), functions.explode(DF2("field5"))
.alias("field5_exploded"))
val DF2Exploded2 = DF2Exploded1.select(DF2Exploded1("*"), functions.explode(DF2Exploded1("field5_exploded.field8"))
.alias("field8_exploded"))
我已经在Spark中读取了一个JSON文件。该文件具有以下结构: 我创建了一个递归函数来使用嵌套结构类型的列来展平架构 如何展平包含嵌套结构类型的ArrayType,例如engagementItems:数组(nullable=true) 感谢您的帮助。
我正在使用数据帧读取。拼花地板文件,但不是将它们转换为rdd来进行我的正常处理,我想对它们进行处理。 所以我有我的文件: 即使从数据帧转换为RDD,我也会收到以下错误: :26:错误:值zipWithIndex不是组织的成员。阿帕奇。火花sql。一行 任何人都知道如何做我正在尝试做的事情,本质上是尝试获取值和列索引。 我在想这样的事情: 但最后一部分被卡住了,因为不知道如何做zipWithInde
问题内容: 在我们的应用程序中,我们使用Spark sql获取字段值作为列。我正在尝试弄清楚如何将列值放入嵌套的json对象并推送到Elasticsearch。还有一种方法可以参数化值以传递给正则表达式? 我们目前正在使用Spark Java API。 实际输出: 我们需要在节点“ txn_summary”下的上述列,例如以下json: 预期产量: 问题答案: 将所有列添加到顶层结构应提供预期的输
有人能解释一下将为Spark Dataframe创建的分区数量吗。 我知道对于RDD,在创建它时,我们可以提到如下分区的数量。 但是对于创建时的Spark数据帧,看起来我们没有像RDD那样指定分区数量的选项。 我认为唯一的可能性是,在创建数据帧后,我们可以使用重新分区API。 有人能告诉我在创建数据帧时,我们是否可以指定分区的数量。
null null 为什么要使用UDF/UADF而不是map(假设map保留在数据集表示中)?
如何解析嵌套列表的JSON字符串以在pyspark中触发数据帧? 输入数据帧: 预期产出: 示例代码: 有几个例子,但我不知道如何做到这一点: > < li> 如何在pyspark中解析和转换spark数据帧行中的json字符串 如何从pyspark中的spark数据帧行转换具有多个键的JSON字符串?