当前位置: 首页 > 知识库问答 >
问题:

如何在Spark中为Kmeans映射MongoDB数据?

徐博雅
2023-03-14
sc = SparkContext(appName="KMeansExample")  # SparkContext
data = sc.textFile("/home/mhoeller/kmeans_data.txt")
parsedData = data.map(lambda line: array([int(x) for x in line.split(' ')]))
clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")
spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()

df = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("uri","mongodb://127.0.0.1/ycsb.usertable").load()

# <<<< Here I am missing the parsing >>>>>

clusters = KMeans.train(parsedData, 2, maxIterations=10, initializationMode="random")

我喜欢理解如何从df映射数据,以便它可以用作Kmeans的输入。

数据库的“布局”是D8:二进制(nullable=true)
--field9:二进制(nullable=true)

共有1个答案

林雅畅
2023-03-14

我喜欢理解如何从df映射数据,以便它可以用作Kmeans的输入。

根据您的代码片段,我假设您使用的是PySpark。

如果您查看clustering.kmeans Python API文档,可以看到第一个参数需要是向量或可转换序列类型的RDD

df = spark.read.format("com.mongodb.spark.sql.DefaultSource")
               .option("uri","mongodb://127.0.0.1/ycsb.usertable")
               .load()

有了上面的信息,让我们进入其中:

# Drop _id column and get RDD representation of the DataFrame
rowRDD = df.drop("_id").rdd

# Convert RDD of Row into RDD of numpy.array
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))

# Feed into KMeans 
clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")

如果希望保留布尔值(true/false)而不是整数(1/0),那么可以删除int部分。如下:

parsedRdd = rowRDD.map(lambda row: array([x for x in row]))

把它们放在一起:

from numpy import array 
from pyspark.mllib.clustering import KMeans
import org.apache.spark.sql.SparkSession

spark = SparkSession \
.builder \
.appName("myApp") \
.config("spark.mongodb.input.uri", "mongodb://127.0.0.1/ycsb.usertable") \
.config("spark.mongodb.output.uri", "mongodb:/127.0.0.1/ycsb.usertable") \
.getOrCreate()

df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()

rowRDD = df.drop("_id").rdd
parsedRdd = rowRDD.map(lambda row: array([int(x) for x in row]))

clusters = KMeans.train(parsedRdd, 2, maxIterations=10, initializationMode="random")
clusters.clusterCenters
 类似资料:
  • 我在Spark中有一个数据框,看起来像这样: 它有30列:只显示其中的一些! 因此,我必须在Scala中将这个数据帧转换成一个键值对,使用键作为数据帧中的一些列,并为这些键分配从索引0到计数(不同的键数)的唯一值。 例如:使用上面的案例,我希望在Scala中的map(key-value)集合中有一个输出,如下所示: 我对斯卡拉和斯帕克是新手,我试着做这样的事情。 但是,这不起作用。:/此操作完成后

  • RDD转换和操作只能由驱动程序调用,不能在其他转换内部调用;例如,rdd1。地图(x)= 正如错误所说,我试图在主映射函数中映射(转换)一个JavaRDD对象,ApacheSpark怎么可能呢? 主要JavaPairRDD对象(TextFile和Word是定义的类): 和地图功能: 我还尝试了foreach映射函数,但不起作用。(当然还有SPARK-5063)

  • 我在spark中有一个数据集,只有一列,这列是一个Map[String,Any]。我想逐行映射数据集,然后逐键映射映射映射列,计算每个键的值,并使用新数据生成与前一个相同类型的新数据集。 例如: 我想在每个值的末尾加上“”,结果将是一个数据类型的数据集,如下所示: 谢谢Nir

  • 我有两个表和一个表来映射我以前的两个表的关系,我如何使用Spring data JPA高效自动地将数据插入到映射的表中?下面是表结构。 用户(id(PK),名称,电子邮件,userRoleId)角色(id(PK),名称,userRoleId)用户角色(id(PK),userId(FK<-User),roleId(FK<-Role))

  • 我有一个以本地模式运行的Spark流程序,在该程序中,我从TCP套接字连接接收JSON消息,每个批处理间隔几条。 这些消息中的每一条都有一个ID,我用它来创建一个键/值JavaPairDStream,这样在我的JavaDStream中的RDD的每个分区中,都有一个键/值对,每个分区只有一条消息。 我现在的目标是将具有相同ID的消息分组在同一个分区中,以便我可以并行映射它们,每个分区由不同的核心处理

  • 问题内容: 我将Spring JPA与Hibernate和Postgres一起使用 在实体中,我尝试使用List和integer [] 在数据库中,我有一列类型: 有什么JPA使用方式吗? 问题答案: JPA无法直接将数组持久化到单独的表或数据库数组(例如,映射到的数组)。因此,您有两种方法: 1)用于将该列另存为BLOB或CLOB 2)使用而不是数组