当前位置: 首页 > 面试题库 >

Pyspark数据框上的数据透视字符串列

司空浩邈
2023-03-14
问题内容

我有一个像这样的简单数据框:

rdd = sc.parallelize(
    [
        (0, "A", 223,"201603", "PORT"), 
        (0, "A", 22,"201602", "PORT"), 
        (0, "A", 422,"201601", "DOCK"), 
        (1,"B", 3213,"201602", "DOCK"), 
        (1,"B", 3213,"201601", "PORT"), 
        (2,"C", 2321,"201601", "DOCK")
    ]
)
df_data = sqlContext.createDataFrame(rdd, ["id","type", "cost", "date", "ship"])

df_data.show()
 +---+----+----+------+----+
| id|type|cost|  date|ship|
+---+----+----+------+----+
|  0|   A| 223|201603|PORT|
|  0|   A|  22|201602|PORT|
|  0|   A| 422|201601|DOCK|
|  1|   B|3213|201602|DOCK|
|  1|   B|3213|201601|PORT|
|  2|   C|2321|201601|DOCK|
+---+----+----+------+----+

我需要按日期进行调整:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("cost").show()

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|2321.0|  null|  null|
|  0|   A| 422.0|  22.0| 223.0|
|  1|   B|3213.0|3213.0|  null|
+---+----+------+------+------+

一切正常。但是现在我需要对其进行透视,并获得一个非数字列:

df_data.groupby(df_data.id, df_data.type).pivot("date").avg("ship").show()

当然,我会得到一个例外:

AnalysisException: u'"ship" is not a numeric column. Aggregation function can only be applied on a numeric column.;'

我想产生一些东西

+---+----+------+------+------+
| id|type|201601|201602|201603|
+---+----+------+------+------+
|  2|   C|DOCK  |  null|  null|
|  0|   A| DOCK |  PORT| DOCK|
|  1|   B|DOCK  |PORT  |  null|
+---+----+------+------+------+

有可能pivot吗?


问题答案:

假设(id |type | date)组合是唯一的,并且您的唯一目标是枢纽而不是合计,则可以使用first(或任何其他不限于数值的函数):

from pyspark.sql.functions import first

(df_data
    .groupby(df_data.id, df_data.type)
    .pivot("date")
    .agg(first("ship"))
    .show())

## +---+----+------+------+------+
## | id|type|201601|201602|201603|
## +---+----+------+------+------+
## |  2|   C|  DOCK|  null|  null|
## |  0|   A|  DOCK|  PORT|  PORT|
## |  1|   B|  PORT|  DOCK|  null|
## +---+----+------+------+------+

如果这些假设不正确,则必须预先汇总数据。例如,对于最常见的ship值:

from pyspark.sql.functions import max, struct

(df_data
    .groupby("id", "type", "date", "ship")
    .count()
    .groupby("id", "type")
    .pivot("date")
    .agg(max(struct("count", "ship")))
    .show())

## +---+----+--------+--------+--------+
## | id|type|  201601|  201602|  201603|
## +---+----+--------+--------+--------+
## |  2|   C|[1,DOCK]|    null|    null|
## |  0|   A|[1,DOCK]|[1,PORT]|[1,PORT]|
## |  1|   B|[1,PORT]|[1,DOCK]|    null|
## +---+----+--------+--------+--------+


 类似资料:
  • 问题内容: 我想通过替换子字符串对Spark Dataframe列执行一些基本的处理。最快的方法是什么? 在当前的用例中,我有一个要规范化的地址列表。例如,此数据框: 会成为 问题答案: 对于Spark 1.5或更高版本,可以使用功能包: 快速说明: 调用该函数可在数据框中添加(或替换,如果名称存在)列。 该函数将通过替换所有与模式匹配的子字符串来生成新列。

  • 我正在使用spark(批处理,而不是流)从kafka topic中读取数据来创建spark dataframe。我想使用spark将这个数据帧加载到cassandra。Dataframe是字符串格式,如下所示。 root |-value:string(nullable = true) 我尝试使用','分隔符拆分数据帧记录,并形成新的数据帧,我可以将其数据到cassandra。 创建了如下的火花DF

  • 问题内容: 我想在SQL Server中创建一个视图,该视图结合了几部分数据库元数据。 我想要的一个元数据保存在表中-relevent列如下: 如您所见,如果“文本”列中的数据超过最大长度(SQL Server中为8000字节/ 4000个字符,在我的示例中为12个字符),则该数据将分成多行。标识将文本重新组合在一起的顺序。 我想在我的视图中进行查询/子查询,以重新组合sys.syscomment

  • A 数据透视表介绍 B.1 什么是数据透视表? 数据透视表是一种可以快速汇总、分析大量数据表格的交互式工具。使用数据透视表可以按照数据表格的不同字段从多个角度进行透视,并建立交叉表格,用以查看数据表格不同层面的汇总信息、分析结果以及摘要数据。使用数据透视表可以深入分析数值数据,以帮助用户发现关键数据,并做出有关企业中关键数据的决策。 数据透视表是针对以下用途特别设计的:以友好的方式,查看大量的数据

  • 数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 不透明度 设置背景颜色的不透明度。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水

  • 数据透视表显示二维交集的度量值,并在表格视图中表示数据。 图表属性 选择图表类型后,可以更改其属性来自定义图表: 选项 描述 常规 背景颜色 设置图表区域的背景颜色。 显示边框 显示图表外部边框。 边界颜色 设置图表外部边框的颜色。 显示标题 显示图表的主要标题。 标题 指定图表的标题。 标题字体 设置标题的字体样式。 位置 设置标题的位置。 对齐 设置标题的水平对齐方式。 数据 字体 设置字段名