PyTianQiService天气获取服务项目

芮立果
2023-12-01

PyTianQiService

这是依赖于一个数据源的天气服务,按照约定规范提供给客户端天气数据,且在服务端进行数据的预取与缓存,加速客户端获取天气的响应。

完全采用Python代码实现,线上运行数据是:每秒处理200个查询请求,CPU毫无压力;

网络库netcore是采用自己实现的Epoll事件模型+消息队列+多进程Worker的设计;

数据缓存直接采用Dict数据结构,并通过Pickle序列化到本地;

Worker进程实现周期性预取,处理耗时的压缩、加密的操作。


项目索引:

https://github.com/changshoumeng/PyTianQiService


#!/usr/bin/env python
# -*- coding: utf-8 -*-
##########################################################
#   Teach Wisdom To Machine.
#   Please Call Me Programming devil.
#   Model Name: BaseService
######################################################## #
import MultiProcessWrapper as mpw
from EpollServer import *
import os
import thread
import traceback
import time
import commands

# Module-level loggers.
# NOTE(review): `logging` is not imported directly in this file; presumably it
# is made available by `from EpollServer import *` -- confirm.
logger = logging.getLogger()
statuslogger = logging.getLogger("statusLogger")
netLogger = logging.getLogger("netLogger")
managerLogger = logging.getLogger("managerLogger")
# List of (host, port) tuples. Not referenced by the code visible in this
# section; presumably consumed by the server setup elsewhere -- confirm.
listen_addr_list = [("0.0.0.0", 8554)]
# Number of HeavyWorker instances created by process_entry(); when it is 1,
# process_entry() runs a single-process InterruptableTaskLoop instead.
work_process_count = 5
project_index = 0


class BaseAcceptor(ConnectionBase):
    '''Init connection
    '''

    def __init__(self, client_session_id=-1, client_socket=None, client_addr=()):
        super(BaseAcceptor, self).__init__(client_session_id, client_socket, client_addr)
        self.connection_type = CONNECT_TYPE.IS_ACCEPTOR
        self.connect_status = CONNECT_STATUS.CONNECT_SUCC
        self.max_send_buffer_size = 1024 * 2
        self.max_recv_buffer_size = 1024 * 1024 * 10
        self.max_keeplive_time = 600  # seconds
        self.send_count = 0
        self.task_list = []

    def onDisconnectEvent(self):
        super(BaseAcceptor, self).onDisconnectEvent()

    def onTimerEvent(self, current_time):
        if not super(BaseAcceptor, self).onTimerEvent(current_time):
            return False
        self.keeplive()
        return True

    # 处理收到的数据
    def _process_recv_buffer(self):
        global serviceRunningStatus
        total_bufsize = len(self.recv_buffer)
        has_unpack_bufsize = 0
        while has_unpack_bufsize < total_bufsize:
            (unpack_size, packet_head) = self._unpack_frombuffer(self.recv_buffer[has_unpack_bufsize:])
            if unpack_size == 0:
                break
            if unpack_size < 0:
                return unpack_size
            self._dispatch_packet(packet_head, self.recv_buffer[has_unpack_bufsize:has_unpack_bufsize + unpack_size])
            has_unpack_bufsize += unpack_size
            serviceRunningStatus.recv(unpack_size)
        # else:
        #    print "process all:",has_unpack_bufsize,total_bufsize
        return has_unpack_bufsize

    # how to unpack a packet from buffer
    def _unpack_frombuffer(self, buffer=""):
        raise NotImplementedError()
        return (0, None)

    # packet_data is full packet
    def _dispatch_packet(self, head=None, packet_data=""):
        print ">>_dispatch_packet"
        raise NotImplementedError()

    # how to keep live
    def keeplive(self):
        raise NotImplementedError()


def feedback_consumer(gracefulexit_event, serv):
    """Call ``serv.feedback_once()`` repeatedly until a stop is signalled.

    The loop ends when ``gracefulexit_event.is_stop()`` becomes true or when
    ``mpw.GracefulExitException`` is raised. Any other exception is logged
    (one line per traceback frame, then the exception type and value once)
    and swallowed so the function returns normally. An exit message is
    always logged on the way out.

    Args:
        gracefulexit_event: object exposing ``is_stop()``.
        serv: object exposing ``feedback_once()``.
    """
    # Imported locally because `sys` is not imported at the top of this file
    # (as far as visible here).
    import sys

    managerLogger.debug("feedback_consumer start")
    try:
        while not gracefulexit_event.is_stop():
            serv.feedback_once()
        managerLogger.error("feedback_consumer got parent exit notify")
    except mpw.GracefulExitException:
        managerLogger.error("feedback_consumer got graceful exit exception.")
    except Exception:
        exc_type, exc_value, exc_tb = sys.exc_info()
        for filename, lineno, function, _text in traceback.extract_tb(exc_tb):
            managerLogger.critical(
                "feedback_consumer>{0} line:{1} in function:{2}".format(filename, lineno, function))
        # Log the exception summary once, not once per traceback frame.
        managerLogger.critical("feedback_consumer>** %s: %s" % (exc_type, exc_value))
        managerLogger.critical("feedback_consumer caught unhandle exception")
    finally:
        managerLogger.critical("feedback_consumer exit")


class HeavyWorker(mpw.SimpleWorker):
    def __init__(self, worker_id=0, serv=None):
        super(HeavyWorker, self).__init__()
        self._worker_id = worker_id
        self._serv = serv
        pass

    def onStart(self, gracefulexit_event):
        self.task_process_count = 0
        pid = os.getpid()
        pid = str(pid)
        if self._worker_id == 0:
            self._serv.start()
            managerLogger.info("tcpservice process start,pid:%s", pid)
            with open("run/tcpservice.pid", "w") as f:
                f.write(pid)
                f.write(" ")
            thread.start_new_thread(feedback_consumer, (gracefulexit_event, self._serv))
            return
        if self._worker_id != 0:
            managerLogger.info("worker_%d process start,pid:%s", self._worker_id, pid)
            with open("run/worker_{0}.pid".format(self._worker_id), "w") as f:
                f.write(pid)
                f.write(" ")
                # exec("from gevent import monkey; monkey.patch_all();import gevent;")
        pass

    def onEnd(self, end_code, end_reason):
        if self._worker_id == 0:
            pid = os.getpid()
            managerLogger.info("tcpservice end at pid:{0} reason:{1}".format(pid, end_reason))
        if end_code < 0:
            print end_reason
            return
        if end_code == 2:
            self.onExit()
        pass

    def onRunOnce(self):
        processSuccNum = 0
        processMaxNum = 1000
        if self._worker_id == 0:
            self._serv.serve_once()
            return
        if self._worker_id != 0:
            self._serv.task_once()
            return

    def onTimer(self):
        pass

    def onExit(self):
        if self._worker_id == 0:
            self._serv.stop()
        pass


class MasterTimer(mpw.TimerInterface):
    """Timer object passed to ``MultiProcessWrapper.startAsForver`` by ``process_entry``."""

    def __init__(self):
        """Create the timer; no state is kept and the base initializer is not called."""

    def timeout(self):
        """Return the timeout value 2 (presumably seconds -- confirm against ``mpw.TimerInterface``)."""
        return 2

    def onTimer(self):
        """Timer callback; intentionally does nothing."""


def process_entry(serv):
    global work_process_count
    pid = os.getpid()
    pid = str(pid)
    managerLogger.info("master process start,pid:%s,workercount:%d", pid, work_process_count)
    with open("run/master.pid", "w") as f:
        f.write(pid)
        f.write(" ")
    if work_process_count == 1:
        print "single_process_entry>> begin"
        InterruptableTaskLoop(serv).startAsForver()
        print "single_process_entry>> end"
        return
    managerLogger.debug("###############begin###################")
    worker_list = [HeavyWorker(i, serv) for i in range(work_process_count)]
    p = mpw.MultiProcessWrapper()
    p.startAsForver(worker_list, MasterTimer())
    managerLogger.debug("###############end###################")


def process_exit(service_name='', server={}):
    global work_process_count
    print "******process_exit*****"
    master_pid = Utils.file2str('run/master.pid')
    print "master_pid:", master_pid
    tcpservice_pid = Utils.file2str('run/tcpservice.pid')
    print "tcpservice_pid:", tcpservice_pid
    worker_pids = [master_pid, tcpservice_pid]
    for i in xrange(work_process_count):
        worker_pid_file = 'run/worker_{0}.pid'.format(i)
        if os.path.exists(worker_pid_file):
            worker_pid = Utils.file2str(worker_pid_file)
            worker_pids.append(worker_pid)
            print "worker_pid:", worker_pid
    for pid in worker_pids:
        print "-------------------------------------"
        cmd = 'pidof python'
        results = commands.getoutput(cmd)
        i = 0
        while True:
            print i, cmd, "->", results, " ->", pid
            if (pid not in results) or i >= 5:
                break
            cmd2 = 'kill {0}'.format(pid)
            print i, cmd2, commands.getstatusoutput(cmd2)
            time.sleep(3)
            results = commands.getoutput(cmd)
            i += 1
        print "Close pid:", pid

    ip, port = server["host"], server["port"]
    print "Service should work at:", ip, port
    result = os.popen("netstat -ntlp|grep {0}".format(port)).read()
    if result:
        print "---------------------------"
        print "Notice It:"
        print result
        return
    print "DONE"



 类似资料: