当前位置: 首页 > 知识库问答 >
问题:

KAFKA:60000毫秒后无法更新元数据

谢高峯
2023-03-14

我是KAFKA的新手,我知道这个问题在stack overflow上被问了很多次,但没有一个解决方案对我有效,所以我在这里再次问同样的问题,试试我的运气。我已经在Centos7 VM上下载并安装了KFKA。虚拟机在我的笔记本电脑上。当我从命令行运行KAFKA生产者和消费者时,它工作得很好。下一步,我想创建一个Java生产者,但它总是超时,并出现以下异常。

    Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1186)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:880)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:690)
    at com.soft.teradata.KafkaProducerExample.runProducer(KafkaProducerExample.java:40)
    at com.soft.teradata.KafkaProducerExample.main(KafkaProducerExample.java:55)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.

生产者的 Java 代码是:

package com.soft;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class SimpleProducer {
    public static void main(String[] args) throws Exception {
        try {
            String topicName = "Hello-Kafka";
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.xxx.xxx:9092");
            props.put("acks", "all");
            props.put("retries", 1);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            Future<RecordMetadata> f=producer.send(new ProducerRecord<String, String>(topicName, "Eclipse"));
            System.out.println("Message sent successfully");
            producer.close();
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } 
            System.out.println("Successful");

    }
} 

对于引导服务器,我甚至尝试了以下内容:

props.put(“bootstrap.servers”、“原告://192.168.xxx.xxx:9092”);

请注意,我正在笔记本电脑上执行Eclipse的java代码,KAFKA安装在笔记本电脑上的CENTOS7虚拟机上。192.168.xxx.xxx是CENTOS7虚拟机的IP地址。我注意到我的笔记本电脑无法访问192.168.xxx.xxx:9092(telnet192.168.xxx.xxx9092)。我将端口添加到防火墙,但仍然没有成功。

firewall-cmd --zone=public --add-port=9092/tcp --permanent
        firewall-cmd --reload
        firewall-cmd --list-ports

KAFKA的版本是2.12-2.0.0,我在Eclipse Classpath中添加了以下jar:

  • kafka-clients-2.0.0.jar
  • lz4-java-1.4.1.jar
  • slf4j-api-1.7.25.jar
  • snapy-java-1.1.7.1.jar

非常感谢你的帮助:)

迪尔沙·雷加德斯。

共有3个答案

堵茂勋
2023-03-14

我在本地运行集群时遇到了同样的问题,我检查了主题是否是在代理中创建的:

./kafka-topics.sh --list  --bootstrap-server=localhost:9092

创建了我的代码,所以我再次检查了代码,我意识到在发送消息时主题名中有一个尾随空格,我删除了空格,它就可以工作了。

华誉
2023-03-14

我也面临同样的问题。您需要通告Kafka broker的主机名/ip,以便可以从Kafka Producer pc访问。

 kafka-server-start.sh config/server.properties --override  advertised.listeners=PLAINTEXT://<accessible-hostname>:9092
江渊
2023-03-14

这个错误可能表明主题不存在,所以您可能需要仔细检查您的topicName = "Hello-Kafka "。

虽然这并不是一个深刻的答案,但它似乎是一个常见的问题,另请参阅https://github.com/dpkp/kafka-python/issues/607

 类似资料:
  • 我正在编写一个Flink-Kafka集成程序,如下所示,但Kafka出现超时错误: 从终端我可以看到Kafka和zookeeper正在运行,但当我从Intellij运行上面的程序时,它显示了这个错误: 组织。阿帕奇。Kafka。常见的错误。TimeoutException:在60000毫秒后更新元数据失败。2017年12月15日14:42:50作业执行切换到失败状态。[错误](run-main-0

  • 我用基本的图形用户界面创建了一个小画板程序。我使用了画图组件方法。我想每毫秒更新正在绘制的图形。这样,用户可以在释放鼠标点击之前看到他们将要绘制的内容。例如,如果我正在绘制一个矩形,我想在绘制矩形时看到它。如果你不明白我到底在说什么,打开微软油漆,点击矩形工具。画一个矩形。注意它是如何持续更新的,而不是在释放鼠标后。我想一定有办法让它每毫秒更新我的图形。做这件事最好的方法是什么?抱歉,如果这是一个

  • timeoutException:60000 ms后更新元数据失败。 10分钟后,由于Broker2是新的领导者,我希望生产者发送数据给Broker2,但它继续失败,给出了上述异常。lastRefreshMs和lastSuccessfullRefreshMs仍然相同,尽管生产者的metadataExpireMs是30万。 我正在使用Kafka新的生产者方面的生产者实现。 配置参数。 用例: 1-启

  • 问题内容: 我知道POSIX 函数会使程序hibernatex秒。在C ++中是否有使程序hibernatex 毫秒 的功能? 问题答案: 请注意,毫秒级没有标准的C API,因此(在Unix上)您必须满足,它接受微秒:

  • 根据要求,我的代码应该将ZonedDateTime参数中的日期和OffSetTime参数中的时间附加到格式“yyyy-MM-dd-HH:MM:ss.SSSz”中。然而,我没能做到这一点 我用DateTimeFormatter尝试了各种方法,包括下面的方法。 我注意到:-代码在LocalDateTime抛出“java.time.DateTimeException:无法提取值:class java.t

  • 问题内容: 我正在编写一个赛车应用程序,想要将大量毫秒转换为minutes:seconds.milliseconds(例如)。目前,我只是通过数学()进行运算,然后求余数并进一步除法,依此类推,但是我发现它时不时关闭1毫秒。我知道这听起来似乎没什么大不了的,但是我试图找到和的平均值,然后返回,这意味着它输入不正确,而应用程序的其余部分无法处理。 有没有一种方法使用或格式化毫秒到MM或类似的东西:s