当前位置: 首页 > 工具软件 > binance > 使用案例 >

binance 币本位交割合约期限套利交易(三)--获取基差数据

宇文修筠
2023-12-01

期限套利策略,要能够稳定获得各币种期货和现货的报价,计算各币种的基差数据,然后当基差突破阈值时触发交易。以下代码可以得到稳定报价,计算基差数据,然后当突破阈值时收到提示邮件(还没写好交易模块)。

这里记录一下,开发过程遇到的主要问题,以及如何解决的,如果不感兴趣,可以跳过,直接看代码。

问题一:websocket报价数据不更新

简单的ws.run_forever,通常在运行两三天后,会出现报价卡在某个时点,不能更新的情况。例如,现货报价停留在2021年12月25日11:00。

解决方案:用while true + try except语句,一旦websock链接有问题,就发送邮件通知(不知道为啥,一直没有收到报错邮件,发送邮件的代码可能有问题),回收内存,再重新链接。

    def s_start(self):
        while True:
            try:
                s_ws=WebSocketApp(
                    url='wss://stream.binance.com:9443/ws',
                    on_open=self._s_on_open,
                    on_message=self._s_on_message,
                    on_error=self._s_on_error,
                    on_close=self._s_on_close
                    )
                s_ws.run_forever() 
            except Exception as e:
                 content="%s the spot webscoket is broken,check if it restart"%e
                 self._email(content)
                 gc.collect()   
                

问题二:启动阶段报价数据为空

因为涉及到多币种,刚刚建立链接的时候,报价数据获取有先后,有可能btc数据有了,但是bnb数据要到10秒甚至20秒后才能收到。这样程序会因报错而中断。

解决方案:定义好报错的类型,用@retry装饰器,保证程序在遇到特定类型报错的时候,能够持续运行下去。

    def retry_if_index_error(exception):
        return isinstance(exception,IndexError)
    
    @retry(retry_on_exception=retry_if_index_error)
    def get_basis(self):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Sun Dec  5 20:15:41 2021
It seems to work really good, the price and basis data is correct after running 13 days.
2021/12/13 16:30 begin
@author: yszhu
"""


from websocket import WebSocketApp
import json,threading
import pysnooper
import time
import smtplib
from email.mime.text import MIMEText
from retrying import retry
import logging
import gc

class basis_dif(object):
    
    """
    create variable save dilivery contract price,spot price and basis difference
    """
    def __init__(self,contract_date=220325):
        #delivery contract price
        self.d_eth_price=None
        self.d_eth_time=None
        self.d_btc_price=None
        self.d_btc_time=None
        self.d_bnb_price=None
        self.d_bnb_price=None
        
        
        #spot price

        self.eth_price=None
        self.eth_time=None
        self.btc_price=None
        self.btc_time=None
        self.bnb_price=None
        self.bnb_time=None
        #delivery and spot basis difference
        self.btc_basis_dif=None
        self.eth_basis_dif=None
        self.bnb_basis_dif=None
        self.contract_date=contract_date
        self.basis_dif_threshold=0.035
        
        #email

        self._mail_host="smtp.qq.com"
        self._mail_user="315960451"
        self._mail_pwd="brvjltflaofrbhcb"
        
        self._sender="youradress@qq.com"
        self._receivers="youradress@qq.com"
        
        # I use self._break_threshold to avoid sending email repeatedly.
        # when self._break_threshold = True ,it means the basis diffrence
        # now is greater than the threshold,so if the diffence becomes smaller
        # than the threshold, this is the first time of break during one operation period.
        # So ,I will receive the email ,and then operate my account.
        self._break_threshold = True
            
        
    #websocket for delivery contrat price 
    def _d_on_open(self,d_ws):
        data={
            "method":"SUBSCRIBE",
            "params":
            [
            "btcusd_%s@aggTrade"%self.contract_date,
            "ethusd_%s@aggTrade"%self.contract_date,
            "bnbusd_%s@aggTrade"%self.contract_date
            ],
            "id": 1
            }                        
        d_ws.send(json.dumps(data))
    
    def _d_on_message(self,d_ws,d_msg):
        d_msg=json.loads(d_msg)
        
        if 's' in d_msg and d_msg['s']=="BTCUSD_%s"%self.contract_date:
            self.d_btc_price=float(d_msg['p'])
            self.d_btc_time=d_msg['T']
            self.d_btc_time=int(self.d_btc_time/1000)
            self.d_btc_time=time.localtime(self.d_btc_time)
            self.d_btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_btc_time)
            

        if 's' in d_msg and d_msg['s']=="ETHUSD_%s"%self.contract_date:
            self.d_eth_price=float(d_msg['p'])
            self.d_eth_time=d_msg['T']
            self.d_eth_time=int(self.d_eth_time/1000)
            self.d_eth_time=time.localtime(self.d_eth_time)
            self.d_eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_eth_time)
            
            
        if 's' in d_msg and d_msg['s']=="BNBUSD_%s"%self.contract_date:
            self.d_bnb_price=float(d_msg['p'])
            self.d_bnb_time=d_msg['T']
            self.d_bnb_time=int(self.d_bnb_time/1000)
            self.d_bnb_time=time.localtime(self.d_bnb_time)
            self.d_bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.d_bnb_time)
        
    def _d_on_close(self,d_ws):
        print("##connection closed##")
        
    def _d_on_error(self,d_ws,error):
        print(f"on error:{error}")
    
    def d_start(self):
        while True:
            try:
                d_ws=WebSocketApp(
                    url='wss://dstream.binance.com/ws',
                    on_open=self._d_on_open,
                    on_message=self._d_on_message,
                    on_error=self._d_on_error,
                    on_close=self._d_on_close
                    )
                d_ws.run_forever()
            except Exception as e:
                content="%s the future webscoket is broken,check if it restart"%e
                self._email(content)
                gc.collect()


    #websocket for spot price 
    def _s_on_open(self,s_ws):
        data={
              "method": "SUBSCRIBE",
              "params": [
                "btcusdt@aggTrade",
                "ethusdt@aggTrade",
                 "bnbusdt@aggTrade"
              ],
              "id": 2
             }               
        s_ws.send(json.dumps(data))
    
    def _s_on_message(self,s_ws,s_msg):
        s_msg=json.loads(s_msg)
        
        if 's' in s_msg and s_msg['s']=="BTCUSDT":
            self.btc_price=float(s_msg['p'])
            self.btc_time=s_msg['T']
            self.btc_time=int(self.btc_time/1000)
            self.btc_time=time.localtime(self.btc_time)
            self.btc_time=time.strftime("%Y-%m-%d %H:%M:%S",self.btc_time)


        if 's' in s_msg and s_msg['s']=="ETHUSDT":
            self.eth_price=float(s_msg['p'])
            self.eth_time=s_msg['T']
            self.eth_time=int(self.eth_time/1000)
            self.eth_time=time.localtime(self.eth_time)
            self.eth_time=time.strftime("%Y-%m-%d %H:%M:%S",self.eth_time)

            
            
        if 's' in s_msg and s_msg['s']=="BNBUSDT":
            self.bnb_price=float(s_msg['p'])
            self.bnb_time=s_msg['T']
            self.bnb_time=int(self.bnb_time/1000)
            self.bnb_time=time.localtime(self.bnb_time)
            self.bnb_time=time.strftime("%Y-%m-%d %H:%M:%S",self.bnb_time)


        
    def _s_on_close(self,s_ws):
        print("##connection closed##")
        
    def _s_on_error(self,s_ws,error):
        print(f"on error:{error}")
        
    def s_start(self):
        while True:
            try:
                s_ws=WebSocketApp(
                    url='wss://stream.binance.com:9443/ws',
                    on_open=self._s_on_open,
                    on_message=self._s_on_message,
                    on_error=self._s_on_error,
                    on_close=self._s_on_close
                    )
                s_ws.run_forever() 
            except Exception as e:
                 content="%s the spot webscoket is broken,check if it restart"%e
                 self._email(content)
                 gc.collect()   
                
    
    #because there are 7 kind of coin with spot and future price , so at
    #the begining , there maybe no data for self.bnb_price for the lack of liquidity,
    #In this case , python will raise IndexError
    #we need trying when TypeError is raised. 
    def retry_if_index_error(exception):
        return isinstance(exception,IndexError)
    
    @retry(retry_on_exception=retry_if_index_error)
    def get_basis(self):
        while True:
            self.btc_basis_dif = self.d_btc_price/self.btc_price-1
            self.eth_basis_dif = self.d_eth_price/self.eth_price-1
            self.bnb_basis_dif = self.d_bnb_price/self.bnb_price-1
            
            print("btc_basis_dif is %f" % self.btc_basis_dif)
            print("btc_d_price is %f %s"%(self.d_btc_price,self.d_btc_time))
            print("btc_s_price is %f %s"%(self.btc_price,self.btc_time))

            print("eth_basis_dif is %f"%self.eth_basis_dif)
            print("eth_d_price is %f %s"%(self.d_eth_price,self.d_eth_time))
            print("eth_s_price is %f %s"%(self.eth_price,self.eth_time))
            
            print("bnb_basis_dif is %f"%self.bnb_basis_dif)
            print("bnb_d_price is %f %s"%(self.d_bnb_price,self.d_bnb_time))
            print("bnb_s_price is %f %s"%(self.bnb_price,self.bnb_time))
            
            basis_dif_dict={
                "btc":[self.btc_basis_dif,self.btc_price,self.d_btc_price],
                "eth":[self.eth_basis_dif,self.eth_price,self.d_eth_price],
                "bnb":[self.bnb_basis_dif,self.bnb_price,self.d_bnb_price],
                }
            
            basis_dif_dict=sorted(basis_dif_dict.items(),key=lambda x:x[1],reverse=True)
            greatest_basis_dif=basis_dif_dict[0][1][0]        
            print("the greatest basis is %s %f,Spot pirice %f,future price %f"%(
                                basis_dif_dict[0][0],
                                greatest_basis_dif,
                                basis_dif_dict[0][1][1],
                                basis_dif_dict[0][1][2]
                                ))

            
            if greatest_basis_dif>self.basis_dif_threshold:
                if self._break_threshold == True:
                    content="the greatest basis is %s %f,Spot pirice %f,future price %f"%(
                                basis_dif_dict[0][0],
                                greatest_basis_dif,
                                basis_dif_dict[0][1][1],
                                basis_dif_dict[0][1][2]
                                )
                    self._email(content)
                    self._break_threshold = False
                    
            if greatest_basis_dif<self.basis_dif_threshold:
                self._break_threshold= True
            
            
            
    def _email(self,content):
        '''
        if the basis_dif reached the threshold, send an email

        param:content: which coin's bais_dif has reach the threshold at what price and when
        
        '''
        
        message=MIMEText(content,'plain',"utf-8")
        message['Subject']="from new greatest basis dif:basis difference reached the threshold"
        message['From']=self._sender
        message['to']=self._receivers[0]
        smtpObj=smtplib.SMTP()
        smtpObj.connect(self._mail_host,25)
        smtpObj.login(self._mail_user,self._mail_pwd)
        smtpObj.sendmail(self._sender,self._receivers,message.as_string())
        smtpObj.quit()
    

if __name__=="__main__":
    test=basis_dif()
    # because I want to recieve spot and contract price at the same time, so I create two threads 
    t1=threading.Thread(target=test.d_start)
    t2=threading.Thread(target=test.s_start)
    t1.start()
    t2.start()
    test.get_basis()
    
   
 类似资料: