当前位置: 首页 > 知识库问答 >
问题:

为什么Python在更多CPU/核心上的并行化规模如此之小?

徐兴昌
2023-03-14

我试图在Python3.8中使用更多可用的内核,通过并行化(通过joblib)来加快计算速度,但观察到它的扩展性很差。

我编写了一个小脚本来测试和演示稍后可以找到的行为。脚本(请参阅后面的内容)设计成有一个完全独立的任务,使用NumPy和Pandas对虚拟操作进行一些迭代。没有任务的输入和输出,没有磁盘或其他I/O,也没有任何通信或共享内存,只是简单的CPU和RAM使用。除了对当前时间的偶尔请求之外,进程不使用任何其他资源。Amdahl定律不应该适用于这里的代码,因为除了流程设置之外根本就没有通用代码。

我通过使用顺序和并行化处理复制任务来运行一些增加工作负载的实验,并测量了每次迭代和整个(并行)过程完成所需的时间。我在我的Windows10笔记本电脑和两台AWS EC2 Linux(亚马逊Linux 2)机器上运行了这个脚本。并行处理的数量从未超过可用内核的数量。

我观察到了以下内容(详细信息请参阅后面的结果,持续时间以秒为单位):

  • 如果并行处理的数量少于可用内核的数量,则总平均CPU利用率(用户)从未超过93%,系统调用未超过4%,并且没有IOWAIT(使用iostat-HXM 10测量)
    • 工作负载似乎平均分布在可用的核心上,这可能表明尽管有大量可用的核心,但进程之间的频繁切换
    • 有趣的是,对于顺序处理,CPU利用率(用户)约为48%

    这些如此重大的发现令我始料未及。

    这是什么原因导致的?

    我是不是漏掉了什么?

    如何补救才能充分利用使用更多核心的前景?

    6个CPU、12个核心调用:python.\time_parallel_processing.py 1,2,4,8 10

                         Duration/Iter            Duration total TotalIterCount
                                  mean        std           mean           mean
    Mode   ParallelCount
    Joblib 1                  4.363902   0.195268      43.673971             10
           2                  6.322100   0.140654      63.870973             20
           4                  9.270582   0.464706      93.631790             40
           8                 15.489000   0.222859     156.670544             80
    Seq    1                  4.409772   0.126686      44.133441             10
           2                  4.465326   0.113183      89.377296             20
           4                  4.534959   0.125097     181.528372             40
           8                  4.444790   0.083315     355.849860             80
    

    8个CPU、16个核心调用:Python time_parallel_processing.py 1,2,4,8,16 10

                    Duration/Iter           Duration total TotalIterCount
                             mean       std           mean           mean
    Mode   ParCount
    Joblib 1             2.196086  0.009798      21.987626             10
           2             3.392873  0.010025      34.297323             20
           4             4.519174  0.126054      45.967140             40
           8             6.888763  0.676024      71.815990             80
           16           12.191278  0.156941     123.287779            160
    Seq    1             2.192089  0.010873      21.945536             10
           2             2.184294  0.008955      43.735713             20
           4             2.201437  0.027537      88.156621             40
           8             2.145312  0.009631     171.805374             80
           16            2.137723  0.018985     342.393953            160
    

    18个CPU,36个内核调用:Python time_parallel_processing.py 1,2,4,8,16,32 10

                    Duration/Iter           Duration total TotalIterCount
                             mean       std           mean           mean
    Mode   ParCount
    Joblib 1             1.888071  0.023799      18.905295             10
           2             2.797132  0.009859      28.307708             20
           4             3.349333  0.106755      34.199839             40
           8             4.273267  0.705345      45.998927             80
           16            6.383214  1.455857      70.469109            160
           32           10.974141  4.220783     129.671016            320
    Seq    1             1.891170  0.030131      18.934494             10
           2             1.866365  0.007283      37.373133             20
           4             1.893082  0.041085      75.813468             40
           8             1.855832  0.007025     148.643725             80
           16            1.896622  0.007573     303.828529            160
           32            1.864366  0.009142     597.301383            320
    
    import argparse
    import sys
    import time
    from argparse import Namespace
    from typing import List
    import numpy as np
    import pandas as pd
    from joblib import delayed
    from joblib import Parallel
    from tqdm import tqdm
    
    RESULT_COLUMNS = {"Mode": str, "ParCount": int, "ProcessId": int, "IterId": int, "Duration": float}
    
    def _create_empty_data_frame() -> pd.DataFrame:
        return pd.DataFrame({key: [] for key, _ in RESULT_COLUMNS.items()}).astype(RESULT_COLUMNS)
    
    def _do_task() -> None:
        for _ in range(10):
            array: np.ndarray = np.random.rand(2500, 2500)
            _ = np.matmul(array, array)
            data_frame: pd.DataFrame = pd.DataFrame(np.random.rand(250, 250), columns=list(map(str, list(range(250)))))
            _ = data_frame.merge(data_frame)
    
    def _process(process_id: int, iter_count: int) -> pd.DataFrame:
        durations: pd.DataFrame = _create_empty_data_frame()
        for i in tqdm(range(iter_count)):
            iter_start_time: float = time.time()
            _do_task()
            durations = durations.append(
                {
                    "Mode": "",
                    "ParCount": 0,
                    "ProcessId": process_id,
                    "IterId": i,
                    "Duration": time.time() - iter_start_time,
                },
                ignore_index=True,
            )
        return durations
    
    def main(args: Namespace) -> None:
        """Execute main script."""
        iter_durations: List[pd.DataFrame] = []
        mode_durations: List[pd.DataFrame] = []
    
        for par_count in list(map(int, args.par_counts.split(","))):
            total_iter_count: int = par_count * int(args.iter_count)
    
            print(f"\nRunning {par_count} processes in parallel and {total_iter_count} iterations in total")
            start_time_joblib: float = time.time()
            with Parallel(n_jobs=par_count) as parallel:
                joblib_durations: List[pd.DataFrame] = parallel(
                    delayed(_process)(process_id, int(args.iter_count)) for process_id in range(par_count)
                )
            iter_durations.append(pd.concat(joblib_durations).assign(**{"Mode": "Joblib", "ParCount": par_count}))
            end_time_joblib: float = time.time()
    
            print(f"\nRunning {par_count} processes sequentially with {total_iter_count} iterations in total")
            start_time_seq: float = time.time()
            seq_durations: List[pd.DataFrame] = []
            for process_id in range(par_count):
                seq_durations.append(_process(process_id, int(args.iter_count)))
            iter_durations.append(pd.concat(seq_durations).assign(**{"Mode": "Seq", "ParCount": par_count}))
            end_time_seq: float = time.time()
    
            mode_durations.append(
                pd.DataFrame(
                    {
                        "Mode": ["Joblib", "Seq"],
                        "ParCount": [par_count] * 2,
                        "Duration": [end_time_joblib - start_time_joblib, end_time_seq - start_time_seq],
                        "TotalIterCount": [total_iter_count] * 2,
                    }
                )
            )
    
        print("\nDuration in seconds")
        grouping_columns: List[str] = ["Mode", "ParCount"]
        print(
            pd.concat(iter_durations)
            .groupby(grouping_columns)
            .agg({"Duration": ["mean", "std"]})
            .merge(
                pd.concat(mode_durations).groupby(grouping_columns).agg({"Duration": ["mean"], "TotalIterCount": "mean"}),
                on=grouping_columns,
                suffixes=["/Iter", " total"],
                how="inner",
            )
        )
    
    if __name__ == "__main__":
        print(f"Command line: {sys.argv}")
        parser: argparse.ArgumentParser = argparse.ArgumentParser(description=__doc__)
        parser.add_argument(
            "par_counts",
            help="Comma separated list of parallel processes counts to start trials for (e.g. '1,2,4,8,16,32')",
        )
        parser.add_argument("iter_count", help="Number of iterations per parallel process to carry out")
        args: argparse.Namespace = parser.parse_args()
    
        start_time: float = time.time()
        main(args)
        print(f"\nTotal elapsed time: {time.time() - start_time:.2f} seconds")
    

    使用“conda env create-f environment.time_parallel.yaml“environment.time_parallel.yaml”创建:

    name: time_parallel
    channels:
      - defaults
      - conda-forge
    dependencies:
      - python=3.8.5
      - pip=20.3.3
      - pandas=1.2.0
      - numpy=1.19.2
      - joblib=1.0.0
      - tqdm=4.55.1
    

    感谢@Sholderbach的帮助,我调查了numpy/pandas的用法,发现了一些事情。

    NumPy使用了一个线性代数后端,它会自动在并行线程中运行一些命令(包括矩阵乘法),这会导致太多的线程阻塞系统,并行进程越多越多,因此每次迭代的持续时间也会增加。我通过删除方法_do_task中的NumPy和Pandas操作来测试这个hypthement,并仅用简单的数学操作来替换它:

    def _do_task() -> None:
        for _ in range(10):
            for i in range(10000000):
                _ = 1000 ^ 2 % 200   
    

    结果与预期完全一样,因为当增加进程数(超过可用内核数)时,迭代的持续时间不会改变。

    Windows 10

    调用python time_parallel_processing.py 1,2,4,8 5

    Duration in seconds
                      Duration/Iter           Duration total TotalIterCount
                               mean       std           mean           mean
    Mode     ParCount
    Joblib   1             2.562570  0.015496      13.468393              5
             2             2.556241  0.021074      13.781174             10
             4             2.565614  0.054754      16.171828             20
             8             2.630463  0.258474      20.328055             40
    Seq      2             2.576542  0.033270      25.874965             10
    

    AWS C5.9X大型

    调用python time_parallel_processing.py 1,2,4,8,16,32 10

                    Duration/Iter           Duration total TotalIterCount
                             mean       std           mean           mean
    Mode   ParCount
    Joblib 1             2.082849  0.022352      20.854512             10
           2             2.126195  0.034078      21.596860             20
           4             2.287874  0.254493      27.420978             40
           8             2.141553  0.030316      21.912917             80
           16            2.156828  0.137937      24.483243            160
           32            3.581366  1.197282      42.884399            320
    Seq    2             2.076256  0.004231      41.571033             20
    

    根据@Sholderbach的提示,我找到了一些其他链接,这些链接涉及线性代数后端自动使用多线程以及如何关闭这些线程的主题:

    • 数量问题(来自@sholderbach)
    • ThreadPoolCTL
      • 文章不错

      添加到_process:

      proc = psutil.Process()
      proc.cpu_affinity([process_id])
      with threadpool_limits(limits=1):   
         ...
      

      添加到环境:

      - threadpoolctl=2.1.0
      - psutil=5.8.0
      

      注意:我不得不将joblib替换为multiprocessing,因为钉扎在joblib中不能正常工作(在Linux上每次只生成一半进程)。

      我做了一些测试,结果好坏参半。监控显示,对每个进程进行pinnng和限制为一个线程对Windows10和Linux/AWS C5.9xLarge都有效。不幸的是,每次迭代的绝对持续时间由于这些“修正”而增加。而且,每次迭代的持续时间在并行化的某个点仍然开始增加。

      以下是结果:

      Windows 10调用:Python time_parallel_processing.py 1,2,4,8 5

                      Duration/Iter           Duration total TotalIterCount
                               mean       std           mean           mean
      Mode   ParCount
      Joblib 1             9.502184  0.046554      47.542230              5
             2             9.557120  0.092897      49.488612             10
             4             9.602235  0.078271      50.249238             20
             8            10.518716  0.422020      60.186707             40
      Seq    2             9.493682  0.062105      95.083382             10
      

      AWS C5.9xLarge调用Python time_parallel_processing.py 1,2,4,8,16,20,24,28,32 5

                        Duration/Iter           Duration total TotalIterCount
                                 mean       std           mean           mean
      Mode     ParCount
      Parallel 1             5.271010  0.008730      15.862883              3
               2             5.400430  0.016094      16.271649              6
               4             5.708021  0.069001      17.428172             12
               8             6.088623  0.179789      18.745922             24
               16            8.330902  0.177772      25.566504             48
               20           10.515132  3.081697      47.895538             60
               24           13.506221  4.589382      53.348917             72
               28           16.318631  4.961513      57.536180             84            
               32           19.800182  4.435462      64.717435             96
      Seq      2             5.212529  0.037129      31.332297              6
      

共有1个答案

艾晋
2023-03-14

造成这种行为的原因是什么?

通常,这种类型的减速通常表示GIL阻塞、内核之间的上下文切换或执行大量pickly的某种组合

我是不是漏掉了什么?

您可能遗漏了一些小问题--请尝试分析(某些采样事件探查器的性能可能比cProfile高得多),看看时间都花在哪里了!
但是,在重新实现下面的建议之前,它的速度仍然有限

如何补救才能充分利用使用更多核心的前景?

看看numba和dask,它们允许您通过并行化GIL外部步骤来获得numpy和pandas代码的巨大加速

>

  • numba编译numpy代码并将其高速缓存,以获得更快的速度和更实用的处理器操作

    dask是一个在单个和多个系统上进行高效并行化的框架,它包含了很多好的技巧

  •  类似资料:
    • 问题内容: 我必须设置多少个分片和副本才能使用群集中的每个cpu核心(我希望100%的负载,最快的查询结果)? 我想使用Elasticsearch进行聚合。我读到Elasticsearch使用多个cpu核心,但是没有找到关于cpu核心在分片和副本方面的确切细节。 我的观察是,单个分片在查询时使用的内核/线程不超过1个(考虑到一次仅查询一个)。使用副本时,查询1-shard索引的速度不会更快,因为E

    • 问题内容: 当我加载该模块时: (来自http://www.freesoftwaremagazine.com/articles/drivers_linux?page=0,2) 该模块在2.6.39-02063904-generic(来自Ubuntu PPA)上被标记为in 且无法卸载。但是它在默认的2.6.38内核上可以正常工作。(均在Ubuntu 11.04 x86上)。 2.6.39中发生了什

    • 问题内容: 我正在寻找Redis,以便为我提供一个中间高速缓存存储,其中包含有关交集和并集之类的集合操作的大量计算信息。 我查看了redis网站,发现redis不是为多核CPU设计的。我的问题是,为什么会这样呢? 另外,如果是,我们如何在多核CPU上使用Redis来100%利用CPU资源。 问题答案: 我查看了redis网站,发现redis不是为多核CPU设计的。 我的问题是,为什么会这样? 这是

    • Go语言具有支持高并发的特性,可以很方便地实现多线程运算,充分利用多核心 cpu 的性能。 众所周知服务器的处理器大都是单核频率较低而核心数较多,对于支持高并发的程序语言,可以充分利用服务器的多核优势,从而降低单核压力,减少性能浪费。 Go语言实现多核多线程并发运行是非常方便的,下面举个例子: 运行结果如下: 线程0, sum为:10000 线程2, sum为:10000 线程3, sum为:10

    • 本文向大家介绍你认为vue的核心是什么?相关面试题,主要包含被问及你认为vue的核心是什么?时的应答技巧和注意事项,需要的朋友参考一下 一,数据驱动 专注于View 层。它让开发者省去了操作DOM的过程,只需要改变数据。 二,组件响应原理 数据(model)改变驱动视图(view)自动更新 三,组件化 扩展HTML元素,封装可重用的代码。

    • 问题内容: 以下使用argparse的子解析器的代码在Python 3上失败,但在Python 2中按预期运行。在比较了文档之后,我仍然不知道为什么。 Python 2.7.6的输出为: 在Python 3.3.5中,我得到: 问题答案: 最新版本改变了它测试所需参数的方式,而次级解析器也陷入了困境。它们不再是“必需的”。 http://bugs.python.org/issue9253#msg1