当前位置: 首页 > 知识库问答 >
问题:

如何增加Kafka消费者的请求时间

汪翰墨
2023-03-14

我正在使用Kafka和Spring Listener。下面是一段代码。过去,我们发布了超过10万条消息来测试主题,系统似乎运行良好。但几天前,我更改了消费者的groupId。之后,这个新消费者尝试从一开始就处理所有消息,这需要花费大量时间。但过了一段时间(10秒),经纪人会启动消费者。所以结果没有kafka寄存器来侦听消息。

 @KafkaListener(
            topicPattern = "test",
            groupId = "test",
            id = "test",
            containerFactory = "testKafkaListenerContainerFactory")
    public void consume(@Payload String payload) throws IOException {
    }

Kafka消费者配置:

Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put("security.protocol", "SSL");

然后,我使用cli通过以下命令读取消息并观察到相同的行为。10秒后,消费者停止阅读来自Kafka的消息。

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

如何增加kafka客户端的请求超时时间或其他更好的方法来解决这个问题?

共有1个答案

夏侯浩气
2023-03-14
In the past we have published more than 1 lac message to test topic and system seems to be working fine.

你的Kafka有所需的数据吗?Kafka不会永远存储消息。消息的持续时间由代理中的保留设置控制。如果您的数据早于保留期,它将丢失。

 类似资料:
  • 本文向大家介绍Kafka 的消费者如何消费数据相关面试题,主要包含被问及Kafka 的消费者如何消费数据时的应答技巧和注意事项,需要的朋友参考一下 消费者每次消费数据的时候,消费者都会记录消费的物理偏移量(offset)的位置 等到下次消费时,他会接着上次位置继续消费

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我刚接触Kafka,很少阅读教程。我无法理解使用者和分区之间的关系。 请回答我下面的问题。 > 消费者是否由ZK分配到单个分区,如果是,如果生产者将消息发送到不同的分区,那么其他分区的消费者将如何使用该消息? 我有一个主题,它有3个分区。我发布消息,它会转到P0。我有5个消费者(不同的消费者群体)。所有消费者都会阅读P0的信息吗?若我增加了许多消费者,他们会从相同的P0中阅读信息吗?如果所有消费者

  • 我有一个简单的Kafka消费者微服务应用程序,它使用来自某个主题的消息,同一个应用程序运行在两个不同的池中。 所以,当消息由制作人生成,而我的应用程序尝试使用来自主题的消息时,它只被一个池中的一个人使用。 如何停止从消费者Kafka读取并发消息。我想在两个池中使用相同的消息。 这种情况下可能的解决方案是什么

  • 我在Kafka做数据复制。但是,kafka日志文件的大小增长很快。一天内大小达到5 gb。作为这个问题解决方案,我想立即删除处理过的数据。我正在使用AdminClient中的delete record方法删除偏移量。但当我查看日志文件时,对应于该偏移量的数据不会被删除。 我不想要类似(log.retention.hours,log.retention.bytes,log.segment.bytes