首页 | 新闻 | 新品 | 文库 | 方案 | 视频 | 下载 | 商城 | 开发板 | 数据中心 | 座谈新版 | 培训 | 工具 | 博客 | 论坛 | 百科 | GEC | 活动 | 主题月 | 电子展
返回列表 回复 发帖

如何使用 C++11 编写 Linux 多线程程序(5)几个高级概念

如何使用 C++11 编写 Linux 多线程程序(5)几个高级概念

C++11 提供了若干多线程编程的高级概念:promise/future, packaged_task, async,来简化多线程编程,尤其是线程之间的数据交互比较简单的情况下,让我们可以将注意力更多地放在业务处理上。
promise/future 可以用来在线程之间进行简单的数据交互,而不需要考虑锁的问题,线程 A 将数据保存在一个 promise 变量中,另外一个线程 B 可以通过这个 promise 变量的 get_future() 获取其值,当线程 A 尚未在 promise 变量中赋值时,线程 B 也可以等待这个 promise 变量的赋值:
清单 16.例子 thread_promise_future.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
promise<string> val;
static void
threadPromiseFuture(){
thread ta([](){
future<string> fu = val.get_future();
cout << "waiting promise->future" << endl;
cout << fu.get() << endl;
});
thread tb([](){
this_thread::sleep_for( chrono::milliseconds(100) );
val.set_value("promise is set");
});
ta.join();
tb.join();
}




一个 future 变量只能调用一次 get(),如果需要多次调用 get(),可以使用 shared_future,通过 promise/future 还可以在线程之间传递异常。
如果将一个 callable 对象和一个 promise 组合,那就是 packaged_task,它可以进一步简化操作:
清单 17.例子 thread_packaged_task.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static mutex g_mutex;
static void
threadPackagedTask(){
auto run = [=](int index){
{
lock_guard<mutex> _(g_mutex);
cout << "tasklet " << index << endl;
}
this_thread::sleep_for( chrono::seconds(10) );
return index * 1000;
};
packaged_task<int(int)> pt1(run);
packaged_task<int(int)> pt2(run);
thread t1([&](){pt1(2);} );
thread t2([&](){pt2(3);} );
int f1 = pt1.get_future().get();
int f2 = pt2.get_future().get();
cout << "task result=" << f1 << endl;
cout << "task result=" << f2 << endl;
t1.join();
t2.join();
}




我们还可以试图将一个 packaged_task 和一个线程组合,那就是 async() 函数。使用 async() 函数启动执行代码,返回一个 future 对象来保存代码返回值,不需要我们显式地创建和销毁线程等,而是由 C++11 库的实现决定何时创建和销毁线程,以及创建几个线程等,示例如下:
清单 18.例子 thread_async.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
static long
do_sum(vector<long> *arr, size_t start, size_t count){
static mutex _m;
long sum = 0;
for(size_t i = 0; i < count; i++){
sum += (*arr)[start + i];
}
{
lock_guard<mutex> _(_m);
cout << "thread " << this_thread::get_id()
<< ", count=" << count
<< ", sum=" << sum << endl;
}
return sum;
}
static void
threadAsync(){
# define COUNT 1000000
vector<long> data(COUNT);
for(size_t i = 0; i < COUNT; i++){
data = random() & 0xff;
}
//
vector< future<long> > result;
size_t ptc = thread::hardware_concurrency() * 2;
for(size_t batch = 0; batch < ptc; batch++){
size_t batch_each = COUNT / ptc;
if (batch == ptc - 1){
batch_each = COUNT - (COUNT / ptc * batch);
}
result.push_back(async(do_sum, &data, batch * batch_each, batch_each));
}
long total = 0;
for(size_t batch = 0; batch < ptc; batch++){
total += result[batch].get();
}
cout << "total=" << total << endl;
}




如果是在多核或者多 CPU 的环境上面运行上述例子,仔细观察输出结果,可能会发现有些线程 ID 是重复的,这说明重复使用了线程,也就是说,通过使用 async() 还可达到一些线程池的功能。
返回列表