Windows服务器端5种IO模型


(1)五种IO模型:

 

A选择(Select)模型

B异步选择(WSAAsyncSelect)模型

C事件选择(WSAEventSelect)模型

D重叠I/O(Overlapped I/O)模型,分为事件通知和完成例程两种实现方式

E完成端口(Completion Port)模型


A.选择(Select)模型

 

sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);

g_CliSocketArr[g_iTotalConn++] = sClient;

 

 

// Worker thread for the select() model.
//
// Repeatedly builds a read set from the first g_iTotalConn entries of
// g_CliSocketArr, waits up to one second for any of them to become readable
// and echoes whatever was received back to the sender. A client that closed
// or reset its connection is removed from the table.
//
// lpParam is not used. The loop has no exit, so the trailing return is only
// there to satisfy the thread-procedure signature.
//
// NOTE(review): g_CliSocketArr / g_iTotalConn are also written by the
// accepting code; no synchronization is visible in this snippet - confirm.
DWORD WINAPI WorkerThread(LPVOID lpParam)
{
  int            i;
  fd_set         fdread;
  int            ret;
  struct timeval tv = {1, 0};
  char           szMessage[MSGSIZE];

  while (TRUE)
  {
    // select() overwrites the set in place, so rebuild it on every pass
    FD_ZERO(&fdread);
    for (i = 0; i < g_iTotalConn; i++)
    {
      FD_SET(g_CliSocketArr[i], &fdread);
    }

    // We only care about read events
    ret = select(0, &fdread, NULL, NULL, &tv);

    if (ret == 0 || ret == SOCKET_ERROR)
    {
      // Time expired, or select() failed (Winsock rejects a call whose sets
      // are all empty, e.g. while no client is connected). Either way the
      // set holds nothing worth scanning.
      continue;
    }

    for (i = 0; i < g_iTotalConn; i++)
    {
      if (!FD_ISSET(g_CliSocketArr[i], &fdread))
      {
        continue;
      }

      // Read at most MSGSIZE - 1 bytes so the terminator below always fits
      ret = recv(g_CliSocketArr[i], szMessage, MSGSIZE - 1, 0);

      if (ret == 0 || (ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET))
      {
        // Peer closed or reset the connection: drop it from the table
        closesocket(g_CliSocketArr[i]);
        g_iTotalConn--;
        if (i < g_iTotalConn)
        {
          // Fill the hole with the last entry and revisit this slot
          g_CliSocketArr[i] = g_CliSocketArr[g_iTotalConn];
          i--;
        }
      }
      else if (ret > 0)
      {
        szMessage[ret] = '\0';
        // Echo exactly the bytes received (the payload may contain NULs)
        send(g_CliSocketArr[i], szMessage, ret, 0);
      }
      // Any other recv() error leaves the connection in the table untouched
    }
  }

  return 0;
}

B.异步选择(WSAAsyncSelect)模型

WSAAsyncSelect(sListen, hwnd, WM_SOCKET, FD_ACCEPT);

 

case WM_SOCKET:

    if (WSAGETSELECTERROR(lParam))

    {

      closesocket(wParam);

      break;

    }

   

    switch (WSAGETSELECTEVENT(lParam))

    {

    case FD_ACCEPT:

      // Accept a connection from client

      sClient = accept(wParam, (struct sockaddr *)&client, &iAddrSize);

     

      // Associate client socket with FD_READ and FD_CLOSE event

      WSAAsyncSelect(sClient, hwnd, WM_SOCKET, FD_READ | FD_CLOSE);

      break;

 

    case FD_READ:

      ret = recv(wParam, szMessage, MSGSIZE, 0);

 

      if (ret == 0 || ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET)

      {

        closesocket(wParam);

      }

      else

      {

        szMessage[ret] = '\0';

        send(wParam, szMessage, strlen(szMessage), 0);

      }

      break;

     

    case FD_CLOSE:

      closesocket(wParam);     

      break;

}

return 0

}

 

C.事件选择(WSAEventSelect)模型

// Accept a connection

sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);

printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

 

// Associate socket with network event

g_CliSocketArr[g_iTotalConn] = sClient;

g_CliEventArr[g_iTotalConn] = WSACreateEvent();

WSAEventSelect(g_CliSocketArr[g_iTotalConn],

              g_CliEventArr[g_iTotalConn],

              FD_READ | FD_CLOSE);

g_iTotalConn++;

 

// Worker loop for the WSAEventSelect model: waits up to one second for any
// client event to be signaled, asks Winsock which network events occurred on
// that client's socket, then echoes received data or releases the slot.
while (TRUE)
{
    ret = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
    if (ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT)
    {
      continue;
    }

    // The wait result encodes which array entry fired
    index = ret - WSA_WAIT_EVENT_0;
    WSAEnumNetworkEvents(g_CliSocketArr[index], g_CliEventArr[index], &NetworkEvents);

    if (NetworkEvents.lNetworkEvents & FD_READ)
    {
      // Receive message from client; read at most MSGSIZE - 1 bytes so the
      // terminator below fits.
      // NOTE(review): assumes szMessage holds at least MSGSIZE bytes; its
      // declaration is not part of this fragment.
      ret = recv(g_CliSocketArr[index], szMessage, MSGSIZE - 1, 0);
      if (ret == 0 || (ret == SOCKET_ERROR && WSAGetLastError() == WSAECONNRESET))
      {
        Cleanup(index);
        // The slot has been released. Skip the FD_CLOSE check below so that
        // Cleanup() is not invoked a second time for the same index.
        // NOTE(review): Cleanup() is not shown; this assumes it frees slot
        // `index` and may reuse it for another connection.
        continue;
      }

      if (ret != SOCKET_ERROR)
      {
        szMessage[ret] = '\0';
        // Echo exactly the bytes received (the payload may contain NULs)
        send(g_CliSocketArr[index], szMessage, ret, 0);
      }
    }

    if (NetworkEvents.lNetworkEvents & FD_CLOSE)
    {
      Cleanup(index);
    }
}

D.重叠I/O(Overlapped I/O)模型

a) 事件方式

// Accept loop for overlapped I/O with event notification. For every new
// client it allocates a zero-filled per-I/O record, creates an event for the
// record's WSAOVERLAPPED, posts the first asynchronous receive and only then
// publishes the slot by incrementing g_iTotalConn.
while (TRUE)
{
    LPPER_IO_OPERATION_DATA pIoData;

    // Accept a connection
    sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);
    if (sClient == INVALID_SOCKET)
    {
      continue;
    }
    printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

    // WSAWaitForMultipleEvents cannot wait on more than
    // WSA_MAXIMUM_WAIT_EVENTS handles, so refuse clients beyond that.
    // NOTE(review): the capacity of the g_* arrays is not visible here.
    if (g_iTotalConn >= WSA_MAXIMUM_WAIT_EVENTS)
    {
      closesocket(sClient);
      continue;
    }

    // Allocate a PER_IO_OPERATION_DATA structure
    pIoData = (LPPER_IO_OPERATION_DATA)HeapAlloc(
      GetProcessHeap(),
      HEAP_ZERO_MEMORY,
      sizeof(PER_IO_OPERATION_DATA));
    if (pIoData == NULL)
    {
      closesocket(sClient);
      continue;
    }
    pIoData->Buffer.len = MSGSIZE;
    pIoData->Buffer.buf = pIoData->szMessage;
    pIoData->overlap.hEvent = WSACreateEvent();

    // Fill the (not yet published) slot
    g_CliSocketArr[g_iTotalConn] = sClient;
    g_pPerIODataArr[g_iTotalConn] = pIoData;
    g_CliEventArr[g_iTotalConn] = pIoData->overlap.hEvent;

    // Launch an asynchronous operation
    if (WSARecv(
          sClient,
          &pIoData->Buffer,
          1,
          &pIoData->NumberOfBytesRecvd,
          &pIoData->Flags,
          &pIoData->overlap,
          NULL) == SOCKET_ERROR
        && WSAGetLastError() != WSA_IO_PENDING)
    {
      // The receive was never queued, so its event would never fire:
      // release everything and leave the slot unpublished.
      WSACloseEvent(pIoData->overlap.hEvent);
      HeapFree(GetProcessHeap(), 0, pIoData);
      closesocket(sClient);
      continue;
    }

    g_iTotalConn++;
}

 

// Worker loop for overlapped I/O with event notification: waits up to one
// second for any per-client event, fetches the result of that client's
// completed receive, echoes the data and posts the next receive.
while (TRUE)
{
    ret = WSAWaitForMultipleEvents(g_iTotalConn, g_CliEventArr, FALSE, 1000, FALSE);
    if (ret == WSA_WAIT_FAILED || ret == WSA_WAIT_TIMEOUT)
    {
      continue;
    }

    // The wait result encodes which array entry fired
    index = ret - WSA_WAIT_EVENT_0;
    WSAResetEvent(g_CliEventArr[index]);

    // Query the slot that fired. The Flags pointer must belong to the same
    // slot; slot g_iTotalConn is one past the last published entry.
    if (!WSAGetOverlappedResult(
          g_CliSocketArr[index],
          &g_pPerIODataArr[index]->overlap,
          &cbTransferred,
          TRUE,
          &g_pPerIODataArr[index]->Flags)
        || cbTransferred == 0)
    {
      // The receive failed or the connection was closed by client
      Cleanup(index);
    }
    else
    {
      // g_pPerIODataArr[index]->szMessage contains the received data.
      // The receive is posted with Buffer.len = MSGSIZE, so a full read
      // leaves no room for a terminator; only add one when it fits.
      // NOTE(review): assumes szMessage is MSGSIZE bytes - declaration not
      // shown in this fragment.
      if (cbTransferred < MSGSIZE)
      {
        g_pPerIODataArr[index]->szMessage[cbTransferred] = '\0';
      }
      send(g_CliSocketArr[index], g_pPerIODataArr[index]->szMessage,
        cbTransferred, 0);

      // Launch another asynchronous operation
      WSARecv(
        g_CliSocketArr[index],
        &g_pPerIODataArr[index]->Buffer,
        1,
        &g_pPerIODataArr[index]->NumberOfBytesRecvd,
        &g_pPerIODataArr[index]->Flags,
        &g_pPerIODataArr[index]->overlap,
        NULL);
    }
}

b) 完成例程方式

  while (TRUE)

  {

    g_sNewClientConnection = accept(sListen, (struct sockaddr *)&client, &iaddrSize);

    g_bNewConnectionArrived = TRUE;

  }

 while (TRUE)

  {

    if (g_bNewConnectionArrived)

    {

      lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(

        GetProcessHeap(),

        HEAP_ZERO_MEMORY,

        sizeof(PER_IO_OPERATION_DATA));

      lpPerIOData->Buffer.len = MSGSIZE;

      lpPerIOData->Buffer.buf = lpPerIOData->szMessage;

      lpPerIOData->sClient = g_sNewClientConnection;

     

      WSARecv(lpPerIOData->sClient,

        &lpPerIOData->Buffer,

        1,

        &lpPerIOData->NumberOfBytesRecvd,

        &lpPerIOData->Flags,

        &lpPerIOData->overlap,

        CompletionROUTINE);     

     

      g_bNewConnectionArrived = FALSE;

    }

 

    SleepEx(1000, TRUE);

}

 

// Completion routine invoked when an overlapped WSARecv finishes.
//
// dwError       - completion status of the receive; non-zero means failure.
// cbTransferred - number of bytes received; 0 is treated as a closed peer.
// lpOverlapped  - the WSAOVERLAPPED that was passed to WSARecv.
// dwFlags       - not used here.
//
// On failure or close the socket and the per-I/O record are released.
// Otherwise the data is echoed back and a new receive is posted on the same
// record.
void CALLBACK CompletionROUTINE(DWORD dwError,
                                DWORD cbTransferred,
                                LPWSAOVERLAPPED lpOverlapped,
                                DWORD dwFlags)
{
  // NOTE(review): this cast relies on `overlap` being the first member of
  // PER_IO_OPERATION_DATA; the struct definition is not shown - confirm.
  LPPER_IO_OPERATION_DATA lpPerIOData = (LPPER_IO_OPERATION_DATA)lpOverlapped;

  if (dwError != 0 || cbTransferred == 0)
  {
    closesocket(lpPerIOData->sClient);
    HeapFree(GetProcessHeap(), 0, lpPerIOData);
    return;
  }

  // The receive is posted with Buffer.len = MSGSIZE, so a full read leaves
  // no room for a terminator; only add one when it fits.
  // NOTE(review): assumes szMessage is MSGSIZE bytes - declaration not shown.
  if (cbTransferred < MSGSIZE)
  {
    lpPerIOData->szMessage[cbTransferred] = '\0';
  }
  send(lpPerIOData->sClient, lpPerIOData->szMessage, cbTransferred, 0);

  // Launch another asynchronous operation
  memset(&lpPerIOData->overlap, 0, sizeof(WSAOVERLAPPED));
  lpPerIOData->Buffer.len = MSGSIZE;
  lpPerIOData->Buffer.buf = lpPerIOData->szMessage;

  if (WSARecv(lpPerIOData->sClient,
        &lpPerIOData->Buffer,
        1,
        &lpPerIOData->NumberOfBytesRecvd,
        &lpPerIOData->Flags,
        &lpPerIOData->overlap,
        CompletionROUTINE) == SOCKET_ERROR
      && WSAGetLastError() != WSA_IO_PENDING)
  {
    // The receive was never queued, so this routine will not run again for
    // the record; release it now instead of leaking it.
    closesocket(lpPerIOData->sClient);
    HeapFree(GetProcessHeap(), 0, lpPerIOData);
  }
}

 

E.完成端口(Completion Port)

  // Initialize Windows Socket library

  WSAStartup(0x0202, &wsaData);

 

  // Create completion port

  CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

 

  // Create worker thread

  GetSystemInfo(&systeminfo);

  for (i = 0; i < systeminfo.dwNumberOfProcessors; i++)

  {

    CreateThread(NULL, 0, WorkerThread, CompletionPort, 0, &dwThreadId);

  }

 

  // Create listening socket

  sListen = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);

 

  // Bind

  local.sin_addr.S_un.S_addr = htonl(INADDR_ANY);

local.sin_family = AF_INET;

local.sin_port = htons(PORT);

  bind(sListen, (struct sockaddr *)&local, sizeof(SOCKADDR_IN));

 

  // Listen

  listen(sListen, 3);

 

  while (TRUE)

  {

    // Accept a connection

    sClient = accept(sListen, (struct sockaddr *)&client, &iaddrSize);

    printf("Accepted client:%s:%d\n", inet_ntoa(client.sin_addr), ntohs(client.sin_port));

 

    // Associate the newly arrived client socket with completion port

    CreateIoCompletionPort((HANDLE)sClient, CompletionPort, (DWORD)sClient, 0);

   

    // Launch an asynchronous operation for new arrived connection

    lpPerIOData = (LPPER_IO_OPERATION_DATA)HeapAlloc(

      GetProcessHeap(),

      HEAP_ZERO_MEMORY,

      sizeof(PER_IO_OPERATION_DATA));

    lpPerIOData->Buffer.len = MSGSIZE;

    lpPerIOData->Buffer.buf = lpPerIOData->szMessage;

    lpPerIOData->OperationType = RECV_POSTED;

    WSARecv(sClient,

      &lpPerIOData->Buffer,

      1,

      &lpPerIOData->NumberOfBytesRecvd,

      &lpPerIOData->Flags,

      &lpPerIOData->overlap,

      NULL);

  }

 

  PostQueuedCompletionStatus(CompletionPort, 0xFFFFFFFF, 0, NULL);

  CloseHandle(CompletionPort);

  closesocket(sListen);

  WSACleanup();

 

 

// Completion-port worker thread.
//
// CompletionPortID is the completion port handle, passed as the thread
// argument. The thread dequeues completion packets forever: for a finished
// receive it echoes the data and posts the next receive on the same record;
// a zero-byte receive means the client closed, so the socket and record are
// released. Returns 0 when it dequeues the exit packet (byte count
// 0xFFFFFFFF) or when no packet could be dequeued at all.
DWORD WINAPI WorkerThread(LPVOID CompletionPortID)
{
  HANDLE                  CompletionPort = (HANDLE)CompletionPortID;
  DWORD                   dwBytesTransferred;
  ULONG_PTR               ulCompletionKey;
  SOCKET                  sClient;
  LPPER_IO_OPERATION_DATA lpPerIOData = NULL;

  while (TRUE)
  {
    // Reset the outputs so a failed call cannot leave stale values behind
    dwBytesTransferred = 0;
    ulCompletionKey = 0;
    lpPerIOData = NULL;

    GetQueuedCompletionStatus(
      CompletionPort,
      &dwBytesTransferred,
      &ulCompletionKey,
      (LPOVERLAPPED *)&lpPerIOData,
      INFINITE);

    if (dwBytesTransferred == 0xFFFFFFFF)
    {
      // Exit packet
      return 0;
    }

    if (lpPerIOData == NULL)
    {
      // Nothing was dequeued (for example the port handle was closed);
      // there is no record to work on, so stop instead of dereferencing NULL.
      return 0;
    }

    // The accepting code registers each socket as its own completion key
    sClient = (SOCKET)ulCompletionKey;

    if (lpPerIOData->OperationType == RECV_POSTED)
    {
      if (dwBytesTransferred == 0)
      {
        // Connection was closed by client (or the receive failed)
        closesocket(sClient);
        HeapFree(GetProcessHeap(), 0, lpPerIOData);
      }
      else
      {
        // The receive is posted with Buffer.len = MSGSIZE, so a full read
        // leaves no room for a terminator; only add one when it fits.
        // NOTE(review): assumes szMessage is MSGSIZE bytes - not shown here.
        if (dwBytesTransferred < MSGSIZE)
        {
          lpPerIOData->szMessage[dwBytesTransferred] = '\0';
        }
        send(sClient, lpPerIOData->szMessage, dwBytesTransferred, 0);

        // Launch another asynchronous operation for sClient
        memset(lpPerIOData, 0, sizeof(PER_IO_OPERATION_DATA));
        lpPerIOData->Buffer.len = MSGSIZE;
        lpPerIOData->Buffer.buf = lpPerIOData->szMessage;
        lpPerIOData->OperationType = RECV_POSTED;

        if (WSARecv(sClient,
              &lpPerIOData->Buffer,
              1,
              &lpPerIOData->NumberOfBytesRecvd,
              &lpPerIOData->Flags,
              &lpPerIOData->overlap,
              NULL) == SOCKET_ERROR
            && WSAGetLastError() != WSA_IO_PENDING)
        {
          // The receive was never queued, so no further packet will arrive
          // for this record; release it now instead of leaking it.
          closesocket(sClient);
          HeapFree(GetProcessHeap(), 0, lpPerIOData);
        }
      }
    }
  }

  return 0;
}

(2)比较分析

A.连接数限制:选择模型受FD_SETSIZE宏的影响,事件选择、用事件通知实现的重叠I/O受MAXIMUM_WAIT_OBJECTS宏的影响,三者都有每线程最大64连接数的限制;异步选择、用完成例程实现的重叠I/O和完成端口不受此限制。

 

B.除了异步选择以外,其他模型至少需要2个线程。一个主线程和一个辅助线程。同样的,如果连接数大于64,则选择模型、事件选择和用事件通知实现的重叠I/O的线程数还要增加。

 

C.性能:选择<用事件通知实现的重叠I/O<事件选择<用完成例程实现的重叠I/O<完成端口



看文字不过瘾?点击我,进入周哥教IT视频教学
麦洛科菲长期致力于IT安全技术的推广与普及,我们更专业!我们的学员已经广泛就职于BAT360等各大IT互联网公司。详情请参考我们的 业界反馈 《周哥教IT.C语言深学活用》视频

我们的微信公众号,敬请关注