文章详情

  • 游戏榜单
  • 软件榜单
关闭导航
热搜榜
热门下载
热门标签
php爱好者> php文档>发一个Linux下的线程池

发一个Linux下的线程池

时间:2010-05-24  来源:iacrqq

文件: threadpool.tar.gz
大小: 2KB
下载: 下载
发一个Linux下的线程池
    大二学习VC++时侯就在考虑写个线程池,由于当时的水平有限,一直没写出来,最近闲着没事就想到了线程池,就下定决心写一个简单的线程池实现以前的夙愿。当然如今我是Linux fans,早就将Microsoft那些东西抛弃了,于是就在Linux下写了。核心思想是通过一个条件变量来触发线程的执行,当给线程设置了任务后就触发条件变量,线程就会从阻塞于条件变量的状态被唤醒,并开始执行新任务。
    
 1. 定义一个任务数据结构,当然不是Linux内核的任务数据结构哦,呵呵。
/**
 * 定义一个表示任务的数据结构
 * task是一个函数指针,函数原型为 int function(void* data)
 * data是传递给task函数的参数
 */
typedef struct _TASK
{
     int (*task)(void*);
     void* data;
}TASK;

2. 定义一个表示线程的类Thread:
#include <pthread.h>
/**
 * 自定义线程
 *
 *
 */
class Thread
{
    public:
        Thread();
~Thread();

int  init();    // 初始化函数,成功返回0,失败返回非0
void setTask(TASK* task); // 设置任务
void start(); // 开始处理任务
int isIdel(); // 线程是否空闲,空闲放回1,忙返回0
void destroy(); // 销毁函数,做一些清理工作

    private:
int idel; // 线程空闲标记,空闲1,忙0
pthread_rwlock_t   idel_rwlock; // Read Write Lock,保护变量idel
pthread_t    thread; // 指向pthread库线程
pthread_mutex_t mutex; // 互斥量,保护条件变量condition
pthread_cond_t condition; // 条件变量,控制pthread库线程
pthread_mutex_t task_mutex; // 互斥量,保护变量task
TASK* task; // 任务
static void* proccess(void* ptr); // pthread库线程函数
};

3. 下面是类Thread的实现:
#include <iostream>
#include <assert.h>
#include "thread.h"

Thread::Thread()
{
}

Thread::~Thread()
{
destroy();
}

int Thread::init()
{
    int success = 0;
    idel = 1;

    success = pthread_mutex_init(&(this->task_mutex), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create mutex failed."
                 <<std::endl;
//destroy(); // No need
return success;
    }
    success = pthread_mutex_init(&(this->mutex), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create mutex failed."
                 <<std::endl;
//destroy();
return success;
    }

    success = pthread_cond_init(&(this->condition), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create condition failed."
                 <<std::endl;
//destroy();
return success;
    }

    success = pthread_rwlock_init(&(this->idel_rwlock), NULL);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create rwlock failed."
                 <<std::endl;
//destroy();
return success;
    }

    success = pthread_create(&(this->thread), NULL, proccess, this);
    if(success)
    {
        std::cout<<"[ERROR] init failed, cased by create thread failed."
 <<std::endl;
//destroy();
return success;
    }
    std::cout<<"[INFO] thread init ok"
     <<std::endl;
    return success;
}

void Thread:: destroy()
{
    pthread_rwlock_destroy(&(this->idel_rwlock));
    pthread_mutex_destroy(&(this->mutex));
    pthread_mutex_destroy(&(this->task_mutex));
    pthread_cond_destroy(&(this->condition));
}

void Thread::setTask(TASK* task)
{
    assert(task != NULL);
    pthread_mutex_lock(&(this->task_mutex));
    this->task = task;
}

void Thread::start()
{
    pthread_rwlock_wrlock(&(this->idel_rwlock));
    this->idel = 0;
    pthread_rwlock_unlock(&(this->idel_rwlock));

    pthread_mutex_lock(&(this->mutex));
    pthread_cond_signal(&(this->condition));
    pthread_mutex_unlock(&(this->mutex));
}

int Thread::isIdel()
{
    int idel = 0;
    pthread_rwlock_rdlock(&(this->idel_rwlock));
    idel = this->idel;
    pthread_rwlock_unlock(&(this->idel_rwlock));
    return idel;
}

void* Thread::proccess(void* ptr)
{
    Thread *p = (Thread*)ptr;

    for(;;)
    {
pthread_cond_wait(&(p->condition),&(p->mutex));
std::cout<<"[INFO] in thread"
                 <<std::endl;
        // 执行任务
p->task->task(p->task->data);
p->task = NULL;
pthread_mutex_unlock(&(p->task_mutex)); // must here unlock
pthread_rwlock_wrlock(&(p->idel_rwlock));
        p->idel = 1;
        pthread_rwlock_unlock(&(p->idel_rwlock));

    }

    return 0;
}

4. 下面定义一个表示线程池的类ThreadPool:
#include "thread.h"

class ThreadPool
{
    int size; // 线程池大小
    Thread* threads; // 线程数组

    public:
        ThreadPool();
~ThreadPool();

int init(int size); // 初始化函数,按给定的size初始化线程池
void destroy(); // 销毁函数,主要是销毁线程中的所有线程
Thread* getIdelThread(); // 获得线程池中第一个空闲线程,没用空闲线程返回NULL
};

5. 下面是ThreadPool的实现:
#include <assert.h>
#include "threadpool.h"

ThreadPool::ThreadPool()
{
    size    = 0;
    threads = NULL;
}

ThreadPool::~ThreadPool()
{
    destroy();
}

int ThreadPool::init(int size)
{
    assert(size > 0);
    this->size = size;
    this->threads = new Thread[size];

    for(int i = 0; i < size; i++)
    {
if(threads[i].init())
{
    destroy();
    return -1;
}
    }
    return 0;
}

void ThreadPool::destroy()
{
    delete [] threads;
}

Thread* ThreadPool::getIdelThread()
{
    Thread* thread = NULL;
    for(int i = 0; i < size; i++)
    {
        if(threads[i].isIdel())
{
    thread = &(threads[i]);
    break;
}
    }
    return thread;
}
6. 下面写一个测试程序test.cc,为了是其简单,使用了禁止使用的GOTO语句 #include <iostream>
#include "thread.h" #include "threadpool.h"
/**  * 各任务的处理函数  *  */ int function1(void* data); int function2(void* data); int function3(void* data); int function4(void* data); int function5(void* data); int function6(void* data); int function7(void* data); int function8(void* data);
int main(int argc, char** argv) {     ThreadPool threadPool;     if(threadPool.init(4))     { std::cout<<"[ERROR] threadPool.init failed"  << std::endl; return -1;     }     // 循环次数     int n = 1000000000;
    // Run task 1     TASK task1;     task1.task = &function1;     task1.data = (void*)(&n); L1: Thread* pThread = threadPool.getIdelThread();     if(pThread != NULL)     { pThread->setTask(&task1); pThread->start();     }     else     { //std::cout<<"[INFO] there is no idel thread in the pool" // << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时肯定有四个空闲的Thread goto L1;     }
    // Run task 2     TASK task2;     task2.task = &function2;     task2.data = (void*)(&n); L2: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task2);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl;         // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有三个空闲的Thread goto L2;     }
    // Run task 3     TASK task3;     task3.task = &function3;     task3.data = (void*)(&n); L3: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task3);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl; // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有两个空闲的Thread goto L3;     }          // Run task 4     TASK task4;     task4.task = &function4;     task4.data = (void*)(&n); L4: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task4);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl;         // 再次尝试获得空闲的Thread,这里是不可能的,这时至少有一个空闲的Thread goto L4;     }
    // Run task 5     TASK task5;     task5.task = &function5;     task5.data = (void*)(&n); L5: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task5);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl;         // 再次尝试获得空闲的Thread,这里是可能的 goto L5;     }
    // Run task 6     TASK task6;     task6.task = &function6;     task6.data = (void*)(&n); L6: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task6);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl;         // 再次尝试获得空闲的Thread,这里是可能的 goto L6;     }
    // Run task 7     TASK task7;     task7.task = &function7;     task7.data = (void*)(&n); L7: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task7);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl; // 再次尝试获得空闲的Thread,这里是可能的 goto L7;     }          // Run task 8     TASK task8;     task8.task = &function8;     task8.data = (void*)(&n); L8: pThread = threadPool.getIdelThread();     if(pThread != NULL)     {         pThread->setTask(&task8);         pThread->start();     }     else     {         //std::cout<<"[INFO] there is no idel thread in the pool"         //         << std::endl;         // 再次尝试获得空闲的Thread,这里是可能的 goto L8;     }
   // 等待所有任务完成    while(1)    {    }
   return 0; }
/**  * 任务1的处理函数,只是空循环浪费CPU时间  *  */ int function1(void* data) { int n = *((int*)(data)); std::cout<<"[INFO] task1 running" <<std::endl; for(int i = 0; i < n; i++) { } std::cout<<"[INFO] task1 end"                  <<std::endl; return 0; }
/**  * 任务2的处理函数,只是空循环浪费CPU时间  *  */ int function2(void* data) {         int n = *((int*)(data));         std::cout<<"[INFO] task2 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task2 end"                  <<std::endl; return 0; }
/**  * 任务3的处理函数,只是空循环浪费CPU时间  *  */ int function3(void* data) {         int n = *((int*)(data));         std::cout<<"[INFO] task3 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task3 end"                  <<std::endl; return 0; }
/**  * 任务4的处理函数,只是空循环浪费CPU时间  *  */ int function4(void* data) {
        int n = *((int*)(data));         std::cout<<"[INFO] task4 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task4 end"                  <<std::endl; return 0; }
/**  * 任务5的处理函数,只是空循环浪费CPU时间  *  */ int function5(void* data) {         int n = *((int*)(data));         std::cout<<"[INFO] task5 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task5 end"                  <<std::endl; return 0; }
/**  * 任务6的处理函数,只是空循环浪费CPU时间  *  */ int function6(void* data) {
        int n = *((int*)(data));         std::cout<<"[INFO] task6 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task6 end"                  <<std::endl; return 0; }
/**  * 任务7的处理函数,只是空循环浪费CPU时间  *  */ int function7(void* data) {         int n = *((int*)(data));         std::cout<<"[INFO] task7 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task7 end"                  <<std::endl; return 0; }
/**  * 任务8的处理函数,只是空循环浪费CPU时间  *  */ int function8(void* data) {
        int n = *((int*)(data));         std::cout<<"[INFO] task8 running"                  <<std::endl;         for(int i = 0; i < n; i++)         {         }         std::cout<<"[INFO] task8 end"                  <<std::endl; return 0; }

7. 呵呵,就是这么简单,估计肯定有BUG,希望你能在发现了BUG邮件我,我的邮件[email protected],欢迎大家指出不足之处,也欢迎大家和我交流。
这个程序代码文件和Makefile,在我上传的文件,有意请下载。
排行榜 更多 +
阿克里危机手机版下载

阿克里危机手机版下载

飞行射击 下载
贪婪洞窟重生手游下载

贪婪洞窟重生手游下载

角色扮演 下载
贡贡托儿所手机版下载

贡贡托儿所手机版下载

休闲益智 下载