当前位置: 首页 > 知识库问答 >
问题:

Kafka:消费者api:无法手动读取和确认Kafka消费者api的偏移量

钱经赋
2023-03-14

我的用例是使用kafka消费者api,这样我们就可以从kafka主题中手动读取最后一次成功处理的数据的偏移量,然后手动确认Kafka的成功处理数据。(这是为了减少数据丢失)。然而,在我当前的实现中,程序向前移动并从下一个偏移读取,即使我注释掉了“ack.acknowledge()”。我是新来的Kafka和实现我的消费者下面的方式(我们使用Spring引导)

问题是:即使我注释掉ack.acknowledge(),偏移量仍然在更新,消费者正在从下一个偏移量中读取,这是出乎意料的(到目前为止,我的理解是)

消费者配置[请注意,我将ConsumerConfig.ENABLE_AUTO_COMMIT_Config设置为false,并将factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE)]:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Autowired
    private AdapterStreamProperties appProperties;

    @Value("${spring.kafka.streams.properties.receive.buffer.bytes}")
    private String receiveBufferBytes;

    @Bean
    public ConsumerFactory<PreferredMediaMsgKey, SendEmailCmd> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, appProperties.getApplicationId());
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.getBootstrapServers());
        props.put(
                ConsumerConfig.GROUP_ID_CONFIG,
                "adapter");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.PreferredMediaMsgKeyDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "com.ringo.notification.adapter.serializers.SendEmailCmdDeserializer");
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        props.put(StreamsConfig.RECEIVE_BUFFER_CONFIG, receiveBufferBytes);

        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd>
    kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<PreferredMediaMsgKey, SendEmailCmd> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

然后我的消费者以这种方式消费[即使我注释掉ack.acknowledge ()' , 下次它仍然从下一个偏移读取:

  @KafkaListener(topics = Constants.INPUT_TOPIC, groupId = "adapter")
  public void listen(ConsumerRecord<PreferredMediaMsgKey, SendEmailCmd> record,
                     @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
                     @Header(KafkaHeaders.OFFSET) Long offset, Acknowledgment ack) {

    System.out.println("----------------------------------------------------------------------------");
    System.out.println("Reading from Offset: " +  offset + ", and Partition: " + partition);
    System.out.println("Record for this pertition: Key : "+ record.key() + ", Value : " +   record.value());
    System.out.println("----------------------------------------------------------------------------");
    NotificationProcessedFinal result = processor.processEmail(record.key(),record.value());

    if( StringUtils.isNotEmpty(result.getErrorCmd().getErrorMsg())) {
      kafkaErrorProducerTemplate.send(adapterMsgProperties.getErrorTopic(), record.key(), result.getErrorCmd());
    }
    else {
      kafkaOutputProducerTemplate.send(adapterMsgProperties.getOutputTopic(), record.key(), result.getNotifyEmailCmd());
    }
    ack.acknowledge();
  }

我的gradle的Kafkaapi版本。建造:

//Kafka Dependencie
implementation      'org.apache.kafka:kafka-streams:2.0.1'
implementation      'org.apache.kafka:kafka-clients:2.0.1'

任何洞察力都会有所帮助。

提前谢谢

共有1个答案

平羽
2023-03-14
ack.acknowledge()

仅表示您已提交,您的消费者已成功处理该消息

这将更新消费者组偏移量,如果应用程序在没有ack的情况下停止,则当前偏移量将不会提交给kafka(您已处理但未确认的最后一条消息)。

具有相同用户组(和相同分配分区)的新用户将再次使用相同的消息,因为应用程序将从用户组偏移量读取起始点。

如果你ack或don't ack没有影响您的当前侦听器,它将简单地继续处理新消息(只要没有错误发生,我想重新平衡也可能是偏移重载的来源(不完全确定))

 类似资料:
  • 我正在使用Kafka2.0版和java消费者API来消费来自一个主题的消息。我们使用的是一个单节点Kafka服务器,每个分区有一个使用者。我注意到消费者正在丢失一些消息。场景是:消费者投票主题。我为每个线程创建了一个消费者。获取消息并将其交给处理程序来处理消息。然后使用“至少一次”的Kafka消费者语义来提交Kafka偏移量来提交偏移量。同时,我有另一个消费者使用不同的group-id运行。在这个

  • 我最近开始学习Kafka,最后就问了这些问题。 > 消费者和流的区别是什么?对我来说,如果任何工具/应用程序消费来自Kafka的消息,那么它就是Kafka世界中的消费者。 流与Kafka有何不同?为什么需要它,因为我们可以使用消费者API编写自己的消费者应用程序,并根据需要处理它们,或者将它们从消费者应用程序发送到Spark? 我做了谷歌对此,但没有得到任何好的答案。抱歉,如果这个问题太琐碎了。

  • 我们有一个问题,似乎Kafka消费者没有收到发布到某个主题的消息。(我说这是因为我还没有弄清楚这件事的真相,我可能错了。) 我使用Spring for Apache Kafka,而我的消费者实际上是一个用注释的方法。 这个问题是断断续续的,我很难重新创建它。 有没有一种方法让我看看Kafka经纪人的日志,或任何其他工具,以帮助我找出抵消为我的消费者?我想要具体的证据来证明我的消费者是否收到了信息。

  • Flink kafka消费者有两种类型的消费者,例如: 这两个消费者层次结构扩展了相同的类。我想知道维护编号类背后的设计决策是什么?我们什么时候应该使用其中一种? 我注意到带有数字后缀的类有更多的特性(例如ratelimiting)。 https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka

  • 试图理解消费者补偿和消费者群体补偿之间的关系。 下面的堆栈溢出链接提供了对消费群体补偿管理的极好理解<什么决定Kafka消费补偿?现在问题来了, 情节: 我们在一个消费者组组1中有消费者(c1)。 偏移值是否将存储在消费者(c1)和组(group1)两个级别?或者如果消费者属于任何消费者组,偏移量将存储在仅消费者组级别? 如果偏移值将存储在两个级别中,它是否是消费者级别偏移值将覆盖消费者组级别偏移