当前位置: 首页 > 知识库问答 >
问题:

带Gui的Websocket

颜英博
2023-03-14

我的第一个类允许我连接到websocket,第二个类允许我尝试有一个图来显示数据。

在代码中,我只是尝试在weboscket运行时使用pyqtgraph显示一个图,但是窗口完全有问题。

谢谢

import asyncio
import json
import websockets
from asyncqt import asyncSlot, QtCore
import pyqtgraph as pg

class Ws(QtCore.QObject):
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        super().__init__(parent)
        self._websocket = None

    @property
    def websocket(self):
        return self._websocket

    async def connect(self, server,):
        self._websocket = await websockets.connect(server)
        await self.on_message()

    @asyncSlot(dict)
    async def on_message(self):
        while True:
            message = await self.websocket.recv()
            message = json.loads(message)
            self.dataChanged.emit(message)


    @asyncSlot(dict)
    async def send_message(self, message):
        while self.websocket is None:
            await asyncio.sleep(0.2)
        data = json.dumps(message)
        await self.websocket.send(data)

    def run(self):
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
        loop.run_forever()

class TestUi(object):
    def __init__(self):

        self.plt = pg.plot()

        self.ws = Ws()
        self.ws.send_message(
            {"op": "subscribe", "args": ["instrument:XBTUSD"]})
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        print(data)


if __name__ == "__main__":
    u = TestUi()

共有1个答案

汲丰茂
2023-03-14

您必须设置asyncqt的eventloop:

import asyncio
import json
import sys
import websockets


from PyQt5 import QtCore, QtWidgets

import pyqtgraph as pg

from asyncqt import QEventLoop, asyncSlot


class Ws(QtCore.QObject):
    dataChanged = QtCore.pyqtSignal(dict)

    def __init__(self, parent=None):
        super().__init__(parent)
        self._websocket = None

    @property
    def websocket(self):
        return self._websocket

    async def connect(
        self, server,
    ):
        self._websocket = await websockets.connect(server)
        await self.on_message()

    @asyncSlot(dict)
    async def on_message(self):
        while True:
            message = await self.websocket.recv()
            message = json.loads(message)
            self.dataChanged.emit(message)

    @asyncSlot(dict)
    async def send_message(self, message):
        while self.websocket is None:
            await asyncio.sleep(0.2)
        data = json.dumps(message)
        await self.websocket.send(data)

    def run(self):
        loop = asyncio.get_event_loop()
        loop.create_task(self.connect(server="wss://www.bitmex.com/realtime"))
        loop.run_forever()


class TestUi(object):
    def __init__(self):

        app = QtWidgets.QApplication(sys.argv)
        loop = QEventLoop(app)
        asyncio.set_event_loop(loop)

        self.plt = pg.plot()

        self.ws = Ws()
        self.ws.send_message({"op": "subscribe", "args": ["instrument:XBTUSD"]})
        self.ws.dataChanged.connect(self.update_data)
        self.ws.run()

    def update_data(self, data):
        print(data)


if __name__ == "__main__":
    u = TestUi()
 类似资料:
  • 问题内容: 我需要从Docker容器中在后台运行的Java应用程序中启动Selenium。启动失败,因为在运行时无法访问X11环境。请参阅下面的内容。 我该怎么办? 问题 我从安装Java 8和Jetty 9.3.x 的简单程序开始运行一个简单的服务(实际上是selenium的东西)。该服务实际上是为了启动一些需要UI才能执行的事情而设置的。我遇到的问题是其中的任何内容执行失败,因为UI在我运行的

  • 我正在尝试使用python3构建带有GUI的youtube下载程序。在学习了python的基础知识之后,我正在尝试构建一个。我正在使用“pafy”和“TKinter”模块。 以下是GUI 1中涉及的小部件。一个输入字段,用于输入URL 2。在它旁边粘贴按钮3。媒体选择下拉菜单4。另一个下拉菜单列出介质质量(取决于以前的介质输入)5。最后是下载按钮 这是我的密码 我被“下载功能”卡住了。我是否正确使

  • 了解如何在Java编程中使用Simple GUI。 以下是最常用的示例 - 如何以不同的字体显示文字? 如何使用GUI绘制线条? 如何在新框架中显示消息? 如何使用GUI绘制多边形? 如何在矩形中显示字符串? 如何使用GUI显示不同的形状? 如何使用GUI绘制实体矩形? 如何创建透明光标? 如何检查是否启用了抗锯齿功能? 如何在框架中显示颜色? 如何使用框架显示饼图? 如何使用GUI绘制文本?

  • 我正在尝试使用Python's gui模块中的一个来检索用户的Twitter用户名和密码: 下面是我的第一次尝试(使用easygui): 下面是我的第二次尝试(使用tkinter): 目前,两个gui都自动出现。相反,一个Python图标出现在我的Windows任务栏上,我必须单击该图标才能显示gui。有什么编程方式可以让gui自动弹出吗?或者我可以使用另一个模块来实现这一点?

  • 我的程序由: > 是一系列功能的治疗。它将两个csv文件作为输入,并返回一行的pandas DataFrame。最后一个函数名为 一个函数,调用add_data(),将结果保存在csv文件中,并在执行结束时通知用户 包括我处理的整个代码会太长,但我显式导入的是:pandas、geopandas、shapely.geometrio、tkinter(见下面的代码)。因为我以前解决了这个问题,所以在使用

  • GUI

    创建和管理窗口和控件。这种窗口可以用作数据输入窗体或自定义用户界面。 Gui, sub-command [, Param2, Param3, Param4] Gui, New [, Options, Title] [v1.1.04] 创建新窗口. Gui, New ; 创建新的未命名 GUI。 Gui, Name:New ; 创建新 GUI,销毁含有此名称的现有 GUI。 为了方便,会设置新

  • GUI

    Graphical User Interface (GUI) Tika在以下链接https://tika.apache.org/download.html.提供了一个jar文件及其源代码https://tika.apache.org/download.html. 下载这两个文件,设置jar文件的类路径。 解压缩源代码zip文件夹,打开tika-app文件夹。 在“tika-1.6\tika-app