PyTianQiService 的核心网络服务模块：一个单线程、基于 epoll 事件循环的 TCP 通信框架。
---
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##########################################################
# Teach Wisdom To Machine.
# Please Call Me Programming devil.
# Module Name: EpollServer
###################################################### #
from EpollComm import *
from ServiceStatus import *
from Utils import *
# Module-wide loggers: the root logger plus two named channels
# ("statusLogger" for periodic statistics, "netLogger" for network events).
logger = logging.getLogger()
statuslogger, netLogger = map(logging.getLogger, ("statusLogger", "netLogger"))
# Process-wide running-status reporter; EpollServer reports it on every timer tick.
serviceRunningStatus = ServiceRunningStatus()
############################################################################
class EpollServer(ServerInterface):
    '''Single-threaded TCP server driven by an epoll event loop.

    The server listens on every ``(ip, port)`` in ``listen_addr_list``,
    accepts clients, wraps each connection in the acceptor object returned by
    ``onTcpConnectionEnter`` and dispatches epoll readiness events to it.
    Connections that fail while events are being dispatched are only marked
    invalid; they are torn down at the end of each ``serve_once`` pass.

    Subclasses must implement the hooks marked "Public Virtual Method".
    '''

    def __init__(self, listen_addr_list=None):
        '''Store configuration; no socket is opened until start().

        :param listen_addr_list: list of ``(ip, port)`` tuples to listen on.
            ``None`` (the default) means "no listeners". A fresh list is used
            per instance so instances never share one mutable default list.
        '''
        self._listen_addr_list = [] if listen_addr_list is None else listen_addr_list
        self._listen_backlog = 40000
        # Timeout handed to EpollLoop.poll() (presumably seconds -- confirm in EpollLoop).
        self._epoll_time_out = 1
        # Seconds between two _triggerTimerEvent() calls (compared to time.time()).
        self._timer_time_out = 10
        # Maximum number of simultaneously registered client connections.
        self._max_session_count = 1024
        # Defined here so that stop()/serve_once() are safe before start().
        self._is_started = 0
        self._is_abort_serve_once = 1

    # ServerInterface.start()
    def start(self):
        '''Create the epoll loop and every listen socket.

        :returns: True on success, False if a listen socket could not be
            created (note: _createListenSocket currently exits the process
            instead of returning False).
        '''
        self._is_started = 0
        self._epoll_loop = EpollLoop()
        self._max_notify_event_count = 0  # largest batch returned by one poll()
        self._max_socket_fileno = 0  # highest client fd seen (status report only)
        self._max_doaccept_count = 0  # most accept() calls served by one notification
        self._listener_dict = {}  # <listen_fd, listen socket>
        self._acceptor_dict = {}  # <acceptor_fd = socket.fileno(), acceptor object>
        self._invalid_acceptorfd_set = set()  # fds to tear down at end of serve_once
        for listen_addr in self._listen_addr_list:
            if not self._createListenSocket(listen_addr):
                return False
        self._session_id = 0
        self._is_started = 1
        self._is_abort_serve_once = 0  # 1 => serve_once() refuses to run
        self._last_timestamp = time.time()
        return True

    # ServerInterface.stop()
    def stop(self):
        '''Close the epoll loop, all listen sockets and all client connections.

        Calling it again (or before start()) is a no-op. Afterwards
        serve_once() returns False.
        '''
        if self._is_started == 0:
            return
        self._is_started = 0
        self._epoll_loop.close()
        for listen_socket in self._listener_dict.values():
            listen_socket.close()
        self._listener_dict.clear()
        for acceptor in self._acceptor_dict.values():
            acceptor.close()
        # Forget the closed acceptors so nothing can look them up by a stale fd.
        self._acceptor_dict.clear()
        self._invalid_acceptorfd_set.clear()
        self._is_abort_serve_once = 1

    # ServerInterface.serve_once()
    def serve_once(self):
        '''Run one iteration of the event loop.

        Fires the periodic timer when it is due, polls epoll once (waiting at
        most ``_epoll_time_out``), dispatches every ready fd, then tears down
        the connections that were marked invalid during this pass.

        :returns: False when the server is stopped / not started, else True.
        '''
        if self._is_abort_serve_once == 1:
            netLogger.warning("serve_once _is_abort_serve_once=1 ")
            return False
        now_timestamp = time.time()
        if now_timestamp > self._last_timestamp + self._timer_time_out:
            self._last_timestamp = now_timestamp
            self._triggerTimerEvent()
        epoll_list = self._epoll_loop.poll(self._epoll_time_out)
        for fileno, events in epoll_list:
            if fileno in self._listener_dict:
                # A listen socket is readable: new connections are waiting.
                self._onFdAcceptable(listen_fd=fileno)
                continue
            acceptor = self._acceptor_dict.get(fileno)
            if acceptor is None:
                netLogger.warning("cannot find fd:{0} from _acceptor_dict".format(fileno))
                continue
            if acceptor.isConnectSucc():
                self._checkSocketDataEvent(fileno, events)
        if len(epoll_list) > self._max_notify_event_count:
            self._max_notify_event_count = len(epoll_list)
        # Invalid acceptors are torn down lazily, after all events of this
        # pass have been dispatched.
        self._checkInvalidAcceptors()
        return True

    def getAcceptor(self, acceptor_fd=0, acceptor_sessionid=0):
        '''Return the acceptor registered on ``acceptor_fd``.

        The session-id check rejects a stale (fd, session) pair, e.g. when the
        fd now belongs to a newer connection.

        :returns: the acceptor, or None if the fd is unknown or its session
            id differs from ``acceptor_sessionid``.
        '''
        acceptor = self._acceptor_dict.get(acceptor_fd)
        if acceptor is None:
            netLogger.debug("getAcceptor NULL;fd:%d sessionid:%d", acceptor_fd, acceptor_sessionid)
            return None
        if acceptor.client_session_id != acceptor_sessionid:
            netLogger.debug("getAcceptor failed;fd:%d sessionid:%d>currentsessionid:%d",
                            acceptor_fd, acceptor_sessionid, acceptor.client_session_id)
            return None
        return acceptor

    # Public Virtual Method
    def onTcpConnectionEnter(self, session_id, client_socket, client_address):
        '''Build and return the acceptor for a newly accepted connection.

        Called with an already non-blocking ``client_socket``. The returned
        object must expose ``client_socket``, ``client_session_id`` and
        ``client_addr`` and the methods this class calls on it:
        ``isConnectSucc``, ``onReadEvent``, ``onWriteEvent``, ``onTimerEvent``,
        ``onDisconnectEvent``, ``isClosedByClient``, ``get_intval``, ``close``.
        Example: ``return _Acceptor(session_id, client_socket, client_address)``.
        '''
        raise NotImplementedError()

    # Public Virtual Method
    def onProcessTimerTask(self):
        '''Subclass hook run on every timer tick (see _triggerTimerEvent).'''
        raise NotImplementedError()

    # Public Virtual Method
    def task_once(self):
        '''Subclass hook; not called from this class (presumably by the task loop).'''
        raise NotImplementedError()

    # Public Virtual Method
    def feedback_once(self):
        '''Subclass hook; not called from this class (presumably by the task loop).'''
        raise NotImplementedError()

    def _reportStatus(self):
        '''Log connection statistics; only logs when the summed
        ``get_intval()`` of all acceptors exceeds 2.'''
        accept_succ_count = sum(acceptor.get_intval()
                                for acceptor in self._acceptor_dict.values())
        if accept_succ_count > 2:
            t = "reportStatus>> acceptors:{0} accept_succ_count:{1} max_events:{2},maxfileno:{3}".format(
                len(self._acceptor_dict),
                accept_succ_count,
                self._max_notify_event_count,
                self._max_socket_fileno)
            statuslogger.debug(t)

    def _assignSessionId(self):
        '''Return the next session id (1, 2, 3, ... since start()).'''
        self._session_id += 1
        return self._session_id

    def _pushInvalidFd(self, client_fd):
        '''Mark ``client_fd`` for teardown at the end of the current pass.'''
        self._invalid_acceptorfd_set.add(client_fd)

    def _triggerTimerEvent(self):
        '''Periodic housekeeping, run from serve_once() every _timer_time_out s.

        Calls onTimerEvent() on every acceptor (marking it invalid when that
        returns False), then the subclass hook and the status reports.
        '''
        # Iterate over a snapshot: onTimerEvent() is acceptor (user) code.
        for fileno, acceptor in list(self._acceptor_dict.items()):
            if not acceptor.onTimerEvent(self._last_timestamp):
                self._pushInvalidFd(fileno)
        self.onProcessTimerTask()
        self._reportStatus()
        serviceRunningStatus.report()

    def _createListenSocket(self, listen_addr=()):
        '''Create a non-blocking listen socket on ``listen_addr`` and register
        it with epoll for EPOLLIN (level-triggered).

        :returns: True on success. On socket.error the error is logged and
            the process exits via sys.exit(1).
        '''
        try:
            listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listen_socket.setblocking(False)
            listen_socket.bind(listen_addr)
            listen_socket.listen(self._listen_backlog)
            self._listener_dict[listen_socket.fileno()] = listen_socket
            self._epoll_loop.register(listen_socket.fileno(), select.EPOLLIN)
            t = "_createListenSocket ok addr:{0} pid:{1}".format(listen_addr, os.getpid())
            netLogger.info(t)
            return True
        except socket.error as e:
            t = "_createListenSocket,error,addr:{0} pid:{1} msg:{2}".format(listen_addr, os.getpid(), repr(e))
            netLogger.critical(t)
            sys.exit(1)
            return False  # not reached: sys.exit() raises SystemExit

    def _checkSocketDataEvent(self, fileno, events):
        '''Dispatch the epoll event mask ``events`` for client fd ``fileno``.

        Client fds are registered edge-triggered for EPOLLIN | EPOLLOUT (see
        _addTcpAcceptor), so a single notification can carry both bits. Both
        are serviced here: with EPOLLET a readiness change that is not handled
        now is not reported again, so dropping the EPOLLOUT half could leave
        pending output unsent.
        '''
        # Error / hang-up: mark the connection for teardown.
        if events & (select.EPOLLHUP | select.EPOLLERR):
            self._onFdExceptional(client_fd=fileno)
            print_epoll_events(fileno, events)
            return
        handled = False
        if events & select.EPOLLIN:
            # Readable: data (or EOF) has arrived.
            self._onFdReadable(client_fd=fileno)
            handled = True
        elif events & select.EPOLLPRI:
            # Out-of-band (urgent) data.
            netLogger.warning("_checkSocketEvent EPOLLPRI")
            self._onFdReadable(client_fd=fileno)
            handled = True
        if events & select.EPOLLOUT:
            # Writable. Skip writing when the read above just marked the
            # connection invalid -- it is about to be closed anyway.
            if not (handled and fileno in self._invalid_acceptorfd_set):
                self._onFdWritable(client_fd=fileno)
            handled = True
        if handled:
            return
        # No expected event. Original author's notes (translated):
        # - EPOLLERR signals an error on the server side.
        # - A normal close by the peer (close() in its program, kill or Ctrl+C
        #   in a shell) triggers EPOLLIN (and EPOLLRDHUP, if registered -- it
        #   is not here) but neither EPOLLERR nor EPOLLHUP; per `man epoll_ctl`
        #   those two are raised for local (server-side) errors.
        # - An abnormal peer disconnect (only pulling the cable was tested)
        #   triggered no event at all.
        t = "_checkSocketDataEvent <unhandle_event fileno:{0} events:{1}> ".format(fileno, events)
        netLogger.critical(t)

    def _onFdAcceptable(self, listen_fd):
        '''Accept every pending connection on listen socket ``listen_fd``.

        The listen socket is non-blocking, so accept() is repeated until it
        raises EAGAIN/EWOULDBLOCK.

        :returns: True once the accept queue is drained; False if the session
            limit was reached (the extra client is closed) or on any other
            socket error.
        '''
        listen_socket = self._listener_dict[listen_fd]
        accept_count = 0
        while True:
            try:
                client_socket, client_address = listen_socket.accept()
                # '>=' so that at most _max_session_count connections are held.
                if len(self._acceptor_dict) >= self._max_session_count:
                    t = "_onFdAcceptable Failed;Too Many Sessions;Then Close It"
                    netLogger.critical(t)
                    client_socket.close()
                    return False
                session_id = self._assignSessionId()
                client_socket.setblocking(0)
                acceptor = self.onTcpConnectionEnter(session_id, client_socket, client_address)
                self._max_socket_fileno = max(self._max_socket_fileno, client_socket.fileno())
                self._addTcpAcceptor(acceptor.client_socket.fileno(), acceptor)
                accept_count += 1
            except socket.error as e:
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Accept queue drained: the normal way out of the loop.
                    if accept_count > self._max_doaccept_count:
                        self._max_doaccept_count = accept_count
                    return True
                t = "_onFdAcceptable unhandle socket.error:{0}".format(repr(e))
                netLogger.critical(t)
                return False

    def _onFdReadable(self, client_fd):
        '''Let the acceptor on ``client_fd`` read; mark it invalid on failure.

        A failure is logged as critical unless the acceptor reports that the
        peer closed the connection.
        '''
        acceptor = self._acceptor_dict.get(client_fd)
        if acceptor is None:
            netLogger.critical("_onFdReadable failed,NULL,client_fd:%d", client_fd)
            return
        if acceptor.onReadEvent():
            return
        self._pushInvalidFd(client_fd)
        if not acceptor.isClosedByClient():
            t = "_onFdReadable error,client_fd:{0}".format(client_fd)
            netLogger.critical(t)

    def _onFdWritable(self, client_fd):
        '''Let the acceptor on ``client_fd`` write; mark it invalid on failure.'''
        acceptor = self._acceptor_dict.get(client_fd)
        if acceptor is None:
            netLogger.critical("_onFdWritable failed,client_fd:%d", client_fd)
            return
        if acceptor.onWriteEvent():
            return
        self._pushInvalidFd(client_fd)
        t = "_onFdWritable error,client_fd:{0}".format(client_fd)
        netLogger.critical(t)

    def _onFdExceptional(self, client_fd):
        '''Handle EPOLLHUP/EPOLLERR on ``client_fd``: log and mark invalid.'''
        acceptor = self._acceptor_dict.get(client_fd)
        if acceptor is None:
            netLogger.critical("_onFdExceptional failed,client_fd:%d", client_fd)
            return
        t = "_onFdExceptional client_fd:{0}".format(client_fd)
        netLogger.critical(t)
        self._pushInvalidFd(client_fd)

    def _checkInvalidAcceptors(self):
        '''Tear down every connection marked invalid: unregister it, fire
        onDisconnectEvent() and close it.

        fds are popped one at a time instead of iterating the set, because
        onDisconnectEvent()/close() may mark further fds invalid; iterating
        while the set grows would raise RuntimeError.
        '''
        while self._invalid_acceptorfd_set:
            client_fd = self._invalid_acceptorfd_set.pop()
            acceptor = self._acceptor_dict.get(client_fd)
            if acceptor is None:
                continue
            self._delTcpAcceptor(client_fd, acceptor)
            acceptor.onDisconnectEvent()
            acceptor.close()

    def _addTcpAcceptor(self, acceptor_fd, acceptor):
        '''Register ``acceptor_fd`` with epoll (edge-triggered, IN and OUT)
        and store ``acceptor`` under it.'''
        self._epoll_loop.register(acceptor_fd, select.EPOLLET | select.EPOLLOUT | select.EPOLLIN)
        self._acceptor_dict[acceptor_fd] = acceptor
        t = "_addTcpAcceptor sessionId:{0} fd:{1} addr:{2} conns:{3}".format(
            acceptor.client_session_id,
            acceptor_fd,
            acceptor.client_addr,
            len(self._acceptor_dict))
        netLogger.debug(t)

    def _delTcpAcceptor(self, acceptor_fd, acceptor):
        '''Unregister ``acceptor_fd`` from epoll and drop it from the table;
        logs a warning when the fd is not registered.'''
        if acceptor_fd in self._acceptor_dict:
            self._epoll_loop.unregister(acceptor_fd)
            del self._acceptor_dict[acceptor_fd]
            t = "_delTcpAcceptor sessionId:{0} fd:{1} addr:{2} conns:{3}".format(
                acceptor.client_session_id,
                acceptor_fd,
                acceptor.client_addr,
                len(self._acceptor_dict))
            netLogger.debug(t)
            return
        t = "_delTcpAcceptor nothing"
        netLogger.warning(t)
############################################################################
class _Acceptor(ConnectionBase):
    '''Minimal example acceptor: keeps no state beyond ConnectionBase, never
    sends anything and prints a line when the connection goes away.'''

    def __init__(self, client_session_id=-1, client_socket=None, client_addr=()):
        super(_Acceptor, self).__init__(client_session_id, client_socket, client_addr)
        self.connection_type = CONNECT_TYPE.IS_ACCEPTOR
        # An accepted socket is already connected.
        self.connect_status = CONNECT_STATUS.CONNECT_SUCC
        self.max_send_buffer_size = 1024 * 1024 * 10  # 10 MiB
        self.max_recv_buffer_size = 1024 * 1024 * 10  # 10 MiB
        # NOTE(review): unit of max_keeplive_time is defined in ConnectionBase;
        # this huge value presumably means "never expire" -- confirm.
        self.max_keeplive_time = 600000000000
        self.send_count = 0

    def onDisconnectEvent(self):
        '''Print the fd and status of a connection being torn down, unless its
        status is already CONNECT_CLOSED.'''
        if self.connect_status == CONNECT_STATUS.CONNECT_CLOSED:
            return
        # Single-argument print(...) prints identically on Python 2 and 3.
        print("_Acceptor::onDisconnectEvent fd:{0} reason:{1}".format(
            self.client_socket.fileno(), self.connect_status))

    def onTimerEvent(self, current_time):
        '''Timer tick; always returns True so the connection is kept open.'''
        return True

    def _process_recv_buffer(self):
        '''Handle received data.

        Returns the full buffer length -- presumably ConnectionBase treats the
        return value as the number of bytes consumed (verify against
        ConnectionBase).
        '''
        return len(self.recv_buffer)
if __name__ == '__main__':
    class _DemoEpollServer(EpollServer):
        '''EpollServer with its virtual hooks filled in so the demo can run.

        Plain EpollServer raises NotImplementedError from onTcpConnectionEnter
        on the first accepted connection and from onProcessTimerTask on the
        first timer tick.
        '''

        def onTcpConnectionEnter(self, session_id, client_socket, client_address):
            return _Acceptor(session_id, client_socket, client_address)

        def onProcessTimerTask(self):
            pass

        # NOTE(review): task_once/feedback_once are not called from this
        # module; presumably InterruptableTaskLoop drives them. They are
        # stubbed as no-ops -- confirm the return value the loop expects.
        def task_once(self):
            pass

        def feedback_once(self):
            pass

    listen_addr_list = [("0.0.0.0", 1234)]
    serv = _DemoEpollServer(listen_addr_list)
    InterruptableTaskLoop(serv, 0).startAsForver()
---