(1)五种IO模型:
A选择模型
B异步选择模型
C事件选择模型
D重叠I/O模型(分为用事件通知实现和用完成例程实现两种方式)
E完成端口模型
A.选择(Select)模型
// Block until the next client connects, then append its socket to
// g_CliSocketArr and bump g_iTotalConn; WorkerThread builds its read set
// from the first g_iTotalConn entries of that array.
// NOTE(review): the return value of accept() is stored without being checked
// against INVALID_SOCKET, and no bound on g_iTotalConn is visible here.
sClient = accept(sListen, (struct sockaddr *)&client,
&iaddrSize);
g_CliSocketArr[g_iTotalConn++] = sClient;
/*
 * Worker thread for the select() model.
 *
 * On every pass it builds a read set from g_CliSocketArr[0 .. g_iTotalConn),
 * waits up to one second in select(), and echoes back whatever each readable
 * client sent. A client whose recv() reports an orderly shutdown (0) or a
 * connection reset is closed and removed from the array.
 *
 * lpParam is unused. The loop never exits; the trailing return only satisfies
 * the thread-procedure signature.
 *
 * NOTE(review): g_CliSocketArr / g_iTotalConn are also written by the
 * accepting code, and no locking is visible here - confirm whether
 * synchronisation is required.
 */
DWORD WINAPI WorkerThread(LPVOID lpParam)
{
    int i;
    fd_set fdread;
    int ret;
    struct timeval tv = {1, 0};
    char szMessage[MSGSIZE];

    (void)lpParam;

    while (TRUE)
    {
        // Rebuild the read set from the current list of clients.
        FD_ZERO(&fdread);
        for (i = 0; i < g_iTotalConn; i++)
        {
            FD_SET(g_CliSocketArr[i], &fdread);
        }

        // We only care read event
        ret = select(0, &fdread, NULL, NULL, &tv);
        if (ret == 0)
        {
            // Time expired
            continue;
        }
        if (ret == SOCKET_ERROR)
        {
            // Winsock fails the call when every descriptor set is empty, which
            // is the normal state before the first client connects. Yield
            // instead of spinning, and do not inspect fdread after a failure.
            Sleep(1);
            continue;
        }

        for (i = 0; i < g_iTotalConn; i++)
        {
            if (!FD_ISSET(g_CliSocketArr[i], &fdread))
            {
                continue;
            }

            // Leave one byte free for the terminator written below.
            ret = recv(g_CliSocketArr[i], szMessage, MSGSIZE - 1, 0);
            if (ret == 0 ||
                (ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET))
            {
                // Client went away: close it and fill the hole with the last
                // entry. The count is decremented even when the closed socket
                // WAS the last entry, so it is never scanned again.
                closesocket(g_CliSocketArr[i]);
                --g_iTotalConn;
                if (i < g_iTotalConn)
                {
                    g_CliSocketArr[i] = g_CliSocketArr[g_iTotalConn];
                }
                // Re-examine slot i, which may now hold the moved socket.
                i--;
            }
            else if (ret != SOCKET_ERROR)
            {
                // Echo exactly the bytes received; the terminator is kept only
                // so the buffer can be treated as a C string when debugging.
                szMessage[ret] = '\0';
                send(g_CliSocketArr[i], szMessage, ret, 0);
            }
            // Any other recv() error: nothing was received, so nothing to echo.
        }
    }
    return 0;
}
B.异步选择(WSAAsyncSelect)
// Register the listening socket for FD_ACCEPT notifications; they are
// delivered to window hwnd as WM_SOCKET messages.
WSAAsyncSelect(sListen, hwnd, WM_SOCKET, FD_ACCEPT);
case WM_SOCKET:
if
(WSAGETSELECTERROR(lParam))
{
closesocket(wParam);
break;
}
switch
(WSAGETSELECTEVENT(lParam))
{
case FD_ACCEPT:
// Accept a
connection from client
sClient =
accept(wParam, (struct sockaddr *)&client, &iAddrSize);
// Associate
client socket with FD_READ and FD_CLOSE event
WSAAsyncSelect(sClient, hwnd, WM_SOCKET, FD_READ | FD_CLOSE);
break;
case FD_READ:
ret =
recv(wParam, szMessage, MSGSIZE, 0);
if (ret == 0 ||
ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET)
{
closesocket(wParam);
}
else
{
szMessage[ret] = '\0';
send(wParam, szMessage, strlen(szMessage), 0);
}
break;
case FD_CLOSE:
closesocket(wParam);
break;
}
return 0
}
C.事件选择(WSAEventSelect)
// Accept a connection and log the peer's address and port.
sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
// Associate socket with network event: the socket and a freshly created
// event object are stored at the same index, and the event is asked to
// signal on FD_READ and FD_CLOSE.
g_CliSocketArr[g_iTotalConn] = sClient;
g_CliEventArr[g_iTotalConn] = WSACreateEvent();
WSAEventSelect(g_CliSocketArr[g_iTotalConn],
g_CliEventArr[g_iTotalConn],
FD_READ | FD_CLOSE);
// The count is bumped last, after both array slots have been filled in.
g_iTotalConn++;
// Worker loop for the WSAEventSelect model: wait (up to one second) for any
// client event to be signalled, find out which network events fired on the
// matching socket, and either echo the received data or release the client.
while (TRUE)
{
    ret = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
    if (ret == WSA_WAIT_TIMEOUT)
    {
        continue;
    }
    if (ret == WSA_WAIT_FAILED)
    {
        // The wait fails immediately when no event is registered yet (at least
        // one event is required). Yield instead of spinning.
        Sleep(1);
        continue;
    }

    index = ret - WSA_WAIT_EVENT_0;
    WSAEnumNetworkEvents(g_CliSocketArr[index], g_CliEventArr[index], &NetworkEvents);

    if (NetworkEvents.lNetworkEvents & FD_READ)
    {
        // Receive message from client. One byte is left free for the
        // terminator written below.
        // NOTE(review): assumes szMessage holds at least MSGSIZE bytes - confirm.
        ret = recv(g_CliSocketArr[index], szMessage, MSGSIZE - 1, 0);
        if (ret == 0 ||
            (ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET))
        {
            Cleanup(index);
            // The slot has already been released; do not fall through to the
            // FD_CLOSE check and release the same index a second time.
            continue;
        }
        if (ret != SOCKET_ERROR)
        {
            // Echo exactly the bytes received.
            szMessage[ret] = '\0';
            send(g_CliSocketArr[index], szMessage, ret, 0);
        }
        // Any other recv() error: nothing was received, so nothing to echo.
    }

    if (NetworkEvents.lNetworkEvents & FD_CLOSE)
    {
        Cleanup(index);
    }
}
D.重叠I/O(Overlapped I/O)
a) 事件方式
// Accept loop for overlapped I/O with event notification: every accepted
// client gets a zero-initialised PER_IO_OPERATION_DATA block and an event
// object, and a first overlapped receive is posted before the connection is
// published by incrementing g_iTotalConn.
while (TRUE)
{
    // Accept a connection
    sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
    if (sClient == INVALID_SOCKET)
    {
        continue;
    }
    printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));
    g_CliSocketArr[g_iTotalConn] = sClient;

    // Allocate a PER_IO_OPERATION_DATA structure
    g_pPerIODataArr[g_iTotalConn] = (LPPER_IO_OPERATION_DATA)HeapAlloc(
        GetProcessHeap(),
        HEAP_ZERO_MEMORY,
        sizeof(PER_IO_OPERATION_DATA));
    if (g_pPerIODataArr[g_iTotalConn] == NULL)
    {
        // Out of memory: drop this client; the slot was never published.
        closesocket(sClient);
        continue;
    }
    g_pPerIODataArr[g_iTotalConn]->Buffer.len = MSGSIZE;
    g_pPerIODataArr[g_iTotalConn]->Buffer.buf = g_pPerIODataArr[g_iTotalConn]->szMessage;
    // The same event is stored in the wait array and in the OVERLAPPED block.
    g_CliEventArr[g_iTotalConn] =
        g_pPerIODataArr[g_iTotalConn]->overlap.hEvent = WSACreateEvent();

    // Launch an asynchronous operation
    if (WSARecv(
            g_CliSocketArr[g_iTotalConn],
            &g_pPerIODataArr[g_iTotalConn]->Buffer,
            1,
            &g_pPerIODataArr[g_iTotalConn]->NumberOfBytesRecvd,
            &g_pPerIODataArr[g_iTotalConn]->Flags,
            &g_pPerIODataArr[g_iTotalConn]->overlap,
            NULL) == SOCKET_ERROR &&
        WSAGetLastError() != WSA_IO_PENDING)
    {
        // The receive was not started, so the event would never be signalled.
        // Release everything instead of publishing a dead slot.
        closesocket(sClient);
        WSACloseEvent(g_CliEventArr[g_iTotalConn]);
        HeapFree(GetProcessHeap(), 0, g_pPerIODataArr[g_iTotalConn]);
        continue;
    }
    g_iTotalConn++;
}
// Worker loop for overlapped I/O with event notification: wait (up to one
// second) for any client's event, collect the result of its overlapped
// receive, echo the data back and post the next receive.
while (TRUE)
{
    ret = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
    if (ret == WSA_WAIT_TIMEOUT)
    {
        continue;
    }
    if (ret == WSA_WAIT_FAILED)
    {
        // The wait fails immediately when no event is registered yet (at least
        // one event is required). Yield instead of spinning.
        Sleep(1);
        continue;
    }

    index = ret - WSA_WAIT_EVENT_0;
    WSAResetEvent(g_CliEventArr[index]);

    // The flags must come from the SAME per-I/O block as the overlapped
    // structure; slot g_iTotalConn is one past the last active connection.
    if (!WSAGetOverlappedResult(
            g_CliSocketArr[index],
            &g_pPerIODataArr[index]->overlap,
            &cbTransferred,
            TRUE,
            &g_pPerIODataArr[index]->Flags) ||
        cbTransferred == 0)
    {
        // The receive failed or the connection was closed by client
        Cleanup(index);
        continue;
    }

    // g_pPerIODataArr[index]->szMessage contains the received data.
    // The receive was posted with Buffer.len == MSGSIZE, so only terminate
    // when a spare byte is left.
    // NOTE(review): assumes szMessage is MSGSIZE bytes long - confirm.
    if (cbTransferred < (DWORD)MSGSIZE)
    {
        g_pPerIODataArr[index]->szMessage[cbTransferred] = '\0';
    }
    send(g_CliSocketArr[index], g_pPerIODataArr[index]->szMessage, cbTransferred, 0);

    // Launch another asynchronous operation
    if (WSARecv(
            g_CliSocketArr[index],
            &g_pPerIODataArr[index]->Buffer,
            1,
            &g_pPerIODataArr[index]->NumberOfBytesRecvd,
            &g_pPerIODataArr[index]->Flags,
            &g_pPerIODataArr[index]->overlap,
            NULL) == SOCKET_ERROR &&
        WSAGetLastError() != WSA_IO_PENDING)
    {
        // No receive is outstanding, so this client's event would never be
        // signalled again; release the connection.
        Cleanup(index);
    }
}
b) 完成例程方式
while (TRUE)
{
g_sNewClientConnection =
accept(sListen, (struct sockaddr *)&client, &iaddrSize);
g_bNewConnectionArrived =
TRUE;
}
// Worker loop for overlapped I/O with completion routines: when a new
// connection has been flagged, allocate its per-I/O block and post the first
// overlapped receive; then sleep alertably so that pending completion
// routines can run on this thread.
while (TRUE)
{
    if (g_bNewConnectionArrived)
    {
        lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
            GetProcessHeap(),
            HEAP_ZERO_MEMORY,
            sizeof(PER_IO_OPERATION_DATA));
        if (lpPerIOData == NULL)
        {
            // Out of memory: this client cannot be served.
            closesocket(g_sNewClientConnection);
        }
        else
        {
            lpPerIOData->Buffer.len = MSGSIZE;
            lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
            lpPerIOData->sClient = g_sNewClientConnection;
            if (WSARecv(lpPerIOData->sClient,
                        &lpPerIOData->Buffer,
                        1,
                        &lpPerIOData->NumberOfBytesRecvd,
                        &lpPerIOData->Flags,
                        &lpPerIOData->overlap,
                        CompletionROUTINE) == SOCKET_ERROR &&
                WSAGetLastError() != WSA_IO_PENDING)
            {
                // The receive was not started, so CompletionROUTINE will never
                // run for this block; release it here.
                closesocket(lpPerIOData->sClient);
                HeapFree(GetProcessHeap(), 0, lpPerIOData);
            }
        }
        g_bNewConnectionArrived = FALSE;
    }
    // Alertable wait (second argument TRUE): returns after 1000 ms or once
    // queued completion routines have been executed.
    SleepEx(1000, TRUE);
}
/*
 * Completion routine for the overlapped receives posted with WSARecv.
 *
 * dwError       - completion status of the receive; non-zero means failure.
 * cbTransferred - number of bytes received; 0 is treated as "client closed".
 * lpOverlapped  - the overlapped structure given to WSARecv. It is cast back
 *                 to the enclosing PER_IO_OPERATION_DATA.
 *                 NOTE(review): this relies on `overlap` being the first
 *                 member of that structure - confirm against its definition.
 * dwFlags       - unused.
 *
 * On failure or close the socket is closed and the per-I/O block is freed.
 * Otherwise the data is echoed back and a new receive is posted with the same
 * block.
 */
void CALLBACK CompletionROUTINE(DWORD dwError,
                                DWORD cbTransferred,
                                LPWSAOVERLAPPED lpOverlapped,
                                DWORD dwFlags)
{
    LPPER_IO_OPERATION_DATA lpPerIOData = (LPPER_IO_OPERATION_DATA)lpOverlapped;

    (void)dwFlags;

    if (dwError != 0 || cbTransferred == 0)
    {
        closesocket(lpPerIOData->sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
        return;
    }

    // The receive was posted with Buffer.len == MSGSIZE, so only terminate
    // when a spare byte is left.
    // NOTE(review): assumes szMessage is MSGSIZE bytes long - confirm.
    if (cbTransferred < (DWORD)MSGSIZE)
    {
        lpPerIOData->szMessage[cbTransferred] = '\0';
    }
    send(lpPerIOData->sClient, lpPerIOData->szMessage, cbTransferred, 0);

    // Launch another asynchronous operation
    memset(&lpPerIOData->overlap, 0, sizeof(WSAOVERLAPPED));
    lpPerIOData->Buffer.len = MSGSIZE;
    lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
    if (WSARecv(lpPerIOData->sClient,
                &lpPerIOData->Buffer,
                1,
                &lpPerIOData->NumberOfBytesRecvd,
                &lpPerIOData->Flags,
                &lpPerIOData->overlap,
                CompletionROUTINE) == SOCKET_ERROR &&
        WSAGetLastError() != WSA_IO_PENDING)
    {
        // The receive was not started, so this routine will not be called
        // again for the block; release it now.
        closesocket(lpPerIOData->sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
    }
}
E.完成端口(Completion Port)
// Initialize Windows Socket library
WSAStartup(0x0202, &wsaData);
// Create completion port
CompletionPort =
CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
// Create worker thread
GetSystemInfo(&systeminfo);
for (i = 0; i <
systeminfo.dwNumberOfProcessors; i++)
{
CreateThread(NULL, 0,
WorkerThread, CompletionPort, 0, &dwThreadId);
}
// Create listening socket
sListen = socket(AF_INET, SOCK_STREAM,
IPPROTO_TCP);
// Bind
local.sin_addr.S_un.S_addr =
htonl(INADDR_ANY);
local.sin_family = AF_INET;
local.sin_port = htons(PORT);
bind(sListen, (struct sockaddr *)&local,
sizeof(SOCKADDR_IN));
// Listen
listen(sListen, 3);
// Accept loop for the completion-port model: each accepted socket is
// associated with the completion port (using the socket itself as the
// completion key) and a first overlapped receive is posted for it.
while (TRUE)
{
    // Accept a connection
    sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
    if (sClient == INVALID_SOCKET)
    {
        continue;
    }
    printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

    // Associate the newly arrived client socket with completion port.
    // The key is pointer-sized; casting the socket to DWORD would truncate it
    // on 64-bit builds.
    CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (ULONG_PTR)sClient, 0);

    // Launch an asynchronous operation for new arrived connection
    lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
        GetProcessHeap(),
        HEAP_ZERO_MEMORY,
        sizeof(PER_IO_OPERATION_DATA));
    if (lpPerIOData == NULL)
    {
        // Out of memory: this client cannot be served.
        closesocket(sClient);
        continue;
    }
    lpPerIOData->Buffer.len = MSGSIZE;
    lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
    lpPerIOData->OperationType = RECV_POSTED;
    if (WSARecv(sClient,
                &lpPerIOData->Buffer,
                1,
                &lpPerIOData->NumberOfBytesRecvd,
                &lpPerIOData->Flags,
                &lpPerIOData->overlap,
                NULL) == SOCKET_ERROR &&
        WSAGetLastError() != WSA_IO_PENDING)
    {
        // The receive was not started, so no completion will be queued for
        // this block; release it here.
        closesocket(sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
    }
}
PostQueuedCompletionStatus(CompletionPort, 0xFFFFFFFF, 0, NULL);
CloseHandle(CompletionPort);
closesocket(sListen);
WSACleanup();
/*
 * Worker thread for the completion-port model.
 *
 * CompletionPortID is the completion port handle, passed through the thread
 * argument. The thread dequeues completions in a loop:
 *   - a byte count of 0xFFFFFFFF is the quit signal and ends the thread;
 *   - a completed receive with 0 bytes (or a failed one) closes the client
 *     socket and frees its per-I/O block;
 *   - otherwise the data is echoed back and a new receive is posted.
 *
 * The completion key is the client socket. The OVERLAPPED pointer is cast to
 * the enclosing PER_IO_OPERATION_DATA.
 * NOTE(review): that cast relies on `overlap` being the first member of the
 * structure - confirm against its definition.
 *
 * Always returns 0.
 */
DWORD WINAPI WorkerThread(LPVOID CompletionPortID)
{
    HANDLE CompletionPort = (HANDLE)CompletionPortID;
    DWORD dwBytesTransferred;
    ULONG_PTR ulCompletionKey;
    SOCKET sClient;
    LPPER_IO_OPERATION_DATA lpPerIOData;
    BOOL bSuccess;

    while (TRUE)
    {
        // Start from known values so a failed call cannot leave stale data.
        dwBytesTransferred = 0;
        ulCompletionKey = 0;
        lpPerIOData = NULL;

        bSuccess = GetQueuedCompletionStatus(
            CompletionPort,
            &dwBytesTransferred,
            &ulCompletionKey,
            (LPOVERLAPPED *)&lpPerIOData,
            INFINITE);

        if (dwBytesTransferred == 0xFFFFFFFF)
        {
            return 0;
        }
        if (lpPerIOData == NULL)
        {
            // Nothing was dequeued: the wait itself failed (for example the
            // port was closed). There is no per-I/O block to work on.
            break;
        }

        sClient = (SOCKET)ulCompletionKey;

        if (lpPerIOData->OperationType == RECV_POSTED)
        {
            if (!bSuccess || dwBytesTransferred == 0)
            {
                // The receive failed or the connection was closed by client
                closesocket(sClient);
                HeapFree(GetProcessHeap(), 0, lpPerIOData);
            }
            else
            {
                // The receive was posted with Buffer.len == MSGSIZE, so only
                // terminate when a spare byte is left.
                // NOTE(review): assumes szMessage is MSGSIZE bytes long - confirm.
                if (dwBytesTransferred < (DWORD)MSGSIZE)
                {
                    lpPerIOData->szMessage[dwBytesTransferred] = '\0';
                }
                send(sClient, lpPerIOData->szMessage, dwBytesTransferred, 0);

                // Launch another asynchronous operation for sClient
                memset(lpPerIOData, 0, sizeof(PER_IO_OPERATION_DATA));
                lpPerIOData->Buffer.len = MSGSIZE;
                lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
                lpPerIOData->OperationType = RECV_POSTED;
                if (WSARecv(sClient,
                            &lpPerIOData->Buffer,
                            1,
                            &lpPerIOData->NumberOfBytesRecvd,
                            &lpPerIOData->Flags,
                            &lpPerIOData->overlap,
                            NULL) == SOCKET_ERROR &&
                    WSAGetLastError() != WSA_IO_PENDING)
                {
                    // The receive was not started, so no completion will be
                    // queued for this block; release it here.
                    closesocket(sClient);
                    HeapFree(GetProcessHeap(), 0, lpPerIOData);
                }
            }
        }
    }
    return 0;
}
(2)比较分析
A.有效连接数:选择模型受FD_SETSIZE宏的影响,事件选择、用事件通知实现的重叠I/O受MAXIMUM_WAIT_OBJECTS宏的影响,三者都有每线程最大64连接数的限制;异步选择、用完成例程实现的重叠I/O和完成端口不受此限制。
B.线程数:除了异步选择以外,其他模型至少需要2个线程:一个主线程和一个辅助线程。同样地,如果连接数大于64,则选择模型、事件选择和用事件通知实现的重叠I/O的线程数还要增加。
C.性能:选择<用事件通知实现的重叠I/O<事件选择<用完成例程实现的重叠I/O<完成端口
Copyright 2011-2020 © MallocFree. All rights reserved.