当前位置: 首页 > 知识库问答 >
问题:

scala中的Flink Kafka程序给出超时错误。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败

商辰钊
2023-03-14

我正在编写一个Flink-Kafka集成程序,如下所示,但Kafka出现超时错误:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, 
FlinkKafkaProducer010}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import java.util.Properties

object StreamKafkaProducer {

def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
properties.setProperty("serializer.class", "kafka.serializer.StringEncoder")


val stream: DataStream[String] =env.fromElements(
  ("Adam"),
  ("Sarah"))

val kafkaProducer = new FlinkKafkaProducer010[String](
  "localhost:9092",
  "output",
  new SimpleStringSchema
)
// write data into Kafka
stream.addSink(kafkaProducer)

env.execute("Flink kafka integration  ")
}
}

从终端我可以看到Kafka和zookeeper正在运行,但当我从Intellij运行上面的程序时,它显示了这个错误:

C:\Users\amdass\workspace\flink-project-master>sbt run
Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=256m; 
support was removed in 8.0
[info] Loading project definition from C:\Users\amdass\workspace\flink-
project-master\project
[info] Set current project to Flink Project (in build 
file:/C:/Users/amdass/workspace/flink-project-master/)
[info] Compiling 1 Scala source to C:\Users\amdass\workspace\flink-project-
master\target\scala-2.11\classes...
[info] Running org.example.StreamKafkaProducer
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further 
details.
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-563113020] 
with leader session id 5a637740-5c73-4f69-a19e-c8ef7141efa1.
12/15/2017 14:41:49     Job execution switched to status RUNNING.
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to SCHEDULED
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to SCHEDULED
12/15/2017 14:41:49     Source: Collection Source(1/1) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(1/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(2/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(3/4) switched to DEPLOYING
12/15/2017 14:41:49     Sink: Unnamed(4/4) switched to DEPLOYING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(2/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to RUNNING
12/15/2017 14:41:50     Sink: Unnamed(1/4) switched to RUNNING
12/15/2017 14:41:50     Source: Collection Source(1/1) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(3/4) switched to FINISHED
12/15/2017 14:41:50     Sink: Unnamed(4/4) switched to FINISHED
12/15/2017 14:42:50     Sink: Unnamed(1/4) switched to FAILED
<b>  org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms. </b>

12/15/2017 14:42:50     Sink: Unnamed(2/4) switched to FAILED
org.apache.kafka.common.errors.TimeoutException: Failed to update metadata 
after 60000 ms.

12/15/2017 14:42:50     Job execution switched to status FAILING.

组织。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败。2017年12月15日14:42:50作业执行切换到失败状态。[错误](run-main-0)组织。阿帕奇。Flink。运行时。客户JobExecutionException:作业执行失败。组织。阿帕奇。Flink。运行时。客户JobExecutionException:作业执行失败。在org。阿帕奇。Flink。运行时。工作经理。JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6。在org上申请$mcV$sp(JobManager.scala:933)。阿帕奇。Flink。运行时。工作经理。JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6。申请(JobManager.scala:876)组织。阿帕奇。Flink。运行时。工作经理。JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6。在scala申请(JobManager.scala:876)。同时发生的impl。未来$PromiseCompletingRunnable。在scala,liftedTree1美元(未来,scala:24)。同时发生的impl。未来$PromiseCompletingRunnable。在akka跑步(Future.scala:24)。派遣任务调用。在akka运行(AbstractDispatcher.scala:40)。派遣ForkJoinExecutor配置程序$AkkaForkJoinTask。scala的执行官(AbstractDispatcher.scala:397)。同时发生的分叉连接。这是一项艰巨的任务。scala的doExec(ForkJoinTask.java:260)。同时发生的分叉连接。ForkJoinPool$WorkQueue。scala上的runTask(ForkJoinPool.java:1339)。同时发生的分叉连接。福克斯:泳池。runWorker(ForkJoinPool.java:1979)在

斯卡拉。同时发生的分叉连接。ForkJoinWorkerThread。运行(ForkJoinWorkerThread.java:107)的原因是:org。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败。[trace]堆栈跟踪被抑制:最后运行*:运行完整输出。JAVAlang.RuntimeException:scala的非零退出代码:1。系统。套餐美元。错误(package.scala:27)[trace]堆栈跟踪被抑制:运行上次编译:运行完整输出。[错误](编译:运行)非零退出代码:1[错误]总时间:75秒,已完成2017年12月15日下午2:42:51

共有1个答案

冯阳云
2023-03-14

请检查并确保您的Kafka服务器正在运行。此错误通常发生在Flink程序无法连接到Kafka服务器时。Flink会在一定时间内自动尝试连接到Kafka服务器。一旦达到这个临界值,Flink仍然无法与Kafka建立联系,那么它就会抛出这个组织。阿帕奇。Kafka。常见的错误。超时异常

请检查您的Kafka服务器详细信息,Kafka主题,并验证您的Kafka服务器是否正在运行。

 类似资料:
  • 我有两个代理1.0.0Kafka集群,我正在针对这个Kafka运行1.0.0Kafka流API应用程序。我增加了制片人的要求。暂停。毫秒到5分钟来修复生产者超时异常。 目前,在运行一段时间后,我发现以下两种类型的异常。我试图按照ApacheKafka中的建议修复这些异常:TimeoutException,然后什么都不起作用‏ 但不完整的解决方案就在这里。建议使用此解决方案(减少生产批量)。请帮忙。

  • 我是KAFKA的新手,我知道这个问题在stack overflow上被问了很多次,但没有一个解决方案对我有效,所以我在这里再次问同样的问题,试试我的运气。我已经在Centos7 VM上下载并安装了KFKA。虚拟机在我的笔记本电脑上。当我从命令行运行KAFKA生产者和消费者时,它工作得很好。下一步,我想创建一个Java生产者,但它总是超时,并出现以下异常。 生产者的 Java 代码是: 对于引导服务

  • 我试图从JMS源读取数据,并将它们推送到KAFKA主题中,几个小时后,我观察到推送到KAFKA主题的频率几乎为零,经过一些初步分析,我在FLUME日志中发现以下异常。 my flume显示max.request的当前设置值(在日志中)。尺寸为1048576,明显小于1399305,增加了此最大要求。大小可能会消除这些异常,但我无法找到更新该值的正确位置。 我的水槽。配置, 任何帮助都将不胜感激!!

  • 我的应用程序使用一台机器上运行的Kafka服务器上的消息,然后将它们转发给另一台在其他实例上运行的远程Kafka服务器。在我将应用程序部署到Cloud Foundry并向第一台Kafka服务器发送消息后,应用程序按预期工作。消息被消费并转发到远程Kafka。 然而,在这之后,我在Cloud Foundry(以及在我的本地机器上以较慢的速度)中得到了下面的无限循环异常: StackTrace: 我的

  • 我们使用SpringKafka流生产者来产生Kafka主题的数据。当我们做弹性测试时,我们得到了下面的错误。 `2020-08-28 16:18:35.536警告[,,]26---[ad|producer-3]好的客户。制作人内部。发件人:[Producer clientId=Producer-3]在分区topic1-0上的生成请求中收到无效元数据错误,原因是组织。阿帕奇。Kafka。常见的错误。

  • 所以在离开我的Android Studio项目一段时间后,我运行了所有的更新。