这是一个解读RQAlpha源代码的系列,我会定期更新。RQAlpha的github地址为:https://github.com/ricequant/rqalpha 。 对于一个事件驱动型的量化回测框架,我一般是从其关于对事件的处理的代码开始分析的,以下按照这个逻辑对RQAlpha的源代码进行分析。
该py文件中定义了Event, EventBus, EVENT三个类和parse_event这一个函数,这些类和函数是构建事件驱动引擎的基础。
class Event(object):
def __init__(self, event_type, **kwargs):
self.__dict__ = kwargs
self.event_type = event_type
def __repr__(self):
return ' '.join('{}:{}'.format(k, v) for k, v in self.__dict__.items())
Event类用于定义各类事件,一个事件有两个属性:事件的类型和事件的内容,event_type属性就代表事件的类型。关于事件的内容,这里设置得相当普适,事件的内容**kwargs的形式输入,这样的话就可以以多个实例属性的形式展现该事件的内容。
但这里需要注意的是,self.dict = kwargs和self.event_type = event_type这两个语句的顺序不能替换。我们知道__dict__表示的是类或实例的属性和方法(这里是实例),如果顺序替换,则self.__dict__中不会包含event_type,就会导致event_type属性不存在于实例中。下面以一个具体的例子展示Event类的使用:
class Event(object):
def __init__(self, event_type, **kwargs):
self.__dict__ = kwargs
self.event_type = event_type
def __repr__(self):
return ' '.join('{}:{}'.format(k, v) for k, v in self.__dict__.items())
if __name__ == "__main__":
bar = Event("bar", open=3645,high=3665,low=3620,close=3637,volume=39776543,openinterest=34555555342)
print(f"bar.event_type = {bar.event_type}")
print(f"bar.__dict__ = {bar.__dict__}")
print(bar)
bar
运行结果为:
bar.event_type = bar
bar.__dict__ = {'open': 3645, 'high': 3665, 'low': 3620, 'close': 3637, 'volume': 39776543, 'openinterest': 34555555342, 'event_type': 'bar'}
open:3645 high:3665 low:3620 close:3637 volume:39776543 openinterest:34555555342 event_type:bar
open:3645 high:3665 low:3620 close:3637 volume:39776543 openinterest:34555555342 event_type:bar
这个例子中的Event实例,实际上表示的是一个Bar事件,这个事件的类型是bar,事件的内容则是这个Bar的开盘价、最高价、最低价、收盘价等信息。首先我们能看到bar.__dict__展现的就是事件的内容+事件的类型,此外由于我们重写了__repr__方法,运行print(bar)和bar时也能输出事件的内容+事件的类型(即该实例的所有的属性)。
class EventBus(object):
def __init__(self):
self._listeners = defaultdict(list) #监听器
self._user_listeners = defaultdict(list) #用户监听器
def add_listener(self, event_type, listener, user=False):
(self._user_listeners if user else self._listeners)[event_type].append(listener)
def prepend_listener(self, event_type, listener, user=False):
(self._user_listeners if user else self._listeners)[event_type].insert(0, listener)
def publish_event(self, event):
for listener in self._listeners[event.event_type]:
# 如果返回 True ,那么消息不再传递下去
if listener(event):
break
for listener in self._user_listeners[event.event_type]:
listener(event)
我们知道,在一个事件驱动引擎中,有了事件,接着就需要去监听、处理事件,我们用来监听事件必然是一个一直开启的东西,实时监听是否有事件产生,而处理事件则是回调函数,其能够在事件发生时被调用,对事件做出反应。因此,我们首先需要写监听事件的函数并添加处理事件的回调函数。
在EventBus类中,首先定义了两个属性——_listeners和_user_listeners,_listeners是一般的监听器,_user_listeners是特定用户的监听器。这两个属性都被定义为了defaultdict(list)类型。显然,字典的键是事件的类型,字典的值是列表,列表的每个元素是该类型事件对应的回调函数。
add_listener和prepend_listener这两个方法都是用于添加回调函数,并且都规定了user=False时向self._listeners中添加,反之则向self._user_listeners中添加。这两个方法的区别是,add_listener是向列表的末尾添加回调函数,而prepend_listener则是向列表的开头位置添加回调函数。
publish_event则起着将产生的事件event传给回调函数们处理。第一步是将event传给self._listeners对应类型的回调函数处理,只要有一个回调函数处理后返回True,则消息不再传递下去。第二步再传给_user_listeners处理。
这个类是继承枚举类得到的,定义了一系列的事件的名称。一般我们都会把事件的名称定义成枚举值,而不是直接用字符串。
class EVENT(Enum):
# 系统初始化后触发
# post_system_init()
POST_SYSTEM_INIT = 'post_system_init'
# 在实盘时,你可能需要在此事件后根据其他信息源对系统状态进行调整
BEFORE_SYSTEM_RESTORED = 'before_system_restored'
POST_SYSTEM_RESTORED = 'post_system_restored'
# 策略执行完init函数后触发
# post_user_init()
POST_USER_INIT = 'post_user_init'
# 策略证券池发生变化后触发
# post_universe_changed(universe)
POST_UNIVERSE_CHANGED = 'post_universe_changed'
# 执行before_trading函数前触发
# pre_before_trading()
PRE_BEFORE_TRADING = 'pre_before_trading'
# 该事件会触发策略的before_trading函数
# before_trading()
BEFORE_TRADING = 'before_trading'
# 执行before_trading函数后触发
# post_before_trading()
POST_BEFORE_TRADING = 'post_before_trading'
# 执行handle_bar函数前触发
# pre_bar()
PRE_BAR = 'pre_bar'
# 该事件会触发策略的handle_bar函数
# bar(bar_dict)
BAR = 'bar'
# 执行handle_bar函数后触发
# post_bar()
POST_BAR = 'post_bar'
# 执行handle_tick前触发
PRE_TICK = 'pre_tick'
# 该事件会触发策略的handle_tick函数
# tick(tick)
TICK = 'tick'
# 执行handle_tick后触发
POST_TICK = 'post_tick'
# 在scheduler执行前触发
PRE_SCHEDULED = 'pre_scheduled'
# 在scheduler执行后触发
POST_SCHEDULED = 'post_scheduled'
# 执行after_trading函数前触发
# pre_after_trading()
PRE_AFTER_TRADING = 'pre_after_trading'
# 该事件会触发策略的after_trading函数
# after_trading()
AFTER_TRADING = 'after_trading'
# 执行after_trading函数后触发
# post_after_trading()
POST_AFTER_TRADING = 'post_after_trading'
# 结算前触发该事件
# pre_settlement()
PRE_SETTLEMENT = 'pre_settlement'
# 触发结算事件
# settlement()
SETTLEMENT = 'settlement'
# 结算后触发该事件
# post_settlement()
POST_SETTLEMENT = 'post_settlement'
# 创建订单
# order_pending_new(account, order)
ORDER_PENDING_NEW = 'order_pending_new'
# 创建订单成功
# order_creation_pass(account, order)
ORDER_CREATION_PASS = 'order_creation_pass'
# 创建订单失败
# order_creation_reject(account, order)
ORDER_CREATION_REJECT = 'order_creation_reject'
# 创建撤单
# order_pending_cancel(account, order)
ORDER_PENDING_CANCEL = 'order_pending_cancel'
# 撤销订单成功
# order_cancellation_pass(account, order)
ORDER_CANCELLATION_PASS = 'order_cancellation_pass'
# 撤销订单失败
# order_cancellation_reject(account, order)
ORDER_CANCELLATION_REJECT = 'order_cancellation_reject'
# 订单状态更新
# order_unsolicited_update(account, order)
ORDER_UNSOLICITED_UPDATE = 'order_unsolicited_update'
# 成交
# trade(accout, trade, order)
TRADE = 'trade'
ON_LINE_PROFILER_RESULT = 'on_line_profiler_result'
# persist immediately
DO_PERSIST = 'do_persist'
# reload immediately
DO_RESTORE = "do_restore"
# 策略被暂停
STRATEGY_HOLD_SET = 'strategy_hold_set'
# 策略被恢复
STRATEGY_HOLD_CANCELLED = 'strategy_hold_canceled'
# 心跳事件,用于触发定时任务
HEARTBEAT = 'heartbeat'
# 用户事件,接受用户发送的信息
USER = 'user'
这个函数很简单,因为EVENT类中的所有的事件的名称都是大写的,如果程序运行过程中误输入小写字母,可通过parse_event照样获得EVENT类中对应的枚举值而不报错。
def parse_event(event_str):
return EVENT[event_str.upper()]