modify iocp.

This commit is contained in:
caozhiyi
2021-05-27 14:04:26 +08:00
parent 136cf53624
commit b23eab1962
26 changed files with 163 additions and 120 deletions

View File

@@ -6,7 +6,6 @@
#include <errno.h>
#include "rw_socket.h"
#include "cppnet/dispatcher.h"
#include "cppnet/cppnet_base.h"
#include "cppnet/cppnet_config.h"
@@ -16,23 +15,24 @@
#include "common/log/log.h"
#include "common/alloter/pool_block.h"
#include "common/buffer/buffer_queue.h"
#include "common/alloter/pool_alloter.h"
#include "common/alloter/alloter_interface.h"
namespace cppnet {
RWSocket::RWSocket(std::shared_ptr<AlloterWrap> alloter):
Socket(alloter) {
_block_pool = _alloter->PoolNewSharePtr<BlockMemoryPool>(__mem_block_size, __mem_block_add_step);
//_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
RWSocket::RWSocket():
RWSocket(0, std::make_shared<AlloterWrap>(MakePoolAlloterPtr())) {
}
RWSocket::RWSocket(std::shared_ptr<AlloterWrap> alloter):
RWSocket(0, alloter) {
}
RWSocket::RWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter):
Socket(sock, alloter) {
Socket(sock),
_alloter(alloter) {
_block_pool = _alloter->PoolNewSharePtr<BlockMemoryPool>(__mem_block_size, __mem_block_add_step);
//_write_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
_read_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
}
RWSocket::~RWSocket() {
@@ -45,7 +45,6 @@ RWSocket::~RWSocket() {
bool RWSocket::GetAddress(std::string& ip, uint16_t& port) {
ip = _addr.GetIp();
port = _addr.GetAddrPort();
return true;
}
@@ -143,7 +142,7 @@ void RWSocket::OnConnect(uint16_t err) {
}
void RWSocket::OnDisConnect(uint16_t err) {
auto sock = shared_from_this();
/*auto sock = shared_from_this();
#ifdef __win__
__all_socket_map.Erase(_sock);
#else
@@ -155,7 +154,7 @@ void RWSocket::OnDisConnect(uint16_t err) {
}
// not active disconnection
/*if (_event && !(_event->GetType() & ET_DISCONNECT)) {
if (_event && !(_event->GetType() & ET_DISCONNECT)) {
OsHandle::Close(_sock);
}*/
}

View File

@@ -11,8 +11,6 @@
namespace cppnet {
class Event;
class Dispatcher;
class BufferQueue;
class AlloterWrap;
class BlockMemoryPool;
@@ -23,6 +21,7 @@ class RWSocket:
public std::enable_shared_from_this<RWSocket> {
public:
RWSocket();
RWSocket(std::shared_ptr<AlloterWrap> alloter);
RWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter);
virtual ~RWSocket();
@@ -45,25 +44,19 @@ public:
virtual void OnConnect(uint16_t err);
virtual void OnDisConnect(uint16_t err);
std::shared_ptr<BufferQueue> GetReadBuffer() { return _read_buffer; }
#ifndef __win__
std::shared_ptr<BufferQueue> GetWriteBuffer() { return _write_buffer; }
#endif
virtual void SetShutdown() { }
virtual bool IsShutdown() { return false; }
virtual std::shared_ptr<BufferQueue> GetReadBuffer() { return nullptr; }
std::shared_ptr<AlloterWrap> GetAlloter() { return _alloter; }
std::shared_ptr<BlockMemoryPool> GetBlockMemoryPool() { return _block_pool; }
protected:
std::shared_ptr<AlloterWrap> _alloter;
std::shared_ptr<BlockMemoryPool> _block_pool;
std::shared_ptr<BufferQueue> _read_buffer;
#ifndef __win__
std::shared_ptr<Event> _event;
std::shared_ptr<BufferQueue> _write_buffer;
#endif
};
std::shared_ptr<RWSocket> MakeRWSocket(std::shared_ptr<AlloterWrap> alloter);
std::shared_ptr<RWSocket> MakeRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter);
}
#endif

View File

@@ -8,9 +8,9 @@
namespace cppnet {
#ifdef __win__
ThreadSafeUnorderedMap<uint64_t, std::shared_ptr<Socket>> Socket::__all_socket_map;
ThreadSafeUnorderedMap<uint64_t, std::shared_ptr<Socket>> Socket::__all_socket_map;
#else
thread_local std::unordered_map<uint64_t, std::shared_ptr<Socket>> Socket::__all_socket_map;
thread_local std::unordered_map<uint64_t, std::shared_ptr<Socket>> Socket::__all_socket_map;
#endif
}

View File

@@ -8,28 +8,25 @@
#include <memory>
#include <cstdint>
#include <unordered_map>
#include "common/network/address.h"
#ifdef __win__
#include "common/structure/thread_safe_unordered_map.h"
#else
#include <unordered_map>
#endif
namespace cppnet {
class Buffer;
class Address;
class CppNetBase;
class Dispatcher;
class AlloterWrap;
class EventActions;
class Socket {
public:
Socket(): _sock(0), _addr() {}
Socket(std::shared_ptr<AlloterWrap> alloter):
_sock(0), _alloter(alloter) {}
Socket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter):
_sock(sock), _alloter(alloter) {}
Socket(): _sock(0) {}
Socket(uint64_t sock): _sock(sock) {}
virtual ~Socket() {}
void SetSocket(const uint64_t& sock) { _sock = sock; }
@@ -47,15 +44,10 @@ public:
void SetDispatcher(std::shared_ptr<Dispatcher> dis) { _dispatcher = dis; }
std::shared_ptr<Dispatcher> GetDispatcher() { return _dispatcher.lock(); }
void SetAlloter(std::shared_ptr<AlloterWrap> alloter) { _alloter = alloter; }
std::shared_ptr<AlloterWrap> GetAlloter() { return _alloter; }
protected:
uint64_t _sock;
Address _addr;
Address _addr;
std::shared_ptr<AlloterWrap> _alloter;
std::weak_ptr<CppNetBase> _cppnet_base;
std::weak_ptr<EventActions> _event_actions;
std::weak_ptr<Dispatcher> _dispatcher;

View File

@@ -11,6 +11,7 @@
#include "cppnet/event/win/expend_func.h"
#include "cppnet/event/win/iocp_action.h"
#include "cppnet/event/win/accept_event.h"
#include "cppnet/socket/win/win_rw_socket.h"
#include "common/log/log.h"
#include "common/os/convert.h"
@@ -52,14 +53,15 @@ bool WinConnectSocket::Bind(const std::string& ip, uint16_t port) {
return false;
}
_sock = ret._return_value;
}
auto action = GetEventActions();
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(action);
if (!iocp->AddToIOCP(_sock)) {
LOG_FATAL("add accept socket to iocp failed!");
OsHandle::Close(_sock);
return false;
}
// add to iocp.
auto action = GetEventActions();
auto iocp = std::dynamic_pointer_cast<IOCPEventActions>(action);
if (!iocp->AddToIOCP(_sock)) {
LOG_FATAL("add accept socket to iocp failed!");
OsHandle::Close(_sock);
return false;
}
_addr.SetIp(ip);
@@ -163,6 +165,7 @@ void WinConnectSocket::OnAccept(Event* event) {
sock->SetEventActions(_event_actions);
sock->SetAddress(std::move(address));
sock->SetDispatcher(GetDispatcher());
auto buffer = sock->GetReadBuffer();
buffer->Write(accept_event->GetBuf(), accept_event->GetBufOffset());

View File

@@ -12,22 +12,16 @@
#include "common/log/log.h"
#include "common/buffer/buffer_queue.h"
#include "common/alloter/pool_alloter.h"
namespace cppnet {
std::shared_ptr<RWSocket> MakeRWSocket(std::shared_ptr<AlloterWrap> alloter) {
return std::make_shared<WinRWSocket>(alloter);
WinRWSocket::WinRWSocket():
RWSocket() {
}
std::shared_ptr<RWSocket> MakeRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter) {
return std::make_shared<WinRWSocket>(sock, alloter);
}
WinRWSocket::WinRWSocket(std::shared_ptr<AlloterWrap> alloter):
RWSocket(alloter),
_shutdown(false),
_is_reading(false) {
WinRWSocket::WinRWSocket(std::shared_ptr<AlloterWrap> alloter):
RWSocket(alloter) {
}
WinRWSocket::WinRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter):
@@ -50,15 +44,13 @@ void WinRWSocket::Read() {
auto rw_event = _alloter->PoolNew<Event>();
auto buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
rw_event->SetBuffer(buffer);
auto event = dynamic_cast<Event*>(rw_event);
event->SetSocket(shared_from_this());
rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
if (actions) {
if (actions->AddRecvEvent(event)) {
if (actions->AddRecvEvent(rw_event)) {
_is_reading = true;
AddEvent(event);
AddEvent(rw_event);
}
}
}
@@ -71,7 +63,6 @@ bool WinRWSocket::Write(const char* src, uint32_t len) {
// create new write event
auto rw_event = _alloter->PoolNew<Event>();
rw_event->SetBuffer(buffer);
rw_event->SetSocket(shared_from_this());
auto actions = GetEventActions();
@@ -199,6 +190,13 @@ void WinRWSocket::OnDisConnect(Event* event, uint16_t err) {
}
}
std::shared_ptr<BufferQueue> WinRWSocket::GetReadBuffer() {
if (!_read_buffer) {
_read_buffer = _alloter->PoolNewSharePtr<BufferQueue>(_block_pool, _alloter);
}
return _read_buffer;
}
void WinRWSocket::AddEvent(Event* event) {
std::lock_guard<std::mutex> lock(_event_mutex);
_event_set.insert(std::move(event));
@@ -215,4 +213,17 @@ bool WinRWSocket::EventEmpty() {
return _event_set.empty();
}
std::shared_ptr<RWSocket> MakeRWSocket() {
return std::make_shared<WinRWSocket>();
}
std::shared_ptr<RWSocket> MakeRWSocket(std::shared_ptr<AlloterWrap> alloter) {
return std::make_shared<WinRWSocket>(alloter);
}
std::shared_ptr<RWSocket> MakeRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter) {
return std::make_shared<WinRWSocket>(sock, alloter);
}
}

View File

@@ -14,10 +14,12 @@
namespace cppnet {
class Event;
class AlloterWrap;
class WinRWSocket:
public RWSocket {
public:
WinRWSocket();
WinRWSocket(std::shared_ptr<AlloterWrap> alloter);
WinRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter);
virtual ~WinRWSocket();
@@ -29,11 +31,12 @@ public:
virtual void OnRead(Event* event, uint32_t len = 0);
virtual void OnWrite(Event* event, uint32_t len = 0);
virtual void OnConnect(Event* event, uint16_t err);
virtual void OnDisConnect(Event* event, uint16_t err);
void SetShutdown() { _shutdown = true; }
bool IsShutdown() { return _shutdown; }
virtual void SetShutdown() { _shutdown = true; }
virtual bool IsShutdown() { return _shutdown; }
virtual std::shared_ptr<BufferQueue> GetReadBuffer();
private:
void AddEvent(Event* event);
@@ -44,11 +47,18 @@ private:
std::atomic_bool _shutdown;
std::atomic_bool _is_reading;
std::mutex _event_mutex;
// only need read cache. data to send is saved to event buffer.
std::shared_ptr<BufferQueue> _read_buffer;
// all event
std::mutex _event_mutex;
std::unordered_set<Event*> _event_set;
};
std::shared_ptr<RWSocket> MakeRWSocket();
std::shared_ptr<RWSocket> MakeRWSocket(std::shared_ptr<AlloterWrap> alloter);
std::shared_ptr<RWSocket> MakeRWSocket(uint64_t sock, std::shared_ptr<AlloterWrap> alloter);
}
#endif