跳到主要内容

C++ 11并发支持

在现代多核处理器时代,有效利用计算资源需要并发编程技术。C++11标准引入了一套完整的并发编程支持,使开发者能够更方便地编写多线程应用程序,而无需依赖操作系统特定的API。

并发编程基础

并发编程指的是程序的多个部分可以同时执行,从而提高程序的性能和响应能力。在C++11之前,开发者必须使用操作系统特定的库(如POSIX线程或Windows线程)来编写并发代码,这导致了代码的可移植性问题。

备注

并发和并行的区别:并发是指程序的多个任务在逻辑上同时进行,而并行是指这些任务在物理上同时执行(需要多核处理器)。

C++ 11引入的并发特性

1. 线程支持库

C++11引入了<thread>头文件,提供了std::thread类来创建和管理线程。

创建和使用线程

cpp
#include <iostream>
#include <thread>

void hello() {
std::cout << "Hello from thread!" << std::endl;
}

int main() {
// 创建一个线程并执行hello函数
std::thread t(hello);

// 等待线程完成
t.join();

std::cout << "Main thread continues..." << std::endl;
return 0;
}

输出:

Hello from thread!
Main thread continues...

向线程函数传递参数

cpp
#include <iostream>
#include <thread>
#include <string>

void greeting(std::string name, int times) {
for (int i = 0; i < times; ++i) {
std::cout << "Hello, " << name << "!" << std::endl;
}
}

int main() {
// 创建线程并传递参数
std::thread t(greeting, "Alice", 3);

t.join();
return 0;
}

输出:

Hello, Alice!
Hello, Alice!
Hello, Alice!
警告

如果在线程完成之前主线程结束,程序将终止并导致未定义行为。必须使用join()detach()处理每个创建的线程。

2. 互斥量和锁

在多线程环境中,当多个线程访问共享资源时,可能会导致数据竞争。C++11提供了<mutex>头文件,包含多种互斥量和锁类型来保护共享数据。

基本互斥量用法

cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <vector>

std::mutex mtx; // 互斥量保护共享数据
int counter = 0; // 共享数据

void increment(int n) {
for (int i = 0; i < n; ++i) {
mtx.lock(); // 锁定互斥量
++counter; // 安全地访问共享数据
mtx.unlock(); // 解锁互斥量
}
}

int main() {
std::vector<std::thread> threads;

// 创建5个线程,每个线程增加计数器1000次
for (int i = 0; i < 5; ++i) {
threads.push_back(std::thread(increment, 1000));
}

// 等待所有线程完成
for (auto& t : threads) {
t.join();
}

std::cout << "Final counter value: " << counter << std::endl;
return 0;
}

输出:

Final counter value: 5000

使用锁保护机制

手动管理互斥量的锁定和解锁容易出错。C++11提供了RAII风格的锁保护机制:

cpp
#include <iostream>
#include <thread>
#include <mutex>

std::mutex mtx;
int counter = 0;

void increment(int n) {
for (int i = 0; i < n; ++i) {
// 使用lock_guard自动管理锁的生命周期
std::lock_guard<std::mutex> lock(mtx);
++counter;
// 离开作用域时自动解锁
}
}

C++11还提供了std::unique_lock,它比std::lock_guard更灵活,但开销略大:

cpp
void process_data() {
std::unique_lock<std::mutex> lock(mtx);
// 处理数据...
lock.unlock(); // 可以提前解锁

// 进行一些不需要锁的工作

lock.lock(); // 可以再次锁定
// 继续处理需要保护的数据
}

3. 条件变量

条件变量用于在线程间进行同步,允许线程在满足特定条件时唤醒另一个正在等待的线程。C++11提供了<condition_variable>头文件。

cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;

void producer() {
for (int i = 0; i < 10; ++i) {
{
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << std::endl;
} // 解锁互斥量

// 通知一个等待的线程
cv.notify_one();

// 生产慢一点
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all(); // 通知所有等待的线程
}

void consumer() {
while (true) {
std::unique_lock<std::mutex> lock(mtx);

// 等待数据可用或生产结束
cv.wait(lock, []{ return !data_queue.empty() || finished; });

if (data_queue.empty() && finished) {
// 队列为空且生产已完成,退出循环
break;
}

// 处理数据
int value = data_queue.front();
data_queue.pop();
lock.unlock();

std::cout << "Consumed: " << value << std::endl;
}
}

int main() {
std::thread prod(producer);
std::thread cons(consumer);

prod.join();
cons.join();

return 0;
}

输出(可能会因为线程调度而略有不同):

Produced: 0
Consumed: 0
Produced: 1
Consumed: 1
...
Produced: 9
Consumed: 9

4. 未来值和承诺

C++11提供了<future>头文件,包含std::futurestd::promisestd::packaged_task等工具,使异步任务的结果处理更加简单。

使用std::async执行异步任务

cpp
#include <iostream>
#include <future>
#include <chrono>
#include <string>

std::string fetch_data(const std::string& url) {
// 模拟网络请求延迟
std::this_thread::sleep_for(std::chrono::seconds(2));
return "Data from " + url;
}

int main() {
// 启动异步任务
std::future<std::string> result = std::async(fetch_data, "example.com");

std::cout << "Doing other work while waiting..." << std::endl;

// 在需要结果时获取它(如果尚未完成,将阻塞等待)
std::string data = result.get();

std::cout << "Received: " << data << std::endl;
return 0;
}

输出:

Doing other work while waiting...
Received: Data from example.com

使用Promise和Future进行线程通信

cpp
#include <iostream>
#include <thread>
#include <future>

void calculate(std::promise<int> promise) {
// 执行计算
int result = 42;

// 计算完成后,将结果存储在promise中
promise.set_value(result);
}

int main() {
// 创建promise对象
std::promise<int> promise;

// 获取与promise关联的future
std::future<int> future = promise.get_future();

// 在新线程中执行计算
std::thread t(calculate, std::move(promise));

// 等待结果
std::cout << "Waiting for result..." << std::endl;
int result = future.get();
std::cout << "Result is: " << result << std::endl;

t.join();
return 0;
}

输出:

Waiting for result...
Result is: 42

实际应用案例:并行图像处理

并发编程在图像处理等计算密集型任务中特别有用。以下是一个简化的示例,展示如何使用C++11并发特性并行处理图像:

cpp
#include <iostream>
#include <vector>
#include <thread>
#include <mutex>
#include <chrono>

// 模拟图像数据(简化为整数数组)
using Image = std::vector<int>;

// 串行处理图像
void process_image_serial(Image& image) {
std::cout << "Processing image serially..." << std::endl;
auto start = std::chrono::high_resolution_clock::now();

// 模拟图像处理(简单地将每个像素值加倍)
for (auto& pixel : image) {
// 模拟复杂计算
std::this_thread::sleep_for(std::chrono::microseconds(10));
pixel *= 2;
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> duration = end - start;
std::cout << "Serial processing took " << duration.count() << " ms." << std::endl;
}

// 并行处理图像
void process_image_parallel(Image& image) {
std::cout << "Processing image in parallel..." << std::endl;
auto start = std::chrono::high_resolution_clock::now();

// 获取可用的线程数
unsigned int num_threads = std::thread::hardware_concurrency();
std::vector<std::thread> threads;

// 计算每个线程处理的图像部分
size_t chunk_size = image.size() / num_threads;

for (unsigned int i = 0; i < num_threads; ++i) {
size_t start_idx = i * chunk_size;
size_t end_idx = (i == num_threads - 1) ? image.size() : (i + 1) * chunk_size;

// 创建线程处理图像的一部分
threads.push_back(std::thread([&image, start_idx, end_idx]() {
for (size_t j = start_idx; j < end_idx; ++j) {
// 相同的处理逻辑
std::this_thread::sleep_for(std::chrono::microseconds(10));
image[j] *= 2;
}
}));
}

// 等待所有线程完成
for (auto& thread : threads) {
thread.join();
}

auto end = std::chrono::high_resolution_clock::now();
std::chrono::duration<double, std::milli> duration = end - start;
std::cout << "Parallel processing took " << duration.count() << " ms." << std::endl;
}

int main() {
// 创建测试图像(10000个像素)
Image test_image(10000, 10);

// 创建副本用于比较
Image image_for_serial = test_image;
Image image_for_parallel = test_image;

// 串行处理
process_image_serial(image_for_serial);

// 并行处理
process_image_parallel(image_for_parallel);

// 验证结果相同
bool results_match = true;
for (size_t i = 0; i < test_image.size(); ++i) {
if (image_for_serial[i] != image_for_parallel[i]) {
results_match = false;
break;
}
}

std::cout << "Results " << (results_match ? "match" : "don't match") << std::endl;

return 0;
}

在多核系统上运行此示例,并行版本通常会显著快于串行版本,展示了并发编程的性能优势。

C++ 11并发编程的最佳实践

  1. 避免数据竞争:确保多个线程访问共享数据时正确使用互斥量或其他同步机制。

  2. 防止死锁:避免循环等待资源,始终按相同顺序获取多个锁。

  3. 使用高级工具:优先使用std::async、任务执行库或线程池,而不是直接管理线程。

  4. 减少共享状态:设计系统时尽量减少线程间共享数据,使用消息传递而不是共享内存。

  5. 避免过度并行化:创建过多的线程会导致上下文切换开销,通常线程数应接近CPU核心数。

并发编程中的常见问题

数据竞争

当多个线程同时访问共享数据,且至少有一个线程执行写操作时,会发生数据竞争。

cpp
// 错误示例 - 数据竞争
int counter = 0;

void increment() {
++counter; // 多线程访问时不安全
}

// 正确示例 - 使用互斥量
std::mutex mtx;
int counter = 0;

void safe_increment() {
std::lock_guard<std::mutex> lock(mtx);
++counter; // 安全
}

死锁

当两个或多个线程互相等待对方持有的资源时,会发生死锁。

cpp
// 可能导致死锁的示例
std::mutex mutex1, mutex2;

void function1() {
std::lock_guard<std::mutex> lock1(mutex1);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); // 增加死锁几率
std::lock_guard<std::mutex> lock2(mutex2);
// 处理...
}

void function2() {
std::lock_guard<std::mutex> lock2(mutex2);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
std::lock_guard<std::mutex> lock1(mutex1);
// 处理...
}

// 解决方法:使用std::lock同时锁定多个互斥量
void safe_function1() {
std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
std::lock(lock1, lock2); // 原子地锁定两个互斥量
// 处理...
}

总结

C++11引入的并发支持极大地简化了多线程应用程序的开发,使并发编程成为C++语言的标准部分。主要组件包括:

  1. 线程支持库std::thread类及相关功能。
  2. 互斥量和锁:保护共享数据的同步原语。
  3. 条件变量:线程间的同步机制。
  4. 未来值和承诺:处理异步任务结果的机制。

并发编程可以显著提高程序性能,特别是在多核系统上,但也带来了数据竞争、死锁等挑战。正确使用C++11提供的并发工具,可以编写既安全又高效的多线程应用程序。

练习与进阶学习

  1. 基础练习:创建一个简单的程序,使用多线程并行计算向量元素的平方和。

  2. 中级练习:实现一个简单的线程池,可以将任务提交到工作线程队列中执行。

  3. 高级练习:使用C++11的并发特性实现一个生产者-消费者模式的任务调度系统。

进一步学习资源

  • C++ Reference:C++线程库的完整参考。
  • 《C++ Concurrency in Action》by Anthony Williams:详细介绍C++并发编程的经典书籍。
  • 《Effective Modern C++》by Scott Meyers:包含关于C++11/14并发编程的重要建议。

通过不断练习和学习,你将能够掌握C++11并发编程的技巧,编写出高效且可靠的多线程应用程序。