openstack-ceilometer Part 3: Source Code Analysis of the Notification Agent

端木令
2023-12-01

This walkthrough uses the community N (Newton) release of the code.

I. Startup command

exec ceilometer-agent-notification --config-file /etc/ceilometer/ceilometer.conf

II. Code entry point

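The ceilometer code is packaged with setuptools and pbr. For background, see:

Python packaging with setuptools

setuptools and pbr packaging management in OpenStack

The entry point is the main function in /ceilometer/cmd/agent_notification.py: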

def main():
    service.prepare_service()

    sm = cotyledon.ServiceManager()
    sm.add(notification.NotificationService,
           workers=CONF.notification.workers)
    sm.run()

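This part is built on cotyledon's multi-process framework: the ServiceManager is asked to run NotificationService in CONF.notification.workers worker processes.

Next, look directly at the run method of the NotificationService class.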
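As a rough illustration of how a command name ends up calling a main function: setuptools console scripts are declared as strings of the form "name = package.module:function". The sketch below resolves such a string using only the standard library. The helper resolve_entry_point and the spec "demo-cmd = json:dumps" are made up for illustration; ceilometer's real declaration lives in its packaging metadata and is not shown in this article.

```python
import importlib


def resolve_entry_point(spec):
    """Resolve 'name = package.module:attr' to (name, object).

    Hypothetical helper: it only mimics the lookup that a generated
    console script performs before calling the target function.
    """
    name, _, target = (part.strip() for part in spec.partition("="))
    module_name, _, attr_path = target.partition(":")
    obj = importlib.import_module(module_name)
    for attr in attr_path.split("."):
        obj = getattr(obj, attr)
    return name, obj


# Stand-in spec so the sketch runs without ceilometer installed.
command, func = resolve_entry_point("demo-cmd = json:dumps")
print(command, func([1, 2]))
```

The same "name = module:attr" syntax is used for the plugin namespaces that appear later in this article, such as ceilometer.transformer.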

III. Service startup

The service starts in NotificationService.run:

    def run(self):
        super(NotificationService, self).run()
        self.shutdown = False
        self.periodic = None
        self.partition_coordinator = None
        self.coord_lock = threading.Lock()

        self.listeners = []

        # NOTE(kbespalov): for the pipeline queues used a single amqp host
        # hence only one listener is required
        self.pipeline_listener = None

        """
        (Pdb) self.pipeline_manager.__dict__
        {
            'cfg_mtime': 1592551325.2732353,
            'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a',
            'cfg_loc': '/etc/ceilometer/pipeline.yaml',
            'pipelines': [ < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3950 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3a10 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e37d0 > ]
        }
        
        (Pdb) self.pipeline_manager.pipelines[0].__dict__
        {
            'source': < ceilometer.pipeline.SampleSource object at 0x7fa17405e050 > ,
            'sink': < ceilometer.pipeline.SampleSink object at 0x7fa17405e350 > ,
            'name': 'notification_source:notification_sink'
        }
        
        (Pdb) self.pipeline_manager.pipelines[0].source.__dict__
        {
            'name': 'notification_source',
            'cfg': {
                'interval': 3600,
                'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                'name': 'notification_source',
                'sinks': ['notification_sink']
            },
            'interval': 3600,
            'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
            'discovery': [],
            'resources': [],
            'sinks': ['notification_sink']
        }
        
        (Pdb) self.pipeline_manager.pipelines[0].sink.__dict__
        {
            'publishers': [ < ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa17405e410 > ],
            'transformers': [],
            'name': 'notification_sink',
            'cfg': {
                'publishers': ['notifier://'],
                'transformers': None,
                'name': 'notification_sink'
            },
            'multi_publish': False,
            'transformer_cfg': []
        }

        """
        self.pipeline_manager = pipeline.setup_pipeline()
        self.event_pipeline_manager = pipeline.setup_event_pipeline()

        self.transport = messaging.get_transport()

        if cfg.CONF.notification.workload_partitioning:
            self.group_id = self.NOTIFICATION_NAMESPACE
            self.partition_coordinator = coordination.PartitionCoordinator()
            self.partition_coordinator.start()
        else:
            # FIXME(sileht): endpoint uses the notification_topics option
            # and it should not because this is an oslo_messaging option
            # not a ceilometer. Until we have something to get the
            # notification_topics in another way, we must create a transport
            # to ensure the option has been registered by oslo_messaging.
            messaging.get_notifier(self.transport, '')
            self.group_id = None

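        # This call checks whether workload partitioning is enabled. If so, a SamplePipelineTransportManager
        # instance replaces the PipelineManager instance; otherwise the PipelineManager instance is kept.
        # The two differ in how their publisher function is implemented:
        # SamplePipelineTransportManager's publisher first sends the data back to the message queue; it is later consumed, processed and sent on to gnocchi-api.
        # PipelineManager's publisher sends the data on to the gnocchi-api service directly.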
        self.pipe_manager = self._get_pipe_manager(self.transport,
                                                   self.pipeline_manager)
        self.event_pipe_manager = self._get_event_pipeline_manager(
            self.transport)

        self._configure_main_queue_listeners(self.pipe_manager,
                                             self.event_pipe_manager)

        if cfg.CONF.notification.workload_partitioning:
            ......

        if not cfg.CONF.notification.disable_non_metric_meters:
            LOG.warning(_LW('Non-metric meters may be collected. It is highly '
                            'advisable to disable these meters using '
                            'ceilometer.conf or the pipeline.yaml'))

        self.init_pipeline_refresh()

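I printed several of the objects in the code above with pdb to make them more tangible. Let's walk through the startup process line by line:

  1. The first 12 lines mainly define and initialize attributes.

  2. Lines 61 and 62 (the setup_pipeline and setup_event_pipeline calls) follow the same logic, so we analyze line 61: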

    def setup_pipeline(transformer_manager=None):
        """Setup pipeline manager according to yaml config file."""
    
        default = extension.ExtensionManager('ceilometer.transformer')
        cfg_file = cfg.CONF.pipeline_cfg_file
        return PipelineManager(cfg_file, transformer_manager or default,
                               SAMPLE_TYPE)
    
    1. Line 4 loads the plugins in the ceilometer.transformer namespace:

      ceilometer.transformer =
          accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
          delta = ceilometer.transformer.conversions:DeltaTransformer
          unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
          rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
          aggregator = ceilometer.transformer.conversions:AggregatorTransformer
          arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
      
    2. The file on line 5 is pipeline.yaml, which is then used to initialize the PipelineManager class.

      Here is what PipelineManager does during initialization; again I added pdb output to make it more tangible:

      class PipelineManager(ConfigManagerBase):
          def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
              super(PipelineManager, self).__init__()
              cfg = self.load_config(cfg_info)
      
              self.pipelines = []
              if not ('sources' in cfg and 'sinks' in cfg):
                  raise PipelineException("Both sources & sinks are required",
                                          cfg)
              LOG.info(_LI('detected decoupled pipeline config format'))
      
              unique_names = set()
              sources = []
              """
              pipeline.yaml
                  (Pdb) cfg.get('sources')
                  [{
                      'interval': 3600,
                      'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                      'name': 'notification_source',
                      'sinks': ['notification_sink']
                  }, {
                      'interval': 300,
                      'meters': ['poll.*', 'memory.usage', 'memory.util'],
                      'name': 'meter_source',
                      'sinks': ['meter_sink']
                  }]
              event_pipeline.yaml
                  (Pdb) cfg.get('sources')
                  [{
                      'sinks': ['event_sink'],
                      'name': 'event_source',
                      'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
                  }]
              
              """
              for s in cfg.get('sources'):
                  name = s.get('name')
                  if name in unique_names:
                      raise PipelineException("Duplicated source names: %s" %
                                              name, self)
                  else:
                      unique_names.add(name)
                      """
                      pipeline.yaml
                          (Pdb) sources
                          [<ceilometer.pipeline.SampleSource object at 0x7fa17405e050>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e110>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e090>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e190>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e250>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e210>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e310>]
                          (Pdb) sources[0].__dict__
                          {
                              'name': 'notification_source',
                              'cfg': {
                                  'interval': 3600,
                                  'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                                  'name': 'notification_source',
                                  'sinks': ['notification_sink']
                              },
                              'interval': 3600,
                              'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
                              'discovery': [],
                              'resources': [],
                              'sinks': ['notification_sink']
                          }
                          (Pdb) sources[1].__dict__
                          {'name': 'meter_source', 'cfg': {'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'name': 'meter_source', 'sinks': ['meter_sink']}, 'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'discovery': [], 'resources': [], 'sinks': ['meter_sink']}
                     
                      event_pipeline.yaml
                          {
                              'cfg': {
                                  'sinks': ['event_sink'],
                                  'name': 'event_source',
                                  'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
                              },
                              'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists'],
                              'name': 'event_source',
                              'sinks': ['event_sink']
                          }
                      """
                      sources.append(p_type['source'](s))
              unique_names.clear()
      
              sinks = {}
              """
              pipeline.yaml
                  (Pdb) cfg.get('sinks')
                  [{
                      'publishers': ['notifier://'],
                      'transformers': None,
                      'name': 'notification_sink'
                  }, {
                      'publishers': ['notifier://'],
                      'transformers': None,
                      'name': 'meter_sink'
                  }]
              event_pipeline.yaml
                  (Pdb) cfg.get('sinks')
                  [{'publishers': ['notifier://'], 'transformers': None, 'name': 'event_sink'}]
              """
              for s in cfg.get('sinks'):
                  name = s.get('name')
                  if name in unique_names:
                      raise PipelineException("Duplicated sink names: %s" %
                                              name, self)
                  else:
                      unique_names.add(name)
                      """
                      (Pdb) sinks
                      {'network_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741e3350>, 'volume_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbfd0>, 'disk_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbc50>, 'cpu_delta_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e290>, 'cpu_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e390>, 'meter_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e3d0>, 'notification_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>}
                      (Pdb) sinks.get('network_sink').__dict__
                      {
                          'publishers': [ < ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa1741e3890 > ],
                          'transformers': [ < ceilometer.transformer.conversions.RateOfChangeTransformer object at 0x7fa1741e3710 > ],
                          'name': 'network_sink',
                          'cfg': {
                              'publishers': ['notifier://'],
                              'transformers': [{
                                  'name': 'rate_of_change',
                                  'parameters': {
                                      'source': {
                                          'map_from': {
                                              'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                              'unit': '(B|packet)'
                                          }
                                      },
                                      'target': {
                                          'map_to': {
                                              'name': 'network.\\1.\\2.rate',
                                              'unit': '\\1/s'
                                          },
                                          'type': 'gauge'
                                      }
                                  }
                              }],
                              'name': 'network_sink'
                          },
                          'multi_publish': False,
                          'transformer_cfg': [{
                              'name': 'rate_of_change',
                              'parameters': {
                                  'source': {
                                      'map_from': {
                                          'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
                                          'unit': '(B|packet)'
                                      }
                                  },
                                  'target': {
                                      'map_to': {
                                          'name': 'network.\\1.\\2.rate',
                                          'unit': '\\1/s'
                                      },
                                      'type': 'gauge'
                                  }
                              }
                          }]
                      }
                      
                      """
                      sinks[s['name']] = p_type['sink'](s, transformer_manager)
              unique_names.clear()
      
              for source in sources:
                  source.check_sinks(sinks)
                  for target in source.sinks:
                      pipe = p_type['pipeline'](source, sinks[target])
                      if pipe.name in unique_names:
                          raise PipelineException(
                              "Duplicate pipeline name: %s. Ensure pipeline"
                              " names are unique. (name is the source and sink"
                              " names combined)" % pipe.name, cfg)
                      else:
                          unique_names.add(pipe.name)
                          self.pipelines.append(pipe)
              unique_names.clear()
      
          def publisher(self):
              """Build a new Publisher for these manager pipelines.
      
              :param context: The context.
              """
              return PublishContext(self.pipelines)
      
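      1. Lines 37-79 take the sources from pipeline.yaml, initialize each one with the p_type['source'] class, and append the result to the sources list. As shown below, p_type['source'] is the SampleSource class, which parses each source's collection interval (interval) and the meters it collects (meters) from the configuration file.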

        SAMPLE_TYPE = {'pipeline': SamplePipeline,
                       'source': SampleSource,
                       'sink': SampleSink}
        EVENT_TYPE = {'pipeline': EventPipeline,
                      'source': EventSource,
                      'sink': EventSink}               
        
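      2. Likewise, lines 98-158 take the sinks from the configuration file and initialize each one with the SampleSink class. SampleSink has no __init__ of its own, so the base class Sink performs the initialization: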

        class Sink(object):
            def __init__(self, cfg, transformer_manager):
                self.cfg = cfg
        
                try:
                    self.name = cfg['name']
                    # It's legal to have no transformer specified
                    self.transformer_cfg = cfg.get('transformers') or []
                except KeyError as err:
                    raise PipelineException(
                        "Required field %s not specified" % err.args[0], cfg)
        
                if not cfg.get('publishers'):
                    raise PipelineException("No publisher specified", cfg)
        
                self.publishers = []
                for p in cfg['publishers']:
                    if '://' not in p:
                        # Support old format without URL
                        p = p + "://"
                    try:
                        self.publishers.append(publisher.get_publisher(p,
                                                                       self.NAMESPACE))
                    except Exception:
                        LOG.exception(_("Unable to load publisher %s"), p)
        
                self.multi_publish = True if len(self.publishers) > 1 else False
                self.transformers = self._setup_transformers(cfg, transformer_manager)
        

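        The lines to look at here are 22 and 28:

        1. Line 22 calls get_publisher, which loads the plugin matching the publisher URL from the ceilometer.publisher namespace; with the configuration above that is the notifier plugin.
        2. Line 28 calls _setup_transformers. Its transformer_manager argument holds all plugins in the ceilometer.transformer namespace. The method iterates over the transformers configured for the sink and instantiates the plugin matching each entry's name; for example, the name rate_of_change produces the rate_of_change plugin from ceilometer.transformer. The instances are collected in the transformers list.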
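The two steps above can be sketched without ceilometer installed. This is a simplified illustration, not the library code: the PUBLISHERS and TRANSFORMERS dictionaries, the stand-in classes and build_sink are all hypothetical, and they merely play the role of the two plugin namespaces.

```python
from urllib.parse import urlsplit


class NotifierPublisher:
    """Stand-in for a publisher plugin; it only remembers its URL."""

    def __init__(self, url):
        self.url = url


class RateOfChange:
    """Stand-in for a transformer plugin; it only keeps its parameters."""

    def __init__(self, **parameters):
        self.parameters = parameters


# Hypothetical registries playing the role of the plugin namespaces.
PUBLISHERS = {"notifier": NotifierPublisher}
TRANSFORMERS = {"rate_of_change": RateOfChange}


def build_sink(cfg):
    """Turn one sink dict into (publishers, transformers)."""
    publishers = []
    for url in cfg["publishers"]:
        if "://" not in url:
            url += "://"  # old format without a URL part
        scheme = urlsplit(url).scheme  # the scheme selects the plugin
        publishers.append(PUBLISHERS[scheme](url))

    transformers = []
    # "transformers: None" in the config means "no transformers".
    for entry in cfg.get("transformers") or []:
        plugin = TRANSFORMERS[entry["name"]]
        transformers.append(plugin(**(entry.get("parameters") or {})))
    return publishers, transformers


pubs, trans = build_sink({
    "name": "network_sink",
    "publishers": ["notifier"],
    "transformers": [{"name": "rate_of_change",
                      "parameters": {"target": {"type": "gauge"}}}],
})
print(pubs[0].url, trans[0].parameters)
```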
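      3. Back in PipelineManager, the loop starting at line 160 iterates over the sources and sinks parsed above: for each source, and for each sink that source references, it creates a SamplePipeline and appends it to self.pipelines. SamplePipeline has no __init__ method, so the one from the base class Pipeline is used: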

        @six.add_metaclass(abc.ABCMeta)
        class Pipeline(object):
            """Represents a coupling between a sink and a corresponding source."""
        
            def __init__(self, source, sink):
                self.source = source
                self.sink = sink
                self.name = str(self)
        
      4. The data finally stored on the object is:

        pipeline.yaml
            (Pdb) self.pipelines
            [<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>]
            (Pdb) self.pipelines[0].__dict__
            {'source': <ceilometer.pipeline.SampleSource object at 0x7fc20c082650>, 'sink': <ceilometer.pipeline.SampleSink object at 0x7fc20c082050>, 'name': 'notification_source:notification_sink'}
        event_pipeline.yaml
            (Pdb) self.pipelines
            [<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
            (Pdb) self.pipelines[0].__dict__
            {'source': <ceilometer.pipeline.EventSource object at 0x7fa1741e3dd0>, 'sink': <ceilometer.pipeline.EventSink object at 0x7fa1741e3ed0>, 'name': 'event:event_source:event_sink'}
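The pairing of sources and sinks described in these steps can be condensed into a small standalone sketch. The Source, Sink and Pipeline classes and DEMO_TYPE are simplified stand-ins that I made up; in particular the naming rule "source:sink" is an approximation of what the pdb output shows, and the real classes do far more validation.

```python
class Source:
    def __init__(self, cfg):
        self.name = cfg["name"]
        self.sinks = cfg["sinks"]


class Sink:
    def __init__(self, cfg):
        self.name = cfg["name"]


class Pipeline:
    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        # Simplified naming rule: "<source name>:<sink name>".
        self.name = "%s:%s" % (source.name, sink.name)


# Same idea as SAMPLE_TYPE / EVENT_TYPE: a table of classes to use.
DEMO_TYPE = {"pipeline": Pipeline, "source": Source, "sink": Sink}


def build_pipelines(cfg, p_type=DEMO_TYPE):
    sources = [p_type["source"](s) for s in cfg["sources"]]
    sinks = {s["name"]: p_type["sink"](s) for s in cfg["sinks"]}

    pipelines, seen = [], set()
    for source in sources:
        for target in source.sinks:
            if target not in sinks:
                raise ValueError("unknown sink: %s" % target)
            pipe = p_type["pipeline"](source, sinks[target])
            if pipe.name in seen:
                raise ValueError("duplicate pipeline: %s" % pipe.name)
            seen.add(pipe.name)
            pipelines.append(pipe)
    return pipelines


cfg = {
    "sources": [
        {"name": "notification_source", "sinks": ["notification_sink"]},
        {"name": "meter_source", "sinks": ["meter_sink"]},
    ],
    "sinks": [{"name": "notification_sink"}, {"name": "meter_sink"}],
}
print([p.name for p in build_pipelines(cfg)])
```

Passing a different class table is all it takes to build event pipelines instead of sample pipelines, which is why setup_pipeline and setup_event_pipeline can share the same manager logic.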
        
    3. Back in NotificationService.run: line 61 is now covered, and line 62 follows the same logic.

    4. For lines 79-82, see the comments in the code.
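The difference described in those code comments can be pictured with a toy model. Everything here is hypothetical: queue.Queue stands in for the message bus, and DirectManager, QueueingManager, get_manager and drain are invented names that only mirror the two behaviours (publish immediately, or park the data on a queue for a second processing hop).

```python
import queue


class DirectManager:
    """Hands samples straight to every pipeline callable."""

    def __init__(self, pipelines):
        self.pipelines = pipelines

    def publish(self, samples):
        for pipeline in self.pipelines:
            pipeline(samples)


class QueueingManager:
    """Parks samples on a queue; a later consumer processes them."""

    def __init__(self, bus):
        self.bus = bus

    def publish(self, samples):
        self.bus.put(samples)


def get_manager(workload_partitioning, pipelines, bus):
    """Pick the manager the way the partitioning switch does."""
    if workload_partitioning:
        return QueueingManager(bus)
    return DirectManager(pipelines)


def drain(bus, pipelines):
    """Second hop: consume what was queued and run the pipelines."""
    direct = DirectManager(pipelines)
    while not bus.empty():
        direct.publish(bus.get())


delivered = []
bus = queue.Queue()
manager = get_manager(True, [delivered.append], bus)
manager.publish(["cpu"])         # nothing delivered yet, only queued
drain(bus, [delivered.append])   # now the pipelines actually run
print(delivered)
```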

    5. Finally, line 89 calls the _configure_main_queue_listeners method:

          def _configure_main_queue_listeners(self, pipe_manager,
                                              event_pipe_manager):
              """"""
              """
              (Pdb) notification_manager.extensions[0].__dict__
              {
                  'obj': < ceilometer.network.notifications.Firewall object at 0x7fa1741e3d50 > ,
                  'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.notifications:Firewall'),
                  'name': 'network.services.firewall',
                  'plugin': < class 'ceilometer.network.notifications.Firewall' >
              }
              """
              notification_manager = self._get_notifications_manager(pipe_manager)
              if not list(notification_manager):
                  LOG.warning(_('Failed to load any notification handlers for %s'),
                              self.NOTIFICATION_NAMESPACE)
      
              ack_on_error = cfg.CONF.notification.ack_on_event_error
      
              endpoints = []
              """
              (Pdb) endpoints
              [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>]
              (Pdb) endpoints[0].__dict__
              {
                  'event_converter': < ceilometer.event.converter.NotificationEventsConverter object at 0x7fa17467c050 > ,
                  'manager': < ceilometer.pipeline.PipelineManager object at 0x2f6d550 >
              }
              (Pdb) endpoints[0].manager.__dict__
              {
                  'cfg_mtime': 1592551325.2732353,
                  'cfg_hash': 'c01e8ee1c40b66314628536afcd48a39',
                  'cfg_loc': '/etc/ceilometer/event_pipeline.yaml',
                  'pipelines': [ < ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10 > ]
              }
              """
              endpoints.append(
                  event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
      
              targets = []
              for ext in notification_manager:
                  handler = ext.obj
                  """
                  (Pdb) cfg.CONF.oslo_messaging_notifications.topics
                  ['notifications']
                  (Pdb) cfg.CONF.notification.disable_non_metric_meters
                  True
                  """
      
                  if (cfg.CONF.notification.disable_non_metric_meters and
                          isinstance(handler, base.NonMetricNotificationBase)):
                      continue
                  LOG.debug('Event types from %(name)s: %(type)s'
                            ' (ack_on_error=%(error)s)',
                            {'name': ext.name,
                             'type': ', '.join(handler.event_types),
                             'error': ack_on_error})
                  # NOTE(gordc): this could be a set check but oslo_messaging issue
                  # https://bugs.launchpad.net/oslo.messaging/+bug/1398511
                  # This ensures we don't create multiple duplicate consumers.
                  for new_tar in handler.get_targets(cfg.CONF):
                      if new_tar not in targets:
                          targets.append(new_tar)
                  endpoints.append(handler)
      
              """
              (Pdb) endpoints
              [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
               <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
                <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
                 <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
                  <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
                   <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
                    <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
              (Pdb) cfg.CONF.notification.messaging_urls
              ['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
              """
              urls = cfg.CONF.notification.messaging_urls or [None]
              for url in urls:
                  transport = messaging.get_transport(url)
                  # NOTE(gordc): ignore batching as we want pull
                  # to maintain sequencing as much as possible.
                  listener = messaging.get_batch_notification_listener(
                      transport, targets, endpoints)
                  listener.start()
                  self.listeners.append(listener)
      
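      1. Line 13 loads the plugins in the ceilometer.notification namespace, passing pipe_manager as the argument used when each extension is instantiated.

      2. Line 37 defines the endpoint that handles events triggered by other components; we come back to it later.

      3. Lines 40-64 iterate over the plugins loaded on line 13. If disable_non_metric_meters is set and the plugin object is an instance of NonMetricNotificationBase, the plugin is skipped. Otherwise the plugin's get_targets method is called to produce oslo_messaging.Target instances whose topic is notifications (a target already in the list is not added again), and the plugin itself is appended to endpoints.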
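The loop in step 3 boils down to "skip some handlers, collect unique targets, keep the rest as endpoints". A minimal sketch under simplified assumptions: Target is a plain namedtuple rather than oslo_messaging.Target, and Handler, NonMetricHandler and collect are hypothetical names.

```python
from collections import namedtuple

# Plain stand-in for a messaging target.
Target = namedtuple("Target", ["exchange", "topic"])


class Handler:
    """Stand-in notification plugin listening on some exchanges."""

    def __init__(self, exchanges):
        self.exchanges = exchanges

    def get_targets(self):
        return [Target(e, "notifications") for e in self.exchanges]


class NonMetricHandler(Handler):
    """Stand-in for plugins that produce non-metric meters."""


def collect(handlers, disable_non_metric_meters):
    targets, endpoints = [], []
    for handler in handlers:
        if disable_non_metric_meters and isinstance(handler,
                                                    NonMetricHandler):
            continue
        for target in handler.get_targets():
            if target not in targets:  # avoid duplicate consumers
                targets.append(target)
        endpoints.append(handler)
    return targets, endpoints


handlers = [Handler(["nova", "cinder"]),
            NonMetricHandler(["glance"]),
            Handler(["nova"])]
targets, endpoints = collect(handlers, disable_non_metric_meters=True)
print([t.exchange for t in targets], len(endpoints))
```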

      4. Line 78 to the end is the final step of starting the service. First, here is what urls contains:

        (Pdb) cfg.CONF.notification.messaging_urls
        ['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
        

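        RabbitMQ is used for communication. Line 83 then calls get_batch_notification_listener, which receives the targets to listen on and the endpoints that handle incoming messages. As we saw, endpoints holds two kinds of handlers: one collects metering data and the other collects events. Here is what happens inside: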

        def get_batch_notification_listener(transport, targets, endpoints,
                                            allow_requeue=False,
                                            batch_size=1, batch_timeout=None):
            """
                (Pdb) a
                transport = <oslo_messaging.transport.Transport object at 0x7fab5c6a9190>
                targets = [<Target exchange=ironic, topic=notifications>, <Target exchange=ceilometer, topic=notifications>, <Target exchange=nova, topic=notifications>, <Target exchange=cinder, topic=notifications>, <Target exchange=glance, topic=notifications>, <Target exchange=neutron, topic=notifications>, <Target exchange=heat, topic=notifications>, <Target exchange=keystone, topic=notifications>, <Target exchange=sahara, topic=notifications>, <Target exchange=trove, topic=notifications>, <Target exchange=zaqar, topic=notifications>, <Target exchange=swift, topic=notifications>, <Target exchange=magnum, topic=notifications>, <Target exchange=central, topic=notifications>]
                endpoints = [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x7fab5c258c90>, <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fab5c2a9610>, <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fab5c2b8110>, <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fab5c2b8510>, <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fab5c2b8810>, <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fab5c2b8a90>, <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fab5c6be210>]
                allow_requeue = False
                batch_size = 1
                batch_timeout = None
            """
            return oslo_messaging.get_batch_notification_listener(
                transport, targets, endpoints, executor='threading',
                allow_requeue=allow_requeue,
                batch_size=batch_size, batch_timeout=batch_timeout)
        

        This in turn calls oslo_messaging.get_batch_notification_listener, so we keep digging:

        def get_batch_notification_listener(transport, targets, endpoints,
                                            executor='blocking', serializer=None,
                                            allow_requeue=False, pool=None,
                                            batch_size=None, batch_timeout=None):
            dispatcher = notify_dispatcher.BatchNotificationDispatcher(
                endpoints, serializer)
            return BatchNotificationServer(
                transport, targets, dispatcher, executor, allow_requeue, pool,
                batch_size, batch_timeout
            )
        

        BatchNotificationDispatcher has no __init__ method of its own, so the __init__ of its base class NotificationDispatcher is used:

        PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
        
        class NotificationDispatcher(dispatcher.DispatcherBase):
            def __init__(self, endpoints, serializer):
        
                self.endpoints = endpoints
                self.serializer = serializer or msg_serializer.NoOpSerializer()
        
                self._callbacks_by_priority = {}
                for endpoint, prio in itertools.product(endpoints, PRIORITIES):
                    if hasattr(endpoint, prio):
                        method = getattr(endpoint, prio)
                        screen = getattr(endpoint, 'filter_rule', None)
                        self._callbacks_by_priority.setdefault(prio, []).append(
                            (screen, method))
        

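        This uses itertools.product(A, B), which yields tuples forming the Cartesian product of the elements of A and B. The loop visits every (endpoint, priority) pair, and whenever an endpoint has an attribute named after the priority, that method is stored in _callbacks_by_priority, roughly in this form: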

        'sample': [
            ( < oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c254590 > , < bound method TemperatureSensorNotification.sample of < ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7ff14c254550 >> ),
            ( < oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c260590 > , < bound method TelemetryIpc.sample of < ceilometer.telemetry.notifications.TelemetryIpc object at 0x7ff14c260150 >> )]
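The construction of this table is easy to reproduce in isolation. In the sketch below, MeterEndpoint, EventEndpoint and build_callbacks are hypothetical, and the filter_rule is a placeholder string rather than a real NotificationFilter; only the itertools.product loop mirrors the listing.

```python
import itertools

PRIORITIES = ["audit", "debug", "info", "warn", "error", "critical", "sample"]


class MeterEndpoint:
    filter_rule = "meter-filter"  # placeholder for a filter object

    def sample(self, messages):
        return "meter got %d" % len(messages)


class EventEndpoint:
    # No filter_rule attribute here, so its screen will be None.
    def info(self, messages):
        return "event info"

    def error(self, messages):
        return "event error"


def build_callbacks(endpoints):
    """Map each priority to the (screen, method) pairs that handle it."""
    callbacks = {}
    for endpoint, prio in itertools.product(endpoints, PRIORITIES):
        if hasattr(endpoint, prio):
            screen = getattr(endpoint, "filter_rule", None)
            method = getattr(endpoint, prio)
            callbacks.setdefault(prio, []).append((screen, method))
    return callbacks


table = build_callbacks([EventEndpoint(), MeterEndpoint()])
print(sorted(table))
```

An endpoint therefore opts in to a priority simply by defining a method with that name; no registration call is needed.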
        

        Each tuple in the list has the form (screen, method), where screen is the plugin class's filter_rule attribute. Take TelemetryIpc as an example; its base class is NotificationBase:

        @six.add_metaclass(abc.ABCMeta)
        class NotificationBase(PluginBase):
            """Base class for plugins that support the notification API."""
            def __init__(self, manager):
                super(NotificationBase, self).__init__()
                # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
                # messages to an endpoint.
                if self.event_types:
                    self.filter_rule = oslo_messaging.NotificationFilter(
                        event_type='|'.join(self.event_types))
                self.manager = manager
        

        NotificationFilter selects which of the messages received by the notification service reach an endpoint. It can filter on context, publisher_id, event_type, metadata and payload.
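To see what the '|'.join(self.event_types) rule amounts to, here is a simplified stand-in that filters on event_type only. EventTypeFilter and dispatch are hypothetical names, and anchoring the pattern at the start of the string with re.match is my assumption about the matching semantics, not something shown in this article.

```python
import re


class EventTypeFilter:
    """Simplified stand-in that filters on event_type only."""

    def __init__(self, event_types):
        # Same join as in NotificationBase: one alternation of patterns.
        self._regex = re.compile("|".join(event_types))

    def match(self, event_type):
        # Assumption: the pattern must match from the start of the string.
        return self._regex.match(event_type) is not None


def dispatch(callbacks, event_type, payload):
    """Call every method whose screen is absent or accepts the event."""
    results = []
    for screen, method in callbacks:
        if screen is None or screen.match(event_type):
            results.append(method(payload))
    return results


rule = EventTypeFilter(["compute.instance.*", "volume.create.end"])
callbacks = [(rule, lambda payload: "filtered endpoint"),
             (None, lambda payload: "unfiltered endpoint")]
print(dispatch(callbacks, "compute.instance.create.end", {}))
print(dispatch(callbacks, "image.upload", {}))
```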

        Back in get_batch_notification_listener: with the dispatcher covered, the next step is the BatchNotificationServer instance.

        Inheritance chain:
        BatchNotificationServer--->NotificationServerBase--->MessageHandlingServer
        

        Initialization here just stores a few parameters and returns the listener object.

        Line 85 of _configure_main_queue_listeners then calls listener.start, which is the start method of MessageHandlingServer:

            @ordered(reset_after='stop')
            def start(self, override_pool_size=None):
                if self._started:
                    LOG.warning('The server has already been started. Ignoring '
                                'the redundant call to start().')
                    return
        
                self._started = True
        
                executor_opts = {}
        
                if self.executor_type in ("threading", "eventlet"):
                    executor_opts["max_workers"] = (
                        override_pool_size or self.conf.executor_thread_pool_size
                    )
                self._work_executor = self._executor_cls(**executor_opts)
        
                try:
                    self.listener = self._create_listener()
                except driver_base.TransportDriverError as ex:
                    raise ServerListenError(self.target, ex)
        
                self.listener.start(self._on_incoming)
        

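        The two calls to note in start are self._create_listener() and self.listener.start(self._on_incoming). The _create_listener call resolves to the NotificationServerBase implementation: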

            def _create_listener(self):
                return self.transport._listen_for_notifications(
                    self._targets_priorities, self._pool, self._batch_size,
                    self._batch_timeout
                )
        

        This in turn calls the transport's _listen_for_notifications:

            def _listen_for_notifications(self, targets_and_priorities, pool,
                                          batch_size, batch_timeout):
                for target, priority in targets_and_priorities:
                    if not target.topic:
                        raise exceptions.InvalidTarget('A target must have '
                                                       'topic specified',
                                                       target)
                return self._driver.listen_for_notifications(
                    targets_and_priorities, pool, batch_size, batch_timeout
                )
        

        Going one level deeper, into the driver:

            def listen_for_notifications(self, targets_and_priorities, pool,
                                         batch_size, batch_timeout):
                conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
        
                listener = NotificationAMQPListener(self, conn)
                for target, priority in targets_and_priorities:
                    conn.declare_topic_consumer(
                        exchange_name=self._get_exchange(target),
                        topic='%s.%s' % (target.topic, priority),
                        callback=listener, queue_name=pool)
                return base.PollStyleListenerAdapter(listener, batch_size,
                                                     batch_timeout)
        

        This code creates a consumer on each queue to be listened to, then builds and returns a PollStyleListenerAdapter instance.
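
To make the queue naming concrete, here is a small sketch of how the consumer topics are derived from the (target, priority) pairs. Target is a hypothetical namedtuple standing in for the oslo.messaging target object, and consumer_topics is an illustrative helper, not part of any library.

```python
from collections import namedtuple

# Hypothetical stand-in for oslo_messaging.Target; only `topic` is modelled.
Target = namedtuple('Target', ['topic'])


def consumer_topics(targets_and_priorities):
    """Return the '<topic>.<priority>' names a listener would consume from."""
    names = []
    for target, priority in targets_and_priorities:
        if not target.topic:
            raise ValueError('A target must have topic specified')
        names.append('%s.%s' % (target.topic, priority))
    return names


pairs = [(Target('notifications'), p) for p in ('sample', 'info', 'error')]
print(consumer_topics(pairs))
```

Each pair yields one consumer, which is why a single target with several priorities results in several queues such as notifications.sample and notifications.info.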

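        When a PollStyleListenerAdapter object is initialized, it creates a thread object:

        File: oslo_messaging/_drivers/base.py: PollStyleListenerAdapter.__init__

        self._listen_thread = threading.Thread(target=self._runner)

        Calling start then launches that thread to run _runner. The function keeps fetching messages and hands each batch over for processing, which ends up in _process_incoming: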

        class PollStyleListenerAdapter(Listener):
            """A Listener that uses a PollStyleListener for message transfer. A
            dedicated thread is created to do message polling.
            """
        
            def __init__(self, poll_style_listener, batch_size, batch_timeout):
                super(PollStyleListenerAdapter, self).__init__(
                    batch_size, batch_timeout, poll_style_listener.prefetch_size
                )
                self._poll_style_listener = poll_style_listener
                self._listen_thread = threading.Thread(target=self._runner)
                self._listen_thread.daemon = True
                self._started = False
        
            def start(self, on_incoming_callback):
                super(PollStyleListenerAdapter, self).start(on_incoming_callback)
                self._started = True
                self._listen_thread.start()
        
            @excutils.forever_retry_uncaught_exceptions
            def _runner(self):
                while self._started:
                    incoming = self._poll_style_listener.poll(
                        batch_size=self.batch_size, batch_timeout=self.batch_timeout)
        
                    if incoming:
                        self.on_incoming_callback(incoming)
        
                # listener is stopped but we need to process all already consumed
                # messages
                while True:
                    incoming = self._poll_style_listener.poll(
                        batch_size=self.batch_size, batch_timeout=self.batch_timeout)
        
                    if not incoming:
                        return
                    self.on_incoming_callback(incoming)
        

        The last line of MessageHandlingServer.start is self.listener.start(self._on_incoming), which calls the start method shown above. Once started, the thread runs _runner, which polls the queue continuously and passes every batch it receives to self.on_incoming_callback(incoming), that is, to MessageHandlingServer._on_incoming:

            def _on_incoming(self, incoming):
                """Handles on_incoming event
        
                :param incoming: incoming request.
                """
                self._work_executor.submit(self._process_incoming, incoming)
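
The hand-off from the polling thread to the worker pool can be sketched with the standard library alone. This is a simplified analogue under stated assumptions: PollingListener is a hypothetical class, a queue.Queue stands in for the AMQP connection, and process_incoming only records messages where the real code would call the dispatcher.

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor


class PollingListener:
    """Hypothetical, simplified analogue of PollStyleListenerAdapter."""

    def __init__(self, source, batch_size):
        self._source = source          # a queue.Queue standing in for AMQP
        self._batch_size = batch_size
        self._started = False
        self._thread = threading.Thread(target=self._runner, daemon=True)

    def _poll(self, timeout=0.05):
        # Collect up to batch_size messages, or whatever arrived in time.
        batch = []
        try:
            while len(batch) < self._batch_size:
                batch.append(self._source.get(timeout=timeout))
        except queue.Empty:
            pass
        return batch

    def start(self, on_incoming):
        self._on_incoming = on_incoming
        self._started = True
        self._thread.start()

    def stop(self):
        self._started = False
        self._thread.join()

    def _runner(self):
        while self._started:
            batch = self._poll()
            if batch:
                self._on_incoming(batch)
        # Stopped: drain whatever was already queued, then exit.
        while True:
            batch = self._poll()
            if not batch:
                return
            self._on_incoming(batch)


processed = []
executor = ThreadPoolExecutor(max_workers=2)


def process_incoming(batch):
    processed.extend(batch)            # stands in for dispatcher.dispatch()


def on_incoming(batch):
    executor.submit(process_incoming, batch)   # hand off to the worker pool


source = queue.Queue()
for i in range(5):
    source.put(i)

listener = PollingListener(source, batch_size=2)
listener.start(on_incoming)
listener.stop()
executor.shutdown(wait=True)
print(sorted(processed))
```

Keeping the poll loop on its own thread and submitting batches to an executor means a slow handler does not stall message retrieval. The trade-off is that batches may complete out of order.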
        

        _on_incoming submits the batch to self._process_incoming, which here is the implementation in the BatchNotificationServer class:

        class BatchNotificationServer(NotificationServerBase):
        
            def _process_incoming(self, incoming):
                try:
                    not_processed_messages = self.dispatcher.dispatch(incoming)
                except Exception:
                    ......
        

        This handler calls the dispatcher object to dispatch the messages. self.dispatcher is the object that was built from NotificationDispatcher earlier and passed in, so the call lands in:

        class BatchNotificationDispatcher(NotificationDispatcher):
            """A message dispatcher which understands Notification messages.
        
            A MessageHandlingServer is constructed by passing a callable dispatcher
            which is invoked with a list of message dictionaries each time 'batch_size'
            messages are received or 'batch_timeout' seconds is reached.
            """
        
            def dispatch(self, incoming):
                """Dispatch notification messages to the appropriate endpoint method.
                """
        
                messages_grouped = itertools.groupby(sorted(
                    (self._extract_user_message(m) for m in incoming),
                    key=operator.itemgetter(0)), operator.itemgetter(0))
        
                requeues = set()
                for priority, messages in messages_grouped:
                    __, raw_messages, messages = six.moves.zip(*messages)
                    if priority not in PRIORITIES:
                        LOG.warning('Unknown priority "%s"', priority)
                        continue
                    for screen, callback in self._callbacks_by_priority.get(priority,
                                                                            []):
                        if screen:
                            filtered_messages = [message for message in messages
                                                 if screen.match(
                                                     message["ctxt"],
                                                     message["publisher_id"],
                                                     message["event_type"],
                                                     message["metadata"],
                                                     message["payload"])]
                        else:
                            filtered_messages = list(messages)
        
                        if not filtered_messages:
                            continue
        
                        ret = self._exec_callback(callback, filtered_messages)
                        if ret == NotificationResult.REQUEUE:
                            requeues.update(raw_messages)
                            break
                return requeues
        
            def _exec_callback(self, callback, messages):
                try:
                    return callback(messages)
                except Exception:
                    LOG.exception("Callback raised an exception.")
                    return NotificationResult.REQUEUE
        

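        The key step in dispatch is the lookup of the message's priority in the self._callbacks_by_priority dictionary. It yields the matching plugin methods, that is, the callback functions, which are then invoked. For example, a message on notifications.sample is routed to the sample method of the TelemetryIpc class: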

        _sample = ceilometer.telemetry.notifications:TelemetryIpc
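
The routing performed by dispatch can be reduced to a short sketch. The message shape, a list of (priority, message_dict) tuples, and the use of plain predicates as screens are simplifying assumptions for illustration. The real dispatcher works on oslo.messaging objects.

```python
import itertools
import operator

REQUEUE = 'requeue'


def dispatch(messages, callbacks_by_priority):
    """Group messages by priority and run each (screen, callback) pair.

    `messages` is a list of (priority, message_dict) tuples; `screen` is
    either None or a predicate over a message. Both shapes are simplified
    assumptions, not the real oslo.messaging types.
    """
    requeues = []
    grouped = itertools.groupby(
        sorted(messages, key=operator.itemgetter(0)),
        operator.itemgetter(0))
    for priority, group in grouped:
        batch = [msg for _, msg in group]
        for screen, callback in callbacks_by_priority.get(priority, []):
            wanted = [m for m in batch if screen is None or screen(m)]
            if not wanted:
                continue
            if callback(wanted) == REQUEUE:
                # One endpoint asked for a requeue: requeue the whole group.
                requeues.extend(batch)
                break
    return requeues


seen = []
callbacks = {
    'sample': [(lambda m: m['event_type'].startswith('telemetry.'),
                seen.extend)],
}
incoming = [
    ('sample', {'event_type': 'telemetry.polling'}),
    ('info', {'event_type': 'compute.instance.exists'}),
    ('sample', {'event_type': 'other.event'}),
]
print(dispatch(incoming, callbacks))
print(seen)
```

Only the sample message whose event type passes the screen reaches the callback. The info message is ignored because no endpoint is registered for that priority in this example.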
        

        TelemetryIpc does not define sample itself, so the call resolves to the parent class method: ceilometer/agent/plugin_base.py:NotificationBase.sample

            def sample(self, notifications):
                self._process_notifications('sample', notifications)
              
            def _process_notifications(self, priority, notifications):
                for notification in notifications:
                    try:
                        notification = messaging.convert_to_old_notification_format(
                            priority, notification)
                        self.to_samples_and_publish(notification)
                    except Exception:
                        LOG.error(_LE('Fail to process notification'), exc_info=True)
        
            def to_samples_and_publish(self, notification):
                with self.manager.publisher() as p:
                    p(list(self.process_notification(notification)))
        

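        The calls go down in order: _process_notifications ---> to_samples_and_publish. The important part is the body of to_samples_and_publish. Its with statement calls self.manager.publisher(), and the manager, which is passed in when the plugin is initialized, is an instance of PipelineManager. So look at PipelineManager.publisher: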

            def publisher(self):
                return PublishContext(self.pipelines)
        

        The self.pipelines argument is built in __init__. Next, PublishContext:

        class PublishContext(object):
        
            def __init__(self, pipelines=None):
                pipelines = pipelines or []
                self.pipelines = set(pipelines)
        
            def add_pipelines(self, pipelines):
                self.pipelines.update(pipelines)
        
            def __enter__(self):
                def p(data):
                    for p in self.pipelines:
                        p.publish_data(data)
                return p
        
            def __exit__(self, exc_type, exc_value, traceback):
                for p in self.pipelines:
                    p.flush()
        

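        Because PublishContext is used in a with statement, look at __enter__: its return value is bound to the name after as. The data argument is list(self.process_notification(notification)). The pipelines iterated inside p come in two types, SamplePipeline and EventPipeline.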
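
The with-statement mechanics can be tried in isolation with the sketch below. RecordingPipeline and SketchPublishContext are hypothetical classes written for this illustration. Unlike the original, the sketch stores pipelines in a list rather than a set so that the order is predictable.

```python
class RecordingPipeline:
    """Hypothetical pipeline that only records what it is asked to do."""

    def __init__(self, name):
        self.name = name
        self.calls = []

    def publish_data(self, data):
        self.calls.append(('publish', data))

    def flush(self):
        self.calls.append(('flush',))


class SketchPublishContext:
    """Simplified analogue of PublishContext (list instead of set)."""

    def __init__(self, pipelines=None):
        self.pipelines = list(pipelines or [])

    def __enter__(self):
        # The returned closure is what gets bound to the name after `as`.
        def publish(data):
            for pipeline in self.pipelines:
                pipeline.publish_data(data)
        return publish

    def __exit__(self, exc_type, exc_value, traceback):
        # Runs when the with block ends, even if it raised.
        for pipeline in self.pipelines:
            pipeline.flush()


pipelines = [RecordingPipeline('cpu'), RecordingPipeline('disk')]
with SketchPublishContext(pipelines) as p:
    p(['sample-1', 'sample-2'])

for pipeline in pipelines:
    print(pipeline.name, pipeline.calls)
```

Every pipeline receives the same data through the closure, and the flush in __exit__ ensures buffered output is pushed once the block finishes.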

        Taking SamplePipeline as an example:

            def publish_data(self, samples):
                if not isinstance(samples, list):
                    samples = [samples]
                supported = [s for s in samples if self.source.support_meter(s.name)
                             and self._validate_volume(s)]
                self.sink.publish_samples(supported)
        

        This calls the publish_samples method of SampleSink:

            def _publish_samples(self, start, samples):
                """Push samples into pipeline for publishing.
        
                :param start: The first transformer that the sample will be injected.
                              This is mainly for flush() invocation that transformer
                              may emit samples.
                :param samples: Sample list.
        
                """
        
                transformed_samples = []
                if not self.transformers:
                    transformed_samples = samples
                else:
                    for sample in samples:
                        LOG.debug(
                            "Pipeline %(pipeline)s: Transform sample "
                            "%(smp)s from %(trans)s transformer", {'pipeline': self,
                                                                   'smp': sample,
                                                                   'trans': start})
                        sample = self._transform_sample(start, sample)
                        if sample:
                            transformed_samples.append(sample)
        
                if transformed_samples:
                    for p in self.publishers:
                        try:
                            p.publish_samples(transformed_samples)
                        except Exception:
                            LOG.exception(_(
                                "Pipeline %(pipeline)s: Continue after error "
                                "from publisher %(pub)s") % ({'pipeline': self,
                                                              'pub': p}))
        
            def publish_samples(self, samples):
                self._publish_samples(0, samples)
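
The shape of the sink logic, transform first and then fan out to every publisher while isolating failures, can be sketched as follows. publish_through_sink is a hypothetical helper. Transformers are modelled as plain callables that return a sample or None to drop it, and publishers as callables that take a list. Both are simplifications of the real plugin interfaces.

```python
import logging

LOG = logging.getLogger(__name__)


def publish_through_sink(samples, transformers, publishers):
    """Run samples through transformers, then hand them to every publisher.

    Transformers are plain callables returning a sample or None (drop);
    publishers are callables taking the list. Both are simplifications.
    """
    transformed = []
    for sample in samples:
        for transform in transformers:
            sample = transform(sample)
            if sample is None:
                break
        if sample is not None:
            transformed.append(sample)

    delivered = 0
    if transformed:
        for publish in publishers:
            try:
                publish(transformed)
                delivered += 1
            except Exception:
                # One failing publisher must not block the others.
                LOG.exception('Continue after error from publisher %r',
                              publish)
    return transformed, delivered


received = []


def broken_publisher(batch):
    raise RuntimeError('backend unavailable')


def drop_negative(value):
    return value if value >= 0 else None


result = publish_through_sink(
    [1, -2, 3],
    transformers=[drop_negative, lambda v: v * 10],
    publishers=[broken_publisher, received.extend])
print(result, received)
```

Catching the exception per publisher is the design choice worth noting: a broken backend is logged and skipped, and the remaining publishers still get the batch.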
        

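        publish_samples calls self._publish_samples. After assembling transformed_samples, that method iterates over self.publishers and calls publish_samples on each one to handle the data. As seen earlier, the publishers hold the notifier plugin, so the call goes to the SampleNotifierPublisher class. That class inherits NotifierPublisher ---> MessagingPublisher, so the method that actually runs is MessagingPublisher.publish_samples: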

            def publish_samples(self, samples):
                """Publish samples on RPC.
        
                :param samples: Samples from pipeline after transformation.
        
                """
        
                meters = [
                    utils.meter_message_from_counter(
                        sample, cfg.CONF.publisher.telemetry_secret)
                    for sample in samples
                ]
                topic = cfg.CONF.publisher_notifier.metering_topic
                self.local_queue.append((topic, meters))
        
                if self.per_meter_topic:
                    for meter_name, meter_list in itertools.groupby(
                            sorted(meters, key=operator.itemgetter('counter_name')),
                            operator.itemgetter('counter_name')):
                        meter_list = list(meter_list)
                        topic_name = topic + '.' + meter_name
                        LOG.debug('Publishing %(m)d samples on %(n)s',
                                  {'m': len(meter_list), 'n': topic_name})
                        self.local_queue.append((topic_name, meter_list))
        
                self.flush()
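
The queueing step, including the optional per-meter topics, can be sketched without any ceilometer imports. queue_entries is a hypothetical helper that returns the entries instead of appending them to self.local_queue, and the meter dictionaries carry only the fields needed here.

```python
import itertools
import operator


def queue_entries(meters, topic='metering', per_meter_topic=False):
    """Build the (topic, meters) entries that would go on the local queue."""
    entries = [(topic, meters)]
    if per_meter_topic:
        by_name = operator.itemgetter('counter_name')
        # groupby only merges adjacent items, hence the sort first.
        for name, group in itertools.groupby(
                sorted(meters, key=by_name), by_name):
            entries.append((topic + '.' + name, list(group)))
    return entries


meters = [
    {'counter_name': 'cpu', 'counter_volume': 1},
    {'counter_name': 'memory', 'counter_volume': 2},
    {'counter_name': 'cpu', 'counter_volume': 3},
]
for entry_topic, entry_meters in queue_entries(meters, per_meter_topic=True):
    print(entry_topic, len(entry_meters))
```

With per-meter topics enabled, every sample is queued twice: once under the shared topic and once under a topic named after its meter.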
        

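        The topic read from cfg.CONF.publisher_notifier.metering_topic is metering. The method ends by calling flush, which runs _process_queue. That in turn uses the _send method to send the data out: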

            def _send(self, event_type, data):
                try:
                    self.notifier.sample({}, event_type=event_type,
                                         payload=data)
                except oslo_messaging.MessageDeliveryFailure as e:
                    raise_delivery_failure(e)
        

        The data ends up on the metering.sample queue. From here on, the message sending flow is the same as for the polling-compute service. For details see: polling-compute

Reference:
https://www.cnblogs.com/luohaixian/p/11145939.html
