neutron-server RPC

小牛编辑
2023-12-01

RPC in neutron-server

The RPC server is started mainly by the following call in the main function of neutron.server:

neutron_rpc = service.serve_rpc()

serve_rpc is implemented as follows:

def serve_rpc():
    plugin = manager.NeutronManager.get_plugin()

    # If 0 < rpc_workers then start_rpc_listeners would be called in a
    # subprocess and we cannot simply catch the NotImplementedError.  It is
    # simpler to check this up front by testing whether the plugin supports
    # multiple RPC workers.
    if not plugin.rpc_workers_supported():
        LOG.debug(_("Active plugin doesn't implement start_rpc_listeners"))
        if 0 < cfg.CONF.rpc_workers:
            msg = _("'rpc_workers = %d' ignored because start_rpc_listeners "
                    "is not implemented.")
            LOG.error(msg, cfg.CONF.rpc_workers)
        raise NotImplementedError

    try:
        rpc = RpcWorker(plugin)

        if cfg.CONF.rpc_workers < 1:
            rpc.start()
            return rpc
        else:
            launcher = common_service.ProcessLauncher(wait_interval=1.0)
            launcher.launch_service(rpc, workers=cfg.CONF.rpc_workers)
            return launcher
    except Exception:
        with excutils.save_and_reraise_exception():
            LOG.exception(_('Unrecoverable error: please check log '
                            'for details.'))

In serve_rpc, RpcWorker(plugin) creates the RPC server mainly by calling a method of the plugin:

self._servers = self._plugin.start_rpc_listeners()
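The wrapper role of RpcWorker can be illustrated with a small self-contained sketch. Only the constructor argument, start() and the assignment to self._servers come from the excerpts in this article; the wait() and stop() methods and the FakeServer and FakePlugin classes are assumptions added for illustration and are not Neutron code.

```python
class FakeServer:
    """Hypothetical stand-in for one listener returned by the plugin."""

    def __init__(self):
        self.running = True

    def wait(self):
        # A real listener would block here until it finishes.
        return None

    def kill(self):
        self.running = False


class FakePlugin:
    """Hypothetical plugin whose start_rpc_listeners returns one listener."""

    def start_rpc_listeners(self):
        return [FakeServer()]


class SketchRpcWorker:
    """Sketch of a worker that delegates listener creation to the plugin."""

    def __init__(self, plugin):
        self._plugin = plugin
        self._servers = []

    def start(self):
        # Same delegation as in the article: the plugin builds the listeners.
        self._servers = self._plugin.start_rpc_listeners()

    def wait(self):
        # Assumed lifecycle hook: block on every listener.
        for server in self._servers:
            server.wait()

    def stop(self):
        # Assumed lifecycle hook: stop every listener and forget it.
        for server in self._servers:
            server.kill()
        self._servers = []


worker = SketchRpcWorker(FakePlugin())
worker.start()
worker.stop()
```

Because the worker only holds a reference to the plugin, the same object can be started directly in the current process or handed to a process launcher, which matches the two branches of serve_rpc.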

Most plugins do not implement start_rpc_listeners; at present ML2 supports it.
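One way a check such as rpc_workers_supported() can be written is to test whether the plugin class has overridden the base implementation of start_rpc_listeners. The sketch below assumes that approach; PluginBase, LegacyPlugin and ListeningPlugin are hypothetical names, and the code is not quoted from Neutron.

```python
class PluginBase:
    """Hypothetical base class with a default, unimplemented listener hook."""

    def start_rpc_listeners(self):
        raise NotImplementedError

    def rpc_workers_supported(self):
        # True only when a subclass replaced the base implementation.
        return (type(self).start_rpc_listeners
                is not PluginBase.start_rpc_listeners)


class LegacyPlugin(PluginBase):
    """Hypothetical plugin that does not implement start_rpc_listeners."""


class ListeningPlugin(PluginBase):
    """Hypothetical plugin that implements start_rpc_listeners."""

    def start_rpc_listeners(self):
        return []


for plugin in (LegacyPlugin(), ListeningPlugin()):
    print(type(plugin).__name__, plugin.rpc_workers_supported())
```

Checking up front avoids calling start_rpc_listeners just to see whether it raises, which matters when the call would happen inside a forked worker where the parent cannot catch the NotImplementedError.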

In the neutron.plugins.ml2.plugin.Ml2Plugin class, start_rpc_listeners creates a consuming RPC connection whose topic is topics.PLUGIN:

def start_rpc_listeners(self):
    self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager),
                      agents_db.AgentExtRpcCallback()]
    self.topic = topics.PLUGIN
    self.conn = n_rpc.create_connection(new=True)
    self.conn.create_consumer(self.topic, self.endpoints,
                              fanout=False)
    return self.conn.consume_in_threads()
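The idea of registering a list of endpoints under one topic can be sketched with an in-memory stand-in. Everything here is an assumption made for illustration: SketchConnection, its dispatch() method, the two endpoint classes, their method names and the topic string "plugin-topic" are hypothetical, and no real message bus is involved.

```python
class PortCallbacks:
    """Hypothetical endpoint handling port-related calls."""

    def get_device_details(self, device):
        return "details for %s" % device


class AgentCallbacks:
    """Hypothetical endpoint handling agent state reports."""

    def report_state(self, agent):
        return "state recorded for %s" % agent


class SketchConnection:
    """In-memory stand-in for a consuming RPC connection."""

    def __init__(self):
        self._consumers = {}

    def create_consumer(self, topic, endpoints, fanout=False):
        # Remember which endpoint objects serve this topic.
        self._consumers[topic] = list(endpoints)

    def dispatch(self, topic, method, **kwargs):
        # Deliver the call to the first endpoint that exposes the method.
        for endpoint in self._consumers.get(topic, []):
            handler = getattr(endpoint, method, None)
            if callable(handler):
                return handler(**kwargs)
        raise LookupError("no endpoint on %r handles %r" % (topic, method))


conn = SketchConnection()
conn.create_consumer("plugin-topic", [PortCallbacks(), AgentCallbacks()])
print(conn.dispatch("plugin-topic", "report_state", agent="agent-1"))
```

Grouping several endpoint objects under a single topic lets one consumer serve unrelated groups of calls, which is the shape of the endpoints list built in start_rpc_listeners.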