当前位置: 首页 > 知识库问答 >
问题:

如何在Spark Scala中读取检查点数据帧

欧阳君浩
2023-03-14

我正在尝试测试以下程序,以获取检查点并从检查点位置读取是否,以防应用程序由于资源不可用等任何原因而失败。当我终止作业并再次触发时,执行将从头开始。不确定实现这一目标还需要什么。谢谢

下面是代码:

import org.apache.log4j._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object withCheckpoint {

  def main(args: Array[String]): Unit = {

    Logger.getLogger("org").setLevel(Level.ERROR)

    //val conf = new SparkConf().setAppName("Without Checkpoint")
    val conf = new SparkConf().setAppName("With Checkpoint")
    val sc = new SparkContext(conf)


    val checkpointDirectory = "/tmp"

    sc.setCheckpointDir(checkpointDirectory)   // set checkpoint directory

    val spark = SparkSession.builder.appName("Without Checkpoint").getOrCreate()



    /************************************************************************************************************************************************/
    /*                                                Reading source data begins here                                                               */
    /************************************************************************************************************************************************/


    val readCtryDemoFile = spark.read.option("header", "true").csv("/tmp/Ctry_Demo.csv")



    val readCtryRefFile = spark.read.option("header","true").csv("/tmp/ref_ctry.csv")



    val readCtryCntntFile = spark.read.option("header","true").csv("/tmp/ctry_to_continent.csv")


    /************************************************************************************************************************************************/
    /*                                                Reading source data Completes                                                                 */
    /************************************************************************************************************************************************/


    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/


    /*********************************************************************************/
    /* Join above created dataframes to pull respective columns                      */
    /*********************************************************************************/


    val jnCtryDemoCtryref = readCtryDemoFile.join(readCtryRefFile,Seq("NUM_CTRY_CD"))


    val jnCtryCntnt = jnCtryDemoCtryref.join(readCtryCntntFile,Seq("Alpha_2_CTRY_CD"))





    /*********************************************************************************/
    /* Checkpointing the above created Dataframe to the checkpoint Directory         */
    /*********************************************************************************/

    val jnCtryCntntchkpt = jnCtryCntnt.checkpoint()
    jnCtryCntntchkpt.collect()

    /*********************************************************************************/
    /* Creating multiple outputs based on different aggregation keys                 */
    /*********************************************************************************/

    val aggCntnNm = jnCtryCntntchkpt.groupBy("CONTINENT_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("CONTINENT_NM")
    aggCntnNm.show()


    val aggCtryNm = jnCtryCntntchkpt.groupBy("Ctry_NM").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("Ctry_NM")
    aggCtryNm.show()


    val aggCtryCd = jnCtryCntntchkpt.groupBy("NUM_CTRY_CD").agg(sum("POPULATION").as("SUM_POPULATION")).orderBy("NUM_CTRY_CD")
    aggCtryCd.show()

    /************************************************************************************************************************************************/
    /*                                                Transformation begins here                                                                    */
    /************************************************************************************************************************************************/

  }
}

共有1个答案

上官高翰
2023-03-14

我希望我可以消除您对解释检查点的一些疑问,并为您提供一个如何从检查点目录中恢复数据集的示例

检查点主要用于迭代算法和流处理。

在批处理中,我们习惯于容错(缓存或持久)。这意味着,如果节点崩溃,作业不会丢失其状态,并且丢失的任务会在其他工作线程上重新计划。中间结果被写入持久存储(必须像HDFS或云对象存储一样具有容错能力)

维护RDD沿袭(缓存或持久化)提供了弹性,但是当沿袭变得非常长时也会导致问题,例如:迭代算法、流恢复可能非常昂贵,可能会出现堆栈溢出

检查点将数据保存到 HDFS - 提供跨节点的容错存储 - 不保存世系 - 在对 RDD 执行任何操作之前必须执行检查点操作

数据集检查点

Spark SQL的一个特性是截断逻辑查询计划,这对于高度迭代的数据算法特别有用(例如,Spark MLlib使用Spark SQL的Dataset API进行数据操作)。

检查点实际上是Spark Core(Spark SQL用于分布式计算)的一个特性,它允许驱动程序在失败时使用之前计算的分布式计算状态(称为RDD)重新启动。这在Spark Streaming中得到了成功的应用,Spark Streaming是基于RDD API的流处理的Spark模块,现已过时。检查点操作会截断要进行检查点操作的RDD的沿袭。这已经成功地用于像ALS这样的迭代机器学习算法的Spark MLlib中。Spark SQL中的数据集检查点使用检查点来截断被检查的数据集的底层RDD的沿袭。

使用数据集检查点需要指定检查点目录。该目录存储要检查点的RDD的检查点文件。使用SparkContext。setCheckpointDir设置检查点目录的路径。检查点可以是本地的,也可以是可靠的,这定义了检查点目录的可靠性。本地检查点使用执行器存储将检查点文件写入执行器的生命周期,因此被认为是不可靠的。可靠的检查点使用可靠的数据存储,如HadoopHDFS。

编写检查点目录

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


/**
  * Checkpointing
  *     - Maintaining RDD lineage provides resilience but can also cause problems when the lineage gets very long
  *         - For example: iterative algorithms, streaming
  *     - Recovery can be very expensive
  *     - Potencial stack overflow
  *     - Checkpointing saves the data to HDFS
  *         - Provides fault-tolerant storage across nodes
  *         - Lineage is not saved
  *         - Must be checkpointed before any actions on the RDD
  */
object WriteCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("WriteCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","WriteCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Remember to set the checkpoint directory
  spark.sparkContext.setCheckpointDir("hdfs://localhost/user/cloudera/checkpoint")

  def main(args: Array[String]): Unit = {

    Logger.getRootLogger.setLevel(Level.ERROR)
    // Set org.apache.spark.rdd.ReliableRDDCheckpointData logger to INFO
    // to see what happens while an RDD is checkpointed
    // Let's use log4j API so, you should add import org.apache.log4j.{Level, Logger}
    Logger.getLogger("org.apache.spark.rdd.ReliableRDDCheckpointData").setLevel(Level.INFO)

    try {
      val nums = spark.range(5).withColumn("random", rand()).filter("random > 0.5")
      // Must be checkpointed before any actions on the RDD
      nums.checkpoint
      // Save the schema as it is going to use to reconstruct nums dataset from a RDD
      val schema = nums.schema
      schema.printTreeString()

      nums.show()

      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输出

20/06/15 16:42:50 INFO ReliableRDDCheckpointData: Done checkpointing RDD 4 to hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4, new parent is RDD 5
root
 |-- id: long (nullable = false)
 |-- random: double (nullable = false)

+---+------------------+
| id|            random|
+---+------------------+
|  2|0.9550560942227814|
+---+------------------+

您必须定义几个在包 org.apache.spark 和 org.apache.Spark 中受保护的帮助程序对象.sql

package org.apache.spark

/**
  * SparkContext.checkpointFile is a `protected[spark]` method
  * define a helper object to "escape" the package lock-in
  */
object my {
  import scala.reflect.ClassTag
  import org.apache.spark.rdd.RDD
  def recover[T: ClassTag](sc: SparkContext, path: String): RDD[T] = {
    sc.checkpointFile[T](path)
  }
}
package org.apache.spark.sql

object my2 {
  import org.apache.spark.rdd.RDD
  import org.apache.spark.sql.{DataFrame, SparkSession}
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.types.StructType
  def createDataFrame(spark: SparkSession, catalystRows: RDD[InternalRow], schema: StructType): DataFrame = {
    spark.internalCreateDataFrame(catalystRows, schema)
  }
}

读取检查点目录

package tests

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DoubleType, IntegerType, StringType, StructType}


/**
  * Recovering RDD From Checkpoint Files
  * — SparkContext.checkpointFile Method
  *   SparkContext.checkpointFile(directory: String)
  *   checkpointFile reads (recovers) a RDD from a checkpoint directory.
  * Note SparkContext.checkpointFile is a protected[spark] method
  * so the code to access it has to be in org.apache.spark package.
  * Internally, checkpointFile creates a ReliableCheckpointRDD in a scope.
  */
object ReadingCheckPoint {
  val spark = SparkSession
    .builder()
    .appName("ReadingCheckPoint")
    .master("local[*]")
    .config("spark.sql.shuffle.partitions","4") //Change to a more reasonable default number of partitions for our data
    .config("spark.app.id","ReadingCheckPoint") // To silence Metrics warning
    .getOrCreate()

  val sqlContext = spark.sqlContext

  val sc = spark.sparkContext

  // Make sure to use the same checkpoint directory
  val pathCheckpoint = "hdfs://localhost/user/cloudera/checkpoint/607daeca-6ec2-471c-9033-9c4c236880a9/rdd-4"

  def main(args: Array[String]): Unit = {

    try {

      Logger.getRootLogger.setLevel(Level.ERROR)

      val schema = new StructType()
        .add("field1",IntegerType)
        .add("field2",DoubleType)

      import org.apache.spark.my
      import org.apache.spark.sql.catalyst.InternalRow
      val numsRddRecovered = my.recover[InternalRow](spark.sparkContext, pathCheckpoint) //org.apache.spark.rdd.RDD[org.apache.spark.sql.catalyst.InternalRow]
      numsRddRecovered.foreach(x => println(x.toString))

      // We have to convert RDD[InternalRow] to DataFrame
      import org.apache.spark.sql.my2
      val numsRecovered = my2.createDataFrame(spark, numsRddRecovered, schema)
      numsRecovered.show()


      // To have the opportunity to view the web console of Spark: http://localhost:4040/
      println("Type whatever to the console to exit......")
      scala.io.StdIn.readLine()
    } finally {
      sc.stop()
      println("SparkContext stopped")
      spark.stop()
      println("SparkSession stopped")
    }
  }
}

输出

[0,2,3fee8fd1cc5108ef]
+------+------------------+
|field1|            field2|
+------+------------------+
|     2|0.9550560942227814|
+------+------------------+

您可以通过此链接访问Spark文档:检查点

 类似资料:
  • 使用下面的代码,我们可以从JSON文件的第一个节点访问第一个数据,但是我们想读取下面JSON文件中的所有值。请为该问题提供您的意见。 typedef boost::p roperty_tree::p tree TreeNode;树节点根;boost::p roperty_tree::json_parser::read_json(std::string(igIntObj_XLS.buffer), r

  • 我偶尔收到以下信息: 这发生在具有表的页面中,其中的行延伸到视点之后。 我用来在表中选择随机行的方法 是否有一种方法来检查是否有元素在#Focus之后?

  • 问题内容: 我想知道是否有一种优雅的方法来检查数据库的存在?简而言之,如何测试数据库连接字符串的连接? 谢谢 问题答案: 设置在连接字符串中,并执行: 与设置为数据库的名称。 如果要整体检查连接字符串(而不是独立数据库的存在),请尝试以块形式连接到它。

  • 我有一个和模型,其中有一个名为的数据透视表。在透视表中,当用户按下“参与”按钮时,会保存和。 我已经显示了所有的事件列表。现在我想显示作为参与而不是作为参与而参与的事件。 我试过控制器 刀片文件 我已经在和模型中设置了关系。 这段代码的问题是第二个foreach循环是pivot表循环。假设透视表中有一个数据。我列出了5个事件。由于pivot table foreach(第二个)的缘故,刀片文件中只

  • 问题内容: 嗨,我习惯了SQL,但是我需要从HBase表读取数据。任何帮助都会很棒。一本书,或者只是一些示例代码,可以从表中读取。有人说使用扫描仪可以解决问题,但我不知道如何使用。 问题答案: 从网站:

  • 问题内容: 我正在使用PIL。如何将图片的EXIF数据转换为字典? 问题答案: 您可以使用PIL映像的受保护方法。 这应该给您一个由EXIF数字标签索引的字典。如果您希望字典由实际的EXIF标记名称字符串索引,请尝试以下操作:

  • 问题内容: 当存储一些文档时,它应该存储不存在的文档并忽略其余文档(应该在应用程序级别完成,也许检查文档的ID是否已经存在,等等?) 问题答案: 这是文档中说明的内容: 操作类型 索引操作还接受可用于强制执行创建操作的op_type,从而允许“如果不存在”行为。使用create时,如果索引中已经存在具有该ID的文档,则索引操作将失败。 这是使用op_type参数的示例: 指定create的另一个选

  • 问题内容: 我正在使用Heroku托管一个简单的Ruby on Rails测试应用程序,以了解如何使用Redis。我正在使用RedisCloud作为我的Redis服务提供商。在本地,我可以使用Redis CLI检查Redis数据库,但是如何检查Heroku应用程序正在使用的RedisCloud DB中的数据? RedisCloud提供了一个显示状态的仪表板,但不显示实际数据。另外,我尝试使用Red