当前位置: 首页 > 知识库问答 >
问题:

在 RabbitMQ 中手动确认消息

齐栋
2023-03-14

以前我读取队列中的所有消息,但现在我必须根据用户的选择返回特定数量的消息(计数)。

我试着相应地改变for循环,但是由于自动应答,它读取了所有的消息。所以我尝试在配置文件中将它改为手动。

在我的程序中,如何在读取 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收,我没有频道的参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

任何帮助都是非常值得赞赏的,提前表示感谢。

共有1个答案

芮星海
2023-03-14

您不能使用receive()方法手动确认-对于具有手动确认和ChannelAwareMessageListener的事件驱动消费者,请使用SimpleMessageListenerContainerexecute()方法,该方法允许您访问通道-但您将使用较低级别的RabbitMQ API,而不是 抽象。

编辑:

您需要学习使用execute的底层RabbitMQ Java API,但类似这样的东西会起作用。。。

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });
 类似资料:
  • 问题内容: 我的Java应用程序将消息发送到RabbitMQ交换,然后交换将消息重定向到绑定队列。我将RabbitMQ与Springframework AMQP java插件一起使用。 问题:消息进入队列,但消息始终处于“未确认”状态,永远不会变为“就绪”状态。 可能是什么原因? 问题答案: 一条未确认的消息表示您的使用者已经读取了该消息,但是该使用者从未将ACK发送回RabbitMQ代理以表示它

  • 我正在用Java实现Akka-Alpakka,用于从ActiveMQ队列消费和生成队列。我可以成功地从队列中消费,但还不能实现应用程序级消息确认。 我的目标是使用队列中的消息,并将它们发送给另一个参与者进行处理。当该角色完成处理后,我希望它能够控制ActiveMQ中的消息确认。这大概是通过向另一个可以进行确认的参与者发送消息、调用消息本身的确认函数或其他方式来完成的。 在我的测试中,两条消息被放入

  • 发布确认原理 生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上的消息都会被指派一个唯一的 ID(从一开始),一旦消息被投递到所有匹配的队列后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了 如果消息和队列是持久化的,那么确认消息会在将消息写入磁盘后发出,broker 回传给生产者的确认消息中 ,

  • 我有一个springboot应用程序,它侦听Kafka流并将记录发送到某个服务以进行进一步处理。服务有时可能会失败。注释中提到了异常情况。到目前为止,我自己模拟了服务成功和异常场景。 侦听器代码: 用户工厂配置如下: 由于REST服务正在抛出RestClientException,它应该进入上面提到的if块。关于FixedBackOff,我不希望SeekToCurrentErrorHandler执

  • 我正在使用ReplyingKafkaTemplate.sendAndReception()发送和接收由相关ID关联的消息。用例有许多主题正在进行中,我需要手动acknoledge(提交)消耗的消息偏移量。到目前为止还不错,这是使用: 但我不知道如何手动确认最后一条消息(sendAndReceive()使用的消息)。 有什么提示吗? 谢谢 费尔南多

  • 主要内容:8. 发布高级确认,8.1 发布确认SpringBoot版本,8.2 回退消息,8.3 备份交换机8. 发布高级确认 在生产环境中由于一些不明原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复 于是,我们开始思考,如何才能进行RabbitMQ的消息可靠投递呢?特别是在这样比较极端的情况下,RabbitMQ集群不可用的时候,无法投递的消息该如何处理呢? 8.1 发布确认SpringBoot版本 8.1.1 发布确认方案 当交换机