当前位置: 首页 > 工具软件 > ZLMediaKit > 使用案例 >

ZLMediaKit源码分析 - NoticeCenter

孔欣荣
2023-12-01

简介

ZLToolKit的NoticeCenter实现了一个全局的事件订阅、分发的松耦合机制:通过单例模式获取全局唯一的NoticeCenter对象,再调用事件监听(addListener)、事件发送(emitEvent)等主要接口。

事件监听

调用者将事件(event)、标识(tag)、处理函数(func)传入addListener;addListener函数根据事件名先从_mapListener中找到对应的事件派发器(EventDispatcher,没有则会创建),然后调用该派发器的addListener函数。

// Unordered map from event name to the dispatcher that holds that event's listeners.
std::unordered_map<std::string, EventDispatcher::Ptr> _mapListener;

// Registers `func` as a listener of `event`, identified by `tag`.
// The dispatcher for `event` is looked up first and created when it does not exist yet.
template<typename FUNC>
void addListener(void *tag, const std::string &event, FUNC &&func) {
    auto dispatcher = getDispatcher(event, true);
    dispatcher->addListener(tag, std::forward<FUNC>(func));
}

// Returns the dispatcher registered for `event`.
// When none exists: creates and stores a new one if `create` is true, otherwise returns nullptr.
EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {
    std::lock_guard<std::recursive_mutex> guard(_mtxListener);

    const auto found = _mapListener.find(event);
    if (found != _mapListener.end()) {
        // Already known: hand back the existing dispatcher.
        return found->second;
    }
    if (!create) {
        return nullptr;
    }

    // Unknown event and the caller asked for creation.
    EventDispatcher::Ptr created(new EventDispatcher());
    _mapListener.emplace(event, created);
    return created;
}

EventDispatcher的addListener函数将标识(tag)、处理函数(func)插入自身的成员变量_mapListener。这是一个unordered_multimap(键可以重复),因此同一个tag下可以存储多个处理函数。

     using MapType = std::unordered_multimap<void *, std::shared_ptr<void> >;
    template<typename FUNC>
    void addListener(void *tag, FUNC &&func) {
        // 类型别名
        using funType = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
        // 创建对象
        std::shared_ptr<void> pListener(new funType(std::forward<FUNC>(func)), [](void *ptr) {
            // 析构
            funType *obj = (funType *) ptr;
            delete obj;
        });
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.emplace(tag, pListener);
    }

事件分发

emitEvent也是调用getDispatcher,然后找到EventDispatcher执行emitEvent

   // Forwards `args` to the dispatcher registered for `strEvent` and returns its result.
   // Returns 0 when nobody listens to that event.
   template<typename ...ArgsType>
    int emitEvent(const std::string &strEvent, ArgsType &&...args) {
        if (auto target = getDispatcher(strEvent)) {
            return target->emitEvent(std::forward<ArgsType>(args)...);
        }
        // No dispatcher means no listener for this event.
        return 0;
    }

EventDispatcher的emitEvent函数如下

  // Invokes the stored listeners with `args` and returns how many were called.
  // A listener that throws InterruptException is still counted, but stops the iteration.
  template<typename ...ArgsType>
    int emitEvent(ArgsType &&...args) {
        // The stored pointers are reinterpreted as this std::function type, built from the
        // forwarded argument types of this call.
        using funType = std::function<void(decltype(std::forward<ArgsType>(args))...)>;
        decltype(_mapListener) copy;
        {
            // Copy first (the cost is small) so the lock is not held while callbacks run,
            // which could otherwise lead to cross-locking.
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            copy = _mapListener;
        }

        int ret = 0;
        // Walk the snapshot.
        for (auto &pr : copy) {
            funType *obj = (funType *) (pr.second.get());
            try {
                // Invoke the listener.
                (*obj)(std::forward<ArgsType>(args)...);
                ++ret;
            } catch (InterruptException &) {
                ++ret;
                break;
            }
        }
        return ret;
    }

总结

  1. NoticeCenter实现了全局的事件分发器,我们可以学习其实现应用到我们项目中解耦类似的场景。
  2. 如果处理函数没有做异步处理,那么它会在触发事件的线程(即调用emitEvent的线程)中同步执行,而不是在注册监听(调用addListener)的线程中执行。

完整源码及测试

function_traits.h

#ifndef SRC_UTIL_FUNCTION_TRAITS_H_
#define SRC_UTIL_FUNCTION_TRAITS_H_

#include <cstddef>
#include <functional>
#include <tuple>

namespace toolkit {

/**
 * Compile-time description of a callable's signature.
 *
 * For a supported callable type T, function_traits<T> exposes:
 *   - arity             number of parameters
 *   - function_type     the plain function type Ret(Args...)
 *   - return_type       Ret
 *   - stl_function_type std::function<Ret(Args...)>
 *   - pointer           Ret(*)(Args...)
 *   - args<I>::type     type of the I-th parameter
 *
 * Supported: plain function types, function pointers, std::function, pointers to
 * member functions (any cv-qualification) and function objects with a single,
 * non-template operator() (e.g. lambdas). Where the compiler treats noexcept as
 * part of the function type (C++17), the noexcept forms are supported as well
 * and map to the same traits as their non-noexcept counterparts.
 */
template<typename T>
struct function_traits;

// Plain function type.
template<typename Ret, typename... Args>
struct function_traits<Ret(Args...)>
{
public:
    enum { arity = sizeof...(Args) };
    typedef Ret function_type(Args...);
    typedef Ret return_type;
    using stl_function_type = std::function<function_type>;
    typedef Ret(*pointer)(Args...);

    // Type of the I-th parameter; I is checked against arity at compile time.
    template<std::size_t I>
    struct args
    {
        static_assert(I < arity, "index is out of range, index must less than sizeof Args");
        using type = typename std::tuple_element<I, std::tuple<Args...> >::type;
    };
};

// Function pointer.
template<typename Ret, typename... Args>
struct function_traits<Ret(*)(Args...)> : function_traits<Ret(Args...)>{};

// std::function
template <typename Ret, typename... Args>
struct function_traits<std::function<Ret(Args...)>> : function_traits<Ret(Args...)>{};

// Pointer to member function; the macro argument is the trailing qualifier list.
#define FUNCTION_TRAITS(...) \
    template <typename ReturnType, typename ClassType, typename... Args>\
    struct function_traits<ReturnType(ClassType::*)(Args...) __VA_ARGS__> : function_traits<ReturnType(Args...)>{};

FUNCTION_TRAITS()
FUNCTION_TRAITS(const)
FUNCTION_TRAITS(volatile)
FUNCTION_TRAITS(const volatile)

#ifdef __cpp_noexcept_function_type
// Since C++17 noexcept is part of the function type, so a noexcept callable does not
// match the specializations above. Without these it would fall through to the
// function-object case below and fail to compile.
template<typename Ret, typename... Args>
struct function_traits<Ret(Args...) noexcept> : function_traits<Ret(Args...)>{};

template<typename Ret, typename... Args>
struct function_traits<Ret(*)(Args...) noexcept> : function_traits<Ret(Args...)>{};

FUNCTION_TRAITS(noexcept)
FUNCTION_TRAITS(const noexcept)
FUNCTION_TRAITS(volatile noexcept)
FUNCTION_TRAITS(const volatile noexcept)
#endif

// Function object (lambda, functor): derive the traits from its operator().
template<typename Callable>
struct function_traits : function_traits<decltype(&Callable::operator())>{};

} /* namespace toolkit */

#endif /* SRC_UTIL_FUNCTION_TRAITS_H_ */

NoticeCenter.h

/*
 * Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
 *
 * This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
 *
 * Use of this source code is governed by MIT license that can be found in the
 * LICENSE file in the root of the source tree. All contributing project authors
 * may be found in the AUTHORS file in the root of the source tree.
 */

#ifndef SRC_UTIL_NOTICECENTER_H_
#define SRC_UTIL_NOTICECENTER_H_

#include <mutex>
#include <memory>
#include <string>
#include <exception>
#include <functional>
#include <unordered_map>
#include "function_traits.h"

namespace toolkit {

class EventDispatcher {
public:
    friend class NoticeCenter;
    using Ptr = std::shared_ptr<EventDispatcher>;

    ~EventDispatcher() = default;

private:
    using MapType = std::unordered_multimap<void *, std::shared_ptr<void> >;

    EventDispatcher() = default;

    class InterruptException : public std::runtime_error {
    public:
        InterruptException() : std::runtime_error("InterruptException") {}

        ~InterruptException() {}
    };

    template<typename ...ArgsType>
    int emitEvent(ArgsType &&...args) {
        using funType = std::function<void(decltype(std::forward<ArgsType>(args))...)>;
        decltype(_mapListener) copy;
        {
            //先拷贝(开销比较小),目的是防止在触发回调时还是上锁状态从而导致交叉互锁
            std::lock_guard<std::recursive_mutex> lck(_mtxListener);
            copy = _mapListener;
        }

        int ret = 0;
        for (auto &pr : copy) {
            funType *obj = (funType *) (pr.second.get());
            try {
                (*obj)(std::forward<ArgsType>(args)...);
                ++ret;
            } catch (InterruptException &) {
                ++ret;
                break;
            }
        }
        return ret;
    }

    template<typename FUNC>
    void addListener(void *tag, FUNC &&func) {
        using funType = typename function_traits<typename std::remove_reference<FUNC>::type>::stl_function_type;
        std::shared_ptr<void> pListener(new funType(std::forward<FUNC>(func)), [](void *ptr) {
            funType *obj = (funType *) ptr;
            delete obj;
        });
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.emplace(tag, pListener);
    }

    void delListener(void *tag, bool &empty) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.erase(tag);
        empty = _mapListener.empty();
    }

private:
    std::recursive_mutex _mtxListener;
    MapType _mapListener;
};

/**
 * Event hub keyed by event name: listeners subscribe with addListener and are
 * invoked by emitEvent. Each event name maps to one EventDispatcher that holds the
 * listeners for that event. The shared instance is obtained through Instance().
 */
class NoticeCenter : public std::enable_shared_from_this<NoticeCenter> {
public:
    using Ptr = std::shared_ptr<NoticeCenter>;

    static NoticeCenter &Instance();

    /**
     * Invokes the listeners of `strEvent` with `args`.
     * @return the dispatcher's result (number of listeners invoked), or 0 when
     *         nobody listens to the event.
     */
    template<typename ...ArgsType>
    int emitEvent(const std::string &strEvent, ArgsType &&...args) {
        auto dispatcher = getDispatcher(strEvent);
        if (!dispatcher) {
            // Nobody listens to this event.
            return 0;
        }
        return dispatcher->emitEvent(std::forward<ArgsType>(args)...);
    }

    /**
     * Registers `func` as a listener of `event` under `tag`, creating the event's
     * dispatcher when needed.
     */
    template<typename FUNC>
    void addListener(void *tag, const std::string &event, FUNC &&func) {
        // Hold the map lock across lookup and registration. Otherwise a concurrent
        // delListener(tag, event) could drop the dispatcher from the map between the
        // two steps, and the new listener would sit on an orphaned dispatcher that
        // emitEvent never reaches.
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        getDispatcher(event, true)->addListener(tag, std::forward<FUNC>(func));
    }

    /**
     * Removes the listeners registered under `tag` for `event`; the dispatcher is
     * dropped once it has no listeners left.
     */
    void delListener(void *tag, const std::string &event) {
        // Keep removal, emptiness check and dispatcher erasure atomic with respect
        // to addListener. The mutex is recursive, so the helpers may lock it again.
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        auto dispatcher = getDispatcher(event);
        if (!dispatcher) {
            // No such event.
            return;
        }
        bool empty = false;
        dispatcher->delListener(tag, empty);
        if (empty) {
            delDispatcher(event, dispatcher);
        }
    }

    /**
     * Removes the listeners registered under `tag` from every event.
     * This visits every dispatcher, so it is comparatively slow.
     */
    void delListener(void *tag) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        bool empty = false;
        for (auto it = _mapListener.begin(); it != _mapListener.end();) {
            it->second->delListener(tag, empty);
            if (empty) {
                it = _mapListener.erase(it);
                continue;
            }
            ++it;
        }
    }

    /** Drops every dispatcher, and with them all registered listeners. */
    void clearAll() {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        _mapListener.clear();
    }

private:
    /**
     * Returns the dispatcher of `event`. When none exists, a new one is created and
     * stored if `create` is true; otherwise nullptr is returned.
     */
    EventDispatcher::Ptr getDispatcher(const std::string &event, bool create = false) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        auto it = _mapListener.find(event);
        if (it != _mapListener.end()) {
            return it->second;
        }
        if (create) {
            // Not found: create one.
            EventDispatcher::Ptr dispatcher(new EventDispatcher());
            _mapListener.emplace(event, dispatcher);
            return dispatcher;
        }
        return nullptr;
    }

    /**
     * Erases the map entry of `event`, but only if it still refers to `dispatcher`.
     */
    void delDispatcher(const std::string &event, const EventDispatcher::Ptr &dispatcher) {
        std::lock_guard<std::recursive_mutex> lck(_mtxListener);
        auto it = _mapListener.find(event);
        if (it != _mapListener.end() && dispatcher == it->second) {
            // Same dispatcher: remove it.
            _mapListener.erase(it);
        }
    }

private:
    std::recursive_mutex _mtxListener;
    std::unordered_map<std::string, EventDispatcher::Ptr> _mapListener;
};

} /* namespace toolkit */
#endif /* SRC_UTIL_NOTICECENTER_H_ */

NoticeCenter.cpp

#include "util.h"
#include "NoticeCenter.h"

namespace toolkit {

INSTANCE_IMP(NoticeCenter) // Implements the singleton accessor via the INSTANCE_IMP macro

} /* namespace toolkit */


单例宏定义
/*
 * Defines class_name::Instance().
 * The generated function keeps a function-local static std::shared_ptr that owns an
 * object created with `new class_name(__VA_ARGS__)`, plus a static reference to that
 * object, and returns the reference. Any extra macro arguments are passed to the
 * constructor.
 */
#define INSTANCE_IMP(class_name, ...) \
class_name &class_name::Instance() { \
    static std::shared_ptr<class_name> s_instance(new class_name(__VA_ARGS__)); \
    static class_name &s_instance_ref = *s_instance; \
    return s_instance_ref; \
}

推荐一个零声学院免费公开课程,个人觉得老师讲得不错,
分享给大家:[Linux,Nginx,ZeroMQ,MySQL,Redis,
fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,
TCP/IP,协程,DPDK等技术内容,点击立即学习]

 类似资料: