Question:

Unable to select a specific column for a reduceByKey operation

翟博雅
2023-03-14

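I created a DataFrame, shown below, and I want to apply a map-reduce algorithm to the title column, but I ran into problems when using the reduceByKey function.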

+-------+--------------------+------------+-----------+
|project|               title|requests_num|return_size|
+-------+--------------------+------------+-----------+
|     aa|%CE%92%CE%84_%CE%...|           1|       4854|
|     aa|%CE%98%CE%B5%CF%8...|           1|       4917|
|     aa|%CE%9C%CF%89%CE%A...|           1|       4832|
|     aa|%CE%A0%CE%B9%CE%B...|           1|       4828|
|     aa|%CE%A3%CE%A4%CE%8...|           1|       4819|
|     aa|%D0%A1%D0%BE%D0%B...|           1|       4750|
|     aa|             271_a.C|           1|       4675|
|     aa|Battaglia_di_Qade...|           1|       4765|
|     aa|    Category:User_th|           1|       4770|
|     aa|  Chiron_Elias_Krase|           1|       4694|
|     aa|County_Laois/en/Q...|           1|       4752|
|     aa|    Dassault_rafaele|           2|       9372|
|     aa|Dyskusja_wikiproj...|           1|       4824|
|     aa|              E.Desv|           1|       4662|
|     aa|Enclos-apier/fr/E...|           1|       4772|
|     aa|File:Wiktionary-l...|           1|      10752|
|     aa|Henri_de_Sourdis/...|           1|       4748|
|     aa|Incentive_Softwar...|           1|       4777|
|     aa|Indonesian_Wikipedia|           1|       4679|
|     aa|           Main_Page|           5|     266946|
+-------+--------------------+------------+-----------+

I tried this, but it does not work:

dataframe.select("title").map(word => (word,1)).reduceByKey(_+_);

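It seems I should first convert the DataFrame to a list, then use map to generate (word, 1) key-value pairs, and finally sum the values by key. I found a way on Stack Overflow to convert a DataFrame to a list, for example: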

val text =dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()

But this produced an error:

scala> val text = dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()
2018-04-08 21:49:35 WARN  NettyRpcEnv:66 - Ignored message: HeartbeatResponse(false)
2018-04-08 21:49:35 WARN  Executor:87 - Issue communicating with driver in heartbeater
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:47)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:62)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:58)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:92)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:785)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply$mcV$sp(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.executor.Executor$$anon$2$$anonfun$run$1.apply(Executor.scala:814)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
    at org.apache.spark.executor.Executor$$anon$2.run(Executor.scala:814)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
java.lang.OutOfMemoryError: Java heap space
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:280)
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2722)
  ... 16 elided

scala> val text = dataframe.select("title").map(r=>r(0).asInstanceOf[String]).collect()
java.lang.OutOfMemoryError: GC overhead limit exceeded                          
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:280)
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.next(SparkPlan.scala:276)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at org.apache.spark.sql.execution.SparkPlan$$anon$1.foreach(SparkPlan.scala:276)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:298)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeCollect$1.apply(SparkPlan.scala:297)
  at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:297)
  at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3272)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset$$anonfun$collect$1.apply(Dataset.scala:2722)
  at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3253)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3252)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2722)
  ... 16 elided

1 answer

汤跃
2023-03-14

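Collecting a DataFrame into a Scala collection pulls every row onto the driver, so the dataset size is limited by the driver's memory. Instead, you can convert the DataFrame to an RDD and then apply map and reduceByKey, as follows: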

val df = Seq(
  ("aa", "271_a.C", 1, 4675),
  ("aa", "271_a.C", 1, 4400),
  ("aa", "271_a.C", 1, 4600),
  ("aa", "Chiron_Elias_Krase", 1, 4694),
  ("aa", "Chiron_Elias_Krase", 1, 4500)
).toDF("project", "title", "request_num", "return_size")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row

val rdd = df.rdd.
  map{ case Row(_, title: String, _, _) => (title, 1) }.
  reduceByKey(_ + _)

rdd.collect
// res1: Array[(String, Int)] = Array((Chiron_Elias_Krase,2), (271_a.C,3))
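
Calling collect on the reduced RDD is harmless here because it holds one row per distinct title, but on a large dataset even that can be too much for the driver. A minimal sketch of fetching only the most frequent titles, assuming a local SparkSession (in spark-shell the spark value already exists) and illustrative sample pairs that are not from the question:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration only.
val spark = SparkSession.builder().master("local[*]").appName("top-titles").getOrCreate()

// Illustrative (title, 1) pairs standing in for the mapped DataFrame rows.
val pairs = spark.sparkContext.parallelize(Seq(
  ("271_a.C", 1), ("271_a.C", 1), ("271_a.C", 1),
  ("Chiron_Elias_Krase", 1), ("Chiron_Elias_Krase", 1),
  ("Main_Page", 1)
))

val titleCounts = pairs.reduceByKey(_ + _)

// Bring only the 2 highest counts to the driver instead of every row.
val top2 = titleCounts.top(2)(Ordering.by[(String, Int), Int](_._2))
top2.foreach(println)
```

top returns its results in descending order of the supplied ordering, so the driver only ever holds the requested number of rows.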

You can also transform the DataFrame directly with groupBy:

df.groupBy($"title").agg(count($"title").as("count")).
  show
// +------------------+-----+
// |             title|count|
// +------------------+-----+
// |           271_a.C|    3|
// |Chiron_Elias_Krase|    2|
// +------------------+-----+
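
For reference, the one-liner from the question fails because map on a DataFrame produces a Dataset, which has no reduceByKey method (it is defined only for RDDs of key-value pairs), and because each element there is a Row rather than a String. A minimal sketch of the smallest change that makes it work, assuming a local SparkSession and illustrative sample rows that are not from the question:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session for illustration; spark-shell already provides `spark`.
val spark = SparkSession.builder().master("local[*]").appName("title-count").getOrCreate()
import spark.implicits._

// Illustrative stand-in for the question's DataFrame.
val dataframe = Seq(
  ("aa", "Main_Page", 5, 266946),
  ("aa", "Main_Page", 1, 4700),
  ("aa", "E.Desv", 1, 4662)
).toDF("project", "title", "requests_num", "return_size")

// Switch to the RDD API first, then extract the String from each Row
// so the keys are plain titles rather than Row objects.
val counts = dataframe.select("title").rdd
  .map(row => (row.getString(0), 1))
  .reduceByKey(_ + _)

counts.collect().foreach(println)
```

This counts rows per title, like the answer's examples; to sum requests_num per title instead, the value emitted in the map step would be that column rather than 1.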