我有一个服务员线程想使用RabbitMQ direct exchange向Java中的客户线程发送一道寿司,但是我的客户没有收到这道菜。下面是我的服务员用来发布寿司菜肴对象的方法:
请注意,<code>dishKey</code>作为参数传递,并在之前的if-else语句中被确定为<code>的“tamagoDishKey”</code>或<code>“ebiDishKey”的
public void waiterSendsDish (Sushi sushiDish, String dishKey) throws IOException, TimeoutException {
String sushiDishEx = "sushiDishExchange";
System.out.println("WAITER: Serving " + sushiDish.sushiName + " to Customer " + sushiDish.sushiNo + ".");
ConnectionFactory factory = new ConnectionFactory();
byte[] byteArray = getByteArraySushi(sushiDish);
try (Connection con = factory.newConnection()){
Channel chan = con.createChannel();
chan.exchangeDeclare (sushiDishEx, BuiltinExchangeType.DIRECT);
chan.basicPublish(sushiDishEx, dishKey, null, byteArray);
}
}
以下是我的客户用来消费一种寿司(tamago)的方法:
public void tamagoReceiveDish() throws IOException, TimeoutException {
String sushiDishEx = "sushiDishExchange";
String tamagoKey = "tamagoDishKey";
ConnectionFactory factory = new ConnectionFactory();
try (Connection con = factory.newConnection()) { // added a try clause
Channel chan = con.createChannel();
chan.exchangeDeclare(sushiDishEx, "direct"); // direct exchange
String queueName = chan.queueDeclare().getQueue();
chan.queueBind(queueName, sushiDishEx, tamagoKey);
chan.basicConsume(queueName, true, (x, msg)->{
byte[] byteArray = msg.getBody();
try {
Sushi sushi = (Sushi) deserialize(byteArray);
System.out.println("CUSTOMER " + this.custNo + " (ordered Tamago Sushi): Received " + sushi.sushiName + ".");
System.out.println("CUSTOMER " + this.custNo + ": Left the restaurant.");
} catch (Exception e) {}
}, x->{}
);
}
}
请注意,在为客户创建新连接时,我添加了一个< code>try子句,因为有许多客户(线程)被连续创建来订购和获取寿司,我希望客户只与服务员连接一次来获取寿司,然后停止连接(客户将离开餐厅)。
如果有人能帮我解决这个问题,我将不胜感激。我只想让一位独特的顾客接受每道菜,然后“离开餐厅”(关闭与该顾客的联系?)
我认为您无需等待消息即可关闭连接。没有什么能阻止你的代码关闭所有内容,这会杀死正在启动的消费者线程。
因此,您需要在一个time循环中等待:
private static final volatile boolean NO_LEFT = true;
public void tamagoReceiveDish() throws IOException, TimeoutException {
String sushiDishEx = "sushiDishExchange";
String tamagoKey = "tamagoDishKey";
ConnectionFactory factory = new ConnectionFactory();
try (Connection con = factory.newConnection()) { // added a try clause
Channel chan = con.createChannel();
chan.exchangeDeclare(sushiDishEx, "direct"); // direct exchange
String queueName = chan.queueDeclare().getQueue();
chan.queueBind(queueName, sushiDishEx, tamagoKey);
chan.basicConsume(queueName, true, (x, msg)->{
byte[] byteArray = msg.getBody();
try {
Sushi sushi = (Sushi) deserialize(byteArray);
System.out.println("CUSTOMER " + this.custNo + " (ordered Tamago Sushi): Received " + sushi.sushiName + ".");
System.out.println("CUSTOMER " + this.custNo + ": Left the restaurant.");
NO_LEFT = false;
} catch (Exception e) {}
}, x->{}
);
while(NO_LEFT){
Thread.sleep(100);
}
}}
我一直在尝试使用RabbitMQ,但遇到了以下问题(与此非常类似:RabbitMQ中的主题交换与直接交换)。 我需要密集地广播大约800种类型的消息(因此每种消息类型都会有很多消费者),我想知道以下哪种方法更好: > 创建一个直接交换,在该交换中,消息将使用路由密钥(消息类型名称)发送,每个消费者都将通过绑定了相应路由密钥的临时队列连接到该交换。(因为没有像“key1.key2.*”这样复杂的路由
我正在尝试设置我的第一个RabbitMQ死信交换,下面是我通过web管理界面使用的步骤: 创建名称为“dead.letter.test”的新直接交换 创建新队列“dead.letter.queue” 将“dead.letter.queue”绑定到“dead.letter.test” 创建新队列“test1”,并将死信交换设置为“dead.letter.test” 将消息发送到“test1” NAC
全部的 我对RabbitMQ在消耗大量消息(例如280000条)时的性能有一个问题。从性能角度来看,它似乎会上下波动。从管理控制台获取的图中所示的图表演示了这一点,其中消费者平均每秒约40条消息,然后跳到每秒约120条消息: 该模式将再次重复,它将再次返回到40,然后再次返回120,依此类推,如果我在1小时后运行相同的测试,则会发生相同的上下效应,但范围可能会有很大差异,例如每秒140到400条消
发布者创建reply_to队列并发布到路由密钥,其中包含一条消息,告诉消费者向队列发送响应(RPC协议),以及一个传回的相关id,以便所有未来的结果都与该唯一标识符相关联 Exchange向绑定到该路由密钥的所有队列发送消息。这里,有两个消费者的两个队列,每个都绑定到路由密钥“泵” 一段时间后,消费者回复回队列,然后确认消息,以便他们的唯一队列删除发送到其队列的消息。每个收到消息的消费者都会这样做
我们使用的是Spring kafka 2.7非阻塞重试机制。在Spring Kafka重试机制中,Kafka listenser使用来自main topic、Retry topic和DLT topic的消息,我们希望侦听器仅使用来自main和Retry topic的消息。 有没有简单的方法来进行设置? 因为我们不希望同一个消费者处理DLT消息。DLT还将被另一个进程使用,以发送请求通知。
我成功地建立了一个话题交换,并且能够同时向几个消费者传递消息。 我还想向竞争对手传递信息,并继续使用主题交换。我了解到,使用相同的队列名称可以让消费者竞争消息。然而,我可能弄错了,因为我无法使它工作。 为同一主题的多个侦听器设置: < li >申报话题交流 < li >对于每个侦听器,用自动生成的名称声明一个新队列 < li >用给定的主题路由关键字将此队列绑定到上面的交换 如何将相互竞争的消费者