手写线程池 - C 语言版
扫描二维码
随时随地手机看文章
线程池的组成主要分为 3 个部分,这三部分配合工作就可以得到一个完整的线程池:
- 任务队列,存储需要处理的任务,由工作的线程来处理这些任务
- 通过线程池提供的 API 函数,将一个待处理的任务添加到任务队列,或者从任务队列中删除
- 已处理的任务会被从任务队列中删除
- 线程池的使用者,也就是调用线程池函数往任务队列中添加任务的线程就是生产者线程
- 工作的线程(任务队列任务的消费者) ,N个
- 线程池中维护了一定数量的工作线程,他们的作用是是不停的读任务队列,从里边取出任务并处理
- 工作的线程相当于是任务队列的消费者角色,
- 如果任务队列为空,工作的线程将会被阻塞 (使用条件变量 / 信号量阻塞)
- 如果阻塞之后有了新的任务,由生产者将阻塞解除,工作线程开始工作
- 管理者线程(不处理任务队列中的任务),1个
- 它的任务是周期性的对任务队列中的任务数量以及处于忙状态的工作线程个数进行检测
- 当任务过多的时候,可以适当的创建一些新的工作线程
- 当任务过少的时候,可以适当的销毁一些工作的线程
2. 任务队列
// 任务结构体typedef struct Task
{
void (*function)(void* arg);
void* arg;
}Task;
3. 线程池定义
// 线程池结构体struct ThreadPool
{
// 任务队列
Task* taskQ;
int queueCapacity; // 容量
int queueSize; // 当前任务个数
int queueFront; // 队头 -> 取数据
int queueRear; // 队尾 -> 放数据
pthread_t managerID; // 管理者线程ID
pthread_t *threadIDs; // 工作的线程ID
int minNum; // 最小线程数量
int maxNum; // 最大线程数量
int busyNum; // 忙的线程的个数
int liveNum; // 存活的线程的个数
int exitNum; // 要销毁的线程个数
pthread_mutex_t mutexPool; // 锁整个的线程池
pthread_mutex_t mutexBusy; // 锁busyNum变量
pthread_cond_t notFull; // 任务队列是不是满了
pthread_cond_t notEmpty; // 任务队列是不是空了
int shutdown; // 是不是要销毁线程池, 销毁为1, 不销毁为0
};
4. 头文件声明
#ifndef _THREADPOOL_H#define _THREADPOOL_H
typedef struct ThreadPool ThreadPool;
// 创建线程池并初始化
ThreadPool *threadPoolCreate(int min, int max, int queueSize);
// 销毁线程池
int threadPoolDestroy(ThreadPool* pool);
// 给线程池添加任务
void threadPoolAdd(ThreadPool* pool, void(*func)(void*), void* arg);
// 获取线程池中工作的线程的个数
int threadPoolBusyNum(ThreadPool* pool);
// 获取线程池中活着的线程的个数
int threadPoolAliveNum(ThreadPool* pool);
//////////////////////
// 工作的线程(消费者线程)任务函数
void* worker(void* arg);
// 管理者线程任务函数
void* manager(void* arg);
// 单个线程退出
void threadExit(ThreadPool* pool);
#endif // _THREADPOOL_H
5. 源文件定义
ThreadPool* threadPoolCreate(int min, int max, int queueSize){
ThreadPool* pool = (ThreadPool*)malloc(sizeof(ThreadPool));
do
{
if (pool == NULL)
{
printf("malloc threadpool fail...\n");
break;
}
pool->threadIDs = (pthread_t*)malloc(sizeof(pthread_t) * max);
if (pool->threadIDs == NULL)
{
printf("malloc threadIDs fail...\n");
break;
}
memset(pool->threadIDs, 0, sizeof(pthread_t) * max);
pool->minNum = min;
pool->maxNum = max;
pool->busyNum = 0;
pool->liveNum = min; // 和最小个数相等
pool->exitNum = 0;
if (pthread_mutex_init(