linux服务器编程 9.6聊天室练习 epoll重写

  • Post author:
  • Post category:linux




《linux高性能服务器编程》 p183示例 epoll实现

示例中服务端和客户端都使用了poll调用做io复用,我用epoll重写了服务端代码,同时尽可能地使用了c++风格……

有些地方可能实现得很累赘,还请各位不吝赐教,这个小项目就权当是熟悉poll和epoll了。

  • 服务端代码:
/*
code for server
*/

#define _GNU_SOURCE 1

#include<iostream>
#include<string>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<unistd.h>
#include<errno.h>
#include<cstring>
#include<fcntl.h>
#include<cstdlib>
#include<sys/epoll.h>
#include<vector>
#include<map>

// Tunables for the chat server.
constexpr int USER_LIMIT = 5;            // cap on the number of connected users checked when accepting
constexpr int BUFFER_SIZE = 64;          // size in bytes of each client's receive buffer
constexpr int FD_LIMIT = 65535;          // not referenced by the visible code; kept for compatibility
constexpr int MAX_EVENT_NUMBER = 1024;   // size hint for epoll_create and max events per epoll_wait

// Per-connection state for one connected chat client.
// Every member has a default so that a freshly declared client_data is in a
// well-defined "nothing queued, no socket" state (the server tests write_buf
// for null before sending).
struct client_data{
	sockaddr_in address{};        // peer address as reported by accept()
	char *write_buf = nullptr;    // data waiting to be sent TO this client; null when nothing is pending
	char buf[BUFFER_SIZE] = {};   // receive buffer: the bytes most recently read FROM this client
	int fd = -1;                  // socket descriptor of the connection; -1 until assigned
};

// Puts `fd` into non-blocking mode by adding O_NONBLOCK to its file status
// flags. Returns the flags as they were before the change (the raw result of
// F_GETFL), so a caller could restore them later.
int setnonblocking(int fd){
	const int previous_flags = fcntl(fd, F_GETFL);
	fcntl(fd, F_SETFL, previous_flags | O_NONBLOCK);
	return previous_flags;
}

int main()
{
	using namespace std;
	
	int ret = 0;
	struct sockaddr_in address;
	bzero(&address, sizeof(address));
	address.sin_family = AF_INET;
	address.sin_addr.s_addr = htons(INADDR_ANY);
	address.sin_port = htons(12345);

	int listenfd = socket(PF_INET, SOCK_STREAM, 0);
	assert(listenfd != -1);

	ret = bind(listenfd, reinterpret_cast<struct sockaddr*>(&address), sizeof(address));
	assert(ret != -1);

	ret = listen(listenfd, 5);
	assert(ret != -1);

	vector<struct client_data> users;
	map<int, int> fdToUser;		//为每一个接入的socket映射到唯一的客户数组索引	
	int epollFds = epoll_create(MAX_EVENT_NUMBER);
	struct epoll_event event;
	bzero(&event, sizeof(event));
	event.events = EPOLLIN | EPOLLERR;
	event.data.fd = listenfd;
	epoll_ctl(epollFds, EPOLL_CTL_ADD, listenfd, &event);
	int user_counter = 0;

	while(1)
	{
		struct epoll_event revents[1024];
		ret = epoll_wait(epollFds, revents, MAX_EVENT_NUMBER, -1);
		if(ret == -1)
		{
			cout<<"epoll fialed!"<<endl;
			return 1;
		}

		for(int i = 0; i < ret; i++)
		{
			auto evt = revents[i];
			if((evt.data.fd == listenfd) && (evt.events == EPOLLIN))
			{
				cout<<"got a new connection"<<endl;
				struct sockaddr_in client_address;
				socklen_t client_addrlength = sizeof(client_address);
				int connfd = accept(listenfd, 
									reinterpret_cast<struct sockaddr*>(&client_address), 
									&client_addrlength);
				if(connfd < 0)
				{
					cout<<"got erroes as "<<errno<<endl;
					continue;
				}
				if(user_counter > USER_LIMIT)
				{
					string info = "too many users!";
					cout<<info<<endl;
					send(connfd, info.c_str(), info.size(), 0);
					close(connfd);
					continue;
				}

				user_counter++;
				client_data data;
				data.address = client_address;
				data.fd = connfd;
				users.push_back(data);
				fdToUser[connfd] = user_counter - 1;
				//users[connfd].address = client_address;
				setnonblocking(connfd);
				struct epoll_event event;
				event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;
				event.data.fd = connfd;
				epoll_ctl(epollFds, EPOLL_CTL_ADD, connfd, &event);
				cout<<"comes a new user, now we got "<<user_counter<<"users!"<<endl;
			}	
			else if(evt.events & EPOLLERR)
			{
				cout<<"get an error from "<<evt.data.fd<<endl;
				char errors[100];
				memset(errors, '\0', 100);
				socklen_t length = sizeof(errors);
				if(getsockopt(evt.data.fd, SOL_SOCKET, SO_ERROR, &errors, &length) < 0)
					cout<<"get socket options failed!"<<endl;
				continue;
			}
			else if(evt.events & EPOLLRDHUP)
			{
				users.erase(users.begin() + fdToUser[evt.data.fd]);
				struct epoll_event event;
				epoll_ctl(epollFds, EPOLL_CTL_DEL, evt.data.fd, &event);
				close(evt.data.fd);
				user_counter--;
				cout<<"a client left!"<<endl;
			}
			else if(evt.events & EPOLLIN)
			{
				int connfd = evt.data.fd;
				memset(users[fdToUser[connfd]].buf, '\0', BUFFER_SIZE);
				ret = recv(connfd, users[fdToUser[connfd]].buf, BUFFER_SIZE - 1, 0);
				cout<<"get "<<ret<<" bytes from client "<<connfd<<endl;
				if(ret < 0)
				{
					if(errno != EAGAIN)
					{
						close(connfd);
						users.erase(users.begin() + fdToUser[connfd]);
						user_counter--;
					}
				}
				else if(ret == 0){}
				else
				{
					for(int j = 0; j < user_counter; j++)
					{
						if(connfd == users[j].fd)
							continue;
						struct epoll_event event;
						event.data.fd = users[j].fd;
						event.events = EPOLLOUT | EPOLLRDHUP | EPOLLERR;
						epoll_ctl(epollFds, EPOLL_CTL_MOD, users[j].fd, &event);
						string str = "client" 
						             + to_string(j) 
									 + " : " 
									 + users[fdToUser[connfd]].buf;
						users[j].write_buf = const_cast<char*>(str.data());
					}
				}
			}
			else if(evt.events & EPOLLOUT)
			{
				int connfd = evt.data.fd;
				if(!users[fdToUser[connfd]].write_buf)
					continue;
				ret = send(connfd, 
						   users[fdToUser[connfd]].write_buf, 
						   strlen(users[fdToUser[connfd]].write_buf),
						   0);
				users[fdToUser[connfd]].write_buf = NULL;

				struct epoll_event event;
				event.data.fd = connfd;
				event.events = EPOLLIN | EPOLLRDHUP | EPOLLERR;	
				epoll_ctl(epollFds, EPOLL_CTL_MOD, connfd, &event);
			}
		}
	}
	close(listenfd);
	return 0;
}

  • 客户端代码:
/*
code for client
*/

#define _GNU_SOURCE 1

#include<iostream>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>
#include<arpa/inet.h>
#include<assert.h>
#include<stdio.h>
#include<unistd.h>
#include<string.h>
#include<stdlib.h>
#include<poll.h>
#include<fcntl.h>

using namespace std;

// Size in bytes of the client's receive buffer.
constexpr int BUFFER_SIZE = 64;

// Moves whatever is readable on standard input to `sockfd` through the pipe
// (`pipe_read`/`pipe_write`) using splice, so the data is never copied into
// user space. Returns 1 when data was forwarded, 0 on end of input and -1 on
// failure.
static int forward_stdin(int pipe_read, int pipe_write, int sockfd)
{
	ssize_t pending = splice(STDIN_FILENO, NULL, pipe_write, NULL, 32768,
							 SPLICE_F_MORE | SPLICE_F_MOVE);
	if(pending < 0)
		return -1;
	if(pending == 0)
		return 0;
	// A single splice may move fewer bytes than requested, so drain the pipe fully.
	while(pending > 0)
	{
		const ssize_t forwarded = splice(pipe_read, NULL, sockfd, NULL,
										 static_cast<size_t>(pending),
										 SPLICE_F_MORE | SPLICE_F_MOVE);
		if(forwarded <= 0)
			return -1;
		pending -= forwarded;
	}
	return 1;
}

// Chat client entry point. Usage: <program> <server-ip> <server-port>.
// Connects to the server, prints everything the server sends and forwards
// everything typed on standard input. Returns 0 after a normal end (server
// closed the connection or input ended) and 1 on bad arguments or I/O failure.
int main(int argc, char* argv[]){
	if(argc <= 2)
	{
		cout<<"lack of parameters!"<<endl;
		return 1;
	}
	const char* ip = argv[1];
	const int port = atoi(argv[2]);
	if(port <= 0 || port > 65535)
	{
		cout<<"invalid port!"<<endl;
		return 1;
	}

	struct sockaddr_in server_address;
	memset(&server_address, 0, sizeof(server_address));
	server_address.sin_family = AF_INET;
	if(inet_pton(AF_INET, ip, &server_address.sin_addr) != 1)
	{
		cout<<"invalid ip address!"<<endl;
		return 1;
	}
	server_address.sin_port = htons(static_cast<unsigned short>(port));

	const int sockfd = socket(PF_INET, SOCK_STREAM, 0);
	if(sockfd < 0)
	{
		cout<<"socket creation failed!"<<endl;
		return 1;
	}
	if(connect(sockfd, reinterpret_cast<struct sockaddr*>(&server_address), sizeof(server_address)) < 0)
	{
		cout<<"connection failed!"<<endl;
		close(sockfd);
		return 1;
	}
	cout<<"connected!"<<endl;

	int pipefd[2];
	if(pipe(pipefd) == -1)
	{
		cout<<"pipe creation failed!"<<endl;
		close(sockfd);
		return 1;
	}

	pollfd fds[2];
	fds[0].fd = STDIN_FILENO;
	fds[0].events = POLLIN;
	fds[0].revents = 0;
	fds[1].fd = sockfd;
	fds[1].events = POLLIN | POLLRDHUP;
	fds[1].revents = 0;

	char read_buf[BUFFER_SIZE];
	int status = 0;
	while(1)
	{
		if(poll(fds, 2, -1) < 0)
		{
			cout<<"poll failed!"<<endl;
			status = 1;
			break;
		}

		// Read before looking at hang-up flags: the server may send a last
		// message (e.g. a rejection notice) and close right away.
		if(fds[1].revents & POLLIN)
		{
			memset(read_buf, '\0', BUFFER_SIZE);  // whole buffer, so the text is always terminated
			const ssize_t received = recv(sockfd, read_buf, BUFFER_SIZE - 1, 0);
			if(received < 0)
			{
				cout<<"recv failed!"<<endl;
				status = 1;
				break;
			}
			if(received == 0)
			{
				cout<<"server close connection!"<<endl;
				break;
			}
			cout<<"\n"<<read_buf<<endl;
		}
		else if(fds[1].revents & (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL))
		{
			cout<<"server close connection!"<<endl;
			break;
		}

		if(fds[0].revents & POLLIN)
		{
			const int outcome = forward_stdin(pipefd[0], pipefd[1], sockfd);
			if(outcome < 0)
			{
				cout<<"failed to send input!"<<endl;
				status = 1;
				break;
			}
			if(outcome == 0)
				break;  // end of input: polling again would spin forever
		}
		else if(fds[0].revents & (POLLHUP | POLLERR | POLLNVAL))
		{
			break;  // input is gone and nothing more can be read from it
		}
	}

	close(pipefd[0]);
	close(pipefd[1]);
	close(sockfd);
	return status;
}



版权声明:本文为weixin_44826484原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。