当前位置: 首页 > 面试题库 >

终止挂起的redis pubsub.listen()线程

慕容越泽
2023-03-14
问题内容

与此问题相关,我有以下代码可订阅redispubsub队列,并使用__init__中提供的处理程序将消息提供给处理它们的类:

from threading import Thread
import msgpack

class Subscriber(Thread):

    def __init__(self, redis_connection, channel_name, handler):
        super(Subscriber, self).__init__(name="Receiver")
        self.connection = redis_connection
        self.pubsub = self.connection.pubsub()
        self.channel_name = channel_name
        self.handler = handler
        self.should_die = False

    def start(self):
        self.pubsub.subscribe(self.channel_name)
        super(Subscriber, self).start()

    def run(self):
        for msg in  self.pubsub.listen():
            if self.should_die:
                return
            try:
                data = msg["data"]
                unpacked = msgpack.unpackb(data)
            except TypeError:
                # stop non-msgpacked, invalid, messages breaking stuff
                # other validation happens in handler
                continue
            self.handler(unpacked)

    def die(self):
        self.should_die = True

在上面的链接问题中,请注意,pubsub.listen()如果断开连接,则永不返回。因此,die()尽管我的函数可以被调用,但它实际上不会导致线程终止,因为它挂在listen()对线程内部的调用上run()

链接问题的可接受答案提到了黑客入侵redis-py的连接池。我真的不想这样做,并且有一个分支版本的redis-
py(至少在希望该修补程序被母版接受之前),但是无论如何我一直在看一下redis-py代码,并且不要立即看看将在哪里进行更改。

有谁知道如何彻底解决悬挂的redis-py listen()电话吗?

我将直接使用哪些问题Thread._Thread__stop


问题答案:

这么多年后才将其关闭。它最终是redis库中的一个bug。我调试了它并提交了PR。它不应该再发生了。



 类似资料:
  • 首先,这不是重复的: 好的,这是我的错误: 我所做的: 在VisualStudio中,我尝试拉取并更新一个分支,得到了提到的错误。谷歌搜索了一下,发现另一个git进程中的解决方案似乎正在这个存储库中运行,转到命令行(不是git bash),试图删除索引。锁定失败,转到git bash,尝试删除索引。锁,没有错误索引仍然存在(我猜它不是index.lock只是index)。去了VisualStudi

  • 目前,如果Autosys作业达到最大运行警报状态,就会向我们的帮助台生成电子邮件警报,他们可以采取适当的行动。根据我对AutoSys内部数据模型的理解,这是一个可能发生在作业上的“事件”。 这不同于我所知道的工作可以属于的各种雕像,一次一个; 已激活 非活动 开始 正在运行 成功 失败 挂起 冰上 开始晚 机器挂起 已终止 克里斯

  • 主要内容:1 什么是Java终止线程,2 Thread类终止线程的方法,3 Java终止线程的例子1,4 Java终止线程的例子2,5 Java终止线程的例子3,6 isInterrupted和interrupted方法1 什么是Java终止线程 如果任何线程处于睡眠或等待状态(即,调用sleep()或wait()方法),则在线程上调用interrupt()方法,会抛出InterruptedException中断睡眠或等待状态。如果线程未处于睡眠或等待状态,则调用interrupt()方法将执行

  • 问题内容: 当我测试创建子线程的方法的执行时,JUnit测试会在子线程之前终止并杀死它。 我如何强制JUnit等待子线程完成其执行? 谢谢 问题答案: 阅读问题和评论后,似乎您需要的是 一种对异步操作进行单元测试的技术 。doSomething()立即返回,但是您希望测试代码等待其完成,然后进行一些验证。 问题在于该测试无法识别该调用所产生的线程,因此显然它无法等待它们。人们可以想到许多复杂的(可

  • 请帮助我处理这个主线程/父线程将触发子线程。如果我们停止父线程/主线程,它还必须停止所有子线程/子线程 我想用中断做这件事,但做不到。请帮我把代码弄出来 以及如何确保所有子线程都已停止?有什么办法也可以做到这一点吗 提前谢谢! 我正在尝试这样做: 公共类ThreadTest1扩展Thread{私有静态最终记录器LOGGER=Logger.get记录器(mylogger); }