使用主题交换,我希望有一个具有以下特性的发布/订阅消息传递模式:
所以我创建了2个控制台应用程序(发送和接收)来测试上面的内容。
static void Main(string[] args)
{
Console.WriteLine(" Type [exit] to exit.");
Publisher publisher = new Publisher();
do
{
var userInput = Console.ReadLine();
if (userInput == "exit")
{
break;
}
publisher.SendMessageToBroker("localhost", "main", "user.update", userInput);
} while (true);
}
public class Publisher
{
const string ExchangeType = "topic";
Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>();
public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message)
{
var factory = new ConnectionFactory() { HostName = host };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag);
channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag);
channel.ConfirmSelect();
channel.ExchangeDeclare(exchangeName, ExchangeType);
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message);
channel.BasicPublish(exchange: exchangeName,
routingKey: routingKey,
basicProperties: properties,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
}
private void OnBasicNacks(bool multiple, ulong deliveryTag)
{
if (multiple)
{
Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag);
}
else
{
Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag);
}
}
private void OnBasicAcks(bool multiple, ulong deliveryTag)
{
if (multiple)
{
var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag);
foreach (var entry in confirmed)
{
unConfirmedMessageTags.Remove(entry.Key);
Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key);
}
}
else
{
unConfirmedMessageTags.Remove(deliveryTag);
Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag);
}
}
}
}
接收
static void Main(string[] args)
{
const string ExchangeName = "main";
const string QueueName = "q1";
const string ExchangeType = "topic";
const string RoutingKey = "user.update";
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(ExchangeName, ExchangeType);
channel.QueueDeclare(queue: QueueName,
durable: true,
autoDelete: false,
exclusive: false,
arguments: null);
channel.QueueBind(QueueName, ExchangeName, RoutingKey);
//channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body);
channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body)
{
var message = Encoding.UTF8.GetString(body.ToArray());
Console.WriteLine(" [x] Received {0}", message);
Thread.Sleep(2000);
channel.BasicAck(deliveryTag: deliveryTag, multiple: false);
Console.WriteLine(" [x] Processed {0}", message);
}
}
问题是我的Send程序中的OnBasicAcks只会为第一条消息调用一次。
对于其他可能遇到此问题的人,我正在为不鼓励的每一次发布打开一个连接和通道(虚拟连接):
“连接是长期存在的。为每次操作(例如发布消息)打开连接将非常低效,因此非常不鼓励使用。”
也看这里:
“publisher确认是通过ConfirmSelect方法在通道级别启用的...必须在希望使用publisher确认的每个通道上调用此方法。确认应该只启用一次,而不是对发布的每条消息启用一次。”
切换到使用长期连接解决了我的问题。
我有一个场景,我需要执行一系列流程,每个步骤都在独立的应用程序中完成和扩展。我正在为所有交换使用主题交换。当前拓扑如下所示: P- 我们正在“版本化”队列,以处理可能影响消息结构的需求更改。绑定可能如下所示: 步骤1。exchange绑定到步骤1。v1。使用绑定键step1排队。v1 步骤1。exchange绑定到步骤1。v2。使用绑定键step1排队。v2级 还有其他与版本无关的绑定模式也使局部
有没有办法更改发布者-确认每条消息?我们有一个接收消息并发布到RabbitMQ的Rest层。根据特定的消息属性,我们决定是否需要发布者确认。 有没有一种方法可以在发送消息时覆盖,发布者-确认?
TLDR;在主题交换和由使用者动态创建的队列的上下文中,当没有使用者使用消息时,如何重新传递消息/通知生产者? 我有以下组件: null 生成器使用向exchange发送消息。 每个使用者创建一个队列,并为一组路由密钥(例如,Pictures.*和Videos.Trending)将交换绑定到这个队列。 当使用者处理完文件后,它会将结果推送到processing_results队列中。 现在--这工
我计划在我的项目中使用Spring Cloud Stream。是否可以使用Publisher Confirms(又名Publisher Acknowledges),即注册确认回调,如中所述http://docs.spring.io/spring-amqp/reference/html/_reference.html#cf-发布配置ret?或者是否有其他可能从RabbitMQ接收ack,表明它已从客
我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者
问题内容: 我有下面的代码使用来将消息发布到队列中。该是越来越创建,但该消息无法被看到的队列。我也没有看到任何错误。 BasicApplication.java Producer.java 问题答案: 您使用了错误的方法;该方法的第一个参数是。 使用。