当前位置: 首页 > 工具软件 > Volume Bar > 使用案例 >

4、Python量化交易-padas导入历史bar回测

江俊能
2023-12-01

引言

  • 通过ma-5min的数据,生成回测数据,进行策略回测校验
  • 这里说的回测数据,就是bar的ticks数据,然后传递给bar_generator导入回测数据

点击下载数据样本:提取码=iat9

一、padas安装

import pandas as pd:ALT+ENTER -> 选择 -> Install package pandas

二、主要思路说明

代码中已经对get_ticks_for_backtesting方法比较详细的注释了各个实现步骤,这里主要针对一些疑难进行说明

  • step步长:需要根据开盘价进行步长设置

    • 如果股票的价格低于30,那么涨停和跌停的10%(30*10%=3,3/0.01=300),也就是说会生成300条bar数据
    • 如果股票的价格很高,比如1000
      • 如果不调整步长还使用0.01的话,涨停和跌停的10%(1000*10%=100,100/0.01=10000),这里就会生成10000条的bar数据
      • 由于ma-5min的每个bar的open,high,low,close都是确定的,回测的时候我们不需要这么多的bar数据,影响效率
      • 所以我们修改step=0.1,涨停和跌停的10%(1000*10%=100,100/0.1=1000),1000条bar足够回测了
  • arr = np.append(arr, row[‘high’])与arr = np.append(arr, row[‘low’]):这个是为了弥补步长的不对等会漏掉high或low

    • 以high为例,假设open=30.02,high=30.10,step=0.03,那么从open开始递进的步长为30.05、30.08,而下个递进步长为30.11已经大于30.10了,这样high的数据就会被忽略(low同理)

三、完整源码

import requests
from time import sleep
from datetime import datetime, time, timedelta
from dateutil import parser
import pandas as pd
import numpy as np
import os


# __init__,构造,初始化,实例化

class AstockTrading(object):
    def __init__(self, strategy_name):
        self._strategy_name = strategy_name
        self._Dt = []  # 交易时间
        self._Open = []  # 开盘价
        self._High = []  # 最高价
        self._Low = []  # 最低价
        self._Close = []  # 最新价
        self._Volume = []
        self._tick = []  # 数据
        self._last_bar_start_minute = None  # 最后一次更新bar的时间
        self._isNewBar = False  # 是否有新bar
        self._ma20 = None

        # 当前订单,dict, 字典
        self._current_orders = {}
        # 历史订单
        self._history_orders = {}
        self._order_number = 0

    def get_tick(self):
        headers = {'Referer': "https://finance.sina.com.cn"}
        page = requests.get("https://hq.sinajs.cn/list=sh600519", headers=headers)
        stock_info = page.text
        mt_info = stock_info.replace("\"", "").split("=")[1].split(",")
        # 最新价
        last = float(mt_info[1])
        trade_datetime = mt_info[30] + ' ' + mt_info[31]
        self._tick = (trade_datetime, last)

    def get_history_data_from_local_machine(self):
        # tushare 数据来源
        # self.Open = [1, 2, 3]
        # self.High = [2, 3, 4]
        pass

    # how save and import history data?
    # 策略
    def bar_generator(self):
        # assume we have history data already
        # 1、update bars,calculate 5 minutes ma20 , not daily data
        # 2、compare last and ma20  -> buy or sell or pass
        # assume we have history data,Open,High,Low,Close,Dt
        # 这里可以是5minutes、10minutes、15minutes、20minutes、30minutes
        if self._tick[0].minute % 5 == 0 and self._tick[0].minute != self._last_bar_start_minute:
            self._last_bar_start_minute = self._tick[0].minute
            self._Open.insert(0, self._tick[1])
            self._High.insert(0, self._tick[1])
            self._Low.insert(0, self._tick[1])
            self._Close.insert(0, self._tick[1])
            self._Dt.insert(0, self._tick[0])
            self._isNewBar = True
        else:
            # update current bar
            self._High[0] = max(self._High[0], self._tick[1])
            self._Low[0] = max(self._Low[0], self._tick[1])
            self._Close[0] = self._tick[1]
            self._Dt[0] = self._tick[0]
            self._isNewBar = False

    def _buy(self, price, volume):
        # create an order
        self._order_number += 1
        key = "order" + str(self._order_number)
        self._current_orders[key] = {
            "open_datetime": self._Dt[0],
            "open_price": price,
            "volume": volume  # 股数
        }
        pass

    def _sell(self, key, price):
        self._current_orders[key]['close_price'] = price
        self._current_orders[key]['close_datetime'] = self._Dt[0]

        # move order from current orders to history orders
        self._history_orders[key] = self._current_orders.pop(key)

    def strategy(self):
        # last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
        if self._isNewBar:
            sum_ = 0
            for item in self._Close[1:21]:
                sum_ = sum_ + item
            self._ma20 = sum_ / 20

        if 0 == len(self._current_orders):
            if self._Close[0] < 0.95 * self._ma20:
                # 100000/44.28 = 2258   44.28是当前价格,10万指的你拥有的钱
                # 2258 -> 2200 shares
                volume = int(100000 / self._Close[0] / 100) * 100
                self._buy(self._Close[0] + 0.01, volume)  # 这里的0.01是为了防止挂单,我们需要即可买入
        elif 1 == len(self._current_orders):
            if self._Close[0] > self._ma20 * 1.05:
                key = self._current_orders.keys()[0]
                self._sell(key, self._Close[0] - 0.01)
        else:  # len() = 2
            raise ValueError("we have more then 1 current orders")
        # Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing


def get_ticks_for_backtesting():
    tick_path = "E:\\Downloads\\600036_data\\600036_ticks.csv"  # 生成的回测数据路径
    bar_path = "E:\\Downloads\\600036_data\\600036_5m.csv"  # 历史数据的tick路径

    if os.path.exists(tick_path):  # 如果已存在回测数据,直接读取回测数据ticks
        ticks = pd.read_csv(
            tick_path,
            parse_dates=['datetime'],
            index_col='datetime'
        )
    else:
        bar_5m = pd.read_csv(bar_path)  # 使用pandas读取csv数据
        ticks = []

        for index, row in bar_5m.iterrows():  # 根据不同的开盘价设置步长
            if row['open'] < 30:
                step = 0.01
            elif row['open'] < 60:
                step = 0.03
            elif row['open'] < 90:
                step = 0.05
            else:
                step = 0.1

            arr = np.arange(row['open'], row['high'], step)  # 按步长生成从open到high的数据
            arr = np.append(arr, row['high'])   # 这个是为了弥补步长的不对等会漏掉high
            arr = np.append(arr, np.arange(row['open'] - step, row['low'], -step))  # 按步长生成从open到low的数据
            arr = np.append(arr, row['low'])   # 这个是为了弥补步长的不对等会漏掉low
            arr = np.append(arr, row['close'])

            i = 0
            dt = parser.parse(row['datetime']) - timedelta(minutes=5)
            for item in arr:
                ticks.append((dt + timedelta(seconds=0.1 * i), item))   # 将数据时间模拟到0.1秒递进
                i += 1
        tick_df = pd.DataFrame(ticks, columns=['datetime', 'price'])
        tick_df.to_csv(tick_path, index=0)  # 保存到csv回测数据中
    return ticks


ma = AstockTrading('600036')  # 类实例化
ma.get_history_data_from_local_machine()

# 交易时间是9:30-11:30,13:00-15:00
while time(9, 26) < datetime.now().time() < time(11, 32) \
        or time(13) < datetime.now().time() < time(15, 2):
    ma.get_tick()
    ma.bar_generator()
    ma.strategy()
    # trade_time = parser.parse(ma._tick[0]).time()
    # sleep(3)

 类似资料: