当前位置: 首页 > 知识库问答 >
问题:

如何理解Spark中的reduceByKey?

范振海
2023-03-14

我正在努力学习Spark,到目前为止进展顺利,除了我需要在值为列表的一对RDD上使用诸如还原ByKey或组合ByKey之类的函数的问题。

我一直试图为这些函数找到详细的留档,这可以解释参数实际上是什么,这样我就可以自己解决它,而不必去Stack Overflow,但我就是找不到任何好的Spark留档。我读过Learning Spark的第3章和第4章,但老实说,对最复杂函数的解释非常糟糕。

我现在处理的问题如下:我有一对RDD,其中键是字符串,值是两个元素的列表,这两个元素都是整数。大概是这样的:(国家,[小时,计数])。对于每个键,我希望只保留计数最高的值,而不考虑时间。一旦我获得了上述格式的RDD,我就试图通过调用Spark中的以下函数来找到最大值:

reduceByKey(lambda x, y: max(x[1], y[1]))

但这会引发以下错误:

TypeError: 'int' object is not subscriptable

这对我来说毫无意义。我将参数x和y解释为两个键的值,例如x=[13,445]和y=[14,109],但是这个错误没有任何意义。我做错了什么?

共有1个答案

呼延鹏云
2023-03-14

假设您有((“key”,[13445]),(“key”,[14109]),(“key”,[15309])]

当它被传递给ReporteByKey时,它会将具有相同键的所有值分组到一个执行器中,即[13,445]、[14,109]、[15,309]并在值之间迭代

在第一次迭代中,x[13,445]y[14,109],输出是max(x[1], y[1])max(445,109),即445

在第二次迭代中,x是445,即前一个循环的最大值,y是15309

现在,当试图通过x[1]获得的第二个元素,并且445只是一个整数时,就会发生错误

TypeError:“int”对象不可下标

我希望这个错误的含义是清楚的。你可以在我的另一个答案中找到更多细节

上面的解释也解释了为什么@pault在评论部分提出的解决方案有效,即。

reduceByKey(lambda x, y: (x[0], max(x[1], y[1])))
 类似资料:
  • 我们在所有节点上都有6台机器、hdfs和纱线服务,1个主节点和6个从节点。我们在3台机器上安装Spark,1台主机器,3个工人(1个节点主工人)。我们知道,当主spark://[主机]:[端口]时,作业将仅运行3个节点,使用独立模式。当使用spark submit--master Thread提交一个jar时,它会使用所有6个服务器cpu和内存,还是只使用3个spark worker节点机器?如果

  • 我无法理解与Scope方法的功能(实际上,我真的不知道RDD操作范围类的含义) 特别是,(正文:=)的含义是什么 您可以通过以下链接找到源代码:https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala 任何人都可以帮我吗?谢谢,我对此感到困

  • 在Spark中,如何知道哪些对象在driver上实例化,哪些对象在executor上实例化,因此如何确定哪些类需要实现Serializable?

  • 我试图创建一个简洁的结构,用于理解基于未来的业务逻辑。下面是一个示例,其中包含一个基于异常处理的工作示例: 然而,这可能被视为一种非功能性或非Scala的处理方式。有更好的方法吗? 请注意,这些错误来自不同的来源——有些在业务级别(“检查所有权”),有些在控制器级别(“授权”),有些在数据库级别(“找不到实体”)。因此,从单一常见错误类型派生它们的方法可能不起作用。

  • 嗨,我是阿帕奇星火新用户。我正在学习的路上。我已经从kafka主题为json数据编写了spark streaming。下面是json数据的连续流。但现在我不知道如何使用这个json数据。我使用DataSet和DataFrame来处理Json数据,但遇到了一些错误。请用几个例子来帮助我,如何使用流式传输的数据流。 注意:我使用的是Apache Spark1.6.3版本。 代码: