当前位置: 首页 > 知识库问答 >
问题:

PySpark RuntimeError:在迭代期间设置更改的大小

尹承业
2023-03-14

我正在运行一个pyspark脚本,在下面遇到了一个错误。由于我的代码“如果len(RDD.Take(1))>0:”,它似乎在说“RuntimeError:在迭代期间设置更改的大小”。我不确定这是不是真正的原因,不知道到底出了什么问题。任何帮助都将不胜感激。

谢谢!

17/03/23 21:54:17 INFO DStreamGraph: Updated checkpoint data for time 1490320070000 ms
17/03/23 21:54:17 INFO JobScheduler: Finished job streaming job 1490320072000 ms.0 from job set of time 1490320072000 ms
17/03/23 21:54:17 INFO JobScheduler: Starting job streaming job 1490320072000 ms.1 from job set of time 1490320072000 ms
17/03/23 21:54:17 ERROR JobScheduler: Error running job streaming job 1490320072000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py",

第65行,在调用r=self.func(t,*rdds)文件“/usr/lib/spark/python/lib/pysspark.zip/pyspark/streaming/dstream.py”中,第159行,在func=lambda t中,第159行,在_compute_glb_max中,如果len(rdd.take(1))>0:File“/usr/lib/spark/python/lib/pysspark.zip/pyspark/rdd)文件”vars=[x._jbroadcast for x in sc._pickled_broadcast_vars]运行时错误:在迭代期间设置更改的大小

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
  File "/home/richard/Documents/spark_code/with_kafka/./mongo_kafka_spark_script.py",

行224,在SSC.AwaitTermination()中;文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/context.py”,第206行,在等待终止文件“/usr/lib/spark/py4j-0.10.4-src.zip/py4j/java_gateway.py”中,第1133行,在调用文件“/usr/lib/spark/py4j-0.10.4-src.zip/pyspark/sql/utils.py”中,第63行,在deco文件“:org.apache.spark.sparkException:Python引发了异常:Traceback(最近一次调用):文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py”,第65行,在调用r=self.func(t,*rdds)文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py”,第159行,在func=lambda t,rdd:old_func(rdd)文件“roadcast_vars,env,includes=_prepare_for_python_rdd(sc,command)文件“/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py”,行2363,in_prepare_for_python_rdd broadcast_vars=[x._jbroadcast for x in sc._pickled_broadcast_vars]运行时错误:在迭代期间设置更改的大小

  at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
  at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
  at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
  at scala.util.Try$.apply(Try.scala:192)
  at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:254)
  at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
  at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:253)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  at java.lang.Thread.run(Thread.java:745)

共有1个答案

严繁
2023-03-14

在迭代之间创建广播变量似乎不是最佳实践。如果可能,在需要有状态数据时,始终使用updateStateByKey。

 类似资料:
  • 如何在迭代中更改python迭代器? 例如: 此打印: 我想打印: 编辑:抱歉,这个问题让人们感到困惑。我对为什么当我在for循环中尝试更改迭代器时,for循环在下一次迭代中忽略它很感兴趣。 其他人已经提交了一个答案来清除我的困惑。范围创建一个列表,然后for循环将迭代器分配给列表中的下一个变量。

  • 问题内容: 假设我有以下几种类型: 我想迭代节点的属性以更改它们。 我本来希望能够做到: 但是由于不是指针,所以这行不通,我必须这样做: 有没有更简单或更快速的方法?是否可以直接从中获取指针? 显然,我不想仅仅为了迭代而更改结构,更冗长的解决方案不是解决方案。 问题答案: 不,您想要的缩写是不可能的。 原因是从您要遍历的切片中复制值。关于范围的规范说: value (if 2nd variable

  • 问题内容: 我有这样的obj 它应该扩展到 我在下面编写了代码,按拆分,删除旧键,如果包含则追加新键,但是它说 对于obj.iteritems()中的k,v: RuntimeError:词典在迭代过程中更改了大小 问题答案: 就像消息说的那样:在循环遍历expand中的这些条目的过程中,您更改了expandField()内部obj中的条目数。 您可以尝试创建所需形式的新字典,或者以某种方式记录要进

  • 问题内容: 所以一切正常 但是,如果已经创建了驱动程序,则无法安装代理。这是行不通的 也是如此。 为什么?不能理解。我做错了吗? 问题答案: 在Firefox中使用WebDriver时,使用配置文件是一次性的事情。驱动程序启动浏览器时,它将配置文件对象写入磁盘,然后启动浏览器可执行文件。此后,浏览器将没有任何机制可以读取对WebDriver配置文件对象的任何进一步的更改。要更改代理,必须在启动浏览

  • 我正在努力改进我写的一个数据传输程序。我在寻找如何让它更快的建议。我的程序通过填充ResultSet并将结果写入文件来从数据库(通常是Oracle11g)中提取数据。该程序定期查看表,并查询某个特殊列是否发生了更改。例如,这可能是这样一个查询:

  • 使用指南 - 统计设置 - 概述 - 设置更改后的生效时间 百度统计中的设置项在设置或更改后,一般20分钟后生效。