Taking the community Newton (N) release code as an example, the service is started with:
exec ceilometer-agent-notification --config-file /etc/ceilometer/ceilometer.conf
The ceilometer code is packaged with setuptools and pbr; for background on that, see:
setuptools and pbr packaging management in OpenStack
The entry point is the main() function in /ceilometer/cmd/agent_notification.py:
def main():
service.prepare_service()
sm = cotyledon.ServiceManager()
sm.add(notification.NotificationService,
workers=CONF.notification.workers)
sm.run()
This part is built on cotyledon's multi-process framework.
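To see the pattern in isolation, here is a minimal, hypothetical cotyledon service (illustrative only, not ceilometer code): the manager forks the requested number of worker processes and calls each worker's run().

import cotyledon


class HelloService(cotyledon.Service):
    """Hypothetical worker; cotyledon forks one OS process per worker."""

    def run(self):
        # Long-lived work goes here; in ceilometer this role is played by
        # NotificationService.run() analyzed below.
        print('worker %d started' % self.worker_id)

    def terminate(self):
        print('worker %d exiting' % self.worker_id)


sm = cotyledon.ServiceManager()
sm.add(HelloService, workers=2)   # same call pattern as main() above
sm.run()                          # blocks, supervising the worker processes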
The service startup logic lives in NotificationService.run, so let's look at that method directly:
def run(self):
super(NotificationService, self).run()
self.shutdown = False
self.periodic = None
self.partition_coordinator = None
self.coord_lock = threading.Lock()
self.listeners = []
# NOTE(kbespalov): for the pipeline queues used a single amqp host
# hence only one listener is required
self.pipeline_listener = None
"""
(Pdb) self.pipeline_manager.__dict__
{
'cfg_mtime': 1592551325.2732353,
'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a',
'cfg_loc': '/etc/ceilometer/pipeline.yaml',
'pipelines': [ < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3950 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3a10 > , < ceilometer.pipeline.SamplePipeline object at 0x7fa1741e37d0 > ]
}
(Pdb) self.pipeline_manager.pipelines[0].__dict__
{
'source': < ceilometer.pipeline.SampleSource object at 0x7fa17405e050 > ,
'sink': < ceilometer.pipeline.SampleSink object at 0x7fa17405e350 > ,
'name': 'notification_source:notification_sink'
}
(Pdb) self.pipeline_manager.pipelines[0].source.__dict__
{
'name': 'notification_source',
'cfg': {
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'name': 'notification_source',
'sinks': ['notification_sink']
},
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'discovery': [],
'resources': [],
'sinks': ['notification_sink']
}
(Pdb) self.pipeline_manager.pipelines[0].sink.__dict__
{
'publishers': [ < ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa17405e410 > ],
'transformers': [],
'name': 'notification_sink',
'cfg': {
'publishers': ['notifier://'],
'transformers': None,
'name': 'notification_sink'
},
'multi_publish': False,
'transformer_cfg': []
}
"""
self.pipeline_manager = pipeline.setup_pipeline()
self.event_pipeline_manager = pipeline.setup_event_pipeline()
self.transport = messaging.get_transport()
if cfg.CONF.notification.workload_partitioning:
self.group_id = self.NOTIFICATION_NAMESPACE
self.partition_coordinator = coordination.PartitionCoordinator()
self.partition_coordinator.start()
else:
# FIXME(sileht): endpoint uses the notification_topics option
# and it should not because this is an oslo_messaging option
# not a ceilometer. Until we have something to get the
# notification_topics in another way, we must create a transport
# to ensure the option has been registered by oslo_messaging.
messaging.get_notifier(self.transport, '')
self.group_id = None
# _get_pipe_manager checks whether workload partitioning is enabled. If it is, it returns a
# SamplePipelineTransportManager instance in place of the PipelineManager instance;
# otherwise the plain PipelineManager instance is kept.
# The difference between the two is the publisher() implementation:
# SamplePipelineTransportManager's publisher first re-publishes the samples to the message
# queue, from which they are later picked up, processed and then sent on to gnocchi-api,
# while PipelineManager's publisher sends directly to the gnocchi-api service.
self.pipe_manager = self._get_pipe_manager(self.transport,
self.pipeline_manager)
self.event_pipe_manager = self._get_event_pipeline_manager(
self.transport)
self._configure_main_queue_listeners(self.pipe_manager,
self.event_pipe_manager)
if cfg.CONF.notification.workload_partitioning:
......
if not cfg.CONF.notification.disable_non_metric_meters:
LOG.warning(_LW('Non-metric meters may be collected. It is highly '
'advisable to disable these meters using '
'ceilometer.conf or the pipeline.yaml'))
self.init_pipeline_refresh()
I have printed some of the parameters in the code above with pdb so they are easier to see. Let's walk through the startup step by step:
The first dozen or so lines of run() are mostly attribute definitions and initialization.
Next come the two statements self.pipeline_manager = pipeline.setup_pipeline() and self.event_pipeline_manager = pipeline.setup_event_pipeline(); their logic is the same, so let's analyze setup_pipeline():
def setup_pipeline(transformer_manager=None):
"""Setup pipeline manager according to yaml config file."""
default = extension.ExtensionManager('ceilometer.transformer')
cfg_file = cfg.CONF.pipeline_cfg_file
return PipelineManager(cfg_file, transformer_manager or default,
SAMPLE_TYPE)
The extension.ExtensionManager('ceilometer.transformer') call loads the plugins registered under the ceilometer.transformer namespace:
ceilometer.transformer =
accumulator = ceilometer.transformer.accumulator:TransformerAccumulator
delta = ceilometer.transformer.conversions:DeltaTransformer
unit_conversion = ceilometer.transformer.conversions:ScalingTransformer
rate_of_change = ceilometer.transformer.conversions:RateOfChangeTransformer
aggregator = ceilometer.transformer.conversions:AggregatorTransformer
arithmetic = ceilometer.transformer.arithmetic:ArithmeticTransformer
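These aliases come from ceilometer's entry points and are loaded through stevedore; a minimal standalone check of what gets loaded from that namespace (assuming ceilometer is installed) could be:

from stevedore import extension

mgr = extension.ExtensionManager(namespace='ceilometer.transformer',
                                 invoke_on_load=False)
for ext in mgr:
    # ext.name is the alias used in pipeline.yaml (e.g. 'rate_of_change'),
    # ext.plugin is the transformer class it maps to.
    print(ext.name, ext.plugin)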
The config file cfg.CONF.pipeline_cfg_file points at pipeline.yaml, which is then used to initialize a PipelineManager instance.
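Reconstructed from the cfg.get('sources') / cfg.get('sinks') pdb dumps shown further below, this deployment's pipeline.yaml looks roughly like the following (other dumps in this post come from a fuller config with additional sources and sinks):

sources:
  - name: notification_source
    interval: 3600
    meters: ['instance', 'volume', 'image', 'snapshot', 'backup',
             'ip.floating', 'router', 'network', 'subnet', 'account']
    sinks: [notification_sink]
  - name: meter_source
    interval: 300
    meters: ['poll.*', 'memory.usage', 'memory.util']
    sinks: [meter_sink]
sinks:
  - name: notification_sink
    transformers:
    publishers: ['notifier://']
  - name: meter_sink
    transformers:
    publishers: ['notifier://']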
Let's look at what the PipelineManager initialization does; again I have added pdb output to make it concrete:
class PipelineManager(ConfigManagerBase):
def __init__(self, cfg_info, transformer_manager, p_type=SAMPLE_TYPE):
super(PipelineManager, self).__init__()
cfg = self.load_config(cfg_info)
self.pipelines = []
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
LOG.info(_LI('detected decoupled pipeline config format'))
unique_names = set()
sources = []
"""
pipeline.yaml
(Pdb) cfg.get('sources')
[{
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'name': 'notification_source',
'sinks': ['notification_sink']
}, {
'interval': 300,
'meters': ['poll.*', 'memory.usage', 'memory.util'],
'name': 'meter_source',
'sinks': ['meter_sink']
}]
event_pipeline.yaml
(Pdb) cfg.get('sources')
[{
'sinks': ['event_sink'],
'name': 'event_source',
'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
}]
"""
for s in cfg.get('sources'):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated source names: %s" %
name, self)
else:
unique_names.add(name)
"""
pipeline.yaml
(Pdb) sources
[<ceilometer.pipeline.SampleSource object at 0x7fa17405e050>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e110>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e090>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e190>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e250>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e210>, <ceilometer.pipeline.SampleSource object at 0x7fa17405e310>]
(Pdb) sources[0].__dict__
{
'name': 'notification_source',
'cfg': {
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'name': 'notification_source',
'sinks': ['notification_sink']
},
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'discovery': [],
'resources': [],
'sinks': ['notification_sink']
}
(Pdb) sources[1].__dict__
{'name': 'meter_source', 'cfg': {'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'name': 'meter_source', 'sinks': ['meter_sink']}, 'interval': 300, 'meters': ['poll.*', 'memory.usage', 'memory.util'], 'discovery': [], 'resources': [], 'sinks': ['meter_sink']}
event_pipeline.yaml
{
'cfg': {
'sinks': ['event_sink'],
'name': 'event_source',
'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists']
},
'events': ['*', '!magnum.bay.metrics.update', '!compute.instance.exists', '!volume.exists', '!snapshot.exists'],
'name': 'event_source',
'sinks': ['event_sink']
}
"""
sources.append(p_type['source'](s))
unique_names.clear()
sinks = {}
"""
pipeline.yaml
(Pdb) cfg.get('sinks')
[{
'publishers': ['notifier://'],
'transformers': None,
'name': 'notification_sink'
}, {
'publishers': ['notifier://'],
'transformers': None,
'name': 'meter_sink'
}]
event_pipeline.yaml
(Pdb) cfg.get('sinks')
[{'publishers': ['notifier://'], 'transformers': None, 'name': 'event_sink'}]
"""
for s in cfg.get('sinks'):
name = s.get('name')
if name in unique_names:
raise PipelineException("Duplicated sink names: %s" %
name, self)
else:
unique_names.add(name)
"""
(Pdb) sinks
{'network_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741e3350>, 'volume_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbfd0>, 'disk_sink': <ceilometer.pipeline.SampleSink object at 0x7fa1741dbc50>, 'cpu_delta_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e290>, 'cpu_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e390>, 'meter_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e3d0>, 'notification_sink': <ceilometer.pipeline.SampleSink object at 0x7fa17405e350>}
(Pdb) sinks.get('network_sink').__dict__
{
'publishers': [ < ceilometer.publisher.messaging.SampleNotifierPublisher object at 0x7fa1741e3890 > ],
'transformers': [ < ceilometer.transformer.conversions.RateOfChangeTransformer object at 0x7fa1741e3710 > ],
'name': 'network_sink',
'cfg': {
'publishers': ['notifier://'],
'transformers': [{
'name': 'rate_of_change',
'parameters': {
'source': {
'map_from': {
'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
'unit': '(B|packet)'
}
},
'target': {
'map_to': {
'name': 'network.\\1.\\2.rate',
'unit': '\\1/s'
},
'type': 'gauge'
}
}
}],
'name': 'network_sink'
},
'multi_publish': False,
'transformer_cfg': [{
'name': 'rate_of_change',
'parameters': {
'source': {
'map_from': {
'name': 'network\\.(incoming|outgoing)\\.(bytes|packets)',
'unit': '(B|packet)'
}
},
'target': {
'map_to': {
'name': 'network.\\1.\\2.rate',
'unit': '\\1/s'
},
'type': 'gauge'
}
}
}]
}
"""
sinks[s['name']] = p_type['sink'](s, transformer_manager)
unique_names.clear()
for source in sources:
source.check_sinks(sinks)
for target in source.sinks:
pipe = p_type['pipeline'](source, sinks[target])
if pipe.name in unique_names:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
" names are unique. (name is the source and sink"
" names combined)" % pipe.name, cfg)
else:
unique_names.add(pipe.name)
self.pipelines.append(pipe)
unique_names.clear()
def publisher(self):
"""Build a new Publisher for these manager pipelines.
:param context: The context.
"""
return PublishContext(self.pipelines)
The first loop takes each entry under sources in pipeline.yaml, wraps it with the p_type['source'] class, and appends the result to the sources list. So what is p_type['source']? As shown below, for samples it is the SampleSource class, which simply parses each source's collection interval (interval) and the meters to collect (meters) out of the config entry; a simplified sketch of that parsing follows the mapping below.
SAMPLE_TYPE = {'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
EVENT_TYPE = {'pipeline': EventPipeline,
'source': EventSource,
'sink': EventSink}
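As an illustration of what the source wrapper keeps from each entry, here is a simplified sketch (hypothetical class, illustrative only; the real SampleSource also validates the entry and supports include/exclude wildcard rules for meters):

import fnmatch


class SampleSourceSketch(object):
    """Illustrative only: holds one 'sources' entry from pipeline.yaml."""

    def __init__(self, cfg):
        self.cfg = cfg
        self.name = cfg['name']
        self.interval = cfg.get('interval', 600)  # collection period, seconds
        self.meters = cfg.get('meters', [])       # meter name patterns
        self.sinks = cfg.get('sinks', [])

    def support_meter(self, meter_name):
        # Simplified matching; the real class also handles '*' and '!foo' rules.
        return any(fnmatch.fnmatch(meter_name, pat) for pat in self.meters)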
Similarly, the second loop takes the sinks entries out of the config file and initializes each with the SampleSink class. That class defines no __init__ of its own, so the base class Sink does the initialization:
class Sink(object):
def __init__(self, cfg, transformer_manager):
self.cfg = cfg
try:
self.name = cfg['name']
# It's legal to have no transformer specified
self.transformer_cfg = cfg.get('transformers') or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
self.publishers = []
for p in cfg['publishers']:
if '://' not in p:
# Support old format without URL
p = p + "://"
try:
self.publishers.append(publisher.get_publisher(p,
self.NAMESPACE))
except Exception:
LOG.exception(_("Unable to load publisher %s"), p)
self.multi_publish = True if len(self.publishers) > 1 else False
self.transformers = self._setup_transformers(cfg, transformer_manager)
The two things to note here are the publisher.get_publisher(p, self.NAMESPACE) call, which loads a publisher driver for each URL listed under publishers, and self._setup_transformers(cfg, transformer_manager), which instantiates the configured transformers.
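A simplified sketch of what the get_publisher lookup does with a URL such as notifier:// (assumed behaviour, shown only to illustrate the scheme-to-driver resolution via stevedore; not the exact ceilometer code):

from six.moves.urllib import parse as urlparse
from stevedore import driver


def get_publisher_sketch(url, namespace='ceilometer.publisher'):
    # 'notifier://' -> scheme 'notifier' -> the entry point registered under
    # that name in the ceilometer.publisher namespace (SampleNotifierPublisher).
    parsed = urlparse.urlsplit(url)
    mgr = driver.DriverManager(namespace, parsed.scheme)
    return mgr.driver(parsed)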
Back in PipelineManager: the final loop walks the sources and sinks built above, takes every sink referenced by each source, initializes a SamplePipeline for the (source, sink) pair, and appends it to self.pipelines. SamplePipeline has no __init__ of its own either, so the base class Pipeline is used:
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
def __init__(self, source, sink):
self.source = source
self.sink = sink
self.name = str(self)
The data ultimately stored on the manager object is:
pipeline.yaml
(Pdb) self.pipelines
[<ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3ad0>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3290>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e3b90>, <ceilometer.pipeline.SamplePipeline object at 0x7fa1741e39d0>]
(Pdb) self.pipelines[0].__dict__
{'source': <ceilometer.pipeline.SampleSource object at 0x7fc20c082650>, 'sink': <ceilometer.pipeline.SampleSink object at 0x7fc20c082050>, 'name': 'notification_source:notification_sink'}
event_pipeline.yaml
(Pdb) self.pipelines
[<ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10>]
(Pdb) self.pipelines[0].__dict__
{'source': <ceilometer.pipeline.EventSource object at 0x7fa1741e3dd0>, 'sink': <ceilometer.pipeline.EventSink object at 0x7fa1741e3ed0>, 'name': 'event:event_source:event_sink'}
"""
Continuing with NotificationService.run: pipeline.setup_pipeline() has now been analyzed, and setup_event_pipeline() follows the same logic.
The _get_pipe_manager / _get_event_pipeline_manager part is covered by the comments embedded in the code above.
The key call is _configure_main_queue_listeners(self.pipe_manager, self.event_pipe_manager):
def _configure_main_queue_listeners(self, pipe_manager,
event_pipe_manager):
""""""
"""
(Pdb) notification_manager.extensions[0].__dict__
{
'obj': < ceilometer.network.notifications.Firewall object at 0x7fa1741e3d50 > ,
'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.notifications:Firewall'),
'name': 'network.services.firewall',
'plugin': < class 'ceilometer.network.notifications.Firewall' >
}
"""
notification_manager = self._get_notifications_manager(pipe_manager)
if not list(notification_manager):
LOG.warning(_('Failed to load any notification handlers for %s'),
self.NOTIFICATION_NAMESPACE)
ack_on_error = cfg.CONF.notification.ack_on_event_error
endpoints = []
"""
(Pdb) endpoints
[<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>]
(Pdb) endpoints[0].__dict__
{
'event_converter': < ceilometer.event.converter.NotificationEventsConverter object at 0x7fa17467c050 > ,
'manager': < ceilometer.pipeline.PipelineManager object at 0x2f6d550 >
}
(Pdb) endpoints[0].manager.__dict__
{
'cfg_mtime': 1592551325.2732353,
'cfg_hash': 'c01e8ee1c40b66314628536afcd48a39',
'cfg_loc': '/etc/ceilometer/event_pipeline.yaml',
'pipelines': [ < ceilometer.pipeline.EventPipeline object at 0x7fa1741e3d10 > ]
}
"""
endpoints.append(
event_endpoint.EventsNotificationEndpoint(event_pipe_manager))
targets = []
for ext in notification_manager:
handler = ext.obj
"""
(Pdb) cfg.CONF.oslo_messaging_notifications.topics
['notifications']
(Pdb) cfg.CONF.notification.disable_non_metric_meters
True
"""
if (cfg.CONF.notification.disable_non_metric_meters and
isinstance(handler, base.NonMetricNotificationBase)):
continue
LOG.debug('Event types from %(name)s: %(type)s'
' (ack_on_error=%(error)s)',
{'name': ext.name,
'type': ', '.join(handler.event_types),
'error': ack_on_error})
# NOTE(gordc): this could be a set check but oslo_messaging issue
# https://bugs.launchpad.net/oslo.messaging/+bug/1398511
# This ensures we don't create multiple duplicate consumers.
for new_tar in handler.get_targets(cfg.CONF):
if new_tar not in targets:
targets.append(new_tar)
endpoints.append(handler)
"""
(Pdb) endpoints
[<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x2f5f610>,
<ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fa1741f6810>,
<ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fa17424d410>,
<ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fa17424d8d0>,
<ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fa1742461d0>,
<ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fa17424dc90>,
<ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fa17467ddd0>]
(Pdb) cfg.CONF.notification.messaging_urls
['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
"""
urls = cfg.CONF.notification.messaging_urls or [None]
for url in urls:
transport = messaging.get_transport(url)
# NOTE(gordc): ignore batching as we want pull
# to maintain sequencing as much as possible.
listener = messaging.get_batch_notification_listener(
transport, targets, endpoints)
listener.start()
self.listeners.append(listener)
_get_notifications_manager loads the plugins registered in the ceilometer.notification namespace, passing pipe_manager as the argument handed to each extension as it is loaded.
Then an EventsNotificationEndpoint is appended to endpoints; this is the endpoint invoked when other components emit events, and we will analyze it later.
The for loop then iterates over the plugins loaded above. Roughly: if the plugin object is an instance of NonMetricNotificationBase (and non-metric meters are disabled), it is skipped; otherwise its get_targets method is called to produce oslo_messaging.Target instances with topic notifications, and the plugin itself is appended to endpoints.
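For reference, a typical get_targets in a notification plugin looks roughly like this (a sketch, not the exact ceilometer code; the real plugins read the exchange name from their own config option, e.g. nova_control_exchange):

import oslo_messaging


def get_targets(conf):
    # One Target per configured notification topic; the exchange identifies
    # which service's notification bus to consume from.
    return [oslo_messaging.Target(topic=topic, exchange='nova')
            for topic in conf.oslo_messaging_notifications.topics]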
The final block actually starts the listeners. First, what is in urls?
(Pdb) cfg.CONF.notification.messaging_urls
['rabbit://rabbitmq:I5dZs2KN@rabbitmq.openstack.svc.cluster.local:5672/']
So the service talks over RabbitMQ. get_batch_notification_listener is then called with the targets describing which messages to consume and the endpoints to invoke once messages arrive. We already know endpoints holds two kinds of handlers: one set for collected sample data and one for events. Let's see what happens inside:
def get_batch_notification_listener(transport, targets, endpoints,
allow_requeue=False,
batch_size=1, batch_timeout=None):
"""
(Pdb) a
transport = <oslo_messaging.transport.Transport object at 0x7fab5c6a9190>
targets = [<Target exchange=ironic, topic=notifications>, <Target exchange=ceilometer, topic=notifications>, <Target exchange=nova, topic=notifications>, <Target exchange=cinder, topic=notifications>, <Target exchange=glance, topic=notifications>, <Target exchange=neutron, topic=notifications>, <Target exchange=heat, topic=notifications>, <Target exchange=keystone, topic=notifications>, <Target exchange=sahara, topic=notifications>, <Target exchange=trove, topic=notifications>, <Target exchange=zaqar, topic=notifications>, <Target exchange=swift, topic=notifications>, <Target exchange=magnum, topic=notifications>, <Target exchange=central, topic=notifications>]
endpoints = [<ceilometer.event.endpoint.EventsNotificationEndpoint object at 0x7fab5c258c90>, <ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7fab5c2a9610>, <ceilometer.telemetry.notifications.TelemetryIpc object at 0x7fab5c2b8110>, <ceilometer.ipmi.notifications.ironic.FanSensorNotification object at 0x7fab5c2b8510>, <ceilometer.ipmi.notifications.ironic.VoltageSensorNotification object at 0x7fab5c2b8810>, <ceilometer.meter.notifications.ProcessMeterNotifications object at 0x7fab5c2b8a90>, <ceilometer.ipmi.notifications.ironic.CurrentSensorNotification object at 0x7fab5c6be210>]
allow_requeue = False
batch_size = 1
batch_timeout = None
"""
return oslo_messaging.get_batch_notification_listener(
transport, targets, endpoints, executor='threading',
allow_requeue=allow_requeue,
batch_size=batch_size, batch_timeout=batch_timeout)
This is really just a thin wrapper around oslo_messaging.get_batch_notification_listener, so let's keep going:
def get_batch_notification_listener(transport, targets, endpoints,
executor='blocking', serializer=None,
allow_requeue=False, pool=None,
batch_size=None, batch_timeout=None):
dispatcher = notify_dispatcher.BatchNotificationDispatcher(
endpoints, serializer)
return BatchNotificationServer(
transport, targets, dispatcher, executor, allow_requeue, pool,
batch_size, batch_timeout
)
Now look at BatchNotificationDispatcher: it has no __init__ of its own, so the base class NotificationDispatcher's __init__ runs:
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
class NotificationDispatcher(dispatcher.DispatcherBase):
def __init__(self, endpoints, serializer):
self.endpoints = endpoints
self.serializer = serializer or msg_serializer.NoOpSerializer()
self._callbacks_by_priority = {}
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
if hasattr(endpoint, prio):
method = getattr(endpoint, prio)
screen = getattr(endpoint, 'filter_rule', None)
self._callbacks_by_priority.setdefault(prio, []).append(
(screen, method))
itertools.product(A, B) yields tuples forming the Cartesian product of A and B. The loop walks every (endpoint, priority) pair; whenever an endpoint defines a method named after that priority, the pair is stored in _callbacks_by_priority, which ends up looking roughly like this (a quick standalone demo of itertools.product follows the snippet):
'sample': [
( < oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c254590 > , < bound method TemperatureSensorNotification.sample of < ceilometer.ipmi.notifications.ironic.TemperatureSensorNotification object at 0x7ff14c254550 >> ),
( < oslo_messaging.notify.filter.NotificationFilter object at 0x7ff14c260590 > , < bound method TelemetryIpc.sample of < ceilometer.telemetry.notifications.TelemetryIpc object at 0x7ff14c260150 >> )]
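The Cartesian-product pairing itself is easy to see in isolation:

import itertools

endpoints = ['EventsNotificationEndpoint', 'TelemetryIpc']
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']

# Every (endpoint, priority) combination is visited exactly once:
for endpoint, prio in itertools.product(endpoints, PRIORITIES):
    print(endpoint, prio)
# 2 endpoints x 7 priorities -> 14 pairs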
Each tuple in the list has the form (screen, method), where screen is the plugin class's filter_rule attribute. Taking TelemetryIpc as an example, its base class is NotificationBase:
@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
"""Base class for plugins that support the notification API."""
def __init__(self, manager):
super(NotificationBase, self).__init__()
# NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
# messages to an endpoint.
if self.event_types:
self.filter_rule = oslo_messaging.NotificationFilter(
event_type='|'.join(self.event_types))
self.manager = manager
NotificationFilter is used to screen the messages the notification service receives; it can filter on context, publisher_id, event_type, metadata and payload.
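A small illustration of how such a filter behaves (hypothetical event_type pattern; in ceilometer the pattern is built from the plugin's event_types, and dispatch() below calls match() with the message fields in exactly this positional order):

import oslo_messaging

# Hypothetical pattern; ceilometer joins the plugin's event_types with '|'.
screen = oslo_messaging.NotificationFilter(event_type=r'compute\.instance\..*')

# Positional order as in dispatch(): ctxt, publisher_id, event_type, metadata, payload
print(screen.match({}, 'compute.host1', 'compute.instance.create.end', {}, {}))  # should match
print(screen.match({}, 'image.host1', 'image.upload', {}, {}))                   # should not match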
Back to get_batch_notification_listener: with the dispatcher covered, the rest is the BatchNotificationServer instance.
Inheritance chain: BatchNotificationServer -> NotificationServerBase -> MessageHandlingServer
Its initialization simply stores a few parameters and returns the listener object.
_configure_main_queue_listeners then calls listener.start(), which is MessageHandlingServer's start method:
@ordered(reset_after='stop')
def start(self, override_pool_size=None):
if self._started:
LOG.warning('The server has already been started. Ignoring '
'the redundant call to start().')
return
self._started = True
executor_opts = {}
if self.executor_type in ("threading", "eventlet"):
executor_opts["max_workers"] = (
override_pool_size or self.conf.executor_thread_pool_size
)
self._work_executor = self._executor_cls(**executor_opts)
try:
self.listener = self._create_listener()
except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex)
self.listener.start(self._on_incoming)
The two lines to note here are self.listener = self._create_listener() and self.listener.start(self._on_incoming). The _create_listener call resolves to NotificationServerBase's implementation:
def _create_listener(self):
return self.transport._listen_for_notifications(
self._targets_priorities, self._pool, self._batch_size,
self._batch_timeout
)
which in turn calls the transport's _listen_for_notifications:
def _listen_for_notifications(self, targets_and_priorities, pool,
batch_size, batch_timeout):
for target, priority in targets_and_priorities:
if not target.topic:
raise exceptions.InvalidTarget('A target must have '
'topic specified',
target)
return self._driver.listen_for_notifications(
targets_and_priorities, pool, batch_size, batch_timeout
)
and continues into the driver:
def listen_for_notifications(self, targets_and_priorities, pool,
batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = NotificationAMQPListener(self, conn)
for target, priority in targets_and_priorities:
conn.declare_topic_consumer(
exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, priority),
callback=listener, queue_name=pool)
return base.PollStyleListenerAdapter(listener, batch_size,
batch_timeout)
This code declares a consumer on each queue to be listened on, then builds and returns a PollStyleListenerAdapter instance.
When PollStyleListenerAdapter is initialized it creates a thread object:
File: oslo_messaging/_drivers/base.py, PollStyleListenerAdapter.__init__
self._listen_thread = threading.Thread(target=self._runner)
Once start() is called, that thread runs _runner, which keeps polling for messages and hands each batch to the incoming-message callback for processing:
class PollStyleListenerAdapter(Listener):
"""A Listener that uses a PollStyleListener for message transfer. A
dedicated thread is created to do message polling.
"""
def __init__(self, poll_style_listener, batch_size, batch_timeout):
super(PollStyleListenerAdapter, self).__init__(
batch_size, batch_timeout, poll_style_listener.prefetch_size
)
self._poll_style_listener = poll_style_listener
self._listen_thread = threading.Thread(target=self._runner)
self._listen_thread.daemon = True
self._started = False
def start(self, on_incoming_callback):
super(PollStyleListenerAdapter, self).start(on_incoming_callback)
self._started = True
self._listen_thread.start()
@excutils.forever_retry_uncaught_exceptions
def _runner(self):
while self._started:
incoming = self._poll_style_listener.poll(
batch_size=self.batch_size, batch_timeout=self.batch_timeout)
if incoming:
self.on_incoming_callback(incoming)
# listener is stopped but we need to process all already consumed
# messages
while True:
incoming = self._poll_style_listener.poll(
batch_size=self.batch_size, batch_timeout=self.batch_timeout)
if not incoming:
return
self.on_incoming_callback(incoming)
The last line of MessageHandlingServer.start is self.listener.start(self._on_incoming), which is exactly the start() shown above. Once started, _runner keeps pulling data off the queue via poll() and hands each batch to self.on_incoming_callback(incoming), i.e. MessageHandlingServer._on_incoming:
def _on_incoming(self, incoming):
"""Handles on_incoming event
:param incoming: incoming request.
"""
self._work_executor.submit(self._process_incoming, incoming)
The actual processing is submitted to self._process_incoming, i.e. BatchNotificationServer._process_incoming:
class BatchNotificationServer(NotificationServerBase):
def _process_incoming(self, incoming):
try:
not_processed_messages = self.dispatcher.dispatch(incoming)
except Exception:
......
This handler delegates to the dispatcher object to dispatch the messages; self.dispatcher here is the BatchNotificationDispatcher created earlier, so the call lands in:
class BatchNotificationDispatcher(NotificationDispatcher):
"""A message dispatcher which understands Notification messages.
A MessageHandlingServer is constructed by passing a callable dispatcher
which is invoked with a list of message dictionaries each time 'batch_size'
messages are received or 'batch_timeout' seconds is reached.
"""
def dispatch(self, incoming):
"""Dispatch notification messages to the appropriate endpoint method.
"""
messages_grouped = itertools.groupby(sorted(
(self._extract_user_message(m) for m in incoming),
key=operator.itemgetter(0)), operator.itemgetter(0))
requeues = set()
for priority, messages in messages_grouped:
__, raw_messages, messages = six.moves.zip(*messages)
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"', priority)
continue
for screen, callback in self._callbacks_by_priority.get(priority,
[]):
if screen:
filtered_messages = [message for message in messages
if screen.match(
message["ctxt"],
message["publisher_id"],
message["event_type"],
message["metadata"],
message["payload"])]
else:
filtered_messages = list(messages)
if not filtered_messages:
continue
ret = self._exec_callback(callback, filtered_messages)
if ret == NotificationResult.REQUEUE:
requeues.update(raw_messages)
break
return requeues
def _exec_callback(self, callback, messages):
try:
return callback(messages)
except Exception:
LOG.exception("Callback raised an exception.")
return NotificationResult.REQUEUE
The key step is looking up the message's priority in the self._callbacks_by_priority dict to find the matching plugin methods, i.e. the callbacks, and invoking them. For example, a message on notifications.sample is handled by the sample method of the TelemetryIpc class:
_sample = ceilometer.telemetry.notifications:TelemetryIpc
That class does not define sample itself; it uses its parent's implementation, ceilometer/agent/plugin_base.py: NotificationBase.sample:
def sample(self, notifications):
self._process_notifications('sample', notifications)
def _process_notifications(self, priority, notifications):
for notification in notifications:
try:
notification = messaging.convert_to_old_notification_format(
priority, notification)
self.to_samples_and_publish(notification)
except Exception:
LOG.error(_LE('Fail to process notification'), exc_info=True)
def to_samples_and_publish(self, notification):
with self.manager.publisher() as p:
p(list(self.process_notification(notification)))
The chain goes _process_notifications -> to_samples_and_publish. The important parts are the with self.manager.publisher() as p: statement and the p(...) call. The manager here was passed in when the plugin was instantiated and is the PipelineManager instance, so look at PipelineManager.publisher:
def publisher(self):
return PublishContext(self.pipelines)
The self.pipelines argument was built during __init__; now look at PublishContext:
class PublishContext(object):
def __init__(self, pipelines=None):
pipelines = pipelines or []
self.pipelines = set(pipelines)
def add_pipelines(self, pipelines):
self.pipelines.update(pipelines)
def __enter__(self):
def p(data):
for p in self.pipelines:
p.publish_data(data)
return p
def __exit__(self, exc_type, exc_value, traceback):
for p in self.pipelines:
p.flush()
Because this is used with a with statement, look at __enter__: the value it returns is bound to the name after as, and the data argument is list(self.process_notification(notification)). Each p in self.pipelines is a pipeline, of which there are two kinds: SamplePipeline and EventPipeline.
Taking SamplePipeline as an example:
def publish_data(self, samples):
if not isinstance(samples, list):
samples = [samples]
supported = [s for s in samples if self.source.support_meter(s.name)
and self._validate_volume(s)]
self.sink.publish_samples(supported)
This calls SampleSink's publish_samples method:
def _publish_samples(self, start, samples):
"""Push samples into pipeline for publishing.
:param start: The first transformer that the sample will be injected.
This is mainly for flush() invocation that transformer
may emit samples.
:param samples: Sample list.
"""
transformed_samples = []
if not self.transformers:
transformed_samples = samples
else:
for sample in samples:
LOG.debug(
"Pipeline %(pipeline)s: Transform sample "
"%(smp)s from %(trans)s transformer", {'pipeline': self,
'smp': sample,
'trans': start})
sample = self._transform_sample(start, sample)
if sample:
transformed_samples.append(sample)
if transformed_samples:
for p in self.publishers:
try:
p.publish_samples(transformed_samples)
except Exception:
LOG.exception(_(
"Pipeline %(pipeline)s: Continue after error "
"from publisher %(pub)s") % ({'pipeline': self,
'pub': p}))
def publish_samples(self, samples):
self._publish_samples(0, samples)
So self._publish_samples gets called; it assembles transformed_samples and then calls publish_samples on each entry in self.publishers to handle the data. We saw earlier that the configured publisher is the notifier plugin, i.e. the SampleNotifierPublisher class, which inherits NotifierPublisher -> MessagingPublisher, so the call ends up in MessagingPublisher.publish_samples:
def publish_samples(self, samples):
"""Publish samples on RPC.
:param samples: Samples from pipeline after transformation.
"""
meters = [
utils.meter_message_from_counter(
sample, cfg.CONF.publisher.telemetry_secret)
for sample in samples
]
topic = cfg.CONF.publisher_notifier.metering_topic
self.local_queue.append((topic, meters))
if self.per_meter_topic:
for meter_name, meter_list in itertools.groupby(
sorted(meters, key=operator.itemgetter('counter_name')),
operator.itemgetter('counter_name')):
meter_list = list(meter_list)
topic_name = topic + '.' + meter_name
LOG.debug('Publishing %(m)d samples on %(n)s',
{'m': len(meter_list), 'n': topic_name})
self.local_queue.append((topic_name, meter_list))
self.flush()
The topic here (cfg.CONF.publisher_notifier.metering_topic) is metering. At the end flush() runs, which calls _process_queue, which in turn uses the _send method to actually send the data out (a simplified sketch of flush/_process_queue follows the _send code below):
def _send(self, event_type, data):
try:
self.notifier.sample({}, event_type=event_type,
payload=data)
except oslo_messaging.MessageDeliveryFailure as e:
raise_delivery_failure(e)
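flush() and _process_queue() themselves are not shown above; behaviourally they drain local_queue and hand each (topic, meters) pair to _send(). A simplified behavioural sketch (hypothetical class, not the exact oslo/ceilometer code; the failure-policy handling in particular is condensed):

class MessagingPublisherSketch(object):
    """Behavioural sketch of MessagingPublisher's queue handling (simplified)."""

    def __init__(self, policy='default'):
        self.policy = policy
        self.local_queue = []   # list of (topic, meters) tuples

    def _send(self, topic, meters):
        raise NotImplementedError   # the real _send is shown above

    def flush(self):
        # Drain whatever publish_samples queued up and try to send it.
        queue, self.local_queue = self.local_queue, []
        self.local_queue = self._process_queue(queue, self.policy) + self.local_queue

    def _process_queue(self, queue, policy):
        while queue:
            topic, meters = queue[0]
            try:
                self._send(topic, meters)
            except Exception:           # delivery failed
                if policy == 'queue':
                    return queue         # keep unsent items for the next flush
                if policy == 'drop':
                    return []
                raise                    # default policy: propagate the error
            queue.pop(0)
        return []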
The samples are therefore sent to the metering.sample queue. From this point on the message flow is the same as for the polling-compute service; for details see: polling-compute
Reference:
https://www.cnblogs.com/luohaixian/p/11145939.html