当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ,是否可以在保持FIFO的同时从一个通道消耗1条消息?

楚冷勋
2023-03-14

我正在使用带有C#的RabbitMQ作为消息代理框架。在我的场景中,我有许多使用者可以从不同的队列中使用。从使用者的角度来看,几个队列形成一个逻辑组。

例如:队列1、队列2、队列3。

消费者1想要从[队列1,队列3]获取第一条消息,并且只有在不可用时才尝试从队列2获取

Consumer2将希望从[Queue2,Queue3]获取第一条消息,只有在不可用时,才尝试从Queue1获取

下面是一个应该返回消息的类:

public class MessagesProvider
{

    private IConnection _connection;
    private IModel _channel;
    private readonly IConnectionFactory _connectionFactory;
    private ConcurrentDictionary<string, string> _tagsQueueItems;
    private EventingBasicConsumer _consumer;
    private CancellationTokenSource _cts;
    private TaskCompletionSource<BasicGetResult> _tcs;

    public MessagesProvider()
    {
        _connectionFactory = new ConnectionFactory()
        {
            HostName = "localhost"
        };

    }

    public Task<BasicGetResult> GetMessage(int timeout, IEnumerable<string> queues)
    {
        _tagsQueueItems = new ConcurrentDictionary<string, string>();
        _cts = new CancellationTokenSource();
        _connection = _connectionFactory.CreateConnection();
        _channel = _connection.CreateModel();
        _channel.SingleMessagePerChannel();
        _consumer = new EventingBasicConsumer(_channel);
        _consumer.Received += OnReceive;

        foreach (var queue in queues)
        {
            var tag = _channel.BasicConsume(queue, false, _consumer);
            _tagsQueueItems.AddOrUpdate(queue, tag, (k, o) => queue);
        }

        return _tcs.Task;
    }

    private void SetResult(BasicGetResult result)
    {
        _tcs.SetResult(result);
        if (_channel.IsOpen)
            _channel.Close();
        if (_connection.IsOpen)
            _connection.Close();
    }

    private void OnReceive(object sender, BasicDeliverEventArgs e)
    {
        _consumer.Received -= OnReceive;
        _channel.BasicAck(e.DeliveryTag, false);
        var result = new BasicGetResult(e.DeliveryTag, e.Redelivered, e.Exchange, e.RoutingKey, 1,
            e.BasicProperties, e.Body);

        SetResult(result);

    }
}

通道定义:

 public static class  ChannelExtension
{
    public static void SingleMessagePerChannel(this IModel channel)
    {
        channel.BasicQos(0,1,true);
    }

    public static void SingleMessagePerConsumer(this IModel channel)
    {
        channel.BasicQos(0,1,false);
    }
}

问题是我一个接一个地注册一个队列(在“GetMessage”方法中),我找不到一个“原子”操作来从一组队列中取出下一个FIFO消息。

我正在寻找这样做的方法:_consumer.getNextMessage();

另一种方法是使用来自第一组优先级的所有最前面的消息,在消费者端过滤它们以找到最旧的消息,并且不为其他消息发送ack(在同一优先级组中)。这种方法也有问题,因为它意味着在获取消息时,其他消费者将无法处理它们(直到没有ack)。

有什么建议吗?谢了。

共有1个答案

苍烨然
2023-03-14

我认为你并没有按照设计的方式使用队列和消费者。将队列视为要完成的工作的桶。每个工作桶可以有N个消费者来做这项工作。

每个消费者都应该设计为使用单个队列,而不知道或关心其他队列或其他消费者。

如果您试图实现“高优先级”队列,那么我建议您使用具有2个不同队列的单个交换:

    < li >常规优先级队列 < li >高优先级队列

然后,默认情况下,消息被放入常规优先级队列。当需要高优先级的消息出现时,您可以设置消息的标头,以便将其推送到高优先级队列中。

我们今天使用同样的方法,使用直接队列,并在我们希望消息进入高优先级队列时添加一个头。

 类似资料:
  • 问题内容: 我的Java应用程序将消息发送到RabbitMQ交换,然后交换将消息重定向到绑定队列。我将RabbitMQ与Springframework AMQP java插件一起使用。 问题:消息进入队列,但消息始终处于“未确认”状态,永远不会变为“就绪”状态。 可能是什么原因? 问题答案: 一条未确认的消息表示您的使用者已经读取了该消息,但是该使用者从未将ACK发送回RabbitMQ代理以表示它

  • 是否可以在数据消耗后从Kafka中删除数据? 我正在使用Kafka和Zookeeper组合在2个节点之间交换一些数据。我在Kafka的内存方面遇到了问题,因为我要插入这么多数据。这样,我需要在使用Kafka后清理它们中的数据。这可能吗?

  • 我有一个服务员线程想使用RabbitMQ direct exchange向Java中的客户线程发送一道寿司,但是我的客户没有收到这道菜。下面是我的服务员用来发布寿司菜肴对象的方法: 请注意,<code>dishKey</code>作为参数传递,并在之前的if-else语句中被确定为<code>的“tamagoDishKey”</code>或<code>“ebiDishKey”的 以下是我的客户用来

  • 这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?

  • 我运行生产者,它生成N条消息,我在仪表板上看到它们。当我运行接收器时,它会接收来自队列的所有消息,并且队列为空。 我需要有多个生产者生成消息到同一个队列。多个客户从队列中接收消息。消息将被队列TTL删除。但是现在第一个接收者从队列中获取所有消息。我怎么能做到这一点?

  • 问题内容: 我尝试将机器人抓取器的位置映射到抓取器所握持的物体所施加的阻力。我预订了一个主题的抓取器位置,又订阅了另一个主题的阻力值,因为我想确保抓取器位置与该位置的确切阻力值相对应。鉴于两者都是浮动消息,如何同步它们? 问题答案: 您可以在中使用。 这是一个订阅多个主题以同时获取数据的示例: 如果您的问题没有解决,则不是: 阅读更多