Linux 之C++ 线程池
扫描二维码
随时随地手机看文章
我想做的就是对每个线程进行有针对性的控制,也即可以对线程进行暂停,恢复,退出等等精细控制,对于此项要求,我的想法是声明一个类,该类中有些精细的操作其中包括该线程的状态,对线程控制的互斥变量(pthread_mutex_t),以及唤醒的条件(pthread_cond_t),要实现这些功能其核心在于,每个线程拥有自己的控制变量,也即线程执行函数是线程类的静态函数,该线程函数的参数是该线程类对象的this指针!!!
class ServantPool;  // A Servant only stores a pointer to the pool that owns it.

/**
 * One pool worker: a POSIX thread bundled with the variables used to control it.
 *
 * The thread entry point is the static member ThreadProc; it receives the
 * Servant's `this` pointer as its argument, so every thread works on its own
 * mutex, condition variable and state.
 */
class Servant
{
private:
    pthread_mutex_t mutex_x;    // mutex handed to pthread_cond_wait together with cond_t
    pthread_cond_t  cond_t;     // signalled by notify() and exit() to wake the thread
    int             state;      // compared against WORKING / STOP / EXIT / IDLE in Servant.cpp
    int             thread_id;  // sequence number assigned by the constructor
    int             m_socket;   // descriptor set by setSocket(); read and closed by DoJob()
    ServantPool*    m_pool;     // pool whose idle/busy/stop lists this worker is moved between

    void DoJob();     // receives from m_socket, prints the data, closes the socket
    void Complete();  // moves the worker from the busy list back to the idle list

public:
    pthread_t m_tid;  // thread handle filled in by start(); used by join()

    // Servant.cpp defines only this constructor; the pool calls `new Servant(this)`.
    explicit Servant(ServantPool* ma);

    void setSocket(int m);                // store the socket the next job reads from
    void start();                         // create the thread running ThreadProc(this)
    static void* ThreadProc(void* data);  // thread body; `data` is the Servant*
    void notify();                        // signal cond_t when the worker is IDLE
    void stop();                          // IDLE -> STOP
    void wakeup();                        // STOP -> IDLE
    void exit();                          // set state to EXIT and signal cond_t
    void join();                          // pthread_join on m_tid
};
这是该线程类的声明,最主要的是查看ThreadProc静态函数
void* Servant::ThreadProc(void* data) { Servant* ser=(Servant*)data; int result=0; while(ser->state!=EXIT) { while(ser->state==IDLE) { result=pthread_mutex_lock(&(ser->mutex_x)); if(0==result) { printf("waiting for the mutex n"); } result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x)); if(ser->state!=IDLE) goto End; printf("the conditions has been notifiedn"); //we take this thread to busy list ser->m_pool->AddtoIdle(false,ser); ser->m_pool->Addtobusy(true,ser); // really work DoSomething: ser->state=WORKING; printf("Do Something...n"); ser->DoJob(); ser->Complete();//this function change state End: pthread_mutex_unlock(&(ser->mutex_x)); } } return NULL; }
这个函数主要控制两件事。其一是state:对线程状态的修改,只会在当前任务(一次完整的原子操作)执行完毕之后才生效,也就是说,核心操作不会被状态的改变所打断。其二是唤醒后的检查:线程每次被唤醒时,都需要检测state的值,如果state仍然是IDLE,则表明有新的任务要处理;否则表明这次唤醒只是为了修改当前线程的状态!!!下面先看一个API:
pthread_cond_wait:调用该函数之前,必须先由本线程对互斥量加锁。该函数需要两个参数:条件变量cond_t和已经加锁的互斥量mutex_x。调用时,函数会原子地释放mutex_x并把线程挂起,等待条件变量被通知;在此期间,其他线程可以获得mutex_x并修改条件。当其他线程调用pthread_cond_signal后,等待的线程被唤醒,并在pthread_cond_wait返回之前重新对mutex_x加锁,然后继续向下执行。由于可能出现虚假唤醒,函数返回后应当重新检查条件。
现Servant.h文件内容如下:
#ifndef SERVANT_H
#define SERVANT_H

#include <pthread.h>
#include <vector>

// Servant.cpp spells the container as unqualified `vector`, so the name is
// kept visible without importing the whole of namespace std from a header.
using std::vector;

class ServantPool;

/**
 * One pool worker: a POSIX thread bundled with the variables used to control it.
 *
 * The thread entry point is the static member ThreadProc, which receives the
 * Servant's `this` pointer, so every thread works on its own mutex, condition
 * variable and state.
 */
class Servant
{
private:
    pthread_mutex_t mutex_x;    // mutex handed to pthread_cond_wait together with cond_t
    pthread_cond_t  cond_t;     // signalled by notify() and exit() to wake the thread
    int             state;      // compared against WORKING / STOP / EXIT / IDLE in Servant.cpp
    int             thread_id;  // sequence number assigned by the constructor
    int             m_socket;   // descriptor set by setSocket(); read and closed by DoJob()
    ServantPool*    m_pool;     // pool whose lists this worker is moved between

    // A Servant owns a mutex and a condition variable; copying is not supported.
    Servant(const Servant&);
    Servant& operator=(const Servant&);

    void DoJob();     // receives from m_socket, prints the data, closes the socket
    void Complete();  // moves the worker from the busy list back to the idle list

public:
    pthread_t m_tid;  // thread handle filled in by start(); used by join()

    // Servant.cpp defines only this constructor; the pool calls `new Servant(this)`.
    explicit Servant(ServantPool* ma);

    void setSocket(int m);                // store the socket the next job reads from
    void start();                         // create the thread running ThreadProc(this)
    static void* ThreadProc(void* data);  // thread body; `data` is the Servant*
    void notify();                        // signal cond_t when the worker is IDLE
    void stop();                          // IDLE -> STOP
    void wakeup();                        // STOP -> IDLE
    void exit();                          // set state to EXIT and signal cond_t
    void join();                          // pthread_join on m_tid
};

/**
 * Owner of all Servant objects; intended to be a singleton (private
 * constructor, obtained through getInstance()).
 *
 * Each Servant sits in one of three lists according to its state.
 */
class ServantPool
{
private:
    vector<Servant*> m_idle_list;  // workers waiting for a job
    vector<Servant*> m_busy_list;  // workers running a job
    vector<Servant*> m_stop_list;  // workers paused by stop()

    ServantPool() {}
    static ServantPool* hInstance;

public:
    void StopAll();       // call stop() on the idle and busy workers
    void TerminateAll();  // declared by the original header; Servant.cpp does not define it
    void create(int num); // allocate `num` workers and put them on the idle list
    void startAll();      // start the thread of every worker on the idle list

    // main.c calls start(); Servant.cpp implements the operation as startAll().
    void start() { startAll(); }

    // Servant.cpp defines these two with a Servant* argument.
    void stop(Servant* id);
    void wakeup(Servant* id);

    void waitforAll();    // join the worker threads

    // List maintenance used by Servant: add == true appends `ser`,
    // add == false removes it.
    void AddtoIdle(bool add, Servant* ser);
    void Addtobusy(bool add, Servant* ser);
    void Addtostop(bool add, Servant* ser);

    Servant* Accepting();  // first idle worker, or NULL when none is idle

    ~ServantPool();
    static ServantPool* getInstance();
};

#endif  // SERVANT_H
Servant.cpp文件内容如下:
#include#include#include#include#include#include#include#include"Servant.h" #define WORKING 1 #define STOP 2 #define EXIT 3 #define IDLE 4 static int num=0; static pthread_mutex_t vec_mutex; void Servant::start() { state=IDLE; int result=pthread_create(&m_tid,NULL,ThreadProc,this); if(0!=result) { printf("thread create result :%sn",strerror(errno)); } } Servant::Servant(ServantPool* ma) { thread_id=num++; m_pool=ma; pthread_mutex_init(&mutex_x,NULL); pthread_cond_init(&cond_t,NULL); } void Servant::setSocket(int m) { m_socket=m; } void Servant::DoJob() { char buf[100]={0}; recv(m_socket,buf,100,0); printf("we haved the %d recv:%sn",num++,buf); close(m_socket); } void* Servant::ThreadProc(void* data) { Servant* ser=(Servant*)data; int result=0; while(ser->state!=EXIT) { while(ser->state==IDLE) { result=pthread_mutex_lock(&(ser->mutex_x)); if(0==result) { printf("waiting for the mutex n"); } result=pthread_cond_wait(&(ser->cond_t),&(ser->mutex_x)); if(ser->state!=IDLE) goto End; printf("the conditions has been notifiedn"); //we take this thread to busy list ser->m_pool->AddtoIdle(false,ser); ser->m_pool->Addtobusy(true,ser); // really work DoSomething: ser->state=WORKING; printf("Do Something...n"); ser->DoJob(); ser->Complete();//this function change state End: pthread_mutex_unlock(&(ser->mutex_x)); } } return NULL; } void Servant::stop() { if(state==IDLE) { m_pool->AddtoIdle(false,this); m_pool->Addtostop(true,this); state=STOP; printf("thread stop!n"); } else if(state==WORKING) { printf("current state is WORKING stop failed!n"); } else if(state==STOP) { printf("thread already stopped!n"); } else { printf("sorry unknown state!n"); state=STOP; } } void Servant::wakeup() { if(state==STOP) { m_pool->Addtostop(false,this); m_pool->AddtoIdle(true,this); state=IDLE; printf("thread wakeup!n"); } else if(state==WORKING) { printf("current state is WORKING stop failed!n"); } else if(state==IDLE) { printf("current state is idle never need wakeup!n"); } else { printf("sorry 
unknown state..n"); state=IDLE; } } void Servant::Complete()//完成操作 { //完成任务,该线程变为idle m_pool->Addtobusy(false,this); m_pool->AddtoIdle(true,this); state=IDLE; } void Servant::join() { pthread_join(m_tid,NULL); } void Servant::notify() { if(state==IDLE) { printf("we have notified thread runningn"); pthread_cond_signal(&cond_t); } else { printf("sorry ,the signal is not correctn"); } } void Servant::exit() { state=EXIT; pthread_cond_signal(&cond_t); } void ServantPool::StopAll() { vector::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->stop(); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->stop(); } } void ServantPool::create(int num) { int i=0; for(;i<num;i++) { Servant* tmp=new Servant(this); m_idle_list.push_back(tmp); } } void ServantPool::AddtoIdle(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_idle_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if(*itr==ser) { m_idle_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtobusy(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_busy_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if(*itr==ser) { m_busy_list.erase(itr); break; } } pthread_mutex_unlock(&vec_mutex); } } void ServantPool::Addtostop(bool add,Servant* ser) { if(add) { // add ser to idle list pthread_mutex_lock(&vec_mutex); m_stop_list.push_back(ser); pthread_mutex_unlock(&vec_mutex); } else { // del ser from idle list pthread_mutex_lock(&vec_mutex); vector::iterator itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();itr++) { if(*itr==ser) { m_stop_list.erase(itr); break; } } 
pthread_mutex_unlock(&vec_mutex); } } void ServantPool::startAll() { int i=0; for(;istart();//create the thread } } void ServantPool::stop(Servant* id) { vector::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();itr++) { if((*itr)==id) { (*itr)->stop(); return; } } } void ServantPool::waitforAll() { int i=0; int nums=m_busy_list.size(); for(;ijoin(); } nums=m_idle_list.size(); i=0; for(;ijoin(); } } void ServantPool::wakeup(Servant* id) { vector::iterator itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();itr++) { if((*itr)==id) { (*itr)->wakeup(); return; } } } ServantPool * ServantPool::hInstance=NULL; ServantPool * ServantPool::getInstance() { if(NULL==hInstance) { hInstance=new ServantPool(); pthread_mutex_init(&vec_mutex,NULL); } return hInstance; } ServantPool::~ServantPool() { vector::iterator itr=m_idle_list.begin(); for(;itr!=m_idle_list.end();) { (*itr)->exit(); delete *itr; itr=m_idle_list.erase(itr); } itr=m_busy_list.begin(); for(;itr!=m_busy_list.end();) { (*itr)->exit(); delete *itr; itr=m_busy_list.erase(itr); } itr=m_stop_list.begin(); for(;itr!=m_stop_list.end();) { (*itr)->exit(); delete *itr; itr=m_stop_list.erase(itr); } } Servant* ServantPool::Accepting() { if(m_idle_list.size()>0) { return m_idle_list[0]; } return NULL; }
main.c文件如下:
#include#include#include#include#include"Servant.h" //create 100 servants thread #define SERVANT_NUM 1 int main() { ServantPool *mm=ServantPool::getInstance(); printf("we begin create %d Servant threads n",SERVANT_NUM); mm->create(SERVANT_NUM); mm->start(); mm->waitforAll(); return 0; }
makefile文件内容如下:
# Builds the thread-pool demo executable `test` from Servant.cpp and main.c.
# -pthread is passed to both the compile and the link steps; a bare -lpthread
# on a compile-only (-c) command has no effect.

.PHONY: clean

test: Servant.o main.o
	g++ -o test Servant.o main.o -pthread

Servant.o: Servant.cpp Servant.h
	g++ -g -pthread -c Servant.cpp

# main.c is compiled with g++ because it uses the C++ classes from Servant.h.
main.o: main.c Servant.h
	g++ -g -pthread -c main.c

# -f keeps `make clean` from failing when nothing has been built yet.
clean:
	rm -f *.o test
这里需要注意一点:在修改线程状态时,应该首先修改线程的状态值,然后再发送信号通知。这样可以避免线程被信号唤醒时状态值还没有发生变化,从而导致状态修改失败!!!
好了,这就是暂时要表达的线程池的内容了,如有错误,还请大神指正。