当前位置: 首页 > 知识库问答 >
问题:

如何将参数传递给ML Pipeline.fit方法?

羊舌承
2023-03-14

我正在尝试使用

  • Google Dataproc Spark

如下:

>

  • 在bigquery中创建基于用户级别的功能表
    示例功能表的外观

    userid | x1 | x2 | x3 | x4 | x5 | x6 | x7 | x8 | x9 | x10
    00013 | 0.01 | 0 | 0 | 0 | 0 | 0 | 0.06 | 0.09 | 0.001

    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    import json
    import pprint
    import subprocess
    import pyspark
    import numpy as np
    from pyspark.ml.clustering import KMeans
    from pyspark import SparkContext
    from pyspark.ml import Pipeline
    from pyspark.sql import SQLContext
    from pyspark.mllib.linalg import Vectors, _convert_to_vector
    from pyspark.sql.types import Row
    from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
    sc = pyspark.SparkContext()
    
    # Use the Google Cloud Storage bucket for temporary BigQuery export data used by the InputFormat.
    # This assumes the Google Cloud Storage connector for Hadoop is configured.
    
    bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
    project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
    input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)
     conf = {# Input Parameters
     'mapred.bq.project.id': project,
     'mapred.bq.gcs.bucket': bucket,
     'mapred.bq.temp.gcs.path': input_directory,
     'mapred.bq.input.project.id': 'my-project',
     'mapred.bq.input.dataset.id': 'tempData',
     'mapred.bq.input.table.id': 'userFeatureInBQ'}
    
    # Load data in from BigQuery.
    table_data = sc.newAPIHadoopRDD(
     'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
     'org.apache.hadoop.io.LongWritable',
     'com.google.gson.JsonObject',conf=conf)
    
    # Tranform the userid-Feature table into feature_data RDD
     feature_data = (
     table_data
      .map(lambda (_, record): json.loads(record))
      .map(lambda   x:(x['x0'],x['x1'],x['x2'],x['x3'],x['x4'],
                      x['x5'],x['x6'],x['x7'],x['x8'],
                      x['x9'],x['x10'])))
    
    # Function to convert each line in RDD into an array, return the vector
      def parseVector(values):
         array = np.array([float(v) for v in values])
         return _convert_to_vector(array)
    
    # Convert the RDD into a row wise RDD
      data = feature_data.map(parseVector)
      row_rdd = data.map(lambda x: Row(x))
    
    sqlContext = SQLContext(sc)
    
    # cache the RDD to improve performance
    row_rdd.cache()
    
    # Create a Dataframe
    df = sqlContext.createDataFrame(row_rdd, ["features"])
    
    # cache the Dataframe
    df.cache()
    

    下面是我打印到控制台的Schema和head()

    |-- features: vector (nullable = true)
    [Row(features=DenseVector([0.01,0,0,0,0,0,0,0.06,0.09,0.001]))]
    
    • 多次运行模型
    • 使用不同的参数(即,更改#集群和init_mode)
    • 计算误差或成本度量
    • 选择最佳模型-参数组合
    • 使用KMeans作为估计器创建管道
    • 使用参数映射传递多个参数
    #Define the paramMap & model
    paramMap = ({'k':3,'initMode':'kmeans||'},{'k':3,'initMode':'random'},
      {'k':4,'initMode':'kmeans||'},{'k':4,'initMode':'random'},
      {'k':5,'initMode':'kmeans||'},{'k':5,'initMode':'random'},
      {'k':6,'initMode':'kmeans||'},{'k':6,'initMode':'random'},
      {'k':7,'initMode':'kmeans||'},{'k':7,'initMode':'random'},
      {'k':8,'initMode':'kmeans||'},{'k':8,'initMode':'random'},
      {'k':9,'initMode':'kmeans||'},{'k':9,'initMode':'random'},
      {'k':10,'initMode':'kmeans||'},{'k':10,'initMode':'random'})
    
     km = KMeans()
    
     # Create a Pipeline with estimator stage
     pipeline = Pipeline(stages=[km])
    
     # Call & fit the pipeline with the paramMap
     models = pipeline.fit(df, paramMap)`
     print models
    

    我得到以下输出与警告

    7:03:24警告组织。阿帕奇。火花mllib。集群。KMeans:输入数据未直接缓存,如果其父RDD也未缓存,则可能会影响性能。[PipelineModel_443dbf939b7bd3bf7bfc,PipelineModel_4b64bb761f4efe51da50,PipelineModel_4f58411ac19beacc1A4,PipelineModel_4f58894f14d79b936,PipelineModel_4b81944f7a5e6be6eaf33,PipelineModel_4fc5b6370bf1d7db7dba,PipelineModel_43e0a196f16cfd357;,PipelineModel_47a548;ƞD40006; b262;,PipelineModelľľb4; b4b16; 1b,PipepepelineModel 1b48c4c9968c8、管道模型_4acf9cdbfda184b00328、管道模型_42d1a0c61c5e45cdb3cd、管道模型_4F0DB3C394BCC2C2BB9352、管道模型_441697f2748328de251c、管道模型_4a64ae517d270a1e0d5a、管道模型_4372BC8B184C05B0]

    #Print the cluster centers:
    for model in models:
        print vars(model)
        print model.stages[0].clusterCenters()
        print model.extractParamMap()
    

    (7.646767666638E-07,3.58531391e-01,1.68879698e-03,0.00000000e 00,1.5347707043E-00,1.534747477.47477-00,1.53477.477.64676666666638E-07,1.68878787879696989898E-03,0.0.0.00万万万万电子00万电子00,1.53477 7.4770707070437-00万E-00,1.477-02,1.53477 7.477.477-4.437-437-0 0-02,1.7-2,1.5万E-02,1.4.477-4.477070707-437-0 0万E-00,1.0-00,1.7-00,1.5万E-02,1.5万E-00,1.24364600e-02])

    此处列出了问题列表,需要以下方面的帮助:

    • 我得到一个列表,其中只有2个集群中心作为所有模型的数组,
      • 当我试图访问管道时,KMeans模型似乎默认为k=2?为什么会发生这种事?
      • 最后一个循环应该访问管道模型和第0阶段,并运行clusterCenter()方法?这是正确的方法吗?
      • 为什么会出现数据未缓存的错误?
      • 这违背了使用管道并行运行KMeans模型和模型选择的目的,但是我尝试了以下代码:
      #computeError
      def computeCost(model, rdd):`
      """Return the K-means cost (sum of squared distances of
       points to their nearest center) for this model on the given data."""
          cost = callMLlibFunc("computeCostKmeansModel",
                                rdd.map(_convert_to_vector),
                     [_convert_to_vector(c) for c in model.clusterCenters()])
          return cost
      
      cost= np.zeros(len(paramMap))
      
      for i in range(len(paramMap)):
          cost[i] = cost[i] + computeCost(model[i].stages[0], feature_data)
      print cost
      

      这将在循环结束时打印出以下内容:

      [634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.002944687 634035.00294687 634035.00294687 634035.00294687 634035.00294687 634035.0029487634035.00294687 634035.00294687 634035.00294687 634035.0029487634035.0029487 63294687]

      • 每个模型计算的成本/误差是否相同?同样,无法使用正确的参数访问pipelineModel

      非常感谢您的帮助/指导!谢谢


  • 共有1个答案

    孙洋
    2023-03-14

    您的参数未正确定义。它应该从特定参数映射到值,而不是从任意名称映射。您将获得等于2的k,因为您传递的参数未被使用,并且每个模型都使用完全相同的默认参数。

    让我们从示例数据开始:

    import numpy as np
    from pyspark.mllib.linalg import Vector
    
    df = (sc.textFile("data/mllib/kmeans_data.txt")
      .map(lambda s: Vectors.dense(np.fromstring(s, dtype=np.float64, sep=" ")))
      .zipWithIndex()
      .toDF(["features", "id"]))
    

    管道

    from pyspark.ml.clustering import KMeans
    from pyspark.ml import Pipeline
    
    km = KMeans()
    
    pipeline = Pipeline(stages=[km])
    

    如上所述,参数映射应使用特定参数作为键。例如:

    params = [
        {km.k: 2, km.initMode: "k-means||"},
        {km.k: 3, km.initMode: "k-means||"},
        {km.k: 4, km.initMode: "k-means||"}
    ]
    
    models = pipeline.fit(df, params=params)
    
    assert [len(m.stages[0].clusterCenters()) for m in models] == [2, 3, 4]
    

    笔记:

    • 正确的initMode为K-意味着||是k-意味着||不是k意味着||
    • 在管道中使用参数映射并不意味着模型是并行训练的。Spark将训练过程并行于数据而不是参数。它只不过是一种方便的方法。
    • 您会收到关于未缓存数据的警告,因为K-Means的实际输入不是DataFrame,而是经过转换的RDD。
     类似资料:
    • 问题内容: 基本上我有一个ArrayList的位置: 在此之下,我调用以下方法: getMap()方法中的参数为: 我遇到的问题是我不确定如何在整个方法列表中传递该方法。 我试过了 但是getMap不接受,因为它不接受Objects []。 现在,如果我使用 它会完美地工作…但是我需要以某种方式传递所有位置…我当然可以继续添加等等,但是数组的大小会有所不同。我只是不习惯整个概念 最简单的方法是什么

    • 问题内容: 我正在A中使用RUN指令安装rpm 但是,我想将值“ 2.3”作为参数传递。我的RUN指令应类似于: 哪里 问题答案: 您正在寻找和指导。这些是Docker 1.9中的新功能。查看https://docs.docker.com/engine/reference/builder/#arg。这将允许您添加到,然后使用构建。

    • 问题内容: 我很好奇Go中是否有可能。我有多种方法的类型。是否可以有一个函数,该函数需要一个方法参数,然后将其称为类型? 这是我想要的一个小例子: Go认为type 有一个称为的方法,而不是用传入的方法名称替换它。 问题答案: 是的,有可能。您有2(3)个选项: 规范:方法表达式 该表达式产生的功能与第一个参数等效,但具有一个显式接收器。它有签名。 在这里,方法接收器是显式的。您只需将方法名称(具

    • 问题内容: 我想将登录用户单击的sa 列表中的传递给twitter bootstrap 。我正在与 angularjs* 一起使用 grails ,其中数据是通过 angularjs 呈现的。 *** 组态 我的grails视图页面是 我的是 所以,我怎么能传递到? 问题答案: 我尝试如下。 我在 鼓励 按钮上打电话给angularjs控制器, 我设置的从angularjs控制器。 我提供了一个p

    • 问题内容: 这似乎是一个愚蠢的问题,但是我是这个话题的新手。我正在致力于关于节点js的承诺。我想将参数传递给Promise函数。但是我不知道。 而功能类似于 问题答案: 将Promise包裹在一个函数中,否则它将立即开始工作。另外,您可以将参数传递给函数: 然后,使用它: ES6: 用:

    • 我有办法 我想知道如果它真的创建了用户,我是否可以对其进行单元测试。但是它没有参数。 我尝试了以下方法: 然而,这实际上让我通过了与扫描仪的争论,这显然是我在测试中无法做到的。还尝试了其他逻辑。我也尝试过使用when(),,,但我找不到解决这个问题的方法,因为我对模仿还比较陌生。 有人能和我分享一些想法吗?

    • 我必须向main方法传递两个参数。我的构建脚本是 如果我尝试: 然后发生了一个错误。 如何轻松地将参数传递到主方法命令行?

    • 问题内容: 我正在使用Go内置的http服务器,并拍拍来响应一些URL: 我需要向该处理函数传递一个额外的参数-一个接口。 如何向处理程序函数发送额外的参数? 问题答案: 通过使用闭包,您应该能够做您想做的事情。 更改为以下内容(未测试): 然后对