At the core of gnocchi's design, the sample data sent by ceilometer is processed asynchronously in the background: samples are pre-aggregated according to the aggregation methods defined by the archive policy, so that when a user asks for statistics the pre-computed results are returned directly. This both improves query performance and reduces the volume of raw samples that must be stored. This article analyses gnocchi's asynchronous aggregation workflow.
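As a quick reminder of what an archive policy looks like, here is a hedged sketch of creating one through gnocchi's REST API (the endpoint, port and values are illustrative defaults, not taken from this walkthrough):

import json
import urllib2  # Python 2, matching the interpreter running metricd

# Keep 12 points at 5-minute granularity and 24 at 1-hour granularity,
# pre-aggregated with mean and max. All values are illustrative.
policy = {
    "name": "example-low",
    "back_window": 0,
    "definition": [
        {"granularity": "5m", "points": 12},
        {"granularity": "1h", "points": 24},
    ],
    "aggregation_methods": ["mean", "max"],
}

req = urllib2.Request("http://localhost:8041/v1/archive_policy",
                      json.dumps(policy),
                      {"Content-Type": "application/json",
                       "X-Auth-Token": "<token>"})
print(urllib2.urlopen(req).read())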
The daemon runs as:

/usr/bin/python2 /usr/bin/gnocchi-metricd --logfile /var/log/gnocchi/metricd.log

Entry point:

The source is the metricd() function in gnocchi/cli.py; the asynchronous processing is done by MetricProcessor:
def metricd():
    conf = service.prepare_service()

    if (conf.storage.metric_reporting_delay <
            conf.storage.metric_processing_delay):
        LOG.error("Metric reporting must run less frequently then processing")
        sys.exit(0)

    signal.signal(signal.SIGTERM, _metricd_terminate)

    try:
        queues = []
        workers = []
        for worker in range(conf.metricd.workers):
            queue = multiprocessing.Queue()
            metric_worker = MetricProcessor(
                conf, worker, conf.storage.metric_processing_delay, queue)
            metric_worker.start()
            queues.append(queue)
            workers.append(metric_worker)

        metric_report = MetricReporting(
            conf, 0, conf.storage.metric_reporting_delay, queues)
        metric_report.start()
        workers.append(metric_report)

        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        _metricd_cleanup(workers)
        sys.exit(0)
    except Exception:
        LOG.warning("exiting", exc_info=True)
        _metricd_cleanup(workers)
        sys.exit(1)
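Each MetricProcessor thus owns a multiprocessing.Queue through which the parent can later change its block size. MetricReporting is not dissected here, but the parent/worker handshake can be sketched in isolation like this (a simplification, not gnocchi code):

import multiprocessing
import time

def worker(queue):
    block_size = 128                    # same default as MetricProcessor
    while True:
        while not queue.empty():        # drain pending reconfigurations
            block_size = queue.get()
        print("would process up to %d metrics" % block_size)
        time.sleep(1)                   # stands in for metric_processing_delay

if __name__ == "__main__":
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=worker, args=(q,))
    p.start()
    q.put(256)                          # parent rescales the worker at runtime
    time.sleep(3)
    p.terminate()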
This class essentially delegates to the storage backend's process_background_tasks:
class MetricProcessor(MetricProcessBase):
    def __init__(self, conf, worker_id=0, interval_delay=0, queue=None):
        super(MetricProcessor, self).__init__(conf, worker_id, interval_delay)
        self.queue = queue
        self.block_size = 128

    def _run_job(self):
        try:
            if self.queue:
                while not self.queue.empty():
                    self.block_size = self.queue.get()
                    LOG.debug("Re-configuring worker to handle up to %s "
                              "metrics", self.block_size)
            self.store.process_background_tasks(self.index, self.block_size)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)
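MetricProcessBase itself is not quoted in this article; judging from the constructor arguments and the start()/join() calls above, it is a multiprocessing.Process whose run() invokes _run_job() every interval_delay seconds. A plausible minimal sketch (an assumption, not the actual class):

import multiprocessing
import time

class MetricProcessBase(multiprocessing.Process):
    """Sketch: periodically call the subclass hook _run_job()."""

    def __init__(self, conf, worker_id=0, interval_delay=0):
        super(MetricProcessBase, self).__init__()
        self.conf = conf
        self.worker_id = worker_id
        self.interval_delay = interval_delay

    def run(self):
        while True:
            self._run_job()                 # subclass does the real work
            time.sleep(self.interval_delay)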
process_background_tasks is defined in StorageDriver, the storage drivers' common base class, in gnocchi/storage/__init__.py:
def process_background_tasks(self, index, block_size=128, sync=False):
    """Process background tasks for this storage.

    This calls :func:`process_measures` to process new measures and
    :func:`expunge_metrics` to expunge deleted metrics.

    :param index: An indexer to be used for querying metrics
    :param block_size: number of metrics to process
    :param sync: If True, then process everything synchronously and raise
                 on error
    :type sync: bool
    """
    LOG.debug("Processing new and to delete measures")
    try:
        self.process_measures(index, block_size, sync)
    except Exception:
        if sync:
            raise
        LOG.error("Unexpected error during measures processing",
                  exc_info=True)

    LOG.debug("Expunging deleted metrics")
    try:
        # Remove metrics that have been deleted, together with their
        # collected measures.
        self.expunge_metrics(index, sync)
    except Exception:
        if sync:
            raise
        LOG.error("Unexpected error during deleting metrics",
                  exc_info=True)
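Note the sync flag: with sync=True everything runs synchronously and errors propagate instead of being swallowed into the log, which is handy for tests or one-shot maintenance runs. Roughly (a usage sketch; treat the get_driver entry points as an assumption about this gnocchi version):

from gnocchi import indexer
from gnocchi import service
from gnocchi import storage

conf = service.prepare_service()
store = storage.get_driver(conf)   # e.g. the file driver
index = indexer.get_driver(conf)
# Process everything pending right now; any failure raises.
store.process_background_tasks(index, block_size=128, sync=True)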
process_measures is defined in gnocchi/storage/_carbonara.py:
def process_measures(self, indexer, block_size, sync=False):
    # List the metrics that have newly reported measures to process
    metrics_to_process = self._list_metric_with_measures_to_process(
        block_size, full=sync)
    metrics = indexer.list_metrics(ids=metrics_to_process)
    # This builds the list of deleted metrics, i.e. the metrics we have
    # measures to process for but that are not in the indexer anymore.
    deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                          - set(m.id for m in metrics))
    for metric_id in deleted_metrics_id:
        # NOTE(jd): We need to lock the metric otherwise we might delete
        # measures that another worker might be processing. Deleting
        # measurement files under its feet is not nice!
        with self._lock(metric_id)(blocking=sync):
            # Delete reported-but-unprocessed measures: reporting and
            # aggregation are asynchronous, so a metric may already be
            # deleted while some of its samples are still waiting to be
            # aggregated.
            self._delete_unprocessed_measures_for_metric_id(metric_id)
    for metric in metrics:
        lock = self._lock(metric.id)
        agg_methods = list(metric.archive_policy.aggregation_methods)
        # Do not block if we cannot acquire the lock, that means some other
        # worker is doing the job. We'll just ignore this metric and may
        # get back later to it if needed.
        if lock.acquire(blocking=sync):
            try:
                LOG.debug("Processing measures for %s" % metric)
                # `measures` holds the newly reported samples, read from
                # /var/lib/gnocchi/measure/$metric.id
                with self._process_measure_for_metric(metric) as measures:
                    # NOTE(mnaser): The metric could have been handled by
                    # another worker, ignore if no measures.
                    if len(measures) == 0:
                        LOG.debug("Skipping %s (already processed)"
                                  % metric)
                        continue
                    try:
                        with timeutils.StopWatch() as sw:
                            # The file driver stores this at
                            # /var/lib/gnocchi/$metric.id/none
                            raw_measures = (
                                self._get_unaggregated_timeserie(
                                    metric)
                            )
                        LOG.debug(
                            "Retrieve unaggregated measures "
                            "for %s in %.2fs"
                            % (metric.id, sw.elapsed()))
                    except storage.MetricDoesNotExist:
                        try:
                            self._create_metric(metric)
                        except storage.MetricAlreadyExists:
                            # Created in the mean time, do not worry
                            pass
                        ts = None
                    else:
                        try:
                            ts = carbonara.BoundTimeSerie.unserialize(
                                raw_measures)
                        except ValueError:
                            ts = None
                            LOG.error(
                                "Data corruption detected for %s "
                                "unaggregated timeserie, "
                                "recreating an empty one."
                                % metric.id)

                    if ts is None:
                        # This is the first time we treat measures for this
                        # metric, or data are corrupted, create a new one
                        mbs = metric.archive_policy.max_block_size
                        ts = carbonara.BoundTimeSerie(
                            block_size=mbs,
                            back_window=metric.archive_policy.back_window)

                    def _map_add_measures(bound_timeserie):
                        self._map_in_thread(
                            self._add_measures,
                            ((aggregation, d, metric, bound_timeserie)
                             for aggregation in agg_methods
                             for d in metric.archive_policy.definition))

                    with timeutils.StopWatch() as sw:
                        ts.set_values(
                            measures,
                            before_truncate_callback=_map_add_measures,
                            ignore_too_old_timestamps=True)
                        LOG.debug(
                            "Computed new metric %s with %d new measures "
                            "in %.2f seconds"
                            % (metric.id, len(measures), sw.elapsed()))

                    self._store_unaggregated_timeserie(metric,
                                                       ts.serialize())
            except Exception:
                if sync:
                    raise
                LOG.error("Error processing new measures", exc_info=True)
            finally:
                lock.release()
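The self._lock used above is a per-metric distributed lock built on the tooz coordination library, so that two metricd workers (possibly on different hosts) never aggregate the same metric at once. A minimal sketch of that pattern with tooz (the backend URL and lock name are illustrative):

import uuid

from tooz import coordination

# One coordinator per worker; the backend URL depends on the deployment
# (file://, redis://, zookeeper://...).
coord = coordination.get_coordinator("file:///var/lib/gnocchi/locks",
                                     b"metricd-worker-0")
coord.start()

metric_id = uuid.uuid4()
lock = coord.get_lock(("gnocchi-%s" % metric_id).encode())
if lock.acquire(blocking=False):   # same non-blocking idiom as above
    try:
        pass  # aggregate this metric's measures
    finally:
        lock.release()
coord.stop()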
The lower-level helpers it calls are implemented by each storage backend; this article uses the file driver as the example. From gnocchi/storage/file.py:
# List the metrics that currently have reported measures to process;
# each worker takes its own slice of the listing (see the example below)
def _list_metric_with_measures_to_process(self, block_size, full=False):
    if full:
        return os.listdir(self.measure_path)
    return os.listdir(self.measure_path)[
        block_size * self.partition:block_size * (self.partition + 1)]
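So each worker takes a disjoint, partition-numbered slice of the measure directory listing. A quick stand-alone illustration of that slicing:

# 300 pending metrics spread over 3 workers with block_size=128
pending = ["metric-%04d" % i for i in range(300)]
block_size = 128
for partition in range(3):
    chunk = pending[block_size * partition:block_size * (partition + 1)]
    print(partition, len(chunk))   # -> (0, 128), (1, 128), (2, 44)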
Also in gnocchi/storage/file.py:

# Delete measures that were reported but never processed
def _delete_unprocessed_measures_for_metric_id(self, metric_id):
    files = self._list_measures_container_for_metric_id(metric_id)
    self._delete_measures_files_for_metric_id(metric_id, files)
Also in gnocchi/storage/file.py:

# Read the samples stored under measure/$metric_id, hand them to the
# caller, then delete the files once processing has finished
@contextlib.contextmanager
def _process_measure_for_metric(self, metric):
    files = self._list_measures_container_for_metric_id(metric.id)
    measures = []
    for f in files:
        abspath = self._build_measure_path(metric.id, f)
        with open(abspath, "rb") as e:
            measures.extend(self._unserialize_measures(e.read()))

    yield measures

    self._delete_measures_files_for_metric_id(metric.id, files)
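Because the deletion sits after the yield, the measure files are only removed once the with-block in process_measures has completed without raising; a failed run leaves the files in place to be retried. The same consume-then-delete pattern in isolation:

import contextlib
import os

@contextlib.contextmanager
def consume_files(paths):
    data = [open(p, "rb").read() for p in paths]
    yield data                 # caller processes the data here
    # Reached only if the with-block did not raise, so a failed
    # run keeps the files around for a later retry.
    for p in paths:
        os.remove(p)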
Also in gnocchi/storage/file.py:

# Fetch the stored, not-yet-aggregated samples; the file driver keeps
# them at /var/lib/gnocchi/$metric.id/none
def _get_unaggregated_timeserie(self, metric):
    path = self._build_unaggregated_timeserie_path(metric)
    try:
        with open(path, 'rb') as f:
            return f.read()
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise storage.MetricDoesNotExist(metric)
        raise
def _build_unaggregated_timeserie_path(self, metric):
    return os.path.join(self._build_metric_dir(metric), 'none')

def _store_unaggregated_timeserie(self, metric, data):
    self._atomic_file_store(
        self._build_unaggregated_timeserie_path(metric),
        data)
# Create the metric's directory tree, one subdirectory per aggregation
# method, into which the aggregated data will be written:
def _create_metric(self, metric):
    path = self._build_metric_dir(metric)
    try:
        os.mkdir(path, 0o750)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise storage.MetricAlreadyExists(metric)
        raise
    for agg in metric.archive_policy.aggregation_methods:
        try:
            os.mkdir(self._build_metric_path(metric, agg), 0o750)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
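Putting the file-driver pieces together, a metric whose archive policy aggregates mean and max ends up on disk roughly like this (paths shortened; an illustration, not a literal listing):

/var/lib/gnocchi/
    measure/<metric-id>/      # raw, not-yet-processed samples
    <metric-id>/none          # serialized unaggregated BoundTimeSerie
    <metric-id>/mean/         # one directory per aggregation method
    <metric-id>/max/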
Aggregating the data

The aggregation starts from set_values in gnocchi/carbonara.py; carbonara relies on the pandas library for the underlying time-series handling:
def set_values(self, values, before_truncate_callback=None,
               ignore_too_old_timestamps=False):
    if self.block_size is not None and not self.ts.empty:
        values = sorted(values, key=operator.itemgetter(0))
        first_block_timestamp = self._first_block_timestamp()
        if ignore_too_old_timestamps:
            for index, (timestamp, value) in enumerate(values):
                if timestamp >= first_block_timestamp:
                    values = values[index:]
                    break
            else:
                values = []
        else:
            # Check that the smallest timestamp does not go too much back
            # in time.
            smallest_timestamp = values[0][0]
            if smallest_timestamp < first_block_timestamp:
                raise NoDeloreanAvailable(first_block_timestamp,
                                          smallest_timestamp)
    super(BoundTimeSerie, self).set_values(values)
    if before_truncate_callback:
        before_truncate_callback(self)
    self._truncate()
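The for/else above is easy to misread: if no timestamp reaches first_block_timestamp, the loop falls through to the else branch and every value is dropped. Stand-alone:

# Illustration of the ignore_too_old_timestamps pruning above.
values = [(5, 0.1), (8, 0.2), (12, 0.3)]   # sorted (timestamp, value) pairs
first_block_timestamp = 10

for index, (timestamp, value) in enumerate(values):
    if timestamp >= first_block_timestamp:
        values = values[index:]            # keep from the first fresh point on
        break
else:                                      # no break: everything was too old
    values = []

print(values)                              # -> [(12, 0.3)]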
Back in gnocchi/storage/_carbonara.py, the before_truncate_callback is where the final asynchronous aggregation happens: pandas computes each aggregate, and the archive policy caps how far back data is retained, with points older than the configured timespan deleted.

def _map_add_measures(bound_timeserie):
    self._map_in_thread(
        self._add_measures,
        ((aggregation, d, metric, bound_timeserie)
         for aggregation in agg_methods
         for d in metric.archive_policy.definition))
def _add_measures(self, aggregation, archive_policy_def,
                  metric, timeserie):
    with timeutils.StopWatch() as sw:
        ts = self._get_measures_timeserie(metric, aggregation,
                                          archive_policy_def.granularity,
                                          timeserie.first, timeserie.last)
        LOG.debug("Retrieve measures "
                  "for %s/%s/%s in %.2fs"
                  % (metric.id, aggregation,
                     archive_policy_def.granularity, sw.elapsed()))
    ts.update(timeserie)
    with timeutils.StopWatch() as sw:
        for key, split in ts.split():
            self._store_metric_measures(metric, key, aggregation,
                                        archive_policy_def.granularity,
                                        split.serialize())
        LOG.debug("Store measures for %s/%s/%s in %.2fs"
                  % (metric.id, aggregation,
                     archive_policy_def.granularity, sw.elapsed()))

    if ts.last and archive_policy_def.timespan:
        with timeutils.StopWatch() as sw:
            oldest_point_to_keep = ts.last - datetime.timedelta(
                seconds=archive_policy_def.timespan)
            self._delete_metric_measures_before(
                metric, aggregation, archive_policy_def.granularity,
                oldest_point_to_keep)
            LOG.debug("Expire measures for %s/%s/%s in %.2fs"
                      % (metric.id, aggregation,
                         archive_policy_def.granularity, sw.elapsed()))
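Note that the retention cut-off is computed from the newest aggregated point, not from the wall clock. For example, with a one-day timespan:

import datetime

last = datetime.datetime(2016, 1, 2, 12, 0)   # newest point in the timeserie
timespan = 86400                              # archive_policy_def.timespan, in s
oldest_point_to_keep = last - datetime.timedelta(seconds=timespan)
print(oldest_point_to_keep)                   # -> 2016-01-01 12:00:00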