bdfgdfg
[온라인 서버] I/O Completion Port(IOCP) 모델 실습 본문
이론은 앞서 두개의 글(하나는 비공개)을 통해 공부했으니 바로 코드
IOCompletionPort.h
#pragma once
#include "std.h"
#define MAX_SOCKBUF 1024 // 패킷(현재는 버퍼)크기
#define MAX_CLIENT 100 // 최대 접속가능한 클라이언트 수
#define MAX_WORKERTHREAD 4 // 쓰레드 풀(CP객체)에 넣을 쓰레드 수
enum class IO_TYPE
{
IO_RECV,
IO_SEND
};
// Overlapped 구조체를 확장하여 사용.
struct OverlappedEx
{
WSAOVERLAPPED m_wsaOverlapped;
WSABUF m_wsaBuf;
char m_dataBuffer[MAX_SOCKBUF];
IO_TYPE m_ioType;
};
// 클라이언트(Session) 정보를 담기위한 구조체
struct ClientInfo
{
ClientInfo() : m_socketClient(INVALID_SOCKET)
{
::ZeroMemory(&m_recvOverlapped, sizeof(OverlappedEx));
::ZeroMemory(&m_sendOverlapped, sizeof(OverlappedEx));
}
SOCKET m_socketClient;
OverlappedEx m_recvOverlapped; // Recv Overlapped(비동기) I/O 작업을 위한 변수
OverlappedEx m_sendOverlapped; // Send Overlapped(비동기) I/O 작업을 위한 변수
};
class IOCompletionPort
{
friend unsigned int WINAPI CallWorkerThread(LPVOID p);
friend unsigned int WINAPI CallAccepterThread(LPVOID p);
public:
IOCompletionPort();
virtual ~IOCompletionPort();
public:
//소켓을 초기화하는 함수
bool InitSocket();
//소켓의 연결을 종료 시킨다.
void CloseSocket(ClientInfo* pClientInfo, bool bIsForce = false);
//서버의 주소정보를 소켓과 연결시키고 접속 요청을 받기 위해 그 소켓을 등록하는 함수
bool BindandListen(int nBindPort);
//접속 요청을 수락하고 메세지를 받아서 처리하는 함수
bool StartServer();
//생성되어있는 쓰레드를 파괴한다.
void DestroyThread();
private:
//Overlapped I/O작업을 위한 쓰레드를 생성
bool CreateWokerThread();
//accept요청을 처리하는 쓰레드 생성
bool CreateAccepterThread();
//사용되지 않은 ClientInfo를 반환
ClientInfo* GetEmptyClientInfo();
// CP객체와 Completion Key(완료키)를 등록하는 함수.
bool BindIOCompletionPort(ClientInfo* pClientInfo);
//WSARecv Overlapped I/O 작업을 시킨다.
bool BindRecv(ClientInfo* pClientInfo);
//WSASend Overlapped I/O작업을 시킨다.
bool SendMsg(ClientInfo* pClientInfo, char* pMsg, int nLen);
bool IOProcess(ClientInfo* pClient,OverlappedEx* pOverlapped, DWORD& transferred);
//Overlapped I/O작업에 대한 완료 통보를 받아
//그에 해당하는 처리를 하는 함수
void WokerThread();
//사용자의 접속을 받는 쓰레드
void AccepterThread();
private:
//클라이언트 정보 구조체
ClientInfo* m_pClientInfo;
//리슨소켓
SOCKET m_hSocketListen;
// 연결된 클라이언트 수
int m_clientCount;
// 워커쓰레드 핸들 배열
HANDLE m_hWorkerThread[MAX_WORKERTHREAD];
// 접속 쓰레드 핸들
HANDLE m_hAccepterThread;
// CP객체
HANDLE m_hCompletionPort;
// 작업 쓰레드 BOOL
bool m_bWorkerRun;
// 접속 쓰레드 BOOL
bool m_bAccepterRun;
// 소켓 버퍼
char m_socketBuff[MAX_SOCKBUF];
};
Overlapped I/O 이벤트 기반과의 차이점은 소켓과 이벤트의 1대1구조가 아닌 ClientInfo를 동적 배열로 생성 후
CP객체에 소켓을 등록할 때, ClientInfo구조체를 완료키로 넘겨준다. (즉 자기자신의 소켓 및 Overlapped구조체들)
그리고 GQCS함수를 호출하여 완료큐에 일감을 뽑아온 쓰레드가 Recv,Send등의 작업을 처리할 때 완료키로 넘긴
Overlapped구조체 멤버를 다시 넘기는 형태.
#define MAX_SOCKBUF 1024 // 패킷(현재는 버퍼)크기
#define MAX_CLIENT 100 // 최대 접속가능한 클라이언트 수
#define MAX_WORKERTHREAD 4 // 쓰레드 풀(CP객체)에 넣을 쓰레드 수
enum class IO_TYPE
{
IO_RECV,
IO_SEND
};
// Overlapped 구조체를 확장하여 사용.
struct OverlappedEx
{
WSAOVERLAPPED m_wsaOverlapped;
WSABUF m_wsaBuf;
char m_dataBuffer[MAX_SOCKBUF];
IO_TYPE m_ioType;
};
// 클라이언트(Session) 정보를 담기위한 구조체
struct ClientInfo
{
ClientInfo() : m_socketClient(INVALID_SOCKET)
{
::ZeroMemory(&m_recvOverlapped, sizeof(OverlappedEx));
::ZeroMemory(&m_sendOverlapped, sizeof(OverlappedEx));
}
SOCKET m_socketClient;
OverlappedEx m_recvOverlapped; // Recv Overlapped(비동기) I/O 작업을 위한 변수
OverlappedEx m_sendOverlapped; // Send Overlapped(비동기) I/O 작업을 위한 변수
};
먼저 하나하나씩 살펴보면
1. IO_TYPE을 정의함.
- 어떤 입출력 완료인지 체크하기 위함.
2. 확장된 OverlappedEx구조체 사용
- 항상 WSAOVERLAPPED 구조체 멤버는 첫번째 offset에 위치.
- GQCS함수를 통해 꺼내와서 타입,버퍼내용등을 읽어옴(에코)
3. ClientInfo 구조체 사용(Session 정보)
- CP객체에 등록할 완료 키(Completion Key)로 사용.
- 내부적으로 recv용 Overlapped 변수와 send용 Overlapped 변수를 나눔.
함수는 순서대로 보지만 Initocket,CloseSocket,BindAndListen,DestroyThread는 넘어간다.
먼저 생성자 소멸자
IOCompletionPort::IOCompletionPort() :m_hSocketListen(INVALID_SOCKET),
m_clientCount(0),m_hAccepterThread(INVALID_HANDLE_VALUE),m_hCompletionPort(INVALID_HANDLE_VALUE),
m_bWorkerRun(true),m_bAccepterRun(true)
{
// 클라이언트 관리를 위한 ClientInfo 배열 동적(★) 생성.
m_pClientInfo = new ClientInfo[MAX_CLIENT]();
for (int i = 0; i < MAX_WORKERTHREAD; ++i)
m_hWorkerThread[i] = INVALID_HANDLE_VALUE;
::ZeroMemory(m_socketBuff, sizeof(MAX_SOCKBUF));
}
IOCP 클래스의 객체가 생성되면 내부 멤버를 초기화 한다.
여기서 중요한건 클라이언트(Session)들을 관리하기 위한 동적 배열을 할당.
IOCompletionPort::~IOCompletionPort()
{
::WSACleanup();
if (m_pClientInfo)
{
delete[] m_pClientInfo;
m_pClientInfo = nullptr;
}
}
할당했으면 삭제.
그다음 StartServer
bool IOCompletionPort::StartServer()
{
// ★ CP객체 생성
m_hCompletionPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
if (m_hCompletionPort == INVALID_HANDLE_VALUE)
{
std::cout << "CP객체 생성 실패" << std::endl;
return false;
}
bool bRet = CreateWokerThread();
if (bRet == false)
return false;
bRet = CreateAccepterThread();
if (bRet == false)
return false;
std::cout << "서버 시작..." << std::endl;
return true;
}
먼저 리슨 소켓까지 생성하였다면 이제 CP객체를 생성한 후
각각 워커 쓰레드(4개의 쓰레드), 클라이언트 연결 처리(1개의 쓰레드)를 하는 쓰레드들을 생성.
먼저 워커 쓰레드 (GQCS함수를 호출하는 쓰레드들. 즉 IOCP Queue의 완료일감이 있다면 꺼내와 실행상태가 된다.)
bool IOCompletionPort::CreateWokerThread()
{
UINT threadId = 0;
//WaitingThread Queue에 대기 상태로 넣을 쓰레드들 생성(쓰레드 풀)
// 권장되는 개수 (cpu 개수 * 2 ) + 1
for (int i = 0; i < MAX_WORKERTHREAD; ++i)
{
m_hWorkerThread[i] = (HANDLE)::_beginthreadex(NULL, 0, CallWorkerThread, this, CREATE_SUSPENDED, &threadId);
if (m_hWorkerThread[i] == INVALID_HANDLE_VALUE)
{
std::cout << "쓰레드 생성 실패" << std::endl;
return false;
}
::ResumeThread(m_hWorkerThread[i]);
}
std::cout << "WorkerThread 시작.." << std::endl;
return true;
}
// WSARecv와 WSASend의 Overlapped I/O 작업 처리를 위한 쓰레드함수
unsigned int WINAPI CallWorkerThread(LPVOID p)
{
IOCompletionPort* pEvent = (IOCompletionPort*)p;
pEvent->WokerThread();
return 0;
}
쓰레드 생성 후 this인자를 쓰레드 함수에 넘겨준 후 WorkerThread함수를 호출한다.
void IOCompletionPort::WokerThread()
{
//CompletionKey(완료 키)를 받을 포인터 변수
ClientInfo* cpKey = nullptr;
// 함수 호출 성공여부
bool bSuccess = true;
//Overlapped I/O작업에서 송수신된 데이터 크기
DWORD transferred = 0;
// Overlapped I/O작업에서 넘겨준 Overlapped구조체
LPOVERLAPPED pOverlapped = nullptr;
OverlappedEx* originOverlapped = nullptr;
while (m_bWorkerRun)
{
// GQCS함수로 인해 쓰레드들은 WatingThread 에 들어가 대기상태가 된다. (쓰레드 풀)
// 완료된 Overlapped I/O작업이 생기면 IOCP queue(완료큐)에 들어가고 쓰레드들 중 하나가 깨어나
// 그 작업을 가져와 뒤처리를 맡긴다.
bSuccess = ::GetQueuedCompletionStatus(m_hCompletionPort, &transferred, (ULONG_PTR*)&cpKey,
&pOverlapped, INFINITE);
// 사용자가 접속을 끊었을 때.
if (transferred == 0 && bSuccess == FALSE)
{
::closesocket(cpKey->m_socketClient);
continue;
}
// 사용자 쓰레드 종료 메시지 처리
if (bSuccess == TRUE && transferred == 0 && pOverlapped == nullptr)
{
m_bWorkerRun = false;
continue;
}
if (pOverlapped == nullptr)
continue;
//캐스팅
originOverlapped = (OverlappedEx*)pOverlapped;
bool bRet = IOProcess(cpKey,originOverlapped,transferred);
if (bRet == false)
{
m_bWorkerRun = false;
continue;
}
}
}
Overlapped I/O를 요청하고 완료된 일감들이 있기전까지는 쓰레드들은 WaitingThread List(LIFO)에서 대기.
IOCP Queue에 I/O가 완료된 일감이 있다면 Completion Key, Overlapped 구조체 정보를 얻어온다.
그리고 완료된 I/O의 정보에 따라 뒤처리를 진행.
bool IOCompletionPort::IOProcess(ClientInfo* pClient,OverlappedEx* pOverlapped,DWORD& transferred)
{
// RECV처리.(에코니 다시 전송)
switch (pOverlapped->m_ioType)
{
case IO_TYPE::IO_RECV:
pOverlapped->m_dataBuffer[transferred] = '\0';
std::cout << "[수신] bytes " << transferred << " msg : " << pOverlapped->m_dataBuffer << std::endl;
SendMsg(pClient, pOverlapped->m_dataBuffer, transferred);
break;
case IO_TYPE::IO_SEND:
pOverlapped->m_dataBuffer[transferred] = '\0';
std::cout << "[송신] bytes : " << transferred << " msg : " << pOverlapped->m_dataBuffer << std::endl;
BindRecv(pClient); // 다시 Recv걸어준다.
break;
default: // 예외
return false;
}
return true;
}
이게 워커 쓰레드의 루틴.
이제 연결처리를 하는 쓰레드의 루틴을 본다.
bool IOCompletionPort::CreateAccepterThread()
{
UINT threadId = 0;
m_hAccepterThread = (HANDLE)::_beginthreadex(NULL, 0, CallAccepterThread, this, CREATE_SUSPENDED, &threadId);
if (m_hAccepterThread == INVALID_HANDLE_VALUE)
{
std::cout << "쓰레드 생성 실패" << std::endl;
return false;
}
::ResumeThread(m_hAccepterThread);
return true;
}
// 연결처리 쓰레드 함수.
unsigned int WINAPI CallAccepterThread(LPVOID p)
{
IOCompletionPort* pEvent = (IOCompletionPort*)p;
pEvent->AccepterThread();
return 0;
}
(참고로 전역함수인데 private 멤버함수가 호출가능한것은 해당 전역함수들을 friend화 하였음)
// 클라이언트 연결 처리하는 함수(AccepterThread가 처리)
void IOCompletionPort::AccepterThread()
{
SOCKADDR_IN clientAddr;
int addrLen = sizeof(SOCKADDR_IN);
while (m_bAccepterRun)
{
// 접속 받을 구조체의 인덱스를 얻어온다.
ClientInfo* pClient = GetEmptyClientInfo();
if (pClient == nullptr)
return;
// 클라이언트 접속 요청이 올때까지 대기
pClient->m_socketClient = ::accept(m_hSocketListen, (SOCKADDR*)&clientAddr, &addrLen);
if (pClient->m_socketClient == INVALID_SOCKET)
{
if (::WSAGetLastError() == WSAEWOULDBLOCK)
continue;
break;
}
// 클라이언트와 연결된 소켓은 CP객체에 등록한다.
bool ret = BindIOCompletionPort(pClient);
if (!ret)
return;
++m_clientCount;
}
}
연결처리를 하는 쓰레드는 클라이언트의 연결 요청이 들어온 후 CP객체에 등록하는 일까지만 담당한다.
먼저 처리하는 루틴을 하나씩 보면
1. IOCP클래스의 멤버 m_pClientInfo 동적 배열중 가장 앞의 빈 인덱스를 가져온다(GetEmptyClientInfo 함수)
ClientInfo* IOCompletionPort::GetEmptyClientInfo()
{
for (int i = 0; i < MAX_CLIENT; ++i)
{
if (m_pClientInfo[i].m_socketClient == INVALID_SOCKET)
return &m_pClientInfo[i];
}
return nullptr;
}
2. accpet 처리. 논블록킹 소켓이기에 바로 빠져나옴. 이게 연결을 처리한건지 아니면 아직 완료가 안된건지를 체크.
pClient->m_socketClient = ::accept(m_hSocketListen, (SOCKADDR*)&clientAddr, &addrLen);
if (pClient->m_socketClient == INVALID_SOCKET)
{
if (::WSAGetLastError() == WSAEWOULDBLOCK)
continue;
break;
}
3. 연결이 되었다면 클라이언트와 연결된 소켓 CP객체에 등록
// 클라이언트와 연결된 소켓은 CP객체에 등록한다.
bool ret = BindIOCompletionPort(pClient);
if (!ret)
return;
++m_clientCount;
bool IOCompletionPort::BindIOCompletionPort(ClientInfo* pClientInfo)
{
// 소켓과 완료키 CP객체에 등록.
// 여기서 완료키는 GQCS함수에서 다시 받아볼 수 있음.
// 완료키는 ClientInfo를 그대로 건네준다.
// 이 함수의 반환값은 우리가 생성한 CP객체 핸들을 반환.
HANDLE compareCP;
compareCP = ::CreateIoCompletionPort((HANDLE)pClientInfo->m_socketClient,
m_hCompletionPort, reinterpret_cast<ULONG_PTR>(pClientInfo),
0);
if (compareCP == NULL || compareCP != m_hCompletionPort)
return false;
bool check = BindRecv(pClientInfo); // 꼭 한번 Recv요청을 걸어놔야한다.
while(!check)
check =BindRecv(pClientInfo);
return true;
}
참고로 BindRecv는 WSARecv를 요청하는 함수. 그전에 Overlapped 구조체를 한번 세팅해준다.
-> CP객체에 소켓을 등록 후 무조건 WSARecv함수(Overlapped I/O)를 한번 요청 해놓아야 한다.
--> 언제 클라이언트에게 패킷이 올지 모르며, IOCP는 I/O를 요청해야 완료큐에 완료된 일감이 들어오므로.
'게임프로그래밍 > 서버 책' 카테고리의 다른 글
[온라인 서버] 네트워크 라이브러리 - Monitor Class (0) | 2022.01.04 |
---|---|
[온라인 서버] 네트워크 라이브러리(정적,동적) 만들기 (0) | 2022.01.03 |
[온라인 서버] I/O Completion Port(IOCP) 모델 이론 (0) | 2021.12.29 |
[온라인 서버] Overlapped IO(이벤트 기반) (0) | 2021.12.29 |
IOCP(Input Ouput Completion Port) (0) | 2021.12.17 |