socketIO_client库似乎不支持cryptocompare使用的XHR轮询协议.我通过覆盖socketIO_client.transports.XHR_PollingTransport类中的方法recv_packet来实现它.
import logging
import socketIO_client
from socketIO_client.transports import get_response
from socketIO_client.parsers import get_byte, _read_packet_text, parse_packet_text
from requests.exceptions import ConnectionError
# extra function to support XHR1 style protocol
def _new_read_packet_length(content, content_index):
packet_length_string = ''
while get_byte(content, content_index) != ord(':'):
byte = get_byte(content, content_index)
packet_length_string += chr(byte)
content_index += 1
content_index += 1
return content_index, int(packet_length_string)
def new_decode_engineIO_content(content):
content_index = 0
content_length = len(content)
while content_index < content_length:
try:
content_index, packet_length = _new_read_packet_length(
content, content_index)
except IndexError:
break
content_index, packet_text = _read_packet_text(
content, content_index, packet_length)
engineIO_packet_type, engineIO_packet_data = parse_packet_text(
packet_text)
yield engineIO_packet_type, engineIO_packet_data
def new_recv_packet(self):
params = dict(self._params)
params['t'] = self._get_timestamp()
response = get_response(
self.http_session.get,
self._http_url,
params=params,
**self._kw_get)
for engineIO_packet in new_decode_engineIO_content(response.content):
engineIO_packet_type, engineIO_packet_data = engineIO_packet
yield engineIO_packet_type, engineIO_packet_data
setattr(socketIO_client.transports.XHR_PollingTransport, 'recv_packet', new_recv_packet)
logging.basicConfig(level=logging.DEBUG)
try:
socket = socketIO_client.SocketIO('https://streamer.cryptocompare.com')
socket.emit('SubAdd', { 'subs': ['0~Kraken~BTC~USD'] });
socket.wait()
except ConnectionError:
print('The server is down. Try again later.')