当前位置: 首页 > 面试题库 >

如何在PySpark中创建自定义估算器

暨宸
2023-03-14
问题内容

我正在尝试Estimator在PySpark MLlib构建一个简单的自定义。我在这里可以编写自定义的Transformer,但是我不确定如何在.NET上执行此操作Estimator。我也不明白做什么@keyword_only,为什么我需要这么多的设置方法和获取方法。Scikit-learn似乎有一个适用于自定义模型的文档(请参阅此处,但PySpark没有。

示例模型的伪代码:

class NormalDeviation():
    def __init__(self, threshold = 3):
    def fit(x, y=None):
       self.model = {'mean': x.mean(), 'std': x.std()]
    def predict(x):
       return ((x-self.model['mean']) > self.threshold * self.model['std'])
    def decision_function(x): # does ml-lib support this?

问题答案:

一般来说,没有文档,因为对于Spark 1.6 / 2.0,大多数相关API都不打算公开。它应该在Spark 2.1.0中更改(请参阅SPARK-7146)。

API是比较复杂的,因为它必须遵循特定的惯例,以使给定TransformerEstimator兼容的PipelineAPI。对于某些功能,例如读写和网格搜索,可能需要其中一些方法。其他,例如keyword_only,只是简单的帮手,并非严格要求。

假设您已经为平均参数定义了以下混合:

from pyspark.ml.pipeline import Estimator, Model, Pipeline
from pyspark.ml.param.shared import *
from pyspark.sql.functions import avg, stddev_samp


class HasMean(Params):

    mean = Param(Params._dummy(), "mean", "mean", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasMean, self).__init__()

    def setMean(self, value):
        return self._set(mean=value)

    def getMean(self):
        return self.getOrDefault(self.mean)

标准偏差参数:

class HasStandardDeviation(Params):

    standardDeviation = Param(Params._dummy(),
        "standardDeviation", "standardDeviation", 
        typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasStandardDeviation, self).__init__()

    def setStddev(self, value):
        return self._set(standardDeviation=value)

    def getStddev(self):
        return self.getOrDefault(self.standardDeviation)

和阈值:

class HasCenteredThreshold(Params):

    centeredThreshold = Param(Params._dummy(),
            "centeredThreshold", "centeredThreshold",
            typeConverter=TypeConverters.toFloat)

    def __init__(self):
        super(HasCenteredThreshold, self).__init__()

    def setCenteredThreshold(self, value):
        return self._set(centeredThreshold=value)

    def getCenteredThreshold(self):
        return self.getOrDefault(self.centeredThreshold)

您可以创建以下基本Estimator内容:

from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable 
from pyspark import keyword_only  

class NormalDeviation(Estimator, HasInputCol, 
        HasPredictionCol, HasCenteredThreshold,
        # Available in PySpark >= 2.3.0 
        # Credits https://stackoverflow.com/a/52467470
        # by https://stackoverflow.com/users/234944/benjamin-manns
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        super(NormalDeviation, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    # Required in Spark >= 3.0
    def setInputCol(self, value):
        """
        Sets the value of :py:attr:`inputCol`.
        """
        return self._set(inputCol=value)

    # Required in Spark >= 3.0
    def setPredictionCol(self, value):
        """
        Sets the value of :py:attr:`predictionCol`.
        """
        return self._set(predictionCol=value)

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None, centeredThreshold=1.0):
        kwargs = self._input_kwargs
        return self._set(**kwargs)        

    def _fit(self, dataset):
        c = self.getInputCol()
        mu, sigma = dataset.agg(avg(c), stddev_samp(c)).first()
        return NormalDeviationModel(
            inputCol=c, mean=mu, standardDeviation=sigma, 
            centeredThreshold=self.getCenteredThreshold(),
            predictionCol=self.getPredictionCol())


class NormalDeviationModel(Model, HasInputCol, HasPredictionCol,
        HasMean, HasStandardDeviation, HasCenteredThreshold,
        DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        super(NormalDeviationModel, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)  

    @keyword_only
    def setParams(self, inputCol=None, predictionCol=None,
                mean=None, standardDeviation=None,
                centeredThreshold=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)           

    def _transform(self, dataset):
        x = self.getInputCol()
        y = self.getPredictionCol()
        threshold = self.getCenteredThreshold()
        mu = self.getMean()
        sigma = self.getStddev()

        return dataset.withColumn(y, (dataset[x] - mu) > threshold * sigma)     

最后,它可以按如下方式使用:

df = sc.parallelize([(1, 2.0), (2, 3.0), (3, 0.0), (4, 99.0)]).toDF(["id", "x"])

normal_deviation = NormalDeviation().setInputCol("x").setCenteredThreshold(1.0)
model  = Pipeline(stages=[normal_deviation]).fit(df)

model.transform(df).show()
## +---+----+----------+
## | id|   x|prediction|
## +---+----+----------+
## |  1| 2.0|     false|
## |  2| 3.0|     false|
## |  3| 0.0|     false|
## |  4|99.0|      true|
## +---+----+----------+


 类似资料:
  • 问题内容: 我正在尝试在Log4j2中编写自己的RewritePolicy。该文档指出: RewritePolicy是一个接口,允许实现在将LogEvent传递给Appender之前检查并可能对其进行修改。RewritePolicy声明一个必须执行的名为rewrite的方法。该方法通过LogEvent传递,并且可以返回相同事件或创建一个新事件。 这是我的 java类 : 这是我的 yaml配置 文

  • 问题内容: 我正在http://www.cafeaulait.org/javafaq.html上阅读#6.10项,然后我开始怀疑大型企业如何创建自己的JVM实现。一个人会尝试(或可行)实验性的东西吗? 问题答案: 从技术上讲,创建该新JVM所需的所有信息都是该语言和目标平台的公共规范。即使字节码解释在很大程度上相同,JVM还是需要根据其是要在台式机还是手机上运行而有所不同。 一些开始寻找信息的地方

  • 问题内容: 我想知道如何在java中设置自己的侦听器。例如,我有一个将数字从1递增到100的函数。我想在值达到50时设置侦听器。我该怎么做?请给我建议任何教程。 问题答案: 查看使用侦听器的所有类的源代码。实际上,这很容易: 为您的听众创建一个界面,例如 维护清单 侦听器应在每个事件上进行遍历,然后遍历列表并使用一些事件参数调用适当的方法 至于观察者模式以及一些Java代码,请看维基百科。

  • 类似地,我们如何在log4j2中创建自定义的appender,因为我们没有AppenderSkelton类要扩展,而所有其他appender都扩展AppenderBase类。

  • 问题内容: 我正在尝试在javaFX中创建自定义光标。这是我的代码: Windows 8.1的游标创建无效吗? 问题答案: 检出ImageCursor.getBestSize()方法和ImageCursor.getMaximumColors()并查看它们返回的内容,然后尝试匹配最佳大小和最大颜色的自定义光标图像。对于Windows 8.1,这很可能是32x32的光标。 这是来自javadoc 的引

  • 问题内容: 如何在Java中创建自定义异常? 问题答案: 要定义受检查的异常,请创建的子类(或子类的层次结构)。例如: 可能引发或传播此异常的方法必须声明它: …,并且调用此方法的代码必须处理或传播此异常(或同时处理): 你会在上面的示例中注意到该错误IOException已被捕获并重新抛出为。这是用于封装异常的常用技术(通常在实现API时)。 有时在某些情况下,你不想强制每个方法在其throws