#ifndef __linux__ #include "Log.h" #include "IOCP.h" #include "Timer.h" #include "Buffer.h" #include "OSInfo.h" #include "CppNetImpl.h" #include "EventHandler.h" #include "WinExpendFunc.h" using namespace cppnet; enum STATE_CODE { EXIT_IOCP = 0xFFFFFFFF, WEAK_UP_IOCP = 0xAAAAFFFF, }; CIOCP::CIOCP() : _is_inited(false), _run(true) { } CIOCP::~CIOCP() { } bool CIOCP::Init(uint32_t thread_num) { //tell iocp thread num _iocp_handler = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, thread_num); if (_iocp_handler == INVALID_HANDLE_VALUE) { base::LOG_FATAL("IOCP create io completion port failed!"); return false; } _is_inited = true; return true; } bool CIOCP::Dealloc() { _run = false; PostQueuedCompletionStatus(_iocp_handler, 0, EXIT_IOCP, nullptr); return true; } uint64_t CIOCP::AddTimerEvent(uint32_t interval, const std::function& call_back, void* param, bool always) { return _timer.AddTimer(interval, call_back, param, always); } bool CIOCP::RemoveTimerEvent(uint64_t timer_id) { return _timer.DelTimer(timer_id); } bool CIOCP::AddTimerEvent(uint32_t interval, base::CMemSharePtr& event) { _timer.AddTimer(interval, event); return true; } bool CIOCP::AddSendEvent(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr && _AddToActions(socket_ptr)) { ((EventOverlapped*)event->_data)->_event = &event; return _PostSend(event); } base::LOG_WARN("socket is already destroyed!"); return false; } bool CIOCP::AddRecvEvent(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr && _AddToActions(socket_ptr)) { ((EventOverlapped*)event->_data)->_event = &event; return _PostRecv(event); } base::LOG_WARN("socket is already destroyed!"); return false; } bool CIOCP::AddAcceptEvent(base::CMemSharePtr& event) { if (!event->_accept_socket->IsInActions()) { if (CreateIoCompletionPort((HANDLE)(event->_accept_socket->GetSocket()), _iocp_handler, 0, 0) == NULL) { base::LOG_ERROR("IOCP bind socket to io completion port failed!"); return false; } } ((EventOverlapped*)event->_data)->_event = &event; event->_accept_socket->SetInActions(true); return _PostAccept(event); } bool CIOCP::AddConnection(base::CMemSharePtr& event, const std::string& ip, short port, const char* buf, uint32_t buf_len) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr && _AddToActions(socket_ptr)) { ((EventOverlapped*)event->_data)->_event = &event; return _PostConnection(event, ip, port, buf, buf_len); } base::LOG_WARN("socket is already destroyed!"); return false; } bool CIOCP::AddDisconnection(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr && _AddToActions(socket_ptr)) { ((EventOverlapped*)event->_data)->_event = &event; return _PostDisconnection(event); } base::LOG_WARN("socket is already destroyed!"); return false; } bool CIOCP::DelEvent(base::CMemSharePtr& event) { ((EventOverlapped*)event->_data)->_event = nullptr; auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { CancelIoEx((HANDLE)socket_ptr->GetSocket(), &((EventOverlapped*)event->_data)->_overlapped); } return true; } void CIOCP::ProcessEvent() { DWORD bytes_transfered = 0; EventOverlapped *socket_context = nullptr; OVERLAPPED *over_lapped = nullptr; unsigned int wait_time = 0; std::vector> timer_vec; while (_run) { wait_time = _timer.TimeoutCheck(timer_vec); //if there is no timer event. wait until recv something if (wait_time == 0 && timer_vec.empty()) { wait_time = INFINITE; } else { wait_time = wait_time > 0 ? wait_time : 1; } int res = GetQueuedCompletionStatus(_iocp_handler, &bytes_transfered, PULONG_PTR(&socket_context), &over_lapped, wait_time); DWORD dw_err = 0; if (res) { dw_err = GetLastError(); // exit if ((PULONG_PTR)socket_context == (PULONG_PTR)EXIT_IOCP){ break; } } else { dw_err = GetLastError(); } // timer out event if (dw_err == WAIT_TIMEOUT) { if (!timer_vec.empty()) { _DoTimeoutEvent(timer_vec); } _DoTaskList(); // read some thing } else if (ERROR_NETNAME_DELETED == dw_err || NO_ERROR == dw_err || ERROR_IO_PENDING == dw_err) { if (over_lapped) { socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped); base::LOG_DEBUG("Get a new event : %d", socket_context->_event_flag_set); _DoEvent(socket_context, bytes_transfered); } if (!timer_vec.empty()) { _DoTimeoutEvent(timer_vec); } _DoTaskList(); } else if (ERROR_CONNECTION_REFUSED == dw_err || ERROR_SEM_TIMEOUT == dw_err || WSAENOTCONN == dw_err || ERROR_OPERATION_ABORTED == dw_err) { if (over_lapped) { socket_context = CONTAINING_RECORD(over_lapped, EventOverlapped, _overlapped); base::LOG_DEBUG("Get a new event : %d", socket_context->_event_flag_set); socket_context->_event_flag_set |= ERR_CONNECT_CLOSE; _DoEvent(socket_context, bytes_transfered); } if (!timer_vec.empty()) { _DoTimeoutEvent(timer_vec); } _DoTaskList(); } else { base::LOG_ERROR("IOCP GetQueuedCompletionStatus return error : %d", dw_err); continue; } } if (_is_inited) { // only one iocp handle static bool once = true; if (once) { once = false; if (CloseHandle(_iocp_handler) == -1) { base::LOG_ERROR("IOCP close io completion port failed!"); } } } _is_inited = false; } void CIOCP::PostTask(std::function& task) { { std::unique_lock lock(_mutex); _task_list.push_back(task); } WakeUp(); } void CIOCP::WakeUp() { PostQueuedCompletionStatus(_iocp_handler, 0, WEAK_UP_IOCP, nullptr); } bool CIOCP::_PostRecv(base::CMemSharePtr& event) { EventOverlapped* context = (EventOverlapped*)event->_data; DWORD dwFlags = 0; DWORD dwBytes = 0; context->Clear(); context->_event_flag_set = event->_event_flag_set; OVERLAPPED *lapped = &context->_overlapped; auto socket_ptr = event->_client_socket.Lock(); int res = WSARecv(socket_ptr->GetSocket(), &context->_wsa_buf, 1, &dwFlags, &dwBytes, lapped, nullptr); if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) { base::LOG_WARN("IOCP post recv event failed! error code: %d", WSAGetLastError()); return false; } base::LOG_DEBUG("post a new event : %d", context->_event_flag_set); return true; } bool CIOCP::_PostAccept(base::CMemSharePtr& event) { if (!__AcceptEx) { base::LOG_ERROR("__AcceptEx function is null!"); return false; } EventOverlapped* context = (EventOverlapped*)event->_data; context->Clear(); DWORD dwBytes = 0; context->_event_flag_set |= event->_event_flag_set; OVERLAPPED *lapped = &context->_overlapped; int res = __AcceptEx((SOCKET)event->_accept_socket->GetSocket(), (SOCKET)event->_client_socket->GetSocket(), &context->_lapped_buffer, context->_wsa_buf.len - ((sizeof(SOCKADDR_IN) + 16) * 2), sizeof(SOCKADDR_IN) + 16, sizeof(SOCKADDR_IN) + 16, &dwBytes, lapped); if (FALSE == res) { if (WSA_IO_PENDING != WSAGetLastError()) { base::LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError()); return false; } } base::LOG_DEBUG("post a new event : %d", context->_event_flag_set); return true; } bool CIOCP::_PostSend(base::CMemSharePtr& event) { EventOverlapped* context = (EventOverlapped*)event->_data; context->Clear(); context->_event_flag_set = event->_event_flag_set; context->_wsa_buf.len = event->_buffer->Read(context->_lapped_buffer, __iocp_buff_size); OVERLAPPED *lapped = &context->_overlapped; auto socket_ptr = event->_client_socket.Lock(); int res = WSASend(socket_ptr->GetSocket(), &context->_wsa_buf, 1, nullptr, 0, lapped, nullptr); if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) { base::LOG_WARN("IOCP post send event failed! error code: %d", WSAGetLastError()); return false; } base::LOG_DEBUG("post a new event : %d", context->_event_flag_set); return true; } bool CIOCP::_PostConnection(base::CMemSharePtr& event, const std::string& ip, short port, const char* buf, uint32_t buf_len) { EventOverlapped* context = (EventOverlapped*)event->_data; DWORD dwFlags = 0; DWORD dwBytes = 0; context->Clear(); context->_event_flag_set = event->_event_flag_set; OVERLAPPED *lapped = &context->_overlapped; SOCKADDR_IN addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.S_un.S_addr = inet_addr(ip.c_str()); SOCKADDR_IN local; local.sin_family = AF_INET; local.sin_port = htons(0); local.sin_addr.S_un.S_addr = INADDR_ANY; auto socket_ptr = event->_client_socket.Lock(); if (SOCKET_ERROR == bind(socket_ptr->GetSocket(), (sockaddr*)&local, sizeof(local))) { base::LOG_FATAL("bind local host failed! error code: %d", WSAGetLastError()); } int res = __ConnectEx(socket_ptr->GetSocket(), (sockaddr*)&addr, sizeof(addr), (PVOID)buf, buf_len, nullptr, lapped); if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) { base::LOG_FATAL("IOCP post connect event failed! error code: %d", WSAGetLastError()); return false; } base::LOG_DEBUG("post a new event : %d", context->_event_flag_set); return true; } bool CIOCP::_PostDisconnection(base::CMemSharePtr& event) { EventOverlapped* context = (EventOverlapped*)event->_data; context->Clear(); context->_event_flag_set = event->_event_flag_set; OVERLAPPED *lapped = &context->_overlapped; auto socket_ptr = event->_client_socket.Lock(); int res = __DisconnectionEx(socket_ptr->GetSocket(), lapped, TF_REUSE_SOCKET, 0); if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) { base::LOG_FATAL("IOCP post disconnect event failed! error code: %d", WSAGetLastError()); return false; } base::LOG_DEBUG("post a new event : %d", context->_event_flag_set); return true; } void CIOCP::_DoTimeoutEvent(std::vector>& timer_vec) { for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) { if ((*iter)->_event_flag & EVENT_READ) { base::CMemSharePtr event_ptr = (*iter)->_event.Lock(); base::CMemSharePtr socket_ptr = event_ptr->_client_socket.Lock(); if (socket_ptr) { event_ptr->_event_flag_set |= EVENT_TIMER; socket_ptr->Recv(event_ptr); } } else if ((*iter)->_event_flag & EVENT_WRITE) { base::CMemSharePtr event_ptr = (*iter)->_event.Lock(); base::CMemSharePtr socket_ptr = event_ptr->_client_socket.Lock(); if (socket_ptr) { event_ptr->_event_flag_set |= EVENT_TIMER; socket_ptr->Send(event_ptr); } } else if ((*iter)->_event_flag & EVENT_TIMER) { auto func = (*iter)->_timer_call_back; if (func) { func((*iter)->_timer_param); } } } timer_vec.clear(); } void CIOCP::_DoEvent(EventOverlapped *socket_context, uint32_t bytes) { if (socket_context->_event_flag_set < EVENT_READ || socket_context->_event_flag_set > EVENT_ERR_MAX) { return; } if (socket_context->_event_flag_set & EVENT_ACCEPT) { base::CMemSharePtr* event = (base::CMemSharePtr*)socket_context->_event; if (event) { (*event)->_client_socket->_read_event->_off_set = bytes; (*event)->_accept_socket->_Accept((*event)); } } else { base::CMemSharePtr* event = (base::CMemSharePtr*)socket_context->_event; if (event && !event->Expired()) { (*event)->_event_flag_set = socket_context->_event_flag_set; (*event)->_off_set = bytes; if (socket_context->_event_flag_set & EVENT_READ || socket_context->_event_flag_set & EVENT_CONNECT || socket_context->_event_flag_set & EVENT_DISCONNECT) { auto socket_ptr = (*event)->_client_socket.Lock(); if (socket_ptr) { socket_ptr->Recv((*event)); } } else if ((*event)->_event_flag_set & EVENT_WRITE) { auto socket_ptr = (*event)->_client_socket.Lock(); if (socket_ptr) { socket_ptr->Send((*event)); } } } } } void CIOCP::_DoTaskList() { std::vector> func_vec; { std::unique_lock lock(_mutex); func_vec.swap(_task_list); } for (size_t i = 0; i < func_vec.size(); ++i) { func_vec[i](); } } bool CIOCP::_AddToActions(base::CMemSharePtr& socket) { if (!socket->IsInActions()) { if (CreateIoCompletionPort((HANDLE)(socket->GetSocket()), _iocp_handler, 0, 0) == NULL) { base::LOG_ERROR("IOCP bind socket to io completion port failed!"); return false; } socket->SetInActions(true); } return true; } #endif