当前位置: 首页 > 知识库问答 >
问题:

编码还原ByKey(lambda)在map不工作pySpark

赵经国
2023-03-14

我不明白为什么我的代码不起作用。最后一行是问题:

import findspark
findspark.init()
from pyspark import SparkConf, SparkContext
from pyspark.sql.types import StringType
from pyspark import SQLContext
conf=SparkConf().setMaster("local").setAppName("mein soft")
sc=SparkContext(conf=conf)
sqlContext=SQLContext(sc)

lines=sc.textFile("File.txt")
#lines.repartition(3)
lines.getNumPartitions()

def lan_map(x):
    if "word1" and "word2" in x:
        return ("Count",(1,1))
    elif "word1" in x:
        return ("Count",("1,0"))
    elif "word2" in x:
        return ("Count",("0,1"))
    else:
        return ("Count",("0,0"))
    
mapfun=lines.map(lan_map)

mapfun.reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])).collect() 

错误是:

---------------------------------------------------------------------------Py4JJavaError回溯(最近一次调用最后一次)在1#Esto resume lo que se hicimos 3 celdas atrás中----

C:\spark-3.1.2-bin-hadoop3。2\python\pyspark\rdd。将SCCallSiteSync(self.context)作为css保存在collect(self)947“948中的py:--

C:\Spark-3.1.2-bin-hadoop3.2\python\lib\py4j-0.10.9-src.zip\py4j\java_gateway.py在调用(自我,*args)1302 1303回答=self.gateway_client.send_command(命令)-

C:\spark-3.1.2-bin-hadoop3。2\python\pyspark\sql\utils。带装饰的py(*a,**千瓦)109 def装饰(*a,**千瓦):110尝试:--

C:\spark-3.1.2-bin-hadoop3。2\python\lib\py4j-0.10.9-src。zip\py4j\protocol。py in get_return_value(应答,网关客户端,目标id,名称)324 value=OUTPUT_CONVERTER[type](应答[2],网关客户端)325如果应答[1]==引用类型:--

Py4JJavaError:调用z:org时出错。阿帕奇。火花应用程序编程接口。python蟒蛇。收集和服务:组织。阿帕奇。火花SparkException:作业因阶段失败而中止:阶段0.0中的任务0失败1次,最近的失败:阶段0.0中的任务0.0丢失(TID 0)(笔记本电脑-PB7QDPVE执行器驱动程序):org。阿帕奇。火花应用程序编程接口。pythonPythonException:Traceback(最后一次调用):文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py”,第604行,主文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py”,第594行,进程文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”,第2916行,在管道函数返回函数(拆分,前置函数(拆分,迭代器))文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”第2916行中,在管道函数返回函数(拆分,前置函数(拆分,迭代器))文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”第418行中,在func返回f(迭代器)文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”中,第2144行,组合合并。合并值(迭代器)文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\shuffle.py”,第242行,在合并值d[k]=comb(d[k],v)中如果d else创建者中有k(v)文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\util.py”,第73行,在包装器返回f(*args,**kwargs)文件“,第2行,in-TypeError:'int'和'str'的操作数类型不受支持

在org。阿帕奇。火花应用程序编程接口。pythonBasePythonRunner$ReaderIterator。handlePythonException(PythonRunner.scala:517)位于org。阿帕奇。火花应用程序编程接口。python蟒蛇跑步者$$anon$3。在org上阅读(PythonRunner.scala:652)。阿帕奇。火花应用程序编程接口。python蟒蛇跑步者$$anon$3。在org上阅读(PythonRunner.scala:635)。阿帕奇。火花应用程序编程接口。pythonBasePythonRunner$ReaderIterator。hasNext(PythonRunner.scala:470)位于org。阿帕奇。火花中断迭代器。scala上的hasNext(interruptableiterator.scala:37)。收集迭代器$groupeditor。在scala上填充(迭代器.scala:1209)。收集迭代器$groupeditor。scala上的hasNext(迭代器.scala:1215)。收集迭代器$$anon$10。hasNext(Iterator.scala:458)位于org。阿帕奇。火花洗牌分类绕过SortshufflewWriter。在org上编写(BypassMergeSortShuffleWriter.java:132)。阿帕奇。火花洗牌ShuffleWriteProcessor。在org上写(ShuffleWriteProcessor.scala:59)。阿帕奇。火花调度程序。洗牌运动。runTask(ShuffleMapTask.scala:99)位于org。阿帕奇。火花调度程序。洗牌运动。org上的runTask(ShuffleMapTask.scala:52)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:131)。阿帕奇。火花遗嘱执行人。执行者$TaskRunner$anonfun$run$3(Executor.scala:497)位于org。阿帕奇。火花util。Utils$。tryWithSafeFinally(Utils.scala:1439)位于org。阿帕奇。火花遗嘱执行人。执行者$TaskRunner。在java上运行(Executor.scala:500)。util。同时发生的线程池执行器。java上的runWorker(未知源)。util。同时发生的线程池执行器$Worker。在java上运行(未知源代码)。朗。丝线。运行(未知源)

驱动程序stacktrace:at org。阿帕奇。火花调度程序。达格调度器。组织中的failJobAndIndependentStages(DAGScheduler.scala:2258)。阿帕奇。火花调度程序。达格调度器$org上的anonfun$abortStage$2(DAGScheduler.scala:2207)。阿帕奇。火花调度程序。达格调度器$anonfun$abortStage$2$adapted(DAGScheduler.scala:2206)在scala。收集易变的。调整大小。scala的foreach(resizeablearray.scala:62)。收集易变的。调整大小。scala上的foreach$(resizeablearray.scala:55)。收集易变的。ArrayBuffer。foreach(ArrayBuffer.scala:49)位于org。阿帕奇。火花调度程序。达格调度器。组织上的abortStage(DAGScheduler.scala:2206)。阿帕奇。火花调度程序。达格调度器$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079)位于组织。阿帕奇。火花调度程序。达格调度器$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1079)在scala上失败。选项foreach(Option.scala:407)位于org。阿帕奇。火花调度程序。达格调度器。handleTaskSetFailed(DAGScheduler.scala:1079)位于组织。阿帕奇。火花调度程序。DAGSchedulerEventProcessLoop。doOnReceive(DAGScheduler.scala:2445)位于org。阿帕奇。火花调度程序。DAGSchedulerEventProcessLoop。org上的onReceive(DAGScheduler.scala:2387)。阿帕奇。火花调度程序。DAGSchedulerEventProcessLoop。org上的onReceive(DAGScheduler.scala:2376)。阿帕奇。火花util。EventLoop$$anon$1。在org上运行(EventLoop.scala:49)。阿帕奇。火花调度程序。达格调度器。org上的runJob(DAGScheduler.scala:868)。阿帕奇。火花SparkContext。org上的runJob(SparkContext.scala:2196)。阿帕奇。火花SparkContext。org上的runJob(SparkContext.scala:2217)。阿帕奇。火花SparkContext。org上的runJob(SparkContext.scala:2236)。阿帕奇。火花SparkContext。org上的runJob(SparkContext.scala:2261)。阿帕奇。火花rdd。RDD$anonfun$在org上收集$1(RDD.scala:1030)。阿帕奇。火花rdd。RDDOperationScope$。使用组织上的示波器(RDDOperationScope.scala:151)。阿帕奇。火花rdd。RDDOperationScope$。在org上使用scope(RDDOperationScope.scala:112)。阿帕奇。火花rdd。RDD。组织上的WistScope(RDD.scala:414)。阿帕奇。火花rdd。RDD。在org上收集(RDD.scala:1029)。阿帕奇。火花应用程序编程接口。python蟒蛇$。collectAndServe(PythonRDD.scala:180)位于org。阿帕奇。火花应用程序编程接口。python蟒蛇。在太阳下收集和服务(蟒蛇)。反映NativeMethodAccessorImpl。在sun上调用0(本机方法)。反映NativeMethodAccessorImpl。在sun上调用(未知源)。反映DelegatingMethodAccessorImpl。在java上调用(未知源)。朗。反思。方法在py4j调用(未知源)。反射MethodInvoker。在py4j上调用(MethodInvoker.java:244)。反射反射引擎。在py4j调用(ReflectionEngine.java:357)。网关。在py4j调用(Gateway.java:282)。命令。抽象命令。py4j上的invokeMethod(AbstractCommand.java:132)。命令。呼叫命令。在py4j上执行(CallCommand.java:79)。网关连接。在java上运行(GatewayConnection.java:238)。朗。丝线。运行(未知源)原因:org。阿帕奇。火花应用程序编程接口。pythonPythonException:Traceback(最后一次调用):文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py”,第604行,主文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\worker.py”,第594行,进程文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”,第2916行,在管道函数返回函数(拆分,前置函数(拆分,迭代器))文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”第2916行中,在管道函数返回函数(拆分,前置函数(拆分,迭代器))文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”第418行中,在func返回f(迭代器)文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\rdd.py”中,第2144行,组合合并。合并值(迭代器)文件“C:\spark-3.1.2-bin-hadoop3.2\python\lib\pyspark.zip\pyspark\shuffle.py”,第242行,在合并值d[k]=comb(d[k],v)中如果d else创建者中有k(v)文件“C:\spark-3.1.2-bin-hadoop3.2\python\pyspark\util.py”,第73行,在包装器返回f(*args,**kwargs)文件“,第2行,in-TypeError:'int'和'str'的操作数类型不受支持

在org。阿帕奇。火花应用程序编程接口。pythonBasePythonRunner$ReaderIterator。handlePythonException(PythonRunner.scala:517)位于org。阿帕奇。火花应用程序编程接口。python蟒蛇跑步者$$anon$3。在org上阅读(PythonRunner.scala:652)。阿帕奇。火花应用程序编程接口。python蟒蛇跑步者$$anon$3。在org上阅读(PythonRunner.scala:635)。阿帕奇。火花应用程序编程接口。pythonBasePythonRunner$ReaderIterator。hasNext(PythonRunner.scala:470)位于org。阿帕奇。火花中断迭代器。scala上的hasNext(interruptableiterator.scala:37)。收集迭代器$groupeditor。在scala上填充(迭代器.scala:1209)。收集迭代器$groupeditor。scala上的hasNext(迭代器.scala:1215)。收集迭代器$$anon$10。hasNext(Iterator.scala:458)位于org。阿帕奇。火花洗牌分类绕过SortshufflewWriter。在org上编写(BypassMergeSortShuffleWriter.java:132)。阿帕奇。火花洗牌ShuffleWriteProcessor。在org上写(ShuffleWriteProcessor.scala:59)。阿帕奇。火花调度程序。洗牌运动。runTask(ShuffleMapTask.scala:99)位于org。阿帕奇。火花调度程序。洗牌运动。org上的runTask(ShuffleMapTask.scala:52)。阿帕奇。火花调度程序。任务在组织上运行(Task.scala:131)。阿帕奇。火花遗嘱执行人。执行者$TaskRunner$anonfun$run$3(Executor.scala:497)位于org。阿帕奇。火花util。Utils$。tryWithSafeFinally(Utils.scala:1439)位于org。阿帕奇。火花遗嘱执行人。执行者$TaskRunner。在java上运行(Executor.scala:500)。util。同时发生的线程池执行器。java上的runWorker(未知源)。util。同时发生的线程池执行器$Worker。运行(未知源)。。。还有一个

我感到如此失落,以至于我甚至不能从我的地图上只返回一个位置。我的意思是这不应该工作:

mapfun[1]

我试过用函数代替。但我失败得更惨:

def fun2(x,y):
    x[0]+y[0]
    x[1]+y[1]
mapfun.reduceByKey(fun2(x,y)).collect()

共有1个答案

上官凯歌
2023-03-14

您正在接收错误

类型错误:不支持的操作数类型:'int'和'str'

因为你的元组值是字符串("1,0")而不是(1,0),python目前不会应用此运算符或添加intstr(string)数据类型。

此外,在映射函数中,您在x中具有word1和word2的比较中似乎存在逻辑错误,因为这将仅检查word2是否在x中。我建议重写以下内容:

def lan_map(x):
    if "word1" in x and "word2" in x:
        return ("Count",(1,1))
    elif "word1" in x:
        return ("Count",(1,0))
    elif "word2" in x:
        return ("Count",(0,1))
    else:
        return ("Count",(0,0))

或者可能更短

def lan_map(x):
     return ("Count", (
         1 if "word1" in x else 0,
         1 if "word2" in x else 0
     ))

让我知道这是否适合你。

 类似资料:
  • 请..来个身体救救我!!!! 这是main.class 源代码:https://github.com/yezhang1989/k-means-clustering-on-MapReduce

  • 我正在集群中使用hazelcast-2.5。我有一个映射(键:字符串,值:用户定义对象的ArrayList)。我可以在大多数地方put/remove fine,但在代码的一个特定部分,put操作会无声无息地失败(用于put操作的键字符串是唯一的,ArrayList也不是空的)。不会引发异常。如果涉及锁,我甚至尝试了tryPut,该调用给出了一个真实的返回值。在put操作之后,我尝试打印出映射的ke

  • 我尝试使用之前的Google Map示例代码,它在中提供,使用了所有可用的选项,它显示了以下日志。 01-14 17:58:39.773:E/Google Maps Android API(13114):授权失败。 它向我显示了所有示例映射选项的空白屏幕。 manifest.xml 日志 截图 这就是结果,带有缩放选项的空白屏幕,在MapView中没有地图。 2.)Google Map Andro

  • 问题内容: 我开始学习Android操作系统。我仍处于学习模式。在学习Android的同时,似乎我对Java有了一些更新,并且因为我发现的大多数示例都希望您使用Eclipse …我想我也在学习Eclipse(这是一件好事,我没有抱怨)…这个问题主要是关于Eclipse的,以及如何对当前状态进行基本保存。 我没有版本控制系统运行。一旦对行进路线,想要的东西以及需要做的事情有了很好的感觉,我便会解决一

  • 我有一本书。带有字符串的java文件。 当我在中打开这个文件并将编码更改为ISO-8859-1时,它会显示适当的字符串:,但如果我在理念intellij中打开文件并将编码更改为ISO-8859-1,它会给我一个警告,即某些符号无法转换,然后用标记替换这些符号:。 为什么会这样?为什么Notepad可以转换文件,而idea不能?