我是新手RabbitMQ java客户端。我的问题:我创建了10个consumer并将它们添加到队列中。每个消费者使用10秒来处理我的流程。我检查了Rabbit的页面,我看到我的队列有4000条消息没有发送到客户端。我检查了日志客户端,结果是为一个消费者获取一条消息,10秒后,我为一个消费者获取一条消息,依此类推…我想要得到10个消息为所有消费者在当时(10个消息-10消费者过程在当时)请帮助我,我没有找到解决问题。多谢。
while (!isRetry) {
try {
isRetry = true;
connection = mConnectionFactory.newConnection(addresses.toArray(new Address[addresses.size()]));
String queueName = "webhook_customer";
String exchangeName = "webhook_exchange";
String routingKey = "customer";
System.out.println("step2");
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicQos(1);
for (int i = 0; i < numberWorker; i++) {
Consumer consumer = new QueueingConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long startProcess = System.nanoTime();
JSONObject profile = null;
try {
} catch (IOException ioe) {
handleLogError(profile, ioe.getMessage().toString());
} catch (Exception e) {
handleLogError(profile, e.getMessage());
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
long endProcess = System.nanoTime();
_logger.info("===========######### TIME PROCESS + " + (endProcess - startProcess) + " Nano Seconds ========#### " + (endProcess - startProcess) / 1000000 + " Milli Seconds");
}
}
};
channel.basicConsume(queueName, false, consumer);
}
System.out.printf("Start Listening message ...");
} catch (Exception e) {
System.out.println("exception " + e.getMessage());
isRetry = closeConnection(connection);
e.printStackTrace();
} finally {
}
if (!isRetry) {
try {
System.out.println("sleep waiting retry ...");
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//END
}
我确实在我的案子中找到了解决办法。当消息进入时,我在消费者中使用新线程,并在其中进行处理。并且我创建了多个通道以便同时发送多条消息。我使用threadpool来控制线程
是否可以使用topic将消息发送到队列,并有2个消费者接收和处理相同的消息?目前,我已经创建了两个消费者,他们正在观察与一个exchage主题绑定的队列,但是第一个消费者使用了该消息并删除了该队列,第二个消费者没有接收到该消息。
我刚刚开始使用RabbitMQ和AMQP。 我有一个消息队列 我有多个消费者,我想用相同的消息做不同的事情。 RabbitMQ的大部分文档似乎都集中在循环(round-robin)上,即单个消息由单个消费者使用,负载在每个消费者之间分散。这的确是我目击的行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这里有一个消费者: 如果我启动消费者两次,我可以看到每个消费者都在以循环行为消费交替消息。
我目前正在尝试使用RabbitMQ(具有出色的RabbitMQBundle)来处理大量的异步工作。 目标是让一个队列发布相同类型的消息,并让多个服务器上的X个工作者在同一时间内查看消息。 每个工人都要偷看一条消息,完成工作,然后偷看另一条消息,等等。 这里是我的conf: 在我的consumer中,我有一个日志文件中的条目和120秒的睡眠。 我启动了php app/console rabbitmq
问题内容: 我一般只是开始使用RabbitMQ和AMQP。 我有一条消息队列 我有多个消费者,我想用 同一条消息 做不同的事情。 RabbitMQ的大多数文档似乎都集中在循环上,即单个消息由单个使用者使用,而负载则分散在每个使用者之间。我确实是这种行为。 例如:生产者只有一个队列,每2秒发送一次消息: 这是一个消费者: 如果我启动使用者两次,则 可以看到每个使用者都以循环方式使用替代消息。 例如,
JMS队列有2个消费者,同步和异步Java应用程序进程等待响应。1)同步应用程序发送请求,并根据JMS相关ID等待响应60秒。2)异步线程将不断侦听同一队列。
我有一个关于RabbitMQ队列的问题。我想在一个队列上发送两种类型的消息。 我知道,我可以创建两个不同的队列,并使用路由键将不同的消息发送到不同的队列。 但是我希望在一个队列上有两个消费者,并以某种方式将消费者与消息类型绑定。它是通过兔子队列驱动的事件,当客户端和核心是发布者和消费者时。 有可能吗?或者我应该使用不同的队列吗? 数据交换