发一个Linux下的线程池
时间:2010-05-24 来源:iacrqq
|
大二学习VC++时侯就在考虑写个线程池,由于当时的水平有限,一直没写出来,最近闲着没事就想到了线程池,就下定决心写一个简单的线程池实现以前的夙愿。当然如今我是Linux fans,早就将Microsoft那些东西抛弃了,于是就在Linux下写了。核心思想是通过一个条件变量来触发线程的执行,当给线程设置了任务后就触发条件变量,线程就会从阻塞于条件变量的状态被唤醒,并开始执行新任务。
1. 定义一个任务数据结构,当然不是Linux内核的任务数据结构哦,呵呵。
/**
* 定义一个表示任务的数据结构
* task是一个函数指针,函数原型为 int function(void* data)
* data是传递给task函数的参数
*/
typedef struct _TASK
{
int (*task)(void*);
void* data;
}TASK;
2. 定义一个表示线程的类Thread:
#include <pthread.h>
/**
* 自定义线程
*
*
*/
class Thread
{
public:
Thread();
~Thread();
int init(); // 初始化函数,成功返回0,失败返回非0
void setTask(TASK* task); // 设置任务
void start(); // 开始处理任务
int isIdel(); // 线程是否空闲,空闲放回1,忙返回0
void destroy(); // 销毁函数,做一些清理工作
private:
int idel; // 线程空闲标记,空闲1,忙0
pthread_rwlock_t idel_rwlock; // Read Write Lock,保护变量idel
pthread_t thread; // 指向pthread库线程
pthread_mutex_t mutex; // 互斥量,保护条件变量condition
pthread_cond_t condition; // 条件变量,控制pthread库线程
pthread_mutex_t task_mutex; // 互斥量,保护变量task
TASK* task; // 任务
static void* proccess(void* ptr); // pthread库线程函数
};
3. 下面是类Thread的实现:
#include <iostream>
#include <assert.h>
#include "thread.h"
Thread::Thread()
{
}
Thread::~Thread()
{
destroy();
}
int Thread::init()
{
int success = 0;
idel = 1;
success = pthread_mutex_init(&(this->task_mutex), NULL);
if(success)
{
std::cout<<"[ERROR] init failed, cased by create mutex failed."
<<std::endl;
//destroy(); // No need
return success;
}
success = pthread_mutex_init(&(this->mutex), NULL);
if(success)
{
std::cout<<"[ERROR] init failed, cased by create mutex failed."
<<std::endl;
//destroy();
return success;
}
success = pthread_cond_init(&(this->condition), NULL);
if(success)
{
std::cout<<"[ERROR] init failed, cased by create condition failed."
<<std::endl;
//destroy();
return success;
}
success = pthread_rwlock_init(&(this->idel_rwlock), NULL);
if(success)
{
std::cout<<"[ERROR] init failed, cased by create rwlock failed."
<<std::endl;
//destroy();
return success;
}
success = pthread_create(&(this->thread), NULL, proccess, this);
if(success)
{
std::cout<<"[ERROR] init failed, cased by create thread failed."
<<std::endl;
//destroy();
return success;
}
std::cout<<"[INFO] thread init ok"
<<std::endl;
return success;
}
void Thread:: destroy()
{
pthread_rwlock_destroy(&(this->idel_rwlock));
pthread_mutex_destroy(&(this->mutex));
pthread_mutex_destroy(&(this->task_mutex));
pthread_cond_destroy(&(this->condition));
}
void Thread::setTask(TASK* task)
{
assert(task != NULL);
pthread_mutex_lock(&(this->task_mutex));
this->task = task;
}
void Thread::start()
{
pthread_rwlock_wrlock(&(this->idel_rwlock));
this->idel = 0;
pthread_rwlock_unlock(&(this->idel_rwlock));
pthread_mutex_lock(&(this->mutex));
pthread_cond_signal(&(this->condition));
pthread_mutex_unlock(&(this->mutex));
}
int Thread::isIdel()
{
int idel = 0;
pthread_rwlock_rdlock(&(this->idel_rwlock));
idel = this->idel;
pthread_rwlock_unlock(&(this->idel_rwlock));
return idel;
}
void* Thread::proccess(void* ptr)
{
Thread *p = (Thread*)ptr;
for(;;)
{
pthread_cond_wait(&(p->condition),&(p->mutex));
std::cout<<"[INFO] in thread"
<<std::endl;
// 执行任务
p->task->task(p->task->data);
p->task = NULL;
pthread_mutex_unlock(&(p->task_mutex)); // must here unlock
pthread_rwlock_wrlock(&(p->idel_rwlock));
p->idel = 1;
pthread_rwlock_unlock(&(p->idel_rwlock));
}
return 0;
}
4. 下面定义一个表示线程池的类ThreadPool:
#include "thread.h"
class ThreadPool
{
int size; // 线程池大小
Thread* threads; // 线程数组
public:
ThreadPool();
~ThreadPool();
int init(int size); // 初始化函数,按给定的size初始化线程池
void destroy(); // 销毁函数,主要是销毁线程中的所有线程
Thread* getIdelThread(); // 获得线程池中第一个空闲线程,没用空闲线程返回NULL
};
5. 下面是ThreadPool的实现:
#include <assert.h>
#include "threadpool.h"
ThreadPool::ThreadPool()
{
size = 0;
threads = NULL;
}
ThreadPool::~ThreadPool()
{
destroy();
}
int ThreadPool::init(int size)
{
assert(size > 0);
this->size = size;
this->threads = new Thread[size];
for(int i = 0; i < size; i++)
{
if(threads[i].init())
{
destroy();
return -1;
}
}
return 0;
}
void ThreadPool::destroy()
{
delete [] threads;
}
Thread* ThreadPool::getIdelThread()
{
Thread* thread = NULL;
for(int i = 0; i < size; i++)
{
if(threads[i].isIdel())
{
thread = &(threads[i]);
break;
}
}
return thread;
}
6. 下面写一个测试程序test.cc,为了是其简单,使用了禁止使用的GOTO语句 #include <iostream>
#include "thread.h" #include "threadpool.h"
/** * 各任务的处理函数 * */ int function1(void* data); int function2(void* data); int function3(void* data); int function4(void* data); int function5(void* data); int function6(void* data); int function7(void* data); int function8(void* data);
int main(int argc, char** argv) { ThreadPool threadPool; if(threadPool.init(4)) { std::cout<<"[ERROR] threadPool.init failed" << std::endl; return -1; } // 循环次数 int n = 1000000000;
// Run task 1 TASK task1; task1.task = &function1; task1.data = (void*)(&n); L1: Thread* pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task1); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时肯定有四个空闲的Thread goto L1; }
// Run task 2 TASK task2; task2.task = &function2; task2.data = (void*)(&n); L2: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task2); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有三个空闲的Thread goto L2; }
// Run task 3 TASK task3; task3.task = &function3; task3.data = (void*)(&n); L3: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task3); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有两个空闲的Thread goto L3; } // Run task 4 TASK task4; task4.task = &function4; task4.data = (void*)(&n); L4: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task4); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有一个空闲的Thread goto L4; }
// Run task 5 TASK task5; task5.task = &function5; task5.data = (void*)(&n); L5: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task5); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L5; }
// Run task 6 TASK task6; task6.task = &function6; task6.data = (void*)(&n); L6: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task6); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L6; }
// Run task 7 TASK task7; task7.task = &function7; task7.data = (void*)(&n); L7: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task7); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L7; } // Run task 8 TASK task8; task8.task = &function8; task8.data = (void*)(&n); L8: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task8); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L8; }
// 等待所有任务完成 while(1) { }
return 0; }
/** * 任务1的处理函数,只是空循环浪费CPU时间 * */ int function1(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task1 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task1 end" <<std::endl; return 0; }
/** * 任务2的处理函数,只是空循环浪费CPU时间 * */ int function2(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task2 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task2 end" <<std::endl; return 0; }
/** * 任务3的处理函数,只是空循环浪费CPU时间 * */ int function3(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task3 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task3 end" <<std::endl; return 0; }
/** * 任务4的处理函数,只是空循环浪费CPU时间 * */ int function4(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task4 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task4 end" <<std::endl; return 0; }
/** * 任务5的处理函数,只是空循环浪费CPU时间 * */ int function5(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task5 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task5 end" <<std::endl; return 0; }
/** * 任务6的处理函数,只是空循环浪费CPU时间 * */ int function6(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task6 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task6 end" <<std::endl; return 0; }
/** * 任务7的处理函数,只是空循环浪费CPU时间 * */ int function7(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task7 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task7 end" <<std::endl; return 0; }
/** * 任务8的处理函数,只是空循环浪费CPU时间 * */ int function8(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task8 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task8 end" <<std::endl; return 0; }
7. 呵呵,就是这么简单,估计肯定有BUG,希望你能在发现了BUG邮件我,我的邮件[email protected],欢迎大家指出不足之处,也欢迎大家和我交流。
这个程序代码文件和Makefile,在我上传的文件,有意请下载。
6. 下面写一个测试程序test.cc,为了是其简单,使用了禁止使用的GOTO语句 #include <iostream>
#include "thread.h" #include "threadpool.h"
/** * 各任务的处理函数 * */ int function1(void* data); int function2(void* data); int function3(void* data); int function4(void* data); int function5(void* data); int function6(void* data); int function7(void* data); int function8(void* data);
int main(int argc, char** argv) { ThreadPool threadPool; if(threadPool.init(4)) { std::cout<<"[ERROR] threadPool.init failed" << std::endl; return -1; } // 循环次数 int n = 1000000000;
// Run task 1 TASK task1; task1.task = &function1; task1.data = (void*)(&n); L1: Thread* pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task1); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时肯定有四个空闲的Thread goto L1; }
// Run task 2 TASK task2; task2.task = &function2; task2.data = (void*)(&n); L2: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task2); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有三个空闲的Thread goto L2; }
// Run task 3 TASK task3; task3.task = &function3; task3.data = (void*)(&n); L3: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task3); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有两个空闲的Thread goto L3; } // Run task 4 TASK task4; task4.task = &function4; task4.data = (void*)(&n); L4: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task4); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有一个空闲的Thread goto L4; }
// Run task 5 TASK task5; task5.task = &function5; task5.data = (void*)(&n); L5: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task5); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L5; }
// Run task 6 TASK task6; task6.task = &function6; task6.data = (void*)(&n); L6: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task6); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L6; }
// Run task 7 TASK task7; task7.task = &function7; task7.data = (void*)(&n); L7: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task7); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L7; } // Run task 8 TASK task8; task8.task = &function8; task8.data = (void*)(&n); L8: pThread = threadPool.getIdelThread(); if(pThread != NULL) { pThread->setTask(&task8); pThread->start(); } else { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L8; }
// 等待所有任务完成 while(1) { }
return 0; }
/** * 任务1的处理函数,只是空循环浪费CPU时间 * */ int function1(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task1 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task1 end" <<std::endl; return 0; }
/** * 任务2的处理函数,只是空循环浪费CPU时间 * */ int function2(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task2 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task2 end" <<std::endl; return 0; }
/** * 任务3的处理函数,只是空循环浪费CPU时间 * */ int function3(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task3 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task3 end" <<std::endl; return 0; }
/** * 任务4的处理函数,只是空循环浪费CPU时间 * */ int function4(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task4 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task4 end" <<std::endl; return 0; }
/** * 任务5的处理函数,只是空循环浪费CPU时间 * */ int function5(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task5 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task5 end" <<std::endl; return 0; }
/** * 任务6的处理函数,只是空循环浪费CPU时间 * */ int function6(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task6 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task6 end" <<std::endl; return 0; }
/** * 任务7的处理函数,只是空循环浪费CPU时间 * */ int function7(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task7 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task7 end" <<std::endl; return 0; }
/** * 任务8的处理函数,只是空循环浪费CPU时间 * */ int function8(void* data) {
int n = *((int*)(data)); std::cout<<"[INFO] task8 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task8 end" <<std::endl; return 0; }
7. 呵呵,就是这么简单,估计肯定有BUG,希望你能在发现了BUG邮件我,我的邮件[email protected],欢迎大家指出不足之处,也欢迎大家和我交流。
这个程序代码文件和Makefile,在我上传的文件,有意请下载。
相关阅读 更多 +
排行榜 更多 +