Using eventlet (coroutines) in Python to start multiple daemon processes

宋鸿
2023-12-01

The problem to solve here: we have several services, each service (server) is a daemon process, and we want to start all of them in one go. In other words, we use eventlet to create our daemon processes.

1. Create the daemon process first

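First we create the daemon class, implemented with fork().
* For why we call signal.signal(signal.SIGCHLD, signal.SIG_IGN), see:
http://blog.csdn.net/u010571844/article/details/50419798.
* After the first fork() the parent calls sys.exit(0), which raises a SystemExit exception. We do not want the parent to exit, because we still need it to create the other daemon processes, so we catch the SystemExit exception.
* After the second fork() we call os._exit(0), so this parent (the child of the first fork) exits immediately. To keep it from becoming a zombie process, we call signal.signal(signal.SIGCHLD, signal.SIG_IGN), so that its own parent no longer tracks its children; when a child ends, the kernel reaps it. On zombie processes, see: http://blog.csdn.net/u010571844/article/details/50419518
* On sys.exit(0) versus os._exit(0), see: http://blog.csdn.net/u010571844/article/details/50419261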
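
The difference between the two exit calls described above can be checked in isolation. The following is a minimal Python 3 sketch, not part of the original daemon code; the function names exit_with_sys_exit and child_exit_with_os_exit and the exit codes 7 and 99 are made up for illustration.

```python
import os
import sys


def exit_with_sys_exit():
    """sys.exit() raises SystemExit, so the caller can intercept it."""
    try:
        sys.exit(0)
    except SystemExit as e:
        # The process is still alive here; e.code carries the exit status.
        return e.code


def child_exit_with_os_exit():
    """os._exit() ends the process at once; no exception handler runs."""
    pid = os.fork()
    if pid == 0:
        try:
            os._exit(7)   # the child terminates right here
        except BaseException:
            os._exit(99)  # never reached: os._exit() raises nothing
    # Parent: collect the child's exit status.
    _, status = os.waitpid(pid, 0)
    return os.WEXITSTATUS(status)
```

Calling exit_with_sys_exit() returns 0 while the process keeps running, which is the behaviour the daemon class relies on after the first fork(). child_exit_with_os_exit() returns 7, showing that the except clause around os._exit() is never entered.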

# daemon.py
import sys
import time
import os
import atexit
import signal
import logging

log = logging.getLogger(__name__)

class Daemon:
    """
        A generic daemon class.
        Usage: subclass the Daemon class and override the run() method
    """
    startmsg = "RPC server started with pid %s"

    def __init__(self, pidfile, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'):
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.pidfile = pidfile
        log.info("create daemon pidfile at:%s" % self.pidfile)

    def daemonize(self):
        """
        do the UNIX double-fork magic, see Stevens' "Advanced
        Programming in the UNIX Environment" for details (ISBN 0201563177)
        http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16
        """
        # avoid leaving zombie processes
        signal.signal(signal.SIGCHLD, signal.SIG_IGN)
        try:
            pid = os.fork()
            if pid > 0:
                # exit first parent
                log.info("1. fork 1# ----  ppid:%s" % str(os.getpid()))
                sys.exit(0)  # raises SystemExit, which can be caught to do cleanup work; if it is not caught, this process exits
        except OSError as e:
            log.error("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        # decouple from parent environment
        log.info("2. fork 1# ----   pid:%s" % str(os.getpid()))
        os.chdir("/")
        os.setsid()
        os.umask(0)

        # do second fork
        try:
            pid = os.fork()
            if pid > 0:
                log.info("3. fork 2# ----   ppid:%s" % str(os.getpid()))
                os._exit(0)  # exits the process immediately, without raising an exception
        except OSError as e:
            log.error("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror))
            sys.exit(1)

        #redirect standard file descriptors
        log.info("4. fork 2# ----   pid:%s" % str(os.getpid()))
        sys.stdout.flush()
        sys.stderr.flush()
        si = open(self.stdin, 'r')
        so = open(self.stdout, 'a+')
        se = open(self.stderr, 'ab+', 0)  # unbuffered; Python 3 allows buffering=0 only in binary mode
        os.dup2(si.fileno(), sys.stdin.fileno())
        os.dup2(so.fileno(), sys.stdout.fileno())
        os.dup2(se.fileno(), sys.stderr.fileno())

        # write pidfile
        atexit.register(self.delpid)
        pid = str(os.getpid())
        log.info("write pid : %s to pidfile : %s" %(pid, self.pidfile))
        with open(self.pidfile, 'w+') as pf:
            pf.write("%s\n" % pid)


    def delpid(self):
        os.remove(self.pidfile)
        log.info("do pid : %s os remove(%s)" % (str(os.getpid()), self.pidfile))

    def start(self):
        """
        Start the daemon
        """
        # Check for a pidfile to see if the daemon already runs
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except IOError:
            pid = None

        if pid:
            message = "pidfile %s already exist. Daemon already running?\n"
            sys.stderr.write(message % self.pidfile)
            log.warning(message % self.pidfile)
            sys.exit(1)

        # Start the daemon
        self.daemonize()
        self.run()

    def stop(self):
        """
        Stop the daemon
        """
        # Get the pid from the pidfile
        try:
            with open(self.pidfile, 'r') as pf:
                pid = int(pf.read().strip())
        except IOError:
            pid = None

        if not pid:
            message = "pidfile %s does not exist. Daemon not running?\n"
            sys.stderr.write(message % self.pidfile)
            return  # not an error in a restart

        # Try killing the daemon process
        try:
            while 1:
                os.kill(pid, signal.SIGTERM)
                time.sleep(0.1)
        except OSError as err:
            # os.kill() fails with "No such process" once the daemon is gone
            message = str(err)
            if message.find("No such process") > 0:
                if os.path.exists(self.pidfile):
                    os.remove(self.pidfile)
                    log.info("pid : %s delete pidfile : %s" % (str(os.getpid()), self.pidfile))
            else:
                print(message)
                sys.exit(1)

    def restart(self):
        """
        Restart the daemon
        """
        self.stop()
        self.start()

    def run(self):
        """
        You should override this method when you subclass Daemon. It will be called after the process has been
        daemonized by start() or restart().
        """

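Note that start() treats any readable pidfile as "already running", so a stale pidfile left behind by a crashed daemon would block a restart. One common way to tell the two cases apart is to probe the recorded pid with signal 0. The following Python 3 sketch assumes that approach; read_pid and pid_is_running are hypothetical helpers, not methods of the Daemon class above.

```python
import os


def read_pid(pidfile):
    """Return the pid stored in pidfile, or None if it is missing or malformed."""
    try:
        with open(pidfile) as pf:
            return int(pf.read().strip())
    except (IOError, ValueError):
        return None


def pid_is_running(pid):
    """Probe a pid with signal 0: nothing is delivered, only existence is checked."""
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False  # no process with this pid
    except PermissionError:
        return True   # the process exists but belongs to another user
    return True
```

A start() variant could call pid_is_running(read_pid(self.pidfile)) and remove the pidfile when the answer is False. Pids can be reused by the system, so this check is a heuristic rather than a guarantee.
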
2. Create the test entry point

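Our requirement is to start multiple daemon processes using coroutines.
* Create two servers, server1 and server2; both are daemon processes.
* Use eventlet.monkey_patch to "green" our threads, so that blocking standard-library calls cooperate with eventlet.
* Use a log file to record the daemon creation process in detail.
* Start and stop the services from the command line: python test.py -a start starts all servers, and python test.py -a stop stops all servers.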
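
The command-line handling from the last bullet can be sketched on its own, without eventlet or the daemon class. This is a minimal illustration: build_parser and selected are hypothetical helper names, and the choices restriction is an addition that the original script does not have. The lookup order (server2, server1, then all servers) mirrors the if/elif chain in main().

```python
import argparse


def build_parser():
    """Build a parser with the same three options as test.py."""
    actions = ["start", "stop"]  # assumption: reject anything else up front
    parser = argparse.ArgumentParser(description="start or stop the test daemons")
    parser.add_argument("-s1", dest="server1", choices=actions,
                        help="server1: -s1 start or -s1 stop")
    parser.add_argument("-s2", dest="server2", choices=actions,
                        help="server2: -s2 start or -s2 stop")
    parser.add_argument("-a", dest="allserver", choices=actions,
                        help="all servers: -a start or -a stop")
    return parser


def selected(args):
    """Return (target, action) for the first option that was given, else None."""
    for target in ("server2", "server1", "allserver"):
        action = getattr(args, target)
        if action:
            return target, action
    return None
```

For example, selected(build_parser().parse_args(["-a", "start"])) yields ("allserver", "start"), and an empty argument list yields None, which corresponds to the "the args wrong" branch in main().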

#test.py
import os
import sys
import time
import argparse
import logging
import eventlet

eventlet.monkey_patch(
    os=True,
    select=True,
    socket=True,
    thread=False if '--use-debugger' in sys.argv else True,
    time=True)

from daemon import Daemon

log = logging.getLogger(__name__)


class MyDaemon(Daemon):
    def run(self):
        while True:
            time.sleep(60)
            log.info("I am running backend!")

def server1(action):
    # daemon1 = MyDaemon("/tmp/testdeamon1.pid")
    # log.info("server1 request action : %s" % action)
    # if action == "start":
    #   daemon1.start()
    #   log.info("started test daemon1")
    # else:
    #   daemon1.stop()
    #   log.info("stopped test daemon1")
    try:
        daemon1 = MyDaemon("/tmp/testdeamon1.pid")
        log.info("server1 request action : %s" % action)
        if action == "start":
            daemon1.start()
            log.info("started test daemon1")
        else:
            daemon1.stop()
            log.info("stopped test daemon1")
    except SystemExit as e:
        log.info("server1 catch SystemExit e:%s" % e)


def server2(action):
    # daemon2 = MyDaemon("/tmp/testdeamon2.pid")
    # log.info("server2 request action : %s" % action)
    # if action == "start":
    #   daemon2.start()
    #   log.info("started test daemon2")
    # else:
    #   daemon2.stop()
    #   log.info("stopped tets daemon2")
    try:
        daemon2 = MyDaemon("/tmp/testdeamon2.pid")
        log.info("server2 request action : %s" % action)
        if action == "start":
            daemon2.start()
            log.info("started test daemon2")
        else:
            daemon2.stop()
            log.info("stopped test daemon2")
    except SystemExit as e:
        log.info("server2 catch SystemExit e:%s" % e)

def launch(action):
    log.info("launch all server request action : %s" % action)

    threads = [eventlet.spawn(LAUNCH_OPTIONS[option], action)
                    for option in LAUNCH_OPTIONS.keys()]

    # wait for every green thread to finish
    for thread in threads:
        thread.wait()


LAUNCH_OPTIONS = {
    'server1': server1,
    'server2': server2

}

def main():
    logging.basicConfig(format="%(asctime)s - [%(filename)s:%(lineno)s - pid:%(process)d] - %(levelname)s - %(message)s",
                        filename='/tmp/test_deamon.log',
                        level=logging.DEBUG)
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument("-s1", help="server1: -s1 start or -s1 stop", dest="server1")
        parser.add_argument("-s2", help="server2: -s2 start or -s2 stop", dest="server2")
        parser.add_argument("-a", help="all server: -a start or -a stop", dest="allserver")

        args = parser.parse_args()

        if args.server2:
            server2(args.server2)
        elif args.server1:
            server1(args.server1)
        elif args.allserver:
            launch(args.allserver)
        else:
            log.info("the args wrong, do not start or stop any server")
        sys.exit(0)

    except RuntimeError as e:
        log.error("catch RuntimeError e:%s" % e)
        sys.exit(1)


if __name__ == '__main__':
    main()

3. Inspect the log file

The most important step is analyzing the log:
following how the pid changes from line to line shows clearly how all the services were created.
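
Reading the interleaved log is easier when the lines are grouped by pid. The following is a small sketch that assumes the log format configured in main() above (timestamp, file:line, pid, level, message); LINE_RE and group_by_pid are hypothetical names introduced for illustration.

```python
import re
from collections import OrderedDict

# Matches lines such as:
# 2016-01-06 14:32:30,162 - [daemon.py:42 - pid:12123] - INFO - 1. fork 1# ...
LINE_RE = re.compile(
    r"^(?P<time>\S+ \S+) - \[(?P<file>[^:]+):(?P<lineno>\d+) - pid:(?P<pid>\d+)\] - "
    r"(?P<level>\w+) - (?P<message>.*)$"
)


def group_by_pid(lines):
    """Group log messages by the pid that wrote them, in first-seen order."""
    groups = OrderedDict()
    for line in lines:
        match = LINE_RE.match(line.strip())
        if match is None:
            continue  # skip blank lines and anything in another format
        pid = int(match.group("pid"))
        groups.setdefault(pid, []).append(match.group("message"))
    return groups
```

Passing the lines of /tmp/test_deamon.log to group_by_pid gives one message list per process, so the path of each fork can be read separately.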

2016-01-06 14:32:30,160 - [test.py:70 - pid:12123] - INFO - launch all server request action : start
2016-01-06 14:32:30,161 - [daemon.py:29 - pid:12123] - INFO - create daemon pidfile at:/tmp/testdeamon1.pid
2016-01-06 14:32:30,161 - [test.py:37 - pid:12123] - INFO - server1 request action : start
2016-01-06 14:32:30,162 - [daemon.py:42 - pid:12123] - INFO - 1. fork 1# ----  ppid:12123
2016-01-06 14:32:30,162 - [daemon.py:50 - pid:12124] - INFO - 2. fork 1# ----   pid:12124
2016-01-06 14:32:30,162 - [test.py:45 - pid:12123] - INFO - server1 catch SystemExit e:0
2016-01-06 14:32:30,162 - [daemon.py:29 - pid:12123] - INFO - create daemon pidfile at:/tmp/testdeamon2.pid
2016-01-06 14:32:30,162 - [test.py:59 - pid:12123] - INFO - server2 request action : start
2016-01-06 14:32:30,162 - [daemon.py:59 - pid:12124] - INFO - 3. fork 2# ----   ppid:12124
2016-01-06 14:32:30,162 - [daemon.py:67 - pid:12125] - INFO - 4. fork 2# ----   pid:12125
2016-01-06 14:32:30,163 - [daemon.py:42 - pid:12123] - INFO - 1. fork 1# ----  ppid:12123
2016-01-06 14:32:30,163 - [daemon.py:80 - pid:12125] - INFO - write pid : 12125 to pidfile : /tmp/testdeamon1.pid
2016-01-06 14:32:30,163 - [test.py:67 - pid:12123] - INFO - server2 catch SystemExit e:0
2016-01-06 14:32:30,163 - [daemon.py:50 - pid:12126] - INFO - 2. fork 1# ----   pid:12126
2016-01-06 14:32:30,164 - [daemon.py:59 - pid:12126] - INFO - 3. fork 2# ----   ppid:12126
2016-01-06 14:32:30,164 - [daemon.py:67 - pid:12127] - INFO - 4. fork 2# ----   pid:12127
2016-01-06 14:32:30,165 - [daemon.py:80 - pid:12127] - INFO - write pid : 12127 to pidfile : /tmp/testdeamon2.pid
2016-01-06 14:32:30,165 - [daemon.py:29 - pid:12125] - INFO - create daemon pidfile at:/tmp/testdeamon2.pid
2016-01-06 14:32:30,165 - [test.py:59 - pid:12125] - INFO - server2 request action : start
2016-01-06 14:32:30,166 - [daemon.py:103 - pid:12125] - WARNING - pidfile /tmp/testdeamon2.pid already exist. Daemon already running?

2016-01-06 14:32:30,166 - [test.py:67 - pid:12125] - INFO - server2 catch SystemExit e:1
2016-01-06 14:33:30,181 - [test.py:24 - pid:12127] - INFO - I am running backend!
2016-01-06 14:33:30,212 - [test.py:24 - pid:12125] - INFO - I am running backend!
2016-01-06 14:34:30,237 - [test.py:24 - pid:12127] - INFO - I am running backend!
2016-01-06 14:34:30,267 - [test.py:24 - pid:12125] - INFO - I am running backend!
2016-01-06 14:35:30,266 - [test.py:24 - pid:12127] - INFO - I am running backend!
2016-01-06 14:35:30,312 - [test.py:24 - pid:12125] - INFO - I am running backend!

4. Summary

TODO

5. References

[Python daemon process] http://code.activestate.com/recipes/278731/
