当前位置: 首页 > 知识库问答 >
问题:

在多个python进程之间共享RabbitMQ通道

谢弘阔
2023-03-14

我想在多个python进程之间共享blockingchannel。以便从其他python进程发送basic_ack

如何跨多个python进程共享blockingchannel

代码如下:

self.__connection__ = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.__channel__ = self.__connection__.channel()
filepath = "temp/" + "merger_channel.sav"
pickle.dump(self.__channel__, open(filepath, 'wb'))

目标是从通道从其他python进程发送basic_ack

共有1个答案

汤才捷
2023-03-14

在多个线程之间共享一个通道是一种反模式,您不太可能在进程之间共享它。

经验法则是每个进程1个连接和每个线程1个通道

您可以在以下链接中阅读更多有关此事的内容:

    null

如果您希望将消息消耗与多处理结合在一起,通常的模式是让主进程接收消息,将它们的有效负载传递给一个工作进程池,并在完成后对它们进行确认。

使用pika.blockingchannelconcurrent.futures.ProcessPoolExecutor的简单示例:

def ack_message(channel, delivery_tag, _future):
    """Called once the message has been processed.
    Acknowledge the message to RabbitMQ.
    """
    channel.basic_ack(delivery_tag=delivery_tag)

for message in channel.consume(queue='example'):
    method, properties, body = message

    future = pool.submit(process_message, body)
    # use partial to pass channel and ack_tag to callback function
    ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag)
    future.add_done_callback(ack_message_callback)      

上面的循环将无休止地使用example队列中的消息,并将它们提交到进程池中。您可以通过RabbitMQ消费者预取参数控制并发处理多少消息。查看pika.basic_qos查看如何在Python中执行此操作。

 类似资料:
  • 问题内容: 我正在尝试使用部分函数,​​以便pool.map()可以定位具有多个参数(在本例中为Lock()对象)的函数。 这是示例代码(摘自我之前的问题的答案): 但是,当我运行此代码时,出现错误: 我在这里想念什么?如何在子流程之间共享锁? 问题答案: 您不能将普通对象传递给方法,因为它们不能被腌制。有两种方法可以解决此问题。一种是创建并传递一个: 不过,这有点重量级;使用需要产生另一个进程来

  • 问题内容: 该模块的文档显示了如何将队列传递给以开头的进程。但是,如何与开始的异步工作进程共享队列?我不需要动态加入或其他任何方式,而只是工人(反复)将其结果报告给基地的一种方法。 失败的原因是: 。我理解这意味着什么,并且我理解继承的建议,而不是要求进行酸洗/酸洗(以及所有Windows特殊限制)。但如何 做 我通过队列的方式,作品?我找不到一个示例,并且我尝试了多种失败的替代方法。请帮忙? 问

  • 问题内容: 我知道如何将其用于创建共享对象,尤其是可以在工作人员之间共享的队列。有这个问题,这个问题,[这个问题](http://codingdict.com/questions/1299甚至是我自己的一个问题。 但是,我需要定义很多队列,每个队列都链接一对特定的进程。假设每对进程及其链接队列均由变量标识。 当我需要放置和获取数据时,我想使用字典来访问我的队列。我无法完成这项工作。我已经尝试了很多

  • Python 3.1.2 我对多处理产生的两个线程之间的变量共享有问题。过程这是一个简单的bool变量,它应该决定线程是应该运行还是应该停止执行。下面是三种情况下显示的简化代码(但使用与我的原始代码相同的机制): 主要用于Thread加工。Thread类型和自紧度。正在运行布尔类型[工作正常] 我想了解的是为什么它是这样工作的,而不是另一种。(即,为什么第2点没有像我认为的那样起作用)。 测试是从

  • 问题内容: 我在Linux 2.6中。我有一个环境,其中2个进程通过消息传递模式的简单实现来模拟(使用共享内存)数据交换。 我有一个客户端进程(从父进程(即服务器)派生),该进程将struct(消息)写入使用以下命令创建的内存映射区域(在派生之后): 然后将此指针以链接列表的形式写入到另一个共享内存区域中的队列中,该共享内存区域对于服务器和客户端进程是通用的(因为如果在派生之前使用上面的相同代码创