socket多线程编程实现并发服务器
一、多线程简介
1、什么是线程?
线程在操作系统原理中是这样描述的:线程是进程的一条执行路径。操作系统为每道进程的运行分配‘处理器’与‘内存’资源,但是每次都这样开销太大,随后才设计出‘单位’更小的线程,使用进程+线程的方式,即进程分配‘内存’,线程分配‘处理器’控制权,一个进程里面可有多个线程,进程的内存由线程共享。
所有的线程都是在同一进程空间运行,这也意味着多条线程将共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。 一个进程可以有很多线程,每条线程并行执行不同的任务。
2、主线程和子线程
一个进程创建后,会首先生成一个缺省的线程,通常称这个线程为主线程(或称控制线程),C/C++程序中,主线程就是通过main函数进入的线程,由主线程调用pthread_create()创建的线程称为子线程,子线程也可以有自己的入口函数,该函数由用户在创建的时候指定。每个线程都有自己的线程ID,可以通过pthread_self()函数获取。最常见的线程模型中,除主线程较为特殊之外,其他线程一旦被创建,相互之间就是对等关系,不存在隐含的层次关系。每个进程可创建的最大线程数由具体实现决定。
无论在windows中还是Posix中,主线程和子线程的默认关系是:
无论子线程执行完毕与否,一旦主线程执行完毕退出,所有子线程执行都会终止
。这时整个进程结束或僵死,部分线程保持一种终止执行但还未销毁的状态,而进程必须在其所有线程销毁后销毁,这时进程处于僵死状态。线程函数执行完毕退出,或以其他非常方式终止,线程进入终止态,但是为线程分配的系统资源不一定释放,可能在系统重启之前,一直都不能释放,终止态的线程,仍旧作为一个线程实体存在于操作系统中,什么时候销毁,取决于线程属性。在这种情况下,主线程和子线程通常定义以下两种关系:
-
可会合(joinable)
:这种关系下,主线程需要明确执行等待操作,在子线程结束后,主线程的等待操作执行完毕,子线程和主线程会合,这时主线程继续执行等待操作之后的下一步操作。主线程必须会合可会合的子线程。在主线程的线程函数内部调用子线程对象的wait函数实现,即使子线程能够在主线程之前执行完毕,进入终止态,也必须执行会合操作,否则,系统永远不会主动销毁线程,分配给该线程的系统资源也永远不会释放。 -
相分离(detached)
:表示子线程无需和主线程会合,也就是相分离的,这种情况下,子线程一旦进入终止状态,这种方式常用在线程数较多的情况下,有时让主线程逐个等待子线程结束,或者让主线程安排每个子线程结束的等待顺序,是很困难或不可能的,所以在并发子线程较多的情况下,这种方式也会经常使用。
线程的分离状态决定一个线程以什么样的方式来终止自己,在默认的情况下,线程是非分离状态的,这种情况下,原有的线程等待创建的线程结束,只有当pthread_join函数返回时,创建的线程才算终止,释放自己占用的系统资源,而分离线程没有被其他的线程所等待,自己运行结束了,线程也就终止了,马上释放系统资源。
3.Pthread
Linux下的多线程遵从POSIX线程接口,简称pthread,在pthread库中提供。
pthread_create():创建一个线程
pthread_exit():退出一个线程
pthread_jion():阻塞当前线程,直到另一个线程执行结束
pthread_attr_init():设置线程是否脱离属性
pthread_kill():给线程发送kill信号
同步函数:
pthread_mutex_lock():互斥加锁
pthread_mutex_unlock():互斥锁解锁
pthread_cond_init():初始化条件变量
pthread_cond_signal():发送信号唤醒进程
pthread_cond_wait():等待条件变量的特殊事件发生
二、创建子线程
1.pthread_create()函数
函数原型:
#include <pthread.h>
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void
*arg);
参数介绍:
- 第一个参数thread是一个pthread_t类型的指针,它用来返回该线程的线程ID。每个线程都能够通过pthread_self()来获取自己的线程ID(pthread_t类型)
- 第二个参数是线程的属性attr,其类型是 pthread_attr_t 结构体类型,其定义如下:
typedef struct
{
int detachstate; //线程的分离状态
int schedpolicy; //线程调度策略
struct sched_param schedparam; //线程的调度参数
int inheritsched; //线程的继承性
int scope; //线程的作用域
size_t guardsize; //线程栈末尾的警戒缓冲区大小
int stackaddr_set;
void *stackaddr; //线程栈的位置
size_t stacksize; //线程栈的大小
}pthread_attr_t;
/*
对于这些属性,我们需要设定的是线程的分离状态,如果有必要,也需修改每个线程的栈大小。
每个线程创建后默认是joinable状态,该状态需要主线程调用 pthread_join 等待它退出,否则,
子线程在结束时,内存资源不能得到释放造成内存泄漏。所以我们创建线程时一般会将线程设置为
分离状态,具体有两种方法:
1. 线程里面调用 pthread_detach(pthread_self()) 函数,这个方法最简单
2. 在创建线程的属性设置里设置PTHREAD_CREATE_DETACHED属性
*/
- 第三个参数start_routine是一个函数指针,它指向的函数原型是 void *func(void *),这是所创建的子线程要执行的任务(回调函数,返回值是void *类型,形参是void *)
- 第四个参数arg是传给所调用的函数的参数,如果有多个参数要传递的话,就需要将这多个参数封装到一个结构体中,再传入函数中。
2.线程的属性:
前面我们说到过线程是有属性的,这个属性由一个线程属性对象来描述。线程属性对象由pthread_attr_init()接口初始化,并由pthread_attr_destory()来销毁,它们的完整定义是:
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destory(pthread_attr_t *attr);
那么线程拥有哪些属性呢?
一般地,Linux下的线程有:
分离属性、调度属性、堆栈大小属性和满占警戒区大小属性
。
下面我们就分别来具体介绍这些属性:
1.分离属性
前面说过线程能够被合并和分离,分离属性就是让线程在创建之前就决定它应该是分离的。如果设置了这个属性,就没有必要调用pthread_join()或pthread_detach()来回收线程资源了。
设置分离属性的接口是pthread_attr_setdetachstate(),它的完整定义是:
pthread_attr_setdetachstat(pthread_attr_t *attr, int detachstate);
它的第二个参数有两个取值:PTHREAD_CREATE_DETACHED(分离的)和PTHREAD_CREATE_JOINABLE(可合并的,也是默认属性)。
下面代码演示了这个属性的使用。
#include <stdio.h>
#include <pthread.h>
……
int main( int argc, char *argv[] )
{
pthread_attr_t attr;
pthread_t th;
……
pthread_attr_init( &attr );
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create( &th, &attr, thread, NULL );
……
}
2.调度属性
线程的调度属性有三个,分别是:
算法、优先级和继承权
。
1.算法
其中Linux提供的线程调度算法有三个:
轮询、先进先出和其它
。其中轮询和先进先出调度算法是POSIX标准所规定,而其他则代表采用Linux自己认为更合适的调度算法,所以默认的调度算法也就是其它了。
轮询和先进先出调度算法都属于
实时调度算法。
-
轮询
指的是时间片轮转,当线程的时间片用完,系统将重新分配时间片,并将它放置在就绪队列尾部,这样可以保证具有相同优先级的轮询任务获得公平的CPU占用时间; -
先进先出
就是先到先服务,一旦线程占用了CPU则一直运行,直到有更高优先级的线程出现或自己放弃。
设置线程调度算法的接口是pthread_attr_setschedpolicy(),它的完整定义是:
pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
它的第二个参数有三个取值:SCHED_RR(轮询)、SCHED_FIFO(先进先出)和SCHED_OTHER(其它)。
2.优先级:
Linux的线程优先级与进程的优先级不一样,进程优先级我们后面再说。
Linux的线程优先级是从1到99的数值,数值越大代表优先级越高。
而且要注意的是,只有采用SHCED_RR或SCHED_FIFO调度算法时,优先级才有效。对于采用SCHED_OTHER调度算法的线程,其优先级恒为0。
设置线程优先级的接口是pthread_attr_setschedparam(),它的完整定义是:
struct sched_param {
int sched_priority;
}
int pthread_attr_setschedparam(pthread_attr_t *attr, struct sched_param *param);
sched_param结构体的sched_priority字段就是线程的优先级了。
此外,即便采用SCHED_RR或SCHED_FIFO调度算法,线程优先级也不是随便就能设置的。首先,进程必须是以root账号运行的;其次,还需要放弃线程的继承权。什么是继承权呢?
3.继承权
继承权就是当创建新的线程时,新线程要继承父线程(创建者线程)的调度属性。如果不希望新线程继承父线程的调度属性,就要放弃继承权。
设置线程继承权的接口是pthread_attr_setinheritsched(),它的完整定义是:
int pthread_attr_setinheritsched(pthread_attr_t *attr, int inheritsched);
它的第二个参数有两个取值:PTHREAD_INHERIT_SCHED(拥有继承权)和PTHREAD_EXPLICIT_SCHED(放弃继承权)。新线程在默认情况下是拥有继承权。
以下代码能够演示不同调度算法和不同优先级下各线程的行为,同时也展示如何修改线程的调度属性。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_COUNT 12
void show_thread_policy( int threadno )
{
int policy;
struct sched_param param;
pthread_getschedparam( pthread_self(), &policy, param );
switch( policy ){
case SCHED_OTHER:
printf( "SCHED_OTHER %d\n", threadno );
break;
case SCHED_RR:
printf( "SCHDE_RR %d\n", threadno );
break;
case SCHED_FIFO:
printf( "SCHED_FIFO %d\n", threadno );
break;
default:
printf( "UNKNOWN\n");
}
}
void* thread( void *arg )
{
int i, j;
long threadno = (long)arg;
printf( "thread %d start\n", threadno );
sleep(1);
show_thread_policy( threadno );
for( i = 0; i < 10; ++i ) {
for( j = 0; j < 100000000; ++j ){}
printf( "thread %d\n", threadno );
}
printf( "thread %d exit\n", threadno );
return NULL;
}
int main( int argc, char *argv[] )
{
long i;
pthread_attr_t attr[THREAD_COUNT];
pthread_t pth[THREAD_COUNT];
struct sched_param param;
for( i = 0; i < THREAD_COUNT; ++i )
pthread_attr_init( &attr[i] );
for( i = 0; i < THREAD_COUNT / 2; ++i )
{
param.sched_priority = 10;
pthread_attr_setschedpolicy( &attr[i], SCHED_FIFO ); //设置线程的调度策略
pthread_attr_setschedparam( &attr[i], param ); // 设置静态优先级
pthread_attr_setinheritsched( &attr[i], PTHREAD_EXPLICIT_SCHED ); //设置线程是否继承父线程调度策略
}
for( i = THREAD_COUNT / 2; i < THREAD_COUNT; ++i )
{
param.sched_priority = 20;
pthread_attr_setschedpolicy( &attr[i], SCHED_FIFO );
pthread_attr_setschedparam( &attr[i], param );
pthread_attr_setinheritsched( &attr[i], PTHREAD_EXPLICIT_SCHED );
}
for( i = 0; i < THREAD_COUNT; ++i )
pthread_create( &pth[i], &attr[i], thread, (void*)i );
for( i = 0; i < THREAD_COUNT; ++i )
pthread_join( pth[i], NULL );
for( i = 0; i < THREAD_COUNT; ++i )
pthread_attr_destroy( &attr[i] );
return 0;
}
这段代码中含有一些没有介绍过的接口,大家可自行查看它们的具体用法和作用。
4.堆栈大小属性
从前面的这些例子中可以了解到,线程的主函数与程序的主函数main()有一个很相似的特性,那就是可以拥有局部变量。虽然同一个进程的线程之间是共享内存空间的,但是它的局部变量确并不共享。原因就是局部变量存储在堆栈中,而不同的线程拥有不同的堆栈。Linux系统为每个线程默认分配了8MB的堆栈空间,如果觉得这个空间不够用,可以通过修改线程的堆栈大小属性进行扩容。
修改线程堆栈大小属性的接口是pthread_attr_setstacksize(),它的完整定义为:
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
它的第二个参数就是堆栈大小了,以字节为单位。需要注意的是,线程堆栈不能小于16KB,而且尽量按4KB(32位系统)或2MB(64位系统)的整数倍分配,也就是内存页面大小的整数倍。此外,修改线程堆栈大小是有风险的,
如果你不清楚你在做什么,最好别动它
。
5.满栈警戒区属性
既然线程是有堆栈的,而且还有大小限制,那么就一定会出现将堆栈用满的情况。线程的堆栈用满是非常危险的事情,因为这可能会导致对内核空间的破坏,一旦被有心人士所利用,后果也不堪设想。为了防治这类事情的发生,Linux为线程堆栈设置了一个满栈警戒区。这个区域一般就是一个页面,属于线程堆栈的一个扩展区域。一旦有代码访问了这个区域,就会发出SIGSEGV信号进行通知。
虽然满栈警戒区可以起到安全作用,但是也有弊病,就是会白白浪费掉内存空间,对于内存紧张的系统会使系统变得很慢。所有就有了关闭这个警戒区的需求。同时,如果我们修改了线程堆栈的大小,那么系统会认为我们会自己管理堆栈,也会将警戒区取消掉,如果有需要就要开启它。
修改满栈警戒区属性的接口是pthread_attr_setguardsize(),它的完整定义为:
int pthread_attr_setguardsize(pthread_attr_t *attr, size_t guardsize);
它的第二个参数就是警戒区大小了,以字节为单位。与设置线程堆栈大小属性相仿,应该尽量按照4KB或2MB的整数倍来分配。当设置警戒区大小为0时,就关闭了这个警戒区。
虽然栈满警戒区需要浪费掉一点内存,但是能够极大的提高安全性,所以这点损失是值得的。而且一旦修改了线程堆栈的大小,一定要记得同时设置这个警戒区。
下面我们使用一个简单的例子来讲解线程创建的基本使用和相关概念:
vim thread.c
1 #include <stdio.h>
2 #include <string.h>
3 #include <errno.h>
4 #include <stdlib.h>
5 #include <unistd.h>
6 #include <pthread.h>
7
8 void *thread_worker1(void *args);
9 void *thread_worker2(void *args);
10
11 int main(int argc, char *argv[])
12 {
13 int shared_var = 1000;
14 pthread_t tid;
15 pthread_attr_t thread_attr;
16
17
18 if (pthread_attr_init(&thread_attr))
19 {
20 printf("pthread_attr_init() failure: %s\n", strerror(errno));
21 return -1;
22 }
23
24 if (pthread_attr_setstacksize(&thread_attr, 120*1024)) //重新设置子线程栈大小
25 {
26 printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
27 return -1;
28 }
29 //设置子线程与主线程为相分离的关系
30 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
31 {
32 printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
33 return -1;
34 }
35 //创建第一个子线程,去执行thread_worker1()
36 pthread_create(&tid, &thread_attr, thread_worker1, &shared_var);
37 printf("Thread worker1 tid[%ld] created ok\n", tid);
38 //创建第二个子线程,去执行thread_worker2()
39 pthread_create(&tid, NULL, thread_worker2, &shared_var);
40 printf("Thread worker2 tid[%ld] created ok\n", tid);
41
42 pthread_attr_destroy(&thread_attr); //销毁为线程重新设置的属性
43
44 /* 第二个子线程默认是joinable,在这里阻塞,等待与子线程会合 */
45 pthread_join(tid, NULL);
46
47
48 while (1)
49 {
50 printf("Main/Control thread shared_var: %d\n", shared_var);
51 sleep(10);
52 }
53 }
54
55 void *thread_worker1(void *args)
56 {
57 int *ptr = (int *)args;
58
59 if (!args)
60 {
61 printf("%s() get invalid arguments\n", __FUNCTION__);
62 pthread_exit(NULL);
63 }
64
65 printf("Thread workder 1 [%ld] start running...\n", pthread_self());
66
67 while (1)
68 {
69 printf("+++: %s before shared_var++: %d\n", __FUNCTION__, *ptr);
70 *ptr += 1;
71 sleep(2);
72 printf("+++: %s after sleep shared_var: %d\n", __FUNCTION__, *ptr);
73 }
74
75 printf("Thread workder 1 exit...\n");
76
77 return NULL;
78 }
79 //宏__FUNCTION__用来获取函数名
80 void *thread_worker2(void *args)
81 {
82 int *ptr = (int *)args;
83
84 if (!args)
85 {
86 printf("%s() get invalid arguments\n", __FUNCTION__);
87 pthread_exit(NULL);
88 }
89
90 printf("Thread workder 2 [%ld] start running...\n", pthread_self());
91
92 while (1)
93 {
94 printf("---: %s before shared_var++: %d\n", __FUNCTION__, *ptr);
95 *ptr += 1;
96 sleep(2);
97 printf("---: %s after sleep shared_var: %d\n", __FUNCTION__, *ptr);
98 }
99
100 printf("Thread workder 2 exit...\n");
101
102 return NULL;
103 }
现在我们编译运行一下程序看看效果,注意对于多线程编程在编译时,一定要加上-lpthread 选项告诉链接器在链接的时候要连
接pthread库:
gcc thread.c -o thread -lpthread
Thread worker1 tid[1993757808] created ok
Thread workder 1 [1993757808] start running...
+++: thread_worker1 before shared_var++: 1000
Thread worker2 tid[1993634928] created ok
Thread workder 2 [1993634928] start running...
---: thread_worker2 before shared_var++: 1001
+++: thread_worker1 after sleep shared_var: 1002
+++: thread_worker1 before shared_var++: 1002
---: thread_worker2 after sleep shared_var: 1002
---: thread_worker2 before shared_var++: 1003
+++: thread_worker1 after sleep shared_var: 1004
+++: thread_worker1 before shared_var++: 1004
---: thread_worker2 after sleep shared_var: 1005
---: thread_worker2 before shared_var++: 1005
程序分析:
- 36行和39行调用pthread_create()函数用来创建了两个子线程;
- 代码15行我们定义了创建线程的属性变量thread_attr ,在对该属性进行设置前,我们需要先调用pthread_attr_init函数初始化它(第18行),在第24行我们设置线程的栈大小为120K,同时在第30行设置线程的属性为分离状态。第36行创建线程时使用了该属性创建线程,这时创建的子进程就是分离状 态了。线程属性在使用完之后,需调用 pthread_attr_destroy (第45行)把它摧毁释放;
- 而代码39行创建子线程时并没有使用该线程,同时在thread_worker2()里并没有调用pthread_detach()将线程设置为分离状态。这时就需要主线程在45行处调用pthread_join()等待第二个子线程退出。因此主线程也就阻塞在这里,从而不会往下继续执行;
- 在创建两个线程时,我们都通过第四个参数将主线程栈中的 shared_var变量地址传给了子线程,因为所有线程都是在同一进程空间中运行,而只是子线程有自己独立的栈空间,所以这时所有子线程都可以访问主线程空间的shared_var变量。
三.线程本地存储
内线程之间可以共享内存地址空间,线程之间的数据交换可以非常快捷,这是线程最显著的优点。但是多个线程访问共享数据,需要昂贵的同步开销,也容易造成与同步相关的BUG,更麻烦的是有些数据根本就不希望被共享,这又是缺点。可谓:“成也萧何,败也萧何”,说的就是这个道理。
C程序库中的errno是个最典型的一个例子。errno是一个全局变量,会保存最后一个系统调用的错误代码。在单线程环境并不会出现什么问题。但是在多线程环境,由于所有线程都会有可能修改errno,这就很难确定errno代表的到底是哪个系统调用的错误代码了。这就是有名的“非线程安全(Non Thread-Safe)”的。
此外,从现代技术角度看,在很多时候使用多线程的目的并不是为了对共享数据进行并行处理(在Linux下有更好的方案,后面会介绍)。更多是由于多核心CPU技术的引入,为了充分利用CPU资源而进行并行运算(不互相干扰)。换句话说,大多数情况下每个线程只会关心自己的数据而不需要与别人同步。
为了解决这些问题,可以有很多种方案。比如使用不同名称的全局变量。但是像errno这种名称已经固定了的全局变量就没办法了。在前面的内容中提到在线程堆栈中分配局部变量是不在线程间共享的。但是它有一个弊病,就是线程内部的其它函数很难访问到。
目前解决这个问题的简便易行的方案是
线程本地存储,即Thread Local Storage,简称TLS
。利用TLS,errno所反映的就是本线程内最后一个系统调用的错误代码了,也就是线程安全的了。
Linux提供了对TLS的完整支持,通过下面这些接口来实现:
int pthread_key_create(pthread_key_t *key, void (*destructor)(void*));
int pthread_key_delete(pthread_key_t key);
void* pthread_getspecific(pthread_key_t key);
int pthread_setspecific(pthread_key_t key, const void *value);
pthread_key_create()接口用于创建一个线程本地存储区。
- key : 第一个参数用来返回这个存储区的句柄,需要使用一个全局变量保存,以便所有线程都能访问到。
- *destructor : 第二个参数是线程本地数据的回收函数指针,如果希望自己控制线程本地数据的生命周期,这个参数可以传递NULL。
pthread_key_delete()接口用于回收线程本地存储区。其唯一的参数就要回收的存储区的句柄。
pthread_getspecific()和pthread_setspecific()这个两个接口分别用于获取和设置线程本地存储区的数据。这两个接口在不同的线程下会有不同的结果不同(相同的线程下就会有相同的结果),这也就是线程本地存储的关键所在。
以下代码展示了如何在Linux使用线程本地存储,注意执行结果,分析一下线程本地存储的一些特性,以及内存回收的时机。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define THREAD_COUNT 10
pthread_key_t g_key;
typedef struct thread_data{
int thread_no;
} thread_data_t;
void show_thread_data()
{
thread_data_t *data = pthread_getspecific( g_key );
printf( "Thread %d \n", data->thread_no );
}
void* thread( void *arg )
{
thread_data_t *data = (thread_data_t *)arg;
printf( "Start thread %d\n", data->thread_no );
pthread_setspecific( g_key, data );
show_thread_data();
printf( "Thread %d exit\n", data->thread_no );
}
void free_thread_data( void *arg )
{
thread_data_t *data = (thread_data_t*)arg;
printf( "Free thread %d data\n", data->thread_no );
free( data );
}
int main( int argc, char *argv[] )
{
int i;
pthread_t pth[THREAD_COUNT];
thread_data_t *data = NULL;
pthread_key_create( &g_key, free_thread_data );
for( i = 0; i < THREAD_COUNT; ++i ) {
data = malloc( sizeof( thread_data_t ) );
data->thread_no = i;
pthread_create( &pth[i], NULL, thread, data );
}
for( i = 0; i < THREAD_COUNT; ++i )
pthread_join( pth[i], NULL );
pthread_key_delete( g_key );
return 0;
}
四、线程的同步
虽然线程本地存储可以避免线程访问共享数据,但是线程之间的大部分数据始终还是共享的。在涉及到对共享数据进行读写操作时,就必须使用同步机制,否则就会造成线程们哄抢共享数据的结果,这会把你的数据弄的七零八落理不清头绪。
Linux提供的线程同步机制主要有
互斥锁
和
条件变量。
1. 互斥锁
试想一下,我们寝室只有一个洗手间,那多个人是怎么解决洗漱台共享的问题?那么,这时就要引入锁的机制!在这里洗漱台就是临界资源,我们在进入到洗手间(临界区)后,就首先将洗手间上锁; 然后用完离开洗手间(临界区)之后,把锁打开以供别人使用。如果有人想去洗手间时发现门锁上了,他也有两种方法:
1、在洗手间门口等(阻塞模式);
2、暂时先离开,不过过会儿再过来看(非阻塞模式);
那么对于上面的程序,变量shared_var就是临界资源,操作该变量的那段代码就是临界区。那我们把代码修改一下,通过锁的机制解决共享资源的问题:
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>7
void *thread_worker1(void *args);
void *thread_worker2(void *args);
//由于要传两个参数,所以这里定义了一个结构体
typedef struct worker_ctx_s
{
int shared_var;
pthread_mutex_t lock; //引入锁
}worker_ctx_t;
int main(int argc, char **argv)
{
worker_ctx_t worker_ctx;
pthread_t tid;
pthread_attr_t thread_attr;
worker_ctx.shared_var = 1000;
pthread_mutex_init(&worker_ctx.lock, NULL);
//初始化互斥锁
if (pthread_attr_init(&thread_attr))
{
printf("pthread_attr_init() failure: %s\n", strerror(errno));
return -1;
}
if (pthread_attr_setstacksize(&thread_attr, 120*1024))
{
printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
return -1;
}
if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
{
printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
return -1;
}
pthread_create(&tid, &thread_attr, thread_worker1, &worker_ctx);
printf("Thread worker1 tid[%ld] created ok\n", tid);
pthread_create(&tid, &thread_attr, thread_worker2, &worker_ctx);
printf("Thread worker2 tid[%ld] created ok\n", tid);
while (1)
{
printf("Main/Control thread shared_var: %d\n", worker_ctx.shared_var);
sleep(10);
}
pthread_mutex_destroy(&worker_ctx.lock);
}
void *thread_worker1(void *args)
{
worker_ctx_t *ctx = (worker_ctx_t *)args;
if (!args)
{
printf("%s() get invalid arguments\n", __FUNCTION__);
pthread_exit(NULL);
}
printf("Thread workder 1 [%ld] start running...\n", pthread_self());
while (1)
{
pthread_mutex_lock(&ctx->lock); //设置阻塞锁
printf("+++: %s before shared_var++: %d\n", __FUNCTION__, ctx->shared_var);77 ctx---shared_var ++;
sleep(2);
printf("+++: %s after sleep shared_var: %d\n", __FUNCTION__, ctx->shared_var);
pthread_mutex_unlock(&ctx->lock); //打开阻塞锁
sleep(1);
}
printf("Thread workder 1 exit...\n");
return NULL;
}
void *thread_worker2(void *args)
{
worker_ctx_t *ctx = (worker_ctx_t *)args;
if (!args)
{
printf("%s() get invalid arguments\n", __FUNCTION__);
pthread_exit(NULL);
}
printf("Thread workder 2 [%ld] start running...\n", pthread_self());
while(1)
{
if (0 != pthread_mutex_trylock(&ctx->lock)) //设置非阻塞锁
{
continue;
}
printf("---: %s before shared_var++: %d\n", __FUNCTION__, ctx->shared_var);
ctx->shared_var ++;
sleep(2);
printf("---: %s after sleep shared_var: %d\n", __FUNCTION__, ctx->shared_var);
pthread_mutex_unlock(&ctx->lock);//打开非阻塞锁
sleep(1);
}
printf("Thread workder 2 exit...\n");
return NULL;
}
程序分析:
- 代码的11~15行:因为在创建线程给线程执行函数传参的时候只能传一个参数,而我们要传递共享的变量shared_var和它相应的互斥锁lock,所以在这里需要用结构体(worker_ctx_t,ctx: context)将它们封装在一块传进去。
- 代码19行:使用work_ctx_t结构体类型定义了传给子线程的变量参数;
- 代码24行:互斥锁在使用之前,需要先调用pthread_mutex_init() 函数来初始化互斥锁;
- 代码48行:在创建第二个线程时也设置了分离属性,这时主线程后面的while(1)循环就会执行了;
- 代码57行:互斥锁在使用完之后,我们应该调用pthread_mutex_destroy()将他摧毁释放;
- 代码74行: 这里调用pthread_mutex_lock() 来申请锁,这里是阻塞锁,如果锁被别的线程持有,则该函数不会返回;
- 代码81行:在访问临界资源(shared_var)完成退出临界区时,我们调用pthread_mutex_unlock来释放锁,这样其他线程才能再次访问;
-
代码105行: 第二个线程我们使用pthread_mutex_trylock()
来申请锁,这里使用的是非阻塞锁;如果锁现在被别的线程占用则返回非0值,如果没有被占用则返回0; - 代码83行、117行: 这里都要加上延时,否则一个线程拿到锁之后会一直占有该锁;另外一个线程则不能获取到锁;
再次运行:
Thread worker1 tid[1994032240] created ok
Thread workder 1 [1994032240] start running...
+++: thread_worker1 before shared_var++: 1000
Thread worker2 tid[1993909360] created ok
Main/Control thread shared_var: 1001
Thread workder 2 [1993909360] start running...
+++: thread_worker1 after sleep shared_var: 1001
---: thread_worker2 before shared_var++: 1001
---: thread_worker2 after sleep shared_var: 1002
+++: thread_worker1 before shared_var++: 1002
+++: thread_worker1 after sleep shared_var: 1003
---: thread_worker2 before shared_var++: 1003
---: thread_worker2 after sleep shared_var: 1004
+++: thread_worker1 before shared_var++: 1004
Main/Control thread shared_var: 1005
+++: thread_worker1 after sleep shared_var: 1005
---: thread_worker2 before shared_var++: 1005
通过引入互斥锁,就解决了数据不一致的问题。
死锁
如果多个线程要调用多个对象,则在上锁的时候可能会出现“死锁”。举个例子: A、B两个线程会同时使用到两个共享变量m和n,同时每个变量都有自己相应的锁M和N。 这时A线程首先拿到M锁访问m,接下来他需要拿N锁来访问变量n; 而如果此时B线程拿着N锁等待着M锁的话,就造成了线程“死锁”。
死锁产生的4个必要条件:
1、
互斥
:某种资源一次只允许一个进程访问,即该资源一旦分配给某个进程,其他进程就不能再访问,直到该进程访问结束。
2、
占有且等待
:一个进程本身占有资源(一种或多种),同时还有资源未得到满足,正在等待其他进程释放该资源。
3、
不可抢占
:别人已经占有了某项资源,你不能因为自己也需要该资源,就去把别人的资源抢过来。
4、
循环等待
:存在一个进程链,使得每个进程都占有下一个进程所需的至少一种资源。
当以上四个条件均满足,必然会造成死锁
,发生死锁的进程无法进行下去,它们所持有的资源也无法释放。这样会导致CPU的吞吐量下降。所以死锁情况是非常浪费系统资源以及影响计算机的使用性能的。那么,解决死锁问题就是相当有必要的了!
由于产生死锁需要四个条件,那么,
只要这四个条件中至少有一个条件得不到满足
,就不可能发生死锁了。由于互斥条件是非共享资源所必须的,不仅不能改变,还应加以保证,所以,主要是破坏产生死锁的其他三个条件。
a、破坏“占有且等待”条件
方法1
:所有的进程在开始运行之前,必须一次性地申请其在整个运行过程中所需要的全部资源。
优点:简单易实施且安全。缺点:因为某项资源不满足,进程无法启动,而其他已经满足了的资源也不会得到利用,严重降低了资源的利用率,造成资源浪费。使进程经常发生饥饿现象。
方法2
:该方法是对第一种方法的改进,允许进程只获得运行初期需要的资源,便开始运行,在运行过程中逐步释放掉分配到的已经使用完毕的资源,然后再去请求新的资源。这样的话,资源的利用率会得到提高,也会减少进程的饥饿问题。
b、
破坏“不可抢占”条件
当一个已经持有了一些资源的进程在提出新的资源请求没有得到满足时,它必须释放已经保持的所有资源,待以后需要使用的时候再重新申请。这就意味着进程已占有的资源会被短暂地释放或者说是被抢占了。该种方法实现起来比较复杂,且代价也比较大。释放已经保持的资源很有可能会导致进程之前的工作实效等,反复的申请和释放资源会导致进程的执行被无限的推迟,这不仅会延长进程的周转周期,还会影响系统的吞吐量。
c、
破坏“循环等待”条件
可以通过定义资源类型的线性顺序来预防,可将每个资源编号,当一个进程占有编号为i的资源时,那么它下一次申请资源只能申请编号大于i的资源。
2.条件变量
条件变量关键点在“变量”上。与锁的不同之处就是,当线程遇到这个“变量”,并不是类似锁那样的被系统给“拍晕”,而是根据“条件”来选择是否在那里等待。等待什么呢?等待允许通过的“信号”。这个“信号”是系统控制的吗?显然不是!它是由另外一个线程来控制的。
如果说互斥锁可以比作独木桥,那么条件变量这就好比是马路上的红绿灯。车辆遇到红绿灯肯定会根据“灯”的颜色来判断是否通行,毕竟红灯停绿灯行这个道理在幼儿园的时候老师就教了。那么谁来控制“灯”的颜色呢?一定是交警啊,至少你我都不敢动它(有人会说那是自动的,可是间隔多少时间变换也是交警设置不是?)。那么“车辆”和“交警”就是马路上的两类线程,大多数情况下都是“车”多“交警”少。
更深一步理解,条件变量是一种事件机制。由一类线程来控制“事件”的发生,另外一类线程等待“事件”的发生。为了实现这种机制,条件变量必须是共享于线程之间的全局变量。而且,条件变量也需要与互斥锁同时使用。
初始化和销毁条件变量的接口是pthread_cond_init()和pthread_cond_destory();
控制“事件”发生的接口是pthread_cond_signal()或pthread_cond_broadcast();
等待“事件”发生的接口是pthead_cond_wait()或pthread_cond_timedwait()。
它们的完整定义如下:
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr);
int pthread_cond_destory(pthread_cond_t *cond);
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t *mutex, const timespec *abstime);
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);
对于等待“事件”的接口从其名称中可以看出,一种是无限期等待,一种是限时等待。后者与互斥锁的pthread_mutex_trylock()有些类似,即当等待的“事件”经过一段时间之后依然没有发生,那就去干点别的有意义的事情去。
而对于控制“事件”发生的接口则有“单播”和“广播”之说。所谓单播就是只有一个线程会得到“事件”已经发生了的“通知”,而广播就是所有线程都会得到“通知”。对于广播情况,所有被“通知”到的线程也要经过由互斥锁控制的独木桥。
对于条件变量的使用,可以参考以下代码,它实现了一种生产者与消费者的线程同步方案。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#define BUFFER_SIZE 5
pthread_mutex_t g_mutex;
pthread_cond_t g_cond;
typedef struct {
char buf[BUFFER_SIZE];
int count;
} buffer_t;
buffer_t g_share = {"", 0};
char g_ch = 'A';
void* producer( void *arg )
{
printf( "Producer starting.\n" );
while( g_ch != 'Z' ) {
pthread_mutex_lock( &g_mutex );
if( g_share.count < BUFFER_SIZE ) {
g_share.buf[g_share.count++] = g_ch++;
printf( "Prodcuer got char[%c]\n", g_ch - 1 );
if( BUFFER_SIZE == g_share.count ) {
printf( "Producer signaling full.\n" );
pthread_cond_signal( &g_cond );
}
}
pthread_mutex_unlock( &g_mutex );
}
printf( "Producer exit.\n" );
return NULL;
}
void* consumer( void *arg )
{
int i;
printf( "Consumer starting.\n" );
while( g_ch != 'Z' ) {
pthread_mutex_lock( &g_mutex );
printf( "Consumer waiting\n" );
pthread_cond_wait( &g_cond, &g_mutex );
printf( "Consumer writing buffer\n" );
for( i = 0; g_share.buf[i] && g_share.count; ++i ) {
putchar( g_share.buf[i] );
--g_share.count;
}
putchar('\n');
pthread_mutex_unlock( &g_mutex );
}
printf( "Consumer exit.\n" );
return NULL;
}
int main( int argc, char *argv[] )
{
pthread_t ppth, cpth;
pthread_mutex_init( &g_mutex, NULL );
pthread_cond_init( &g_cond, NULL );
pthread_create( &cpth, NULL, consumer, NULL );
pthread_create( &ppth, NULL, producer, NULL );
pthread_join( ppth, NULL );
pthread_join( cpth, NULL );
pthread_mutex_destroy( &g_mutex );
pthread_cond_destroy( &g_cond );
return 0;
}
这段代码存在一个潜在的问题:
如果producer线程并行执行的比consumer快,producer线程会先获取锁,之后向consumer发出信号,但此时consumer没办法获取锁,也就执行不到pthead_cond_wait() 处,那么程序就陷入尴尬的境地,发生死锁。
简单的,可以在 pthread_create( &cpth, NULL, consumer, NULL ); 和pthread_create( &ppth, NULL, producer, NULL ); 之间加入一个长的延时函数usleep(100),确保consumer线程先行执行到pthead_cond_wait() 处。
从代码中会发现,等待“事件”发生的接口都需要传递一个互斥锁给它。而实际上这个互斥锁还要在调用它们之前加锁,调用之后解锁。不单如此,在调用操作“事件”发生的接口之前也要加锁,调用之后解锁。这就面临一个问题,按照这种方式,等于“发生事件”和“等待事件”是互为临界区的。也就是说,如果“事件”还没有发生,那么有线程要等待这个“事件”就会阻止“事件”的发生。更干脆一点,就是这个“生产者”和“消费者”是在来回的走独木桥。但是实际的情况是,“消费者”在缓冲区满的时候会得到这个“事件”的“通知”,然后将字符逐个打印出来,并清理缓冲区。直到缓冲区的所有字符都被打印出来之后,“生产者”才开始继续工作。
为什么会有这样的结果呢?这就要说明一下pthread_cond_wait()接口对互斥锁做什么。答案是:解锁。pthread_cond_wait()首先会解锁互斥锁,然后进入等待。这个时候“生产者”就能够进入临界区,然后在条件满足的时候向“消费者”发出信号。
当pthead_cond_wait()获得“通知”之后,它还要对互斥锁加锁,这样可以防止“生产者”继续工作而“撑坏”缓冲区。另外,“生产者”在缓冲区不满的情况下才能工作的这个限定条件是很有必要的。因为在pthread_cond_wait()获得通知之后,在没有对互斥锁加锁之前,“生产者”可能已经重新进入临界区了,这样“消费者”又被堵住了。也就是因为条件变量这种工作性质,导致它必须与互斥锁联合使用。
此外,利用条件变量和互斥锁,可以模拟出很多其它类型的线程同步机制,比如:event、semaphore等。
五、多线程改写服务器程序
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <stdlib.h>
#include <getopt.h>
#include <pthread.h>
#include <ctype.h>
typedef void *(THREAD_BODY) (void *thread_arg);
void *thread_worker(void *ctx);
//封装thread_start()函数,实现创建子线程的功能
int thread_start(pthread_t * thread_id, THREAD_BODY * thread_workbody, void *thread_arg);
void print_usage(char *progname)
{
printf("%s usage: \n", progname);
printf("-p(--port): sepcify server listen port.\n");
printf("-h(--Help): print this help information.\n");
return ;
}
int main(int argc, char **argv)
{
int sockfd = -1;
int rv = -1;
struct sockaddr_in servaddr;
struct sockaddr_in cliaddr;
socklen_t len;
int port = 0;
int clifd;
int ch;
int on = 1;
pthread_t tid;
struct option opts[] = {
{"port", required_argument, NULL, 'p'},
{"help", no_argument, NULL, 'h'},
{NULL, 0, NULL, 0}
};
while ((ch=getopt_long(argc, argv, "p:h", opts, NULL)) != -1)
{
switch(ch)
{
case 'p':
port=atoi(optarg);
break;
case 'h':
print_usage(argv[0]);
return 0;
}
}
if( !port )
{
print_usage(argv[0]);
return 0;
}
sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (sockfd < 0)
{
printf("Create socket failure: %s\n", strerror(errno));
return -1;
}
printf("Create socket[%d] successfully!\n", sockfd);
setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(port);
servaddr.sin_addr.s_addr = htonl(INADDR_ANY); /* 监听本机所有IP*/
//inet_aton("192.168.0.16", &servaddr.sin_addr); /* 监听指定ip */
rv = bind(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
if (rv < 0)
{
printf("Socket[%d] bind on port[%d] failure: %s\n", sockfd, port, strerror(errno));
return -2;
}
listen(sockfd, 13);
printf("Start to listen on port [%d]\n", port);
while (1)
{
printf("Start accept new client incoming...\n");
clifd = accept(sockfd, (struct sockaddr *)&cliaddr, &len);
if (clifd < 0)
{
printf("Accept new client failure: %s\n", strerror(errno));
continue;
}
printf("Accept new client[%s:%d] successfully\n", inet_ntoa(cliaddr.sin_addr),
ntohs(cliaddr.sin_port));
/*注意,这里传入的是clifd的值,而不是clifd的地址*/
thread_start(&tid, thread_worker, (void *)clifd);
}
close(sockfd);
return 0;
}、
int thread_start(pthread_t *thread_id, THREAD_BODY *thread_workbody, void *thread_arg)
{
int rv = -1;
pthread_attr_t thread_attr;
if (pthread_attr_init(&thread_attr))
{
printf("pthread_attr_init() failure: %s\n", strerror(errno));
goto CleanUp;
}
if (pthread_attr_setstacksize(&thread_attr, 120*1024))
{
printf("pthread_attr_setstacksize() failure: %s\n", strerror(errno));
goto CleanUp;
}
if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_DETACHED))
{
printf("pthread_attr_setdetachstate() failure: %s\n", strerror(errno));
goto CleanUp;
}
/* Create the thread */
if (pthread_create(thread_id, &thread_attr, thread_workbody, thread_arg))
{
printf("Create thread failure: %s\n", strerror(errno));
goto CleanUp;
}
rv = 0;
CleanUp:
/* Destroy the attributes of thread */
pthread_attr_destroy(&thread_attr);
return rv;
}
void *thread_worker(void *ctx)
{
int clifd;
int rv;
char buf[1024];
int i;
if (!ctx)
{
printf("Invalid input arguments in %s()\n", __FUNCTION__);
pthread_exit(NULL);
}
clifd = (int)ctx;
printf("Child thread start to commuicate with socket client...\n");
while (1)
{
memset(buf, 0, sizeof(buf));
rv = read(clifd, buf, sizeof(buf));
if (rv < 0)
{
printf("Read data from client sockfd[%d] failure: %s and thread will exit\n", clifd,
strerror(errno));
close(clifd);
pthread_exit(NULL);
}
else if( rv == 0)
{
printf("Socket[%d] get disconnected and thread will exit.\n", clifd);
close(clifd);
pthread_exit(NULL);
}
else if( rv > 0 )
{
printf("Read %d bytes data from Server: %s\n", rv, buf);
}
/* convert letter from lowercase to uppercase */
for (i = 0; i < rv; i++)
{
buf[i] = toupper(buf[i]);
}
rv = write(clifd, buf, rv);
if (rv < 0)
{
printf("Write to client by sockfd[%d] failure: %s and thread will exit\n", clifd,
strerror(errno));
close(clifd);
pthread_exit(NULL);
}
}
}