【Linux】进程间通讯的五种方式(管道、信号量、共享内存、消息队列、套接字)

  • Post author:
  • Post category:linux


信号:通知一个进程发生了什么事件


进程间的通讯方式有:

管道、信号量、共享内存、消息队列、套接字。

一、管道


特点:

1、它是半双工的(即数据只能在一个方向上流动),具有固定的读端和写端。

2、它只能用于具有亲缘关系的进程之间的通信(也是父子进程或者兄弟进程之间)。

3、它可以看成是一种特殊的文件,对于它的读写也可以使用普通的read、write 等函数。但是它不是普通的文件,并不属于其他任何文件系统,并且只存在于内存中。

管道分为

有名管道和无名管道。



无论有名管道还是无名管,在同一时刻,只能是一段读一段写


1、有名管道(命名管道):

在磁盘上会存在一个管道文件标识,但管道文件并不占用磁盘block空间。


应用:

可以应用于同一台的主机上的所有权限访问的

任意几个进程间通讯

函数

:int mkfifo(const char *pathname,int mode);


特点:

1、FIFO可以在无关的进程之间交换数据,与无名管道不同。

2、FIFO有路径名与之相关联,它以一种特殊设备文件形式存在于文件系统中。

例:

#include<stdio.h>
#include<stdlib.h>   // exit
#include<fcntl.h>    // O_WRONLY
#include<sys/stat.h>
#include<time.h>     // time

int main()
{
    int fd;
    int n, i;
    char buf[1024];
    time_t tp;

    printf("I am %d process.\n", getpid()); // 说明进程ID
    
    if((fd = open("fifo1", O_WRONLY)) < 0) // 以写打开一个FIFO 
    {
        perror("Open FIFO Failed");
        exit(1);
    }

    for(i=0; i<10; ++i)
    {
        time(&tp);  // 取系统当前时间
        n=sprintf(buf,"Process %d's time is %s",getpid(),ctime(&tp));
        printf("Send message: %s", buf); // 打印
        if(write(fd, buf, n+1) < 0)  // 写入到FIFO中
        {
            perror("Write FIFO Failed");
            close(fd);
            exit(1);
        }
        sleep(1);  // 休眠1秒
    }

    close(fd);  // 关闭FIFO文件
    return 0;
}


2、无名管道:

不会存在管道文件标识


限制:

只能用于父子进程之间。


原理:

借助父子进程之间共享fork之前打开的文件描述符。

若要数据流从父进程流向子进程,则关闭父进程的读端(

fd[0]

)与子进程的写端(

fd[1]

);反之,则可以使数据流从子进程流向父进程。


特点:

1、如果所有读端关闭,则写端也退出,反之亦然;

2、读写次数没有直接联系   →    字节流服务;

3、如果写端保持,但是并没有写数据,则读端阻塞;如果读端保持,但是并没有获取数据,写端在将管道内存写满时,阻塞。

例:

#include<stdio.h>
#include<unistd.h>

int main()
{
    int fd[2];  // 两个文件描述符
    pid_t pid;
    char buff[20];

    if(pipe(fd) < 0)  // 创建管道
        printf("Create Pipe Error!\n");

    if((pid = fork()) < 0)  // 创建子进程
        printf("Fork Error!\n");
    else if(pid > 0)  // 父进程
    {
        close(fd[0]); // 关闭读端
        write(fd[1], "hello world\n", 12);
    }
    else
    {
        close(fd[1]); // 关闭写端
        read(fd[0], buff, 20);
        printf("%s", buff);
    }

    return 0;
}

二、信号量


1、概念

信号量的本质是数据操作锁,本身不具有数据交换的功能,而是通过控制其他的通信资源(文件、外部设备)来实现进程间通信,它本身只是一种外部资源的标识。信号量在此过程中负责数据操作的互斥、同步等功能。

用来同步进程的特殊变量:一个特殊的计数器,大于0时记录资源的数量,小于0时,记录等待资源的进程的数量。当信号量的值大于0时,进程总是可以获取到资源并使用,小于0时,进程必须阻塞等待其他进程释放资源。


2、特点

  1. 信号量用于进程间同步,若要在进程间传递数据需要结合共享内存。

  2. 信号量基于操作系统的 PV 操作,程序对信号量的操作都是原子操作。

  3. 每次对信号量的 PV 操作不仅限于对信号量值加 1 或减 1,而且可以加减任意正整数。

  4. 支持信号量组。


3、作用

完成进程同步控制,用于多进程访问同一临界资源。

信号量类似计数器

①> 0  为临界资源的个数;

② == 0 没有临界资源可用,申请资源进行执行P操作,则进程会被阻塞。


进程同步:

进程协同工作。


进程异步:

进程独立运行,互不干扰,需要内核机制来通知信号。


临界资源

:同一时刻只能被一个进程访问使用的资源(临界资源可有多份)。


临界区:

访问临界资源的代码区域。


原子操作:

不会被线程调度机制打断的操作。不能被终止,不能被暂停的程序段。一旦开始操作,必须等待他结束的操作。


P操作:(获取资源)-1

a、将信号量S的值减1,既S=S-1;

b、如果S>=0,则该进程继续执行,否则该进程置为等待状态,排入等待序列。


V操作:(释放资源)+1

a、将信号量S的值加1,既S=S+1;

b、如果S > 0,则该进程继续执行,否则释放队列中等一个等待信号量的进程。


1、创建/获取信号量

int semget((key_t)key,int nsems,int flag);

nsems:创建时,指定信号量集的个数。


2、初始化信号量值

int semctl(int semid,int semnum,int cmd,/*union semun arg*/);

union semun

{

int val;

//struct semid_ds*buf

};

返回值:semid


3、P操作、V操作

int  semop(int semid,struct sembuf[ ],int size);


struct sembuf


{


short sem_num;//操作第几个


short sem_op;// -1 P,  +1 V


short sem_flg;//控制模式,IPC_NOWAIT     SEM_UNDO


}

其中 sem_op 是一次操作中的信号量的改变量:



  • sem_op > 0

    ,表示进程释放相应的资源数,将 sem_op 的值加到信号量的值上。如果有进程正在休眠等待此信号量,则换行它们。



  • sem_op < 0

    ,请求 sem_op 的绝对值的资源。

    • 如果相应的资源数可以满足请求,则将该信号量的值减去sem_op的绝对值,函数成功返回。
    • 当相应的资源数不能满足请求时,这个操作与

      sem_flg

      有关。

      • sem_flg 指定

        IPC_NOWAIT

        ,则semop函数出错返回

        EAGAIN

      • sem_flg 没有指定

        IPC_NOWAIT

        ,则将该信号量的semncnt值加1,然后进程挂起直到下述情况发生:

        1. 当相应的资源数可以满足请求,此信号量的semncnt值减1,该信号量的值减去sem_op的绝对值。成功返回;
        2. 此信号量被删除,函数smeop出错返回EIDRM;
        3. 进程捕捉到信号,并从信号处理函数返回,此情况下将此信号量的semncnt值减1,函数semop出错返回EINTR


  • sem_op == 0

    ,进程阻塞直到信号量的相应值为0:

    • 当信号量已经为0,函数立即返回。
    • 如果信号量的值不为0,则依据

      sem_flg

      决定函数动作:

      • sem_flg指定

        IPC_NOWAIT

        ,则出错返回

        EAGAIN

      • sem_flg没有指定

        IPC_NOWAIT

        ,则将该信号量的semncnt值加1,然后进程挂起直到下述情况发生:

        1. 信号量值为0,将信号量的semzcnt的值减1,函数semop成功返回;
        2. 此信号量被删除,函数smeop出错返回EIDRM;
        3. 进程捕捉到信号,并从信号处理函数返回,在此情况将此信号量的semncnt值减1,函数semop出错返回EINTR

例:

#include<stdio.h>
#include<stdlib.h>
#include<sys/sem.h>

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// 初始化信号量
int init_sem(int sem_id, int value)
{
    union semun tmp;
    tmp.val = value;
    if(semctl(sem_id, 0, SETVAL, tmp) == -1)
    {
        perror("Init Semaphore Error");
        return -1;
    }
    return 0;
}

// P操作:
//    若信号量值为1,获取资源并将信号量值-1 
//    若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//    释放资源并将信号量值+1
//    如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}

// 删除信号量集
int del_sem(int sem_id)
{
    union semun tmp;
    if(semctl(sem_id, 0, IPC_RMID, tmp) == -1)
    {
        perror("Delete Semaphore Error");
        return -1;
    }
    return 0;
}


int main()
{
    int sem_id;  // 信号量集ID
    key_t key;  
    pid_t pid;

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 创建信号量集,其中只有一个信号量
    if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1)
    {
        perror("semget error");
        exit(1);
    }

    // 初始化:初值设为0资源被占用
    init_sem(sem_id, 0);

    if((pid = fork()) == -1)
        perror("Fork Error");
    else if(pid == 0) /*子进程*/ 
    {
        sleep(2);
        printf("Process child: pid=%d\n", getpid());
        sem_v(sem_id);  /*释放资源*/
    }
    else  /*父进程*/
    {
        sem_p(sem_id);   /*等待资源*/
        printf("Process father: pid=%d\n", getpid());
        sem_v(sem_id);   /*释放资源*/
        del_sem(sem_id); /*删除信号量集*/
    }
    return 0;
}

三、共享内存        最快的一种IPC


定义:

指两个或者多个进程共享一个给定的区域(物理内存地址)。


特点:

1、共享内存是最快的一种IPC,因为进程是直接对内存进行读取;

2、因为多个进程可以同时操作,所以需要进行同步。

3、信号量+共享内存通常结合在一起使用,信号量用来同步对共享内存的访问。


共享内存一旦使得进程映射到此共享内存区域,后续操作时,不需要用户态切换内核态。


共享内存相比较于其他通讯方式,会少两次数据的拷贝


共享内存的使用:


1、创建或获取一个共享内存:成功返回共享内存ID,失败返回-1

int shmget(key_t key, size_t size, int flag);

2、连接共享内存到当前进程的地址空间:成功返回指向共享内存的指针,失败返回-1

void *shmat(int shm_id, const void *addr, int flag);

3、断开与共享内存的连接:成功返回0,失败返回-1

int shmdt(void *addr);

4、删除内核对象。控制共享内存的相关信息:成功返回0,失败返回-1

int shmctl(int shm_id, int cmd, struct shmid_ds *buf);


例:实现服务器进程与客户进程间的通信。

//server.c
#include<stdio.h>
#include<stdlib.h>
#include<sys/shm.h>  // shared memory
#include<sys/sem.h>  // semaphore
#include<sys/msg.h>  // message queue
#include<string.h>   // memcpy

// 消息队列结构
struct msg_form {
    long mtype;
    char mtext;
};

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// 初始化信号量
int init_sem(int sem_id, int value)
{
    union semun tmp;
    tmp.val = value;
    if(semctl(sem_id, 0, SETVAL, tmp) == -1)
    {
        perror("Init Semaphore Error");
        return -1;
    }
    return 0;
}

// P操作:
//  若信号量值为1,获取资源并将信号量值-1 
//  若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//  释放资源并将信号量值+1
//  如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}

// 删除信号量集
int del_sem(int sem_id)
{
    union semun tmp;
    if(semctl(sem_id, 0, IPC_RMID, tmp) == -1)
    {
        perror("Delete Semaphore Error");
        return -1;
    }
    return 0;
}

// 创建一个信号量集
int creat_sem(key_t key)
{
    int sem_id;
    if((sem_id = semget(key, 1, IPC_CREAT|0666)) == -1)
    {
        perror("semget error");
        exit(-1);
    }
    init_sem(sem_id, 1);  /*初值设为1资源未占用*/
    return sem_id;
}


int main()
{
    key_t key;
    int shmid, semid, msqid;
    char *shm;
    char data[] = "this is server";
    struct shmid_ds buf1;  /*用于删除共享内存*/
    struct msqid_ds buf2;  /*用于删除消息队列*/
    struct msg_form msg;  /*消息队列用于通知对方更新了共享内存*/

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 创建共享内存
    if((shmid = shmget(key, 1024, IPC_CREAT|0666)) == -1)
    {
        perror("Create Shared Memory Error");
        exit(1);
    }

    // 连接共享内存
    shm = (char*)shmat(shmid, 0, 0);
    if((int)shm == -1)
    {
        perror("Attach Shared Memory Error");
        exit(1);
    }


    // 创建消息队列
    if ((msqid = msgget(key, IPC_CREAT|0777)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 创建信号量
    semid = creat_sem(key);
    
    // 读数据
    while(1)
    {
        msgrcv(msqid, &msg, 1, 888, 0); /*读取类型为888的消息*/
        if(msg.mtext == 'q')  /*quit - 跳出循环*/ 
            break;
        if(msg.mtext == 'r')  /*read - 读共享内存*/
        {
            sem_p(semid);
            printf("%s\n",shm);
            sem_v(semid);
        }
    }

    // 断开连接
    shmdt(shm);

    /*删除共享内存、消息队列、信号量*/
    shmctl(shmid, IPC_RMID, &buf1);
    msgctl(msqid, IPC_RMID, &buf2);
    del_sem(semid);
    return 0;
}

client.c

#include<stdio.h>
#include<stdlib.h>
#include<sys/shm.h>  // shared memory
#include<sys/sem.h>  // semaphore
#include<sys/msg.h>  // message queue
#include<string.h>   // memcpy

// 消息队列结构
struct msg_form {
    long mtype;
    char mtext;
};

// 联合体,用于semctl初始化
union semun
{
    int              val; /*for SETVAL*/
    struct semid_ds *buf;
    unsigned short  *array;
};

// P操作:
//  若信号量值为1,获取资源并将信号量值-1 
//  若信号量值为0,进程挂起等待
int sem_p(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = -1; /*P操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("P operation Error");
        return -1;
    }
    return 0;
}

// V操作:
//  释放资源并将信号量值+1
//  如果有进程正在挂起等待,则唤醒它们
int sem_v(int sem_id)
{
    struct sembuf sbuf;
    sbuf.sem_num = 0; /*序号*/
    sbuf.sem_op = 1;  /*V操作*/
    sbuf.sem_flg = SEM_UNDO;

    if(semop(sem_id, &sbuf, 1) == -1)
    {
        perror("V operation Error");
        return -1;
    }
    return 0;
}


int main()
{
    key_t key;
    int shmid, semid, msqid;
    char *shm;
    struct msg_form msg;
    int flag = 1; /*while循环条件*/

    // 获取key值
    if((key = ftok(".", 'z')) < 0)
    {
        perror("ftok error");
        exit(1);
    }

    // 获取共享内存
    if((shmid = shmget(key, 1024, 0)) == -1)
    {
        perror("shmget error");
        exit(1);
    }

    // 连接共享内存
    shm = (char*)shmat(shmid, 0, 0);
    if((int)shm == -1)
    {
        perror("Attach Shared Memory Error");
        exit(1);
    }

    // 创建消息队列
    if ((msqid = msgget(key, 0)) == -1)
    {
        perror("msgget error");
        exit(1);
    }

    // 获取信号量
    if((semid = semget(key, 0, 0)) == -1)
    {
        perror("semget error");
        exit(1);
    }
    
    // 写数据
    printf("***************************************\n");
    printf("*                 IPC                 *\n");
    printf("*    Input r to send data to server.  *\n");
    printf("*    Input q to quit.                 *\n");
    printf("***************************************\n");
    
    while(flag)
    {
        char c;
        printf("Please input command: ");
        scanf("%c", &c);
        switch(c)
        {
            case 'r':
                printf("Data to send: ");
                sem_p(semid);  /*访问资源*/
                scanf("%s", shm);
                sem_v(semid);  /*释放资源*/
                /*清空标准输入缓冲区*/
                while((c=getchar())!='\n' && c!=EOF);
                msg.mtype = 888;  
                msg.mtext = 'r';  /*发送消息通知服务器读数据*/
                msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
                break;
            case 'q':
                msg.mtype = 888;
                msg.mtext = 'q';
                msgsnd(msqid, &msg, sizeof(msg.mtext), 0);
                flag = 0;
                break;
            default:
                printf("Wrong input!\n");
                /*清空标准输入缓冲区*/
                while((c=getchar())!='\n' && c!=EOF);
        }
    }

    // 断开连接
    shmdt(shm);

    return 0;
}

四、消息队列

发送带有类型的数据,可以真正实现多进程间通讯。

消息:类型+数据

消息队列:先进先出(队列)


特点:

  1. 消息队列是面向记录的,其中的消息具有特定的格式以及特定的优先级。

  2. 消息队列独立于发送与接收进程。进程终止时,消息队列及其内容并不会被删除。

  3. 消息队列可以实现消息的随机查询,消息不一定要以先进先出的次序读取,也可以按消息的类型读取。

消息队列的使用:


1、创建/获取消息队列

int msgget((key_t)key,int mode/flag);

mode/flag:权限//IPC_CREAT|0664(所有者可读可写)

返回内核对象的标识符


2、发送消息(类型+数据)

int msgsnd(int msgid,void *ptr,int datalen,int flg);

ptr:指向的数据类型包含一个long的类型字段,一个数据字段;

datalen:数据长度(数据部分的有效数据长度);

flag:标记

struct data

{

long type;

char text[128];

};


3、接收消息

int msgrcv(int msgid,void *ptr,int size,long type,int flag);

ptr:类型、数据


4、删除内核对象 IPC结构

int msgctl(int msgid,int cmd,struct msgid_ds *buf);

ctl:设置函数;

and:IPC_RMID删除

函数

msgrcv

在读取消息队列时,type参数有下面几种情况:


  • type == 0

    ,返回队列中的第一个消息;

  • type > 0

    ,返回队列中消息类型为 type 的第一个消息;

  • type < 0

    ,返回队列中消息类型值小于或等于 type 绝对值的消息,如果有多个,则取类型值最小的消息。

五、套接字  socket

概念:

是一种通讯机制,凭借这种机制,客户/服务器系统的开发工作既可以在本地单机上进行,也可以跨网络进行。



版权声明:本文为like_that原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。