分类和回归 - 保序回归


1 保序回归

  保序回归解决了下面的问题:给定包含n个数据点的序列 y_1,y_2,...,y_n , 怎样通过一个单调的序列 beta_1,beta_2,...,beta_n 来归纳这个问题。形式上,这个问题就是为了找到


  大部分时候,我们会在括号前加上权重w_i。解决这个问题的一个方法就是 pool adjacent violators algorithm(PAVA) 算法。粗略的讲,PAVA算法的过程描述如下:

  我们从左边的y_1开始,右移y_1直到我们遇到第一个违例(violation)即y_i < y_i+1,然后,我们用违例之前的所有y的平方替换这些y,以满足单调性。我们继续这个过程,直到我们最后到达y_n

2 近似保序



  在上式中,X_+表示正数部分,即X_+ = X.1 (x>0)。这是一个凸优化问题,处罚项处罚违反单调性(即beta_i > beta_i+1)的邻近对。




3 近似保序算法流程




  这个引理证明的事实极大地简化了近似保序解路径(solution path)的构造。假设在参数值为lambda的情况下,有K_lambda个连接块,我们用A_1,A_2,..,A_K_lambda表示。这样我们可以重写(2)为如下(3)的形式。




  为了符合方便,令s_0 = s_K_lambda = 0。并且,





  随着lambda的增长,方程(5)将连续的给出解决方案的斜率直到组A_1,A_2,...,A_K_lambda改变。更加引理1,只有两个组合并时,这才会发生。m_i表示斜率,那么对于每一个i=1,...,K_lambda - 1A_iA_i+1合并之后得到的公式如下


  因此我们可以一直移动,直到lambda “下一个”值的到来






  • 初始时,lambda=0K_lambda=n,A_i={i},i=1,2,...,n。对于每个i,解是beta_lambda,i = y_i

  • 重复下面过程



  3、如果t_i,i+1 < lambda,终止



  对于每个i,根据公式(8)合并合适的组别(所以K_lambda^star = K_lambda - 1),并设置lambda = lambda^star

4 源码分析


4.1 实例

  1. import org.apache.spark.mllib.regression.{IsotonicRegression, IsotonicRegressionModel}
  2. val data = sc.textFile("data/mllib/sample_isotonic_regression_data.txt")
  3. // 创建(label, feature, weight) tuples ,权重默认设置为1.0
  4. val parsedData = data.map { line =>
  5. val parts = line.split(',').map(_.toDouble)
  6. (parts(0), parts(1), 1.0)
  7. }
  8. // Split data into training (60%) and test (40%) sets.
  9. val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
  10. val training = splits(0)
  11. val test = splits(1)
  12. // Create isotonic regression model from training data.
  13. // Isotonic parameter defaults to true so it is only shown for demonstration
  14. val model = new IsotonicRegression().setIsotonic(true).run(training)
  15. // Create tuples of predicted and real labels.
  16. val predictionAndLabel = test.map { point =>
  17. val predictedLabel = model.predict(point._2)
  18. (predictedLabel, point._1)
  19. }
  20. // Calculate mean squared error between predicted and real labels.
  21. val meanSquaredError = predictionAndLabel.map { case (p, l) => math.pow((p - l), 2) }.mean()
  22. println("Mean Squared Error = " + meanSquaredError)

4.2 训练过程分析


  1. private def parallelPoolAdjacentViolators(
  2. input: RDD[(Double, Double, Double)]): Array[(Double, Double, Double)] = {
  3. val parallelStepResult = input
  4. //以(feature,label)为key进行排序
  5. .sortBy(x => (x._2, x._1))
  6. .glom()//合并不同分区的数据为一个数组
  7. .flatMap(poolAdjacentViolators)
  8. .collect()
  9. .sortBy(x => (x._2, x._1)) // Sort again because collect() doesn't promise ordering.
  10. poolAdjacentViolators(parallelStepResult)
  11. }


  1. var i = 0
  2. val len = input.length
  3. while (i < len) {
  4. var j = i
  5. //找到破坏单调性的元祖的index
  6. while (j < len - 1 && input(j)._1 > input(j + 1)._1) {
  7. j = j + 1
  8. }
  9. // 如果没有找到违规点,移动到下一个数据点
  10. if (i == j) {
  11. i = i + 1
  12. } else {
  13. // 否则用pool方法处理违规的节点
  14. // 并且检查pool之后,之前处理过的节点是否违反了单调性约束
  15. while (i >= 0 && input(i)._1 > input(i + 1)._1) {
  16. pool(input, i, j)
  17. i = i - 1
  18. }
  19. i = j
  20. }
  21. }


  1. def pool(input: Array[(Double, Double, Double)], start: Int, end: Int): Unit = {
  2. //取得i到j之间的元组组成的子序列
  3. val poolSubArray = input.slice(start, end + 1)
  4. //求子序列sum(label * w)之和
  5. val weightedSum = poolSubArray.map(lp => lp._1 * lp._3).sum
  6. //求权重之和
  7. val weight = poolSubArray.map(_._3).sum
  8. var i = start
  9. //子区间的所有元组标签相同,即拥有相同的预测
  10. while (i <= end) {
  11. //修改标签值为两者之商
  12. input(i) = (weightedSum / weight, input(i)._2, input(i)._3)
  13. i = i + 1
  14. }
  15. }


  1. val compressed = ArrayBuffer.empty[(Double, Double, Double)]
  2. var (curLabel, curFeature, curWeight) = input.head
  3. var rightBound = curFeature
  4. def merge(): Unit = {
  5. compressed += ((curLabel, curFeature, curWeight))
  6. if (rightBound > curFeature) {
  7. compressed += ((curLabel, rightBound, 0.0))
  8. }
  9. }
  10. i = 1
  11. while (i < input.length) {
  12. val (label, feature, weight) = input(i)
  13. if (label == curLabel) {
  14. //权重叠加
  15. curWeight += weight
  16. rightBound = feature
  17. } else {//如果标签不同,合并
  18. merge()
  19. curLabel = label
  20. curFeature = feature
  21. curWeight = weight
  22. rightBound = curFeature
  23. }
  24. i += 1
  25. }
  26. merge()


  1. //标签集
  2. val predictions = if (isotonic) pooled.map(_._1) else pooled.map(-_._1)
  3. //特征集
  4. val boundaries = pooled.map(_._2)
  5. new IsotonicRegressionModel(boundaries, predictions, isotonic)

4.3 预测过程分析

  1. def predict(testData: Double): Double = {
  2. def linearInterpolation(x1: Double, y1: Double, x2: Double, y2: Double, x: Double): Double = {
  3. y1 + (y2 - y1) * (x - x1) / (x2 - x1)
  4. }
  5. //二分查找index
  6. val foundIndex = binarySearch(boundaries, testData)
  7. val insertIndex = -foundIndex - 1
  8. // Find if the index was lower than all values,
  9. // higher than all values, in between two values or exact match.
  10. if (insertIndex == 0) {
  11. predictions.head
  12. } else if (insertIndex == boundaries.length){
  13. predictions.last
  14. } else if (foundIndex < 0) {
  15. linearInterpolation(
  16. boundaries(insertIndex - 1),
  17. predictions(insertIndex - 1),
  18. boundaries(insertIndex),
  19. predictions(insertIndex),
  20. testData)
  21. } else {
  22. predictions(foundIndex)
  23. }
  24. }
