当前位置: 首页 > 知识库问答 >
问题:

@KafkaHandler中的手动提交(spring kafka)

詹钊
2023-03-14

我正在使用方法级别的@KafkaHandler(类级别的@KafkaListener)消费Kafka事件。

我见过很多例子,其中有一个“acknowledge”参数可用,在这个参数上可以调用“acknowledge()”方法来提交事件的消耗,但是,当我将它作为参数包含到我的方法中时,我无法填充acknowledge对象。使用KafkaHandler时如何手动提交?有可能吗?

代码示例:

@Service
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public class TestListener {

@KafkaHandler
public void consumeEvent(MyEvent event, Acknowledgement ack) throws Exception {
    //... processing
    ack.acknowledge(); // ack is not available
}

使用SpringBoot和Spring kafka。

共有1个答案

钱卓君
2023-03-14

必须使用AckMode配置侦听器容器。手动或确认模式。手动_IMMEDIATE获取此功能。

但是,通常最好让容器负责使用AckMode提交偏移量。RECORDAckMode。BATCH(默认)。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-抵消

编辑

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL
@SpringBootApplication
public class So68844554Application {

    public static void main(String[] args) {
        SpringApplication.run(So68844554Application.class, args);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so68844554").partitions(1).replicas(1).build();
    }

}

@Component
@KafkaListener(id =  "so68844554", topics = "so68844554")
class Foo {

    @KafkaHandler
    void listen(String in, Acknowledgment ack) {
        System.out.println(in);
        ack.acknowledge();
    }

}
% kafka-consumer-groups --bootstrap-server localhost:9092 --describe -group so68844554

Consumer group 'so68844554' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
so68844554      so68844554      0          2               2               0               -               -               -
 类似资料:
  • 问题内容: 我正在使用Spring / Spring-data-JPA,发现自己需要在单元测试中手动强制提交。我的用例是我正在做一个多线程测试,其中我必须使用在生成线程之前保留的数据。 不幸的是,鉴于测试正在事务中运行,即使不能使它也不能被衍生线程访问。 我尝试使用实体管理器来执行操作,但是在执行此操作时收到错误消息: 有什么方法可以提交事务并继续进行吗?我一直找不到任何允许我调用的方法。 问题答

  • 日志中的错误 2020-05-28 15:52:53.597错误112469---[nio-8080-exec-1]O.a.C.C.C.[.[.[/].[dispatcherServlet]:servlet.Service()对于servlet[dispatcherServlet]在路径[]上下文中引发异常[请求处理失败;嵌套异常是java.lang.IllegalStateException:没

  • 问题内容: 不使用EJB时​​,将数据库事务与Seam一起使用的最佳实践是什么-即。将Seam部署为WAR时? 默认情况下,Seam JavaBeans支持事务。我可以使用@Transactional注释方法,以确保需要进行事务处理。或者我可以使用@Transactional(NEVER)或@Transactional(MANDATORY)。我不知道怎么做是创建自己的事务,设置超时,开始然后提交/

  • 我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多

  • 我正在使用Spring/Spring数据JPA,发现自己需要在单元测试中手动强制提交。我的用例是,我正在做一个多线程测试,在这个测试中,我必须使用在生成线程之前持久化的数据。 不幸的是,鉴于测试在事务中运行,即使是也无法让生成的线程访问它。 我曾尝试使用实体管理器来执行以下操作,但执行此操作时会收到错误消息: 有什么方法可以提交事务并继续它吗?我找不到任何允许调用提交()的方法。