我有一个从Rabbit接收消息的应用程序。当收到一条消息时,它会对它进行处理,然后在完成时执行ACK。应用程序可以在一个固定的线程池中同时处理2个项目,有2个线程。Rabbit的QOS预取设置为2,因为我不想在一个时间框架内给应用提供超过它所能处理的内容。
现在,我的消费者的handleDelivery执行以下操作:
Task run = new Task(JSON.parse(message));
service.execute(new TestWrapperThread(getChannel(),run,envelope.getDeliveryTag()));
此时,您已经发现TestWrapperThread将channel.basicack(deliveryTag,false);
调用作为最后一个操作。
根据我对文档的理解,这是不正确的,而且可能有害,因为通道不是线程安全的,而且这种行为可能会把事情搞砸。那我该怎么做呢?我的意思是,我有一些想法,但他们会让一切变得更复杂,我想弄清楚它是否真的有必要。
提前致谢
我想您使用channel
只是为了您的消费者,而不是其他操作,如发布等。
在您的情况下,唯一的潜在问题是:
channel.basicAck(deliveryTag, false);
因为您跨两个线程调用此操作,顺便说一句,如果您看到java代码,则此操作是安全的:
public void basicAck(long deliveryTag, boolean multiple)
throws IOException
{
transmit(new Basic.Ack(deliveryTag, multiple));
}
public void transmit(Method m) throws IOException {
synchronized (_channelMutex) {
transmit(new AMQCommand(m));
}
}
_channelmutex
是受保护的最终对象_channelmutex=new Object();
类创建的。请参阅amqchannel.java的github代码
编辑
希望有帮助。
编辑2我还加上尼古拉斯的评论:
请注意,从多个线程消费(basicConsumer)和添加是java客户机已经使用的常见rabbitmq模式。
每个通道都有自己的分派线程。对于每个渠道一个消费者的最常见用例,这意味着消费者不会拖住其他消费者。如果每个通道有多个使用者,请注意长时间运行的使用者可能会阻碍回调到该通道上其他使用者的调度。 我有各种命令(消息)通过单个入站队列和通道进入,该队列和通道附加了DefaultConsumer。假设DefaultConsumer中有一个threadpool允许我直接从consumer回调方法运行应用程序
我刚刚阅读了RabbitMQ的Java API文档,发现它非常丰富而且简单。关于如何为发布/消费设置一个简单的的示例非常容易遵循和理解。但这是一个非常简单/基本的示例,它给我留下了一个重要的问题:如何设置1+来向多个队列发布/消费? 假设我有一个RabbitMQ服务器,上面有3个队列:、和。因此,我们需要一个来发布/使用所有3个队列,或者更有可能有3个单独的,每个通道专用于一个队列。 在此基础上,
本文向大家介绍Rust 与通道的跨线程通信,包括了Rust 与通道的跨线程通信的使用技巧和注意事项,需要的朋友参考一下 示例 通道可用于将数据从一个线程发送到另一线程。下面是一个简单的生产者-消费者系统的示例,其中主线程产生值0、1,...,9,而生成的线程将其打印出来:
RabbitMQ Java客户端有以下概念: -与RabbitMQ服务器实例的连接 -??? 使用者线程池-使用RabbitMQ服务器队列中的消息的线程池 队列-按FIFO顺序保存消息的结构 我试图理解他们之间的关系,更重要的是,他们之间的联系。 我仍然不太清楚是什么,除了这是您发布和使用的结构,并且它是从一个开放的连接创建的。如果有人能向我解释一下“通道”代表什么,可能有助于澄清一些问题。 通道
我的应用程序有多个线程将消息发布到单个RabbitMQ集群。 阅读rabbit文档:我阅读了以下内容: 对于使用多个线程/进程进行处理的应用程序,每一个线程/进程打开一个新通道,并且不在它们之间共享通道是非常常见的。 而且我明白,与其开通多个连接(昂贵) 不如开通多个通道。 但是为什么不对所有线程使用单个通道呢? 在单个通道上使用多个通道有什么好处?
我在我产品环境中发现了一个问题。 我们在一个mq集群中有6个队列,我们有200个线程的线程池(实际上会更多,因为它会在一个独立的线程池中安排一些特殊任务)来处理来自上游的请求,当处理请求时,我会发布一个消息给rabbitmq Broker。 所以我有200个线程将消息发布到这6个队列。 对于每个队列,我将创建一个AMQP连接,对于每个线程,我有一个Channel的threadlocal,这样每个线