当前位置: 首页 > 知识库问答 >
问题:

无法在Spring-Boot应用程序中使用Spring-integration-kafka禁用对Kafka消息的手动提交

鄢开诚
2023-03-14

由于我是kafka的新手,所以我的小组正在使用Spring-Integration-Kafka:2.0.0.Release发布在Spring-boot应用程序上。基于这里的示例,我能够使用我的KafkaConsumer来消费kafka消息。

对于这个spring-boot应用程序,我有我的application.java

package hello;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.concurrent.TimeUnit;
import hello.notification.Listener;

@SpringBootApplication
public class Application {

    private Listener listener;

    public void run(String... args) throws Exception {
        this.listener.countDownLatch1.await(60, TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}
package hello.notification;

import ....; //different imports

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        //propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(propsMap);
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}
package hello.notification;

import ...; // different imports

public class Listener {

    public final CountDownLatch countDownLatch1 = new CountDownLatch(1);

    @KafkaListener(topics = "topic1")
    public void listen(ConsumerRecord<?, ?> record, Acknowledgment ack) {
        System.out.println(record + " and " + ack);
                // Place holder for "ack.acknowledge();"
        countDownLatch1.countDown();
    } 
}

1)在ConsumerConfig设置中,我已经将“ConsumerConfig.enable_auto_commit_config”设置为false并注释掉了“ConsumerConfig.auto_commit_interval_ms_config”,为什么它仍然自动提交?(因为如果我重新启动spring-boot应用程序,我将不再看到该消息--我希望它尝试重新发送,直到它提交完毕)

2)为了手动提交,我想我只需要在我的监听器中添加ack.acknowledge()。Listen(请参阅占位符)?除了这个之外,我还需要其他什么东西来确保这是一个手动提交吗?

3)我想有一个最简单/最干净的kafkaconsumer,我想知道你是否认为我有的是最简单的方法,最初我在寻找单线程消费者,但没有很多例子,所以我坚持使用并发侦听器。

感谢您所有的帮助和投入!

共有1个答案

席宜修
2023-03-14

为什么它仍然自动提交?

因为默认情况下KafkaMessageListenerContainer随以下内容一起提供:

private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;

为了确保这是一个手动提交,我还需要其他什么吗?

ContainerProperties.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL)

有关更多信息,请参阅参考手册

 类似资料:
  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 我们使用的是spring集成kafka版本3.1.2。RELEASE and int kafka:消息驱动的通道适配器,用于使用来自远程kafka主题的消息。生产者发送加密消息,我们使用反序列化器解密实际消息。我们可以使用主题中发布的所有消息。我们将自动提交用作false。我们想知道在成功处理消息后如何从我们的服务提交或确认消息。有人能帮助我们如何提交从消息驱动通道读取的消息并提供一些参考实现吗?

  • 我有一个运行良好的Spring启动应用程序,直到我在我的应用程序中包含Kafka消费者和生产者。运行完全没有问题的代码是有一个restController,如下所示: 这个rest终点给出了期望的响应。现在,我包括Kafka制作人和消费者 在包含这些消费者和生产者之后,我的应用程序没有启动。我没有看到之前应用程序运行正常时显示的以下行。 2019-12-12 15:01:12.090信息38376

  • 我正在尝试用Kafka客户端设置一个Spring Boot应用程序以使用SSL。我有我的密钥库。jks和信任库。jk存储在文件系统(docker容器上)上,因为:https://github.com/spring-projects/spring-kafka/issues/710 以下是我的application.yml: 但是当我启动应用程序(在docker容器中)时,它说: 我检查了容器,.jk

  • 我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多

  • 问题内容: 我有一个配置了Spring Security的Spring Boot Web应用程序。我想暂时禁用身份验证(直到需要)。 我将此添加到: 这是我的一部分 但是我仍然包括一个基本的安全性:启动时会生成一个默认的安全性密码,并且我仍会收到HTTP身份验证提示框。 我的pom.xml: 在WebSecurityConfig.java中配置了安全性(我已注释了注释以将其禁用): 问题答案: 使