我在使用 PySpark 的保存文件() 时遇到错误,并且在使用保存文件() 时收到相同的错误。我需要保存(键,值)的RDD,其中键是字符串,而值是标记点RDD(标签,稀疏向量)。错误如下所示。谷歌搜索几个来源,似乎我应该能够在IPython笔记本中做到这一点。我需要序列化这个大型RDD,以便我可以在Java中处理它,因为Spark的一些MLLib功能还不适用于python。根据这篇文章,这应该是可行的。
查看此页面,我看到:
_picklable_classes = [
'LinkedList',
'SparseVector',
'DenseVector',
'DenseMatrix',
'Rating',
'LabeledPoint',
]
所以我真的不知道我为什么会犯这个错误。
代码:labeleddatardd . saveassequencefile('/tmp/pysequencefile/')
错误:
Py4JJava错误: 调用 z:org.apache.spark.api.python.PythonRDD.saveAsSequenceFile 时出错。:组织.apache.spark.Spark异常:作业因阶段失败而中止:阶段 527.0 中的任务 0 失败 1 次,最近一次失败:阶段 527.0(TID 1454,本地主机)中的任务 0.0 丢失:net.razorvine.pickle.Pickle.Pickle 异常:在 net.razorvine.pickle.对象中构造 Class.java Dict(用于 numpy.dtype)的预期零参数
编辑:我发现了这个:
public class More ...ClassDictConstructor implements IObjectConstructor {
12
13 String module;
14 String name;
15
16 public More ...ClassDictConstructor(String module, String name) {
17 this.module = module;
18 this.name = name;
19 }
20
21 public Object More ...construct(Object[] args) {
22 if (args.length > 0)
23 throw new PickleException("expected zero arguments for construction of ClassDict (for "+module+"."+name+")");
24 return new ClassDict(module, name);
25 }
26}
我没有直接使用上面的构造()方法……所以我不知道为什么我尝试传递参数的saveAs……方法当它不需要参数时。
编辑2:遵循zero323建议(谢谢)有点小问题。当我尝试zero323写的内容时,我得到一个错误(见下文)。然而,当我得到一个更简单的RDD时,它可以工作,并将这个更简单的保存到。拼花锉(把它分成几份。拼花文件)。更简单的RDD如下:
simplerRDD = labeledDataRDD.map(lambda (k,v): (v.label, v.features))
sqlContext.createDataFrame(simplerRDD, ("k", "v")).write.parquet("labeledData_parquet_file")
尝试保存labeledDataRDD时出错:
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_schema(row)
831 raise TypeError("Can not infer schema for type: %s" % type(row))
832
--> 833 fields = [StructField(k, _infer_type(v), True) for k, v in items]
834 return StructType(fields)
835
/usr/local/Cellar/apache-spark/1.5.1/libexec/python/pyspark/sql/types.pyc in _infer_type(obj)
808 return _infer_schema(obj)
809 except TypeError:
--> 810 raise TypeError("not supported type: %s" % type(obj))
811
812
TypeError: not supported type: <type 'numpy.unicode_'>
问题的根源不是酸洗本身。如果是,您将看不到net.razorvine.pickle.PickleException
。如果您查看saveAsSequenceFile
留档,您将看到它需要两个步骤:
您的程序在第一步就失败了,但即使失败了,我也不能确切地确定预期的Java对象是什么以及如何读回它。
我不会玩序列文件,而是简单地将数据写入Parket文件:
from pyspark.mllib.regression import LabeledPoint
rdd = sc.parallelize([
("foo", LabeledPoint(1.0, [1.0, 2.0, 3.0])),
("bar", LabeledPoint(2.0, [4.0, 5.0, 6.0]))])
sqlContext.createDataFrame(rdd, ("k", "v")).write.parquet("a_parquet_file")
读回并转换:
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.rdd.RDD
val rdd: RDD[(String, LabeledPoint)] = sqlContext.read.parquet("a_parquet_file")
.select($"k", $"v.label", $"v.features")
.map{case Row(k: String, label: Double, features: Vector) =>
(k, LabeledPoint(label, features))}
rdd.sortBy(_._1, false).take(2)
// Array[(String, org.apache.spark.mllib.regression.LabeledPoint)] =
// Array((foo,(1.0,[1.0,2.0,3.0])), (bar,(2.0,[4.0,5.0,6.0])))
或者如果您更喜欢类似Java的方法:
def rowToKeyLabeledPointPair(r: Row): Tuple2[String, LabeledPoint] = {
// Vector -> org.apache.spark.mllib.linalg.Vector
Tuple2(r.getString(0), LabeledPoint(r.getDouble(1), r.getAs[Vector](2)))
}
sqlContext.read.parquet("a_parquet_file")
.select($"k", $"v.label", $"v.features")
.map(rowToKeyLabeledPointPair)
编辑
一般来说,在 Spark SQL 中不支持将 NumPy 类型作为独立值。如果您在RDD中有Numpy类型,请先将这些类型转换为标准Python类型:
tmp = rdd.map(lambda kv: (str(kv[0]), kv[1]))
sqlContext.createDataFrame(tmp, ("k", "v")).write.parquet("a_parquet_file")
问题内容: 我要序列化第三方库中的特定类。我将如何去做呢? 我假设我将不得不编写一个方法,该方法接受类的对象并使用反射来获取私有成员值。然后对于反序列化,我将使用反射将值放回去。 这行得通吗?有更容易的方法吗? 问题答案: 您可以只使用实现Serializable且与第三方对象具有相同字段的传输对象。让传输对象实现一个方法,该方法返回原始第三方类的对象,您已完成: 伪代码: 如果您有任何特殊的成员
问题内容: 如何序列化未实现Serializable的对象?我不能将其标记为Serializable,因为该类来自第3方库。 问题答案: 您不能序列化未实现的类,但可以将其包装在可以实现的类中。为此,您应该在包装器类上实现和,以便可以以自定义方式序列化其对象。 首先,使您的非序列化字段。 在中,首先调用流以存储所有非瞬态字段,然后调用其他方法来序列化不可序列化对象的各个属性。 在中,首先调用流以读
问题内容: 我的直觉告诉我,必须以某种方式将其转换为字符串或byte [](在Go中甚至可能是相同的东西?),然后将其保存到磁盘。 我找到了这个包(http://golang.org/pkg/encoding/gob/),但似乎仅用于结构? 问题答案: 序列化数据有多种方法,Go为此提供了许多软件包。某些常见编码方式的软件包: 处理地图很好。以下示例显示了地图的编码/解码: 操场
我有一个kdtree,其节点由以下字段组成:公共静态类节点实现可序列化{ 其中DataPoint定义: 公共静态类DataPoint实现可序列化{公共可比X;公共可比Y;公共可比Z; 我想序列化树,存储在文件中并在回答范围查询时反序列化。我对这个概念od序列化的理解并不好。从我收集的任何内容中,我编写了以下函数,但不起作用。有人能帮忙吗?
问题内容: 我想深入克隆一个列表。为此,我们有一种方法 所以现在要克隆我的列表,我应该先将其转换为可序列化的。是否可以将列表转换为可序列化列表? 问题答案: 已实施的所有标准实施。 因此,即使它本身不是的子类型,也可以安全地将列表强制转换为,只要您知道它是诸如或的标准实现之一。 如果不确定,请先复制列表(使用),然后知道它是可序列化的。
我有一个模型,它使用一个通用外键,使用“content\u type”字段来存储内容类型,“object\u id”来存储对象id。这个模型需要使用CRUD API进行操作,我使用的是DRF。我有一个模型的序列化程序,但我遇到了一点问题。如果我只是将content\u type添加到如下字段列表中 序列化程序将JSON表示设置为ContentType模型实例的ID。API的用户不知道这些ID,我不