线程控制总结及多线程经典案例

  • Post author:
  • Post category:其他




线程终止

Linux下有两种方式可以使线程终止:

  • 通过return 从线程函数返回
  • 通过调用函数pthread_exit()使线程退出
#include<pthread.h>
void pthread_exit(void *retval);

两种特殊情况

  • 在主线程中,如果从main函数返回或是调用了exit函数退出主线程,则整个进程终止,此时进程中所有线程也将终止
  • 在主线程中,如果调用pthread_exit()函数,则仅仅是主线程消亡,进程不会结束,进程内其他线程也不会终止,直到所有线程结束,进程才会结束。

两个问题:

  • 线程终止必须

    释放其占有的临界资源

    ,采用pthread_cleanup_push()、pthread_cleanup_pop()函数用于自动释放资源
#include<pthread.h>
#define pthread_cleanup_push(routine,arg){
	struct _pthread_cleanup_buffer buffer;
		_pthread_cleanup_push(&buffer,(routine),(srg));
#define pthread_cleanup_pop
		_pthread_cleanup_pop(&buffer,(exeute));}

  • 线程间的同步问题


    函数pthread_join()用来等待一个线程的结束
#include<pthread.h>
void pthread_exit(void *retval);
int  pthread_join(pthread_t th,void *thread_return);
int pthread_detach(pthrerad_t th);

一个线程仅允许一个线程使用pthread_join()等待它的终止,并且被等待的线程应该处于可join状态。为了避免内存泄漏,所有线程终止时,要么被设为DETACHED,要么使用pthread_join()来回收资源。



私有数据

一键多值:好像是对一个变量进行访问,其实是在访问不同的数据

操作线程私有数据的函数主要有:

#include<pthread.h>
int pthread_key_create(pthread_key_t *key,void (*destr_function)(void*));//创建一个键
int pthread_setspecific(pthread_key_t key,const void* pointer);//为一个键设置进程私有数据
void *pthread_getspecific(pthread_key_t key);//从一个键读取线程私有数据
int pthread_key_delete(pthread_key_t key);//删除一个键
  • pthread_key_create:第一个参数为指向键值的指针,第二个参数为一个函数指针,如果指针不为空,则在线程退出时将以key所关联的数据为参数调用destr_function(),释放分配的缓冲区。



线程同步

处理线程间的同步问题最常用的有互斥锁、条件变量、异步信号。



  • 互斥锁
操作 函数
初始化 pthread_mutex_init()
加锁 pthread_mutex_lock()
测试加锁 pthread_mutex_trylock()
解锁 pthread_mutex_unlock()
销毁 pthread_mutex_destory()


  • 条件变量

使用条件变量主要包括两个动作,一个等待使用资源的线程等待“条件变量被设置为真”,另一个线程在使用完资源后“设置条件为真”,从而保证线程间的同步。

操作 函数
初始化 pthread_cond_init
无条件等待 pthread_cond_wait
计时等待 pthread_cond_timewait
解除特定线程的阻塞 pthread_cond_signal
解除所有线程的阻塞 pthread_cond_broadcast
清除条件变量 pthread_cond_deestory



多线程经典案例



  • 生产者消费者问题

生产者消费者共享缓冲区,生产者向缓冲区中放数据,消费者从缓冲取中取数据,当缓冲区中被放满时,生产者进程就必须进入挂起状态,直到消费者从缓冲中取走数据时,生产者才能继续向缓冲区中存放数据,同样当缓冲取中没有数据时,消费者进程就必须进入挂起休眠状态,直到生产者向缓冲区中放入数据时,消费者才能被唤醒继续从缓冲区中取走数据。

需要注意的问题:

  • 缓冲区为空时,消费者不能再进行消费
  • 缓冲区为满时,生产者不能再进行生产
  • 在一个线程进行生产或消费时,其余线程不能再进行生产或消费,因此要保证线程的同步。
  • 生产者线程在生产之前,需要wait直到获取自己所需要的信号后,才会进行生产操作,消费者线程在消费之前,需要wait直到获取自己所需要的信号后,才会进行消费操作。
  • 为避免多个线程同时访问资源造成混乱,需要对共享资源加锁,从而保证某时刻只有一个线程在访问共享资源。

实现代码如下:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>
#include<stdlib.h>

#define N 100
#define true 1
#define producerNum  10
#define consumerNum  5
#define sleepTime 1000

typedef int semaphore;
typedef int item;
item buffer[N] = {0};
int in = 0;
int out = 0;
int proCount = 0;
semaphore mutex = 1, empty = N, full = 0, proCmutex = 1;

void * producer(void * a){
    while(true){
        while(proCmutex <= 0);
        proCmutex--;
        proCount++;
        printf("生产一个产品ID%d, 缓冲区位置为%d\n",proCount,in);
        proCmutex++;

        while(empty <= 0){
            printf("缓冲区已满!\n");
        }
        empty--;

        while(mutex <= 0);
        mutex--;

        buffer[in] = proCount;
        in = (in + 1) % N;

        mutex++;
        full++;
        sleep(sleepTime);
    }
}

void * consumer(void *b){
    while(true){
        while(full <= 0){
            printf("缓冲区为空!\n");
        }
        full--;

        while(mutex <= 0);
        mutex--;

        int nextc = buffer[out];
        buffer[out] = 0;//消费完将缓冲区设置为0

        out = (out + 1) % N;

        mutex++;
        empty++;

        printf(“消费一个产品ID%d,缓冲区位置为%d\n", nextc,out);
        sleep(sleepTime);
    }
}

int main()
{
    pthread_t threadPool[producerNum+consumerNum];//创建线程池
    int i;
    for(i = 0; i < producerNum; i++){
        pthread_t temp;
        if(pthread_create(&temp, NULL, producer, NULL) == -1){
            printf("ERROR, fail to create producer%d\n", i);
            exit(1);
        }
        threadPool[i] = temp;
    }//创建生产者进程放入线程池


    for(i = 0; i < consumerNum; i++){
        pthread_t temp;
        if(pthread_create(&temp, NULL, consumer, NULL) == -1){
            printf("ERROR, fail to create consumer%d\n", i);
            exit(1);
        }
        threadPool[i+producerNum] = temp;
    }//创建消费者进程放入线程池


    void * result;
    for(i = 0; i < producerNum+consumerNum; i++){
        if(pthread_join(threadPool[i], &result) == -1){
            printf("fail to recollect\n");
            exit(1);
        }
    }//运行线程池
    return 0;
}


  • 哲学家问题

五个哲学家绕着圆桌坐,每个哲学家面前有一盘面,两人之间有一支筷子,这样每个哲学家左右各有一支筷子。哲学家有2个状态,思考或者拿起筷子吃饭。如果哲学家拿到一只筷子,不能吃饭,直到拿到2只才能吃饭,并且一次只能拿起身边的一支筷子。一旦拿起便不会放下筷子直到把饭吃完,此时才把这双筷子放回原处。如果,很不幸地,每个哲学家拿起他或她左边的筷子,那么就没有人可以吃到饭了。



至多只允许有四位哲学家同时去拿左边的筷子,最终能保证至少有一位哲学家能够进餐,并在用餐完毕后能释放他占用的筷子,从而使别的哲学家能够进餐;

假设哲学家的编号是A、B、C、D、E,筷子编号是1、2、3、4、5,哲学家和筷子围成一圈如下图所示:

在这里插入图片描述

代码实现:

#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <errno.h>

pthread_mutex_t chopstick[6];

void *eat_think(void *arg)
{
    char phi = *(char*)arg;
    int left, right;
    int i = 0;

    switch(phi)
    {
        case 'A':
            left = 5;
            right = 1;
            break;
        case 'B':
            left = 1;
            right = 2;
            break;
        case 'C':
            left = 2;
            right = 3;
            break;
        case 'D':
            left = 3;
            right = 4;
            break;
        case 'E':
            left = 4;
            right = 5;
            break;
        default:
            break;
    }

    while(1)
    {
        sleep(2);
        pthread_mutex_lock(&chopstick[left]);  //拿起左边的筷子
        printf("哲学家%c拿起筷子%d\n", phi, left);
        if(pthread_mutex_trylock(&chopstick[right]) == EBUSY){  //拿起右边的筷子
            pthread_mutex_unlock(&chopstick[left]);  //右边的筷子被拿走则放下左边的筷子
            continue;
        }
        printf("哲学家%c拿起筷子%d\n", phi, right);
        printf("哲学家%c吃饭\n", phi);
        sleep(2);  //吃饭

        pthread_mutex_unlock(&chopstick[left]);  //放下左边的筷子
        printf("哲学家%c放下筷子%d\n", phi, left);
        pthread_mutex_unlock(&chopstick[right]);  //放下右边的筷子
        printf("哲学家%c放下筷子%d\n", phi, right);
    }
}

int main()
{
    pthread_t A, B, C, D, E;  //五位哲学家
    for(int i = 0; i < 5; ++i)
    {
        pthread_mutex_init(&chopstick[i], NULL);
    }

    pthread_create(&A, NULL, eat_think, "A");
    pthread_create(&B, NULL, eat_think, "B");
    pthread_create(&C, NULL, eat_think, "C");
    pthread_create(&D, NULL, eat_think, "D");
    pthread_create(&E, NULL, eat_think, "E");

    pthread_join(A, NULL);
    pthread_join(B, NULL);
    pthread_join(C, NULL);
    pthread_join(D, NULL);
    pthread_join(E, NULL);

    return 0;
}


  • 实现简单的线程池

为什么要使用线程池?

  • 提高程序的执行效率

    如果程序中有大量短时间任务的线程任务,由于创建和销毁线程需要和底层操作系统交互,大量时间都耗费在创建和销毁线程上,因而比较浪费时间,系统效率很低,而线程池里的每一个线程任务结束后,并不会死亡,而是再次回到线程池中成为空闲状态,等待下一个对象来使用,因而借助线程池可以提高程序的执行效率
  • 控制线程的数量,防止程序崩溃

    线程池中线程数量是一定的,可以有效避免出现内存溢出
#include<stdio.h>
#include<pthread.h>
#include<malloc.h>
#include<errno.h>

//要创建任务队列,因此需要定义任务结点结构
typedef struct task{
    void *(*run)(void *arg); //任务回调函数
    void *arg;               //任务回调函数的参数
    struct task *next;       //下一个任务结点
}task_t;

//封装互斥量和条件变量为一个condition_t
typedef struct
{
    pthread_mutex_t  mutex;    //互斥量
    pthread_cond_t   cond;     //条件变量
}condition_t;

//线程池结构
typedef struct
{
    condition_t ready;       //同步、互斥条件
    task_t      *first;      //任务队列头结点指针
    task_t      *last;       //任务队列尾结点指针
    int         count;       //当前线程数量
    int         free;        //当前空闲线程数量
    int         flag;        //销毁线程池标志
    int         max_threads; //线程池中线程总数上限
}threadpool_t;

//互斥量的初始化
void condition_init(condition_t *cond){
    pthread_mutex_init(&cond->mutex ,NULL);
    pthread_cond_init(&cond->cond ,NULL);
}

void condition_lock(condition_t *cond){
    pthread_mutex_lock(&cond->mutex );
}
void condition_unlock(condition_t *cond){
    pthread_mutex_unlock(&cond->mutex );
}
int condition_wait(condition_t *cond){
    return pthread_cond_wait(&cond->cond ,&cond->mutex );
}
int condition_timedwait(condition_t *cnd,const struct timespec *abstime){
    return pthread_cond_timedwait(&cond->cond,&cond->mutex,abstime);
}
void condition_signal(condition_t *cond){
    pthread_cond_signal(&cond->cond );
}
void condition_broadcast(condition_t *cond){
    pthread_cond_broadcast(&cond->cond );
}
void condition_destroy(condition_t *cond){
    pthread_mutex_destroy(&cond->mutex);
    pthread_cond_destroy(&cond->cond);
}


//初始化线程函数
void threadpool_init(threadpool_t *threadpool,int max_threads){
    condition_init(&(threadpool->ready));

    threadpool->first =NULL;//设置任务队列头指针、尾指针
    threadpool->last =NULL;

    threadpool->count =0;
    threadpool->free =0;//初始化当前线程数量和当前空闲线程数量

    threadpool->max_threads =max_threads;//设置线程池中线程总数上限

    threadpool->flag =0;//将销毁线程池标志设置为0,0为不销毁,1为销毁

}

//线程处理函数
void *route(void *arg)
{
    //分离线程
    pthread_detach(pthread_self());

    //获取线程池结构指针
    threadpool_t *threadpool = (threadpool_t *)arg;

    //超时标志
    int timeout = 0;

    while(1)
    {
        //上锁
        condition_lock(&threadpool->ready);

        //超时标志重置
        timeout = 0;

        //当线程创建好,但任务尚未执行时,或者上次任务执行完毕之后再次进入循环,该线程属于空闲线程,所以空闲线程数+1
        threadpool->free++;

        //当任务队列为空并且还没有开始销毁线程时,线程进入等待状态
        while(threadpool->first == NULL && threadpool->flag == 0)
        {
            //获取系统当前时间
            struct timespec wait_time;
            clock_gettime(CLOCK_REALTIME, &wait_time);

            //超时等待时间为3秒钟, 如果3秒内没有被唤醒(当新任务来时,会发送唤醒信号),则返回。
            wait_time.tv_sec += 3;

            int ret = condition_timedwait(&threadpool->ready, &wait_time);
            if(ret == ETIMEDOUT)
            {
                printf("%#x thread timeout!\n", (int)pthread_self());

                //设置超时标志
                timeout = 1;

                break;
            }
        }

        //程序能走到这里,有三种情况: 1.等待超时;  2.任务队列不为空 3.被销毁线程池时发送的广播信号唤醒
        //而无论哪种情况,空闲线程都应该 - 1
        threadpool->free--;

        //第一种情况: 等待超时而且当前无任务可执行,则销毁该线程
        if(timeout == 1 && threadpool->first == NULL)
        {
            threadpool->count--;
            condition_unlock(&threadpool->ready);
            break;
        }

        //第二种情况: 任务队列不为空, 则执行任务
        if(threadpool->first != NULL)
        {
            //取出队头任务
            task_t *task = threadpool->first;
            threadpool->first = threadpool->first->next;

            //为了避免任务执行时间过长,别的线程获取不到锁,所以先解锁后加锁,绕过任务执行函数
            condition_unlock(&threadpool->ready);  //解锁
            task->run(task->arg);                  //执行任务
            condition_lock(&threadpool->ready);    //加锁

            //任务执行完毕释放任务结点
            free(task);
        }
        //第三种情况: 下达销毁线程池命令, 且当前无任务可执行,则销毁线程
        if(threadpool->flag == 1 && threadpool->first == NULL)
        {
            threadpool->count--;

            //说明是线程池最后一个线程, 应该发送一个唤醒信号让销毁线程池的函数继续往下执行
            if(threadpool->count == 0)
            {
                condition_signal(&threadpool->ready);
            }

            //解锁
            condition_unlock(&threadpool->ready);
            break;
        }

        //这里的解锁主要是为任务执行完毕后的解锁
        condition_unlock(&threadpool->ready);
    }

}

//向线程池的任务队列中添加任务
void threadpool_add_task(threadpool_t *threadpool, void *(*run)(void *), void *arg)
{
    //上锁(对队列进行增删查改需要互斥访问)
    condition_lock(&threadpool->ready);

    //构造新任务结点
    task_t *new_task = malloc(sizeof(task_t));
    new_task->run = run;
    new_task->arg = arg;
    new_task->next = NULL;


    //如果任务队列为空
    if(threadpool->first == NULL)
    {
        threadpool->first = new_task;
    }
    else
    {//如果任务队列不为空
        threadpool->last->next = new_task;
    }

    //修正任务队列尾指针
    threadpool->last = new_task;

    //如果有空闲线程,则说明可能有空闲线程在等待执行任务
    if(threadpool->flag > 0)
    {
        //唤醒一个在等待状态的空闲线程
        condition_signal(&threadpool->ready);
    }

        //没有空闲线程, 而且当前总线程数小于线程池最大线程个数,则要创建一个新线程
    else if(threadpool->count < threadpool->max_threads)
    {
        pthread_t tid;

        //创建出新线程并将线程池结构指针作为线程处理函数的参数
        pthread_create(&tid, NULL, route, threadpool);

        //当前线程池总线程数数加1
        threadpool->count ++;
    }

    //解锁
    condition_unlock(&threadpool->ready);
}

//销毁线程池
void threadpool_destroy(threadpool_t *threadpool)
{
    //说明有线程已经在处理销毁线程池,其他线程不能进去
    if(threadpool->flag == 1)
    {
        return;
    }

    //上锁
    condition_lock(&threadpool->ready);
    threadpool->flag = 1;   //标志已经开始销毁线程池

    //如果有空闲线程, 则说明有线程处于等待状态,则要将它们全部唤醒,让它们自行销毁
    if(threadpool->free > 0)
    {
        condition_broadcast(&threadpool->ready);
    }
    else
    {//没有空闲线程,则等待所有正在执行任务的线程执行完自己的任务后自行销毁
        while(threadpool->count > 0)
        {
            //阻塞等待所有正在执行任务的线程执行完毕,当最后一个线程执行完毕时,会给发送一个唤醒信号让销毁线程也退出
            condition_wait(&threadpool->ready);
        }
    }

    //销毁
    condition_destroy(&threadpool->ready);

    //解锁
    condition_unlock(&threadpool->ready);
}



版权声明:本文为weixin_52634688原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。