这是一个依赖于一个数据源的天气服务,按照约定的规范向客户端提供天气数据,并在服务端进行数据的预取与缓存,以加快客户端获取天气数据的响应速度。
完全采用Python代码实现,线上运行数据是:每秒处理200个查询请求,CPU毫无压力;
网络库netcore是采用自己实现的Epoll事件模型+消息队列+多进程Worker的设计;
数据缓存直接采用Dict数据结构,并通过Pickle序列化保存到本地;
Worker进程实现周期性预取,处理耗时的压缩、加密的操作。
项目索引:
https://github.com/changshoumeng/PyTianQiService
#!/usr/bin/env python
# -*- coding: utf-8 -*-
##########################################################
# Teach Wisdom To Machine.
# Please Call Me Programming devil.
# Model Name: BaseService
######################################################## #
import MultiProcessWrapper as mpw
from EpollServer import *
import os
import thread
import traceback
import time
import commands
# Module-level loggers. NOTE(review): ``logging`` is not imported by name in
# this file; presumably it arrives through ``from EpollServer import *`` --
# confirm.
logger = logging.getLogger()
statuslogger = logging.getLogger("statusLogger")
netLogger = logging.getLogger("netLogger")
# Used below for process start/stop and feedback-thread messages.
managerLogger = logging.getLogger("managerLogger")
# (host, port) pairs; not referenced in the visible part of this module.
listen_addr_list = [("0.0.0.0", 8554)]
# Number of HeavyWorker instances created by process_entry(); when it is 1,
# process_entry() runs an InterruptableTaskLoop instead of spawning workers.
work_process_count = 5
# Not referenced in the visible part of this module.
project_index = 0
class BaseAcceptor(ConnectionBase):
    """Base class for a connection accepted from a client.

    This class drives the receive loop and the timer hook. Subclasses must
    supply the protocol-specific parts by overriding ``_unpack_frombuffer``,
    ``_dispatch_packet`` and ``keeplive``; the versions here raise
    ``NotImplementedError``.
    """

    def __init__(self, client_session_id=-1, client_socket=None, client_addr=()):
        """Initialise the base connection and mark it as a connected acceptor.

        The three arguments are forwarded unchanged to ``ConnectionBase``.
        """
        super(BaseAcceptor, self).__init__(client_session_id, client_socket, client_addr)
        self.connection_type = CONNECT_TYPE.IS_ACCEPTOR
        self.connect_status = CONNECT_STATUS.CONNECT_SUCC
        # Buffer limits in bytes (2 KiB send, 10 MiB receive).
        # NOTE(review): presumably enforced by ConnectionBase -- confirm.
        self.max_send_buffer_size = 1024 * 2
        self.max_recv_buffer_size = 1024 * 1024 * 10
        self.max_keeplive_time = 600  # seconds
        self.send_count = 0
        self.task_list = []

    def onDisconnectEvent(self):
        """Delegate disconnect handling to the base class."""
        super(BaseAcceptor, self).onDisconnectEvent()

    def onTimerEvent(self, current_time):
        """Run the base timer hook, then ``keeplive()``.

        Returns False (without calling ``keeplive``) when the base class
        hook returns a false value, True otherwise.
        """
        if not super(BaseAcceptor, self).onTimerEvent(current_time):
            return False
        self.keeplive()
        return True

    def _process_recv_buffer(self):
        """Unpack and dispatch every complete packet in ``self.recv_buffer``.

        Returns the number of bytes consumed, or the negative value reported
        by ``_unpack_frombuffer`` as soon as it reports one.
        """
        total_bufsize = len(self.recv_buffer)
        consumed = 0
        while consumed < total_bufsize:
            unpack_size, packet_head = self._unpack_frombuffer(self.recv_buffer[consumed:])
            if unpack_size == 0:
                # Nothing more can be unpacked from the remaining bytes
                # (presumably an incomplete packet); leave them in place.
                break
            if unpack_size < 0:
                return unpack_size
            self._dispatch_packet(packet_head, self.recv_buffer[consumed:consumed + unpack_size])
            consumed += unpack_size
            # serviceRunningStatus is defined outside this file's visible
            # code; it is only read here, so no ``global`` statement is needed.
            serviceRunningStatus.recv(unpack_size)
        return consumed

    def _unpack_frombuffer(self, buffer=""):
        """Unpack one packet from the start of ``buffer``; must be overridden.

        ``_process_recv_buffer`` expects a ``(size, head)`` pair: ``size`` is
        the packet length in bytes, 0 to stop without consuming, or a
        negative value to abort; ``head`` is passed on to
        ``_dispatch_packet``.
        """
        raise NotImplementedError()

    def _dispatch_packet(self, head=None, packet_data=""):
        """Handle one packet; must be overridden.

        ``packet_data`` is the full packet sliced from the receive buffer and
        ``head`` is the value returned alongside its size by
        ``_unpack_frombuffer``.
        """
        raise NotImplementedError()

    def keeplive(self):
        """Keep-alive hook called from ``onTimerEvent``; must be overridden."""
        raise NotImplementedError()
def feedback_consumer(gracefulexit_event, serv):
    """Call ``serv.feedback_once()`` repeatedly until asked to stop.

    HeavyWorker.onStart runs this function in a separate thread. The loop
    ends when ``gracefulexit_event.is_stop()`` becomes true, when
    ``mpw.GracefulExitException`` is raised, or when any other exception
    escapes ``feedback_once``; in the last case the traceback is written to
    ``managerLogger``. An exit message is logged on every path.
    """
    managerLogger.debug("feedback_consumer start")
    try:
        while True:
            if gracefulexit_event.is_stop():
                managerLogger.error("feedback_consumer got parent exit notify")
                return
            serv.feedback_once()
    except mpw.GracefulExitException:
        managerLogger.error("feedback_consumer got graceful exit exception.")
        return
    except:
        # Catch-all at the top of the thread: log every frame of the
        # traceback, then the exception type and value.
        exc_type, exc_value, exc_tb = sys.exc_info()
        for filename, lineno, function, _text in traceback.extract_tb(exc_tb):
            managerLogger.critical(
                "feedback_consumer>{0} line:{1} in function:{2}".format(filename, lineno, function))
        managerLogger.critical("feedback_consumer>** %s: %s" % (exc_type, exc_value))
        managerLogger.critical("feedback_consumer caught unhandle exception")
    finally:
        managerLogger.critical("feedback_consumer exit")
class HeavyWorker(mpw.SimpleWorker):
    """Worker whose role depends on its id.

    Worker 0 drives the service object itself (``start`` / ``serve_once`` /
    ``stop``) and starts the feedback thread; every other worker calls
    ``task_once`` on the same service object.
    """

    def __init__(self, worker_id=0, serv=None):
        """Remember the worker id and the service object it operates on."""
        super(HeavyWorker, self).__init__()
        self._serv = serv
        self._worker_id = worker_id

    def onStart(self, gracefulexit_event):
        """Log the start, write this process's pid file, and start the role.

        Worker 0 starts the service, writes ``run/tcpservice.pid`` and
        launches ``feedback_consumer`` in a new thread; other workers write
        ``run/worker_<id>.pid``.
        """
        self.task_process_count = 0
        pid_text = str(os.getpid())

        if self._worker_id != 0:
            managerLogger.info("worker_%d process start,pid:%s", self._worker_id, pid_text)
            with open("run/worker_{0}.pid".format(self._worker_id), "w") as pid_file:
                pid_file.write(pid_text + " ")
            return

        self._serv.start()
        managerLogger.info("tcpservice process start,pid:%s", pid_text)
        with open("run/tcpservice.pid", "w") as pid_file:
            pid_file.write(pid_text + " ")
        thread.start_new_thread(feedback_consumer, (gracefulexit_event, self._serv))

    def onEnd(self, end_code, end_reason):
        """React to the worker ending.

        Worker 0 logs the end reason. A negative ``end_code`` prints the
        reason; an ``end_code`` of 2 calls ``onExit()``. The meaning of the
        codes is defined by the caller in ``mpw``.
        """
        if self._worker_id == 0:
            managerLogger.info(
                "tcpservice end at pid:{0} reason:{1}".format(os.getpid(), end_reason))
        if end_code < 0:
            print(end_reason)
        elif end_code == 2:
            self.onExit()

    def onRunOnce(self):
        """Run one iteration: serve for worker 0, process a task otherwise."""
        if self._worker_id == 0:
            self._serv.serve_once()
        else:
            self._serv.task_once()

    def onTimer(self):
        """Timer hook; does nothing."""

    def onExit(self):
        """Stop the service when this is worker 0; otherwise do nothing."""
        if self._worker_id == 0:
            self._serv.stop()
class MasterTimer(mpw.TimerInterface):
    """Timer passed to the multi-process wrapper; its tick does nothing."""

    def __init__(self):
        """Intentionally empty; the base-class initializer is not called."""

    def timeout(self):
        """Return the timer interval, 2.

        NOTE(review): the unit is defined by ``mpw.TimerInterface``
        (presumably seconds) -- confirm.
        """
        interval = 2
        return interval

    def onTimer(self):
        """Timer callback; does nothing."""
def process_entry(serv):
    """Record the master pid and run the service until it finishes.

    Writes ``run/master.pid``. With ``work_process_count == 1`` the service
    runs in this process through ``InterruptableTaskLoop``; otherwise one
    ``HeavyWorker`` per configured worker is handed to
    ``mpw.MultiProcessWrapper`` together with a ``MasterTimer``.
    """
    master_pid = str(os.getpid())
    managerLogger.info("master process start,pid:%s,workercount:%d", master_pid, work_process_count)
    with open("run/master.pid", "w") as pid_file:
        pid_file.write(master_pid + " ")

    if work_process_count != 1:
        managerLogger.debug("###############begin###################")
        workers = []
        for worker_id in range(work_process_count):
            workers.append(HeavyWorker(worker_id, serv))
        mpw.MultiProcessWrapper().startAsForver(workers, MasterTimer())
        managerLogger.debug("###############end###################")
        return

    # Single-process mode.
    print("single_process_entry>> begin")
    InterruptableTaskLoop(serv).startAsForver()
    print("single_process_entry>> end")
def process_exit(service_name='', server={}):
    """Kill the processes recorded in the ``run/*.pid`` files and report.

    Reads the master, tcpservice and per-worker pid files. For each pid it
    sends ``kill`` (up to 5 attempts, 3 seconds apart) for as long as
    ``pidof python`` still lists that pid. It then checks with ``netstat``
    whether anything is still listening on the configured port.

    Args:
        service_name: Unused; kept for interface compatibility.
        server: Mapping with ``"host"`` and ``"port"`` keys. The default
            dict is never mutated. When no port is configured the listen
            check is skipped.

    Returns:
        None. Progress is printed to stdout.
    """
    print("******process_exit*****")
    master_pid = Utils.file2str('run/master.pid')
    print("master_pid: {0}".format(master_pid))
    tcpservice_pid = Utils.file2str('run/tcpservice.pid')
    print("tcpservice_pid: {0}".format(tcpservice_pid))

    worker_pids = [master_pid, tcpservice_pid]
    for index in range(work_process_count):
        worker_pid_file = 'run/worker_{0}.pid'.format(index)
        if os.path.exists(worker_pid_file):
            worker_pid = Utils.file2str(worker_pid_file)
            worker_pids.append(worker_pid)
            print("worker_pid: {0}".format(worker_pid))

    # NOTE(review): pidof only lists processes named exactly "python"; an
    # interpreter started as e.g. "python2.7" is never seen -- confirm.
    list_cmd = 'pidof python'
    for raw_pid in worker_pids:
        print("-------------------------------------")
        # The pid files are written as "<pid> " (trailing space), so
        # normalise before comparing. Anything non-numeric is skipped so
        # that an empty or garbled pid is never passed to "kill".
        pid = str(raw_pid).strip()
        if not pid.isdigit():
            print("Skip invalid pid: {0!r}".format(raw_pid))
            continue

        running = commands.getoutput(list_cmd)
        attempt = 0
        while True:
            print("{0} {1} -> {2}  -> {3}".format(attempt, list_cmd, running, pid))
            # Compare against whole whitespace-separated tokens: a plain
            # substring test lets "12" match "3127" and misses a pid whose
            # text still carries trailing whitespace.
            if pid not in running.split() or attempt >= 5:
                break
            kill_cmd = 'kill {0}'.format(pid)
            print("{0} {1} {2}".format(attempt, kill_cmd, commands.getstatusoutput(kill_cmd)))
            time.sleep(3)
            running = commands.getoutput(list_cmd)
            attempt += 1
        # Printed whether or not the process actually went away.
        print("Close pid: {0}".format(pid))

    ip, port = server.get("host"), server.get("port")
    if port is None:
        # Nothing to look for in netstat without a port.
        print("No service port configured; skip listen check")
        print("DONE")
        return

    print("Service should work at: {0} {1}".format(ip, port))
    pipe = os.popen("netstat -ntlp|grep {0}".format(port))
    try:
        result = pipe.read()
    finally:
        pipe.close()
    if result:
        # Something is still bound to the port; show it to the operator.
        print("---------------------------")
        print("Notice It:")
        print(result)
        return
    print("DONE")