当前位置: 首页 > 工具软件 > Rqalpha > 使用案例 >

RQAlpha源代码解读(一)——事件驱动引擎(一)

荀增
2023-12-01

这是一个解读RQAlpha源代码的系列,我会定期更新。RQAlpha的github地址为:https://github.com/ricequant/rqalpha 。 对于一个事件驱动型的量化回测框架,我一般是从其关于对事件的处理的代码开始分析的,以下按照这个逻辑对RQAlpha的源代码进行分析。

1. events.py

该py文件中定义了Event, EventBus, EVENT三个类和parse_event这一个函数,这些类和函数是构建事件驱动引擎的基础。

Event类

class Event(object):
    def __init__(self, event_type, **kwargs):
        self.__dict__ = kwargs
        self.event_type = event_type

    def __repr__(self):
        return ' '.join('{}:{}'.format(k, v) for k, v in self.__dict__.items())

Event类用于定义各类事件,一个事件有两个属性:事件的类型和事件的内容,event_type属性就代表事件的类型。关于事件的内容,这里设置得相当普适,事件的内容**kwargs的形式输入,这样的话就可以以多个实例属性的形式展现该事件的内容。
但这里需要注意的是,self.dict = kwargs和self.event_type = event_type这两个语句的顺序不能替换。我们知道__dict__表示的是类或实例的属性和方法(这里是实例),如果顺序替换,则self.__dict__中不会包含event_type,就会导致event_type属性不存在于实例中。下面以一个具体的例子展示Event类的使用:

class Event(object):
    def __init__(self, event_type, **kwargs):
        self.__dict__ = kwargs
        self.event_type = event_type
        
    def __repr__(self):
        return ' '.join('{}:{}'.format(k, v) for k, v in self.__dict__.items())
    
if __name__ == "__main__":
    bar = Event("bar", 		open=3645,high=3665,low=3620,close=3637,volume=39776543,openinterest=34555555342)
    print(f"bar.event_type = {bar.event_type}")
    print(f"bar.__dict__ = {bar.__dict__}")
    print(bar)
    bar

运行结果为:

bar.event_type = bar
bar.__dict__ = {'open': 3645, 'high': 3665, 'low': 3620, 'close': 3637, 'volume': 39776543, 'openinterest': 34555555342, 'event_type': 'bar'}
open:3645 high:3665 low:3620 close:3637 volume:39776543 openinterest:34555555342 event_type:bar
open:3645 high:3665 low:3620 close:3637 volume:39776543 openinterest:34555555342 event_type:bar

这个例子中的Event实例,实际上表示的是一个Bar事件,这个事件的类型是bar,事件的内容则是这个Bar的开盘价、最高价、最低价、收盘价等信息。首先我们能看到bar.__dict__展现的就是事件的内容+事件的类型,此外由于我们重写了__repr__方法,运行print(bar)和bar时也能输出事件的内容+事件的类型(即该实例的所有的属性)。

EventBus类

class EventBus(object):
    def __init__(self):
        self._listeners = defaultdict(list)            #监听器
        self._user_listeners = defaultdict(list)   #用户监听器

    def add_listener(self, event_type, listener, user=False):
        (self._user_listeners if user else self._listeners)[event_type].append(listener)

    def prepend_listener(self, event_type, listener, user=False):
        (self._user_listeners if user else self._listeners)[event_type].insert(0, listener)

    def publish_event(self, event):
        for listener in self._listeners[event.event_type]:
            # 如果返回 True ,那么消息不再传递下去
            if listener(event):
                break

        for listener in self._user_listeners[event.event_type]:
            listener(event)

我们知道,在一个事件驱动引擎中,有了事件,接着就需要去监听、处理事件,我们用来监听事件必然是一个一直开启的东西,实时监听是否有事件产生,而处理事件则是回调函数,其能够在事件发生时被调用,对事件做出反应。因此,我们首先需要写监听事件的函数并添加处理事件的回调函数。
在EventBus类中,首先定义了两个属性——_listeners和_user_listeners,_listeners是一般的监听器,_user_listeners是特定用户的监听器。这两个属性都被定义为了defaultdict(list)类型。显然,字典的键是事件的类型,字典的值是列表,列表的每个元素是该类型事件对应的回调函数。
add_listener和prepend_listener这两个方法都是用于添加回调函数,并且都规定了user=False时向self._listeners中添加,反之则向self._user_listeners中添加。这两个方法的区别是,add_listener是向列表的末尾添加回调函数,而prepend_listener则是向列表的开头位置添加回调函数。
publish_event则起着将产生的事件event传给回调函数们处理。第一步是将event传给self._listeners对应类型的回调函数处理,只要有一个回调函数处理后返回True,则消息不再传递下去。第二步再传给_user_listeners处理。

EVENT类

这个类是继承枚举类得到的,定义了一系列的事件的名称。一般我们都会把事件的名称定义成枚举值,而不是直接用字符串。

class EVENT(Enum):
    # 系统初始化后触发
    # post_system_init()
    POST_SYSTEM_INIT = 'post_system_init'

    # 在实盘时,你可能需要在此事件后根据其他信息源对系统状态进行调整
    BEFORE_SYSTEM_RESTORED = 'before_system_restored'
    POST_SYSTEM_RESTORED = 'post_system_restored'

    # 策略执行完init函数后触发
    # post_user_init()
    POST_USER_INIT = 'post_user_init'
    # 策略证券池发生变化后触发
    # post_universe_changed(universe)
    POST_UNIVERSE_CHANGED = 'post_universe_changed'

    # 执行before_trading函数前触发
    # pre_before_trading()
    PRE_BEFORE_TRADING = 'pre_before_trading'
    # 该事件会触发策略的before_trading函数
    # before_trading()
    BEFORE_TRADING = 'before_trading'
    # 执行before_trading函数后触发
    # post_before_trading()
    POST_BEFORE_TRADING = 'post_before_trading'

    # 执行handle_bar函数前触发
    # pre_bar()
    PRE_BAR = 'pre_bar'
    # 该事件会触发策略的handle_bar函数
    # bar(bar_dict)
    BAR = 'bar'
    # 执行handle_bar函数后触发
    # post_bar()
    POST_BAR = 'post_bar'

    # 执行handle_tick前触发
    PRE_TICK = 'pre_tick'
    # 该事件会触发策略的handle_tick函数
    # tick(tick)
    TICK = 'tick'
    # 执行handle_tick后触发
    POST_TICK = 'post_tick'

    # 在scheduler执行前触发
    PRE_SCHEDULED = 'pre_scheduled'
    # 在scheduler执行后触发
    POST_SCHEDULED = 'post_scheduled'

    # 执行after_trading函数前触发
    # pre_after_trading()
    PRE_AFTER_TRADING = 'pre_after_trading'
    # 该事件会触发策略的after_trading函数
    # after_trading()
    AFTER_TRADING = 'after_trading'
    # 执行after_trading函数后触发
    # post_after_trading()
    POST_AFTER_TRADING = 'post_after_trading'

    # 结算前触发该事件
    # pre_settlement()
    PRE_SETTLEMENT = 'pre_settlement'
    # 触发结算事件
    # settlement()
    SETTLEMENT = 'settlement'
    # 结算后触发该事件
    # post_settlement()
    POST_SETTLEMENT = 'post_settlement'

    # 创建订单
    # order_pending_new(account, order)
    ORDER_PENDING_NEW = 'order_pending_new'
    # 创建订单成功
    # order_creation_pass(account, order)
    ORDER_CREATION_PASS = 'order_creation_pass'
    # 创建订单失败
    # order_creation_reject(account, order)
    ORDER_CREATION_REJECT = 'order_creation_reject'
    # 创建撤单
    # order_pending_cancel(account, order)
    ORDER_PENDING_CANCEL = 'order_pending_cancel'
    # 撤销订单成功
    # order_cancellation_pass(account, order)
    ORDER_CANCELLATION_PASS = 'order_cancellation_pass'
    # 撤销订单失败
    # order_cancellation_reject(account, order)
    ORDER_CANCELLATION_REJECT = 'order_cancellation_reject'
    # 订单状态更新
    # order_unsolicited_update(account, order)
    ORDER_UNSOLICITED_UPDATE = 'order_unsolicited_update'

    # 成交
    # trade(accout, trade, order)
    TRADE = 'trade'

    ON_LINE_PROFILER_RESULT = 'on_line_profiler_result'

    # persist immediately
    DO_PERSIST = 'do_persist'
    # reload immediately
    DO_RESTORE = "do_restore"

    # 策略被暂停
    STRATEGY_HOLD_SET = 'strategy_hold_set'
    # 策略被恢复
    STRATEGY_HOLD_CANCELLED = 'strategy_hold_canceled'

    # 心跳事件,用于触发定时任务
    HEARTBEAT = 'heartbeat'

    # 用户事件,接受用户发送的信息
    USER = 'user'

parse_event函数

这个函数很简单,因为EVENT类中的所有的事件的名称都是大写的,如果程序运行过程中误输入小写字母,可通过parse_event照样获得EVENT类中对应的枚举值而不报错。

def parse_event(event_str):
    return EVENT[event_str.upper()]
 类似资料: