from kombu.mixins import ConsumerMixin
from kombu import Connection, Exchange, Queue
from loguru import logger
class MyConsumer(ConsumerMixin):
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
print('创建消费者 start')
queue_name = 'evt-ye.events-take--dna_create_service.auth'
exchange_name = 'ye.events'
routing_key = 'take'
exchange = Exchange(exchange_name, type='topic')
queue = Queue(
queue_name, exchange=exchange,
routing_key=routing_key,
queue_arguments={'x-max-priority': 10}
)
# 创建一个消费者,并设置预取消息数量为10
consumer = Consumer(
queues=[queue], callbacks=[self.on_message],
prefetch_count=10
)
print('创建消费者 down')
return [consumer]
def on_message(self, body, message):
logger.debug(f"Received message: {body}")
with Connection('amqp://pon:pon@192.168.38.191:5672//') as conn:
consumer = MyConsumer(conn)
consumer.run()
上面的代码运行后输出
╰─➤ python -u "/Users/ponponon/Desktop/code/me/pon_example/demo.py" 130 ↵
创建消费者 start
创建消费者 down
2023-07-07 14:21:29.154 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.155 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
2023-07-07 14:21:29.156 | DEBUG | __main__:on_message:32 - Received message: 哈哈哈
然后就没有然后了,程序没有退出,一直阻塞着
从 rabbitmq 的监控面板看,也一直出于阻塞状态
用 wireshark 抓包看,也没有回复 ack
为什么 ?
def on_message(self, body, message):
logger.debug(f"Received message: {body}")
message.ack() # acknowledge the message
我正在尝试通过对象读取命令。为了检查输入语法,我使用<code>sc。hasNext()(对于缺少命令的情况)。它已经在很多情况下运行良好,但现在我看到了JavaAPI中描述的“MAY block and wait for Input”的情况。 方法何时阻塞,我如何控制它?有趣的是,在街区前的3个案例中,它工作得非常好。此外,JavaAPI还将描述为检查是否存在另一个Input的正确方法,从而使方
问题内容: 注意:这不是有关settimeout的复制文章,此处的关键答案是浏览器设计选项。 我开始研究node.js:一个测试异步的简单示例: 一件有趣的事情是,在带有curl的lind命令和浏览器中,它的行为是不同的:在Ubuntu 12.10中,我在两个控制台中使用curl localhost:8080,它们在几乎相同的10个发送中进行响应。 但是,我打开了两个浏览器,几乎同时发出了请求,但
问题内容: 我应该开发一个简单的SFTP。 一切都进行得很好,直到我(在本例中)没有编写全部为止。可以请我解释一下,为什么系统挂在我身上吗? 服务器端: 客户端: 问题答案: 您的循环一直运行到流结束,但是对等方永远不会关闭套接字。该协议似乎要求打开套接字以供其他命令使用,因此您必须调整它的这一部分以包括一个长度字前缀,以便您知道要复制多少字节。 问题不是关于不写所有字节,而是关于阻塞in 。
我不明白为什么webclient会阻止我使用gradle的主要netty线程,以下是它的依赖项: 这个gradle脚本在两个应用程序中都使用。在第一个应用程序中,我执行: 第二个应用程序模拟长响应处理: 我希望呼叫服务不会阻塞主线程,而是会继续处理传入的连接,但直到我收到第一个呼叫的响应(睡眠将起作用),我的下一个连接将挂起等待。 结果:第一个应用程序像tomcat一样工作,只有一个线程 我的问题
线程实例的join()方法可用于将一个线程的执行开始“连接”到另一个线程的执行结束,这样一个线程在另一个线程结束之前不会开始运行。如果对线程实例调用join(),则当前运行的线程将阻塞,直到线程实例完成执行 但是如果我有多个线程并且当我在循环内部调用join时。所有线程并行运行。但是根据连接的概念,首先连接的线程应该完成,然后只有主线程才允许连接其他线程。 } 在上面的代码中,如果第一个线程被连接
问题内容: 我想编写一个可以同时写入多个文件的程序。认为可以通过使用非阻塞模式在一个线程中实现。但是FileChannel不支持非阻塞模式。有人知道为什么吗? 问题答案: UNIX不支持非阻塞的文件I / O,看到非阻塞I / O与常规文件 。由于Java应该(至少尝试在所有平台上)提供相同的行为,因此不会实现。 但是,Java 7将包括一个支持 异步 文件I / O 的新类,这是与非阻塞I /