diff --git a/common/network/posix/socket.cpp b/common/network/posix/socket.cpp index 7a4bf83..410d1a7 100644 --- a/common/network/posix/socket.cpp +++ b/common/network/posix/socket.cpp @@ -1,6 +1,8 @@ #if ((defined __linux__) || (defined __APPLE__)) #include +#include +#include #include #include @@ -9,19 +11,41 @@ namespace cppnet { int32_t SocketNoblocking(uint64_t sock) { - int old_option = fcntl(sock, F_GETFL); - int new_option = old_option | O_NONBLOCK; + int32_t old_option = fcntl(sock, F_GETFL); + int32_t new_option = old_option | O_NONBLOCK; fcntl(sock, F_SETFL, new_option); return old_option; } int32_t ReusePort(uint64_t sock) { - int opt = 1; - int ret = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, + int32_t opt = 1; + int32_t ret = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &opt, static_cast(sizeof(opt))); return ret; } +bool CheckConnect(const uint64_t sock) { + struct pollfd fd; + int32_t ret = 0; + socklen_t len = 0; + fd.fd = sock; + fd.events = POLLOUT; + if (poll(&fd, 1, -1) == -1) { + if(errno != EINTR){ + return false; + } + } + len = sizeof(ret); + if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &ret, &len) == -1) { + return false; + } + if(ret != 0) { + return false; + } + return true; +} + + } #endif \ No newline at end of file diff --git a/common/network/socket.h b/common/network/socket.h index 3aa01ae..36e5173 100644 --- a/common/network/socket.h +++ b/common/network/socket.h @@ -9,7 +9,8 @@ int32_t SocketNoblocking(uint64_t sock); int32_t ReusePort(uint64_t sock); - +// check socket connect +bool CheckConnect(const uint64_t sock); } #endif diff --git a/cppnet/cppnet.cpp b/cppnet/cppnet.cpp index 18653ef..d478b4a 100644 --- a/cppnet/cppnet.cpp +++ b/cppnet/cppnet.cpp @@ -11,7 +11,7 @@ CppNet::CppNet() { } CppNet::~CppNet() { - + } void CppNet::Init(int32_t thread_num) { diff --git a/cppnet/cppnet_base.cpp b/cppnet/cppnet_base.cpp index b0341d9..96de03d 100644 --- a/cppnet/cppnet_base.cpp +++ b/cppnet/cppnet_base.cpp @@ -110,23 +110,33 @@ void CppNetBase::OnTimer(std::shared_ptr sock) { } void CppNetBase::OnAccept(std::shared_ptr sock) { - _accept_cb(sock, CEC_SUCCESS); + if (_accept_cb) { + _accept_cb(sock, CEC_SUCCESS); + } } void CppNetBase::OnRead(std::shared_ptr sock, uint32_t len) { - _read_cb(sock, sock->GetReadBuffer(), len); + if (_read_cb) { + _read_cb(sock, sock->GetReadBuffer(), len); + } } void CppNetBase::OnWrite(std::shared_ptr sock, uint32_t len) { - _write_cb(sock, len); + if (_write_cb) { + _write_cb(sock, len); + } } void CppNetBase::OnConnect(std::shared_ptr sock, uint16_t err) { - _connect_cb(sock, err); + if (_connect_cb) { + _connect_cb(sock, err); + } } void CppNetBase::OnDisConnect(std::shared_ptr sock, uint16_t err) { - _disconnect_cb(sock, err); + if (_disconnect_cb) { + _disconnect_cb(sock, err); + } } } \ No newline at end of file diff --git a/cppnet/dispatcher.cpp b/cppnet/dispatcher.cpp index ee6e3f2..bb9550c 100644 --- a/cppnet/dispatcher.cpp +++ b/cppnet/dispatcher.cpp @@ -54,7 +54,7 @@ void Dispatcher::Run() { } void Dispatcher::Stop() { - _stop = false; + _stop = true; _event_actions->Wakeup(); } @@ -75,7 +75,6 @@ void Dispatcher::Listen(uint64_t sock, const std::string& ip, uint16_t port) { connect_sock->SetSocket(sock); connect_sock->Bind(ip, port); connect_sock->Listen(); - _event_actions->Wakeup(); }; PostTask(task); } @@ -83,20 +82,21 @@ void Dispatcher::Listen(uint64_t sock, const std::string& ip, uint16_t port) { void Dispatcher::Connect(const std::string& ip, uint16_t port) { if (std::this_thread::get_id() == _local_thread_id) { - auto allocter = std::make_shared(MakePoolAlloterPtr()); - auto sock = allocter->PoolNewSharePtr(allocter); + auto alloter = std::make_shared(MakePoolAlloterPtr()); + auto sock = std::make_shared(alloter); + sock->SetEventActions(_event_actions); sock->SetCppNetBase(_cppnet_base.lock()); sock->Connect(ip, port); } else { auto task = [ip, port, this]() { - auto allocter = std::make_shared(MakePoolAlloterPtr()); - auto sock = allocter->PoolNewSharePtr(allocter); + auto alloter = std::make_shared(MakePoolAlloterPtr()); + auto sock = std::make_shared(alloter); + sock->SetEventActions(_event_actions); sock->SetCppNetBase(_cppnet_base.lock()); sock->Connect(ip, port); - _event_actions->Wakeup(); }; PostTask(task); } diff --git a/cppnet/event/linux/epoll_action.cpp b/cppnet/event/linux/epoll_action.cpp index f24c6de..778ec3b 100644 --- a/cppnet/event/linux/epoll_action.cpp +++ b/cppnet/event/linux/epoll_action.cpp @@ -163,12 +163,10 @@ bool EpollEventActions::AddConnection(std::shared_ptr& event, Address& ad return true; } else if (ret._errno == EINPROGRESS) { - /*if (CheckConnect(socket_ptr->GetSocket())) { - socket_ptr->Recv(socket_ptr->_read_event); + if (CheckConnect(rw_sock->GetSocket())) { + rw_sock->OnConnect(CEC_SUCCESS); return true; } - socket_ptr->_read_event->_event_flag_set |= ERR_CONNECT_FAILED; - */ } rw_sock->OnConnect(CEC_CONNECT_REFUSE); LOG_WARN("connect event failed! %d", ret._errno); diff --git a/cppnet/event/mac/kqueue_action.cpp b/cppnet/event/mac/kqueue_action.cpp index 0d1fc8c..e8fd239 100644 --- a/cppnet/event/mac/kqueue_action.cpp +++ b/cppnet/event/mac/kqueue_action.cpp @@ -76,7 +76,7 @@ bool KqueueEventActions::AddSendEvent(std::shared_ptr& event) { udata = (void*)(((uintptr_t)udata) | 1); struct kevent ev; - EV_SET(&ev, sock->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ENABLE, 0, 0, udata); + EV_SET(&ev, sock->GetSocket(), EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR, 0, 0, udata); _change_list.push_back(ev); return true; @@ -129,7 +129,7 @@ bool KqueueEventActions::AddConnection(std::shared_ptr& event, Address& a auto sock = event->GetSocket(); if (sock) { - //SocketNoblocking(sock->GetSocket()); + SocketNoblocking(sock->GetSocket()); auto ret = OsHandle::Connect(sock->GetSocket(), address); @@ -141,12 +141,10 @@ bool KqueueEventActions::AddConnection(std::shared_ptr& event, Address& a return true; } else if (ret._errno == EINPROGRESS) { - /*if (CheckConnect(socket_ptr->GetSocket())) { - socket_ptr->Recv(socket_ptr->_read_event); + if (CheckConnect(rw_sock->GetSocket())) { + rw_sock->OnConnect(CEC_SUCCESS); return true; } - socket_ptr->_read_event->_event_flag_set |= ERR_CONNECT_FAILED; - */ } rw_sock->OnConnect(CEC_CONNECT_REFUSE); return false; @@ -169,9 +167,6 @@ bool KqueueEventActions::AddDisconnection(std::shared_ptr& event) { } std::shared_ptr socket = std::dynamic_pointer_cast(sock); - if (!DelEvent(event)) { - return false; - } OsHandle::Close(socket->GetSocket()); socket->OnDisConnect(CEC_SUCCESS); return true; @@ -184,11 +179,11 @@ bool KqueueEventActions::DelEvent(std::shared_ptr& event) { } struct kevent read_ev; - EV_SET(&read_ev, sock->GetSocket(), EVFILT_READ, EV_DELETE, 0, 0, NULL); + EV_SET(&read_ev, sock->GetSocket(), EVFILT_READ, EV_DELETE | EV_DISABLE | EV_DISPATCH, 0, 0, NULL); _change_list.push_back(read_ev); struct kevent write_ev; - EV_SET(&write_ev, sock->GetSocket(), EVFILT_WRITE, EV_DELETE, 0, 0, NULL); + EV_SET(&write_ev, sock->GetSocket(), EVFILT_WRITE, EV_DELETE | EV_DISABLE | EV_DISPATCH, 0, 0, NULL); _change_list.push_back(write_ev); event->ClearType(); diff --git a/cppnet/socket/connect_socket.cpp b/cppnet/socket/connect_socket.cpp index 34fc6da..83326c3 100644 --- a/cppnet/socket/connect_socket.cpp +++ b/cppnet/socket/connect_socket.cpp @@ -55,6 +55,9 @@ bool ConnectSocket::Listen() { return false; } + //set the socket noblocking + SocketNoblocking(_sock); + Accept(); return true; @@ -74,8 +77,8 @@ void ConnectSocket::Accept() { void ConnectSocket::OnAccept() { while (true) { - std::shared_ptr allocter = std::make_shared(MakePoolAlloterPtr()); - std::shared_ptr
address = allocter->PoolNewSharePtr
(AT_IPV4); + std::shared_ptr alloter = std::make_shared(MakePoolAlloterPtr()); + std::shared_ptr
address = alloter->PoolNewSharePtr
(AT_IPV4); //may get more than one connections auto ret = OsHandle::Accept(_sock, *address); if (ret._return_value < 0) { @@ -94,10 +97,10 @@ void ConnectSocket::OnAccept() { //set the socket noblocking SocketNoblocking(ret._return_value); - //create a new socket - auto sock = allocter->PoolNewSharePtr(allocter); + //create a new socket. + auto sock = std::make_shared(ret._return_value, alloter); + sock->SetCppNetBase(cppnet_base); - sock->SetSocket(ret._return_value); sock->SetEventActions(_event_actions); sock->SetAddress(address); diff --git a/cppnet/socket/rw_socket.cpp b/cppnet/socket/rw_socket.cpp index aab12e6..ee74fff 100644 --- a/cppnet/socket/rw_socket.cpp +++ b/cppnet/socket/rw_socket.cpp @@ -15,9 +15,16 @@ #include "common/alloter/alloter_interface.h" namespace cppnet { +RWSocket::RWSocket(std::shared_ptr alloter): + Socket(alloter) { -RWSocket::RWSocket(std::shared_ptr alloter): - _alloter(alloter) { + _block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step); + _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); + _read_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); +} + +RWSocket::RWSocket(uint64_t sock, std::shared_ptr alloter): + Socket(sock, alloter) { _block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step); _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); @@ -25,15 +32,7 @@ RWSocket::RWSocket(std::shared_ptr alloter): } RWSocket::~RWSocket() { - if (!_event) { - _event = _alloter->PoolNewSharePtr(); - _event->SetSocket(shared_from_this()); - } - - auto actions = GetEventActions(); - if (actions) { - actions->DelEvent(_event); - } + } bool RWSocket::GetAddress(std::string& ip, uint16_t& port) { @@ -48,7 +47,7 @@ bool RWSocket::GetAddress(std::string& ip, uint16_t& port) { } bool RWSocket::Close() { - __all_socket_map.erase(_sock); + Disconnect(); return true; } @@ -173,11 +172,16 @@ void RWSocket::OnConnect(uint16_t err) { void RWSocket::OnDisConnect(uint16_t err) { auto sock = shared_from_this(); __all_socket_map.erase(_sock); - + auto cppnet_base = _cppnet_base.lock(); if (cppnet_base) { cppnet_base->OnDisConnect(sock, err); } + + // not active disconnection + if (_event && !(_event->GetType() & ET_DISCONNECT)) { + OsHandle::Close(_sock); + } } bool RWSocket::Recv() { @@ -207,14 +211,16 @@ bool RWSocket::Recv() { off_set += ret._return_value; break; - } else if (errno == EBADMSG || errno == ECONNRESET) { - cppnet_base->OnDisConnect(shared_from_this(), CEC_CONNECT_BREAK); + } else if (errno == EBADMSG) { + OnDisConnect(CEC_CONNECT_BREAK); return false; } else { - cppnet_base->OnDisConnect(shared_from_this(), CEC_CLOSED); - return false; + } + } else if (ret._return_value == 0) { + OnDisConnect(CEC_CLOSED); + return false; } else { _read_buffer->MoveWritePt(ret._return_value); @@ -256,11 +262,11 @@ bool RWSocket::Send() { } } else if (errno == EBADMSG) { - cppnet_base->OnDisConnect(shared_from_this(), CEC_CONNECT_BREAK); + OnDisConnect(CEC_CONNECT_BREAK); return false; } else { - cppnet_base->OnDisConnect(shared_from_this(), CEC_CLOSED); + OnDisConnect(CEC_CLOSED); return false; } } diff --git a/cppnet/socket/rw_socket.h b/cppnet/socket/rw_socket.h index c9f5bc8..24b053f 100644 --- a/cppnet/socket/rw_socket.h +++ b/cppnet/socket/rw_socket.h @@ -19,6 +19,7 @@ class RWSocket: public: RWSocket(std::shared_ptr alloter); + RWSocket(uint64_t sock, std::shared_ptr alloter); virtual ~RWSocket(); bool GetAddress(std::string& ip, uint16_t& port); @@ -39,8 +40,6 @@ public: void OnConnect(uint16_t err); void OnDisConnect(uint16_t err); - std::shared_ptr GetAlocter() { return _alloter; } - std::shared_ptr GetReadBuffer() { return _read_buffer; } private: @@ -53,7 +52,6 @@ private: std::shared_ptr _write_buffer; std::shared_ptr _read_buffer; - std::shared_ptr _alloter; std::shared_ptr _block_pool; }; diff --git a/cppnet/socket/socket_interface.h b/cppnet/socket/socket_interface.h index 4297430..d4dd137 100644 --- a/cppnet/socket/socket_interface.h +++ b/cppnet/socket/socket_interface.h @@ -11,10 +11,14 @@ class Buffer; class Address; class CppNetBase; class Dispatcher; +class AlloterWrap; class EventActions; class Socket { public: Socket(): _sock(0) {} + Socket(std::shared_ptr alloter): _alloter(alloter) {} + Socket(uint64_t sock, std::shared_ptr alloter): + _sock(sock), _alloter(alloter) {} virtual ~Socket() {} void SetSocket(const uint64_t& sock) { _sock = sock; } @@ -31,11 +35,14 @@ public: void SetDispatcher(std::shared_ptr dis) { _dispatcher = dis; } std::shared_ptr GetDispatcher() { return _dispatcher.lock(); } - + + void SetAlloter(std::shared_ptr alloter) { _alloter = alloter; } + std::shared_ptr GetAlocter() { return _alloter; } + protected: uint64_t _sock; - - std::shared_ptr
_addr; + std::shared_ptr _alloter; + std::shared_ptr
_addr; std::weak_ptr _cppnet_base; std::weak_ptr _event_actions;