线程池
介绍
线程池: 一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量
线程池的价值:
- 需要大量的线程来完成任务,且完成任务的时间比较短。可同时处理多任务,多请求。
- 有任务可以立即从线程池中调取线程取处理,节省了线程创建的时间
- 有效防止服务端线程过多而导致系统过载的问题
实现
线程池中首先需要有很多个线程,用户可以自己选择创建多少个线程。为了实现线程间的同步与互斥,还需要增加两个变量——互斥量和条件变量。我们还需要一个任务队列,主线程不断往里面塞任务,线程池的线程不断去处理。需要注意的是:这里的任务队列可以为空,但不能满,所以任务队列的容量不限定(实际场景中,任务队列容量不够就需要考虑换一台更大的服务器)
线程池的四个成员变量:
- 一个队列: 存放任务
- 线程池中线程数: 记录线程池中创建的线程数
- 互斥量: 一个互斥锁
- 条件变量: 两个条件变量
线程池:首先需要创建几个线程,还有一个任务队列,当任务队列有任务的时候就唤醒一个正在等待的线程,让线程去执行任务,线程池中的线程执行完任务不会销毁,大大减少的cpu的消耗。
需要两个条件变量和一个互斥锁,这个互斥锁用来锁住任务队列,因为任务队列是公共资源,其次还需要两个条件变量,一个条件变量用来阻塞取任务的线程,当队列中有任务的时候,直接取任务,然后解锁,当任务队列中没有任务的时候,解锁等待条件,条件满足抢锁,取任务,解锁。另一个条件变量用来阻塞添加者进程,当任务队列满了,会让添加者进程等待,当有线程取走一个任务的时候,会唤醒添加者进程。
版本一
任务函数
这里的任务函数采用的时回调函数的方式,提高了代码的通用性,可以根据自己的需求改写任务函数
//任务回调函数 void taskRun(void *arg) { PoolTask *task = (PoolTask*)arg; int num = task->tasknum; printf("task %d is runing %lun",num,pthread_self()); sleep(1); printf("task %d is done %lun",num,pthread_self()); }
线程池的主要代码框架
class ThreadPool { public: //构造函数,初始化线程池 ThreadPool(int thrnum, int maxtasknum) { } static void* thrRun(void* arg) { } //析构函数,摧毁线程池 ~ThreadPool() { } public: //添加任务到线程池 void addtask(){}; private: //任务队列相关的参数 int max_job_num;//最大任务个数 int job_num;//实际任务个数 PoolTask *tasks;//任务队列数组 int job_push;//入队位置 int job_pop;// 出队位置 //线程相关参数 int thr_num;//线程池内线程个数 pthread_t *threads;//线程池内线程数组 int shutdown;//是否关闭线程池 pthread_mutex_t pool_lock;//线程池的锁 pthread_cond_t empty_task;//任务队列为空的条件 pthread_cond_t not_empty_task;//任务队列不为空的条件 };
放任务: 主线程无脑往任务队列中塞任务,塞任务之前进行加锁,塞完任务解锁,如果任务队列已经满了,等待线程取任务,然后唤醒在条件变量下等待的队列;放入了任务就给线程发送信号,唤醒线程来取
取任务: 线程池中的线程从任务队列中取任务,需要对任务队列上锁,因为对公共资源的操作都需要上锁,如果没有任务就阻塞,等待放任务唤醒;如果取完了一个任务,就唤醒添加任务
这就是两个条件变量和一个互斥锁的用法
//添加任务到线程池 void addtask() { pthread_mutex_lock(&(this->pool_lock)); //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理) while(this->max_job_num <= this->job_num) { pthread_cond_wait(&(this->empty_task),&(this->pool_lock)); } int taskpos = (this->job_push++)%this->max_job_num; this->tasks[taskpos].tasknum = beginnum++; this->tasks[taskpos].arg = (void*)&this->tasks[taskpos]; this->tasks[taskpos].task_func = taskRun; this->job_num++; pthread_mutex_unlock(&(this->pool_lock)); pthread_cond_signal(&(this->not_empty_task));//通知包身工 }
//取任务 static void* thrRun(void* arg) { ThreadPool *pool = (ThreadPool*)arg; int taskpos = 0;//任务位置 PoolTask *task = new PoolTask(); while(1) { //获取任务,先要尝试加锁 pthread_mutex_lock(&pool->pool_lock); //无任务并且线程池不是要摧毁 while(pool->job_num <= 0 && !pool->shutdown ) { //如果没有任务,线程会阻塞 pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock); } if(pool->job_num) { //有任务需要处理 taskpos = (pool->job_pop++)%pool->max_job_num; //printf("task out %d...tasknum===%d tid=%lun",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self()); //为什么要拷贝?避免任务被修改,生产者会添加任务 memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask)); task->arg = task; pool->job_num--; //task = &thrPool->tasks[taskpos]; pthread_cond_signal(&pool->empty_task);//通知生产者 } if(pool->shutdown) { //代表要摧毁线程池,此时线程退出即可 //pthread_detach(pthread_self());//临死前分家 pthread_mutex_unlock(&pool->pool_lock); delete(task); pthread_exit(NULL); } //释放锁 pthread_mutex_unlock(&pool->pool_lock); task->task_func(task->arg);//执行回调函数 } }
整体代码:
#include<iostream> #include<string.h> #include<pthread.h> #include<sys/types.h> #include<stdio.h> #include<unistd.h> using namespace std; int beginnum = 1; class PoolTask { public: int tasknum;//模拟任务编号 void *arg;//回调函数参数 void (*task_func)(void *arg);//任务的回调函数 }; //任务回调函数 void taskRun(void *arg) { PoolTask *task = (PoolTask*)arg; int num = task->tasknum; printf("task %d is runing %lun",num,pthread_self()); sleep(1); printf("task %d is done %lun",num,pthread_self()); } class ThreadPool { public: //构造函数,初始化线程池 ThreadPool(int thrnum, int maxtasknum) { this->thr_num = thrnum; this->max_job_num = maxtasknum; this->shutdown = 0;//是否摧毁线程池,1代表摧毁 this->job_push = 0;//任务队列添加的位置 this->job_pop = 0;//任务队列出队的位置 this->job_num = 0;//初始化的任务个数为0 //申请最大的任务队列 this->tasks = new PoolTask[thrnum]; //初始化锁和条件变量 pthread_mutex_init(&(this->pool_lock),NULL); pthread_cond_init(&(this->empty_task),NULL); pthread_cond_init(&(this->not_empty_task),NULL); int i = 0; this->threads = (pthread_t *)malloc(sizeof(pthread_t)*thrnum);//申请n个线程id的空间 pthread_attr_t attr; pthread_attr_init(&attr); pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); for(i = 0;i < thrnum;i++) { pthread_create(&(this->threads[i]),&attr,thrRun,this);//创建多个线程 } } static void* thrRun(void* arg) { ThreadPool *pool = (ThreadPool*)arg; int taskpos = 0;//任务位置 PoolTask *task = new PoolTask(); while(1) { //获取任务,先要尝试加锁 pthread_mutex_lock(&pool->pool_lock); //无任务并且线程池不是要摧毁 while(pool->job_num <= 0 && !pool->shutdown ) { //如果没有任务,线程会阻塞 pthread_cond_wait(&pool->not_empty_task,&pool->pool_lock); } if(pool->job_num) { //有任务需要处理 taskpos = (pool->job_pop++)%pool->max_job_num; //printf("task out %d...tasknum===%d tid=%lun",taskpos,thrPool->tasks[taskpos].tasknum,pthread_self()); //为什么要拷贝?避免任务被修改,生产者会添加任务 memcpy(task,&pool->tasks[taskpos],sizeof(PoolTask)); task->arg = task; pool->job_num--; //task = &thrPool->tasks[taskpos]; pthread_cond_signal(&pool->empty_task);//通知生产者 } if(pool->shutdown) { //代表要摧毁线程池,此时线程退出即可 //pthread_detach(pthread_self());//临死前分家 pthread_mutex_unlock(&pool->pool_lock); delete(task); pthread_exit(NULL); } //释放锁 pthread_mutex_unlock(&pool->pool_lock); task->task_func(task->arg);//执行回调函数 } } //析构函数,摧毁线程池 ~ThreadPool() { this->shutdown = 1;//关闭线程池 pthread_cond_broadcast(&(this->not_empty_task));//诱杀 int i = 0; for(i = 0; i<this->thr_num ; i++) { pthread_join(this->threads[i],NULL); } pthread_cond_destroy(&(this->not_empty_task)); pthread_cond_destroy(&(this->empty_task)); pthread_mutex_destroy(&(this->pool_lock)); delete []tasks; tasks = NULL; free(this->threads); } public: //添加任务到线程池 void addtask() { pthread_mutex_lock(&(this->pool_lock)); cout << "当前任务队列中任务的个数是: " <<this-> job_num <<endl; //实际任务总数大于最大任务个数则阻塞等待(等待任务被处理) while(this->max_job_num <= this->job_num) { pthread_cond_wait(&(this->empty_task),&(this->pool_lock)); } int taskpos = (this->job_push++)%this->max_job_num; this->tasks[taskpos].tasknum = beginnum++; this->tasks[taskpos].arg = (void*)&this->tasks[taskpos]; this->tasks[taskpos].task_func = taskRun; this->job_num++; pthread_mutex_unlock(&(this->pool_lock)); pthread_cond_signal(&(this->not_empty_task));//通知包身工 } private: //任务队列相关的参数 int max_job_num;//最大任务个数 int job_num;//实际任务个数 PoolTask *tasks;//任务队列数组 int job_push;//入队位置 int job_pop;// 出队位置 //线程相关参数 int thr_num;//线程池内线程个数 pthread_t *threads;//线程池内线程数组 int shutdown;//是否关闭线程池 pthread_mutex_t pool_lock;//线程池的锁 pthread_cond_t empty_task;//任务队列为空的条件 pthread_cond_t not_empty_task;//任务队列不为空的条件 }; int main() { ThreadPool *m = new ThreadPool(3,20); int j = 0; for(j=0;j<20;j++) { m->addtask(); } sleep(20); delete m; m = NULL; system("pause"); return EXIT_SUCCESS; }
运行结果如下:
可以看到线程最多处理三个任务,而任务队列中最多可以存在20个任务,当线程取走了任务之后,唤醒生产者继续添加任务。
版本二
首先封装一个任务:
class Task { public: Task(int a = 0, int b = 0) :_a(a) ,_b(b) {} void Run() { //执行的任务可以自己编写 } private: int _a; int _b; };
线程池的主要代码框架(唤醒和等待操作都已经封装好):
#define DEFAULT_MAX_PTHREAD 5 class ThreadPool { public: ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD) :_max_thread(max_pthread) {} ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } public: void LockQueue() { pthread_mutex_lock(&_mutex); } void UnlockQueue() { pthread_mutex_unlock(&_mutex); } void ThreadWait() { pthread_cond_wait(&_cond, &_mutex); } void WakeUpThread() { pthread_cond_signal(&_cond); //pthread_cond_broadcast(&_cond); } bool IsEmpty() { return _q.empty(); } private: queue<Task*> _q; int _max_thread; pthread_mutex_t _mutex; pthread_cond_t _cond; };
创建多个线程
创建多个线程可以用一个循环进行创建。需要注意的是,创建一个线程还需要提供一个线程启动后要执行的函数,这个启动函数只能有一个参数。如果把这个函数设置为成员函数,那么这个函数的第一个参数默认是this指针,这样显然是不可行的,所以这里我们考虑把这个启动函数设置为静态的。但是设置为静态的成员函数又会面临一个问题:如何调用其他成员函数和成员变量? 所以这里我们考虑创建线程的时候,把this指针传过去,让启动函数的arg 参数去接收即可
static void* Runtine(void* arg) { pthread_detach(pthread_self()); ThreadPool* this_p = (ThreadPool*)arg; while (1){ this_p->LockQueue(); while (this_p->IsEmpty()){ this_p->ThreadWait(); } Task* t; this_p->Get(t); this_p->UnlockQueue(); // 解锁后处理任务 t->Run(); delete t; } } void ThreadPoolInit() { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); pthread_t t[_max_thread]; for(int i = 0; i < _max_thread; ++i) { pthread_create(t + i, nullptr, Runtine, this); } }
注意: 线程创建后,执行启动函数,在这个函数中,线程会去任务队列中取任务并处理,取任务前需要进行加锁的操作(如果队列为空需要挂起等待),取完任务然后进行解锁,然后处理任务,让其它线程去任务队列中取任务
放任务: 主线程无脑往任务队列中塞任务,塞任务之前进行加锁,塞完任务解锁,然后唤醒在条件变量下等待的队列
取任务: 线程池中的线程从任务队列中取任务,这里不需要加锁,因为这个动作在启动函数中加锁的那一段区间中被调用的,其实已经上锁了
// 放任务 void Put(Task* data) { LockQueue(); _q.push(data); UnlockQueue(); WakeUpThread(); } // 取任务 void Get(Task*& data) { data = _q.front(); _q.pop(); }
这两个版本都可以实现简易的线程池,下面线程池版本的服务器主要是用版本二来实现,因为版本一要修改的内容有点多,小伙伴们可以自己修改一下
线程池版本服务器
多线程版本效果看起来还不错,但是来一个连接就创建一个线程,断开一个连接就释放一个线程,这样频繁地创建和释放线程资源,对OS来说是一种负担,同时也带来资源的浪费,如果我们使用线程池,把每一个客户端连接封装成一个任务,让线程池去处理,这样就不需要频繁地创建和销毁消除,效率也能提升很多。
线程池采用版本二,代码如下:
#pragma once #include <iostream> #include <queue> #include <pthread.h> #include <unistd.h> #include "Task.hpp" #define DEFAULT_MAX_PTHREAD 5 class ThreadPool { public: ThreadPool(int max_pthread = DEFAULT_MAX_PTHREAD) :_max_thread(max_pthread) {} static void* Runtine(void* arg) { pthread_detach(pthread_self()); ThreadPool* this_p = (ThreadPool*)arg; while (1){ this_p->LockQueue(); while (this_p->IsEmpty()){ this_p->ThreadWait(); } Task* t; this_p->Get(t); this_p->UnlockQueue(); // 解锁后处理任务 t->Run(); delete t; } } void ThreadPoolInit() { pthread_mutex_init(&_mutex, nullptr); pthread_cond_init(&_cond, nullptr); pthread_t t[_max_thread]; for(int i = 0; i < _max_thread; ++i) { pthread_create(t + i, nullptr, Runtine, this); } } void Put(Task* data) { LockQueue(); _q.push(data); UnlockQueue(); WakeUpThread(); } void Get(Task*& data) { data = _q.front(); _q.pop(); } ~ThreadPool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); } public: void LockQueue() { pthread_mutex_lock(&_mutex); } void UnlockQueue() { pthread_mutex_unlock(&_mutex); } void ThreadWait() { pthread_cond_wait(&_cond, &_mutex); } void WakeUpThread() { pthread_cond_signal(&_cond); //pthread_cond_broadcast(&_cond); } bool IsEmpty() { return _q.empty(); } private: std::queue<Task*> _q; int _max_thread; pthread_mutex_t _mutex; pthread_cond_t _cond; };
这里我们单独写一个头文件——Task.hpp,其中有任务类,任务类里面有三个成员变量,也就是端口号,IP和套接字,其中有一个成员方法——Run,里面封装了一个Service函数,也就是前面写的,把它放在Task.hpp这个头文件下,线程池里面的线程执行run函数即可,头文件内容如下:
#pragma once #include <iostream> #include <unistd.h> static void Service(std::string ip, int port, int sock) { while (1){ char buf[256]; ssize_t size = read(sock, buf, sizeof(buf)-1); if (size > 0){ // 正常读取size字节的数据 buf[size] = 0; std::cout << "[" << ip << "]:[" << port << "]# "<< buf << std::endl; std::string msg = "server get!-> "; msg += buf; write(sock, msg.c_str(), msg.size()); } else if (size == 0){ // 对端关闭 std::cout << "[" << ip << "]:[" << port << "]# close" << std::endl; break; } else{ // 出错 std::cerr << sock << "read error" << std::endl; break; } } close(sock); std::cout << "service done" << std::endl; } struct Task { int _port; std::string _ip; int _sock; Task(int port, std::string ip, int sock) :_port(port) ,_ip(ip) ,_sock(sock) {} void Run() { Service(_ip, _port, _sock); } };
服务器类的核心代码如下:
void loop() { struct sockaddr_in peer;// 获取远端端口号和ip信息 socklen_t len = sizeof(peer); _tp = new ThreadPool(THREAD_NUM); _tp->ThreadPoolInit(); while (1){ // 获取链接 // sock 是进行通信的一个套接字 _listen_sock 是进行监听获取链接的一个套接字 int sock = accept(_listen_sock, (struct sockaddr*)&peer, &len); if (sock < 0){ std::cout << "accept fail, continue accept" << std::endl; continue; } int peerPort = ntohs(peer.sin_port); std::string peerIp = inet_ntoa(peer.sin_addr); std::cout << "get a new link, [" << peerIp << "]:[" << peerPort << "]"<< std::endl; Task* task = new Task(peerPort, peerIp, sock); _tp->Put(task); } }
注意几点变化:
- 服务器类增加一个线程池成员变量,初始化函数里面增加线程池创建(在堆上申请)
- 析构函数增加释放线程池资源一步
- loop函数中只需要封装任务,并把任务丢进线程池中即可