java dataset join_Java Dataset.withColumn方法代码示例

冯野
2023-12-01

import org.apache.spark.sql.Dataset; //导入方法依赖的package包/类

/**
 * Builds a per-zipcode sales index and prints it.
 *
 * <p>The method:
 * <ol>
 *   <li>joins the household and population dataframes into an index dataframe;</li>
 *   <li>left-joins the sales data to that index on {@code zipcode};</li>
 *   <li>computes {@code revenue_by_inh = revenue / pop};</li>
 *   <li>takes the row with the highest {@code revenue_by_inh} as the reference
 *       ("best") row;</li>
 *   <li>derives {@code idx_revenue}, {@code idx_pop} and {@code idx_income}
 *       relative to that reference row, multiplies them into {@code index}, and
 *       computes {@code potential_revenue = revenue * index};</li>
 *   <li>drops the intermediate columns, sorts by {@code potential_revenue}
 *       descending, and shows the result.</li>
 * </ol>
 *
 * <p>NOTE(review): {@code revenue_by_inh} is read back as a {@link BigDecimal},
 * {@code pop} as an {@code int} and {@code income_per_inh} as a {@code double};
 * this presumes the column types produced by the helper methods - confirm
 * against their schemas.
 */
private void start() {
  // Dataset<Row> (not the raw Dataset type) so that first() is typed as Row.
  Dataset<Row> householdDf = getHouseholdDataframe();
  Dataset<Row> populationDf = getPopulationDataframe();
  Dataset<Row> indexDf = joinHouseholdPopulation(householdDf, populationDf);
  Dataset<Row> salesDf = getSalesData();

  // Left join keeps every sales row; the duplicate zipcode column coming from
  // the index side is removed right away.
  Dataset<Row> salesIndexDf = salesDf
      .join(indexDf, salesDf.col("zipcode").equalTo(indexDf.col("zipcode")), "left")
      .drop(indexDf.col("zipcode"));

  salesIndexDf = salesIndexDf.withColumn(
      "revenue_by_inh",
      salesIndexDf.col("revenue").divide(salesIndexDf.col("pop")));
  salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc());

  // After the descending sort, the first row is the reference ("best") row.
  Row bestRow = salesIndexDf.first();
  double bestRevenuePerInhabitant =
      ((BigDecimal) bestRow.getAs("revenue_by_inh")).doubleValue();
  int populationOfBestRevenuePerInhabitant = bestRow.<Integer>getAs("pop");
  double incomeOfBestRevenuePerInhabitant = bestRow.<Double>getAs("income_per_inh");

  // Attach the reference values to every row as literal columns. The revenue
  // constant uses lit() like the other two instead of "pop / pop * constant";
  // this helper column is dropped before the result is shown.
  salesIndexDf = salesIndexDf
      .withColumn("best_revenue_per_inh", lit(bestRevenuePerInhabitant))
      .withColumn("pop_of_best", lit(populationOfBestRevenuePerInhabitant))
      .withColumn("income_of_best", lit(incomeOfBestRevenuePerInhabitant));

  // Ratios of each row against the reference row.
  salesIndexDf = salesIndexDf.withColumn(
      "idx_revenue",
      salesIndexDf.col("best_revenue_per_inh")
          .divide(salesIndexDf.col("revenue_by_inh")));
  salesIndexDf = salesIndexDf.withColumn(
      "idx_pop",
      salesIndexDf.col("pop").divide(salesIndexDf.col("pop_of_best")));
  salesIndexDf = salesIndexDf.withColumn(
      "idx_income",
      salesIndexDf.col("income_per_inh").divide(salesIndexDf.col("income_of_best")));

  // Combined index and the resulting potential revenue.
  salesIndexDf = salesIndexDf.withColumn(
      "index",
      salesIndexDf.col("idx_revenue")
          .multiply(salesIndexDf.col("idx_pop")
              .multiply(salesIndexDf.col("idx_income"))));
  salesIndexDf = salesIndexDf.withColumn(
      "potential_revenue",
      salesIndexDf.col("revenue").multiply(salesIndexDf.col("index")));

  // Remove the intermediate columns in one call, then rank by potential revenue.
  salesIndexDf = salesIndexDf
      .drop(
          "idx_income",
          "idx_pop",
          "idx_revenue",
          "income_of_best",
          "total_income",
          "revenue_by_inh",
          "pop_of_best",
          "best_revenue_per_inh")
      .orderBy(salesIndexDf.col("potential_revenue").desc());

  salesIndexDf.show();
}

 类似资料: