当前位置: 首页 > 工具软件 > lighttpd-cpp > 使用案例 >

lighttpd1.4.49 之三 网络模型

何琨
2023-12-01

 

https://blog.csdn.net/in_han/article/details/9343025

拿到lighttpd的源码就迫不及待的想去掉繁杂的皮肉以窥其简单的网络模型框架。我们平常所写的TCP网络服务程序离不开这样的步骤:新建socket ——》将socket绑定到某个地址——》侦听客户端连接——》accept获取已连接socket——》读写已连接socket。Lighttpd不外如此。

       lighttpd使用的是TCP预先派生子进程,每一个子进程各自accept的服务器设计范式,或者叫watcher-worker模型,关于各种网络程序设计范式在unix网络编程一书中有详细描述。整个程序的入口函数在server.c文件中,在main函数开始部分是各种繁杂的初始化工作,现在暂且略过,直接看到重点代码:

[cpp] view plaincopy

  1. /*当是以root用户运行程序时,调用network_init函数*/  
  2.     if(i_am_root)  
  3.     {     
  4.         …  
  5.         ...  
  6.         /* we need root-perms for port < 1024 */  
  7.         if (0 != network_init(srv)) {  
  8.                 plugins_free(srv);  
  9.                 server_free(srv);  
  10.                 return -1;  
  11.         }  
  12.         …  
  13.         ...  
  14.     }  
 
  1. /*当是以root用户运行程序时,调用network_init函数*/

  2. if(i_am_root)

  3. {

  4. ...

  5. /* we need root-perms for port < 1024 */

  6. if (0 != network_init(srv)) {

  7. plugins_free(srv);

  8. server_free(srv);

  9. return -1;

  10. }

  11. ...

  12. }

 

 

network_init定义在network.c中,起初也是各种初始化工作,最后调用network_server_init,我们假设运行平台是ipv4(代码针对不同socket类型有不同的流程,为了化繁为简,只看ipv4流程),看下network_server_init流程的关键代码:

[cpp] view plaincopy

  1. …  
  2. …  
  3. /*这是在创建socket*/  
  4. if (srv_socket->fd == -1) {  
  5.     srv_socket->addr.plain.sa_family = AF_INET;  
  6.     if (-1 == (srv_socket->fd = socket(srv_socket->addr.plain.sa_family, SOCK_STREAM, IPPROTO_TCP))) {  
  7.         log_error_write(srv, __FILE__, __LINE__, "ss", "socket failed:", strerror(errno));  
  8.         goto error_free_socket;  
  9.     }  
  10. }  
  11. …  
  12. …  
  13. /*这是在初始化socket地址*/  
  14. case AF_INET:  
  15.     memset(&srv_socket->addr, 0, sizeof(struct sockaddr_in));  
  16.     srv_socket->addr.ipv4.sin_family = AF_INET;  
  17.     if (host == NULL) {  
  18.         srv_socket->addr.ipv4.sin_addr.s_addr = htonl(INADDR_ANY);  
  19.     } else {  
  20.         struct hostent *he;  
  21.         if (NULL == (he = gethostbyname(host))) {  
  22.             log_error_write(srv, __FILE__, __LINE__,  
  23.                     "sds", "gethostbyname failed: ",  
  24.                     h_errno, host);  
  25.             goto error_free_socket;  
  26.         }  
  27.   
  28.         if (he->h_addrtype != AF_INET) {  
  29.             log_error_write(srv, __FILE__, __LINE__, "sd", "addr-type != AF_INET: ", he->h_addrtype);  
  30.             goto error_free_socket;  
  31.         }  
  32.   
  33.         if (he->h_length != sizeof(struct in_addr)) {  
  34.             log_error_write(srv, __FILE__, __LINE__, "sd", "addr-length != sizeof(in_addr): ", he->h_length);  
  35.             goto error_free_socket;  
  36.         }  
  37.   
  38.         memcpy(&(srv_socket->addr.ipv4.sin_addr.s_addr), he->h_addr_list[0], he->h_length);  
  39.     }  
  40.     srv_socket->addr.ipv4.sin_port = htons(port);  
  41.   
  42.     addr_len = sizeof(struct sockaddr_in);  
  43.   
  44.     break;  
  45. …  
  46. …  
  47. /*这是在绑定socket地址*/  
  48. if (0 != bind(srv_socket->fd, (struct sockaddr *) &(srv_socket->addr), addr_len)) {  
  49.     switch(srv_socket->addr.plain.sa_family) {  
  50.     case AF_UNIX:  
  51.         log_error_write(srv, __FILE__, __LINE__, "sds",  
  52.                 "can't bind to socket:",  
  53.                 host, strerror(errno));  
  54.         break;  
  55.     default:  
  56.         log_error_write(srv, __FILE__, __LINE__, "ssds",  
  57.                 "can't bind to port:",  
  58.                 host, port, strerror(errno));  
  59.         break;  
  60.     }  
  61.     goto error_free_socket;  
  62. }  
  63. …  
  64. …  
  65. /*这是在侦听*/  
  66.   
  67. if (-1 == listen(srv_socket->fd, 128 * 8)) {  
  68.     log_error_write(srv, __FILE__, __LINE__, "ss", "listen failed: ", strerror(errno));  
  69.     goto error_free_socket;  
  70. }  
  71. …  
  72. …  
 
  1. /*这是在创建socket*/

  2. if (srv_socket->fd == -1) {

  3. srv_socket->addr.plain.sa_family = AF_INET;

  4. if (-1 == (srv_socket->fd = socket(srv_socket->addr.plain.sa_family, SOCK_STREAM, IPPROTO_TCP))) {

  5. log_error_write(srv, __FILE__, __LINE__, "ss", "socket failed:", strerror(errno));

  6. goto error_free_socket;

  7. }

  8. }

  9. /*这是在初始化socket地址*/

  10. case AF_INET:

  11. memset(&srv_socket->addr, 0, sizeof(struct sockaddr_in));

  12. srv_socket->addr.ipv4.sin_family = AF_INET;

  13. if (host == NULL) {

  14. srv_socket->addr.ipv4.sin_addr.s_addr = htonl(INADDR_ANY);

  15. } else {

  16. struct hostent *he;

  17. if (NULL == (he = gethostbyname(host))) {

  18. log_error_write(srv, __FILE__, __LINE__,

  19. "sds", "gethostbyname failed: ",

  20. h_errno, host);

  21. goto error_free_socket;

  22. }

  23.  
  24. if (he->h_addrtype != AF_INET) {

  25. log_error_write(srv, __FILE__, __LINE__, "sd", "addr-type != AF_INET: ", he->h_addrtype);

  26. goto error_free_socket;

  27. }

  28.  
  29. if (he->h_length != sizeof(struct in_addr)) {

  30. log_error_write(srv, __FILE__, __LINE__, "sd", "addr-length != sizeof(in_addr): ", he->h_length);

  31. goto error_free_socket;

  32. }

  33.  
  34. memcpy(&(srv_socket->addr.ipv4.sin_addr.s_addr), he->h_addr_list[0], he->h_length);

  35. }

  36. srv_socket->addr.ipv4.sin_port = htons(port);

  37.  
  38. addr_len = sizeof(struct sockaddr_in);

  39.  
  40. break;

  41. /*这是在绑定socket地址*/

  42. if (0 != bind(srv_socket->fd, (struct sockaddr *) &(srv_socket->addr), addr_len)) {

  43. switch(srv_socket->addr.plain.sa_family) {

  44. case AF_UNIX:

  45. log_error_write(srv, __FILE__, __LINE__, "sds",

  46. "can't bind to socket:",

  47. host, strerror(errno));

  48. break;

  49. default:

  50. log_error_write(srv, __FILE__, __LINE__, "ssds",

  51. "can't bind to port:",

  52. host, port, strerror(errno));

  53. break;

  54. }

  55. goto error_free_socket;

  56. }

  57. /*这是在侦听*/

  58.  
  59. if (-1 == listen(srv_socket->fd, 128 * 8)) {

  60. log_error_write(srv, __FILE__, __LINE__, "ss", "listen failed: ", strerror(errno));

  61. goto error_free_socket;

  62. }

一直到此处,lighttpd走的都是我们熟悉的流程。再回到main函数,来看下main中最重要的部分:

[cpp] view plaincopy

  1. …  
  2. ...  
  3. /*父进程是watcher,fork出许多worker子进程,当子进程个数达到上限时,父进程进入等待*/  
  4. /*直到有子进程退出,父进程在while循环中运行中,一旦跳出while循环程序也结束了*/  
  5. /*子进程fork出老后跳出while,也就是后面代码都是子进程的流程。*/  
  6. /* start watcher and workers */  
  7.     num_childs = srv->srvconf.max_worker;  
  8.     if (num_childs > 0) {  
  9.         int child = 0;  
  10.         while (!child && !srv_shutdown && !graceful_shutdown) {  
  11.             if (num_childs > 0) {  
  12.                 switch (fork()) {  
  13.                 case -1:  
  14.                     return -1;  
  15.                 case 0:  
  16.                     child = 1;  
  17.                     break;  
  18.                 default:  
  19.                     num_childs--;  
  20.                     break;  
  21.                 }  
  22.             } else {  
  23.                 int status;  
  24.   
  25.                 if (-1 != wait(&status)) {  
  26.                     /** 
  27.                      * one of our workers went away 
  28.                      */  
  29.                     num_childs++;  
  30.                 } else {  
  31.                     switch (errno) {  
  32.                     case EINTR:  
  33.                         /** 
  34.                          * if we receive a SIGHUP we have to close our logs ourself as we don't 
  35.                          * have the mainloop who can help us here 
  36.                          */  
  37.                         if (handle_sig_hup) {  
  38.                             handle_sig_hup = 0;  
  39.   
  40.                             log_error_cycle(srv);  
  41.   
  42.                             /** 
  43.                              * forward to all procs in the process-group 
  44.                              * 
  45.                              * we also send it ourself 
  46.                              */             if (!forwarded_sig_hup) {  
  47.                                 forwarded_sig_hup = 1;  
  48.                                 kill(0, SIGHUP);  
  49.                             }  
  50.                         }  
  51.                         break;  
  52.                     default:  
  53.                         break;  
  54.                     }  
  55.                 }  
  56.             }  
  57.         }  
  58.   
  59.         /** 
  60.          * for the parent this is the exit-point 
  61.          */  
  62.         if (!child) {  
  63.             /** 
  64.              * kill all children too 
  65.              */  
  66.             if (graceful_shutdown) {  
  67.                 kill(0, SIGINT);  
  68.             } else if (srv_shutdown) {  
  69.                 kill(0, SIGTERM);  
  70.             }  
  71.   
  72.             log_error_close(srv);  
  73.             network_close(srv);  
  74.             connections_free(srv);  
  75.             plugins_free(srv);  
  76.             server_free(srv);  
  77.             return 0;  
  78.         }  
  79.     }  
  80. …  
  81. …  
 
  1. ...

  2. /*父进程是watcher,fork出许多worker子进程,当子进程个数达到上限时,父进程进入等待*/

  3. /*直到有子进程退出,父进程在while循环中运行中,一旦跳出while循环程序也结束了*/

  4. /*子进程fork出老后跳出while,也就是后面代码都是子进程的流程。*/

  5. /* start watcher and workers */

  6. num_childs = srv->srvconf.max_worker;

  7. if (num_childs > 0) {

  8. int child = 0;

  9. while (!child && !srv_shutdown && !graceful_shutdown) {

  10. if (num_childs > 0) {

  11. switch (fork()) {

  12. case -1:

  13. return -1;

  14. case 0:

  15. child = 1;

  16. break;

  17. default:

  18. num_childs--;

  19. break;

  20. }

  21. } else {

  22. int status;

  23.  
  24. if (-1 != wait(&status)) {

  25. /**

  26. * one of our workers went away

  27. */

  28. num_childs++;

  29. } else {

  30. switch (errno) {

  31. case EINTR:

  32. /**

  33. * if we receive a SIGHUP we have to close our logs ourself as we don't

  34. * have the mainloop who can help us here

  35. */

  36. if (handle_sig_hup) {

  37. handle_sig_hup = 0;

  38.  
  39. log_error_cycle(srv);

  40.  
  41. /**

  42. * forward to all procs in the process-group

  43. *

  44. * we also send it ourself

  45. */ if (!forwarded_sig_hup) {

  46. forwarded_sig_hup = 1;

  47. kill(0, SIGHUP);

  48. }

  49. }

  50. break;

  51. default:

  52. break;

  53. }

  54. }

  55. }

  56. }

  57.  
  58. /**

  59. * for the parent this is the exit-point

  60. */

  61. if (!child) {

  62. /**

  63. * kill all children too

  64. */

  65. if (graceful_shutdown) {

  66. kill(0, SIGINT);

  67. } else if (srv_shutdown) {

  68. kill(0, SIGTERM);

  69. }

  70.  
  71. log_error_close(srv);

  72. network_close(srv);

  73. connections_free(srv);

  74. plugins_free(srv);

  75. server_free(srv);

  76. return 0;

  77. }

  78. }

 

到此,我们知道父进程在固定端口上监听后预先fork了一定数量的子进程,子进程将会做什么呢?按照本文开头描述的应该是accept后读写socket了吧!看接下的代码是否如此:

[cpp] view plaincopy

  1. …  
  2. …  
  3. /*fdevent系统的初始化,fdevent在lighttpd中主要处理各种IO事件,lighttpd采用的*/  
  4. /*是reactor模式,也就是多路复用加非阻塞式IO,而多路复用在各种平台上有差异,fdevent*/  
  5. /*通过OO的方法封装了各个不同实现,以使得代码中可以使用统一的接口*/  
  6. if (NULL == (srv->ev = fdevent_init(srv, srv->max_fds + 1, srv->event_handler))) {  
  7.     log_error_write(srv, __FILE__, __LINE__,  
  8.             "s", "fdevent_init failed");  
  9.     return -1;  
  10. }  
  11. /*注册srv中保存的socket到fdevent中*/  
  12. /* 
  13.  * kqueue() is called here, select resets its internals, 
  14.  * all server sockets get their handlers 
  15.  * 
  16.  * */  
  17. if (0 != network_register_fdevents(srv)) {  
  18.     plugins_free(srv);  
  19.     network_close(srv);  
  20.     server_free(srv);  
  21.   
  22.     return -1;  
  23. }  
  24. …  
  25. …  
 
  1. /*fdevent系统的初始化,fdevent在lighttpd中主要处理各种IO事件,lighttpd采用的*/

  2. /*是reactor模式,也就是多路复用加非阻塞式IO,而多路复用在各种平台上有差异,fdevent*/

  3. /*通过OO的方法封装了各个不同实现,以使得代码中可以使用统一的接口*/

  4. if (NULL == (srv->ev = fdevent_init(srv, srv->max_fds + 1, srv->event_handler))) {

  5. log_error_write(srv, __FILE__, __LINE__,

  6. "s", "fdevent_init failed");

  7. return -1;

  8. }

  9. /*注册srv中保存的socket到fdevent中*/

  10. /*

  11. * kqueue() is called here, select resets its internals,

  12. * all server sockets get their handlers

  13. *

  14. * */

  15. if (0 != network_register_fdevents(srv)) {

  16. plugins_free(srv);

  17. network_close(srv);

  18. server_free(srv);

  19.  
  20. return -1;

  21. }

函数network_register_fdevents在network.c中定义,代码如下:

[cpp] view plaincopy

  1. int network_register_fdevents(server *srv) {  
  2.     size_t i;  
  3.     /*清除fdevent的IO句柄,如同select的FD_ZERO清除fd set*/  
  4.     if (-1 == fdevent_reset(srv->ev)) {  
  5.         return -1;  
  6.     }  
  7.   
  8.     /* register fdevents after reset */  
  9.     for (i = 0; i < srv->srv_sockets.used; i++) {  
  10.         server_socket *srv_socket = srv->srv_sockets.ptr[i];  
  11.         //注册回调函数   
  12.         //一旦srv_socket->fd就绪,则触发函数 network_server_handle_fdevent   
  13.         fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket);  
  14.         //告诉fdevent观察srv_socket->fd,一旦可读,则调用相应回调函数。   
  15.         fdevent_event_set(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);  
  16.     }  
  17.     return 0;  
  18. }  
 
  1. int network_register_fdevents(server *srv) {

  2. size_t i;

  3. /*清除fdevent的IO句柄,如同select的FD_ZERO清除fd set*/

  4. if (-1 == fdevent_reset(srv->ev)) {

  5. return -1;

  6. }

  7.  
  8. /* register fdevents after reset */

  9. for (i = 0; i < srv->srv_sockets.used; i++) {

  10. server_socket *srv_socket = srv->srv_sockets.ptr[i];

  11. //注册回调函数

  12. //一旦srv_socket->fd就绪,则触发函数 network_server_handle_fdevent

  13. fdevent_register(srv->ev, srv_socket->fd, network_server_handle_fdevent, srv_socket);

  14. //告诉fdevent观察srv_socket->fd,一旦可读,则调用相应回调函数。

  15. fdevent_event_set(srv->ev, &(srv_socket->fde_ndx), srv_socket->fd, FDEVENT_IN);

  16. }

  17. return 0;

  18. }

 

这里的srv_socket->fd其实就是之前创建的监听套接字,至此,我们假设有一个客户连接请求过来,这时子进程的srv_socket->fd 可读,回调函数network_server_handle_fdevent被调用:

 

[cpp] view plaincopy

  1. static handler_t network_server_handle_fdevent(server *srv, void *context, int revents) {  
  2.     …  
  3.     ...  
  4.     /* accept()s at most 100 connections directly 
  5.      * 
  6.      * we jump out after 100 to give the waiting connections a chance */  
  7.     for (loops = 0; loops < 100 && NULL != (con = connection_accept(srv, srv_socket)); loops++) {  
  8.         handler_t r;  
  9.   
  10.         connection_state_machine(srv, con);  
  11.   
  12.         switch(r = plugins_call_handle_joblist(srv, con)) {  
  13.         case HANDLER_FINISHED:  
  14.         case HANDLER_GO_ON:  
  15.             break;  
  16.         default:  
  17.             log_error_write(srv, __FILE__, __LINE__, "d", r);  
  18.             break;  
  19.         }  
  20.     }  
  21.     return HANDLER_GO_ON;  
  22. }  
  23. connection_accept在connections.c中定义,代码简化为如下:  
  24.     …  
  25.     …  
  26.     //获取已连接套接字   
  27.     if (-1 == (cnt = accept(srv_socket->fd, (struct sockaddr *) &cnt_addr, &cnt_len))) {  
  28.         switch (errno) {  
  29.         case EAGAIN:  
  30. #if EWOULDBLOCK != EAGAIN   
  31.         case EWOULDBLOCK:  
  32. #endif   
  33.         case EINTR:  
  34.             /* we were stopped _before_ we had a connection */  
  35.         case ECONNABORTED: /* this is a FreeBSD thingy */  
  36.             /* we were stopped _after_ we had a connection */  
  37.             break;  
  38.         case EMFILE:  
  39.             /* out of fds */  
  40.             break;  
  41.         default:  
  42.             log_error_write(srv, __FILE__, __LINE__, "ssd", "accept failed:", strerror(errno), errno);  
  43.         }  
  44.         return NULL;  
  45.     }  
  46.     …  
  47.     …  
  48.     con->fd = cnt;  
  49.     con->fde_ndx = -1;  
  50.     //在fdevent中注册已连接socket : con->fd的回调函数connection_handle_fdevent   
  51.     fdevent_register(srv->ev, con->fd, connection_handle_fdevent, con);  
  52.     …  
  53.     …  
  54.     //设置一些属性,比如将con->fd设置为非阻塞的   
  55.     if (-1 == (fdevent_fcntl_set(srv->ev, con->fd))) {  
  56.             log_error_write(srv, __FILE__, __LINE__, "ss", "fcntl failed: ", strerror(errno));  
  57.             return NULL;  
  58.         }  
  59.     …  
  60.     …  
 
  1. static handler_t network_server_handle_fdevent(server *srv, void *context, int revents) {

  2. ...

  3. /* accept()s at most 100 connections directly

  4. *

  5. * we jump out after 100 to give the waiting connections a chance */

  6. for (loops = 0; loops < 100 && NULL != (con = connection_accept(srv, srv_socket)); loops++) {

  7. handler_t r;

  8.  
  9. connection_state_machine(srv, con);

  10.  
  11. switch(r = plugins_call_handle_joblist(srv, con)) {

  12. case HANDLER_FINISHED:

  13. case HANDLER_GO_ON:

  14. break;

  15. default:

  16. log_error_write(srv, __FILE__, __LINE__, "d", r);

  17. break;

  18. }

  19. }

  20. return HANDLER_GO_ON;

  21. }

  22. connection_accept在connections.c中定义,代码简化为如下:

  23. //获取已连接套接字

  24. if (-1 == (cnt = accept(srv_socket->fd, (struct sockaddr *) &cnt_addr, &cnt_len))) {

  25. switch (errno) {

  26. case EAGAIN:

  27. #if EWOULDBLOCK != EAGAIN

  28. case EWOULDBLOCK:

  29. #endif

  30. case EINTR:

  31. /* we were stopped _before_ we had a connection */

  32. case ECONNABORTED: /* this is a FreeBSD thingy */

  33. /* we were stopped _after_ we had a connection */

  34. break;

  35. case EMFILE:

  36. /* out of fds */

  37. break;

  38. default:

  39. log_error_write(srv, __FILE__, __LINE__, "ssd", "accept failed:", strerror(errno), errno);

  40. }

  41. return NULL;

  42. }

  43. con->fd = cnt;

  44. con->fde_ndx = -1;

  45. //在fdevent中注册已连接socket : con->fd的回调函数connection_handle_fdevent

  46. fdevent_register(srv->ev, con->fd, connection_handle_fdevent, con);

  47. //设置一些属性,比如将con->fd设置为非阻塞的

  48. if (-1 == (fdevent_fcntl_set(srv->ev, con->fd))) {

  49. log_error_write(srv, __FILE__, __LINE__, "ss", "fcntl failed: ", strerror(errno));

  50. return NULL;

  51. }

 

 

分析到了这个地方,lighttpd的网络模型框架大致清楚了,正如文首所述,它和所有网络服务器程序一样都要走socket->bind->listen->accept流程,更具体的说,它使用了预先创建子进程,各子进程各自accept的范式,在UNIX网络编程中说这种范式会有accept惊群的问题,即当监听套接字可读,所有accept的子进程都会醒过来,但是只有一个进程获得已连接套接字,所有进程都唤醒是没有必要的,这样影响效率。对于这个问题,lighttpd似乎并没有处理。但是在新的linux内核中已经不存在accept惊群现象了。不过对于多路复用函数如select,epoll仍然存在类似问题,而代码里时常是先调epoll(select),再accept,lighttpd就是如此,因此还是会有新的惊群现象需要处理。如果不是我遗漏了的话,我没有发现lighttpd有相关代码对此进行处理,而nginx却有相关处理。

 

epoll 群惊现象

【遇到问题】

    手头原来有一个单进程的linux epoll服务器程序,近来希望将它改写成多进程版本,主要原因有:

  1. 在服务高峰期间 并发的 网络请求非常海量,目前的单进程版本的程序有点吃不消:单进程时只有一个循环先后处理epoll_wait()到的事件,使得某些不幸排队靠后的socket fd的事件处理不及时(担心有些客户端等不耐烦甚至超时断开);
  2. 希望充分利用到服务器的多颗CPU;

 

    但随着改写工作的深入,便第一次碰到了“惊群”问题,一开始我的程序设想如下:

  1. 主进程先监听端口, listen_fd = socket(...);
  2. 创建epoll,epoll_fd = epoll_create(...);
  3. 然后开始fork(),每个子进程进入大循环,去等待new  accept,epoll_wait(...),处理事件等。

 

    接着就遇到了“惊群”现象:当listen_fd有新的accept()请求过来,操作系统会唤醒所有子进程(因为这些进程都epoll_wait()同一个listen_fd,操作系统又无从判断由谁来负责accept,索性干脆全部叫醒……),但最终只会有一个进程成功accept,其他进程accept失败。外国IT友人认为所有子进程都是被“吓醒”的,所以称之为Thundering Herd(惊群)。

    打个比方,街边有一家麦当劳餐厅,里面有4个服务小窗口,各有一位服务员。当大门口进来一位新客人时,门铃响了,4个服务员于是都抬起头(相当于操作系统唤醒了所有服务进程)希望将客人招呼过去自己所在的服务窗口。但结果可想而知,客人最终会走向其中某一个窗口,而其他3个窗口的服务员只能“失望叹息”(这一声无奈的叹息就相当于accept报EAGAIN错误),埋头继续忙自己的事去。

    这样子“惊群”现象必然造成资源浪费,那有木有好的解决办法呢?

 

 

【寻找办法】

    看了网上N多帖子和网页,阅读多款优秀开源程序的源代码,再结合自己的实验测试,总结如下:

  1.  实际情况中,在发生惊群时,并非全部子进程都会被唤醒,而是一部分子进程被唤醒。但被唤醒的进程仍然只有1个成功accept,其他皆失败。
  2. 所有基于linux epoll机制的服务器程序在多进程时都受惊群问题的困扰,包括 lighttpd 和 nginx 等程序,各家程序的处理办法也不一样。
  3. lighttpd的解决思路:无视惊群。采用Watcher/Workers模式,具体措施有优化fork()与epoll_create()的位置(让每个子进程自己去epoll_create()和epoll_wait()),捕获accept()抛出来的错误并忽视等。这样子一来,当有新accept时仍将有多个lighttpd子进程被唤醒。
  4. nginx的解决思路:避免惊群。具体措施有使用全局互斥锁,每个子进程在epoll_wait()之前先去申请锁,申请到则继续处理,获取不到则等待,并设置了一个负载均衡的算法(当某一个子进程的任务量达到总设置量的7/8时,则不会再尝试去申请锁)来均衡各个进程的任务量。
  5. 一款国内的优秀商业MTA服务器程序(不便透露名称):采用Leader/Followers线程模式,各个线程地位平等,轮流做Leader来响应请求。
  6. 对比lighttpd和nginx两套方案,前者实现方便,逻辑简单,但那部分无谓的进程唤醒带来的资源浪费的代价如何仍待商榷(有网友测试认为这部分开销不大 http://www.iteye.com/topic/382107)。后者逻辑较复杂,引入互斥锁和负载均衡算分也带来了更多的程序开销。所以这两款程序在解决问题的同时,都有其他一部分计算开销,只是哪一个开销更大,未有数据对比。
  7. 坊间也流传Linux 2.6.x之后的内核,就已经解决了accept的惊群问题,论文地址 http://static.usenix.org/event/usenix2000/freenix/full_papers/molloy/molloy.pdf 。
  8. 但其实不然,这篇论文里提到的改进并未能彻底解决实际生产环境中的惊群问题,因为大多数多进程服务器程序都是在fork()之后,再对epoll_wait(listen_fd,...)的事件,这样子当listen_fd有新的accept请求时,进程们还是会被唤醒。论文的改进主要是在内核级别让accept()成为原子操作,避免被多个进程都调用了。

 

 

【采用方案】

    多方考量,最后选择参考lighttpd的Watcher/Workers模型,实现了我需要的那款多进程epoll程序,核心流程如下:

  1. 主进程先监听端口, listen_fd = socket(...); ,setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR,...),setnonblocking(listen_fd),listen(listen_fd,...)。
  2. 开始fork(),到达子进程数上限(建议根据服务器实际的CPU核数来配置)后,主进程变成一个Watcher,只做子进程维护和信号处理等全局性工作。
  3. 每一个子进程(Worker)中,都创建属于自己的epoll,epoll_fd = epoll_create(...);,接着将listen_fd加入epoll_fd中,然后进入大循环,epoll_wait()等待并处理事件。千万注意, epoll_create()这一步一定要在fork()之后
  4. 大胆设想(未实现):每个Worker进程采用多线程方式来提高大循环的socket fd处理速度,必要时考虑加入互斥锁来做同步,但也担心这样子得不偿失(进程+线程频繁切换带来的额外操作系统开销),这一步尚未实现和测试,但看到nginx源码中貌似有此逻辑。

 

 

 

【小结】

    纵观现如今的Linux服务器程序开发(无论是游戏服务器/WebServer服务器/balabala各类应用服务器),epoll可谓大行其道,当红炸子鸡一枚。它也确实是一个好东西,单进程时的事件处理能力就已经大大强于poll/select,难怪Nginx/Lighttpd等生力军程序都那么喜欢它。

    但毕竟只有一个进程的话,晾着服务器的多个CPU实在是罪过,为追求更高的机器利用率更短的请求响应处理时间,还是折腾着搞出了多进程epoll。从新程序在线上服务器上的表现看,效果也确实不错 ,开心。。。

    感谢诸多网友的帖子分享,现在新程序已经上线,小弟也将心得整理成这篇博文,希望能帮到有需要的童鞋。仓促成文,若有错漏恳请指正,也请诸位不吝赐教给建议,灰常感谢!

 类似资料: