我正试图在Spark中编写一个程序来执行潜在的Dirichlet分配(LDA)。这个Spark文档页面提供了一个在样本数据上执行LDA的好例子。下面是节目
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
# Load and parse the data
data = sc.textFile("data/mllib/sample_lda_data.txt")
parsedData = data.map(lambda line: Vectors.dense([float(x) for x in line.strip().split(' ')]))
# Index documents with unique IDs
corpus = parsedData.zipWithIndex().map(lambda x: [x[1], x[0]]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3)
# Output topics. Each is a distribution over words (matching word count vectors)
print("Learned topics (as distributions over vocab of " + str(ldaModel.vocabSize())
+ " words):")
topics = ldaModel.topicsMatrix()
for topic in range(3):
print("Topic " + str(topic) + ":")
for word in range(0, ldaModel.vocabSize()):
print(" " + str(topics[word][topic]))
# Save and load model
ldaModel.save(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
sameModel = LDAModel\
.load(sc, "target/org/apache/spark/PythonLatentDirichletAllocationExample/LDAModel")
使用的示例输入(sample_lda_data.txt)如下所示
1 2 6 0 2 3 1 1 0 0 3
1 3 0 1 3 0 0 2 0 0 1
1 4 1 0 0 4 9 0 1 2 0
2 1 0 3 0 0 5 0 2 3 9
3 1 1 9 3 0 2 0 0 1 3
4 2 0 3 4 5 1 1 1 4 0
2 1 0 3 0 0 5 0 2 2 9
1 1 1 9 2 1 2 0 0 1 3
4 4 0 3 4 2 1 3 0 0 0
2 8 2 0 3 0 2 0 2 7 2
1 1 1 9 0 2 2 0 0 3 3
4 1 0 0 4 5 1 3 0 1 0
如何修改程序以运行在包含文本数据而不是数字的文本数据文件上?让示例文件包含以下文本。
潜在狄利克雷分配(LDA)是一种从文本文档集合中推断主题的主题模型。LDA可以被认为是一种聚类算法,如下所示:
主题对应于集群中心,文档对应于数据集中的示例(行)。主题和文档都存在于需求空间中,其中特征向量是字数(词袋)的向量。LDA不是使用传统的距离来估计聚类,而是使用基于文本文档如何生成的概率模型的函数。
在做了一些研究之后,我试图回答这个问题。下面是使用Spark对具有真实文本数据的文本文档执行LDA的示例代码。
from pyspark.sql import SQLContext, Row
from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vector, Vectors
path = "sample_text_LDA.txt"
data = sc.textFile(path).zipWithIndex().map(lambda (words,idd): Row(idd= idd, words = words.split(" ")))
docDF = spark.createDataFrame(data)
Vector = CountVectorizer(inputCol="words", outputCol="vectors")
model = Vector.fit(docDF)
result = model.transform(docDF)
corpus = result.select("idd", "vectors").rdd.map(lambda (x,y): [x,Vectors.fromML(y)]).cache()
# Cluster the documents into three topics using LDA
ldaModel = LDA.train(corpus, k=3,maxIterations=100,optimizer='online')
topics = ldaModel.topicsMatrix()
vocabArray = model.vocabulary
wordNumbers = 10 # number of words per topic
topicIndices = sc.parallelize(ldaModel.describeTopics(maxTermsPerTopic = wordNumbers))
def topic_render(topic): # specify vector id of words to actual words
terms = topic[0]
result = []
for i in range(wordNumbers):
term = vocabArray[terms[i]]
result.append(term)
return result
topics_final = topicIndices.map(lambda topic: topic_render(topic)).collect()
for topic in range(len(topics_final)):
print ("Topic" + str(topic) + ":")
for term in topics_final[topic]:
print (term)
print ('\n')
问题中提到的文本数据中提取的主题如下:
本文向大家介绍Python中eval带来的潜在风险代码分析,包括了Python中eval带来的潜在风险代码分析的使用技巧和注意事项,需要的朋友参考一下 0x00 前言 eval是Python用于执行python表达式的一个内置函数,使用eval,可以很方便的将字符串动态执行。比如下列代码: 当内存中的内置模块含有os的话,eval同样可以做到命令执行: 当然,eval只能执行Python的表达式类
DF: 我正在创建一个新的列“identify”,以查找(ID、日期)的分区,并通过“identify”选择排序最靠前的组合 预期DF: 代码1: 我的作品: 代码尝试2: 我的作品: 关于如何调整代码以获得所需OP的任何建议都将是有帮助的
我有一个火花数据框,我需要写入MongoDB。我想知道如何在mongoDB中将数据框的一些列写成嵌套/分层JSON。假设数据框有6列,col1,col2,…… col5,col6我想要col1,col2,col3作为第一层次结构,其余列col4到col6作为第二层次结构。像这样的东西, 我如何在pyspark中实现这一点?
因此,如何跨辅助节点对RDD进行分区,是将被分区的单个RDD还是一个完整的批处理。 我可能拿错了。请指引我
我有一个关于RDD中默认分区的问题。 我不知道为什么会这样。你能帮忙吗。 谢了!
假设我正在从S3文件夹中读取100个文件。每个文件的大小为10 MB。当我执行<code>df=spark.read时。parquet(s3路径),文件(或更确切地说分区)如何在任务之间分布?E、 g.在这种情况下,<code>df</code>将有100个分区,如果spark有10个任务正在运行以将该文件夹的内容读取到数据帧中,那么这些分区是如何分配给这10个任务的?它是以循环方式进行的,还是每