websocket-client的使用

杨骏
2023-12-01

1. 代理访问:

import websocket
ws = websocket.WebSocket()
# websocket的代理使用
ws.connect("ws://example.com/websocket", http_proxy_host="proxy_host_name", http_proxy_port=3128)

2. Long-lived connection

import websocket
try:
    import thread
except ImportError:
    import _thread as thread
import time

def on_message(ws, message):
    print(message)

def on_error(ws, error):
    print(error)

def on_close(ws):
    print("### closed ###")

def on_open(ws):
    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")
    thread.start_new_thread(run, ())


if __name__ == "__main__":
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp("ws://echo.websocket.org/",
                              on_message = on_message,
                              on_error = on_error,
                              on_close = on_close)
    ws.on_open = on_open
    ws.run_forever()

3. Short-lived one-off send-receive

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result =  ws.recv()
print("Received '%s'" % result)
ws.close()

If you want to customize socket options, set sockopt.
sockopt example

from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/",
                        sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY),))

4. 创建自定义的websocket类:

import socket
from websocket import create_connection, WebSocket
class MyWebSocket(WebSocket):
    def recv_frame(self):
        frame = super().recv_frame()
        print('yay! I got this frame: ', frame)
        return frame

ws = create_connection("ws://echo.websocket.org/",
                        sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),), class_=MyWebSocket)

5. SSL连接问题: disable ssl cert verification

WebSocketApp sample

ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})

create_connection sample

ws = websocket.create_connection("wss://echo.websocket.org",
  sslopt={"cert_reqs": ssl.CERT_NONE})

WebSocket sample

ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})

6. disable hostname verification

WebSocketApp sample

ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"check_hostname": False})

create_connection sample

ws = websocket.create_connection("wss://echo.websocket.org",
  sslopt={"check_hostname": False})

WebSocket sample

ws = websocket.WebSocket(sslopt={"check_hostname": False})
ws.connect("wss://echo.websocket.org")

7. SNI support

自动支持Python 2.7.9+ and 3.2+

8. 支持子协议:

ws = websocket.create_connection("ws://example.com/websocket", subprotocols=["binary", "base64"])
 类似资料: