当前位置: 首页 > 工具软件 > Rayon > 使用案例 >

【每周一库】- Rayon 数据并行计算库

史钊
2023-12-01

Rayon - 数据并行计算库

Rayon 是一个Rust的数据并行计算库。它非常轻巧,可以轻松地将顺序计算转换为并行计算。同时保证不会有数据争用情况出现。

并行迭代器

使用Rayon,可以轻松地将顺序迭代器转换为并行迭代器:通常,只需将您的foo.iter()调用更改为foo.par_iter(),其余则由Rayon完成:

use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter() // <-- 只需要更改这里
         .map(|&i| i * i)
         .sum()
}

并行迭代器负责确定如何将数据划分为任务;它会动态适应以达到最佳性能。如果你需要更大的灵活性,那么Rayon还提供了joinscope函数,允许用户自己创建并行任务。为了获得更多控制,还可以创建自定义线程池,而不是使用Rayon的默认全局线程池。

无数据争用

通常大家可能觉得并行执行会产生各种疯狂的错误。不用紧张,Rayon的API均保证无数据争用情况发生,通常可以排除大多数并行错误(尽管不是全部)。换句话说,只要代码通过编译,它通常会执行与非并行情况下相同的操作。

对于大多数情况,使用并行迭代器产生可以保证结果与顺序迭代器结果相同。不过需要特别注意的是:如果您的迭代器有副作用(例如,通过Rust通道将方法发送到其他线程,或者磁盘写入),这些副作用可能会以不同的顺序发生。还要注意,在某些情况下,并行迭代器提供了具有更高性能的顺序迭代器方法的替代版本。

使用Rayon

你可以在crates.io找到Rayon. 推荐的使用方法是在Cargo.toml文件中添加以下一行:

[dependencies]
rayon = "1.1"

要使用并行迭代器API,特定的特征必须被提前引用。引用这些特征最简单方法是使用Rayon prelude。在每个要使用并行迭代器API的模块中,只需添加:

use rayon::prelude::*;

Rayon 需要 rustc 1.31.0 及以上版本.

示范

想要了解Rayon的实际使用方法,请查看rayon-demo目录,其中包括使用Rayon的许多代码演示。例如,运行此命令以获得nbody模拟的可视化。要查看使用Rayon的效果,请按s进行顺序运行,按p进行并行运行。

> cd rayon-demo
> cargo run --release -- nbody visualize

想要了解更多关于示范的信息, 运行:

> cd rayon-demo
> cargo run --release -- --help

模块 rayon::iter

使用迭代器风格的接口编写并行程序所需的特征

除非你需要命名一种迭代器类型,否则几乎不需要直接与此模块进行交互。

并行迭代器使编写类似迭代器的并行执行链变得容易:通常,您要做的就是将第一个.iter() (或 iter_mut()into_iter(), 等) 方法转换为par_iter() (或 par_iter_mut()into_par_iter(), 等)。例如,要计算整数序列的平方和,可以这样写:

use rayon::prelude::*;
fn sum_of_squares(input: &[i32]) -> i32 {
    input.par_iter()
         .map(|i| i * i)
         .sum()
}

或者,要递增切片中的每个整数,可以这样写:

use rayon::prelude::*;
fn increment_all(input: &mut [i32]) {
    input.par_iter_mut()
         .for_each(|p| *p += 1);
}

要使用并行迭代器,首先通过在你的模块中添加use rayon::prelude::*来导入特征。然后,您可以调用par_iterpar_iter_mutinto_par_iter来获取并行迭代器。像常规迭代器一样,并行迭代器的工作方式是先构造一个计算,然后执行。

函数 rayon::join

pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB) where
    A: FnOnce() -> RA + Send,
    B: FnOnce() -> RB + Send,
    RA: Send,
    RB: Send,

进行两个闭包,尽可能的以并行的方式运行。并从这些闭包中返回一对结果。

从概念上讲,调用join()类似于生成两个线程,每个线程执行其中一个闭包。但是,实现方式却大不相同,并且产生的额外开销非常低。我们使用的底层技术称为“工作窃取”:Rayon运行时使用固定的工作线程池,并尝试仅在有空闲CPU处理时并行执行代码。

当从线程池外部调用join时,当闭包在池中执行时,调用线程将阻塞。当在池中调用join时,调用线程仍会积极参与线程池。它将从在当前线程上执行闭包A开始。在执行的同时,它会通告其他线程闭包B为可被执行状态。一旦闭包A完成,当前线程将尝试执行闭包B。但是,如果封包B被"窃取",那么它将在等待"窃取线程"完全执行封包B的同时寻找其他工作。(这是典型的工作窃取策略)。

例子:

let mut v = vec![5, 1, 8, 22, 0, 44];
quick_sort(&mut v);
assert_eq!(v, vec![0, 1, 5, 8, 22, 44]);

fn quick_sort<T:PartialOrd+Send>(v: &mut [T]) {
   if v.len() > 1 {
       let mid = partition(v);
       let (lo, hi) = v.split_at_mut(mid);
       rayon::join(|| quick_sort(lo),
                   || quick_sort(hi));
   }
}

// 分区会将分界值左侧所有的元素重新排列到切片的第一部分中
// (分界值被任意选取为切片中的最后一个元素)
// 然后返回分界值的索引
fn partition<T:PartialOrd+Send>(v: &mut [T]) -> usize {
    let pivot = v.len() - 1;
    let mut i = 0;
    for j in 0..pivot {
        if v[j] <= v[pivot] {
            v.swap(i, j);
            i += 1;
        }
    }
    v.swap(i, pivot);
    i
}

这次的每周一库就到这里。大家保重身体!Keep coding!

 类似资料: