python中的进程与线程,使用.multiprocessing

姬实
2023-12-01

多任务

并发

一段时间交替去执行多个任务

例:单核cpu处理多任务,操作系统轮流让各个任务交替执行

任务数量大于CPU的核心数

并行

在一段时间内真正的同时一起执行多个任务

例:多核cpu处理多任务,操作系统给cpu的每个内核安排一个执行的任务

任务数量小于等于CPU的核心数

进程

进程(process)是资源分配的最小单位

创建进程步骤

1.导入进程包

import multiprocessing

2.通过进程类创建进程对象

无传参

进程对象 = multiprocessing.Process(target = 进程指定的函数名)

有传参

进程对象 = multiprocessing.Process(target = 进程指定的函数名 , args = (3, ) ) #元组传参

进程对象 = multiprocessing.Process(target = 进程指定的函数名 , kwargs = (”num“, 3 ) ) #字典传参

3.启动进程执行任务

进程对象.start()

4.获取进程编号

import os

os.getpid() #当前进程编号

os.getppid() #父进程编号

5.主进程会等待所有子进程执行结束再结束,除非设置子进程守护主进程

6.进程里不能套进程

进程代码

IO密集型

   如果是一个磁盘或网络为主的应用程序(IO密集型程序),一个线程处在 IO 等待的时候,另一个线程还可以在 CPU 里面跑,有时候 CPU 闲着没事干,所有的线程都在等着 IO,这时候他们就是同时的了,而单线程的话,此时还是在一个一个等待的。我们都知道IO的速度比起 CPU 来是很慢的。此时线程数可以是CPU核心数的数倍(视情况而定)。

无返回值多进程

from tqdm import tqdm
import multiprocessing
from concurrent.futures import ProcessPoolExecutor

def single_processor(images_file):
		*****

tasks = preprocessing(args.input)
with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
		list(tqdm(executor.map(single_processor, tasks), total=len(tasks)))

有返回值简洁多进程

https://blog.csdn.net/weixin_36149892/article/details/105152574

from concurrent.futures import ProcessPoolExecutor
def eval_lanes_list(
        self, input_dict
        # self, idx, target_lane_type, badcase=None
    ):
        idx, target_lane_type, badcase = input_dict['idx'], input_dict['target_lane_type'], input_dict['badcase']

all_inputs = []
for task in tasks:
    # all_inputs.append((task, lane_type, badcase, ))
    all_inputs.append(dict(
        idx=task,
        target_lane_type=lane_type,
        badcase=badcase,
    ))

with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    results = list(tqdm(executor.map(self.eval_lanes_list, all_inputs), total=len(all_inputs)))

有返回值优雅多进程

#先得到需要多进程的list
tasks = preprocessing(args.input)
#存储返回值里的东西
nusences_data = {}
#创建进程池子
pool = multiprocessing.Pool(processes=15)
#接收返回值
tasks_pools = []
for task in tasks:
	#apply_async是异步非阻塞式,不用等待当前进程执行完毕,随时跟进操作系统调度来进行进程切换,即多个进程并行执行,提高程序的执行效率。
	tasks_pools.append(pool.apply_async(func = single_processor, (task,)))
for res in tqdm(tasks_pools):
	single_img_label = res.get()
if single_img_label is not None:
	nusences_data.update(single_img_label)
nusences_data = list(nusences_data.values())
#解析返回值
for data in nusences_data:

CPU密集型

    一个计算为主的应用程序(CPU密集型程序),多线程或多进程跑的时候,可以充分利用起所有的 CPU 核心数,比如说16核的CPU ,开16个线程的时候,可以同时跑16个线程的运算任务,此时是最大效率。但是如果线程数/进程数远远超出 CPU 核心数量,反而会使得任务效率下降,因为**频繁的切换线程或进程**也是要消耗时间的。因此对于 CPU 密集型的任务来说,线程数/进程数等于 CPU 数是最好的了。

略微麻烦高效多进程

def eval_lanes_list(
        self, start_index, end_index, target_lane_type, badcase=None, q=None
    ):
        total_tp, total_fp, total_fn = 0
        if q is not None:
            q.put([total_tp, total_fp, total_fn])
        else:
            return total_tp, total_fp, total_fn

def lane_multi_process(self, total_len, process_num, lane_type, badcase):
      jobs = []
      every_process_len = ((total_len - 1) // process_num) + 1
      q = Manager().Queue()
      for i in range(process_num):
          start_index = i * every_process_len
          if i == process_num - 1:
              end_index = total_len
          else:
              end_index = (i + 1) * every_process_len
          p = multiprocessing.Process(
              target=self.eval_lanes_list,
              args=(start_index, end_index, lane_type, badcase, q),
          )
          jobs.append(p)
          p.start()
      for p in jobs:
          p.join()
      result = [q.get() for j in jobs]
      total_tp = total_fp = total_fn = 0.0
      for tp, fp, fn in result:
          total_tp += tp
          total_fp += fp
          total_fn += fn
      return total_tp, total_fp, total_fn

process_num = 20
total_len = len(self.anno_lanes)

if (process_num > 0) & (total_len - 1 > process_num):
    total_tp, total_fp, total_fn = self.lane_multi_process(
        total_len, process_num, lane_type, badcase
    )
else:
    total_tp, total_fp, total_fn = self.eval_lanes_list(
        0, total_len, lane_type, badcase, q=None
    )
            

线程

线程是程序执行的最小单位,可以和同属一个进程的其他线程共享进程的全部资源

例:一个QQ软件的两个聊天窗口

创建线程步骤

1.导入线程包

import threading

2.通过线程类创建线程对象

无传参

线程对象 = threading.Thread(target = 线程指定的函数名)

有传参

线程对象 = multiprocessing.Process(target = 线程指定的函数名 , args = (3, ) ) #元组传参

线程对象 = multiprocessing.Process(target = 线程指定的函数名 , kwargs = (”num“, 3 ) ) #字典传参

3.启动线程执行任务

线程对象.start()

4.获取线程编号

import os

os.getpid() #当前线程编号

os.getppid() #父线程编号

5.主进程会等待所有子进程执行结束再结束,除非设置子进程守护主进程

6.进程里不能套进程

 类似资料: