当前位置: 首页 > 知识库问答 >
问题:

Spark中的潜在Dirichlet分配

燕飞文
2023-03-14

我正试图在Spark中编写一个程序来执行潜在的Dirichlet分配(LDA)。这个Spark文档页面提供了一个在样本数据上执行LDA的好例子。下面是节目

from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors

# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)

# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
      + " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
    print("Topic " + str(topic) + ":")
    for word in range(0, ldaModel.vocabSize()):
        print(" " + str(topics[word][topic]))

# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
    .load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")

使用的示例输入(sample_lda_data.txt)如下所示

1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0

如何修改程序以运行在包含文本数据而不是数字的文本数据文件上?让示例文件包含以下文本。

潜在狄利克雷分配(LDA)是一种从文本文档集合中推断主题的主题模型。LDA可以被认为是一种聚类算法,如下所示:

主题对应于集群中心,文档对应于数据集中的示例(行)。主题和文档都存在于需求空间中,其中特征向量是字数(词袋)的向量。LDA不是使用传统的距离来估计聚类,而是使用基于文本文档如何生成的概率模型的函数。

共有1个答案

詹甫
2023-03-14

在做了一些研究之后,我试图回答这个问题。下面是使用Spark对具有真实文本数据的文本文档执行LDA的示例代码。

from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors

path = "sample_text_LDA.txt"

data = sc.textFile(path).zipWithIndex().map(lambda (words,idd): Row(idd= idd, words = words.split(" ")))
docDF = spark.createDataFrame(data)
Vector = CountVectorizer(inputCol="words", outputCol="vectors")
model = Vector.fit(docDF)
result = model.transform(docDF)

corpus = result.select("idd", "vectors").rdd.map(lambda (x,y): [x,Vectors.fromML(y)]).cache()

# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3,maxIterations=100,optimizer='online')
topics = ldaModel.topicsMatrix()
vocabArray = model.vocabulary

wordNumbers = 10  # number of words per topic
topicIndices = sc.parallelize(ldaModel.describeTopics(maxTermsPerTopic = wordNumbers))

def topic_render(topic):  # specify vector id of words to actual words
    terms = topic[0]
    result = []
    for i in range(wordNumbers):
        term = vocabArray[terms[i]]
        result.append(term)
    return result

topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()

for topic in range(len(topics_final)):
    print ("Topic" + str(topic) + ":")
    for term in topics_final[topic]:
        print (term)
    print ('\n')

问题中提到的文本数据中提取的主题如下:

 类似资料:
  • 本文向大家介绍Python中eval带来的潜在风险代码分析,包括了Python中eval带来的潜在风险代码分析的使用技巧和注意事项,需要的朋友参考一下 0x00 前言 eval是Python用于执行python表达式的一个内置函数,使用eval,可以很方便的将字符串动态执行。比如下列代码: 当内存中的内置模块含有os的话,eval同样可以做到命令执行: 当然,eval只能执行Python的表达式类

  • DF: 我正在创建一个新的列“identify”,以查找(ID、日期)的分区,并通过“identify”选择排序最靠前的组合 预期DF: 代码1: 我的作品: 代码尝试2: 我的作品: 关于如何调整代码以获得所需OP的任何建议都将是有帮助的

  • 我有一个火花数据框,我需要写入MongoDB。我想知道如何在mongoDB中将数据框的一些列写成嵌套/分层JSON。假设数据框有6列,col1,col2,…… col5,col6我想要col1,col2,col3作为第一层次结构,其余列col4到col6作为第二层次结构。像这样的东西, 我如何在pyspark中实现这一点?

  • 因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我

  • 我有一个关于RDD中默认分区的问题。 我不知道为什么会这样。你能帮忙吗。 谢了!

  • 假设我正在从S3文件夹中读取100个文件。每个文件的大小为10 MB。当我执行<code>df=spark.read时。parquet(s3路径),文件(或更确切地说分区)如何在任务之间分布?E、 g.在这种情况下,<code>df</code>将有100个分区,如果spark有10个任务正在运行以将该文件夹的内容读取到数据帧中,那么这些分区是如何分配给这10个任务的?它是以循环方式进行的,还是每