I have been reading Advanced Programming in the UNIX Environment recently, and I never had an intuitive picture of how a server handles requests with fork versus select. So I went back through the scgi source code to see how these techniques are used in practice. scgi handles requests with processes, but it manages client connections through a process pool:
Step 1:
On the application side, instantiate the SCGIServer class to start an SCGI service that waits for the web server (assume Apache) to forward requests to it.
The __init__ method of SCGIServer looks like this:
def __init__(self, handler_class=SCGIHandler, host="", port=DEFAULT_PORT,
             max_children=5):
    self.handler_class = handler_class
    self.host = host
    self.port = port
    self.max_children = max_children
    self.children = []
    self.spawn_child()
    self.restart = 0
self.handler_class: every request coming from Apache is eventually handed to a child process, where an instance of this class handles it and sends the response back to Apache.
self.host: the IP address the SCGI server socket binds to (in this setup Apache is the client issuing the requests, and the SCGI server is the server handling them).
self.port: the port to bind to.
self.max_children: the maximum number of child processes in the pool.
self.children: bookkeeping for the children in the pool. It holds Child instances; that class is very simple and just stores a child's pid (process id) and the fd (file descriptor) used to talk to that child.
self.spawn_child(): called when SCGIServer is instantiated to create the first child process in the pool.
self.restart: flipped when the process receives SIGHUP. (Note 1: when does this SIGHUP happen? Typically when an administrator or process manager sends kill -HUP to the server to ask for a restart; see the sketch right below.)
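The hup_signal handler itself is not quoted above, so here is a minimal sketch of how such a flag-based handler usually looks; the handler body is my assumption, not the verbatim scgi code. The handler only records that SIGHUP arrived, and the accept loop (shown in Step 2) notices the flag after accept() is interrupted with EINTR.

import signal

class Server:
    def __init__(self):
        self.restart = 0

    def hup_signal(self, signum, frame):
        # Do as little as possible inside the handler: just set a flag.
        # The main accept loop checks self.restart and calls do_restart.
        self.restart = 1

# Triggering it from the shell:
#   kill -HUP <pid of the scgi server>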
The spawn_child() method:
def spawn_child(self, conn=None):
    parent_fd, child_fd = passfd.socketpair(socket.AF_UNIX,
                                            socket.SOCK_STREAM)
    # make child fd non-blocking
    flags = fcntl.fcntl(child_fd, fcntl.F_GETFL, 0)
    fcntl.fcntl(child_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
    pid = os.fork()
    if pid == 0:
        if conn:
            conn.close()  # in the midst of handling a request, close
                          # the connection in the child
        os.close(child_fd)
        self.handler_class(parent_fd).serve()
        sys.exit(0)
    else:
        os.close(parent_fd)
        self.children.append(Child(pid, child_fd))
1. socketpair creates, in the current process, a pair of connected file descriptors that talk to each other over a Unix-domain socket.
2. child_fd, the end that stays in the parent for talking to the child, is set to non-blocking.
3. fork() is called.
4 (child): 1. The child inherits conn (the connection the parent happens to be delegating when it decides to spawn a new child); the child has no use for this inherited copy, so it closes it right away. The request fd will be delivered separately, via fd passing, to whichever child ends up handling it.
4 (child): 2. The child talks to the parent through parent_fd, so it closes the unused child_fd.
4 (child): 3. It creates the handler object that actually serves Apache requests and serves forever.
4 (child): 4. If the serve method fails, the child exits; the exit() call closes its remaining fds.
4 (parent): 1. The parent talks to the child through child_fd, so it closes the unused parent_fd.
4 (parent): 2. The new child's pid and the fd used to talk to it are wrapped in a Child object and appended to self.children for later use. (A standalone sketch of this socketpair + fork pattern follows below.)
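To make the pattern concrete, here is a minimal, self-contained sketch of the same socketpair + fork + close-the-unused-end idiom (my own names, not scgi code). It follows scgi's naming convention: the child keeps parent_sock, its channel to the parent, and the parent keeps child_sock, its channel to the child. The child signals readiness with a single byte, exactly like SCGIHandler does.

import os
import socket

# A connected pair of Unix-domain sockets; unlike a pipe, each end is
# a full read/write channel.
parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

pid = os.fork()
if pid == 0:                      # --- child ---
    child_sock.close()            # not the child's end
    parent_sock.sendall(b"1")     # tell the parent "I am ready"
    parent_sock.close()
    os._exit(0)
else:                             # --- parent ---
    parent_sock.close()           # not the parent's end
    print("child says: %r" % child_sock.recv(1))
    os.waitpid(pid, 0)            # reap the child
    child_sock.close()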
Step 2:
Now that the SCGIServer object exists, call its serve method to loop forever waiting for requests from Apache:
def get_listening_socket(self):
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.bind((self.host, self.port))
    return s

def serve_on_socket(self, s):
    self.socket = s
    self.socket.listen(40)
    signal.signal(signal.SIGHUP, self.hup_signal)
    while 1:
        try:
            conn, addr = self.socket.accept()
            self.delegate_request(conn)
            conn.close()
        except socket.error, e:
            if e[0] != errno.EINTR:
                raise  # something weird
        if self.restart:
            self.do_restart()

def serve(self):
    self.serve_on_socket(self.get_listening_socket())
This is standard socket-server code, nothing surprising: create a listening socket, bind it to an address, install the SIGHUP handler, then wait on the listening socket for requests from Apache. When a connection is accepted, accept() returns a new socket connected directly to Apache; this socket is handed to delegate_request, which is responsible for getting the actual request handled. The parent then calls conn.close() and goes back to listening for the next request. Note that conn.close() does not really tear the connection down: delegate_request does not return until the fd has been passed to a child, so by then the child holds its own reference to the connection and closing the parent's copy is harmless.
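The reason the parent can safely close conn is file-descriptor reference counting: fd passing (like fork or os.dup) creates another reference to the same open connection, and the kernel only releases the connection when the last reference is closed. A tiny standalone illustration, using os.dup locally instead of real fd passing between processes:

import os
import socket

a, b = socket.socketpair()         # stand-in for the accepted Apache connection
dup_fd = os.dup(a.fileno())        # a second reference, like the one sendfd creates
a.close()                          # the "parent" closes its copy...

os.write(dup_fd, b"still alive")   # ...but the connection still works
print(b.recv(64))                  # the data arrives even though a is closed
os.close(dup_fd)
b.close()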
Step 3:
Delegate the Apache request to a child process:
def delegate_request(self, conn):
    """Pass a request fd to a child process to handle. This method
    blocks if all the children are busy and we have reached the
    max_children limit."""
    # There lots of subtleties here. First, we can't use the write
    # status of the pipes to the child since select will return true
    # if the buffer is not filled. Instead, each child writes one
    # byte of data when it is ready for a request. The normal case
    # is that a child is ready for a request. We want that case to
    # be fast. Also, we want to pass requests to the same child if
    # possible. Finally, we need to gracefully handle children
    # dying at any time.
    # If no children are ready and we haven't reached max_children
    # then we want another child to be started without delay.
    timeout = 0
    while 1:
        fds = [child.fd for child in self.children if not child.closed]
        try:
            r, w, e = select.select(fds, [], [], timeout)
        except select.error, e:
            if e[0] == errno.EINTR:  # got a signal, try again
                continue
            raise
        if r:
            # One or more children look like they are ready. Sort
            # the file descriptions so that we keep preferring the
            # same child.
            child = None
            for child in self.children:
                if not child.closed and child.fd in r:
                    break
            if child is None:
                continue  # no child found, should not get here
            # Try to read the single byte written by the child.
            # This can fail if the child died or the pipe really
            # wasn't ready (select returns a hint only). The fd has
            # been made non-blocking by spawn_child. If this fails
            # we fall through to the "reap_children" logic and will
            # retry the select call.
            try:
                ready_byte = os.read(child.fd, 1)
                if not ready_byte:
                    raise IOError  # child died?
                assert ready_byte == "1", repr(ready_byte)
            except socket.error, exc:
                if exc[0] == errno.EWOULDBLOCK:
                    pass  # select was wrong
                else:
                    raise
            except (OSError, IOError):
                pass  # child died?
            else:
                # The byte was read okay, now we need to pass the fd
                # of the request to the child. This can also fail
                # if the child died. Again, if this fails we fall
                # through to the "reap_children" logic and will
                # retry the select call.
                try:
                    passfd.sendfd(child.fd, conn.fileno())
                except IOError, exc:
                    if exc.errno == errno.EPIPE:
                        pass  # broken pipe, child died?
                    else:
                        raise
                else:
                    # fd was apparently passed okay to the child.
                    # The child could die before completing the
                    # request but that's not our problem anymore.
                    return
        # didn't find any child, check if any died
        self.reap_children()
        # start more children if we haven't met max_children limit
        if len(self.children) < self.max_children:
            self.spawn_child(conn)
        # Start blocking inside select. We might have reached
        # max_children limit and they are all busy.
        timeout = 2
Back to the spawn_child method from Step 1: in the child process it starts a service loop that handles requests. At the top of each iteration the child writes the single character "1" to parent_fd to tell the parent "I am ready", then blocks in passfd.recvfd(self.parent_fd), waiting for the parent to pass over the socket fd of an Apache request:
def serve(self):
    while 1:
        try:
            os.write(self.parent_fd, "1")  # indicates that child is ready
            fd = passfd.recvfd(self.parent_fd)
        except (IOError, OSError):
            # parent probably exited (EPIPE comes thru as OSError)
            raise SystemExit
        conn = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        # Make sure the socket is blocking. Apparently, on FreeBSD the
        # socket is non-blocking. I think that's an OS bug but I don't
        # have the resources to track it down.
        conn.setblocking(1)
        os.close(fd)
        self.handle_connection(conn)
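One detail worth noting: socket.fromfd duplicates the descriptor it is given, so after that call the child holds two references to the same connection and must os.close(fd) the original, or it would leak one fd per request. A tiny standalone illustration (not scgi code):

import socket

a, b = socket.socketpair()
fd = a.fileno()
conn = socket.fromfd(fd, socket.AF_UNIX, socket.SOCK_STREAM)
assert conn.fileno() != fd   # fromfd duplicated the descriptor
a.close()                    # drop the original reference, like os.close(fd)
conn.sendall(b"hi")          # the duplicate still keeps the connection open
print(b.recv(16))
conn.close()
b.close()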
Back in delegate_request:
select watches the fds of all children in self.children that are not marked closed. When one of those fds becomes readable, the parent reads the child's ready byte and passes the fd of the request socket to that child over the stored child_fd with passfd.sendfd. From that point on the request is the child's problem, and delegate_request simply returns. If anything fails along the way, the parent checks whether any children have died and reaps them (reap_children), spawns another child with spawn_child if the pool has not yet reached max_children, and then goes around the loop to try again.
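passfd is a small C extension that ships with scgi; it wraps sendmsg/recvmsg with SCM_RIGHTS, the Unix mechanism for shipping a file descriptor to another process over a Unix-domain socket, and the receiver ends up with its own duplicate descriptor pointing at the same connection. As a reference for what that mechanism looks like without the extension, here is a sketch of the same ready-byte/fd-passing handshake using socket.send_fds and socket.recv_fds, which exist in Python 3.9+ on Unix; it only illustrates the mechanism, it is not scgi's code.

import os
import socket

parent_sock, child_sock = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

if os.fork() == 0:                          # --- child ---
    child_sock.close()
    parent_sock.sendall(b"1")               # ready byte, as in SCGIHandler.serve
    # SCM_RIGHTS delivers a duplicate of the sender's descriptor
    msg, fds, flags, addr = socket.recv_fds(parent_sock, 1, 1)
    conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, fileno=fds[0])
    conn.sendall(b"handled by pid %d" % os.getpid())
    conn.close()
    os._exit(0)

parent_sock.close()                         # --- parent ---
a, b = socket.socketpair()                  # stand-in for an accepted connection
child_sock.recv(1)                          # wait for the ready byte
socket.send_fds(child_sock, [b"x"], [a.fileno()])   # like passfd.sendfd
a.close()                                   # the child still holds its duplicate
print(b.recv(64))                           # response written by the child
os.wait()
b.close()
child_sock.close()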
This also confirms what I wrote in another post about quixote's sessions: the SessionManager is tied to each child process. There is no guarantee that every request goes to the same child, so to use sessions with quixote they have to be persisted somewhere all the children can reach, in order to share them across child processes (or across multiple servers).
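To make that concrete, here is a minimal sketch of a session store shared by all children: every process reads and writes the same directory, so it no longer matters which child serves a request. This is an illustration of the idea only; it is not quixote's SessionManager API.

import json
import os
import tempfile

SESSION_DIR = tempfile.gettempdir()    # any storage reachable by every child

def save_session(session_id, data):
    path = os.path.join(SESSION_DIR, "sess_%s.json" % session_id)
    with open(path, "w") as f:
        json.dump(data, f)

def load_session(session_id):
    path = os.path.join(SESSION_DIR, "sess_%s.json" % session_id)
    try:
        with open(path) as f:
            return json.load(f)
    except IOError:                    # no such session yet
        return {}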