我设法用@KafkaListener注释了方法中的topic/partition/offset,但是如何使用这些数据来实现一个只需一次的使用者逻辑呢?
我使用的是Con相当于4的ConCorr tKafkaListenerContainerFactory,并将AckMode设置为MANUAL。我目前的方法是使用redis来dedup:我使用主题:分区作为redis键,偏移量作为它的值,然后将即将到来的偏移量与redis中的值进行比较,如果偏移量比redis 1更新(更大),然后继续业务逻辑,否则我忽略消息。最后提交偏移量(ack.acknowledge())
但这种方式不起作用,例如,如果再平衡发生在ack之前。acknowledge()完成后,出现以下错误:org。阿帕奇。Kafka。客户。消费者CommitFailedException,
在重新平衡之后,原始分区被分配给另一个线程,这导致相同的消息将被消耗两次。
那么一言以蔽之,如何设计一个逻辑,能够让每条Kafka消息都精确——一次的传递?
Kafka还不完全支持一次。它将在0.11.0.0版本中提供:https://issues.apache.org/jira/browse/KAFKA-4923此版本计划于2017年6月14日发布,因此您可以等待或自己构建此复杂逻辑;-)
您必须在Kafka之外写出原子处理的最后一个偏移量,以及处理的结果。这可以是对数据库或文件,只是不要进行两次写入,使其成为数据和偏移量的单个原子写入。如果您的消费者崩溃,并且它或另一个实例重新启动或接管,您需要确保首先它读取与最后一个处理结果一起存储的最后一个偏移量,并在轮询()以获取更多消息之前寻求()到该位置。这就是今天现有的Kafka Sink Connectors中有多少可以实现EOS消耗。
问题内容: 我正在使用Java 编写使用者。我想保持消息的实时性,因此,如果有太多消息在等待使用,例如1000条或更多,我应该放弃未使用的消息,并从最后一个偏移量开始使用。 对于此问题,我尝试比较主题的最后提交的偏移量和主题的结束偏移量(仅1个分区),如果这两个偏移量之间的差大于某个值,则将主题的最后提交的偏移量设置为下一个偏移量,这样我就可以放弃那些多余的消息。 现在我的问题是如何获得主题的最终
现在我的问题是如何得到一个主题的结束偏移量,有人说我可以用老消费者,但是太复杂了,新消费者有这个功能吗?
我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。
为什么实际主题中的偏移值与同一主题中的偏移值不同?PFB偏移位置以及使用的命令。 我错过了什么?
我是Kafka的新手,一直在尝试实现一个消费者。下面是我的场景 启动消费者应用程序 产生来自生产者的消息。这些消息被消费者消费 停止消费者并再次生成消息。当我启动消费者时,在消费者被停止时发布的消息不会被读取 虽然会消耗消息,但它会消耗发布到主题的所有消息。我想只消耗那些在消费者关闭时发布的消息。
问题内容: 在轮询Kafka时,我已经使用该功能订阅了多个主题。现在,我想设置的偏离,我想从每个主题阅读,而无需每次重新订阅后,并从一个话题。 在轮询数据之前,是否可以迭代调用每个主题名称 来 达到结果?偏移量如何精确存储在Kafka中? 我每个主题有一个分区,并且只有一个使用者可以读取所有主题。 问题答案: Kafka如何存储每个主题的偏移量? 卡夫卡已将抵销存储从动物园管理员转移到卡夫卡经纪人