EPOLLOUT例子 EPOLLOUT触发条件

  • Post author:
  • Post category:其他


下面的代码都在ET下工作

群里小伙伴没搞懂EPOLLOUT 再写2个例子; 2个例子都是回声服务器代码;

关于EPOLLET的基础 , 以及EPOLLIN|EPOLLOUT|EPOLLET 一起注册的例子:

EPOLLET简单例子

EPOLLOUT 的说明:

LT模式下:

与select 一致. 只要可写就一直触发


在ET模式下:


socket 一般情况下需要非阻塞的, 与EAGAIN 这个错误 配合使用.


如果是默认的阻塞socket, 不是不可以,而是你需要做额外的事, 每一次的read/write , 你都需要去getsockopt( sock,SO_ERROR,…)


直到获取 EAGAIN 算是正常读/写. 否则, 你完蛋了. 这个socket将不再收到 epoll 的触发条件了



对于EPOLLIN : 如果状态改变了[ 比如 从无到有],那么只要输入缓冲区可读就会触发


对于EPOLLOUT: 如果状态改变了[比如 从满到不满],只要输出缓冲区可写就会触发;


EPOLLOUT那么具体什么时间点触发呢


1. EPOLL_CTL_ADD或EPOLL_CTL_MOD 时 , 如果输出缓冲区状态改变了


2. 不论注册方式是 EPOLLIN | EPOLLOUT | EPOLLET 还是 EPOLLOUT | EPOLLET 只要含EPOLLOUT


只要状态改变就能触发.


状态改变到底是什么?   简单理解:


EPOLLOUT:  满->不满


EPOLLIN   :  空->不空


有没有发现 , 都跟边缘有关


3.对于 send / write 需要依靠 EPOLLOUT 触发才调用吗 ? 什么时候需要 注册上 EPOLLOUT ?


不需要.  如果要 send / write 那么就直接调用, 如果返回值 > 0  , 证明数据已经复制到发送缓冲区中.一切正常.


如果 send / write 返回 < 0 且 errno == EAGAIN . 此时说明发送缓冲区满了. 那么需要把 剩余的字节保存起来,


然后注册上 EPOLLOUT , 直到epoll_wait 返回 , 说明发送缓冲区可写, 再把 之前保存起来的数据 发送,


如果此时 write 返回 > 0  那就把EPOLLOUT 取消掉.


简单来说 :  1. 直接发送  2. 看返回值, 没发送完才挂上EPOLLOUT  3. 发送完就把EPOLLOUT 取消掉

下面2个例子都是关于EPOLLOUT,

第一个例子利用EPOLL_CTL_MOD来触发EPOLLOUT,这种方式不太好,需要利用一次系统调用epoll_wait来触发;

第2个例子在读完时, 手动调用一次发送, 如果发送遇到 EAGAIN / EWOULDBLOCK,才把EPOLLOUT注册上去,等待

EPOLLOUT事件发生再去发送, 如果不需要发送则把EPOLLOUT关闭;

另外有关于下面例子中 util.h 头文件就包含了一些常用头文件,所以就不贴了;

再次注意, 我没有对监听socket设置EPOLLET , 如果你设置了那么需要 while(1) accept().. if(errno==EAGAIN)break;

否则会漏掉客户端连接的.

另外注意不论是write, send ..等这些个函数,都是把数据复制到socket输出缓冲区中 .并不是发送到对端的意思;

还有close(fd) 会附带着 epoll_ctl ( … , EPOLL_CTL_DEL, …); 自动从epoll中清除

第一个关于EPOLLOUT的例子:

这个例子使用 EPOLL_CTL_MOD 来主动触发EPOLLOUT事件 ;

当然这种方式需要让下一次epoll_wait返回一次来触发,不是特别好;

[ 当 EPOLL_CTL_ADD或EPOLL_CTL_MOD 时 , 如果socket输出缓冲区没满就能触发一次. ]

下面这个例子唯一需要注意的就 3行代码:

                    //把EPOLLOUT一起注册上去
                    ev.events = evts[i].events | EPOLLOUT;
                    ev.data.ptr = pData;
                    
                    //利用EPOLL_CTL_MOD将触发EPOLLOUT的特性
                    epoll_ctl(epfd,EPOLL_CTL_MOD,pData->fd,&ev);
#include "util.h"

#define  BUFF_SIZE 1024
#define  MAX_EVENTS 1024



static int setnonblock(int fd , int nonblock){
    int flag = fcntl(fd,F_GETFL,0);
    if(nonblock)
        return fcntl(fd,F_SETFL,flag|O_NONBLOCK);
    else
        return fcntl(fd,F_SETFL,flag&~O_NONBLOCK);
}

//每个套接字关联的信息
typedef struct _ev_data
{
    int fd; //socket
    char * buffer; //缓冲区
    int nread; //读入字节长度
    int start_write_index; //记录下一次写的位置
    int epoll_fd; // 属于哪个epoll
} ev_data;

void free_event(ev_data * );

void free_event(ev_data * pEv){ //释放内存用的
    if(!pEv)
        return;
    if(pEv->buffer){
        free(pEv->buffer);
    }
    free(pEv);
}

//输出函数
void write_handler(ev_data *ptr)
{
    static struct epoll_event ev ={0,{0}};

    if(!ptr || !ptr->buffer){
        printf("write_handler error , ptr is empty!\n");
        return;
    }

    //需要写入socket输出缓冲区的长度
    int left = ptr->nread - ptr->start_write_index;

    //从哪里开始写
    char *pbuffer = ptr->buffer + ptr->start_write_index;
    
    //返回值
    int nwriten = -1;
    // 总共写入的字节数
    int write_bytes = 0;
    //sock输出缓冲区是否满了
    int full = 0;
    printf("write_handler begin write , nread:%d, start_write_index:%d, left:%d\n",
    ptr->nread,ptr->start_write_index,left);
    while(left > 0){
        nwriten = write(ptr->fd,pbuffer,left);
        if(nwriten <= 0){

            //如果满了, 就不写了, 此时已经注册了EPOLLOUT事件,因此等待下一次被触发
            if(nwriten < 0 && (errno == EAGAIN || EWOULDBLOCK == errno)){
                full = 1;
                puts("sock sendbuff is full");
                break;
            }
            else{
            // 输出有问题了, 具体你可以自由发挥
                close(ptr->fd);
                free_event(ptr);
                perror("write error");
                return;
            }
        }
        pbuffer += nwriten;
        left -= nwriten;
        write_bytes += nwriten;
    }

    //如果满了就记录一下,下一次要从哪里开始写
    if(full){
        ptr->start_write_index = write_bytes;
    }
    //到这,说明该写的都写了
    else if( 0 == left){
        //重置一下 读写的位置
        ptr->start_write_index = ptr->nread = 0;

        //修改事件
        ev.events = EPOLLIN | EPOLLET;
        ev.data.ptr = ptr;
        epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
    }
    printf("write_handler end , nread:%d, start_write_index:%d, writebytes:%d\n",
           ptr->nread,ptr->start_write_index,write_bytes);
}



int main(int argc, char ** argv) {

    signal(SIGPIPE, SIG_IGN);
    int listensock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serv_addr, cli_addr;
    socklen_t socklen = sizeof(serv_addr);
    memset(&serv_addr, 0, socklen);
    memset(&cli_addr, 0, socklen);
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(PORT);
    serv_addr.sin_family = AF_INET;
    int on = 1;
    setsockopt(listensock,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(int));
    if (bind(listensock, (SA *) &serv_addr, sizeof(serv_addr)) < 0) {
        perror("bind");
        return 0;
    }

    if (listen(listensock, BACKLOG) < 0) {
        perror("listen");
        return 0;
    }

    //epoll_size = 128
    int epfd = epoll_create(MAX_EVENTS);
    if (epfd < 0) {
        perror("epoll_create");
        return 0;
    }

    setnonblock(listensock,1);
    struct epoll_event ev = {0, {0}};
    ev.events = EPOLLIN ; //只有EPOLLIN
    ev_data * data = malloc(sizeof(ev_data));
    data->fd = listensock;
    data->buffer = NULL;
    ev.data.ptr = data;
    epoll_ctl(epfd,EPOLL_CTL_ADD , listensock , &ev);

    int nready = -1 , len = -1;
    struct epoll_event * evts = calloc(MAX_EVENTS, sizeof(struct epoll_event));

    while(1)
    {
        nready = epoll_wait(epfd,evts,MAX_EVENTS, -1);
        printf("\nepoll_wait return : %d\n" , nready);
        if(nready < 0){
            perror("epoll_wait");
            break;
        }

        for( int i =0 ; i < nready ; ++i){
            printf("evts[%d].events = %d\n" , i,evts[i].events);

            //出错了,该怎么样就怎么样把,自由发挥
            if( ( evts[i].events & EPOLLERR ) ||
                evts[i].events & EPOLLHUP){
                printf("i=%d, occurs err, events:%d\n",
                       i, evts[i].events);
                ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                if(pData){
                    evts[i].data.ptr = NULL;
                    close(pData->fd);
                    free_event(pData);
                }
                continue;
            }
            //如果是输入事件
            if( evts[i].events & EPOLLIN ){
                if( !evts[i].data.ptr){
                    puts("ptr is empty");
                    continue;
                }
                //如果是监听套接字发生输入事件
                if( ( (ev_data * )(evts[i].data.ptr) )->fd == listensock){
                    socklen = sizeof(cli_addr);
                    int clt_fd = accept(listensock,(SA*)&cli_addr,&socklen);
                    if( clt_fd < 0){
                        puts("accept failed");
                        continue;
                    }
                    setnonblock(clt_fd,1);//nonblock

                    ev.events = EPOLLIN|EPOLLET; //  注册事件
                    ev_data * data = malloc(sizeof(ev_data));

                    //初始化每个socket关联的信息
                    data->fd = clt_fd; 
                    data->nread = 0;
                    data->buffer = malloc(BUFF_SIZE);
                    data->start_write_index = 0;
                    data->epoll_fd = epfd;
                    ev.data.ptr = data;
                    //加入epoll
                    epoll_ctl(epfd,EPOLL_CTL_ADD,clt_fd, &ev);
                    puts("\taccept success");
                }
                else{
                    // 如果是有数据来了
                    printf("\tdata is comming, u can read !\n");
                    ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                    if(!pData){
                        printf("read error ! ptr is empty\n");
                        continue;
                    }
                    //一个标志,如果socket 出错或对端关了就置1
                    int sock_closed = 0;

                    //缓冲区还有多少能放入的长度
                    int left = BUFF_SIZE - pData->nread;
                    //从哪开始接受
                    char * pbuff = pData->buffer + pData->nread;

                    while(left > 0 )
                    {
                        len = read(pData->fd, pbuff,left);

                        //一旦出错
                        if (len < 0){
                            if(errno == EINTR)
                                len = 0;

                            //如果socket输入缓冲区为空了
                            else if(errno == EAGAIN || errno == EWOULDBLOCK){
                                printf("read error , no more data!  len:%d\n" , pData->nread);
                                break;
                            }
                            else{
                                //发生错误
                                perror("read error , close socket . ");
                                sock_closed = 1;
                                break;
                            }
                        }
                        else if( 0 == len){
                            //说明客户端断开了
                            printf("socket : %d closed!\n" , pData->fd);
                            sock_closed = 1;
                            break;
                        }
                        pbuff += len;
                        left -= len;
                        pData->nread += len;
                    }
                    if(sock_closed){
                        close(pData->fd);
                        free_event(pData);
                        continue;
                    }
                    /*
                         把EPOLLOUT一起注册进去, 到时一旦
                        socket输出缓冲区可写就能触发,如果此时socket输出缓冲区
                        就是可写的,那么下次epoll_wait将立即返回
                    */
                    ev.events = evts[i].events | EPOLLOUT;
                    ev.data.ptr = pData;

                //EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD将触发一次EPOLLOUT,如果socket可写
                    epoll_ctl(epfd,EPOLL_CTL_MOD,pData->fd,&ev);
                }
            }
            if(evts[i].events & EPOLLOUT)
            {
                printf("\t****u can write now!! !\n");
                ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                write_handler(pData);
            }
        }
    }

}

第2个例子:

基本思路就是 read完就去发送一次, 如果发送成功,那最好,

但如果发送遇到EAGAIN/EWOULDBLOCK 就去加入EPOLLOUT,等待直到socket输出缓冲区

可以写,那么epoll_wait就能触发EPOLLOUT,再去写数据,

如果数据全部发送完了, 那就把EPOLLOUT关闭了;

代码大致与上面的相似, 在结构体中增加了一个事件变量,防止重复epoll_ctl 增加负担;

#include "util.h"

#define  BUFF_SIZE 1024
#define  MAX_EVENTS 1024



static int setnonblock(int fd , int nonblock){
    int flag = fcntl(fd,F_GETFL,0);
    if(nonblock)
        return fcntl(fd,F_SETFL,flag|O_NONBLOCK);
    else
        return fcntl(fd,F_SETFL,flag&~O_NONBLOCK);
}

//每个套接字关联的信息
typedef struct _ev_data
{
    int fd; //socket fd
    char * buffer; //缓冲区
    int nread; //接受字节数
    int start_write_index; //从哪里开始写
    int epoll_fd; //属于哪个epoll
    int events;  //记录事件; 就比上面的代码多增加了一个变量,防止重复的epoll_ctl
} ev_data;

void free_event(ev_data * );

void free_event(ev_data * pEv){
    if(!pEv)
        return;
    if(pEv->buffer){
        free(pEv->buffer);
    }
    free(pEv);
}


//新的写函数
void write_handler2(ev_data * ptr)
{

    //判断指针, 可自由发挥
    if(!ptr || !ptr->buffer){
        close(ptr->fd);
        puts(" write_handler2 ptr is empty ");
        return;
    }

     //如果没数据,就直接返回了
    if(ptr->nread <= 0){
        printf("buffer is empty , ptr->nread:%d \n",ptr->nread);
        return;
    }
    static struct epoll_event ev ={0,{0}};


    //要发送的字节数
    int left = ptr->nread - ptr->start_write_index;

    //指针位置
    char * pbuffer = ptr->buffer + ptr->start_write_index;
    int nwriten = -1;
    int write_bytes = 0;
    printf("begin write , nread:%d, start_write_index:%d,left:%d\n",
    ptr->nread,ptr->start_write_index,left);


    while(left > 0){
        nwriten  = write(ptr->fd,pbuffer,left);

        //如果有错误
        if(nwriten <= 0){

            /*
                socket输出缓冲区满了
                那就加入EPOLLOUT事件,等epoll_wait返回通知你
            */
            if(errno < 0 && ( EWOULDBLOCK == errno || EAGAIN == errno)){

                //记录一下, 下一次要从哪里开始
                ptr->start_write_index += write_bytes;

                //这个纯粹为了防止重复的epoll_ctl,额外增加负担的,你不做判断也没事
                if(EPOLLOUT & ptr->events)
                    return;
            
                //增加EPOLLOUT事件
                ptr->events |= EPOLLOUT;
                ev.events = ptr->events;
                ev.data.ptr = ptr;
                
                //修改事件
                epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
                printf("socket buff is full , nread:%d,start_write_index:%d,left:%d\n",
                ptr->nread,ptr->start_write_index,left);
                return;
            }
            else{

                //如果出错了, 这里你可以自由发挥
                close(ptr->fd);
                free_event(ptr);
                perror("write error");
                return;
            }
        }
        pbuffer += nwriten;
        left -= nwriten;
        write_bytes += nwriten;
    }

    //到这里,说明该写的都写了, 重置一下读写位置
    ptr->start_write_index = ptr->nread = 0;

    //这个判断纯粹为了防止每次都去epoll_ctl,不加判断也行
    if(EPOLLOUT & ptr->events) {

        //把 EPOLLOUT 删了 ,这样就跟原来一样还是EPOLLIN|EPOLLET
        ptr->events &= ~EPOLLOUT;
        ev.events =ptr->events;
        ev.data.ptr = ptr;
        //修改一下
        epoll_ctl(ptr->epoll_fd, EPOLL_CTL_MOD, ptr->fd, &ev);
    }
}


int main(int argc, char ** argv) {

    signal(SIGPIPE, SIG_IGN);
    int listensock = socket(AF_INET, SOCK_STREAM, 0);
    struct sockaddr_in serv_addr, cli_addr;
    socklen_t socklen = sizeof(serv_addr);
    memset(&serv_addr, 0, socklen);
    memset(&cli_addr, 0, socklen);
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(PORT);
    serv_addr.sin_family = AF_INET;
    int on = 1;
    setsockopt(listensock,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(int));
    if (bind(listensock, (SA *) &serv_addr, sizeof(serv_addr)) < 0) {
        perror("bind");
        return 0;
    }

    if (listen(listensock, BACKLOG) < 0) {
        perror("listen");
        return 0;
    }

    //epoll_create1(); 新版本的调用
    int epfd = epoll_create(MAX_EVENTS);
    if (epfd < 0) {
        perror("epoll_create");
        return 0;
    }
    setnonblock(listensock,1);
    struct epoll_event ev = {0, {0}};
    ev.events = EPOLLIN ;
    ev_data * data = malloc(sizeof(ev_data));
    data->fd = listensock;
    data->buffer = NULL;
    data->events = EPOLLIN;//新增加的
    ev.data.ptr = data;
    //add into epoll
    epoll_ctl(epfd,EPOLL_CTL_ADD , listensock , &ev);


// 以上的代码没什么可看的, 基本就这个模式了

    int nready = -1 , len = -1;
    struct epoll_event * evts = calloc(MAX_EVENTS, sizeof(struct epoll_event));
    while(1)
    {
        nready = epoll_wait(epfd,evts,MAX_EVENTS, -1);
        printf("\nepoll_wait return : %d\n" , nready);
        if(nready < 0){
            perror("epoll_wait");
            break;
        }

        for( int i =0 ; i < nready ; ++i){
            printf("evts[%d].events = %d\n" , i,evts[i].events);

            //出错了,该关了关了,该释放释放
            if( ( evts[i].events & EPOLLERR ) ||
                evts[i].events & EPOLLHUP){
                printf("i=%d, occurs err, events:%d\n",
                       i, evts[i].events);
                ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                if(pData){
                    evts[i].data.ptr = NULL;
                    close(pData->fd);
                    free_event(pData);
                }
                continue;
            }
            //如果是输入事件
            if( evts[i].events & EPOLLIN ){
                if( !evts[i].data.ptr){
                    puts("ptr is empty");
                    continue;
                }
                //如果是监听套接字发生输入事件
                if( ( (ev_data * )(evts[i].data.ptr) )->fd == listensock){
                    socklen = sizeof(cli_addr);
                    int clt_fd = accept(listensock,(SA*)&cli_addr,&socklen);
                    if( clt_fd < 0){
                        puts("accept failed");
                        continue;
                    }
                    setnonblock(clt_fd,1);//nonblock
                    ev.events = EPOLLIN|EPOLLET; //  注册事件
                    ev_data * data = malloc(sizeof(ev_data));

                // 初始化套接字关联的结构体
                    data->fd = clt_fd;
                    data->nread = 0; 
                    data->buffer = malloc(BUFF_SIZE);
                    data->start_write_index = 0;
                    data->epoll_fd = epfd; 
                    data->events = EPOLLIN|EPOLLET; //此fd 注册的事件,新增加的
                    ev.data.ptr = data;
                    epoll_ctl(epfd,EPOLL_CTL_ADD,clt_fd, &ev);
                    puts("\taccept success");
                }
                else{
                    // 如果是有数据来了
                    printf("\tdata is comming, u can read !\n");
                    ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                    if(!pData){
                        printf("read error ! ptr is empty\n");
                        continue;
                    }
    
                    // socket是否该关闭了
                    int sock_closed = 0;
                //缓冲区剩余大小
                    int left = BUFF_SIZE - pData->nread;
                    //指针从哪开写入
                    char * pbuff = pData->buffer + pData->nread;

                    while(left > 0 )
                    {
                        len = read(pData->fd, pbuff,left);
                        if (len < 0){
                            if(errno == EINTR)
                                len = 0;

                            //如果sock输入缓冲区满了
                            else if(errno == EAGAIN || errno == EWOULDBLOCK){
                                printf("read error , no more data!  len:%d\n" , pData->nread);
                                break;
                            }
                            else{
                            //出错了,至于什么错,自己发挥把
                                perror("read error , close socket . ");
                                sock_closed = 1;
                                break;
                            }
                        }

                        // 对端关闭了
                        else if( 0 == len){
                            printf("socket : %d closed!\n" , pData->fd);
                            sock_closed = 1;
                            break;
                        }
                        pbuff += len;
                        left -= len;
                        pData->nread += len;
                    }
                    //把该关的关了
                    if(sock_closed){
                        close(pData->fd);
                        free_event(pData);
                        continue;
                    }

                    /*
                        与上面的代码不一样的地方
                        读完直接调用写函数
                    */
                    write_handler2(pData);

                }
            }
            if(evts[i].events & EPOLLOUT)
            {

                //如果sock输出缓冲区可写了
                printf("\t****u can write now!! !\n");
                ev_data *  pData = (ev_data * )(evts[i].data.ptr);
                write_handler2(pData);
            }
        }
    }

}



版权声明:本文为dashoumeixi原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。