我正在使用方法级别的@KafkaHandler(类级别的@KafkaListener)消费Kafka事件。
我见过很多例子,其中有一个“acknowledge”参数可用,在这个参数上可以调用“acknowledge()”方法来提交事件的消耗,但是,当我将它作为参数包含到我的方法中时,我无法填充acknowledge对象。使用KafkaHandler时如何手动提交?有可能吗?
代码示例:
@Service
@KafkaListener(topics = "mytopic", groupId = "mygroup")
public class TestListener {
@KafkaHandler
public void consumeEvent(MyEvent event, Acknowledgement ack) throws Exception {
//... processing
ack.acknowledge(); // ack is not available
}
使用SpringBoot和Spring kafka。
必须使用AckMode配置侦听器容器。手动
或确认模式。手动_IMMEDIATE获取此功能。
但是,通常最好让容器负责使用AckMode提交偏移量。RECORD
或AckMode。BATCH
(默认)。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#committing-抵消
编辑
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.ack-mode=MANUAL
@SpringBootApplication
public class So68844554Application {
public static void main(String[] args) {
SpringApplication.run(So68844554Application.class, args);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so68844554").partitions(1).replicas(1).build();
}
}
@Component
@KafkaListener(id = "so68844554", topics = "so68844554")
class Foo {
@KafkaHandler
void listen(String in, Acknowledgment ack) {
System.out.println(in);
ack.acknowledge();
}
}
% kafka-consumer-groups --bootstrap-server localhost:9092 --describe -group so68844554
Consumer group 'so68844554' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
so68844554 so68844554 0 2 2 0 - - -
问题内容: 我正在使用Spring / Spring-data-JPA,发现自己需要在单元测试中手动强制提交。我的用例是我正在做一个多线程测试,其中我必须使用在生成线程之前保留的数据。 不幸的是,鉴于测试正在事务中运行,即使不能使它也不能被衍生线程访问。 我尝试使用实体管理器来执行操作,但是在执行此操作时收到错误消息: 有什么方法可以提交事务并继续进行吗?我一直找不到任何允许我调用的方法。 问题答
日志中的错误 2020-05-28 15:52:53.597错误112469---[nio-8080-exec-1]O.a.C.C.C.[.[.[/].[dispatcherServlet]:servlet.Service()对于servlet[dispatcherServlet]在路径[]上下文中引发异常[请求处理失败;嵌套异常是java.lang.IllegalStateException:没
问题内容: 不使用EJB时,将数据库事务与Seam一起使用的最佳实践是什么-即。将Seam部署为WAR时? 默认情况下,Seam JavaBeans支持事务。我可以使用@Transactional注释方法,以确保需要进行事务处理。或者我可以使用@Transactional(NEVER)或@Transactional(MANDATORY)。我不知道怎么做是创建自己的事务,设置超时,开始然后提交/
我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多
我正在使用Spring/Spring数据JPA,发现自己需要在单元测试中手动强制提交。我的用例是,我正在做一个多线程测试,在这个测试中,我必须使用在生成线程之前持久化的数据。 不幸的是,鉴于测试在事务中运行,即使是也无法让生成的线程访问它。 我曾尝试使用实体管理器来执行以下操作,但执行此操作时会收到错误消息: 有什么方法可以提交事务并继续它吗?我找不到任何允许调用提交()的方法。