当前位置: 首页 > 知识库问答 >
问题:

读取主题后立即异步提交消息

阎德辉
2023-03-14

我正试着在从主题读到它之后提交一个信息。我通过这个链接(https://www.confluent.io/blog/apache-kafka-spring-boot-application)使用Spring创建一个Kafka消费者。通常情况下,它工作得很好,消费者得到消息,然后等待另一个人进入队列。但问题是,当我处理这些消息时,它会花费很多时间(大约10分钟),kafka队列认为消息没有被消耗(提交),消费者会一次又一次地读取它。我不得不说,当我的处理时间少于5分钟时,它工作得很好,但当它持续更长时间时,它就不能提交消息。

我已经四处寻找了一些答案,但这对我没有帮助,因为我使用的不是相同的源代码(当然还有不同的结构)。我尝试发送异步方法,也尝试异步提交消息,但失败了。其中一些来源是:

Spring Boot Kafka:提交无法完成,因为组已经重新平衡

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

https://dzone.com/articles/kafka-clients-at-most-once-at-limat-once-extimat-once-extimat-o

https://github.com/confluentinc/confluent-kafka-dotnet/issues/470

主要的类在这里:


@SpringBootApplication
@EnableAsync
public class SpringBootKafkaApp {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApp .class, args);
    }


consumer类(我需要在其中提交消息)

@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;
@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */

        Properties  props=prope.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }


=======================================================================================================

我遵循了@Victor gallet的建议,将confumer属性的声明包含在Consumer方法中,以适应“确认”对象。我还通过这个链接(https://www.programcreek.com/java-api-examples/?code=SpringOnePlatform2016/grussell-spring-kafka/grussell-spring-kafka-master/s1p-kafka/src/main/java.org/s1p/commonconfiguration.java)获得了我用来声明和设置所有属性(consumerProperties、consumerFactory和kafkaListenerContainerFactory)的所有方法。我发现的唯一问题是“new SeekToCurrentErrorHandler()”声明,因为我得到了一个错误,目前我无法解决它(如果有人给我解释一下就好了)。


@Service
public class Consumer {

@Autowired
    AppPropert prop;

   Consumer cons;


   @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();

        factory.setConsumerFactory(consumerFactory());
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        //factory.setErrorHandler(new SeekToCurrentErrorHandler());//getting error here despite I've loaded the library
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProperties());
    }

     @Bean
    public Map<String, Object> consumerProperties() {
        Map<String, Object> props = new HashMap<>();
        Properties  propsManu=prop.startProperties();// here I'm getting my porperties file where I retrive the configuration from a remote server (you have to trust that this method works)
        //props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.configProperties.getBrokerAddress());
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsManu.getProperty("bootstrap-servers"));
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, "s1pGroup");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, propsManu.getProperty("group-id"));
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
        //props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("key-deserializer"));
        //props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, propsManu.getProperty("value-deserializer")); 
        return props;
    }




    @KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
    public void consume(String message) throws IOException {
        /*HERE I MUST CONSUME THE MESSAGE AND COMMIT IT */
        acknowledgment.acknowledge();// commit immediately
        Properties  props=prop.startProp();//just getting my properties from my config-file
        ControllerPRO pro = new ControllerPRO();

        List<Future<String>> async= new ArrayList<Future<String>>();//call this method asynchronous, doesn't help me
        try {

            CompletableFuture<String> ret=pro.processLaunch(message,props);//here I call the process method 
            /*This works fine when the processLaunch method takes less than 5 minutes, 
            if it takes longer the consumer will get the same message from the topic and start again with this operation 
            */

        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println("End of consumer method ");

    }

    }

``````````````````````````````````````````````````````````

共有1个答案

董同
2023-03-14

必须将属性enable.auto.commit设置为false来修改使用者配置:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

然后,您必须修改Spring Kafka Listener factory并将ack-mode设置为manual_immediate。下面是ConcurrentKafKalistenerContainerFactory的示例:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setErrorHandler(new SeekToCurrentErrorHandler());
    return factory;
}

正如文档中所解释的,manual_immediate意味着:当侦听器调用Accountledgment.Accountled()方法时,立即提交偏移量。

您可以在这里找到所有提交方法。

然后,在侦听器代码中,可以通过添加确认对象手动提交偏移量,例如:

@KafkaListener(topics = "${app.topic.pro}", groupId = "group_id")
public void consume(String message, Acknowledgment acknowledgment) {
   // commit immediately
    acknowledgment.acknowledge();
}
 类似资料:
  • 我有以下问题: 我正在使用Spring MVC 4.0.5和Hibernate 4.3.5,我正在尝试创建一个Restfull Web应用程序。问题是,我想排除某些字段,使其无法序列化为 JSON,具体取决于使用方面在控制器中调用的方法。 我现在的问题是Hibrate不会在事务从方法返回后立即提交事务,而只是在序列化之前提交。 Controller.java Service.java 方面的建议执

  • 我试图理解异步响应在Jersey上的工作方式。我阅读了新泽西文档(https://jersey.java.net/documentation/latest/async.html)的第10章,但它对我的问题没有帮助。这里关于stackoverflow的研究也没有得出令人满意的答案(这一点我可以理解)。 我试图做的与本文中的一个问题类似(使用http状态202进行异步操作)。我想使用HTML表单文档将

  • 因此,根据我对Apache Kafka中事务的理解,read_committed消费者不会返回作为正在进行的事务一部分的消息。因此,我猜想,消费者可以选择将其偏移量提交给那些正在进行的事务消息(例如,读取非事务消息),或者可以选择在提交/中止遇到的事务之前不进一步推进。我只是假设(Kafka)允许跳过那些挂起的交易记录,但考虑到它的抵消可能已经很远了,那么消费者在提交时将如何读取它们呢? 更新 考

  • 我的目标是使用Flink KafkaSource阅读来自Kafka主题的所有消息。我尝试用批处理和流模式执行。问题如下:当我将env.setParallelism设置为高于2时,我必须使用包含bug的接收器。于是,我设置了例如:< code > streamexecutionenvironment . setparallelism(1); 我想使用的Kafka主题包含3个分区。这是我的代码片段:

  • 正在寻找有关mongo db异步提交的适当文档。我们有一个spring boot应用程序,我们试图为我们的域对象生成审计,我们希望将javers生成的审计异步提交到mongo db中,而我们的主要基于SQL的事务是此mongodb调用的fr。任何关于这方面的建议都会非常有用。