当前位置: 首页 > 知识库问答 >
问题:

RabbitMQ示例:多线程、通道和队列

仲孙德惠
2023-03-14

我刚刚阅读了RabbitMQ的Java API文档,发现它非常丰富而且简单。关于如何为发布/消费设置一个简单的通道的示例非常容易遵循和理解。但这是一个非常简单/基本的示例,它给我留下了一个重要的问题:如何设置1+channels来向多个队列发布/消费?

假设我有一个RabbitMQ服务器,上面有3个队列:loggingsecurity_eventscustomer_orders。因此,我们需要一个通道来发布/使用所有3个队列,或者更有可能有3个单独的通道,每个通道专用于一个队列。

在此基础上,RabbitMQ的最佳实践要求我们为每个消费者线程设置1channel。对于本例,假设security_events只需要1个使用者线程就可以了,但是loggingcustomer_order都需要5个线程来处理卷。那么,如果我没有理解错的话,这是否意味着我们需要:

  • 1个通道和1个用于向security_events发布/使用的使用者线程;和
  • 5个通道和5个使用者线程,用于向日志记录发布/消费;和
  • 5个通道和5个使用者线程,用于向customer_orders发布/消费?

如果我的理解在这里被误导了,请从纠正我开始。不管是哪种方式,一些厌倦了战斗的RabbitMQ老手是否可以帮助我用一个像样的代码示例来“连接点”,以设置满足我这里的需求的发布者/消费者?提前道谢!

共有1个答案

尉迟俊能
2023-03-14

我想你有几个问题是初步了解的。坦率地说,我看到以下内容有点吃惊:两者都需要5个线程来处理卷。你怎么知道你需要那个确切的数字?你有什么保证5个线程就足够了吗?

RabbitMQ经过调优和时间测试,因此它完全是关于正确的设计和高效的消息处理。

让我们试着回顾一下这个问题,找出一个恰当的解决方法。顺便说一句,消息队列本身并不能保证您有真正好的解决方案。你必须了解你在做什么,还要做一些额外的测试。

因此,消息处理逻辑很可能是确保您有足够吞吐量的合适位置。当然,每次需要处理消息时,您都可以跨越一个新线程,但这种方法最终会扼杀您的系统。基本上,线程越多,延迟就越大(如果需要,可以检查Amdahl定律)。

(见图解的Amdahl定律)

技巧#1:小心使用线程,使用线程池(详细信息)

public class Main {
  private static final int NTHREDS = 10;

  public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(NTHREDS);
    for (int i = 0; i < 500; i++) {
      Runnable worker = new MyRunnable(10000000L + i);
      executor.execute(worker);
    }
    // This will make the executor accept no new threads
    // and finish all existing threads in the queue
    executor.shutdown();
    // Wait until all threads are finish
    executor.awaitTermination();
    System.out.println("Finished all threads");
  }
} 

我会说这是明显的优化技术。您可能会发送小而易于处理的消息。整个方法是要连续设置和处理较小的消息。大的信息最终会开一个不好的玩笑,所以最好避免这种情况。

所以发送微小的信息更好,但是处理呢?每次提交作业都有一个开销。在传入消息率较高的情况下,批处理会非常有帮助。

例如,假设我们有简单的消息处理逻辑,我们不希望每次处理消息时都有特定于线程的开销。为了优化非常简单的compositerunnable:

class CompositeRunnable implements Runnable {

    protected Queue<Runnable> queue = new LinkedList<>();

    public void add(Runnable a) {
        queue.add(a);
    }

    @Override
    public void run() {
        for(Runnable r: queue) {
            r.run();
        }
    }
}
class CompositeMessageWorker<T> implements Runnable {

    protected Queue<T> queue = new LinkedList<>();

    public void add(T message) {
        queue.add(message);
    }

    @Override
    public void run() {
        for(T message: queue) {
            // process a message
        }
    }
}

尽管您知道可以并行处理消息(tip#1)并减少处理开销(tip#2),但您必须快速完成所有操作。冗余的处理步骤、繁重的循环等可能会对性能产生很大影响。请参阅有趣的案例研究:

通过选择正确的XML解析器将消息队列吞吐量提高10倍

技巧4:连接和渠道管理

    null

请注意,所有的提示都是完美的配合。如果你需要更多的细节,请随时告诉我。

完整的消费者示例(源代码

请注意以下事项:

static class Worker extends DefaultConsumer {

    String name;
    Channel channel;
    String queue;
    int processed;
    ExecutorService executorService;

    public Worker(int prefetch, ExecutorService threadExecutor,
                  , Channel c, String q) throws Exception {
        super(c);
        channel = c;
        queue = q;
        channel.basicQos(prefetch);
        channel.basicConsume(queue, false, this);
        executorService = threadExecutor;
    }

    @Override
    public void handleDelivery(String consumerTag,
                               Envelope envelope,
                               AMQP.BasicProperties properties,
                               byte[] body) throws IOException {
        Runnable task = new VariableLengthTask(this,
                                               envelope.getDeliveryTag(),
                                               channel);
        executorService.submit(task);
    }
}
  • 使用队列设计解决方案架构?
  • 一些排队论:吞吐量、延迟和带宽
  • 快速消息队列基准:ActiveMQ、RabbitMQ、HornetQ、QPID、Apollo…
 类似资料:
  • 我在我产品环境中发现了一个问题。 我们在一个mq集群中有6个队列,我们有200个线程的线程池(实际上会更多,因为它会在一个独立的线程池中安排一些特殊任务)来处理来自上游的请求,当处理请求时,我会发布一个消息给rabbitmq Broker。 所以我有200个线程将消息发布到这6个队列。 对于每个队列,我将创建一个AMQP连接,对于每个线程,我有一个Channel的threadlocal,这样每个线

  • 我试图理解多个线程是否可以并发/并行来自Spring集成队列通道的消息。假设我有以下配置: 如果无法实现队列通道中消息的并发处理,建议采用什么方法

  • 我的应用程序有多个线程将消息发布到单个RabbitMQ集群。 阅读rabbit文档:我阅读了以下内容: 对于使用多个线程/进程进行处理的应用程序,每一个线程/进程打开一个新通道,并且不在它们之间共享通道是非常常见的。 而且我明白,与其开通多个连接(昂贵) 不如开通多个通道。 但是为什么不对所有线程使用单个通道呢? 在单个通道上使用多个通道有什么好处?

  • 我有一个从Rabbit接收消息的应用程序。当收到一条消息时,它会对它进行处理,然后在完成时执行ACK。应用程序可以在一个固定的线程池中同时处理2个项目,有2个线程。Rabbit的QOS预取设置为2,因为我不想在一个时间框架内给应用提供超过它所能处理的内容。 现在,我的消费者的handleDelivery执行以下操作: 此时,您已经发现TestWrapperThread将调用作为最后一个操作。 根据

  • 每个通道都有自己的分派线程。对于每个渠道一个消费者的最常见用例,这意味着消费者不会拖住其他消费者。如果每个通道有多个使用者,请注意长时间运行的使用者可能会阻碍回调到该通道上其他使用者的调度。 我有各种命令(消息)通过单个入站队列和通道进入,该队列和通道附加了DefaultConsumer。假设DefaultConsumer中有一个threadpool允许我直接从consumer回调方法运行应用程序

  • 到目前为止,对于RabbitMQ中的一个队列,我使用了一个通道,但现在我动态创建了多个队列,所以我必须为每个队列创建一个新通道,还是一个通道可以从不同的队列接收/发送消息?