当前位置: 首页 > 知识库问答 >
问题:

无法将broker list参数从Scala传递到Kafka:属性Bootstrap.Servers无效

刘德义
2023-03-14
    null

这意味着我应该将代理列表(端口7072)从Scala传递到远程Kafka,因为否则它将尝试使用默认端口。

问题是,根据日志,远程计算机无法识别参数bootstrap.servers。我还尝试将此参数重命名为metadata.broker.listbroker.listlisteners,但日志中始终出现相同的错误属性bootstrap.servers无效,然后默认使用端口9092(显然不使用消息)。

在POM文件中,我对Kafka和Spark使用了以下依赖关系:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.2</version>
</dependency>

这是我的Scala代码(如果我使用自己安装在Amazon Cloud中的Kafka,在那里我有EMR机器(在那里我有用于Kafka的端口9092)),它绝对可以工作):

    val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap

   val kafkaParams = Map[String, String](
      "broker.list" -> "XXX.XX.XXX.XX:7072",
      "zookeeper.connect" -> "XXX.XX.XXX.XX:2181",
      "group.id" -> "test",
      "zookeeper.connection.timeout.ms" -> "10000",
      "auto.offset.reset" -> "smallest")

    val testEvents: DStream[String] =
      KafkaUtils
        .createStream[String, String, StringDecoder, StringDecoder](
        ssc,
        kafkaParams,
        testTopicMap,
        StorageLevel.MEMORY_AND_DISK_SER_2
      ).map(_._2)

我正在阅读这篇文档,但看起来我所做的一切都是正确的。我应该使用其他一些Kafka客户端API(其他Maven依赖项)吗?

更新1:

val testTopicMap = testTopic.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072","bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
                                      "auto.offset.reset" -> "smallest")
val testEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap).map(_._2)

testEvents.print()

17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.

共有1个答案

商正浩
2023-03-14

我想我最近遇到了同样的问题(EOFException),原因是Kafka版本不匹配。

如果我看一下https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka2.10/1.6.2,那么kafka流版本的编译时依赖关系是0.8,而您使用的是0.10。

据我所知,0.9已经与0.8不兼容了。您可以尝试设置本地0.8或0.9代理并尝试连接吗?

 类似资料:
  • 我50%的希望渺茫:P.我(一开始)试图从一个线程的代码中更新GUI,但得到了NullException。在阅读了一段时间后,我了解到线程淹没了JavaFX应用程序窗口,为了从线程更新GUI,我需要使用以下代码: 所以我确实使用了这个代码,它确实允许我编辑我的主类的图形用户界面。不过,我的问题很简单。= 如何将父级的public void run()中的参数传递给这个?。例如,字符串tmpStri

  • 问题内容: 我正在使用tkinter构建的GUI的一部分有一个弹出窗口,显示“请在程序运行时等待”。然后完成后,窗户便消失了。我正在使用widget.after命令打开窗口并运行命令。但是,如果我通过函数调用参数,则永远不会出现弹出窗口。这是一个例子: 这样可以很好地运行并完成我想要的操作,在运行备份时弹出窗口,然后在备份后关闭窗口。但是,如果我从widget ..传递了and参数,就像下面的代码

  • 我有一个片段a,包含一个片段B。 当片段A处于onActivityCreated生命周期中时,我想从片段A向片段B传递一个参数(因为我有一个来自viewmodel的数据,该数据此时到达)。 在我的片段B中,我无法得到这个论点。我有一个空异常。 你有办法解决我的问题吗? 这是我的代码 片段A 片段B fragment_a.xml

  • 我正在为webste构建一个功能,用户可以在其中重置密码。他收到一封电子邮件,其中包含URL中生成的令牌。单击此链接时,用户将被发送到/Reset页面。该页的Get方法如下:

  • 我最近开始学习Swift,遇到了一个关于闭包的问题。我试图将开车的参数传递给func travel并收到错误消息:无法将类型'()'的值转换为预期的参数类型'()- 有人能善意地建议吗?赞赏!

  • 问题内容: 我想将值传递给javascript。如果可能的话,我该怎么办?如何在后备bean中接收它们? 问题答案: