考虑以下代码段(假设spark
已经设置为some SparkSession
):
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
Row(city="New York", temperatures=[-7.0, -7.0, -5.0]),
]
df = spark.createDataFrame(source_data)
注意,温度字段是浮标的列表。我想将这些浮点数列表转换为MLlib类型Vector
,并且我希望使用基本DataFrame
API而不是通过RDD来表示这种转换(效率低下,因为它将所有数据从JVM发送到python,是使用Python完成的,我们无法获得Spark的Catalyst优化器yada
yada的好处。我该怎么做呢?特别:
这就是我期望的“正确”解决方案。我想将列的类型从一种类型转换为另一种类型,所以我应该使用强制类型转换。作为上下文,让我提醒您将其转换为另一种类型的正常方法:
from pyspark.sql import types
df_with_strings = df.select(
df["city"],
df["temperatures"].cast(types.ArrayType(types.StringType()))),
)
现在,例如df_with_strings.collect()[0]["temperatures"][1]
是'-7.0'
。但是,如果我将其转换为ml
Vector,那么事情就不会那么顺利:
from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))
这给出了一个错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"
kes!任何想法如何解决这一问题?
VectorAssembler
有一个Transformer
似乎几乎适合这种工作:对VectorAssembler
。它需要一个或多个列,并将它们连接成一个向量。不幸的是,它仅需要Vector
和Float
列,而不需要Array
列,因此以下操作无效:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
它给出了这个错误:
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
我能想到的最好的解决方法是将列表分解为多列,然后使用将VectorAssembler
其再次收集起来备份:
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)],
outputCol="temperature_vector"
)
df_exploded = df.select(
df["city"],
*[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")
这似乎很理想,不过TEMPERATURE_COUNT
要大于100,有时大于1000。(另一个问题是,如果您事先不知道数组的大小,则代码会更复杂,尽管那不是Spark确实生成了具有这么多列的中间数据集,还是只是将其视为单个项目短暂通过的中间步骤(或者,当看到以下内容时,是否确实完全优化了该离开步骤)?仅将这些列使用才能组装成向量)?
一个更简单的替代方法是使用UDF进行转换。这使我可以直接在一行代码中直接表达我想做的事情,而不需要创建包含大量列的数据集。但是所有这些数据都必须在Python和JVM之间交换,并且每个单独的数字都必须由Python处理(众所周知,这对于迭代单个数据项来说很慢)。看起来是这样的:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
df["city"],
list_to_vector_udf(df["temperatures"]).alias("temperatures")
)
这个无聊的问题的其余部分是我在尝试找到答案时想到的一些额外内容。大多数阅读此书的人可能会跳过它们。
Vector
从一开始就使用在这个简单的示例中,可以使用向量类型开始创建数据,但是当然我的数据并不是我要并行化的Python列表,而是从数据源读取的。但是作为记录,这是这样的:
from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
map()
一种可能性是使用RDDmap()
方法将列表转换为Vector
。这与UDF的想法类似,不同之处在于它甚至更糟,因为每一行中的所有字段(而不仅仅是被操作的字段)都会产生序列化等开销。作为记录,下面是该解决方案的外观:
df_with_vectors = df.rdd.map(lambda row: Row(
city=row["city"],
temperatures=Vectors.dense(row["temperatures"])
)).toDF()
无奈之下,我注意到它Vector
在内部由具有四个字段的结构表示,但使用该类型结构的传统强制类型转换也不起作用。这是一个示例(我使用udf构建结构,但udf不是重要部分):
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
df["city"],
list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
df_almost_vector["city"],
df_almost_vector["temperatures"].cast(VectorUDT())
)
这给出了错误:
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"
就我个人而言,我会使用Python UDF,并且不会打扰其他任何事情:
Vectors
不是本机SQL类型,因此会以某种方式增加性能开销。特别是,此过程需要两个步骤,其中首先使用通用将数据从外部类型转换为行,然后从行转换为内部表示RowEncoder
。Pipeline
都将比简单转换昂贵得多。而且,它需要与上述相反的过程但是,如果您真的想要其他选择,您可以:
按照项目站点上的说明安装sbt。
使用以下结构创建Scala软件包:
.
├── build.sbt
└── udfs.scala
编辑build.sbt
(调整以反映Scala和Spark版本):
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % "2.4.4",
"org.apache.spark" %% "spark-mllib" % "2.4.4"
)
编辑udfs.scala
:
package com.example.spark.udfs
import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector
object udfs {
val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}
包:
sbt package
并包含(或等效项,具体取决于Scala版本):
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
作为--driver-class-path
启动外壳程序/提交应用程序时的参数。
在PySpark中定义一个包装器:
from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext
def as_vector(col):
sc = SparkContext.getOrCreate()
f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
测试:
with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()
+--------+------------------+----------------+
| city| temperatures| vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_vec.printSchema()
root
|-- city: string (nullable = true)
|-- temperatures: array (nullable = true)
| |-- element: double (containsNull = true)
|-- vector: vector (nullable = true)
将数据转储为反映DenseVector
架构的JSON格式并读回:
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT
json_vec = to_json(struct(struct(
lit(1).alias(“type”), # type 1 is dense, type 0 is sparse
col(“temperatures”).alias(“values”)
).alias(“v”)))
schema = StructType([StructField(“v”, VectorUDT())])
with_parsed_vector = df.withColumn(
“parsed_vector”, from_json(json_vec, schema).getItem(“v”)
)
with_parsed_vector.show()
+--------+------------------+----------------+
| city| temperatures| parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_parsed_vector.printSchema()
root
|– city: string (nullable = true)
|– temperatures: array (nullable = true)
| |– element: double (containsNull = true)
|– parsed_vector: vector (nullable = true)
注意温度场是一个浮动列表。我希望将这些浮点数列表转换为MLlib类型,并且希望这种转换使用基本的API来表示,而不是通过RDDs(这是低效的,因为它将所有数据从JVM发送到Python,处理是在Python中完成的,我们没有得到Spark的Catalyst优化器yada yada的好处)。我该怎么做?具体来说: 有没有办法让直铸件工作?请参见下面的详细信息(以及一次失败的变通尝试)?或者,是否有其
问题内容: 我是Python的新手,需要将列表转换为字典。我知道我们可以将元组列表转换为字典。 这是输入列表: 并且我想将此列表转换为元组列表(或直接转换为字典),如下所示: 我们如何在Python中轻松做到这一点? 问题答案: 您想一次将三个项目分组吗? 您想一次分组N个项目吗?
通过使用Java8流: 从性能上看,哪个更好?
问题内容: 我使用一个外部模块(libsvm),该模块不支持numpy数组,仅支持元组,列表和字典。但是我的数据是二维二维数组。我如何以pythonic方式转换它,也就是没有循环。 问题答案: 您可以简单地将矩阵转换为,以证明:
问题内容: 我已经编写了此函数,用于将元组列表转换为列表列表。有没有更优雅的/ Pythonic的方式来做到这一点? 问题答案: 您可以使用列表推导:
问题内容: 我如何转换 至 问题答案: 使用简单的列表理解: 会给你: