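All articles on this blog are licensed under free-reposting, non-commercial, no-derivatives, attribution terms. When reposting, please credit the source. Thank you.
Notice:
Reposting is welcome, provided the source is credited and the original author information is retained.
Blog: 孟阿龙的博客 (Meng Along's blog)
Everything here comes from my own study, research, and summarizing. Any resemblance to other work is an honor.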
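As mentioned earlier, Ceilometer has two main agents: the Polling Agent and the Notification Agent. The Polling Agent in turn comprises two agents, the Compute Agent and the Central Agent. This article walks through the Polling Agent code to explain its basic principles and, through that, how the agent works.
The test environment is an all-in-one OpenStack deployment installed with packstack inside a virtual machine.
The Polling Agent is started with the following command: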
/usr/bin/ceilometer-polling --logfile /var/log/ceilometer/polling.log
The startup entry point is:
ceilometer.cmd.polling.main
def main():
    conf = cfg.ConfigOpts()
    conf.register_cli_opts(CLI_OPTS)
    service.prepare_service(conf=conf)
    sm = cotyledon.ServiceManager()
    sm.add(create_polling_service, args=(conf,))
    oslo_config_glue.setup(sm, conf)
    sm.run()
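From this, the overall flow is: initialize the configuration, register the polling service factory with a cotyledon ServiceManager, hook configuration handling into the manager through oslo_config_glue, and run the manager.
Configuration is initialized mainly in service.prepare_service(). Only the initialization of the basic options is covered here.
In the ceilometer.service.prepare_service function: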
def prepare_service(argv=None, config_files=None, conf=None):
    if argv is None:
        argv = sys.argv
    if conf is None:
        conf = cfg.ConfigOpts()
    ...
    for group, options in opts.list_opts():
        conf.register_opts(list(options),
                           group=None if group == "DEFAULT" else group)
    ...
    conf(argv[1:], project='ceilometer', validate_default_values=True,
         version=version.version_info.version_string(),
         default_config_files=config_files)
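The basic steps here are: default argv to sys.argv and create a ConfigOpts object if none was passed in, register every option group returned by opts.list_opts() (options in the DEFAULT group are registered with group=None), and finally parse the command-line arguments and configuration files by calling conf(...).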
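The register-then-parse order can be illustrated with a small sketch. It deliberately swaps in the standard library's argparse for oslo.config, and the option table, option names, and the register_all and parse helpers are all made up for illustration. Only the DEFAULT-to-None rule and the ordering come from the excerpt above.

```python
import argparse

# Hypothetical option table, shaped like the (group, options) pairs that
# opts.list_opts() yields in the excerpt. The option names are made up.
OPTS = [
    ("DEFAULT", [("debug", False)]),
    ("polling", [("cfg_file", "polling.yaml")]),
]


def register_all(opts):
    """Map each option to its default; the DEFAULT group is stored as None."""
    registry = {}
    for group, options in opts:
        key = None if group == "DEFAULT" else group
        for name, default in options:
            registry[(key, name)] = default
    return registry


def parse(argv, registry):
    """Parse argv[1:] only after every option has been registered."""
    parser = argparse.ArgumentParser(prog="sketch")
    for (group, name), default in registry.items():
        flag = name if group is None else "%s-%s" % (group, name)
        parser.add_argument("--" + flag, default=default)
    return vars(parser.parse_args(argv[1:]))


registry = register_all(OPTS)
values = parse(["prog", "--polling-cfg_file", "custom.yaml"], registry)
```

Registering before parsing matters because an option that is unknown at parse time cannot be validated or given its default.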
Code path: ceilometer.cmd.polling.create_polling_service
def create_polling_service(worker_id, conf):
    return manager.AgentManager(worker_id,
                                conf,
                                conf.polling_namespaces)
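This function initializes an AgentManager with three arguments:
worker_id: I have not yet worked out where this is initialized; it appears to be supplied by cotyledon when it instantiates the service.
conf: the configuration object initialized in the previous step
conf.polling_namespaces: the default is used, i.e. ['compute', 'central']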
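As a rough illustration of where worker_id could come from, here is a toy service manager that calls the registered factory with a worker index followed by the registered args. This is my assumption about the general pattern, not cotyledon's actual implementation; ToyServiceManager and create_service are hypothetical names.

```python
class ToyServiceManager:
    """Toy stand-in for a service manager; not cotyledon's implementation."""

    def __init__(self):
        self._registered = []

    def add(self, factory, workers=1, args=()):
        # Remember the factory; nothing is constructed yet.
        self._registered.append((factory, workers, tuple(args)))

    def run(self):
        # Build one service per worker, passing worker_id as the first
        # positional argument followed by the registered args.
        services = []
        for factory, workers, args in self._registered:
            for worker_id in range(workers):
                services.append(factory(worker_id, *args))
        return services


def create_service(worker_id, conf):
    # Hypothetical factory with the same shape as create_polling_service.
    return {"worker_id": worker_id, "namespaces": conf["polling_namespaces"]}


sm = ToyServiceManager()
sm.add(create_service, workers=2,
       args=({"polling_namespaces": ["compute", "central"]},))
services = sm.run()
```

Under this assumption the factory never chooses its own worker_id; the manager assigns one per worker.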
Code path: ceilometer.polling.manager.AgentManager#__init__
def __init__(self, worker_id, conf, namespaces=None):
    namespaces = namespaces or ['compute', 'central']
    group_prefix = conf.polling.partitioning_group_prefix

    super(AgentManager, self).__init__(worker_id)

    self.conf = conf

    if type(namespaces) is not list:
        namespaces = [namespaces]

    # we'll have default ['compute', 'central'] here if no namespaces will
    # be passed
    extensions = (self._extensions('poll', namespace, self.conf).extensions
                  for namespace in namespaces)
    # get the extensions from pollster builder
    extensions_fb = (self._extensions_from_builder('poll', namespace)
                     for namespace in namespaces)

    self.extensions = list(itertools.chain(*list(extensions))) + list(
        itertools.chain(*list(extensions_fb)))

    if not self.extensions:
        LOG.warning('No valid pollsters can be loaded from %s '
                    'namespaces', namespaces)

    discoveries = (self._extensions('discover', namespace,
                                    self.conf).extensions
                   for namespace in namespaces)
    self.discoveries = list(itertools.chain(*list(discoveries)))
    self.polling_periodics = None

    self.hashrings = None
    self.partition_coordinator = None
    if self.conf.coordination.backend_url:
        # XXX uuid4().bytes ought to work, but it requires ascii for now
        coordination_id = str(uuid.uuid4()).encode('ascii')
        self.partition_coordinator = coordination.get_coordinator(
            self.conf.coordination.backend_url, coordination_id)

    # Compose coordination group prefix.
    # We'll use namespaces as the basement for this partitioning.
    namespace_prefix = '-'.join(sorted(namespaces))
    self.group_prefix = ('%s-%s' % (namespace_prefix, group_prefix)
                         if group_prefix else namespace_prefix)

    self.notifier = oslo_messaging.Notifier(
        messaging.get_transport(self.conf),
        driver=self.conf.publisher_notifier.telemetry_driver,
        publisher_id="ceilometer.polling")

    self._keystone = None
    self._keystone_last_exception = None
(Pdb) p self.extensions[1]
<stevedore.extension.Extension object at 0x7f1cc9ef9310>
(Pdb) p self.extensions[1].__dict__
{'obj': <ceilometer.network.services.fwaas.FirewallPollster object at 0x7f1cc9ef92d0>, 'entry_point': EntryPoint.parse('network.services.firewall = ceilometer.network.services.fwaas:FirewallPollster'), 'name': 'network.services.firewall', 'plugin': <class 'ceilometer.network.services.fwaas.FirewallPollster'>}
(Pdb) p self.notifier.__dict__
{'_serializer': <oslo_messaging.serializer.NoOpSerializer object at 0x7f1cc9aa3710>, '_driver_mgr': <stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>, 'retry': -1, '_driver_names': ['messagingv2'], '_topics': ['notifications'], 'publisher_id': 'ceilometer.polling', 'transport': <oslo_messaging.transport.NotificationTransport object at 0x7f1cc9b26550>}
(Pdb) p self.notifier._driver_mgr
<stevedore.named.NamedExtensionManager object at 0x7f1cc9aa3890>
(Pdb) p self.notifier._driver_mgr.__dict__
{'_extensions_by_name_cache': None, '_names': ['messagingv2'], 'namespace': 'oslo.messaging.notify.drivers', '_on_load_failure_callback': None, 'extensions': [<stevedore.extension.Extension object at 0x7f1cc9aa3c10>], 'propagate_map_exceptions': False, '_name_order': False, '_missing_names': set([])}
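Two small pieces of the constructor are easy to try in isolation: flattening the per-namespace extension lists with itertools.chain, and composing the coordination group prefix. The sketch below uses plain strings in place of stevedore Extension objects; the helper names and the sample pollster lists are made up.

```python
import itertools


def normalize(namespaces=None):
    """Apply the default and wrap a single namespace in a list."""
    namespaces = namespaces or ["compute", "central"]
    if not isinstance(namespaces, list):
        namespaces = [namespaces]
    return namespaces


def flatten(per_namespace):
    """Chain one list of extensions per namespace into a single list."""
    return list(itertools.chain(*per_namespace))


def compose_group_prefix(namespaces, group_prefix=None):
    """Join the sorted namespaces, then append the optional prefix."""
    namespace_prefix = "-".join(sorted(namespaces))
    if group_prefix:
        return "%s-%s" % (namespace_prefix, group_prefix)
    return namespace_prefix


# Made-up pollster names standing in for loaded extensions.
loaded = {
    "compute": ["cpu", "memory.usage"],
    "central": ["network.services.firewall"],
}
namespaces = normalize()
extensions = flatten(loaded[ns] for ns in namespaces)
prefix = compose_group_prefix(namespaces)
```

Because the namespaces are sorted before joining, the prefix does not depend on the order in which they were configured.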
Code path: ceilometer.polling.manager.AgentManager#run
def run(self):
    super(AgentManager, self).run()
    self.polling_manager = PollingManager(self.conf)
    if self.partition_coordinator:
        self.partition_coordinator.start()
        self.join_partitioning_groups()
    self.start_polling_tasks()
The key lines are the third and the last, which drive the actual startup:
Line 3 initializes polling_manager
Line 7 (the last line) starts the periodic tasks
self.polling_manager = PollingManager(self.conf)
polling_manager is built by PollingManager from the polling definition file referenced by conf.polling.cfg_file. The definition has the following structure:
{"sources": [{"name": source_1,
              "interval": interval_time,
              "meters" : ["meter_1", "meter_2"],
              "resources": ["resource_uri1", "resource_uri2"],
             },
             {"name": source_2,
              "interval": interval_time,
              "meters" : ["meter_3"],
             },
            ]}
super(PollingManager, self).__init__(conf)
cfg = self.load_config(conf.polling.cfg_file)
self.sources = []
if 'sources' not in cfg:
    raise PollingException("sources required", cfg)
for s in cfg.get('sources'):
    self.sources.append(PollingSource(s))
(Pdb) p self.polling_manager.sources
[<ceilometer.polling.manager.PollingSource object at 0x7f71a2f48150>]
(Pdb) p self.polling_manager.sources[0].__dict__
{'name': 'some_pollsters', 'cfg': {'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'name': 'some_pollsters'}, 'interval': 300, 'meters': ['cpu', 'cpu_l3_cache', 'memory.usage', 'network.incoming.bytes', 'network.incoming.packets', 'network.outgoing.bytes', 'network.outgoing.packets', 'disk.device.read.bytes', 'disk.device.read.requests', 'disk.device.write.bytes', 'disk.device.write.requests', 'hardware.cpu.util', 'hardware.memory.used', 'hardware.memory.total', 'hardware.memory.buffer', 'hardware.memory.cached', 'hardware.memory.swap.avail', 'hardware.memory.swap.total', 'hardware.system_stats.io.outgoing.blocks', 'hardware.system_stats.io.incoming.blocks', 'hardware.network.ip.incoming.datagrams', 'hardware.network.ip.outgoing.datagrams'], 'resources': [], 'discovery': []}
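The parsing step can be approximated with a short sketch that takes an already loaded definition (a plain dict here, to avoid a YAML dependency) and turns it into source objects. PollingError and Source are hypothetical stand-ins for PollingException and PollingSource, and the validation rules beyond "sources required" are my own assumptions.

```python
class PollingError(Exception):
    """Raised when the polling definition is malformed (hypothetical name)."""


class Source:
    """Holds one entry of the 'sources' list."""

    def __init__(self, cfg):
        try:
            self.name = cfg["name"]
            self.interval = int(cfg["interval"])
        except KeyError as err:
            raise PollingError("missing required field %s" % err)
        # Assumed rule: a non-positive interval makes no sense for polling.
        if self.interval <= 0:
            raise PollingError("interval must be positive")
        self.meters = list(cfg.get("meters", []))
        # Optional fields default to empty lists, as in the pdb output above.
        self.resources = list(cfg.get("resources", []))
        self.discovery = list(cfg.get("discovery", []))


def load_sources(cfg):
    """Reject a definition without 'sources', else build one Source each."""
    if "sources" not in cfg:
        raise PollingError("sources required")
    return [Source(s) for s in cfg["sources"]]


definition = {
    "sources": [
        {"name": "some_pollsters", "interval": 300,
         "meters": ["cpu", "memory.usage"]},
    ]
}
sources = load_sources(definition)
```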
Startup entry point: ceilometer.polling.manager.AgentManager#start_polling_tasks
def start_polling_tasks(self):
    data = self.setup_polling_tasks()

    # Don't start useless threads if no task will run
    if not data:
        return

    # One thread per polling tasks is enough
    self.polling_periodics = periodics.PeriodicWorker.create(
        [], executor_factory=lambda:
        futures.ThreadPoolExecutor(max_workers=len(data)))

    for interval, polling_task in data.items():

        @periodics.periodic(spacing=interval, run_immediately=True)
        def task(running_task):
            self.interval_task(running_task)

        self.polling_periodics.add(task, polling_task)

    utils.spawn_thread(self.polling_periodics.start, allow_empty=True)
Service startup mainly consists of the following steps:
4.3.3.1 Building the list of periodic tasks
Code entry: ceilometer.polling.manager.AgentManager#setup_polling_tasks
def setup_polling_tasks(self):
    polling_tasks = {}
    for source in self.polling_manager.sources:
        for pollster in self.extensions:
            if source.support_meter(pollster.name):
                polling_task = polling_tasks.get(source.get_interval())
                if not polling_task:
                    polling_task = PollingTask(self)
                    polling_tasks[source.get_interval()] = polling_task
                polling_task.add(pollster, source)
    return polling_tasks
(Pdb) p data
{300: <ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>}
(Pdb) p data[300]
<ceilometer.polling.manager.PollingTask object at 0x7fa6103394d0>
(Pdb) p data[300].__dict__
{'manager': <ceilometer.polling.manager.AgentManager object at 0x7fa610fb47d0>, '_telemetry_secret': '2e30d044a69343e0', 'pollster_matches': defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])}), 'resources': defaultdict(<function <lambda> at 0x7fa610330b90>, {'some_pollsters-cpu_l3_cache': <ceilometer.polling.manager.Resources object at 0x7fa610339110>, 'some_pollsters-disk.device.read.bytes': <ceilometer.polling.manager.Resources object at 0x7fa6103398d0>, 'some_pollsters-hardware.memory.cached': <ceilometer.polling.manager.Resources object at 0x7fa610340310>, 'some_pollsters-disk.device.write.requests': <ceilometer.polling.manager.Resources object at 0x7fa6103401d0>, 'some_pollsters-hardware.cpu.util': 
<ceilometer.polling.manager.Resources object at 0x7fa6103404d0>, 'some_pollsters-memory.usage': <ceilometer.polling.manager.Resources object at 0x7fa610339790>, 'some_pollsters-hardware.system_stats.io.outgoing.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340410>, 'some_pollsters-hardware.memory.buffer': <ceilometer.polling.manager.Resources object at 0x7fa610340150>, 'some_pollsters-hardware.memory.swap.total': <ceilometer.polling.manager.Resources object at 0x7fa610340490>, 'some_pollsters-network.incoming.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339c10>, 'some_pollsters-cpu': <ceilometer.polling.manager.Resources object at 0x7fa610340190>, 'some_pollsters-disk.device.write.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610340290>, 'some_pollsters-hardware.network.ip.outgoing.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340210>, 'some_pollsters-disk.device.read.requests': <ceilometer.polling.manager.Resources object at 0x7fa610339a10>, 'some_pollsters-hardware.memory.swap.avail': <ceilometer.polling.manager.Resources object at 0x7fa610340090>, 'some_pollsters-network.incoming.packets': <ceilometer.polling.manager.Resources object at 0x7fa610339d10>, 'some_pollsters-hardware.system_stats.io.incoming.blocks': <ceilometer.polling.manager.Resources object at 0x7fa610340590>, 'some_pollsters-network.outgoing.bytes': <ceilometer.polling.manager.Resources object at 0x7fa610339a50>, 'some_pollsters-hardware.network.ip.incoming.datagrams': <ceilometer.polling.manager.Resources object at 0x7fa610340350>, 'some_pollsters-hardware.memory.total': <ceilometer.polling.manager.Resources object at 0x7fa610340250>, 'some_pollsters-network.outgoing.packets': <ceilometer.polling.manager.Resources object at 0x7fa6103400d0>, 'some_pollsters-hardware.memory.used': <ceilometer.polling.manager.Resources object at 0x7fa6103403d0>}), '_batch': True}
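The grouping performed by setup_polling_tasks can be sketched with plain data: every source whose meter list matches a pollster contributes that pollster to the task keyed by the source's interval. The matching rule below (shell-style wildcards via fnmatch) is an assumption on my part, and the real support_meter may apply further rules. The Source class and the sample names are made up.

```python
import fnmatch
from collections import defaultdict


class Source:
    def __init__(self, name, interval, meters):
        self.name, self.interval, self.meters = name, interval, meters

    def support_meter(self, meter_name):
        # Assumed rule: a meter is supported if any pattern matches it.
        return any(fnmatch.fnmatchcase(meter_name, p) for p in self.meters)


def setup_polling_tasks(sources, pollster_names):
    """Return {interval: {source_name: set_of_pollster_names}}."""
    tasks = {}
    for source in sources:
        for name in pollster_names:
            if source.support_meter(name):
                # Sources sharing an interval share one task entry.
                task = tasks.setdefault(source.interval, defaultdict(set))
                task[source.name].add(name)
    return tasks


sources = [
    Source("fast", 60, ["cpu"]),
    Source("slow", 300, ["disk.*", "memory.usage"]),
    Source("also_slow", 300, ["cpu"]),
]
pollsters = ["cpu", "memory.usage", "disk.device.read.bytes",
             "network.incoming.bytes"]
tasks = setup_polling_tasks(sources, pollsters)
```

With a single source at interval 300, as in the pdb output above, the result has exactly one key, which is why data contains only the entry for 300.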
4.3.3.2 Starting the periodic tasks
Code entry: ceilometer.polling.manager.AgentManager#start_polling_tasks, the same function listed in full above. Once setup_polling_tasks has returned the interval-to-task mapping, the function creates a PeriodicWorker backed by a ThreadPoolExecutor with one worker per polling task, registers one periodic callback per interval (run immediately, then every interval seconds) that calls self.interval_task, and finally starts the worker in a separate thread.
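One detail of the registration loop is that task receives the polling task as an argument (running_task) instead of reading the loop variable polling_task directly. The source does not say why; one plausible reason is Python's late-binding closures, which the sketch below demonstrates with plain functions. The function names and sample data are made up.

```python
def register_with_argument(data):
    """Pair each callback with its task, like add(task, polling_task)."""
    registered = []
    for interval, polling_task in data.items():
        def task(running_task):
            return running_task
        registered.append((task, polling_task))
    # Each callback is later invoked with the argument stored next to it.
    return [fn(arg) for fn, arg in registered]


def register_with_closure(data):
    """Pitfall: the callbacks read the loop variable only when called."""
    registered = []
    for interval, polling_task in data.items():
        def task():
            return polling_task  # looked up at call time, after the loop
        registered.append(task)
    return [fn() for fn in registered]


data = {60: "task-60", 300: "task-300"}
explicit = register_with_argument(data)
captured = register_with_closure(data)
```

With the explicit argument every callback sees its own task, whereas the closure version makes every callback see the last task of the loop.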
4.3.3.3 The periodic collection task
Code entry: ceilometer.polling.manager.PollingTask#poll_and_notify
 1  def poll_and_notify(self):
 2      """Polling sample and notify."""
 3      cache = {}
 4      discovery_cache = {}
 5      poll_history = {}
 6      for source_name, pollsters in iter_random(
 7              self.pollster_matches.items()):
 8          for pollster in iter_random(pollsters):
 9              key = Resources.key(source_name, pollster)
10              candidate_res = list(
11                  self.resources[key].get(discovery_cache))
12              if not candidate_res and pollster.obj.default_discovery:
13                  candidate_res = self.manager.discover(
14                      [pollster.obj.default_discovery], discovery_cache)
15
16              # Remove duplicated resources and black resources. Using
17              # set() requires well defined __hash__ for each resource.
18              # Since __eq__ is defined, 'not in' is safe here.
19              polling_resources = []
20              black_res = self.resources[key].blacklist
21              history = poll_history.get(pollster.name, [])
22              for x in candidate_res:
23                  if x not in history:
24                      history.append(x)
25                      if x not in black_res:
26                          polling_resources.append(x)
27              poll_history[pollster.name] = history
28
29              # If no resources, skip for this pollster
30              if not polling_resources:
31                  p_context = 'new ' if history else ''
32                  LOG.debug("Skip pollster %(name)s, no %(p_context)s"
33                            "resources found this cycle",
34                            {'name': pollster.name, 'p_context': p_context})
35                  continue
36
37              LOG.info("Polling pollster %(poll)s in the context of "
38                       "%(src)s",
39                       dict(poll=pollster.name, src=source_name))
40              try:
41                  polling_timestamp = timeutils.utcnow().isoformat()
42                  samples = pollster.obj.get_samples(
43                      manager=self.manager,
44                      cache=cache,
45                      resources=polling_resources
46                  )
47                  sample_batch = []
48
49                  for sample in samples:
50                      # Note(yuywz): Unify the timestamp of polled samples
51                      sample.set_timestamp(polling_timestamp)
52                      sample_dict = (
53                          publisher_utils.meter_message_from_counter(
54                              sample, self._telemetry_secret
55                          ))
56                      if self._batch:
57                          sample_batch.append(sample_dict)
58                      else:
59                          self._send_notification([sample_dict])
60
61                  if sample_batch:
62                      self._send_notification(sample_batch)
63
64              except plugin_base.PollsterPermanentError as err:
65                  LOG.error(
66                      'Prevent pollster %(name)s from '
67                      'polling %(res_list)s on source %(source)s anymore!',
68                      dict(name=pollster.name,
69                           res_list=str(err.fail_res_list),
70                           source=source_name))
71                  self.resources[key].blacklist.extend(err.fail_res_list)
72              except Exception as err:
73                  LOG.error(
74                      'Continue after error from %(name)s: %(error)s'
75                      % ({'name': pollster.name, 'error': err}),
76                      exc_info=True)
(Pdb) p data[300].pollster_matches
defaultdict(<type 'set'>, {'some_pollsters': set([<stevedore.extension.Extension object at 0x7fa610689850>, <stevedore.extension.Extension object at 0x7fa610697090>, <stevedore.extension.Extension object at 0x7fa61066c0d0>, <stevedore.extension.Extension object at 0x7fa61066c150>, <stevedore.extension.Extension object at 0x7fa610690190>, <stevedore.extension.Extension object at 0x7fa61066c190>, <stevedore.extension.Extension object at 0x7fa610690050>, <stevedore.extension.Extension object at 0x7fa61066ca50>, <stevedore.extension.Extension object at 0x7fa610690290>, <stevedore.extension.Extension object at 0x7fa61066c2d0>, <stevedore.extension.Extension object at 0x7fa610690890>, <stevedore.extension.Extension object at 0x7fa610733bd0>, <stevedore.extension.Extension object at 0x7fa610667c10>, <stevedore.extension.Extension object at 0x7fa610667490>, <stevedore.extension.Extension object at 0x7fa6106970d0>, <stevedore.extension.Extension object at 0x7fa610689550>, <stevedore.extension.Extension object at 0x7fa610667dd0>, <stevedore.extension.Extension object at 0x7fa610689e50>, <stevedore.extension.Extension object at 0x7fa610667e10>, <stevedore.extension.Extension object at 0x7fa610667e50>, <stevedore.extension.Extension object at 0x7fa610667e90>, <stevedore.extension.Extension object at 0x7fa610683790>])})
As the output shows, source_name on line 6 is the name of each source group in polling.yaml, and pollsters is the set of extension objects corresponding to the meters in that group.
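The two core decisions inside poll_and_notify, which resources to poll and how to send the samples, can be isolated into a small sketch. Resources and samples are plain strings here, and select_resources and emit are names I made up. The logic mirrors lines 22 to 26 and 56 to 62 of the listing.

```python
def select_resources(candidates, history, blacklist):
    """Return the resources to poll; `history` is updated in place."""
    selected = []
    for res in candidates:
        if res not in history:
            # First time this resource is seen in the current cycle.
            history.append(res)
            if res not in blacklist:
                selected.append(res)
    return selected


def emit(samples, batch, send):
    """Send samples as one batch, or one notification per sample."""
    pending = []
    for sample in samples:
        if batch:
            pending.append(sample)
        else:
            send([sample])
    if pending:
        send(pending)


history = ["vm-1"]
to_poll = select_resources(["vm-1", "vm-2", "vm-2", "vm-3"], history,
                           blacklist=["vm-3"])

batched = []
emit(["s1", "s2"], batch=True, send=batched.append)
unbatched = []
emit(["s1", "s2"], batch=False, send=unbatched.append)
```

A blacklisted resource is still recorded in the history, so it is neither polled nor treated as new later in the same cycle.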
This completes the overall startup and plugin scheduling flow of the Polling Agent. The next article will analyze how an individual plugin is scheduled.