当前位置: 首页 > 知识库问答 >
问题:

spark与kafka的集成,spark中的例外-提交一个jar

柴飞扬
2023-03-14

我试着检查在Ubuntu下安装spark是否需要set HADOOP_HOME;但是,没有设置HADOOP_HOME,请仔细检查JAR的参数。

./bin/spark-submit --class "org.apache.spark.examples.streaming.JavaKafkaWordCount" --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0 --master local[*] --jars ~/software/JavaKafkaWordCount.jar localhost:2181 test-consumer-group streams-plaintext-input 1

线程“main”java.lang.NullPointerException(位于org.apache.hadoop.fs.path.getName(Path.java:337)(位于org.apache.spark.deploy.deploy.deployls$.downloadfile(deploydencyutils.scala:136)(位于org.apache.spark.deploy.sparksubmit$$anonfun$preparesubmitEnvironment$7)apply(sparksubmit.scala:367)(位于

共有1个答案

汪坚
2023-03-14

您的路径uri jar不可理解参见此dependencyutils.scala#L136

 /**
   * Download a file from the remote to a local temporary directory. If the input path points to
   * a local path, returns it with no operation.
   *
   * @param path A file path from where the files will be downloaded.
   * @param targetDir A temporary directory for which downloaded files.
   * @param sparkConf Spark configuration.
   * @param hadoopConf Hadoop configuration.
   * @param secMgr Spark security manager.
   * @return Path to the local file.
   */
  def downloadFile(
      path: String,
      targetDir: File,
      sparkConf: SparkConf,
      hadoopConf: Configuration,
      secMgr: SecurityManager): String = {
    require(path != null, "path cannot be null.")
    val uri = Utils.resolveURI(path)

    uri.getScheme match {
      case "file" | "local" => path
      case "http" | "https" | "ftp" if Utils.isTesting =>
        // This is only used for SparkSubmitSuite unit test. Instead of downloading file remotely,
        // return a dummy local path instead.
        val file = new File(uri.getPath)
        new File(targetDir, file.getName).toURI.toString
      case _ =>
        val fname = new Path(uri).getName()
        val localFile = Utils.doFetchFile(uri.toString(), targetDir, fname, sparkConf, secMgr,
          hadoopConf)
        localFile.toURI().toString()
    }
  }

在spark-submit中,如下所示更改参数

--jars/fullpath/javakafkaWordcount.jar而不是--jars~/software/javakafkaWordcount.jar

 类似资料:
  • Apache Spark 是一个高性能集群计算框架,其中 Spark Streaming 作为实时批处理组件,因为其简单易上手的特性深受喜爱。在 es-hadoop 2.1.0 版本之后,也新增了对 Spark 的支持,使得结合 ES 和 Spark 成为可能。 目前最新版本的 es-hadoop 是 2.1.0-Beta4。安装如下: wget http://d3kbcqa49mib13.clo

  • 我想把Nifi flowfile发送到Spark,在Spark中做一些转换,然后再把结果发送回Nifi,这样我就可以在Nifi中进行进一步的操作。我不想写flowfile写到数据库或HDFS,然后触发火花作业。我想直接将flowfile发送到Spark,并直接从Spark接收到NIFI的结果。我尝试在Nifi中使用ExecuteSparkInteractive处理器,但我被卡住了。任何例子都是有帮

  • 我在Databricks上阅读下面的博客 https://databricks.com/blog/2015/03/30/improvements-to-kafka-integration-of-spark-streaming.html 在解释spark-kafka集成如何使用WAl接收器的过程时,它说 1.Kafka数据由在火花工作线程/执行程序中运行的Kafka接收器持续接收。这使用了Kafka

  • 正如标题所预期的,我在向docker上运行的spark集群提交spark作业时遇到了一些问题。 我在scala中写了一个非常简单的火花作业,订阅一个kafka服务器,安排一些数据,并将这些数据存储在一个elastichsearch数据库中。 如果我在我的开发环境(Windows/IntelliJ)中从Ide运行spark作业,那么一切都会完美工作。 然后(我一点也不喜欢java),我按照以下说明添

  • 我正在使用火花流和Kafka,我得到了这个错误。 线程“streaming-start”中的异常java.lang.NosuchMethoderror:scala.predef$.arrowassoc(ljava/lang/object;)ljava/lang/object;在org.apache.spark.streaming.kafka010.directkafkainputdstream$$

  • 我试图将Spark应用程序与spring boot集成,但由于Spark core也有jetty server和servlet包,它们与spring boot web starter servlet包冲突。 我已经在下面的帖子中排除了初学者日志记录 谢谢