我正在计算两大组向量(具有相同特征)之间的余弦相似度。每组向量表示为一个scipy CSR稀疏矩阵a和B。我想计算一个x B^T,它不会稀疏。但是,我只需要跟踪超过某个阈值的值,例如0.8。我正试图用vanilla RDD在Pyspark中实现这一点,目的是使用为scipy CSR矩阵实现的快速向量操作。
A和B的行是标准化的,所以为了计算余弦相似度,我只需要找到A中每一行与B中每一行的点积。A的尺寸是5000000 x 5000。B的尺寸为2000000 x 5000。
假设A和B太大,无法作为广播变量放入我的工作节点上的内存中。我应该如何以最佳方式并行化A和B?
编辑在我发布了我的解决方案后,我一直在探索其他可能更清晰、更优化的方法,特别是为Spark MLlib IndexedRowMatrix对象实现的列相似性()函数。(哪个pyspark抽象适合我的大矩阵乘法?)
我能够在这个框架中实现一个解决方案。
欢迎深入了解为什么这个解决方案很慢——是自定义序列化吗?
def csr_mult_helper(pair):
threshold=0.8
A_row = pair[0][0] # keep track of the row offset
B_col = pair[1][0] # offset for B (this will be a column index, after the transpose op)
A = sparse.csr_matrix(pair[0][1], pair[0][2]) # non-zero entires, size data
B = sparse.csr_matrix(pair[1][1], pair[1][2])
C = A * B.T # scipy sparse mat mul
for row_idx, row in enumerate(C): # I think it would be better to use a filter Transformation instead
col_indices = row.indices # but I had trouble with the row and column index book keeping
col_values = row.data
for col_idx, val in zip(col_indices, col_values):
if val > threshold:
yield (A_row + row_idx, B_col + col_idx, val) # source vector, target vector, cosine score
def parallelize_sparse_csr(M, rows_per_chunk=1):
[rows, cols] = M.shape
i_row = 0
submatrices = []
while i_row < rows:
current_chunk_size = min(rows_per_chunk, rows - i_row)
submat = M[i_row:(i_row + current_chunk_size)]
submatrices.append( (i_row, # offset
(submat.data, submat.indices, submat.indptr), # sparse matrix data
(current_chunk_size, cols)) ) # sparse matrix shape
i_row += current_chunk_size
return sc.parallelize(submatrices)
########## generate test data ###########
K,L,M,N = 5,2000,3,2000 # matrix dimensions (toy example)
A_ = sparse.rand(K,L, density=0.1, format='csr')
B_ = sparse.rand(M,N, density=0.1, format='csr')
print("benchmark: {} \n".format((A_ * B_.T).todense())) # benchmark solution for comparison
########## parallelize, multiply, and filter #########
t_start = time.time()
A = parallelize_sparse_csr(A_, rows_per_chunk=10)
B = parallelize_sparse_csr(B_, rows_per_chunk=10) # number of elements per partition, from B
# warning: this code breaks if the B_ matrix rows_per_chunk parameter != 1
# although I don't understand why yet
print("custom pyspark solution: ")
result = A.cartesian(B).flatMap(csr_mult_helper).collect()
print(results)
print("\n {} s elapsed".format(time.time() - t_start))
2.5.1 介绍 (密集) 矩阵是: 数据对象 存储二维值数组的数据结构 重要特征: 一次分配所有项目的内存 通常是一个连续组块,想一想Numpy数组 快速访问个项目(*) 2.5.1.1 为什么有稀疏矩阵? 内存,增长是n**2 小例子(双精度矩阵): In [2]: import numpy as np import matplotlib.pyplot as plt x = np.li
我有许多scipy稀疏矩阵(目前为CSR格式),需要与密集的numpy 1D向量相乘。该向量称为G: 每个稀疏矩阵都具有形状(163842097152),并且非常稀疏。密度约为4.0e-6。我有一个包含100个稀疏矩阵的列表,称为spmats。 我可以轻松地将每个矩阵与G相乘,如下所示: 这将产生一个形状密集向量列表(16384,)。 我的应用程序对性能相当关键,所以我尝试了另一种方法,即首先将所
在课堂上,我必须为稀疏矩阵编写自己的线性方程求解器。我可以自由地使用任何类型的数据结构为稀疏矩阵,我必须实现几个解决方案,包括共轭梯度。 谢了!
在使用numpy的python中,假设我有两个矩阵: 稀疏矩阵 密集的x*y矩阵 现在我想做,它将返回一个密集的矩阵。 但是,我只关心中非零的单元格,这意味着如果我这样做了,对我的应用程序不会有任何影响 <代码>S\u=S*S\u 显然,这将是对操作的浪费,因为我想把在
我正在实现一个稀疏矩阵类,使用映射向量来存储数据(映射表示矩阵的一行,其中键是列的索引,值是该位置的maitrix的值)我已经编写了计算行列式的函数,但我不知道是否有一种方法可以计算这种节省的时间(因为矩阵是稀疏的,大多数值为零)在这里我的实现: 这是类接口 我计算行列式的方式是什么?假设运算符()以这种方式重载 提前感谢您的帮助
稀疏矩阵(Sparse Matrix) 注:压缩存储的矩阵可以分为特殊矩阵和稀疏矩阵。对于那些具有相同元素或零元素在矩阵中分布具有一定规律的矩阵,被称之为特殊矩阵。对于那些零元素数据远远多于非零元素数目,并且非零元素的分布没有规律的矩阵称之为稀疏矩阵。 1. 稀疏矩阵的概念 在矩阵中,若数值为0的元素数目远远多于非0元素的数目时,则称该矩阵为稀疏矩阵。与之相反,若非0元素数目占大多数时,则称该矩阵