我使用Apache Spark 2.2.0和Scala。
我以这个问题为指导,在不使用pivot函数的情况下透视数据帧。
我需要在不使用pivot函数的情况下透视数据帧,因为我有非数字数据,而< code>pivot只对数字数据使用聚合函数,如< code>sum 、< code>min 、< code>max。我想在< code>pivot聚合中使用一个非数字列。
这是我的数据:
+---+-------------+----------+-------------+----------+-------+
|Qid| Question|AnswerText|ParticipantID|Assessment| GeoTag|
+---+-------------+----------+-------------+----------+-------+
| 1|Question1Text| Yes| abcde1| 0|(x1,y1)|
| 2|Question2Text| No| abcde1| 0|(x1,y1)|
| 3|Question3Text| 3| abcde1| 0|(x1,y1)|
| 1|Question1Text| No| abcde2| 0|(x2,y2)|
| 2|Question2Text| Yes| abcde2| 0|(x2,y2)|
+---+-------------+----------+-------------+----------+-------+
我希望它按< code>ParticipantID、< code>Assessment和< code>GeoTag标记分组,并在< code>Question列上“pivot”并从< code>AnswerText列中取值。最后,输出应该如下所示:
+-------------+-----------+----------+-------+-----+----- +
|ParticipantID|Assessment |GeoTag |Qid_1 |Qid_2|Qid_3 |
+-------------+-----------+----------+-------+-----+------+
|abcde1 |0 |(x1,y1) |Yes |No |3 |
|abcde2 |0 |(x2,y2) |No |Yes |null |
+-------------+-----------+----------+-------+-----+------+
我试过这个:
val questions: Array[String] = df.select("Q_id")
.distinct()
.collect()
.map(_.getAs[String]("Q_id"))
.sortWith(_<_)
val df2: DataFrame = questions.foldLeft(df) {
case (data, question) => data.selectExpr("*", s"IF(Q_id = '$question', AnswerText, 0) AS $question")
}
[后跟分组依据表达式]
但是我得到了以下错误,这一定与最终语句AS$问题
的语法有关
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: *
17/12/08 16:13:12 INFO SparkSqlParser: Parsing command: IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
org.apache.spark.sql.catalyst.parser.ParseException:
extraneous input '?' expecting <EOF>(line 1, pos 104)
== SQL ==
IF(Q_id_string_new_2 = '101_Who_is_with_you_right_now?', AnswerText, 0) AS 101_Who_is_with_you_right_now?
--------------------------------------------------------------------------------------------------------^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:217)
你知道我哪里出了问题吗?有更好的方法吗?如果有必要,我考虑在Spark之外恢复使用Panda和Python,但如果可能的话,我宁愿在相同的框架内编写所有代码。
只需注意@user8371915接受的答案,以使查询更快。
有一种方法可以避免使用标头生成问题
的昂贵扫描。
只需生成标题(在同一作业和阶段中!)然后是列上的枢轴
。
// It's a simple and cheap map-like transformation
val qid_header = input.withColumn("header", concat(lit("Qid_"), $"Qid"))
scala> qid_header.show
+---+-------------+----------+-------------+----------+-------+------+
|Qid| Question|AnswerText|ParticipantID|Assessment| GeoTag|header|
+---+-------------+----------+-------------+----------+-------+------+
| 1|Question1Text| Yes| abcde1| 0|(x1,y1)| Qid_1|
| 2|Question2Text| No| abcde1| 0|(x1,y1)| Qid_2|
| 3|Question3Text| 3| abcde1| 0|(x1,y1)| Qid_3|
| 1|Question1Text| No| abcde2| 0|(x2,y2)| Qid_1|
| 2|Question2Text| Yes| abcde2| 0|(x2,y2)| Qid_2|
+---+-------------+----------+-------------+----------+-------+------+
将标头作为数据集的一部分,让我们进行透视。
val solution = qid_header
.groupBy('ParticipantID, 'Assessment, 'GeoTag)
.pivot('header)
.agg(first('AnswerText))
scala> solution.show
+-------------+----------+-------+-----+-----+-----+
|ParticipantID|Assessment| GeoTag|Qid_1|Qid_2|Qid_3|
+-------------+----------+-------+-----+-----+-----+
| abcde1| 0|(x1,y1)| Yes| No| 3|
| abcde2| 0|(x2,y2)| No| Yes| null|
+-------------+----------+-------+-----+-----+-----+
当< code>$question将问题变量的值代入SQL语句时,您会得到一个带“?”的列名在它的SQL中。<代码>?在列名中不是有效字符,因此您必须至少使用反斜杠来引用:
s"IF(Q_id = '$question', AnswerText, 0) AS `$question`"
或者使用< code > select /< code > with column :
import org.apache.spark.sql.functions.when
case (data, question) =>
data.withColumn(question, when($"Q_id" === question, $"AnswerText"))
或者首先使用< code>regexp_replace对字符串进行santize。
我需要在不使用pivot函数的情况下透视数据帧,因为我有非数字数据和df.pivot只对数字数据使用sum、min、max等聚合函数。
您可以先使用<code>来:如何使用pivot并计算非数值列的平均值(面向AnalysisException“不是数值列”)?
data.groupBy($"ParticipantID", $"Assessment", $"GeoTag")
.pivot($"Question", questions).agg(first($"AnswerText"))
假设我们在JavaEE应用程序中有这些类,它们也是Hibernate实体: 在过去几年中,创建了一些Swing窗口,让配置器用户创建ad deploy DRools软件包规则,以自定义客户所需的工作流。这些窗口以某种方式将Swing组件转换为Drool Mvel文本,以避免配置程序用户编写原始代码。然后将这些规则保存并部署到BLOB字段中的DB表中,并在需要时执行。问题是,现在我们需要实现一个新的
问题内容: 我将如何在Python中“按任意键”(或抓住菜单选项)? raw_input要求您按回车键。 Windows msvcrt具有getch()和getche()。 有使用标准库执行此操作的可移植方法吗? 问题答案:
我们正在AWS上托管Cassandra 2.0.2群集。我们最近开始通过引导新节点和停用旧节点,从普通驱动器升级到SSD驱动器。除了两个节点永远处于退役状态外,进展相当顺利。现在,在新的6个节点运行后,我们注意到一些使用phpcassa的旧工具停止工作。安全组没有任何变化,所有端口TCP/UDP都是打开的,telnet可以通过9160连接,cqlsh可以“使用”群集,选择数据,但是,“描述群集”失
问题内容: 我经常遇到Java lambda表达式的问题,当我想对对象的任意属性或方法上的stream()进行区分(但要保留该对象而不是将其映射到该属性或方法上)时,就会遇到问题。我开始创建容器,如这里讨论的那样,但是我开始做足够的工作,直到它变得令人讨厌并制作了许多样板课程。 我把这个Pairing类放在一起,它包含两种类型的两个对象,并允许你指定从左,右或两个对象上抠出的键。我的问题是……在某
问题内容: 我需要对表中的XML列执行数据透视,其中XML包含具有多个属性的多个元素。每个元素中的属性始终相同,但是元素的数量会有所不同。让我举个例子吧… 我需要为每个不同的FieldName属性值(在此示例中为Username,FirstName,LastName和NewField)使用相应的FieldValue属性作为值的结果集结束。我上面给出的示例的结果如下所示: 我想出了一种使用静态列来完
我通过安装从FastAPI提供React应用程序 通过这个 React 应用程序得到服务,React 路由在客户端也能正常工作,但是一旦客户端重新加载未在服务器上定义但在 React 应用程序中使用的路由 FastAPI 来解决此问题,我做了如下操作。 < Li > < code > @ app . route('/network ') < Li > < code > @ app . route(