ceilometer-notification-agent message push mechanism

梁才
2023-12-01

Overview

This article analyzes how messages are sent to ceilometer-notification-agent and how the agent's callbacks are invoked.

Each OpenStack service pushes its notifications onto the message bus through the oslo.messaging framework. The ceilometer-agent-notification service consumes these notifications from the message queue and converts them into samples.

1      Default topic

The default topic that ceilometer-notification-agent listens on is defined in oslo_messaging/notify/notifier.py:

 

from oslo_config import cfg

_notifier_opts = [
    cfg.ListOpt('notification_topics',
                default=['notifications', ],
                deprecated_name='topics',
                deprecated_group='rpc_notifier2',
                help='AMQP topic used for OpenStack notifications.'),
]
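The topic in notification_topics is not used as a queue name on its own. As far as I know the oslo.messaging messaging driver publishes each notification to "<topic>.<priority>" with the priority lowercased, and a notification listener consumes one such topic per target and per priority its endpoints implement. The sketch below models only that naming rule; send_topic and listen_topics are hypothetical helpers, not oslo.messaging APIs.

```python
# Hypothetical helpers (not oslo.messaging APIs) modelling the
# "<topic>.<priority>" naming used for notification topics.

def send_topic(topic, priority):
    """Topic a notification of the given priority is published to."""
    # The messaging driver lowercases the priority, e.g. 'INFO' -> 'info'.
    return '%s.%s' % (topic, priority.lower())


def listen_topics(notification_topics, handled_priorities):
    """Topics a listener consumes: one per (topic, handled priority) pair."""
    return ['%s.%s' % (topic, prio)
            for topic in notification_topics
            for prio in handled_priorities]


# With the default notification_topics = ['notifications'] and endpoints
# that implement info() and sample():
print(send_topic('notifications', 'INFO'))  # notifications.info
print(listen_topics(['notifications'], ['info', 'sample']))
# ['notifications.info', 'notifications.sample']
```

So with the default configuration, a message sent with the INFO priority lands on notifications.info, which is one of the topics the agent's listener consumes.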

 

2      How ceilometer-notification-agent starts listening

Starting the notification agent

The code is located in:

ceilometer/notification.py

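start() mainly sets up the sample pipelines (and the event pipeline when store_events is enabled), checks whether workload partitioning across agents is enabled, and initializes the queue listeners.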

This article focuses on the listener initialization, which is done mainly in _configure_main_queue_listeners.

def start(self):
    super(NotificationService, self).start()

    self.pipeline_manager = pipeline.setup_pipeline()

    if cfg.CONF.notification.store_events:
        self.event_pipeline_manager = pipeline.setup_event_pipeline()

    self.transport = messaging.get_transport()

    if cfg.CONF.notification.workload_partitioning:
        self.ctxt = context.get_admin_context()
        self.group_id = self.NOTIFICATION_NAMESPACE
        self.partition_coordinator = coordination.PartitionCoordinator()
        self.partition_coordinator.start()
        self.partition_coordinator.join_group(self.group_id)
    else:
        # FIXME(sileht): endpoint uses the notification_topics option
        # and it should not because this is an oslo_messaging option
        # not a ceilometer. Until we have something to get the
        # notification_topics in another way, we must create a transport
        # to ensure the option has been registered by oslo_messaging.
        messaging.get_notifier(self.transport, '')
        self.group_id = None

    self.pipe_manager = self._get_pipe_manager(self.transport,
                                               self.pipeline_manager)
    self.event_pipe_manager = self._get_event_pipeline_manager(
        self.transport)

    self.listeners, self.pipeline_listeners = [], []
    self._configure_main_queue_listeners(self.pipe_manager,
                                         self.event_pipe_manager)

    if cfg.CONF.notification.workload_partitioning:
        self._configure_pipeline_listeners()
        self.partition_coordinator.watch_group(self.group_id,
                                               self._refresh_agent)

        self.tg.add_timer(cfg.CONF.coordination.heartbeat,
                          self.partition_coordinator.heartbeat)
        self.tg.add_timer(cfg.CONF.coordination.check_watchers,
                          self.partition_coordinator.run_watchers)

    if not cfg.CONF.notification.disable_non_metric_meters:
        LOG.warning(_LW('Non-metric meters may be collected. It is highly '
                        'advisable to disable these meters using '
                        'ceilometer.conf or the pipeline.yaml'))
    # Add a dummy thread to have wait() working
    self.tg.add_timer(604800, lambda: None)

    self.init_pipeline_refresh()
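The branching in start() is easier to follow as an ordered list of steps. The following is a simplified, hypothetical model (start_steps is not part of ceilometer): it only records step names for a given combination of the store_events and workload_partitioning options and does not touch any real pipeline, coordinator or message bus. The dummy 604800-second timer is left out.

```python
def start_steps(store_events, workload_partitioning):
    """Return the ordered step names that start() goes through (simplified)."""
    steps = ['setup_pipeline']
    if store_events:
        steps.append('setup_event_pipeline')
    steps.append('get_transport')
    if workload_partitioning:
        # Join the coordination group so several agents can share the work.
        steps += ['partition_coordinator.start', 'join_group']
    else:
        # Only called so that oslo.messaging registers notification_topics.
        steps.append('get_notifier')
    steps += ['_get_pipe_manager', '_get_event_pipeline_manager',
              '_configure_main_queue_listeners']
    if workload_partitioning:
        # Extra listeners and periodic timers used only with partitioning.
        steps += ['_configure_pipeline_listeners', 'watch_group',
                  'heartbeat timer', 'check_watchers timer']
    steps.append('init_pipeline_refresh')
    return steps


print(start_steps(store_events=False, workload_partitioning=False))
```

In both branches the main queue listeners are configured before any partitioning-specific listeners.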

 

 

Code path

Same as above (ceilometer/notification.py)

def _configure_main_queue_listeners(self, pipe_manager,
                                    event_pipe_manager):
    # _get_notifications_manager loads the listeners registered under the
    # ceilometer.notification entry-point namespace.
    notification_manager = self._get_notifications_manager(pipe_manager)
    # The listeners are:
    #   ceilometer.notification =
    #   instance = ceilometer.compute.notifications.instance:Instance
    #   instance_scheduled = ceilometer.compute.notifications.instance:InstanceScheduled
    #   network = ceilometer.network.notifications:Network
    #   subnet = ceilometer.network.notifications:Subnet
    #   port = ceilometer.network.notifications:Port
    #   router = ceilometer.network.notifications:Router
    #   floatingip = ceilometer.network.notifications:FloatingIP
    if not list(notification_manager):
        LOG.warning(_('Failed to load any notification handlers for %s'),
                    self.NOTIFICATION_NAMESPACE)

    ack_on_error = cfg.CONF.notification.ack_on_event_error

    endpoints = []
    if cfg.CONF.notification.store_events:
        endpoints.append(
            event_endpoint.EventsNotificationEndpoint(event_pipe_manager))

    targets = []
    for ext in notification_manager:
        handler = ext.obj
        if (cfg.CONF.notification.disable_non_metric_meters and
                isinstance(handler, base.NonMetricNotificationBase)):
            continue
        LOG.debug('Event types from %(name)s: %(type)s'
                  ' (ack_on_error=%(error)s)',
                  {'name': ext.name,
                   'type': ', '.join(handler.event_types),
                   'error': ack_on_error})
        # NOTE(gordc): this could be a set check but oslo_messaging issue
        # https://bugs.launchpad.net/oslo.messaging/+bug/1398511
        # This ensures we don't create multiple duplicate consumers.
        #
        # get_targets returns the exchange and topics defined by each
        # listener. For example, the neutron handler defines:
        #     def get_targets(self, conf):
        #         return [oslo_messaging.Target(topic=topic,
        #                                       exchange=conf.neutron_control_exchange)
        #                 for topic in conf.notification_topics]
        for new_tar in handler.get_targets(cfg.CONF):
            if new_tar not in targets:
                targets.append(new_tar)
        endpoints.append(handler)

    urls = cfg.CONF.notification.messaging_urls or [None]
    for url in urls:
        # messaging_urls entries look like rabbit://guest:guest@localhost:5672/
        transport = messaging.get_transport(url)
        listener = messaging.get_notification_listener(
            transport, targets, endpoints)
        # Start the listener.
        listener.start()
        self.listeners.append(listener)
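The heart of this function is the loop that collects targets and endpoints. The sketch below models it in plain Python under stated assumptions: Target is a hypothetical stand-in for oslo_messaging.Target, Handler is a hypothetical stand-in for a plugin, and get_targets takes the topic list directly instead of the whole cfg.CONF. Like the original, it uses a list membership (equality) check rather than a set, so plugins that share an exchange produce only one target.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Target:
    """Hypothetical stand-in for oslo_messaging.Target (topic + exchange)."""
    topic: str
    exchange: str


class Handler:
    """Hypothetical stand-in for a ceilometer notification plugin."""

    def __init__(self, name, exchange, non_metric=False):
        self.name = name
        self.exchange = exchange
        self.non_metric = non_metric

    def get_targets(self, notification_topics):
        return [Target(topic, self.exchange) for topic in notification_topics]


def collect(handlers, notification_topics, disable_non_metric_meters):
    """Return (targets, endpoints) the way the loop above builds them."""
    targets, endpoints = [], []
    for handler in handlers:
        if disable_non_metric_meters and handler.non_metric:
            continue  # skipped plugins contribute neither targets nor endpoints
        for new_tar in handler.get_targets(notification_topics):
            # Equality-based check: avoids duplicate consumers per exchange.
            if new_tar not in targets:
                targets.append(new_tar)
        endpoints.append(handler)
    return targets, endpoints


handlers = [Handler('network', 'neutron'), Handler('port', 'neutron'),
            Handler('instance', 'nova')]
print(collect(handlers, ['notifications'], False)[0])
```

The network and port plugins both listen on the neutron exchange, so they share a single target while each still becomes its own endpoint.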

 

 

3      How the services (nova, cinder…) push messages

This section uses nova as the example.

While creating a virtual machine, Nova pushes the progress of the creation as notification messages to RabbitMQ, where ceilometer collects and stores them.

The nova VM creation flow itself is not analyzed here.

The following call in nova/compute/api.py is the key point at which the message destined for ceilometer is sent:

 

# send a state update notification for the initial create to
# show it going from non-existent to BUILDING
notifications.send_update_with_states(context, instance, None,
        vm_states.BUILDING, None, None, service="api")

 

 

Underneath, this calls:

rpc.get_notifier(service, host).info(context,
                                     'compute.instance.update', payload)
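In the nova releases I am familiar with, rpc.get_notifier(service, host) prepares the notifier with a publisher_id of the form "<service>.<host>", falling back to the configured host name; treat that as an assumption to verify against your nova version. The hypothetical helper below only illustrates the resulting identifiers for the call above.

```python
import socket


def make_publisher_id(service, host=None):
    """Hypothetical helper: build a '<service>.<host>' publisher_id."""
    # Falls back to this machine's hostname when no host is given.
    return '%s.%s' % (service, host or socket.gethostname())


# send_update_with_states(..., service="api") publishes roughly:
event_type = 'compute.instance.update'
publisher_id = make_publisher_id('api', 'controller-1')
print(publisher_id, event_type)  # api.controller-1 compute.instance.update
```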

 

 

info is a wrapper method of the Notifier class in oslo_messaging (oslo_messaging/notify/notifier.py):

def info(self, ctxt, event_type, payload):
    """Send a notification at info level.

    :param ctxt: a request context dict
    :type ctxt: dict
    :param event_type: describes the event, for example
                       'compute.create_instance'
    :type event_type: str
    :param payload: the notification payload
    :type payload: dict
    :raises: MessageDeliveryFailure
    """
    self._notify(ctxt, event_type, payload, 'INFO')

 

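_notify packs the data into a fixed message format and sends it. The message contains message_id, publisher_id, event_type, priority, payload and timestamp.

priority only takes one of a fixed set of values, described later.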

def _notify(self, ctxt, event_type, payload, priority, publisher_id=None,
            retry=None):
    payload = self._serializer.serialize_entity(ctxt, payload)
    ctxt = self._serializer.serialize_context(ctxt)

    msg = dict(message_id=six.text_type(uuid.uuid4()),
               publisher_id=publisher_id or self.publisher_id,
               event_type=event_type,
               priority=priority,
               payload=payload,
               timestamp=six.text_type(timeutils.utcnow()))

    def do_notify(ext):
        try:
            ext.obj.notify(ctxt, msg, priority, retry or self.retry)
        except Exception as e:
            _LOG.exception("Problem '%(e)s' attempting to send to "
                           "notification system. Payload=%(payload)s",
                           dict(e=e, payload=payload))
    if self._driver_mgr.extensions:
        self._driver_mgr.map(do_notify)
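To see what actually travels over the bus, here is a stdlib-only sketch that builds a dict with the same keys as the msg in _notify. It is an illustration, not the oslo.messaging code: build_notification is a hypothetical name, serialization of ctxt and payload is skipped, and the timestamp is only an approximation of str(timeutils.utcnow()) (a naive UTC datetime rendered as a string).

```python
import uuid
from datetime import datetime, timezone


def build_notification(event_type, payload, priority, publisher_id):
    """Build a dict with the same keys as the envelope built by _notify."""
    return dict(message_id=str(uuid.uuid4()),
                publisher_id=publisher_id,
                event_type=event_type,
                priority=priority,  # info() passes the upper-case 'INFO'
                payload=payload,
                # Naive UTC timestamp, e.g. '2023-12-01 08:00:00.123456'.
                timestamp=str(datetime.now(timezone.utc).replace(tzinfo=None)))


msg = build_notification('compute.instance.update', {'state': 'building'},
                         'INFO', 'api.controller-1')
print(sorted(msg))
```

Each configured driver then receives this dict through ext.obj.notify(ctxt, msg, priority, retry).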

 

4      Message callbacks in oslo.messaging

After a message has been sent through oslo.messaging, the callback path on the receiving side is in

oslo_messaging/notify/dispatcher.py

 

def _dispatch(self, ctxt, message, executor_callback=None):
    ctxt = self.serializer.deserialize_context(ctxt)

    publisher_id = message.get('publisher_id')
    event_type = message.get('event_type')
    metadata = {
        'message_id': message.get('message_id'),
        'timestamp': message.get('timestamp')
    }
    priority = message.get('priority', '').lower()
    if priority not in PRIORITIES:
        LOG.warning('Unknown priority "%s"', priority)
        return

    payload = self.serializer.deserialize_entity(ctxt,
                                                 message.get('payload'))

    for screen, callback in self._callbacks_by_priority.get(priority, []):
        if screen and not screen.match(ctxt, publisher_id, event_type,
                                       metadata, payload):
            continue
        localcontext._set_local_context(ctxt)
        try:
            if executor_callback:
                ret = executor_callback(callback, ctxt, publisher_id,
                                        event_type, payload, metadata)
            else:
                ret = callback(ctxt, publisher_id, event_type, payload,
                               metadata)
            ret = NotificationResult.HANDLED if ret is None else ret
            if self.allow_requeue and ret == NotificationResult.REQUEUE:
                return ret
        finally:
            localcontext._clear_local_context()
    return NotificationResult.HANDLED

 

 

Here payload is the notification body from which ceilometer builds its samples.

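The priority is restricted to a fixed set of values. A message whose priority is outside that set is treated as invalid: the dispatcher logs a warning and does not pass it to any callback. Sender and receiver should therefore both use oslo.messaging; otherwise messages are published but never consumed by anyone.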

PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
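The dispatch logic above reduces to three checks: normalize and validate the priority, skip callbacks whose filter does not match, and treat a None return as "handled". The following is a simplified model, not the oslo.messaging implementation: dispatch, HANDLED and REQUEUE are hypothetical stand-ins, the filter is a compiled regex applied only to event_type (the real NotificationFilter can also screen on publisher_id, context, metadata and payload), and ctxt and metadata are not passed to callbacks.

```python
import re

PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']
HANDLED, REQUEUE = 'handled', 'requeue'


def dispatch(message, callbacks_by_priority, allow_requeue=False):
    """Simplified model of NotificationDispatcher._dispatch.

    callbacks_by_priority maps a priority name to a list of
    (compiled_regex_or_None, callback) pairs.
    """
    priority = message.get('priority', '').lower()
    if priority not in PRIORITIES:
        # Real code: LOG.warning('Unknown priority ...') and return.
        return None
    for screen, callback in callbacks_by_priority.get(priority, []):
        # Skip endpoints whose event_type filter does not match.
        if screen is not None and not screen.match(message.get('event_type', '')):
            continue
        ret = callback(message.get('publisher_id'), message.get('event_type'),
                       message.get('payload'))
        ret = HANDLED if ret is None else ret
        if allow_requeue and ret == REQUEUE:
            return ret
    return HANDLED


seen = []
callbacks = {'info': [(re.compile('compute.instance.*'),
                       lambda pub, evt, payload: seen.append(evt))]}
dispatch({'priority': 'INFO', 'event_type': 'compute.instance.update',
          'publisher_id': 'api.controller-1', 'payload': {}}, callbacks)
print(seen)  # ['compute.instance.update']
```

A message sent with an unknown priority such as NOTICE never reaches any callback in this model, which mirrors the early return in the real dispatcher.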

 

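Finally, the dispatcher calls the method whose name matches the priority on the corresponding listener.

That is, it calls the callback methods of the listeners that ceilometer registers in its entry_points.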
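How does a priority name become a method call? To my understanding, the oslo.messaging notification dispatcher, when it is constructed, looks up an attribute named after each priority on every endpoint and records it together with the endpoint's filter_rule. The sketch below reproduces that grouping with plain getattr; build_callbacks and InstancePlugin are hypothetical names used only for illustration.

```python
PRIORITIES = ['audit', 'debug', 'info', 'warn', 'error', 'critical', 'sample']


class InstancePlugin:
    """Hypothetical endpoint that implements only the info and sample priorities."""
    filter_rule = None

    def info(self, *args):
        return 'info called'

    def sample(self, *args):
        return 'sample called'


def build_callbacks(endpoints):
    """Group endpoint methods by priority name, as the dispatcher does."""
    callbacks = {}
    for endpoint in endpoints:
        for prio in PRIORITIES:
            method = getattr(endpoint, prio, None)
            if method is not None:
                screen = getattr(endpoint, 'filter_rule', None)
                callbacks.setdefault(prio, []).append((screen, method))
    return callbacks


print(sorted(build_callbacks([InstancePlugin()])))  # ['info', 'sample']
```

Because ceilometer's NotificationBase defines only info and sample, its plugins are only ever invoked for those two priorities.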

The listeners in ceilometer derive from the base class below. As the code shows, it defines methods for two priorities, info and sample.

The code is in ceilometer/agent/plugin_base.py:


@six.add_metaclass(abc.ABCMeta)
class NotificationBase(PluginBase):
    """Base class for plugins that support the notification API."""

    def __init__(self, manager):
        super(NotificationBase, self).__init__()
        # NOTE(gordc): this is filter rule used by oslo.messaging to dispatch
        # messages to an endpoint.
        if self.event_types:
            self.filter_rule = oslo_messaging.NotificationFilter(
                event_type='|'.join(self.event_types))
        self.manager = manager

    @abc.abstractproperty
    def event_types(self):
        """Return a sequence of strings.

        Strings are defining the event types to be given to this plugin.
        """

    @abc.abstractmethod
    def get_targets(self, conf):
        """Return a sequence of oslo.messaging.Target.

        Sequence is defining the exchange and topics to be connected for this
        plugin.

        :param conf: Configuration.
        """

    @abc.abstractmethod
    def process_notification(self, message):
        pass

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        notification = messaging.convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)

    def sample(self, ctxt, publisher_id, event_type, payload, metadata):
        notification = messaging.convert_to_old_notification_format(
            'sample', ctxt, publisher_id, event_type, payload, metadata)
        self.to_samples_and_publish(context.get_admin_context(), notification)

    def to_samples_and_publish(self, context, notification):
        with self.manager.publisher(context) as p:
            p(list(self.process_notification(notification)))
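The path from info() to published samples can be traced end to end with a self-contained toy plugin. This is a sketch under assumptions: convert_to_old_notification_format below is my approximation of ceilometer.messaging's helper (merge metadata into the notification and prefix context keys with "_context_"), InstanceCounter is a hypothetical plugin, a plain list stands in for manager.publisher(...), and the event_types filter is modelled as a regex applied with re.match.

```python
import re


def convert_to_old_notification_format(priority, ctxt, publisher_id,
                                       event_type, payload, metadata):
    """Approximation of ceilometer.messaging's helper (assumed behaviour)."""
    notification = {'priority': priority, 'payload': payload,
                    'event_type': event_type, 'publisher_id': publisher_id}
    notification.update(metadata)
    for key, value in ctxt.items():
        notification['_context_' + key] = value
    return notification


class InstanceCounter:
    """Hypothetical plugin: turns compute.instance.* notifications into samples."""
    event_types = ['compute.instance.*']

    def __init__(self, published):
        # Same pattern the real plugin hands to NotificationFilter.
        self.filter_rule = re.compile('|'.join(self.event_types))
        self.published = published  # stands in for manager.publisher(...)

    def process_notification(self, message):
        yield {'name': 'instance', 'volume': 1,
               'resource_id': message['payload'].get('instance_id')}

    def info(self, ctxt, publisher_id, event_type, payload, metadata):
        notification = convert_to_old_notification_format(
            'info', ctxt, publisher_id, event_type, payload, metadata)
        self.published.extend(self.process_notification(notification))


out = []
plugin = InstanceCounter(out)
plugin.info({'user_id': 'u1'}, 'api.controller-1', 'compute.instance.update',
            {'instance_id': 'abc'}, {'message_id': 'm1', 'timestamp': 't'})
print(out)  # [{'name': 'instance', 'volume': 1, 'resource_id': 'abc'}]
```

Real plugins do the same three things: declare event_types for the filter, convert the callback arguments back into a single notification dict, and yield samples from process_notification.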

 

  

