class RyuApp(object):
    The base class for Ryu applications.

    RyuApp subclasses are instantiated after ryu-manager loaded
    all requested Ryu application modules.
    __init__ should call RyuApp.__init__ with the same arguments.
    It's illegal to send any events in __init__.

    The instance attribute 'name' is the name of the class used for
    message routing among Ryu applications.  (Cf. send_event)
    It's set to __class__.__name__ by RyuApp.__init__.
    It's discouraged for subclasses to override this.

    _CONTEXTS = {}
    A dictionary to specify contexts which this Ryu application wants to use.
    Its key is a name of context and its value is an ordinary class
    which implements the context.  The class is instantiated by app_manager
    and the instance is shared among RyuApp subclasses which has _CONTEXTS
    member with the same key.  A RyuApp subclass can obtain a reference to
    the instance via its __init__'s kwargs as the following.


        _CONTEXTS = {
            'network': network.Network

        def __init__(self, *args, *kwargs):
            self.network = kwargs['network']

    _EVENTS = []
    A list of event classes which this RyuApp subclass would generate.
    This should be specified if and only if event classes are defined in
    a different python module from the RyuApp subclass is.

    A list of supported OpenFlow versions for this RyuApp.
    The default is all versions supported by the framework.


        OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION,

    If multiple Ryu applications are loaded in the system,
    the intersection of their OFP_VERSIONS is used.

    def context_iteritems(cls):
        Return iterator over the (key, contxt class) of application context
        return iter(cls._CONTEXTS.items())

    def __init__(self, *_args, **_kwargs):
        super(RyuApp, self).__init__()
        self.name = self.__class__.__name__
        self.event_handlers = {}        # ev_cls -> handlers:list
        self.observers = {}     # ev_cls -> observer-name -> states:set
        self.threads = []
        self.main_thread = None
        self.events = hub.Queue(128)
        self._events_sem = hub.BoundedSemaphore(self.events.maxsize)
        if hasattr(self.__class__, 'LOGGER_NAME'):
            self.logger = logging.getLogger(self.__class__.LOGGER_NAME)
            self.logger = logging.getLogger(self.name)
        self.CONF = cfg.CONF

        # prevent accidental creation of instances of this class outside RyuApp
        class _EventThreadStop(event.EventBase):
        self._event_stop = _EventThreadStop()
        self.is_active = True
  • 涉及到了app之间的事件收发,就不得不补一下前面剩下还没有分析的RyuAPP的app基类
  • app基类中定义了app与app间收、发事件的函数,也定义了app在协程中运行的事件循环
  • 在app基类的开头首先定义了类属性_CONTEXT,我们已经很熟悉了,是用来定义本app所依赖的外部类的字典
  • 还定义了_EVENTS类属性用来定义本app能够产生的事件,后面还会详细分析
  • OFP_VERSIONS是本app工作于的openflow版本
  • app基类初始化函数中
    • event_handlers变量存储接事件到函数的映射
    • observers是对本app产生的事件监听的app们,也就是本app的消费者
    • threads是本app产生的多个协程
    • main_thread用于main函数join,main_thread退出了才是真推出
    • events变量是个队列,用于存储其他app发送来的事件
    • 其他变量见名知意
  • 接下来分析app基类中事件钩子相关的函数
  • 首先我们看到了前文提到的调用链,即instantiate_apps -> _instantiate -> register_app -> register_instance -> register_handler,中的最后一环register_handler
  • register_handler做的事情也很简单,把本app中各函数的callers收集起来,整理成一个字典,也就是self.event_handlers
  • unregister_handler也就是相应的删除函数了
  • 229行get_handlers函数,获取相应ev所对应的handlers,并进行dispatcher阶段的过滤
    def get_observers(self, ev, state):
        observers = []
        for k, v in self.observers.get(ev.__class__, {}).items():
            if not state or not v or state in v:

        return observers

    def send_request(self, req):
        Make a synchronous request.
        Set req.sync to True, send it to a Ryu application specified by
        req.dst, and block until receiving a reply.
        Returns the received reply.
        The argument should be an instance of EventRequestBase.

        assert isinstance(req, EventRequestBase)
        req.sync = True
        req.reply_q = hub.Queue()
        self.send_event(req.dst, req)
        # going to sleep for the reply
        return req.reply_q.get()

    def _event_loop(self):
        while self.is_active or not self.events.empty():
            ev, state = self.events.get()
            if ev == self._event_stop:
            handlers = self.get_handlers(ev, state)
            for handler in handlers:
                except hub.TaskExit:
                    # Normal exit.
                    # Propagate upwards, so we leave the event loop.
                    LOG.exception('%s: Exception occurred during handler processing. '
                                  'Backtrace from offending handler '
                                  '[%s] servicing event [%s] follows.',
                                  self.name, handler.__name__, ev.__class__.__name__)

    def _send_event(self, ev, state):
        self.events.put((ev, state))

    def send_event(self, name, ev, state=None):
        Send the specified event to the RyuApp instance specified by name.

        if name in SERVICE_BRICKS:
            if isinstance(ev, EventRequestBase):
                ev.src = self.name
            LOG.debug("EVENT %s->%s %s",
                      self.name, name, ev.__class__.__name__)
            SERVICE_BRICKS[name]._send_event(ev, state)
            LOG.debug("EVENT LOST %s->%s %s",
                      self.name, name, ev.__class__.__name__)

    def send_event_to_observers(self, ev, state=None):
        Send the specified event to all observers of this RyuApp.

        for observer in self.get_observers(ev, state):
            self.send_event(observer, ev, state)


def start(self):
        Hook that is called after startup initialization is done.

    def stop(self):
        if self.main_thread:
        self.is_active = False
        self._send_event(self._event_stop, None)

    def set_main_thread(self, thread):
        Set self.main_thread so that stop() can terminate it.

        Only AppManager.instantiate_apps should call this function.
        self.main_thread = thread

  • 接下来分析app基类中事件接收相关的函数
  • 还记得我们在分析app_manager的instantiate_apps函数时,对每个app都调用了start,并将返回值作为协程句柄返回以供main函数join
  • 174行可以看到start函数做的事情只有在hub中添加本app的事件循环,并且如果不继承覆盖start函数的话,并不会返回协程句柄
  • 180行stop函数则进行相应的kill
  • 281行就是本app的事件循环
    • 282行可以看出本app处于活跃阶段,并且events队列中有事件时,才会进入循环
    • 283行从队列中取出事件,287行从本app的handlers变量取出ev事件对应的事件处理函数
    • 最后288行依次执行这个event对应的handler,完成事件的接收工作
def _update_bricks(self):
        for i in SERVICE_BRICKS.values():
            for _k, m in inspect.getmembers(i, inspect.ismethod):
                if not hasattr(m, 'callers'):
                for ev_cls, c in m.callers.items():
                    if not c.ev_source:

                    brick = _lookup_service_brick_by_mod_name(c.ev_source)
                    if brick:
                        brick.register_observer(ev_cls, i.name,

                    # allow RyuApp and Event class are in different module
                    for brick in SERVICE_BRICKS.values():
                        if ev_cls in brick._EVENTS:
                            brick.register_observer(ev_cls, i.name,
  • 最后还有一个疑问是,我们知道了handler变量通过怎么样的调用链完成初始化,那么observers呢?
  • 前文提到了app_manager中instantiate_apps函数调用了_update_bricks,这里回顾_update_bricks函数,前面我们没有详细分析,现在可以回头再来看一下
  • 452行遍历了BRICK,即app
  • 453行遍历了app的所有方法
  • 454行过滤留下有callers属性,即监听了事件的函数
  • 456行过滤掉没有事件来源的事件
  • 460行寻找产生这个事件的模块
  • 462行调用前文提到的register_observer,向对方app表明自己的存在,声明自己的监听事件和监听阶段
  • 最后466行,如果事件的产生者并不在event这个类的模块里,ryu还会寻找所有brick(app)的_EVENTS属性
    • 如果app在_EVENTS中声明了自己会产生这个事件的话,那么同样在这个app进行注册
    • 这也就填坑了前面没有细说的_EVENTS,就是在这里对其进行了读取使用
    • 因此如果我们自己要编写新的event类型,同时event定义与事件产生的app在不同模块,就需要要在_EVENTS指定事件信息


  1. app的handler和observers两个变量均在instantiate_apps阶段完成变量的计算和赋值,为后续的事件路由提供信息
  2. 事件的接收是通过app基类中的事件循环完成的
  3. 事件的发送是通过本app的某函数主动调用send_event_to_observers等函数实现的

[1] ohmyadd:https://www.jianshu.com/p/ff59c7c5f056
[2] Ryubook:http://osrg.github.io/ryu-book/en/html/
[3] Ryu官方文档:https://ryu.readthedocs.io/en/latest/
