线程池基本概念
为什么使用线程池?
我们知道进程是运行的应用程序,可以被理解为是一个具有独立功能的程序在某个数据集合下的一次活动,而线程就是进程中的一个单一顺序的控制流,被包含在进程中,是进程实际运作单位。
在之前的博文中我们直接使用pthread_create
等函数或者C++线程标准库直接创建线程,但是每个创建的线程都对应着一次线程销毁,而一个程序频繁创建销毁线程就会浪费CPU资源。所以我们提出了可以创建一个线程池去管理线程,用的时候直接在线程池中取,不用的时候放回去即可。
什么是线程池?
通俗点的话来说线程池就是将一堆线程放进一个池子里统一管理。
使用较为专业的话来解释就是线程池就是管理线程的一个容器,有任务需要处理时,会相继判断核心线程数是否还有空闲、线程池中的任务队列是否已满、是否超过线程池大小,然后调用或创建线程或者排队,线程执行完任务后并不会立即被销毁,而是仍然在线程池中等待下一个任务,如果超过存活时间还没有新的任务就会被销毁,通过这样复用线程从而降低开销。
使用线程池的优势
- 提升线程的使用率,通过重复利用已创建的线程降低线程创建和销毁造成的消耗;
- 提高响应速度,当任务到达时,任务可以不需要的等到线程创建就能立即执行;
- 提高线程的可管理性,统一管理线程,避免系统创建大量同类线程而导致消耗完内存;
创建线程池
线程池构造基本思路如下图:
任务队列对任务进行排队,将任务传递给线程池,线程池为这些任务分配给线程。涉及三个问题:存放各种函数的任务队列、存放线程以及通知线程处理任务(涉及到通知就需要使用条件变量)。
其中比较复杂的是线程池的初始化工作,具体如下:
- 初始化任务队列和工作线程组
- 将等候在条件变量(任务队列上有任务)上的一个线程唤醒并从该任务队列中取出第一个任务给该线程执行
- 等待任务队列中所有任务执行完毕
具体代码如下:
class ThreadPool{
vector<thread> threads;
queue<function<void()>> tasks;
mutex m;
condition_variable cv;
bool exited = false;
public:
ThreadPool(int n = 8){
// 实现线程个数,通常是CPU个数的2倍
for(int i=0; i<8; ++i){
threads.push_back(thread([this](){
// lamda表达式中使用了很多成员变量
// 所以需要传送this
for(;;){
function<void()> task; // 存放任务
{
// unique锁的控制范围
unique_lock<mutex> lock(m); // 和guard很相似
// 加锁
while(tasks.empty() && !exited){
// 判断任务队列是否是空的
cv.wait(lock); // 有任务就唤醒线程--加锁
// 没有任务就释放互斥锁--解锁
}
if(exited) break;
task = tasks.front();
tasks.pop();
} // 解锁
task();
}
}));
}
}
void AddTask(function<void()> task){
{
lock_guard<mutex> guard(m);
tasks.push(task);
}
cv.notify_one();
}
~ThreadPool(){
m.lock();
exited = true;
m.unlock();
cv.notify_all();
for(auto &thread:threads){
thread.join(); // 释放线程资源
}
}
};
以上实现的过程中我们使用了两种特殊的锁:lock_guard和unique_lock。这两种锁都可以实现自动加锁和解锁,即构造时加锁析构时解锁。两者的不同在于unique_lock可以独立的进行加锁和解锁而lock_guard不可以,在这一方面,unique_lock更加灵活。
注意线程对象不可复制可移动,互斥锁和条件变量均不可复制不可移动,所以使用这些对象构造的线程池也是不可复制不可移动的。
以上就是关于线程池的实现,实现结束后我们就可以利用该线程池工作了,这里的任务对象只需要是可调用对象即可,可调用对象可以是lamda表达式、函数指针、仿函数。
线程池的使用
使用流程即是:
- 初始化线程池
- 向线程池添加任务
- 销毁线程池
我们首先使用线程池调用lamda表达式如下:
int main(){
ThreadPool pool;
for(int i=0; i<20; ++i){
pool.AddTask([i](){
cout << this_thread::get_id() << " begin:" << i << endl;
this_thread::sleep_for(i*1s);
cout << this_thread::get_id() << " end:" << i << endl;
});
}
this_thread::sleep_for(20s); // 超时时间
// 防止主线程执行完毕释放线程池
}
使用函数指针:
void Test(){
for(int i=0; i<10; ++i){
cout << "Hello" << i << endl;
this_thread::sleep_for(500ms);
}
}
int main(){
ThreadPool pool;
pool.AddTask(Test);
this_thread::sleep_for(20s); // 超时时间
// 防止主线程执行完毕释放线程池
}
使用仿函数如下:
struct Caller{
void operator()(){
cout << "Caller" << endl;
this_thread::sleep_for(500ms);
}
};
int main(){
ThreadPool pool;
pool.AddTask(Caller());
this_thread::sleep_for(20s); // 超时时间
// 防止主线程执行完毕释放线程池
}
线程池中线程数目的影响
线程池中线程个数对系统性能影响很大。如果线程池线程数量太小,当有大量请求需要处理,系统响应慢影响体验,甚至会出现任务队列大量堆积任务导致OOM。如果线程池线程数量过大,大量线程可能会同时在争取CPU资源,这样会导致大量的上下文切换(cpu给线程分配时间片,当线程的cpu时间片用完后保存状态,以便下次继续运行),从而增加线程的执行时间,影响了整体执行效率。
为了确定线程数目,我们需要考虑系统性能优化指标,通常情况下优化指标为降低延迟(发送请求到接受数据的时间)和提高吞吐量(单位时间能处理更多的请求)两种。通常从两个思路来进行优化,一种是优化算法(降低程序本身的时间复杂度和空间复杂度),另一种是硬件性能优化(提高IO和CPU利用率)。
在这部分我们主要考虑怎么提高I/O和CPU利用率,我们通常将程序分为I/O密集型和CPU密集型,由于I/O操作比CPU计算耗时多,所以若程序只要有IO操作,就归纳为IO密集型计算,程序中只有CPU计算,归纳为CPU密集型程序。
对于I/O密集型程序我们通常将线程数设置为二倍的CPU核数;对于CPU密集型我们将线程数设置为CPU核数。但实际情况下我们往往不会这样设置,如I/O密集型网络中线程数的设置可以使用公式:线程数=CPU核数 /(1-阻塞系数)。
文章评论