这意味着我应该将代理列表(端口7072
)从Scala传递到远程Kafka,因为否则它将尝试使用默认端口。
问题是,根据日志,远程计算机无法识别参数bootstrap.servers
。我还尝试将此参数重命名为metadata.broker.list
、broker.list
和listeners
,但日志中始终出现相同的错误属性bootstrap.servers无效
,然后默认使用端口9092
(显然不使用消息)。
在POM文件中,我对Kafka和Spark使用了以下依赖关系:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
这是我的Scala代码(如果我使用自己安装在Amazon Cloud中的Kafka,在那里我有EMR机器(在那里我有用于Kafka的端口9092
)),它绝对可以工作):
val testTopicMap = testTopic.split(",").map((_, kafkaNumThreads.toInt)).toMap
val kafkaParams = Map[String, String](
"broker.list" -> "XXX.XX.XXX.XX:7072",
"zookeeper.connect" -> "XXX.XX.XXX.XX:2181",
"group.id" -> "test",
"zookeeper.connection.timeout.ms" -> "10000",
"auto.offset.reset" -> "smallest")
val testEvents: DStream[String] =
KafkaUtils
.createStream[String, String, StringDecoder, StringDecoder](
ssc,
kafkaParams,
testTopicMap,
StorageLevel.MEMORY_AND_DISK_SER_2
).map(_._2)
我正在阅读这篇文档,但看起来我所做的一切都是正确的。我应该使用其他一些Kafka客户端API(其他Maven依赖项)吗?
更新1:
val testTopicMap = testTopic.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072","bootstrap.servers" -> "XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072,XXX.XX.XXX.XX:7072",
"auto.offset.reset" -> "smallest")
val testEvents = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, testTopicMap).map(_._2)
testEvents.print()
17/01/02 12:23:15 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
我想我最近遇到了同样的问题(EOFException),原因是Kafka版本不匹配。
如果我看一下https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka2.10/1.6.2,那么kafka流版本的编译时依赖关系是0.8,而您使用的是0.10。
据我所知,0.9已经与0.8不兼容了。您可以尝试设置本地0.8或0.9代理并尝试连接吗?
我50%的希望渺茫:P.我(一开始)试图从一个线程的代码中更新GUI,但得到了NullException。在阅读了一段时间后,我了解到线程淹没了JavaFX应用程序窗口,为了从线程更新GUI,我需要使用以下代码: 所以我确实使用了这个代码,它确实允许我编辑我的主类的图形用户界面。不过,我的问题很简单。= 如何将父级的public void run()中的参数传递给这个?。例如,字符串tmpStri
问题内容: 我正在使用tkinter构建的GUI的一部分有一个弹出窗口,显示“请在程序运行时等待”。然后完成后,窗户便消失了。我正在使用widget.after命令打开窗口并运行命令。但是,如果我通过函数调用参数,则永远不会出现弹出窗口。这是一个例子: 这样可以很好地运行并完成我想要的操作,在运行备份时弹出窗口,然后在备份后关闭窗口。但是,如果我从widget ..传递了and参数,就像下面的代码
我有一个片段a,包含一个片段B。 当片段A处于onActivityCreated生命周期中时,我想从片段A向片段B传递一个参数(因为我有一个来自viewmodel的数据,该数据此时到达)。 在我的片段B中,我无法得到这个论点。我有一个空异常。 你有办法解决我的问题吗? 这是我的代码 片段A 片段B fragment_a.xml
我正在为webste构建一个功能,用户可以在其中重置密码。他收到一封电子邮件,其中包含URL中生成的令牌。单击此链接时,用户将被发送到/Reset页面。该页的Get方法如下:
我最近开始学习Swift,遇到了一个关于闭包的问题。我试图将开车的参数传递给func travel并收到错误消息:无法将类型'()'的值转换为预期的参数类型'()- 有人能善意地建议吗?赞赏!
问题内容: 我想将值传递给javascript。如果可能的话,我该怎么办?如何在后备bean中接收它们? 问题答案: