当前位置: 首页 > 知识库问答 >
问题:

python - 为什么 kombu 的 ConsumerMixin 消费阻塞了?

桓修能
2023-07-08
from kombu.mixins import ConsumerMixin
from kombu import Connection, Exchange, Queue
from loguru import logger


class MyConsumer(ConsumerMixin):
    def __init__(self, connection):
        self.connection = connection

    def get_consumers(self, Consumer, channel):
        print('创建消费者 start')
        queue_name = 'evt-ye.events-take--dna_create_service.auth'
        exchange_name = 'ye.events'
        routing_key = 'take'

        exchange = Exchange(exchange_name, type='topic')
        queue = Queue(
            queue_name, exchange=exchange,
            routing_key=routing_key,
            queue_arguments={'x-max-priority': 10}
        )

        # 创建一个消费者,并设置预取消息数量为10
        consumer = Consumer(
            queues=[queue], callbacks=[self.on_message],
            prefetch_count=10
        )
        print('创建消费者 down')
        return [consumer]

    def on_message(self, body, message):
        logger.debug(f"Received message: {body}")


with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
    consumer = MyConsumer(conn)
    consumer.run()

上面的代码运行后输出

╰─➤  python -u "/Users/ponponon/Desktop/code/me/pon_example/demo.py"                                                                                                                                                                                                      130 ↵
创建消费者 start
创建消费者 down
2023-07-07 14:21:29.154 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.156 | DEBUG    | __main__:on_message:32 - Received message: 哈哈哈

然后就没有然后了,程序没有退出,一直阻塞着

图片.png

从 rabbitmq 的监控面板看,也一直出于阻塞状态

图片.png

用 wireshark 抓包看,也没有回复 ack

为什么 ?

共有1个答案

越嘉树
2023-07-08
def on_message(self, body, message):
    logger.debug(f"Received message: {body}")
    message.ack()  # acknowledge the message
 类似资料:
  • 我正在尝试通过对象读取命令。为了检查输入语法,我使用<code>sc。hasNext()(对于缺少命令的情况)。它已经在很多情况下运行良好,但现在我看到了JavaAPI中描述的“MAY block and wait for Input”的情况。 方法何时阻塞,我如何控制它?有趣的是,在街区前的3个案例中,它工作得非常好。此外,JavaAPI还将描述为检查是否存在另一个Input的正确方法,从而使方

  • 问题内容: 注意:这不是有关settimeout的复制文章,此处的关键答案是浏览器设计选项。 我开始研究node.js:一个测试异步的简单示例: 一件有趣的事情是,在带有curl的lind命令和浏览器中,它的行为是不同的:在Ubuntu 12.10中,我在两个控制台中使用curl localhost:8080,它们在几乎相同的10个发送中进行响应。 但是,我打开了两个浏览器,几乎同时发出了请求,但

  • 问题内容: 我应该开发一个简单的SFTP。 一切都进行得很好,直到我(在本例中)没有编写全部为止。可以请我解释一下,为什么系统挂在我身上吗? 服务器端: 客户端: 问题答案: 您的循环一直运行到流结束,但是对等方永远不会关闭套接字。该协议似乎要求打开套接字以供其他命令使用,因此您必须调整它的这一部分以包括一个长度字前缀,以便您知道要复制多少字节。 问题不是关于不写所有字节,而是关于阻塞in 。

  • 我不明白为什么webclient会阻止我使用gradle的主要netty线程,以下是它的依赖项: 这个gradle脚本在两个应用程序中都使用。在第一个应用程序中,我执行: 第二个应用程序模拟长响应处理: 我希望呼叫服务不会阻塞主线程,而是会继续处理传入的连接,但直到我收到第一个呼叫的响应(睡眠将起作用),我的下一个连接将挂起等待。 结果:第一个应用程序像tomcat一样工作,只有一个线程 我的问题

  • 线程实例的join()方法可用于将一个线程的执行开始“连接”到另一个线程的执行结束,这样一个线程在另一个线程结束之前不会开始运行。如果对线程实例调用join(),则当前运行的线程将阻塞,直到线程实例完成执行 但是如果我有多个线程并且当我在循环内部调用join时。所有线程并行运行。但是根据连接的概念,首先连接的线程应该完成,然后只有主线程才允许连接其他线程。 } 在上面的代码中,如果第一个线程被连接

  • 问题内容: 我想编写一个可以同时写入多个文件的程序。认为可以通过使用非阻塞模式在一个线程中实现。但是FileChannel不支持非阻塞模式。有人知道为什么吗? 问题答案: UNIX不支持非阻塞的文件I / O,看到非阻塞I / O与常规文件 。由于Java应该(至少尝试在所有平台上)提供相同的行为,因此不会实现。 但是,Java 7将包括一个支持 异步 文件I / O 的新类,这是与非阻塞I /