当前位置: 首页 > 面试题库 >

multiprocessing.Pool-PicklingError:无法腌制 :属性查找thread.lock失败

徐俊楚
2023-03-14
问题内容

multiprocessing.Pool令我发疯……
我想升级许多软件包,对于每个软件包,我都必须检查是否有更大的版本。这是通过check_one函数来完成的。
主要代码在Updater.update方法中:在此处创建Pool对象并调用该map()方法。

这是代码:

def check_one(args):
    res, total, package, version = args
    i = res.qsize()
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]',
        i / float(total), package, i, total, addn=False)
    try:
        json = PyPIJson(package).retrieve()
        new_version = Version(json['info']['version'])
    except Exception as e:
        logger.error('Error: Failed to fetch data for {0} ({1})', package, e)
        return
    if new_version > version:
        res.put_nowait((package, version, new_version, json))

class Updater(FileManager):

    # __init__ and other methods...

    def update(self):    
        logger.info('Searching for updates')
        packages = Queue.Queue()
        data = ((packages, self.set_len, dist.project_name, Version(dist.version)) \
            for dist in self.working_set)
        pool = multiprocessing.Pool()
        pool.map(check_one, data)
        pool.close()
        pool.join()
        while True:
            try:
                package, version, new_version, json = packages.get_nowait()
            except Queue.Empty:
                break
            txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(package,
                                                                                      new_version,
                                                                                      version)
            u = logger.ask(txt, bool=('upgrade version', 'keep working version'), dont_ask=self.yes)
            if u:
                self.upgrade(package, json, new_version)
            else:
                logger.info('{0} has not been upgraded', package)
        self._clean()
        logger.success('Updating finished successfully')

当我运行它时,我得到这个奇怪的错误:

Searching for updates
Exception in thread Thread-1:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/local/lib/python2.7/dist-packages/multiprocessing/pool.py", line 225, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed

问题答案:

多重处理通过传递任务(包括check_onedata)到工作进程mp.SimpleQueue。与Queue.Queues不同,放入s中的所有内容都mp.SimpleQueue必须是可挑选的。Queue.Queues不可选:

import multiprocessing as mp
import Queue

def foo(queue):
    pass

pool=mp.Pool()
q=Queue.Queue()

pool.map(foo,(q,))

产生此异常:

UnpickleableError: Cannot pickle <type 'thread.lock'> objects

您的datainclude包括packages一个Queue.Queue。这可能是问题的根源。

这是一个可能的解决方法:该方法Queue有两个用途:

  1. 找出大概的大小(通过调用qsize
  2. 存储结果供以后检索。

qsize我们可以使用而不是在多个进程之间共享值,而无需调用mp.Value

除了将结果存储在队列中之外,我们可以(并且应该)仅从调用返回值check_one。该pool.map在自己制作的队列中收集结果,并返回结果的返回值pool.map

例如:

import multiprocessing as mp
import Queue
import random
import logging

# logger=mp.log_to_stderr(logging.DEBUG)
logger = logging.getLogger(__name__)


qsize = mp.Value('i', 1)
def check_one(args):
    total, package, version = args
    i = qsize.value
    logger.info('\r[{0:.1%} - {1}, {2} / {3}]'.format(
        i / float(total), package, i, total))
    new_version = random.randrange(0,100)
    qsize.value += 1
    if new_version > version:
        return (package, version, new_version, None)
    else:
        return None

def update():    
    logger.info('Searching for updates')
    set_len=10
    data = ( (set_len, 'project-{0}'.format(i), random.randrange(0,100))
             for i in range(set_len) )
    pool = mp.Pool()
    results = pool.map(check_one, data)
    pool.close()
    pool.join()
    for result in results:
        if result is None: continue
        package, version, new_version, json = result
        txt = 'A new release is avaiable for {0}: {1!s} (old {2}), update'.format(
            package, new_version, version)
        logger.info(txt)
    logger.info('Updating finished successfully')

if __name__=='__main__':
    logging.basicConfig(level=logging.DEBUG)
    update()


 类似资料:
  • 问题内容: 我正在尝试腌制: 这将产生以下输出: 我究竟做错了什么?是,这个问题是一个成员?(将的定义移到最高级别可以解决此问题,尽管我仍然很好奇为什么会这样。) 问题答案: 是的,它是类成员这一事实是一个问题: 问题在于,当返回类型对象时,它不知道将其分配给类成员的事实- 因此,它告诉类型对象其类型名称应为,即使它确实应该为。

  • 我创建了带有过滤器的简单ear项目。我想对每个环境使用不同的设置,这些设置应该传递给生成的应用程序。xml文件,格式为env条目。使用maven ear插件生成ear包,如下所示: 为此,我必须使用另一个插件属性-maven-plugin。它成功地从文件中读取属性并将它们设置为maven项目属性,因此我可以使用将它们插入pom.xml文件中。它适用于大多数pom.xml元素(即 文件创建为user

  • 我想要log4j2Spring。xml从应用程序中读取属性。属性文件。但似乎是Spring。xml无法读取此内容。我读过https://logging.apache.org/log4j/2.x/manual/lookups.html#SpringLookup为了实现这一点。 我在这个网站上看到了这个答案。我也这样试过。但这对我没有帮助。 我的身材。格雷德尔是这样的: application.pro

  • 问题内容: 我不能用一个简单的例子再现这个错误,而且我的代码太复杂了,无法发布。如果我用而不是普通的Python来运行程序,事情会很顺利。 我查阅了以前关于这个问题的一些笔记。它们都是由使用池调用类函数中定义的函数引起的。但对我来说不是这样。 我很感激你的帮助。 更新:函数是在模块的顶层定义的。尽管它调用包含嵌套函数的函数。调用调用,)有一个嵌套函数,我正在调用。、、)都是在顶层定义的。我用这个模

  • 下面是我的config.yml的一部分: 我有一个用于解析的类: 然而,当我跑的时候 发生以下错误: 为什么它抱怨找不到属性AuthenticationConfig而AuthenticationConfig只是实例变量的名称? 更新在我将实例变量从“private”更改为“public”后,它们被SnakeYaml识别,但这并不是我们所能肯定的。类不被识别为JavaBean。 更新我找到了根本原因

  • logback无法计算spring应用程序名称