OpenStack Ceilometer, Part 2: Source Code Analysis of Polling

从景曜
2023-12-01


The community N (Newton) release code is used as the example.

I. Startup command

exec ceilometer-polling --polling-namespaces compute --config-file /etc/ceilometer/ceilometer.conf

II. Code entry point

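Ceilometer's packaging is managed with setuptools and pbr. For background, see:

Python packaging with setuptools

setuptools and pbr packaging management in OpenStack

The entry point is the main function in /ceilometer/cmd/polling.py: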

def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)


def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service)
    sm.run()
    
'''
Key points:
1 cotyledon.Service(worker_id)
Purpose: creates a new service
Parameter: worker_id (int): identifier of the service instance
2 ServiceManager()
2.1 Purpose
Acts like the master process and manages the lifecycle of services.
It controls the lifecycle of the child processes and restarts them if they die unexpectedly.
Each child process (ServiceWorker) runs one instance of a service.
An application must create a ServiceManager and use
ServiceManager.run() as the application's main loop.
Signature:
class cotyledon.ServiceManager(wait_interval=0.01, graceful_shutdown_timeout=60)

2.2 cotyledon.ServiceManager.add
cotyledon.ServiceManager.add(service, workers=1, args=None, kwargs=None)
Purpose: creates a child process to run the agent service (AgentManager in this code)
Parameters:
service (callable): callable that returns an instance of Service
workers (int): number of processes/workers for this service
args (tuple): additional positional arguments for this service
kwargs (dict): additional keyword arguments for this service
Returns:
a service id

2.3 cotyledon.ServiceManager.run()
Starts and supervises the service workers.
This method starts and supervises all child processes until the master process is shut down.


Reference:
https://blog.csdn.net/qingyuanluofeng/article/details/95533476
'''

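This part is built on cotyledon's multi-process framework.

When a child process is instantiated, create_polling_service is called, which in turn constructs AgentManager with three arguments:

  • CONF.polling_namespaces # the value passed on the command line, compute or central, each corresponding to one service
  • CONF.pollster_list # defaults to []
  • worker_id # identifier of the service instance in cotyledon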

The compute service starts here: AgentManager's __init__ method performs the initialization, and the run method then collects the data.
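To make the "factory plus worker_id" contract concrete, here is a minimal sketch in plain Python. It is not cotyledon: ToyService, ToyServiceManager and create_toy_service are hypothetical names, and the factory is called in-process instead of in forked child processes. It only illustrates that a callable, not an instance, is registered, and that the callable is later invoked once per worker with that worker's id.

```python
# Toy stand-in for the "factory + worker_id" contract; this is NOT cotyledon.


class ToyService:
    """Hypothetical service that only remembers how it was constructed."""

    def __init__(self, namespaces, pollster_list, worker_id):
        self.namespaces = namespaces
        self.pollster_list = pollster_list
        self.worker_id = worker_id


class ToyServiceManager:
    """Collects service factories and builds one instance per worker."""

    def __init__(self):
        self._registrations = []

    def add(self, factory, workers=1):
        # Like sm.add(create_polling_service): store the callable itself.
        self._registrations.append((factory, workers))

    def build_all(self):
        # A real manager would fork one child process per worker; here the
        # factory is simply called in-process with each worker id.
        return [factory(worker_id)
                for factory, workers in self._registrations
                for worker_id in range(workers)]


def create_toy_service(worker_id):
    # Mirrors create_polling_service: configuration values plus worker_id.
    return ToyService(["compute"], [], worker_id)


manager = ToyServiceManager()
manager.add(create_toy_service, workers=2)
services = manager.build_all()
print([s.worker_id for s in services])  # prints [0, 1]
```

The numbering of worker ids from 0 is a choice of this toy; the real ids are whatever cotyledon hands to the factory.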

Let's look at what is inside AgentManager.

III. Initializing AgentManager

class AgentManager(service_base.PipelineBasedService):

    def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
        namespaces = namespaces or ['compute', 'central']
        pollster_list = pollster_list or []
        group_prefix = cfg.CONF.polling.partitioning_group_prefix
        self._inspector = virt_inspector.get_hypervisor_inspector()
        self.nv = nova_client.Client()

        # features of using coordination and pollster-list are exclusive, and
        # cannot be used at one moment to avoid both samples duplication and
        # samples being lost
				......
        # we'll have default ['compute', 'central'] here if no namespaces will
        # be passed
        ''' 
        (Pdb) p self.extensions[0].__dict__
        {
            'obj': < ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0 > ,
            'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
            'name': 'cpu_l3_cache',
            'plugin': < class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster' >
        }
        (Pdb) p self.extensions[1].__dict__
        {
            'obj': < ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650 > ,
            'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
            'name': 'disk.write.requests.rate',
            'plugin': < class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster' >
        }
        (Pdb) p self.extensions[2].__dict__
        {
            'obj': < ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0 > ,
            'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
            'name': 'disks.total',
            'plugin': < class 'ceilometer.compute.pollsters.disk.DisksTotalPoller' >
        }
                
        '''
        extensions = (self._extensions('poll', namespace).extensions
                      for namespace in namespaces)
        # get the extensions from pollster builder
        extensions_fb = (self._extensions_from_builder('poll', namespace)
                         for namespace in namespaces)
        if pollster_list:
            extensions = (moves.filter(_match, exts)
                          for exts in extensions)
            extensions_fb = (moves.filter(_match, exts)
                             for exts in extensions_fb)

        self.extensions = list(itertools.chain(*list(extensions))) + list(
            itertools.chain(*list(extensions_fb)))

        if self.extensions == []:
            raise EmptyPollstersList()

        """
        (Pdb) p self.discoveries[0].__dict__
        {
            'obj': < ceilometer.compute.discovery.InstanceDiscovery object at 0x7f392864d390 > ,
            'entry_point': EntryPoint.parse('local_instances = ceilometer.compute.discovery:InstanceDiscovery'),
            'name': 'local_instances',
            'plugin': < class 'ceilometer.compute.discovery.InstanceDiscovery' >
        }
        """
        discoveries = (self._extensions('discover', namespace).extensions
                       for namespace in namespaces)
        self.discoveries = list(itertools.chain(*list(discoveries)))
        self.polling_periodics = None

        """
        (Pdb) p self.heartbeat_timer.__dict__
        {
            '_dead': < threading._Event object at 0x7f3928654c10 > ,
            '_waiter': < Condition( < _RLock owner = None count = 0 > , 0) > ,
            '_now_func': < function monotonic at 0x18d51b8 > ,
            '_schedule': < futurist.periodics._Schedule object at 0x7f3928654d10 > ,
            '_active': < threading._Event object at 0x7f3928654c90 > ,
            '_initial_schedule_strategy': < function _now_plus_periodicity at 0x1bccc80 > ,
            '_watchers': [({
                'successes': 0,
                'failures': 0,
                'runs': 0,
                'elapsed': 0,
                'elapsed_waiting': 0
            }, < Watcher object at 0x7f3928654d90(runs = 0, successes = 0, failures = 0, elapsed = 0.00, elapsed_waiting = 0.00) > )],
            '_on_failure': < functools.partial object at 0x7f3928649b50 > ,
            '_executor_factory': < function < lambda > at 0x7f39283809b0 > ,
            '_schedule_strategy': < function _last_started_strategy at 0x1bccb90 > ,
            '_log': < logging.Logger object at 0x1bbfa90 > ,
            '_callables': [( < function < lambda > at 0x7f39286581b8 > , 'ceilometer.utils.<lambda>', (), {})],
            '_cond_cls': < function Condition at 0x7f39364b2938 > ,
            '_tombstone': < threading._Event object at 0x7f3928654b10 > ,
            '_immediates': deque([0])
        }
        (Pdb) p self.partition_coordinator.__dict__
        {'_groups': set([]), '_my_id': 'c6ee6425-1bac-4e9c-a026-2022b133c825', '_coordinator': None}
        """
        self.partition_coordinator = coordination.PartitionCoordinator()
        self.heartbeat_timer = utils.create_periodic(
            target=self.partition_coordinator.heartbeat,
            spacing=cfg.CONF.coordination.heartbeat,
            run_immediately=True)

        # Compose coordination group prefix.
        # We'll use namespaces as the basement for this partitioning.
        namespace_prefix = '-'.join(sorted(namespaces))
        self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
                             if group_prefix else namespace_prefix)

        """
        (Pdb) p self.notifier.__dict__
        {
            '_serializer': < oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710 > ,
            '_driver_mgr': < stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890 > ,
            'retry': -1,
            '_driver_names': ['messagingv2'],
            '_topics': ['notifications'],
            'publisher_id': 'ceilometer.polling',
            'transport': < oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550 >
        }
        
        (Pdb) p self.notifier._driver_mgr
        <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>
        
        (Pdb) p self.notifier._driver_mgr.__dict__
        {
            '_names': ['messagingv2'],
            'namespace': 'oslo.messaging.notify.drivers',
            '_on_load_failure_callback': None,
            'extensions': [ < stevedore.extension.Extension object at 0x7fddfc80e990 > ],
            'propagate_map_exceptions': False,
            '_extensions_by_name': None,
            '_name_order': False,
            '_missing_names': set([])
        }
        
        (Pdb) p self.notifier._driver_mgr.__dict__.get('extensions')[0].__dict__
        {
            'obj': < oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7fddfc80e750 > ,
            'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'),
            'name': 'messagingv2',
            'plugin': < class 'oslo_messaging.notify.messaging.MessagingV2Driver' >
        }
        
        """
        self.notifier = oslo_messaging.Notifier(
            messaging.get_transport(),
            driver=cfg.CONF.publisher_notifier.telemetry_driver,
            publisher_id="ceilometer.polling")

        self._keystone = None
        self._keystone_last_exception = None

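I printed some of the values in the code above with pdb to make them more tangible. Let's walk through the initialization line by line:

  1. The first 5 lines need little explanation: they are plain parameter handling, and the __init__ parameters were already introduced in the code entry point section

  2. Line 6 (group_prefix = cfg.CONF.polling.partitioning_group_prefix) reads the prefix used for workload partitioning coordination

  3. Line 7 calls get_hypervisor_inspector to load the hypervisor inspector driver named by cfg.CONF.hypervisor_inspector (libvirt here):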

    def get_hypervisor_inspector():
        try:
            namespace = 'ceilometer.compute.virt'
            mgr = driver.DriverManager(namespace,
                                       cfg.CONF.hypervisor_inspector,
                                       invoke_on_load=True)
            return mgr.driver
        except ImportError as e:
            LOG.error(_("Unable to load the hypervisor inspector: %s") % e)
            return Inspector()
    

    stevedore is used here; it is not covered in detail in this article. See: Python plugins with stevedore
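The idea behind get_hypervisor_inspector (look a plugin up by namespace and name, fall back to a default object if it cannot be imported) can be sketched with the standard library alone. This is not stevedore: load_driver and NullInspector are hypothetical names, the fallback on a missing entry point is a simplification of this sketch, and the group name used in the demo is made up so that the fallback path is exercised.

```python
from importlib import metadata


class NullInspector:
    """Hypothetical fallback, playing the role of the base Inspector()."""


def load_driver(group, name, fallback_factory):
    """Return an instance of plugin `name` from `group`, else a fallback."""
    eps = metadata.entry_points()
    # Python 3.10+ exposes .select(); older versions return a dict of groups.
    if hasattr(eps, "select"):
        candidates = eps.select(group=group)
    else:
        candidates = eps.get(group, [])
    for ep in candidates:
        if ep.name == name:
            try:
                return ep.load()()  # import the class, then instantiate it
            except ImportError:
                break  # same spirit as the except ImportError branch above
    return fallback_factory()


# The group below is invented for the demo, so no plugin is found.
inspector = load_driver("example.hypothetical.drivers", "libvirt",
                        NullInspector)
print(type(inspector).__name__)  # prints NullInspector
```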

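  4. Line 8 creates a nova client (self.nv = nova_client.Client())

  5. Lines 40-69 mainly load plugins, again through stevedore. The namespace on lines 40 and 43 is the argument you passed in. Here is the implementation of _extensions: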

    def _extensions(self, category, agent_ns=None):
      namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
                   else 'ceilometer.%s' % category)
      return self._get_ext_mgr(namespace)
    

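    This mainly loads the plugins registered under the ceilometer.poll.compute namespace (if the central service is started, the plugins under ceilometer.poll.central are loaded instead). The plugins are declared in setup.cfg: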

    ceilometer.poll.compute =
        disks.util = ceilometer.compute.pollsters.disk:DisksUtilPoller
        disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller
        disks.used = ceilometer.compute.pollsters.disk:DisksUsedPoller
        ...
        
    ceilometer.poll.central =
        account = ceilometer.chakra.chakra:AccountPollster
        ip.floating = ceilometer.network.floatingip:FloatingIPPollster
        image = ceilometer.image.glance:ImagePollster
        ...
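To see how a namespace in setup.cfg maps plugin names to classes, the excerpt can be parsed with the standard library. This is only a sketch: plugins_in_namespace and extension_namespace are hypothetical helpers, and the [entry_points] section header, which the excerpt above does not show, is assumed here.

```python
import configparser

SETUP_CFG = """\
[entry_points]
ceilometer.poll.compute =
    disks.util = ceilometer.compute.pollsters.disk:DisksUtilPoller
    disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller
    disks.used = ceilometer.compute.pollsters.disk:DisksUsedPoller
"""


def plugins_in_namespace(cfg_text, namespace):
    """Map plugin name -> 'module:Class' for one entry-point namespace."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    plugins = {}
    # The namespace value is a multi-line string, one plugin per line.
    for line in parser["entry_points"][namespace].splitlines():
        if line.strip():
            name, target = line.split("=", 1)
            plugins[name.strip()] = target.strip()
    return plugins


def extension_namespace(category, agent_ns=None):
    # Same string composition as _extensions() above.
    return ("ceilometer.%s.%s" % (category, agent_ns) if agent_ns
            else "ceilometer.%s" % category)


namespace = extension_namespace("poll", "compute")
print(plugins_in_namespace(SETUP_CFG, namespace)["disks.total"])
# prints ceilometer.compute.pollsters.disk:DisksTotalPoller
```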
    

    After loading, self.extensions looks like this: a list holding every plugin object found in the namespace:

    (Pdb) p self.extensions[0].__dict__
    {
      'obj': < ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0 > ,
      'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
      'name': 'cpu_l3_cache',
      'plugin': < class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster' >
    }
    (Pdb) p self.extensions[1].__dict__
    {
      'obj': < ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650 > ,
      'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
      'name': 'disk.write.requests.rate',
      'plugin': < class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster' >
    }
    (Pdb) p self.extensions[2].__dict__
    {
      'obj': < ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0 > ,
      'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
      'name': 'disks.total',
      'plugin': < class 'ceilometer.compute.pollsters.disk.DisksTotalPoller' >
    }
    

    Lines 43-69 load plugins on the same principle, so they are not repeated here
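The flatten-and-filter step of the plugin loading can be tried in isolation. In this sketch Extension is a hypothetical stand-in for stevedore's extension objects (only the name attribute is used), and _match is assumed to do glob-style matching against pollster_list, since its definition is elided in the listing.

```python
import fnmatch
import itertools
from collections import namedtuple

# Hypothetical stand-in for stevedore's Extension objects.
Extension = namedtuple("Extension", ["name"])


def select_extensions(per_namespace, pollster_list):
    """Flatten per-namespace extension lists, optionally filtering by name."""
    def _match(ext):
        # Assumed behaviour of the elided _match helper: glob matching.
        return any(fnmatch.fnmatch(ext.name, pattern)
                   for pattern in pollster_list)

    if pollster_list:
        per_namespace = [filter(_match, exts) for exts in per_namespace]
    return list(itertools.chain.from_iterable(per_namespace))


compute = [Extension("cpu_l3_cache"), Extension("disk.write.requests.rate")]
central = [Extension("image")]

everything = select_extensions([compute, central], [])
only_disk = select_extensions([compute, central], ["disk.*"])
print([e.name for e in only_disk])  # prints ['disk.write.requests.rate']
```

With an empty pollster_list every extension from every namespace is kept, which corresponds to the default case described earlier.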

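  6. Lines 99-103 obtain a workload partitioning coordinator instance, which divides the work when several polling workers run at once. Each object is printed in the code above for those interested. This relies on the tooz library; a brief introduction: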

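    1 Tooz basics
    
    Purpose: 
    1) Provides a distributed coordination API for managing groups and their members
    2) Provides distributed locks that distributed nodes acquire and release in order to synchronize
    Problem solved: synchronization among multiple distributed processes
    
    2 Tooz architecture
    
    Essence: Tooz is an abstraction over solutions such as ZooKeeper, the Raft consensus algorithm, and Redis,
        with backend functionality provided through drivers
    Drivers:
    zookeeper, Zake, memcached, redis,
    SysV IPC (distributed locks only), PostgreSQL (distributed locks only), MySQL (distributed locks only)
    Driver characteristics:
    All drivers support distributed processes; the Tooz API is fully asynchronous, which makes it more efficient.
    
    3 Tooz features
    
    3.1 Group management
    
    Manages group members.
    Operations: create a group, join a group, leave a group, list group members, and get notified when a member joins or leaves
    Use case:
    The ceilometer-notification service uses group management for load balancing and genuine horizontal scaling.
    
    3.2 Leader election
    
    Every group has a leader, and each node decides whether to take part in the election;
    when the leader disappears, a new leader is elected;
    other members may be notified when a leader is elected;
    any node can query the group's current leader at any time.
    Takeaway:
    Tooz could be used to build your own leader election and service high availability.
    
    3.3 Distributed locks
    
    Use case:
    Ceilometer originally checked over RPC whether the alarm evaluator processes were alive.
    Later it coordinated multiple alarm evaluator processes through Tooz group management.
    Use case 2:
    gnocchi uses distributed locks when operating on metrics and measurement data
    
    Reference: https://blog.csdn.net/qingyuanluofeng/article/details/90349185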
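What "dividing the work among group members" means can be illustrated without tooz. The sketch below first repeats the group prefix composition from AgentManager.__init__, then assigns each resource to exactly one member by hashing. my_subset is a hypothetical helper and the modulo scheme is a deliberately simplified assumption; it is neither the tooz API nor the algorithm ceilometer's PartitionCoordinator uses.

```python
import hashlib


def compose_group_prefix(namespaces, group_prefix=None):
    # Same composition as in AgentManager.__init__ above.
    namespace_prefix = "-".join(sorted(namespaces))
    return ("%s-%s" % (namespace_prefix, group_prefix)
            if group_prefix else namespace_prefix)


def my_subset(resources, members, my_id):
    """Keep only the resources whose hash maps to this member."""
    ordered = sorted(members)  # every member must see the same order
    mine = []
    for res in resources:
        digest = hashlib.sha256(res.encode("utf-8")).hexdigest()
        owner = ordered[int(digest, 16) % len(ordered)]
        if owner == my_id:
            mine.append(res)
    return mine


members = ["agent-a", "agent-b", "agent-c"]
resources = ["instance-%d" % i for i in range(10)]
shares = {m: my_subset(resources, members, m) for m in members}

print(compose_group_prefix(["compute", "central"]))  # prints central-compute
```

Because every agent computes the same owner for a given resource, the shares are disjoint and together cover all resources, so no sample is duplicated or lost as long as the membership view is consistent.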
    
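  7. The self.notifier = oslo_messaging.Notifier(...) statement (lines 147-150 of the listing) is important: it creates the oslo_messaging.Notifier instance used to send the collected data to the notifications.sample queue (topic notifications, priority sample). In short:

    """       
      self.notifier._topics = ['notifications']
      self.notifier._driver_names = ['messagingv2'] (set by the telemetry_driver option defined in ceilometer/publisher/messaging.py); this is the driver type ceilometer uses when sending messages to the message queue
      self.notifier.publisher_id = 'ceilometer.polling'
      self.notifier._driver_mgr is the plugin named messagingv2 loaded from oslo.messaging.notify.drivers
      self.notifier.transport is the plugin named rabbit loaded from oslo.messaging.drivers, because transport_url in ceilometer.conf is defined as: transport_url=rabbit://guest:guest@192.168.2.120:5672/
    """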
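How the transport_url value selects the rabbit driver can be seen by splitting the URL. This is only an illustration with the standard library; oslo.messaging has its own URL parser, and describe_transport is a hypothetical helper.

```python
from urllib.parse import urlsplit


def describe_transport(url):
    """Split a transport URL into the parts a messaging driver needs."""
    parts = urlsplit(url)
    return {
        "driver": parts.scheme,        # names the transport plugin
        "username": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "virtual_host": parts.path.lstrip("/"),
    }


info = describe_transport("rabbit://guest:guest@192.168.2.120:5672/")
print(info["driver"], info["host"], info["port"])
# prints rabbit 192.168.2.120 5672
```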
    
  8. Initialization is now complete. Next comes data collection: sm.run() ends up calling the run method of the AgentManager instance

IV. Collecting data

Entry point: ceilometer/agent/manager.py:AgentManager.run

def run(self):
  super(AgentManager, self).run()

  self.polling_manager = pipeline.setup_polling()
  self.join_partitioning_groups()
  self.start_polling_tasks()
  self.init_pipeline_refresh()

The run method looks simple. Let's analyze it:

  1. Line 4 calls setup_polling:

    def setup_polling():
        """Setup polling manager according to yaml config file."""
        cfg_file = cfg.CONF.pipeline_cfg_file
        return PollingManager(cfg_file)
    

    This loads the polling items you configured in pipeline.yaml. Let's see what happens inside:

    class PollingManager(ConfigManagerBase):
        """Polling Manager
    
        Polling manager sets up polling according to config file.
        """
    
        def __init__(self, cfg_info):
            """Setup the polling according to config.
    
            The configuration is the sources half of the Pipeline Config.
            """
            super(PollingManager, self).__init__()
            # Load the contents of the config file
            cfg = self.load_config(cfg_info)
            self.sources = []
            if not ('sources' in cfg and 'sinks' in cfg):
                raise PipelineException("Both sources & sinks are required",
                                        cfg)
            LOG.info(_LI('detected decoupled pipeline config format'))
    
            unique_names = set()
            # Initialize and record the sources entries from the config file
            for s in cfg.get('sources'):
                name = s.get('name')
                if name in unique_names:
                    raise PipelineException("Duplicated source names: %s" %
                                            name, self)
                else:
                    unique_names.add(name)
                    # Put the data from the config file into self.sources
                    self.sources.append(SampleSource(s))
            unique_names.clear()
    

    Once initialized, the object is stored in self.polling_manager. Here is what it contains:

    """
    
    (Pdb) p self.polling_manager.__dict__
    
    {'cfg_mtime': 1592535895.477891, 'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a', 'cfg_loc': '/etc/ceilometer/pipeline.yaml', 'sources': [<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]}
    
    (Pdb) p self.polling_manager.sources
    
    [<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]
    
    (Pdb) p self.polling_manager.sources[0].__dict__
    
    {
        'name': 'notification_source',
        'cfg': {
            'interval': 3600,
            'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
            'name': 'notification_source',
            'sinks': ['notification_sink']
        },
        'interval': 3600,
        'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
        'discovery': [],
        'resources': [],
        'sinks': ['notification_sink']
    }
    
    (Pdb) p self.polling_manager.sources[1].__dict__
    
    {
        'name': 'meter_source',
        'cfg': {
            'interval': 300,
            'meters': ['poll.*', 'memory.usage', 'memory.util'],
            'name': 'meter_source',
            'sinks': ['meter_sink']
        },
        'interval': 300,
        'meters': ['poll.*', 'memory.usage', 'memory.util'],
        'discovery': [],
        'resources': [],
        'sinks': ['meter_sink']
    }
    
    """
    

    As you can see, this line exists to load the polling items defined in the configuration file
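The validation that PollingManager performs on the sources can be reproduced on a plain dict, without reading pipeline.yaml. This is a sketch under assumptions: load_sources and PipelineError are hypothetical names standing in for PollingManager.__init__ and PipelineException, each source is kept as a dict rather than a SampleSource, and the sample config borrows its values from the pdb output above.

```python
class PipelineError(Exception):
    """Hypothetical stand-in for PipelineException."""


def load_sources(cfg):
    """Validate a parsed pipeline config and return its sources."""
    if not ("sources" in cfg and "sinks" in cfg):
        raise PipelineError("Both sources & sinks are required")
    seen = set()
    sources = []
    for source in cfg["sources"]:
        name = source.get("name")
        if name in seen:
            raise PipelineError("Duplicated source names: %s" % name)
        seen.add(name)
        sources.append({
            "name": name,
            "interval": source.get("interval"),
            "meters": list(source.get("meters", [])),
            "sinks": list(source.get("sinks", [])),
        })
    return sources


pipeline_cfg = {
    "sources": [
        {"name": "notification_source", "interval": 3600,
         "meters": ["instance", "volume"], "sinks": ["notification_sink"]},
        {"name": "meter_source", "interval": 300,
         "meters": ["poll.*", "memory.usage", "memory.util"],
         "sinks": ["meter_sink"]},
    ],
    "sinks": [{"name": "notification_sink"}, {"name": "meter_sink"}],
}

sources = load_sources(pipeline_cfg)
print([(s["name"], s["interval"]) for s in sources])
# prints [('notification_source', 3600), ('meter_source', 300)]
```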

  2. Line 6 of run calls start_polling_tasks:

        def start_polling_tasks(self):
            # allow time for coordination if necessary
            delay_start = self.partition_coordinator.is_active()
    
            # set shuffle time before polling task if necessary
            delay_polling_time = random.randint(
                0, cfg.CONF.shuffle_time_before_polling_task)
    
            data = self.setup_polling_tasks()
    
            # One thread per polling tasks is enough
            # The thread pool is sized by the number of distinct intervals: one thread per interval
            self.polling_periodics = periodics.PeriodicWorker.create(
                [], executor_factory=lambda:
                futures.ThreadPoolExecutor(max_workers=len(data)))
    
            for interval, polling_task in data.items():
                delay_time = (interval + delay_polling_time if delay_start
                              else delay_polling_time)
    
                @periodics.periodic(spacing=interval, run_immediately=False)
                def task(running_task):
                    self.interval_task(running_task)
    
                utils.spawn_thread(utils.delayed, delay_time,
                                   self.polling_periodics.add, task, polling_task)
    
            if data:
                # Don't start useless threads if no task will run
                utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
    
    1. The first 8 lines are simple variable definitions. The important code starts at line 9, so let's analyze it

    2. Line 9 calls setup_polling_tasks:

          def setup_polling_tasks(self):
              """  
              (Pdb) p self.polling_manager.sources[1].__dict__
              {
                  'name': 'meter_source',
                  'cfg': {
                      'interval': 300,
                      'meters': ['poll.*', 'memory.usage', 'memory.util'],
                      'name': 'meter_source',
                      'sinks': ['meter_sink']
                  },
                  'interval': 300,
                  'meters': ['poll.*', 'memory.usage', 'memory.util'],
                  'discovery': [],
                  'resources': [],
                  'sinks': ['meter_sink']
              }
              
              (Pdb) p self.extensions[0].__dict__
              {
                  'obj': < ceilometer.compute.pollsters.memory.MemoryUtilizationPollster object at 0x7ff12062e590 > ,
                  'entry_point': EntryPoint.parse('memory.util = ceilometer.compute.pollsters.memory:MemoryUtilizationPollster'),
                  'name': 'memory.util',
                  'plugin': < class 'ceilometer.compute.pollsters.memory.MemoryUtilizationPollster' >
              }
              """
              polling_tasks = {}
              for source in self.polling_manager.sources:
                  polling_task = None
                  for pollster in self.extensions:
                      # Match the meters of each source against the name of every object in extensions; on a match, add the pollster to polling_tasks, keyed by polling interval with a PollingTask object as the value
                      if source.support_meter(pollster.name):
                          polling_task = polling_tasks.get(source.get_interval())
                          if not polling_task:
                              polling_task = self.create_polling_task()
                              polling_tasks[source.get_interval()] = polling_task
                          polling_task.add(pollster, source)
              return polling_tasks
      

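      This iterates over the sources in self.polling_manager.sources and, for each one, over self.extensions. If a plugin's name matches the source's meters list, the plugin must run periodically, so it is added to the PollingTask stored in polling_tasks under the source's interval (source.get_interval())

      Now look at the value objects in polling_tasks. PollingTask only initializes a few attributes, after which add is called: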

      class PollingTask(object):
          """Polling task for polling samples and notifying.
      
          A polling task can be invoked periodically or only once.
          """
      
          def __init__(self, agent_manager):
              self.manager = agent_manager
      
              # elements of the Cartesian product of sources X pollsters
              # with a common interval
              self.pollster_matches = collections.defaultdict(set)
      
              # we relate the static resources and per-source discovery to
              # each combination of pollster and matching source
              resource_factory = lambda: Resources(agent_manager)
              self.resources = collections.defaultdict(resource_factory)
      
              self._batch = cfg.CONF.batch_polled_samples
              self._telemetry_secret = cfg.CONF.publisher.telemetry_secret
              
          def add(self, pollster, source):
              self.pollster_matches[source.name].add(pollster)
              key = Resources.key(source.name, pollster)
              self.resources[key].setup(source)
      

      So the data finally returned is:

      (Pdb) p data
      {
        3600: < ceilometer.agent.manager.PollingTask object at 0x7fe6a4043bd0 > ,
        300: < ceilometer.agent.manager.PollingTask object at 0x7fe6a4043d10 >
      }
      (Pdb) p data[3600].__dict__
      {
        'manager': < ceilometer.agent.manager.AgentManager object at 0x3230b50 > ,
        '_telemetry_secret': 'change this for valid signing',
        'pollster_matches': defaultdict( < type 'set' > , {
        		'notification_source': set([ < stevedore.extension.Extension object at 0x7fe6b062ec50 > ])
        }),
        'resources': defaultdict( < function < lambda > at 0x7fe6a40319b0 > , {
        		'notification_source-instance': < ceilometer.agent.manager.Resources object at 0x7fe6a4043b90 >
        }),
        '_batch': True
      }
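The grouping by interval, and the start delay that start_polling_tasks derives for each interval, can both be tried on plain data. This is a sketch under assumptions: support_meter here is a simplified glob match (the real SampleSource.support_meter is not shown in this article), pollsters are represented by their names only, and group_by_interval and start_delay are hypothetical helpers.

```python
import fnmatch
import random
from collections import defaultdict


def support_meter(source_meters, meter_name):
    # Simplified assumption: a meter is supported if any pattern matches.
    return any(fnmatch.fnmatch(meter_name, p) for p in source_meters)


def group_by_interval(sources, pollster_names):
    """Return {interval: {source_name: set of pollster names}}."""
    tasks = {}
    for source in sources:
        for name in pollster_names:
            if support_meter(source["meters"], name):
                # One task per interval, shared by all sources using it.
                task = tasks.setdefault(source["interval"], defaultdict(set))
                task[source["name"]].add(name)
    return tasks


def start_delay(interval, shuffle_max, coordination_active):
    # Mirrors delay_time: wait one extra interval when coordination is on.
    delay_polling_time = random.randint(0, shuffle_max)
    return (interval + delay_polling_time if coordination_active
            else delay_polling_time)


sources = [
    {"name": "notification_source", "interval": 3600,
     "meters": ["instance", "volume"]},
    {"name": "meter_source", "interval": 300,
     "meters": ["poll.*", "memory.usage", "memory.util"]},
]
pollsters = ["instance", "memory.util", "memory.usage", "cpu_l3_cache"]

tasks = group_by_interval(sources, pollsters)
print(sorted(tasks))  # prints [300, 3600]
```

As in the pdb output above, the result has one entry per distinct interval, and a pollster that no source selects (cpu_l3_cache here) ends up in no task.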
      
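    3. Lines 13 to 30 of start_polling_tasks create the thread pool, with one thread per distinct interval. The futurist library is used here (not covered in this article) to implement periodic tasks: at every sampling interval defined in the configuration file, self.interval_task is called, and it in turn calls task.poll_and_notify(). Here is the implementation of task.poll_and_notify():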

      def poll_and_notify(self):
              """Polling sample and notify."""
              cache = {}
              discovery_cache = {}
              poll_history = {}
              """
              (Pdb) self.pollster_matches
              defaultdict( < type 'set' > , {
                  'meter_source': set([ < stevedore.extension.Extension object at 0x7f9a645de8d0 > , < stevedore.extension.Extension object at 0x7f9a645d40d0 > ]),
                  'cpu_source': set([ < stevedore.extension.Extension object at 0x7f9a645d4cd0 > ]),
                  'cpu_util_source': set([ < stevedore.extension.Extension object at 0x7f9a645dec10 > ]),
                  'disk_source': set([ < stevedore.extension.Extension object at 0x7f9a64399c90 > , < stevedore.extension.Extension object at 0x7f9a645d4ad0 > , < stevedore.extension.Extension object at 0x7f9a645d49d0 > , < stevedore.extension.Extension object at 0x7f9a645d4950 > , < stevedore.extension.Extension object at 0x7f9a645dea90 > , < stevedore.extension.Extension object at 0x7f9a64389fd0 > , < stevedore.extension.Extension object at 0x7f9a645d43d0 > , < stevedore.extension.Extension object at 0x7f9a645d4d50 > ]),
                  'network_source': set([ < stevedore.extension.Extension object at 0x7f9a645def10 > , < stevedore.extension.Extension object at 0x7f9a645d4650 > , < stevedore.extension.Extension object at 0x7f9a645d4450 > , < stevedore.extension.Extension object at 0x7f9a645d4550 > ]),
                  'volume_source': set([ < stevedore.extension.Extension object at 0x7f9a645d4b50 > , < stevedore.extension.Extension object at 0x7f9a645d4c50 > ])
              })
              """
              # self.pollster_matches maps each source name defined in the config file to the set of pollster plugin objects selected by that source
              # A key in Resources is source.name + '-' + the plugin name
              for source_name in self.pollster_matches:
                  for pollster in self.pollster_matches[source_name]:
      
                      """
                      first
                          (Pdb) p key
                          'meter_source-memory.util'
                          (Pdb) p candidate_res
                          []
                          (Pdb) p pollster.obj.default_discovery
                          'local_instances'                               
                      """
                      key = Resources.key(source_name, pollster)
                      candidate_res = list(
                          self.resources[key].get(discovery_cache))
                      if not candidate_res and pollster.obj.default_discovery:
                          candidate_res = self.manager.discover(
                              [pollster.obj.default_discovery], discovery_cache)
                          """
                          (Pdb) p candidate_res
                          [<Server: jy-2>]
                          """
      
                      # Remove duplicated resources and black resources. Using
                      # set() requires well defined __hash__ for each resource.
                      # Since __eq__ is defined, 'not in' is safe here.
                      polling_resources = []
                      black_res = self.resources[key].blacklist
                      history = poll_history.get(pollster.name, [])
                      for x in candidate_res:
                          if x not in history:
                              history.append(x)
                              if x not in black_res:
                                  polling_resources.append(x)
                      """
                      first
                          (Pdb) p poll_history
                          {'memory.util': [<Server: jy-2>]}
                      second
                          (Pdb) p poll_history
                          {'memory.util': [<Server: jy-2>], 'memory.usage': [<Server: jy-2>]}
                      """
                      poll_history[pollster.name] = history
      
                      # If no resources, skip for this pollster
                      if not polling_resources:
                          p_context = 'new ' if history else ''
                          LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
                                     "resources found this cycle"),
                                   {'name': pollster.name, 'p_context': p_context})
                          continue
      
                      LOG.info(_("Polling pollster %(poll)s in the context of "
                                 "%(src)s"),
                               dict(poll=pollster.name, src=source_name))
                      try:
                          polling_timestamp = timeutils.utcnow().isoformat()
                          samples = pollster.obj.get_samples(
                              manager=self.manager,
                              cache=cache,
                              resources=polling_resources
                          )
                          sample_batch = []
      
                          # filter None in samples
                          """
                          first
                              (Pdb) samples
                              [<name: memory.util, volume: 18.58, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                          second
                              (Pdb) samples
                              [<name: memory.usage, volume: 91, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
                          """
                          samples = [s for s in samples if s is not None]
      
                          for sample in samples:
                              # Note(yuywz): Unify the timestamp of polled samples
                              sample.set_timestamp(polling_timestamp)
                              """
                              (Pdb) sample_dict
                              {
                                  'counter_name': 'memory.util',
                                  'resource_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                                  'timestamp': '2020-06-19T06:21:32.542893',
                                  'counter_volume': 18.58,
                                  'user_id': u'7431e07e49de2703f1b9e703daf5aff158e32028506f77e05a62e9eb3892dcde',
                                  'message_signature': '76e6cbd944963855306b829cdf2c49bba9f0b20220c8c738b61b5048be51b24f',
                                  'resource_metadata': {
                                      'status': u'active',
                                      'ephemeral_gb': 0,
                                      'disk_gb': 0,
                                      'instance_host': u'node-3.domain.tld',
                                      'kernel_id': None,
                                      'image': None,
                                      'ramdisk_id': None,
                                      'host': u'619a474e64dbedcd55508bda51aea8a611fd15f3f3e8fa39ce0d0552',
                                      'flavor': {
                                          'name': u'1C-0.5G',
                                          u'links': [{
                                              u'href': u'http://nova-api.openstack.svc.cluster.local:8774/ca80e5ccd445438580c4b128296d1936/flavors/211',
                                              u'rel': u'bookmark'
                                          }],
                                          'ram': 512,
                                          'ephemeral': 0,
                                          'vcpus': 1,
                                          'disk': 0,
                                          u'id': u'211'
                                      },
                                      'task_state': None,
                                      'image_ref_url': None,
                                      'memory_mb': 512,
                                      'root_gb': 0,
                                      'display_name': u'jy-2',
                                      'name': u'instance-00000002',
                                      'vcpus': 1,
                                      'instance_id': u'32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
                                      'instance_type': u'1C-0.5G',
                                      'state': u'active',
                                      'image_ref': None,
                                      'OS-EXT-AZ:availability_zone': u'default-az'
                                  },
                                  'source': 'openstack',
                                  'counter_unit': '%',
                                  'project_id': u'34ecb9aca8454157bd5c3c64708990bf',
                                  'message_id': '6fbcdc0e-b1f5-11ea-a901-be1696955049',
                                  'monotonic_time': None,
                                  'counter_type': 'gauge'
                              }
                              """
                              sample_dict = (
                                  publisher_utils.meter_message_from_counter(
                                      sample, self._telemetry_secret
                                  ))
                              if self._batch:
                                  sample_batch.append(sample_dict)
                              else:
                                  self._send_notification([sample_dict])
      
                          if sample_batch:
                              self._send_notification(sample_batch)
      
                      except plugin_base.PollsterPermanentError as err:
                          LOG.error(_(
                              'Prevent pollster %(name)s for '
                              'polling source %(source)s anymore!')
                              % ({'name': pollster.name, 'source': source_name}))
                          self.resources[key].blacklist.extend(err.fail_res_list)
                      except Exception as err:
                          LOG.warning(_(
                              'Continue after error from %(name)s: %(error)s')
                              % ({'name': pollster.name, 'error': err}),
                              exc_info=True)
      
          def _send_notification(self, samples):
              self.manager.notifier.sample(
                  {},
                  'telemetry.polling',
                  {'samples': samples}
              )
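
The batching branch at the end of the listing can be hard to follow inside the full method, so here is a minimal stand-alone sketch of just that logic. It is not ceilometer code: `RecordingNotifier` and `publish` are hypothetical names, and the notifier only records calls instead of talking to a message bus.

```python
class RecordingNotifier:
    """Hypothetical stand-in for the real notifier; it only records calls."""

    def __init__(self):
        self.calls = []

    def sample(self, ctxt, event_type, payload):
        self.calls.append((event_type, payload))


def publish(samples, notifier, batch):
    """Send samples as one batched notification, or one notification each."""
    sample_batch = []
    for sample_dict in samples:
        if batch:
            # Batch mode: collect everything and send once after the loop.
            sample_batch.append(sample_dict)
        else:
            # Non-batch mode: every sample gets its own notification.
            notifier.sample({}, 'telemetry.polling', {'samples': [sample_dict]})
    if sample_batch:
        notifier.sample({}, 'telemetry.polling', {'samples': sample_batch})


# Illustrative data only; real samples carry many more fields.
samples = [{'counter_name': 'memory.usage', 'counter_volume': v}
           for v in (18.58, 20.0, 3.5)]

batched = RecordingNotifier()
publish(samples, batched, batch=True)

unbatched = RecordingNotifier()
publish(samples, unbatched, batch=False)

print(len(batched.calls), len(unbatched.calls))
```

With batching enabled the three samples travel in a single notification; without it, each sample is sent separately.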
      
      
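      1. The first 5 lines need no explanation.

      2. Lines 19 and 20 iterate over the resources and plugins defined in self.pollster_matches. Everything from line 31 to the end converts the collected data. The important intermediate values are printed in the listing and are fairly clear, so only a few are covered here. (I do not yet fully understand the candidate_res object; if you do, please leave a comment explaining it. Thank you.)

      3. Line 76 is where the data is actually collected: it calls the plugin's method. For example, when pollster.name is memory.usage, the get_samples method of the MemoryUsagePollster class is called. That method collects the data and returns each sample with yield: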

        class MemoryUsagePollster(pollsters.BaseComputePollster):
        
            def get_samples(self, manager, cache, resources):
                self._inspection_duration = self._record_poll_time()
                for instance in resources:
                    LOG.debug('Checking memory usage for instance %s', instance.id)
                    try:
                        memory_info = self.inspector.inspect_memory_usage(
                            instance, self._inspection_duration)
                        LOG.debug("MEMORY USAGE: %(instance)s %(usage)f",
                                  {'instance': instance,
                                   'usage': memory_info.usage})
                        yield util.make_sample_from_instance(
                            instance,
                            name='memory.usage',
                            type=sample.TYPE_GAUGE,
                            unit='MB',
                            volume=memory_info.usage,
                        )
                    # The except clauses that close this try block are omitted from this excerpt.
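
To make the generator pattern concrete, the following is a minimal sketch of a pollster-style `get_samples` that yields one sample per instance. Everything here is an assumption for illustration: `FakeInspector`, the plain-dict sample format, and the choice to skip an instance that cannot be inspected are not taken from ceilometer, whose error handling is not shown in the excerpt above.

```python
import collections

MemoryInfo = collections.namedtuple('MemoryInfo', ['usage'])


class FakeInspector:
    """Hypothetical inspector that returns canned memory usage in MB."""

    def __init__(self, usage_by_instance):
        self._usage = usage_by_instance

    def inspect_memory_usage(self, instance_id):
        if instance_id not in self._usage:
            raise LookupError(instance_id)
        return MemoryInfo(usage=self._usage[instance_id])


def get_samples(inspector, resources):
    """Yield one gauge sample per instance, lazily, as the caller iterates."""
    for instance_id in resources:
        try:
            info = inspector.inspect_memory_usage(instance_id)
        except LookupError:
            # Assumed behaviour for this sketch: skip instances that vanished.
            continue
        yield {
            'name': 'memory.usage',
            'type': 'gauge',
            'unit': 'MB',
            'volume': info.usage,
            'resource_id': instance_id,
        }


inspector = FakeInspector({'vm-1': 256.0, 'vm-2': 128.5})
result = list(get_samples(inspector, ['vm-1', 'gone', 'vm-2']))
for item in result:
    print(item['resource_id'], item['volume'])
```

Because `get_samples` is a generator, nothing is inspected until the caller starts iterating, which is why the polling loop can convert and publish each sample as it arrives.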
        
      4. Line 157 is where the message is sent, by calling self._send_notification(sample_batch):

            def _send_notification(self, samples):
                self.manager.notifier.sample(
                    {},
                    'telemetry.polling',
                    {'samples': samples}
                )
        

        This uses the oslo_messaging library. Going one level deeper, the sample called here is the sample method of the Notifier class in oslo_messaging/notify/notifier.py:

            def sample(self, ctxt, event_type, payload):
        
                self._notify(ctxt, event_type, payload, 'SAMPLE')
        

        This in turn calls the _notify method:

            def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                        retry=None):
                payload = self._serializer.serialize_entity(ctxt, payload)
                ctxt = self._serializer.serialize_context(ctxt)
        
                msg = dict(message_id=six.text_type(uuid.uuid4()),
                           publisher_id=publisher_id or self.publisher_id,
                           event_type=event_type,
                           priority=priority,
                           payload=payload,
                           timestamp=six.text_type(timeutils.utcnow()))
        
                def do_notify(ext):
                    try:
                        ext.obj.notify(ctxt, msg, priority, retry or self.retry)
                    except Exception as e:
                        _LOG.exception("Problem '%(e)s' attempting to send to "
                                       "notification system. Payload=%(payload)s",
                                       {'e': e, 'payload': payload})
        
                if self._driver_mgr.extensions:
                    self._driver_mgr.map(do_notify)
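
The two ideas in `_notify` are the message envelope and the per-driver error isolation. A rough sketch of both, using only the standard library, might look like the following. `build_message`, `notify_all`, the driver callables, and the `'ceilometer.polling'` publisher id are hypothetical names chosen for illustration; the real method also serializes the context and payload and dispatches through a driver manager.

```python
import datetime
import uuid


def build_message(event_type, payload, priority, publisher_id):
    """Wrap a payload in an envelope with the same keys as the msg dict above."""
    return {
        'message_id': str(uuid.uuid4()),
        'publisher_id': publisher_id,
        'event_type': event_type,
        'priority': priority,
        'payload': payload,
        # The sketch uses an aware UTC timestamp; the original uses timeutils.utcnow().
        'timestamp': str(datetime.datetime.now(datetime.timezone.utc)),
    }


def notify_all(drivers, msg):
    """Call every driver; a failing driver does not stop the remaining ones."""
    failed = []
    for name, driver in drivers:
        try:
            driver(msg)
        except Exception:
            failed.append(name)
    return failed


received = []


def broken_driver(msg):
    raise RuntimeError('transport down')


msg = build_message('telemetry.polling', {'samples': []}, 'SAMPLE',
                    'ceilometer.polling')
failed = notify_all([('broken', broken_driver), ('ok', received.append)], msg)
print(failed, len(received))
```

Catching the exception inside the per-driver call is what lets one misbehaving notification driver fail without blocking delivery through the others.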
        

        The key call is on line 15, ext.obj.notify(...). Here is what that object looks like:

        (Pdb) p ext.obj
        <oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f72f876d510>
        (Pdb) p ext.obj.__dict__
        {'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f72f85dd1d0>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x13dc990>}
        

        Next, the notify method is called (defined on the MessagingDriver class in oslo_messaging/notify/messaging.py, which MessagingV2Driver inherits from):

            def notify(self, ctxt, message, priority, retry):
                priority = priority.lower()
                for topic in self.topics:
                    target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
                    try:
                        self.transport._send_notification(target, ctxt, message,
                                                          version=self.version,
                                                          retry=retry)
                    except Exception:
                        LOG.exception("Could not send notification to %(topic)s. "
                                      "Payload=%(message)s",
                                      {'topic': topic, 'message': message})
        

        The target here is:

        (Pdb) p target
        <Target topic=notifications.sample>
        

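        So self.transport._send_notification finally sends the message to the notifications.sample queue, which completes one collection task of the ceilometer-compute service.

        The ceilometer-central service works much the same way as ceilometer-compute: it loads plugins from a different namespace, but the logic is essentially identical, so it is not repeated here.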
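
As a small illustration of how the notifications.sample name comes about, the topic construction in the notify method shown earlier can be reproduced in isolation. `notification_topics` is a hypothetical helper written for this sketch, not an oslo_messaging function.

```python
def notification_topics(topics, priority):
    """Combine each configured topic with the lower-cased priority."""
    priority = priority.lower()
    return ['%s.%s' % (topic, priority) for topic in topics]


# 'notifications' is the topic from the pdb dump; 'SAMPLE' is the priority
# that Notifier.sample passes to _notify.
targets = notification_topics(['notifications'], 'SAMPLE')
print(targets)  # ['notifications.sample']
```

Any consumer of these samples therefore has to listen on the topic name with the `.sample` suffix, not on the bare `notifications` topic.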

References:
https://www.cnblogs.com/luohaixian/p/11145939.html
