mirror of
https://github.com/caozhiyi/CppNet.git
synced 2026-01-19 00:05:14 +08:00
415 lines
14 KiB
C++
415 lines
14 KiB
C++
#ifndef __linux__
|
|
#include "Log.h"
|
|
#include "IOCP.h"
|
|
#include "Timer.h"
|
|
#include "Buffer.h"
|
|
#include "OSInfo.h"
|
|
#include "CppNetImpl.h"
|
|
#include "EventHandler.h"
|
|
#include "WinExpendFunc.h"
|
|
|
|
using namespace cppnet;
|
|
|
|
enum STATE_CODE {
|
|
EXIT_IOCP = 0xFFFFFFFF,
|
|
WEAK_UP_IOCP = 0xAAAAFFFF,
|
|
};
|
|
|
|
CIOCP::CIOCP() : _is_inited(false), _run(true) {
|
|
|
|
}
|
|
|
|
CIOCP::~CIOCP() {
|
|
|
|
}
|
|
|
|
bool CIOCP::Init(uint32_t thread_num) {
|
|
//tell iocp thread num
|
|
_iocp_handler = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, thread_num);
|
|
if (_iocp_handler == INVALID_HANDLE_VALUE) {
|
|
base::LOG_FATAL("IOCP create io completion port failed!");
|
|
return false;
|
|
}
|
|
_is_inited = true;
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::Dealloc() {
|
|
_run = false;
|
|
PostQueuedCompletionStatus(_iocp_handler, 0, EXIT_IOCP, nullptr);
|
|
return true;
|
|
}
|
|
|
|
uint64_t CIOCP::AddTimerEvent(uint32_t interval, const std::function<void(void*)>& call_back, void* param, bool always) {
|
|
return _timer.AddTimer(interval, call_back, param, always);
|
|
}
|
|
|
|
bool CIOCP::RemoveTimerEvent(uint64_t timer_id) {
|
|
return _timer.DelTimer(timer_id);
|
|
}
|
|
|
|
bool CIOCP::AddTimerEvent(uint32_t interval, base::CMemSharePtr<CEventHandler>& event) {
|
|
_timer.AddTimer(interval, event);
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::AddSendEvent(base::CMemSharePtr<CEventHandler>& event) {
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (socket_ptr && _AddToActions(socket_ptr)) {
|
|
((EventOverlapped*)event->_data)->_event = &event;
|
|
return _PostSend(event);
|
|
}
|
|
base::LOG_WARN("socket is already destroyed!");
|
|
return false;
|
|
}
|
|
|
|
bool CIOCP::AddRecvEvent(base::CMemSharePtr<CEventHandler>& event) {
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (socket_ptr && _AddToActions(socket_ptr)) {
|
|
((EventOverlapped*)event->_data)->_event = &event;
|
|
return _PostRecv(event);
|
|
}
|
|
base::LOG_WARN("socket is already destroyed!");
|
|
return false;
|
|
}
|
|
|
|
bool CIOCP::AddAcceptEvent(base::CMemSharePtr<CAcceptEventHandler>& event) {
|
|
if (!event->_accept_socket->IsInActions()) {
|
|
if (CreateIoCompletionPort((HANDLE)(event->_accept_socket->GetSocket()), _iocp_handler, 0, 0) == NULL) {
|
|
base::LOG_ERROR("IOCP bind socket to io completion port failed!");
|
|
return false;
|
|
}
|
|
}
|
|
((EventOverlapped*)event->_data)->_event = &event;
|
|
event->_accept_socket->SetInActions(true);
|
|
return _PostAccept(event);
|
|
}
|
|
|
|
bool CIOCP::AddConnection(base::CMemSharePtr<CEventHandler>& event, const std::string& ip, short port, const char* buf, uint32_t buf_len) {
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (socket_ptr && _AddToActions(socket_ptr)) {
|
|
((EventOverlapped*)event->_data)->_event = &event;
|
|
return _PostConnection(event, ip, port, buf, buf_len);
|
|
}
|
|
base::LOG_WARN("socket is already destroyed!");
|
|
return false;
|
|
}
|
|
|
|
bool CIOCP::AddDisconnection(base::CMemSharePtr<CEventHandler>& event) {
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (socket_ptr && _AddToActions(socket_ptr)) {
|
|
((EventOverlapped*)event->_data)->_event = &event;
|
|
return _PostDisconnection(event);
|
|
}
|
|
base::LOG_WARN("socket is already destroyed!");
|
|
return false;
|
|
}
|
|
|
|
bool CIOCP::DelEvent(base::CMemSharePtr<CEventHandler>& event) {
|
|
((EventOverlapped*)event->_data)->_event = nullptr;
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (socket_ptr) {
|
|
CancelIoEx((HANDLE)socket_ptr->GetSocket(), &((EventOverlapped*)event->_data)->_overlapped);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
void CIOCP::ProcessEvent() {
|
|
DWORD bytes_transfered = 0;
|
|
EventOverlapped *socket_context = nullptr;
|
|
OVERLAPPED *over_lapped = nullptr;
|
|
unsigned int wait_time = 0;
|
|
std::vector<base::CMemSharePtr<CTimerEvent>> timer_vec;
|
|
while (_run) {
|
|
wait_time = _timer.TimeoutCheck(timer_vec);
|
|
//if there is no timer event. wait until recv something
|
|
if (wait_time == 0 && timer_vec.empty()) {
|
|
wait_time = INFINITE;
|
|
|
|
} else {
|
|
wait_time = wait_time > 0 ? wait_time : 1;
|
|
}
|
|
|
|
int res = GetQueuedCompletionStatus(_iocp_handler, &bytes_transfered, PULONG_PTR(&socket_context),
|
|
&over_lapped, wait_time);
|
|
|
|
DWORD dw_err = 0;
|
|
if (res) {
|
|
dw_err = GetLastError();
|
|
// exit
|
|
if ((PULONG_PTR)socket_context == (PULONG_PTR)EXIT_IOCP){
|
|
break;
|
|
}
|
|
|
|
} else {
|
|
dw_err = GetLastError();
|
|
}
|
|
|
|
// timer out event
|
|
if (dw_err == WAIT_TIMEOUT) {
|
|
if (!timer_vec.empty()) {
|
|
_DoTimeoutEvent(timer_vec);
|
|
}
|
|
_DoTaskList();
|
|
|
|
// read some thing
|
|
} else if (ERROR_NETNAME_DELETED == dw_err || NO_ERROR == dw_err || ERROR_IO_PENDING == dw_err) {
|
|
if (over_lapped) {
|
|
socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped);
|
|
base::LOG_DEBUG("Get a new event : %d", socket_context->_event_flag_set);
|
|
_DoEvent(socket_context, bytes_transfered);
|
|
}
|
|
if (!timer_vec.empty()) {
|
|
_DoTimeoutEvent(timer_vec);
|
|
}
|
|
_DoTaskList();
|
|
|
|
} else if (ERROR_CONNECTION_REFUSED == dw_err || ERROR_SEM_TIMEOUT == dw_err || WSAENOTCONN == dw_err || ERROR_OPERATION_ABORTED == dw_err) {
|
|
if (over_lapped) {
|
|
socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped);
|
|
base::LOG_DEBUG("Get a new event : %d", socket_context->_event_flag_set);
|
|
socket_context->_event_flag_set |= ERR_CONNECT_CLOSE;
|
|
_DoEvent(socket_context, bytes_transfered);
|
|
}
|
|
if (!timer_vec.empty()) {
|
|
_DoTimeoutEvent(timer_vec);
|
|
}
|
|
_DoTaskList();
|
|
|
|
} else {
|
|
base::LOG_ERROR("IOCP GetQueuedCompletionStatus return error : %d", dw_err);
|
|
continue;
|
|
}
|
|
}
|
|
|
|
if (_is_inited) {
|
|
// only one iocp handle
|
|
static bool once = true;
|
|
if (once) {
|
|
once = false;
|
|
if (CloseHandle(_iocp_handler) == -1) {
|
|
base::LOG_ERROR("IOCP close io completion port failed!");
|
|
}
|
|
}
|
|
|
|
}
|
|
_is_inited = false;
|
|
}
|
|
|
|
void CIOCP::PostTask(std::function<void(void)>& task) {
|
|
{
|
|
std::unique_lock<std::mutex> lock(_mutex);
|
|
_task_list.push_back(task);
|
|
}
|
|
WakeUp();
|
|
}
|
|
|
|
void CIOCP::WakeUp() {
|
|
PostQueuedCompletionStatus(_iocp_handler, 0, WEAK_UP_IOCP, nullptr);
|
|
}
|
|
|
|
bool CIOCP::_PostRecv(base::CMemSharePtr<CEventHandler>& event) {
|
|
EventOverlapped* context = (EventOverlapped*)event->_data;
|
|
|
|
DWORD dwFlags = 0;
|
|
DWORD dwBytes = 0;
|
|
context->Clear();
|
|
context->_event_flag_set = event->_event_flag_set;
|
|
OVERLAPPED *lapped = &context->_overlapped;
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
|
|
int res = WSARecv(socket_ptr->GetSocket(), &context->_wsa_buf, 1, &dwFlags, &dwBytes, lapped, nullptr);
|
|
|
|
if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) {
|
|
base::LOG_WARN("IOCP post recv event failed! error code: %d", WSAGetLastError());
|
|
return false;
|
|
}
|
|
base::LOG_DEBUG("post a new event : %d", context->_event_flag_set);
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::_PostAccept(base::CMemSharePtr<CAcceptEventHandler>& event) {
|
|
if (!__AcceptEx) {
|
|
base::LOG_ERROR("__AcceptEx function is null!");
|
|
return false;
|
|
}
|
|
|
|
EventOverlapped* context = (EventOverlapped*)event->_data;
|
|
context->Clear();
|
|
DWORD dwBytes = 0;
|
|
context->_event_flag_set |= event->_event_flag_set;
|
|
OVERLAPPED *lapped = &context->_overlapped;
|
|
|
|
int res = __AcceptEx((SOCKET)event->_accept_socket->GetSocket(), (SOCKET)event->_client_socket->GetSocket(), &context->_lapped_buffer, context->_wsa_buf.len - ((sizeof(SOCKADDR_IN) + 16) * 2),
|
|
sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, lapped);
|
|
|
|
if (FALSE == res) {
|
|
if (WSA_IO_PENDING != WSAGetLastError()) {
|
|
base::LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError());
|
|
return false;
|
|
}
|
|
}
|
|
base::LOG_DEBUG("post a new event : %d", context->_event_flag_set);
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::_PostSend(base::CMemSharePtr<CEventHandler>& event) {
|
|
EventOverlapped* context = (EventOverlapped*)event->_data;
|
|
|
|
context->Clear();
|
|
context->_event_flag_set = event->_event_flag_set;
|
|
context->_wsa_buf.len = event->_buffer->Read(context->_lapped_buffer, __iocp_buff_size);
|
|
|
|
OVERLAPPED *lapped = &context->_overlapped;
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
|
|
int res = WSASend(socket_ptr->GetSocket(), &context->_wsa_buf, 1, nullptr, 0, lapped, nullptr);
|
|
|
|
if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) {
|
|
base::LOG_WARN("IOCP post send event failed! error code: %d", WSAGetLastError());
|
|
return false;
|
|
}
|
|
base::LOG_DEBUG("post a new event : %d", context->_event_flag_set);
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::_PostConnection(base::CMemSharePtr<CEventHandler>& event, const std::string& ip, short port, const char* buf, uint32_t buf_len) {
|
|
EventOverlapped* context = (EventOverlapped*)event->_data;
|
|
|
|
DWORD dwFlags = 0;
|
|
DWORD dwBytes = 0;
|
|
context->Clear();
|
|
context->_event_flag_set = event->_event_flag_set;
|
|
OVERLAPPED *lapped = &context->_overlapped;
|
|
|
|
SOCKADDR_IN addr;
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(port);
|
|
addr.sin_addr.S_un.S_addr = inet_addr(ip.c_str());
|
|
|
|
SOCKADDR_IN local;
|
|
local.sin_family = AF_INET;
|
|
local.sin_port = htons(0);
|
|
local.sin_addr.S_un.S_addr = INADDR_ANY;
|
|
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
if (SOCKET_ERROR == bind(socket_ptr->GetSocket(), (sockaddr*)&local, sizeof(local))) {
|
|
base::LOG_FATAL("bind local host failed! error code: %d", WSAGetLastError());
|
|
}
|
|
|
|
int res = __ConnectEx(socket_ptr->GetSocket(), (sockaddr*)&addr, sizeof(addr), (PVOID)buf, buf_len, nullptr, lapped);
|
|
|
|
if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) {
|
|
base::LOG_FATAL("IOCP post connect event failed! error code: %d", WSAGetLastError());
|
|
return false;
|
|
}
|
|
base::LOG_DEBUG("post a new event : %d", context->_event_flag_set);
|
|
return true;
|
|
}
|
|
|
|
bool CIOCP::_PostDisconnection(base::CMemSharePtr<CEventHandler>& event) {
|
|
EventOverlapped* context = (EventOverlapped*)event->_data;
|
|
|
|
context->Clear();
|
|
context->_event_flag_set = event->_event_flag_set;
|
|
|
|
OVERLAPPED *lapped = &context->_overlapped;
|
|
auto socket_ptr = event->_client_socket.Lock();
|
|
|
|
int res = __DisconnectionEx(socket_ptr->GetSocket(), lapped, TF_REUSE_SOCKET, 0);
|
|
|
|
if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) {
|
|
base::LOG_FATAL("IOCP post disconnect event failed! error code: %d", WSAGetLastError());
|
|
return false;
|
|
}
|
|
base::LOG_DEBUG("post a new event : %d", context->_event_flag_set);
|
|
return true;
|
|
}
|
|
|
|
void CIOCP::_DoTimeoutEvent(std::vector<base::CMemSharePtr<CTimerEvent>>& timer_vec) {
|
|
for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) {
|
|
if ((*iter)->_event_flag & EVENT_READ) {
|
|
base::CMemSharePtr<CEventHandler> event_ptr = (*iter)->_event.Lock();
|
|
base::CMemSharePtr<CSocketImpl> socket_ptr = event_ptr->_client_socket.Lock();
|
|
if (socket_ptr) {
|
|
event_ptr->_event_flag_set |= EVENT_TIMER;
|
|
socket_ptr->Recv(event_ptr);
|
|
}
|
|
|
|
} else if ((*iter)->_event_flag & EVENT_WRITE) {
|
|
base::CMemSharePtr<CEventHandler> event_ptr = (*iter)->_event.Lock();
|
|
base::CMemSharePtr<CSocketImpl> socket_ptr = event_ptr->_client_socket.Lock();
|
|
if (socket_ptr) {
|
|
event_ptr->_event_flag_set |= EVENT_TIMER;
|
|
socket_ptr->Send(event_ptr);
|
|
}
|
|
|
|
} else if ((*iter)->_event_flag & EVENT_TIMER) {
|
|
auto func = (*iter)->_timer_call_back;
|
|
if (func) {
|
|
func((*iter)->_timer_param);
|
|
}
|
|
}
|
|
}
|
|
timer_vec.clear();
|
|
}
|
|
|
|
void CIOCP::_DoEvent(EventOverlapped *socket_context, uint32_t bytes) {
|
|
if (socket_context->_event_flag_set < EVENT_READ || socket_context->_event_flag_set > EVENT_ERR_MAX) {
|
|
return;
|
|
}
|
|
if (socket_context->_event_flag_set & EVENT_ACCEPT) {
|
|
base::CMemSharePtr<CAcceptEventHandler>* event = (base::CMemSharePtr<CAcceptEventHandler>*)socket_context->_event;
|
|
if (event) {
|
|
(*event)->_client_socket->_read_event->_off_set = bytes;
|
|
(*event)->_accept_socket->_Accept((*event));
|
|
}
|
|
|
|
} else {
|
|
base::CMemSharePtr<CEventHandler>* event = (base::CMemSharePtr<CEventHandler>*)socket_context->_event;
|
|
if (event && !event->Expired()) {
|
|
(*event)->_event_flag_set = socket_context->_event_flag_set;
|
|
(*event)->_off_set = bytes;
|
|
if (socket_context->_event_flag_set & EVENT_READ
|
|
|| socket_context->_event_flag_set & EVENT_CONNECT
|
|
|| socket_context->_event_flag_set & EVENT_DISCONNECT) {
|
|
auto socket_ptr = (*event)->_client_socket.Lock();
|
|
if (socket_ptr) {
|
|
socket_ptr->Recv((*event));
|
|
}
|
|
|
|
} else if ((*event)->_event_flag_set & EVENT_WRITE) {
|
|
auto socket_ptr = (*event)->_client_socket.Lock();
|
|
if (socket_ptr) {
|
|
socket_ptr->Send((*event));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
void CIOCP::_DoTaskList() {
|
|
std::vector<std::function<void(void)>> func_vec;
|
|
{
|
|
std::unique_lock<std::mutex> lock(_mutex);
|
|
func_vec.swap(_task_list);
|
|
}
|
|
|
|
for (size_t i = 0; i < func_vec.size(); ++i) {
|
|
func_vec[i]();
|
|
}
|
|
}
|
|
|
|
bool CIOCP::_AddToActions(base::CMemSharePtr<CSocketImpl>& socket) {
|
|
if (!socket->IsInActions()) {
|
|
if (CreateIoCompletionPort((HANDLE)(socket->GetSocket()), _iocp_handler, 0, 0) == NULL) {
|
|
base::LOG_ERROR("IOCP bind socket to io completion port failed!");
|
|
return false;
|
|
}
|
|
socket->SetInActions(true);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
#endif
|