多线程编程

  • Post author:
  • Post category:其他




线程概述



线程模型

按照线程运行环境和调度者的身份,线程可以分为

内核线程



用户线程


内核线程

运行在内核空间,由内核来调度。当进程的一个内核线程获得cpu使用权时,他就加载并运行一个用户线程。


用户线程

运行在用户空间,由线程库来调用。

一个进程可以拥有M个内核线程和N个用户线程,



M

N

M \le N






M













N





。并且在一个sys里的所有进程中,系统线程和用户线程的比值都是固定的:完全在用户空间实现、完全由内核实现、双层调度。


完全在用户空间实现(和协程其实有点像)

:无需内核支持,线程库管理并执行所有的线程,比如线程的优先级、时间片等。线程利用longjump来切换线程,但是实际上内核依然将整个进程作为最小单位来调度,对外表现出相同的优先级。

优点

是创建和调度无需内核干预,因而速度很快。

缺点

是对于多处理器系统,一个进程的多个线程无法在多个处理机上运行。此外线程优先级只对同一个进程中的其他线程有效,对不同进程中的线程无效。


完全由内核实现

将创建调度任务都交给了内核。线程库无需自行管理任务,优缺点与上面相反。M:N = 1:1


双层调度

在这里插入图片描述



Linux线程库

LinuxThreads和NPTL,都是按照1:1模式实现的(完全由内核实现)。Linux默认使用的是

NPTL



创建和结束线程

定义在

ptherad.h



pthread_create

#include<pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t* attr, void*(*start_routine)(void*), void* arg);

typedef unsigned long int pthread_t; 


thread

参数是新线程的标识符,后续pthread_*系列函数通过它来引用新线程。其类型其实是

unsigned long int




attr

参数用来设定新线程的属性,设置为NULL表示使用默认属性。

start_routine



arg

表示线程将运行的函数以及参数列表。

pthread_create调用成功返回0,失败返回错误码(和fork区别)。



pthread_exit

线程函数结束时最好调用如下函数保证安全干净地退出

#include<pthread.h>
void pthread_exit(void* retval);

函数通过

retval

向线程的回收者传递其退出信息,执行完后不会回到调用者且不会执行失败。



pthread_join

一个进程中的线程可以调用pthread_join函数来回收其他线程(若其他线程是可回收的),即

等待其他线程结束

。类似于回收进程的wait和waitpid机制。

int pthread_join(pthread_t thread, void** retval);

thread是目标线程id,retval是目标线程的退出信息。

该函数会阻塞至目标线程结束为止

。成功返回0,失败:

在这里插入图片描述



pthread_cancel

int pthread_cancel(pthread_t thread);

用来异常终止一个线程。

thread

是目标线程的标识符。

接收到取消请求的线程可以决定是否取消以及如何取消。

int pthread_setcancelstate(int state, int* oldstate);
int pthread_setcanceltype(int type, int* oldtype);


state

设置取消状态(是否允许取消),

oldstate

记录原来的取消状态(和setnonblocking里返回oldopt一个意思)。


type

设置取消类型(如何取消),

oldtype

记录原来的取消类型。

在这里插入图片描述



线程属性

#include<bits/pthreadtypes.h>
#define __SIZEOF_PTHREAD_ATTR_T 36
typedef union{
char __size[__SIZEOF_PTHREAD_ATTR_T];
long int __align;
} pthread_attr_t;

可见线程的属性都包含在一个字符数组中,线程库定义了一系列函数来操作pthread_attr_t类型的变量。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述



POSIX信号量

Linux上有两组信号量API,多进程里的System V IPC信号量(semget…等),以及线程里的POSIX信号量。接口很接近但是不保证能互换。

POSIX信号量格式是

sem_*

#include<semaphore.h>
int sem_init(sem_t* sem, int pshared, unsigned int value);
int sem_destroy(sem_t* sem);
// P操作
int sem_wait(sem_t* sem);
int sem_trywait(sem_t* sem);
// V操作
int sem_post(sem_t* sem);


sem

指向操作的信号量。

在这里插入图片描述



互斥锁



基础API

#include<pthread.h>
// init
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* mutexattr);
// init
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int pthread_mutex_destroy(pthread_mutex_t* mutex);
int pthread_mutex_lock(pthread_mutex_t* mutex);
int pthread_mutex_trylock(pthread_mutex_t* mutex);
int pthread_mutex_unlock(pthread_mutex_t* mutex);


mutex

指向要操作的目标互斥锁,互斥锁类型是

struct pthread_mutex_t




pthread_mutex_init

初始化一个互斥锁,

mutexattr

指定互斥锁的属性,NULL表示默认属性。

另外可以使用宏初始化,其实是默认将互斥锁的每个字段设置为0.

在这里插入图片描述



互斥锁属性

# define __SIZEOF_PTHREAD_MUTEX_T 40
typedef union
{
  struct __pthread_mutex_s __data;
  char __size[__SIZEOF_PTHREAD_MUTEX_T];
  long int __align;
} pthread_mutex_t;

线程库提供了一系列函数来操作

pthread_mutex_t

// 初始化互斥锁对象
int pthread_mutexattr_init(pthread_mutexattr__t* attr);
// 销毁互斥锁属性对象
int pthread_mutexattr_destroy(pthread_mutexattr_t* attr);
// 获取和设置互斥锁的pshared属性
int pthread_mutexattr_getpshared(const pthread_mutexattr__t* attr, int* pshared);
int pthread_mutexattr_setpshared(const pthread_mutexattr_t* attr, int* pshared);
// 获取和设置互斥锁的type属性
int pthread_mutexattr_gettype(const pthread_mutexattr__t* attr, int* type);
int pthread_mutexattr_settype(const pthread_mutexattr_t* attr, int* type);


pshared

指定是否允许跨进程共享互斥锁


  • PSHARED_PROCESS_SHARED

    互斥锁可以被跨进程共享

  • PTHREAD_PROCESS_PRIVATE

    互斥锁只能被和锁的初始化线程隶属的同一个进程的线程共享


type

指定互斥锁的类型

在这里插入图片描述



Example

使用

pthread_mutex_t



pthread_t

求和(采取累加方式),来测试多线程加速计算。

示例了如何编写多线程程序以及如何传递参数。

单线程:

test1.cpp

int main(){
    unsigned long long int ans = 0;
    for(unsigned long long i = 0; i < 1000000; i++){
        ans += i;
    }
    cout<<ans<<endl;
}

运行台输入

time ./test1

:

real    0m0.017s
user    0m0.017s
sys     0m0.001s

多线程计算:

test2.cpp

#include<time.h>
#include<iostream>
#include<unistd.h>
#include<pthread.h>
#include<sys/time.h>
using std::cout;
using std::endl;
// 使用struct往线程函数里传值
struct parameter{
    int begin, end;
    parameter(int a, int b) : begin(a), end(b) {}
};

// 临界值和对应的锁
unsigned long long int ans = 0;
pthread_mutex_t ans_mutex;

void* func(void* argc){
    parameter* val = (parameter*)argc;
    unsigned long long int v = 0;
    for(unsigned long long i = val->begin; i < val->end; i++){
        v += i;
    }
    pthread_mutex_lock(&ans_mutex);
    ans += v;
    pthread_mutex_unlock(&ans_mutex);
    pthread_exit(NULL);
}

int main(){
	// 初始化锁
    pthread_mutex_init(&ans_mutex, NULL);
    pthread_t id1;
    pthread_t id2;
    pthread_create(&id1, NULL, func, new parameter(0, 500000));
    pthread_create(&id2, NULL, func, new parameter(500000, 1000000));
    
    pthread_join(id1, NULL);
    pthread_join(id2, NULL);
    cout<<ans<<endl;
    pthread_mutex_destroy(&ans_mutex);
}


注意:原生Linux函数里并不带pthread,所以要在编译指令里加 -lpthread



g++ test2.cpp -o test2 -lpthread


测试:


time ./test2

real    0m0.010s
user    0m0.015s
sys     0m0.000s



条件变量

互斥锁一般用于同步线程对共享数据的访问,条件变量一般用于在线程间同步共享变量的值。条件变量是

线程间通知机制,当某个共享数据的值达到某个范围时,唤醒等待这个共享数据的线程

#include<pthread.h>
// 初始化条件变量
int pthread_cond_init(pthread_cond__t* cond, const pthread_condattr_t* cond_attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
// 销毁条件变量,如果销毁一个正在被等待的条件变量会返回EBUSY
int pthread_cond_destroy(pthread_cond_t* cond);
// 以广播的方式唤醒所有等待目标条件变量的线程
int pthread_cond_broadcast(pthread_cond_t* cond);
// 唤醒一个等待目标条件变量的线程
int pthread_cond_signal(pthread_cond_t* cond);
// 等待目标条件变量
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mutex);


如何唤醒特定的线程:


在这里插入图片描述


pthread_cond_wait

:等待目标条件变量,mutex参数是用于保护条件变量的互斥锁,以确保pthread_cond_wait操作的原子性。在调用pthread_cond_wait前,必须确保互斥锁mutex已经加锁,否则会导致不可预期的后果。

pthread_cond_wait函数执行时,首先把调用线程放进条件变量的等待队列中,然后将互斥锁mutex解锁。可见,在pthread_cond_wait开始执行到其调用线程被放入条件变量的等待队列间,pthread_cond_broadcast和pthread_cond_signal等函数不会修改条件变量。

所以,pthread_cond_wait函数不会错过目标条件变量的任何变化,pthread_cond_wait成功返回时,mutex将被再次锁上。



封装成类

#ifndef LOCKER_H
#define LOCKER_H
#include<exception>
#include<pthread.h>
#include<semaphore.h>

class sem {
private:
	sem_t m_sem;
public:
	sem(int v) {
		if (sem_init(&m_sem, 0, v) != 0) {
			throw std::exception();
		}
	}

	sem(int v = 0) {
		if (sem_init(&m_sem, 0, v) != 0) {
			throw std::exception();
		}
	}

	~sem() {
		sem_destroy(&m_sem);
	}

	bool wait() {
		return sem_wait(&m_sem) == 0;
	}

	bool post() {
		return sem_post(&m_sem) == 0;
	}
};

class locker {
private:
	pthread_mutex_t mutex;
public:
	locker() {
		if (pthread_mutex_init(&mutex, NULL) != 0) {
			throw std::exception();
		}
	}

	~locker() {
		pthread_mutex_destroy(&mutex);
	}

	bool lock() {
		return pthread_mutex_lock(&mutex) == 0;
	}

	bool unlock() {
		return pthread_mutex_unlock(&mutex) == 0;
	}
};

class cond {
private:
	pthread_cond_t m_cond;
	pthread_mutex_t cond_mutex;
public:
	cond() {
		if (pthread_mutex_init(&cond_mutex) != 0) {
			throw std::exception();
		}
		if (pthread_cond_init(&m_cond, NULL) != 0) {
			throw std::exception();
		}
	}

	~cond() {
		pthread_cond_destroy(&m_cond);
		pthread_mutex_destroy(&cond_mutex);
	}

	bool wait() {
		int ret = 0;
		pthread_mutex_lock(&cond_mutex);
		ret = pthread_cond_wait(&m_cond, &cond_mutex);
		pthread_mutex_unlock(&cond_mutex);
		return  ret == 0;
	}

	bool signal() {
		return pthread_cond_signal(&m_cond) == 0;
	}
};

#endif // !LOCKER_H



多线程环境



可重入函数

如果一个函数

能被多个线程同时被调用且不发生竞态条件

,就称之为

线程安全的

,或者它是

可重入函数



Linux大多库函数都是可重入的,不可重入的主要是因为内部使用了静态变量,但是一般都有可重入版本:在函数名尾部加

_r



在多线程程序中调用一定要使用可重入版本。



线程和进程

**Problem:**在多线程程序的某个线程调用fork函数,那么新创建的子进程是否会自动创建和父进程相同数量的线程?




子进程只拥有一个执行线程,是调用fork的哪个线程的完整赋值。

void* func(void *argc){
    int v = (*((int*)argc));
    cout<<v<<endl;
    pthread_exit(NULL);
}

int main(){
    for(int i = 0; i < 5; i++){
        pthread_t id;
        int v = i;
        pthread_create(&id, NULL, func, &v);
    }
    pid_t id = fork();
    if(id == 0){
        cout<<"Child Over"<<endl;
    }else{
        cout<<"P Over"<<endl;
    }
}
/*
0
1
3
3
4
P Over
Child Over
*/

子进程会自动继承父进程中的互斥锁(条件变量与之类似)状态——即父进程中已经被加锁的互斥锁在子进程里也是加锁的。

但是子进程可能不清楚父进程的互斥锁的状态:互斥锁可能被加锁了,但是并非由调用fork的线程锁住的,而是由其他线程锁住的,这种情况再次对互斥锁加锁可能导致死锁。

pthread_mutex_t mutex;

void* func(void *argc){
    cout<<"In thread, wanna lock the locker"<<endl;
    pthread_mutex_lock(&mutex);
    cout<<"In thread, got the locker"<<endl;
    sleep(5);
    pthread_mutex_unlock(&mutex);
    cout<<"In thread, give up the locker"<<endl;
    pthread_exit(NULL);
}

int main(){
    pthread_mutex_init(&mutex, NULL);
    pthread_t id;
    pthread_create(&id, NULL, func, NULL);
    sleep(1);

    pid_t pid = fork();
    if(pid == 0){
        cout<<"Child wanna get the locker"<<endl;
        pthread_mutex_lock(&mutex);
        cout<<"Child wanna got the locker"<<endl;
        pthread_mutex_unlock(&mutex);
        exit(0);
    }else{
        waitpid(-1, NULL, 0);
    }

    pthread_join(id, NULL);
    pthread_mutex_destroy(&mutex);
    return 0;
}
/*
In thread, wanna lock the locker
In thread, got the locker
Child wanna get the locker
In thread, give up the locker
*/

pthread提供了专门的函数

pthread_atfork

,确保fork调用后父进程和子进程都有一个清楚的锁状态

int pthread_atfork(void(*prepare)(void), void(*parent)(void), boid (*child)(void));

该函数建立三个fork句柄来清理互斥锁的状态。


prepare

句柄将在fork调用创建出子进程之前被执行,它可以锁住所有父进程中的互斥锁。


parent

句柄则是在fork调用创建出子进程之后,fork返回之前在父进程中执行。作用是释放所有在prepare句柄中被锁住的互斥锁。


child

句柄是在fork返回前,在子进程中执行,用于释放所有在prepare中被锁住的互斥锁。

所以在之前的代码fork()前加上:

void pre(){
    pthread_mutex_lock(&mutex);
}

void infork(){
    pthread_mutex_unlock(&mutex);
}

pthread_atfork(pre, infork, infork);
pid_t pid = fork();
// ......



线程和信号

每个线程都可以独立设置信号掩码。

#include<pthread.h>
#include<signal.h>
int pthread_sigmask(int how, const sigset_t* newmask, sigset_t* oldmask);


newmasl

参数指定新的信号掩码,

oldmask

输出保存以前的旧的信号掩码(与

set_nonblocking

里面

return oldopt

一个意思),

how

参数指定设置进程信号掩码的方式

进程中的所有线程都共享该进程的信号,所以线程库根据线程掩码决定将信号发送给具体的哪个线程。

所以如果我们在每个子线程中都单独设置信号掩码,就很容易导致逻辑错误。

所有线程

共享信号处理函数

,所以在一个线程中设置了信号处理函数,其他线程的对应信号处理函数也会被覆盖。所以我们应该单独设置一个线程来处理所有的信号:

  • ①在主线程创建出其他子线程前就调用pthread_sigmask设置好信号掩码,所有新的子线程都将自动继承这个信号掩码。那么之后的所有子线程都不会响应被屏蔽的信号了。
  • ②在线程中调用如下函数并等待信号并处理之:
int sigwait(const sigset_t* set, int* sig);

set参数指定等待的信号集,我们可以将其指定为第一步中创建的信号掩码,那么就只有这个线程会响应对应的信号了。

sig

这个整数参数存储该函数返回的信号值。当sigwait正确返回,就可以对接收到的信号进行处理。


当我们使用sigwait时,就不应该再为信号设置信号处理函数了,因为当程序接收到信号时,二者中只有一个能起作用



example

inline void handle_err_en(int en, const char* msg) {
	errno = en;
	perror(msg);
	exit(EXIT_FAILURE);
}

void* sig_thread(void* arg) {
	// 仅在该线程处理如下信号
	sigset_t* sigs = (sigset_t*)arg;
	int s, sig;
	for (;;) {
		s = sigwait(sigs, &sig);
		if (s != 0) {
			handle_err_en(s, "sigwait");
		}
		printf("Signal handling thread got signal %d", sig);
	}
}

int main() {
	pthread_t th;
	sigset_t sigs;
	int s;

	sigemptyset(&sigs);
	sigaddset(&sigs, SIGQUIT);
	sigaddset(&sigs, SIGUSR1);
	// 其他线程屏蔽该信号集
	s = pthread_sigmask(SIG_BLOCK, &sigs, NULL);
	if (s != 0) {
		handle_err_en(s, "pthread_sigmask");
	}
	s = pthread_create(&th, NULL, sig_thread, &sigs);
	if (s != 0) {
		handle_err_en(s, "pthread_create");
	}

	return 0;
}



pthread_kill

发送信号给指定线程


int pthread_kill(pthread_t th, int sig);



sig

指定待发送的信号,如果为0,则不发送信号,但是仍然执行错误检测,可以利用此来完成线程是否存在的检测。

#include<pthread.h>
#include<signal.h>
#include<stdio.h>
#include<errno.h>
#include<unistd.h>
void* func(void* arg){
    sigset_t sigs;
    sigaddset(&sigs, SIGALRM);
    int i = 0;
    bool run = true;
    while(run){
        printf("wait for signal\n");
        int sig;
        sigwait(&sigs, &sig);
        if(sig == SIGALRM){
            if(i++ == 1) run = false;
            printf("Recv sig %d\n", sig);
        }
    }
    pthread_exit(NULL);
}

int main(){
    int ret = -1;
    pthread_t th1;
    pthread_create(&th1, NULL, func, NULL);
    sleep(5);
    for(int i = 0; i < 4; i++){
    	// 发送两次后,子线程会退出,后续会失败
        sleep(3);
        ret = pthread_kill(th1, SIGALRM);
        if(ret != 0) printf("Error code %d\n", errno);
    }
    pthread_join(th1, NULL);
    printf("Over\n");
    return 0;
}
/*
wait for signal
Recv sig 14
wait for signal
Recv sig 14
Error code 0
Error code 0
Over
*/



版权声明:本文为xzzxwmsl原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。