bdfgdfg

[온라인 서버] I/O Completion Port(IOCP) 모델 실습 본문

게임프로그래밍/서버 책

[온라인 서버] I/O Completion Port(IOCP) 모델 실습

marmelo12 2021. 12. 30. 19:01
반응형

 

이론은 앞서 두개의 글(하나는 비공개)을 통해 공부했으니 바로 코드

 

IOCompletionPort.h

#pragma once
#include "std.h"
#define MAX_SOCKBUF 1024 // 패킷(현재는 버퍼)크기
#define MAX_CLIENT 100 // 최대 접속가능한 클라이언트 수
#define MAX_WORKERTHREAD 4 // 쓰레드 풀(CP객체)에 넣을 쓰레드 수

enum class IO_TYPE
{
	IO_RECV,
	IO_SEND
};

// Overlapped 구조체를 확장하여 사용.
struct OverlappedEx
{
	WSAOVERLAPPED m_wsaOverlapped;
	WSABUF		  m_wsaBuf;
	char		  m_dataBuffer[MAX_SOCKBUF];
	IO_TYPE		  m_ioType;
};

// 클라이언트(Session) 정보를 담기위한 구조체
struct ClientInfo
{
	ClientInfo() : m_socketClient(INVALID_SOCKET)
	{
		::ZeroMemory(&m_recvOverlapped, sizeof(OverlappedEx));
		::ZeroMemory(&m_sendOverlapped, sizeof(OverlappedEx));
	}

	SOCKET			m_socketClient; 
	OverlappedEx	m_recvOverlapped; // Recv Overlapped(비동기) I/O 작업을 위한 변수
	OverlappedEx	m_sendOverlapped; // Send Overlapped(비동기) I/O 작업을 위한 변수
};

class IOCompletionPort
{
	friend unsigned int WINAPI CallWorkerThread(LPVOID p);
	friend unsigned int WINAPI CallAccepterThread(LPVOID p);
public:
	IOCompletionPort();
	virtual ~IOCompletionPort();

public:
	//소켓을 초기화하는 함수
	bool InitSocket();
	//소켓의 연결을 종료 시킨다.
	void CloseSocket(ClientInfo* pClientInfo, bool bIsForce = false);

	//서버의 주소정보를 소켓과 연결시키고 접속 요청을 받기 위해 그 소켓을 등록하는 함수
	bool BindandListen(int nBindPort);
	//접속 요청을 수락하고 메세지를 받아서 처리하는 함수
	bool StartServer();

	//생성되어있는 쓰레드를 파괴한다.
	void DestroyThread();
private:
	//Overlapped I/O작업을 위한 쓰레드를 생성
	bool CreateWokerThread();
	//accept요청을 처리하는 쓰레드 생성
	bool CreateAccepterThread();

	//사용되지 않은 ClientInfo를 반환
	ClientInfo* GetEmptyClientInfo();

	// CP객체와 Completion Key(완료키)를 등록하는 함수.
	bool BindIOCompletionPort(ClientInfo* pClientInfo);

	//WSARecv Overlapped I/O 작업을 시킨다.
	bool BindRecv(ClientInfo* pClientInfo);

	//WSASend Overlapped I/O작업을 시킨다.
	bool SendMsg(ClientInfo* pClientInfo, char* pMsg, int nLen);

	bool IOProcess(ClientInfo* pClient,OverlappedEx* pOverlapped, DWORD& transferred);

	//Overlapped I/O작업에 대한 완료 통보를 받아 
	//그에 해당하는 처리를 하는 함수
	void WokerThread();
	//사용자의 접속을 받는 쓰레드
	void AccepterThread();



private:
	//클라이언트 정보 구조체
	ClientInfo* m_pClientInfo;
	//리슨소켓
	SOCKET		m_hSocketListen;
	// 연결된 클라이언트 수
	int			m_clientCount;
	// 워커쓰레드 핸들 배열
	HANDLE		m_hWorkerThread[MAX_WORKERTHREAD];
	// 접속 쓰레드 핸들
	HANDLE		m_hAccepterThread;
	// CP객체
	HANDLE		m_hCompletionPort;
	// 작업 쓰레드 BOOL
	bool		m_bWorkerRun;
	// 접속 쓰레드 BOOL
	bool		m_bAccepterRun;
	// 소켓 버퍼
	char		m_socketBuff[MAX_SOCKBUF];

};

Overlapped I/O 이벤트 기반과의 차이점은 소켓과 이벤트의 1대1구조가 아닌 ClientInfo를 동적 배열로 생성 후

CP객체에 소켓을 등록할 때, ClientInfo구조체를 완료키로 넘겨준다. (즉 자기자신의 소켓 및 Overlapped구조체들)

그리고 GQCS함수를 호출하여 완료큐에 일감을 뽑아온 쓰레드가 Recv,Send등의 작업을 처리할 때 완료키로 넘긴

Overlapped구조체 멤버를 다시 넘기는 형태.

 

#define MAX_SOCKBUF 1024 // 패킷(현재는 버퍼)크기
#define MAX_CLIENT 100 // 최대 접속가능한 클라이언트 수
#define MAX_WORKERTHREAD 4 // 쓰레드 풀(CP객체)에 넣을 쓰레드 수

enum class IO_TYPE
{
	IO_RECV,
	IO_SEND
};

// Overlapped 구조체를 확장하여 사용.
struct OverlappedEx
{
	WSAOVERLAPPED m_wsaOverlapped;
	WSABUF		  m_wsaBuf;
	char		  m_dataBuffer[MAX_SOCKBUF];
	IO_TYPE		  m_ioType;
};

// 클라이언트(Session) 정보를 담기위한 구조체
struct ClientInfo
{
	ClientInfo() : m_socketClient(INVALID_SOCKET)
	{
		::ZeroMemory(&m_recvOverlapped, sizeof(OverlappedEx));
		::ZeroMemory(&m_sendOverlapped, sizeof(OverlappedEx));
	}

	SOCKET			m_socketClient; 
	OverlappedEx	m_recvOverlapped; // Recv Overlapped(비동기) I/O 작업을 위한 변수
	OverlappedEx	m_sendOverlapped; // Send Overlapped(비동기) I/O 작업을 위한 변수
};

먼저 하나하나씩 살펴보면

1. IO_TYPE을 정의함. 

 - 어떤 입출력 완료인지 체크하기 위함.

 

2. 확장된 OverlappedEx구조체 사용

 - 항상 WSAOVERLAPPED 구조체 멤버는 첫번째 offset에 위치.

 - GQCS함수를 통해 꺼내와서 타입,버퍼내용등을 읽어옴(에코)

 

3. ClientInfo 구조체 사용(Session 정보)

 - CP객체에 등록할 완료 키(Completion Key)로 사용.

 - 내부적으로 recv용 Overlapped 변수와 send용 Overlapped 변수를 나눔.

 

 

함수는 순서대로 보지만 Initocket,CloseSocket,BindAndListen,DestroyThread는 넘어간다.

 

먼저 생성자 소멸자

IOCompletionPort::IOCompletionPort() :m_hSocketListen(INVALID_SOCKET),
	m_clientCount(0),m_hAccepterThread(INVALID_HANDLE_VALUE),m_hCompletionPort(INVALID_HANDLE_VALUE),
	m_bWorkerRun(true),m_bAccepterRun(true)
{
	// 클라이언트 관리를 위한 ClientInfo 배열 동적(★) 생성.
	m_pClientInfo = new ClientInfo[MAX_CLIENT]();

	for (int i = 0; i < MAX_WORKERTHREAD; ++i)
		m_hWorkerThread[i] = INVALID_HANDLE_VALUE;
	::ZeroMemory(m_socketBuff, sizeof(MAX_SOCKBUF));
}

IOCP 클래스의 객체가 생성되면 내부 멤버를 초기화 한다.

여기서 중요한건 클라이언트(Session)들을 관리하기 위한 동적 배열을 할당.

IOCompletionPort::~IOCompletionPort()
{
	::WSACleanup();
	if (m_pClientInfo)
	{
		delete[] m_pClientInfo;
		m_pClientInfo = nullptr;
	}
}

할당했으면 삭제.

 

그다음 StartServer

bool IOCompletionPort::StartServer()
{
	// ★ CP객체 생성
	m_hCompletionPort = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
	if (m_hCompletionPort == INVALID_HANDLE_VALUE)
	{
		std::cout << "CP객체 생성 실패" << std::endl;
		return false;
	}

	bool bRet = CreateWokerThread();
	if (bRet == false)
		return false;
	bRet = CreateAccepterThread();
	if (bRet == false)
		return false;



	std::cout << "서버 시작..." << std::endl;

	return true;
}

먼저 리슨 소켓까지 생성하였다면 이제 CP객체를 생성한 후

각각 워커 쓰레드(4개의 쓰레드), 클라이언트 연결 처리(1개의 쓰레드)를 하는 쓰레드들을 생성.

 

먼저 워커 쓰레드 (GQCS함수를 호출하는 쓰레드들. 즉 IOCP Queue의 완료일감이 있다면 꺼내와 실행상태가 된다.)

bool IOCompletionPort::CreateWokerThread()
{
	UINT threadId = 0;

	//WaitingThread Queue에 대기 상태로 넣을 쓰레드들 생성(쓰레드 풀)
	// 권장되는 개수 (cpu 개수 * 2 ) + 1
	for (int i = 0; i < MAX_WORKERTHREAD; ++i)
	{
		m_hWorkerThread[i] = (HANDLE)::_beginthreadex(NULL, 0, CallWorkerThread, this, CREATE_SUSPENDED, &threadId);
		if (m_hWorkerThread[i] == INVALID_HANDLE_VALUE)
		{
			std::cout << "쓰레드 생성 실패" << std::endl;
			return false;
		}
		::ResumeThread(m_hWorkerThread[i]);
	}

	std::cout << "WorkerThread 시작.." << std::endl;

	return true;
}
// WSARecv와 WSASend의 Overlapped I/O 작업 처리를 위한 쓰레드함수
unsigned int WINAPI CallWorkerThread(LPVOID p)
{
	IOCompletionPort* pEvent = (IOCompletionPort*)p;
	pEvent->WokerThread();
	return 0;
}

쓰레드 생성 후 this인자를 쓰레드 함수에 넘겨준 후 WorkerThread함수를 호출한다.

void IOCompletionPort::WokerThread()
{
	//CompletionKey(완료 키)를 받을 포인터 변수
	ClientInfo* cpKey = nullptr;
	// 함수 호출 성공여부
	bool bSuccess = true;
	//Overlapped I/O작업에서 송수신된 데이터 크기
	DWORD transferred = 0;
	// Overlapped I/O작업에서 넘겨준 Overlapped구조체
	LPOVERLAPPED pOverlapped = nullptr;
	OverlappedEx* originOverlapped = nullptr;
	while (m_bWorkerRun)
	{
		// GQCS함수로 인해 쓰레드들은 WatingThread 에 들어가 대기상태가 된다. (쓰레드 풀)
		// 완료된 Overlapped I/O작업이 생기면 IOCP queue(완료큐)에 들어가고 쓰레드들 중 하나가 깨어나
		// 그 작업을 가져와 뒤처리를 맡긴다.
		
		bSuccess = ::GetQueuedCompletionStatus(m_hCompletionPort, &transferred, (ULONG_PTR*)&cpKey,
			&pOverlapped, INFINITE);

		// 사용자가 접속을 끊었을 때.
		if (transferred == 0 && bSuccess == FALSE)
		{
			::closesocket(cpKey->m_socketClient);
			continue;
		}

		// 사용자 쓰레드 종료 메시지 처리
		if (bSuccess == TRUE && transferred == 0 && pOverlapped == nullptr)
		{
			m_bWorkerRun = false;
			continue;
		}
		if (pOverlapped == nullptr)
			continue;

		//캐스팅
		originOverlapped = (OverlappedEx*)pOverlapped;


		bool bRet = IOProcess(cpKey,originOverlapped,transferred);
		if (bRet == false)
		{
			m_bWorkerRun = false;
			continue;
		}
	}

}

Overlapped I/O를 요청하고 완료된 일감들이 있기전까지는 쓰레드들은 WaitingThread List(LIFO)에서 대기.

IOCP Queue에 I/O가 완료된 일감이 있다면 Completion Key, Overlapped 구조체 정보를 얻어온다.

 

그리고 완료된 I/O의 정보에 따라 뒤처리를 진행.

bool IOCompletionPort::IOProcess(ClientInfo* pClient,OverlappedEx* pOverlapped,DWORD& transferred)
{
	// RECV처리.(에코니 다시 전송)
	switch (pOverlapped->m_ioType)
	{
	case IO_TYPE::IO_RECV:
		pOverlapped->m_dataBuffer[transferred] = '\0';
		std::cout << "[수신] bytes " << transferred << " msg : " << pOverlapped->m_dataBuffer << std::endl;
		SendMsg(pClient, pOverlapped->m_dataBuffer, transferred);
		break;

	case IO_TYPE::IO_SEND:
		pOverlapped->m_dataBuffer[transferred] = '\0';
		std::cout << "[송신] bytes : " << transferred << " msg : " << pOverlapped->m_dataBuffer << std::endl;
		BindRecv(pClient); // 다시 Recv걸어준다. 
		break;
	default: // 예외
		return false;
	}
	return true;
}

이게 워커 쓰레드의 루틴.

 

이제 연결처리를 하는 쓰레드의 루틴을 본다.

bool IOCompletionPort::CreateAccepterThread()
{
	UINT threadId = 0;

	m_hAccepterThread = (HANDLE)::_beginthreadex(NULL, 0, CallAccepterThread, this, CREATE_SUSPENDED, &threadId);

	if (m_hAccepterThread == INVALID_HANDLE_VALUE)
	{
		std::cout << "쓰레드 생성 실패" << std::endl;
		return false;
	}

	::ResumeThread(m_hAccepterThread);
	return true;
}
// 연결처리 쓰레드 함수.
unsigned int WINAPI CallAccepterThread(LPVOID p)
{
	IOCompletionPort* pEvent = (IOCompletionPort*)p;
	pEvent->AccepterThread();
	return 0;
}

(참고로 전역함수인데 private 멤버함수가 호출가능한것은 해당 전역함수들을 friend화 하였음)

// 클라이언트 연결 처리하는 함수(AccepterThread가 처리)
void IOCompletionPort::AccepterThread()
{
	SOCKADDR_IN clientAddr;
	int addrLen = sizeof(SOCKADDR_IN);
	while (m_bAccepterRun)
	{
		// 접속 받을 구조체의 인덱스를 얻어온다.
		ClientInfo* pClient = GetEmptyClientInfo();
		if (pClient == nullptr)
			return;
		// 클라이언트 접속 요청이 올때까지 대기
		pClient->m_socketClient = ::accept(m_hSocketListen, (SOCKADDR*)&clientAddr, &addrLen);
		if (pClient->m_socketClient == INVALID_SOCKET)
		{
			if (::WSAGetLastError() == WSAEWOULDBLOCK)
				continue;

			break;
		}

		// 클라이언트와 연결된 소켓은 CP객체에 등록한다.
		bool ret = BindIOCompletionPort(pClient);
		if (!ret)
			return;

		++m_clientCount;
	}
}

연결처리를 하는 쓰레드는 클라이언트의 연결 요청이 들어온 후 CP객체에 등록하는 일까지만 담당한다.

먼저 처리하는 루틴을 하나씩 보면

 

1. IOCP클래스의 멤버 m_pClientInfo 동적 배열중 가장 앞의 빈 인덱스를 가져온다(GetEmptyClientInfo 함수)

ClientInfo* IOCompletionPort::GetEmptyClientInfo()
{
	for (int i = 0; i < MAX_CLIENT; ++i)
	{
		if (m_pClientInfo[i].m_socketClient == INVALID_SOCKET)
			return &m_pClientInfo[i];
	}
	return nullptr;
}

2. accpet 처리. 논블록킹 소켓이기에 바로 빠져나옴. 이게 연결을 처리한건지 아니면 아직 완료가 안된건지를 체크.

pClient->m_socketClient = ::accept(m_hSocketListen, (SOCKADDR*)&clientAddr, &addrLen);
if (pClient->m_socketClient == INVALID_SOCKET)
{
	if (::WSAGetLastError() == WSAEWOULDBLOCK)
		continue;

	break;
}

3. 연결이 되었다면 클라이언트와 연결된 소켓 CP객체에 등록

// 클라이언트와 연결된 소켓은 CP객체에 등록한다.
bool ret = BindIOCompletionPort(pClient);
if (!ret)
	return;

++m_clientCount;
bool IOCompletionPort::BindIOCompletionPort(ClientInfo* pClientInfo)
{
	// 소켓과 완료키 CP객체에 등록.
	// 여기서 완료키는 GQCS함수에서 다시 받아볼 수 있음.
	// 완료키는 ClientInfo를 그대로 건네준다.
	// 이 함수의 반환값은 우리가 생성한 CP객체 핸들을 반환.
	HANDLE compareCP;
	compareCP = ::CreateIoCompletionPort((HANDLE)pClientInfo->m_socketClient,
		m_hCompletionPort, reinterpret_cast<ULONG_PTR>(pClientInfo),
		0);
	if (compareCP == NULL || compareCP != m_hCompletionPort)
		return false;

	bool check = BindRecv(pClientInfo); // 꼭 한번 Recv요청을 걸어놔야한다.
	while(!check)
		check =BindRecv(pClientInfo);

	return true;
}

참고로 BindRecv는 WSARecv를 요청하는 함수. 그전에 Overlapped 구조체를 한번 세팅해준다.

 -> CP객체에 소켓을 등록 후 무조건 WSARecv함수(Overlapped I/O)를 한번 요청 해놓아야 한다.

   --> 언제 클라이언트에게 패킷이 올지 모르며, IOCP는 I/O를 요청해야 완료큐에 완료된 일감이 들어오므로.

 

반응형
Comments