C++ packaged_task
在 C++11 引入的多线程支持中,std::packaged_task
是一个强大而灵活的工具,它允许你包装任意的可调用对象(如函数、lambda 表达式、函数对象等),并将其执行结果与 std::future
关联起来。这种机制使得异步任务的执行和结果获取变得简单而直观。
packaged_task 是什么?
std::packaged_task
本质上是一个包装器,它包装了一个可调用对象,并允许它在未来的某个时间点被异步执行。当这个被包装的可调用对象执行完成后,其结果会被存储在一个与之关联的 std::future
对象中,从而可以在任意时间点获取执行结果。
基本用法
创建和执行 packaged_task
让我们从一个简单的例子开始,展示如何创建和使用 std::packaged_task
:
#include <iostream>
#include <future>
#include <thread>
#include <functional>
// 定义一个简单的函数
int sum(int a, int b) {
return a + b;
}
int main() {
// 创建 packaged_task
std::packaged_task<int(int, int)> task(sum);
// 获取与任务关联的 future
std::future<int> result = task.get_future();
// 在另一个线程中执行任务
std::thread t(std::move(task), 2, 3);
t.detach(); // 分离线程
// 等待任务完成并获取结果
std::cout << "计算结果: " << result.get() << std::endl;
return 0;
}
输出:
计算结果: 5
在这个例子中:
- 我们创建了一个
std::packaged_task
,它包装了sum
函数 - 通过调用
get_future()
获取了一个与任务关联的future
对象 - 在一个新线程中执行了任务
- 通过
future.get()
等待任务完成并获取结果
使用 lambda 表达式
除了包装普通函数,std::packaged_task
也可以包装 lambda 表达式:
#include <iostream>
#include <future>
#include <thread>
int main() {
// 使用 lambda 表达式创建 packaged_task
std::packaged_task<int(int, int)> task([](int a, int b) {
std::cout << "正在计算 " << a << " * " << b << std::endl;
return a * b;
});
std::future<int> result = task.get_future();
// 直接调用执行
task(5, 6);
std::cout << "结果: " << result.get() << std::endl;
return 0;
}
输出:
正在计算 5 * 6
结果: 30
注意这里我们直接调用 task(5, 6)
而不是在新线程中执行。packaged_task
可以在当前线程中调用,也可以在新线程中执行。
packaged_task 的高级特性
重置 packaged_task
std::packaged_task
提供了 reset()
方法,使我们可以重用同一个任务:
#include <iostream>
#include <future>
int main() {
std::packaged_task<int(int)> task([](int x) { return x * x; });
// 首次执行
task(5);
std::cout << "第一次结果: " << task.get_future().get() << std::endl;
// 重置任务
task.reset();
// 再次执行
task(10);
std::cout << "第二次结果: " << task.get_future().get() << std::endl;
return 0;
}
输出:
第一次结果: 25
第二次结果: 100
检查 packaged_task 的状态
std::packaged_task
提供了 valid()
方法,用于检查任务是否有效:
#include <iostream>
#include <future>
int main() {
std::packaged_task<void()> task;
std::cout << "初始状态: " << (task.valid() ? "有效" : "无效") << std::endl;
task = std::packaged_task<void()>([]() {
std::cout << "执行任务" << std::endl;
});
std::cout << "赋值后: " << (task.valid() ? "有效" : "无效") << std::endl;
// 使用 std::move 转移所有权后,原任务变为无效
auto moved_task = std::move(task);
std::cout << "移动后: " << (task.valid() ? "有效" : "无效") << std::endl;
std::cout << "新任务: " << (moved_task.valid() ? "有效" : "无效") << std::endl;
return 0;
}
输出:
初始状态: 无效
赋值后: 有效
移动后: 无效
新任务: 有效
实际应用场景
场景1:异步文件读取
假设我们需要从文件中读取大量数据,但不希望阻塞主线程:
#include <iostream>
#include <future>
#include <thread>
#include <fstream>
#include <vector>
#include <string>
// 异步读取文件内容
std::vector<std::string> readFileLines(const std::string& filename) {
std::vector<std::string> lines;
std::ifstream file(filename);
std::string line;
if (file.is_open()) {
// 模拟耗时操作
std::this_thread::sleep_for(std::chrono::seconds(2));
while (std::getline(file, line)) {
lines.push_back(line);
}
file.close();
}
return lines;
}
int main() {
std::string filename = "data.txt";
// 创建任务
std::packaged_task<std::vector<std::string>(const std::string&)>
task(readFileLines);
// 获取future
std::future<std::vector<std::string>> future = task.get_future();
// 在新线程中执行文件读取
std::thread reader_thread(std::move(task), filename);
// 主线程可以继续做其他工作
std::cout << "文件读取中,主线程继续执行..." << std::endl;
// 做一些其他工作...
for (int i = 0; i < 3; ++i) {
std::cout << "主线程正在处理其他任务..." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
// 等待文件读取完成并获取结果
std::vector<std::string> lines = future.get();
reader_thread.join();
std::cout << "文件读取完成,共读取 " << lines.size() << " 行" << std::endl;
return 0;
}
场景2:任务调度系统
packaged_task
非常适合构建简单的任务调度系统:
#include <iostream>
#include <future>
#include <thread>
#include <queue>
#include <functional>
#include <mutex>
#include <condition_variable>
class TaskScheduler {
private:
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable cv;
bool stop;
std::thread worker;
public:
TaskScheduler() : stop(false) {
worker = std::thread([this]() {
while (true) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() { return stop || !tasks.empty(); });
if (stop && tasks.empty()) {
return;
}
task = std::move(tasks.front());
tasks.pop();
}
task(); // 执行任务
}
});
}
~TaskScheduler() {
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
cv.notify_all();
if (worker.joinable()) {
worker.join();
}
}
template<typename F, typename... Args>
auto schedule(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type> {
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared<std::packaged_task<return_type()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> result = task->get_future();
{
std::unique_lock<std::mutex> lock(mtx);
if (stop) {
throw std::runtime_error("调度器已停止");
}
tasks.emplace([task]() { (*task)(); });
}
cv.notify_one();
return result;
}
};
int main() {
TaskScheduler scheduler;
// 调度一些任务
auto result1 = scheduler.schedule([](int x, int y) {
std::cout << "执行任务1" << std::endl;
return x + y;
}, 10, 20);
auto result2 = scheduler.schedule([]() {
std::cout << "执行任务2" << std::endl;
return "Hello, World!";
});
// 获取结果
std::cout << "任务1结果: " << result1.get() << std::endl;
std::cout << "任务2结果: " << result2.get() << std::endl;
return 0;
}
这个例子展示了如何使用 packaged_task
构建一个简单的任务调度系统,可以异步执行各种任务并方便地获取它们的执行结果。
packaged_task 与其他并发工具的比较
理解 packaged_task
与其他 C++ 并发工具的关系和区别非常有帮助。
工具 | 主要特点 | 适用场景 |
---|---|---|
std::thread | 直接控制线程 | 需要精细控制线程生命周期 |
std::async | 高级异步执行接口 | 简单地执行异步任务 |
std::packaged_task | 包装任务与结果 | 需要任务与结果分离或在特定时间执行任务 |
std::promise | 手动设置值/异常 | 需要在特定条件下设置结果 |
总结
std::packaged_task
是 C++11 提供的一个强大工具,它将可调用对象与 std::future
关联起来,使得我们可以方便地异步执行任务并在需要时获取结果。它的主要优点包括:
- 将任务的执行与结果的获取分离
- 可以灵活决定任务的执行时间和线程
- 可以轻松集成到各种并发架构中
通过学习本文内容,你应该能够理解 packaged_task
的基本概念,掌握其基本用法,并了解如何在实际应用中使用它来处理异步任务和并发问题。
练习
- 创建一个
packaged_task
包装一个计算斐波那契数列的函数,在新线程中执行它并获取结果。 - 修改上面的
TaskScheduler
类,添加一个功能,允许按优先级调度任务。 - 使用
packaged_task
实现一个简单的异步图像处理系统,包括加载、处理和保存图像的功能。
进一步阅读
- C++ 标准库参考:std::packaged_task
- 相关主题:
std::future
、std::promise
、std::async
- 《C++ Concurrency in Action》第4章