Taking the community Newton (N) release code as an example:
exec ceilometer-polling --polling-namespaces compute --config-file /etc/ceilometer/ceilometer.conf
The ceilometer code is packaged with setuptools and pbr; for background on that, see the separate article "setuptools and pbr packaging in OpenStack".
The entry point is the main() function in /ceilometer/cmd/polling.py:
def create_polling_service(worker_id):
    return manager.AgentManager(CONF.polling_namespaces,
                                CONF.pollster_list,
                                worker_id)


def main():
    service.prepare_service()
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service)
    sm.run()
'''
Key points:
1 cotyledon.Service(worker_id)
  Purpose: create a new service
  Parameter: worker_id (int) – identifier of the service instance
2 ServiceManager()
2.1 Purpose
  Acts like the master process and manages the lifecycle of services.
  It controls the lifecycle of the child processes and restarts them if they die unexpectedly.
  Each child process (ServiceWorker) runs one instance of a service.
  An application must create a single ServiceManager and use
  ServiceManager.run() as its main loop.
  Signature:
  class cotyledon.ServiceManager(wait_interval=0.01, graceful_shutdown_timeout=60)
2.2 cotyledon.ServiceManager.add
  cotyledon.ServiceManager.add(service, workers=1, args=None, kwargs=None)
  Purpose: create child processes to run the given service (here, the polling service)
  Parameters:
    service (callable) – callable that returns an instance of Service
    workers (int) – number of processes/workers for this service
    args (tuple) – additional positional arguments for this service
    kwargs (dict) – additional keyword arguments for this service
  Returns:
    a service id
2.3 cotyledon.ServiceManager.run()
  Start and supervise the service workers.
  This method starts and supervises all child processes until the master process is asked to shut down.
Reference:
https://blog.csdn.net/qingyuanluofeng/article/details/95533476
'''
This part is implemented with the cotyledon multi-process framework.
To instantiate the child process, create_polling_service is called, which in turn instantiates the AgentManager class with three arguments: CONF.polling_namespaces, CONF.pollster_list and worker_id.
The compute agent starts from here: AgentManager's __init__ is called to initialize it, and then its run method is called to collect the data.
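For readers unfamiliar with cotyledon, here is a minimal, self-contained sketch of the same pattern (MyPoller and the 2-second sleep are illustrative, not part of ceilometer):
import time

import cotyledon


class MyPoller(cotyledon.Service):
    """A toy service: each worker process runs this loop."""

    def __init__(self, worker_id):
        super(MyPoller, self).__init__(worker_id)
        self._worker_id = worker_id
        self._shutdown = False

    def run(self):
        while not self._shutdown:
            print("worker %s polling..." % self._worker_id)
            time.sleep(2)

    def terminate(self):
        self._shutdown = True


if __name__ == "__main__":
    sm = cotyledon.ServiceManager()
    sm.add(MyPoller, workers=2)   # fork two worker processes
    sm.run()                      # supervise them until shutdown
In ceilometer the callable passed to add() is the create_polling_service factory shown above, which returns the AgentManager described next.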
Let's take a look at what is inside AgentManager.
class AgentManager(service_base.PipelineBasedService):
def __init__(self, namespaces=None, pollster_list=None, worker_id=0):
namespaces = namespaces or ['compute', 'central']
pollster_list = pollster_list or []
group_prefix = cfg.CONF.polling.partitioning_group_prefix
self._inspector = virt_inspector.get_hypervisor_inspector()
self.nv = nova_client.Client()
# features of using coordination and pollster-list are exclusive, and
# cannot be used at one moment to avoid both samples duplication and
# samples being lost
......
# we'll have default ['compute', 'central'] here if no namespaces will
# be passed
'''
(Pdb) p self.extensions[0].__dict__
{
'obj': < ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0 > ,
'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
'name': 'cpu_l3_cache',
'plugin': < class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster' >
}
(Pdb) p self.extensions[1].__dict__
{
'obj': < ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650 > ,
'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
'name': 'disk.write.requests.rate',
'plugin': < class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster' >
}
(Pdb) p self.extensions[2].__dict__
{
'obj': < ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0 > ,
'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
'name': 'disks.total',
'plugin': < class 'ceilometer.compute.pollsters.disk.DisksTotalPoller' >
}
'''
extensions = (self._extensions('poll', namespace).extensions
for namespace in namespaces)
# get the extensions from pollster builder
extensions_fb = (self._extensions_from_builder('poll', namespace)
for namespace in namespaces)
if pollster_list:
extensions = (moves.filter(_match, exts)
for exts in extensions)
extensions_fb = (moves.filter(_match, exts)
for exts in extensions_fb)
self.extensions = list(itertools.chain(*list(extensions))) + list(
itertools.chain(*list(extensions_fb)))
if self.extensions == []:
raise EmptyPollstersList()
"""
(Pdb) p self.discoveries[0].__dict__
{
'obj': < ceilometer.compute.discovery.InstanceDiscovery object at 0x7f392864d390 > ,
'entry_point': EntryPoint.parse('local_instances = ceilometer.compute.discovery:InstanceDiscovery'),
'name': 'local_instances',
'plugin': < class 'ceilometer.compute.discovery.InstanceDiscovery' >
}
"""
discoveries = (self._extensions('discover', namespace).extensions
for namespace in namespaces)
self.discoveries = list(itertools.chain(*list(discoveries)))
self.polling_periodics = None
"""
(Pdb) p self.heartbeat_timer.__dict__
{
'_dead': < threading._Event object at 0x7f3928654c10 > ,
'_waiter': < Condition( < _RLock owner = None count = 0 > , 0) > ,
'_now_func': < function monotonic at 0x18d51b8 > ,
'_schedule': < futurist.periodics._Schedule object at 0x7f3928654d10 > ,
'_active': < threading._Event object at 0x7f3928654c90 > ,
'_initial_schedule_strategy': < function _now_plus_periodicity at 0x1bccc80 > ,
'_watchers': [({
'successes': 0,
'failures': 0,
'runs': 0,
'elapsed': 0,
'elapsed_waiting': 0
}, < Watcher object at 0x7f3928654d90(runs = 0, successes = 0, failures = 0, elapsed = 0.00, elapsed_waiting = 0.00) > )],
'_on_failure': < functools.partial object at 0x7f3928649b50 > ,
'_executor_factory': < function < lambda > at 0x7f39283809b0 > ,
'_schedule_strategy': < function _last_started_strategy at 0x1bccb90 > ,
'_log': < logging.Logger object at 0x1bbfa90 > ,
'_callables': [( < function < lambda > at 0x7f39286581b8 > , 'ceilometer.utils.<lambda>', (), {})],
'_cond_cls': < function Condition at 0x7f39364b2938 > ,
'_tombstone': < threading._Event object at 0x7f3928654b10 > ,
'_immediates': deque([0])
}
(Pdb) p self.partition_coordinator.__dict__
{'_groups': set([]), '_my_id': 'c6ee6425-1bac-4e9c-a026-2022b133c825', '_coordinator': None}
"""
self.partition_coordinator = coordination.PartitionCoordinator()
self.heartbeat_timer = utils.create_periodic(
target=self.partition_coordinator.heartbeat,
spacing=cfg.CONF.coordination.heartbeat,
run_immediately=True)
# Compose coordination group prefix.
# We'll use namespaces as the basement for this partitioning.
namespace_prefix = '-'.join(sorted(namespaces))
self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
if group_prefix else namespace_prefix)
"""
(Pdb) p self.notifier.__dict__
{
'_serializer': < oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710 > ,
'_driver_mgr': < stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890 > ,
'retry': -1,
'_driver_names': ['messagingv2'],
'_topics': ['notifications'],
'publisher_id': 'ceilometer.polling',
'transport': < oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550 >
}
(Pdb) p self.notifier._driver_mgr
<stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>
(Pdb) p self.notifier._driver_mgr.__dict__
{
'_names': ['messagingv2'],
'namespace': 'oslo.messaging.notify.drivers',
'_on_load_failure_callback': None,
'extensions': [ < stevedore.extension.Extension object at 0x7fddfc80e990 > ],
'propagate_map_exceptions': False,
'_extensions_by_name': None,
'_name_order': False,
'_missing_names': set([])
}
(Pdb) p self.notifier._driver_mgr.__dict__.get('extensions')[0].__dict__
{
'obj': < oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7fddfc80e750 > ,
'entry_point': EntryPoint.parse('messagingv2 = oslo_messaging.notify.messaging:MessagingV2Driver'),
'name': 'messagingv2',
'plugin': < class 'oslo_messaging.notify.messaging.MessagingV2Driver' >
}
"""
self.notifier = oslo_messaging.Notifier(
messaging.get_transport(),
driver=cfg.CONF.publisher_notifier.telemetry_driver,
publisher_id="ceilometer.polling")
self._keystone = None
self._keystone_last_exception = None
I have printed some of the objects in the code above with pdb so you can see them directly. Let's go through this initialization line by line:
The first 5 lines need little explanation; they are basic parameter handling, and the __init__ arguments were already introduced at the entry point.
Line 6 is there to support workload partitioning coordination.
Line 7 calls get_hypervisor_inspector to load the hypervisor (libvirt) driver:
def get_hypervisor_inspector():
    try:
        namespace = 'ceilometer.compute.virt'
        mgr = driver.DriverManager(namespace,
                                   cfg.CONF.hypervisor_inspector,
                                   invoke_on_load=True)
        return mgr.driver
    except ImportError as e:
        LOG.error(_("Unable to load the hypervisor inspector: %s") % e)
        return Inspector()
This uses the stevedore library, which is not covered in detail in this article; see the separate article "Python plugins with stevedore".
Line 8 initializes a nova client.
Lines 40-69 are mainly about loading the pollster plugins, again via stevedore; the namespaces used on lines 40 and 43 are the ones passed in. Let's look at the implementation of _extensions:
def _extensions(self, category, agent_ns=None):
    namespace = ('ceilometer.%s.%s' % (category, agent_ns) if agent_ns
                 else 'ceilometer.%s' % category)
    return self._get_ext_mgr(namespace)
This loads the plugins registered under the ceilometer.poll.compute namespace (if the central agent is running, the plugins under ceilometer.poll.central are loaded instead). The plugins are declared in setup.cfg:
ceilometer.poll.compute =
disks.util = ceilometer.compute.pollsters.disk:DisksUtilPoller
disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller
disks.used = ceilometer.compute.pollsters.disk:DisksUsedPoller
...
ceilometer.poll.central =
account = ceilometer.chakra.chakra:AccountPollster
ip.floating = ceilometer.network.floatingip:FloatingIPPollster
image = ceilometer.image.glance:ImagePollster
...
The loaded extensions look like this: a list holding one Extension object for every plugin in the namespace:
(Pdb) p self.extensions[0].__dict__
{
'obj': < ceilometer.compute.pollsters.cpu.CPUL3CachePollster object at 0x7f392837acd0 > ,
'entry_point': EntryPoint.parse('cpu_l3_cache = ceilometer.compute.pollsters.cpu:CPUL3CachePollster'),
'name': 'cpu_l3_cache',
'plugin': < class 'ceilometer.compute.pollsters.cpu.CPUL3CachePollster' >
}
(Pdb) p self.extensions[1].__dict__
{
'obj': < ceilometer.compute.pollsters.disk.WriteRequestsRatePollster object at 0x7f3928385650 > ,
'entry_point': EntryPoint.parse('disk.write.requests.rate = ceilometer.compute.pollsters.disk:WriteRequestsRatePollster'),
'name': 'disk.write.requests.rate',
'plugin': < class 'ceilometer.compute.pollsters.disk.WriteRequestsRatePollster' >
}
(Pdb) p self.extensions[2].__dict__
{
'obj': < ceilometer.compute.pollsters.disk.DisksTotalPoller object at 0x7f39283973d0 > ,
'entry_point': EntryPoint.parse('disks.total = ceilometer.compute.pollsters.disk:DisksTotalPoller'),
'name': 'disks.total',
'plugin': < class 'ceilometer.compute.pollsters.disk.DisksTotalPoller' >
}
Lines 43-69 load plugins in the same way, so that is not repeated here.
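As a rough sketch of how these entry points get loaded (this mirrors what _get_ext_mgr does via stevedore, assuming ceilometer is installed so that its setup.cfg entry points are registered):
from stevedore import extension

# Load every pollster registered under the ceilometer.poll.compute
# entry-point namespace; invoke_on_load=True instantiates each plugin class.
mgr = extension.ExtensionManager(
    namespace='ceilometer.poll.compute',
    invoke_on_load=True)

for ext in mgr:
    # ext.name is the entry-point name (e.g. 'disks.total'),
    # ext.plugin is the class and ext.obj is the instantiated pollster.
    print(ext.name, ext.plugin)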
Lines 99-103 obtain a workload-partitioning coordinator instance, used to divide the work when multiple polling workers are running. The relevant objects have already been printed above for anyone interested. This is based on the tooz library, so here is a short introduction (a minimal sketch follows this overview):
1 Tooz basics
  Purpose:
  1) provides a distributed coordination API to manage groups and group membership
  2) provides distributed locks so that distributed nodes can acquire and release locks for synchronization
  Problem solved: synchronization between multiple distributed processes
2 Tooz architecture
  Essence: Tooz is an abstraction over solutions such as ZooKeeper, the Raft consensus algorithm and Redis,
  exposing the backends through drivers
  Driver types:
  zookeeper, Zake, memcached, redis,
  SysV IPC (distributed locks only), PostgreSQL (distributed locks only), MySQL (distributed locks only)
  Driver characteristics:
  all drivers support distributed processes, and the Tooz API is fully asynchronous, which makes it more efficient.
3 Tooz features
3.1 Group management
  Manages group membership.
  Operations: create a group, join a group, leave a group, list group members, get notified when members join or leave
  Use case:
  the ceilometer-notification service uses group management to achieve load balancing and real horizontal scaling.
3.2 Leader election
  Every group has a leader, and each node can decide whether to take part in the election;
  if the leader disappears, a new leader is elected;
  other members can be notified when a leader is elected;
  any node can query the current leader of the group at any time.
  Takeaway:
  tooz could also be used to implement your own leader election and service high availability.
3.3 Distributed locks
  Use case:
  ceilometer originally used RPC to detect whether alarm evaluator processes were alive;
  later it switched to Tooz group management to coordinate multiple alarm evaluator processes.
  Use case 2:
  gnocchi uses distributed locks when operating on metrics and measures
Reference: https://blog.csdn.net/qingyuanluofeng/article/details/90349185
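To make the tooz concepts above concrete, here is a minimal sketch (the memcached URL, group and lock names are illustrative; ceilometer wraps all of this inside coordination.PartitionCoordinator):
import uuid

from tooz import coordination

# Connect to a coordination backend (memcached here, purely for illustration).
member_id = uuid.uuid4().hex.encode()
coordinator = coordination.get_coordinator(
    'memcached://127.0.0.1:11211', member_id)
coordinator.start()

group = b'demo-polling-group'
try:
    coordinator.create_group(group).get()
except coordination.GroupAlreadyExist:
    pass
coordinator.join_group(group).get()

# Membership must be kept alive with periodic heartbeats; this is exactly
# what the heartbeat_timer created in AgentManager.__init__ is for.
coordinator.heartbeat()

# A distributed lock shared by every process using the same backend.
with coordinator.get_lock(b'demo-lock'):
    print("group members:", coordinator.get_members(group).get())

coordinator.leave_group(group).get()
coordinator.stop()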
Lines 154-157 are also important: they initialize an oslo_messaging.Notifier instance, which is used to send the collected samples to the notifications.sample queue. In short:
"""
self.notifier.topics = ['notifications']
self.notifier._driver_names = 'messagingv2' (the telemetry_driver option defined in ceilometer/publisher/messaging.py); this is the driver ceilometer uses to publish messages to the message queue
self.notifier.publisher_id = 'ceilometer.polling'
self.notifier._driver_mgr is the plugin named messagingv2 loaded from the oslo.messaging.notify.drivers namespace
self.notifier.transport is the plugin named rabbit loaded from the oslo.messaging.drivers namespace, because transport_url in ceilometer.conf is: transport_url=rabbit://guest:guest@192.168.2.120:5672/
"""
At this point initialization is done. What follows is the actual data collection: sm.run() is executed, which ends up calling the run method of the AgentManager instance.
Entry point: ceilometer/agent/manager.py:AgentManager.run
def run(self):
    super(AgentManager, self).run()
    self.polling_manager = pipeline.setup_polling()
    self.join_partitioning_groups()
    self.start_polling_tasks()
    self.init_pipeline_refresh()
The run method looks fairly simple; let's go through it.
Line 4 calls the setup_polling method:
def setup_polling():
    """Setup polling manager according to yaml config file."""
    cfg_file = cfg.CONF.pipeline_cfg_file
    return PollingManager(cfg_file)
Its job is to load the meters you configured in the pipeline.yaml config file. Let's see what happens inside:
class PollingManager(ConfigManagerBase):
    """Polling Manager

    Polling manager sets up polling according to config file.
    """

    def __init__(self, cfg_info):
        """Setup the polling according to config.

        The configuration is the sources half of the Pipeline Config.
        """
        super(PollingManager, self).__init__()
        # Load the contents of the config file
        cfg = self.load_config(cfg_info)
        self.sources = []
        if not ('sources' in cfg and 'sinks' in cfg):
            raise PipelineException("Both sources & sinks are required",
                                    cfg)
        LOG.info(_LI('detected decoupled pipeline config format'))
        unique_names = set()
        # Initialize and record the 'sources' entries from the config file
        for s in cfg.get('sources'):
            name = s.get('name')
            if name in unique_names:
                raise PipelineException("Duplicated source names: %s" %
                                        name, self)
            else:
                unique_names.add(name)
                # Store every source from the config file in self.sources
                self.sources.append(SampleSource(s))
        unique_names.clear()
Once initialized, a PollingManager object is returned and stored as self.polling_manager. Let's look at what it contains:
"""
(Pdb) p self.polling_manager.__dict__
{'cfg_mtime': 1592535895.477891, 'cfg_hash': 'ba1311798d634022e4b684b72bf7b42a', 'cfg_loc': '/etc/ceilometer/pipeline.yaml', 'sources': [<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]}
(Pdb) p self.polling_manager.sources
[<ceilometer.pipeline.SampleSource object at 0x7f0eec042110>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0423d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042050>, <ceilometer.pipeline.SampleSource object at 0x7f0eec0422d0>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042290>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042390>, <ceilometer.pipeline.SampleSource object at 0x7f0eec042350>]
(Pdb) p self.polling_manager.sources[0].__dict__
{
'name': 'notification_source',
'cfg': {
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'name': 'notification_source',
'sinks': ['notification_sink']
},
'interval': 3600,
'meters': ['instance', 'volume', 'image', 'snapshot', 'backup', 'ip.floating', 'router', 'network', 'subnet', 'account'],
'discovery': [],
'resources': [],
'sinks': ['notification_sink']
}
(Pdb) p self.polling_manager.sources[1].__dict__
{
'name': 'meter_source',
'cfg': {
'interval': 300,
'meters': ['poll.*', 'memory.usage', 'memory.util'],
'name': 'meter_source',
'sinks': ['meter_sink']
},
'interval': 300,
'meters': ['poll.*', 'memory.usage', 'memory.util'],
'discovery': [],
'resources': [],
'sinks': ['meter_sink']
}
"""
As you can see, the point of this line is simply to load the meters defined in the config file; a rough pipeline.yaml sketch is shown below.
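For reference, a pipeline.yaml that would produce the objects printed above looks roughly like this (the meters list is trimmed; a minimal sinks section is included because the loader requires both halves, and notifier:// is the usual default publisher):
sources:
    - name: notification_source
      interval: 3600
      meters:
          - "instance"
          - "volume"
          - "image"
          - "ip.floating"
      sinks:
          - notification_sink
    - name: meter_source
      interval: 300
      meters:
          - "poll.*"
          - "memory.usage"
          - "memory.util"
      sinks:
          - meter_sink
sinks:
    - name: notification_sink
      transformers:
      publishers:
          - notifier://
    - name: meter_sink
      transformers:
      publishers:
          - notifier://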
Line 6 calls the start_polling_tasks method:
def start_polling_tasks(self):
# allow time for coordination if necessary
delay_start = self.partition_coordinator.is_active()
# set shuffle time before polling task if necessary
delay_polling_time = random.randint(
0, cfg.CONF.shuffle_time_before_polling_task)
data = self.setup_polling_tasks()
# One thread per polling tasks is enough
# Create the thread pool keyed by interval: polling tasks with the same interval share one thread
self.polling_periodics = periodics.PeriodicWorker.create(
[], executor_factory=lambda:
futures.ThreadPoolExecutor(max_workers=len(data)))
for interval, polling_task in data.items():
delay_time = (interval + delay_polling_time if delay_start
else delay_polling_time)
@periodics.periodic(spacing=interval, run_immediately=False)
def task(running_task):
self.interval_task(running_task)
utils.spawn_thread(utils.delayed, delay_time,
self.polling_periodics.add, task, polling_task)
if data:
# Don't start useless threads if no task will run
utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
The first 8 lines are just simple setup; the important part is the code from line 9 onward. Let's analyze it.
Line 9 calls the setup_polling_tasks method:
def setup_polling_tasks(self):
"""
(Pdb) p self.polling_manager.sources[1].__dict__
{
'name': 'meter_source',
'cfg': {
'interval': 300,
'meters': ['poll.*', 'memory.usage', 'memory.util'],
'name': 'meter_source',
'sinks': ['meter_sink']
},
'interval': 300,
'meters': ['poll.*', 'memory.usage', 'memory.util'],
'discovery': [],
'resources': [],
'sinks': ['meter_sink']
}
(Pdb) p self.extensions[0].__dict__
{
'obj': < ceilometer.compute.pollsters.memory.MemoryUtilizationPollster object at 0x7ff12062e590 > ,
'entry_point': EntryPoint.parse('memory.util = ceilometer.compute.pollsters.memory:MemoryUtilizationPollster'),
'name': 'memory.util',
'plugin': < class 'ceilometer.compute.pollsters.memory.MemoryUtilizationPollster' >
}
"""
polling_tasks = {}
for source in self.polling_manager.sources:
polling_task = None
for pollster in self.extensions:
# Match each source's meters against the name of every extension; on a match, store a PollingTask in polling_tasks keyed by the source's polling interval
if source.support_meter(pollster.name):
polling_task = polling_tasks.get(source.get_interval())
if not polling_task:
polling_task = self.create_polling_task()
polling_tasks[source.get_interval()] = polling_task
polling_task.add(pollster, source)
return polling_tasks
This iterates over every source in self.polling_manager.sources and, for each source, over self.extensions. If a plugin's name is covered by the source's meters list, that plugin needs to be polled periodically, so it is added to polling_tasks under the polling interval defined by the source; a small illustration of the matching follows.
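Conceptually the meter matching is glob-style, as in this fnmatch illustration (this is just the idea, not ceilometer's actual support_meter code):
import fnmatch

meters = ['poll.*', 'memory.usage', 'memory.util']        # meter_source above
pollsters = ['memory.usage', 'memory.util', 'cpu', 'disks.total']

for name in pollsters:
    matched = any(fnmatch.fnmatch(name, pattern) for pattern in meters)
    print(name, '->', 'polled every 300s' if matched else 'not in this source')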
Now let's look at the value objects stored in polling_tasks. They are PollingTask instances that simply initialize a few attributes, after which the add method is called:
class PollingTask(object):
"""Polling task for polling samples and notifying.
A polling task can be invoked periodically or only once.
"""
def __init__(self, agent_manager):
self.manager = agent_manager
# elements of the Cartesian product of sources X pollsters
# with a common interval
self.pollster_matches = collections.defaultdict(set)
# we relate the static resources and per-source discovery to
# each combination of pollster and matching source
resource_factory = lambda: Resources(agent_manager)
self.resources = collections.defaultdict(resource_factory)
self._batch = cfg.CONF.batch_polled_samples
self._telemetry_secret = cfg.CONF.publisher.telemetry_secret
def add(self, pollster, source):
self.pollster_matches[source.name].add(pollster)
key = Resources.key(source.name, pollster)
self.resources[key].setup(source)
So the data that is finally returned looks like this:
(Pdb) p data
{
3600: < ceilometer.agent.manager.PollingTask object at 0x7fe6a4043bd0 > ,
300: < ceilometer.agent.manager.PollingTask object at 0x7fe6a4043d10 >
}
(Pdb) p data[3600].__dict__
{
'manager': < ceilometer.agent.manager.AgentManager object at 0x3230b50 > ,
'_telemetry_secret': 'change this for valid signing',
'pollster_matches': defaultdict( < type 'set' > , {
'notification_source': set([ < stevedore.extension.Extension object at 0x7fe6b062ec50 > ])
}),
'resources': defaultdict( < function < lambda > at 0x7fe6a40319b0 > , {
'notification_source-instance': < ceilometer.agent.manager.Resources object at 0x7fe6a4043b90 >
}),
'_batch': True
}
Lines 13 to 30 create the thread pool keyed by interval, one thread per distinct polling interval. This uses the futurist library (not covered in detail here) to set up a periodic task that calls self.interval_task once per sampling interval defined in the config file; self.interval_task in turn calls task.poll_and_notify().
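As a minimal illustration of the futurist pattern used here (the task name and the 5-second spacing are made up):
import futurist
from futurist import periodics


@periodics.periodic(spacing=5, run_immediately=False)
def poll_task(name):
    print("polling %s" % name)


worker = periodics.PeriodicWorker.create(
    [], executor_factory=lambda: futurist.ThreadPoolExecutor(max_workers=1))
worker.add(poll_task, "cpu_source")
worker.start(allow_empty=True)   # blocks; ceilometer runs this in a spawned thread
With that scheduling in place, here is the implementation of task.poll_and_notify():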
def poll_and_notify(self):
"""Polling sample and notify."""
cache = {}
discovery_cache = {}
poll_history = {}
"""
(Pdb) self.pollster_matches
defaultdict( < type 'set' > , {
'meter_source': set([ < stevedore.extension.Extension object at 0x7f9a645de8d0 > , < stevedore.extension.Extension object at 0x7f9a645d40d0 > ]),
'cpu_source': set([ < stevedore.extension.Extension object at 0x7f9a645d4cd0 > ]),
'cpu_util_source': set([ < stevedore.extension.Extension object at 0x7f9a645dec10 > ]),
'disk_source': set([ < stevedore.extension.Extension object at 0x7f9a64399c90 > , < stevedore.extension.Extension object at 0x7f9a645d4ad0 > , < stevedore.extension.Extension object at 0x7f9a645d49d0 > , < stevedore.extension.Extension object at 0x7f9a645d4950 > , < stevedore.extension.Extension object at 0x7f9a645dea90 > , < stevedore.extension.Extension object at 0x7f9a64389fd0 > , < stevedore.extension.Extension object at 0x7f9a645d43d0 > , < stevedore.extension.Extension object at 0x7f9a645d4d50 > ]),
'network_source': set([ < stevedore.extension.Extension object at 0x7f9a645def10 > , < stevedore.extension.Extension object at 0x7f9a645d4650 > , < stevedore.extension.Extension object at 0x7f9a645d4450 > , < stevedore.extension.Extension object at 0x7f9a645d4550 > ]),
'volume_source': set([ < stevedore.extension.Extension object at 0x7f9a645d4b50 > , < stevedore.extension.Extension object at 0x7f9a645d4c50 > ])
})
"""
# self.pollster_matches maps each source name defined in the config file to the set of pollster extensions configured under that source
# the keys of self.resources are source.name plus the pollster name
for source_name in self.pollster_matches:
for pollster in self.pollster_matches[source_name]:
"""
first
(Pdb) p key
'meter_source-memory.util'
(Pdb) p candidate_res
[]
(Pdb) p pollster.obj.default_discovery
'local_instances'
"""
key = Resources.key(source_name, pollster)
candidate_res = list(
self.resources[key].get(discovery_cache))
if not candidate_res and pollster.obj.default_discovery:
candidate_res = self.manager.discover(
[pollster.obj.default_discovery], discovery_cache)
"""
(Pdb) p candidate_res
[<Server: jy-2>]
"""
# Remove duplicated resources and black resources. Using
# set() requires well defined __hash__ for each resource.
# Since __eq__ is defined, 'not in' is safe here.
polling_resources = []
black_res = self.resources[key].blacklist
history = poll_history.get(pollster.name, [])
for x in candidate_res:
if x not in history:
history.append(x)
if x not in black_res:
polling_resources.append(x)
"""
first
(Pdb) p poll_history
{'memory.util': [<Server: jy-2>]}
second
(Pdb) p poll_history
{'memory.util': [<Server: jy-2>], 'memory.usage': [<Server: jy-2>]}
"""
poll_history[pollster.name] = history
# If no resources, skip for this pollster
if not polling_resources:
p_context = 'new ' if history else ''
LOG.info(_("Skip pollster %(name)s, no %(p_context)s"
"resources found this cycle"),
{'name': pollster.name, 'p_context': p_context})
continue
LOG.info(_("Polling pollster %(poll)s in the context of "
"%(src)s"),
dict(poll=pollster.name, src=source_name))
try:
polling_timestamp = timeutils.utcnow().isoformat()
samples = pollster.obj.get_samples(
manager=self.manager,
cache=cache,
resources=polling_resources
)
sample_batch = []
# filter None in samples
"""
first
(Pdb) samples
[<name: memory.util, volume: 18.58, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
second
(Pdb) samples
[<name: memory.usage, volume: 91, resource_id: 32eb16e0-c8af-4b41-b18c-b63152f0f8fc, timestamp: None>]
"""
samples = [s for s in samples if s is not None]
for sample in samples:
# Note(yuywz): Unify the timestamp of polled samples
sample.set_timestamp(polling_timestamp)
"""
(Pdb) sample_dict
{
'counter_name': 'memory.util',
'resource_id': u '32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
'timestamp': '2020-06-19T06:21:32.542893',
'counter_volume': 18.58,
'user_id': u '7431e07e49de2703f1b9e703daf5aff158e32028506f77e05a62e9eb3892dcde',
'message_signature': '76e6cbd944963855306b829cdf2c49bba9f0b20220c8c738b61b5048be51b24f',
'resource_metadata': {
'status': u 'active',
'ephemeral_gb': 0,
'disk_gb': 0,
'instance_host': u 'node-3.domain.tld',
'kernel_id': None,
'image': None,
'ramdisk_id': None,
'host': u '619a474e64dbedcd55508bda51aea8a611fd15f3f3e8fa39ce0d0552',
'flavor': {
'name': u '1C-0.5G',
u 'links': [{
u 'href': u 'http://nova-api.openstack.svc.cluster.local:8774/ca80e5ccd445438580c4b128296d1936/flavors/211',
u 'rel': u 'bookmark'
}],
'ram': 512,
'ephemeral': 0,
'vcpus': 1,
'disk': 0,
u 'id': u '211'
},
'task_state': None,
'image_ref_url': None,
'memory_mb': 512,
'root_gb': 0,
'display_name': u 'jy-2',
'name': u 'instance-00000002',
'vcpus': 1,
'instance_id': u '32eb16e0-c8af-4b41-b18c-b63152f0f8fc',
'instance_type': u '1C-0.5G',
'state': u 'active',
'image_ref': None,
'OS-EXT-AZ:availability_zone': u 'default-az'
},
'source': 'openstack',
'counter_unit': '%',
'project_id': u '34ecb9aca8454157bd5c3c64708990bf',
'message_id': '6fbcdc0e-b1f5-11ea-a901-be1696955049',
'monotonic_time': None,
'counter_type': 'gauge'
}
"""
sample_dict = (
publisher_utils.meter_message_from_counter(
sample, self._telemetry_secret
))
if self._batch:
sample_batch.append(sample_dict)
else:
self._send_notification([sample_dict])
if sample_batch:
self._send_notification(sample_batch)
except plugin_base.PollsterPermanentError as err:
LOG.error(_(
'Prevent pollster %(name)s for '
'polling source %(source)s anymore!')
% ({'name': pollster.name, 'source': source_name}))
self.resources[key].blacklist.extend(err.fail_res_list)
except Exception as err:
LOG.warning(_(
'Continue after error from %(name)s: %(error)s')
% ({'name': pollster.name, 'error': err}),
exc_info=True)
def _send_notification(self, samples):
self.manager.notifier.sample(
{},
'telemetry.polling',
{'samples': samples}
)
The first 5 lines need no comment.
Lines 19 and 20 take the sources and pollsters recorded in self.pollster_matches and iterate over them; everything from line 31 to the end is the collection and conversion of the samples. The important intermediate objects have already been printed above, so I'll only pick a few to explain. (I still don't fully understand the candidate_res object; if anyone does, please leave a comment, thanks.)
Look at line 76: this is where the data is actually collected, by calling into the plugin. For example, when pollster.name is memory.usage, the get_samples method of the MemoryUsagePollster class is called; you can see that after collecting the data, the method yields the sample back:
class MemoryUsagePollster(pollsters.BaseComputePollster):
def get_samples(self, manager, cache, resources):
self._inspection_duration = self._record_poll_time()
for instance in resources:
LOG.debug('Checking memory usage for instance %s', instance.id)
try:
memory_info = self.inspector.inspect_memory_usage(
instance, self._inspection_duration)
LOG.debug("MEMORY USAGE: %(instance)s %(usage)f",
{'instance': instance,
'usage': memory_info.usage})
yield util.make_sample_from_instance(
instance,
name='memory.usage',
type=sample.TYPE_GAUGE,
unit='MB',
volume=memory_info.usage,
)
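Incidentally, this plugin structure is also how you would add a custom meter. A sketch of a hypothetical pollster (the module, class and meter names are made up, and the BaseComputePollster internals may differ between releases):
# ceilometer/compute/pollsters/demo.py (hypothetical module)
from ceilometer.compute import pollsters
from ceilometer.compute.pollsters import util
from ceilometer import sample


class DemoPollster(pollsters.BaseComputePollster):
    """Emit a constant gauge per instance, just to show the shape."""

    def get_samples(self, manager, cache, resources):
        for instance in resources:
            yield util.make_sample_from_instance(
                instance,
                name='demo.metric',
                type=sample.TYPE_GAUGE,
                unit='unit',
                volume=42,
            )
It would then be registered in setup.cfg under ceilometer.poll.compute as demo.metric = ceilometer.compute.pollsters.demo:DemoPollster, after which the polling agent picks it up like any other pollster whose name matches a source's meters list.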
Now look at line 157, which is where the message is sent by calling self._send_notification(sample_batch):
def _send_notification(self, samples):
self.manager.notifier.sample(
{},
'telemetry.polling',
{'samples': samples}
)
This uses the oslo_messaging library. Digging further in, sample here is the sample method of the Notifier class in oslo_messaging/notify/notifier.py:
def sample(self, ctxt, event_type, payload):
self._notify(ctxt, event_type, payload, 'SAMPLE')
which then calls the _notify method:
def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
retry=None):
payload = self._serializer.serialize_entity(ctxt, payload)
ctxt = self._serializer.serialize_context(ctxt)
msg = dict(message_id=six.text_type(uuid.uuid4()),
publisher_id=publisher_id or self.publisher_id,
event_type=event_type,
priority=priority,
payload=payload,
timestamp=six.text_type(timeutils.utcnow()))
def do_notify(ext):
try:
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
except Exception as e:
_LOG.exception("Problem '%(e)s' attempting to send to "
"notification system. Payload=%(payload)s",
{'e': e, 'payload': payload})
if self._driver_mgr.extensions:
self._driver_mgr.map(do_notify)
The main thing here is the call on line 15, where each loaded driver is invoked through do_notify; let's look at the ext object:
(Pdb) p ext.obj
<oslo_messaging.notify.messaging.MessagingV2Driver object at 0x7f72f876d510>
(Pdb) p ext.obj.__dict__
{'topics': ['notifications'], 'version': 2.0, 'transport': <oslo_messaging.transport.Transport object at 0x7f72f85dd1d0>, 'conf': <oslo_config.cfg.ConfigOpts object at 0x13dc990>}
Going one level deeper, the notify method is called (the MessagingDriver class in oslo_messaging/notify/messaging.py):
def notify(self, ctxt, message, priority, retry):
priority = priority.lower()
for topic in self.topics:
target = oslo_messaging.Target(topic='%s.%s' % (topic, priority))
try:
self.transport._send_notification(target, ctxt, message,
version=self.version,
retry=retry)
except Exception:
LOG.exception("Could not send notification to %(topic)s. "
"Payload=%(message)s",
{'topic': topic, 'message': message})
You can see that target here is:
(Pdb) p target
<Target topic=notifications.sample>
So self.transport._send_notification is finally called to send the message to the notifications.sample queue, and with that the whole polling job of the ceilometer-compute agent is complete.
The ceilometer-central agent works much like ceilometer-compute: it loads plugins from a different namespace, but the logic is essentially the same, so it is not covered again.
References:
https://www.cnblogs.com/luohaixian/p/11145939.html