Kafka 消费者在每个投票中轮询 500 条消息()
。我们禁用了启用自动提交 = 假
,
假设我们已成功处理 100 条消息,偏移量也为 100
现在在第101条消息中,我们遇到了一个错误,我们没有提交偏移量
但是因为我们已经有了500条消息,所以我们处理了第102条消息,我们成功地处理了它,并且我们还提交了第102条消息的偏移量。
雀:
>
如果您promise已在偏移量 102 处成功处理了消息(因此您提交了偏移量 103),则除非手动查找到先前位置,否则在该偏移量之前将不会再收到任何消息。
克服此问题的一种常见方法是实现死信队列(DLQ),您可以在线找到有关此模式的信息。简而言之,如果错误不可检索(例如无效数据),您将消息发送到另一个主题,以便在主应用程序继续处理较新消息时手动排除发生的事情。这种方法将要求您设置某种警报,当消息发送到DLQ时通知您,并准备一些工具来检查这些消息,并在必要时将它们重新发送到主队列(例如在修复错误之后)
如果您没有在异常时中断记录批处理的轮询循环,则会删除该记录,并且您已实现“最多一次处理”。
但是,数据在代理中仍然可用,但您需要倒带使用者(使用 seek 方法),并且轮询偏移量为 101-601。然后,您最终会得到“至少一次处理”,因为您将多次处理偏移量101和102。您需要更新任何处理逻辑,以确保这不会造成意外的副作用或错误。
除了让消费者崩溃,让它重新启动并再次轮询(可能会一次又一次崩溃)之外,你可以尝试创建一个生产者,除非这会将数据推到一个全新的主题(称为死信主题)。然后,您需要编写更多的消费者处理代码来检查和处理这些事件。
我在使用者组中轮询来自 Kafka 的消息时遇到问题。我的使用者对象分配给给定的分区 之后,消费者向该分区分配: 之后,我可以计算分区内的消息 和 ..... 在我的主题中有超过30000条消息。问题是我只收到一条消息。 具有< code > max _ poll _ records = 200 < code > AUTO _ OFFSET _ RESET 的消费者配置是最早的 这是我的函数,我正
假设我的使用者从一个代理轮询,该代理有多个主题,每个主题有多个分区。我在同一个消费群体中总共有5个消费者。如果我的每个消费者都进行投票,将返回的数据顺序是什么? topicD-分区5 我的问题是,在这个单一的1轮询中,在按顺序移动到下一个主题/分区之前,我会收到来自该主题/分区的所有可用消息吗?意思例如: 在一次投票循环中,我收到了这个... 或者在那个单一的1轮询循环中,有可能接收到这个消息顺序
我想在特定时间停止对特定主题的轮询。 Spring防尘套2.X Springkafka 2.5.5 Kafka版本2.5.1 比如即使有消息进来测试题目分区,消息也是从00到01堆在分区里,没有消耗。 01点之后,我想再次使用有关TEST主题的消息。 如何暂停和恢复?
我试图在我的spring boot项目中使用spring kafka来阅读来自我的kafka的消息。我正在使用@KafkaListener,但问题是我的消费者总是在运行。只要我从控制台生成一条消息,它就会在我的应用程序中弹出。我想定期投票。我怎样才能做到这一点? } 这是我的消费者配置:
我的问题是,我无法足够快地轮询我的队列,以保持我的队列为空或接近空。我最初的想法是,我可以让使用者以x/s的速率通过Camel从SQS接收消息。从那里,我可以简单地创建更多的消费者,以达到我需要的消息处理速度。 我的消费者: 如图所示,我设置了和以提高消息的速率,但是我无法生成具有相同endpoint的多个使用者。 我在文档中读到,我相信SQSendpoint也是如此,因为生成多个使用者将只给我一
Kafka consumer有一个配置< code>max.poll.records,它控制对poll()的单次调用中返回的最大记录数,其默认值为500。我将它设置为一个很高的数字,这样我就可以在一次轮询中获得所有的消息。然而,即使这个主题有更多的信息,在一次呼叫中,民意调查只返回几千条信息(大约6000条)。< br> 如何进一步增加单个消费者阅读的邮件数量?