我试图在Python3.8中使用更多可用的内核,通过并行化(通过joblib)来加快计算速度,但观察到它的扩展性很差。
我编写了一个小脚本来测试和演示稍后可以找到的行为。脚本(请参阅后面的内容)设计成有一个完全独立的任务,使用NumPy和Pandas对虚拟操作进行一些迭代。没有任务的输入和输出,没有磁盘或其他I/O,也没有任何通信或共享内存,只是简单的CPU和RAM使用。除了对当前时间的偶尔请求之外,进程不使用任何其他资源。Amdahl定律不应该适用于这里的代码,因为除了流程设置之外根本就没有通用代码。
我通过使用顺序和并行化处理复制任务来运行一些增加工作负载的实验,并测量了每次迭代和整个(并行)过程完成所需的时间。我在我的Windows10笔记本电脑和两台AWS EC2 Linux(亚马逊Linux 2)机器上运行了这个脚本。并行处理的数量从未超过可用内核的数量。
我观察到了以下内容(详细信息请参阅后面的结果,持续时间以秒为单位):
用户
)从未超过93%,系统
调用未超过4%,并且没有IOWAIT
(使用iostat-HXM 10
测量)
用户
)约为48%这些如此重大的发现令我始料未及。
这是什么原因导致的?
我是不是漏掉了什么?
如何补救才能充分利用使用更多核心的前景?
6个CPU、12个核心调用:python.\time_parallel_processing.py 1,2,4,8 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParallelCount
Joblib 1 4.363902 0.195268 43.673971 10
2 6.322100 0.140654 63.870973 20
4 9.270582 0.464706 93.631790 40
8 15.489000 0.222859 156.670544 80
Seq 1 4.409772 0.126686 44.133441 10
2 4.465326 0.113183 89.377296 20
4 4.534959 0.125097 181.528372 40
8 4.444790 0.083315 355.849860 80
8个CPU、16个核心调用:Python time_parallel_processing.py 1,2,4,8,16 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.196086 0.009798 21.987626 10
2 3.392873 0.010025 34.297323 20
4 4.519174 0.126054 45.967140 40
8 6.888763 0.676024 71.815990 80
16 12.191278 0.156941 123.287779 160
Seq 1 2.192089 0.010873 21.945536 10
2 2.184294 0.008955 43.735713 20
4 2.201437 0.027537 88.156621 40
8 2.145312 0.009631 171.805374 80
16 2.137723 0.018985 342.393953 160
18个CPU,36个内核调用:Python time_parallel_processing.py 1,2,4,8,16,32 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 1.888071 0.023799 18.905295 10
2 2.797132 0.009859 28.307708 20
4 3.349333 0.106755 34.199839 40
8 4.273267 0.705345 45.998927 80
16 6.383214 1.455857 70.469109 160
32 10.974141 4.220783 129.671016 320
Seq 1 1.891170 0.030131 18.934494 10
2 1.866365 0.007283 37.373133 20
4 1.893082 0.041085 75.813468 40
8 1.855832 0.007025 148.643725 80
16 1.896622 0.007573 303.828529 160
32 1.864366 0.009142 597.301383 320
import argparse
import sys
import time
from argparse import Namespace
from typing import List
import numpy as np
import pandas as pd
from joblib import delayed
from joblib import Parallel
from tqdm import tqdm
RESULT_COLUMNS = {"Mode": str, "ParCount": int, "ProcessId": int, "IterId": int, "Duration": float}
def _create_empty_data_frame() -> pd.DataFrame:
return pd.DataFrame({key: [] for key, _ in RESULT_COLUMNS.items()}).astype(RESULT_COLUMNS)
def _do_task() -> None:
for _ in range(10):
array: np.ndarray = np.random.rand(2500, 2500)
_ = np.matmul(array, array)
data_frame: pd.DataFrame = pd.DataFrame(np.random.rand(250, 250), columns=list(map(str, list(range(250)))))
_ = data_frame.merge(data_frame)
def _process(process_id: int, iter_count: int) -> pd.DataFrame:
durations: pd.DataFrame = _create_empty_data_frame()
for i in tqdm(range(iter_count)):
iter_start_time: float = time.time()
_do_task()
durations = durations.append(
{
"Mode": "",
"ParCount": 0,
"ProcessId": process_id,
"IterId": i,
"Duration": time.time() - iter_start_time,
},
ignore_index=True,
)
return durations
def main(args: Namespace) -> None:
"""Execute main script."""
iter_durations: List[pd.DataFrame] = []
mode_durations: List[pd.DataFrame] = []
for par_count in list(map(int, args.par_counts.split(","))):
total_iter_count: int = par_count * int(args.iter_count)
print(f"\nRunning {par_count} processes in parallel and {total_iter_count} iterations in total")
start_time_joblib: float = time.time()
with Parallel(n_jobs=par_count) as parallel:
joblib_durations: List[pd.DataFrame] = parallel(
delayed(_process)(process_id, int(args.iter_count)) for process_id in range(par_count)
)
iter_durations.append(pd.concat(joblib_durations).assign(**{"Mode": "Joblib", "ParCount": par_count}))
end_time_joblib: float = time.time()
print(f"\nRunning {par_count} processes sequentially with {total_iter_count} iterations in total")
start_time_seq: float = time.time()
seq_durations: List[pd.DataFrame] = []
for process_id in range(par_count):
seq_durations.append(_process(process_id, int(args.iter_count)))
iter_durations.append(pd.concat(seq_durations).assign(**{"Mode": "Seq", "ParCount": par_count}))
end_time_seq: float = time.time()
mode_durations.append(
pd.DataFrame(
{
"Mode": ["Joblib", "Seq"],
"ParCount": [par_count] * 2,
"Duration": [end_time_joblib - start_time_joblib, end_time_seq - start_time_seq],
"TotalIterCount": [total_iter_count] * 2,
}
)
)
print("\nDuration in seconds")
grouping_columns: List[str] = ["Mode", "ParCount"]
print(
pd.concat(iter_durations)
.groupby(grouping_columns)
.agg({"Duration": ["mean", "std"]})
.merge(
pd.concat(mode_durations).groupby(grouping_columns).agg({"Duration": ["mean"], "TotalIterCount": "mean"}),
on=grouping_columns,
suffixes=["/Iter", " total"],
how="inner",
)
)
if __name__ == "__main__":
print(f"Command line: {sys.argv}")
parser: argparse.ArgumentParser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"par_counts",
help="Comma separated list of parallel processes counts to start trials for (e.g. '1,2,4,8,16,32')",
)
parser.add_argument("iter_count", help="Number of iterations per parallel process to carry out")
args: argparse.Namespace = parser.parse_args()
start_time: float = time.time()
main(args)
print(f"\nTotal elapsed time: {time.time() - start_time:.2f} seconds")
使用“conda env create-f environment.time_parallel.yaml
”“environment.time_parallel.yaml
”创建:
name: time_parallel
channels:
- defaults
- conda-forge
dependencies:
- python=3.8.5
- pip=20.3.3
- pandas=1.2.0
- numpy=1.19.2
- joblib=1.0.0
- tqdm=4.55.1
感谢@Sholderbach的帮助,我调查了numpy/pandas的用法,发现了一些事情。
NumPy使用了一个线性代数后端,它会自动在并行线程中运行一些命令(包括矩阵乘法),这会导致太多的线程阻塞系统,并行进程越多越多,因此每次迭代的持续时间也会增加。我通过删除方法_do_task
中的NumPy和Pandas操作来测试这个hypthement,并仅用简单的数学操作来替换它:
def _do_task() -> None:
for _ in range(10):
for i in range(10000000):
_ = 1000 ^ 2 % 200
结果与预期完全一样,因为当增加进程数(超过可用内核数)时,迭代的持续时间不会改变。
Windows 10
调用python time_parallel_processing.py 1,2,4,8 5
Duration in seconds
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.562570 0.015496 13.468393 5
2 2.556241 0.021074 13.781174 10
4 2.565614 0.054754 16.171828 20
8 2.630463 0.258474 20.328055 40
Seq 2 2.576542 0.033270 25.874965 10
AWS C5.9X大型
调用python time_parallel_processing.py 1,2,4,8,16,32 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.082849 0.022352 20.854512 10
2 2.126195 0.034078 21.596860 20
4 2.287874 0.254493 27.420978 40
8 2.141553 0.030316 21.912917 80
16 2.156828 0.137937 24.483243 160
32 3.581366 1.197282 42.884399 320
Seq 2 2.076256 0.004231 41.571033 20
根据@Sholderbach的提示,我找到了一些其他链接,这些链接涉及线性代数后端自动使用多线程以及如何关闭这些线程的主题:
ThreadPoolCTL
包
添加到_process
:
proc = psutil.Process()
proc.cpu_affinity([process_id])
with threadpool_limits(limits=1):
...
添加到环境:
- threadpoolctl=2.1.0
- psutil=5.8.0
注意:我不得不将joblib
替换为multiprocessing
,因为钉扎在joblib
中不能正常工作(在Linux上每次只生成一半进程)。
我做了一些测试,结果好坏参半。监控显示,对每个进程进行pinnng和限制为一个线程对Windows10和Linux/AWS C5.9xLarge都有效。不幸的是,每次迭代的绝对持续时间由于这些“修正”而增加。而且,每次迭代的持续时间在并行化的某个点仍然开始增加。
以下是结果:
Windows 10调用:Python time_parallel_processing.py 1,2,4,8 5
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 9.502184 0.046554 47.542230 5
2 9.557120 0.092897 49.488612 10
4 9.602235 0.078271 50.249238 20
8 10.518716 0.422020 60.186707 40
Seq 2 9.493682 0.062105 95.083382 10
AWS C5.9xLarge调用Python time_parallel_processing.py 1,2,4,8,16,20,24,28,32 5
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Parallel 1 5.271010 0.008730 15.862883 3
2 5.400430 0.016094 16.271649 6
4 5.708021 0.069001 17.428172 12
8 6.088623 0.179789 18.745922 24
16 8.330902 0.177772 25.566504 48
20 10.515132 3.081697 47.895538 60
24 13.506221 4.589382 53.348917 72
28 16.318631 4.961513 57.536180 84
32 19.800182 4.435462 64.717435 96
Seq 2 5.212529 0.037129 31.332297 6
造成这种行为的原因是什么?
通常,这种类型的减速通常表示GIL阻塞、内核之间的上下文切换或执行大量pickly的某种组合
我是不是漏掉了什么?
您可能遗漏了一些小问题--请尝试分析(某些采样事件探查器的性能可能比cProfile高得多),看看时间都花在哪里了!
但是,在重新实现下面的建议之前,它的速度仍然有限
如何补救才能充分利用使用更多核心的前景?
看看numba和dask,它们允许您通过并行化GIL外部步骤来获得numpy和pandas代码的巨大加速
>
numba编译numpy代码并将其高速缓存,以获得更快的速度和更实用的处理器操作
dask是一个在单个和多个系统上进行高效并行化的框架,它包含了很多好的技巧
问题内容: 我必须设置多少个分片和副本才能使用群集中的每个cpu核心(我希望100%的负载,最快的查询结果)? 我想使用Elasticsearch进行聚合。我读到Elasticsearch使用多个cpu核心,但是没有找到关于cpu核心在分片和副本方面的确切细节。 我的观察是,单个分片在查询时使用的内核/线程不超过1个(考虑到一次仅查询一个)。使用副本时,查询1-shard索引的速度不会更快,因为E
问题内容: 当我加载该模块时: (来自http://www.freesoftwaremagazine.com/articles/drivers_linux?page=0,2) 该模块在2.6.39-02063904-generic(来自Ubuntu PPA)上被标记为in 且无法卸载。但是它在默认的2.6.38内核上可以正常工作。(均在Ubuntu 11.04 x86上)。 2.6.39中发生了什
问题内容: 我正在寻找Redis,以便为我提供一个中间高速缓存存储,其中包含有关交集和并集之类的集合操作的大量计算信息。 我查看了redis网站,发现redis不是为多核CPU设计的。我的问题是,为什么会这样呢? 另外,如果是,我们如何在多核CPU上使用Redis来100%利用CPU资源。 问题答案: 我查看了redis网站,发现redis不是为多核CPU设计的。 我的问题是,为什么会这样? 这是
Go语言具有支持高并发的特性,可以很方便地实现多线程运算,充分利用多核心 cpu 的性能。 众所周知服务器的处理器大都是单核频率较低而核心数较多,对于支持高并发的程序语言,可以充分利用服务器的多核优势,从而降低单核压力,减少性能浪费。 Go语言实现多核多线程并发运行是非常方便的,下面举个例子: 运行结果如下: 线程0, sum为:10000 线程2, sum为:10000 线程3, sum为:10
本文向大家介绍你认为vue的核心是什么?相关面试题,主要包含被问及你认为vue的核心是什么?时的应答技巧和注意事项,需要的朋友参考一下 一,数据驱动 专注于View 层。它让开发者省去了操作DOM的过程,只需要改变数据。 二,组件响应原理 数据(model)改变驱动视图(view)自动更新 三,组件化 扩展HTML元素,封装可重用的代码。
问题内容: 以下使用argparse的子解析器的代码在Python 3上失败,但在Python 2中按预期运行。在比较了文档之后,我仍然不知道为什么。 Python 2.7.6的输出为: 在Python 3.3.5中,我得到: 问题答案: 最新版本改变了它测试所需参数的方式,而次级解析器也陷入了困境。它们不再是“必需的”。 http://bugs.python.org/issue9253#msg1