#ifdef __linux__ #include #include #include #include #include #include "Log.h" #include "Timer.h" #include "CEpoll.h" #include "OSInfo.h" #include "Buffer.h" #include "Socket.h" #include "LinuxFunc.h" #include "EventHandler.h" using namespace cppnet; // check socket connect bool CheckConnect(const uint64_t& sock) { struct pollfd fd; int ret = 0; socklen_t len = 0; fd.fd = sock; fd.events = POLLOUT; if (poll(&fd, 1, -1) == -1) { if(errno != EINTR){ return false; } } len = sizeof(ret); if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &ret, &len) == -1) { return false; } if(ret != 0) { return false; } return true; } enum EPOLL_CODE { EXIT_EPOLL = 1, WEAK_EPOLL = 0 }; CEpoll::CEpoll(bool per_epoll) : _run(true), _per_epoll(per_epoll), _epoll_handler(-1) { } CEpoll::~CEpoll() { if (_epoll_handler > 0) { close(_epoll_handler); } } bool CEpoll::Init(uint32_t) { //Disable SIGPIPE signal sigset_t set; sigprocmask(SIG_SETMASK, NULL, &set); sigaddset(&set, SIGPIPE); sigprocmask(SIG_SETMASK, &set, NULL); //get epoll handle. the param is invalid since linux 2.6.8 _epoll_handler = epoll_create(1500); if (_epoll_handler == -1) { base::LOG_FATAL("epoll init failed! error : %d", errno); return false; } if (pipe((int*)_pipe) == -1) { base::LOG_FATAL("pipe init failed! error : %d", errno); return false; } SetSocketNoblocking(_pipe[1]); SetSocketNoblocking(_pipe[0]); _pipe_content.events = EPOLLIN; _pipe_content.data.fd = _pipe[0]; int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, _pipe[0], &_pipe_content); if (res == -1) { base::LOG_ERROR("add pipe handle to epoll faild! error :%d", errno); return false; } return true; } bool CEpoll::Dealloc() { _run = false; WakeUp(); return true; } uint64_t CEpoll::AddTimerEvent(uint32_t interval, const timer_call_back& call_back, void* param, bool always) { return _timer.AddTimer(interval, call_back, param, always); } bool CEpoll::RemoveTimerEvent(uint64_t timer_id) { return _timer.DelTimer(timer_id); } bool CEpoll::AddTimerEvent(uint32_t interval, base::CMemSharePtr& event) { _timer.AddTimer(interval, event); base::LOG_DEBUG("add a timer event, %d", interval); return true; } bool CEpoll::AddSendEvent(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { bool res = false; epoll_event* content = (epoll_event*)event->_data; //if not add to epoll if (!(content->events & EPOLLOUT)) { if (socket_ptr->IsInActions()) { res = _ModifyEvent(event, EPOLLOUT, socket_ptr->GetSocket()); } else { res = _AddEvent(event, EPOLLOUT, socket_ptr->GetSocket()); } } //reset one shot flag res = _ResetOneShot(event, EPOLLOUT, socket_ptr->GetSocket()); socket_ptr->SetInActions(true); return res; } base::LOG_WARN("write event is already distroyed! in %s", "AddSendEvent"); return false; } bool CEpoll::AddRecvEvent(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { bool res = false; epoll_event* content = (epoll_event*)event->_data; //if not add to epoll if (!(content->events & EPOLLIN)) { if (socket_ptr->IsInActions()) { res = _ModifyEvent(event, EPOLLIN|EPOLLRDHUP, socket_ptr->GetSocket()); } else { res = _AddEvent(event, EPOLLIN|EPOLLRDHUP, socket_ptr->GetSocket()); } } //reset one shot flag res = _ResetOneShot(event, EPOLLIN, socket_ptr->GetSocket()); if (res) { socket_ptr->SetInActions(true); } return res; } base::LOG_WARN("read event is already distroyed!in AddRecvEvent"); return false; } bool CEpoll::AddAcceptEvent(base::CMemSharePtr& event) { bool res = false; epoll_event* content = (epoll_event*)event->_data; auto socket_ptr = event->_accept_socket; //if not add to epoll if (!(content->events & EPOLLIN)) { res = _AddEvent(event, EPOLLIN, socket_ptr->GetSocket()); } socket_ptr->SetInActions(true); return res; } bool CEpoll::AddConnection(base::CMemSharePtr& event, const std::string& ip, short port) { if (ip.empty()) { return false; } auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { //the socket must not in epoll if (socket_ptr->IsInActions()) { return false; } struct sockaddr_in addr; addr.sin_family = AF_INET; addr.sin_port = htons(port); addr.sin_addr.s_addr = inet_addr(ip.c_str()); //block here in linux SetSocketNoblocking(socket_ptr->GetSocket()); int res = connect(socket_ptr->GetSocket(), (sockaddr *)&addr, sizeof(addr)); if (res == 0) { socket_ptr->Recv(socket_ptr->_read_event); return true; } else if (errno == EINPROGRESS) { if (CheckConnect(socket_ptr->GetSocket())) { socket_ptr->Recv(socket_ptr->_read_event); return true; } socket_ptr->_read_event->_event_flag_set |= ERR_CONNECT_FAILED; socket_ptr->Recv(socket_ptr->_read_event); } base::LOG_WARN("connect event failed! %d", errno); return false; } base::LOG_WARN("connection event is already destroyed!,%s", "AddConnection"); return false; } bool CEpoll::AddDisconnection(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { if (DelEvent(event)) { socket_ptr->Recv(socket_ptr->_read_event); } } return true; } bool CEpoll::DelEvent(base::CMemSharePtr& event) { auto socket_ptr = event->_client_socket.Lock(); if (!socket_ptr) { return false; } epoll_event* content = (epoll_event*)event->_data; int res = epoll_ctl(_epoll_handler, EPOLL_CTL_DEL, socket_ptr->GetSocket(), content); if (res == -1) { base::LOG_ERROR("remove event from epoll faild! error :%d, socket : %d", errno, socket_ptr->GetSocket()); return false; } socket_ptr->SetInActions(false); base::LOG_DEBUG("del a socket from epoll, %d", socket_ptr->GetSocket()); return true; } bool CEpoll::DelEvent(const uint64_t& sock) { int res = epoll_ctl(_epoll_handler, EPOLL_CTL_DEL, sock, nullptr); if (res == -1) { base::LOG_ERROR("remove event from epoll faild! error :%d, socket : %d", errno, sock); return false; } base::LOG_DEBUG("del a socket from epoll, %d", sock); return true; } void CEpoll::ProcessEvent() { uint32_t wait_time = 0; std::vector> timer_vec;; std::vector event_vec; event_vec.resize(1000); while (_run) { wait_time = _timer.TimeoutCheck(timer_vec); //if there is no timer event. wait until recv something if (wait_time == 0 && timer_vec.empty()) { wait_time = -1; } else { wait_time = wait_time > 0 ? wait_time : 1; } int res = epoll_wait(_epoll_handler, &*event_vec.begin(), (int)(event_vec.size()), wait_time); if (res == -1) { base::LOG_ERROR("epoll_wait faild! error :%d", errno); } if (res > 0) { base::LOG_DEBUG("epoll_wait get events! num :%d, TheadId : %lld", res, std::this_thread::get_id()); _DoEvent(event_vec, res); _DoTaskList(); if (!timer_vec.empty()) { _DoTimeoutEvent(timer_vec); } } else { if (!timer_vec.empty()) { _DoTimeoutEvent(timer_vec); } _DoTaskList(); } } if (close(_epoll_handler) == -1) { base::LOG_ERROR("epoll close failed! error : %d", errno); } if (close(_pipe[0]) == -1) { base::LOG_ERROR("_pipe[0] close failed! error : %d", errno); } if (close(_pipe[1]) == -1) { base::LOG_ERROR("_pipe[1] close failed! error : %d", errno); } base::LOG_INFO("return the net io thread"); } void CEpoll::PostTask(std::function& task) { { std::unique_lock lock(_mutex); _task_list.push_back(task); } WakeUp(); } void CEpoll::WakeUp() { write(_pipe[1], "1", 1); } bool CEpoll::_AddEvent(base::CMemSharePtr& event, int32_t event_flag, uint64_t sock) { epoll_event* content = (epoll_event*)event->_data; content->events |= event_flag | EPOLLET; content->data.ptr = (void*)&event->_client_socket; int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); if (res == -1) { if (errno == EEXIST) { res = _ModifyEvent(event, event_flag, sock); } if (res == -1) { base::LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); return false; } } base::LOG_DEBUG("add a event to epoll, event : %d, sock : %d", event->_event_flag_set, sock); return true; } bool CEpoll::_AddEvent(base::CMemSharePtr& event, int32_t event_flag, uint64_t sock) { epoll_event* content = (epoll_event*)event->_data; content->events |= event_flag; content->data.ptr = (void*)&event->_accept_socket; content->data.ptr = (void*)(((uintptr_t)content->data.ptr) | 1); int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); if (res == -1) { base::LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); return false; } base::LOG_DEBUG("add a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); return true; } bool CEpoll::_ModifyEvent(base::CMemSharePtr& event, int32_t event_flag, uint64_t sock) { epoll_event* content = (epoll_event*)event->_data; content->events |= event_flag|EPOLLET; content->data.ptr = (void*)&event->_client_socket; int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); if (res == -1) { if (errno == ENOENT) { res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); } if (res == -1) { base::LOG_ERROR("modify event to epoll faild! error :%d, sock: %d", errno, sock); return false; } } base::LOG_DEBUG("modify a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); return true; } bool CEpoll::_ResetOneShot(base::CMemSharePtr& event, int32_t event_flag, uint64_t sock) { // if per epoll handle, do nothing. if (_per_epoll) { return true; } epoll_event* content = (epoll_event*)event->_data; content->events |= EPOLLONESHOT; int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); if (res == -1) { if (errno == ENOENT) { res = _ModifyEvent(event, EPOLLONESHOT | event_flag, sock); } if (res == -1) { base::LOG_ERROR("reset one shot flag faild! error :%d, sock: %d", errno, sock); return false; } } base::LOG_DEBUG("reset one shot, event flag: %d, sock : %d", event->_event_flag_set, sock); return true; } void CEpoll::_DoTimeoutEvent(std::vector>& timer_vec) { for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) { if ((*iter)->_event_flag & EVENT_READ) { base::CMemSharePtr event_ptr = (*iter)->_event.Lock(); base::CMemSharePtr socket_ptr = event_ptr->_client_socket.Lock(); if (socket_ptr) { event_ptr->_event_flag_set |= EVENT_TIMER; socket_ptr->Recv(event_ptr); } } else if ((*iter)->_event_flag & EVENT_WRITE) { base::CMemSharePtr event_ptr = (*iter)->_event.Lock(); base::CMemSharePtr socket_ptr = event_ptr->_client_socket.Lock(); if (socket_ptr) { event_ptr->_event_flag_set |= EVENT_TIMER; socket_ptr->Send(event_ptr); } } else if ((*iter)->_event_flag & EVENT_TIMER) { auto func = (*iter)->_timer_call_back; if (func) { func((*iter)->_timer_param); } } } timer_vec.clear(); } void CEpoll::_DoEvent(std::vector& event_vec, int num) { base::CMemWeakPtr* normal_sock = nullptr; base::CMemSharePtr* accept_sock = nullptr; void* sock = nullptr; for (int i = 0; i < num; i++) { if (event_vec[i].data.fd == _pipe[0]) { base::LOG_WARN("weak up the io thread, index : %d", i); char buf[4]; read(_pipe[0], buf, 1); continue; } sock = event_vec[i].data.ptr; if (!sock) { base::LOG_WARN("the event is nullptr, index : %d", i); continue; } if (((uintptr_t)sock) & 1) { sock = (void*)(((uintptr_t)sock) & (uintptr_t)~1); accept_sock = (base::CMemSharePtr*)sock; (*accept_sock)->_Accept((*accept_sock)->_accept_event); } else { normal_sock = (base::CMemWeakPtr*)event_vec[i].data.ptr; if (!normal_sock) { continue; } auto socket_ptr = normal_sock->Lock(); if (!socket_ptr) { continue; } if (event_vec[i].events & EPOLLIN) { // close if (event_vec[i].events & EPOLLRDHUP) { socket_ptr->_read_event->_event_flag_set |= (EVENT_DISCONNECT | ERR_CONNECT_CLOSE); } socket_ptr->Recv(socket_ptr->_read_event); } if (event_vec[i].events & EPOLLOUT) { socket_ptr->Send(socket_ptr->_write_event); } } } } void CEpoll::_DoTaskList() { std::vector> func_vec; { std::unique_lock lock(_mutex); func_vec.swap(_task_list); } for (size_t i = 0; i < func_vec.size(); ++i) { func_vec[i](); } } #endif // __linux__