当前位置: 首页 > 工具软件 > cryptocompare > 使用案例 >

python socketio例子_连接到cryptocompare的Python socketio示例

周奇文
2023-12-01

socketIO_client 库似乎不支持 cryptocompare 所使用的 XHR 轮询协议。我通过覆盖 socketIO_client.transports.XHR_PollingTransport 类中的 recv_packet 方法解决了这个问题。

import logging

import socketIO_client

from socketIO_client.transports import get_response

from socketIO_client.parsers import get_byte, _read_packet_text, parse_packet_text

from requests.exceptions import ConnectionError

# extra function to support XHR1 style protocol

def _new_read_packet_length(content, content_index):
    """Read the decimal length prefix of one ``<length>:<packet>`` frame.

    Starting at ``content_index``, bytes are consumed up to the next ``:``
    and the characters collected before it are parsed as a base-10 integer.

    Args:
        content: Raw polling response body, read through ``get_byte``.
        content_index: Offset in ``content`` where the length prefix starts.

    Returns:
        A ``(next_index, packet_length)`` tuple, where ``next_index`` is the
        offset just past the ``:`` separator.

    Raises:
        ValueError: If the characters before ``:`` do not form an integer
            (this includes an empty prefix).

    NOTE(review): running past the end of ``content`` presumably surfaces as
    an IndexError raised by ``get_byte``; the decoder that calls this helper
    catches IndexError to detect the end of the payload -- confirm.
    """
    separator = ord(':')
    digits = []
    # Fetch each byte once instead of calling get_byte twice per position.
    byte = get_byte(content, content_index)
    while byte != separator:
        digits.append(chr(byte))
        content_index += 1
        byte = get_byte(content, content_index)
    # Step over the ':' separator itself.
    content_index += 1
    return content_index, int(''.join(digits))

def new_decode_engineIO_content(content):
    """Yield the packets contained in a ``<length>:<packet>`` framed body.

    The body is walked from offset 0. For every frame the length prefix is
    read with ``_new_read_packet_length``, the packet text is extracted with
    ``_read_packet_text`` and then split by ``parse_packet_text``.

    Args:
        content: Raw response body returned by the polling request.

    Yields:
        ``(engineIO_packet_type, engineIO_packet_data)`` tuples, exactly as
        returned by ``parse_packet_text``.

    NOTE(review): ``_read_packet_text`` is handed the parsed length as-is;
    whether that length counts bytes or characters for non-ASCII payloads
    is not visible here -- confirm against the server's framing.
    """
    content_index = 0
    content_length = len(content)
    while content_index < content_length:
        try:
            content_index, packet_length = _new_read_packet_length(
                content, content_index)
        except IndexError:
            # No complete length prefix is left: treat it as end of payload.
            break
        content_index, packet_text = _read_packet_text(
            content, content_index, packet_length)
        engineIO_packet_type, engineIO_packet_data = parse_packet_text(
            packet_text)
        yield engineIO_packet_type, engineIO_packet_data

def new_recv_packet(self):
    """Poll the server once and yield every packet found in the response.

    Written as a replacement for ``XHR_PollingTransport.recv_packet``: it
    issues the same kind of GET request but decodes the body with
    ``new_decode_engineIO_content`` (``<length>:<packet>`` framing).

    Yields:
        ``(engineIO_packet_type, engineIO_packet_data)`` tuples.
    """
    # Copy the shared query parameters so the per-request 't' value does not
    # leak into self._params.
    params = dict(self._params)
    params['t'] = self._get_timestamp()
    response = get_response(
        self.http_session.get,
        self._http_url,
        params=params,
        **self._kw_get)
    # Explicit loop rather than `yield from` so the generator stays valid on
    # Python 2 as well.
    for engineIO_packet_type, engineIO_packet_data in (
            new_decode_engineIO_content(response.content)):
        yield engineIO_packet_type, engineIO_packet_data

# Monkey-patch the polling transport so that every SocketIO connection created
# below decodes responses with new_recv_packet instead of the stock method.
socketIO_client.transports.XHR_PollingTransport.recv_packet = new_recv_packet

# DEBUG level is configured so the library's own log output is visible.
logging.basicConfig(level=logging.DEBUG)

try:
    socket = socketIO_client.SocketIO('https://streamer.cryptocompare.com')
    # Request a subscription; the string follows the form used by the
    # original example ('0~Kraken~BTC~USD').
    socket.emit('SubAdd', {'subs': ['0~Kraken~BTC~USD']})
    # Called without arguments, as in the original example.
    socket.wait()
except ConnectionError:
    print('The server is down. Try again later.')

 类似资料: