当前位置: 首页 > 知识库问答 >
问题:

无法在docker上的spark群集上提交spark作业

慕容聪
2023-03-14

正如标题所预期的,我在向docker上运行的spark集群提交spark作业时遇到了一些问题。

我在scala中写了一个非常简单的火花作业,订阅一个kafka服务器,安排一些数据,并将这些数据存储在一个elastichsearch数据库中。

如果我在我的开发环境(Windows/IntelliJ)中从Ide运行spark作业,那么一切都会完美工作。

然后(我一点也不喜欢java),我按照以下说明添加了一个火花集群:https://github.com/big-data-europe/docker-spark

当查看其仪表板时,群集看起来很健康。我创建了一个由一个主程序和一个辅助程序组成的集群。

这是我用scala写的工作:

import java.io.Serializable

import org.apache.commons.codec.StringDecoder
import org.apache.hadoop.fs.LocalFileSystem
import org.apache.hadoop.hdfs.DistributedFileSystem
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark
import org.apache.spark.SparkConf
import org.elasticsearch.spark._
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.parsing.json.JSON

object KafkaConsumer {
  def main(args: Array[String]): Unit = {

    val sc = new SparkConf()
      .setMaster("local[*]")
      .setAppName("Elastic Search Indexer App")

    sc.set("es.index.auto.create", "true")

    val elasticResource = "iot/demo"
    val ssc = new StreamingContext(sc, Seconds(10))

    //ssc.checkpoint("./checkpoint")

    val kafkaParams = Map(
      "bootstrap.servers" -> "kafka:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "auto.offset.reset" -> "earliest",
      "group.id" -> "group0"
    )

    val topics = List("test")
    val stream = KafkaUtils.createDirectStream(
      ssc,
      PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](topics.distinct, kafkaParams)
    )

    case class message(key: String, timestamp: Long, payload: Object)
    val rdds = stream.map(record => message(record.key, record.timestamp, record.value))

    val es_config: scala.collection.mutable.Map[String, String] =
      scala.collection.mutable.Map(
        "pushdown" -> "true",
        "es.nodes" -> "http://docker-host",
        "es.nodes.wan.only" -> "true",
        "es.resource" -> elasticResource,
        "es.ingest.pipeline" -> "iot-test-pipeline"
      )


    rdds.foreachRDD { rdd =>
      rdd.saveToEs(es_config)
      rdd.collect().foreach(println)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}

要将其提交到集群,我执行了以下操作:

  • 使用“sbt汇编”插件,我创建了一个包含所有依赖项的胖jar文件

然后提交:

请提交。cmd——Kafka消费类——硕士spark://docker-host:7077/c/Users/shams/Documents/Appunti/iot demo app/spark streaming/target/scala-2.11/spark-streaming-assembly-1.0。罐子

但我有一个错误:

19/02/27 11:18:12警告NativeCodeLoader:无法为您的平台加载本机hadoop库。。。在线程“main”java中使用内置java类(如果适用)。木卫一。IOException:在组织中没有scheme:C的文件系统。阿帕奇。hadoop。财政司司长。文件系统。位于org的getFileSystemClass(FileSystem.java:2660)。阿帕奇。hadoop。财政司司长。文件系统。在org上创建文件系统(FileSystem.java:2667)。阿帕奇。hadoop。财政司司长。文件系统。在org上访问$200(FileSystem.java:94)。阿帕奇。hadoop。财政司司长。文件系统$Cache。位于org的getInternal(FileSystem.java:2703)。阿帕奇。hadoop。财政司司长。文件系统$Cache。在org上获取(FileSystem.java:2685)。阿帕奇。hadoop。财政司司长。文件系统。在org上获取(FileSystem.java:373)。阿帕奇。火花util。Utils$。getHadoopFileSystem(Utils.scala:1897)位于org。阿帕奇。火花util。Utils$。doFetchFile(Utils.scala:694)位于org。阿帕奇。火花部署DependencyUtils$。在org下载文件(DependencyUtils.scala:135)。阿帕奇。火花部署SparkSubmit$$anonfun$doprepareResubitenEnvironment$7。在org上应用(SparkSubmit.scala:416)。阿帕奇。火花部署SparkSubmit$$anonfun$doprepareResubitenEnvironment$7。在scala上应用(SparkSubmit.scala:416)。选项地图(Option.scala:146)位于org。阿帕奇。火花部署SparkSubmit$。doprepareResubitenEnvironment(SparkSubmit.scala:415)位于org。阿帕奇。火花部署SparkSubmit$。在org上准备submitenvironment(SparkSubmit.scala:250)。阿帕奇。火花部署SparkSubmit$。在org上提交(SparkSubmit.scala:171)。阿帕奇。火花部署SparkSubmit$。main(SparkSubmit.scala:137)位于org。阿帕奇。火花部署SparkSubmit。main(SparkSubmit.scala)

经过一天的尝试,我还没有解决,我不明白在我的工作中想要访问某个体积,似乎是由错误所说

可能与警告信息有关?那么,我应该如何编辑我的脚本来避免这个问题?

提前谢谢。

更新:

问题似乎与我的代码无关,因为我试图提交一个简单的你好世界应用程序以同样的方式编译,但我有同样的问题。

共有1个答案

龙令
2023-03-14

经过多次尝试和研究,我得出结论,问题可能是我正在使用pc上的windows版本spark submit提交作业。

我不能完全理解,但现在,将文件直接移动到主节点和工作节点,我可以从那里提交它。

容器上的第一份副本:

docker cp spark-streaming-assembly-1.0.jar 21b43cb2e698:/spark/bin

然后我执行(在 /spark/bin文件夹中):

./spark-submit --class KafkaConsumer --deploy-mode cluster --master spark://spark-master:7077 spark-streaming-assembly-1.0.jar

这就是我目前找到的解决办法。

 类似资料:
  • 我被困在: 在我得到这个之前: 当我签出应用程序跟踪页面时,我在stderr上得到以下信息: 我对这一切都很陌生,也许我的推理有缺陷,任何投入或建议都会有所帮助。

  • 在k8s集群中。如何配置zeppelin在现有spark集群中运行spark作业,而不是旋转一个新的Pod? 我有一个k8s集群正在运行,我想在其中运行与齐柏林飞艇的火花。 Spark使用官方的Bitnami/Spark helm chart(v3.0.0)进行部署。我有一个主舱和两个工人舱运转良好,一切都很好。 短伪DockerFile: 我稍微修改了。(Image,imagePullSecre

  • null sbin/start-slave.sh spark://c96___37fb:7077--用于并置从机的端口7078 sbin/start-slave.sh spark://masternodeip:7077--其他两个从机的端口7078 前面引用的所有端口都从nodeMaster重定向到相应的Docker。 因此,webUI向我显示,我的集群有3个连接的节点,不幸的是,当运行时,只有并

  • 我试图设置一个火花3光泽使用两个系统运行Windows10。我可以开始用master ,它在启动主程序

  • 我已经在我的Windows7机器上设置了一个本地spark集群(一个主节点和辅助节点)。我已经创建了一个简单的scala脚本,我用sbt构建了这个脚本,并尝试用Spark-Submit运行这个脚本。请参阅以下资源 Scala代码: 现在,我用sbt构建并打包scala代码,并将其打包到一个JAR中。我的build.sbt文件如下所示 它创建一个jar,我使用spark submit命令提交它,如下

  • 我是一名spark/纱线新手,在提交纱线集群上的spark作业时遇到exitCode=13。当spark作业在本地模式下运行时,一切正常。 我使用的命令是: Spark错误日志: