XuQi's Blog

  • 首页

  • 归档

c++11并发指南

发表于 2019-06-19 更新于 2019-10-20

与 C++11 多线程相关的头文件

C++11 新标准中引入了四个头文件来支持多线程编程,他们分别是 ,,,和。

  • :该头文主要声明了两个类, std::atomic 和 std::atomic_flag,另外还声明了一套 C 风格的原子类型和与 C 兼容的原子操作的函数。

  • :该头文件主要声明了 std::thread 类,另外 std::this_thread 命名空间也在该头文件中。

  • :该头文件主要声明了与互斥量(mutex)相关的类,包括 std::mutex 系列类,std::lock_guard, std::unique_lock, 以及其他的类型和函数。

  • :该头文件主要声明了与条件变量相关的类,包括 std::condition_variable 和 std::condition_variable_any。

  • :该头文件主要声明了 std::promise, std::package_task 两个 Provider 类,以及 std::future 和 std::shared_future 两个 Future 类,另外还有一些与之相关的类型和函数,std::async() 函数就声明在此头文件中。

    Linux GCC4.6 环境下,编译时需要加 -pthread

一句话话概括各个库的特征

  • std::thread创建就允许,允许后台运行
  • std::mutex 加锁解锁要配对
  • std::lock_guard 自动解锁
  • std::unique_lock 关联互斥体
  • std::promise 许诺给个未来
  • std::package_task 任务和返回值打包给线程
  • std::async 比package_task简单,自动创建线程
  • std::condition_variable 一般和unique_lock 一起用

c++ 内存模型

假设存在两个共享变量a, b,初始值均为 0,两个线程运行不同的指令,如下表格所示,线程 1 设置 a 的值为 1,然后设置 R1 的值为 b,线程 2 设置 b 的值为 2,并设置 R2 的值为 a,请问在不加任何锁或者其他同步措施的情况下,R1,R2 的最终结果会是多少?

由于没有施加任何同步限制,两个线程将会交织执行,但交织执行时指令不发生重排,即线程 1 中的 a = 1 始终在 R1 = b 之前执行,而线程 2 中的 b = 2 始终在 R2 = a 之前执行 ,因此可能的执行序列共有 4!/(2!*2!) = 6 种:

std::thread 线程详解

  • (1). 默认构造函数,创建一个空的 thread 执行对象。

  • (2). 初始化构造函数,创建一个 thread对象,该 thread对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出。

  • (3). 拷贝构造函数(被禁用),意味着 thread 不可被拷贝构造。

  • (4). move 构造函数,move 构造函数,调用成功之后 x 不代表任何 thread 执行对象。

  • 注意:可被 joinable 的 thread 对象必须在他们销毁之前被主线程 join 或者将其设置为 detached.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>
#include <utility>
#include <thread>
#include <chrono>
#include <functional>
#include <atomic>

void f1(int n)
{
std::this_thread::get_id()
for (int i = 0; i < 5; ++i) {
std::cout << "Thread " << n << " executing\n";
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

void f2(int& n)
{
std::this_thread::get_id()
for (int i = 0; i < 5; ++i) {
std::cout << "Thread 2 executing\n";
++n;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
}

int main()
{
int n = 0;
std::thread t1; // t1 is not a thread
std::thread t2(f1, n + 1); // pass by value
std::thread t3(f2, std::ref(n)); // pass by reference
std::thread t4(std::move(t3)); // t4 is now running f2(). t3 is no longer a thread
t2.join();
t4.join();
std::cout << "Final value of n is " << n << '\n';
}

std::mutex 互斥量详解

std::mutex 对象提供了独占所有权的特性——即不支持递归地对 std::mutex 对象上锁,而 std::recursive_lock 则可以递归地对互斥量对象上锁。

  • 构造函数,std::mutex不允许拷贝构造,也不允许 move 拷贝,最初产生的 mutex 对象是处于 unlocked 状态的。
  • lock(),调用线程将锁住该互斥量。线程调用该函数会发生下面 3 种情况:
    • 如果该互斥量当前没有被锁住,则调用线程将该互斥量锁住,直到调用 unlock之前,该线程一直拥有该锁。
    • 如果当前互斥量被其他线程锁住,则当前的调用线程被阻塞住。
    • 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
  • unlock(), 解锁,释放对互斥量的所有权。
  • try_lock(),尝试锁住互斥量,如果互斥量被其他线程占有,则当前线程也不会被阻塞。线程调用该函数也会出现下面 3 种情况,
    • 如果当前互斥量没有被其他线程占有,则该线程锁住互斥量,直到该线程调用 unlock 释放互斥量。
    • 如果当前互斥量被其他线程锁住,则当前调用线程返回 false,而并不会被阻塞掉。
    • 如果当前互斥量被当前调用线程锁住,则会产生死锁(deadlock)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>       // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex

volatile int counter(0); // non-atomic counter
std::mutex mtx; // locks access to counter

void attempt_10k_increases() {
for (int i=0; i<10000; ++i) {
if (mtx.try_lock()) { // only increase if currently not locked:
++counter;
mtx.unlock();
}
}
}

int main (int argc, const char* argv[]) {
std::thread threads[10];
for (int i=0; i<10; ++i)
threads[i] = std::thread(attempt_10k_increases);

for (auto& th : threads)
th.join();
std::cout << counter << " successful increases of the counter.\n";

return 0;
}

其他Mutex 系列类(三种)

  • std::recursive_mutex,递归 Mutex 类,允许同一个线程对互斥量多次上锁(即递归上锁)

  • std::time_mutex,定时 Mutex 类。

    • try_lock_for 函数接受一个时间范围,表示在这一段时间范围之内线程如果没有获得锁则被阻塞住(与 std::mutex 的 try_lock() 不同,try_lock 如果被调用时没有获得锁则直接返回 false),如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。
    • try_lock_until 函数则接受一个时间点作为参数,在指定时间点未到来之前线程如果没有获得锁则被阻塞住,如果在此期间其他线程释放了锁,则该线程可以获得对互斥量的锁,如果超时(即在指定时间内还是没有获得锁),则返回 false。
  • std::recursive_timed_mutex,定时递归 Mutex 类。

std::lock_guard 详解

不需要管理,出范围自动解锁std::lock_guard,与 Mutex RAII() 资源获取就是初始化)相关,方便线程对互斥量上锁。

1
2
3
4
mutex m;
m.lock();
sharedVariable= getVar();
m.unlock();

如果忘记解锁,或者抛出异常会导致死锁。

使用std::lock_guard

1
2
3
4
5
{
std::mutex m,
std::lock_guard<std::mutex> lockGuard(m); //出了括号就解锁
sharedVariable= getVar();
}

std::unique_lock 详解

std::unique_lock,与 Mutex RAII (资源获取就是初始化)相关,方便线程对互斥量上锁,但提供了更好的上锁和解锁控制。

和lock_guard对比又增加以下功能:

  • 没有关联互斥体时创建
  • 没有锁定的互斥体时创建
  • 显式和重复设置或释放关联互斥锁
  • 移动互斥体 move
  • 尝试锁定互斥体
  • 延迟锁定关联互斥体

有些锁因为互斥体锁导致问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// deadlock.cpp

#include <iostream>
#include <chrono>
#include <mutex>
#include <thread>

struct CriticalData{
std::mutex mut;
};

void deadLock(CriticalData& a, CriticalData& b){

a.mut.lock();
std::cout << "get the first mutex" << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
b.mut.lock();
std::cout << "get the second mutex" << std::endl;
// do something with a and b
a.mut.unlock();
b.mut.unlock();

}

int main(){

CriticalData c1;
CriticalData c2;

std::thread t1([&]{deadLock(c1,c2);}); // 两个线程互相锁
std::thread t2([&]{deadLock(c2,c1);});

t1.join();
t2.join();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
struct CriticalData{
std::mutex mut;
};

void deadLock(CriticalData& a, CriticalData& b){

std::unique_lock<std::mutex>guard1(a.mut,std::defer_lock);
std::cout << "Thread: " << std::this_thread::get_id() << " first mutex" << std::endl;

std::this_thread::sleep_for(std::chrono::milliseconds(1));

std::unique_lock<std::mutex>guard2(b.mut,std::defer_lock);
std::cout << "Thread:" << std::this_thread::get_id() << " second mutex" << std::endl;

std::cout << "Thread:" << std::this_thread::get_id() << "get both mutex" << std::endl;
// 如果用参数std::defer_lock调用std::unique_lock 的构造函数,锁不会自动锁定。 锁定操作是通过使用可变参数模板std::lock以原子方式执行锁定操作,
std::lock(guard1,guard2);
// do something with a and b
}

int main(){

std::cout << std::endl;

CriticalData c1;
CriticalData c2;

std::thread t1([&]{deadLock(c1,c2);});
std::thread t2([&]{deadLock(c2,c1);});

t1.join();
t2.join();

std::cout << std::endl;

}

也可以反过来做。在第一步中,锁定互斥体,在第二步中用std::unique_lock处理资源的生命周期

1
2
3
std::lock(a.mut, b.mut);
std::lock_guard<std::mutex> guard1(a.mut, std::adopt_lock);
std::lock_guard<std::mutex> guard2(b.mut, std::adopt_lock);

std::promise 详解

头文件中包含了以下几个类和函数:

  • Providers 类:std::promise, std::package_task
  • Futures 类:std::future, shared_future.
  • Providers 函数:std::async()
  • 其他类型:std::future_error, std::future_errc, std::future_status, std::launch.

promise 对象可以保存某一类型 T 的值,该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是std::future)相关联,并可以在相关联的共享状态(std::future)上保存一个类型为 T 的值。

可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)

  • promise 对象是异步 Provider,它可以在某一时刻设置共享状态的值。
  • future 对象可以异步返回共享状态的值,或者在必要的情况下阻塞调用者并等待共享状态标志变为 ready,然后才能获取共享状态的值。

下面以一个简单的例子来说明上述关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>       // std::cout
#include <functional> // std::ref
#include <thread> // std::thread
#include <future> // std::promise, std::future

void print_int(std::future<int>& fut) {
int x = fut.get(); // 获取共享状态的值,在set_value之前会阻塞
std::cout << "value: " << x << '\n'; // 打印 value: 10.
}

int main ()
{
std::promise<int> prom; // 生成一个 std::promise<int> 对象.
std::future<int> fut = prom.get_future(); // 和 future 关联.
std::thread t(print_int, std::ref(fut)); // 将 future 交给另外一个线程t.
prom.set_value(10); // 设置共享状态的值, 此处和线程t保持同步.
t.join();
return 0;
}

std::packaged_task 详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <iostream>     // std::cout
#include <future> // std::packaged_task, std::future
#include <chrono> // std::chrono::seconds
#include <thread> // std::thread, std::this_thread::sleep_for

// count down taking a second for each value:
int countdown (int from, int to) {
for (int i=from; i!=to; --i) {
std::cout << i << '\n';
std::this_thread::sleep_for(std::chrono::seconds(1));
}
std::cout << "Finished!\n";
return from - to;
}

int main ()
{
std::packaged_task<int(int,int)> task(countdown); // 设置 packaged_task
std::future<int> ret = task.get_future(); // 获得与 packaged_task 共享状态相关联的 future 对象.

std::thread th(std::move(task), 5, 0); //创建一个新线程完成计数任务.


int value = ret.get(); // 等待任务完成并获取结果.

std::cout << "The countdown lasted for " << value << " seconds.\n";

th.join();
return 0;
}

运行结果

1
2
3
4
5
6
7
5
4
3
2
1
Finished!
The countdown lasted for 5 seconds.

std::packaged_task 构造函数共有 5 中形式,不过拷贝构造已经被禁用了。下面简单地介绍一下上述几种构造函数的语义:

  1. 默认构造函数,初始化一个空的共享状态,并且该 packaged_task 对象无包装任务。
  2. 初始化一个共享状态,并且被包装任务由参数 fn 指定。
  3. 带自定义内存分配器的构造函数,与默认构造函数类似,但是使用自定义分配器来分配共享状态。
  4. 拷贝构造函数,被禁用。
  5. 移动构造函数。

std::packaged_task::valid 介绍

检查当前 packaged_task 是否和一个有效的共享状态相关联,对于由默认构造函数生成的 packaged_task 对象,该函数返回 false,除非中间进行了 move 赋值操作或者 swap 操作。

std::async 详解

c++11还提供了异步接口std::async,通过这个异步接口可以很方便的获取线程函数的执行结果。std::async会自动创建一个线程去调用线程函数,它返回一个std::future,这个future中存储了线程函数返回的结果,当我们需要线程函数的结果时,直接从future中获取,非常方便。

std::async是为了让用户的少费点脑子的。大概的工作过程是这样的:std::async先将异步操作用std::packaged_task包装起来,然后将异步操作的结果放到std::promise中,这个过程就是创造未来的过程。外面再通过future.get/wait来获取这个未来的结果,怎么样,std::async真的是来帮忙的吧,你不用再想到底该怎么用std::future、std::promise和std::packaged_task了,std::async已经帮你搞定一切了!

std::async的原型async(std::launch::async | std::launch::deferred, f, args…),第一个参数是线程的创建策略,有两种策略,默认的策略是立即创建线程

  • std::launch::async:在调用async就开始创建线程。
  • std::launch::deferred:延迟加载方式创建线程。调用async时不创建线程,直到调用了future的get或者wait时才创建线程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// future example
#include <iostream> // std::cout
#include <future> // std::async, std::future
#include <chrono> // std::chrono::milliseconds

// a non-optimized way of checking for prime numbers:
bool
is_prime(int x)
{
for (int i = 2; i < x; ++i)
if (x % i == 0)
return false;
return true;
}

int
main()
{
// call function asynchronously:
std::future < bool > fut = std::async(is_prime, 444444443);

// do something while waiting for function to set future:
std::cout << "checking, please wait";
std::chrono::milliseconds span(100);
while (fut.wait_for(span) == std::future_status::timeout)
std::cout << '.';

bool x = fut.get(); // retrieve return value

std::cout << "\n444444443 " << (x ? "is" : "is not") << " prime.\n";

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::future<int> future = std::async(std::launch::async, [](){ 
std::this_thread::sleep_for(std::chrono::seconds(3));
return 8;
});

std::cout << "waiting...\n";
std::future_status status;
do {
status = future.wait_for(std::chrono::seconds(1));
if (status == std::future_status::deferred) {
std::cout << "deferred\n";
} else if (status == std::future_status::timeout) {
std::cout << "timeout\n";
} else if (status == std::future_status::ready) {
std::cout << "ready!\n";
}
} while (status != std::future_status::ready);

std::cout << "result is " << future.get() << '\n';

std::condition_variable 条件变量详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <iostream>                // std::cout
#include <thread> // std::thread
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx; // 全局互斥锁.
std::condition_variable cv; // 全局条件变量.
bool ready = false; // 全局标志位.

void do_print_id(int id)
{
std::unique_lock <std::mutex> lck(mtx); // 因为使用的同一把锁,所以其实不起效果
while (!ready) // 如果标志位不为 true, 则等待...
cv.wait(lck); // 当前线程被阻塞, 当全局标志位变为 true 之后,
// 线程被唤醒, 继续往下执行打印线程编号id.
std::cout << "thread " << id << '\n';
}

void go()
{
std::unique_lock <std::mutex> lck(mtx);
ready = true; // 设置全局标志位为 true.
cv.notify_all(); // 唤醒所有线程.
}

int main()
{
std::thread threads[10]; // spawn 10 threads:
for (int i = 0; i < 10; ++i)
threads[i] = std::thread(do_print_id, i);
std::cout << "10 threads ready to race...\n";
go(); // go!

for (auto & th:threads)
th.join();
return 0;
}
  • std::condition_variable::wait()

    std::condition_variable 提供了两种 wait() 函数。当前线程调用 wait() 后将被阻塞(此时当前线程应该获得了锁(mutex),不妨设获得锁 lck),直到另外某个线程调用 notify_* 唤醒了当前线程。

    在线程被阻塞时,该函数会自动调用 lck.unlock() 释放锁,使得其他被阻塞在锁竞争上的线程得以继续执行。另外,一旦当前线程获得通知(notified,通常是另外某个线程调用 notify_* 唤醒了当前线程),wait() 函数也是自动调用 lck.lock(),使得 lck 的状态和 wait 函数被调用时相同。

  • std::condition_variable::wait_for()

    wait_for 可以指定一个时间段,在当前线程收到通知或者指定的时间 rel_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_for 返回,剩下的处理步骤和 wait() 类似。

    另外,wait_for 的重载版本(predicte(2))的最后一个参数 pred 表示 wait_for 的预测条件,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞,因此相当于如下代码:

    1
    return wait_until (lck, chrono::steady_clock::now() + rel_time, std::move(pred));
  • std::condition_variable::wait_until

    wait_until 可以指定一个时间点,在当前线程收到通知或者指定的时间点 abs_time 超时之前,该线程都会处于阻塞状态。而一旦超时或者收到了其他线程的通知,wait_until 返回,剩下的处理步骤和 wait_until() 类似。

    另外,wait_until 的重载版本(predicte(2))的最后一个参数 pred 表示 wait_until 的预测条件,只有当 pred 条件为 false 时调用 wait() 才会阻塞当前线程,并且在收到其他线程的通知后只有当 pred 为 true 时才会被解除阻塞,因此相当于如下代码:

    1
    2
    3
    4
    while (!pred())
    if ( wait_until(lck,abs_time) == cv_status::timeout)
    return pred();
    return true;
  • std::condition_variable::notify_one()

    唤醒某个等待(wait)线程。如果当前没有等待线程,则该函数什么也不做,如果同时存在多个等待线程,则唤醒某个线程是不确定的(unspecified)。

  • std::condition_variable::notify_all()

    唤醒所有的等待(wait)线程。如果当前没有等待线程,则该函数什么也不做。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <iostream>                // std::cout
#include <thread> // std::thread, std::this_thread::yield
#include <mutex> // std::mutex, std::unique_lock
#include <condition_variable> // std::condition_variable

std::mutex mtx;
std::condition_variable cv;

int cargo = 0;
bool shipment_available()
{
return cargo != 0;
}

// 消费者线程.
void consume(int n)
{
for (int i = 0; i < n; ++i) {
std::unique_lock <std::mutex> lck(mtx);
cv.wait(lck, shipment_available);
std::cout << cargo << '\n';
cargo = 0;
}
}

int main()
{
std::thread consumer_thread(consume, 10); // 消费者线程.

// 主线程为生产者线程, 生产 10 个物品.
for (int i = 0; i < 10; ++i) {
while (shipment_available())
std::this_thread::yield();
std::unique_lock <std::mutex> lck(mtx);
cargo = i + 1;
cv.notify_one();
}

consumer_thread.join();

return 0;
}

std::atomic_flag 原子类型详解

atomic_flag 一种简单的原子布尔类型,只支持两种操作,test-and-set 和 clear。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#include <iostream>                // std::cout
#include <atomic> // std::atomic_flag
#include <thread> // std::thread
#include <vector> // std::vector
#include <sstream> // std::stringstream

std::atomic_flag lock_stream = ATOMIC_FLAG_INIT;
std::stringstream stream;

void append_number(int x)
{
while (lock_stream.test_and_set()) {
}
stream << "thread #" << x << '\n';
lock_stream.clear();
}

int main()
{
std::vector < std::thread > threads;
for (int i = 1; i <= 10; ++i)
threads.push_back(std::thread(append_number, i));
for (auto & th:threads)
th.join();

std::cout << stream.str() << std::endl;;
return 0;
}

std::atomic 详解

std::atomic 是模板类,一个模板类型为 T 的原子对象中封装了一个类型为 T 的值。

1
2
3
4
5
6
7
8
9
template <class T> struct atomic;

# 在atomic中对基本类型有定义
typedef atomic<bool> atomic_bool;
typedef atomic<char> atomic_char;
typedef atomic<signed char> atomic_schar;
typedef atomic<unsigned char> atomic_uchar;
typedef atomic<short> atomic_short;
typedef atomic<unsigned short> atomic_ushort;

原子类型对象的主要特点就是从不同线程访问不会导致数据竞争(data race)。因此从不同线程访问某个原子对象是良性 (well-defined) 行为,而通常对于非原子类型而言,并发访问某个对象(如果不做任何同步操作)会导致未定义 (undifined) 行为发生。

std::atomic 是模板类,一个模板类型为 T 的原子对象中封装了一个类型为 T 的值。本文<std::atomic 基本介绍>一节中也提到了 std::atomic 类模板除了基本类型以外,还针对整形和指针类型做了特化。 特化的 std::atomic 类型支持更多的操作,如 fetch_add, fetch_sub, fetch_and 等。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

#include <iostream> // std::cout
#include <atomic> // std::atomic
#include <thread> // std::thread, std::this_thread::yield

std::atomic <int> foo = 0;

void set_foo(int x)
{
foo = x; // 调用 std::atomic::operator=().
}

void print_foo()
{
while (foo == 0) { // wait while foo == 0
std::this_thread::yield();
}
std::cout << "foo: " << foo << '\n';
}

int main()
{
std::thread first(print_foo);
std::thread second(set_foo, 10);
first.join();
second.join();
return 0;
}

线程池的实现

.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
//
// Created by xuqi on 2019-06-17.
//

#ifndef THREADPOOL_H
#define THREADPOOL_H
#include <vector>
#include <queue>
#include <memory>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <stdexcept>

//#define C14

class ThreadPool
{
public:
ThreadPool(size_t);

#ifdef C14
template<class F,class... Args>
auto enqueue(F&& f,Args&&... args);

#else
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
#endif
~ThreadPool();

private:
// 线程池
std::vector< std::thread > workers;
// 任务队列
std::queue< std::function<void()> > tasks;

// synchronization同步
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};


#endif //THREADPOOL_H

.cpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
//
// Created by xuqi on 2019-06-17.
//

#include "ThreadPool.h"
ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
// 创建n个线程,每个线程等待是否有新的task, 或者线程stop(要终止)
for (size_t i = 0; i < threads; ++i) {

// 构造并插入vector最后
workers.emplace_back(
[this]
{
for (;;)// 轮询
{
std::function<void()> task; // 只有取Task的时候是锁的
{
// 获取同步锁
std::unique_lock<std::mutex> lock(this->queue_mutex);
// 线程会一直阻塞,直到有新的task,或者是线程要退出
this->condition.wait(lock,
[this]
{ return this->stop || !this->tasks.empty(); });
// 线程退出
if (this->stop && this->tasks.empty())
return;
// 将task取出
task = std::move(this->tasks.front());
// 队列中移除以及取出的task
this->tasks.pop();
}
// 执行task,完了则进入下一次循环
task();
}
}
);
}
}


// add new work item to the pool
// 将队列压入线程池,其中f是要执行的函数, args是多有的参数
#ifdef C14

template<class F, class... Args>
auto ThreadPool::enqueue(F &&f, Args &&... args)
{
return nullptr;
}

#else

template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) // 右值引用
-> std::future<typename std::result_of<F(Args...)>::type> // 拖尾类型返回
{
// 返回的结果的类型,当然可以根据实际的需要去掉这个(gcc4.7的c++11不支持)
using return_type = typename std::result_of<F(Args...)>::type;
// 将函数handle与参数绑定
auto task = std::make_shared<std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);

//after finishing the task, then get result by res.get() (mainly used in the invoked function)
std::future<return_type> res = task->get_future();
{
// 压入队列需要线程安全,因此需要先获取锁
std::unique_lock<std::mutex> lock(queue_mutex);

// don't allow enqueueing after stopping the pool
if (stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
// 任务压入队列
tasks.emplace([task]()
{ (*task)(); });
}
// 添加了新的task,因此条件变量通知其他线程可以获取task执行
condition.notify_one();
return res;
}

#endif

ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
// 通知所有在等待锁的线程
condition.notify_all();
// 等待所有的线程任务执行完成退出
for (std::thread &worker: workers)
worker.join();
}

使用示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ThreadPool tp(5);

//线程执行结果的返回
std::vector<std::future<std::string>> vecStr;
// 下载对应的url链接,没有返回值
tp.enqueue(dummy_download, "www.baidu.jpg");
tp.enqueue(dummy_download, "www.yy.jpg");

//数据库根据id查询user name
vecStr.emplace_back(tp.enqueue(get_user_name, 101));
vecStr.emplace_back(tp.enqueue(get_user_name, 102));

//输出线程返回的值,实际中可以不要
std::future<void> res1 = std::async(std::launch::async, [&vecStr](){
for (auto &&ret:vecStr) {
std::cout<<"get user: "<<ret.get();
}
std::cout<<std::endl;
});
# c++11并发指南
线程池
socket编程
  • 文章目录
  • 站点概览

XuQi

44 日志
30 标签
  1. 1. 与 C++11 多线程相关的头文件
    1. 1.1. 一句话话概括各个库的特征
  2. 2. c++ 内存模型
  3. 3. std::thread 线程详解
  4. 4. std::mutex 互斥量详解
  5. 5. std::lock_guard 详解
  6. 6. std::unique_lock 详解
  7. 7. std::promise 详解
  8. 8. std::packaged_task 详解
  9. 9. std::async 详解
  10. 10. std::condition_variable 条件变量详解
  11. 11. std::atomic_flag 原子类型详解
  12. 12. std::atomic 详解
  13. 13. 线程池的实现
© 2019 XuQi
由 Hexo 强力驱动 v3.9.0
|
主题 – NexT.Muse v7.3.0