嗨,我正在尝试生成Salt示例的输出,但没有使用文档中提到的docker。我找到了帮助生成输出的scala代码,这是main.scala。我将main.scala修改为一个方便的main.scala,
package BinExTest
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.Row
import software.uncharted.salt.core.projection.numeric._
import software.uncharted.salt.core.generation.request._
import software.uncharted.salt.core.generation.Series
import software.uncharted.salt.core.generation.TileGenerator
import software.uncharted.salt.core.generation.output.SeriesData
import software.uncharted.salt.core.analytic.numeric._
import java.io._
import scala.util.parsing.json.JSONObject
object Main {
// Defines the tile size in both x and y bin dimensions
val tileSize = 256
// Defines the output layer name
val layerName = "pickups"
// Creates and returns an Array of Double values encoded as 64bit Integers
def createByteBuffer(tile: SeriesData[(Int, Int, Int), (Int, Int), Double, (Double, Double)]): Array[Byte] = {
val byteArray = new Array[Byte](tileSize * tileSize * 8)
var j = 0
tile.bins.foreach(b => {
val data = java.lang.Double.doubleToLongBits(b)
for (i <- 0 to 7) {
byteArray(j) = ((data >> (i * 8)) & 0xff).asInstanceOf[Byte]
j += 1
}
})
byteArray
}
def main(args: Array[String]): Unit = {
val jarFile = "/home/kesava/Studies/BinExTest/BinExTest.jar";
val inputPath = "/home/kesava/Downloads/taxi_micro.csv"
val outputPath = "/home/kesava/SoftWares/salt/salt-examples/bin-example/Output"
val conf = new SparkConf().setAppName("salt-bin-example").setJars(Array(jarFile))
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load(s"file://$inputPath")
.registerTempTable("taxi_micro")
// Construct an RDD of Rows containing only the fields we need. Cache the result
val input = sqlContext.sql("select pickup_lon, pickup_lat from taxi_micro")
.rdd.cache()
// Given an input row, return pickup longitude, latitude as a tuple
val pickupExtractor = (r: Row) => {
if (r.isNullAt(0) || r.isNullAt(1)) {
None
} else {
Some((r.getDouble(0), r.getDouble(1)))
}
}
// Tile Generator object, which houses the generation logic
val gen = TileGenerator(sc)
// Break levels into batches. Process several higher levels at once because the
// number of tile outputs is quite low. Lower levels done individually due to high tile counts.
val levelBatches = List(List(0, 1, 2, 3, 4, 5, 6, 7, 8), List(9, 10, 11), List(12), List(13), List(14))
// Iterate over sets of levels to generate.
val levelMeta = levelBatches.map(level => {
println("------------------------------")
println(s"Generating level $level")
println("------------------------------")
// Construct the definition of the tiling jobs: pickups
val pickups = new Series((tileSize - 1, tileSize - 1),
pickupExtractor,
new MercatorProjection(level),
(r: Row) => Some(1),
CountAggregator,
Some(MinMaxAggregator))
// Create a request for all tiles on these levels, generate
val request = new TileLevelRequest(level, (coord: (Int, Int, Int)) => coord._1)
val rdd = gen.generate(input, pickups, request)
// Translate RDD of Tiles to RDD of (coordinate,byte array), collect to master for serialization
val output = rdd
.map(s => pickups(s).get)
.map(tile => {
// Return tuples of tile coordinate, byte array
(tile.coords, createByteBuffer(tile))
})
.collect()
// Save byte files to local filesystem
output.foreach(tile => {
val coord = tile._1
val byteArray = tile._2
val limit = (1 << coord._1) - 1
// Use standard TMS path structure and file naming
val file = new File(s"$outputPath/$layerName/${coord._1}/${coord._2}/${limit - coord._3}.bins")
file.getParentFile.mkdirs()
val output = new FileOutputStream(file)
output.write(byteArray)
output.close()
})
// Create map from each level to min / max values.
rdd
.map(s => pickups(s).get)
.map(t => (t.coords._1.toString, t.tileMeta.get))
.reduceByKey((l, r) => {
(Math.min(l._1, r._1), Math.max(l._2, r._2))
})
.mapValues(minMax => {
JSONObject(Map(
"min" -> minMax._1,
"max" -> minMax._2
))
})
.collect()
.toMap
})
// Flatten array of maps into a single map
val levelInfoJSON = JSONObject(levelMeta.reduce(_ ++ _)).toString()
// Save level metadata to filesystem
val pw = new PrintWriter(s"$outputPath/$layerName/meta.json")
pw.write(levelInfoJSON)
pw.close()
}
}
我为这个scala创建了一个单独的文件夹,
calac-cp“lib/salt.jar:lib/spark.jar”main.scala
这已成功运行并在文件夹BinexTest下生成类。
现在,项目的build.gradle有以下几行代码,用于标识这是有助于生成输出数据集的命令,
task run(overwrite: true, type: Exec, dependsOn: [assemble]) {
executable = 'spark-submit'
args = ["--class","software.uncharted.salt.examples.bin.Main","/opt/salt/build/libs/salt-bin-example-${version}.jar", "/opt/data/taxi_one_day.csv", "/opt/output"]
}
spark-submit--类binextest.main lib/salt.jar
当我这样做时,我会得到以下错误,
java.lang.ClassNotFoundException:Main.BinexTest在java.net.URLClassLoader$1上运行(URLClassLoader.java:366)在java.net.URLClassLoader$1上运行(URLClassLoader.java:355)在java.security.AccessController.DOPrivileged(本机方法)在java.net.URLClassLoader.findClass(本机方法)在java.net.URLClassLoader.java:354)在java.lang.ClassLoader.java:354)
有人能帮我一下吗?我对此完全陌生,走到这一步完全是靠探索。
spark-submit--类binextest.main--jars“binextest.jar”“lib/salt.jar”
我让ClassNotFoundException生成新的错误,
知道怎么回事吗?
这是因为Scala2.11没有提到的类吗?
spark-submit--类“binextest.main”--jar“binextest.jar,lib/scala210.jar”“lib/salt.jar”
要运行Spark作业,它需要在组成Spark集群的不同节点上自我复制它的代码。它通过将jar文件复制到其他节点来做到这一点。
这意味着您需要确保您的类文件打包在一个.jar文件中。在我的典型解决方案中,我将构建一个Uber jar,将类文件和依赖的jar文件打包在一个.jar文件中。为此,我使用了Maven Shade插件。这不一定是您的解决方案,但至少您应该从生成的类中构建一个.jar文件。
要手动提供附加的jar文件,您需要使用--jars
选项添加这些文件,该选项需要一个逗号分隔的列表。
你的问题的第二部分已经在另一个线程上得到了回答。
一些脚本在工作时什么也不做,当我手动运行它们时,其中一个失败了,出现了以下消息: 错误SparkUI:未能绑定SparkUI java.net.bindexception:地址已在使用:服务“SparkUI”在重试16次后失败! 所以我想知道是否有一种特定的方法来并行运行脚本?
我可以从IDE(远程)编程运行这个程序吗?我使用Scala-IDE。我寻找一些代码来遵循,但仍然没有找到合适的 我的环境:-Cloudera 5.8.2[OS redhat 7.2,kerberos 5,Spark2.1,scala 2.11]-Windows 7
我正在从我的开发机器上启动spark-submit。 根据在YARN文档上运行Spark,我应该在env var或上为hadoop集群配置提供一个路径。这就是它变得棘手的地方:如果我将任务发送到远程YARN服务,为什么这些文件夹必须存在于我的本地机器上?这是否意味着spark-submit必须位于集群内部,因此我不能远程启动spark任务?如果没有,我应该用什么填充这些文件夹?我应该从任务管理器服
当我使用spark-submit with master yarn和deploy-mode cluster提交spark作业时,它不会打印/返回任何applicationId,一旦作业完成,我必须手动检查MapReduce jobHistory或spark HistoryServer来获取作业细节。 我的集群被许多用户使用,在jobHistory/HistoryServer中找到我的作业需要很多时
我试图通过以下命令向CDH纱线集群提交spark作业 我试过几种组合,但都不起作用。。。现在,我的本地/root以及HDFS/user/root/lib中都有所有poi JAR,因此我尝试了以下方法 如何将JAR分发到所有集群节点?因为上面这些都不起作用,作业仍然无法引用该类,因为我一直收到相同的错误: 同样的命令也适用于“--master本地”,但没有指定--jar,因为我已经将我的jar复制到
这是我用scala编写的代码 使用sbt包编译时收到的错误是hereimage 这是我的build.sbt档案 名称:=“OV” 规模厌恶:=“2.11.8” // https://mvnrepository.com/artifact/org.apache.spark/spark-corelibraryDependencies=“org.apache.spark”%%“spark核心”%%“2.3