当前位置: 首页 > 知识库问答 >
问题:

Kafka10.2新消费者与旧消费者

和和煦
2023-03-14

我花了几个小时想弄清楚发生了什么,但没能找到解决办法。

这是我在一台机器上的设置:

  • 1名zookeeper跑步

我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(cat复制偏移量检查点),我会看到我的消息被Kafka正确接收。

现在我使用kafka控制台消费者(新):

sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic testTopicPartitionned2

我没有看到任何消耗。我尝试删除我的日志文件夹(/tmp/kafka-logs-[1,2,3]),创建新主题,仍然没有。

然而,当我使用老Kafka消费者时:

sudo bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopicPartitionned2

我可以看到我的消息。

我是否错过了让这个新消费者工作的重要因素?

提前谢谢。


共有2个答案

冯文彬
2023-03-14

尝试将所有代理提供给bootstrap server参数,以查看是否注意到任何差异:

sudo bin/kafka-console-consumer.sh --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --from-beginning --topic testTopicPartitionned2

此外,您的主题名称相当长。我假设您已经确保提供了正确的主题名称。

阚吕恭
2023-03-14

查看消费者使用的自动设置。抵消重置属性

这将影响没有先前提交的偏移量的使用者组在设置从何处开始从分区读取消息方面所做的工作。

查看Kafka文档了解更多信息。

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 由于它是一个Spring Boot应用程序,默认偏移量设置为Latest。我在这里做错了什么,请帮我弄明白。

  • 本文向大家介绍Kafka 新旧消费者的区别相关面试题,主要包含被问及Kafka 新旧消费者的区别时的应答技巧和注意事项,需要的朋友参考一下 旧的 Kafka 消费者 API 主要包括:SimpleConsumer(简单消费者) 和 ZookeeperConsumerConnectir(高级消费者)。SimpleConsumer 名字看起来是简单消费者,但是其实用起来很不简单,可以使用它从特定的分区

  • Kafka消费者不接收在消费者开始之前产生的消息。 ConsumerRecords始终为空 虽然,如果我启动我的消费者比生产者比它接收消息。(Kafka-客户端版本2.4.1)

  • 我的应用程序有一个生产者和一个消费者。我的生产者不定期地生成消息。有时我的队列会是空的,有时我会有一些消息。我想让我的消费者监听队列,当有消息在其中时,接受它并处理这条消息。这个过程可能需要几个小时,如果我的消费者没有完成处理当前消息,我不希望他接受队列中的另一条消息。 我认为AKKA和AWS SQS可以满足我的需求。通过阅读文档和示例,akka-camel似乎可以简化我的工作。 我在github

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认