diff --git a/Cppnet.vcxproj b/Cppnet.vcxproj index 3066d34..9d8fbc0 100644 --- a/Cppnet.vcxproj +++ b/Cppnet.vcxproj @@ -187,7 +187,6 @@ - diff --git a/Cppnet.vcxproj.filters b/Cppnet.vcxproj.filters index 4e1f7ac..3adae77 100644 --- a/Cppnet.vcxproj.filters +++ b/Cppnet.vcxproj.filters @@ -213,9 +213,6 @@ cppnet\event\win - - cppnet\event\win - common\log diff --git a/cppnet/event/action_interface.h b/cppnet/event/action_interface.h index a036e8b..6cd7606 100644 --- a/cppnet/event/action_interface.h +++ b/cppnet/event/action_interface.h @@ -30,14 +30,13 @@ public: virtual bool Dealloc() = 0; // net io event - virtual bool AddSendEvent(std::shared_ptr& event) = 0; - virtual bool AddRecvEvent(std::shared_ptr& event) = 0; - virtual bool AddWinAcceptEvent(std::shared_ptr& event) = 0; + virtual bool AddSendEvent(Event* event) = 0; + virtual bool AddRecvEvent(Event* event) = 0; + virtual bool AddAcceptEvent(Event* event) = 0; + virtual bool AddConnection(Event* event, Address& addr) = 0; + virtual bool AddDisconnection(Event* event) = 0; - virtual bool AddConnection(std::shared_ptr& event, Address& addr) = 0; - virtual bool AddDisconnection(std::shared_ptr& event) = 0; - - virtual bool DelEvent(std::shared_ptr& event) = 0; + virtual bool DelEvent(Event* event) = 0; // io thread process virtual void ProcessEvent(int32_t wait_ms) = 0; // weak up net io thread diff --git a/cppnet/event/event_interface.h b/cppnet/event/event_interface.h index 2378fd6..ca6ebe5 100644 --- a/cppnet/event/event_interface.h +++ b/cppnet/event/event_interface.h @@ -26,6 +26,7 @@ enum EventType { const char* TypeString(EventType type); class Socket; +class BufferQueue; class Event { public: Event(): _data(nullptr), _event_type(0) {} @@ -43,10 +44,19 @@ public: void SetSocket(std::shared_ptr socket) { _socket = socket; } std::shared_ptr GetSocket() { return _socket.lock(); } +#ifdef __win__ + void SetBuffer(std::shared_ptr& buffer) { _buffer = buffer; } + std::shared_ptr GetBuffer() { return _buffer; } +private: + std::shared_ptr _buffer; +#endif + protected: void* _data; uint16_t _event_type; std::weak_ptr _socket; + + }; } diff --git a/cppnet/event/win/iocp_action.cpp b/cppnet/event/win/iocp_action.cpp index c5331d3..814f5d8 100644 --- a/cppnet/event/win/iocp_action.cpp +++ b/cppnet/event/win/iocp_action.cpp @@ -8,7 +8,6 @@ #include "iocp_action.h" #include "cppnet/cppnet_config.h" -#include "cppnet/event/win/rw_event.h" #include "cppnet/event/win/accept_event.h" #include "cppnet/socket/win/win_rw_socket.h" #include "cppnet/socket/win/win_connect_socket.h" @@ -18,7 +17,6 @@ #include "common/network/socket.h" #include "common/buffer/buffer_queue.h" - namespace cppnet { std::shared_ptr MakeEventActions() { @@ -57,7 +55,7 @@ IOCPEventActions::~IOCPEventActions() { bool IOCPEventActions::Init(uint32_t thread_num) { WinSockInit(); - //tell iocp thread num + //tell IOCP thread num _iocp_handler = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, thread_num); if (_iocp_handler == INVALID_HANDLE_VALUE) { LOG_ERROR("IOCP create io completion port failed!"); @@ -71,115 +69,6 @@ bool IOCPEventActions::Dealloc() { return true; } -bool IOCPEventActions::AddSendEvent(std::shared_ptr& event) { - if (event->GetType() & ET_WRITE || event->GetType() & ET_DISCONNECT) { - LOG_WARN_S << "repeat send event"; - return false; - } - - auto sock = event->GetSocket(); - if (!sock) { - LOG_WARN("socket is already distroyed! event %s", "AddSendEvent"); - return false; - } - auto rw_sock = std::dynamic_pointer_cast(sock); - if (rw_sock->IsShutdown()) { - LOG_WARN_S << "socket is shutdown when send"; - return false; - } - - /*if (!(event->GetType() & ET_INACTIONS)) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when send"; - return false; - } - event->AddType(ET_INACTIONS); - }*/ - auto rw_event = std::dynamic_pointer_cast(event); - EventOverlapped* context = (EventOverlapped*)rw_event->GetData(); - - if (!context) { - context = rw_sock->GetAlloter()->PoolNew(); - context->_event = (void*)&event; - rw_event->SetData(context); - } - - context->_event_type = ET_WRITE; - - auto buffer = rw_event->GetBuffer(); - std::vector bufs; - buffer->GetUseMemoryBlock(bufs, __iocp_buff_size); - - DWORD dwFlags = 0; - int32_t ret = WSASend((SOCKET)sock->GetSocket(), (LPWSABUF)&(*bufs.begin()), (DWORD)bufs.size(), nullptr, dwFlags, &context->_overlapped, nullptr); - - if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { - LOG_WARN("IOCP post send event failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - //rw_sock->OnDisConnect(CEC_CLOSED); - DelEvent(event); - rw_sock->SetShutdown(); - return false; - } - - // send some data immediately - event->AddType(ET_WRITE); - LOG_DEBUG("post a new write event"); - return true; -} - -bool IOCPEventActions::AddRecvEvent(std::shared_ptr& event) { - if (event->GetType() & ET_READ || event->GetType() & ET_DISCONNECT) { - LOG_WARN_S << "repeat recv event"; - return false; - } - - auto sock = event->GetSocket(); - if (!sock) { - LOG_WARN("socket is already distroyed! event %s", "AddSendEvent"); - return false; - } - auto rw_sock = std::dynamic_pointer_cast(sock); - if (rw_sock->IsShutdown()) { - LOG_WARN_S << "socket is shutdown when recv"; - return false; - } - - /*if (!(event->GetType() & ET_INACTIONS)) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when recv"; - return false; - } - event->AddType(ET_INACTIONS); - }*/ - - EventOverlapped* context = (EventOverlapped*)event->GetData(); - - if (!context) { - context = rw_sock->GetAlloter()->PoolNew(); - context->_event = (void*)&event; - event->SetData(context); - } - context->_event_type = ET_READ; - - auto buffer = rw_sock->GetReadBuffer(); - std::vector bufs; - buffer->GetFreeMemoryBlock(bufs, __iocp_buff_size); - - DWORD dwFlags = 0; - int32_t ret = WSARecv((SOCKET)sock->GetSocket(), (LPWSABUF)&(*bufs.begin()), (DWORD)bufs.size(), nullptr, &dwFlags, &context->_overlapped, nullptr); - - if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { - LOG_WARN("IOCP post recv event failed! error code: %d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - //rw_sock->OnDisConnect(CEC_CLOSED); - DelEvent(event); - rw_sock->SetShutdown(); - return false; - } - event->AddType(ET_READ); - LOG_DEBUG("post a new read event"); - return true; -} - bool IOCPEventActions::AddSendEvent(Event* event) { if (event->GetType() & ET_WRITE || event->GetType() & ET_DISCONNECT) { LOG_WARN_S << "repeat send event"; @@ -197,25 +86,17 @@ bool IOCPEventActions::AddSendEvent(Event* event) { return false; } - /*if (!(event->GetType() & ET_INACTIONS)) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when send"; - return false; - } - event->AddType(ET_INACTIONS); - }*/ - auto rw_event = dynamic_cast(event); - EventOverlapped* context = (EventOverlapped*)rw_event->GetData(); + EventOverlapped* context = (EventOverlapped*)event->GetData(); if (!context) { context = rw_sock->GetAlloter()->PoolNew(); context->_event = (void*)event; - rw_event->SetData(context); + event->SetData(context); } context->_event_type = ET_WRITE; - auto buffer = rw_event->GetBuffer(); + auto buffer = event->GetBuffer(); std::vector bufs; buffer->GetUseMemoryBlock(bufs, __iocp_buff_size); @@ -224,8 +105,6 @@ bool IOCPEventActions::AddSendEvent(Event* event) { if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { LOG_WARN("IOCP post send event failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - //rw_sock->OnDisConnect(CEC_CLOSED); - //DelEvent(event); rw_sock->SetShutdown(); return false; } @@ -253,14 +132,6 @@ bool IOCPEventActions::AddRecvEvent(Event* event) { return false; } - /*if (!(event->GetType() & ET_INACTIONS)) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when recv"; - return false; - } - event->AddType(ET_INACTIONS); - }*/ - EventOverlapped* context = (EventOverlapped*)event->GetData(); if (!context) { @@ -270,8 +141,7 @@ bool IOCPEventActions::AddRecvEvent(Event* event) { } context->_event_type = ET_READ; - auto win_rw = dynamic_cast(event); - auto buffer = win_rw->GetBuffer(); + auto buffer = event->GetBuffer(); std::vector bufs; buffer->GetFreeMemoryBlock(bufs, __iocp_buff_size); @@ -280,8 +150,6 @@ bool IOCPEventActions::AddRecvEvent(Event* event) { if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { LOG_WARN("IOCP post recv event failed! error code: %d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - //rw_sock->OnDisConnect(CEC_CLOSED); - //DelEvent(event); rw_sock->SetShutdown(); return false; } @@ -290,7 +158,7 @@ bool IOCPEventActions::AddRecvEvent(Event* event) { return true; } -bool IOCPEventActions::AddWinAcceptEvent(std::shared_ptr& event) { +bool IOCPEventActions::AddAcceptEvent(Event* event) { if (event->GetType() & ET_ACCEPT) { LOG_WARN_S << "repeat accept event"; return false; @@ -303,63 +171,6 @@ bool IOCPEventActions::AddWinAcceptEvent(std::shared_ptr& event) { } auto accept_sock = std::dynamic_pointer_cast(sock); - if (!accept_sock->GetInActions()) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when accept"; - return false; - } - accept_sock->SetInActions(true); - } - - auto accept_event = std::dynamic_pointer_cast(event); - - EventOverlapped* context = (EventOverlapped*)event->GetData(); - if (!context) { - context = new EventOverlapped(); - context->_event = (void*)&event; - event->SetData(context); - } - context->_event_type = ET_ACCEPT; - - DWORD dwBytes = 0; - uint32_t ret = AcceptEx((SOCKET)sock->GetSocket(), (SOCKET)accept_event->GetClientSocket(), accept_event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2), - sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, &dwBytes, &context->_overlapped); - - if (0 == ret) { - if (WSA_IO_PENDING != WSAGetLastError()) { - LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError()); - DelEvent(event); - return false; - } - } - - event->AddType(ET_ACCEPT); - LOG_DEBUG("post a new accept event"); - - return true; -} - -bool IOCPEventActions::AddWinAcceptEvent(Event* event) { - if (event->GetType() & ET_ACCEPT) { - LOG_WARN_S << "repeat accept event"; - return false; - } - - auto sock = event->GetSocket(); - if (!sock) { - LOG_WARN("socket is already distroyed! event %s", "AddWinAcceptEvent"); - return false; - } - - auto accept_sock = std::dynamic_pointer_cast(sock); - if (!accept_sock->GetInActions()) { - if (!AddToIOCP(sock->GetSocket())) { - LOG_WARN_S << "add to iocp failed when accept"; - return false; - } - accept_sock->SetInActions(true); - } - auto accept_event = dynamic_cast(event); EventOverlapped* context = (EventOverlapped*)event->GetData(); @@ -377,7 +188,6 @@ bool IOCPEventActions::AddWinAcceptEvent(Event* event) { if (0 == ret) { if (WSA_IO_PENDING != WSAGetLastError()) { LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError()); - //DelEvent(event); return false; } } @@ -388,125 +198,6 @@ bool IOCPEventActions::AddWinAcceptEvent(Event* event) { return true; } -bool IOCPEventActions::AddConnection(std::shared_ptr& event, Address& address) { - if (event->GetType() & ET_CONNECT) { - LOG_WARN_S << "repeat connect event"; - return false; - } - - auto sock = event->GetSocket(); - if (!sock) { - LOG_WARN("socket is already distroyed! event %s", "AddSendEvent"); - return false; - } - - EventOverlapped* context = (EventOverlapped*)event->GetData(); - if (!context) { - context = sock->GetAlloter()->PoolNew(); - context->_event = (void*)&event; - event->SetData(context); - } - - if (address.GetType() == AT_IPV4) { - SOCKADDR_IN local; - local.sin_family = AF_INET; - local.sin_port = htons(0); - local.sin_addr.S_un.S_addr = INADDR_ANY; - if (bind(sock->GetSocket(), (sockaddr*)&local, sizeof(local)) != 0) { - LOG_FATAL("bind local host failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - } - - } else { - SOCKADDR_IN6 local; - local.sin6_flowinfo = 0; - local.sin6_scope_id = 0; - local.sin6_family = AF_INET6; - local.sin6_port = 0; - local.sin6_addr = in6addr_any; - if (bind(sock->GetSocket(), (sockaddr*)&local, sizeof(local)) != 0) { - LOG_FATAL("bind local host failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError())); - } - } - - DWORD dwBytes = 0; - int32_t ret = -1; - if (address.GetType() == AT_IPV4) { - SOCKADDR_IN addr; - addr.sin_family = AF_INET6; - addr.sin_port = htons(address.GetAddrPort()); - addr.sin_addr.S_un.S_addr = inet_addr(address.GetIp().c_str()); - ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped); - - } else { - SOCKADDR_IN6 addr; - addr.sin6_flowinfo = 0; - addr.sin6_scope_id = 0; - addr.sin6_family = AF_INET6; - addr.sin6_port = htons(address.GetAddrPort()); - inet_pton(AF_INET6, address.GetIp().c_str(), &addr.sin6_addr); - ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped); - } - - setsockopt((SOCKET)sock->GetSocket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0); - - auto rw_sock = std::dynamic_pointer_cast(sock); - if (ret) { - rw_sock->OnConnect(CEC_SUCCESS); - return true; - } - - int32_t err = WSAGetLastError(); - if (WSA_IO_PENDING == err || ERROR_SUCCESS == err) { - if (CheckConnect(rw_sock->GetSocket())) { - rw_sock->OnConnect(CEC_SUCCESS); - return true; - } - } - context->_event_type = ET_CONNECT; - DoEvent(context, 0); - - return false; -} - -bool IOCPEventActions::AddDisconnection(std::shared_ptr& event) { - if (event->GetType() & ET_DISCONNECT) { - LOG_WARN_S << "repeat disconnect event"; - return false; - } - - auto sock = event->GetSocket(); - if (!sock) { - LOG_WARN("socket is already distroyed! event %s", "AddSendEvent"); - return false; - } - auto rw_sock = std::dynamic_pointer_cast(sock); - if (rw_sock->IsShutdown()) { - return false; - } - - EventOverlapped* context = (EventOverlapped*)event->GetData(); - if (!context) { - context = sock->GetAlloter()->PoolNew(); - context->_event = (void*)&event; - event->SetData(context); - } - - context->_event_type = ET_DISCONNECT; - int32_t ret = DisconnectionEx((SOCKET)sock->GetSocket(), &context->_overlapped, TF_REUSE_SOCKET, 0); - - if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) { - LOG_FATAL("IOCP post disconnect event failed! error code: %d", WSAGetLastError()); - return false; - } - LOG_DEBUG("post a new disconnect event"); - - event->AddType(ET_DISCONNECT); - //DelEvent(event); - //auto rw_sock = std::dynamic_pointer_cast(sock); - //rw_sock->OnDisConnect(CEC_CLOSED); - return true; -} - bool IOCPEventActions::AddConnection(Event* event, Address& address) { if (event->GetType() & ET_CONNECT) { LOG_WARN_S << "repeat connect event"; @@ -556,8 +247,7 @@ bool IOCPEventActions::AddConnection(Event* event, Address& address) { addr.sin_addr.S_un.S_addr = inet_addr(address.GetIp().c_str()); ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped); - } - else { + } else { SOCKADDR_IN6 addr; addr.sin6_flowinfo = 0; addr.sin6_scope_id = 0; @@ -582,10 +272,9 @@ bool IOCPEventActions::AddConnection(Event* event, Address& address) { return true; } } - context->_event_type = ET_CONNECT; - DoEvent(context, 0); - return false; + rw_sock->OnConnect(CEC_CONNECT_REFUSE); + return true; } bool IOCPEventActions::AddDisconnection(Event* event) { @@ -621,13 +310,10 @@ bool IOCPEventActions::AddDisconnection(Event* event) { LOG_DEBUG("post a new disconnect event"); event->AddType(ET_DISCONNECT); - //DelEvent(event); - //auto rw_sock = std::dynamic_pointer_cast(sock); - //rw_sock->OnDisConnect(CEC_CLOSED); return true; } -bool IOCPEventActions::DelEvent(std::shared_ptr& event) { +bool IOCPEventActions::DelEvent(Event* event) { auto sock = event->GetSocket(); if (!sock) { LOG_WARN("socket is already distroyed! event %s", "AddSendEvent"); diff --git a/cppnet/event/win/iocp_action.h b/cppnet/event/win/iocp_action.h index 561c2d1..475a125 100644 --- a/cppnet/event/win/iocp_action.h +++ b/cppnet/event/win/iocp_action.h @@ -22,26 +22,21 @@ public: virtual bool Init(uint32_t thread_num = 0); virtual bool Dealloc(); // net io event - virtual bool AddSendEvent(std::shared_ptr& event); - virtual bool AddRecvEvent(std::shared_ptr& event); virtual bool AddSendEvent(Event* event); virtual bool AddRecvEvent(Event* event); - virtual bool AddWinAcceptEvent(std::shared_ptr& event); - virtual bool AddWinAcceptEvent(Event* event); - - virtual bool AddConnection(std::shared_ptr& event, Address& address); - virtual bool AddDisconnection(std::shared_ptr& event); - + virtual bool AddAcceptEvent(Event* event); virtual bool AddConnection(Event* event, Address& address); virtual bool AddDisconnection(Event* event); - virtual bool DelEvent(std::shared_ptr& event); + virtual bool DelEvent(Event* event); // io thread process virtual void ProcessEvent(int32_t wait_ms); // weak up net io thread virtual void Wakeup(); -public: + bool AddToIOCP(uint64_t sock); + +private: void DoEvent(EventOverlapped *socket_context, uint32_t bytes); protected: diff --git a/cppnet/event/win/rw_event.h b/cppnet/event/win/rw_event.h deleted file mode 100644 index 6ccdfd8..0000000 --- a/cppnet/event/win/rw_event.h +++ /dev/null @@ -1,36 +0,0 @@ -// Use of this source code is governed by a BSD 3-Clause License -// that can be found in the LICENSE file. - -// Author: caozhiyi (caozhiyi5@gmail.com) - -#ifndef CPPNET_EVENT_WIN_RW_EVENT -#define CPPNET_EVENT_WIN_RW_EVENT - -#include "cppnet/event/event_interface.h" - -namespace cppnet { - -class BufferQueue; -class WinRWEvent: - public Event{ -public: - WinRWEvent(): - _ex_data(nullptr) {} - virtual ~WinRWEvent() {} - - void SetExData(void* data) { _ex_data = data; } - void* GetExData() { return _ex_data; } - - void SetBuffer(std::shared_ptr& buffer) { _buffer = buffer; } - std::shared_ptr GetBuffer() { return _buffer; } - -private: - void* _ex_data; - -private: - std::shared_ptr _buffer; -}; - -} - -#endif \ No newline at end of file diff --git a/cppnet/socket/connect_socket.cpp b/cppnet/socket/connect_socket.cpp index 9bd941d..456b455 100644 --- a/cppnet/socket/connect_socket.cpp +++ b/cppnet/socket/connect_socket.cpp @@ -27,14 +27,7 @@ ConnectSocket::ConnectSocket() { } ConnectSocket::~ConnectSocket() { - if (_sock > 0) { -#ifdef __win__ - __all_socket_map.Erase(_sock); -#else - __all_socket_map.erase(_sock); -#endif - OsHandle::Close(_sock); - } + } bool ConnectSocket::Bind(const std::string& ip, uint16_t port) { @@ -77,6 +70,7 @@ bool ConnectSocket::Listen() { return true; } +/* void ConnectSocket::Accept() { if (!_accept_event) { _accept_event = std::make_shared(); @@ -88,5 +82,6 @@ void ConnectSocket::Accept() { actions->AddWinAcceptEvent(_accept_event); } } +*/ } \ No newline at end of file diff --git a/cppnet/socket/connect_socket.h b/cppnet/socket/connect_socket.h index f955a10..e09d3b6 100644 --- a/cppnet/socket/connect_socket.h +++ b/cppnet/socket/connect_socket.h @@ -25,12 +25,11 @@ public: virtual bool Bind(const std::string& ip, uint16_t port); virtual bool Listen(); - virtual void Accept(); + virtual void Accept() {} + virtual void Accept(uint16_t index) {} + virtual void Close() {} - virtual void OnAccept() {} - -protected: - std::shared_ptr _accept_event; + virtual void OnAccept(Event* event) {} }; std::shared_ptr MakeConnectSocket(); diff --git a/cppnet/socket/rw_socket.cpp b/cppnet/socket/rw_socket.cpp index 28735a1..f316b57 100644 --- a/cppnet/socket/rw_socket.cpp +++ b/cppnet/socket/rw_socket.cpp @@ -23,7 +23,7 @@ RWSocket::RWSocket(std::shared_ptr alloter): Socket(alloter) { _block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step); - _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); + //_write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); _read_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); } @@ -31,15 +31,15 @@ RWSocket::RWSocket(uint64_t sock, std::shared_ptr alloter): Socket(sock, alloter) { _block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step); - _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); + //_write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); _read_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); } RWSocket::~RWSocket() { // free buffer early than pool - _write_buffer.reset(); - _read_buffer.reset(); - _block_pool.reset(); + //_write_buffer.reset(); + //_read_buffer.reset(); + //_block_pool.reset(); } bool RWSocket::GetAddress(std::string& ip, uint16_t& port) { @@ -55,7 +55,7 @@ bool RWSocket::Close() { } void RWSocket::Read() { - if (!_event) { + /*if (!_event) { _event = _alloter->PoolNewSharePtr(); _event->SetSocket(shared_from_this()); } @@ -63,11 +63,11 @@ void RWSocket::Read() { auto actions = GetEventActions(); if (actions) { actions->AddRecvEvent(_event); - } + }*/ } void RWSocket::Connect(const std::string& ip, uint16_t port) { - if (!_event) { + /*if (!_event) { _event = _alloter->PoolNewSharePtr(); _event->SetSocket(shared_from_this()); } @@ -88,11 +88,11 @@ void RWSocket::Connect(const std::string& ip, uint16_t port) { auto actions = GetEventActions(); if (actions) { actions->AddConnection(_event, _addr); - } + }*/ } void RWSocket::Disconnect() { - if (!_event) { + /*if (!_event) { _event = _alloter->PoolNewSharePtr(); _event->SetSocket(shared_from_this()); } @@ -100,7 +100,7 @@ void RWSocket::Disconnect() { auto actions = GetEventActions(); if (actions) { actions->AddDisconnection(_event); - } + }*/ } void RWSocket::OnTimer() { @@ -155,9 +155,9 @@ void RWSocket::OnDisConnect(uint16_t err) { } // not active disconnection - if (_event && !(_event->GetType() & ET_DISCONNECT)) { + /*if (_event && !(_event->GetType() & ET_DISCONNECT)) { OsHandle::Close(_sock); - } + }*/ } } \ No newline at end of file diff --git a/cppnet/socket/rw_socket.h b/cppnet/socket/rw_socket.h index d38f8f1..b5b896a 100644 --- a/cppnet/socket/rw_socket.h +++ b/cppnet/socket/rw_socket.h @@ -46,15 +46,19 @@ public: virtual void OnDisConnect(uint16_t err); std::shared_ptr GetReadBuffer() { return _read_buffer; } +#ifndef __win__ std::shared_ptr GetWriteBuffer() { return _write_buffer; } - +#endif + protected: std::shared_ptr _block_pool; - - std::shared_ptr _event; - - std::shared_ptr _write_buffer; std::shared_ptr _read_buffer; + +#ifndef __win__ + std::shared_ptr _event; + std::shared_ptr _write_buffer; +#endif + }; std::shared_ptr MakeRWSocket(std::shared_ptr alloter); diff --git a/cppnet/socket/win/win_connect_socket.cpp b/cppnet/socket/win/win_connect_socket.cpp index f3e3a47..9504d10 100644 --- a/cppnet/socket/win/win_connect_socket.cpp +++ b/cppnet/socket/win/win_connect_socket.cpp @@ -9,9 +9,8 @@ #include "cppnet/cppnet_base.h" #include "cppnet/socket/rw_socket.h" #include "cppnet/event/win/expend_func.h" -#include "cppnet/event/win/accept_event.h" -#include "cppnet/event/action_interface.h" #include "cppnet/event/win/iocp_action.h" +#include "cppnet/event/win/accept_event.h" #include "common/log/log.h" #include "common/os/convert.h" @@ -30,8 +29,8 @@ std::shared_ptr MakeConnectSocket() { return std::make_shared(); } -WinConnectSocket::WinConnectSocket(): - _in_actions(false) { +WinConnectSocket::WinConnectSocket() { + // create all accept event. for (uint16_t i = 0; i < __iocp_accept_event_num; i++) { auto event = new WinAcceptEvent(i); _accept_event_vec.emplace_back(event); @@ -39,7 +38,42 @@ WinConnectSocket::WinConnectSocket(): } WinConnectSocket::~WinConnectSocket() { + __all_socket_map.Erase(_sock); + for (auto iter = _accept_event_vec.begin(); iter != _accept_event_vec.end(); iter++) { + delete *iter; + } +} +bool WinConnectSocket::Bind(const std::string& ip, uint16_t port) { + if (_sock == 0) { + auto ret = OsHandle::TcpSocket(); + if (ret._return_value < 0) { + LOG_ERROR("create socket failed. errno:%d, info:%s", ret._errno, ErrnoInfo(ret._errno)); + return false; + } + _sock = ret._return_value; + + auto action = GetEventActions(); + auto iocp = std::dynamic_pointer_cast(action); + if (!iocp->AddToIOCP(_sock)) { + LOG_FATAL("add accept socket to iocp failed!"); + OsHandle::Close(_sock); + return false; + } + } + + _addr.SetIp(ip); + _addr.SetAddrPort(port); + + auto ret = OsHandle::Bind(_sock, _addr); + + if (ret._return_value < 0) { + LOG_FATAL("window bind socket filed! error:%d, info:%s", ret._errno, ErrnoInfo(ret._errno)); + OsHandle::Close(_sock); + return false; + } + + return true; } void WinConnectSocket::Accept() { @@ -68,12 +102,16 @@ void WinConnectSocket::Accept(uint16_t index) { auto actions = GetEventActions(); if (actions) { - auto iocp = std::dynamic_pointer_cast(actions); - iocp->AddWinAcceptEvent(event); + actions->AddAcceptEvent(event); } } -void WinConnectSocket::OnAccept(WinAcceptEvent* event) { +void WinConnectSocket::Close() { + __all_socket_map.Erase(_sock); + OsHandle::Close(_sock); +} + +void WinConnectSocket::OnAccept(Event* event) { auto cppnet_base = _cppnet_base.lock(); if (!cppnet_base) { return; @@ -84,18 +122,18 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) { SOCKADDR_STORAGE* LocalAddr = NULL; int localLen = sizeof(SOCKADDR_STORAGE); + auto accept_event = dynamic_cast(event); + // accept a socket and read msg - AcceptExSockAddrs(event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2), + AcceptExSockAddrs(accept_event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2), sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&client_addr, &remote_len); // Does this call have any effect ? - setsockopt(event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, + setsockopt(accept_event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char *)&_sock, sizeof(_sock)); - // create a new rw socket - std::shared_ptr alloter = std::make_shared(MakePoolAlloterPtr()); + Address address; - SOCKADDR* addr_pt = (SOCKADDR*)client_addr; void* addr = nullptr; switch (addr_pt->sa_family) { @@ -117,29 +155,31 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) { inet_ntop(AF_INET6, addr, str_addr, sizeof(str_addr)); address.SetIp(str_addr); - auto sock = MakeRWSocket(event->GetClientSocket(), std::move(alloter)); + // create a new socket + std::shared_ptr alloter = std::make_shared(MakePoolAlloterPtr()); + auto sock = MakeRWSocket(accept_event->GetClientSocket(), std::move(alloter)); sock->SetCppNetBase(cppnet_base); sock->SetEventActions(_event_actions); sock->SetAddress(std::move(address)); sock->SetDispatcher(GetDispatcher()); + auto buffer = sock->GetReadBuffer(); + buffer->Write(accept_event->GetBuf(), accept_event->GetBufOffset()); + // add socket to iocp auto action = GetEventActions(); auto iocp = std::dynamic_pointer_cast(action); - iocp->AddToIOCP(event->GetClientSocket()); - - __all_socket_map[event->GetClientSocket()] = sock; - - auto buffer = sock->GetReadBuffer(); - buffer->Write(event->GetBuf(), event->GetBufOffset()); + iocp->AddToIOCP(accept_event->GetClientSocket()); + // add socket global cache. + __all_socket_map[accept_event->GetClientSocket()] = sock; // call accept call back function cppnet_base->OnAccept(sock); - cppnet_base->OnRead(sock, event->GetBufOffset()); + cppnet_base->OnRead(sock, accept_event->GetBufOffset()); //post accept again - Accept(event->GetIndex()); + Accept(accept_event->GetIndex()); // wait for read sock->Read(); diff --git a/cppnet/socket/win/win_connect_socket.h b/cppnet/socket/win/win_connect_socket.h index 271c2b0..f84e3a0 100644 --- a/cppnet/socket/win/win_connect_socket.h +++ b/cppnet/socket/win/win_connect_socket.h @@ -18,17 +18,15 @@ public: WinConnectSocket(); ~WinConnectSocket(); + virtual bool Bind(const std::string& ip, uint16_t port); virtual void Accept(); - void Accept(uint16_t index); + virtual void Accept(uint16_t index); + virtual void Close(); - void OnAccept(WinAcceptEvent* event); - - void SetInActions(bool in) { _in_actions = in; } - bool GetInActions() { return _in_actions; } + virtual void OnAccept(Event* event); private: std::vector _accept_event_vec; - bool _in_actions; }; } diff --git a/cppnet/socket/win/win_rw_socket.cpp b/cppnet/socket/win/win_rw_socket.cpp index 22b2a7f..d3cc861 100644 --- a/cppnet/socket/win/win_rw_socket.cpp +++ b/cppnet/socket/win/win_rw_socket.cpp @@ -7,9 +7,8 @@ #include "cppnet/cppnet_base.h" #include "cppnet/cppnet_config.h" -#include "cppnet/event/win/rw_event.h" +#include "cppnet/event/event_interface.h" #include "cppnet/event/action_interface.h" -#include "cppnet/event/win/iocp_action.h" #include "common/log/log.h" #include "common/buffer/buffer_queue.h" @@ -26,7 +25,6 @@ std::shared_ptr MakeRWSocket(uint64_t sock, std::shared_ptr alloter): RWSocket(alloter), - _ref_count(0), _shutdown(false), _is_reading(false) { @@ -34,7 +32,6 @@ WinRWSocket::WinRWSocket(std::shared_ptr alloter): WinRWSocket::WinRWSocket(uint64_t sock, std::shared_ptr alloter): RWSocket(sock, alloter), - _ref_count(0), _shutdown(false), _is_reading(false) { @@ -50,7 +47,7 @@ void WinRWSocket::Read() { return; } - auto rw_event = _alloter->PoolNew(); + auto rw_event = _alloter->PoolNew(); auto buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); rw_event->SetBuffer(buffer); @@ -59,8 +56,7 @@ void WinRWSocket::Read() { auto actions = GetEventActions(); if (actions) { - auto iocp = std::dynamic_pointer_cast(actions); - if (iocp->AddRecvEvent(event)) { + if (actions->AddRecvEvent(event)) { _is_reading = true; AddEvent(event); } @@ -68,22 +64,20 @@ void WinRWSocket::Read() { } bool WinRWSocket::Write(const char* src, uint32_t len) { - auto rw_event = _alloter->PoolNew(); + // create new write buffer auto buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter); - rw_event->SetBuffer(buffer); - buffer->Write(src, len); - auto event = dynamic_cast(rw_event); - event->SetSocket(shared_from_this()); + // create new write event + auto rw_event = _alloter->PoolNew(); + rw_event->SetBuffer(buffer); + + rw_event->SetSocket(shared_from_this()); auto actions = GetEventActions(); if (actions) { - auto iocp = std::dynamic_pointer_cast(actions); - if (iocp->AddSendEvent(event)) { - std::lock_guard lock(_event_mutex); - _event_set.insert(event); - + if (actions->AddSendEvent(rw_event)) { + AddEvent(rw_event); return true; } } @@ -91,29 +85,39 @@ bool WinRWSocket::Write(const char* src, uint32_t len) { } void WinRWSocket::Connect(const std::string& ip, uint16_t port) { - if (!_event) { - _event = _alloter->PoolNewSharePtr(); - _event->SetSocket(shared_from_this()); + if (_sock == 0) { + auto ret = OsHandle::TcpSocket(); + if (ret._return_value < 0) { + LOG_ERROR("create socket failed. error:%d", ret._errno); + return; + } + _sock = ret._return_value; } - RWSocket::Connect(ip, port); -} -void WinRWSocket::Disconnect() { - if (!_event) { - _event = _alloter->PoolNewSharePtr(); - _event->SetSocket(shared_from_this()); - } + _addr.SetIp(ip); + _addr.SetAddrPort(port); + + auto rw_event = _alloter->PoolNew(); + rw_event->SetSocket(shared_from_this()); auto actions = GetEventActions(); if (actions) { - if (actions->AddDisconnection(_event)) { - Incref(); + if (actions->AddConnection(rw_event, _addr)) { + AddEvent(rw_event); } } } -void WinRWSocket::OnRead(uint32_t len) { - // do nothind +void WinRWSocket::Disconnect() { + auto rw_event = _alloter->PoolNew(); + rw_event->SetSocket(shared_from_this()); + + auto actions = GetEventActions(); + if (actions) { + if (actions->AddDisconnection(rw_event)) { + AddEvent(rw_event); + } + } } void WinRWSocket::OnRead(Event* event, uint32_t len) { @@ -126,8 +130,7 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) { return; } - auto rw_event = dynamic_cast(event); - + auto rw_event = dynamic_cast(event); auto buffer = rw_event->GetBuffer(); buffer->MoveWritePt(len); @@ -151,10 +154,6 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) { Read(); } -void WinRWSocket::OnWrite(uint32_t len) { - // do nothing -} - void WinRWSocket::OnWrite(Event* event, uint32_t len) { auto cppnet_base = _cppnet_base.lock(); if (!cppnet_base) { @@ -165,7 +164,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) { return; } - auto rw_event = dynamic_cast(event); + auto rw_event = dynamic_cast(event); auto buffer = rw_event->GetBuffer(); buffer->MoveReadPt(len); @@ -175,8 +174,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) { if (buffer->GetCanReadLength() > 0) { auto actions = GetEventActions(); if (actions) { - auto iocp = std::dynamic_pointer_cast(actions); - if (iocp->AddSendEvent(event)) { + if (actions->AddSendEvent(event)) { LOG_ERROR("post send event. sock:%d", _sock); } } @@ -186,22 +184,17 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) { } } -void WinRWSocket::OnDisConnect(uint16_t err) { - // do nothing -} - void WinRWSocket::OnDisConnect(Event* event, uint16_t err) { RemvoeEvent(event); if (EventEmpty() && IsShutdown()) { auto sock = shared_from_this(); - __all_socket_map.Erase(_sock); auto cppnet_base = _cppnet_base.lock(); - if (cppnet_base) { cppnet_base->OnDisConnect(sock, err); } + __all_socket_map.Erase(_sock); OsHandle::Close(_sock); } } @@ -222,42 +215,4 @@ bool WinRWSocket::EventEmpty() { return _event_set.empty(); } -bool WinRWSocket::Decref(uint16_t err) { - int16_t ref = _ref_count.fetch_sub(1); - if (ref == 1 && IsShutdown()) { - RWSocket::OnDisConnect(err); - return false; - } - return true; -} - -bool WinRWSocket::Recv(uint32_t len) { - auto cppnet_base = _cppnet_base.lock(); - if (!cppnet_base) { - return false; - } - if (len == 0) { - LOG_ERROR("read invalid length. sock:%d", _sock); - } - - _read_buffer->MoveWritePt(len); - cppnet_base->OnRead(shared_from_this(), len); - return true; -} - -bool WinRWSocket::Send(uint32_t len) { - auto cppnet_base = _cppnet_base.lock(); - if (!cppnet_base) { - return false; - } - if (len > 0) { - _write_buffer->MoveReadPt(len); - cppnet_base->OnWrite(shared_from_this(), len); - // do read again - Read(); - } - - return true; -} - } \ No newline at end of file diff --git a/cppnet/socket/win/win_rw_socket.h b/cppnet/socket/win/win_rw_socket.h index 4042c3f..a9e5d1e 100644 --- a/cppnet/socket/win/win_rw_socket.h +++ b/cppnet/socket/win/win_rw_socket.h @@ -27,34 +27,25 @@ public: virtual void Connect(const std::string& ip, uint16_t port); virtual void Disconnect(); - virtual void OnRead(uint32_t len = 0); virtual void OnRead(Event* event, uint32_t len = 0); - virtual void OnWrite(uint32_t len = 0); virtual void OnWrite(Event* event, uint32_t len = 0); - virtual void OnDisConnect(uint16_t err); + virtual void OnConnect(Event* event, uint16_t err); virtual void OnDisConnect(Event* event, uint16_t err); - void Incref() { _ref_count.fetch_add(1); } - bool Decref(uint16_t err = CEC_CLOSED); - void SetShutdown() { _shutdown = true; } bool IsShutdown() { return _shutdown; } +private: void AddEvent(Event* event); void RemvoeEvent(Event* event); bool EventEmpty(); private: - bool Recv(uint32_t len); - bool Send(uint32_t len = 0); - -private: - std::atomic_int16_t _ref_count; std::atomic_bool _shutdown; - -private: std::atomic_bool _is_reading; + std::mutex _event_mutex; + // all event std::unordered_set _event_set; };