当前位置: 首页 > 知识库问答 >
问题:

简单-Kafka-消费者消息传递复制

东方华晖
2023-03-14

我正在尝试用Java实现一个简单的生产者-->Kafka-->消费者应用程序。我能够成功地生成和使用消息,但是当我重新启动消费者时,问题就出现了,其中一些已经使用的消息再次被消费者从Kafka中拾取(不是所有的消息,而是最近使用的一些消息)。

我已在我的使用者中设置了AutoOffset.Reset=Light,并且我的AutoCommity.Interval.ms属性设置为1000毫秒。

“重新传递一些已使用的消息”是一个已知的问题,还是有任何其他设置,我没有在这里?

基本上,有没有一种方法可以确保以前使用过的消息不会被使用者接收到/使用?

共有1个答案

封永嘉
2023-03-14

Kafaka使用Zookeeper存储消费者抵消。由于Zookeeper操作非常慢,因此不建议在使用每个消息后提交offset。

可以向使用者添加shutdown钩子,在退出之前手动提交主题偏移量。但是,在某些情况下(如jvm崩溃或kill-9),这不会有帮助。为了防止这种情况,我建议实现自定义提交逻辑,在处理每个消息(文件或本地数据库)后在本地提交偏移量,并且每1000ms向Zookeeper提交偏移量。当使用者启动时,这两个位置都应该被查询,并且两个值中的最大值应该被用作消耗偏移量。

 类似资料:
  • 我是Kafka的新手,运行一个简单的Kafka消费者/生产者的例子,就像在Kafka消费者和KafkaProducer上给出的那样。当我从终端运行消费者时,消费者正在接收消息,但我不能使用Java代码监听。我也在StackoverFlow上搜索了类似的问题(链接: Link1,Link2),并尝试了解决方案,但似乎没有什么对我有用。kafka版本:和相应的maven依赖在pom中使用。 Java生

  • 我是Kafka的新手,我对消费者的理解是,基本上有两种类型的实现 1)高级消费者/消费者群体 2)简单消费者 高级抽象最重要的部分是当Kafka不关心处理偏移量,而Simple消费者对偏移量管理提供了更好的控制时使用它。让我困惑的是,如果我想在多线程环境中运行consumer,并且还想控制偏移量,该怎么办。如果我使用消费者组,这是否意味着我必须读取存储在zookeeper中的最后一个偏移量?这是我

  • 我有一个 Kafka 应用程序,我一直在使用它 kafka-console-consumer.sh 使用消息,如下所示: 它提供了我通过Kafka消费者给Kafka经纪人写的所有消息,没有任何遗漏。 最近,我将该应用程序部署在另一个环境中,因为某些原因,zookeperhost无法访问。所以我使用的是kafka简单的消费者外壳。sh,如下所示: 但是有了这个,我看到很少的消息(大约5000个中有2

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我正在使用这个库来实现节点kafka与消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中和,但问题是在后它停止了消费消息。 这是我的代码。 任何人都可以帮助我,我在恢复消费者时做错了什么?当我启动使用者时,它只接收一条消息,并且在恢复后仍然不消耗任何其他消息。

  • 有什么方法可以阻止Kafka的消费者在一段时间内消费信息吗?我希望消费者停止一段时间,然后开始消费最后一条未消费的消息。