我是Spark和Scala的新手;)
代码摘要:
从CSV文件读取数据--
你能帮我找出哪里出了问题吗。代码并不复杂。该作业在集群上执行良好。所以,当我试图可视化写在配置单元表上的数据时,我面临着一个问题。
蜂箱
失败与异常java.io.IOException:java.io.IOException:hdfs://m01.itversity.com:9000/user/itv000666/warehouse/updatedcustomers.db/customers/part-00000-348a54cf-aa0c-45b4-ac49-3a881ae39702_00000.c000. csv不是一个序列文件
object LapeyreSparkDemo extends App {
//Getting spark ready
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name","Spark for Lapeyre")
//Creating Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.config("spark.sql.warehouse.dir","/user/itv000666/warehouse")
.getOrCreate()
Logger.getLogger(getClass.getName).info("Spark Session Created Successfully")
//Reading
Logger.getLogger(getClass.getName).info("Data loading in DF started")
val ordersSchema = "orderid Int, customerName String, orderDate String, custId Int, orderStatus
String, age String, amount Int"
val orders2019Df = spark.read
.format("csv")
.option("header",true)
.schema(ordersSchema)
.option("path","/user/itv0006666/lapeyrePoc/orders2019.csv")
.load
val newOrder = orders2019Df.withColumnRenamed("custId", "oldCustId")
.withColumnRenamed("customername","oldCustomerName")
val orders2020Df = spark.read
.format("csv")
.option("header",true)
.schema(ordersSchema)
.option("path","/user/itv000666/lapeyrePoc/orders2020.csv")
.load
Logger.getLogger(getClass.getName).info("Data loading in DF complete")
//processing
Logger.getLogger(getClass.getName).info("Processing Started")
val joinCondition = newOrder.col("oldCustId") === orders2020Df.col("custId")
val joinType = "inner"
val joinData = newOrder.join(orders2020Df, joinCondition, joinType)
.select("custId","customername")
//Writing
spark.sql("create database if not exists updatedCustomers")
joinData.write
.format("csv")
.mode(SaveMode.Overwrite)
.bucketBy(4, "custId")
.sortBy("custId")
.saveAsTable("updatedCustomers.Customers")
//Stopping Spark Session
spark.stop()
}
如果需要更多信息,请告诉我。提前谢谢。
这就是罪魁祸首
joinData.write
.format("csv")
相反,使用这个,它的工作。
joinData.write
.format("Hive")
由于我正在将数据写入hive表(orc格式),因此格式应该是“Hive”而不是csv。
另外,在创建spark会话时,不要忘记启用配置单元支持。此外,在spark 2中,bucketby
我正在使用twitter cloudera示例创建一个表,虽然我已经成功地创建了表并获得了数据,但我遇到了一个问题。 我可以执行并返回数据,但当我进行更深入的操作(如)时,我会收到一个错误。 以下是错误和堆栈跟踪: hive>从tweets中选择计数(*);MapReduce作业总数=1启动作业1编译时确定的1个reduce任务中的1个:1为了更改还原器的平均负载(以字节为单位):set hive
当我运行以下配置单元命令时 hive-e‘选择msg,将(*)从表中计数为cnt,其中像“%abcd%”这样的msg按msg排序按cnt desc;’sed的/[\t]/,/g'>table.csv 失败:ParseException第1:89行无法识别表达式规范中“like”“%”“password”附近的输入 我知道在指定字符串“%abcd%”时有问题。该命令在配置单元环境中工作正常,但这里我
低于范围的查询与连接工作正常lap,但不是在Hiveserver2/Hive. CLI。 请建议如何在Hive中使用范围连接查询。 配置单元版本:1.2.1.2.6 HDP版本:2.6.0.3 查询: 下面是在配置单元CLI或配置单元服务器2中运行时引发的错误: 错误:编译语句时出错:失败:SemanticException行0:-1在联接“obsv_stop_ts”(状态=42000,代码=40
我有一张蜂巢桌。我正在为配置单元表使用JSON数据。当我选择整个表时,它对我有效。如果我选择一个特定的列,它会打印空值。 它打印NULL NULL NULL 任何帮助都将不胜感激。
下面的查询是我通过配置单元客户端、Java程序JDBC和Beeline执行的。 在Hive cilent上执行时,只需21s就完成了,而Java程序JDBC和beeline分别需要110s和200s。 配置单元客户端仅使用一个映射器就完成了此操作。 Java JDBC和beeline执行了5个MR作业。每个MR作业需要2个映射器和1个还原器。 下面是每个表的行计数。 table_one有44981
为了提高配置单元查询的性能,有哪些优化参数 配置单元版本:-Hive 0.13.1-cdh5.2.1 配置单元查询:- 设置hive.exec.parallel=true; 您能建议任何其他设置,除了以上,以提高配置单元查询的性能,我正在使用的类型查询。