当前位置: 首页 > 知识库问答 >
问题:

Kafka:=线程“main”org.apache.spark.sql.streaming.streamingQueryException:未找到连接项

丁豪
2023-03-14

2)线程中的异常“流执行线程for[id=c6426655-446f-4306-91ba-d78e68e05c15,runId=420382C1-8558-45A1-B26D-F6299044FA04]”java.lang.exceptioninInitializerError

3)线程“main”org.apache.spark.sql.streaming.StreamingQueryException:null

sbt依赖性

//https://mvnrepository.com/artifact/org.apache.kafka/kafka libraryDependencies+=“org.apache.kafka”%%“kafka”%“2.1.1”

//https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients libraryDependencies+=“org.apache.kafka”%“kafka-clients”%“2.1.1”

//https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams libraryDependencies+=“org.apache.kafka”%“kafka-streams”%“2.1.1”

//https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 libraryDependencies+=“org.apache.spark”%%“spark-sql-kafka-0-10”%“2.2.3”

//https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams-scala libraryDependencies+=“org.apache.kafka”%%“kafka-streams-scala”%“2.1.1”

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession


object demo1 {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir","c:\\hadoop\\")

    val spark: SparkSession = SparkSession.builder
      .appName("My Spark Application")
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "file:///C:/temp") // Necessary to work around a Windows bug in Spark 2.0.0; omit if you're not on Windows.
      .config("spark.sql.streaming.checkpointLocation", "file:///C:/checkpoint")
      .getOrCreate

    spark.sparkContext.setLogLevel("ERROR")

    spark.conf.set("spark,sqlshuffle.partations","2")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "162.244.80.189:9092")
      .option("startingOffsets", "earliest")
      .option("group.id","test1")
      .option("subscribe", "demo11")
      .load()

    import spark.implicits._


    val dsStruc = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp").as[(String, String, Timestamp)]


    val abc = df.writeStream
    .outputMode("append")
    .format("console")
    .start().awaitTermination()

    df.show()

共有1个答案

长孙瑞
2023-03-14

我也有同样的问题。我使用了错误的库spark-sql-kafka库版本(2.2.0而不是2.3.0)。我的成功配置是:

提供org.apache.spark spark-core2.11 2.3.0

Spark Spark-SQL2.11 2.3.0

 类似资料:
  • 我已经做了所有的尝试,但都不起作用,这是我的问题,我试图将一些文件从一台机器发送到另一台机器,想法是在目标机器上创建一个servlet,在服务器上运行它,并等待任何客户机连接,客户机和servlet代码在lan网络类型中完美地工作,但当涉及到wan网络类型时,我得到了这个例外 线程“main”org.apache.http.conn.HTTPhostConnectException:连接到192.

  • 问题内容: 我正在用Cucumber开发我的Selenium-JVM框架,并且在运行我的第一个功能时出现错误。 请帮忙。 我如何启动功能- 右键单击功能文件 选择运行方式->黄瓜功能 立即例外- 我在代码中拥有的- Launcher.java- 功能文件- 依赖项列表已添加到列表- 我的JVM-1.7 项目中只有这么多。 请帮忙。 问题答案: 确保为Maven项目添加以下依赖项: 您可以将版本替换

  • 我正在尝试使用Kafka Connect Elasticsearch连接器,但没有成功。它正在崩溃,并出现以下错误: 我已经在kafka子文件夹中解压了插件的编译版本,并在connect-standalone.properties中有以下代码行: 我可以看到该文件夹中的各种连接器,但Kafka Connect不加载它们;但它确实加载了标准连接器,如下所示: 如何正确注册连接器?

  • 我对Kafka connect很陌生。我想把我的信息从Kafka主题推到弹性搜索。在阅读了可用的文档之后,我从发行版tar下载并编译了弹性搜索接收器。拉链(https://github.com/confluentinc/kafka-connect-elasticsearch/releases) 我添加了弹性搜索属性文件,并将上述jar包含在类路径中。当我在独立模式下运行kafka connect时

  • 肯定有一条线,但我不明白为什么扫描仪看不到… 以下是文件的开头: 下面是我获取它的代码: 但我得到了错误: book1_enc的文件是我的LZW编码算法的输出。当我将文件传递给我的解码器时,我希望解码器知道字典的大小,在这种情况下是256...感谢您的阅读...