C++ 多线程最佳实践
简介
多线程是现代C++编程中不可或缺的一部分,尤其是在追求高性能计算和响应式应用程序开发时。从C++11开始,标准库引入了原生的多线程支持,无需依赖第三方库或操作系统特定API,极大简化了多线程程序的开发。然而,多线程编程也引入了许多新的挑战,如数据竞争、死锁和性能问题。
本文将介绍C++多线程编程的最佳实践,帮助初学者避开常见陷阱,编写高效、安全的多线程代码。
C++ 多线程基础回顾
在深入最佳实践之前,我们先简要回顾C++多线程的基本概念:
- 线程(Thread): 由
std::thread
类表示的执行单元 - 互斥量(Mutex): 由
std::mutex
等类实现的同步原语 - 锁(Lock): 如
std::lock_guard
和std::unique_lock
的RAII风格锁管理工具 - 条件变量(Condition Variable): 由
std::condition_variable
实现的线程通知机制 - future和promise: 用于异步任务结果传递的机制
最佳实践1:优先使用高级抽象而非原始线程
尽可能使用C++标准库提供的任务抽象,如std::async
,而不是直接操作std::thread
。
不推荐的方式:
std::thread t([](int x) {
// 执行一些计算
return x * x;
}, 42);
// 需要手动管理线程生命周期
t.join();
推荐的方式:
auto future = std::async(std::launch::async, [](int x) {
// 执行一些计算
return x * x;
}, 42);
// 获取结果(自动等待任务完成)
int result = future.get();
std::cout << "Result: " << result << std::endl;
// 输出:
// Result: 1764
std::async
提供了更高级的抽象,自动管理线程创建和销毁,并通过future
机制提供了获取结果的简便方式。
最佳实践2:使用RAII风格的锁管理
永远不要直接调用mutex.lock()
和mutex.unlock()
,而是使用RAII锁对象。
不推荐的方式:
std::mutex mtx;
void unsafe_function() {
mtx.lock();
// 如果这里抛出异常,mutex永远不会解锁
process_data();
mtx.unlock();
}
推荐的方式:
std::mutex mtx;
void safe_function() {
std::lock_guard<std::mutex> lock(mtx);
// 离开作用域时自动解锁,即使发生异常
process_data();
}
使用std::lock_guard
或std::unique_lock
确保mutex总是被正确解锁,即使在函数提前返回或抛出异常的情况下。
最佳实践3:避免死锁
固定加锁顺序
当需要同时获取多个锁时,始终以相同的顺序获取锁:
std::mutex mutex1, mutex2;
void safe_operation() {
// 总是先锁mutex1,再锁mutex2
std::lock_guard<std::mutex> lock1(mutex1);
std::lock_guard<std::mutex> lock2(mutex2);
// 执行需要两个锁保护的操作
}
使用std::lock同时锁定多个互斥量
void better_operation() {
std::unique_lock<std::mutex> lock1(mutex1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mutex2, std::defer_lock);
// 原子方式锁定多个mutex,避免死锁
std::lock(mutex1, mutex2);
// 执行需要两个锁保护的操作
}
或者使用C++17引入的更简洁的std::scoped_lock
:
void modern_operation() {
std::scoped_lock lock(mutex1, mutex2);
// 执行需要两个锁保护的操作
}
最佳实践4:减少锁的粒度
将锁的作用域限制在最小范围内,避免在持有锁的同时执行耗时操作:
不推荐的方式:
void process_data(const std::vector<int>& input) {
std::lock_guard<std::mutex> lock(data_mutex);
// 计算结果(可能很耗时)
auto result = perform_expensive_calculation(input);
// 更新共享数据
shared_data = result;
}
推荐的方式:
void process_data(const std::vector<int>& input) {
// 不需要锁的耗时计算放在锁外面
auto result = perform_expensive_calculation(input);
// 只在更新共享数据时加锁
{
std::lock_guard<std::mutex> lock(data_mutex);
shared_data = result;
}
}
最佳实践5:使用适当的并发数据结构
在可能的情况下,使用专为并发设计的数据结构,而不是用互斥量保护普通数据结构:
// 不推荐:用mutex保护普通队列
std::queue<int> task_queue;
std::mutex queue_mutex;
// 推荐:使用线程安全的队列实现
template<typename T>
class ThreadSafeQueue {
std::queue<T> queue;
mutable std::mutex mtx;
std::condition_variable cv;
public:
void push(T item) {
std::lock_guard<std::mutex> lock(mtx);
queue.push(std::move(item));
cv.notify_one();
}
bool try_pop(T& item) {
std::lock_guard<std::mutex> lock(mtx);
if (queue.empty()) {
return false;
}
item = std::move(queue.front());
queue.pop();
return true;
}
void wait_and_pop(T& item) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]{ return !queue.empty(); });
item = std::move(queue.front());
queue.pop();
}
};
最佳实践6:正确处理线程异常
确保异常不会跨线程边界传播,因为跨线程的异常会导致程序终止:
void start_worker_thread() {
std::thread t([]() {
try {
worker_function();
} catch (const std::exception& e) {
// 记录异常,不要让它跨越线程边界
std::cerr << "Worker thread exception: " << e.what() << std::endl;
} catch (...) {
std::cerr << "Unknown worker thread exception" << std::endl;
}
});
t.detach(); // 或 t.join(),取决于使用场景
}
最佳实践7:适当使用原子操作代替互斥量
对于简单的计数器或标志变量,使用std::atomic
通常比互斥量更高效:
// 不推荐:用mutex保护一个简单计数器
int counter = 0;
std::mutex counter_mutex;
void increment_counter() {
std::lock_guard<std::mutex> lock(counter_mutex);
counter++;
}
// 推荐:使用原子变量
std::atomic<int> atomic_counter(0);
void increment_atomic_counter() {
atomic_counter++; // 原子操作,无需显式锁
}
实际应用案例:线程池实现
以下是一个简单线程池的实现,它综合运用了上面提到的多个最佳实践:
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
class ThreadPool {
public:
explicit ThreadPool(size_t num_threads) : stop(false) {
for (size_t i = 0; i < num_threads; ++i) {
workers.emplace_back([this] {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(queue_mutex);
condition.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::lock_guard<std::mutex> lock(queue_mutex);
// 不允许在停止后添加任务
if (stop) {
throw std::runtime_error("Cannot enqueue on stopped ThreadPool");
}
tasks.emplace([task]() { (*task)(); });
}
condition.notify_one();
return result;
}
~ThreadPool() {
{
std::lock_guard<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
for (std::thread &worker : workers) {
worker.join();
}
}
private:
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
使用线程池的示例:
int main() {
ThreadPool pool(4); // 创建4个工作线程
// 提交一些任务
std::vector<std::future<int>> results;
for (int i = 0; i < 8; ++i) {
auto future = pool.enqueue([i] {
std::cout << "Task " << i << " executing in thread "
<< std::this_thread::get_id() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return i * i;
});
results.push_back(std::move(future));
}
// 获取所有结果
for (auto& result : results) {
std::cout << "Got result: " << result.get() << std::endl;
}
return 0;
}
// 可能的输出:
// Task 0 executing in thread 140185344876288
// Task 1 executing in thread 140185336483584
// Task 2 executing in thread 140185328090880
// Task 3 executing in thread 140185319698176
// Task 4 executing in thread 140185344876288
// Task 5 executing in thread 140185336483584
// Task 6 executing in thread 140185328090880
// Task 7 executing in thread 140185319698176
// Got result: 0
// Got result: 1
// Got result: 4
// Got result: 9
// Got result: 16
// Got result: 25
// Got result: 36
// Got result: 49
最佳实践8:避免频繁创建和销毁线程
线程的创建和销毁是昂贵的操作,应尽量避免在程序执行过程中频繁地创建和销毁线程。上面的线程池实现就是一种解决方案,它在初始化时创建固定数量的线程,这些线程在池的生命周期内一直存在并重复使用。
最佳实践9:考虑数据局部性
在多线程环境中,考虑CPU缓存和数据局部性对性能有很大影响:
- 避免伪共享(false sharing):确保不同线程操作的数据在不同的缓存行
- 尽量让一个线程操作连续的内存区域
// 不好的设计 - 可能导致伪共享
struct SharedData {
int value1; // 线程1使用
int value2; // 线程2使用
};
// 更好的设计
struct PaddedValue {
int value;
// 填充到缓存行大小(通常64字节)
char padding[60]; // 假设int为4字节
};
struct BetterSharedData {
PaddedValue value1; // 线程1使用
PaddedValue value2; // 线程2使用
};
最佳实践10:使用条件变量处理线程通知
当一个线程需要等待特定条件满足时,使用条件变量是比轮询更高效的方式:
std::mutex mtx;
std::condition_variable cv;
bool data_ready = false;
std::vector<int> shared_data;
// 生产者线程
void producer() {
// 准备数据
std::vector<int> local_data = prepare_data();
// 更新共享数据
{
std::lock_guard<std::mutex> lock(mtx);
shared_data = std::move(local_data);
data_ready = true;
}
// 通知消费者
cv.notify_one();
}
// 消费者线程
void consumer() {
std::unique_lock<std::mutex> lock(mtx);
// 等待数据准备好
cv.wait(lock, []{ return data_ready; });
// 数据已经准备好,可以处理
process_data(shared_data);
}
总结
高效的C++多线程编程需要对语言特性和并发原理有深入理解。以下是本文介绍的核心最佳实践:
- 优先使用高级抽象如
std::async
而非直接操作线程 - 总是使用RAII风格的锁管理
- 采用一致的锁定顺序或使用
std::lock
来避免死锁 - 减小锁的粒度,避免在持有锁时执行耗时操作
- 使用专为并发设计的数据结构
- 正确处理线程异常,不要让异常跨越线程边界
- 适当使用原子操作代替互斥量
- 避免频繁创建和销毁线程,考虑使用线程池
- 考虑数据局部性和CPU缓存
- 使用条件变量处理线程通信
遵循这些最佳实践,不仅可以帮助你编写更安全、更高效的多线程代码,还能避免许多难以调试的并发问题。
练习建议
- 实现一个简单的线程安全单例模式
- 修改上述线程池实现,添加任务优先级功能
- 实现一个生产者-消费者模式的应用,比如多线程文件处理系统
- 使用
std::atomic
实现一个简单的自旋锁 - 尝试找出并修复一个包含竞态条件的程序
其他资源
- C++ Reference: Thread support library
- 《C++ Concurrency in Action》by Anthony Williams
- 《Effective Modern C++》by Scott Meyers (Item 35-40)