resruct iocp event action.

This commit is contained in:
caozhiyi
2021-05-27 00:00:45 +08:00
parent 8b73906438
commit 136cf53624
15 changed files with 168 additions and 536 deletions

View File

@@ -27,14 +27,7 @@ ConnectSocket::ConnectSocket() {
}
ConnectSocket::~ConnectSocket() {
if (_sock > 0) {
#ifdef __win__
__all_socket_map.Erase(_sock);
#else
__all_socket_map.erase(_sock);
#endif
OsHandle::Close(_sock);
}
}
bool ConnectSocket::Bind(const std::string& ip, uint16_t port) {
@@ -77,6 +70,7 @@ bool ConnectSocket::Listen() {
return true;
}
/*
void ConnectSocket::Accept() {
if (!_accept_event) {
_accept_event = std::make_shared<Event>();
@@ -88,5 +82,6 @@ void ConnectSocket::Accept() {
actions->AddWinAcceptEvent(_accept_event);
}
}
*/
}

View File

@@ -25,12 +25,11 @@ public:
virtual bool Bind(const std::string& ip, uint16_t port);
virtual bool Listen();
virtual void Accept();
virtual void Accept() {}
virtual void Accept(uint16_t index) {}
virtual void Close() {}
virtual void OnAccept() {}
protected:
std::shared_ptr<Event> _accept_event;
virtual void OnAccept(Event* event) {}
};
std::shared_ptr<ConnectSocket> MakeConnectSocket();

View File

@@ -23,7 +23,7 @@ RWSocket::RWSocket(std::shared_ptr<AlloterWrap> alloter):
Socket(alloter) {
_block_pool = _alloter->PoolNewSharePtr<BlockMemoryPool>(__mem_block_size, __mem_block_add_step);
_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
//_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
}
@@ -31,15 +31,15 @@ RWSocket::RWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter):
Socket(sock, alloter) {
_block_pool = _alloter->PoolNewSharePtr<BlockMemoryPool>(__mem_block_size, __mem_block_add_step);
_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
//_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
}
RWSocket::~RWSocket() {
// free buffer early than pool
_write_buffer.reset();
_read_buffer.reset();
_block_pool.reset();
//_write_buffer.reset();
//_read_buffer.reset();
//_block_pool.reset();
}
bool RWSocket::GetAddress(std::string& ip, uint16_t& port) {
@@ -55,7 +55,7 @@ bool RWSocket::Close() {
}
void RWSocket::Read() {
if (!_event) {
/*if (!_event) {
_event = _alloter->PoolNewSharePtr<Event>();
_event->SetSocket(shared_from_this());
}
@@ -63,11 +63,11 @@ void RWSocket::Read() {
auto actions = GetEventActions();
if (actions) {
actions->AddRecvEvent(_event);
}
}*/
}
void RWSocket::Connect(const std::string& ip, uint16_t port) {
if (!_event) {
/*if (!_event) {
_event = _alloter->PoolNewSharePtr<Event>();
_event->SetSocket(shared_from_this());
}
@@ -88,11 +88,11 @@ void RWSocket::Connect(const std::string& ip, uint16_t port) {
auto actions = GetEventActions();
if (actions) {
actions->AddConnection(_event, _addr);
}
}*/
}
void RWSocket::Disconnect() {
if (!_event) {
/*if (!_event) {
_event = _alloter->PoolNewSharePtr<Event>();
_event->SetSocket(shared_from_this());
}
@@ -100,7 +100,7 @@ void RWSocket::Disconnect() {
auto actions = GetEventActions();
if (actions) {
actions->AddDisconnection(_event);
}
}*/
}
void RWSocket::OnTimer() {
@@ -155,9 +155,9 @@ void RWSocket::OnDisConnect(uint16_t err) {
}
// not active disconnection
if (_event && !(_event->GetType() & ET_DISCONNECT)) {
/*if (_event && !(_event->GetType() & ET_DISCONNECT)) {
OsHandle::Close(_sock);
}
}*/
}
}

View File

@@ -46,15 +46,19 @@ public:
virtual void OnDisConnect(uint16_t err);
std::shared_ptr<BufferQueue> GetReadBuffer() { return _read_buffer; }
#ifndef __win__
std::shared_ptr<BufferQueue> GetWriteBuffer() { return _write_buffer; }
#endif
protected:
std::shared_ptr<BlockMemoryPool> _block_pool;
std::shared_ptr<Event> _event;
std::shared_ptr<BufferQueue> _write_buffer;
std::shared_ptr<BufferQueue> _read_buffer;
#ifndef __win__
std::shared_ptr<Event> _event;
std::shared_ptr<BufferQueue> _write_buffer;
#endif
};
std::shared_ptr<RWSocket> MakeRWSocket(std::shared_ptr<AlloterWrap> alloter);

View File

@@ -9,9 +9,8 @@
#include "cppnet/cppnet_base.h"
#include "cppnet/socket/rw_socket.h"
#include "cppnet/event/win/expend_func.h"
#include "cppnet/event/win/accept_event.h"
#include "cppnet/event/action_interface.h"
#include "cppnet/event/win/iocp_action.h"
#include "cppnet/event/win/accept_event.h"
#include "common/log/log.h"
#include "common/os/convert.h"
@@ -30,8 +29,8 @@ std::shared_ptr<ConnectSocket> MakeConnectSocket() {
return std::make_shared<WinConnectSocket>();
}
WinConnectSocket::WinConnectSocket():
_in_actions(false) {
WinConnectSocket::WinConnectSocket() {
// create all accept event.
for (uint16_t i = 0; i < __iocp_accept_event_num; i++) {
auto event = new WinAcceptEvent(i);
_accept_event_vec.emplace_back(event);
@@ -39,7 +38,42 @@ WinConnectSocket::WinConnectSocket():
}
WinConnectSocket::~WinConnectSocket() {
__all_socket_map.Erase(_sock);
for (auto iter = _accept_event_vec.begin(); iter != _accept_event_vec.end(); iter++) {
delete *iter;
}
}
bool WinConnectSocket::Bind(const std::string& ip, uint16_t port) {
if (_sock == 0) {
auto ret = OsHandle::TcpSocket();
if (ret._return_value < 0) {
LOG_ERROR("create socket failed. errno:%d, info:%s", ret._errno, ErrnoInfo(ret._errno));
return false;
}
_sock = ret._return_value;
auto action = GetEventActions();
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(action);
if (!iocp->AddToIOCP(_sock)) {
LOG_FATAL("add accept socket to iocp failed!");
OsHandle::Close(_sock);
return false;
}
}
_addr.SetIp(ip);
_addr.SetAddrPort(port);
auto ret = OsHandle::Bind(_sock, _addr);
if (ret._return_value < 0) {
LOG_FATAL("window bind socket filed! error:%d, info:%s", ret._errno, ErrnoInfo(ret._errno));
OsHandle::Close(_sock);
return false;
}
return true;
}
void WinConnectSocket::Accept() {
@@ -68,12 +102,16 @@ void WinConnectSocket::Accept(uint16_t index) {
auto actions = GetEventActions();
if (actions) {
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(actions);
iocp->AddWinAcceptEvent(event);
actions->AddAcceptEvent(event);
}
}
void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
void WinConnectSocket::Close() {
__all_socket_map.Erase(_sock);
OsHandle::Close(_sock);
}
void WinConnectSocket::OnAccept(Event* event) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
return;
@@ -84,18 +122,18 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
SOCKADDR_STORAGE* LocalAddr = NULL;
int localLen = sizeof(SOCKADDR_STORAGE);
auto accept_event = dynamic_cast<WinAcceptEvent*>(event);
// accept a socket and read msg
AcceptExSockAddrs(event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
AcceptExSockAddrs(accept_event->GetBuf(), __iocp_buff_size - ((sizeof(SOCKADDR_STORAGE) + 16) * 2),
sizeof(SOCKADDR_STORAGE) + 16, sizeof(SOCKADDR_STORAGE) + 16, (LPSOCKADDR*)&LocalAddr, &localLen, (LPSOCKADDR*)&client_addr, &remote_len);
// Does this call have any effect ?
setsockopt(event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
setsockopt(accept_event->GetClientSocket(), SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT,
(char *)&_sock, sizeof(_sock));
// create a new rw socket
std::shared_ptr<AlloterWrap> alloter = std::make_shared<AlloterWrap>(MakePoolAlloterPtr());
Address address;
SOCKADDR* addr_pt = (SOCKADDR*)client_addr;
void* addr = nullptr;
switch (addr_pt->sa_family) {
@@ -117,29 +155,31 @@ void WinConnectSocket::OnAccept(WinAcceptEvent* event) {
inet_ntop(AF_INET6, addr, str_addr, sizeof(str_addr));
address.SetIp(str_addr);
auto sock = MakeRWSocket(event->GetClientSocket(), std::move(alloter));
// create a new socket
std::shared_ptr<AlloterWrap> alloter = std::make_shared<AlloterWrap>(MakePoolAlloterPtr());
auto sock = MakeRWSocket(accept_event->GetClientSocket(), std::move(alloter));
sock->SetCppNetBase(cppnet_base);
sock->SetEventActions(_event_actions);
sock->SetAddress(std::move(address));
sock->SetDispatcher(GetDispatcher());
auto buffer = sock->GetReadBuffer();
buffer->Write(accept_event->GetBuf(), accept_event->GetBufOffset());
// add socket to iocp
auto action = GetEventActions();
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(action);
iocp->AddToIOCP(event->GetClientSocket());
__all_socket_map[event->GetClientSocket()] = sock;
auto buffer = sock->GetReadBuffer();
buffer->Write(event->GetBuf(), event->GetBufOffset());
iocp->AddToIOCP(accept_event->GetClientSocket());
// add socket global cache.
__all_socket_map[accept_event->GetClientSocket()] = sock;
// call accept call back function
cppnet_base->OnAccept(sock);
cppnet_base->OnRead(sock, event->GetBufOffset());
cppnet_base->OnRead(sock, accept_event->GetBufOffset());
//post accept again
Accept(event->GetIndex());
Accept(accept_event->GetIndex());
// wait for read
sock->Read();

View File

@@ -18,17 +18,15 @@ public:
WinConnectSocket();
~WinConnectSocket();
virtual bool Bind(const std::string& ip, uint16_t port);
virtual void Accept();
void Accept(uint16_t index);
virtual void Accept(uint16_t index);
virtual void Close();
void OnAccept(WinAcceptEvent* event);
void SetInActions(bool in) { _in_actions = in; }
bool GetInActions() { return _in_actions; }
virtual void OnAccept(Event* event);
private:
std::vector<Event*> _accept_event_vec;
bool _in_actions;
};
}

View File

@@ -7,9 +7,8 @@
#include "cppnet/cppnet_base.h"
#include "cppnet/cppnet_config.h"
#include "cppnet/event/win/rw_event.h"
#include "cppnet/event/event_interface.h"
#include "cppnet/event/action_interface.h"
#include "cppnet/event/win/iocp_action.h"
#include "common/log/log.h"
#include "common/buffer/buffer_queue.h"
@@ -26,7 +25,6 @@ std::shared_ptr<RWSocket> MakeRWSocket(uint64_t sock, std::shared_ptr<AlloterWra
WinRWSocket::WinRWSocket(std::shared_ptr<AlloterWrap> alloter):
RWSocket(alloter),
_ref_count(0),
_shutdown(false),
_is_reading(false) {
@@ -34,7 +32,6 @@ WinRWSocket::WinRWSocket(std::shared_ptr<AlloterWrap> alloter):
WinRWSocket::WinRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter):
RWSocket(sock, alloter),
_ref_count(0),
_shutdown(false),
_is_reading(false) {
@@ -50,7 +47,7 @@ void WinRWSocket::Read() {
return;
}
auto rw_event = _alloter->PoolNew<WinRWEvent>();
auto rw_event = _alloter->PoolNew<Event>();
auto buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
rw_event->SetBuffer(buffer);
@@ -59,8 +56,7 @@ void WinRWSocket::Read() {
auto actions = GetEventActions();
if (actions) {
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(actions);
if (iocp->AddRecvEvent(event)) {
if (actions->AddRecvEvent(event)) {
_is_reading = true;
AddEvent(event);
}
@@ -68,22 +64,20 @@ void WinRWSocket::Read() {
}
bool WinRWSocket::Write(const char* src, uint32_t len) {
auto rw_event = _alloter->PoolNew<WinRWEvent>();
// create new write buffer
auto buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
rw_event->SetBuffer(buffer);
buffer->Write(src, len);
auto event = dynamic_cast<Event*>(rw_event);
event->SetSocket(shared_from_this());
// create new write event
auto rw_event = _alloter->PoolNew<Event>();
rw_event->SetBuffer(buffer);
rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(actions);
if (iocp->AddSendEvent(event)) {
std::lock_guard<std::mutex> lock(_event_mutex);
_event_set.insert(event);
if (actions->AddSendEvent(rw_event)) {
AddEvent(rw_event);
return true;
}
}
@@ -91,29 +85,39 @@ bool WinRWSocket::Write(const char* src, uint32_t len) {
}
void WinRWSocket::Connect(const std::string& ip, uint16_t port) {
if (!_event) {
_event = _alloter->PoolNewSharePtr<WinRWEvent>();
_event->SetSocket(shared_from_this());
if (_sock == 0) {
auto ret = OsHandle::TcpSocket();
if (ret._return_value < 0) {
LOG_ERROR("create socket failed. error:%d", ret._errno);
return;
}
_sock = ret._return_value;
}
RWSocket::Connect(ip, port);
}
void WinRWSocket::Disconnect() {
if (!_event) {
_event = _alloter->PoolNewSharePtr<Event>();
_event->SetSocket(shared_from_this());
}
_addr.SetIp(ip);
_addr.SetAddrPort(port);
auto rw_event = _alloter->PoolNew<Event>();
rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
if (actions->AddDisconnection(_event)) {
Incref();
if (actions->AddConnection(rw_event, _addr)) {
AddEvent(rw_event);
}
}
}
void WinRWSocket::OnRead(uint32_t len) {
// do nothind
void WinRWSocket::Disconnect() {
auto rw_event = _alloter->PoolNew<Event>();
rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
if (actions->AddDisconnection(rw_event)) {
AddEvent(rw_event);
}
}
}
void WinRWSocket::OnRead(Event* event, uint32_t len) {
@@ -126,8 +130,7 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) {
return;
}
auto rw_event = dynamic_cast<WinRWEvent*>(event);
auto rw_event = dynamic_cast<Event*>(event);
auto buffer = rw_event->GetBuffer();
buffer->MoveWritePt(len);
@@ -151,10 +154,6 @@ void WinRWSocket::OnRead(Event* event, uint32_t len) {
Read();
}
void WinRWSocket::OnWrite(uint32_t len) {
// do nothing
}
void WinRWSocket::OnWrite(Event* event, uint32_t len) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
@@ -165,7 +164,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
return;
}
auto rw_event = dynamic_cast<WinRWEvent*>(event);
auto rw_event = dynamic_cast<Event*>(event);
auto buffer = rw_event->GetBuffer();
buffer->MoveReadPt(len);
@@ -175,8 +174,7 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
if (buffer->GetCanReadLength() > 0) {
auto actions = GetEventActions();
if (actions) {
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(actions);
if (iocp->AddSendEvent(event)) {
if (actions->AddSendEvent(event)) {
LOG_ERROR("post send event. sock:%d", _sock);
}
}
@@ -186,22 +184,17 @@ void WinRWSocket::OnWrite(Event* event, uint32_t len) {
}
}
void WinRWSocket::OnDisConnect(uint16_t err) {
// do nothing
}
void WinRWSocket::OnDisConnect(Event* event, uint16_t err) {
RemvoeEvent(event);
if (EventEmpty() && IsShutdown()) {
auto sock = shared_from_this();
__all_socket_map.Erase(_sock);
auto cppnet_base = _cppnet_base.lock();
if (cppnet_base) {
cppnet_base->OnDisConnect(sock, err);
}
__all_socket_map.Erase(_sock);
OsHandle::Close(_sock);
}
}
@@ -222,42 +215,4 @@ bool WinRWSocket::EventEmpty() {
return _event_set.empty();
}
bool WinRWSocket::Decref(uint16_t err) {
int16_t ref = _ref_count.fetch_sub(1);
if (ref == 1 && IsShutdown()) {
RWSocket::OnDisConnect(err);
return false;
}
return true;
}
bool WinRWSocket::Recv(uint32_t len) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
return false;
}
if (len == 0) {
LOG_ERROR("read invalid length. sock:%d", _sock);
}
_read_buffer->MoveWritePt(len);
cppnet_base->OnRead(shared_from_this(), len);
return true;
}
bool WinRWSocket::Send(uint32_t len) {
auto cppnet_base = _cppnet_base.lock();
if (!cppnet_base) {
return false;
}
if (len > 0) {
_write_buffer->MoveReadPt(len);
cppnet_base->OnWrite(shared_from_this(), len);
// do read again
Read();
}
return true;
}
}

View File

@@ -27,34 +27,25 @@ public:
virtual void Connect(const std::string& ip, uint16_t port);
virtual void Disconnect();
virtual void OnRead(uint32_t len = 0);
virtual void OnRead(Event* event, uint32_t len = 0);
virtual void OnWrite(uint32_t len = 0);
virtual void OnWrite(Event* event, uint32_t len = 0);
virtual void OnDisConnect(uint16_t err);
virtual void OnConnect(Event* event, uint16_t err);
virtual void OnDisConnect(Event* event, uint16_t err);
void Incref() { _ref_count.fetch_add(1); }
bool Decref(uint16_t err = CEC_CLOSED);
void SetShutdown() { _shutdown = true; }
bool IsShutdown() { return _shutdown; }
private:
void AddEvent(Event* event);
void RemvoeEvent(Event* event);
bool EventEmpty();
private:
bool Recv(uint32_t len);
bool Send(uint32_t len = 0);
private:
std::atomic_int16_t _ref_count;
std::atomic_bool _shutdown;
private:
std::atomic_bool _is_reading;
std::mutex _event_mutex;
// all event
std::unordered_set<Event*> _event_set;
};