当前位置: 首页 > 知识库问答 >
问题:

为什么进程不会在多进程中终止

南宫建白
2023-03-14

昨天我在python中使用多重处理处理了大约2000万行的日志文件。

  1. 启动名为“producer”的进程,逐行读取文件并将其放入队列

代码在下面

from multiprocessing import Process, Queue
from Queue import Empty
import os
import time    


def put_ip(src, q, number):
    """ 
    read file line by line, and put it to queue
    """
    print "start put_ip: %d" % os.getpid()
    with open(src) as f:
        for line in f:
            q.put(line)
        for i in range(number):
            q.put(EOFError)
    print "stop put_ip"


def get_ip(lock, src, result, index):
    """ 
    fetch line, and extract ip from it
    """
    print "start get_ip %d: %d" % (index, os.getpid())
    ips = []
    while True:
        line = src.get()
        if line == EOFError:
            print "%d get EOFError" % index
            break
        else:
            res = json.loads(line.strip())
            # process res, get ip
            ips.append(ip)
    print "get_ip %d get %d ips" % (os.getpid(), len(ips))
    result.put('\n'.join(ips))
    ips = []
    print "stop get_ip %d" % os.getpid()
    return


def test_get_ip(src, dest, number):
    """ 
    test with single process
    """
    srcq = Queue()
    result = Queue()

    with open(src) as f:
        for line in f:
            # if 'error' not in line:
            srcq.put(line)
        for i in range(number):
            srcq.put(EOFError)

    get_ip(srcq, result, 0)


def main(src, dest, number):
    """ 
    with multiprocess
    """
    srcq = Queue()
    result = Queue()

    producer = Process(target=put_ip, args=(src, srcq, number))

    consumers = [Process(target=get_ip, args=(srcq, result, i)) for i in xrange(number)]

    print 'start at %s' % time.asctime()
    starttime = time.time()

    producer.start()
    for consumer in consumers:
        consumer.start()

    producer.join()
    for consumer in consumers:
        consumer.join()

    with open(dest, 'w') as w:
        while True:
            try:
                res = result.get_nowait()
                w.write(res +'\n')
            except Empty:
                print 'Empty'
                break

    print "time: %f" % (time.time()-starttime)

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-i', dest='src', required=True)
    parser.add_argument('-o', dest='dest', required=True)
    parser.add_argument('-n', dest='number', type=int, default=2)

    args = parser.parse_args()

    main(args.src, args.dest, args.number)
    # test_get_ip(args.src, args.dest, args.number)

结果很奇怪,工作完成后,消费者进程不会终止,并且主函数在连接()处被阻塞。

使用以下不同的套装和代码进行测试:

  • 使用test_get_ip()而不进行多处理来处理大小日志文件,效果很好

那么,有什么问题?列表中有限制吗?有什么我错过的吗?

我的机器环境是:

Linux 3.19.0-32-generic #37~14.04.1-Ubuntu SMP Thu Oct 22 09:41:40 UTC 2015 x86_64 x86_64 x86_64 GNU/Linux 
Python 2.7.6 (default, Jun 22 2015, 17:58:13) 
[GCC 4.8.2] on linux2

谢谢你抽出时间!

共有1个答案

单于俊智
2023-03-14

我认为你的问题是,你在缓冲所有的数据,只有当消费者完成时才发送结果。

假设运行get_ip()的进程在一个列表中收集了1M个ip地址。现在,在终止之前,它需要序列化所有这些数据并将其通过队列,然后main()函数将接收并反序列化所有这些数据。

我的建议是,您直接将IP地址放入结果队列,然后让main()进程获取它们并在它们出现时写入它们。

 类似资料:
  • 问题内容: 以下守护进程正在运行: 它是守护程序,因此如果单例将终止。 因此,以下非守护程序Bean正在等待他: Bean的Spring配置为: 问题是:如果从main运行,为什么它起作用,而从jUnit测试运行,为什么不起作用? 运行代码是 要么 在主要情况下,我会看到所有的声音。在jUnit的情况下,我仅看到心跳0,然后显示消息“等待启动”,并且程序终止,好像没有人在这里等待非守护程序线程一样

  • 问题内容: 维基百科说:“一个终止但从未被其父级等待的子进程变成了僵尸进程。” 我运行此程序: 这会创建一个僵尸进程,但我不明白为什么在这里创建了僵尸进程? 该程序的输出是 但是在这种情况下,为什么“子进程终止但没有被其父进程等待”呢? 问题答案: 在您的代码中,创建了僵尸(带有以下箭头的注释): 为什么?因为你从来没有上过。调用时,它将返回有关进程的事后信息,例如其退出代码。不幸的是,当进程退出

  • 问题内容: 我正在学习如何使用Python多处理库。但是,当我浏览一些示例时,最终我在后台运行了许多python进程。 其中的例子看起来象下面这样: 现在,这是我的“ TOP”命令的屏幕截图: 我不知道如何一口气杀死他们。 ps … | grep python ....杀死吗? 我需要添加哪种python代码,以避免再次发生这种悲惨的情况。谢谢! 问题答案: 您需要在工作队列中处理您的进程,该进程

  • 本文向大家介绍终止进程用什么命令? 带什么参数?相关面试题,主要包含被问及终止进程用什么命令? 带什么参数?时的应答技巧和注意事项,需要的朋友参考一下 答案: kill [-s <信息名称或编号>][程序] 或 kill [-l <信息编号>] kill-9 pid

  • 进程的定义 根据维基百科的定义,进程(Process)是计算机中已运行程序的实体。用户下达运行程序的命令后,就会产生进程。进程需要一些资源才能完成工作,如CPU使用时间、存储器、文件以及I/O设备,且为依序逐一进行,也就是每个CPU核心任何时间内仅能运行一项进程。 我们简单总结下,进程就是代码运行的实体。这里补充一点,进程不一定都是正在运行的,也可能在等待调度或者停止,进程状态将在后续详细介绍。

  • 我有两个weblogic域,每个域都有一个托管服务器,问题是每3或4个小时可能会有不到四个进程突然被杀死,我在域控制台中发现了这一点。 ./startWebLogic。sh:第175行:53875杀死了${JAVA\u HOME}/bin/JAVA${JAVA\u VM}${MEM\u ARGS}-Dweblogic。名称=${SERVER\u Name}-Djava。安全策略=${WLU HOM