当前位置: 首页 > 知识库问答 >
问题:

在pyspark RDD上执行映射/减少时出错

娄丁雨
2023-03-14

我只是想学习PySpark,但对以下两个RDD之间的区别感到困惑,我知道一个是类型集,一个是列表,但两者都是RDD

rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 3)])
type(rdd)

rdd = sc.parallelize(['a, 1', 'b, 1', 'a, 3'])
type(rdd)

图和减函数处理代码:

priceMap= s.map(lambda o: (o.split(",")[0], float(o.split(",")[1])))
priceMap.reduceByKey(add).take(10)

我可以很容易地执行映射/减少功能对第二个rdd数据,但当我尝试执行映射或减少我得到以下错误:那么我们如何将第一个rdd转换为第二个rdd数据,或者如果有任何方法来解决以下错误请帮助

Py4JJavaError:调用z:org时出错。阿帕奇。火花应用程序编程接口。蟒蛇蟒蛇。运行作业:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段162.0中的任务0失败1次,最近一次失败:阶段162.0中的任务0.0丢失(TID 3850,localhost,executor driver):org。阿帕奇。火花应用程序编程接口。蟒蛇PythonException:回溯(上次调用):

共有1个答案

关项明
2023-03-14

对于第一个rdd,您可以替换map函数:

rdd = sc.parallelize([('a', 1), ('b', 1), ('a', 3)])
rdd.map(lambda o: (o[0], float(o[1]))).reduceByKey(add).collect()

这是因为拆分只适用于字符串,而不适用于元组。

 类似资料:
  • 问题内容: 如何在Java 8中使用泛型参数重载Function? 错误:java:名称冲突:sum(java.util.function.Function )和sum(java.util.function.Function )具有相同的擦除 问题答案: 您所提出的示例与Java 8无关,与Java中泛型的工作原理无关。并将在编译时进行类型擦除,并将其转换为。方法重载的经验法则是具有不同的数量,类

  • 嗨,我有下面的map-reduce代码,我试图通过它解析我的XML文件并在输出中创建一个CSV。 我还有一个名为Connect_Home的类,在这个类中,我使用JAXB解析数据,提取数据。但当我运行代码时,会出现以下错误:

  • 问题内容: 我目前正在开发一个涉及在Rails服务器上同步联系人的应用程序。我正在使用Redis服务器和sidekiq在后台执行联系人同步。我的数据库是mongodb,我正在使用Mongoid gem作为ORM。工作流程如下: 电话上的联系人通过应用程序传递到Rails服务器,然后在Rails服务器上,它在Redis服务器中排队。 现在,cron作业会触发连接到Redis的sidekiq并完成作业

  • 在Spark中有几个优化可以减少批处理的时间。这些可以在优化指南中作了讨论。这节重点讨论几个重要的。 数据接收的并行水平 通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上) 接收单个数据流。创建多个输入DStrea

  • 我在一个大约50个节点的集群上运行2.2.0上的hadoop,我的工作是64个map任务和20个reduce任务。map在大约30分钟内完成,然后所有reduce任务都在运行,但是我发现一个奇怪的日志是这样的:

  • 我有以下形式的地图: 让INNER成为内部地图,即。 例如,我想在一个新的中减少START映射 它们具有相同的键,但值不同。特别是,对于每个键,我希望新的Double值是相应键的INNER映射中值的SUM。 如何使用JAVA 8的流API来实现这一点? 谢谢大家。 编辑:样例地图为 我想要一张像下面这样的新地图: