下面的代码都在ET下工作
群里小伙伴没搞懂EPOLLOUT 再写2个例子; 2个例子都是回声服务器代码;
关于EPOLLET的基础 , 以及EPOLLIN|EPOLLOUT|EPOLLET 一起注册的例子:
EPOLLET简单例子
EPOLLOUT 的说明:
LT模式下:
与select 一致. 只要可写就一直触发
在ET模式下:
socket 一般情况下需要非阻塞的, 与EAGAIN 这个错误 配合使用.
如果是默认的阻塞socket, 不是不可以,而是你需要做额外的事, 每一次的read/write , 你都需要去getsockopt( sock,SO_ERROR,…)
直到获取 EAGAIN 算是正常读/写. 否则, 你完蛋了. 这个socket将不再收到 epoll 的触发条件了
对于EPOLLIN : 如果状态改变了[ 比如 从无到有],那么只要输入缓冲区可读就会触发
对于EPOLLOUT: 如果状态改变了[比如 从满到不满],只要输出缓冲区可写就会触发;
EPOLLOUT那么具体什么时间点触发呢
1. EPOLL_CTL_ADD或EPOLL_CTL_MOD 时 , 如果输出缓冲区状态改变了
2. 不论注册方式是 EPOLLIN | EPOLLOUT | EPOLLET 还是 EPOLLOUT | EPOLLET 只要含EPOLLOUT
只要状态改变就能触发.
状态改变到底是什么? 简单理解:
EPOLLOUT: 满->不满
EPOLLIN : 空->不空
有没有发现 , 都跟边缘有关
3.对于 send / write 需要依靠 EPOLLOUT 触发才调用吗 ? 什么时候需要 注册上 EPOLLOUT ?
不需要. 如果要 send / write 那么就直接调用, 如果返回值 > 0 , 证明数据已经复制到发送缓冲区中.一切正常.
如果 send / write 返回 < 0 且 errno == EAGAIN . 此时说明发送缓冲区满了. 那么需要把 剩余的字节保存起来,
然后注册上 EPOLLOUT , 直到epoll_wait 返回 , 说明发送缓冲区可写, 再把 之前保存起来的数据 发送,
如果此时 write 返回 > 0 那就把EPOLLOUT 取消掉.
简单来说 : 1. 直接发送 2. 看返回值, 没发送完才挂上EPOLLOUT 3. 发送完就把EPOLLOUT 取消掉
下面2个例子都是关于EPOLLOUT,
第一个例子利用EPOLL_CTL_MOD来触发EPOLLOUT,这种方式不太好,需要利用一次系统调用epoll_wait来触发;
第2个例子在读完时, 手动调用一次发送, 如果发送遇到 EAGAIN / EWOULDBLOCK,才把EPOLLOUT注册上去,等待
EPOLLOUT事件发生再去发送, 如果不需要发送则把EPOLLOUT关闭;
另外有关于下面例子中 util.h 头文件就包含了一些常用头文件,所以就不贴了;
再次注意, 我没有对监听socket设置EPOLLET , 如果你设置了那么需要 while(1) accept().. if(errno==EAGAIN)break;
否则会漏掉客户端连接的.
另外注意不论是write, send ..等这些个函数,都是把数据复制到socket输出缓冲区中 .并不是发送到对端的意思;
还有close(fd) 会附带着 epoll_ctl ( … , EPOLL_CTL_DEL, …); 自动从epoll中清除
第一个关于EPOLLOUT的例子:
这个例子使用 EPOLL_CTL_MOD 来主动触发EPOLLOUT事件 ;
当然这种方式需要让下一次epoll_wait返回一次来触发,不是特别好;
[ 当 EPOLL_CTL_ADD或EPOLL_CTL_MOD 时 , 如果socket输出缓冲区没满就能触发一次. ]
下面这个例子唯一需要注意的就 3行代码:
//把EPOLLOUT一起注册上去
ev.events = evts[i].events | EPOLLOUT;
ev.data.ptr = pData;
//利用EPOLL_CTL_MOD将触发EPOLLOUT的特性
epoll_ctl(epfd,EPOLL_CTL_MOD,pData->fd,&ev);
#include "util.h"
#define BUFF_SIZE 1024
#define MAX_EVENTS 1024
static int setnonblock(int fd , int nonblock){
int flag = fcntl(fd,F_GETFL,0);
if(nonblock)
return fcntl(fd,F_SETFL,flag|O_NONBLOCK);
else
return fcntl(fd,F_SETFL,flag&~O_NONBLOCK);
}
//每个套接字关联的信息
typedef struct _ev_data
{
int fd; //socket
char * buffer; //缓冲区
int nread; //读入字节长度
int start_write_index; //记录下一次写的位置
int epoll_fd; // 属于哪个epoll
} ev_data;
void free_event(ev_data * );
void free_event(ev_data * pEv){ //释放内存用的
if(!pEv)
return;
if(pEv->buffer){
free(pEv->buffer);
}
free(pEv);
}
//输出函数
void write_handler(ev_data *ptr)
{
static struct epoll_event ev ={0,{0}};
if(!ptr || !ptr->buffer){
printf("write_handler error , ptr is empty!\n");
return;
}
//需要写入socket输出缓冲区的长度
int left = ptr->nread - ptr->start_write_index;
//从哪里开始写
char *pbuffer = ptr->buffer + ptr->start_write_index;
//返回值
int nwriten = -1;
// 总共写入的字节数
int write_bytes = 0;
//sock输出缓冲区是否满了
int full = 0;
printf("write_handler begin write , nread:%d, start_write_index:%d, left:%d\n",
ptr->nread,ptr->start_write_index,left);
while(left > 0){
nwriten = write(ptr->fd,pbuffer,left);
if(nwriten <= 0){
//如果满了, 就不写了, 此时已经注册了EPOLLOUT事件,因此等待下一次被触发
if(nwriten < 0 && (errno == EAGAIN || EWOULDBLOCK == errno)){
full = 1;
puts("sock sendbuff is full");
break;
}
else{
// 输出有问题了, 具体你可以自由发挥
close(ptr->fd);
free_event(ptr);
perror("write error");
return;
}
}
pbuffer += nwriten;
left -= nwriten;
write_bytes += nwriten;
}
//如果满了就记录一下,下一次要从哪里开始写
if(full){
ptr->start_write_index = write_bytes;
}
//到这,说明该写的都写了
else if( 0 == left){
//重置一下 读写的位置
ptr->start_write_index = ptr->nread = 0;
//修改事件
ev.events = EPOLLIN | EPOLLET;
ev.data.ptr = ptr;
epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
}
printf("write_handler end , nread:%d, start_write_index:%d, writebytes:%d\n",
ptr->nread,ptr->start_write_index,write_bytes);
}
int main(int argc, char ** argv) {
signal(SIGPIPE, SIG_IGN);
int listensock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr, cli_addr;
socklen_t socklen = sizeof(serv_addr);
memset(&serv_addr, 0, socklen);
memset(&cli_addr, 0, socklen);
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_family = AF_INET;
int on = 1;
setsockopt(listensock,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(int));
if (bind(listensock, (SA *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("bind");
return 0;
}
if (listen(listensock, BACKLOG) < 0) {
perror("listen");
return 0;
}
//epoll_size = 128
int epfd = epoll_create(MAX_EVENTS);
if (epfd < 0) {
perror("epoll_create");
return 0;
}
setnonblock(listensock,1);
struct epoll_event ev = {0, {0}};
ev.events = EPOLLIN ; //只有EPOLLIN
ev_data * data = malloc(sizeof(ev_data));
data->fd = listensock;
data->buffer = NULL;
ev.data.ptr = data;
epoll_ctl(epfd,EPOLL_CTL_ADD , listensock , &ev);
int nready = -1 , len = -1;
struct epoll_event * evts = calloc(MAX_EVENTS, sizeof(struct epoll_event));
while(1)
{
nready = epoll_wait(epfd,evts,MAX_EVENTS, -1);
printf("\nepoll_wait return : %d\n" , nready);
if(nready < 0){
perror("epoll_wait");
break;
}
for( int i =0 ; i < nready ; ++i){
printf("evts[%d].events = %d\n" , i,evts[i].events);
//出错了,该怎么样就怎么样把,自由发挥
if( ( evts[i].events & EPOLLERR ) ||
evts[i].events & EPOLLHUP){
printf("i=%d, occurs err, events:%d\n",
i, evts[i].events);
ev_data * pData = (ev_data * )(evts[i].data.ptr);
if(pData){
evts[i].data.ptr = NULL;
close(pData->fd);
free_event(pData);
}
continue;
}
//如果是输入事件
if( evts[i].events & EPOLLIN ){
if( !evts[i].data.ptr){
puts("ptr is empty");
continue;
}
//如果是监听套接字发生输入事件
if( ( (ev_data * )(evts[i].data.ptr) )->fd == listensock){
socklen = sizeof(cli_addr);
int clt_fd = accept(listensock,(SA*)&cli_addr,&socklen);
if( clt_fd < 0){
puts("accept failed");
continue;
}
setnonblock(clt_fd,1);//nonblock
ev.events = EPOLLIN|EPOLLET; // 注册事件
ev_data * data = malloc(sizeof(ev_data));
//初始化每个socket关联的信息
data->fd = clt_fd;
data->nread = 0;
data->buffer = malloc(BUFF_SIZE);
data->start_write_index = 0;
data->epoll_fd = epfd;
ev.data.ptr = data;
//加入epoll
epoll_ctl(epfd,EPOLL_CTL_ADD,clt_fd, &ev);
puts("\taccept success");
}
else{
// 如果是有数据来了
printf("\tdata is comming, u can read !\n");
ev_data * pData = (ev_data * )(evts[i].data.ptr);
if(!pData){
printf("read error ! ptr is empty\n");
continue;
}
//一个标志,如果socket 出错或对端关了就置1
int sock_closed = 0;
//缓冲区还有多少能放入的长度
int left = BUFF_SIZE - pData->nread;
//从哪开始接受
char * pbuff = pData->buffer + pData->nread;
while(left > 0 )
{
len = read(pData->fd, pbuff,left);
//一旦出错
if (len < 0){
if(errno == EINTR)
len = 0;
//如果socket输入缓冲区为空了
else if(errno == EAGAIN || errno == EWOULDBLOCK){
printf("read error , no more data! len:%d\n" , pData->nread);
break;
}
else{
//发生错误
perror("read error , close socket . ");
sock_closed = 1;
break;
}
}
else if( 0 == len){
//说明客户端断开了
printf("socket : %d closed!\n" , pData->fd);
sock_closed = 1;
break;
}
pbuff += len;
left -= len;
pData->nread += len;
}
if(sock_closed){
close(pData->fd);
free_event(pData);
continue;
}
/*
把EPOLLOUT一起注册进去, 到时一旦
socket输出缓冲区可写就能触发,如果此时socket输出缓冲区
就是可写的,那么下次epoll_wait将立即返回
*/
ev.events = evts[i].events | EPOLLOUT;
ev.data.ptr = pData;
//EPOLL_CTL_MOD 或者 EPOLL_CTL_ADD将触发一次EPOLLOUT,如果socket可写
epoll_ctl(epfd,EPOLL_CTL_MOD,pData->fd,&ev);
}
}
if(evts[i].events & EPOLLOUT)
{
printf("\t****u can write now!! !\n");
ev_data * pData = (ev_data * )(evts[i].data.ptr);
write_handler(pData);
}
}
}
}
第2个例子:
基本思路就是 read完就去发送一次, 如果发送成功,那最好,
但如果发送遇到EAGAIN/EWOULDBLOCK 就去加入EPOLLOUT,等待直到socket输出缓冲区
可以写,那么epoll_wait就能触发EPOLLOUT,再去写数据,
如果数据全部发送完了, 那就把EPOLLOUT关闭了;
代码大致与上面的相似, 在结构体中增加了一个事件变量,防止重复epoll_ctl 增加负担;
#include "util.h"
#define BUFF_SIZE 1024
#define MAX_EVENTS 1024
static int setnonblock(int fd , int nonblock){
int flag = fcntl(fd,F_GETFL,0);
if(nonblock)
return fcntl(fd,F_SETFL,flag|O_NONBLOCK);
else
return fcntl(fd,F_SETFL,flag&~O_NONBLOCK);
}
//每个套接字关联的信息
typedef struct _ev_data
{
int fd; //socket fd
char * buffer; //缓冲区
int nread; //接受字节数
int start_write_index; //从哪里开始写
int epoll_fd; //属于哪个epoll
int events; //记录事件; 就比上面的代码多增加了一个变量,防止重复的epoll_ctl
} ev_data;
void free_event(ev_data * );
void free_event(ev_data * pEv){
if(!pEv)
return;
if(pEv->buffer){
free(pEv->buffer);
}
free(pEv);
}
//新的写函数
void write_handler2(ev_data * ptr)
{
//判断指针, 可自由发挥
if(!ptr || !ptr->buffer){
close(ptr->fd);
puts(" write_handler2 ptr is empty ");
return;
}
//如果没数据,就直接返回了
if(ptr->nread <= 0){
printf("buffer is empty , ptr->nread:%d \n",ptr->nread);
return;
}
static struct epoll_event ev ={0,{0}};
//要发送的字节数
int left = ptr->nread - ptr->start_write_index;
//指针位置
char * pbuffer = ptr->buffer + ptr->start_write_index;
int nwriten = -1;
int write_bytes = 0;
printf("begin write , nread:%d, start_write_index:%d,left:%d\n",
ptr->nread,ptr->start_write_index,left);
while(left > 0){
nwriten = write(ptr->fd,pbuffer,left);
//如果有错误
if(nwriten <= 0){
/*
socket输出缓冲区满了
那就加入EPOLLOUT事件,等epoll_wait返回通知你
*/
if(errno < 0 && ( EWOULDBLOCK == errno || EAGAIN == errno)){
//记录一下, 下一次要从哪里开始
ptr->start_write_index += write_bytes;
//这个纯粹为了防止重复的epoll_ctl,额外增加负担的,你不做判断也没事
if(EPOLLOUT & ptr->events)
return;
//增加EPOLLOUT事件
ptr->events |= EPOLLOUT;
ev.events = ptr->events;
ev.data.ptr = ptr;
//修改事件
epoll_ctl(ptr->epoll_fd,EPOLL_CTL_MOD,ptr->fd,&ev);
printf("socket buff is full , nread:%d,start_write_index:%d,left:%d\n",
ptr->nread,ptr->start_write_index,left);
return;
}
else{
//如果出错了, 这里你可以自由发挥
close(ptr->fd);
free_event(ptr);
perror("write error");
return;
}
}
pbuffer += nwriten;
left -= nwriten;
write_bytes += nwriten;
}
//到这里,说明该写的都写了, 重置一下读写位置
ptr->start_write_index = ptr->nread = 0;
//这个判断纯粹为了防止每次都去epoll_ctl,不加判断也行
if(EPOLLOUT & ptr->events) {
//把 EPOLLOUT 删了 ,这样就跟原来一样还是EPOLLIN|EPOLLET
ptr->events &= ~EPOLLOUT;
ev.events =ptr->events;
ev.data.ptr = ptr;
//修改一下
epoll_ctl(ptr->epoll_fd, EPOLL_CTL_MOD, ptr->fd, &ev);
}
}
int main(int argc, char ** argv) {
signal(SIGPIPE, SIG_IGN);
int listensock = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serv_addr, cli_addr;
socklen_t socklen = sizeof(serv_addr);
memset(&serv_addr, 0, socklen);
memset(&cli_addr, 0, socklen);
serv_addr.sin_addr.s_addr = INADDR_ANY;
serv_addr.sin_port = htons(PORT);
serv_addr.sin_family = AF_INET;
int on = 1;
setsockopt(listensock,SOL_SOCKET,SO_REUSEADDR,&on,sizeof(int));
if (bind(listensock, (SA *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("bind");
return 0;
}
if (listen(listensock, BACKLOG) < 0) {
perror("listen");
return 0;
}
//epoll_create1(); 新版本的调用
int epfd = epoll_create(MAX_EVENTS);
if (epfd < 0) {
perror("epoll_create");
return 0;
}
setnonblock(listensock,1);
struct epoll_event ev = {0, {0}};
ev.events = EPOLLIN ;
ev_data * data = malloc(sizeof(ev_data));
data->fd = listensock;
data->buffer = NULL;
data->events = EPOLLIN;//新增加的
ev.data.ptr = data;
//add into epoll
epoll_ctl(epfd,EPOLL_CTL_ADD , listensock , &ev);
// 以上的代码没什么可看的, 基本就这个模式了
int nready = -1 , len = -1;
struct epoll_event * evts = calloc(MAX_EVENTS, sizeof(struct epoll_event));
while(1)
{
nready = epoll_wait(epfd,evts,MAX_EVENTS, -1);
printf("\nepoll_wait return : %d\n" , nready);
if(nready < 0){
perror("epoll_wait");
break;
}
for( int i =0 ; i < nready ; ++i){
printf("evts[%d].events = %d\n" , i,evts[i].events);
//出错了,该关了关了,该释放释放
if( ( evts[i].events & EPOLLERR ) ||
evts[i].events & EPOLLHUP){
printf("i=%d, occurs err, events:%d\n",
i, evts[i].events);
ev_data * pData = (ev_data * )(evts[i].data.ptr);
if(pData){
evts[i].data.ptr = NULL;
close(pData->fd);
free_event(pData);
}
continue;
}
//如果是输入事件
if( evts[i].events & EPOLLIN ){
if( !evts[i].data.ptr){
puts("ptr is empty");
continue;
}
//如果是监听套接字发生输入事件
if( ( (ev_data * )(evts[i].data.ptr) )->fd == listensock){
socklen = sizeof(cli_addr);
int clt_fd = accept(listensock,(SA*)&cli_addr,&socklen);
if( clt_fd < 0){
puts("accept failed");
continue;
}
setnonblock(clt_fd,1);//nonblock
ev.events = EPOLLIN|EPOLLET; // 注册事件
ev_data * data = malloc(sizeof(ev_data));
// 初始化套接字关联的结构体
data->fd = clt_fd;
data->nread = 0;
data->buffer = malloc(BUFF_SIZE);
data->start_write_index = 0;
data->epoll_fd = epfd;
data->events = EPOLLIN|EPOLLET; //此fd 注册的事件,新增加的
ev.data.ptr = data;
epoll_ctl(epfd,EPOLL_CTL_ADD,clt_fd, &ev);
puts("\taccept success");
}
else{
// 如果是有数据来了
printf("\tdata is comming, u can read !\n");
ev_data * pData = (ev_data * )(evts[i].data.ptr);
if(!pData){
printf("read error ! ptr is empty\n");
continue;
}
// socket是否该关闭了
int sock_closed = 0;
//缓冲区剩余大小
int left = BUFF_SIZE - pData->nread;
//指针从哪开写入
char * pbuff = pData->buffer + pData->nread;
while(left > 0 )
{
len = read(pData->fd, pbuff,left);
if (len < 0){
if(errno == EINTR)
len = 0;
//如果sock输入缓冲区满了
else if(errno == EAGAIN || errno == EWOULDBLOCK){
printf("read error , no more data! len:%d\n" , pData->nread);
break;
}
else{
//出错了,至于什么错,自己发挥把
perror("read error , close socket . ");
sock_closed = 1;
break;
}
}
// 对端关闭了
else if( 0 == len){
printf("socket : %d closed!\n" , pData->fd);
sock_closed = 1;
break;
}
pbuff += len;
left -= len;
pData->nread += len;
}
//把该关的关了
if(sock_closed){
close(pData->fd);
free_event(pData);
continue;
}
/*
与上面的代码不一样的地方
读完直接调用写函数
*/
write_handler2(pData);
}
}
if(evts[i].events & EPOLLOUT)
{
//如果sock输出缓冲区可写了
printf("\t****u can write now!! !\n");
ev_data * pData = (ev_data * )(evts[i].data.ptr);
write_handler2(pData);
}
}
}
}