当前位置: 首页 > 知识库问答 >
问题:

.csv在选择配置单元查询时不是SequenceFile错误

龙嘉玉
2023-03-14

我是Spark和Scala的新手;)

代码摘要:

从CSV文件读取数据--

你能帮我找出哪里出了问题吗。代码并不复杂。该作业在集群执行良好。所以,当我试图可视化写在配置单元表上的数据时,我面临着一个问题。

蜂箱

失败与异常java.io.IOException:java.io.IOException:hdfs://m01.itversity.com:9000/user/itv000666/warehouse/updatedcustomers.db/customers/part-00000-348a54cf-aa0c-45b4-ac49-3a881ae39702_00000.c000. csv不是一个序列文件

object LapeyreSparkDemo extends App {
  
  //Getting spark ready
  val sparkConf = new SparkConf()
  sparkConf.set("spark.app.name","Spark for Lapeyre")
  
  //Creating Spark Session
  val spark = SparkSession.builder()
                          .config(sparkConf)
                          .enableHiveSupport()
                          .config("spark.sql.warehouse.dir","/user/itv000666/warehouse")
                          .getOrCreate()                       
  Logger.getLogger(getClass.getName).info("Spark Session Created Successfully")
  
  //Reading
  Logger.getLogger(getClass.getName).info("Data loading in DF started")
  val ordersSchema = "orderid Int, customerName String, orderDate String, custId Int, orderStatus 
  String, age String, amount Int" 
  val orders2019Df = spark.read
  .format("csv")
  .option("header",true)
  .schema(ordersSchema)
  .option("path","/user/itv0006666/lapeyrePoc/orders2019.csv")
  .load
  val newOrder = orders2019Df.withColumnRenamed("custId", "oldCustId")
                             .withColumnRenamed("customername","oldCustomerName")
   
  val orders2020Df = spark.read
  .format("csv")
  .option("header",true)
  .schema(ordersSchema)
  .option("path","/user/itv000666/lapeyrePoc/orders2020.csv")
  .load
  
  Logger.getLogger(getClass.getName).info("Data loading in DF complete")
  
  //processing
  Logger.getLogger(getClass.getName).info("Processing Started")
  val joinCondition = newOrder.col("oldCustId") === orders2020Df.col("custId")
  val joinType = "inner"
  val joinData = newOrder.join(orders2020Df, joinCondition, joinType)
                             .select("custId","customername")
  
  //Writing
  
  spark.sql("create database if not exists updatedCustomers")
                  
  joinData.write
  .format("csv")
  .mode(SaveMode.Overwrite)
  .bucketBy(4, "custId")
  .sortBy("custId")
  .saveAsTable("updatedCustomers.Customers")
                        
  //Stopping Spark Session
  spark.stop()

}

如果需要更多信息,请告诉我。提前谢谢。

共有1个答案

尉迟景福
2023-03-14

这就是罪魁祸首

joinData.write
.format("csv")

相反,使用这个,它的工作。

joinData.write
.format("Hive")

由于我正在将数据写入hive表(orc格式),因此格式应该是“Hive”而不是csv。

另外,在创建spark会话时,不要忘记启用配置单元支持。此外,在spark 2中,bucketby

 类似资料:
  • 我正在使用twitter cloudera示例创建一个表,虽然我已经成功地创建了表并获得了数据,但我遇到了一个问题。 我可以执行并返回数据,但当我进行更深入的操作(如)时,我会收到一个错误。 以下是错误和堆栈跟踪: hive>从tweets中选择计数(*);MapReduce作业总数=1启动作业1编译时确定的1个reduce任务中的1个:1为了更改还原器的平均负载(以字节为单位):set hive

  • 当我运行以下配置单元命令时 hive-e‘选择msg,将(*)从表中计数为cnt,其中像“%abcd%”这样的msg按msg排序按cnt desc;’sed的/[\t]/,/g'>table.csv 失败:ParseException第1:89行无法识别表达式规范中“like”“%”“password”附近的输入 我知道在指定字符串“%abcd%”时有问题。该命令在配置单元环境中工作正常,但这里我

  • 低于范围的查询与连接工作正常lap,但不是在Hiveserver2/Hive. CLI。 请建议如何在Hive中使用范围连接查询。 配置单元版本:1.2.1.2.6 HDP版本:2.6.0.3 查询: 下面是在配置单元CLI或配置单元服务器2中运行时引发的错误: 错误:编译语句时出错:失败:SemanticException行0:-1在联接“obsv_stop_ts”(状态=42000,代码=40

  • 我有一张蜂巢桌。我正在为配置单元表使用JSON数据。当我选择整个表时,它对我有效。如果我选择一个特定的列,它会打印空值。 它打印NULL NULL NULL 任何帮助都将不胜感激。

  • 下面的查询是我通过配置单元客户端、Java程序JDBC和Beeline执行的。 在Hive cilent上执行时,只需21s就完成了,而Java程序JDBC和beeline分别需要110s和200s。 配置单元客户端仅使用一个映射器就完成了此操作。 Java JDBC和beeline执行了5个MR作业。每个MR作业需要2个映射器和1个还原器。 下面是每个表的行计数。 table_one有44981

  • 为了提高配置单元查询的性能,有哪些优化参数 配置单元版本:-Hive 0.13.1-cdh5.2.1 配置单元查询:- 设置hive.exec.parallel=true; 您能建议任何其他设置,除了以上,以提高配置单元查询的性能,我正在使用的类型查询。