当前位置: 首页 > 知识库问答 >
问题:

Kafka不使用消息

邹京
2023-03-14

我已经更新了我的Kafka从版本0.10.2.0到版本2.1.0,现在Kafka不能消费消息。我使用Spring引导,这是我的配置:

java prettyprint-override">configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaServers);
configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, topic + "kafka2");
configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configProperties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 3000000);
configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configProperties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 2000000000);
configProperties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "900000");
configProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "905000");
configProperties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "35000");

我已经更改了组id,以避免旧组id出现问题。我当前的spring版本是2.1。2.释放。在我的应用程序中,我可以看到我的客户是如何不断地重新连接的

你知道这个问题吗?

共有1个答案

慕永年
2023-03-14

在谈论“spring版本”时,您应该更具体一些。有许多不同版本的spring项目。

我想你是说Spring靴是2.1。2(顺便说一下,当前的2.1.x版本是2.1.6)。

Spring靴2.1。6将Spring用于ApacheKafka 2.2。7这反过来又使用了kafka客户端2.0。1.

我不认为使用2.1.0代理的2.0. x kafka客户端会有任何问题,但是您可以尝试将kafka客户端版本重写到2.1.0,如参考手册中所述。

同样,我建议为apache启用调试日志记录。Kafka查看它是否为您提供了更多信息。

 类似资料:
  • 我正在使用spring Kafka来使用LinkedIn支持大型消息的Kafka客户端生成的消息 假设此Kafka客户端总是将重写为none(如其构造函数所示)。 因此,一旦我开始使用批处理提交并将设置为false,它将抛出以下错误: 发生此问题是因为这是此使用者组第一次使用来自此主题的消息,因此它尝试使用偏移量重置策略。 虽然我将其设置为“最早”,但LinkedIn kafka客户端将其覆盖为“

  • 我已经建立了一个由3个节点组成的AWS集群。我修改了节点的/etc/hosts文件,看起来像这样 当我从其中一个节点运行命令时 bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic first_topic --from-start 它可以工作,但是当我用ip替换主机名并用下面的命令运行它时 bin/kafka-co

  • 我正在尝试将KAFKA与Spring集成,我的JAVA应用程序正在与KAFKA服务器通信,当我使用HTTP运行应用程序时,我也会收到消息。 现在我想使用 Spring 在 KAFKA 上添加 SSL,我已经完成了在 SSL KAFKA 和 SPRING KAFKA 上指定的更改 当我使用命令行(使用 SSL)运行生产者和消费者时,通信会正常发生,但是当我更改 Java 应用程序的配置并尝试生成和使

  • 我是Kafka新手,我正在使用Kafka1.0。 我使用拉取模式读取kafka消息,也就是说,我定期查看Kafka主题以获取新消息,但我没有将偏移量写回Kafka。 我会问Kafka如何知道我消耗了哪些偏移量,或者Kafka记住进度的机制是什么(Kafka偏移量)

  • 我已经开发了一个使用apache storm使用kafka消息的应用程序,当我在eclipse中运行topology using in LocalCluster时,它可以正常工作,消息也可以正常使用,但是当我使用storm命令(bin\storm jar..\kafka-storm-0.0.1-SNAPSHOT.jar com.kafka_storm.util.topology storm kaf

  • 我是Kafka的新手。我在网上读了很多关于Kafka制作人和Kafka消费者的说明。我成功地实现了前者,它可以向Kafka集群发送消息。然而,我没有完成后一个。请帮我解决这个问题。我看到我的问题像StackOverflow上的一些帖子,但我想更清楚地描述一下。我在虚拟盒子的Ubuntu服务器上运行Kafka和Zookeeper。使用1个Kafka集群和1个Zookeeper集群的最简单配置(几乎是