我有一个PySpark数据帧,df1,看起来像:
Customer1 Customer2 v_cust1 v_cust2
1 2 0.9 0.1
1 3 0.3 0.4
1 4 0.2 0.9
2 1 0.8 0.8
我想得到两个数据帧的余弦相似性。并有类似的东西
Customer1 Customer2 v_cust1 v_cust2 cosine_sim
1 2 0.9 0.1 0.1
1 3 0.3 0.4 0.9
1 4 0.2 0.9 0.15
2 1 0.8 0.8 1
我有一个python函数,它接收数字/数字数组,如下所示:
def cos_sim(a, b):
return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
如何使用udf在我的数据帧中创建余弦_sim列?可以传递几列而不是一列给udf余弦_sim函数吗?
如果您更愿意使用pandas_udf,则效率会更高。
它在矢量化操作方面比Spark udfs表现更好:为PySpark引入Pandas UDF
from pyspark.sql.functions import PandasUDFType, pandas_udf
import pyspark.sql.functions as F
# Names of columns
a, b = "v_cust1", "v_cust2"
cosine_sim_col = "cosine_sim"
# Make a reserved column to fill the values since the constraint of pandas_udf
# is that the input schema and output schema has to remain the same.
df = df.withColumn("cosine_sim", F.lit(1.0).cast("double"))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def cos_sim(df):
df[cosine_sim_col] = float(np.dot(df[a], df[b]) / (np.linalg.norm(df[a]) * np.linalg.norm(df[b])))
return df
# Assuming that you want to groupby Customer1 and Customer2 for arrays
df2 = df.groupby(["Customer1", "Customer2"]).apply(cos_sim)
# But if you want to send entire columns then make a column with the same
# value in all rows and group by it. For e.g.:
df3 = df.withColumn("group", F.lit("group_a")).groupby("group").apply(cos_sim)
我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想得到两个数据帧的余弦相似性。并有类似的东西
我有一个PySpark数据帧,df1,看起来像: 我有第二个PySpark数据帧,df2 我想将df1的所有列(我有两列以上)与客户ID上的df2连接值相乘
假设我有两个数据帧,具有不同级别的信息,如下所示: 我想加入df1和df2,并将“值”信息传递给df2:一天中的每一小时都将获得“日”值。 预期产出:
问题内容: 我有一个数据框,如下所示:框的形状是(1510,1399)。列代表产品,行代表用户为给定产品分配的值(0或1)。如何计算jaccard_similarity_score? 我创建了一个占位符数据框,列出了产品与产品 我不确定如何通过data_ibs进行迭代以计算相似性。 问题答案: 简短且向量化(快速)的答案: 从scikit的成对距离使用“汉明”学习: 说明: 假设这是您的数据集:
有没有办法选择整行作为一列输入到Pyspark过滤器udf中? 我有一个复杂的过滤函数“my_filter”,我想应用于整个数据帧: 但是 引发错误,因为这不是有效的操作。 我知道我可以将数据帧转换为RDD,然后使用RDD的过滤方法,但我不想将其转换为RDD,然后再转换回数据帧。我的数据帧具有复杂的嵌套类型,因此当我尝试再次将 RDD 转换为数据帧时,架构推断将失败。
我想使用PySpark创建spark数据帧,为此我在PyCharm中运行了以下代码: 但是,它会返回此错误: 使用 Spark 的默认 log4j 配置文件:组织/缓存/火花/log4j-defaults.属性 将默认日志级别设置为“WARN”。要调整日志记录级别,请使用 sc.setLogLevel(新级别)。对于 SparkR,请使用 setLogLevel(新级别)。18/01/08 10: