当前位置: 首页 > 知识库问答 >
问题:

指定主题时使用直接交换的RabbitMQ

梁磊
2023-03-14

在我的应用程序中,我有3个类:< br> - Company,它为3项工作中的任何一项雇用工人< br> - Workers,每个人可以做2项工作< br> - Administrator,它接收程序中所有消息的副本,并可以向所有公司、所有工人或每个人发送消息

我使用< code > work . companies . company name 作为公司密钥,使用< code > work . workers . worker name 作为工人密钥,它们都使用默认的交换和队列进行通信。管理员通过< code>admin主题交换接收消息。

问题出在管理员身上 -

public class Administrator
{
    public static void main(String[] args) throws IOException, TimeoutException
    {
        new Thread(new TopicListener("admin", ign -> {})).start();
        TopicWriter writer = new TopicWriter();
    // lots of code

TopicListener.java:

public class TopicListener implements Runnable
{
    private final String EXCHANGE_NAME = "space";
    private String key;
    private Consumer<String> msgHandler;

    public TopicListener(String key, Consumer<String> msgHandler)
    {
        this.key = key;
        this.msgHandler = msgHandler;
    }

    @Override
    public void run()
    {
        try
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, key);

            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel)
            {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("Received: \"" + msg + "\"");
                    msgHandler.accept(msg);
                }
            };

            channel.basicConsume(queueName, true, consumer);
        }
        catch (IOException | TimeoutException e)
        { e.printStackTrace(); }
    }
}

TopicWriter.java:

public class TopicWriter
{
    private final String EXCHANGE_NAME = "space";
    private final Channel channel;

    public TopicWriter() throws IOException, TimeoutException
    {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        this.channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
    }

    public void send(String msg, String key) throws IOException
    {
        channel.basicPublish(
                EXCHANGE_NAME,
                key,
                null,
                msg.getBytes(StandardCharsets.UTF_8));
    }
}

Company.java包含:

new Thread(new TopicListener("space.agencies." + agencyID, ign -> {})).start();

Worker.java包含:

new Thread(new TopicListener("space.carriers." + carrierID, consumer)).start();

共有1个答案

章学义
2023-03-14

我发现了问题所在:我试图用Topic给每个人发送消息,在RabbitMQ中,Topic用于指定谁应该接收消息。应该在队列键声明中使用“#”或“*”,而不是在发送带有给定键的消息时使用。

 类似资料:
  • 是否有可能为带有“直接”类型的Rabbitmq交换设置一些“默认”队列? 比如,我有一个exchange A,队列Q1、Q2、Q3和QDef。因此,如果使用路由密钥Q1发布某个消息,它将转到Q1。但如果消息使用路由密钥Q4,则它应该转到QDef。若路由密钥不是现有队列的名称,则消息应转到QDef。 有可能做兔子吗?也许交换不应该是“直接”类型,而应该是其他类型? 换句话说。如果某个消费者为某个路由

  • 我一直在尝试使用RabbitMQ,但遇到了以下问题(与此非常类似:RabbitMQ中的主题交换与直接交换)。 我需要密集地广播大约800种类型的消息(因此每种消息类型都会有很多消费者),我想知道以下哪种方法更好: > 创建一个直接交换,在该交换中,消息将使用路由密钥(消息类型名称)发送,每个消费者都将通过绑定了相应路由密钥的临时队列连接到该交换。(因为没有像“key1.key2.*”这样复杂的路由

  • 我有一个场景,我需要执行一系列流程,每个步骤都在独立的应用程序中完成和扩展。我正在为所有交换使用主题交换。当前拓扑如下所示: P- 我们正在“版本化”队列,以处理可能影响消息结构的需求更改。绑定可能如下所示: 步骤1。exchange绑定到步骤1。v1。使用绑定键step1排队。v1 步骤1。exchange绑定到步骤1。v2。使用绑定键step1排队。v2级 还有其他与版本无关的绑定模式也使局部

  • 我有点迷茫,正在尝试实施话题交流,不确定需要什么。 我想有几个路由键和一个主题交换(默认的amq.topic)。我的钥匙是: 创建customer.app 创建customer.app 客户。*.创建 我希望我的队列是持久的,但是我需要1个“客户”队列还是2个appA和appB队列?我已经弄清楚了我的发布者;连接、交换声明、基本发布。 但我正在与消费者斗争。假设我想打开3个控制台,上面提到的每个路

  • 我正在寻求一些关于如何最好地配置我的rabbitMQ交换的建议。 我试着用循环系统的方法来交换话题。每个使用者都有自己的(唯一的)命名队列连接到主题交换。我希望交换为“相同”主题循环消息到每个使用者队列-比如。 我尝试了多个组合,但似乎只能同时将消息传递到使用者队列,这实际上意味着我要处理两次消息,每个使用者一次。 为了清楚起见,我还有一个扇出交换,我用它来“控制”消费者(启动、停止等)。这应该在

  • 我想使用下面的连接器配置将多个表数据发布到同一个Kafka主题,但我看到了下面的异常 例外 原因:io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException:正在注册的架构与早期架构不兼容;错误代码:409 连接器似乎忽略了主题策略属性集,并继续使用旧的${主题}-key和${主题}-value主题。 连