当前位置: 首页 > 面试题库 >

计算pyspark中数据框所有行之间的余弦相似度

曾高杰
2023-03-14
问题内容

我有一个数据集,其中包含工人及其年龄,性别,地址等人口统计信息及其工作地点。我从数据集创建了一个RDD,并将其转换为DataFrame。

每个ID有多个条目。因此,我创建了一个DataFrame,其中仅包含工人的ID和他/她工作过的各个办公室位置。

    |----------|----------------|
    | **ID**    **Office_Loc**  |
    |----------|----------------|
    |   1      |Delhi, Mumbai,  |
    |          | Gandhinagar    |
    |---------------------------|
    |   2      | Delhi, Mandi   | 
    |---------------------------|
    |   3      |Hyderbad, Jaipur|
    -----------------------------

我想根据他们的办公地点来计算每个工人与其他每个工人之间的余弦相似度。

因此,我遍历了DataFrame的各行,从DataFrame检索了一行:

myIndex = 1
values = (ID_place_df.rdd.zipWithIndex()
            .filter(lambda ((l, v), i): i == myIndex)
            .map(lambda ((l,v), i): (l, v))
            .collect())

然后使用地图

    cos_weight = ID_place_df.select("ID","office_location").rdd\
  .map(lambda x: get_cosine(values,x[0],x[1]))

计算提取的行与整个DataFrame之间的余弦相似度。

我不认为我的方法是一种好方法,因为我要遍历DataFrame的行,它违反了使用spark的全部目的。在pyspark中有更好的方法吗?好心提醒。


问题答案:

您可以使用该mllib包来计算L2每行TF-IDF的范数。然后将表与其自身相乘,以得到余弦相似度,即两个点的乘积乘以两个L2范数:

1. RDD

rdd = sc.parallelize([[1, "Delhi, Mumbai, Gandhinagar"],[2, " Delhi, Mandi"], [3, "Hyderbad, Jaipur"]])
  • 计算TF-IDF

    documents = rdd.map(lambda l: l[1].replace(" ", "").split(","))
    

    from pyspark.mllib.feature import HashingTF, IDF
    hashingTF = HashingTF()
    tf = hashingTF.transform(documents)

您可以指定特征的数量,HashingTF以使特征矩阵更小(较少的列)。

    tf.cache()
    idf = IDF().fit(tf)
    tfidf = idf.transform(tf)
  • 计算L2准则:

    from pyspark.mllib.feature import Normalizer
    

    labels = rdd.map(lambda l: l[0])
    features = tfidf

    normalizer = Normalizer()
    data = labels.zip(normalizer.transform(features))

  • 通过将矩阵与其自身相乘来计算余弦相似度:

    from pyspark.mllib.linalg.distributed import IndexedRowMatrix
    

    mat = IndexedRowMatrix(data).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()

    array([[ 0.        ,  0.        ,  0.        ,  0.        ],
           [ 0.        ,  1.        ,  0.10794634,  0.        ],
           [ 0.        ,  0.10794634,  1.        ,  0.        ],
           [ 0.        ,  0.        ,  0.        ,  1.        ]])
    

或: 使用笛卡尔积和dotnumpy数组上的函数:

    data.cartesian(data)\
    .map(lambda l: ((l[0][0], l[1][0]), l[0][1].dot(l[1][1])))\
    .sortByKey()\
    .collect()

    [((1, 1), 1.0),
     ((1, 2), 0.10794633570596117),
     ((1, 3), 0.0),
     ((2, 1), 0.10794633570596117),
     ((2, 2), 1.0),
     ((2, 3), 0.0),
     ((3, 1), 0.0),
     ((3, 2), 0.0),
     ((3, 3), 1.0)]

2.数据框

由于您似乎正在使用数据框,因此可以改用该spark ml包:

import pyspark.sql.functions as psf
df = rdd.toDF(["ID", "Office_Loc"])\
    .withColumn("Office_Loc", psf.split(psf.regexp_replace("Office_Loc", " ", ""), ','))
  • 计算TF-IDF:

    from pyspark.ml.feature import HashingTF, IDF
    

    hashingTF = HashingTF(inputCol=”Office_Loc”, outputCol=”tf”)
    tf = hashingTF.transform(df)

    idf = IDF(inputCol=”tf”, outputCol=”feature”).fit(tf)
    tfidf = idf.transform(tf)

  • 计算L2准则:

    from pyspark.ml.feature import Normalizer
    

    normalizer = Normalizer(inputCol=”feature”, outputCol=”norm”)
    data = normalizer.transform(tfidf)

  • 计算矩阵乘积:

    from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
    

    mat = IndexedRowMatrix(
    data.select(“ID”, “norm”)\
    .rdd.map(lambda row: IndexedRow(row.ID, row.norm.toArray()))).toBlockMatrix()
    dot = mat.multiply(mat.transpose())
    dot.toLocalMatrix().toArray()

或: 使用联接和UDFfor函数dot

    dot_udf = psf.udf(lambda x,y: float(x.dot(y)), DoubleType())
data.alias("i").join(data.alias("j"), psf.col("i.ID") < psf.col("j.ID"))\
    .select(
        psf.col("i.ID").alias("i"), 
        psf.col("j.ID").alias("j"), 
        dot_udf("i.norm", "j.norm").alias("dot"))\
    .sort("i", "j")\
    .show()

    +---+---+-------------------+
    |  i|  j|                dot|
    +---+---+-------------------+
    |  1|  2|0.10794633570596117|
    |  1|  3|                0.0|
    |  2|  3|                0.0|
    +---+---+-------------------+

本教程列出了用于乘法大型矩阵的不同方法:https : //labs.yodas.com/large-scale-matrix-multiplication-
with-pyspark-or-how-to-match-two-large-datasets-of-company
-1be4b1b2871e



 类似资料:
  • 问题内容: 假设您在数据库中按以下方式构造了一个表: 为了清楚起见,应输出: 请注意,由于向量存储在数据库中,因此我们仅需要存储非零条目。在此示例中,我们只有两个向量$ v_ {99} =(4,3,4,0)$和$ v_ {1234} =(0,5,2,3)$都在$ \ mathbb {R}中^ 4 $。 这些向量的余弦相似度应为$ \ displaystyle \ frac {23} {\ sqrt

  • 问题内容: 我一直在遵循一个教程,该教程显示了如何制作word2vec模型。 本教程使用以下代码: (未提供其他信息,但我想这来自) 现在,我已经对该方法进行了一些研究,但对此却知之甚少。据我了解,它已被许多功能取代。 我应该使用什么?有,它有一个参数(似乎正确),但没有参数。 在这种情况下我可以使用什么? 问题答案: Keras文档中有一些尚不清楚的事情,我认为了解这些至关重要: 对于keras

  • 我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想得到两个数据帧的余弦相似性。并有类似的东西

  • 问题内容: 我有两个标准化张量,我需要计算这些张量之间的余弦相似度。如何使用TensorFlow做到这一点? 问题答案: 这将完成工作: 此打印

  • 问题内容: 我计算了两个文档的tf / idf值。以下是tf / idf值: 这些文件就像: 如何使用这些值来计算余弦相似度? 我知道我应该计算点积,然后找到距离并除以点积。如何使用我的值来计算? 还有一个问题: 两个文档的字数相同是否重要? 问题答案: a * b是点积 一些细节: 是。在某种程度上,a和b必须具有相同的长度。但是a和b通常具有稀疏表示,您只需要存储非零条目,就可以更快地计算范数

  • 问题内容: 如何找到向量之间的余弦相似度? 我需要找到相似性来衡量两行文本之间的相关性。 例如,我有两个句子: 用户界面系统 用户界面机 …及其在tF-idf之后的向量,然后使用LSI进行标准化,例如 和。 如何测量这些向量之间的相似性? 问题答案: 我最近在大学的信息检索部门做了一些tf-idf的工作。我使用了这种余弦相似度方法,该方法使用Jama:Java Matrix Package 。 有