跳到主要内容

C++ 并行算法(C++17)

引言

在现代计算机中,多核处理器已经成为标准配置。然而,充分利用多核处理能力一直是编程中的一个挑战。C++17引入了并行算法,通过简单的方式让开发者能够充分利用多核处理器的性能,无需手动管理线程。

并行算法是对现有STL算法的扩展,允许开发者通过简单地添加执行策略参数来指定算法的并行执行方式。这使得并行编程变得简单、可靠,且不需要掌握复杂的多线程编程知识。

执行策略

C++17引入了三种基本的执行策略,它们被定义在<execution>头文件中:

  1. std::execution::seq(顺序执行):算法将按顺序执行,不并行化。
  2. std::execution::par(并行执行):算法可以在多个线程中并行执行。
  3. std::execution::par_unseq(并行+向量化执行):算法可以并行执行,并且可以使用SIMD指令进行向量化。
备注

使用并行算法需要包含<execution>头文件。大多数标准库算法都支持并行执行。

使用并行算法

让我们看一个简单的例子,比较顺序和并行版本的std::sort的性能:

cpp
#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <random>
#include <chrono>

int main() {
// 创建一个包含大量随机数的向量
std::vector<int> data(10000000);
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_int_distribution<> distrib(1, 100000);

for (auto& item : data) {
item = distrib(gen);
}

// 复制数据以进行公平比较
auto data_seq = data;
auto data_par = data;

// 顺序排序并测量时间
auto start = std::chrono::high_resolution_clock::now();
std::sort(std::execution::seq, data_seq.begin(), data_seq.end());
auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

// 并行排序并测量时间
start = std::chrono::high_resolution_clock::now();
std::sort(std::execution::par, data_par.begin(), data_par.end());
auto par_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

std::cout << "顺序排序耗时: " << seq_duration << " ms\n";
std::cout << "并行排序耗时: " << par_duration << " ms\n";
std::cout << "加速比: " << static_cast<double>(seq_duration) / par_duration << "x\n";

// 验证两个结果是否相同
bool same = data_seq == data_par;
std::cout << "结果相同: " << (same ? "是" : "否") << std::endl;

return 0;
}

在多核系统上运行此代码,你会看到并行版本的std::sort明显快于顺序版本。典型的输出可能如下:

顺序排序耗时: 1245 ms
并行排序耗时: 312 ms
加速比: 3.99x
结果相同: 是
提示

实际的性能提升会因处理器核心数、内存带宽和其他系统因素而有很大差异。

支持并行执行的算法

C++17中支持并行执行的算法很多,以下是一些常见的例子:

  • 非修改序列操作:for_eachcountfind
  • 修改序列操作:copyfillreplaceswap_ranges
  • 排序和相关操作:sortstable_sortpartial_sort
  • 数值运算:reducetransform_reduce

并行算法的例子

并行for_each

cpp
#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <thread>

int main() {
std::vector<int> data(1000);

// 使用并行for_each初始化向量
std::for_each(std::execution::par, data.begin(), data.end(),
[](int& value) {
// 获取当前线程ID作为唯一标识
std::hash<std::thread::id> hasher;
std::size_t thread_num = hasher(std::this_thread::get_id()) % 100;
value = static_cast<int>(thread_num);
// 模拟一些计算
std::this_thread::sleep_for(std::chrono::microseconds(100));
}
);

// 计算不同线程处理的元素数量
std::vector<int> counts(100, 0);
for (int value : data) {
counts[value]++;
}

// 输出结果
std::cout << "各线程处理的元素数量:\n";
for (std::size_t i = 0; i < counts.size(); ++i) {
if (counts[i] > 0) {
std::cout << "线程 " << i << ": " << counts[i] << " 个元素\n";
}
}

return 0;
}

并行transform_reduce

cpp
#include <iostream>
#include <vector>
#include <numeric>
#include <execution>
#include <chrono>

int main() {
// 创建两个大向量
std::vector<double> v1(10000000, 1.0);
std::vector<double> v2(10000000, 2.0);

// 使用顺序执行计算点积
auto start = std::chrono::high_resolution_clock::now();
double result_seq = std::transform_reduce(
std::execution::seq,
v1.begin(), v1.end(),
v2.begin(),
0.0
);
auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

// 使用并行执行计算点积
start = std::chrono::high_resolution_clock::now();
double result_par = std::transform_reduce(
std::execution::par,
v1.begin(), v1.end(),
v2.begin(),
0.0
);
auto par_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

std::cout << "顺序计算点积耗时: " << seq_duration << " ms,结果: " << result_seq << "\n";
std::cout << "并行计算点积耗时: " << par_duration << " ms,结果: " << result_par << "\n";
std::cout << "加速比: " << static_cast<double>(seq_duration) / par_duration << "x\n";

return 0;
}

实际应用案例:图像处理

下面是一个使用并行算法对图像进行灰度转换的简化示例:

cpp
#include <iostream>
#include <vector>
#include <algorithm>
#include <execution>
#include <chrono>

// 简化的RGB像素结构
struct Pixel {
unsigned char r, g, b;
};

// 简化的图像类
class Image {
public:
Image(int width, int height) : width_(width), height_(height), pixels_(width * height) {}

Pixel& at(int x, int y) {
return pixels_[y * width_ + x];
}

const Pixel& at(int x, int y) const {
return pixels_[y * width_ + x];
}

int width() const { return width_; }
int height() const { return height_; }

std::vector<Pixel>& data() { return pixels_; }
const std::vector<Pixel>& data() const { return pixels_; }

private:
int width_, height_;
std::vector<Pixel> pixels_;
};

// 将图像转换为灰度
Image convertToGrayscale(const Image& input, bool useParallel) {
Image output(input.width(), input.height());

auto convertPixel = [](const Pixel& p) -> Pixel {
// 计算灰度值 (0.299*R + 0.587*G + 0.114*B)
unsigned char gray = static_cast<unsigned char>(
0.299 * p.r + 0.587 * p.g + 0.114 * p.b
);
return {gray, gray, gray};
};

if (useParallel) {
std::transform(
std::execution::par,
input.data().begin(),
input.data().end(),
output.data().begin(),
convertPixel
);
} else {
std::transform(
std::execution::seq,
input.data().begin(),
input.data().end(),
output.data().begin(),
convertPixel
);
}

return output;
}

int main() {
// 创建一个4K图像 (3840x2160)
const int width = 3840;
const int height = 2160;
Image image(width, height);

// 用随机颜色填充图像
std::generate(image.data().begin(), image.data().end(), []() {
return Pixel{
static_cast<unsigned char>(rand() % 256),
static_cast<unsigned char>(rand() % 256),
static_cast<unsigned char>(rand() % 256)
};
});

// 顺序执行灰度转换
auto start = std::chrono::high_resolution_clock::now();
Image grayscale1 = convertToGrayscale(image, false);
auto seq_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

// 并行执行灰度转换
start = std::chrono::high_resolution_clock::now();
Image grayscale2 = convertToGrayscale(image, true);
auto par_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - start
).count();

std::cout << "4K图像灰度转换:\n";
std::cout << "顺序执行耗时: " << seq_duration << " ms\n";
std::cout << "并行执行耗时: " << par_duration << " ms\n";
std::cout << "加速比: " << static_cast<double>(seq_duration) / par_duration << "x\n";

return 0;
}

注意事项和最佳实践

使用并行算法时,需要注意以下几点:

  1. 数据大小:并行算法的开销可能比顺序算法大,只有在处理大量数据时才能看到明显的性能提升。

  2. 无副作用:传递给并行算法的函数应该是纯函数,没有副作用,不依赖执行顺序。

  3. 避免数据竞争:确保并行操作不会导致数据竞争。

  4. 硬件限制:并行算法的加速受限于可用的处理器核心数和内存带宽。

  5. 编译器支持:需要使用支持C++17并且实现了并行算法的编译器。

警告

某些并行算法可能会以不同于顺序执行的顺序处理元素。如果你的代码依赖于特定的执行顺序,使用并行算法可能会导致意外结果。

何时使用并行算法

并行算法适合以下场景:

  • 处理大量数据(数百万个元素或更多)
  • 每个元素的处理是独立的
  • 处理每个元素需要一定的计算量
  • 硬件有多个处理器核心

不适合并行算法的场景:

  • 数据量小
  • 处理过程中有大量的同步需求
  • 处理每个元素非常简单,并行开销可能超过收益
  • 处理过程强烈依赖于顺序

总结

C++17并行算法是STL的强大扩展,通过简单地添加执行策略参数,就可以利用现代多核处理器的性能来加速计算。关键要点:

  • 使用#include <execution>获取执行策略
  • 三种主要的执行策略:seq(顺序)、par(并行)和par_unseq(并行+向量化)
  • 大多数STL算法都支持并行执行
  • 并行算法最适合处理大量独立数据

通过掌握并行算法,你可以在不深入了解复杂的多线程编程的情况下,编写高效利用现代硬件的代码。

练习

  1. 比较std::for_eachstd::transformstd::reduce在处理1000万个元素时的顺序和并行性能。
  2. 实现一个并行的矩阵乘法算法。
  3. 使用并行算法实现一个简单的图像模糊滤镜。
  4. 比较parpar_unseq执行策略在不同算法上的性能差异。

进一步学习资源