The analysis below is based on sonic-buildimage at git commit 774778; the GitHub address is:
https://github.com/Azure/sonic-buildimage/tree/77477857b47b114fde18afc33985e1a76c464c09
The code involved is sairedis, which runs in the syncd container and handles communication with the redis database as well as calls into the vendor-provided SAI API and SDK; the compiled executable is named syncd.
The sairedis repository is at:
https://github.com/Azure/sonic-sairedis/tree/13474d17435d3876e7bd6b50133d25bb11dd3c54
In SONiC, the database container runs the redis database, while the syncd container uses hiredis (the C client library for redis) to communicate with it. The hiredis GitHub address is:
https://github.com/redis/hiredis
This walkthrough touches on the following topics:
1. hiredis
2. std::bind
3. condition variables and mutexes
4. std::tuple
5. std::vector
6. std::queue
The Syncd::run function contains the following statements:
void Syncd::run()
{
    SWSS_LOG_ENTER();

    WarmRestartTable warmRestartTable("STATE_DB"); // TODO from config

    syncd_restart_type_t shutdownType = SYNCD_RESTART_TYPE_COLD;

    volatile bool runMainLoop = true;

    std::shared_ptr<swss::Select> s = std::make_shared<swss::Select>();

    try
    {
        onSyncdStart(m_commandLineOptions->m_startType == SAI_START_TYPE_WARM_BOOT);

        // create notifications processing thread after we create_switch to
        // make sure, we have switch_id translated to VID before we start
        // processing possible quick fdb notifications, and pointer for
        // notification queue is created before we create switch
        m_processor->startNotificationsProcessingThread();

        SWSS_LOG_NOTICE("syncd listening for events");

        s->addSelectable(m_selectableChannel.get());
        s->addSelectable(m_restartQuery.get());
        s->addSelectable(m_flexCounter.get());
        s->addSelectable(m_flexCounterGroup.get());

        SWSS_LOG_NOTICE("starting main loop");
    }
    // ... (rest of the function omitted)
We focus on m_processor->startNotificationsProcessingThread(); the definition of m_processor will be examined later.
As the function name makes clear, syncd starts a dedicated thread to publish the notifications generated by the SDK. Following this function:
void NotificationProcessor::startNotificationsProcessingThread()
{
    SWSS_LOG_ENTER();

    m_runThread = true;

    m_ntf_process_thread = std::make_shared<std::thread>(&NotificationProcessor::ntf_process_function, this);
}
This function uses std::thread to create a thread that executes NotificationProcessor::ntf_process_function(), shown below:
void NotificationProcessor::ntf_process_function()
{
    SWSS_LOG_ENTER();

    std::mutex ntf_mutex;

    std::unique_lock<std::mutex> ulock(ntf_mutex);

    while (m_runThread)
    {
        m_cv.wait(ulock);

        // this is notifications processing thread context, which is different
        // from SAI notifications context, we can safe use syncd mutex here,
        // processing each notification is under same mutex as processing main
        // events, counters and reinit

        swss::KeyOpFieldsValuesTuple item;

        while (m_notificationQueue->tryDequeue(item))
        {
            processNotification(item);
        }
    }
}
m_runThread was set to true in the calling function, so the while loop keeps executing. m_cv is a condition variable; used together with ntf_mutex in the statement m_cv.wait(ulock), it synchronizes the threads and prevents races.
Since the condition variable is now waiting, something must wake it up to continue. In sonic-sairedis\syncd\NotificationHandler.cpp there is the following function:
void NotificationHandler::enqueueNotification(
        _In_ const std::string& op,
        _In_ const std::string& data,
        _In_ const std::vector<swss::FieldValueTuple> &entry)
{
    SWSS_LOG_ENTER();

    SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str());

    swss::KeyOpFieldsValuesTuple item(op, data, entry);

    if (m_notificationQueue->enqueue(item))
    {
        m_processor->signal();
    }
}
swss::KeyOpFieldsValuesTuple is defined as follows:
typedef std::tuple<std::string, std::string,
std::vector<FieldValueTuple> > KeyOpFieldsValuesTuple;
The enqueue function is shown below; note the m_queue.push(item) statement, where m_queue is a queue:
std::queue<swss::KeyOpFieldsValuesTuple> m_queue;

bool NotificationQueue::enqueue(
        _In_ const swss::KeyOpFieldsValuesTuple& item)
{
    MUTEX;

    SWSS_LOG_ENTER();

    /*
     * If the queue exceeds the limit, then drop all further FDB events This is
     * a temporary solution to handle high memory usage by syncd and the
     * notification queue keeps growing. The permanent solution would be to
     * make this stateful so that only the *latest* event is published.
     */
    auto queueSize = m_queue.size();

    if (queueSize < m_queueSizeLimit || kfvOp(item) != SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT) // TODO use enum instead of strings
    {
        m_queue.push(item);

        return true;
    }

    m_dropCount++;

    if (!(m_dropCount % NOTIFICATION_QUEUE_DROP_COUNT_INDICATOR))
    {
        SWSS_LOG_NOTICE(
                "Too many messages in queue (%zu), dropped %zu FDB events!",
                queueSize,
                m_dropCount);
    }

    return false;
}
The signal function is defined in sonic-sairedis\syncd\NotificationProcessor.cpp:
void NotificationProcessor::signal()
{
    SWSS_LOG_ENTER();

    m_cv.notify_all();
}
From the above, the MAC chip driver, when it needs to, packages its data and calls NotificationHandler::enqueueNotification to send a notification. The data format and the exact call path are not covered here.
At this point m_queue has been filled and the thread has been woken, so what remains is the actual dispatch. Back to ntf_process_function:
void NotificationProcessor::ntf_process_function()
{
    SWSS_LOG_ENTER();

    std::mutex ntf_mutex;

    std::unique_lock<std::mutex> ulock(ntf_mutex);

    while (m_runThread)
    {
        m_cv.wait(ulock);

        // this is notifications processing thread context, which is different
        // from SAI notifications context, we can safe use syncd mutex here,
        // processing each notification is under same mutex as processing main
        // events, counters and reinit

        swss::KeyOpFieldsValuesTuple item;

        while (m_notificationQueue->tryDequeue(item))
        {
            processNotification(item);
        }
    }
}
As the function names suggest, the inner while loop repeatedly dequeues items from the queue and processes them:
void NotificationProcessor::processNotification(
        _In_ const swss::KeyOpFieldsValuesTuple& item)
{
    SWSS_LOG_ENTER();

    m_synchronizer(item);
}
For the definition of m_synchronizer we return to where m_processor is created. The Syncd class constructor contains the following statement; the NotificationProcessor constructor is shown alongside it:
m_processor = std::make_shared<NotificationProcessor>(
        m_notifications,
        m_client,
        std::bind(&Syncd::syncProcessNotification, this, _1));

/* the NotificationProcessor constructor */
NotificationProcessor::NotificationProcessor(
        _In_ std::shared_ptr<NotificationProducerBase> producer,
        _In_ std::shared_ptr<RedisClient> client,
        _In_ std::function<void(const swss::KeyOpFieldsValuesTuple&)> synchronizer):
    m_synchronizer(synchronizer),
    m_client(client),
    m_notifications(producer)
{
    SWSS_LOG_ENTER();

    m_runThread = false;

    m_notificationQueue = std::make_shared<NotificationQueue>();
}
So m_synchronizer is bound to Syncd::syncProcessNotification via std::bind.
The code from here on is straightforward and repetitive, so I will jump to the end: ultimately hiredis is used to send the data to the redis container, completing the whole flow. The code is:
int64_t DBConnector::publish(const string &channel, const string &message)
{
    RedisCommand publish;
    publish.format("PUBLISH %s %s", channel.c_str(), message.c_str());

    RedisReply r(this, publish, REDIS_REPLY_INTEGER);
    return r.getReply<long long int>();
}
For details on working with hiredis, refer to the following posts: