当前位置: 首页 > 知识库问答 >
问题:

使用ReplyingKafkaTemboard手动确认(提交)kafka消息

周祺
2023-03-14

我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用:

    @KafkaListener(topics = "${kafka.topic.request-topic}")
    @SendTo("int1")
    public Tx30 listen(@Payload Tx30 request, Acknowledgment ack) throws InterruptedException {
        ...
        ack.acknowledge();

但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。

有什么提示吗?

谢谢

费尔南多

共有1个答案

张照
2023-03-14

目前不支持手动提交回复消息的偏移量。

如果您可以解释为什么需要一个有效的用例,请随时在GitHub上打开一个新功能请求问题。

 类似资料:
  • 我正在用Java实现Akka-Alpakka,用于从ActiveMQ队列消费和生成队列。我可以成功地从队列中消费,但还不能实现应用程序级消息确认。 我的目标是使用队列中的消息,并将它们发送给另一个参与者进行处理。当该角色完成处理后,我希望它能够控制ActiveMQ中的消息确认。这大概是通过向另一个可以进行确认的参与者发送消息、调用消息本身的确认函数或其他方式来完成的。 在我的测试中,两条消息被放入

  • 我正在与JTA、两阶段提交、JMS和JDBC事务作斗争。这个想法(简而言之)是 在队列中接收消息 所以我得到了,创建,从会话创建接收器并设置消息侦听器。 在侦听器内部,在方法中,我开始我的用户事务,执行jdbc内容并提交事务或在出现问题时进行回滚。现在我期望(又名“希望”)当用户事务提交时,消息会得到确认。 但这并没有发生,消息仍然在队列中,并且一次又一次地被重新传递。 我错过了什么?我仔细检查了

  • 我第一次使用Spring Kafka,我无法在我的消费者代码中使用Acknowledgement.acknowledge()方法进行手动提交。请让我知道我的消费者配置或侦听器代码中是否缺少任何内容。或者有其他方法可以根据条件处理确认偏移。在这里,我正在寻找解决方案,例如如果偏移没有手动提交/确认,它应该由消费者选择相同的消息/偏移量。 配置 听众

  • 我正在尝试找出使用Spring-Kafka(1.1.0. RELEASE)在Kafka消费者中手动提交偏移的方法。我明白,最好将这些偏移提交给健壮的客户端实现,这样其他消费者就不会处理重复的事件,这些事件最初可能是由现已死亡的消费者处理的,或者因为重新平衡被触发了。 我知道有两种方法可以解决这个问题- > 将ACK_MODE设置为MANUAL_IMMEDIATE,并在侦听器实现中调用ack.ack

  • 以前我读取队列中的所有消息,但现在我必须根据用户的选择返回特定数量的消息(计数)。 我试着相应地改变for循环,但是由于自动应答,它读取了所有的消息。所以我尝试在配置文件中将它改为手动。 在我的程序中,如何在读取 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收,我没有频道的参考)? 任何帮助都是非常值得赞赏的,提前表示感谢。

  • 我能够使用ApacheKafka提交偏移量类,并能够使用ConsumerConnector进行提交。我查看了apache camel kafka组件,该组件的使用者选项与“auto.commit.enable”属性相同。现在,Camel Java DSL中是否有任何属性或方法,在使用消息后,我们可以手动提交偏移量(通过URL中提供的方法或消费者选项),或者我们必须再次使用Kafka消费者API提交