当前位置: 首页 > 知识库问答 >
问题:

为什么重启后消费者会阅读Kafka主题的所有消息?

康恩
2023-03-14

我使用confluent .net客户端。订阅者在重启(订阅者服务重启)后始终读取 Kafka 主题的所有消息。如何提交消费者已经实现的偏移并从中读取?也许一些消费者配置可以提供帮助...

共有2个答案

颛孙和颂
2023-03-14

你有两个选择:

>

  • 通过设置使用者属性启用自动提交EnableAutoCommit=true(消息在可配置的时间后提交,通常为5秒),或

    手动提交使用者获取的偏移量。提交(消耗结果)。

    GitHub上显示了手动提交的示例。

  • 周昊乾
    2023-03-14

    这只是一个猜测,但如何声明消费者的组id?我见过一些使用这种随机赋值的例子:

         ["group.id"] = Guid.NewGuid().ToString(),
    

    如果每次启动使用者时都声明一个新的/随机 group.id,这将导致在每次执行时注册一个新的使用者组,这涉及 auto.offset.reset 启动。

    如果此属性设置为“forest”,则每次启动消费者时(假设他们每次都有不同的group.id),他们将从第一个可用的偏移量开始,就像您的情况一样,从头开始再次读取所有消息。

    如果此属性设置为“< code>latest”,并且您的生成器当前没有发送任何消息,您将无法读取任何内容,这可能会造成一些混乱。

    尝试设置一个固定的< code>group.id:开始使用,当消息在代理上仍然可用时停止流程,并再次启动使用者,而不更改最后的< code>group.id。

    这一次,由于使用者组已经注册,< code>auto.offset.reset将被忽略,开始位置将由您提交的偏移量定义,这些偏移量默认存储在名为< code>__consumer_offsets的特殊主题中。

     类似资料:
    • 我有一个简单的java制作人,如下所示 我正在尝试读取如下数据 但消费者并没有从Kafka那里读到任何信息。如果我在处添加以下内容 然后消费者开始从题目开始阅读。但是每次消费者重新启动时,它都从我不想要的主题开始读取消息。如果我在启动消费程序时添加了以下配置 然后,它从主题中读取消息,但是如果消费者在处理所有消息之前重新启动,那么它不会读取未处理的消息。 有人可以让我知道出了什么问题,我该如何解决

    • 我有以下用例: 我有两个Kafka主题,一个是用来处理传入消息流的,另一个是用来存储记录的,作为应用程序初始状态的引导。 有没有办法做到以下几点: 当应用程序启动时,读取Kafka主题中的所有消息,并将该主题中用于将应用程序引导至初始状态的所有存储在内存中 只有在读取了所有消息后,才允许处理流主题中的 因为在应用程序运行时,状态主题上可能会有其他记录,以便在不必重新启动应用程序的情况下将它们合并到

    • 因为我是新的Kafka,所以我能够从文件中读取记录,并通过生产者将消息发送到Kafka主题,但不能通过消费者消费相同的主题。 注意:您可以从任何文本文件中读取数据,我使用的是Kafka2.11-0.9。0.0版本 这是我的密码: 下面是输出:

    • 我试图消费一个Kafka主题从Spring启动应用程序。我使用的是下面提到的版本的Spring云流 Spring boot starter父级:2.5.7 Spring云版本:2020.0.4 下面是代码和配置 application.yml 消息消费者类 下面的消息发布者正在正确地发布消息。发布者是在不同的微服务中编写的。 pom.xml

    • 我正在用java编写一个简单的Kafka使用者,它被配置为读取多个主题。目前,让我们假设两个主题(topic1和Topic2),并为两个主题设置一个分区。 Kafka用户从topic1和Topic2读取的顺序是什么。如果这两个主题都有,假设已经发布了100条消息。 使用者首先从topic1读取所有消息,然后再从topic2读取? 用户按时间顺序阅读,将来自两个主题的消息混合在一起? 我看了Kafk

    • 我将python kafka consumer的< code>auto_commit设置为< code>False,我正在手动提交消息。然而,重启后,消费者再次消费来自每个分区的最后一条消息。只有最后一个,不能再多。 这就是所展示的: 不知道为什么会显示滞后,whu当前偏移设置为最后一条消息而不是下一条?当我提交偏移量3时,当前偏移量不应该移动到4吗? 我提交我使用的每条消息,但是在重启时,它总是