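Overview
This article analyzes how notifications are sent to the ceilometer notification agent (ceilometer-agent-notification) and how its callbacks are invoked.
The OpenStack services push notification messages into the oslo.messaging framework. ceilometer-agent-notification listens on that message queue, picks up the notifications, and converts them into samples.
The default topic that the notification agent listens on is defined in:
oslo_messaging/notify/notifier.py: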
    _notifier_opts = [
        cfg.ListOpt('notification_topics',
                    default=['notifications', ],
                    deprecated_name='topics',
                    deprecated_group='rpc_notifier2',
                    help='AMQP topic used for OpenStack notifications.'),
    ]
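To make the role of notification_topics concrete, here is a minimal sketch. It assumes, based on my reading of the oslo.messaging messaging notifier driver, that each notification is published under a name of the form `<topic>.<priority>`; treat that naming as an assumption and check it against your oslo.messaging version. The helper `queue_names` is hypothetical and is not part of oslo.messaging.

```python
# Hypothetical helper, not part of oslo.messaging.
# Assumption: notifications are routed to '<topic>.<priority>'.

# Default value of the notification_topics option shown above.
DEFAULT_NOTIFICATION_TOPICS = ['notifications']


def queue_names(topics, priorities):
    """Return the '<topic>.<priority>' name for every topic/priority pair."""
    return ['%s.%s' % (topic, priority)
            for topic in topics
            for priority in priorities]


if __name__ == '__main__':
    # The two priorities that the ceilometer handlers implement.
    print(queue_names(DEFAULT_NOTIFICATION_TOPICS, ['info', 'sample']))
```

Under this assumption, changing notification_topics on the sending service without changing it on the agent would leave the agent listening on names that nothing publishes to.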
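Starting the notification agent
The code lives in:
ceilometer/notification.py
start() sets up the pipelines, checks whether workload partitioning (coordination between agents) is enabled, and initializes the queue listeners.
This article focuses on the listener initialization, which is implemented in _configure_main_queue_listeners.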
    def start(self):
        super(NotificationService, self).start()
        self.pipeline_manager = pipeline.setup_pipeline()
        if cfg.CONF.notification.store_events:
            self.event_pipeline_manager = pipeline.setup_event_pipeline()

        self.transport = messaging.get_transport()

        if cfg.CONF.notification.workload_partitioning:
            self.ctxt = context.get_admin_context()
            self.group_id = self.NOTIFICATION_NAMESPACE
            self.partition_coordinator = coordination.PartitionCoordinator()
            self.partition_coordinator.start()
            self.partition_coordinator.join_group(self.group_id)
        else:
            # FIXME(sileht): endpoint uses the notification_topics option
            # and it should not because this is an oslo_messaging option
            # not a ceilometer. Until we have something to get the
            # notification_topics in another way, we must create a transport
            # to ensure the option has been registered by oslo_messaging.
            messaging.get_notifier(self.transport, '')
            self.group_id = None

        self.pipe_manager = self._get_pipe_manager(self.transport,
                                                   self.pipeline_manager)
        self.event_pipe_manager = self._get_event_pipeline_manager(
            self.transport)

        self.listeners, self.pipeline_listeners = [], []
        self._configure_main_queue_listeners(self.pipe_manager,
                                             self.event_pipe_manager)

        if cfg.CONF.notification.workload_partitioning:
            self._configure_pipeline_listeners()
            self.partition_coordinator.watch_group(self.group_id,
                                                   self._refresh_agent)

            self.tg.add_timer(cfg.CONF.coordination.heartbeat,
                              self.partition_coordinator.heartbeat)
            self.tg.add_timer(cfg.CONF.coordination.check_watchers,
                              self.partition_coordinator.run_watchers)

        if not cfg.CONF.notification.disable_non_metric_meters:
            LOG.warning(_LW('Non-metric meters may be collected. It is highly '
                            'advisable to disable these meters using '
                            'ceilometer.conf or the pipeline.yaml'))

        # Add a dummy thread to have wait() working
        self.tg.add_timer(604800, lambda: None)

        self.init_pipeline_refresh()
Code path:
Same as above (ceilometer/notification.py).
    def _configure_main_queue_listeners(self, pipe_manager,
                                        event_pipe_manager):
        # _get_notifications_manager loads the listeners registered under
        # the ceilometer.notification entry point namespace, for example:
        #
        #   ceilometer.notification =
        #       instance = ceilometer.compute.notifications.instance:Instance
        #       instance_scheduled = ceilometer.compute.notifications.instance:InstanceScheduled
        #       network = ceilometer.network.notifications:Network
        #       subnet = ceilometer.network.notifications:Subnet
        #       port = ceilometer.network.notifications:Port
        #       router = ceilometer.network.notifications:Router
        #       floatingip = ceilometer.network.notifications:FloatingIP
        notification_manager = self._get_notifications_manager(pipe_manager)
        if not list(notification_manager):
            LOG.warning(_('Failed to load any notification handlers for %s'),
                        self.NOTIFICATION_NAMESPACE)

        ack_on_error = cfg.CONF.notification.ack_on_event_error

        endpoints = []
        if cfg.CONF.notification.store_events:
            endpoints.append(
                event_endpoint.EventsNotificationEndpoint(event_pipe_manager))

        targets = []
        for ext in notification_manager:
            handler = ext.obj
            if (cfg.CONF.notification.disable_non_metric_meters and
                    isinstance(handler, base.NonMetricNotificationBase)):
                continue
            LOG.debug('Event types from %(name)s: %(type)s'
                      ' (ack_on_error=%(error)s)',
                      {'name': ext.name,
                       'type': ', '.join(handler.event_types),
                       'error': ack_on_error})
            # NOTE(gordc): this could be a set check but oslo_messaging issue
            # https://bugs.launchpad.net/oslo.messaging/+bug/1398511
            # This ensures we don't create multiple duplicate consumers.
            #
            # get_targets returns the exchange and topics declared by the
            # listener. The neutron handler, for example, defines:
            #
            #   def get_targets(conf):
            #       return [oslo_messaging.Target(
            #                   topic=topic,
            #                   exchange=conf.neutron_control_exchange)
            #               for topic in conf.notification_topics]
            for new_tar in handler.get_targets(cfg.CONF):
                if new_tar not in targets:
                    targets.append(new_tar)
            endpoints.append(handler)

        urls = cfg.CONF.notification.messaging_urls or [None]
        for url in urls:
            # messaging_urls entries have the form
            # rabbit://guest:guest@localhost:5672/
            transport = messaging.get_transport(url)
            listener = messaging.get_notification_listener(
                transport, targets, endpoints)
            # Start the listener.
            listener.start()
            self.listeners.append(listener)
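This article uses nova as the example.
While nova creates a virtual machine, it pushes the state changes of the creation process to RabbitMQ as notifications, and ceilometer collects and stores them.
The VM creation flow in nova itself is not analyzed here.
In nova/compute/api.py there is one statement that is the key to pushing the message to ceilometer: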
    # send a state update notification for the initial create to
    # show it going from non-existent to BUILDING
    notifications.send_update_with_states(context, instance, None,
                                          vm_states.BUILDING, None, None,
                                          service="api")
Underneath, it calls:
    rpc.get_notifier(service, host).info(context, 'compute.instance.update',
                                         payload)
The info method is a wrapper provided by the oslo_messaging library:
    def info(self, ctxt, event_type, payload):
        """Send a notification at info level.

        :param ctxt: a request context dict
        :type ctxt: dict
        :param event_type: describes the event, for example
                           'compute.create_instance'
        :type event_type: str
        :param payload: the notification payload
        :type payload: dict
        :raises: MessageDeliveryFailure
        """
        self._notify(ctxt, event_type, payload, 'INFO')
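_notify packs the data into a fixed message format and sends it.
The message contains message_id, publisher_id, event_type, priority, payload, and timestamp.
priority is restricted to a fixed set of values, discussed further below.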
    def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
                retry=None):
        payload = self._serializer.serialize_entity(ctxt, payload)
        ctxt = self._serializer.serialize_context(ctxt)

        msg = dict(message_id=six.text_type(uuid.uuid4()),
                   publisher_id=publisher_id or self.publisher_id,
                   event_type=event_type,
                   priority=priority,
                   payload=payload,
                   timestamp=six.text_type(timeutils.utcnow()))

        def do_notify(ext):
            try:
                ext.obj.notify(ctxt, msg, priority, retry or self.retry)
            except Exception as e:
                _LOG.exception("Problem '%(e)s' attempting to send to "
                               "notification system. Payload=%(payload)s",
                               dict(e=e, payload=payload))

        if self._driver_mgr.extensions:
            self._driver_mgr.map(do_notify)
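To see what the resulting message looks like, here is a minimal standard-library sketch that builds a dict with the same six keys as msg in _notify. The function `build_notification` and the sample values ('compute.host1', the payload contents) are hypothetical, and the timestamp is produced with datetime rather than oslo's timeutils, so its exact string format may differ from a real notification.

```python
import datetime
import uuid


def build_notification(publisher_id, event_type, priority, payload):
    """Build a dict with the same six keys that _notify puts into msg."""
    return dict(
        message_id=str(uuid.uuid4()),
        publisher_id=publisher_id,
        event_type=event_type,
        priority=priority,
        payload=payload,
        # Stand-in for timeutils.utcnow(); the string format is an assumption.
        timestamp=str(datetime.datetime.now(datetime.timezone.utc)),
    )


if __name__ == '__main__':
    msg = build_notification(
        publisher_id='compute.host1',          # hypothetical publisher
        event_type='compute.instance.update',
        priority='INFO',                       # what Notifier.info() passes
        payload={'state': 'building'},         # hypothetical payload
    )
    for key in sorted(msg):
        print(key, '=', msg[key])
```

Note that the priority is sent in upper case ('INFO'); the receiving side lowercases it before looking up the callback.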
After a message has been sent through oslo_messaging, the callback dispatch on the receiving side is implemented in:
oslo_messaging/notify/dispatcher.py
    def _dispatch(self, ctxt, message, executor_callback=None):
        ctxt = self.serializer.deserialize_context(ctxt)

        publisher_id = message.get('publisher_id')
        event_type = message.get('event_type')
        metadata = {
            'message_id': message.get('message_id'),
            'timestamp': message.get('timestamp')
        }
        priority = message.get('priority', '').lower()
        if priority not in PRIORITIES:
            LOG.warning('Unknown priority "%s"', priority)
            return

        payload = self.serializer.deserialize_entity(ctxt,
                                                     message.get('payload'))

        for screen, callback in self._callbacks_by_priority.get(priority, []):
            if screen and not screen.match(ctxt, publisher_id, event_type,
                                           metadata, payload):
                continue
            localcontext._set_local_context(ctxt)
            try:
                if executor_callback:
                    ret = executor_callback(callback, ctxt, publisher_id,
                                            event_type, payload, metadata)
                else:
                    ret = callback(ctxt, publisher_id, event_type, payload,
                                   metadata)
                ret = NotificationResult.HANDLED if ret is None else ret
                if self.allow_requeue and ret == NotificationResult.REQUEUE:
                    return ret
            finally:
                localcontext._clear_local_context()
        return NotificationResult.HANDLED
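Here payload is the data that ceilometer turns into samples.
The priority must be one of a fixed set of values; a message whose priority falls outside that set is treated as invalid, and _dispatch logs a warning and returns without calling any callback. Sender and receiver should therefore both use the same messaging framework, oslo_messaging. Otherwise notifications may be published but never consumed.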
    PRIORITIES = ['audit', 'debug', 'info', 'warn',
                  'error', 'critical', 'sample']
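Finally, the dispatcher calls the method named after the priority on each matching listener.
These are the callback methods of the listeners that ceilometer registers in its entry_points.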
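The priority check and the "method named after the priority" lookup can be sketched in a few lines. This is a simplified model, not the oslo_messaging implementation: `RecordingEndpoint` and `dispatch` are hypothetical names, serialization, filtering, and requeue handling are left out, and the lookup uses getattr on each endpoint instead of a precomputed callback table.

```python
PRIORITIES = ['audit', 'debug', 'info', 'warn',
              'error', 'critical', 'sample']


class RecordingEndpoint(object):
    """Hypothetical endpoint that only implements the 'info' priority."""

    def __init__(self):
        self.seen = []

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        self.seen.append((event_type, payload))


def dispatch(endpoints, message):
    """Return False if the priority is unknown, True otherwise."""
    priority = message.get('priority', '').lower()
    if priority not in PRIORITIES:
        # Mirrors the early return in _dispatch: nothing is called.
        return False
    metadata = {'message_id': message.get('message_id'),
                'timestamp': message.get('timestamp')}
    for endpoint in endpoints:
        # An endpoint handles a priority only if it has a method by that name.
        callback = getattr(endpoint, priority, None)
        if callback is not None:
            callback({}, message.get('publisher_id'),
                     message.get('event_type'), message.get('payload'),
                     metadata)
    return True


if __name__ == '__main__':
    endpoint = RecordingEndpoint()
    print(dispatch([endpoint], {'priority': 'INFO',
                                'event_type': 'compute.instance.update',
                                'payload': {'state': 'building'}}))
    print(dispatch([endpoint], {'priority': 'notice'}))
    print(endpoint.seen)
```

The second call uses a priority ('notice') that is not in PRIORITIES, so no endpoint method runs for it.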
The base class of the ceilometer listeners is shown below. It implements the callbacks for two priorities, sample and info.
Code path: ceilometer/agent/plugin_base.py
    @six.add_metaclass(abc.ABCMeta)
    class NotificationBase(PluginBase):
        """Base class for plugins that support the notification API."""

        def __init__(self, manager):
            super(NotificationBase, self).__init__()
            # NOTE(gordc): this is filter rule used by oslo.messaging to
            # dispatch messages to an endpoint.
            if self.event_types:
                self.filter_rule = oslo_messaging.NotificationFilter(
                    event_type='|'.join(self.event_types))
            self.manager = manager

        @abc.abstractproperty
        def event_types(self):
            """Return a sequence of strings.

            Strings are defining the event types to be given to this plugin.
            """

        @abc.abstractmethod
        def get_targets(self, conf):
            """Return a sequence of oslo.messaging.Target.

            Sequence is defining the exchange and topics to be connected for
            this plugin.
            :param conf: Configuration.
            """

        @abc.abstractmethod
        def process_notification(self, message):
            pass

        def info(self, ctxt, publisher_id, event_type, payload, metadata):
            notification = messaging.convert_to_old_notification_format(
                'info', ctxt, publisher_id, event_type, payload, metadata)
            self.to_samples_and_publish(context.get_admin_context(),
                                        notification)

        def sample(self, ctxt, publisher_id, event_type, payload, metadata):
            notification = messaging.convert_to_old_notification_format(
                'sample', ctxt, publisher_id, event_type, payload, metadata)
            self.to_samples_and_publish(context.get_admin_context(),
                                        notification)

        def to_samples_and_publish(self, context, notification):
            with self.manager.publisher(context) as p:
                p(list(self.process_notification(notification)))
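As a rough illustration of how a concrete plugin fits into this base class, the sketch below models the path from the info callback to published samples using only the standard library. `InstanceUpdateHandler`, its event types, and the sample dict layout are hypothetical. Two simplifications are worth calling out: the event type filter is applied inside the handler here, whereas in the real code the dispatcher applies filter_rule before the callback is reached, and samples are appended to a list instead of going through the pipeline publisher.

```python
import re


class InstanceUpdateHandler(object):
    """Hypothetical plugin standing in for a NotificationBase subclass."""

    event_types = ['compute.instance.update', 'compute.instance.exists']

    def __init__(self):
        # Same idea as filter_rule above: one regex joined with '|'.
        self._event_type_re = re.compile('|'.join(self.event_types))
        # Stand-in for the pipeline publisher.
        self.published = []

    def process_notification(self, message):
        # Yield one simplified sample per notification.
        yield {'name': 'instance',
               'resource_id': message['payload']['instance_id']}

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        # Simplification: the real filtering happens in the dispatcher.
        if not self._event_type_re.match(event_type):
            return
        notification = {'event_type': event_type, 'payload': payload}
        self.published.extend(self.process_notification(notification))


if __name__ == '__main__':
    handler = InstanceUpdateHandler()
    handler.info({}, 'compute.host1', 'compute.instance.update',
                 {'instance_id': 'abc'}, {})
    handler.info({}, 'network.host1', 'network.create.end',
                 {'instance_id': 'ignored'}, {})
    print(handler.published)
```

Only the first call matches one of the handler's event types, so only that notification is converted into a sample.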