当前位置: 首页 > 知识库问答 >
问题:

如何将数组(即列表)列转换为向量

岳玉书
2023-03-14
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=[-1.0, -2.0, -3.0]),
    Row(city="New York", temperatures=[-7.0, -7.0, -5.0]), 
]
df = spark.createDataFrame(source_data)

注意温度场是一个浮动列表。我希望将这些浮点数列表转换为MLlib类型vector,并且希望这种转换使用基本的dataframeAPI来表示,而不是通过RDDs(这是低效的,因为它将所有数据从JVM发送到Python,处理是在Python中完成的,我们没有得到Spark的Catalyst优化器yada yada的好处)。我该怎么做?具体来说:

  1. 有没有办法让直铸件工作?请参见下面的详细信息(以及一次失败的变通尝试)?或者,是否有其他操作具有我在之后的效果?
  2. 在下面我建议的两个替代解决方案中(UDF与爆炸/重组列表中的项目),哪个更有效?或者还有其他几乎但不完全正确的替代方案比它们中的任何一个都好吗?

这就是我所期望的“适当”解决方案。我想将列的类型从一种类型转换为另一种类型,因此应该使用强制转换。作为上下文,让我提醒您将其转换为其他类型的正常方法:

from pyspark.sql import types
df_with_strings = df.select(
    df["city"], 
    df["temperatures"].cast(types.ArrayType(types.StringType()))),
)

现在,例如df_with_strings.collect()[0][“温度”][1]'-7.0'。但是如果我转换为一个ml向量,那么事情就不那么顺利了:

from pyspark.ml.linalg import VectorUDT
df_with_vectors = df.select(df["city"], df["temperatures"].cast(VectorUDT()))

这会产生一个错误:

pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast ArrayType(DoubleType,true) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#1 as vector), None)]
+- LogicalRDD [city#0, temperatures#1]
"

哎呀!有什么办法解决这个问题吗?

from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=["temperatures"], outputCol="temperature_vector")
df_fail = assembler.transform(df)
pyspark.sql.utils.IllegalArgumentException: 'Data type ArrayType(DoubleType,true) is not supported.'
from pyspark.ml.feature import VectorAssembler
TEMPERATURE_COUNT = 3
assembler_exploded = VectorAssembler(
    inputCols=["temperatures[{}]".format(i) for i in range(TEMPERATURE_COUNT)], 
    outputCol="temperature_vector"
)
df_exploded = df.select(
    df["city"], 
    *[df["temperatures"][i] for i in range(TEMPERATURE_COUNT)]
)
converted_df = assembler_exploded.transform(df_exploded)
final_df = converted_df.select("city", "temperature_vector")

一个更简单的替代方案是使用UDF进行转换。这让我可以非常直接地在一行代码中表达我想要做的事情,并且不需要创建一个包含大量列的数据集。但是所有这些数据都必须在Python和JVM之间交换,每个单独的数字都必须由Python处理(它在迭代单独的数据项时是出了名的慢)。这看起来是这样的:

from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = df.select(
    df["city"], 
    list_to_vector_udf(df["temperatures"]).alias("temperatures")
)

这个漫无边际的问题的其余部分是我在试图找到答案时想出的一些额外的东西。大多数读这篇文章的人可能会跳过它们。

在这个简单的示例中,可以首先使用向量类型创建数据,但当然,我的数据并不是真正的Python列表,而是从数据源读取的。但要记录在案的是,情况会是这样的:

from pyspark.ml.linalg import Vectors
from pyspark.sql import Row
source_data = [
    Row(city="Chicago", temperatures=Vectors.dense([-1.0, -2.0, -3.0])),
    Row(city="New York", temperatures=Vectors.dense([-7.0, -7.0, -5.0])),
]
df = spark.createDataFrame(source_data)
df_with_vectors = df.rdd.map(lambda row: Row(
    city=row["city"], 
    temperatures=Vectors.dense(row["temperatures"])
)).toDF()
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
list_to_almost_vector_udf = udf(lambda l: (1, None, None, l), VectorUDT.sqlType())
df_almost_vector = df.select(
    df["city"], 
    list_to_almost_vector_udf(df["temperatures"]).alias("temperatures")
)
df_with_vectors = df_almost_vector.select(
    df_almost_vector["city"], 
    df_almost_vector["temperatures"].cast(VectorUDT())
)
pyspark.sql.utils.AnalysisException: "cannot resolve 'CAST(`temperatures` AS STRUCT<`type`: TINYINT, `size`: INT, `indices`: ARRAY<INT>, `values`: ARRAY<DOUBLE>>)' due to data type mismatch: cannot cast StructType(StructField(type,ByteType,false), StructField(size,IntegerType,true), StructField(indices,ArrayType(IntegerType,false),true), StructField(values,ArrayType(DoubleType,false),true)) to org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7;;
'Project [city#0, unresolvedalias(cast(temperatures#5 as vector), None)]
+- Project [city#0, <lambda>(temperatures#1) AS temperatures#5]
+- LogicalRDD [city#0, temperatures#1]
"

共有1个答案

爱博达
2023-03-14

就我个人而言,我会使用Python UDF,而不会费心做其他事情:

  • 向量不是本机SQL类型,因此会有这样或那样的性能开销。特别是,这个过程需要两个步骤,首先将数据从外部类型转换为行,然后使用泛型rowencoder.
  • 从行转换为内部表示形式
  • 任何下游ML管道都将比简单的转换昂贵得多。此外,它需要一个与上面描述的过程相反的过程

但如果你真的想要其他选择,你可以:

.
├── build.sbt
└── udfs.scala

编辑build.sbt(调整以反映Scala和Spark版本):

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.4",
  "org.apache.spark" %% "spark-mllib" % "2.4.4"
)

编辑udfs.scala:

package com.example.spark.udfs

import org.apache.spark.sql.functions.udf
import org.apache.spark.ml.linalg.DenseVector

object udfs {
  val as_vector = udf((xs: Seq[Double]) => new DenseVector(xs.toArray))
}

包装:

sbt package
$PROJECT_ROOT/target/scala-2.11/udfs_2.11-0.1-SNAPSHOT.jar
from pyspark.sql.column import _to_java_column, _to_seq, Column
from pyspark import SparkContext

def as_vector(col):
    sc = SparkContext.getOrCreate()
    f = sc._jvm.com.example.spark.udfs.udfs.as_vector()
    return Column(f.apply(_to_seq(sc, [col], _to_java_column)))
with_vec = df.withColumn("vector", as_vector("temperatures"))
with_vec.show()
+--------+------------------+----------------+
|    city|      temperatures|          vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+

with_vec.printSchema()
root
 |-- city: string (nullable = true)
 |-- temperatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- vector: vector (nullable = true)
from pyspark.sql.functions import to_json, from_json, col, struct, lit
from pyspark.sql.types import StructType, StructField
from pyspark.ml.linalg import VectorUDT

json_vec = to_json(struct(struct(
    lit(1).alias("type"),  # type 1 is dense, type 0 is sparse
    col("temperatures").alias("values")
).alias("v")))

schema = StructType([StructField("v", VectorUDT())])

with_parsed_vector = df.withColumn(
    "parsed_vector", from_json(json_vec, schema).getItem("v")
)

with_parsed_vector.show()
+--------+------------------+----------------+
|    city|      temperatures|   parsed_vector|
+--------+------------------+----------------+
| Chicago|[-1.0, -2.0, -3.0]|[-1.0,-2.0,-3.0]|
|New York|[-7.0, -7.0, -5.0]|[-7.0,-7.0,-5.0]|
+--------+------------------+----------------+
with_parsed_vector.printSchema()
root
 |-- city: string (nullable = true)
 |-- temperatures: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- parsed_vector: vector (nullable = true)
 类似资料:
  • 问题内容: 考虑以下代码段(假设已经设置为some ): 注意,温度字段是浮标的列表。我想将这些浮点数列表转换为MLlib类型,并且我希望使用基本API而不是通过RDD来表示这种转换(效率低下,因为它将所有数据从JVM发送到python,是使用Python完成的,我们无法获得Spark的Catalyst优化器yada yada的好处。我该怎么做呢?特别: 有没有办法让演员直接工作?请参阅下面的详细

  • 问题内容: 我是Python的新手,需要将列表转换为字典。我知道我们可以将元组列表转换为字典。 这是输入列表: 并且我想将此列表转换为元组列表(或直接转换为字典),如下所示: 我们如何在Python中轻松做到这一点? 问题答案: 您想一次将三个项目分组吗? 您想一次分组N个项目吗?

  • 通过使用Java8流: 从性能上看,哪个更好?

  • 问题内容: 我使用一个外部模块(libsvm),该模块不支持numpy数组,仅支持元组,列表和字典。但是我的数据是二维二维数组。我如何以pythonic方式转换它,也就是没有循环。 问题答案: 您可以简单地将矩阵转换为,以证明:

  • 问题内容: 我已经编写了此函数,用于将元组列表转换为列表列表。有没有更优雅的/ Pythonic的方式来做到这一点? 问题答案: 您可以使用列表推导:

  • 问题内容: 我如何转换 至 问题答案: 使用简单的列表理解: 会给你: