转自http://blog.sina.com.cn/s/blog_4977ed470100f1x6.html
高性能UDP服务器的开发
UDP服务器的网络层开发相对与TCP服务器来说要容易和简单的多,UDP服务器的大致流程为创建一个socket然后将其绑定到完成
端口上并投递一定数量的recv操作.当有数据到来时从完成队列中取出数据发送到接收队列中即可。
测试结果如下:
WindowsXP Professional,Intel Core Duo E4600 双核2.4G , 2G内存。同时30K个用户和该UDP服务器进行交互其CPU使用率为10%左右,内存占用7M左右。
下面详细介绍该服务器的架构及流程
:
下面将主要介绍UdpSer类, 该类主要用来管理UDP服务.其定义如下:
1
class DLLENTRY UdpSer
2
{
3
public:
4
UdpSer();
5
~UdpSer();
6
7
10
static void InitReource();
11
12
15
static void ReleaseReource();
16
17
//用指定本地地址和端口进行初始化
18
BOOL StartServer(const CHAR* szIp = “0.0.0.0”, INT nPort = 0);
19
20
//从数据队列的头部获取一个接收数据, pCount不为null时返回队列的长度
21
UDP_RCV_DATA* GetRcvData(DWORD* pCount);
22
23
//向对端发送数据
24
BOOL SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen);
25
26
30
void CloseServer();
31
32
protected:
33
SOCKET m_hSock;
34
vector<UDP_RCV_DATA* > m_RcvDataQue;
//接收数据队列
35
CRITICAL_SECTION m_RcvDataLock;
//访问m_RcvDataQue的互斥锁
36
long volatile m_bThreadRun;
//是否允许后台线程继续运行
37
BOOL m_bSerRun;
//服务器是否正在运行
38
39
HANDLE *m_pThreads;
//线程数组
40
HANDLE m_hCompletion;
//完成端口句柄
41
42
void ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfere
d, LPOVERLAPPED lpOverlapped);
43
44
48
static UINT WINAPI WorkThread(LPVOID lpParam);
49
};
1. InitReource() 主要对相关的静态资源进行初始化.其实大致和TcpServer::InitReource()大致相同.在UdpSer实例使用之前必须调用该函数进行静态资源的初始化, 否则服务器无法正常使用.
2.ReleaseReource() 主要对相关静态资源进行释放.只有在应用程序结束时才能调用该函数进行静态资源的释放.
3. StartServer()
该函数的主要功能启动一个UDP服务.其大致流程为先创建服务器UDP socket, 将其绑定到完成端口上然后投递一定数量的recv操作以接收客户端的数据.其实现如下:
1
BOOL UdpSer::StartServer(const CHAR* szIp , INT nPort )
2
{
3
BOOL bRet = TRUE;
4
const int RECV_COUNT = 500;
5
WSABUF RcvBuf = { NULL, 0 };
6
DWORD dwBytes = 0;
7
DWORD dwFlag = 0;
8
INT nAddrLen = sizeof(IP_ADDR);
9
INT iErrCode = 0;
10
11
try
12
{
13
if (m_bSerRun)
14
{
15
THROW_LINE;
16
}
17
18
m_bSerRun = TRUE;
19
m_hSock = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
20
if (INVALID_SOCKET == m_hSock)
21
{
22
THROW_LINE;
23
}
24
ULONG ul = 1;
25
ioctlsocket(m_hSock, FIONBIO, &ul);
26
27
//设置为地址重用,优点在于服务器关闭后可以立即启用
28
int nOpt = 1;
29
setsockopt(m_hSock, SOL_SOCKET, SO_REUSEADDR, (char*)&nOpt, sizeof(nOpt));
30
31
//关闭系统缓存,使用自己的缓存以防止数据的复制操作
32
INT nZero = 0;
33
setsockopt(m_hSock, SOL_SOCKET, SO_SNDBUF, (char*)&nZero, sizeof(nZero));
34
setsockopt(m_hSock, SOL_SOCKET, SO_RCVBUF, (CHAR*)&nZero, sizeof(nZero));
35
36
IP_ADDR addr(szIp, nPort);
37
if (SOCKET_ERROR == bind(m_hSock, (sockaddr*)&addr, sizeof(addr)))
38
{
39
closesocket(m_hSock);
40
THROW_LINE;
41
}
42
43
//将SOCKET绑定到完成端口上
44
CreateIoCompletionPort((HANDLE)m_hSock, m_hCompletion, 0, 0);
45
46
//投递读操作
47
for (int nIndex = 0; nIndex < RECV_COUNT; nIndex++)
48
{
49
UDP_CONTEXT* pRcvContext = new UDP_CONTEXT();
50
if (pRcvContext && pRcvContext->m_pBuf)
51
{
52
dwFlag = 0;
53
dwBytes = 0;
54
nAddrLen = sizeof(IP_ADDR);
55
RcvBuf.buf = pRcvContext->m_pBuf;
56
RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
57
58
pRcvContext->m_hSock = m_hSock;
59
pRcvContext->m_nOperation = OP_READ;
60
iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
61
, &nAddrLen, &(pRcvContext->m_ol), NULL);
62
if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
63
{
64
delete pRcvContext;
65
pRcvContext = NULL;
66
}
67
}
68
else
69
{
70
delete pRcvContext;
71
}
72
}
73
}
74
catch (const long &lErrLine)
75
{
76
bRet = FALSE;
77
_TRACE(“Exp : %s — %ld “, __FILE__, lErrLine);
78
}
79
80
return bRet;
81
}4. GetRcvData(), 从接收队列中取出一个数据包.
1
UDP_RCV_DATA *UdpSer::GetRcvData(DWORD* pCount)
2
{
3
UDP_RCV_DATA* pRcvData = NULL;
4
5
EnterCriticalSection(&m_RcvDataLock);
6
vector<UDP_RCV_DATA* >::iterator iterRcv = m_RcvDataQue.begin();
7
if (iterRcv != m_RcvDataQue.end())
8
{
9
pRcvData = *iterRcv;
10
m_RcvDataQue.erase(iterRcv);
11
}
12
13
if (pCount)
14
{
15
*pCount = (DWORD)(m_RcvDataQue.size());
16
}
17
LeaveCriticalSection(&m_RcvDataLock);
18
19
return pRcvData;
20
}
5. SendData() 发送指定长度的数据包.
1
BOOL UdpSer::SendData(const IP_ADDR& PeerAddr, const CHAR* szData, INT nLen)
2
{
3
BOOL bRet = TRUE;
4
try
5
{
6
if (nLen >= 1500)
7
{
8
THROW_LINE;
9
}
10
11
UDP_CONTEXT* pSendContext = new UDP_CONTEXT();
12
if (pSendContext && pSendContext->m_pBuf)
13
{
14
pSendContext->m_nOperation = OP_WRITE;
15
pSendContext->m_RemoteAddr = PeerAddr;
16
17
memcpy(pSendContext->m_pBuf, szData, nLen);
18
19
WSABUF SendBuf = { NULL, 0 };
20
DWORD dwBytes = 0;
21
SendBuf.buf = pSendContext->m_pBuf;
22
SendBuf.len = nLen;
23
24
INT iErrCode = WSASendTo(m_hSock, &SendBuf, 1, &dwBytes, 0, (sockaddr*)&PeerAddr, sizeof(PeerAddr), &(pSendContext->m_ol), NULL);
25
if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
26
{
27
delete pSendContext;
28
THROW_LINE;
29
}
30
}
31
else
32
{
33
delete pSendContext;
34
THROW_LINE;
35
}
36
}
37
catch (const long &lErrLine)
38
{
39
bRet = FALSE;
40
_TRACE(“Exp : %s — %ld “, __FILE__, lErrLine);
41
}
42
43
return bRet;
44
}
6. CloseServer() 关闭服务
1
void UdpSer::CloseServer()
2
{
3
m_bSerRun = FALSE;
4
closesocket(m_hSock);
5
}
7. WorkThread() 在完成端口上工作的后台线程
1
UINT WINAPI UdpSer::WorkThread(LPVOID lpParam)
2
{
3
UdpSer *pThis = (UdpSer *)lpParam;
4
DWORD dwTrans = 0, dwKey = 0;
5
LPOVERLAPPED pOl = NULL;
6
UDP_CONTEXT *pContext = NULL;
7
8
while (TRUE)
9
{
10
BOOL bOk = GetQueuedCompletionStatu
s(pThis->m_hCompletion, &dwTrans, &dwKey, (LPOVERLAPPED *)&pOl, WSA_INFINITE);
11
12
pContext = CONTAINING_RECORD(pOl, UDP_CONTEXT, m_ol);
13
if (pContext)
14
{
15
switch (pContext->m_nOperation)
16
{
17
case OP_READ:
18
pThis->ReadCompletion(bOk, dwTrans, pOl);
19
break;
20
case OP_WRITE:
21
delete pContext;
22
pContext = NULL;
23
break;
24
}
25
}
26
27
if (FALSE == InterlockedExchangeAdd(&(pThis->m_bThreadRun), 0))
28
{
29
break;
30
}
31
}
32
33
return 0;
34
}
8.ReadCompletion(), 接收操作完成后的回调函数
1
void UdpSer::ReadCompletion(BOOL bSuccess, DWORD dwNumberOfBytesTransfere
d, LPOVERLAPPED lpOverlapped)
2
{
3
UDP_CONTEXT* pRcvContext = CONTAINING_RECORD(lpOverlapped, UDP_CONTEXT, m_ol);
4
WSABUF RcvBuf = { NULL, 0 };
5
DWORD dwBytes = 0;
6
DWORD dwFlag = 0;
7
INT nAddrLen = sizeof(IP_ADDR);
8
INT iErrCode = 0;
9
10
if (TRUE == bSuccess && dwNumberOfBytesTransfere
d <= UDP_CONTEXT::S_PAGE_SIZE)
11
{
12#ifdef _XML_NET_
13
EnterCriticalSection(&m_RcvDataLock);
14
15
UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfere
d, pRcvContext->m_RemoteAddr);
16
if (pRcvData && pRcvData->m_pData)
17
{
18
m_RcvDataQue.push_back(pRcvData);
19
}
20
else
21
{
22
delete pRcvData;
23
}
24
25
LeaveCriticalSection(&m_RcvDataLock);
26#else
27
if (dwNumberOfBytesTransfere
d >= sizeof(PACKET_HEAD))
28
{
29
EnterCriticalSection(&m_RcvDataLock);
30
31
UDP_RCV_DATA* pRcvData = new UDP_RCV_DATA(pRcvContext->m_pBuf, dwNumberOfBytesTransfere
d, pRcvContext->m_RemoteAddr);
32
if (pRcvData && pRcvData->m_pData)
33
{
34
m_RcvDataQue.push_back(pRcvData);
35
}
36
else
37
{
38
delete pRcvData;
39
}
40
41
LeaveCriticalSection(&m_RcvDataLock);
42
}
43#endif
44
45
//投递下一个接收操作
46
RcvBuf.buf = pRcvContext->m_pBuf;
47
RcvBuf.len = UDP_CONTEXT::S_PAGE_SIZE;
48
49
iErrCode = WSARecvFrom(pRcvContext->m_hSock, &RcvBuf, 1, &dwBytes, &dwFlag, (sockaddr*)(&pRcvContext->m_RemoteAddr)
50
, &nAddrLen, &(pRcvContext->m_ol), NULL);
51
if (SOCKET_ERROR == iErrCode && ERROR_IO_PENDING != WSAGetLastError())
52
{
53
ATLTRACE(“\r\n%s — %ld dwNumberOfBytesTransfere
d = %ld, LAST_ERR = %ld”
54
, __FILE__, __LINE__, dwNumberOfBytesTransfere
d, WSAGetLastError());
55
delete pRcvContext;
56
pRcvContext = NULL;
57
}
58
}
59
else
60
{
61
delete pRcvContext;
62
}