高级IO复用应用:聊天室程序

  • Post author:
  • Post category:其他


简单的聊天室程序:客户端从标准输入输入数据后发送给服务端,服务端将用户发送来的数据转发给其它用户。这里采用IO复用poll技术。客户端采用了splice零拷贝。服务端采用了空间换时间(分配超大的用户数据数组,然后通过用户连接的文件描述符即可以索引到用户数据)

客户端程序:

#define _GNU_SOURCE 1//为了支持POLLRDHUP事件
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<stdlib.h>
#include<poll.h>
#include<fcntl.h>
#include<iostream>
#define BUFFER_SIZE 64
using namespace std;
int main(int argc,char* argv[]){
    if(argc<=2){
        cout<<"argc<=2"<<endl;
        return 1;
    }
    const char* ip=argv[1];//服务端地址
    int port=atoi(argv[2]);
    struct sockaddr_in address;
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int sockfd=socket(PF_INET,SOCK_STREAM,0);
    assert(sockfd>=0);
    if(connect(sockfd,(struct sockaddr*)&address,sizeof(address))<0){
        cout<<"connect error"<<endl;
        close(sockfd);
        return 1;
    }
    pollfd fds[2];//pollfd结构体数组
    fds[0].fd=0;//fds[0]是标准输入
    fds[0].events=POLLIN;//注册可写事件
    fds[0].revents=0;
    fds[1].fd=sockfd;//fds[1]是socket描述符
    fds[1].events=POLLIN|POLLRDHUP;//注册可写和挂起事件
    fds[1].revents=0;
    char read_buf[BUFFER_SIZE];
    int pipefd[2];
    int ret=pipe(pipefd);//创建一个管道,splice函数的参数必须有个是管道描述符(实现零拷贝)
    assert(ret!=-1);
    while(1){
        ret=poll(fds,2,-1);//无限期等待注册事件发生
        if(ret<0){
            cout<<"poll error"<<endl;
            break;
        }
        if(fds[1].revents&POLLRDHUP){//若是socket描述符挂起事件则代表服务器关闭了连接
            cout<<"server close the connection"<<endl;
            break;
        }
        else if(fds[1].revents&POLLIN){//sokect描述符可读事件
            memset(read_buf,'\0',BUFFER_SIZE);
            recv(fds[1].fd,read_buf,BUFFER_SIZE-1,0);//接收服务端发送来的数据(服务端的数据是其它用户发送给它的数据)
            cout<<read_buf<<endl;
        }
        if(fds[0].revents&POLLIN){//标准输入端可写事件发生(该用户有数据输入并需要发送给服务端)
            ret=splice(0,NULL,pipefd[1],NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);//将标准输入的数据零拷贝到管道的写端
            ret=splice(pipefd[0],NULL,sockfd,NULL,32768,SPLICE_F_MORE|SPLICE_F_MOVE);//将管道的读端数据零拷贝到socket描述符
        }
    }
    close(sockfd);
    return 0;
}

服务端程序:

#define _GNU_SOURCE 1//支持POLLRDHUP事件
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<errno.h>
#include<string.h>
#include<fcntl.h>
#include<stdlib.h>
#include<poll.h>
#include<iostream>
#define USER_LIMIT 5//最大用户数,限制用户是为了提高poll性能
#define BUFFER_SIZE 64//缓冲区大小
#define FD_LIMIT 65535//文件描述符限制
using namespace std;
struct client_data{//客户数据:客户端socket地址、待写到客户端的数据位置、从客户端读入的数据。用于服务端与客户端高速交互
    sockaddr_in address;
    char* write_buf;
    char buf[BUFFER_SIZE];
};
int setnonblocking(int fd){//设置文件描述符为非阻塞
    int old_option=fcntl(fd,F_GETFL);
    int new_option=old_option|O_NONBLOCK;
    fcntl(fd,F_SETFL,new_option);
    return old_option;
}
int main(int argc,char* argv[]){
    if(argc<=2){
        cout<<"argc<=2"<<endl;
        return 1;
    }
    const char* ip=argv[1];
    int port=atoi(argv[2]);
    int ret=0;
    struct sockaddr_in address;//服务器地址
    bzero(&address,sizeof(address));
    address.sin_family=AF_INET;
    inet_pton(AF_INET,ip,&address.sin_addr);
    address.sin_port=htons(port);
    int listenfd=socket(PF_INET,SOCK_STREAM,0);
    assert(listenfd>=0);
    ret=bind(listenfd,(struct sockaddr*)&address,sizeof(address));
    assert(ret!=-1);
    ret=listen(listenfd,5);
    assert(listenfd!=-1);
    client_data* users=new client_data[FD_LIMIT];//分配一个超大的用户数据数组,当用户与服务器连接产生的连接描述符可以作为数组下标迅速索引到用户数据
    pollfd fds[USER_LIMIT+1];//pollfd结构体数组,每个用户注册一组poll事件
    int user_count=0;
    for(int i=0;i<=USER_LIMIT;i++){//初始化poll事件
        fds[i].fd=-1;//不可能的描述符
        fds[i].events=0;
    }
    fds[0].fd=listenfd;
    fds[0].events=POLLIN|POLLERR;//监听端口注册可读和错误事件
    fds[0].revents=0;
    while(1){
        ret=poll(fds,user_count+1,-1);//无限期等待注册事件就绪
        if(ret<0){
            cout<<"poll error"<<endl;
            break;
        }
        for(int i=0;i<user_count+1;i++){//这是poll的特征,遍历全部注册文件描述符(+1是由于多了监听描述符的缘故)
            if((fds[i].fd==listenfd)&&(fds[i].revents&POLLIN)){//监听端口若可读事件发生说明有新用户请求连接
                struct sockaddr_in client_address;
                socklen_t client_addrlength=sizeof(client_address);
                int connfd=accept(listenfd,(struct sockaddr*)&client_address,&client_addrlength);//新用户的连接
                if(connfd<0){
                    cout<<"accept error "<<strerror(errno)<<endl;
                    continue;
                }
                if(user_count>=USER_LIMIT){//已连接的用户数大于最大用户数,则不允许连接
                    const char* info="too many users,you can't connect\n";
                    cout<<info<<endl;
                    send(connfd,info,strlen(info),0);
                    close(connfd);
                    continue;
                }
                user_count++;
                users[connfd].address=client_address;//新用户的数据更新
                setnonblocking(connfd);
                fds[user_count].fd=connfd;
                fds[user_count].events=POLLIN|POLLRDHUP|POLLERR;
                fds[user_count].revents=0;
                cout<<"a new user come id:"<<user_count<<endl;
            }
            else if(fds[i].revents&POLLERR){//用户连接出错
                cout<<"poll error in:"<<fds[i].fd<<endl;
                char errors[100];
                memset(errors,'\0',100);
                socklen_t length=sizeof(errors);
                if(getsockopt(fds[i].fd,SOL_SOCKET,SO_ERROR,&errors,&length)<0){
                    cout<<"get socket option error"<<endl;
                }
                continue;
            }
            else if(fds[i].revents&POLLRDHUP){//用户连接挂起则断开连接
                users[fds[i].fd]=users[fds[user_count].fd];
                close(fds[i].fd);
                fds[i]=fds[user_count];
                i--;
                user_count--;
                cout<<"a user leave"<<endl;
            }
            else if(fds[i].revents&POLLIN){//用户连接可读事件发生,表示用户有数据发送到来
                int connfd=fds[i].fd;
                memset(users[connfd].buf,'\0',BUFFER_SIZE-1);
                ret=recv(connfd,users[connfd].buf,BUFFER_SIZE-1,0);
                cout<<"get data:"<<users[connfd].buf<<"  from user:"<<connfd<<" bytes:"<<ret<<endl;
                if(ret<0){
                    if(errno!=EAGAIN){//若是EAGAIN不是网络错误,非阻塞情形下可能是设备不可用,这里EAGAIN表示数据读取完毕可以进行期待下次事件发生
                        close(connfd);//否则断开用户连接
                        users[fds[i].fd]=users[fds[user_count].fd];
                        fds[i]=fds[user_count];
                        i--;
                        user_count--;
                    }
                }
                else if(ret==0){}//这里的原因是由于下面###1###处强行将其它用户连接事件置为POLLIN但是若其它用户连接其实没有数据可读的情形
                else{
                    for(int j=1;j<=user_count;j++){//除去该用户外的其它用户事件置为POLLOUT
                        if(fds[j].fd==connfd){
                            continue;
                        }
                        fds[j].events|=~POLLIN;
                        fds[j].events|=POLLOUT;
                        users[fds[j].fd].write_buf=users[connfd].buf;//共享缓冲区数据
                    }
                }
            }
            else if(fds[i].revents&POLLOUT){//被置为POLLOUT事件后意味着有某个用户的数据需要发送给当前用户
                int connfd=fds[i].fd;
                if(!users[connfd].write_buf){
                    continue;
                }
                ret=send(connfd,users[connfd].write_buf,strlen(users[connfd].write_buf),0);
                users[connfd].write_buf=NULL;//恢复当前用户数据
                fds[i].events|=~POLLOUT;
                fds[i].events|=POLLIN;//###1###
            }
        }
    }
    delete[] users;
    close(listenfd);
    return 0;
}//这里会出现一个问题就是若某个用户A频繁发送消息给服务端服务端,再转发给其它用户,此期间其它用户也有话要说,则服务端可读事件全被A用户占领了,其它用户全是POLLOUT事件



版权声明:本文为liuxuejiang158原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。