我创建了一个DataFrame,如下所示,我想对列标题应用映射减少算法,但是当我使用减少键函数时,我遇到了一些问题。
+-------+--------------------+------------+-----------+
|project| title|requests_num|return_size|
+-------+--------------------+------------+-----------+
| aa|%CE%92%CE%84_%CE%...| 1| 4854|
| aa|%CE%98%CE%B5%CF%8...| 1| 4917|
| aa|%CE%9C%CF%89%CE%A...| 1| 4832|
| aa|%CE%A0%CE%B9%CE%B...| 1| 4828|
| aa|%CE%A3%CE%A4%CE%8...| 1| 4819|
| aa|%D0%A1%D0%BE%D0%B...| 1| 4750|
| aa| 271_a.C| 1| 4675|
| aa|Battaglia_di_Qade...| 1| 4765|
| aa| Category:User_th| 1| 4770|
| aa| Chiron_Elias_Krase| 1| 4694|
| aa|County_Laois/en/Q...| 1| 4752|
| aa| Dassault_rafaele| 2| 9372|
| aa|Dyskusja_wikiproj...| 1| 4824|
| aa| E.Desv| 1| 4662|
| aa|Enclos-apier/fr/E...| 1| 4772|
| aa|File:Wiktionary-l...| 1| 10752|
| aa|Henri_de_Sourdis/...| 1| 4748|
| aa|Incentive_Softwar...| 1| 4777|
| aa|Indonesian_Wikipedia| 1| 4679|
| aa| Main_Page| 5| 266946|
+-------+--------------------+------------+-----------+
我试过这个,但不管用:
dataframe.select("title").map(word => (word,1)).reduceByKey(_+_);
似乎我应该先将数据帧转移到列表中,然后使用映射函数生成键值对(word,1),最后求和键值。i一种从stackoverflow将数据框转移到列表的方法,例如
val text =dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()
但是出现了一个错误
scala> val text = dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()
2018-04-08 21:49:35 WARN NettyRpcEnv:66 - Ignored message: HeartbeatResponse(false)
2018-04-08 21:49:35 WARN Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
... 14 more
java.lang.OutOfMemoryError: Java heap space
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:280)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2722)
... 16 elided
scala> val text = dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()
java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:280)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
at org.apache.spark.sql.Dataset.collect(Dataset.scala:2722)
... 16 elided
Collect
-将数据帧绑定到Scala集合会对数据集大小施加限制。相反,您可以将数据帧转换为RDD,然后应用map
和reduceByKey
,如下所示:
val df = Seq(
("aa", "271_a.C", 1, 4675),
("aa", "271_a.C", 1, 4400),
("aa", "271_a.C", 1, 4600),
("aa", "Chiron_Elias_Krase", 1, 4694),
("aa", "Chiron_Elias_Krase", 1, 4500)
).toDF("project", "title", "request_num", "return_size")
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
val rdd = df.rdd.
map{ case Row(_, title: String, _, _) => (title, 1) }.
reduceByKey(_ + _)
rdd.collect
// res1: Array[(String, Int)] = Array((Chiron_Elias_Krase,2), (271_a.C,3))
您还可以使用groupBy
直接转换数据帧:
df.groupBy($"title").agg(count($"title").as("count")).
show
// +------------------+-----+
// | title|count|
// +------------------+-----+
// | 271_a.C| 3|
// |Chiron_Elias_Krase| 2|
// +------------------+-----+
问题内容: 这是我的基本表(显示为关联数组): 在另一个名为的表上,我可以添加多个属于一行的自定义数据。像这样: 我目前正在制作,但它只会增加,并给我行。我想要的是我的查询返回如下内容: 行值变成一列,而,它就是值。 我该怎么做? 问题答案: SELECT a.ID, a.Campaign_ID, a.FirstName, a.LastName, MAX(CASE WHEN b.data = ‘q
我有一个包含以下数据的Spark数据框(我使用Spark csv加载数据): 是否有类似于spark RDD的东西可以返回spark数据帧:(基本上,对相同的键值求和) (我可以将数据转换为RDD并执行reduceByKey操作,但是否有更具Spark DataFrame API的方法来实现这一点?)
我用的是这样的意图: 在中,我有以下内容: 当我从特定的相册中选择图像时,比如“posts”、“profile photos”(参见屏幕截图),我无法在中获取图像路径。可以从其他相册中选择图像,没有任何问题。 我尝试在中添加但返回。 这里也有类似的问题,但没有人回答。 请救命!
一条指令可以有零或多个操作数-指令操作的数据。零操作数的一个例子是NOP(no operation)。操作数可以在下面的位置: 位于指令本身(立即数) 位于寄存器(EAX, EBX, ECX, EDX, ESI, EDI, ESP, 或者EBP,如果是32位操作数;AX, BX, CX, DX, SI, DI, SP, 或者BP,如果是16位操作数;AH, AL, BH, BL, CH, CL,
我想从实体类中选择特定的列,而忽略方法中的其他列。为此,我不能使用@JsonIgnore,因为在另一个方法中,我想调用该实体类中可用的所有列。 现在我想要一个方法,它提供忽略@JsonIgnore列的数据,另一个方法提供所有四列,包括调用时带有@JsonIgnore注释的列。 你能帮我解决这个问题吗
问题内容: 我有一个包含以下数据的Spark数据帧(我使用spark-csv加载数据): 有什么类似于spark RDD的东西可以返回一个Spark DataFrame如下:(基本上,对相同的键值求和) (我可以将数据转换为RDD并进行操作,但是还有更多的Spark DataFrame API方式可以做到这一点吗?) 问题答案: 如果你不关心列名,你可以使用后跟: 否则最好替换为: 最后,您可以使