当前位置: 首页 > 知识库问答 >
问题:

我对承认的原则感到困惑

乐山
2023-03-14

版本:2.1.11

问题:我有一些错误的参数配置:max.poll.records:500 max.poll.interval.ms:10000。但消耗大约需要25000秒。因此,它将遇到异常:org.apache.kafka.clients.consumer.CommitFailedException:提交无法完成,因为组已经重新平衡并将分区分配给了另一个成员。这意味着对poll()的后续调用之间的时间比配置的max.poll.interval.ms长,这通常意味着poll循环花费了太多时间处理消息。您可以通过增加会话超时,或者通过使用max.poll.records减小poll()中返回的批的最大大小来解决这一问题。

然后我修正了参数,这样这个错误就解决了。

但是,我很困惑,我已经提交了由ack.acknowledge()消费的一条消息。为什么偏移量根本不能提交?这是代码

@KafkaListener(topics = "${kafka.topic}")
public void consume(ConsumerRecord<String, SyncResMessage> record, Acknowledgment ack) {
    
    try {
        // consume message
        consumerService.dealResource(record.value());
    } catch (Exception e) {
        LOGGER.error("error when consume data, data key is {}, exception is {}.", record.key(), e);
    }

    if (ack != null) {
        LOGGER.info("commit successfully.");
        ack.acknowledge();
    } else {
        LOGGER.error("message of commit is null, record is {}.", JsonUtil.toString(record));
    }
}

任何帮助都将不胜感激~

共有1个答案

苏浩瀚
2023-03-14

也许这就是答案。手动表示不立即提交,它将更新此映射。MANUAL_IMMEDIATE表示立即提交。

 类似资料:
  • 我用这样的JAVA_OPTS启动了 jvm: 过了一段时间,当我查看GC日志时:

  • 我正在尝试提出一种解决方案,它涉及在连接操作之后应用一些逻辑,从多个中的中选择一个事件。这类似于reduce函数,但它只返回1个元素,而不是递增地返回。因此最终结果将是单个(,对,而不是一个 每个键保证只到达一次。 假设像上面这样的连接操作,它用4个生成了1个,成功地连接并收集在。现在,我想做的是,立即访问这些值,并执行一些逻辑以将正确匹配到一个。例如,对于上面的数据集,我需要(,和)。 将为每个

  • 所以我一直在读Kafka的语义学,我对它的工作原理有点困惑。 我理解生产者如何避免发送重复的消息(以防代理的ack失败),但我不明白的是,在消费者处理消息但在提交偏移量之前崩溃的情况下,一次是如何工作的。Kafka不会在这种情况下重试吗?

  • 但是如果我对if语句进行注释,则输出是正确的,即 所以,我不知道它为什么会发生,如何修复?这是我的代码:

  • 问题内容: 我可以理解以下定义: 每个对象都有一个标识,一个类型和一个值。一旦创建了对象,其身份就永远不会改变。您可能会认为它是对象在内存中的地址。所述操作者比较两个对象的身份; 该函数返回一个表示其身份的整数。 我认为上面的定义在创建“某物”时起作用,例如: 但是我不理解: 我还没有创建任何东西。那么整数“ 1”如何具有ID?这是否意味着只要我在Python Shell中“提及” 1,便立即将其

  • 问题内容: 我已经在eclipse中创建了一个项目,并添加了Maven依赖项。在Eclipse中,它表示我正在使用JRE 1.5。一切在Eclipse中都可以正常运行,例如,我可以运行测试。 当我尝试从终端运行时,出现以下错误。 …在-source 1.3中不支持泛型(使用-source 5或更高版本来启用泛型)… 看来,Maven认为我正在使用JRE 1.3,并且无法识别泛型或for-each循