Question:

Apache Spark: PySpark crashes on a large dataset

史俊德
2023-03-14

I am new to Spark. I have an input file of 4000x1800 training data. When I try to train on this data (in Python), I get the following error:

>

  • 14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed) java.net.SocketException: Connection reset by peer: socket write error

    SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error

    I am using Spark 1.1.0. Any suggestions would be a great help.

    from pyspark import SparkConf, SparkContext
    from pyspark.mllib.classification import SVMWithSGD
    from pyspark.mllib.regression import LabeledPoint

    # Load and parse the data: the first value on each line is the
    # label, the remaining values are the features.
    def parsePoint(line):
        values = [float(x) for x in line.split(' ')]
        return LabeledPoint(values[0], values[1:])

    # Create the Spark context
    conf = (SparkConf()
            .setMaster("local")
            .setAppName("My app")
            .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf=conf)

    data = sc.textFile("myfile.txt")
    parsedData = data.map(parsePoint)

    # Train the SVM model using the feature matrix
    model = SVMWithSGD.train(parsedData, 100)
    

    I get the following error:

    14/11/15 22:38:38 INFO MemoryStore: ensureFreeSpace(32768) called with curMem=0, maxMem=278302556
    14/11/15 22:38:38 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.0 KB, free 265.4 MB)
    >>> parsedData = data.map(parsePoint)
    >>> model = SVMWithSGD.train(parsedData,100)
    14/11/15 22:39:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    14/11/15 22:39:12 WARN LoadSnappy: Snappy native library not loaded
    14/11/15 22:39:12 INFO FileInputFormat: Total input paths to process : 1
    14/11/15 22:39:13 INFO SparkContext: Starting job: runJob at PythonRDD.scala:296
    14/11/15 22:39:13 INFO DAGScheduler: Got job 0 (runJob at PythonRDD.scala:296) with 1 output partitions (allowLocal=true)
    14/11/15 22:39:13 INFO DAGScheduler: Final stage: Stage 0(runJob at PythonRDD.scala:296)
    14/11/15 22:39:13 INFO DAGScheduler: Parents of final stage: List()
    14/11/15 22:39:13 INFO DAGScheduler: Missing parents: List()
    14/11/15 22:39:13 INFO DAGScheduler: Submitting Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43), which has no missing parents
    14/11/15 22:39:13 INFO MemoryStore: ensureFreeSpace(5088) called with curMem=32768, maxMem=278302556
    14/11/15 22:39:13 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 5.0 KB, free 265.4 MB)
    14/11/15 22:39:13 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (PythonRDD[3] at RDD at PythonRDD.scala:43)
    14/11/15 22:39:13 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
    14/11/15 22:39:13 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 1221 bytes)
    14/11/15 22:39:13 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
    14/11/15 22:39:13 INFO HadoopRDD: Input split: file:/G:/SparkTest/spark-1.1.0/spark-1.1.0/bin/FeatureMatrix.txt:0+8103732
    14/11/15 22:39:13 INFO PythonRDD: Times: total = 264, boot = 233, init = 29, finish = 2
    14/11/15 22:39:13 ERROR PythonRDD: Python worker exited unexpectedly (crashed)
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
            at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/15 22:39:13 ERROR PythonRDD: This may have been caused by a prior exception:
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
            at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/15 22:39:13 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.net.SocketException: Connection reset by peer: socket write error
            at java.net.SocketOutputStream.socketWrite0(Native Method)
            at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
            at java.net.SocketOutputStream.write(SocketOutputStream.java:159)
            at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            at java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            at java.io.DataOutputStream.write(DataOutputStream.java:107)
            at java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            at org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
            at org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
            at scala.collection.Iterator$class.foreach(Iterator.scala:727)
            at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
            at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/15 22:39:13 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
            java.net.SocketOutputStream.socketWrite0(Native Method)
            java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
            java.net.SocketOutputStream.write(SocketOutputStream.java:159)
            java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            java.io.DataOutputStream.write(DataOutputStream.java:107)
            java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
            org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
            org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
            scala.collection.Iterator$class.foreach(Iterator.scala:727)
            scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
            org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    14/11/15 22:39:13 ERROR TaskSetManager: Task 0 in stage 0.0 failed 1 times; aborting job
    14/11/15 22:39:13 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
    14/11/15 22:39:13 INFO TaskSchedulerImpl: Cancelling stage 0
    14/11/15 22:39:13 INFO DAGScheduler: Failed to run runJob at PythonRDD.scala:296
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\classification.py", line 178, in train
        return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 430, in _regression_train_wrapper
        initial_weights = _get_initial_weights(initial_weights, data)
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\mllib\_common.py", line 415, in _get_initial_weights
        initial_weights = _convert_vector(data.first().features)
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1167, in first
        return self.take(1)[0]
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\rdd.py", line 1153, in take
        res = self.context.runJob(self, takeUpToNumLeft, p, True)
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\pyspark\context.py", line 770, in runJob
        it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__
      File "G:\SparkTest\spark-1.1.0\spark-1.1.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.net.SocketException: Connection reset by peer: socket write error
            java.net.SocketOutputStream.socketWrite0(Native Method)
            java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)
            java.net.SocketOutputStream.write(SocketOutputStream.java:159)
            java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
            java.io.BufferedOutputStream.write(BufferedOutputStream.java:126)
            java.io.DataOutputStream.write(DataOutputStream.java:107)
            java.io.FilterOutputStream.write(FilterOutputStream.java:97)
            org.apache.spark.api.python.PythonRDD$.writeUTF(PythonRDD.scala:533)
            org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:341)
            org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$2.apply(PythonRDD.scala:340)
            scala.collection.Iterator$class.foreach(Iterator.scala:727)
            scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
            org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:340)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:209)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:184)
            org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1311)
            org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:183)
    Driver stacktrace:
            at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
            at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
            at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
            at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
            at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
            at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
            at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
            at akka.actor.ActorCell.invoke(ActorCell.scala:456)
            at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
            at akka.dispatch.Mailbox.run(Mailbox.scala:219)
            at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
            at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
            at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
            at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
            at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
    
    >>> 14/11/15 23:22:52 INFO BlockManager: Removing broadcast 1
    14/11/15 23:22:52 INFO BlockManager: Removing block broadcast_1
    14/11/15 23:22:52 INFO MemoryStore: Block broadcast_1 of size 5088 dropped from memory (free 278269788)
    14/11/15 23:22:52 INFO ContextCleaner: Cleaned broadcast 1
    

    Regards, Mrutyunjay

  • 1 answer

    薛高澹
    2023-03-14

    It's quite simple.

    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("RatingsHistogram")
    sc = SparkContext(conf=conf)
    # The second argument to textFile sets the minimum number of partitions
    lines = sc.textFile("file:///SparkCourse/filter_1.csv", 2000)
    print(lines.first())
    

    When calling sc.textFile, pass the second argument that sets the number of partitions and give it a large value. The bigger your data, the larger this value should be.
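    A minimal, self-contained sketch of that suggestion (the sample file, app name, and partition count here are made up for illustration; with the 4000x1800 matrix from the question you would point textFile at myfile.txt and tune minPartitions, e.g. into the hundreds or thousands):

    ```python
    from pyspark import SparkConf, SparkContext

    conf = SparkConf().setMaster("local").setAppName("PartitionSketch")
    sc = SparkContext(conf=conf)

    # Write a tiny space-separated sample file so the sketch runs on
    # its own; in the question this would be the real feature matrix.
    with open("sample.txt", "w") as f:
        for i in range(10):
            f.write(" ".join(str(float(j)) for j in range(6)) + "\n")

    # The second argument to textFile is minPartitions: with more
    # partitions, each task streams a smaller slice of the file to its
    # Python worker, which helps avoid oversized socket writes.
    lines = sc.textFile("sample.txt", 4)
    num_partitions = lines.getNumPartitions()
    sc.stop()
    ```

    There is no single right partition count; it is a trade-off between per-task overhead and per-task data volume, so start large for big inputs and adjust.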
