Python3: 支持 WebSocket 客户端/服务端、同步/异步 的多个 Python 模块

姬昊焱
2023-12-01

本文链接: https://xiets.blog.csdn.net/article/details/115558069

Python3 学习笔记(目录)

Python 有多个支持 WebSocket 客户端、服务端、同步、异步 的第三方相关模块。

用于 webscoket echo 回显测试的地址,连接上服务器后给服务器发送消息,服务器会把消息回传给客户端:

WebSocket 协议标准:

1. websocket-client 模块

websocket-client 是 Python 的 WebSocket (同步)客户端,它提供了对 WebSocket 低级 API 的访问。websocket-client 目前版本实现了 WebSocket 协议的 hybi-13 版本,不支持来自 RFC 7692 的 permessage-deflate 扩展。

相关链接:

安装 websocket-client 模块:

$ pip3 install websocket-client

WebSocket 简单示例,连接、发送/接收消息、关闭连接:

>>> import websocket
>>> 
>>> # websocket.enableTrace(True)   # 打开日志, 将详细输出通讯过程
>>> 
>>> ws = websocket.WebSocket()
>>> ws.connect("ws://echo.websocket.org")
>>> ws.send("Hello Server")
18
>>> ws.recv()
'Hello Server'
>>> ws.close()

作为长连接的 WebSocketApp 简单示例:

import websocket
import time
import threading


def on_open(wsapp):
    print("on_open")
    
    def send_message():
        for i in range(10):
            wsapp.send(f"Hello {i}")
            time.sleep(1)
        wsapp.close()
        
    threading.Thread(target=send_message).start()


def on_message(wsapp, message):
    print("on_message:", message)


def on_close(wsapp):
    print("on_close")


wsapp = websocket.WebSocketApp("ws://echo.websocket.org",
                               on_open=on_open,
                               on_message=on_message,
                               on_close=on_close)
wsapp.run_forever()

WebSocket 只适用于短期连接(通过自己封装也可以用于长期连接),长期连接需使用 WebSocketApp

1.1 websocket.WebSocket

websocket.WebSocket 是适合用于短期的 WebSocket 客户端,需要手动接收数据。

1.1.1 WebSocket 类的 主要方法

WebSocket 中主要的几个方法:

  • class WebSocket()
  • def connect()
  • def send()
  • def recv()
  • def close()
# 创建一个 WebSocket 客户端对象, 所有参数都是可选参数。
#
# get_mask_key: 自定义 mask key 的生成函数, 此参数主要是为了测试目的。
#               参数类型为一个可调用对象(函数), 此函数有一个 int 类型的参数, 
#               函数必须返回一个长度为传入参数长度的字符串 (字节数组)。
#
# sockopt: socket.setsockopt 的可选参数值, tuple 类型。
#
# sslopt: SSL Socket 的可选项参数, 可选项 dict 对象。
#         例如禁用 SSL 证书校验: sslopt={"cert_reqs": ssl.CERT_NONE}
#         禁用域名校验: sslopt={"check_hostname": False}
#
# fire_cont_frame: 是否为每个 连续帧 (cont frame) 触发 recv 事件。
# enable_multithread: 如果设置为 True, 则锁定发送方法。
# skip_utf8_validation: 是否跳过 utf8 校验。
#
class WebSocket(get_mask_key=None, sockopt=None, sslopt=None,
                fire_cont_frame=False, enable_multithread=False,
                skip_utf8_validation=False, **_)


# 连接 WebSocket 服务器。
#
# url: WebSocket 服务器地址
#
# **options: 可选参数
#   header: list/dict, 自定义的 HTTP 请求头, 格式为: ["name1: value1", ...] 或 {"name1": "value1", ...}
#   cookie: str, 自定义 HTTP 请求头 Cookie 的值
#   host: str, 自定义 HTTP 请求头 Host 的值。
#   origin: str, 自定义 HTTP 请求头 Origin 的值(URL)。
#   connection: str, 自定义 HTTP 请求头 Connection 的值, 默认为 "Upgrade"。
#   suppress_origin: bool, 抑制输出 origin header。
#
#   timeout: int/float, 超时时间, 单位为秒
#
#   proxy_type: 代理的类型, 默认为 "http", 其他类型 'socks4', 'socks5', 'socks5h'
#   http_proxy_host: 代理的 host
#   http_proxy_port: 代理的 port, 默认为 80
#   http_no_proxy: 不使用代理的 host names, list 类型, 如: ["host1", "host2", ...]
#   http_proxy_auth: HTTP 代理认证, tuple 类型, 如: ("username", "password")
#
#   redirect_limit: 最大重定向次数, int 类型
#   subprotocols: 可用的子协议数组
#   socket: 被预先初始化的 Socket 流
#
connect(url, **options)


# 发送数据
#
# payload: 发送的数据(载荷), str/bytes 类型
#
# opcode: 操作码, 可选值:
#         ABNF.OPCODE_CONT
#         ABNF.OPCODE_TEXT
#         ABNF.OPCODE_BINARY
#         ABNF.OPCODE_CLOSE
#         ABNF.OPCODE_PING
#         ABNF.OPCODE_PONG
#
# 默认为发送 TEXT 数据帧
#
send(payload, opcode=ABNF.OPCODE_TEXT)


# 接收数据
# 
# 接收 ABNF.OPCODE_TEXT 数据帧, 返回 str
# 接收 ABNF.OPCODE_BINARY 数据帧, 返回 bytes
# 其他类型的数据将 将被忽略(跳过/不返回) 或 直接返回 ""
#
recv()


# 关闭 WebSocket, 先发送关闭帧再关闭底层 socket。
#
# status: 发送关闭数据帧时的数据(告诉服务端关闭的状态码), 默认为正常关闭。
# reason: 发送关闭数据帧时的数据(告诉服务端关闭的原因)。
# timeout: 直到超时还没有收到一个关闭帧, 则不再等待, 直接关闭底层 socket。
#          正常为收到关闭帧后再关闭底层 socket。
#          int/float 秒数, None 表示一直等待直到收到一个关闭帧。
#
close(status=STATUS_NORMAL, reason=bytes('', encoding='utf-8'), timeout=3.0)

也可以调用 websocket.create_connection() 函数直接创建 WebSocket 客户端并连接服务器,函数原型:

# 创建 WebSocket 客户端, 并连接 url 服务器,
# class_ 表示要创建的客户端的类型,
# options 参数与 WebSocket.connect() 中的 options 参数一致
websocket.create_connection(url, timeout=None, class_=WebSocket, **options)

1.1.2 WebSocket 类的 其他属性和方法

handshake_response      # WebSocket 连接成功后的 握手响应
getstatus()             # int, 握手响应的 HTTP 状态码
getheaders()            # dict, 握手响应的 HTTP 响应头
is_ssl()                # bool, 是否是 SSL 连接


send_frame(frame)       # 发送数据帧, 数据帧是 操作码+数据 的封装, 所有操作码和数据发送最终都经过此方法
send_binary(payload)    # 发送二进制数据, 即: send(payload, ABNF.OPCODE_BINARY)
ping(payload="")        # 发送 ping 数据, 即: send(payload, ABNF.OPCODE_PING)
pong(payload="")        # 发送 pong 数据, 即: send(payload, ABNF.OPCODE_PONG)


recv_data(control_frame=False)          # 接收数据和操作码, 返回: (opcode, frame.data)
                                        # control_frame 表示如果是控制帧(PING/PONG) 是否返回数据
recv_data_frame(control_frame=False)    # 接收数据和操作码, 返回: (opcode, frame)
recv_frame()                            # 接收数据帧, 所有操作码和数据接收最终都经过此方法
next()                                  # 接收下一条数据, 即: recv()


send_close(status=STATUS_NORMAL,        # 手动发送关闭帧, 一般使用 close()
           reason=bytes('', encoding='utf-8'))
abort()                 # 立即关闭底层 socket, 一般使用 close()
shutdown()              # 关闭底层 socket, 一般使用 close()
connected               # 连接是否已关闭


set_mask_key(func)      # 构造方法中的 get_mask_key 参数
getsubprotocol()        # 获取自协议
gettimeout()            # connect() 中的 timeout
settimeout(timeout)     # connect() 中的 timeout
sock                    # 底层 socket
fileno()                # sock.fileno()

1.1.3 WebSocket 类的 代码示例

import websocket

ws = websocket.WebSocket()              # 创建 WebSocket 客户端对象
ws.connect("ws://echo.websocket.org")   # 连接 WebSocket 服务器

ws.send("Hello")        # 发送 TEXT 数据帧, 数据为 "Hello"
print(ws.recv())        # 接收 TEXT/BINARY 数据帧的数据, 输出: "Hello"

ws.ping()               # 发送 PING 数据帧, 内部调用 send("", ABNF.OPCODE_PING)

print(ws.recv_frame())  # 接收原始数据帧, 结果为 fin=1 opcode=10 data=b'',
                        # 其中 10 表示操作码 ABNF.OPCODE_PONG

ws.close()              # 关闭客户端

ping(), pong()recv_xxx() 方法的说明:

# ping() 和 pong() 是 WebSocket 协议的一部分, 用于 客户端 和 服务端 之间双向测试保活连接。
#
# 客户端: 客户端定时向服务端发送 ping 数据帧, 服务端收到后会立即返回一个 pong 数据帧。
# 服务端: 服务端定时向客户端推送 ping 数据帧, 客户端收到后需要立即发回一个 pong 数据帧。
#
# 发出 ping 数据帧后, 如果超时没有收到 pong 数据帧的回复, 则连接可能已断开, 需要重新发起连接。
# ping 是主动发送, pong 是被动发送, 主动发送 pong 会无反应。
#
# 接收数据帧的方法调用层级:
#
# recv() -> str/bytes
#     recv_data(control_frame=False) -> (opcode, data)
#        recv_data_frame(control_frame=False) -> (opcode, frame)
#            recv_frame() -> frame
#
# recv()
#       只接收 TEXT 和 BINARY 操作码的数据(载荷),
#       返回 str/bytes, 如果是其他操作码 则返回 ""
#
# recv_data(control_frame=False)
#       接收 操作码 和 数据(载荷), 返回操作码和数据组成的元祖 (opcode, data)
#       control_frame 参数将直接传递给 recv_data_frame()
#
#
# recv_data_frame(control_frame=False)
#       接收 操作码 和 数据帧, 返回操作码和数据帧组成的元祖 (opcode, frame)
#
#       control_frame 参数表示接收到控制帧(PING/PONG) 后是否返回。
#       control_frame=True 表示接收到控制帧(PING/PONG) 后返回, 不再继续接收数据帧。
#       control_frame=False 表示接收到控制帧(PING/PONG) 后不返回, 继续等待接收下一个数据帧。
#
#       注意:
#           此次方法接收到 PING 数据帧后, 会自动给服务端回复 PONG 帧,
#           无需手动回复, 回复后根据 control_frame 参数决定是否返回。
#
#           此方法接收到 CLOSE 数据帧后, 会自动调用 send_close() 回复 CLOSE 数据帧给服务端,
#           并直接返回 (opcode, frame), 即不会判断 control_frame。
#           接收到 CLOSE 数据帧一般为服务端通知客户端即将要关闭这个连接, 客户端则标记此连接已关闭, 
#           并回复服务端一个 CLOSE 数据帧表示客户端成功收到关闭通知了。
#
# recv_frame()
#       接收 数据帧 的原始方法, 返回数据帧对象, 不会自动处理 PING/PONG/CLOSE。
#
# 数据帧 = 操作码 + 数据(载荷)

1.2 websocket.WebSocketApp

websocket.WebSocketApp 是对 websocket.WebSocket 的封装,支持自动定时发送 PING 帧,支持事件驱动方式的数据帧接收,可用于长期的 WebSocket 连接。

1.2.1 WebSocketApp 类的主要方法

WebSocketApp 的构造方法:

# WebSocketApp 构造方法
class websocket.WebSocketApp(
            url,
            header=None,
            cookie=None,
            on_open=None,
            on_data=None, 
            on_message=None, 
            on_cont_message=None,
            on_ping=None, 
            on_pong=None,
            on_error=None,
            on_close=None, 
            get_mask_key=None, 
            subprotocols=None)

# 参数说明:
#       on_ 开头的参数均为事件回调函数。
#
# url:
#       WebSocket 服务端 URL
#
# header:
#       连接握手时的自定义 HTTP 请求头, 格式参考 WebSocket
#
# cookie:
#       连接握手时自定义 HTTP 请求头 Cookie 的值, 参考 WebSocketApp
#
# on_open:
#       WebSocket 连接成功后调用的函数, 只调用一次。
#       函数格式: 
#           on_open(wsapp)
#
# on_data:
#       接收 TEXT, BINARY 或 CONT 数据帧时调用, 
#       此方法在 on_message 或 on_cont_message 之前调用。
#       on_data 调用后, on_message 或 on_cont_message 也会调用。
#       函数格式: 
#           on_data(wsapp, frame_data, frame_opcode, frame_fin)
#
# on_message:
#       接收到 TEXT 或 BINARY 数据帧时调用的函数
#       函数格式: 
#           on_message(wsapp, data)
#
# on_cont_message:
#       接收到 CONT 数据帧时调用
#       函数格式: 
#           on_cont_message(wsapp, frame_data, frame_fin)
#
# on_ping:
#       接收到 PING 数据帧时调用, 
#       不会自动回复 PONG, 需手动发送 PONG 回复服务端
#       函数格式: 
#           on_ping(wsapp, frame_data)
#
# on_pong:
#       接收到 PONG 数据帧时调用
#       函数格式: 
#           on_pong(wsapp, frame_data)
#
# on_error:
#       当事件循环中有任何异常抛出, 将调用此函数, 
#       然后关闭 WebSocket 连接, 调用 on_close(),
#       然后 run_forever() 将被返回。
#       即调用 on_error 后必定会调用 on_close。
#       on_xxx 回调方法中抛出的异常将被捕获, 不会抛给 run_forever()
#       函数格式: 
#           on_error(wsapp, exception)
#
# on_close:
#       WebSocket 被关闭时调用的函数。
#       事件循环中抛出异常 或 收到服务端推送的 CLOSE 数据帧关闭客户端时将被调用。
#       主动调用 wsapp.close() 不会调用 on_close()
#       函数格式: 
#           on_close(wsapp, close_status_code, close_reason)
#
# get_mask_key:
#       自定义 mask key 的生成函数, 参考 WebSocketApp
#
# subprotocols:
#       可用的子协议数组, 参考 WebSocketApp

WebSocketApp 的主要方法:

  • run_forever()
  • send()
  • close()
# 运行 WebSocketApp 的事件循环, 先创建 WebSocket 对象, 
# 然后 connect 连接服务器, 之后一直循环运行接收数据帧, 回调对应函数处理数据帧。
# 当 WebSocket 客户端被关闭后, 将调用 on_close() 方法, 然后结束循环返回。
# 当循环中引发异常被捕获, 将依次调用 on_error()、on_close() 方法, 然后结束循环返回。
#
#
# 参数说明:
#   sockopt: socket.setsockopt 的可选参数值, tuple 类型。
#   sslopt: SSL Socket 的可选项参数, 可选项 dict 对象。
#           例如禁用 SSL 证书校验: sslopt={"cert_reqs": ssl.CERT_NONE}
#           禁用域名校验: sslopt={"check_hostname": False}
#
#   ping_interval: 自动发送 PING 数据帧的间隔秒数, 默认为 0 表示不自动发送 PING
#   ping_timeout: PING 发送后接收 PONG 的超时秒数, 如果超时未收到 PONG, 
#                 则抛出异常并调用 on_error()、on_close(), 然后结束循环。
#                 None 表示不检测超时。
#   ping_payload: 发送 PING 数据帧时携带的数据, 默认为空字符串
#
#   proxy_type: 代理的类型, 默认为 "http", 其他类型 'socks4', 'socks5', 'socks5h'
#   http_proxy_host: 代理的 host
#   http_proxy_port: 代理的 port, 默认为 80
#   http_no_proxy: 不使用代理的 host names, list 类型, 如: ["host1", "host2", ...]
#   http_proxy_auth: HTTP 代理认证, tuple 类型, 如: ("username", "password")
#
#   skip_utf8_validation: 是否跳过 utf8 校验。
#   host: str, 自定义 HTTP 请求头 Host 的值。
#   origin: str, 自定义 HTTP 请求头 Origin 的值(URL)。
#   dispatcher: 自定义从 socket 读取数据的函数
#   suppress_origin: bool, 抑制输出 origin header。
#
run_forever(sockopt=None, 
            sslopt=None,
            ping_interval=0, 
            ping_timeout=None,
            ping_payload="",
            proxy_type=None,
            http_proxy_host=None, 
            http_proxy_port=None,
            http_no_proxy=None,,
            http_proxy_auth=None
            skip_utf8_validation=False,
            host=None, 
            origin=None, 
            dispatcher=None,
            suppress_origin=False)


# 发送数据
#
# payload: 发送的数据(载荷), str/bytes 类型
#
# opcode: 操作码, 可选值:
#         ABNF.OPCODE_CONT
#         ABNF.OPCODE_TEXT
#         ABNF.OPCODE_BINARY
#         ABNF.OPCODE_CLOSE
#         ABNF.OPCODE_PING
#         ABNF.OPCODE_PONG
#
# 默认为发送 TEXT 数据帧
#
send(data, opcode=ABNF.OPCODE_TEXT)


# 关闭 WebSocket, 先发送关闭帧再关闭底层 socket。
#
# status: 发送关闭数据帧时的数据(告诉服务端关闭的状态码), 默认为正常关闭。
# reason: 发送关闭数据帧时的数据(告诉服务端关闭的原因)。
# timeout: 直到超时还没有收到一个关闭帧, 则不再等待, 直接关闭底层 socket。
#          正常为收到关闭帧后再关闭底层 socket。
#          int/float 秒数, None 表示一直等待直到收到一个关闭帧。
#
close(status=STATUS_NORMAL, reason=bytes('', encoding='utf-8'), timeout=3.0)

1.2.2 WebSocketApp 类的 代码示例

import websocket
import time
import threading


def on_open(wsapp):
    print("on_open")

    def send_message():
        for i in range(5):
            wsapp.send(f"Hello {i}")
            time.sleep(1)

    threading.Thread(target=send_message).start()


def on_data(wsapp, frame_data, frame_opcode, frame_fin):
    print("on_data", frame_data, frame_opcode, frame_fin, sep=", ")


def on_message(wsapp, data):
    print("on_message", data, sep=", ")


def on_cont_message(wsapp, frame_data, frame_fin):
    print("on_cont_message", frame_data, frame_fin, sep=", ")


def on_ping(wsapp, frame_data):
    print("on_ping", frame_data, sep=", ")
    # 接收到 PING 数据帧后, 需要立即给服务端回复 PONG 数据帧
    wsapp.send("", websocket.ABNF.OPCODE_PONG)


def on_pong(wsapp, frame_data):
    print("on_pong", frame_data, sep=", ")


def on_error(wsapp, e):
    print("on_error", e, sep=", ")


def on_close(wsapp, close_status_code, close_reason):
    print("on_close", close_status_code, close_reason, sep=", ")


wsapp = websocket.WebSocketApp("ws://echo.websocket.org",
                               on_open=on_open,
                               on_data=on_data,
                               on_message=on_message,
                               on_cont_message=on_cont_message,
                               on_ping=on_ping,
                               on_pong=on_pong,
                               on_error=on_error,
                               on_close=on_close)

wsapp.run_forever(ping_interval=5, ping_timeout=2)

2. 异步 websocket 客户端/服务端 aiohttp 模块

aiohttp 是一个基于 asyncio 异步的 HTTP 客户端 和 服务端,并且支持 WebSockets 客户端 和 服务端,这里只介绍 WebSockets。

相关链接:

安装 aiohttp 模块:

# (必选) 安装 aiohttp 模块
$ pip3 install aiohttp

# (可选) 安装
$ pip3 install cchardet     # 使用更快的 cchardet 库作为 chardet 的替代品
$ pip3 install aiodns       # 为了通过客户端 API 加速 DNS 解析

2.1 aiohttp 的 HTTP 请求

import aiohttp
import asyncio


async def main():
    # 创建 HTTP 会话客户端
    session = aiohttp.ClientSession()

    # 异步请求, 经过 async with 异步处理后在 __aenter__() 方法中返回响应对象 response
    async with session.get("https://httpbin.org/get", verify_ssl=False) as response:
        print(type(response))           # <class 'aiohttp.client_reqrep.ClientResponse'>
        print(response.version)         # HttpVersion(major=1, minor=1)
        print(response.status)          # 200
        print(response.reason)          # OK
        print(response.headers)         # <CIMultiDictProxy('Content-Type': 'application/json'...
        print(response.content)         # <StreamReader 316 bytes eof>,
                                        # content 是 Body 的 IO 流对象, 可以通过 await read(n) 方法逐块读取响应
        body = await response.text()    # 异步读取响应并解码为文本
        print(body)

    # 异步关闭客户端 (session 也支持 async with)
    await session.close()


asyncio.run(main())

2.2 aiohttp WebSocket Server

使用 aiohttp 实现一个用于 echo 回显的 WebSocket 服务端(顺便实现一个 HTTP Server):

from aiohttp import web
from aiohttp.web_request import Request
from aiohttp.web_response import Response
from aiohttp.web_ws import WebSocketResponse


async def http_handle(request: Request) -> Response:
    """
    HTTP 请求处理器
    """
    # web.get("/{name}", ...) 中的 name
    name = request.match_info.get("name")

    # 请求 Body, 也可以通过 await request.content.read(n) 逐块读取
    body = await request.text()

    resp_text = f"Your Request Info:\n" \
                f"name: {name}\n" \
                f"IP: {request.remote}\n" \
                f"Method: {request.method}\n" \
                f"Version: {request.version}\n" \
                f"Url: {request.url}\n" \
                f"Path: {request.path}\n" \
                f"Headers: {request.headers}\n" \
                f"Body: {body}"

    # 返回一个 响应对象
    return web.Response(status=200,
                        text=resp_text,
                        content_type="text/plain")


async def ws_handle(request: Request) -> WebSocketResponse:
    """
    WebSocket 请求处理器
    """
    # 创建一个 WebSocket 响应, 自动响应 CLOSE, 收到 PING 后自动回复 PONG
    ws = web.WebSocketResponse(autoclose=True, autoping=True)
    # 预处理请求
    await ws.prepare(request)

    # 循环处理消息, 直到 WebSocket 退出
    # ws.__anext__() 方法中调用了 await ws.receive() 接收消息并返回
    async for msg in ws:
        # msg: <class 'aiohttp.http_websocket.WSMessage'>
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str(msg.data)
        elif msg.type == web.WSMsgType.BINARY:
            await ws.send_bytes(msg.data)
        elif msg.type == web.WSMsgType.CLOSE:
            break

    print("websocket connection closed.")
    return ws


# 创建一个 Web 应用
app = web.Application()

# 添加路由路径对应的处理器, 按列表顺序依次匹配
app.add_routes([web.get("/", http_handle),          # curl http://localhost:8080/
                web.post("/", http_handle),         # curl -d "aa=bb" http://localhost:8080/
                web.get("/echo", ws_handle),        # ws://localhost:8080/echo
                web.get("/{name}", http_handle)])   # curl http://localhost:8080/hello

# 运行一个 Web 应用,
# 这里不需要在异步方法中运行, 内部会在 asyncio 异步事件循环中处理
web.run_app(app, host=None, port=8080)

2.3 aiohttp WebSocket Client

import asyncio
import aiohttp
from aiohttp import web
from aiohttp.client_ws import ClientWebSocketResponse
import random


async def message_handler(ws: ClientWebSocketResponse):
    while True:
        # msg: <class 'aiohttp.http_websocket.WSMessage'>
        msg = await ws.receive()
        if msg.type == web.WSMsgType.TEXT:
            print("RECV:", msg.data)
        elif msg.type == web.WSMsgType.BINARY:
            print("RECV:", msg.data)
        elif msg.type == web.WSMsgType.PING:
            print("RECV: PING")
            await ws.pong()     # 收到 PING 后给服务端回复 PONG
        elif msg.type == web.WSMsgType.PONG:
            print("RECV: PONG")
        elif msg.type == web.WSMsgType.CLOSE:
            print("RECV: CLOSE")
        else:
            print("RECV:", msg)


async def main():
    async with aiohttp.ClientSession() as session:
        # 链接 WebSocket 服务器, 不自动处理 PING
        async with session.ws_connect("ws://localhost:8080/echo", autoping=False) as ws:
            # ws: <class 'aiohttp.client_ws.ClientWebSocketResponse'>

            # 异步 循环接收消息
            recv_task = asyncio.create_task(message_handler(ws))

            while True:
                if ws.closed:
                    break

                # 发送文本
                text = f"Hello: {random.randint(100, 999)}"
                print("SEND:", text)
                await ws.send_str(text)

                await asyncio.sleep(2)

                # 发送二进制
                bs = f"World: {random.randint(100, 999)}".encode()
                print("SEND:", bs)
                await ws.send_bytes(bs)

                await asyncio.sleep(2)

                # 发送 PING 数据帧
                print("SEND: PING")
                await ws.ping()

                await asyncio.sleep(2)

            await recv_task


asyncio.run(main())

3. 异步 websockets 模块

websockets 是用于在 Python 中构建 WebSocket 客户端 和 服务端 的一个库。它建立在 Python 的标准异步 I/O 框架 asyncio 之上,提供了一个优雅的基于协程的 API。

websockets 需要 Python 3.6.1+。

相关网站:

安装 websockets 模块:

$ pip3 install websockets

websocket 服务端/客户端 简单实现:

服务端:

import asyncio
import websockets
from websockets.legacy.server import WebSocketServerProtocol


async def ws_handle(websocket: WebSocketServerProtocol, path: str):
    async for message in websocket:
        await websocket.send(message)


async def main():
    async with websockets.serve(ws_handle, "localhost", 8080):
        await asyncio.Future()              # run forever


asyncio.run(main())

客户端:

import asyncio
import websockets


async def main():
    async with websockets.connect("ws://localhost:8080") as websocket:
        # websocket: <class 'websockets.legacy.client.WebSocketClientProtocol'>
        await websocket.send("Hello World")
        msg = await websocket.recv()
        print(msg)


asyncio.run(main())
 类似资料: