import websocket
ws = websocket.WebSocket()
# websocket的代理使用
ws.connect("ws://example.com/websocket", http_proxy_host="proxy_host_name", http_proxy_port=3128)
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
def on_message(ws, message):
print(message)
def on_error(ws, error):
print(error)
def on_close(ws):
print("### closed ###")
def on_open(ws):
def run(*args):
for i in range(3):
time.sleep(1)
ws.send("Hello %d" % i)
time.sleep(1)
ws.close()
print("thread terminating...")
thread.start_new_thread(run, ())
if __name__ == "__main__":
websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://echo.websocket.org/",
on_message = on_message,
on_error = on_error,
on_close = on_close)
ws.on_open = on_open
ws.run_forever()
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/")
print("Sending 'Hello, World'...")
ws.send("Hello, World")
print("Sent")
print("Receiving...")
result = ws.recv()
print("Received '%s'" % result)
ws.close()
If you want to customize socket options, set sockopt.
sockopt example
from websocket import create_connection
ws = create_connection("ws://echo.websocket.org/",
sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY),))
import socket
from websocket import create_connection, WebSocket
class MyWebSocket(WebSocket):
def recv_frame(self):
frame = super().recv_frame()
print('yay! I got this frame: ', frame)
return frame
ws = create_connection("ws://echo.websocket.org/",
sockopt=((socket.IPPROTO_TCP, socket.TCP_NODELAY, 1),), class_=MyWebSocket)
WebSocketApp sample
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"cert_reqs": ssl.CERT_NONE})
create_connection sample
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"cert_reqs": ssl.CERT_NONE})
WebSocket sample
ws = websocket.WebSocket(sslopt={"cert_reqs": ssl.CERT_NONE})
WebSocketApp sample
ws = websocket.WebSocketApp("wss://echo.websocket.org")
ws.run_forever(sslopt={"check_hostname": False})
create_connection sample
ws = websocket.create_connection("wss://echo.websocket.org",
sslopt={"check_hostname": False})
WebSocket sample
ws = websocket.WebSocket(sslopt={"check_hostname": False})
ws.connect("wss://echo.websocket.org")
自动支持Python 2.7.9+ and 3.2+
ws = websocket.create_connection("ws://example.com/websocket", subprotocols=["binary", "base64"])