当前位置: 首页 > 知识库问答 >
问题:

在Spark中按键减少到元组列表

权弘新
2023-03-14

我试图转置我的数据,以便它是每个键的元组列表,而不是数据列。

作为我的投入:

1   234   54   7   9
2   654   34   2   1
1   987   22   4   6

我希望将输出写入拼花文件

1:2   ((234, 54, 7, 9), (987, 22, 4, 6))
2:6   ((654, 34 2 1))

作为输入,我有两组拼花地板数据。我阅读它们并将其作为数据帧加入。然后我将每一行映射到键值对,然后将每个键还原为元组的大列表。

val keyedRDD = joinedDF.map(row => (
  ""+row.getInt(0)+":"+(row.getInt(1)/PARTITION_SIZE),
  List(Record(
    row.getInt(1),
    row.getInt(2),
    row.getInt(3),
    row.getInt(4)
  ))
))

val reduced = keyedRDD.reduceByKey(_:::_)

这里PARTITION_SIZE只是我为每次运行设置的一个变量,用于将数据分割成该大小的块。所以,如果我输入100000,并且有200万记录,那么编号为0-99,999的记录将在一个桶中,100,000-199,999将在下一个桶中,依此类推。

Record只是一个简单的case类来保存这些数据,我尝试过使用简单的元组,并将值单独放入一个列表中,得到了相同的结果。

我的理解是,这应该减少到每个键一个列表的输出,正如我前面所述。然而,我无法完成这项工作。在Spark History Server中,即使Ganglia显示至少80%的CPU使用率和高内存使用率,它也总是显示它在映射阶段挂起(甚至不启动它)。控制台被以下消息卡住:

16/01/18 01:26:10 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 2 is 2485 bytes
16/01/18 01:26:10 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to ip-172-31-7-127.ec2.internal:34337
16/01/18 01:26:10 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 2 to ip-172-31-7-129.ec2.internal:45407
16/01/18 01:26:17 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to ip-172-31-7-128.ec2.internal:59468
16/01/18 01:26:17 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 1 is 75087 bytes
16/01/18 01:26:18 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to ip-172-31-7-127.ec2.internal:34337
16/01/18 01:26:18 INFO MapOutputTrackerMasterEndpoint: Asked to send map output locations for shuffle 1 to ip-172-31-7-129.ec2.internal:45407

一个数据集大约是3GB,另一个大约是22GB,所以一点都不大。但我想可能是我的内存不足了(即使我在被卡住20个小时后才收到OOM或executor丢失的消息)。我用m3试过EMR集群。xlarge带有2个从节点,m3。xlarge有6个从节点,甚至r3。xlarge有6个从节点,但仍然会遇到同样的问题。我已经设置了我的EMR集群,以便为Spark提供最大可能的内存分配、给定Spark动态分配、干扰memoryFraction设置等。

我只是不明白为什么这个会挂在那里。我试着简化它,只是在地图上做一个(键,1)RDD,并添加减少,它在20分钟内完成。

共有1个答案

淳于宏伯
2023-03-14

在昂贵的操作中附加到列表是一个常见的错误。记住Scala对不可变对象的偏见。最好从谷歌的“Scala列表附加性能”开始。这将为你提供几个很棒的博客,详细描述问题和建议。一般来说,添加到一个列表是一个昂贵的操作,因为每个操作都会产生一个新的列表——非常需要计算和内存。你可以在列表前加前缀,或者最好的答案通常是listbuffer。再次查看博客,并注意性能特征

http://www.scala-lang.org/docu/files/collections-api/collections_40.html

 类似资料:
  • 我的问题基本上归结为将列表减少为链表,但从reduce函数推断出的类型似乎不正确。 我的列表如下所示 我希望reduce函数在每个reduce步骤中都能做到这一点 然而,减少函数似乎认为这不起作用,因为我猜它不认为身份是节点。 这是我的密码。 我做错了什么? 在接受答案后编辑,我的代码如下所示: 现在打印出来了 我仍然不知道为什么Java不能告诉减少函数的第三个参数是不必要的,它永远不会被调用,但

  • 问题内容: 是否可以从元组中获取价值: 通过像这样调用STR键 Python说只有int可以用于这种类型的“查询” 我不能使用循环(太多的开销…) 谢谢! 问题答案: 此类查询的规范数据结构是字典: 如果使用元组,则无法避免循环(显式或隐式)。

  • 但是,此修复: 有没有更优雅的方式这样做?

  • 我正在研究一种需要对大矩阵进行数学运算的算法。基本上,该算法包括以下步骤: 输入:大小为n的两个向量u和v > 对于两个矩阵中的每个条目,应用一个函数f。返回两个矩阵M_u,M_v 求M_的本征值和本征向量。对于i=0,返回e_i,ev_i,。。。,n-1 计算每个特征向量的外积。返回一个矩阵O_i=e_i*转置(e_i),i=0,。。。,n-1 用e_i=e_i delta_i调整每个特征值,其

  • 问题内容: 我大约提高了一些代码,以前也问了一个问题在这里。@霍尔格给了我正确的回答,他说: 每当发现自己通过groupingBy使用reducer收集器时,都应检查toMap是否更合适 好像是花样!他建议我做的只是完美的。 这是众所周知的模式吗?为什么比(在某些情况下)结合和更好? 问题答案: 通过使用两个收集器的经验,这种模式变得显而易见。您会在Stackoverflow上找到一些问题和解答,