我想在一段时间后在我的一个工人中收到一条消息。在发现了所谓的死信交换之后,我决定使用Node和RabbitMQ。
该消息似乎已发送到DeadExchange中的队列,但是使用方在WorkExchange中的WorkQueue中经过了一段时间之后,使用者再也没有收到该消息。要么bindQueue关闭,要么死信不起作用?
我现在尝试 了很多 不同的值。有人可以指出我所缺少的吗?
var amqp = require('amqplib');
var url = 'amqp://dev.rabbitmq.com';
amqp.connect(url).then(function(conn) {
//Subscribe to the WorkQueue in WorkExchange to which the "delayed" messages get dead-letter'ed (is that a verb?) to.
return conn.createChannel().then(function(ch) {
return ch.assertExchange('WorkExchange', 'direct').then(function() {
return ch.assertQueue('WorkQueue', {
autoDelete: false,
durable: true
})
}).then(function() {
return ch.bindQueue('WorkQueue', 'WorkExchange', '');
}).then(function() {
console.log('Waiting for consume.');
return ch.consume('WorkQueue', function(msg) {
console.log('Received message.');
console.log(msg.content.toString());
ch.ack(msg);
});
});
})
}).then(function() {
//Now send a test message to DeadExchange to a random (unique) queue.
return amqp.connect(url).then(function(conn) {
return conn.createChannel();
}).then(function(ch) {
return ch.assertExchange('DeadExchange', 'direct').then(function() {
return ch.assertQueue('', {
arguments: {
'x-dead-letter-exchange': 'WorkExchange',
'x-message-ttl': 2000,
'x-expires': 10000
}
})
}).then(function(ok) {
console.log('Sending delayed message');
return ch.sendToQueue(ok.queue, new Buffer(':)'));
});
})
}).then(null, function(error) {
console.log('error\'ed')
console.log(error);
console.log(error.stack);
});
我正在使用amqp.node(https://github.com/squaremo/amqp.node),它是npm中的amqplib。尽管node-
amqp(https://github.com/postwait/node-amqp)似乎更受欢迎,但它并未实现完整的协议,并且在重新连接方面存在许多突出的问题。
dev.rabbitmq.com正在运行RabbitMQ 3.1.3。
AMQP.Node中的Channel#assertQueue中有一个错误已得到修复,请参见https://github.com/squaremo/amqp.node/commit/3749c66b448875d2df374e6a89946c0bdd0cb918。该修补程序位于GitHub上,但尚未在npm中。
我有一个使用RabbitMQ构建的发布-订阅场景。有一个交换,其中消息由发布者发送,订阅该交换的任何使用者在其各自的队列中接收这些消息。这是一个扇出场景,其中有一个生产者,但有多个消费者。 现在我愿意在系统中集成一个死信队列,以便稍后处理被拒绝的消息。我的问题是 1)我应该为每个使用者配置一个单独的死信队列,还是应该有一个单独的死信队列来处理来自所有使用者的所有被拒绝的消息? 2)如果两者都有可能
我们已经实现了延迟消息处理,有2个队列和x-死信-交换/x-消息-ttl,在queue1中的消息超时后,它将转到queue2。 现在是否有可能设置RabbitMQ,以便如果在处理来自queue2的消息时,我们将其拒绝为“死信”,那么它将自动转到queue3?我担心的是queue2中的消息已经标记为“已死”,有没有办法区分那些因为被拒绝而死的消息,并自动只将那些放在队列3中?
TL;DR:一旦我修复了最初导致消息被拒绝的消费者代码,我需要将死信消息“重放”回其原始队列中。 我已经为RabbitMQ配置了死信交换(DLX),并成功地将拒绝的消息路由到死信队列。但现在我想查看死信队列中的消息,并尝试决定如何处理每个消息。一些(许多?)一旦有问题的消费者代码被修复,这些消息中的所有消息都应该重放(重新排队)到它们的原始队列(在“x-death”标题中可用)。但我该怎么做呢?我
我正在尝试设置我的第一个RabbitMQ死信交换,下面是我通过web管理界面使用的步骤: 创建名称为“dead.letter.test”的新直接交换 创建新队列“dead.letter.queue” 将“dead.letter.queue”绑定到“dead.letter.test” 创建新队列“test1”,并将死信交换设置为“dead.letter.test” 将消息发送到“test1” NAC
我为RabbitMQ制作了一个消费者,作为一个用C#.NET编写的控制台应用程序。它被编程为永久监听队列,每当它在队列中发现消息时,它就处理它。使用者平均每秒处理35条消息。使用者被安排在系统启动时在任务计划程序中运行。消费者运行良好的3-4天。但是,它们继续运行,但不处理任何消息,尽管队列中有消息。当使用者停止并再次启动时,它再次开始正确处理消息。但是,当您手动重新启动时,数以百万计的消息排在队
这种需求类似于通过公开的REST服务API(Spring Boot)处理来自死信队列的消息。以便一旦调用REST服务,就会从DL队列中消耗一条消息,并将再次发布到主队列中进行处理。@RabbitListener(queues=“queue_name”)立即使用消息,这在场景中是不需要的。该消息只需由REST服务API使用。有什么建议或解决办法吗?