目录
过程二:InetAddress listenAddr(2007)
过程三:EchoServer server(&loop, listenAddr)
muduo的设计真的很巧妙。不少人说这个网络库的源码剖析起来比较容易,但是我感觉里面调用来调用去还是比较复杂的。
本文如果有不对的地方,欢迎朋友们指正。
说明:
下面是 main.cc:
// main.cc
class EchoServer
{
public:
EchoServer(muduo::net::EventLoop* loop,
const muduo::net::InetAddress& listenAddr)
: server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
void start()
{
server_.start();
}
private:
void onConnection(const muduo::net::TcpConnectionPtr& conn)
{
}
void onMessage(const muduo::net::TcpConnectionPtr& conn,
muduo::net::Buffer* buf,
muduo::Timestamp time)
{
muduo::string msg(buf->retrieveAllAsString());
conn->send(msg);
conn->shutdown();
}
muduo::net::TcpServer server_;
};
int main()
{
muduo::net::EventLoop loop;
muduo::net::InetAddress listenAddr(2007);
EchoServer server(&loop, listenAddr);
server.start();
loop.loop();
}
执行 main.cc 里的:
EventLoop loop;
这里是构造一个 EventLoop 的对象。那可以调到 EventLoop.cc 里看看具体是怎么构造的。
下面是 EventLoop.cc 里的构造函数:
// EventLoop.cc
EventLoop::EventLoop()
: looping_(false),
quit_(false),
eventHandling_(false),
callingPendingFunctors_(false),
iteration_(0),
threadId_(CurrentThread::tid()),
poller_(Poller::newDefaultPoller(this)),
wakeupFd_(createEventfd()),
wakeupChannel_(new Channel(this, wakeupFd_)),
currentActiveChannel_(NULL)
{
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
成员初始化先到下面的这行:
poller_(Poller::newDefaultPoller(this)),
在 DefaultPoller.cc 的 newDefaultPoller(EventLoop* loop) 函数里,返回的是 EPollPoller 的对象,这个对象使用的是 epoll。当然也可以设置成 PollPoller 对象,这样就会使用 poll。这里我使用的是 epoll。
由于返回的是 EPollPoller 对象,那么就要跳转到 EPollPoller 的构造函数那里。但是 EPollPoller 类是继承 Poller 类而来的,因此先跳转到到 Poller.cc 的:
// Poller.cc
Poller::Poller(EventLoop *loop)
紧接着再跳到 EPollPoller.cc:
// EPollPoller.cc
EPollPoller::EPollPoller(EventLoop* loop)
: Poller(loop),
epollfd_(::epoll_create1(EPOLL_CLOEXEC)),
events_(kInitEventListSize)
{
if (epollfd_ < 0)
{
LOG_SYSFATAL << "EPollPoller::EPollPoller";
}
}
接下来返回到 EventLoop 的构造那里,因为还没有初始化完。由于我主要关注的是函数之间如何跳转,因此就看下一步要跳的地方。
刚刚说返回 EventLoop 构造,具体来到的是:
// EventLoop.cc
wakeupFd_(createEventfd()),
所以跳到 createEventfd() 函数,返回的是用于唤醒子线程的 socket.
下面接着来到:
// EventLoop.cc
wakeupChannel_(new Channel(this, wakeupFd_)),
构造 Channel 对象,跳到 Channel.cc:
// Channel.cc
Channel::Channel(EventLoop* loop, int fd__)
: loop_(loop),
fd_(fd__),
events_(0),
revents_(0),
index_(-1),
tied_(false),
eventHandling_(false),
addedToLoop_(false)
{
}
到此 EventLoop 初始化结束,来到函数体内:
// EventLoop.cc
{
wakeupChannel_->setReadCallback(
std::bind(&EventLoop::handleRead, this));
// we are always reading the wakeupfd
wakeupChannel_->enableReading();
}
先看第一句,这将跳转到:
// Channel.h
void setReadCallback(ReadEventCallback cb)
{ readCallback_ = std::move(cb); }
把 readCallback_ 回调函数设置成:EventLoop::handleRead()。
第二句跳到:
// Channel.h
void enableReading() { events_ |= kReadEvent; update(); }
上面的函数调用 Channel 的update()。Channel 的update()又调用 EventLoop 的 updateChannel()。EventLoop 的 updateChannel() 又调用 EPollPoller 的 updateChannel()。EPollPoller 的 updateChannel() 又调用 EPollPoller 的 update()。EPollPoller 的 update() 这才调用 epoll_ctl。过程有够烦的。因为 Channel 类和 EPollPoller 类都是 EventLoop 类的某个成员的类型(这么说不太适合,就那个意思),因此需要通过 EventLoop 类来传递。
执行 main.cc 里的:
// main.cc
InetAddress listenAddr(2007);
这行绑定端口号,跳到 InetAddress.cc 的构造函数。
执行 mian.cc 的:
// main.cc
EchoServer server(&loop, listenAddr);
跳到:
// main.cc
EchoServer(muduo::net::EventLoop* loop,
const muduo::net::InetAddress& listenAddr)
: server_(loop, listenAddr, "EchoServer")
{
server_.setConnectionCallback(
std::bind(&EchoServer::onConnection, this, _1));
server_.setMessageCallback(
std::bind(&EchoServer::onMessage, this, _1, _2, _3));
}
初始化 server_ 成员变量,跳到 TcpServer.cc:
// TcpServer.cc
TcpServer::TcpServer(EventLoop* loop,
const InetAddress& listenAddr,
const string& nameArg,
Option option)
: loop_(CHECK_NOTNULL(loop)),
ipPort_(listenAddr.toIpPort()),
name_(nameArg),
acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)),
threadPool_(new EventLoopThreadPool(loop, name_)),
connectionCallback_(defaultConnectionCallback),
messageCallback_(defaultMessageCallback),
nextConnId_(1)
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
由 TcpServer.cc:CheckLoopNotNull() 函数检查传进来的 loop 是不是空指针。
InetAddres.cc::toIpPort() 处理 IP 地址和端口号。
接下来构造 Acceptor 对象,这是在主线程里才有的。
// Acceptor.cc
Acceptor::Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reuseport)
: loop_(loop),
acceptSocket_(sockets::createNonblockingOrDie(listenAddr.family())),
acceptChannel_(loop, acceptSocket_.fd()),
listenning_(false),
idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
{
assert(idleFd_ >= 0);
acceptSocket_.setReuseAddr(true);
acceptSocket_.setReusePort(reuseport);
acceptSocket_.bindAddress(listenAddr);
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
}
createNonblockingOrDie() 把 socket 设置为非阻塞。
下面还有一系列端口、地址的操作。
acceptChannel_.setReadCallback(
std::bind(&Acceptor::handleRead, this));
上面是将 acceptChannel_ 的 readCallback_ 绑定为 Acceptor::handleRead()。有 EPOLLIN 事件发生了,就执行 Acceptor::handleRead()。也就是一旦有客户端连接了服务器,就调用 accept() 函数创建用于和客户端通信的 socket。
接下来回到 TcpServer.cc 的成员初始化列表里,执行:
// TcpServer.cc
threadPool_(new EventLoopThreadPool(loop, name_)),
就跳到 EventLoopThreadPool 的构造函数里,这是个线程池类:
// EventLoopThreadPool.cc
EventLoopThreadPool::EventLoopThreadPool(EventLoop* baseLoop, const string& nameArg)
: baseLoop_(baseLoop),
name_(nameArg),
started_(false),
numThreads_(0),
next_(0)
{
}
构造完后返回 TcpServer 构造函数的函数体:
// TcpServer.cc
{
acceptor_->setNewConnectionCallback(
std::bind(&TcpServer::newConnection, this, _1, _2));
}
调用 Acceptor::setNewConnectionCallback 函数把Acceptor:: newConnectionCallback_ 设置为 TcpServer::newConnection。
执行 main.cc 的:
// amain.cc
server.start();
那么先跳到 TcpServer::start():
// TcpServer.cc
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);
assert(!acceptor_->listenning());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
转到 EventLoopThraedPool::start() 启动线程池:
// EventLoopThreadPool.cc
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());
}
下面这行构造 EventLoopThread 对象:
// EventLoopThreadPool.cc
EventLoopThread* t = new EventLoopThread(cb, buf);
那么跳转到:
// EventLoopThread.cc
EventLoopThread::EventLoopThread(const ThreadInitCallback& cb,
const string& name)
: loop_(NULL),
exiting_(false),
thread_(std::bind(&EventLoopThread::threadFunc, this), name),
mutex_(),
cond_(mutex_),
callback_(cb)
{
}
上面的初始化列表里初始化 thread_ 成员变量,跳到:
// Thread.cc
Thread::Thread(ThreadFunc func, const string& n)
: started_(false),
joined_(false),
pthreadId_(0),
tid_(0),
func_(std::move(func)),
name_(n),
latch_(1)
{
setDefaultName();
}
Thread::func_ 设置为 EventLoopThread::threadFunc,这就是子线程运行的函数,在 EventLoopThread.cc 里定义。
接下来由 setDefaultName() 设置线程名字。
回到 EventLoopThraedPool::start(),
// EventLoopThreadPool.cc
for (int i = 0; i < numThreads_; ++i)
{
char buf[name_.size() + 32];
snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i);
EventLoopThread* t = new EventLoopThread(cb, buf);
threads_.push_back(std::unique_ptr<EventLoopThread>(t));
loops_.push_back(t->startLoop());
}
现在执行:
loops_.push_back(t->startLoop());
上面这行是执行 EventLoopThread::startLoop() 并且返回子线程的 loop 并放到 vector 里。
下面跳到 EventLoopThread::startLoop():
// EventLoopThread.cc
EventLoop* EventLoopThread::startLoop()
{
assert(!thread_.started());
thread_.start();
EventLoop* loop = NULL;
{
MutexLockGuard lock(mutex_);
while (loop_ == NULL)
{
cond_.wait();
}
loop = loop_;
}
return loop;
}
开始启动线程了。由:
thread_.start();
跳转到:Thread::start()。
再由 Thread::start() 转到 startThread(void* obj)。再由 startThread(void* obj) 转到 runInThread()。再由 runInThread() 执行 func_()。func_ 就是上面说的 EventLoopThread::threadFunc。
---------------------------------------------------------------------------------------------------------------------------------
下面到子线程了。我换种颜色。
下面看看子线程运行什么东西:
// EventLoopThread.cc
void EventLoopThread::threadFunc()
{
EventLoop loop;
if (callback_)
{
callback_(&loop);
}
{
MutexLockGuard lock(mutex_);
loop_ = &loop;
cond_.notify();
}
loop.loop();
//assert(exiting_);
MutexLockGuard lock(mutex_);
loop_ = NULL;
}
EventLoop loop;
上面这行创建属于子线程的 loop,在构造 EventLoop 的时候和主线程的 loop 一样要创建 wakeupFd_、ChannelList 等等。也要在子线程的 poller 里注册监听读事件,update 一连串的调用,最后使用 epoll_ctl。
然后到下面这行:
loop.loop();
转到
// EventLoop.cc
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false;
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
looping_ = false;
}
进入 while 循环,由:
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
进入 EPollPoller::poll() :
// EPollPoller.cc
Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels)
{
int numEvents = ::epoll_wait(epollfd_,
&*events_.begin(),
static_cast<int>(events_.size()),
timeoutMs);
int savedErrno = errno;
Timestamp now(Timestamp::now());
if (numEvents > 0)
{
fillActiveChannels(numEvents, activeChannels);
if (implicit_cast<size_t>(numEvents) == events_.size())
{
events_.resize(events_.size() * 2);
}
}
else if (numEvents == 0)
{}
else
{
if (savedErrno != EINTR)
{
errno = savedErrno;
}
}
return now;
}
没有事件发生的话就阻塞在 epoll_wait 那里,当然可以设置超时时间。
由于我设置了3个子线程,所以就有3个上面这样的操作。
此时子线程已经在监听主线程有没有发消息了。
当然,主线程也监听子线程有没有发消息。
但是主线程还没有开始监听客户端连接。
---------------------------------------------------------------------------------------------------------------------------------
主线程还在继续。颜色换回来。
回到 TcpServer::start():
// TcpServer.cc
void TcpServer::start()
{
if (started_.getAndSet(1) == 0)
{
threadPool_->start(threadInitCallback_);
assert(!acceptor_->listenning());
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
}
}
threadPool_->start(threadInitCallback_);
上面这行整完了,让3个子线程创建起来并且动起来了。
接下来:
loop_->runInLoop(
std::bind(&Acceptor::listen, get_pointer(acceptor_)));
这里绑定 Acceptor::listen() 函数和参数,将对象传递给 EventLoop 的 runInLoop() 函数。
下面跳到 EventLoop::runInLoop():
// EventLoop.cc
void EventLoop::runInLoop(Functor cb)
{
if (isInLoopThread())
{
cb();
}
else
{
queueInLoop(std::move(cb));
}
}
cb 就是 Acceptor::listen()。
于是转到 Acceptor::listen():
// Acceptor.cc
void Acceptor::listen()
{
loop_->assertInLoopThread();
listenning_ = true;
acceptSocket_.listen();
acceptChannel_.enableReading();
}
由 acceptSocket_.listen() 转到 Socket::listen(),再转到 SocketsOps::listenOrDie(),最后执行的就是 listen() 函数。
注意这是第一次调用 listen()。
由 acceptChannel_.enableReading() 转到 Channel::enableReading(),又是一系列的 updata。
注意,到现在为止主线程 enableReading 了两次,一次用于和子线程通信,即唤醒操作,一次用于监听客户端连接。也就是说 socket 有两个。一个用于和子线程通信,因为这个要给子线程发消息,就设置非阻塞。另一个监听客户端,就不用设置非阻塞。
执行 main.cc 的:
// main.cc
loop.loop();
跳转到 EventLoop::loop() :
// EventLoop.cc
void EventLoop::loop()
{
assert(!looping_);
assertInLoopThread();
looping_ = true;
quit_ = false; // FIXME: what if someone calls quit() before loop() ?
while (!quit_)
{
activeChannels_.clear();
pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_);
++iteration_;
eventHandling_ = true;
for (Channel* channel : activeChannels_)
{
currentActiveChannel_ = channel;
currentActiveChannel_->handleEvent(pollReturnTime_);
}
currentActiveChannel_ = NULL;
eventHandling_ = false;
doPendingFunctors();
}
looping_ = false;
}
进入 while 循环。
没有客户端连接的话就阻塞在 epoll_wait 那里,当然可以设置超时时间。
然后主线程就和子线程一样开始不停循环了。直到有事件发生。
注意主线程是第一次进入 EventLoop::loop()。
主线程在过程四就就开始进入 EventLoop::loop() 了。
过程一:构造主线程的 loop 和用于唤醒子线程的 wakeupFd。
过程二:设置端口号。
过程三:构造 TcpServer 、Acceptor 、线程池。
过程四:启动线程池,子线程进入 loop。主线程 listen。
过程五:主线程进入 loop。