当前位置: 首页 > 知识库问答 >
问题:

一个队列上的多个使用者RabbitMQ-Java

奚昌胤
2023-03-14

我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,我没有找到解决问题。多谢。

        while (!isRetry) {
        try {
            isRetry = true;
            connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
            String queueName = "webhook_customer";
            String exchangeName = "webhook_exchange";
            String routingKey = "customer";
            System.out.println("step2");

            Channel channel = connection.createChannel();
            channel.exchangeDeclare(exchangeName, "topic", true);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingKey);
            channel.basicQos(1);
            for (int i = 0; i < numberWorker; i++) {
                Consumer consumer = new QueueingConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                                               AMQP.BasicProperties properties, byte[] body) throws IOException {
                        long startProcess = System.nanoTime();
                        JSONObject profile = null;
                        try {

                        } catch (IOException ioe) {
                            handleLogError(profile, ioe.getMessage().toString());
                        } catch (Exception e) {
                            handleLogError(profile, e.getMessage());
                        } finally {
                            channel.basicAck(envelope.getDeliveryTag(), false);
                            long endProcess = System.nanoTime();
                            _logger.info("===========######### TIME PROCESS  + " + (endProcess - startProcess) + " Nano Seconds  ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
                        }
                    }
                };

                channel.basicConsume(queueName, false, consumer);
            }
            System.out.printf("Start Listening message ...");
        } catch (Exception e) {
            System.out.println("exception " + e.getMessage());
            isRetry = closeConnection(connection);
            e.printStackTrace();
        } finally {
        }
        if (!isRetry) {
            try {
                System.out.println("sleep waiting retry ...");
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        //END
    }

共有1个答案

邹祺然
2023-03-14

我确实在我的案子中找到了解决办法。当消息进入时,我在消费者中使用新线程,并在其中进行处理。并且我创建了多个通道以便同时发送多条消息。我使用threadpool来控制线程

 类似资料:
  • 是否可以使用topic将消息发送到队列,并有2个消费者接收和处理相同的消息?目前,我已经创建了两个消费者,他们正在观察与一个exchage主题绑定的队列,但是第一个消费者使用了该消息并删除了该队列,第二个消费者没有接收到该消息。

  • 我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。

  • 我目前正在尝试使用RabbitMQ(具有出色的RabbitMQBundle)来处理大量的异步工作。 目标是让一个队列发布相同类型的消息,并让多个服务器上的X个工作者在同一时间内查看消息。 每个工人都要偷看一条消息,完成工作,然后偷看另一条消息,等等。 这里是我的conf: 在我的consumer中,我有一个日志文件中的条目和120秒的睡眠。 我启动了php app/console rabbitmq

  • 问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,

  • JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。

  • 我有一个关于RabbitMQ队列的问题。我想在一个队列上发送两种类型的消息。 我知道,我可以创建两个不同的队列,并使用路由键将不同的消息发送到不同的队列。 但是我希望在一个队列上有两个消费者,并以某种方式将消费者与消息类型绑定。它是通过兔子队列驱动的事件,当客户端和核心是发布者和消费者时。 有可能吗?或者我应该使用不同的队列吗? 数据交换