当前位置: 首页 > 知识库问答 >
问题:

springboot消费者不会消费任何消息

印劲
2023-03-14

我有一个springboot消费者应用程序。当我第一次运行它时,它消耗了来自Kafka主题的信息。但当我再次运行它时,它停止了消耗。在日志中,我看到以下消息。

2022-08-02 00:14:08.130  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Fetch position FetchPosition{offset=17400, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}} is out of range for partition CraveVideoTestLog-0, resetting offset
2022-08-02 00:14:08.135  INFO 96561 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-my-transformation-local-1, groupId=my-transformation-local] Resetting offset for partition CraveVideoTestLog-0 to position FetchPosition{offset=56464, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 1 rack: null)], epoch=0}}.

我知道消费者无法获得偏移量。在这种情况下,消费者将引用自动偏移重置属性。如您所见,我已将其设置为最早,希望消费者从头开始阅读。但它没有。

应用程序. yml

spring:
  application:
    name: my-transformation
  kafka:
    bootstrap-servers: ${BOOTSTRAP_SERVERS:localhost:9092}
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      retries: 2
      compression-type: snappy
    consumer:
      group-id: ${GROUP_ID:my-transformation}
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

在我的Java课上

    @SendTo("output-topic-here")
    @KafkaListener(topics = "${my-transformation.input.topic}", errorHandler = "myErrorHandler")
    public Message<?> process(Message<String> in, ConsumerRecord<String, String> record) throws Exception {
        log.info("Going to process event for key: {} offset: {} partition: {}", record.key(), record.offset(), record.partition());
    }

我尝试了一些东西。

  • 我将自动偏移重置值设置为none。不出所料,它抛出了一个异常,抱怨偏移量
  • 我创建了一个新的消费者群体。我可以看到,新的消费者群体是从融合的UI创建的。但是,同样,消息没有被消耗
  • 我读了很多帖子。以下是一些与我面临的问题非常接近的问题。链接1链接2

我觉得我错过了一些很傻的东西,但我不知道是什么。

共有2个答案

宦烈
2023-03-14

最后,我发现了我的消费者行为的原因。主题的保留策略设置为删除,保留时间设置为默认(1周)。

消费者停止消费记录的原因是因为主题的保留时间已经过去。因此,主题中的整个数据都被删除了,因此消费者没有什么可读的。当我再次将新记录推送到主题中时,消费者开始消费。

习洲
2023-03-14

属性将按以下方式工作。

用例1:消费者启动并拥有auto.offset。reset=最新,并且主题分区当前具有从某个范围到另一个范围的偏移量数据。消费者群体之前已经为该主题提交了一些补偿。消费者将从何处阅读?

Ans:这个消费者组和主题分区的偏移量已经提交,所以忽略auto.offset.reset属性

用例2:一个消费者启动并拥有auto.offset.reset=none,主题分区当前拥有从某个范围到另一个范围的偏移数据。消费者团体之前已经promise了一些话题的补偿。消费者将从哪里阅读?

答:auto.offset.reset=无意味着如果消费者正在恢复的偏移量已从Kafka中删除,则消费者将崩溃。

用例3:消费者具有auto.offset。reset=最新,并且主题分区当前具有从某个范围到另一个范围的偏移量数据。消费者群体以前从未为该主题提交过补偿。消费者将从何处阅读?

回答:最新意味着数据检索将从偏移当前结束的位置开始。

用例4:消费者拥有auto.offset.reset=earliest,并且主题分区当前拥有从某个范围到另一个范围的偏移量的数据。消费者组织以前从未promise过这个话题的补偿。消费者将从哪里阅读?

答:最早意味着数据检索将从分区的开头开始

用例5:消费者启动并具有auto.offset.reset=最早,并且主题分区当前具有从某个范围到其他范围的偏移量的数据。消费者组之前已经为主题提交了一些偏移量。消费者将从哪里读取?

Ans:这个消费者组和主题分区的偏移量已经提交,所以忽略auto.offset.reset属性

 类似资料:
  • 是否有一种方法以编程方式访问和打印使用者滞后偏移,或者说使用者读取的最后一条记录的偏移与某个生产者写入该使用者分区的最后一条记录的偏移之间的位置差。 要知道我的最终目标是将这个值发送到prometheus进行监视,我应该在上面添加哪些语句来得到滞后偏移值?

  • 我正在尝试让 kafka 消费者获取在 Java 中生成并发布到主题的消息。我的消费者如下。 consumer.java 当我运行上面的代码时,我在控制台中什么也看不到,屏幕后面的java producer程序正在‘AATest’主题下不断地发布数据。另外,在动物园管理员控制台中,当我尝试运行上面的consumer.java时,我得到了以下行 此外,当我运行指向 AATest 主题的单独控制台使用

  • 我有一个简单的Kafka设置。生成器正在以较高的速率向单个分区生成具有单个主题的消息。单个使用者正在使用来自此分区的消息。在此过程中,使用者可能会多次暂停处理消息。停顿可以持续几分钟。生产者停止产生消息后,所有排队的消息都将由使用者处理。生产者产生的消息似乎不会立即被消费者看到。我使用的是Kafka0.10.1.0。这里会发生什么?下面是使用消息的代码部分: 代理上的所有配置都保留为kafka默认

  • 我在ActiveMQ中使用异步消息使用者。我的制作人工作正常,向队列发送消息。现在,我的异步消息消费者正在等待调用onMessage(),但这从未发生过。因此,问题是: 异步使用者不会使用消息 ActiveMQ日志的快照还显示了许多刚刚堆积在挂起状态中的消息: 我想不出问题到底出在哪里。 计数: toPageIn 78 只是不断增加,信息仍然无法传递给消费者。 是服务器端问题还是客户端问题?

  • 我花了几个小时想弄清楚发生了什么,但没能找到解决办法。 这是我在一台机器上的设置: 1名zookeeper跑步 我正在使用kafka控制台生成器插入消息。如果我检查复制偏移量(

  • 我要求从主题中读取消息,对它们进行批处理,然后将批处理推送到外部系统。如果批处理因任何原因失败,我需要再次使用同一组消息并重复该过程。因此,对于每个批处理,每个分区的 from 和 to 偏移量都存储在数据库中。为了实现这一点,我通过向读取器分配分区来为每个分区创建一个Kafka使用者,基于先前存储的偏移量,使用者寻求该位置并开始读取。我已关闭自动提交,并且不提交来自使用者的偏移量。对于每个批处理