当前位置: 首页 > 知识库问答 >
问题:

从java客户端中的多个RabbitMQ交换中读取无轮询

钱欣然
2023-03-14

请解释如何配置Java客户端,使其在不进行轮询的情况下从两个不同的RabbitMQ交换中读取。我希望客户端在消息到达时唤醒,然后再次阻塞。

在我的小系统集成问题中,一个RabbitMQ交换携带使用多种路由密钥的工作消息(我知道如何使用通配符捕捉它们),另一个交换携带控制消息(例如,“stop”)。所以我的当事人要收听两地的留言。这是一个相对低容量的系统问题,我不是问负载分担或公平性等等。

当然,我可以运行一个线程来轮询每个交换、Hibernate、调度,直到永远。但我想避免投票。

不知怎的,我想起了Unix select()系统调用,当任何文件描述符上的数据准备就绪时,它就会被唤醒。RabbitMQ也有类似的东西吗?

我当前的解决方案是一个适配器,它在每个输入交换上旋转一个线程来阻塞;接收到后,每个线程将写入java.util.concurrent集合;我还使用另一个线程来阻止该集合,并在消息到达最终使用者时将其传递出去。它工作很好,但如果我能删掉这种复杂性,那就太好了。

这些帖子围绕着这个问题跳来跳去,如果我在这些帖子中忽略了这个问题,请随时给我打听一下解决方案:

For Java:RabbitMQ示例:多线程、通道和队列

对于C#:从多个队列读取,RabbitMQ

提前道谢。

共有1个答案

田玉韵
2023-03-14

谢谢robthewolf的评论。是的,我读过教程,我知道我需要一个线程每个消费者。

结果表明,用一个线程读取多个交换非常简单,根本不需要轮询:获取一个新队列,并将其绑定到所有相关的交换。适用于话题和扇出。用SSCE对此进行了测试,见下文。

我对RabbitMQ javadoc中缺少细节感到遗憾,在channel#queueBind(String,String,String)方法中使用几个选择词会有很大帮助。

HTH

package rabbitExample;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
 * Demonstrates reading messages from two exchanges via a single queue monitored
 * by a single thread.
 * 
 */
public class MultiExchangeReadTest implements Runnable {

private final String exch1 = "my.topic.exchange";
private final String exch2 = "my.fanout.exchange";
private final Channel channel;
private final QueueingConsumer consumer;

public MultiExchangeReadTest(final String mqHost) throws Exception {

    // Connect to server
    System.out.println("Connecting to host " + mqHost);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost(mqHost);
    Connection connection = factory.newConnection();
    channel = connection.createChannel();

    // Declare exchanges; use defaults for durable etc.
    channel.exchangeDeclare(exch1, "topic");
    channel.exchangeDeclare(exch2, "fanout");

    // Get a new, unique queue name
    final String queue = channel.queueDeclare().getQueue();

    // Bind the queue to the exchanges; topic gets non-empty routing key
    channel.queueBind(queue, exch1, "my.key");
    channel.queueBind(queue, exch2, "");

    // Configure the channel to fetch one message at a time, auto-ACK
    channel.basicQos(1);
    consumer = new QueueingConsumer(channel);
    channel.basicConsume(queue, true, consumer);
}

public void run() {
    // Reads messages until interrupted
    try {
        while (true) {
            // Wait for a message
            System.out.println("Awaiting message");
            QueueingConsumer.Delivery delivery = consumer.nextDelivery();
            // Show contents using default encoding scheme
            String body = new String(delivery.getBody());
            System.out.println("Message from exch "
                    + delivery.getEnvelope().getExchange() + ", key '"
                    + delivery.getEnvelope().getRoutingKey() + "':\n"
                    + body);
        } // while
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

public static void main(String[] args) throws Exception {
    if (args.length != 1) {
        System.err
                .println("Usaage: MultiExchangeReadTest.main mq-host-name");
    } else {
        MultiExchangeReadTest multiReader = new MultiExchangeReadTest(
                args[0]);
        multiReader.run();
    }
}
}
 类似资料:
  • 每个通道都有自己的分派线程。对于每个渠道一个消费者的最常见用例,这意味着消费者不会拖住其他消费者。如果每个通道有多个使用者,请注意长时间运行的使用者可能会阻碍回调到该通道上其他使用者的调度。 我有各种命令(消息)通过单个入站队列和通道进入,该队列和通道附加了DefaultConsumer。假设DefaultConsumer中有一个threadpool允许我直接从consumer回调方法运行应用程序

  • 反应阿波罗客户端应用程序。我试图使用readQuery从缓存中读取数据,但无法读取缓存字段。 这里有一个纲要: 组件调用,它执行我的查询并缓存结果。查询返回类型。一切都很好,开发工具显示包含一个名为getPost的字段和该帖子。 有一个子组件,它会在单击时删除评论。它调用并删除评论。查询返回类型。 Post有一个注释数组,现在我需要在缓存中更新它的数组并删除已删除的注释。我使用突变中的函数来获取缓

  • 我在我的项目中使用RabbitMQ,我想知道是应该使用具有多个路由密钥的单个交换还是使用多个交换?哪个效率更高? 例如,如果我使用带有路由键A、B、C的单个exchange E,消费者连接到该exchange E,并使用A、B、C路由键获取数据。另一种选择是,我应该将其发送到exchange A、exchange B、exchange C,而不使用路由密钥,并且消费者可以连接到每个exchange

  • 问题内容: 我在JBOSS中有一个客户端服务器通信方案,并且在浏览器中作为客户端( JAVA PROGRAM )。最初在建立连接时,客户端将其证书发送到服务器。服务器从证书中提取客户端的公钥,因此通信将继续。 现在我的问题是 如何将证书(.cer)从客户端发送到服务器? 如何在服务器中接收证书并提取其公钥? 问题答案: 如何将证书(.cer)从客户端发送到服务器? 客户端证书(.cer,.crt,

  • 我正在尝试通过 tcp 连接远程执行程序,我想在客户端之间实时共享标准输出和标准输出 我有以下没有错误处理的测试服务器:p我知道,目前我无法执行带有参数的程序,但这很容易处理:) 你看,我尝试与 c.Write() 共享标准输出,但这不起作用。 我认为cmd.Stdin的另一个问题将与Stdout的问题相同。此时我没有实现任何标准函数。 有人能给我一个关于这个函数的提示或示例代码吗?

  • 在移动应用程序和Web服务之间有某种代理,我们在发出发布请求时对响应感到困惑。我们收到状态为200: OK的响应。但是我们找不到/提取JSON响应正文。 我们正在使用JAX-RS。有人能提供一些提示,从服务器响应中提取JSON主体(