当前位置: 首页 > 知识库问答 >
问题:

python多进程共享一个可操作的变量, 如何保证原子操作?

轩辕越泽
2023-11-21

python多进程共享一个可操作的变量, 如何保证原子操作?

我的需求
目前有多组数据需要执行计算型任务, 每执行完一组数据就要给java通知并传递生成的结果文件, 并且我要在把所有任务都执行完的时候(也就是最后一组数据执行完的时候)告诉java本轮次数据全部执行完毕, java那边就要去整体做数据库入库的操作。

我的设想
多进程之间维护一个整型数值, 在执行完一组数据的时候自增1, 并在通知java的方法中去比较这个整型数值和总任务数量, 相等就代表全部完毕。(这中间不考虑计算任务执行失败的情况)

面临的情况
我通过 multiprocessing 模块中的 Manager 去声明了一个整型变量, 然后给每个进程传递了过去, 在执行自增的时候, 出现了多个进程读取到同一个值的情况(详细可以看下面的输出, 或者自行执行下面的demo), 无法满足数值的读写原子, 我尝试了加锁, 但是没有生效。

这是我抽象出来的demo

from concurrent.futures import ProcessPoolExecutorimport ctypesfrom multiprocessing import Manager, Lockfrom multiprocessing.managers import ValueProxyimport osm = Manager().Value(ctypes.c_int, 0)def calc_number(x: int, y: int, _m: "ValueProxy", total_tasks: int):    """模拟耗时任务函数"""    # 模拟耗时计算    res = x**y    # with Lock(): 加锁也不管用...    # 多进程共享变量, 用于比较总任务数量    with Lock():        _m.value += 1    # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了    if _m.value == total_tasks:        print(True)    print(f"m_value: {_m.value}, p_id: {os.getpid()}, res: {res}")def main():    # 以下假设有8组任务    t1 = (100, 200, 300, 400, 500, 600, 700, 800)    t2 = (80, 70, 60, 50, 40, 30, 20, 10)    len_t = len(t1)    # 多进程执行cpu耗时性任务    with ProcessPoolExecutor(max_workers=len_t) as executor:        {executor.submit(calc_number, x, y, m, len_t) for x, y in zip(t1, t2)}if __name__ == "__main__":    main()

这是我目前的demo,从我的业务代码中抽象出来的。

这是代码的输出打印(很明显是错误的):

m_value: 2, p_id: 14873, res: 118059162071741130342400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 2, p_id: 14877, res: 12676506002282294014967032053760000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 3, p_id: 14875, res: 42391158275216203514294433201000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 3, p_id: 14872, res: 10000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 4, p_id: 14883, res: 797922662976120010000000000000000000000000000000000000000m_value: 5, p_id: 14879, res: 909494701772928237915039062500000000000000000000000000000000000000000000000000000000000000000000000000000000m_value: 5, p_id: 14881, res: 221073919720733357899776000000000000000000000000000000000000000000000000000000000000m_value: 6, p_id: 14885, res: 107374182400000000000000000000

正确的输出应该 m_value1,2,3,4,5,6,7,8 以此打印出来的。

希望对此颇有研究的大佬指点迷津, 或者给出其他可行方案。最好贴出代码, 不胜感激。


问题已解决: 锁多次创建没有保证是同一把锁是主因

共有1个答案

熊俊人
2023-11-21
from concurrent.futures import ProcessPoolExecutorimport ctypesfrom multiprocessing import Manager, Lockimport os# 创建 Manager 和 Lockmanager = Manager()m = manager.Value(ctypes.c_int, 0)lock = manager.Lock()def calc_number(x: int, y: int, _m, total_tasks: int, _lock):    """模拟耗时任务函数"""    # 模拟耗时计算    res = x ** y    # 用锁来保证原子操作    with _lock:        _m.value += 1        current_value = _m.value    # 当总任务数量和_m.value相等的时候, 通知第三方任务全部做完了    if current_value == total_tasks:        print(True)    print(f"m_value: {current_value}, p_id: {os.getpid()}, res: {res}")def main():    # 任务参数    t1 = (100, 200, 300, 400, 500, 600, 700, 800)    t2 = (80, 70, 60, 50, 40, 30, 20, 10)    len_t = len(t1)    # 多进程执行任务    with ProcessPoolExecutor(max_workers=len_t) as executor:        for x, y in zip(t1, t2):            executor.submit(calc_number, x, y, m, len_t, lock)if __name__ == "__main__":    main()
 类似资料:
  • 问题内容: 最近,我正在阅读一个教程,其中遇到了一条声明: “Java语言规范保证了读取或写入的变量是一个原子操作(除非该变量的类型的或)类型的操作变量或者是只有当它们与申报原子的关键字。” 或提供类似的方法,并且其是原子的。 我对以上声明感到有些困惑。.请您澄清一下 何时使用 或 使用类。 问题答案: 否则(与作为一个)是一个原子操作。但是执行操作不是原子操作,因为它需要读取a的值,递增和写入a

  • 读了很多关于易失性、原子性和可见性的文章后,有一个问题仍然存在。以下跨线程工作,当更新/读取“B”时,“A”始终可见: 原子变量是独立的对象,这同样适用吗?下面的操作会起作用吗? 如果答案是否定的,那么扩展AtomicInteger类并在其中包含“a”就可以了,因为AtomicInteger包装了一个volatile。

  • 原子操作是否足够安全,可以在多线程应用程序中使用它,而不会导致竞争条件和其他并发问题?假设我们不担心可见性(我们从CPU读取/写入所有内容)。

  • 本文向大家介绍python多进程操作实例,包括了python多进程操作实例的使用技巧和注意事项,需要的朋友参考一下 由于CPython实现中的GIL的限制,python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况我们需要使用多进程。 这也许就是python中多进程类库如此简洁好用的原因所在。在python中可以向多线程一样简单地使用多进程。 一、

  • 我只是一个非开发人员,所以我的问题可能非常简单! 我只是在测试Java多线程的东西,这不是真正的代码。我想知道如何在 Java 中同时更新两个成员变量,以防我们希望它们都同步。举个例子: 在这种情况下(当然,想象一下多线程),我希望能够保证对< code>items和< code>itemToStatus的任何读取总是返回相同的结果。 因此,如果代码在< code>itemToStatus.put

  • 我正在研究AWS lambda(无状态服务,您可以在其中运行代码)链接-什么是AWS lambda 在我的例子中,lambda正在运行1个服务,对于该服务,每个请求都必须执行写操作。每个请求绑定一个Id。 如果,请求是用相同的id绑定的。那么,应该按顺序处理。 所以,要做到这一点。我们必须锁定“id”。这样,我们就可以避免写操作冲突。 要做到这一点(我的研究)- 创建具有1列Id的MySQL表,它