高性能UDP服务器的开发

  • Post author:
  • Post category:其他



转自http://blog.sina.com.cn/s/blog_4977ed470100f1x6.html


高性能UDP服务器的开发






UDP服务器的网络层开发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成


端口上并投递一定数量的recv操作.当有数据到来时从完成队列中取出数据发送到接收队列中即可。










测试结果如下:
















WindowsXP Professional,Intel Core Duo E4600 双核2.4G , 2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。










下面详细介绍该服务器的架构及流程







下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:



1




class DLLENTRY UdpSer

2




{


3




public:

4








UdpSer();

5








~UdpSer();

6

7









10








static void InitReource();

11

12









15








static void ReleaseReource();

16

17








//用指定本地地址和端口进行初始化

18








BOOL StartServer(const CHAR* szIp = “0.0.0.0”, INT nPort = 0);

19

20








//从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度

21








UDP_RCV_DATA* GetRcvData(DWORD* pCount);

22

23








//向对端发送数据

24








BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen);

25

26









30








void CloseServer();

31

32




protected:

33








SOCKET m_hSock;

34








vector<UDP_RCV_DATA* > m_RcvDataQue;
















//接收数据队列

35








CRITICAL_SECTION m_RcvDataLock;
























//访问m_RcvDataQue的互斥锁

36








long volatile m_bThreadRun;
































//是否允许后台线程继续运行

37








BOOL m_bSerRun;












































//服务器是否正在运行

38

39








HANDLE *m_pThreads;
















//线程数组

40








HANDLE m_hCompletion;




















//完成端口句柄

41

42








void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfere

d, LPOVERLAPPED lpOverlapped);

43

44









48








static UINT WINAPI WorkThread(LPVOID lpParam);

49




};

1. InitReource() 主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行静态资源的初始化, 否则服务器无法正常使用.



2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放.

3. StartServer()


该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket, 将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:

1




BOOL UdpSer::StartServer(const CHAR* szIp , INT nPort )

2




{


3








BOOL bRet = TRUE;

4








const int RECV_COUNT = 500;

5








WSABUF RcvBuf = { NULL, 0 };

6








DWORD dwBytes = 0;

7








DWORD dwFlag = 0;

8








INT nAddrLen = sizeof(IP_ADDR);

9








INT iErrCode = 0;

10

11








try

12








{


13












if (m_bSerRun)

14












{


15
















THROW_LINE;

16












}

17

18












m_bSerRun = TRUE;

19












m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);

20












if (INVALID_SOCKET == m_hSock)

21












{


22
















THROW_LINE;

23












}

24












ULONG ul = 1;

25












ioctlsocket(m_hSock, FIONBIO, &ul);

26

27












//设置为地址重用,优点在于服务器关闭后可以立即启用

28












int nOpt = 1;

29












setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));

30

31












//关闭系统缓存,使用自己的缓存以防止数据的复制操作

32












INT nZero = 0;

33












setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));

34












setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));

35

36












IP_ADDR addr(szIp, nPort);

37












if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr)))

38












{


39
















closesocket(m_hSock);

40
















THROW_LINE;

41












}

42

43












//将SOCKET绑定到完成端口上

44












CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);

45

46












//投递读操作

47












for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)

48












{


49
















UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();

50
















if (pRcvContext && pRcvContext->m_pBuf)

51
















{


52




















dwFlag = 0;

53




















dwBytes = 0;

54




















nAddrLen = sizeof(IP_ADDR);

55




















RcvBuf.buf = pRcvContext->m_pBuf;

56




















RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;

57

58




















pRcvContext->m_hSock = m_hSock;

59




















pRcvContext->m_nOperation = OP_READ;













60




















iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)

61
























, &nAddrLen, &(pRcvContext->m_ol), NULL);

62




















if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())

63




















{


64
























delete pRcvContext;

65
























pRcvContext = NULL;

66




















}

67
















}

68
















else

69
















{


70




















delete pRcvContext;

71
















}

72












}

73








}

74








catch (const long &lErrLine)

75








{













76












bRet = FALSE;

77












_TRACE(“Exp : %s — %ld “, __FILE__, lErrLine);













78








}

79

80








return bRet;

81




}4. GetRcvData(), 从接收队列中取出一个数据包.

1




UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount)

2




{


3








UDP_RCV_DATA* pRcvData = NULL;

4

5








EnterCriticalSection(&m_RcvDataLock);

6








vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin();

7








if (iterRcv != m_RcvDataQue.end())

8








{


9












pRcvData = *iterRcv;

10












m_RcvDataQue.erase(iterRcv);

11








}

12

13








if (pCount)

14








{


15












*pCount = (DWORD)(m_RcvDataQue.size());

16








}

17








LeaveCriticalSection(&m_RcvDataLock);

18

19








return pRcvData;

20




}

5. SendData() 发送指定长度的数据包.

1




BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen)

2




{


3








BOOL bRet = TRUE;

4








try

5








{


6












if (nLen >= 1500)

7












{


8
















THROW_LINE;

9












}

10

11












UDP_CONTEXT* pSendContext = new UDP_CONTEXT();

12












if (pSendContext && pSendContext->m_pBuf)

13












{


14
















pSendContext->m_nOperation = OP_WRITE;

15
















pSendContext->m_RemoteAddr = PeerAddr;









16

17
















memcpy(pSendContext->m_pBuf, szData, nLen);

18

19
















WSABUF SendBuf = { NULL, 0 };

20
















DWORD dwBytes = 0;

21
















SendBuf.buf = pSendContext->m_pBuf;

22
















SendBuf.len = nLen;

23

24
















INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1, &dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL);

25
















if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())

26
















{


27




















delete pSendContext;

28




















THROW_LINE;

29
















}

30












}

31












else

32












{


33
















delete pSendContext;

34
















THROW_LINE;

35












}

36








}

37








catch (const long &lErrLine)

38








{


39












bRet = FALSE;

40












_TRACE(“Exp : %s — %ld “, __FILE__, lErrLine);













41








}

42

43








return bRet;

44




}

6. CloseServer() 关闭服务

1




void UdpSer::CloseServer()

2




{


3








m_bSerRun = FALSE;

4








closesocket(m_hSock);

5




}

7. WorkThread() 在完成端口上工作的后台线程

1




UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)

2




{


3








UdpSer *pThis = (UdpSer *)lpParam;

4








DWORD dwTrans = 0, dwKey = 0;

5








LPOVERLAPPED pOl = NULL;

6








UDP_CONTEXT *pContext = NULL;

7

8








while (TRUE)

9








{


10












BOOL bOk = GetQueuedCompletionStatu

s(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);

11

12












pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);

13












if (pContext)

14












{


15
















switch (pContext->m_nOperation)

16
















{


17
















case OP_READ:

18




















pThis->ReadCompletion(bOk, dwTrans, pOl);

19




















break;

20
















case OP_WRITE:

21




















delete pContext;

22




















pContext = NULL;

23




















break;

24
















}

25












}

26

27












if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))

28












{


29
















break;

30












}

31








}

32

33








return 0;

34




}

8.ReadCompletion(), 接收操作完成后的回调函数

1




void UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfere

d, LPOVERLAPPED lpOverlapped)

2




{


3








UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol);

4








WSABUF RcvBuf = { NULL, 0 };

5








DWORD dwBytes = 0;

6








DWORD dwFlag = 0;

7








INT nAddrLen = sizeof(IP_ADDR);

8








INT iErrCode = 0;

9

10








if (TRUE == bSuccess && dwNumberOfBytesTransfere

d <= UDP_CONTEXT::S_PAGE_SIZE)

11








{


12#ifdef _XML_NET_

13












EnterCriticalSection(&m_RcvDataLock);

14

15












UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfere

d, pRcvContext->m_RemoteAddr);

16












if (pRcvData && pRcvData->m_pData)

17












{


18
















m_RcvDataQue.push_back(pRcvData);

19












}





20












else

21












{


22
















delete pRcvData;

23












}

24

25












LeaveCriticalSection(&m_RcvDataLock);

26#else

27












if (dwNumberOfBytesTransfere

d >= sizeof(PACKET_HEAD))

28












{


29
















EnterCriticalSection(&m_RcvDataLock);

30

31
















UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfere

d, pRcvContext->m_RemoteAddr);

32
















if (pRcvData && pRcvData->m_pData)

33
















{


34




















m_RcvDataQue.push_back(pRcvData);

35
















}





36
















else

37
















{


38




















delete pRcvData;

39
















}

40

41
















LeaveCriticalSection(&m_RcvDataLock);

42












}

43#endif

44

45












//投递下一个接收操作

46












RcvBuf.buf = pRcvContext->m_pBuf;

47












RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;

48

49












iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)

50
















, &nAddrLen, &(pRcvContext->m_ol), NULL);

51












if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())

52












{


53
















ATLTRACE(“\r\n%s — %ld dwNumberOfBytesTransfere

d = %ld, LAST_ERR = %ld”

54




















, __FILE__, __LINE__, dwNumberOfBytesTransfere

d, WSAGetLastError());

55
















delete pRcvContext;

56
















pRcvContext = NULL;

57












}

58








}

59








else

60








{


61












delete pRcvContext;

62








}