当前位置: 首页 > 知识库问答 >
问题:

KafkaConsumer恢复分区无法继续接收未提交的消息

邓季
2023-03-14

我用的是一个主题,一个分区,一个消费者,Kafka客户端版本是0.10

我得到了两个不同的结果:

>

  • 如果我先暂停分区,然后生成消息并调用恢复方法。Kafka消费者可以成功轮询未提交的消息。

    但是,如果我先生成消息并且没有提交其偏移量,则在几秒钟后暂停分区以调用reach方法。Kafka消费者不会收到未提交的消息。我使用 kafka-consumer-groups.sh 在Kafka服务器上检查了它,它显示了日志 - 结束 - 偏移量减去电流 - 偏移量 = LAG = 1

    我一直试图弄清楚它两天,我重复了很多次这样的测试,结果总是这样。我需要一些建议,或者有人可以告诉我它的Kafka的原始机制。

  • 共有1个答案

    蔚弘量
    2023-03-14

    对于您的观察#2,如果您重新启动应用程序,它将向您提供未提交偏移的所有记录,即缺失的记录,如果您的消费者再次不提交,它将在应用程序重新启动时向Kafka注册消费者时再次发送。这是意料之中的。

    假设您正在使用< code>consumer.poll()创建一个混合流接口,即在上述< code >持续时间内累积进入Kafka的数据,并在持续时间结束后将其提供给消费者进行处理。这种连续的累积发生在后端,并且不依赖于您是否提交了偏移。

    Kafka消费者

    消费者的位置给出了将要给出的下一个记录的偏移量。它将比用户在该分区中看到的最高偏移量大1。每当消费者在一个轮询呼叫(long)中收到消息时,它就自动前进。

     类似资料:
    • 我要做的是暂停< code>KafkaConsumer,如果在使用消息的过程中出现错误。 这是我写的 然后我写了一个REST服务来恢复消费者 现在,我有两个问题。第一个问题:当我打电话给消费者时。来自<code>@KafkaListener</code>注释方法的pause()会发生什么?消费者立即暂停,或者我可以接收到同一主题分区的其他偏移量上的其他消息。例如,我有偏移量为3的“message1

    • :) 我已经在一个(奇怪的)情况中结束了自己,简单地说,我不想使用来自Kafka的任何新记录,因此暂停主题中所有分区的sparkStreaming消费(InputStream[ConsumerRecord]),执行一些操作,最后,恢复消费记录。 首先这可能吗? 我一直在尝试这样的事情: 但是我得到了这个: 任何帮助我理解我遗漏了什么,以及为什么当消费者明确分配了分区时我会得到空结果的帮助都将受到欢

    • 我在这里查看的是KafkaConsumer源代码。通过查看源代码(java版本,我不太熟悉),暂停一个topicpartition似乎将其偏移量状态设置为Paused。我试图弄清楚KafkaConsumer在后台执行的to获取操作发生了什么,暂停主题分区是否会停止kafka使用者执行的预取操作,还是它继续预取,但在轮询时不返回该主题分区的ConsumerRecords。

    • 如果我创建上面的类并尝试在tomcat7上部署war,我会看到以下错误。

    • 我是ElasticSearch的新手,我遵循这里的说明:https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html 每当我运行“docker compose up”时,Kibana总是说“无法恢复连接”,但如果我运行curlhttp://localhost:9200,我会得到回复: 下面是我的docker

    • 问题内容: 我正在使用KafkaConsumer 0.10 Java api。我想从特定的分区和特定的偏移量中消费。我抬起头,发现有一个搜索方法,但是抛出异常。任何人都有类似的用例或解决方案? 码: 例外 问题答案: 你可以之前,你首先需要一个主题 或 主题,以消费者的分区。也请记住,这和懒惰- 这样,你也需要做一个“虚拟来电”,以才可以使用。 注意:从Kafka 2.0开始,新版本是异步的,不能