当前位置: 首页 > 知识库问答 >
问题:

ClassNotFoundException火花-提交scala

陶永望
2023-03-14

嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala,

package BinExTest
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row

import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._

import java.io._

import scala.util.parsing.json.JSONObject

object Main {

  // Defines the tile size in both x and y bin dimensions
  val tileSize = 256

  // Defines the output layer name
  val layerName = "pickups"

  // Creates and returns an Array of Double values encoded as 64bit Integers
  def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
    val byteArray = new Array[Byte](tileSize * tileSize * 8)
    var j = 0
    tile.bins.foreach(b => {
      val data = java.lang.Double.doubleToLongBits(b)
      for (i <- 0 to 7) {
        byteArray(j) = ((data >> (i * 8)) & 0xff).asInstanceOf[Byte]
        j += 1
      }
    })
    byteArray
  }

  def main(args: Array[String]): Unit = {

    val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar"; 
    val inputPath = "/home/kesava/Downloads/taxi_micro.csv"
    val outputPath = "/home/kesava/SoftWares/salt/salt-examples/bin-example/Output"

    val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    sqlContext.read.format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load(s"file://$inputPath")
      .registerTempTable("taxi_micro")

    // Construct an RDD of Rows containing only the fields we need. Cache the result
    val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
      .rdd.cache()

    // Given an input row, return pickup longitude, latitude as a tuple
    val pickupExtractor = (r: Row) => {
      if (r.isNullAt(0) || r.isNullAt(1)) {
        None
      } else {
        Some((r.getDouble(0), r.getDouble(1)))
      }
    }

    // Tile Generator object, which houses the generation logic
    val gen = TileGenerator(sc)

    // Break levels into batches. Process several higher levels at once because the
    // number of tile outputs is quite low. Lower levels done individually due to high tile counts.
    val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))

    // Iterate over sets of levels to generate.
    val levelMeta = levelBatches.map(level => {

      println("------------------------------")
      println(s"Generating level $level")
      println("------------------------------")

      // Construct the definition of the tiling jobs: pickups
      val pickups = new Series((tileSize - 1, tileSize - 1),
        pickupExtractor,
        new MercatorProjection(level),
        (r: Row) => Some(1),
        CountAggregator,
        Some(MinMaxAggregator))

      // Create a request for all tiles on these levels, generate
      val request = new TileLevelRequest(level, (coord: (Int, Int, Int)) => coord._1)
      val rdd = gen.generate(input, pickups, request)

      // Translate RDD of Tiles to RDD of (coordinate,byte array), collect to master for serialization
      val output = rdd
        .map(s => pickups(s).get)
        .map(tile => {
          // Return tuples of tile coordinate, byte array
          (tile.coords, createByteBuffer(tile))
        })
        .collect()

      // Save byte files to local filesystem
      output.foreach(tile => {
        val coord = tile._1
        val byteArray = tile._2
        val limit = (1 << coord._1) - 1
        // Use standard TMS path structure and file naming
        val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
        file.getParentFile.mkdirs()
        val output = new FileOutputStream(file)
        output.write(byteArray)
        output.close()
      })

      // Create map from each level to min / max values.
      rdd
        .map(s => pickups(s).get)
        .map(t => (t.coords._1.toString, t.tileMeta.get))
        .reduceByKey((l, r) => {
          (Math.min(l._1, r._1), Math.max(l._2, r._2))
        })
        .mapValues(minMax => {
          JSONObject(Map(
            "min" -> minMax._1,
            "max" -> minMax._2
          ))
        })
        .collect()
        .toMap
    })

    // Flatten array of maps into a single map
    val levelInfoJSON = JSONObject(levelMeta.reduce(_ ++ _)).toString()
    // Save level metadata to filesystem
    val pw = new PrintWriter(s"$outputPath/$layerName/meta.json")
    pw.write(levelInfoJSON)
    pw.close()

  }
}

我为这个scala创建了一个单独的文件夹,

calac-cp“lib/salt.jar:lib/spark.jar”main.scala

这已成功运行并在文件夹BinexTest下生成类。

现在,项目的build.gradle有以下几行代码,用于标识这是有助于生成输出数据集的命令,

task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
  executable = 'spark-submit'
  args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}

spark-submit--类binextest.main lib/salt.jar

当我这样做时,我会得到以下错误,

java.lang.ClassNotFoundException:Main.BinexTest在java.net.URLClassLoader$1上运行(URLClassLoader.java:366)在java.net.URLClassLoader$1上运行(URLClassLoader.java:355)在java.security.AccessController.DOPrivileged(本机方法)在java.net.URLClassLoader.findClass(本机方法)在java.net.URLClassLoader.java:354)在java.lang.ClassLoader.java:354)

有人能帮我一下吗?我对此完全陌生,走到这一步完全是靠探索。

spark-submit--类binextest.main--jars“binextest.jar”“lib/salt.jar”

我让ClassNotFoundException生成新的错误,

知道怎么回事吗?

这是因为Scala2.11没有提到的类吗?

spark-submit--类“binextest.main”--jar“binextest.jar,lib/scala210.jar”“lib/salt.jar”

共有1个答案

吴涵育
2023-03-14

要运行Spark作业,它需要在组成Spark集群的不同节点上自我复制它的代码。它通过将jar文件复制到其他节点来做到这一点。

这意味着您需要确保您的类文件打包在一个.jar文件中。在我的典型解决方案中,我将构建一个Uber jar,将类文件和依赖的jar文件打包在一个.jar文件中。为此,我使用了Maven Shade插件。这不一定是您的解决方案,但至少您应该从生成的类中构建一个.jar文件。

要手动提供附加的jar文件,您需要使用--jars选项添加这些文件,该选项需要一个逗号分隔的列表。

你的问题的第二部分已经在另一个线程上得到了回答。

 类似资料:
  • 一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?

  • 我可以从IDE(远程)编程运行这个程序吗?我使用Scala-IDE。我寻找一些代码来遵循,但仍然没有找到合适的 我的环境:-Cloudera 5.8.2[OS redhat 7.2,kerberos 5,Spark2.1,scala 2.11]-Windows 7

  • 我正在从我的开发机器上启动spark-submit。 根据在YARN文档上运行Spark,我应该在env var或上为hadoop集群配置提供一个路径。这就是它变得棘手的地方:如果我将任务发送到远程YARN服务,为什么这些文件夹必须存在于我的本地机器上?这是否意味着spark-submit必须位于集群内部,因此我不能远程启动spark任务?如果没有,我应该用什么填充这些文件夹?我应该从任务管理器服

  • 当我使用spark-submit with master yarn和deploy-mode cluster提交spark作业时,它不会打印/返回任何applicationId,一旦作业完成,我必须手动检查MapReduce jobHistory或spark HistoryServer来获取作业细节。 我的集群被许多用户使用,在jobHistory/HistoryServer中找到我的作业需要很多时

  • 我试图通过以下命令向CDH纱线集群提交spark作业 我试过几种组合,但都不起作用。。。现在,我的本地/root以及HDFS/user/root/lib中都有所有poi JAR,因此我尝试了以下方法 如何将JAR分发到所有集群节点?因为上面这些都不起作用,作业仍然无法引用该类,因为我一直收到相同的错误: 同样的命令也适用于“--master本地”,但没有指定--jar,因为我已经将我的jar复制到

  • 这是我用scala编写的代码 使用sbt包编译时收到的错误是hereimage 这是我的build.sbt档案 名称:=“OV” 规模厌恶:=“2.11.8” // https://mvnrepository.com/artifact/org.apache.spark/spark-corelibraryDependencies=“org.apache.spark”%%“spark核心”%%“2.3