Question:

Implementing ANOVA statistics computation for Spark 2 in Java 8

秦安怡
2023-03-14

I have some Java 8 code that computes ANOVA statistics using the Spark SQL API, shown in Snippet 1 below. The snippet is a trimmed-down port of the original Scala code available at https://gist.github.com/srnghn/c74835818802fefabd76f1bcd6746831/77690607caab9039b015d2232c1216500427a995
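
For reference, the snippet computes the standard one-way ANOVA quantities: with k categories and N observations in total, SSB = sum_i n_i * (mean_i - grandMean)^2 (between-groups sum of squares), SSW = sum_i sum_j (x_ij - mean_i)^2 (within-groups sum of squares), dfb = k - 1, dfw = N - k, and F = (SSB/dfb) / (SSW/dfw). The etaSq and omegaSq values at the end are the usual effect-size measures, etaSq = SSB/SST and omegaSq = (SSB - dfb * MSW) / (SST + MSW), where SST = SSB + SSW and MSW = SSW/dfw.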

Problem

The crux of the problem is the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`c.sum(valueSq))`' given input columns: [b.sum(value), d.cat, a.count, c.cat, c.sum(valueSq), b.cat, d.avg(value), a.cat];; 'Project [cat#51, count#74L, sum(value)#70, 'c.sum(valueSq)), 'avg(value))]

Snippet 1:

private static AnovaStats computeAnovaStats(SparkSession spark, Dataset<Row> outliersDF){
    outliersDF.createOrReplaceTempView("outliersDF");
    Dataset<Row> anovaBaseDF =
            spark.sql("SELECT usercode as cat, cast((frequency) as double) as value FROM outliersDF");

    anovaBaseDF.createOrReplaceTempView("anovaBaseDF");
    Dataset<Row> newDF =
            spark.sql(
                      "SELECT " +
                            "A.cat, A.value, " +
                            "cast((A.value * A.value) as double) as valueSq, " +
                            "((A.value - B.avg) * (A.value - B.avg)) as diffSq " +
                            "FROM anovaBaseDF A " +
                            "JOIN " +
                            "(SELECT cat, avg(value) as avg FROM anovaBaseDF GROUP BY cat) B " +
                            "WHERE A.cat = B.cat");

    // Per-category aggregates. Spark names the generated columns "count",
    // "sum(value)", "sum(valueSq)" and "avg(value)".
    RelationalGroupedDataset grouped = newDF.groupBy("cat");
    Dataset<Row> sums = grouped.sum("value");
    Dataset<Row> counts = grouped.count();
    long numCats = counts.count();
    Dataset<Row> sumsq = grouped.sum("valueSq");
    Dataset<Row> avgs = grouped.avg("value");

    double totN = toDouble(counts.agg(org.apache.spark.sql.functions.sum("count")).first().get(0));
    double totSum = toDouble(sums.agg(org.apache.spark.sql.functions.sum("sum(value)")).first().get(0));
    double totSumSq = toDouble(sumsq.agg(org.apache.spark.sql.functions.sum("sum(valueSq)")).first().get(0));

    double totMean = totSum / totN; // grand mean
    double dft = totN - 1;          // total degrees of freedom
    double dfb = numCats - 1;       // between-groups degrees of freedom
    double dfw = totN - numCats;    // within-groups degrees of freedom

    //!!!! VARIABLE UNDER QUESTION IS AS FOLLOWS !!!!
    Dataset<Row> joined =
            (counts.as("a")
                    .join(sums.as("b"), (col("a.cat").$eq$eq$eq(col("b.cat"))))
                    .join(sumsq.as("c"), (col("a.cat").$eq$eq$eq(col("c.cat"))))
                    .join(avgs.as("d"), (col("a.cat").$eq$eq$eq(col("d.cat"))))
                    .select(col("a.cat"), col("count"), col("sum(value)"),
                            col("sum(valueSq))"), col("avg(value))")));

 /*
  The original Scala version of the local variable "joined" (of type
  Dataset<Row>) is:

  val joined = counts.as("a")
    .join(sums.as("b"), $"a.cat" === $"b.cat")
    .join(sumsq.as("c"), $"a.cat" === $"c.cat")
    .join(avgs.as("d"), $"a.cat" === $"d.cat")
    .select($"a.cat", $"count", $"sum(value)", $"sum(valueSq)", $"avg(value)")
 */

    Dataset<Row> finaldf = joined.withColumn("totMean", lit(totMean));

    JavaPairRDD<String, Double> ssb_tmp =
            finaldf.javaRDD()
                    .mapToPair(x -> new Tuple2(x.getString(0), ((toDouble(x.get(4)) - toDouble(x.get(4))) * (toDouble(x.get(5)) * toDouble(x.get(4)) - toDouble(x.get(4)) * toDouble(x.get(1))))));

    Dataset<Row> ssbDR = spark.sqlContext().createDataset(JavaPairRDD.toRDD(ssb_tmp), Encoders.tuple(Encoders.STRING(),Encoders.DOUBLE())).toDF();
    double ssb = ssbDR.agg(org.apache.spark.sql.functions.sum("_2")).first().getDouble(0);

    Dataset<Row> ssw_tmp = grouped.sum("diffSq");
    double ssw = toDouble(ssw_tmp.agg(org.apache.spark.sql.functions.sum("sum(diffSq)")).first().get(0));

    double sst = ssb + ssw;    // total sum of squares
    double msb = ssb / dfb;    // mean square between groups
    double msw = ssw / dfw;    // mean square within groups
    double fValue = msb / msw; // F statistic
    double etaSq = ssb / sst;  // effect size: eta squared
    double omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw); // effect size: omega squared

    AnovaStats anovaStats = new AnovaStats(dfb, dfw, fValue, etaSq, omegaSq);
    return anovaStats;
}

// Normalizes aggregate results to double: groupBy().count() returns a Long,
// sum()/avg() on a double column return a Double, and a missing value is null.
private static double toDouble(Object value){
    double retVal = 0d;
    if(value instanceof  Double){
        retVal = ((Double) value).doubleValue();
    } else if (value instanceof Long){
        retVal = ((Long) value).doubleValue();
    } else if (value == null){
        retVal = 0d;
    }
    return retVal;
}

Snippet 2:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '`c.sum(valueSq))`' given input columns: [b.sum(value), d.cat, a.count, c.cat, c.sum(valueSq), b.cat, d.avg(value), a.cat];;
'Project [cat#51, count#74L, sum(value)#70, 'c.sum(valueSq)), 'avg(value))]
+- AnalysisBarrier
      +- Join Inner, (cat#51 = cat#175)
         :- Join Inner, (cat#51 = cat#154)
         :  :- Join Inner, (cat#51 = cat#139)
         :  :  :- SubqueryAlias a
         :  :  :  +- Aggregate [cat#51], [cat#51, count(1) AS count#74L]
         :  :  :     +- Project [cat#51, value#52, cast((value#52 * value#52) as double) AS valueSq#56, ((value#52 - avg#55) * (value#52 - avg#55)) AS diffSq#57]
         :  :  :        +- Filter (cat#51 = cat#59)
         :  :  :           +- Join Inner
         :  :  :              :- SubqueryAlias A
         :  :  :              :  +- SubqueryAlias anovabasedf
         :  :  :              :     +- Project [usercode#10 AS cat#51, cast(frequency#0L as double) AS value#52]
         :  :  :              :        +- SubqueryAlias outliersdf
         :  :  :              :           +- Filter ((cast(frequency#0L as double) >= -718.5) && (cast(frequency#0L as double) <= 1413.5))
         :  :  :              :              +- Project [flowId#6, StateId#9, usercode#10, frequency#0L]
         :  :  :              :                 +- Filter (frequency#0L > cast(30 as bigint))
         :  :  :              :                    +- SubqueryAlias T
         :  :  :              :                       +- SubqueryAlias basedf
         :  :  :              :                          +- Project [flowId#6, StateId#9, usercode#10, frequency#0L]
         :  :  :              :                             +- Sort [flowId#6 ASC NULLS FIRST, StateId#9 ASC NULLS FIRST, usercode#10 ASC NULLS FIRST], true
         :  :  :              :                                +- Aggregate [flowId#6, StateId#9, usercode#10], [flowId#6, StateId#9, usercode#10, count(instanceuserid#25) AS frequency#0L]

1 Answer

殷德本
2023-03-14

As pointed out in the comments, the cause is a typo: the select references sum(valueSq)) instead of sum(valueSq), and likewise avg(value)) instead of avg(value). The extra trailing parenthesis makes Spark look for column names that do not exist, hence the "cannot resolve" error. The working solution is as follows:

private static AnovaStats computeAnovaStats(SparkSession spark, Dataset<Row> outliersDF, int flowId){
    outliersDF.createOrReplaceTempView("outliersDF");
    Dataset<Row> anovaBaseDF =
            spark.sql("SELECT usercode as cat, cast((frequency) as double) as value FROM outliersDF");

    anovaBaseDF.createOrReplaceTempView("anovaBaseDF");
    Dataset<Row> newDF =
            spark.sql(
                      "SELECT " +
                            "A.cat, A.value, " +
                            "cast((A.value * A.value) as double) as valueSq, " +
                            "((A.value - B.avg) * (A.value - B.avg)) as diffSq " +
                            "FROM anovaBaseDF A " +
                            "JOIN " +
                            "(SELECT cat, avg(value) as avg FROM anovaBaseDF GROUP BY cat) B " +
                            "WHERE A.cat = B.cat");

    RelationalGroupedDataset grouped = newDF.groupBy("cat");
    Dataset<Row> sums = grouped.sum("value");
    Dataset<Row> counts = grouped.count();
    long numCats = counts.count();
    Dataset<Row> sumsq = grouped.sum("valueSq");
    Dataset<Row> avgs = grouped.avg("value");

    double totN = toDouble(counts.agg(org.apache.spark.sql.functions.sum("count")).first().get(0));
    double totSum = toDouble(sums.agg(org.apache.spark.sql.functions.sum("sum(value)")).first().get(0));
    double totSumSq = toDouble(sumsq.agg(org.apache.spark.sql.functions.sum("sum(valueSq)")).first().get(0));

    double totMean = totSum / totN;
    double dft = totN - 1;
    double dfb = numCats - 1;
    double dfw = totN - numCats;

    Dataset<Row> joined =
            (counts.as("a")
                    .join(sums.as("b"), (col("a.cat").equalTo(col("b.cat"))))
                    .join(sumsq.as("c"), (col("a.cat").equalTo(col("c.cat"))))
                    .join(avgs.as("d"), (col("a.cat").equalTo(col("d.cat"))))
                    .select(col("a.cat"), col("count"), col("sum(value)"),
                            col("sum(valueSq)"), col("avg(value)")));

    Dataset<Row> finaldf = joined.withColumn("totMean", lit(totMean));

    // Per-group contribution to the between-groups sum of squares:
    // n_i * (group mean - grand mean)^2
    JavaPairRDD<String, Double> ssb_tmp =
            finaldf.javaRDD()
                    .mapToPair(x -> {
                        double n = toDouble(x.get(1));         // count
                        double mean = toDouble(x.get(4));      // avg(value)
                        double grandMean = toDouble(x.get(5)); // totMean
                        return new Tuple2<>(x.getString(0), n * (mean - grandMean) * (mean - grandMean));
                    });

    Dataset<Row> ssbDR = spark.sqlContext().createDataset(JavaPairRDD.toRDD(ssb_tmp), Encoders.tuple(Encoders.STRING(),Encoders.DOUBLE())).toDF();
    double ssb = ssbDR.agg(org.apache.spark.sql.functions.sum("_2")).first().getDouble(0);

    Dataset<Row> ssw_tmp = grouped.sum("diffSq");
    double ssw = toDouble(ssw_tmp.agg(org.apache.spark.sql.functions.sum("sum(diffSq)")).first().get(0));

    double sst = ssb + ssw;
    double msb = ssb / dfb;
    double msw = ssw / dfw;
    double fValue = msb / msw;
    double etaSq = ssb / sst;
    double omegaSq = (ssb - ((numCats - 1) * msw))/(sst + msw);

    AnovaStats anovaStats = new AnovaStats(dfb, dfw, fValue, etaSq, omegaSq, flowId);
    return anovaStats;
}
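
As a side note, this whole class of error can be avoided by aliasing the aggregates up front instead of referencing Spark's generated column names such as sum(value) and avg(value). A minimal sketch of that variant (not from the original answer; the aliases n, sumValue, sumValueSq and avgValue are names chosen here for illustration):

    RelationalGroupedDataset grouped = newDF.groupBy("cat");

    // Single aggregation pass with explicit aliases: no generated names like
    // "sum(value)" are ever referenced, and the three self-joins on "cat"
    // become unnecessary because all per-group statistics land in one row.
    Dataset<Row> stats = grouped.agg(
            org.apache.spark.sql.functions.count(lit(1)).alias("n"),
            org.apache.spark.sql.functions.sum("value").alias("sumValue"),
            org.apache.spark.sql.functions.sum("valueSq").alias("sumValueSq"),
            org.apache.spark.sql.functions.avg("value").alias("avgValue"));

    Dataset<Row> finaldf = stats.withColumn("totMean", lit(totMean));

A typo in a plain alias still fails, but the resulting "cannot resolve" message is far easier to read, and keeping every per-group statistic in a single DataFrame removes the join boilerplate entirely.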