当前位置: 首页 > 面试题库 >

python multiprocessing:某些函数完成后不返回(队列内容太大)

姚高爽
2023-03-14
问题内容

我正在使用多处理的“进程和队列”。我并行启动了几个函数,大多数函数表现良好:它们完成了,它们的输出进入了Queue,它们显示为.is_alive()==
False。但是由于某些原因,一些功能无法正常工作。即使函数的最后一行(显示“ Finished”的打印语句)完成,它们也始终显示.is_alive()==
True。无论我启动的功能集是什么,都会发生这种情况,即使只有一个。如果不并行运行,则功能正常,并可以正常返回。什么 样的 事情可能是什么问题?

这是我用来管理作业的通用功能。我没有显示的只是传递给它的功能。它们很长,经常使用matplotlib,有时会启动一些shell命令,但是我无法弄清楚失败的命令的共同点。

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])

问题答案:

好吧,当函数的输出太大时,似乎用来填充Queue的管道被塞住了(我的粗略理解?这是一个未解决/关闭的错误?http://bugs.python.org/issue8237)。我已经修改了问题中的代码,以便有一些缓冲(进程运行时会定期清空队列),这解决了我所有的问题。因此,现在需要执行任务(函数及其参数)的集合,启动它们并收集输出。我希望它看起来更简单/更干净。

编辑(2014年9月;
2017年11月更新:为了可读性而重写):我正在使用自那时以来所做的增强功能更新代码。新代码(功能相同,但功能更好)在此处:https
:
//gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py

调用说明也在下面。

def runFunctionsInParallel(*args, **kwargs):
    """ This is the main/only interface to class cRunFunctionsInParallel. See its documentation for arguments.
    """
    return cRunFunctionsInParallel(*args, **kwargs).launch_jobs()

###########################################################################################
###
class cRunFunctionsInParallel():
    ###
    #######################################################################################
    """Run any list of functions, each with any arguments and keyword-arguments, in parallel.
The functions/jobs should return (if anything) pickleable results. In order to avoid processes getting stuck due to the output queues overflowing, the queues are regularly collected and emptied.
You can now pass os.system or etc to this as the function, in order to parallelize at the OS level, with no need for a wrapper: I made use of hasattr(builtinfunction,'func_name') to check for a name.
Parameters
----------
listOf_FuncAndArgLists : a list of lists 
    List of up-to-three-element-lists, like [function, args, kwargs],
    specifying the set of functions to be launched in parallel.  If an
    element is just a function, rather than a list, then it is assumed
    to have no arguments or keyword arguments. Thus, possible formats
    for elements of the outer list are:
      function
      [function, list]
      [function, list, dict]
kwargs: dict
    One can also supply the kwargs once, for all jobs (or for those
    without their own non-empty kwargs specified in the list)
names: an optional list of names to identify the processes.
    If omitted, the function name is used, so if all the functions are
    the same (ie merely with different arguments), then they would be
    named indistinguishably
offsetsSeconds: int or list of ints
    delay some functions' start times
expectNonzeroExit: True/False
    Normal behaviour is to not proceed if any function exits with a
    failed exit code. This can be used to override this behaviour.
parallel: True/False
    Whenever the list of functions is longer than one, functions will
    be run in parallel unless this parameter is passed as False
maxAtOnce: int
    If nonzero, this limits how many jobs will be allowed to run at
    once.  By default, this is set according to how many processors
    the hardware has available.
showFinished : int
    Specifies the maximum number of successfully finished jobs to show
    in the text interface (before the last report, which should always
    show them all).
Returns
-------
Returns a tuple of (return codes, return values), each a list in order of the jobs provided.
Issues
-------
Only tested on POSIX OSes.
Examples
--------
See the testParallel() method in this module
    """


 类似资料:
  • 有人能解释一下为什么我通过Postman和http从外部http API得到不同的响应吗。NetCore web Api HttpClient。 这是一个密码 结果是 但Postman返回完整有效的json结果。 为什么结果从。NetCore HttpClient是部分的? 我已经尝试了指定请求头这样的选项: 还补充道: 没有帮助。

  • 问题内容: 我在理解CSS属性的行为时遇到了麻烦。根据规范和此处 顾名思义,和伪元素指定元素的文档树内容之前和之后的内容位置。 这似乎并未限制哪些元素可以具有(或)属性。但是,它似乎仅适用于特定元素 … 有效,无效,无效。我可以测试更多,但重点是。请注意,这在各个浏览器中似乎非常一致。 是什么决定对象是否可以接受和属性? 问题答案: 和都是被替换的元素。 替换的元素是其外观和尺寸由外部资源定义的任

  • 我在ViewDidLoad函数中有一些代码,它将在调用堆栈的末尾设置一个类变量。我试图重构代码,使其成为一个单独的函数,它将返回值,而不是设置类变量。 由于我缺乏swift知识,我不确定哪里出了问题,我的函数似乎返回得太厄尔了,因为我可以在调试器中告诉我,它在被设置为之前跳转到return。 我还可以在调试器中看到,内部函数在返回主函数后调用。 如何等待内部调用完成后再返回?或者什么是正确的快速方

  • 你能帮我做一下ElasticSearch吗?建议:https://www.elastic.co/guide/en/ElasticSearch/reference/5.1/search-suggesters-completion.html 我在ES索引中创建了类型 curl-xput“localhost:9200/tass_suggest_test/_mapping/company?pretty”-

  • 我想用编写一个函数。我正在使用PostgresEnterprise Manager v3并使用shell来创建一个函数,但是在shell中我必须定义返回类型。如果我不定义返回类型,我就无法创建函数。 如何创建没有返回结果的函数,即创建新表的函数?

  • 问题内容: 在Python中,我看到了许多示例,其中调用了多处理,但目标仅打印了一些内容。我有一种情况,目标返回2个变量,以后需要使用。例如: 怎么办?我可以执行.start和.join,但是如何检索单个结果?我需要捕捉我执行的所有作业的返回a,b,然后对其进行处理。 问题答案: 是的,可以- 您可以使用多种方法。最简单的一种是共享。在此处查看示例:http : //eli.thegreenpla