PyTianQiService的核心网络服务模块

张可人
2023-12-01

PyTianQiService 的核心网络服务模块是一个单线程、基于 epoll 事件循环的 TCP 通信框架。


---

#!/usr/bin/env python
# -*- coding: utf-8 -*-
##########################################################
#   Teach Wisdom To Machine.
#   Please Call Me Programming devil.
#   Module Name: EpollServer
###################################################### #

from EpollComm import *
from ServiceStatus import *
from Utils import *

# Root logger (getLogger() with no name returns the root logger).
logger = logging.getLogger()
# Named logger used by EpollServer._reportStatus() for periodic statistics.
statuslogger = logging.getLogger("statusLogger")
# Named logger used for all socket / epoll related messages in this module.
netLogger = logging.getLogger("netLogger")

# Module-wide status object; EpollServer._triggerTimerEvent() calls its
# report() method on every timer tick.
serviceRunningStatus = ServiceRunningStatus()

''' LogAcceptor
'''


############################################################################
class EpollServer(ServerInterface):
    '''epoll-loop-based server
    '''

    # listen_addr_list is list of item as (ip,port)
    def __init__(self, listen_addr_list=None):
        """Remember the listening addresses and set the tunable limits.

        No socket is opened here; that is done by start().

        listen_addr_list -- list of (ip, port) tuples to listen on.  When it
            is omitted, this instance gets its own empty list.
        """
        # A literal [] default would be a single list object shared by every
        # instance created without arguments, so build it per instance.
        if listen_addr_list is None:
            listen_addr_list = []
        self._listen_addr_list = listen_addr_list
        self._listen_backlog = 40000  # backlog passed to socket.listen()
        self._epoll_time_out = 1  # timeout handed to EpollLoop.poll()
        self._timer_time_out = 10  # seconds between timer ticks (compared with time.time())
        self._max_session_count = 1024  # connection cap checked on accept
        # Defined up front so that stop() is a harmless no-op when it is
        # called before start().
        self._is_started = 0

    # ServerInterface.start()
    def start(self):
        """Create the epoll loop and the listening sockets, then reset state.

        Returns True when every configured address is listening.  Returns
        False as soon as _createListenSocket() reports a failure; in that
        case the server stays marked as not started.
        """
        self._is_started = 0
        self._epoll_loop = EpollLoop()

        # High-water marks kept for status reporting.
        self._max_socket_fileno = 0
        self._max_notify_event_count = 0
        self._max_doaccept_count = 0  # most connections accepted in one accept notification

        # fd -> listening socket, filled in by _createListenSocket().
        self._listener_dict = {}
        if not all(self._createListenSocket(addr) for addr in self._listen_addr_list):
            return False

        # fd -> acceptor object for every live client connection.
        self._acceptor_dict = {}
        # fds waiting to be torn down by _checkInvalidAcceptors().
        self._invalid_acceptorfd_set = set()
        self._session_id = 0
        self._last_timestamp = time.time()
        self._is_abort_serve_once = 0  # set to 1 by stop() to end serve_once()
        self._is_started = 1
        return True

    # ServerInterface.stop()
    def stop(self):
        """Shut the server down and close every socket it owns.

        Does nothing when the server is not started (including when start()
        was never called).  After closing, the connection registry and the
        pending-invalid set are emptied so that no closed acceptor can be
        looked up or torn down a second time, and serve_once() is told to
        return False from now on.
        """
        if not getattr(self, "_is_started", 0):
            return
        self._is_started = 0
        self._epoll_loop.close()
        for listen_socket in self._listener_dict.values():
            listen_socket.close()
        self._listener_dict.clear()
        for acceptor in list(self._acceptor_dict.values()):
            acceptor.close()
        # Drop the closed acceptors, mirroring what is done for listeners.
        # Otherwise getAcceptor() could still hand them out, and a later
        # _checkInvalidAcceptors() would try to unregister them from the
        # epoll object that was just closed.
        self._acceptor_dict.clear()
        self._invalid_acceptorfd_set.clear()
        self._is_abort_serve_once = 1

    # ServerInterface.serve_once()
    def serve_once(self):
        """Run one iteration of the event loop.

        Steps: fire the timer pass when more than _timer_time_out seconds
        have elapsed, poll epoll once, dispatch every ready fd, then tear
        down the connections that were marked invalid.

        Returns False when stop() has requested an abort, True otherwise.
        """
        if self._is_abort_serve_once == 1:
            netLogger.warning("serve_once _is_abort_serve_once=1 ")
            return False

        current = time.time()
        if current > self._last_timestamp + self._timer_time_out:
            self._last_timestamp = current
            self._triggerTimerEvent()

        ready = self._epoll_loop.poll(self._epoll_time_out)
        for fileno, events in ready:
            if fileno in self._listener_dict:
                # A listening socket is ready: accept new connections.
                self._onFdAcceptable(listen_fd=fileno)
            else:
                acceptor = self._acceptor_dict.get(fileno)
                if acceptor is None:
                    netLogger.warning("cannot find fd:{0} from _acceptor_dict".format(fileno))
                elif acceptor.isConnectSucc():
                    self._checkSocketDataEvent(fileno, events)

        # Track the largest batch of events delivered by a single poll.
        ready_count = len(ready)
        if ready_count > self._max_notify_event_count:
            self._max_notify_event_count = ready_count

        # Invalid acceptors are reaped here, after all events were handled.
        self._checkInvalidAcceptors()
        return True
     #
    def getAcceptor(self, acceptor_fd=0, acceptor_sessionid=0):
        """Look up a live connection by fd and verify its session id.

        Returns the acceptor registered under acceptor_fd when its
        client_session_id equals acceptor_sessionid, otherwise None (the fd
        is unknown, or it now belongs to a different session).
        """
        try:
            found = self._acceptor_dict[acceptor_fd]
        except KeyError:
            netLogger.debug("getAcceptor NULL;fd:%d sessionid:%d", acceptor_fd, acceptor_sessionid)
            return None
        if found.client_session_id == acceptor_sessionid:
            return found
        netLogger.debug("getAcceptor failed;fd:%d sessionid:%d>currentsessionid:%d",
                        acceptor_fd, acceptor_sessionid, found.client_session_id)
        return None

    # Public Virtual Method
    def onTcpConnectionEnter(self, session_id, client_socket, client_address):
        """Hook: build the connection object for a newly accepted client.

        Called by _onFdAcceptable() once per accepted connection, after the
        client socket has been switched to non-blocking mode.

        session_id -- id produced by _assignSessionId()
        client_socket, client_address -- the pair returned by accept()

        The returned object is registered under
        returned.client_socket.fileno() and must therefore expose a
        client_socket attribute.  Subclasses must override this method; the
        base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
        # Sample override body:
        # acceptor = _Acceptor(session_id, client_socket, client_address)
        # return acceptor
    # Public Virtual Method
    def onProcessTimerTask(self):
        """Hook: server-level periodic work.

        Called by _triggerTimerEvent() on every timer tick, after each
        acceptor's onTimerEvent() has run.  Subclasses must override this
        method; the base implementation raises NotImplementedError.
        """
        raise NotImplementedError()
    # Public Virtual Method
    def task_once(self):
        """Hook that subclasses must override; raises NotImplementedError here.

        NOTE(review): nothing in this module calls task_once(); presumably
        it is driven by the external task loop -- confirm against the caller.
        """
        raise NotImplementedError()
    # Public Virtual Method
    def feedback_once(self):
        """Hook that subclasses must override; raises NotImplementedError here.

        NOTE(review): nothing in this module calls feedback_once();
        presumably it is driven by the external task loop -- confirm against
        the caller.
        """
        raise NotImplementedError()


    def _reportStatus(self):
        """Write a one-line connection summary to the status logger.

        Sums get_intval() over every registered acceptor and logs only when
        that total is greater than 2.
        NOTE(review): get_intval() is defined outside this module; the
        message labels the total "accept_succ_count" -- confirm its meaning.
        """
        total = sum(acceptor.get_intval() for acceptor in self._acceptor_dict.values())
        if total <= 2:
            return
        statuslogger.debug(
            "reportStatus>>  acceptors:{0} accept_succ_count:{1} max_events:{2},maxfileno:{3}".format(
                len(self._acceptor_dict),
                total,
                self._max_notify_event_count,
                self._max_socket_fileno))

    def _assignSessionId(self):
        """Return the next session id; ids grow by one on every call."""
        next_id = self._session_id + 1
        self._session_id = next_id
        return next_id

    def _pushInvalidFd(self, client_fd):
        """Mark client_fd for teardown by the next _checkInvalidAcceptors()."""
        invalid_fds = self._invalid_acceptorfd_set
        invalid_fds.add(client_fd)

    # 定时事件触发器
    def _triggerTimerEvent(self):
        """Run the periodic housekeeping pass.

        Gives every acceptor its onTimerEvent() callback (a false result
        marks the connection invalid), then runs the server-level timer
        hook, the status report and the module-wide status reporter.
        """
        tick_time = self._last_timestamp
        for fileno, acceptor in self._acceptor_dict.items():
            if acceptor.onTimerEvent(tick_time):
                continue
            self._pushInvalidFd(fileno)

        self.onProcessTimerTask()
        self._reportStatus()
        serviceRunningStatus.report()

    # 创建listen socket,并且注册到epoll,关注EPOLLIN
    def _createListenSocket(self, listen_addr=()):
        """Open a non-blocking listening socket and register it for EPOLLIN.

        listen_addr -- (ip, port) tuple passed to bind().

        Returns True on success.  On socket.error the failure is logged, the
        half-configured socket is closed and the process exits with status 1
        via sys.exit() (so SystemExit is raised to the caller).
        """
        listen_socket = None
        try:
            listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            listen_socket.setblocking(False)
            listen_socket.bind(listen_addr)
            listen_socket.listen(self._listen_backlog)
            self._listener_dict[listen_socket.fileno()] = listen_socket
            self._epoll_loop.register(listen_socket.fileno(), select.EPOLLIN)
            t = "_createListenSocket ok addr:{0} pid:{1}".format(listen_addr, os.getpid())
            netLogger.info(t)
            return True
        except socket.error as e:  # "as" form is valid on Python 2.6+ and 3
            t = "_createListenSocket,error,addr:{0} pid:{1} msg:{2}".format(listen_addr, os.getpid(), repr(e))
            netLogger.critical(t)
            # SystemExit can be intercepted further up, so do not leave the
            # descriptor open behind us.
            if listen_socket is not None:
                listen_socket.close()
            sys.exit(1)
        return False

    def _checkSocketDataEvent(self, fileno, events):
        """Dispatch one epoll event mask for a client fd to a single handler.

        Priority order: error/hang-up, readable, urgent data (treated as
        readable), writable.  Exactly one branch runs per call.

        NOTE(review): an event carrying both EPOLLIN and EPOLLOUT is handled
        as a read only.  Client sockets are registered edge-triggered in
        _addTcpAcceptor(), so confirm that pending output is flushed
        elsewhere.
        """
        if events & (select.EPOLLHUP | select.EPOLLERR):
            # Error condition reported for this fd.
            self._onFdExceptional(client_fd=fileno)
            print_epoll_events(fileno, events)
        elif events & select.EPOLLIN:
            # Data (or a peer close) is waiting to be read.
            self._onFdReadable(client_fd=fileno)
        elif events & select.EPOLLPRI:
            # Urgent / out-of-band data: logged, then read like normal data.
            netLogger.warning("_checkSocketEvent EPOLLPRI")
            self._onFdReadable(client_fd=fileno)
        elif events & select.EPOLLOUT:
            # The socket can accept more outgoing data.
            self._onFdWritable(client_fd=fileno)
        else:
            # None of the expected bits was set.  Original author's notes:
            # - EPOLLERR indicates an error on the server side.
            # - A normal close by the peer (close(), kill or Ctrl+C) raises
            #   EPOLLIN and EPOLLRDHUP but neither EPOLLERR nor EPOLLHUP;
            #   per "man epoll_ctl" those two are local-side errors.
            # - An abnormal disconnect (only unplugging the cable was
            #   tested) raised no event at all.
            netLogger.critical(
                "_checkSocketDataEvent <unhandle_event fileno:{0} events:{1}> ".format(fileno, events))

    # Accept就绪
    def _onFdAcceptable(self, listen_fd):
        """Accept every pending connection on the listening socket listen_fd.

        Loops on accept() until the non-blocking socket reports
        EAGAIN/EWOULDBLOCK.  Each accepted client is switched to
        non-blocking mode, wrapped by onTcpConnectionEnter() and registered
        through _addTcpAcceptor().

        Returns True when the accept queue was drained.  Returns False when
        the session limit is reached (the new client is closed) or when a
        socket error other than EAGAIN/EWOULDBLOCK occurs.
        """
        listen_socket = self._listener_dict[listen_fd]
        accept_count = 0
        while True:
            # Only accept() is guarded here, so an EAGAIN raised later while
            # setting up the connection cannot be mistaken for "queue empty".
            try:
                client_socket, client_address = listen_socket.accept()
            except socket.error as e:  # "as" form is valid on Python 2.6+ and 3
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    # Nothing left to accept: record the batch high-water mark.
                    if accept_count > self._max_doaccept_count:
                        self._max_doaccept_count = accept_count
                    return True
                t = "_onFdAcceptable unhandle socket.error:{0}".format(repr(e))
                netLogger.critical(t)
                return False

            # ">=" keeps the registry at no more than _max_session_count
            # entries (">" admitted one connection too many).
            if len(self._acceptor_dict) >= self._max_session_count:
                t = "_onFdAcceptable Failed;Too Many Sessions;Then Close It"
                netLogger.critical(t)
                client_socket.close()
                return False

            try:
                session_id = self._assignSessionId()
                client_socket.setblocking(0)
                acceptor = self.onTcpConnectionEnter(session_id, client_socket, client_address)
                # Keep the largest fd seen so far, not merely the latest one.
                self._max_socket_fileno = max(self._max_socket_fileno, client_socket.fileno())
                self._addTcpAcceptor(acceptor.client_socket.fileno(), acceptor)
            except socket.error as e:
                t = "_onFdAcceptable unhandle socket.error:{0}".format(repr(e))
                netLogger.critical(t)
                # The connection was never registered; do not leak its fd.
                client_socket.close()
                return False
            accept_count += 1

    # Read就绪
    def _onFdReadable(self, client_fd):
        """Let the acceptor registered under client_fd consume readable data.

        A false result from onReadEvent() marks the connection invalid; it
        is logged as an error unless the acceptor reports that the client
        closed the connection.  Nothing more is done on success.
        """
        acceptor = self._acceptor_dict.get(client_fd)
        if acceptor is None:
            netLogger.critical("_onFdReadable failed,NULL,client_fd:%d", client_fd)
            return
        if acceptor.onReadEvent():
            return

        # Read failed: schedule the connection for teardown.
        self._pushInvalidFd(client_fd)
        if acceptor.isClosedByClient():
            return
        netLogger.critical("_onFdReadable error,client_fd:{0}".format(client_fd))

    # Write就绪
    def _onFdWritable(self, client_fd):
        """Let the acceptor registered under client_fd send pending data.

        A false result from onWriteEvent() marks the connection invalid and
        is logged.  Nothing more is done on success.
        """
        acceptor = self._acceptor_dict.get(client_fd)
        if acceptor is None:
            netLogger.critical("_onFdWritable failed,client_fd:%d", client_fd)
            return
        if acceptor.onWriteEvent():
            return

        # Write failed: schedule the connection for teardown.
        self._pushInvalidFd(client_fd)
        netLogger.critical("_onFdWritable error,client_fd:{0}".format(client_fd))

    # Exception通知
    def _onFdExceptional(self, client_fd):
        """Handle an error/hang-up event: log it and mark the fd invalid.

        An fd that is not in the connection registry is only logged.
        """
        if self._acceptor_dict.get(client_fd) is None:
            netLogger.critical("_onFdExceptional failed,client_fd:%d", client_fd)
            return
        netLogger.critical("_onFdExceptional client_fd:{0}".format(client_fd))
        self._pushInvalidFd(client_fd)

    # 检查失效的acceptors
    def _checkInvalidAcceptors(self):
        """Tear down every connection that was marked invalid.

        For each pending fd that is still registered: remove it from epoll
        and from the registry, notify the acceptor through
        onDisconnectEvent(), then close it.  The notification comes before
        close() so the handler can still inspect the socket.

        Fds are popped one at a time instead of iterating the set, so a
        callback that calls _pushInvalidFd() during teardown cannot trigger
        "Set changed size during iteration"; such fds are handled in the
        same pass.
        """
        while self._invalid_acceptorfd_set:
            client_fd = self._invalid_acceptorfd_set.pop()
            acceptor = self._acceptor_dict.get(client_fd, None)
            if acceptor is None:
                # Already removed (or never registered): nothing to do.
                continue
            self._delTcpAcceptor(client_fd, acceptor)
            acceptor.onDisconnectEvent()
            acceptor.close()

    def _addTcpAcceptor(self, acceptor_fd, acceptor):
        """Register a client connection with epoll and the registry.

        The fd is registered edge-triggered for both readable and writable
        events, then stored as acceptor_fd -> acceptor.
        """
        interest = select.EPOLLET | select.EPOLLOUT | select.EPOLLIN
        self._epoll_loop.register(acceptor_fd, interest)
        self._acceptor_dict[acceptor_fd] = acceptor

        netLogger.debug(
            "_addTcpAcceptor sessionId:{0} fd:{1} addr:{2} conns:{3}".format(
                acceptor.client_session_id,
                acceptor_fd,
                acceptor.client_addr,
                len(self._acceptor_dict)))

    def _delTcpAcceptor(self, acceptor_fd, acceptor):
        """Unregister a client connection from epoll and the registry.

        Only a warning is logged when acceptor_fd is not registered.  The
        acceptor argument is used for the log message only; it is not closed
        here.
        """
        if acceptor_fd not in self._acceptor_dict:
            netLogger.warning("_delTcpAcceptor nothing")
            return

        self._epoll_loop.unregister(acceptor_fd)
        del self._acceptor_dict[acceptor_fd]
        netLogger.debug(
            "_delTcpAcceptor sessionId:{0} fd:{1} addr:{2} conns:{3}".format(
                acceptor.client_session_id,
                acceptor_fd,
                acceptor.client_addr,
                len(self._acceptor_dict)))





############################################################################
class _Acceptor(ConnectionBase):
    """Sample server-side connection built on ConnectionBase.

    It starts in the CONNECT_SUCC state, accepts every timer tick and
    reports the whole receive buffer length from _process_recv_buffer().
    """

    def __init__(self, client_session_id=-1, client_socket=None, client_addr=()):
        """Initialise the base connection and set this acceptor's limits.

        client_session_id -- session id assigned by the server
        client_socket -- the accepted socket
        client_addr -- peer address as returned by accept()
        """
        super(_Acceptor, self).__init__(client_session_id, client_socket, client_addr)
        self.connection_type = CONNECT_TYPE.IS_ACCEPTOR
        # An accepted socket is already connected.
        self.connect_status = CONNECT_STATUS.CONNECT_SUCC
        # 10 * 1024 * 1024 each; presumably byte limits enforced by
        # ConnectionBase -- TODO confirm.
        self.max_send_buffer_size = 1024 * 1024 * 10
        self.max_recv_buffer_size = 1024 * 1024 * 10
        # NOTE(review): unit not visible here; the value looks like
        # "effectively never" -- confirm against ConnectionBase.
        self.max_keeplive_time = 600000000000
        self.send_count = 0

    def onDisconnectEvent(self):
        """Print a diagnostic line unless the connection is already closed."""
        if self.connect_status == CONNECT_STATUS.CONNECT_CLOSED:
            return
        # Single-argument print(...) emits the same text under the Python 2
        # print statement and the Python 3 print function.
        print("_Acceptor::onDisconnectEvent fd:{0} reason:{1}".format(
            self.client_socket.fileno(), self.connect_status))

    def onTimerEvent(self, current_time):
        """Timer callback; always returns True so the connection is kept.

        The base-class timer check is deliberately not invoked here.
        """
        return True

    def _process_recv_buffer(self):
        """Handle received data; returns the length of the receive buffer.

        NOTE(review): presumably the return value is the number of bytes
        consumed -- confirm against ConnectionBase.
        """
        return len(self.recv_buffer)


if __name__ == '__main__':
    # Manual run: listen on every interface, port 1234, and hand the server
    # to the task loop.
    # NOTE(review): EpollServer's hook methods raise NotImplementedError, so
    # a subclass is presumably needed for real traffic -- confirm.
    listen_addr_list = [("0.0.0.0", 1234)]
    serv = EpollServer(listen_addr_list)
    task_loop = InterruptableTaskLoop(serv, 0)
    task_loop.startAsForver()


---

 类似资料: