当前位置: 首页 > 知识库问答 >
问题:

如何使内置容器(集合、目录、列表)线程安全?

廉元龙
2023-03-14

我从这个问题中了解到,如果我想要一个线程安全的set,我必须自己实现线程安全部分。

因此,我可以想出:

from threading import Lock

class LockedSet(set):
    """A set where add() and remove() are thread-safe"""

    def __init__(self, *args, **kwargs):
        # Create a lock
        self._lock = Lock()
        # Call the original __init__
        super(LockedSet, self).__init__(*args, **kwargs)

    def add(self, elem):
        self._lock.acquire()
        try:
            super(LockedSet, self).add(elem)
        finally:
            self._lock.release()

    def remove(self, elem):
        self._lock.acquire()
        try:
            super(LockedSet, self).remove(elem)
        finally:
            self._lock.release()

因此,在这个实现中,当然只有add()和remove()是线程安全的。其他方法不是,因为它们在子类中没有被覆盖。

现在,模式非常简单:获取锁,调用原始方法,释放锁。如果遵循上述逻辑,我必须以基本相同的方式覆盖set公开的所有方法,例如:

(伪代码)

def <method>(<args>):
    1. acquire lock
    2. try:
    3.     call original method passing <args>
    4. finally:
    5.     release lock

(/伪代码)

这不仅单调乏味,而且容易出错。那么,关于如何更好地处理这一问题,有什么想法/建议吗?

共有3个答案

谷梁卓
2023-03-14

[事实上,见评论,这不是真的]

如果您正在运行CPython,您可以从设置的源代码中看到它没有释放GIL(http://hg.python.org/cpython/file/db20367b20de/Objects/setobject.c)所以它的所有操作都应该是原子的。

如果它是您所需要的,并且您确定要在CPython上运行代码,那么您可以直接使用它。

齐浩淼
2023-03-14

这是我第一次尝试使用decorator(尽管我的代码实际上没有使用@decoration语法),而且我在多线程/多处理方面没有太多经验。不过,有了这个免责声明,我做了一个尝试:

from multiprocessing import Lock

def decorate_all(obj):
    lock = Lock()
    #you'll want to make this more robust:
    fnc_names = [fnctn for fnctn in dir(obj) if '__' not in fnctn]
    for name in fnc_names:
        print 'decorating ' + name
        fnc = getattr(obj, name)
        setattr(obj, name, decorate(fnc, lock))
    return obj

def decorate(fnctn, lock):
    def decorated(*args):
        print 'acquiring lock'
        lock.acquire()
        try:
            print 'calling decorated function'
            return fnctn(*args)
        finally:
            print 'releasing lock'
            lock.release()
    return decorated


def thread_safe(superclass):
    lock = Lock()
    class Thread_Safe(superclass):
        def __init__(self, *args, **kwargs):
            super(Thread_Safe, self).__init__(*args, **kwargs)
    return decorate_all(Thread_Safe)


>>> thread_safe_set = thread_safe(set)
decorating add
decorating clear
decorating copy
decorating difference
decorating difference_update
decorating discard
decorating intersection
decorating intersection_update
decorating isdisjoint
decorating issubset
decorating issuperset
decorating pop
decorating remove
decorating symmetric_difference
decorating symmetric_difference_update
decorating union
decorating update
>>> s = thread_safe_set()
>>> s.add(1)
acquiring lock
calling decorated function
releasing lock
>>> s.add(4)
acquiring lock
calling decorated function
releasing lock
>>> s.pop()
acquiring lock
calling decorated function
releasing lock
1
>>> s.pop()
acquiring lock
calling decorated function
releasing lock
4
>>>
封昊天
2023-03-14

您可以使用Python的元编程工具来实现这一点。(注意:书写速度快,测试不彻底。)我更喜欢使用类装饰器。

我还认为您可能需要锁定的不仅仅是addremove,以确保设置的线程安全,但我不确定。我将忽略这个问题,只关注你的问题。

还要考虑委托(代理)是否比子类化更合适。包装对象是Python中常用的方法。

最后,元编程的“魔杖”不会神奇地为任何可变Python集合添加细粒度锁定。最安全的做法是使用RLock锁定任何方法或属性访问,但这是非常粗粒度和缓慢的,可能仍然不能保证对象在所有情况下都是线程安全的。(例如,可能有一个集合操纵其他线程可以访问的另一个非线程安全对象。)您确实需要检查每个数据结构,并考虑哪些操作是原子操作或需要锁,哪些方法可能使用相同的锁调用其他方法(即死锁本身)。

也就是说,这里有一些你可以使用的抽象顺序越来越多的技术:

class LockProxy(object):
    def __init__(self, obj):
        self.__obj = obj
        self.__lock = RLock()
        # RLock because object methods may call own methods
    def __getattr__(self, name):
        def wrapped(*a, **k):
            with self.__lock:
                getattr(self.__obj, name)(*a, **k)
        return wrapped

lockedset = LockProxy(set([1,2,3]))
class LockedSet(set):
    """A set where add(), remove(), and 'in' operator are thread-safe"""

    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(LockedSet, self).__init__(*args, **kwargs)

    def add(self, elem):
        with self._lock:
            super(LockedSet, self).add(elem)

    def remove(self, elem):
        with self._lock:
            super(LockedSet, self).remove(elem)

    def __contains__(self, elem):
        with self._lock:
            super(LockedSet, self).__contains__(elem)
def locked_method(method):
    """Method decorator. Requires a lock object at self._lock"""
    def newmethod(self, *args, **kwargs):
        with self._lock:
            return method(self, *args, **kwargs)
    return newmethod

class DecoratorLockedSet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(DecoratorLockedSet, self).__init__(*args, **kwargs)

    @locked_method
    def add(self, *args, **kwargs):
        return super(DecoratorLockedSet, self).add(elem)

    @locked_method
    def remove(self, *args, **kwargs):
        return super(DecoratorLockedSet, self).remove(elem)

我认为这是最干净、最容易理解的抽象方法,因此我对其进行了扩展,允许指定要锁定的方法和锁定对象工厂。

def lock_class(methodnames, lockfactory):
    return lambda cls: make_threadsafe(cls, methodnames, lockfactory)

def lock_method(method):
    if getattr(method, '__is_locked', False):
        raise TypeError("Method %r is already locked!" % method)
    def locked_method(self, *arg, **kwarg):
        with self._lock:
            return method(self, *arg, **kwarg)
    locked_method.__name__ = '%s(%s)' % ('lock_method', method.__name__)
    locked_method.__is_locked = True
    return locked_method


def make_threadsafe(cls, methodnames, lockfactory):
    init = cls.__init__
    def newinit(self, *arg, **kwarg):
        init(self, *arg, **kwarg)
        self._lock = lockfactory()
    cls.__init__ = newinit

    for methodname in methodnames:
        oldmethod = getattr(cls, methodname)
        newmethod = lock_method(oldmethod)
        setattr(cls, methodname, newmethod)

    return cls


@lock_class(['add','remove'], Lock)
class ClassDecoratorLockedSet(set):
    @lock_method # if you double-lock a method, a TypeError is raised
    def frobnify(self):
        pass
class AttrLockedSet(set):
    def __init__(self, *args, **kwargs):
        self._lock = Lock()
        super(AttrLockedSet, self).__init__(*args, **kwargs)

    def __getattribute__(self, name):
        if name in ['add','remove']:
            # note: makes a new callable object "lockedmethod" on every call
            # best to add a layer of memoization
            lock = self._lock
            def lockedmethod(*args, **kwargs):
                with lock:
                    return super(AttrLockedSet, self).__getattribute__(name)(*args, **kwargs)
            return lockedmethod
        else:
            return super(AttrLockedSet, self).__getattribute__(name)
class NewLockedSet(set):
    def __new__(cls, *args, **kwargs):
        # modify the class by adding new unbound methods
        # you could also attach a single __getattribute__ like above
        for membername in ['add', 'remove']:
            def scoper(membername=membername):
                # You can also return the function or use a class
                def lockedmethod(self, *args, **kwargs):
                    with self._lock:
                        m = getattr(super(NewLockedSet, self), membername)
                        return m(*args, **kwargs)
                lockedmethod.__name__ = membername
                setattr(cls, membername, lockedmethod)
        self = super(NewLockedSet, cls).__new__(cls, *args, **kwargs)
        self._lock = Lock()
        return self
def _lockname(classname):
    return '_%s__%s' % (classname, 'lock')

class LockedClass(type):
    def __new__(mcls, name, bases, dict_):
        # we'll bind these after we add the methods
        cls = None
        def lockmethodfactory(methodname, lockattr):
            def lockedmethod(self, *args, **kwargs):
                with getattr(self, lockattr):
                    m = getattr(super(cls, self), methodname)
                    return m(*args,**kwargs)
            lockedmethod.__name__ = methodname
            return lockedmethod
        lockattr = _lockname(name)
        for methodname in ['add','remove']:
            dict_[methodname] = lockmethodfactory(methodname, lockattr)
        cls = type.__new__(mcls, name, bases, dict_)
        return cls

    def __call__(self, *args, **kwargs):
        #self is a class--i.e. an "instance" of the LockedClass type
        instance = super(LockedClass, self).__call__(*args, **kwargs)
        setattr(instance, _lockname(self.__name__), Lock())
        return instance



class MetaLockedSet(set):
    __metaclass__ = LockedClass
def LockedClassMetaFactory(wrapmethods):
    class LockedClass(type):
        def __new__(mcls, name, bases, dict_):
            # we'll bind these after we add the methods
            cls = None
            def lockmethodfactory(methodname, lockattr):
                def lockedmethod(self, *args, **kwargs):
                    with getattr(self, lockattr):
                        m = getattr(super(cls, self), methodname)
                        return m(*args,**kwargs)
                lockedmethod.__name__ = methodname
                return lockedmethod
            lockattr = _lockname(name)
            for methodname in wrapmethods:
                dict_[methodname] = lockmethodfactory(methodname, lockattr)
            cls = type.__new__(mcls, name, bases, dict_)
            return cls

        def __call__(self, *args, **kwargs):
            #self is a class--i.e. an "instance" of the LockedClass type
            instance = super(LockedClass, self).__call__(*args, **kwargs)
            setattr(instance, _lockname(self.__name__), Lock())
            return instance
    return LockedClass

class MetaFactoryLockedSet(set):
    __metaclass__ = LockedClassMetaFactory(['add','remove'])

我打赌使用简单、明确的试试。。。最后现在看起来还不错,对吧?

读者练习:让调用者使用以下任何方法传入自己的Lock()对象(依赖项注入)。

 类似资料:
  • 问题内容: 我从这个问题中了解到,如果我想拥有一个线程安全的线程,则必须自己实现线程安全部分。 因此,我可以提出: 因此,在此实现中,当然只有add()和remove()是线程安全的。其他方法不是因为它们未在子类中覆盖。 现在,模式非常简单:获取锁,调用原始方法,释​​放锁。如果遵循上述逻辑,则必须以基本上相同的方式覆盖所有公开的方法,例如: (伪代码) (/伪代码) 这不仅繁琐,而且容易出错。那

  • 问题内容: 我想知道Python内置容器(列表,向量,集合…)是否是线程安全的?还是我需要为共享变量实现锁定/解锁环境? 问题答案: 您需要为将在Python中修改的所有共享变量实现自己的锁定。你不必担心从不会被修改的变量读(即并发读取都OK了),所以稳定的类型(,,)都 可能是 安全的,但它不会伤害。对于你将要改变的东西- ,,,和大多数其他的对象,你应该有自己的锁定机制(而就地操作都OK在大多

  • 问题内容: 我使用以下代码将字符串添加到列表框中。当我运行代码并打开窗口时,由于窗口不够大,因此会截断较长的字符串(请参见屏幕截图)。我试图使窗口可调整大小并添加滚动条,但是我想知道是否有一种自动调整窗口大小以适合内容的方法。 问题答案: 重置列表框宽度对我有用。我使用了遗忘的答案,并注意到宽度始终为零。 我还建议在重新加载列表内容后重置根窗口的几何形状。否则,如果用户手动扩展窗口,则该窗口将停止

  • 本文向大家介绍如何在Python中列出目录的内容?,包括了如何在Python中列出目录的内容?的使用技巧和注意事项,需要的朋友参考一下 os.listdir(my_path)将为您提供my_path目录中的所有内容-文件和目录。 示例 您可以按以下方式使用它: 如果只需要文件,则可以使用isfile对其进行过滤:

  • 我正在使用skLearning TfidfVectorzer进行文本分类。 我知道这个矢量器需要原始文本作为输入,但使用列表是可行的(请参见input1)。 但是,如果我想使用多个列表(或集合),我会得到以下属性错误。 有人知道如何解决这个问题吗?提前谢谢! Traceback(最近一次调用):File",第1行,在File"/库/框架/Python.framework/Versions/3.5/

  • 我有一个在Core .NET 2.2框架顶部使用编写的控制台应用。 我正在尝试C#库来获取我的容器中所有目录的列表。我的理解是Azure Blob存储并没有真正的目录。相反,它创建虚拟名称,Blob看起来像Azure Blob Explorer等浏览器中容器内的文件夹 我使用以下代码存储文件: 所以我想在我的容器内选择前缀aka文件夹名称的不同列表。 所以如果我有以下blob“foldername