点击下载数据样本:提取码=iat9
import pandas as pd
:ALT+ENTER -> 选择 -> Install package pandas
代码中已经对get_ticks_for_backtesting方法比较详细的注释了各个实现步骤,这里主要针对一些疑难进行说明
step步长:需要根据开盘价进行步长设置
arr = np.append(arr, row[‘high’])与arr = np.append(arr, row[‘low’]):这个是为了弥补步长的不对等会漏掉high或low
import requests
from time import sleep
from datetime import datetime, time, timedelta
from dateutil import parser
import pandas as pd
import numpy as np
import os
# __init__,构造,初始化,实例化
class AstockTrading(object):
def __init__(self, strategy_name):
self._strategy_name = strategy_name
self._Dt = [] # 交易时间
self._Open = [] # 开盘价
self._High = [] # 最高价
self._Low = [] # 最低价
self._Close = [] # 最新价
self._Volume = []
self._tick = [] # 数据
self._last_bar_start_minute = None # 最后一次更新bar的时间
self._isNewBar = False # 是否有新bar
self._ma20 = None
# 当前订单,dict, 字典
self._current_orders = {}
# 历史订单
self._history_orders = {}
self._order_number = 0
def get_tick(self):
headers = {'Referer': "https://finance.sina.com.cn"}
page = requests.get("https://hq.sinajs.cn/list=sh600519", headers=headers)
stock_info = page.text
mt_info = stock_info.replace("\"", "").split("=")[1].split(",")
# 最新价
last = float(mt_info[1])
trade_datetime = mt_info[30] + ' ' + mt_info[31]
self._tick = (trade_datetime, last)
def get_history_data_from_local_machine(self):
# tushare 数据来源
# self.Open = [1, 2, 3]
# self.High = [2, 3, 4]
pass
# how save and import history data?
# 策略
def bar_generator(self):
# assume we have history data already
# 1、update bars,calculate 5 minutes ma20 , not daily data
# 2、compare last and ma20 -> buy or sell or pass
# assume we have history data,Open,High,Low,Close,Dt
# 这里可以是5minutes、10minutes、15minutes、20minutes、30minutes
if self._tick[0].minute % 5 == 0 and self._tick[0].minute != self._last_bar_start_minute:
self._last_bar_start_minute = self._tick[0].minute
self._Open.insert(0, self._tick[1])
self._High.insert(0, self._tick[1])
self._Low.insert(0, self._tick[1])
self._Close.insert(0, self._tick[1])
self._Dt.insert(0, self._tick[0])
self._isNewBar = True
else:
# update current bar
self._High[0] = max(self._High[0], self._tick[1])
self._Low[0] = max(self._Low[0], self._tick[1])
self._Close[0] = self._tick[1]
self._Dt[0] = self._tick[0]
self._isNewBar = False
def _buy(self, price, volume):
# create an order
self._order_number += 1
key = "order" + str(self._order_number)
self._current_orders[key] = {
"open_datetime": self._Dt[0],
"open_price": price,
"volume": volume # 股数
}
pass
def _sell(self, key, price):
self._current_orders[key]['close_price'] = price
self._current_orders[key]['close_datetime'] = self._Dt[0]
# move order from current orders to history orders
self._history_orders[key] = self._current_orders.pop(key)
def strategy(self):
# last < 0.95 *ma20 ,long position(仓位), last > ma20 *1.05, sell
if self._isNewBar:
sum_ = 0
for item in self._Close[1:21]:
sum_ = sum_ + item
self._ma20 = sum_ / 20
if 0 == len(self._current_orders):
if self._Close[0] < 0.95 * self._ma20:
# 100000/44.28 = 2258 44.28是当前价格,10万指的你拥有的钱
# 2258 -> 2200 shares
volume = int(100000 / self._Close[0] / 100) * 100
self._buy(self._Close[0] + 0.01, volume) # 这里的0.01是为了防止挂单,我们需要即可买入
elif 1 == len(self._current_orders):
if self._Close[0] > self._ma20 * 1.05:
key = self._current_orders.keys()[0]
self._sell(key, self._Close[0] - 0.01)
else: # len() = 2
raise ValueError("we have more then 1 current orders")
# Close[0] in between 0.95*ma20 and 1.05*ma20,do nothing
def get_ticks_for_backtesting():
tick_path = "E:\\Downloads\\600036_data\\600036_ticks.csv" # 生成的回测数据路径
bar_path = "E:\\Downloads\\600036_data\\600036_5m.csv" # 历史数据的tick路径
if os.path.exists(tick_path): # 如果已存在回测数据,直接读取回测数据ticks
ticks = pd.read_csv(
tick_path,
parse_dates=['datetime'],
index_col='datetime'
)
else:
bar_5m = pd.read_csv(bar_path) # 使用pandas读取csv数据
ticks = []
for index, row in bar_5m.iterrows(): # 根据不同的开盘价设置步长
if row['open'] < 30:
step = 0.01
elif row['open'] < 60:
step = 0.03
elif row['open'] < 90:
step = 0.05
else:
step = 0.1
arr = np.arange(row['open'], row['high'], step) # 按步长生成从open到high的数据
arr = np.append(arr, row['high']) # 这个是为了弥补步长的不对等会漏掉high
arr = np.append(arr, np.arange(row['open'] - step, row['low'], -step)) # 按步长生成从open到low的数据
arr = np.append(arr, row['low']) # 这个是为了弥补步长的不对等会漏掉low
arr = np.append(arr, row['close'])
i = 0
dt = parser.parse(row['datetime']) - timedelta(minutes=5)
for item in arr:
ticks.append((dt + timedelta(seconds=0.1 * i), item)) # 将数据时间模拟到0.1秒递进
i += 1
tick_df = pd.DataFrame(ticks, columns=['datetime', 'price'])
tick_df.to_csv(tick_path, index=0) # 保存到csv回测数据中
return ticks
ma = AstockTrading('600036') # 类实例化
ma.get_history_data_from_local_machine()
# 交易时间是9:30-11:30,13:00-15:00
while time(9, 26) < datetime.now().time() < time(11, 32) \
or time(13) < datetime.now().time() < time(15, 2):
ma.get_tick()
ma.bar_generator()
ma.strategy()
# trade_time = parser.parse(ma._tick[0]).time()
# sleep(3)