From 136cf536241e021e568d2ad1829ec80bcd699292 Mon Sep 17 00:00:00 2001
From: caozhiyi <272653256@qq.com>
Date: Thu, 27 May 2021 00:00:45 +0800
Subject: [PATCH] resruct iocp event action.
---
Cppnet.vcxproj | 1 -
Cppnet.vcxproj.filters | 3 -
cppnet/event/action_interface.h | 13 +-
cppnet/event/event_interface.h | 10 +
cppnet/event/win/iocp_action.cpp | 334 +----------------------
cppnet/event/win/iocp_action.h | 15 +-
cppnet/event/win/rw_event.h | 36 ---
cppnet/socket/connect_socket.cpp | 11 +-
cppnet/socket/connect_socket.h | 9 +-
cppnet/socket/rw_socket.cpp | 26 +-
cppnet/socket/rw_socket.h | 14 +-
cppnet/socket/win/win_connect_socket.cpp | 82 ++++--
cppnet/socket/win/win_connect_socket.h | 10 +-
cppnet/socket/win/win_rw_socket.cpp | 123 +++------
cppnet/socket/win/win_rw_socket.h | 17 +-
15 files changed, 168 insertions(+), 536 deletions(-)
delete mode 100644 cppnet/event/win/rw_event.h
diff --git a/Cppnet.vcxproj b/Cppnet.vcxproj
index 3066d34..9d8fbc0 100644
--- a/Cppnet.vcxproj
+++ b/Cppnet.vcxproj
@@ -187,7 +187,6 @@
-
diff --git a/Cppnet.vcxproj.filters b/Cppnet.vcxproj.filters
index 4e1f7ac..3adae77 100644
--- a/Cppnet.vcxproj.filters
+++ b/Cppnet.vcxproj.filters
@@ -213,9 +213,6 @@
cppnet\event\win
-
- cppnet\event\win
-
common\log
diff --git a/cppnet/event/action_interface.h b/cppnet/event/action_interface.h
index a036e8b..6cd7606 100644
--- a/cppnet/event/action_interface.h
+++ b/cppnet/event/action_interface.h
@@ -30,14 +30,13 @@ public:
virtual bool Dealloc() = 0;
// net io event
- virtual bool AddSendEvent(std::shared_ptr& event) = 0;
- virtual bool AddRecvEvent(std::shared_ptr& event) = 0;
- virtual bool AddWinAcceptEvent(std::shared_ptr& event) = 0;
+ virtual bool AddSendEvent(Event* event) = 0;
+ virtual bool AddRecvEvent(Event* event) = 0;
+ virtual bool AddAcceptEvent(Event* event) = 0;
+ virtual bool AddConnection(Event* event, Address& addr) = 0;
+ virtual bool AddDisconnection(Event* event) = 0;
- virtual bool AddConnection(std::shared_ptr& event, Address& addr) = 0;
- virtual bool AddDisconnection(std::shared_ptr& event) = 0;
-
- virtual bool DelEvent(std::shared_ptr& event) = 0;
+ virtual bool DelEvent(Event* event) = 0;
// io thread process
virtual void ProcessEvent(int32_t wait_ms) = 0;
// weak up net io thread
diff --git a/cppnet/event/event_interface.h b/cppnet/event/event_interface.h
index 2378fd6..ca6ebe5 100644
--- a/cppnet/event/event_interface.h
+++ b/cppnet/event/event_interface.h
@@ -26,6 +26,7 @@ enum EventType {
const char* TypeString(EventType type);
class Socket;
+class BufferQueue;
class Event {
public:
Event(): _data(nullptr), _event_type(0) {}
@@ -43,10 +44,19 @@ public:
void SetSocket(std::shared_ptr socket) { _socket = socket; }
std::shared_ptr GetSocket() { return _socket.lock(); }
+#ifdef __win__
+ void SetBuffer(std::shared_ptr& buffer) { _buffer = buffer; }
+ std::shared_ptr GetBuffer() { return _buffer; }
+private:
+ std::shared_ptr _buffer;
+#endif
+
protected:
void* _data;
uint16_t _event_type;
std::weak_ptr _socket;
+
+
};
}
diff --git a/cppnet/event/win/iocp_action.cpp b/cppnet/event/win/iocp_action.cpp
index c5331d3..814f5d8 100644
--- a/cppnet/event/win/iocp_action.cpp
+++ b/cppnet/event/win/iocp_action.cpp
@@ -8,7 +8,6 @@
#include "iocp_action.h"
#include "cppnet/cppnet_config.h"
-#include "cppnet/event/win/rw_event.h"
#include "cppnet/event/win/accept_event.h"
#include "cppnet/socket/win/win_rw_socket.h"
#include "cppnet/socket/win/win_connect_socket.h"
@@ -18,7 +17,6 @@
#include "common/network/socket.h"
#include "common/buffer/buffer_queue.h"
-
namespace cppnet {
std::shared_ptr MakeEventActions() {
@@ -57,7 +55,7 @@ IOCPEventActions::~IOCPEventActions() {
bool IOCPEventActions::Init(uint32_t thread_num) {
WinSockInit();
- //tell iocp thread num
+ //tell IOCP thread num
_iocp_handler = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, thread_num);
if (_iocp_handler == INVALID_HANDLE_VALUE) {
LOG_ERROR("IOCP create io completion port failed!");
@@ -71,115 +69,6 @@ bool IOCPEventActions::Dealloc() {
return true;
}
-bool IOCPEventActions::AddSendEvent(std::shared_ptr& event) {
- if (event->GetType() & ET_WRITE || event->GetType() & ET_DISCONNECT) {
- LOG_WARN_S << "repeat send event";
- return false;
- }
-
- auto sock = event->GetSocket();
- if (!sock) {
- LOG_WARN("socket is already distroyed! event %s", "AddSendEvent");
- return false;
- }
- auto rw_sock = std::dynamic_pointer_cast(sock);
- if (rw_sock->IsShutdown()) {
- LOG_WARN_S << "socket is shutdown when send";
- return false;
- }
-
- /*if (!(event->GetType() & ET_INACTIONS)) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when send";
- return false;
- }
- event->AddType(ET_INACTIONS);
- }*/
- auto rw_event = std::dynamic_pointer_cast(event);
- EventOverlapped* context = (EventOverlapped*)rw_event->GetData();
-
- if (!context) {
- context = rw_sock->GetAlloter()->PoolNew();
- context->_event = (void*)&event;
- rw_event->SetData(context);
- }
-
- context->_event_type = ET_WRITE;
-
- auto buffer = rw_event->GetBuffer();
- std::vector bufs;
- buffer->GetUseMemoryBlock(bufs, __iocp_buff_size);
-
- DWORD dwFlags = 0;
- int32_t ret = WSASend((SOCKET)sock->GetSocket(), (LPWSABUF)&(*bufs.begin()), (DWORD)bufs.size(), nullptr, dwFlags, &context->_overlapped, nullptr);
-
- if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) {
- LOG_WARN("IOCP post send event failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- //rw_sock->OnDisConnect(CEC_CLOSED);
- DelEvent(event);
- rw_sock->SetShutdown();
- return false;
- }
-
- // send some data immediately
- event->AddType(ET_WRITE);
- LOG_DEBUG("post a new write event");
- return true;
-}
-
-bool IOCPEventActions::AddRecvEvent(std::shared_ptr& event) {
- if (event->GetType() & ET_READ || event->GetType() & ET_DISCONNECT) {
- LOG_WARN_S << "repeat recv event";
- return false;
- }
-
- auto sock = event->GetSocket();
- if (!sock) {
- LOG_WARN("socket is already distroyed! event %s", "AddSendEvent");
- return false;
- }
- auto rw_sock = std::dynamic_pointer_cast(sock);
- if (rw_sock->IsShutdown()) {
- LOG_WARN_S << "socket is shutdown when recv";
- return false;
- }
-
- /*if (!(event->GetType() & ET_INACTIONS)) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when recv";
- return false;
- }
- event->AddType(ET_INACTIONS);
- }*/
-
- EventOverlapped* context = (EventOverlapped*)event->GetData();
-
- if (!context) {
- context = rw_sock->GetAlloter()->PoolNew();
- context->_event = (void*)&event;
- event->SetData(context);
- }
- context->_event_type = ET_READ;
-
- auto buffer = rw_sock->GetReadBuffer();
- std::vector bufs;
- buffer->GetFreeMemoryBlock(bufs, __iocp_buff_size);
-
- DWORD dwFlags = 0;
- int32_t ret = WSARecv((SOCKET)sock->GetSocket(), (LPWSABUF)&(*bufs.begin()), (DWORD)bufs.size(), nullptr, &dwFlags, &context->_overlapped, nullptr);
-
- if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) {
- LOG_WARN("IOCP post recv event failed! error code: %d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- //rw_sock->OnDisConnect(CEC_CLOSED);
- DelEvent(event);
- rw_sock->SetShutdown();
- return false;
- }
- event->AddType(ET_READ);
- LOG_DEBUG("post a new read event");
- return true;
-}
-
bool IOCPEventActions::AddSendEvent(Event* event) {
if (event->GetType() & ET_WRITE || event->GetType() & ET_DISCONNECT) {
LOG_WARN_S << "repeat send event";
@@ -197,25 +86,17 @@ bool IOCPEventActions::AddSendEvent(Event* event) {
return false;
}
- /*if (!(event->GetType() & ET_INACTIONS)) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when send";
- return false;
- }
- event->AddType(ET_INACTIONS);
- }*/
- auto rw_event = dynamic_cast(event);
- EventOverlapped* context = (EventOverlapped*)rw_event->GetData();
+ EventOverlapped* context = (EventOverlapped*)event->GetData();
if (!context) {
context = rw_sock->GetAlloter()->PoolNew();
context->_event = (void*)event;
- rw_event->SetData(context);
+ event->SetData(context);
}
context->_event_type = ET_WRITE;
- auto buffer = rw_event->GetBuffer();
+ auto buffer = event->GetBuffer();
std::vector bufs;
buffer->GetUseMemoryBlock(bufs, __iocp_buff_size);
@@ -224,8 +105,6 @@ bool IOCPEventActions::AddSendEvent(Event* event) {
if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) {
LOG_WARN("IOCP post send event failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- //rw_sock->OnDisConnect(CEC_CLOSED);
- //DelEvent(event);
rw_sock->SetShutdown();
return false;
}
@@ -253,14 +132,6 @@ bool IOCPEventActions::AddRecvEvent(Event* event) {
return false;
}
- /*if (!(event->GetType() & ET_INACTIONS)) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when recv";
- return false;
- }
- event->AddType(ET_INACTIONS);
- }*/
-
EventOverlapped* context = (EventOverlapped*)event->GetData();
if (!context) {
@@ -270,8 +141,7 @@ bool IOCPEventActions::AddRecvEvent(Event* event) {
}
context->_event_type = ET_READ;
- auto win_rw = dynamic_cast(event);
- auto buffer = win_rw->GetBuffer();
+ auto buffer = event->GetBuffer();
std::vector bufs;
buffer->GetFreeMemoryBlock(bufs, __iocp_buff_size);
@@ -280,8 +150,6 @@ bool IOCPEventActions::AddRecvEvent(Event* event) {
if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) {
LOG_WARN("IOCP post recv event failed! error code: %d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- //rw_sock->OnDisConnect(CEC_CLOSED);
- //DelEvent(event);
rw_sock->SetShutdown();
return false;
}
@@ -290,7 +158,7 @@ bool IOCPEventActions::AddRecvEvent(Event* event) {
return true;
}
-bool IOCPEventActions::AddWinAcceptEvent(std::shared_ptr& event) {
+bool IOCPEventActions::AddAcceptEvent(Event* event) {
if (event->GetType() & ET_ACCEPT) {
LOG_WARN_S << "repeat accept event";
return false;
@@ -303,63 +171,6 @@ bool IOCPEventActions::AddWinAcceptEvent(std::shared_ptr& event) {
}
auto accept_sock = std::dynamic_pointer_cast(sock);
- if (!accept_sock->GetInActions()) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when accept";
- return false;
- }
- accept_sock->SetInActions(true);
- }
-
- auto accept_event = std::dynamic_pointer_cast(event);
-
- EventOverlapped* context = (EventOverlapped*)event->GetData();
- if (!context) {
- context = new EventOverlapped();
- context->_event = (void*)&event;
- event->SetData(context);
- }
- context->_event_type = ET_ACCEPT;
-
- DWORD dwBytes = 0;
- uint32_t ret = AcceptEx((SOCKET)sock->GetSocket(), (SOCKET)accept_event->GetClientSocket(), accept_event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
- sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, &dwBytes, &context->_overlapped);
-
- if (0 == ret) {
- if (WSA_IO_PENDING != WSAGetLastError()) {
- LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError());
- DelEvent(event);
- return false;
- }
- }
-
- event->AddType(ET_ACCEPT);
- LOG_DEBUG("post a new accept event");
-
- return true;
-}
-
-bool IOCPEventActions::AddWinAcceptEvent(Event* event) {
- if (event->GetType() & ET_ACCEPT) {
- LOG_WARN_S << "repeat accept event";
- return false;
- }
-
- auto sock = event->GetSocket();
- if (!sock) {
- LOG_WARN("socket is already distroyed! event %s", "AddWinAcceptEvent");
- return false;
- }
-
- auto accept_sock = std::dynamic_pointer_cast(sock);
- if (!accept_sock->GetInActions()) {
- if (!AddToIOCP(sock->GetSocket())) {
- LOG_WARN_S << "add to iocp failed when accept";
- return false;
- }
- accept_sock->SetInActions(true);
- }
-
auto accept_event = dynamic_cast(event);
EventOverlapped* context = (EventOverlapped*)event->GetData();
@@ -377,7 +188,6 @@ bool IOCPEventActions::AddWinAcceptEvent(Event* event) {
if (0 == ret) {
if (WSA_IO_PENDING != WSAGetLastError()) {
LOG_ERROR("IOCP post accept failed! error code:%d", WSAGetLastError());
- //DelEvent(event);
return false;
}
}
@@ -388,125 +198,6 @@ bool IOCPEventActions::AddWinAcceptEvent(Event* event) {
return true;
}
-bool IOCPEventActions::AddConnection(std::shared_ptr& event, Address& address) {
- if (event->GetType() & ET_CONNECT) {
- LOG_WARN_S << "repeat connect event";
- return false;
- }
-
- auto sock = event->GetSocket();
- if (!sock) {
- LOG_WARN("socket is already distroyed! event %s", "AddSendEvent");
- return false;
- }
-
- EventOverlapped* context = (EventOverlapped*)event->GetData();
- if (!context) {
- context = sock->GetAlloter()->PoolNew();
- context->_event = (void*)&event;
- event->SetData(context);
- }
-
- if (address.GetType() == AT_IPV4) {
- SOCKADDR_IN local;
- local.sin_family = AF_INET;
- local.sin_port = htons(0);
- local.sin_addr.S_un.S_addr = INADDR_ANY;
- if (bind(sock->GetSocket(), (sockaddr*)&local, sizeof(local)) != 0) {
- LOG_FATAL("bind local host failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- }
-
- } else {
- SOCKADDR_IN6 local;
- local.sin6_flowinfo = 0;
- local.sin6_scope_id = 0;
- local.sin6_family = AF_INET6;
- local.sin6_port = 0;
- local.sin6_addr = in6addr_any;
- if (bind(sock->GetSocket(), (sockaddr*)&local, sizeof(local)) != 0) {
- LOG_FATAL("bind local host failed! error code:%d, info:%s", WSAGetLastError(), ErrnoInfo(WSAGetLastError()));
- }
- }
-
- DWORD dwBytes = 0;
- int32_t ret = -1;
- if (address.GetType() == AT_IPV4) {
- SOCKADDR_IN addr;
- addr.sin_family = AF_INET6;
- addr.sin_port = htons(address.GetAddrPort());
- addr.sin_addr.S_un.S_addr = inet_addr(address.GetIp().c_str());
- ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped);
-
- } else {
- SOCKADDR_IN6 addr;
- addr.sin6_flowinfo = 0;
- addr.sin6_scope_id = 0;
- addr.sin6_family = AF_INET6;
- addr.sin6_port = htons(address.GetAddrPort());
- inet_pton(AF_INET6, address.GetIp().c_str(), &addr.sin6_addr);
- ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped);
- }
-
- setsockopt((SOCKET)sock->GetSocket(), SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0);
-
- auto rw_sock = std::dynamic_pointer_cast(sock);
- if (ret) {
- rw_sock->OnConnect(CEC_SUCCESS);
- return true;
- }
-
- int32_t err = WSAGetLastError();
- if (WSA_IO_PENDING == err || ERROR_SUCCESS == err) {
- if (CheckConnect(rw_sock->GetSocket())) {
- rw_sock->OnConnect(CEC_SUCCESS);
- return true;
- }
- }
- context->_event_type = ET_CONNECT;
- DoEvent(context, 0);
-
- return false;
-}
-
-bool IOCPEventActions::AddDisconnection(std::shared_ptr& event) {
- if (event->GetType() & ET_DISCONNECT) {
- LOG_WARN_S << "repeat disconnect event";
- return false;
- }
-
- auto sock = event->GetSocket();
- if (!sock) {
- LOG_WARN("socket is already distroyed! event %s", "AddSendEvent");
- return false;
- }
- auto rw_sock = std::dynamic_pointer_cast(sock);
- if (rw_sock->IsShutdown()) {
- return false;
- }
-
- EventOverlapped* context = (EventOverlapped*)event->GetData();
- if (!context) {
- context = sock->GetAlloter()->PoolNew();
- context->_event = (void*)&event;
- event->SetData(context);
- }
-
- context->_event_type = ET_DISCONNECT;
- int32_t ret = DisconnectionEx((SOCKET)sock->GetSocket(), &context->_overlapped, TF_REUSE_SOCKET, 0);
-
- if ((SOCKET_ERROR == ret) && (WSA_IO_PENDING != WSAGetLastError())) {
- LOG_FATAL("IOCP post disconnect event failed! error code: %d", WSAGetLastError());
- return false;
- }
- LOG_DEBUG("post a new disconnect event");
-
- event->AddType(ET_DISCONNECT);
- //DelEvent(event);
- //auto rw_sock = std::dynamic_pointer_cast(sock);
- //rw_sock->OnDisConnect(CEC_CLOSED);
- return true;
-}
-
bool IOCPEventActions::AddConnection(Event* event, Address& address) {
if (event->GetType() & ET_CONNECT) {
LOG_WARN_S << "repeat connect event";
@@ -556,8 +247,7 @@ bool IOCPEventActions::AddConnection(Event* event, Address& address) {
addr.sin_addr.S_un.S_addr = inet_addr(address.GetIp().c_str());
ConnectEx((SOCKET)sock->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, &dwBytes, &context->_overlapped);
- }
- else {
+ } else {
SOCKADDR_IN6 addr;
addr.sin6_flowinfo = 0;
addr.sin6_scope_id = 0;
@@ -582,10 +272,9 @@ bool IOCPEventActions::AddConnection(Event* event, Address& address) {
return true;
}
}
- context->_event_type = ET_CONNECT;
- DoEvent(context, 0);
- return false;
+ rw_sock->OnConnect(CEC_CONNECT_REFUSE);
+ return true;
}
bool IOCPEventActions::AddDisconnection(Event* event) {
@@ -621,13 +310,10 @@ bool IOCPEventActions::AddDisconnection(Event* event) {
LOG_DEBUG("post a new disconnect event");
event->AddType(ET_DISCONNECT);
- //DelEvent(event);
- //auto rw_sock = std::dynamic_pointer_cast(sock);
- //rw_sock->OnDisConnect(CEC_CLOSED);
return true;
}
-bool IOCPEventActions::DelEvent(std::shared_ptr& event) {
+bool IOCPEventActions::DelEvent(Event* event) {
auto sock = event->GetSocket();
if (!sock) {
LOG_WARN("socket is already distroyed! event %s", "AddSendEvent");
diff --git a/cppnet/event/win/iocp_action.h b/cppnet/event/win/iocp_action.h
index 561c2d1..475a125 100644
--- a/cppnet/event/win/iocp_action.h
+++ b/cppnet/event/win/iocp_action.h
@@ -22,26 +22,21 @@ public:
virtual bool Init(uint32_t thread_num = 0);
virtual bool Dealloc();
// net io event
- virtual bool AddSendEvent(std::shared_ptr& event);
- virtual bool AddRecvEvent(std::shared_ptr& event);
virtual bool AddSendEvent(Event* event);
virtual bool AddRecvEvent(Event* event);
- virtual bool AddWinAcceptEvent(std::shared_ptr& event);
- virtual bool AddWinAcceptEvent(Event* event);
-
- virtual bool AddConnection(std::shared_ptr& event, Address& address);
- virtual bool AddDisconnection(std::shared_ptr& event);
-
+ virtual bool AddAcceptEvent(Event* event);
virtual bool AddConnection(Event* event, Address& address);
virtual bool AddDisconnection(Event* event);
- virtual bool DelEvent(std::shared_ptr& event);
+ virtual bool DelEvent(Event* event);
// io thread process
virtual void ProcessEvent(int32_t wait_ms);
// weak up net io thread
virtual void Wakeup();
-public:
+
bool AddToIOCP(uint64_t sock);
+
+private:
void DoEvent(EventOverlapped *socket_context, uint32_t bytes);
protected:
diff --git a/cppnet/event/win/rw_event.h b/cppnet/event/win/rw_event.h
deleted file mode 100644
index 6ccdfd8..0000000
--- a/cppnet/event/win/rw_event.h
+++ /dev/null
@@ -1,36 +0,0 @@
-// Use of this source code is governed by a BSD 3-Clause License
-// that can be found in the LICENSE file.
-
-// Author: caozhiyi (caozhiyi5@gmail.com)
-
-#ifndef CPPNET_EVENT_WIN_RW_EVENT
-#define CPPNET_EVENT_WIN_RW_EVENT
-
-#include "cppnet/event/event_interface.h"
-
-namespace cppnet {
-
-class BufferQueue;
-class WinRWEvent:
- public Event{
-public:
- WinRWEvent():
- _ex_data(nullptr) {}
- virtual ~WinRWEvent() {}
-
- void SetExData(void* data) { _ex_data = data; }
- void* GetExData() { return _ex_data; }
-
- void SetBuffer(std::shared_ptr& buffer) { _buffer = buffer; }
- std::shared_ptr GetBuffer() { return _buffer; }
-
-private:
- void* _ex_data;
-
-private:
- std::shared_ptr _buffer;
-};
-
-}
-
-#endif
\ No newline at end of file
diff --git a/cppnet/socket/connect_socket.cpp b/cppnet/socket/connect_socket.cpp
index 9bd941d..456b455 100644
--- a/cppnet/socket/connect_socket.cpp
+++ b/cppnet/socket/connect_socket.cpp
@@ -27,14 +27,7 @@ ConnectSocket::ConnectSocket() {
}
ConnectSocket::~ConnectSocket() {
- if (_sock > 0) {
-#ifdef __win__
- __all_socket_map.Erase(_sock);
-#else
- __all_socket_map.erase(_sock);
-#endif
- OsHandle::Close(_sock);
- }
+
}
bool ConnectSocket::Bind(const std::string& ip, uint16_t port) {
@@ -77,6 +70,7 @@ bool ConnectSocket::Listen() {
return true;
}
+/*
void ConnectSocket::Accept() {
if (!_accept_event) {
_accept_event = std::make_shared();
@@ -88,5 +82,6 @@ void ConnectSocket::Accept() {
actions->AddWinAcceptEvent(_accept_event);
}
}
+*/
}
\ No newline at end of file
diff --git a/cppnet/socket/connect_socket.h b/cppnet/socket/connect_socket.h
index f955a10..e09d3b6 100644
--- a/cppnet/socket/connect_socket.h
+++ b/cppnet/socket/connect_socket.h
@@ -25,12 +25,11 @@ public:
virtual bool Bind(const std::string& ip, uint16_t port);
virtual bool Listen();
- virtual void Accept();
+ virtual void Accept() {}
+ virtual void Accept(uint16_t index) {}
+ virtual void Close() {}
- virtual void OnAccept() {}
-
-protected:
- std::shared_ptr _accept_event;
+ virtual void OnAccept(Event* event) {}
};
std::shared_ptr MakeConnectSocket();
diff --git a/cppnet/socket/rw_socket.cpp b/cppnet/socket/rw_socket.cpp
index 28735a1..f316b57 100644
--- a/cppnet/socket/rw_socket.cpp
+++ b/cppnet/socket/rw_socket.cpp
@@ -23,7 +23,7 @@ RWSocket::RWSocket(std::shared_ptr alloter):
Socket(alloter) {
_block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step);
- _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
+ //_write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
}
@@ -31,15 +31,15 @@ RWSocket::RWSocket(uint64_t sock, std::shared_ptr alloter):
Socket(sock, alloter) {
_block_pool = _alloter->PoolNewSharePtr(__mem_block_size, __mem_block_add_step);
- _write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
+ //_write_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
}
RWSocket::~RWSocket() {
// free buffer early than pool
- _write_buffer.reset();
- _read_buffer.reset();
- _block_pool.reset();
+ //_write_buffer.reset();
+ //_read_buffer.reset();
+ //_block_pool.reset();
}
bool RWSocket::GetAddress(std::string& ip, uint16_t& port) {
@@ -55,7 +55,7 @@ bool RWSocket::Close() {
}
void RWSocket::Read() {
- if (!_event) {
+ /*if (!_event) {
_event = _alloter->PoolNewSharePtr();
_event->SetSocket(shared_from_this());
}
@@ -63,11 +63,11 @@ void RWSocket::Read() {
auto actions = GetEventActions();
if (actions) {
actions->AddRecvEvent(_event);
- }
+ }*/
}
void RWSocket::Connect(const std::string& ip, uint16_t port) {
- if (!_event) {
+ /*if (!_event) {
_event = _alloter->PoolNewSharePtr();
_event->SetSocket(shared_from_this());
}
@@ -88,11 +88,11 @@ void RWSocket::Connect(const std::string& ip, uint16_t port) {
auto actions = GetEventActions();
if (actions) {
actions->AddConnection(_event, _addr);
- }
+ }*/
}
void RWSocket::Disconnect() {
- if (!_event) {
+ /*if (!_event) {
_event = _alloter->PoolNewSharePtr();
_event->SetSocket(shared_from_this());
}
@@ -100,7 +100,7 @@ void RWSocket::Disconnect() {
auto actions = GetEventActions();
if (actions) {
actions->AddDisconnection(_event);
- }
+ }*/
}
void RWSocket::OnTimer() {
@@ -155,9 +155,9 @@ void RWSocket::OnDisConnect(uint16_t err) {
}
// not active disconnection
- if (_event && !(_event->GetType() & ET_DISCONNECT)) {
+ /*if (_event && !(_event->GetType() & ET_DISCONNECT)) {
OsHandle::Close(_sock);
- }
+ }*/
}
}
\ No newline at end of file
diff --git a/cppnet/socket/rw_socket.h b/cppnet/socket/rw_socket.h
index d38f8f1..b5b896a 100644
--- a/cppnet/socket/rw_socket.h
+++ b/cppnet/socket/rw_socket.h
@@ -46,15 +46,19 @@ public:
virtual void OnDisConnect(uint16_t err);
std::shared_ptr GetReadBuffer() { return _read_buffer; }
+#ifndef __win__
std::shared_ptr GetWriteBuffer() { return _write_buffer; }
-
+#endif
+
protected:
std::shared_ptr _block_pool;
-
- std::shared_ptr _event;
-
- std::shared_ptr _write_buffer;
std::shared_ptr _read_buffer;
+
+#ifndef __win__
+ std::shared_ptr _event;
+ std::shared_ptr _write_buffer;
+#endif
+
};
std::shared_ptr MakeRWSocket(std::shared_ptr alloter);
diff --git a/cppnet/socket/win/win_connect_socket.cpp b/cppnet/socket/win/win_connect_socket.cpp
index f3e3a47..9504d10 100644
--- a/cppnet/socket/win/win_connect_socket.cpp
+++ b/cppnet/socket/win/win_connect_socket.cpp
@@ -9,9 +9,8 @@
#include "cppnet/cppnet_base.h"
#include "cppnet/socket/rw_socket.h"
#include "cppnet/event/win/expend_func.h"
-#include "cppnet/event/win/accept_event.h"
-#include "cppnet/event/action_interface.h"
#include "cppnet/event/win/iocp_action.h"
+#include "cppnet/event/win/accept_event.h"
#include "common/log/log.h"
#include "common/os/convert.h"
@@ -30,8 +29,8 @@ std::shared_ptr MakeConnectSocket() {
return std::make_shared();
}
-WinConnectSocket::WinConnectSocket():
- _in_actions(false) {
+WinConnectSocket::WinConnectSocket() {
+ // create all accept event.
for (uint16_t i = 0; i < __iocp_accept_event_num; i++) {
auto event = new WinAcceptEvent(i);
_accept_event_vec.emplace_back(event);
@@ -39,7 +38,42 @@ WinConnectSocket::WinConnectSocket():
}
WinConnectSocket::~WinConnectSocket() {
+ __all_socket_map.Erase(_sock);
+ for (auto iter = _accept_event_vec.begin(); iter != _accept_event_vec.end(); iter++) {
+ delete *iter;
+ }
+}
+bool WinConnectSocket::Bind(const std::string& ip, uint16_t port) {
+ if (_sock == 0) {
+ auto ret = OsHandle::TcpSocket();
+ if (ret._return_value < 0) {
+ LOG_ERROR("create socket failed. errno:%d, info:%s", ret._errno, ErrnoInfo(ret._errno));
+ return false;
+ }
+ _sock = ret._return_value;
+
+ auto action = GetEventActions();
+ auto iocp = std::dynamic_pointer_cast(action);
+ if (!iocp->AddToIOCP(_sock)) {
+ LOG_FATAL("add accept socket to iocp failed!");
+ OsHandle::Close(_sock);
+ return false;
+ }
+ }
+
+ _addr.SetIp(ip);
+ _addr.SetAddrPort(port);
+
+ auto ret = OsHandle::Bind(_sock, _addr);
+
+ if (ret._return_value < 0) {
+ LOG_FATAL("window bind socket filed! error:%d, info:%s", ret._errno, ErrnoInfo(ret._errno));
+ OsHandle::Close(_sock);
+ return false;
+ }
+
+ return true;
}
void WinConnectSocket::Accept() {
@@ -68,12 +102,16 @@ void WinConnectSocket::Accept(uint16_t index) {
auto actions = GetEventActions();
if (actions) {
- auto iocp = std::dynamic_pointer_cast(actions);
- iocp->AddWinAcceptEvent(event);
+ actions->AddAcceptEvent(event);
}
}
-void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
+void WinConnectSocket::Close() {
+ __all_socket_map.Erase(_sock);
+ OsHandle::Close(_sock);
+}
+
+void WinConnectSocket::OnAccept(Event* event) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
return;
@@ -84,18 +122,18 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
SOCKADDR_STORAGE* LocalAddr = NULL;
int localLen = sizeof(SOCKADDR_STORAGE);
+ auto accept_event = dynamic_cast(event);
+
// accept a socket and read msg
- AcceptExSockAddrs(event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
+ AcceptExSockAddrs(accept_event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&client_addr, &remote_len);
// Does this call have any effect ?
- setsockopt(event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
+ setsockopt(accept_event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&_sock, sizeof(_sock));
- // create a new rw socket
- std::shared_ptr alloter = std::make_shared(MakePoolAlloterPtr());
+
Address address;
-
SOCKADDR* addr_pt = (SOCKADDR*)client_addr;
void* addr = nullptr;
switch (addr_pt->sa_family) {
@@ -117,29 +155,31 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
inet_ntop(AF_INET6, addr, str_addr, sizeof(str_addr));
address.SetIp(str_addr);
- auto sock = MakeRWSocket(event->GetClientSocket(), std::move(alloter));
+ // create a new socket
+ std::shared_ptr alloter = std::make_shared(MakePoolAlloterPtr());
+ auto sock = MakeRWSocket(accept_event->GetClientSocket(), std::move(alloter));
sock->SetCppNetBase(cppnet_base);
sock->SetEventActions(_event_actions);
sock->SetAddress(std::move(address));
sock->SetDispatcher(GetDispatcher());
+ auto buffer = sock->GetReadBuffer();
+ buffer->Write(accept_event->GetBuf(), accept_event->GetBufOffset());
+ // add socket to iocp
auto action = GetEventActions();
auto iocp = std::dynamic_pointer_cast(action);
- iocp->AddToIOCP(event->GetClientSocket());
-
- __all_socket_map[event->GetClientSocket()] = sock;
-
- auto buffer = sock->GetReadBuffer();
- buffer->Write(event->GetBuf(), event->GetBufOffset());
+ iocp->AddToIOCP(accept_event->GetClientSocket());
+ // add socket global cache.
+ __all_socket_map[accept_event->GetClientSocket()] = sock;
// call accept call back function
cppnet_base->OnAccept(sock);
- cppnet_base->OnRead(sock, event->GetBufOffset());
+ cppnet_base->OnRead(sock, accept_event->GetBufOffset());
//post accept again
- Accept(event->GetIndex());
+ Accept(accept_event->GetIndex());
// wait for read
sock->Read();
diff --git a/cppnet/socket/win/win_connect_socket.h b/cppnet/socket/win/win_connect_socket.h
index 271c2b0..f84e3a0 100644
--- a/cppnet/socket/win/win_connect_socket.h
+++ b/cppnet/socket/win/win_connect_socket.h
@@ -18,17 +18,15 @@ public:
WinConnectSocket();
~WinConnectSocket();
+ virtual bool Bind(const std::string& ip, uint16_t port);
virtual void Accept();
- void Accept(uint16_t index);
+ virtual void Accept(uint16_t index);
+ virtual void Close();
- void OnAccept(WinAcceptEvent* event);
-
- void SetInActions(bool in) { _in_actions = in; }
- bool GetInActions() { return _in_actions; }
+ virtual void OnAccept(Event* event);
private:
std::vector _accept_event_vec;
- bool _in_actions;
};
}
diff --git a/cppnet/socket/win/win_rw_socket.cpp b/cppnet/socket/win/win_rw_socket.cpp
index 22b2a7f..d3cc861 100644
--- a/cppnet/socket/win/win_rw_socket.cpp
+++ b/cppnet/socket/win/win_rw_socket.cpp
@@ -7,9 +7,8 @@
#include "cppnet/cppnet_base.h"
#include "cppnet/cppnet_config.h"
-#include "cppnet/event/win/rw_event.h"
+#include "cppnet/event/event_interface.h"
#include "cppnet/event/action_interface.h"
-#include "cppnet/event/win/iocp_action.h"
#include "common/log/log.h"
#include "common/buffer/buffer_queue.h"
@@ -26,7 +25,6 @@ std::shared_ptr MakeRWSocket(uint64_t sock, std::shared_ptr alloter):
RWSocket(alloter),
- _ref_count(0),
_shutdown(false),
_is_reading(false) {
@@ -34,7 +32,6 @@ WinRWSocket::WinRWSocket(std::shared_ptr alloter):
WinRWSocket::WinRWSocket(uint64_t sock, std::shared_ptr alloter):
RWSocket(sock, alloter),
- _ref_count(0),
_shutdown(false),
_is_reading(false) {
@@ -50,7 +47,7 @@ void WinRWSocket::Read() {
return;
}
- auto rw_event = _alloter->PoolNew();
+ auto rw_event = _alloter->PoolNew();
auto buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
rw_event->SetBuffer(buffer);
@@ -59,8 +56,7 @@ void WinRWSocket::Read() {
auto actions = GetEventActions();
if (actions) {
- auto iocp = std::dynamic_pointer_cast(actions);
- if (iocp->AddRecvEvent(event)) {
+ if (actions->AddRecvEvent(event)) {
_is_reading = true;
AddEvent(event);
}
@@ -68,22 +64,20 @@ void WinRWSocket::Read() {
}
bool WinRWSocket::Write(const char* src, uint32_t len) {
- auto rw_event = _alloter->PoolNew();
+ // create new write buffer
auto buffer = _alloter->PoolNewSharePtr(_block_pool, _alloter);
- rw_event->SetBuffer(buffer);
-
buffer->Write(src, len);
- auto event = dynamic_cast(rw_event);
- event->SetSocket(shared_from_this());
+ // create new write event
+ auto rw_event = _alloter->PoolNew();
+ rw_event->SetBuffer(buffer);
+
+ rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
- auto iocp = std::dynamic_pointer_cast(actions);
- if (iocp->AddSendEvent(event)) {
- std::lock_guard lock(_event_mutex);
- _event_set.insert(event);
-
+ if (actions->AddSendEvent(rw_event)) {
+ AddEvent(rw_event);
return true;
}
}
@@ -91,29 +85,39 @@ bool WinRWSocket::Write(const char* src, uint32_t len) {
}
void WinRWSocket::Connect(const std::string& ip, uint16_t port) {
- if (!_event) {
- _event = _alloter->PoolNewSharePtr();
- _event->SetSocket(shared_from_this());
+ if (_sock == 0) {
+ auto ret = OsHandle::TcpSocket();
+ if (ret._return_value < 0) {
+ LOG_ERROR("create socket failed. error:%d", ret._errno);
+ return;
+ }
+ _sock = ret._return_value;
}
- RWSocket::Connect(ip, port);
-}
-void WinRWSocket::Disconnect() {
- if (!_event) {
- _event = _alloter->PoolNewSharePtr();
- _event->SetSocket(shared_from_this());
- }
+ _addr.SetIp(ip);
+ _addr.SetAddrPort(port);
+
+ auto rw_event = _alloter->PoolNew();
+ rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
- if (actions->AddDisconnection(_event)) {
- Incref();
+ if (actions->AddConnection(rw_event, _addr)) {
+ AddEvent(rw_event);
}
}
}
-void WinRWSocket::OnRead(uint32_t len) {
- // do nothind
+void WinRWSocket::Disconnect() {
+ auto rw_event = _alloter->PoolNew();
+ rw_event->SetSocket(shared_from_this());
+
+ auto actions = GetEventActions();
+ if (actions) {
+ if (actions->AddDisconnection(rw_event)) {
+ AddEvent(rw_event);
+ }
+ }
}
void WinRWSocket::OnRead(Event* event, uint32_t len) {
@@ -126,8 +130,7 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) {
return;
}
- auto rw_event = dynamic_cast(event);
-
+ auto rw_event = dynamic_cast(event);
auto buffer = rw_event->GetBuffer();
buffer->MoveWritePt(len);
@@ -151,10 +154,6 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) {
Read();
}
-void WinRWSocket::OnWrite(uint32_t len) {
- // do nothing
-}
-
void WinRWSocket::OnWrite(Event* event, uint32_t len) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
@@ -165,7 +164,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
return;
}
- auto rw_event = dynamic_cast(event);
+ auto rw_event = dynamic_cast(event);
auto buffer = rw_event->GetBuffer();
buffer->MoveReadPt(len);
@@ -175,8 +174,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
if (buffer->GetCanReadLength() > 0) {
auto actions = GetEventActions();
if (actions) {
- auto iocp = std::dynamic_pointer_cast(actions);
- if (iocp->AddSendEvent(event)) {
+ if (actions->AddSendEvent(event)) {
LOG_ERROR("post send event. sock:%d", _sock);
}
}
@@ -186,22 +184,17 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
}
}
-void WinRWSocket::OnDisConnect(uint16_t err) {
- // do nothing
-}
-
void WinRWSocket::OnDisConnect(Event* event, uint16_t err) {
RemvoeEvent(event);
if (EventEmpty() && IsShutdown()) {
auto sock = shared_from_this();
- __all_socket_map.Erase(_sock);
auto cppnet_base = _cppnet_base.lock();
-
if (cppnet_base) {
cppnet_base->OnDisConnect(sock, err);
}
+ __all_socket_map.Erase(_sock);
OsHandle::Close(_sock);
}
}
@@ -222,42 +215,4 @@ bool WinRWSocket::EventEmpty() {
return _event_set.empty();
}
-bool WinRWSocket::Decref(uint16_t err) {
- int16_t ref = _ref_count.fetch_sub(1);
- if (ref == 1 && IsShutdown()) {
- RWSocket::OnDisConnect(err);
- return false;
- }
- return true;
-}
-
-bool WinRWSocket::Recv(uint32_t len) {
- auto cppnet_base = _cppnet_base.lock();
- if (!cppnet_base) {
- return false;
- }
- if (len == 0) {
- LOG_ERROR("read invalid length. sock:%d", _sock);
- }
-
- _read_buffer->MoveWritePt(len);
- cppnet_base->OnRead(shared_from_this(), len);
- return true;
-}
-
-bool WinRWSocket::Send(uint32_t len) {
- auto cppnet_base = _cppnet_base.lock();
- if (!cppnet_base) {
- return false;
- }
- if (len > 0) {
- _write_buffer->MoveReadPt(len);
- cppnet_base->OnWrite(shared_from_this(), len);
- // do read again
- Read();
- }
-
- return true;
-}
-
}
\ No newline at end of file
diff --git a/cppnet/socket/win/win_rw_socket.h b/cppnet/socket/win/win_rw_socket.h
index 4042c3f..a9e5d1e 100644
--- a/cppnet/socket/win/win_rw_socket.h
+++ b/cppnet/socket/win/win_rw_socket.h
@@ -27,34 +27,25 @@ public:
virtual void Connect(const std::string& ip, uint16_t port);
virtual void Disconnect();
- virtual void OnRead(uint32_t len = 0);
virtual void OnRead(Event* event, uint32_t len = 0);
- virtual void OnWrite(uint32_t len = 0);
virtual void OnWrite(Event* event, uint32_t len = 0);
- virtual void OnDisConnect(uint16_t err);
+ virtual void OnConnect(Event* event, uint16_t err);
virtual void OnDisConnect(Event* event, uint16_t err);
- void Incref() { _ref_count.fetch_add(1); }
- bool Decref(uint16_t err = CEC_CLOSED);
-
void SetShutdown() { _shutdown = true; }
bool IsShutdown() { return _shutdown; }
+private:
void AddEvent(Event* event);
void RemvoeEvent(Event* event);
bool EventEmpty();
private:
- bool Recv(uint32_t len);
- bool Send(uint32_t len = 0);
-
-private:
- std::atomic_int16_t _ref_count;
std::atomic_bool _shutdown;
-
-private:
std::atomic_bool _is_reading;
+
std::mutex _event_mutex;
+ // all event
std::unordered_set _event_set;
};