当前位置: 首页 > 知识库问答 >
问题:

python WebSocket(异步版本)强制关闭连接

邹高峻
2023-03-14

我在为python编码

command is: stop
Do command is stopped
Stop 1
Stop 2
Stop 3
^CException ignored in: <module 'threading' from '/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py'>
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1294, in _shutdown
    t.join()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1056, in join
    self._wait_for_tstate_lock()
  File "/Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/threading.py", line 1072, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
(pyalmondplus) Pauls-MBP:pyalmondplus paulenright$ 

参考代码:

import threading
import asyncio
import websockets
import json

class PyAlmondPlus:
    def __init__(self, api_url, event_callback=None):
        self.api_url = api_url
        self.ws = None
        self.loop = asyncio.get_event_loop()
        self.receive_task = None
        self.event_callback = event_callback
        self.keep_running = False

    async def connect(self):
        print("connecting")
        if self.ws is None:
            print("opening socket")
            self.ws = await websockets.connect(self.api_url)
        print(self.ws)

    async def disconnect(self):
        pass

    async def send(self, message):
        pass

    async def receive(self):
        print("receive started")
        while self.keep_running:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()
            print(recv_data)
        print("receive ended")

    def start(self):
        self.keep_running = True
        print("Start 1")
        print("Start 2")
        t = threading.Thread(target=self.start_loop, args=())
        print("Start 3")
        t.start()
        print("Receiver running")

    def start_loop(self):
        print("Loop helper 1")
        policy = asyncio.get_event_loop_policy()
        policy.set_event_loop(policy.new_event_loop())
        self.loop = asyncio.get_event_loop()
        self.loop.set_debug(True)
        asyncio.set_event_loop(self.loop)
        self.loop.run_until_complete(self.receive())
        print("Loop helper 2")

    def stop(self):
        print("Stop 1")
        self.keep_running = False
        print("Stop 2")
        self.ws.close()
        print("Stop 3")

共有1个答案

祁远
2023-03-14

我正在研究如何取消当前的ws。recv()[…]我看到了所有三站,但从未看到过接收站。

您的receive协同程序可能在等待某些数据到达时挂起,因此无法检查keep_running标志。

停止运行协程的简单而健壮的方法是取消驱动它的异步任务。这将立即取消挂起协程,并使它所等待的引发一个取消错误。使用取消时,您根本不需要keep_running标志,异常将自动终止循环。

打电话给警察局。start()创建一个助手线程来启动asynico事件循环。

这是可行的,但是对于PyAlmondPlus的每个实例,您并不真正需要一个新线程和一个全新的事件循环。Asyncio设计为在单个线程内运行,因此一个事件循环实例可以承载任意数量的协同路由

下面是一个可能实现这两种想法的设计(未使用实际的web套接字进行测试):

# pre-start a single thread that runs the asyncio event loop
bgloop = asyncio.new_event_loop()
_thread = threading.Thread(target=bgloop.run_forever)
_thread.daemon = True
_thread.start()

class PyAlmondPlus:
    def __init__(self, api_url):
        self.api_url = api_url
        self.ws = None

    async def connect(self):
        if self.ws is None:
            self.ws = await websockets.connect(self.api_url)

    async def receive(self):
        # keep_running is not needed - cancel the task instead
        while True:
            if self.ws is None:
                await self.connect()
            recv_data = await self.ws.recv()

    async def init_receive_task(self):
        self.receive_task = bgloop.create_task(self.receive())

    def start(self):
        # use run_coroutine_threadsafe to safely submit a coroutine
        # to the event loop running in a different thread
        init_done = asyncio.run_coroutine_threadsafe(
            self.init_receive_task(), bgloop)
        # wait for the init coroutine to actually finish
        init_done.result()

    def stop(self):
        # Cancel the running task. Since the event loop is in a
        # background thread, request cancellation with
        # call_soon_threadsafe.
        bgloop.call_soon_threadsafe(self.receive_task.cancel)
 类似资料:
  • 我将EclipseLink与JTA一起使用。当我使用WebLogic server版本(12.2.1.3.0)运行应用程序时,遇到了以下问题 PS-WebLogic版本(12.2.1.2.0)不面临此问题

  • 我在SpringBoot2中使用Hibernate5.3和Hikari2.7,通过官方的JDBC驱动程序使用FileMaker16数据源。 FileMaker服务器性能较差,SQL查询执行时间对于大表可以达到一分钟。有时,当连接池充满从未释放的活动连接时,它会导致连接泄漏。 问题是如何强制挂在池中的活动连接关闭,比如两分钟,将它们闲置并再次使用。 例如,我使用: 在raw中调用几次会导致连接泄漏,

  • 本章关于开始学习 Git。 我们从介绍有关版本控制工具的一些背景知识开始,然后讲解如何在你的系统运行 Git,最后是关于如何设置 Git 开始你的工作。 通过本章的学习,你应该了解为什么 Git 这么流行,为什么你应该使用 Git 以及你应该如何设置以便使用 Git。 什么是“版本控制”?我为什么要关心它呢? 版本控制是一种记录一个或若干文件内容变化,以便将来查阅特定版本修订情况的系统。 在本书所

  • 有人能帮我解决以下异常吗, 我正在使用Oracle OEPE eclipse和weblogic 12c。在我开始开发移动应用程序之前,它工作得很好。 我在eclipse中安装了以下插件。1.Android工具2。JBoos混合移动工具 我的电脑1上安装了以下开源软件。节点Js 2。科尔多瓦3。Ripple仿真器

  • 故事是这样的,我有一个远程服务器和一个防火墙后面的客户端。客户端由netty实现,它将建立一个与远程服务器的保活连接。如果200秒内通道中没有消息传输,防火墙将重置连接到远程服务器端的连接,但客户端没有收到任何tcp数据包(例如RST包),因此客户端认为此连接是活的,而事实并非如此。那么如何在防火墙错误处理此保活连接之前强制关闭不寻常的连接呢?顺便说一句:我无法配置防火墙

  • 问题内容: 这是我第一次尝试在我的应用中添加广告。我已阅读在线文档并逐字阅读。现在,我唯一不了解的部分是如何实际请求广告并将其添加到我的应用中。 到目前为止,这似乎很好。我要做的其他任何事情都将被迫关闭。例如,添加以下行: 我已将AdListener实现到Activity。 顺便说一句,当我将鼠标悬停在导入的类上时: 或其他类似的类,它表示:注意:该元素既没有附加源,也没有附加Javadoc,因此

  • 我正在尝试使用Azure Blob存储。我成功上传了一些图像,但突然我得到了错误: 远程主机强制关闭了现有连接 我研究了一下,每当我试图检查Blob容器是否存在时,都会引发异常。 这是我的代码: BlobClient getter属性:(注意,我已将连接字符串中的敏感数据标记为**) 抛出异常的实际代码: 准确地说,异常发生在我检查容器是否存在的行。 我不知道怎么了。我确信连接字符串是正确的(我复

  • 正如我前面所说的,我能够在visual studio的调试模式下使其正确工作,但一旦创建了可执行文件,我就不能再读取网页了。如有任何帮助,我们将不胜感激。是什么导致了这个问题?