当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ发布者通过主题交换确认-BasicAcks只在第一次激发一次

张啸
2023-03-14

使用主题交换,我希望有一个具有以下特性的发布/订阅消息传递模式:

  1. 是否实现了“发布者确认”。
  2. 让使用者在处理完每条消息后也确认该消息。
  3. 使用路由密钥将邮件路由到一个或多个使用者。
  4. 具有持久的使用者队列,因此如果使用者应用程序暂时关闭,它可以在重新启动时从其队列中拾取消息。

所以我创建了2个控制台应用程序(发送和接收)来测试上面的内容。

    static void Main(string[] args)
    {

        Console.WriteLine(" Type [exit] to exit.");

        Publisher publisher = new Publisher();

        do
        {
            var userInput = Console.ReadLine();
            if (userInput == "exit")
            {
                break;
            }


            publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);

        } while (true);
    }
public class Publisher
{
    const string ExchangeType = "topic";

    Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();

    public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
    {

        var factory = new ConnectionFactory() { HostName = host };
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {                
            channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
            channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);

            channel.ConfirmSelect();

            channel.ExchangeDeclare(exchangeName, ExchangeType);

            var body = Encoding.UTF8.GetBytes(message);

            var properties = channel.CreateBasicProperties();
            properties.Persistent = true;

            unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);

            channel.BasicPublish(exchange: exchangeName,
                routingKey: routingKey,
                basicProperties: properties,
                body: body);

            Console.WriteLine(" [x] Sent {0}", message);
        }
    }

    private void OnBasicNacks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
        }
        else
        {
            Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
        }
    }

    private void OnBasicAcks(bool multiple, ulong deliveryTag)
    {
        if (multiple)
        {
            var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
            foreach (var entry in confirmed)
            {
                unConfirmedMessageTags.Remove(entry.Key);
                Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
            }

        }
        else
        {
            unConfirmedMessageTags.Remove(deliveryTag);
            Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
        }

    }
}

}

接收

    static void Main(string[] args)
    {
        const string ExchangeName = "main";
        const string QueueName = "q1";
        const string ExchangeType = "topic";
        const string RoutingKey = "user.update";

        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(ExchangeName, ExchangeType);

            channel.QueueDeclare(queue: QueueName, 
                durable: true, 
                autoDelete: false, 
                exclusive: false, 
                arguments: null);

            channel.QueueBind(QueueName, ExchangeName, RoutingKey);

            //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

            var consumer = new EventingBasicConsumer(channel);

            consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);

            channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }            
    }

    private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
    {            
        var message = Encoding.UTF8.GetString(body.ToArray());
        Console.WriteLine(" [x] Received {0}", message);

        Thread.Sleep(2000);            

        channel.BasicAck(deliveryTag: deliveryTag, multiple: false);

        Console.WriteLine(" [x] Processed {0}", message);
    }
}

问题是我的Send程序中的OnBasicAcks只会为第一条消息调用一次。

共有1个答案

督坚白
2023-03-14

对于其他可能遇到此问题的人,我正在为不鼓励的每一次发布打开一个连接和通道(虚拟连接):

“连接是长期存在的。为每次操作(例如发布消息)打开连接将非常低效,因此非常不鼓励使用。”

也看这里:

“publisher确认是通过ConfirmSelect方法在通道级别启用的...必须在希望使用publisher确认的每个通道上调用此方法。确认应该只启用一次,而不是对发布的每条消息启用一次。”

切换到使用长期连接解决了我的问题。

 类似资料:
  • 我有一个场景,我需要执行一系列流程,每个步骤都在独立的应用程序中完成和扩展。我正在为所有交换使用主题交换。当前拓扑如下所示: P- 我们正在“版本化”队列,以处理可能影响消息结构的需求更改。绑定可能如下所示: 步骤1。exchange绑定到步骤1。v1。使用绑定键step1排队。v1 步骤1。exchange绑定到步骤1。v2。使用绑定键step1排队。v2级 还有其他与版本无关的绑定模式也使局部

  • 有没有办法更改发布者-确认每条消息?我们有一个接收消息并发布到RabbitMQ的Rest层。根据特定的消息属性,我们决定是否需要发布者确认。 有没有一种方法可以在发送消息时覆盖,发布者-确认?

  • TLDR;在主题交换和由使用者动态创建的队列的上下文中,当没有使用者使用消息时,如何重新传递消息/通知生产者? 我有以下组件: null 生成器使用向exchange发送消息。 每个使用者创建一个队列,并为一组路由密钥(例如,Pictures.*和Videos.Trending)将交换绑定到这个队列。 当使用者处理完文件后,它会将结果推送到processing_results队列中。 现在--这工

  • 我计划在我的项目中使用Spring Cloud Stream。是否可以使用Publisher Confirms(又名Publisher Acknowledges),即注册确认回调,如中所述http://docs.spring.io/spring-amqp/reference/html/_reference.html#cf-发布配置ret?或者是否有其他可能从RabbitMQ接收ack,表明它已从客

  • 我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者

  • 问题内容: 我有下面的代码使用来将消息发布到队列中。该是越来越创建,但该消息无法被看到的队列。我也没有看到任何错误。 BasicApplication.java Producer.java 问题答案: 您使用了错误的方法;该方法的第一个参数是。 使用。