import org.apache.spark.sql.Dataset; // Import the package/class this method depends on
/**
 * Builds a sales index for every sales row and prints the rows ordered by potential revenue.
 *
 * <p>Sales data is left-joined on {@code zipcode} with the combined household/population data,
 * revenue per inhabitant ({@code revenue / pop}) is computed, and the row with the highest
 * revenue per inhabitant is taken as the reference ("best") row. Every row then receives an
 * {@code index} built from its revenue per inhabitant, population and income per inhabitant
 * compared with the best row, plus {@code potential_revenue = revenue * index}. Intermediate
 * columns are dropped before the result is printed with {@link Dataset#show()}.
 *
 * @throws IllegalStateException if there are no sales rows, or if no sales row has a non-null
 *     {@code revenue_by_inh} (e.g. no sales zipcode matched any population data)
 */
private void start() {
  Dataset<Row> householdDf = getHouseholdDataframe();
  Dataset<Row> populationDf = getPopulationDataframe();
  Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
  Dataset<Row> salesDf = getSalesData();

  // Left join keeps every sales row, even when its zipcode has no household/population data
  // (those rows get null pop/income). indexDf's copy of the join key is dropped.
  Dataset<Row> salesIndexDf = salesDf
      .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
      .drop(indexDf.col("zipcode"))
      .withColumn("revenue_by_inh", col("revenue").divide(col("pop")));

  // desc() sorts nulls last, so the first row holds the highest non-null revenue_by_inh
  // unless every value is null.
  java.util.List<Row> topRows =
      salesIndexDf.orderBy(col("revenue_by_inh").desc()).takeAsList(1);
  if (topRows.isEmpty()) {
    throw new IllegalStateException("No sales rows available to build the sales index");
  }
  Row bestRow = topRows.get(0);

  // Read as Number so both decimal and floating-point revenue columns are accepted.
  Number bestRevenueValue = bestRow.getAs("revenue_by_inh");
  if (bestRevenueValue == null) {
    throw new IllegalStateException(
        "No sales row has a revenue per inhabitant; check the zipcode join with population data");
  }
  double bestRevenuePerInhabitant = bestRevenueValue.doubleValue();
  int populationOfBestRevenuePerInhabitant = bestRow.getAs("pop");
  double incomeOfBestRevenuePerInhabitant = bestRow.getAs("income_per_inh");

  // Attach the best row's figures to every row as constant columns, then derive the index.
  // Note: idx_revenue is best / this row, while idx_pop and idx_income are this row / best.
  salesIndexDf = salesIndexDf
      .withColumn("best_revenue_per_inh", lit(bestRevenuePerInhabitant))
      .withColumn("pop_of_best", lit(populationOfBestRevenuePerInhabitant))
      .withColumn("income_of_best", lit(incomeOfBestRevenuePerInhabitant))
      .withColumn("idx_revenue", col("best_revenue_per_inh").divide(col("revenue_by_inh")))
      .withColumn("idx_pop", col("pop").divide(col("pop_of_best")))
      .withColumn("idx_income", col("income_per_inh").divide(col("income_of_best")))
      .withColumn("index",
          col("idx_revenue").multiply(col("idx_pop").multiply(col("idx_income"))))
      .withColumn("potential_revenue", col("revenue").multiply(col("index")));

  // Remove intermediate columns. total_income is presumably produced by
  // joinHouseholdPopulation — confirm; drop() ignores names that are not present.
  salesIndexDf = salesIndexDf
      .drop("idx_income", "idx_pop", "idx_revenue", "income_of_best", "total_income",
          "revenue_by_inh", "pop_of_best", "best_revenue_per_inh")
      .orderBy(col("potential_revenue").desc());
  salesIndexDf.show();
}