我正在尝试使用
如下:
>
在bigquery中创建基于用户级别的功能表
示例:功能表的外观
userid | x1 | x2 | x3 | x4 | x5 | x6 | x7 | x8 | x9 | x10
00013 | 0.01 | 0 | 0 | 0 | 0 | 0 | 0.06 | 0.09 | 0.001
#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import subprocess
import pyspark
import numpy as np
from pyspark.ml.clustering import KMeans
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import SQLContext
from pyspark.mllib.linalg import Vectors, _convert_to_vector
from pyspark.sql.types import Row
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
sc = pyspark.SparkContext()
# Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
# This assumes the Google Cloud Storage connector for Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
conf = {# Input Parameters
'mapred.bq.project.id': project,
'mapred.bq.gcs.bucket': bucket,
'mapred.bq.temp.gcs.path': input_directory,
'mapred.bq.input.project.id': 'my-project',
'mapred.bq.input.dataset.id': 'tempData',
'mapred.bq.input.table.id': 'userFeatureInBQ'}
# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
'org.apache.hadoop.io.LongWritable',
'com.google.gson.JsonObject',conf=conf)
# Tranform the userid-Feature table into feature_data RDD
feature_data = (
table_data
.map(lambda (_, record): json.loads(record))
.map(lambda x:(x['x0'],x['x1'],x['x2'],x['x3'],x['x4'],
x['x5'],x['x6'],x['x7'],x['x8'],
x['x9'],x['x10'])))
# Function to convert each line in RDD into an array, return the vector
def parseVector(values):
array = np.array([float(v) for v in values])
return _convert_to_vector(array)
# Convert the RDD into a row wise RDD
data = feature_data.map(parseVector)
row_rdd = data.map(lambda x: Row(x))
sqlContext = SQLContext(sc)
# cache the RDD to improve performance
row_rdd.cache()
# Create a Dataframe
df = sqlContext.createDataFrame(row_rdd, ["features"])
# cache the Dataframe
df.cache()
下面是我打印到控制台的Schema和head()
|-- features: vector (nullable = true)
[Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
#Define the paramMap & model
paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
{'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
{'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
{'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
{'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
{'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
{'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
{'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})
km = KMeans()
# Create a Pipeline with estimator stage
pipeline = Pipeline(stages=[km])
# Call & fit the pipeline with the paramMap
models = pipeline.fit(df, paramMap)`
print models
我得到以下输出与警告
7:03:24警告组织。阿帕奇。火花mllib。集群。KMeans:输入数据未直接缓存,如果其父RDD也未缓存,则可能会影响性能。[PipelineModel_443dbf939b7bd3bf7bfc,PipelineModel_4b64bb761f4efe51da50,PipelineModel_4f58411ac19beacc1A4,PipelineModel_4f58894f14d79b936,PipelineModel_4b81944f7a5e6be6eaf33,PipelineModel_4fc5b6370bf1d7db7dba,PipelineModel_43e0a196f16cfd357;,PipelineModel_47a548;ƞD40006; b262;,PipelineModelľľb4; b4b16; 1b,PipepepelineModel 1b48c4c9968c8、管道模型_4acf9cdbfda184b00328、管道模型_42d1a0c61c5e45cdb3cd、管道模型_4F0DB3C394BCC2C2BB9352、管道模型_441697f2748328de251c、管道模型_4a64ae517d270a1e0d5a、管道模型_4372BC8B184C05B0]
#Print the cluster centers:
for model in models:
print vars(model)
print model.stages[0].clusterCenters()
print model.extractParamMap()
(7.646767666638E-07,3.58531391e-01,1.68879698e-03,0.00000000e 00,1.5347707043E-00,1.534747477.47477-00,1.53477.477.64676666666638E-07,1.68878787879696989898E-03,0.0.0.00万万万万电子00万电子00,1.53477 7.4770707070437-00万E-00,1.477-02,1.53477 7.477.477-4.437-437-0 0-02,1.7-2,1.5万E-02,1.4.477-4.477070707-437-0 0万E-00,1.0-00,1.7-00,1.5万E-02,1.5万E-00,1.24364600e-02])
此处列出了问题列表,需要以下方面的帮助:
#computeError
def computeCost(model, rdd):`
"""Return the K-means cost (sum of squared distances of
points to their nearest center) for this model on the given data."""
cost = callMLlibFunc("computeCostKmeansModel",
rdd.map(_convert_to_vector),
[_convert_to_vector(c) for c in model.clusterCenters()])
return cost
cost= np.zeros(len(paramMap))
for i in range(len(paramMap)):
cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
print cost
这将在循环结束时打印出以下内容:
[634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.002944687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.0029487634035.00294687 634035.00294687 634035.00294687 634035.0029487634035.0029487 63294687]
每个模型计算的成本/误差是否相同?同样,无法使用正确的参数访问pipelineModel
非常感谢您的帮助/指导!谢谢
您的参数未正确定义。它应该从特定参数映射到值,而不是从任意名称映射。您将获得等于2的k
,因为您传递的参数未被使用,并且每个模型都使用完全相同的默认参数。
让我们从示例数据开始:
import numpy as np
from pyspark.mllib.linalg import Vector
df = (sc.textFile("data/mllib/kmeans_data.txt")
.map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
.zipWithIndex()
.toDF(["features", "id"]))
和管道
:
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
km = KMeans()
pipeline = Pipeline(stages=[km])
如上所述,参数映射应使用特定参数作为键。例如:
params = [
{km.k: 2, km.initMode: "k-means||"},
{km.k: 3, km.initMode: "k-means||"},
{km.k: 4, km.initMode: "k-means||"}
]
models = pipeline.fit(df, params=params)
assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
笔记:
initMode
为K-意味着||是k-意味着||
不是k意味着||
。 DataFrame
,而是经过转换的RDD。问题内容: 基本上我有一个ArrayList的位置: 在此之下,我调用以下方法: getMap()方法中的参数为: 我遇到的问题是我不确定如何在整个方法列表中传递该方法。 我试过了 但是getMap不接受,因为它不接受Objects []。 现在,如果我使用 它会完美地工作…但是我需要以某种方式传递所有位置…我当然可以继续添加等等,但是数组的大小会有所不同。我只是不习惯整个概念 最简单的方法是什么
问题内容: 我正在A中使用RUN指令安装rpm 但是,我想将值“ 2.3”作为参数传递。我的RUN指令应类似于: 哪里 问题答案: 您正在寻找和指导。这些是Docker 1.9中的新功能。查看https://docs.docker.com/engine/reference/builder/#arg。这将允许您添加到,然后使用构建。
问题内容: 我很好奇Go中是否有可能。我有多种方法的类型。是否可以有一个函数,该函数需要一个方法参数,然后将其称为类型? 这是我想要的一个小例子: Go认为type 有一个称为的方法,而不是用传入的方法名称替换它。 问题答案: 是的,有可能。您有2(3)个选项: 规范:方法表达式 该表达式产生的功能与第一个参数等效,但具有一个显式接收器。它有签名。 在这里,方法接收器是显式的。您只需将方法名称(具
问题内容: 我想将登录用户单击的sa 列表中的传递给twitter bootstrap 。我正在与 angularjs* 一起使用 grails ,其中数据是通过 angularjs 呈现的。 *** 组态 我的grails视图页面是 我的是 所以,我怎么能传递到? 问题答案: 我尝试如下。 我在 鼓励 按钮上打电话给angularjs控制器, 我设置的从angularjs控制器。 我提供了一个p
问题内容: 这似乎是一个愚蠢的问题,但是我是这个话题的新手。我正在致力于关于节点js的承诺。我想将参数传递给Promise函数。但是我不知道。 而功能类似于 问题答案: 将Promise包裹在一个函数中,否则它将立即开始工作。另外,您可以将参数传递给函数: 然后,使用它: ES6: 用:
我有办法 我想知道如果它真的创建了用户,我是否可以对其进行单元测试。但是它没有参数。 我尝试了以下方法: 然而,这实际上让我通过了与扫描仪的争论,这显然是我在测试中无法做到的。还尝试了其他逻辑。我也尝试过使用when(),,,但我找不到解决这个问题的方法,因为我对模仿还比较陌生。 有人能和我分享一些想法吗?
我必须向main方法传递两个参数。我的构建脚本是 如果我尝试: 然后发生了一个错误。 如何轻松地将参数传递到主方法命令行?
问题内容: 我正在使用Go内置的http服务器,并拍拍来响应一些URL: 我需要向该处理函数传递一个额外的参数-一个接口。 如何向处理程序函数发送额外的参数? 问题答案: 通过使用闭包,您应该能够做您想做的事情。 更改为以下内容(未测试): 然后对