diff --git a/CppNet.vcxproj b/CppNet.vcxproj index bdde085..8117de2 100644 --- a/CppNet.vcxproj +++ b/CppNet.vcxproj @@ -123,9 +123,10 @@ - + + @@ -151,8 +152,8 @@ - + diff --git a/CppNet.vcxproj.filters b/CppNet.vcxproj.filters index f53737e..1fd3a92 100644 --- a/CppNet.vcxproj.filters +++ b/CppNet.vcxproj.filters @@ -66,7 +66,10 @@ net\linux - + + net + + net\linux @@ -140,8 +143,8 @@ net\linux - - net\linux + + net \ No newline at end of file diff --git a/CppNet_client.vcxproj b/CppNet_client.vcxproj index 46581af..50fe4dc 100644 --- a/CppNet_client.vcxproj +++ b/CppNet_client.vcxproj @@ -29,6 +29,7 @@ + @@ -55,6 +56,7 @@ + @@ -133,6 +135,7 @@ 260046848 + /FS %(AdditionalOptions) diff --git a/CppNet_client.vcxproj.filters b/CppNet_client.vcxproj.filters index aa6a6ba..c4020d2 100644 --- a/CppNet_client.vcxproj.filters +++ b/CppNet_client.vcxproj.filters @@ -69,6 +69,9 @@ net\linux + + net + @@ -140,5 +143,8 @@ net\linux + + net + \ No newline at end of file diff --git a/LinuxCppNet.vcxproj b/LinuxCppNet.vcxproj index 209e5c6..770005d 100644 --- a/LinuxCppNet.vcxproj +++ b/LinuxCppNet.vcxproj @@ -71,6 +71,7 @@ + @@ -93,6 +94,7 @@ + diff --git a/LinuxCppNet.vcxproj.filters b/LinuxCppNet.vcxproj.filters index f2667a8..03e1fe7 100644 --- a/LinuxCppNet.vcxproj.filters +++ b/LinuxCppNet.vcxproj.filters @@ -54,6 +54,9 @@ net\linux + + net + @@ -119,5 +122,8 @@ net\linux + + net + \ No newline at end of file diff --git a/base/Buffer.cpp b/base/Buffer.cpp index c758668..2146b15 100644 --- a/base/Buffer.cpp +++ b/base/Buffer.cpp @@ -4,7 +4,7 @@ #include "LoopBuffer.h" #include "Log.h" -CBuffer::CBuffer(std::shared_ptr& pool) : +CBuffer::CBuffer(std::shared_ptr& pool) : _pool(pool), _buffer_num(0), _buffer_start(nullptr), diff --git a/base/Buffer.h b/base/Buffer.h index 3f514c8..91f3f9d 100644 --- a/base/Buffer.h +++ b/base/Buffer.h @@ -12,11 +12,11 @@ static const int __max_node_size = 12; class CLoopBuffer; -class CMemaryPool; +class CMemoryPool; class CBuffer { public: - CBuffer(std::shared_ptr& pool); + CBuffer(std::shared_ptr& pool); ~CBuffer(); int Read(char* res, int len); @@ -58,6 +58,6 @@ public: CLoopBuffer* _buffer_write; std::mutex _mutex; - std::shared_ptr _pool; + std::shared_ptr _pool; }; #endif \ No newline at end of file diff --git a/base/Log.cpp b/base/Log.cpp index cd2af5f..8e7e531 100644 --- a/base/Log.cpp +++ b/base/Log.cpp @@ -22,7 +22,9 @@ void CLog::Run() { _log_file << log << std::endl; } _pool.PoolLargeFree(log); - } + } else { + break; + } } } diff --git a/base/Log.h b/base/Log.h index d989c1f..299477c 100644 --- a/base/Log.h +++ b/base/Log.h @@ -46,13 +46,13 @@ private: std::fstream _log_file; int _log_level; int _cur_date; - CMemaryPool _pool; + CMemoryPool _pool; }; -#define LOG_DEBUG(log, ...) CLog::Instance().LogDebug(__FILE__, __LINE__, log, __VA_ARGS__); -#define LOG_INFO(log, ...) CLog::Instance().LogInfo(__FILE__, __LINE__, log, __VA_ARGS__); -#define LOG_WARN(log, ...) CLog::Instance().LogWarn(__FILE__, __LINE__, log, __VA_ARGS__); -#define LOG_ERROR(log, ...) CLog::Instance().LogError(__FILE__, __LINE__, log, __VA_ARGS__); -#define LOG_FATAL(log, ...) CLog::Instance().LogFatal(__FILE__, __LINE__, log, __VA_ARGS__); +#define LOG_DEBUG(log, ...) CLog::Instance().LogDebug(__FILE__, __LINE__, log, ##__VA_ARGS__); +#define LOG_INFO(log, ...) CLog::Instance().LogInfo(__FILE__, __LINE__, log, ##__VA_ARGS__); +#define LOG_WARN(log, ...) CLog::Instance().LogWarn(__FILE__, __LINE__, log, ##__VA_ARGS__); +#define LOG_ERROR(log, ...) CLog::Instance().LogError(__FILE__, __LINE__, log, ##__VA_ARGS__); +#define LOG_FATAL(log, ...) CLog::Instance().LogFatal(__FILE__, __LINE__, log, ##__VA_ARGS__); #endif \ No newline at end of file diff --git a/base/LoopBuffer.cpp b/base/LoopBuffer.cpp index 8cd1aba..f3f0e48 100644 --- a/base/LoopBuffer.cpp +++ b/base/LoopBuffer.cpp @@ -5,7 +5,7 @@ #include "MemaryPool.h" -CLoopBuffer::CLoopBuffer(std::shared_ptr& pool, int index) : +CLoopBuffer::CLoopBuffer(std::shared_ptr& pool, int index) : _can_read(false), _next(nullptr), _pool(pool), @@ -18,7 +18,7 @@ CLoopBuffer::CLoopBuffer(std::shared_ptr& pool, int index) : } -CLoopBuffer::CLoopBuffer(std::shared_ptr& pool, int size, int index) : +CLoopBuffer::CLoopBuffer(std::shared_ptr& pool, int size, int index) : _can_read(false), _next(nullptr), _pool(pool), diff --git a/base/LoopBuffer.h b/base/LoopBuffer.h index 04f0e9c..6905558 100644 --- a/base/LoopBuffer.h +++ b/base/LoopBuffer.h @@ -2,12 +2,12 @@ #define HEADER_LOOPBUFFER #include #include -class CMemaryPool; +class CMemoryPool; class CLoopBuffer { public: //maybe throw exception - explicit CLoopBuffer(std::shared_ptr& pool, int index); - explicit CLoopBuffer(std::shared_ptr& pool, int size, int index); + explicit CLoopBuffer(std::shared_ptr& pool, int index); + explicit CLoopBuffer(std::shared_ptr& pool, int size, int index); ~CLoopBuffer(); int Read(char* res, int len); @@ -59,7 +59,7 @@ private: std::mutex _mutex; CLoopBuffer* _next; //point to next node - std::shared_ptr _pool; + std::shared_ptr _pool; }; #endif diff --git a/base/MemaryPool.cpp b/base/MemaryPool.cpp index 311d251..19f9b4d 100644 --- a/base/MemaryPool.cpp +++ b/base/MemaryPool.cpp @@ -5,7 +5,7 @@ #include using namespace std; -CMemaryPool::CMemaryPool() { +CMemoryPool::CMemoryPool() { for (int i = 0; i < __number_of_free_lists; i++) { _free_list[i] = nullptr; } @@ -14,7 +14,7 @@ CMemaryPool::CMemaryPool() { _create_thread_id = std::this_thread::get_id(); } -CMemaryPool::CMemaryPool(const int large_sz, const int add_num) : _large_size(RoundUp(large_sz)), _number_large_add_nodes(add_num){ +CMemoryPool::CMemoryPool(const int large_sz, const int add_num) : _large_size(RoundUp(large_sz)), _number_large_add_nodes(add_num){ for (int i = 0; i < __number_of_free_lists; i++) { _free_list[i] = nullptr; } @@ -23,7 +23,7 @@ CMemaryPool::CMemaryPool(const int large_sz, const int add_num) : _large_size(Ro _create_thread_id = std::this_thread::get_id(); } -CMemaryPool::~CMemaryPool() { +CMemoryPool::~CMemoryPool() { //assert(_create_thread_id == std::this_thread::get_id()); for (auto iter = _malloc_vec.begin(); iter != _malloc_vec.end(); ++iter) { if (*iter) { @@ -32,15 +32,15 @@ CMemaryPool::~CMemaryPool() { } } -std::thread::id CMemaryPool::GetCreateThreadId() { +std::thread::id CMemoryPool::GetCreateThreadId() { return _create_thread_id; } -int CMemaryPool::GetLargeSize() const { +int CMemoryPool::GetLargeSize() const { return _large_size; } -void* CMemaryPool::ReFill(int size, int num, bool is_large) { +void* CMemoryPool::ReFill(int size, int num, bool is_large) { int nums = num; char* chunk = nullptr; @@ -98,7 +98,7 @@ void* CMemaryPool::ReFill(int size, int num, bool is_large) { return res; } -void* CMemaryPool::ChunkAlloc(int size, int& nums, bool is_large) { +void* CMemoryPool::ChunkAlloc(int size, int& nums, bool is_large) { char* res; int need_bytes = size * nums; int left_bytes = _pool_end - _pool_start; diff --git a/base/MemaryPool.h b/base/MemaryPool.h index 373fcf4..9b01b61 100644 --- a/base/MemaryPool.h +++ b/base/MemaryPool.h @@ -15,12 +15,12 @@ static const int __max_bytes = 256; static const int __number_of_free_lists = __max_bytes / __align; static const int __number_add_nodes = 20; -class CMemaryPool { +class CMemoryPool { public: - CMemaryPool(); + CMemoryPool(); //bulk memory size. everytime add nodes num - CMemaryPool(const int large_sz, const int add_num); - ~CMemaryPool(); + CMemoryPool(const int large_sz, const int add_num); + ~CMemoryPool(); //for object. invocation of constructors and destructors template @@ -82,7 +82,7 @@ private: }; template -T* CMemaryPool::PoolNew(Args&&... args) { +T* CMemoryPool::PoolNew(Args&&... args) { int sz = sizeof(T); if (sz > __max_bytes) { void* bytes = malloc(sz); @@ -104,7 +104,7 @@ T* CMemaryPool::PoolNew(Args&&... args) { } template -void CMemaryPool::PoolDelete(T* &c) { +void CMemoryPool::PoolDelete(T* &c) { if (!c) { return; } @@ -127,7 +127,7 @@ void CMemaryPool::PoolDelete(T* &c) { } template -T* CMemaryPool::PoolMalloc(int sz) { +T* CMemoryPool::PoolMalloc(int sz) { if (sz > __max_bytes) { void* bytes = malloc(sz); memset(bytes, 0, sz); @@ -149,7 +149,7 @@ T* CMemaryPool::PoolMalloc(int sz) { } template -void CMemaryPool::PoolFree(T* &m, int len) { +void CMemoryPool::PoolFree(T* &m, int len) { if (!m) { return; } @@ -170,7 +170,7 @@ void CMemaryPool::PoolFree(T* &m, int len) { } template -T* CMemaryPool::PoolLargeMalloc() { +T* CMemoryPool::PoolLargeMalloc() { if (_number_large_add_nodes == 0 || _large_size == 0) { throw std::exception(std::logic_error("Large block of memory is not set!")); return nullptr; @@ -195,7 +195,7 @@ T* CMemaryPool::PoolLargeMalloc() { } template -void CMemaryPool::PoolLargeFree(T* &m) { +void CMemoryPool::PoolLargeFree(T* &m) { if (!m) { return; } @@ -214,7 +214,7 @@ void CMemaryPool::PoolLargeFree(T* &m) { } template -T* CMemaryPool::PoolLargeMalloc(int size, int& res) { +T* CMemoryPool::PoolLargeMalloc(int size, int& res) { if (_number_large_add_nodes == 0 || _large_size == 0) { throw std::exception(std::logic_error("Large block of memory is not set!")); return nullptr; @@ -241,7 +241,7 @@ T* CMemaryPool::PoolLargeMalloc(int size, int& res) { } template -void CMemaryPool::PoolLargeFree(T* &m, int size) { +void CMemoryPool::PoolLargeFree(T* &m, int size) { if (!m) { return; } diff --git a/base/PoolSharedPtr.h b/base/PoolSharedPtr.h index c56ab2e..74b2d14 100644 --- a/base/PoolSharedPtr.h +++ b/base/PoolSharedPtr.h @@ -2,6 +2,7 @@ #define HEADER_CPOOLSHAREDPTR #include #include +#include #include "MemaryPool.h" @@ -68,7 +69,7 @@ private: }; template -inline void EnableShared(T *ptr, CRefCount *ref_ptr, CMemaryPool* pool, int size = 0, MemoryType type = TYPE_NEW); +inline void EnableShared(T *ptr, CRefCount *ref_ptr, CMemoryPool* pool, int size = 0, MemoryType type = TYPE_NEW); // base class for CMemSharePtr and CMemWeakPtr template @@ -80,7 +81,7 @@ public: CBasePtr() noexcept : _ptr(nullptr), _ref_count(nullptr), _pool(nullptr) { EnableShared(_ptr, _ref_count, _pool); } - CBasePtr(T* ptr, CRefCount* ref, CMemaryPool* pool, MemoryType type, int large_size = 0) noexcept : _ptr(ptr), _ref_count(ref), _pool(pool), _memory_type(type), _malloc_size(large_size) { + CBasePtr(T* ptr, CRefCount* ref, CMemoryPool* pool, MemoryType type, int large_size = 0) noexcept : _ptr(ptr), _ref_count(ref), _pool(pool), _memory_type(type), _malloc_size(large_size) { EnableShared(_ptr, _ref_count, _pool, _malloc_size, _memory_type); } @@ -164,7 +165,7 @@ public: } // release resource and take _Other_ptr through _Other_rep - void Reset(T *other_ptr, CRefCount * other_rep, CMemaryPool* pool) { + void Reset(T *other_ptr, CRefCount * other_rep, CMemoryPool* pool) { _Reset0(other_ptr, other_rep, pool); } @@ -178,11 +179,11 @@ public: Resetw(other._ptr, other._ref_count, other._pool); } - void Resetw(T *other_ptr, CRefCount *other_rep, CMemaryPool* pool) { + void Resetw(T *other_ptr, CRefCount *other_rep, CMemoryPool* pool) { _Resetw0(other_ptr, other_rep, pool); } - void Resetw(T *other_ptr, CRefCount *other_rep, CMemaryPool* pool, int size, MemoryType type) { + void Resetw(T *other_ptr, CRefCount *other_rep, CMemoryPool* pool, int size, MemoryType type) { _Resetw0(other_ptr, other_rep, pool, type, size); } @@ -208,7 +209,7 @@ public: } // release resource and take new resource - void _Reset0(T *other_ptr, CRefCount *other_rep, CMemaryPool* pool, int size = 0, MemoryType type = TYPE_NEW) { + void _Reset0(T *other_ptr, CRefCount *other_rep, CMemoryPool* pool, int size = 0, MemoryType type = TYPE_NEW) { if (other_rep) { other_rep->IncrefUse(); } @@ -259,7 +260,7 @@ public: } // release resource and take new resource - void _Resetw0(T *other_ptr, CRefCount *other_rep, CMemaryPool* pool) { + void _Resetw0(T *other_ptr, CRefCount *other_rep, CMemoryPool* pool) { if (other_rep) { other_rep->IncrefWeak(); } @@ -274,7 +275,7 @@ public: } // release resource and take new resource - void _Resetw0(T *other_ptr, CRefCount *other_rep, CMemaryPool* pool, MemoryType type, int size) { + void _Resetw0(T *other_ptr, CRefCount *other_rep, CMemoryPool* pool, MemoryType type, int size) { if (other_rep) { other_rep->IncrefWeak(); } @@ -321,7 +322,7 @@ public: protected: T *_ptr; CRefCount *_ref_count; - CMemaryPool *_pool; + CMemoryPool *_pool; int _malloc_size; MemoryType _memory_type; @@ -336,7 +337,8 @@ public: typedef CBasePtr _BasePtr; // construct CMemSharePtr() noexcept : _BasePtr() {} - CMemSharePtr(T* ptr, CRefCount* ref, CMemaryPool* pool, MemoryType type, int large_size = 0) noexcept : _BasePtr(ptr, ref, pool, type, large_size) {} + CMemSharePtr(nullptr_t) noexcept : _BasePtr() {} + CMemSharePtr(T* ptr, CRefCount* ref, CMemoryPool* pool, MemoryType type, int large_size = 0) noexcept : _BasePtr(ptr, ref, pool, type, large_size) {} CMemSharePtr(const _BasePtr& r) : _BasePtr(r) {} CMemSharePtr(_BasePtr&& r) : _BasePtr(r) {} @@ -457,13 +459,13 @@ protected: private: template - friend void DoEnable(T1 *ptr, CEnableSharedFromThis *es, CRefCount *ref_ptr, CMemaryPool* pool = 0, int size = 0, MemoryType type = TYPE_NEW); + friend void DoEnable(T1 *ptr, CEnableSharedFromThis *es, CRefCount *ref_ptr, CMemoryPool* pool, int size, MemoryType type); CMemWeakPtr _weak_ptr; }; // reset internal weak pointer template -inline void DoEnable(T1 *ptr, CEnableSharedFromThis *es, CRefCount *ref_ptr, CMemaryPool* pool, int size, MemoryType type) { +inline void DoEnable(T1 *ptr, CEnableSharedFromThis *es, CRefCount *ref_ptr, CMemoryPool* pool = nullptr, int size = 0, MemoryType type = TYPE_NEW) { es->_weak_ptr.Resetw(ptr, ref_ptr, pool, size, type); } @@ -477,7 +479,7 @@ struct has_member_weak_ptr { }; template -inline void EnableShared(T *ptr, CRefCount *ref_ptr, CMemaryPool* pool, int size, MemoryType type) { +inline void EnableShared(T *ptr, CRefCount *ref_ptr, CMemoryPool* pool, int size, MemoryType type) { if (ptr) { if (has_member_weak_ptr::value > 0) { DoEnable(ptr, (CEnableSharedFromThis*)ptr, ref_ptr, pool, size, type); @@ -486,28 +488,28 @@ inline void EnableShared(T *ptr, CRefCount *ref_ptr, CMemaryPool* pool, int size } template -CMemSharePtr MakeNewSharedPtr(CMemaryPool* pool, Args&&... args) { +CMemSharePtr MakeNewSharedPtr(CMemoryPool* pool, Args&&... args) { T* o = pool->PoolNew(std::forward(args)...); CRefCount* ref = pool->PoolNew(); return CMemSharePtr(o, ref, pool, TYPE_NEW); } template -CMemSharePtr MakeMallocSharedPtr(CMemaryPool* pool, int size) { +CMemSharePtr MakeMallocSharedPtr(CMemoryPool* pool, int size) { T* o = (T*)pool->PoolMalloc(size); CRefCount* ref = pool->PoolNew(); return CMemSharePtr(o, ref, pool, TYPE_MALLOC, size); } template -CMemSharePtr MakeLargeSharedPtr(CMemaryPool* pool) { +CMemSharePtr MakeLargeSharedPtr(CMemoryPool* pool) { T* o = pool->PoolLargeMalloc(); CRefCount* ref = pool->PoolNew(); return CMemSharePtr(o, ref, pool, TYPE_LARGE); } template -CMemSharePtr MakeLargeSharedPtr(CMemaryPool* pool, int size) { +CMemSharePtr MakeLargeSharedPtr(CMemoryPool* pool, int size) { T* o = pool->PoolLargeMalloc(size); CRefCount* ref = pool->PoolNew(); return CMemSharePtr(o, ref, pool, TYPE_LARGE_WITH_SIZE, size); diff --git a/base/TimeTool.h b/base/TimeTool.h index 7c7f317..a5535c3 100644 --- a/base/TimeTool.h +++ b/base/TimeTool.h @@ -3,7 +3,7 @@ #include -class CMemaryPool; +class CMemoryPool; class CTimeTool { public: diff --git a/net/AcceptSocket.h b/net/AcceptSocket.h index 3c85a33..add2543 100644 --- a/net/AcceptSocket.h +++ b/net/AcceptSocket.h @@ -9,9 +9,9 @@ class CEventHandler; class CAcceptEventHandler; -class CMemaryPool; +class CMemoryPool; -class CAcceptSocket : public CSocketBase { +class CAcceptSocket : public CSocketBase, public CEnableSharedFromThis { public: CAcceptSocket(std::shared_ptr& event_actions); ~CAcceptSocket(); @@ -20,14 +20,9 @@ public: bool Listen(unsigned int listen_size); -#ifndef __linux__ - void SyncAccept(const std::function&, int error)>& accept_back = nullptr, - const std::function&, int error)>& read_back = nullptr); - void SetReadCallBack(const std::function&, int error)>& call_back); -#else - void SyncAccept(const std::function&, int error)>& call_back = nullptr); -#endif + void SyncAccept(); + void SetReadCallBack(const std::function&, int error)>& call_back); void SetAcceptCallBack(const std::function&, int error)>& call_back); public: diff --git a/net/EventActions.h b/net/EventActions.h index 5be9032..1dd69c7 100644 --- a/net/EventActions.h +++ b/net/EventActions.h @@ -10,14 +10,18 @@ public: virtual ~CEventActions() {} //param is net io thread num, default cpu number - virtual bool Init(int thread_num = 0) = 0; + virtual bool Init() = 0; virtual bool Dealloc() = 0; virtual bool AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr& event) = 0; virtual bool AddSendEvent(CMemSharePtr& event) = 0; virtual bool AddRecvEvent(CMemSharePtr& event) = 0; virtual bool AddAcceptEvent(CMemSharePtr& event) = 0; +#ifndef __linux__ + virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port, char* buf, int buf_len) = 0; +#else virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port) = 0; +#endif virtual bool AddDisconnection(CMemSharePtr& event) = 0; virtual bool DelEvent(CMemSharePtr& event) = 0; virtual void ProcessEvent() = 0; diff --git a/net/EventHandler.h b/net/EventHandler.h index 0b8cc06..ec46468 100644 --- a/net/EventHandler.h +++ b/net/EventHandler.h @@ -13,13 +13,13 @@ enum EVENT_FLAG { EVENT_ACCEPT = 0x0004, //accept event EVENT_TIMER = 0x0008, //timer event EVENT_CONNECT = 0x0010, //connect event - EVENT_DISCONNECT = 0x0020 //disconnect event + EVENT_DISCONNECT = 0x0020 //disconnect event }; enum EVENT_ERROR { - EVENT_ERROR_NO = 0, - EVENT_ERROR_TIMEOUT = 1, - EVENT_ERROR_CLOSED = 2 + EVENT_ERROR_NO = 0x0100, + EVENT_ERROR_TIMEOUT = 0x0200, + EVENT_ERROR_CLOSED = 0x0400 }; class Cevent { @@ -45,7 +45,7 @@ class CAcceptEventHandler : public Cevent { public: CMemSharePtr _client_socket; - CAcceptSocket* _accept_socket = nullptr; + CMemSharePtr _accept_socket = nullptr; std::function&, int error)> _call_back; }; #endif \ No newline at end of file diff --git a/net/NetObject.cpp b/net/NetObject.cpp new file mode 100644 index 0000000..2fc44ab --- /dev/null +++ b/net/NetObject.cpp @@ -0,0 +1,247 @@ +#include + +#include "NetObject.h" +#include "EventActions.h" +#include "OSInfo.h" +#include "Log.h" +#include "Runnable.h" +#ifdef __linux__ +#include "CEpoll.h" +#include "LinuxFunc.h" +#else +#include "IOCP.h" +#endif + +CNetObject::CNetObject() { + +} + +CNetObject::~CNetObject() { + +} + +void CNetObject::Init(int thread_num) { +#ifndef __linux__ + InitScoket(); +#else + SetCoreFileUnlimit(); +#endif // __linux__ + + int cpus = GetCpuNum(); + if (thread_num == 0 || thread_num > cpus * 2) { + thread_num = cpus; + } + for (int i = 0; i < thread_num; i++) { +#ifdef __linux__ + std::shared_ptr event_actions(new CEpoll); +#else + static std::shared_ptr event_actions(new CIOCP); + //std::shared_ptr event_actions(new CIOCP); +#endif + event_actions->Init(); + std::shared_ptr thd(new std::thread(std::bind(&CEventActions::ProcessEvent, event_actions))); + _actions_map[thd->get_id()] = event_actions; + _thread_vec.push_back(thd); + } +} + +void CNetObject::Dealloc() { + for (auto iter = _actions_map.begin(); iter != _actions_map.end(); ++iter) { + iter->second->Dealloc(); + } + _actions_map.clear(); +#ifndef __linux__ + DeallocSocket(); +#endif // __linux__ +} + +void CNetObject::MainLoop() { + for (;;) { + CRunnable::Sleep(50000); + } +} + +void CNetObject::Join() { + for (size_t i = 0; i < _thread_vec.size(); ++i) { + _thread_vec[i]->join(); + } +} + +void CNetObject::SetReadCallback(const call_back& func) { + _read_call_back = func; +} + +void CNetObject::SetWriteCallback(const call_back& func) { + _write_call_back = func; +} + +void CNetObject::SetDisconnectionCallback(const call_back& func) { + _disconnection_call_back = func; +} + +void CNetObject::SetAcceptCallback(const call_back& func) { + _accept_call_back = func; +} + +bool CNetObject::ListenAndAccept(int port, std::string ip) { + if (!_accept_call_back) { + LOG_WARN("accept call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return false; + } + + if (!_read_call_back) { + LOG_WARN("read call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return false; + } + + if (_actions_map.size() <= 0) { + LOG_WARN("CNetObject obj is not inited!, port : %d, ip : %s ", port, ip.c_str()); + return false; + } + + for (auto iter = _actions_map.begin(); iter != _actions_map.end(); ++iter) { + CMemSharePtr accept_socket = MakeNewSharedPtr(&_pool, _actions_map.begin()->second); + if (!accept_socket->Bind(port, ip)) { + return false; + } + + if (!accept_socket->Listen(100)) { + return false; + } + + accept_socket->SetAcceptCallBack(std::bind(&CNetObject::_AcceptFunction, this, std::placeholders::_1, std::placeholders::_2)); + accept_socket->SetReadCallBack(std::bind(&CNetObject::_ReadFunction, this, std::placeholders::_1, std::placeholders::_2)); + + accept_socket->SyncAccept(); + _accept_socket[accept_socket->GetSocket()] = accept_socket; +#ifndef __linux__ + break; +#endif + } + return true; +} + +void CNetObject::SetConnectionCallback(const call_back& func) { + _connection_call_back = func; +} + +CMemSharePtr CNetObject::Connection(int port, std::string ip, char* buf, int buf_len) { + if (!_connection_call_back) { + LOG_WARN("connection call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return nullptr; + } + if (!_write_call_back) { + LOG_WARN("read call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return nullptr; + } + + auto actions = RandomGetActions(); + CMemSharePtr sock = MakeNewSharedPtr(&_pool, actions); + sock->SetWriteCallBack(std::bind(&CNetObject::_WriteFunction, this, std::placeholders::_1, std::placeholders::_2)); + +#ifndef __linux__ + sock->SetReadCallBack(std::bind(&CNetObject::_ReadFunction, this, std::placeholders::_1, std::placeholders::_2)); + sock->SyncConnection(ip, port, buf, buf_len); +#else + auto func = [buf, buf_len, sock, this](CMemSharePtr& event, int err) { + if (err = EVENT_ERROR_NO) { + sock->SyncWrite(buf, buf_len); + } + sock->SetReadCallBack(std::bind(&CNetObject::_ReadFunction, this, std::placeholders::_1, std::placeholders::_2)); + }; + sock->SetReadCallBack(func); +#endif + return sock; +} + +CMemSharePtr CNetObject::Connection(int port, std::string ip) { + if (!_connection_call_back) { + LOG_WARN("connection call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return nullptr; + } + if (!_write_call_back) { + LOG_WARN("read call back function is null!, port : %d, ip : %s ", port, ip.c_str()); + return nullptr; + } + + auto actions = RandomGetActions(); + CMemSharePtr sock = MakeNewSharedPtr(&_pool, actions); + sock->SetWriteCallBack(std::bind(&CNetObject::_WriteFunction, this, std::placeholders::_1, std::placeholders::_2)); + sock->SetReadCallBack(std::bind(&CNetObject::_ReadFunction, this, std::placeholders::_1, std::placeholders::_2)); + +#ifndef __linux__ + sock->SyncConnection(ip, port, "", 0); +#else + sock->SyncConnection(ip, port); +#endif + return sock; +} + +void CNetObject::_AcceptFunction(CMemSharePtr& event, int err) { + if (!event) { + return; + } + + auto socket_ptr = event->_client_socket; + if (err & EVENT_ERROR_NO) { + socket_ptr->SetReadCallBack(std::bind(&CNetObject::_ReadFunction, this, std::placeholders::_1, std::placeholders::_2)); + socket_ptr->SetWriteCallBack(std::bind(&CNetObject::_WriteFunction, this, std::placeholders::_1, std::placeholders::_2)); + if (_accept_call_back) { + _accept_call_back(socket_ptr, err); + } + static int num = 0; + num++; + LOG_ERROR("get client num : %d", num); + + std::unique_lock lock(_mutex); + _socket_map[event->_client_socket->GetSocket()] = event->_client_socket; + } +} + +void CNetObject::_ReadFunction(CMemSharePtr& event, int err) { + if (!event) { + return; + } + auto socket_ptr = event->_client_socket.Lock(); + if (err & EVENT_READ && _read_call_back) { + err &= ~EVENT_READ; + _read_call_back(socket_ptr, err); + if (err == EVENT_ERROR_CLOSED) { + std::unique_lock lock(_mutex); + _socket_map.erase(socket_ptr->GetSocket()); + } + + } else if (err & EVENT_CONNECT && _connection_call_back) { + err &= ~EVENT_CONNECT; + _connection_call_back(socket_ptr, err); + } else if (err & EVENT_DISCONNECT && _disconnection_call_back) { + err &= ~EVENT_DISCONNECT; + _disconnection_call_back(socket_ptr, err); + } +} + +void CNetObject::_WriteFunction(CMemSharePtr& event, int err) { + if (!event) { + return; + } + auto socket_ptr = event->_client_socket.Lock(); + if (err & EVENT_WRITE && _write_call_back) { + err &= ~EVENT_WRITE; + _write_call_back(socket_ptr, err); + if (err == EVENT_ERROR_CLOSED) { + std::unique_lock lock(_mutex); + _socket_map.erase(socket_ptr->GetSocket()); + } + } +} + +std::shared_ptr& CNetObject::RandomGetActions() { + std::random_device rd; + std::mt19937 mt(rd()); + int index = mt() % int(_actions_map.size()); + auto iter = _actions_map.begin(); + for (int i = 0; i < index; i++) { + iter++; + } + return iter->second; +} \ No newline at end of file diff --git a/net/NetObject.h b/net/NetObject.h new file mode 100644 index 0000000..4b956fe --- /dev/null +++ b/net/NetObject.h @@ -0,0 +1,63 @@ +#ifndef HEADER_CNETOBJECT +#define HEADER_CNETOBJECT + +#include +#include +#include +#include + +#include "PoolSharedPtr.h" +#include "EventHandler.h" +#include "MemaryPool.h" + +typedef std::function&, int err)> call_back; + +class CEventActions; +class CSocket; +class CAcceptSocket; +class CNetObject +{ +public: + CNetObject(); + ~CNetObject(); + //common + void Init(int thread_num); + void Dealloc(); + void MainLoop(); + void Join(); + + void SetReadCallback(const call_back& func); + void SetWriteCallback(const call_back& func); + void SetDisconnectionCallback(const call_back& func); + + //server + void SetAcceptCallback(const call_back& func); + bool ListenAndAccept(int port, std::string ip); + + //client + void SetConnectionCallback(const call_back& func); + + CMemSharePtr Connection(int port, std::string ip, char* buf, int buf_len); + CMemSharePtr Connection(int port, std::string ip); + +private: + void _AcceptFunction(CMemSharePtr& event, int err); + void _ReadFunction(CMemSharePtr& event, int err); + void _WriteFunction(CMemSharePtr& event, int err); + std::shared_ptr& RandomGetActions(); +private: + call_back _read_call_back = nullptr; + call_back _write_call_back = nullptr; + call_back _connection_call_back = nullptr; + call_back _disconnection_call_back = nullptr; + call_back _accept_call_back = nullptr; + CMemoryPool _pool; + + std::mutex _mutex; + std::vector> _thread_vec; + std::map> _accept_socket; + std::map> _socket_map; + std::map> _actions_map; +}; + +#endif \ No newline at end of file diff --git a/net/Socket.h b/net/Socket.h index cf68420..9720aeb 100644 --- a/net/Socket.h +++ b/net/Socket.h @@ -16,20 +16,18 @@ public: CSocket(std::shared_ptr& event_actions); ~CSocket(); - void SyncRead(const std::function&, int err)>& call_back = nullptr); - void SyncWrite(char* src, int len, const std::function&, int err)>& call_back = nullptr); + void SyncRead(); + void SyncWrite(char* src, int len); - void SyncRead(unsigned int interval, const std::function&, int err)>& call_back = nullptr); - void SyncWrite(unsigned int interval, char* src, int len, const std::function&, int error)>& call_back = nullptr); + void SyncRead(unsigned int interval); + void SyncWrite(unsigned int interval, char* src, int len); #ifndef __linux__ - void SyncConnection(const std::string& ip, short port, char* buf, int buf_len, const std::function&, int err)>& call_back = nullptr); + void SyncConnection(const std::string& ip, short port, char* buf, int buf_len); #else - void SyncConnection(const std::string& ip, short port, const std::function&, int err)>& call_back = nullptr); + void SyncConnection(const std::string& ip, short port); #endif - - - void SyncDisconnection(const std::function&, int err)>& call_back = nullptr); + void SyncDisconnection(); void SetReadCallBack(const std::function&, int error)>& call_back); void SetWriteCallBack(const std::function&, int error)>& call_back); diff --git a/net/SocketBase.h b/net/SocketBase.h index fa54658..09d4107 100644 --- a/net/SocketBase.h +++ b/net/SocketBase.h @@ -11,7 +11,7 @@ void DeallocSocket(); #endif class CEventActions; -class CMemaryPool; +class CMemoryPool; class CSocketBase { public: @@ -32,7 +32,7 @@ public: char _ip[__addr_str_len]; std::shared_ptr _event_actions; - std::shared_ptr _pool; + std::shared_ptr _pool; }; #endif \ No newline at end of file diff --git a/net/linux/AcceptSocket.cpp b/net/linux/AcceptSocket.cpp index f03feff..b82c0d4 100644 --- a/net/linux/AcceptSocket.cpp +++ b/net/linux/AcceptSocket.cpp @@ -13,7 +13,8 @@ #include "LinuxFunc.h" CAcceptSocket::CAcceptSocket(std::shared_ptr& event_actions) : CSocketBase(event_actions){ - + _sock = socket(PF_INET, SOCK_STREAM, 0); + SetReusePort(_sock); } CAcceptSocket::~CAcceptSocket() { @@ -21,8 +22,6 @@ CAcceptSocket::~CAcceptSocket() { } bool CAcceptSocket::Bind(short port, const std::string& ip) { - _sock = socket(PF_INET, SOCK_STREAM, 0); - SetSocketNoblocking(_sock); struct sockaddr_in addr; @@ -53,10 +52,23 @@ bool CAcceptSocket::Listen(unsigned int listen_size) { return true; } -void CAcceptSocket::SyncAccept(const std::function&, int error)>& call_back) { +void CAcceptSocket::SyncAccept() { + if (!_accept_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_accept_event) { _accept_event = MakeNewSharedPtr(_pool.get()); } + + if (!_accept_event->_accept_socket) { + _accept_event->_accept_socket = memshared_from_this(); + } + + auto socket = _accept_event->_accept_socket; + socket->GetSocket(); + if (!_accept_event->_data) { _accept_event->_data = _pool->PoolNew(); ((epoll_event*)_accept_event->_data)->events = 0; @@ -65,10 +77,7 @@ void CAcceptSocket::SyncAccept(const std::function_client_socket) { _accept_event->_client_socket = MakeNewSharedPtr(_pool.get(), _event_actions); } - //set call back function - if (!_accept_event->_call_back) { - _accept_event->_call_back = call_back; - } + //add event to epoll if (_event_actions) { _accept_event->_event_flag_set |= EVENT_ACCEPT; @@ -76,6 +85,20 @@ void CAcceptSocket::SyncAccept(const std::function&, int error)>& call_back) { + if (!_accept_event->_client_socket) { + _accept_event->_client_socket = MakeNewSharedPtr(_pool.get(), _event_actions); + } + if (!_accept_event->_client_socket->_read_event) { + _accept_event->_client_socket->_read_event = MakeNewSharedPtr(_accept_event->_client_socket->_pool.get()); + } + if (!_accept_event->_client_socket) { + _accept_event->_client_socket = MakeNewSharedPtr(_accept_event->_client_socket->_pool.get(), _event_actions); + } + + _accept_event->_client_socket->_read_event->_call_back = call_back; +} + void CAcceptSocket::SetAcceptCallBack(const std::function&, int error)>& call_back) { if (!_accept_event) { _accept_event = MakeNewSharedPtr(_pool.get()); @@ -106,7 +129,7 @@ void CAcceptSocket::_Accept(CMemSharePtr& event) { event->_client_socket->_read_event->_client_socket = event->_client_socket; //call accept call back function if (event->_call_back) { - event->_call_back(event, EVENT_ERROR_NO); + event->_call_back(event, EVENT_ERROR_NO | EVENT_ACCEPT); } event->_event_flag_set = 0; event->_client_socket = MakeNewSharedPtr(_pool.get(), _event_actions); diff --git a/net/linux/CEpoll.cpp b/net/linux/CEpoll.cpp index be7acba..48ce1d9 100644 --- a/net/linux/CEpoll.cpp +++ b/net/linux/CEpoll.cpp @@ -2,7 +2,6 @@ #include #include #include - #include "CEpoll.h" #include "OSInfo.h" #include "Log.h" @@ -11,9 +10,12 @@ #include "Socket.h" #include "Timer.h" #include "LinuxFunc.h" -#include "EpollImpl.h" -CEpoll::CEpoll() { +enum EPOLL_CODE { + EXIT_EPOLL = 0 +}; + +CEpoll::CEpoll() : _run(true) { } @@ -21,40 +23,332 @@ CEpoll::~CEpoll() { } -bool CEpoll::Init(int thread_num = 0) { - //make coredump file - SetCoreFileUnlimit(); - - int cpus = GetCpuNum(); - if (thread_num < 0 || thread_num > cpus * 2) { - thread_num = cpus; +bool CEpoll::Init() { + //get epoll handle. the param is invalid since linux 2.6.8 + _epoll_handler = epoll_create(1500); + if (_epoll_handler == -1) { + LOG_FATAL("epoll init failed! error : %d", errno); + return false; } - - for (int i = 0; i < thread_num; i++) { - std::shared_ptr epoll(new CEpollImpl); - std::thread thd(std::thread(std::bind(&CEpollImpl::ProcessEvent, epoll))); - _epoll_map[thd->get_id()] = epoll; + if (pipe(_pipe) == -1) { + LOG_FATAL("pipe init failed! error : %d", errno); + return false; + } + LOG_FATAL("Init() "); + _pipe_content.events |= EPOLLIN; + _pipe_content.data.u32 = EXIT_EPOLL; + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, _pipe[0], &_pipe_content); + if (res == -1) { + LOG_ERROR("add event to epoll faild! error :%d", errno); + return false; } - - LOG_DEBUG("epoll init success, %d", errno); return true; } bool CEpoll::Dealloc() { - for (auto iter = _epoll_map.begin(); iter < _epoll_map.end(); ++iter) { - iter->second->Dealloc(); - } - _epoll_map.clear(); - LOG_DEBUG("epoll close success, %d", errno); + _run = false; + WeakUp(); return true; } -void SyncAccept(std::shared_ptr sock, const std::function&, int error)>& call_back) { - +bool CEpoll::AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr& event) { + _timer.AddTimer(interval, event_flag, event); + LOG_DEBUG("add a timer event, %d", interval); + return true; } -void SyncConnect(std::shared_ptr sock, const std::string& ip, short port, const std::function&, int err)>& call_back) { +bool CEpoll::AddSendEvent(CMemSharePtr& event) { + auto socket_ptr = event->_client_socket.Lock(); + if (socket_ptr) { + bool res = false; + epoll_event* content = (epoll_event*)event->_data; + //if not add to epoll + if (!(content->events & EPOLLOUT)) { + if (socket_ptr->IsInActions()) { + res = _ModifyEvent(event, EPOLLOUT, socket_ptr->GetSocket()); + } + else { + res = _AddEvent(event, EPOLLOUT, socket_ptr->GetSocket()); + } + } + + //reset one shot flag + res = _ReserOneShot(event, EPOLLOUT, socket_ptr->GetSocket()); + socket_ptr->SetInActions(true); + return res; + + } + LOG_WARN("write event is already distroyed! in %s", "AddSendEvent"); + return false; } +bool CEpoll::AddRecvEvent(CMemSharePtr& event) { + auto socket_ptr = event->_client_socket.Lock(); + if (socket_ptr) { + bool res = false; + epoll_event* content = (epoll_event*)event->_data; + //if not add to epoll + if (!(content->events & EPOLLIN)) { + if (socket_ptr->IsInActions()) { + res = _ModifyEvent(event, EPOLLIN, socket_ptr->GetSocket()); + + } else { + res = _AddEvent(event, EPOLLIN, socket_ptr->GetSocket()); + } + } + + //reset one shot flag + res = _ReserOneShot(event, EPOLLOUT, socket_ptr->GetSocket()); + if (res) { + socket_ptr->SetInActions(true); + } + return res; + + } + LOG_WARN("read event is already distroyed!in %s", "AddRecvEvent"); + return false; +} + +bool CEpoll::AddAcceptEvent(CMemSharePtr& event) { + bool res = false; + epoll_event* content = (epoll_event*)event->_data; + auto socket_ptr = event->_accept_socket; + //if not add to epoll + if (!(content->events & EPOLLIN)) { + res = _AddEvent(event, EPOLLIN, socket_ptr->GetSocket()); + } + + socket_ptr->SetInActions(true); + return res; +} + +bool CEpoll::AddConnection(CMemSharePtr& event, const std::string& ip, short port) { + if (ip.empty()) { + return false; + } + auto socket_ptr = event->_client_socket.Lock(); + if (socket_ptr) { + //the socket must not in epoll + if (socket_ptr->IsInActions()) { + return false; + } + socket_ptr->SetInActions(true); + + struct sockaddr_in addr; + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = inet_addr(ip.c_str()); + + int res = connect(socket_ptr->GetSocket(), (sockaddr *)&addr, sizeof(addr)); + if (errno == EINPROGRESS) { + res = _AddEvent(event, EPOLLOUT, socket_ptr->GetSocket()); + } + if (res == 0) { + return true; + } + LOG_WARN("connect event failed! %d", errno); + return false; + } + LOG_WARN("connection event is already distroyed!,%s", "AddConnection"); + return false; +} + +bool CEpoll::AddDisconnection(CMemSharePtr& event) { + auto socket_ptr = event->_client_socket.Lock(); + if (socket_ptr) { + if (DelEvent(event)) { + close(socket_ptr->GetSocket()); + } + } + return true; +} + +bool CEpoll::DelEvent(CMemSharePtr& event) { + auto socket_ptr = event->_client_socket.Lock(); + if (!socket_ptr) { + return false; + } + epoll_event* content = (epoll_event*)event->_data; + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_DEL, socket_ptr->GetSocket(), content); + if (res == -1) { + LOG_ERROR("remove event from epoll faild! error :%d, socket : %d", errno, socket_ptr->GetSocket()); + return false; + } + LOG_DEBUG("del a socket from epoll, %d", socket_ptr->GetSocket()); + return true; +} + +void CEpoll::ProcessEvent() { + unsigned int wait_time = 0; + std::vector timer_vec; + std::vector event_vec; + event_vec.resize(1000); + while (_run) { + wait_time = _timer.TimeoutCheck(timer_vec); + //if there is no timer event. wait until recv something + if (wait_time == 0 && timer_vec.empty()) { + wait_time = -1; + } + + int res = epoll_wait(_epoll_handler, &*event_vec.begin(), (int)(event_vec.size()), wait_time); + + if (res == -1) { + LOG_ERROR("epoll_wait faild! error :%d", errno); + } + + if (res > 0) { + LOG_DEBUG("epoll_wait get events! num :%d, TheadId : %d", res, std::this_thread::get_id()); + _DoEvent(event_vec, res); + + } else { + if (!timer_vec.empty()) { + _DoTimeoutEvent(timer_vec); + } + } + } + + if (close(_epoll_handler) == -1) { + LOG_ERROR("epoll close failed! error : %d", errno); + } + if (close(_pipe[0]) == -1) { + LOG_ERROR("_pipe[0] close failed! error : %d", errno); + } + if (close(_pipe[1]) == -1) { + LOG_ERROR("_pipe[1] close failed! error : %d", errno); + } +} + +void CEpoll::WeakUp() { + write(_pipe[1], "0", 1); +} + +bool CEpoll::_AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { + epoll_event* content = (epoll_event*)event->_data; + content->events |= event_flag | EPOLLET; + content->data.ptr = (void*)&event->_client_socket; + + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); + if (res == -1) { + if (errno == EEXIST) { + res = _ModifyEvent(event, event_flag, sock); + } + if (res == -1) { + LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); + return false; + } + } + LOG_DEBUG("add a event to epoll, event : %d, sock : %d", event->_event_flag_set, sock); + return true; +} + +bool CEpoll::_AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { + epoll_event* content = (epoll_event*)event->_data; + content->events |= event_flag | EPOLLET; + content->data.ptr = (void*)&event->_accept_socket; + content->data.ptr = ((uintptr_t)content->data.ptr) | 1; + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); + if (res == -1) { + LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); + return false; + } + LOG_DEBUG("add a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); + return true; +} + +bool CEpoll::_ModifyEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { + epoll_event* content = (epoll_event*)event->_data; + content->events |= event_flag; + content->data.ptr = (void*)&event->_client_socket; + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); + if (res == -1) { + if (errno == ENOENT) { + res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); + } + if (res == -1) { + LOG_ERROR("modify event to epoll faild! error :%d, sock: %d", errno, sock); + return false; + } + } + LOG_DEBUG("modify a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); + return true; +} + +bool CEpoll::_ReserOneShot(CMemSharePtr& event, int event_flag, unsigned int sock) { + epoll_event* content = (epoll_event*)event->_data; + content->events |= EPOLLONESHOT; + int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); + if (res == -1) { + if (errno == ENOENT) { + res = _ModifyEvent(event, EPOLLONESHOT | event_flag, sock); + } + if (res == -1) { + LOG_ERROR("reset one shot flag faild! error :%d, sock: %d", errno, sock); + return false; + } + } + LOG_DEBUG("reset one shot, event flag: %d, sock : %d", event->_event_flag_set, sock); + return true; +} + +void CEpoll::_DoTimeoutEvent(std::vector& timer_vec) { + for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) { + if (iter->_event_flag & EVENT_READ) { + auto socket_ptr = iter->_event->_client_socket.Lock(); + if (socket_ptr) { + socket_ptr->_Recv(iter->_event); + } + + } + else if (iter->_event_flag & EVENT_WRITE) { + auto socket_ptr = iter->_event->_client_socket.Lock(); + if (socket_ptr) { + socket_ptr->_Send(iter->_event); + } + } + } + timer_vec.clear(); +} + +void CEpoll::_DoEvent(std::vector& event_vec, int num) { + CMemWeakPtr* normal_sock = nullptr; + CAcceptSocket* accept_sock = nullptr; + void* event = nullptr; + for (int i = 0; i < num; i++) { + if (&_pipe_content == &event_vec[i] && event_vec[i].data.u32 == EXIT_EPOLL) { + _run = false; + } + event = event_vec[i].data.ptr; + if (!event) { + LOG_WARN("the event is nullptr, index : %d", i); + continue; + } + if (((uintptr_t)event) & 1) { + event = (void*)(((uintptr_t)event) & (uintptr_t)~1); + accept_sock = (CAcceptSocket*)event; + accept_sock->_Accept(accept_sock->_accept_event); + + } else { + normal_sock = (CMemWeakPtr*)event_vec[i].data.ptr; + if (!normal_sock) { + continue; + } + auto socket_ptr = normal_sock->Lock(); + if (!socket_ptr) { + continue; + } + if (event_vec[i].events & EPOLLIN) { + if (socket_ptr) { + socket_ptr->_Recv(socket_ptr->_read_event); + } + + } else if (event_vec[i].events & EPOLLOUT) { + auto socket_ptr = normal_sock->Lock(); + if (socket_ptr) { + socket_ptr->_Send(socket_ptr->_write_event); + } + } + } + } +} #endif // __linux__ diff --git a/net/linux/CEpoll.h b/net/linux/CEpoll.h index bd792ea..ce4de72 100644 --- a/net/linux/CEpoll.h +++ b/net/linux/CEpoll.h @@ -2,32 +2,45 @@ #ifndef HEADER_CEPOOL #define HEADER_CEPOOL -#include -#include -#include - #include - #include "EventActions.h" #define MAX_BUFFER_LEN 8192 class Cevent; -class CEpollImpl; -class CEpoll +class CEpoll : public CEventActions { public: CEpoll(); ~CEpoll(); - bool Init(int thread_num = 0); - bool Dealloc(); + virtual bool Init(); + virtual bool Dealloc(); - void SyncAccept(std::shared_ptr sock, const std::function&, int error)>& call_back = nullptr); - void SyncConnect(std::shared_ptr sock, const std::string& ip, short port, const std::function&, int err)>& call_back = nullptr); + virtual bool AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr& event); + virtual bool AddSendEvent(CMemSharePtr& event); + virtual bool AddRecvEvent(CMemSharePtr& event); + virtual bool AddAcceptEvent(CMemSharePtr& event); + virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port); + virtual bool AddDisconnection(CMemSharePtr& event); + virtual bool DelEvent(CMemSharePtr& event); + + virtual void ProcessEvent(); + + void WeakUp(); private: - CAcceptSocket _accept_socket; - std::map> _epoll_map; + bool _AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock); + bool _AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock); + bool _ModifyEvent(CMemSharePtr& event, int event_flag, unsigned int sock); + bool _ReserOneShot(CMemSharePtr& event, int event_flag, unsigned int sock); + + void _DoTimeoutEvent(std::vector& timer_vec); + void _DoEvent(std::vector& event_vec, int num); +private: + int _epoll_handler; + bool _run; + unsigned int _pipe[2]; + epoll_event _pipe_content; }; #endif #endif // __linux__ diff --git a/net/linux/EpollImpl.cpp b/net/linux/EpollImpl.cpp deleted file mode 100644 index 2331bc9..0000000 --- a/net/linux/EpollImpl.cpp +++ /dev/null @@ -1,331 +0,0 @@ -#ifdef __linux__ -#include -#include -#include -#include "CEpollImpl.h" -#include "OSInfo.h" -#include "Log.h" -#include "EventHandler.h" -#include "Buffer.h" -#include "Socket.h" -#include "Timer.h" -#include "LinuxFunc.h" - -CEpollImpl::CEpollImpl() : _run(true){ - -} - -CEpollImpl::~CEpollImpl() { - -} - -bool CEpollImpl::Init() { - //get epoll handle. the param is invalid since linux 2.6.8 - _epoll_handler = epoll_create(1500); - if (_epoll_handler == -1) { - LOG_FATAL("epoll init failed! error : %d", errno); - return false; - } - - return true; -} - -bool CEpollImpl::Dealloc() { - _run = false; - WeakUp(); - return true; -} - -bool CEpollImpl::AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr& event) { - _timer.AddTimer(interval, event_flag, event); - LOG_DEBUG("add a timer event, %d", interval); - return true; -} - -bool CEpollImpl::AddSendEvent(CMemSharePtr& event) { - auto socket_ptr = event->_client_socket.Lock(); - if (socket_ptr) { - bool res = false; - epoll_event* content = (epoll_event*)event->_data; - //if not add to epoll - if (!(content->events & EPOLLOUT)) { - if (socket_ptr->IsInActions()) { - res = _ModifyEvent(event, EPOLLOUT, socket_ptr->GetSocket()); - - } - else { - res = _AddEvent(event, EPOLLOUT, socket_ptr->GetSocket()); - } - } - - //reset one shot flag - res = _ReserOneShot(event, EPOLLOUT, socket_ptr->GetSocket()); - socket_ptr->SetInActions(true); - return res; - - } - LOG_WARN("write event is already distroyed! in %s", "AddSendEvent"); - return false; -} - -bool CEpollImpl::AddRecvEvent(CMemSharePtr& event) { - auto socket_ptr = event->_client_socket.Lock(); - if (socket_ptr) { - bool res = false; - epoll_event* content = (epoll_event*)event->_data; - //if not add to epoll - if (!(content->events & EPOLLIN)) { - if (socket_ptr->IsInActions()) { - res = _ModifyEvent(event, EPOLLIN, socket_ptr->GetSocket()); - - } - else { - res = _AddEvent(event, EPOLLIN, socket_ptr->GetSocket()); - } - } - - //reset one shot flag - res = _ReserOneShot(event, EPOLLOUT, socket_ptr->GetSocket()); - if (res) { - socket_ptr->SetInActions(true); - } - return res; - - } - LOG_WARN("read event is already distroyed!in %s", "AddRecvEvent"); - return false; -} - -bool CEpollImpl::AddAcceptEvent(CMemSharePtr& event) { - bool res = false; - epoll_event* content = (epoll_event*)event->_data; - auto socket_ptr = event->_accept_socket; - //if not add to epoll - if (!(content->events & EPOLLIN)) { - res = _AddEvent(event, EPOLLIN, socket_ptr->GetSocket()); - } - - socket_ptr->SetInActions(true); - return res; -} - -bool CEpollImpl::AddConnection(CMemSharePtr& event, const std::string& ip, short port) { - if (ip.empty()) { - return false; - } - auto socket_ptr = event->_client_socket.Lock(); - if (socket_ptr) { - //the socket must not in epoll - if (socket_ptr->IsInActions()) { - return false; - } - socket_ptr->SetInActions(true); - - struct sockaddr_in addr; - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = inet_addr(ip.c_str()); - - int res = connect(socket_ptr->GetSocket(), (sockaddr *)&addr, sizeof(addr)); - if (errno == EINPROGRESS) { - res = _AddEvent(event, EPOLLOUT, socket_ptr->GetSocket()); - } - if (res == 0) { - return true; - } - LOG_WARN("connect event failed! %d", errno); - return false; - } - LOG_WARN("connection event is already distroyed!,%s", "AddConnection"); - return false; -} - -bool CEpollImpl::AddDisconnection(CMemSharePtr& event) { - auto socket_ptr = event->_client_socket.Lock(); - if (socket_ptr) { - if (DelEvent(event)) { - close(socket_ptr->GetSocket()); - } - } - return true; -} - -bool CEpollImpl::DelEvent(CMemSharePtr& event) { - auto socket_ptr = event->_client_socket.Lock(); - if (!socket_ptr) { - return false; - } - epoll_event* content = (epoll_event*)event->_data; - int res = epoll_ctl(_epoll_handler, EPOLL_CTL_DEL, socket_ptr->GetSocket(), content); - if (res == -1) { - LOG_ERROR("remove event from epoll faild! error :%d, socket : %d", errno, socket_ptr->GetSocket()); - return false; - } - LOG_DEBUG("del a socket from epoll, %d", socket_ptr->GetSocket()); - return true; -} - -void CEpollImpl::ProcessEvent() { - unsigned int wait_time = 0; - std::vector timer_vec; - std::vector event_vec; - event_vec.resize(1000); - for (;;) { - wait_time = _timer.TimeoutCheck(timer_vec); - //if there is no timer event. wait until recv something - if (wait_time == 0 && timer_vec.empty()) { - wait_time = -1; - } - - int res = epoll_wait(_epoll_handler, &*event_vec.begin(), (int)(event_vec.size()), wait_time); - - if (res == -1) { - int err = errno; - LOG_ERROR("epoll_wait faild! error :%d", errno); - } - - if (res > 0) { - LOG_DEBUG("epoll_wait get events! num :%d, TheadId : %d", res, std::this_thread::get_id()); - _DoEvent(event_vec, res); - - } - else { - if (!timer_vec.empty()) { - _DoTimeoutEvent(timer_vec); - } - } - } - - if (close(_epoll_handler) == -1) { - LOG_ERROR("epoll close failed! error : %d", errno); - } -} - -bool CEpollImpl::_AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { - epoll_event* content = (epoll_event*)event->_data; - content->events |= event_flag | EPOLLET; - content->data.ptr = (void*)&event->_client_socket; - - int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); - if (res == -1) { - if (errno == EEXIST) { - res = _ModifyEvent(event, event_flag, sock); - } - if (res == -1) { - LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); - return false; - } - } - LOG_DEBUG("add a event to epoll, event : %d, sock : %d", event->_event_flag_set, sock); - return true; -} - -bool CEpollImpl::_AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { - epoll_event* content = (epoll_event*)event->_data; - content->events |= event_flag | EPOLLET; - content->data.ptr = event->_accept_socket; - content->data.ptr = ((uintptr_t)content->data.ptr) | 1; - int res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); - if (res == -1) { - LOG_ERROR("add event to epoll faild! error :%d, sock: %d", errno, sock); - return false; - } - LOG_DEBUG("add a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); - return true; -} - -bool CEpollImpl::_ModifyEvent(CMemSharePtr& event, int event_flag, unsigned int sock) { - epoll_event* content = (epoll_event*)event->_data; - content->events |= event_flag; - content->data.ptr = (void*)&event->_client_socket; - int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); - if (res == -1) { - if (errno == ENOENT) { - res = epoll_ctl(_epoll_handler, EPOLL_CTL_ADD, sock, content); - } - if (res == -1) { - LOG_ERROR("modify event to epoll faild! error :%d, sock: %d", errno, sock); - return false; - } - } - LOG_DEBUG("modify a event to epoll, event flag: %d, sock : %d", event->_event_flag_set, sock); - return true; -} - -bool CEpollImpl::_ReserOneShot(CMemSharePtr& event, int event_flag, unsigned int sock) { - epoll_event* content = (epoll_event*)event->_data; - content->events |= EPOLLONESHOT; - int res = epoll_ctl(_epoll_handler, EPOLL_CTL_MOD, sock, content); - if (res == -1) { - if (errno == ENOENT) { - res = _ModifyEvent(event, EPOLLONESHOT | event_flag, sock); - } - if (res == -1) { - LOG_ERROR("reset one shot flag faild! error :%d, sock: %d", errno, sock); - return false; - } - } - LOG_DEBUG("reset one shot, event flag: %d, sock : %d", event->_event_flag_set, sock); - return true; -} - -void CEpollImpl::_DoTimeoutEvent(std::vector& timer_vec) { - for (auto iter = timer_vec.begin(); iter != timer_vec.end(); ++iter) { - if (iter->_event_flag & EVENT_READ) { - auto socket_ptr = iter->_event->_client_socket.Lock(); - if (socket_ptr) { - socket_ptr->_Recv(iter->_event); - } - - } - else if (iter->_event_flag & EVENT_WRITE) { - auto socket_ptr = iter->_event->_client_socket.Lock(); - if (socket_ptr) { - socket_ptr->_Send(iter->_event); - } - } - } - timer_vec.clear(); -} - -void CEpollImpl::_DoEvent(std::vector& event_vec, int num) { - CMemWeakPtr* normal_sock = nullptr; - CAcceptSocket* accept_sock = nullptr; - void* event = nullptr; - for (int i = 0; i < num; i++) { - event = event_vec[i].data.ptr; - if (!event) { - LOG_WARN("the event is nullptr, index : %d", i); - continue; - } - if (((uintptr_t)event) & 1) { - event = (void*)(((uintptr_t)event) & (uintptr_t)~1); - accept_sock = (CAcceptSocket*)event; - accept_sock->_Accept(accept_sock->_accept_event); - - } - else { - normal_sock = (CMemWeakPtr*)event_vec[i].data.ptr; - if (!normal_sock) { - continue; - } - auto socket_ptr = normal_sock->Lock(); - if (!socket_ptr) { - continue; - } - if (event_vec[i].events & EPOLLIN) { - if (socket_ptr) { - socket_ptr->_Recv(socket_ptr->_read_event); - } - - } - else if (event_vec[i].events & EPOLLOUT) { - auto socket_ptr = normal_sock->Lock(); - if (socket_ptr) { - socket_ptr->_Send(socket_ptr->_write_event); - } - } - } - } -} -#endif // __linux__ diff --git a/net/linux/EpollImpl.h b/net/linux/EpollImpl.h deleted file mode 100644 index 7442d15..0000000 --- a/net/linux/EpollImpl.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifdef __linux__ - -#ifndef HEADER_CEPOOL -#define HEADER_CEPOOL -#include -#include "EventActions.h" - -#define MAX_BUFFER_LEN 8192 -class Cevent; -class CEpollImpl : public CEventActions -{ -public: - CEpollImpl(); - ~CEpollImpl(); - - virtual bool Init(); - virtual bool Dealloc(); - - virtual bool AddTimerEvent(unsigned int interval, int event_flag, CMemSharePtr& event); - virtual bool AddSendEvent(CMemSharePtr& event); - virtual bool AddRecvEvent(CMemSharePtr& event); - virtual bool AddAcceptEvent(CMemSharePtr& event); - virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port); - virtual bool AddDisconnection(CMemSharePtr& event); - virtual bool DelEvent(CMemSharePtr& event); - - virtual void ProcessEvent(); - - void WeakUp(); - -private: - bool _AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock); - bool _AddEvent(CMemSharePtr& event, int event_flag, unsigned int sock); - bool _ModifyEvent(CMemSharePtr& event, int event_flag, unsigned int sock); - bool _ReserOneShot(CMemSharePtr& event, int event_flag, unsigned int sock); - - void _DoTimeoutEvent(std::vector& timer_vec); - void _DoEvent(std::vector& event_vec, int num); -private: - int _epoll_handler; - bool _run; -}; -#endif -#endif // __linux__ diff --git a/net/linux/LinuxFunc.cpp b/net/linux/LinuxFunc.cpp index 379e333..a4f2822 100644 --- a/net/linux/LinuxFunc.cpp +++ b/net/linux/LinuxFunc.cpp @@ -1,6 +1,7 @@ #ifdef __linux__ #include #include +#include #include "LinuxFunc.h" int SetSocketNoblocking(unsigned int sock) { @@ -10,6 +11,13 @@ int SetSocketNoblocking(unsigned int sock) { return old_option; } +int SetReusePort(unsigned int sock) { + int old_option = fcntl(sock, F_GETFL); + int new_option = old_option | SO_REUSEPORT; + fcntl(sock, F_SETFL, new_option); + return old_option; +} + void SetCoreFileUnlimit() { struct rlimit rlim; struct rlimit rlim_new; diff --git a/net/linux/LinuxFunc.h b/net/linux/LinuxFunc.h index f5d0049..87b36ac 100644 --- a/net/linux/LinuxFunc.h +++ b/net/linux/LinuxFunc.h @@ -5,6 +5,8 @@ int SetSocketNoblocking(unsigned int sock); +int SetReusePort(unsigned int sock); + void SetCoreFileUnlimit(); #endif diff --git a/net/linux/Socket.cpp b/net/linux/Socket.cpp index 063a091..b2d02d3 100644 --- a/net/linux/Socket.cpp +++ b/net/linux/Socket.cpp @@ -10,7 +10,7 @@ #include "Socket.h" #include "Runnable.h" -CSocket::CSocket(std::shared_ptr& event_actions) : CSocketBase(event_actions), _post_event_num(0){ +CSocket::CSocket(std::shared_ptr& event_actions) : CSocketBase(event_actions){ _read_event = MakeNewSharedPtr(_pool.get()); _write_event = MakeNewSharedPtr(_pool.get()); } @@ -37,7 +37,12 @@ CSocket::~CSocket() { } } -void CSocket::SyncRead(const std::function&, int error)>& call_back) { +void CSocket::SyncRead() { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } @@ -45,23 +50,22 @@ void CSocket::SyncRead(const std::function&, in _read_event->_data = _pool->PoolNew(); ((epoll_event*)_read_event->_data)->events = 0; } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } if (_event_actions) { _read_event->_event_flag_set |= EVENT_READ; - if (_event_actions->AddRecvEvent(_read_event)) { - _post_event_num++; - } + _event_actions->AddRecvEvent(_read_event); } } -void CSocket::SyncWrite(char* src, int len, const std::function&, int error)>& call_back) { +void CSocket::SyncWrite(char* src, int len) { + if (!_write_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_write_event) { _write_event = MakeNewSharedPtr(_pool.get()); } @@ -69,9 +73,6 @@ void CSocket::SyncWrite(char* src, int len, const std::function_data = _pool->PoolNew(); ((epoll_event*)_write_event->_data)->events = 0; } - if (!_write_event->_call_back) { - _write_event->_call_back = call_back; - } if (!_write_event->_buffer) { _write_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); @@ -83,13 +84,16 @@ void CSocket::SyncWrite(char* src, int len, const std::function_event_flag_set |= EVENT_WRITE; - if (_event_actions->AddSendEvent(_write_event)) { - _post_event_num++; - } + _event_actions->AddSendEvent(_write_event); } } -void CSocket::SyncConnection(const std::string& ip, short port, const std::function&, int err)>& call_back) { +void CSocket::SyncConnection(const std::string& ip, short port) { + if (!_write_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (ip.length() > 16) { LOG_ERROR("a wrong ip! %s", ip.c_str()); return; @@ -102,9 +106,6 @@ void CSocket::SyncConnection(const std::string& ip, short port, const std::funct _write_event->_data = _pool->PoolNew(); ((epoll_event*)_write_event->_data)->events = 0; } - if (!_write_event->_call_back) { - _write_event->_call_back = call_back; - } if (!_write_event->_buffer) { _write_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); @@ -116,13 +117,16 @@ void CSocket::SyncConnection(const std::string& ip, short port, const std::funct if (_event_actions) { _write_event->_event_flag_set |= EVENT_CONNECT; - if (_event_actions->AddConnection(_write_event, ip, port)) { - _post_event_num++; - } + _event_actions->AddConnection(_write_event, ip, port); } } -void CSocket::SyncDisconnection(const std::function&, int err)>& call_back) { +void CSocket::SyncDisconnection() { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } @@ -130,10 +134,7 @@ void CSocket::SyncDisconnection(const std::function_data = _pool->PoolNew(); ((epoll_event*)_read_event->_data)->events = 0; } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - + if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } @@ -144,13 +145,16 @@ void CSocket::SyncDisconnection(const std::function_event_flag_set |= EVENT_DISCONNECT; - if (_event_actions->AddDisconnection(_read_event)) { - _post_event_num++; - } + _event_actions->AddDisconnection(_read_event); } } -void CSocket::SyncRead(unsigned int interval, const std::function&, int error)>& call_back) { +void CSocket::SyncRead(unsigned int interval) { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } @@ -159,29 +163,27 @@ void CSocket::SyncRead(unsigned int interval, const std::function_data)->events = 0; } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } if (_event_actions) { _read_event->_event_flag_set |= EVENT_READ; - if (_event_actions->AddRecvEvent(_read_event)) { - _post_event_num++; - } + _event_actions->AddRecvEvent(_read_event); } if (_event_actions) { _read_event->_event_flag_set |= EVENT_TIMER; _event_actions->AddTimerEvent(interval, EVENT_READ, _read_event); - _post_event_num++; } } -void CSocket::SyncWrite(unsigned int interval, char* src, int len, const std::function&, int error)>& call_back) { +void CSocket::SyncWrite(unsigned int interval, char* src, int len) { + if (!_write_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_write_event) { _write_event = MakeNewSharedPtr(_pool.get()); } @@ -189,10 +191,7 @@ void CSocket::SyncWrite(unsigned int interval, char* src, int len, const std::fu _write_event->_data = _pool->PoolNew(); ((epoll_event*)_write_event->_data)->events = 0; } - if (!_write_event->_call_back) { - _write_event->_call_back = call_back; - } - + if (!_write_event->_client_socket) { _write_event->_client_socket = _read_event->_client_socket; } @@ -203,15 +202,12 @@ void CSocket::SyncWrite(unsigned int interval, char* src, int len, const std::fu if (_event_actions) { _write_event->_event_flag_set |= EVENT_WRITE; - if (_event_actions->AddSendEvent(_write_event)) { - _post_event_num++; - } + _event_actions->AddSendEvent(_write_event); } if (_event_actions) { _write_event->_event_flag_set |= EVENT_TIMER; _event_actions->AddTimerEvent(interval, EVENT_WRITE, _write_event); - _post_event_num++; } } @@ -239,7 +235,6 @@ bool operator!=(const CSocketBase& s1, const CSocketBase& s2) { return s1._sock != s2._sock; } - void CSocket::_Recv(CMemSharePtr& event) { if (!event->_client_socket) { return; @@ -250,13 +245,13 @@ void CSocket::_Recv(CMemSharePtr& event) { } int err = -1; if (event->_timer_out) { - err = EVENT_ERROR_TIMEOUT; + err = EVENT_ERROR_TIMEOUT | event->_event_flag_set; event->_timer_out = false; //reset timer flag event->_event_flag_set &= ~EVENT_TIMER; } else { - err = EVENT_ERROR_NO; + err = EVENT_ERROR_NO | event->_event_flag_set; if (event->_event_flag_set & EVENT_READ) { event->_off_set = 0; for (;;) { @@ -268,8 +263,7 @@ void CSocket::_Recv(CMemSharePtr& event) { break; } else if (errno == EBADMSG) { - err = EVENT_ERROR_CLOSED; - LOG_ERROR("recv 0 cause closed! socket : %d, errno : %d", socket_ptr->GetSocket(), errno); + err = EVENT_ERROR_CLOSED | event->_event_flag_set; break; } else { @@ -277,8 +271,7 @@ void CSocket::_Recv(CMemSharePtr& event) { break; } } else if (recv_len == 0) { - err = EVENT_ERROR_CLOSED; - LOG_ERROR("recv 0 cause closed! socket : %d, errno : %d", socket_ptr->GetSocket(), errno); + err = EVENT_ERROR_CLOSED | event->_event_flag_set; break; } event->_buffer->Write(buf, recv_len); @@ -303,7 +296,7 @@ void CSocket::_Send(CMemSharePtr& event) { int err = -1; if (event->_timer_out) { - err = EVENT_ERROR_TIMEOUT; + err = EVENT_ERROR_TIMEOUT | event->_event_flag_set; event->_timer_out = false; event->_event_flag_set &= ~EVENT_TIMER; @@ -320,12 +313,11 @@ void CSocket::_Send(CMemSharePtr& event) { //wait next time to do } else if (errno == EBADMSG) { - err = EVENT_ERROR_CLOSED; - LOG_ERROR("send 0 cause closed! socket : %d, errno : %d", socket_ptr->GetSocket(), errno); + err = EVENT_ERROR_CLOSED | event->_event_flag_set; + } else { - err = EVENT_ERROR_CLOSED; + err = EVENT_ERROR_CLOSED | event->_event_flag_set; LOG_ERROR("send filed! %d", errno); - LOG_ERROR("send 0 cause closed! socket : %d, errno : %d", socket_ptr->GetSocket(), errno); } } event->_off_set = res; diff --git a/net/linux/SocketBase.cpp b/net/linux/SocketBase.cpp index be8bc31..5b26c39 100644 --- a/net/linux/SocketBase.cpp +++ b/net/linux/SocketBase.cpp @@ -2,11 +2,11 @@ #include "SocketBase.h" #include "EventActions.h" -CSocketBase::CSocketBase() : _add_event_actions(false), _invalid(false), _event_actions(nullptr), _pool(new CMemaryPool(1024, 20)) { +CSocketBase::CSocketBase() : _add_event_actions(false), _invalid(false), _event_actions(nullptr), _pool(new CMemoryPool(1024, 20)) { memset(_ip, 0, __addr_str_len); } -CSocketBase::CSocketBase(std::shared_ptr& event_actions) : _add_event_actions(false), _invalid(false), _event_actions(event_actions), _pool(new CMemaryPool(1024, 20)) { +CSocketBase::CSocketBase(std::shared_ptr& event_actions) : _add_event_actions(false), _invalid(false), _event_actions(event_actions), _pool(new CMemoryPool(1024, 20)) { memset(_ip, 0, __addr_str_len); } diff --git a/net/win/AcceptSocket.cpp b/net/win/AcceptSocket.cpp index 08991d6..73d481f 100644 --- a/net/win/AcceptSocket.cpp +++ b/net/win/AcceptSocket.cpp @@ -25,7 +25,7 @@ bool CAcceptSocket::Bind(short port, const std::string& ip) { int ret = bind(_sock, (sockaddr *)&addr, sizeof(sockaddr)); if (SOCKET_ERROR == ret) { - LOG_FATAL("win32 bind socket filed!"); + LOG_FATAL("win32 bind socket filed! errno : %d", GetLastError()); WSACleanup(); closesocket(_sock); return false; @@ -37,7 +37,7 @@ bool CAcceptSocket::Bind(short port, const std::string& ip) { bool CAcceptSocket::Listen(unsigned int listen_size) { int ret = listen(_sock, listen_size); if (SOCKET_ERROR == ret) { - LOG_FATAL("win32 listen socket filed!"); + LOG_FATAL("win32 listen socket filed! errno : %d", GetLastError()); WSACleanup(); closesocket(_sock); return false; @@ -47,36 +47,7 @@ bool CAcceptSocket::Listen(unsigned int listen_size) { return true; } -void CAcceptSocket::SyncAccept(const std::function&, int error)>& call_back) { - if (!_accept_event) { - _accept_event = MakeNewSharedPtr(_pool.get()); - } - if (!_accept_event->_data) { - _accept_event->_data = _pool->PoolNew(); - } - - if (!_accept_event->_client_socket) { - _accept_event->_client_socket = MakeNewSharedPtr(_pool.get(), _event_actions); - } - if (!_accept_event->_client_socket->_read_event) { - _accept_event->_client_socket->_read_event = MakeNewSharedPtr(_accept_event->_client_socket->_pool.get()); - } - if (!_accept_event->_client_socket->_read_event->_buffer) { - _accept_event->_client_socket->_read_event->_buffer = MakeNewSharedPtr(_accept_event->_client_socket->_pool.get(), _accept_event->_client_socket->_pool); - } - - if (!_accept_event->_call_back) { - _accept_event->_call_back = call_back; - } - - if (_event_actions) { - _accept_event->_event_flag_set |= EVENT_ACCEPT; - _event_actions->AddAcceptEvent(_accept_event); - } -} - -void CAcceptSocket::SyncAccept(const std::function&, int error)>& accept_back, - const std::function&, int error)>& read_back) { +void CAcceptSocket::SyncAccept() { if (!_accept_event) { _accept_event = MakeNewSharedPtr(_pool.get()); } @@ -84,9 +55,12 @@ void CAcceptSocket::SyncAccept(const std::function_data = _pool->PoolNew(); } if (!_accept_event->_accept_socket) { - _accept_event->_accept_socket = this; + _accept_event->_accept_socket = memshared_from_this(); } + auto socket = _accept_event->_accept_socket; + auto sock = socket->GetSocket(); + if (!_accept_event->_client_socket) { _accept_event->_client_socket = MakeNewSharedPtr(_pool.get(), _event_actions); } @@ -98,10 +72,12 @@ void CAcceptSocket::SyncAccept(const std::function_call_back) { - _accept_event->_call_back = accept_back; + LOG_WARN("call back function is null"); + return; } if (!_accept_event->_client_socket->_read_event->_call_back) { - _accept_event->_client_socket->_read_event->_call_back = read_back; + LOG_WARN("call back function is null"); + return; } if (_event_actions) { @@ -148,14 +124,13 @@ void CAcceptSocket::_Accept(CMemSharePtr& event) { event->_client_socket->_read_event->_client_socket = event->_client_socket; //call accept call back function - event->_event_flag_set = 0; if (event->_call_back) { - event->_call_back(event, 0); + event->_call_back(event, EVENT_ERROR_NO | event->_event_flag_set); } //call read call back function if (event->_client_socket->_read_event->_call_back) { - event->_client_socket->_read_event->_call_back(event->_client_socket->_read_event, 0); + event->_client_socket->_read_event->_call_back(event->_client_socket->_read_event, EVENT_ERROR_NO | EVENT_READ); } //post Accept diff --git a/net/win/IOCP.cpp b/net/win/IOCP.cpp index 2ced16d..6f9fc5b 100644 --- a/net/win/IOCP.cpp +++ b/net/win/IOCP.cpp @@ -8,11 +8,11 @@ #include "Timer.h" enum STATE_CODE { - EXIT_IOCP = 0, - WEAK_UP_IOCP = 1, + EXIT_IOCP = 0xFFFFFFFF, + WEAK_UP_IOCP = 0xAAAAFFFF, }; -CIOCP::CIOCP() { +CIOCP::CIOCP() : _is_inited(false) { } @@ -21,21 +21,19 @@ CIOCP::~CIOCP() { } bool CIOCP::Init() { - int _threads_num = GetCpuNum() * 2; + int _threads_num = GetCpuNum(); //tell iocp the must thread num _iocp_handler = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, _threads_num); if (_iocp_handler == INVALID_HANDLE_VALUE) { LOG_FATAL("IOCP create io completion port failed!"); return false; } + _is_inited = true; return true; } bool CIOCP::Dealloc() { - if (CloseHandle(_iocp_handler) == -1) { - LOG_ERROR("IOCP close io completion port failed!"); - } - _iocp_handler = nullptr; + PostQueuedCompletionStatus(_iocp_handler, 0, EXIT_IOCP, nullptr); return true; } @@ -90,7 +88,7 @@ bool CIOCP::AddAcceptEvent(CMemSharePtr& event) { return _PostAccept(event); } -bool CIOCP::AddConnection(CMemSharePtr& event, const std::string& ip, short port) { +bool CIOCP::AddConnection(CMemSharePtr& event, const std::string& ip, short port, char* buf, int buf_len) { auto socket_ptr = event->_client_socket.Lock(); if (socket_ptr) { if (!socket_ptr->IsInActions()) { @@ -101,7 +99,7 @@ bool CIOCP::AddConnection(CMemSharePtr& event, const std::string& } ((EventOverlapped*)event->_data)->_event = &event; socket_ptr->SetInActions(true); - return _PostConnection(event, ip, port); + return _PostConnection(event, ip, port, buf, buf_len); } LOG_WARN("read event is already distroyed!"); return false; @@ -152,6 +150,9 @@ void CIOCP::ProcessEvent() { DWORD dw_err = 0; if (res) { dw_err = NO_ERROR; + if ((PULONG_PTR)socket_context == (PULONG_PTR)EXIT_IOCP){ + break; + } } else { dw_err = GetLastError(); @@ -173,6 +174,12 @@ void CIOCP::ProcessEvent() { continue; } } + if (_is_inited) { + if (CloseHandle(_iocp_handler) == -1) { + LOG_ERROR("IOCP close io completion port failed!"); + } + } + _is_inited = false; } bool CIOCP::_PostRecv(CMemSharePtr& event) { @@ -237,7 +244,7 @@ bool CIOCP::_PostSend(CMemSharePtr& event) { return true; } -bool CIOCP::_PostConnection(CMemSharePtr& event, const std::string& ip, short port) { +bool CIOCP::_PostConnection(CMemSharePtr& event, const std::string& ip, short port, char* buf, int buf_len) { EventOverlapped* context = (EventOverlapped*)event->_data; DWORD dwFlags = 0; @@ -260,7 +267,7 @@ bool CIOCP::_PostConnection(CMemSharePtr& event, const std::strin if (SOCKET_ERROR == bind(socket_ptr->GetSocket(), (sockaddr*)&local, sizeof(local))) { LOG_FATAL("bind local host failed! error code: %d", WSAGetLastError()); } - int res = __ConnectEx(socket_ptr->GetSocket(), (sockaddr*)&addr, sizeof(addr), nullptr, 0, nullptr, lapped); + int res = __ConnectEx(socket_ptr->GetSocket(), (sockaddr*)&addr, sizeof(addr), buf, buf_len, nullptr, lapped); if ((SOCKET_ERROR == res) && (WSA_IO_PENDING != WSAGetLastError())) { LOG_FATAL("IOCP post connect event failed! error code: %d", WSAGetLastError()); @@ -275,7 +282,6 @@ bool CIOCP::_PostDisconnection(CMemSharePtr& event) { context->Clear(); context->_event_flag_set = event->_event_flag_set; - context->_wsa_buf.len = event->_buffer->Read(context->_lapped_buffer, MAX_BUFFER_LEN); OVERLAPPED *lapped = &context->_overlapped; auto socket_ptr = event->_client_socket.Lock(); @@ -318,6 +324,7 @@ void CIOCP::_DoEvent(EventOverlapped *socket_context, int bytes) { } else { CMemSharePtr* event = (CMemSharePtr*)socket_context->_event; if (event) { + (*event)->_event_flag_set = socket_context->_event_flag_set; (*event)->_off_set = bytes; if (socket_context->_event_flag_set & EVENT_READ || socket_context->_event_flag_set & EVENT_CONNECT diff --git a/net/win/IOCP.h b/net/win/IOCP.h index b44fef6..4dbedc7 100644 --- a/net/win/IOCP.h +++ b/net/win/IOCP.h @@ -50,7 +50,7 @@ public: virtual bool AddSendEvent(CMemSharePtr& event); virtual bool AddRecvEvent(CMemSharePtr& event); virtual bool AddAcceptEvent(CMemSharePtr& event); - virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port); + virtual bool AddConnection(CMemSharePtr& event, const std::string& ip, short port, char* buf, int buf_len); virtual bool AddDisconnection(CMemSharePtr& event); virtual bool DelEvent(CMemSharePtr& event); @@ -60,13 +60,14 @@ private: bool _PostRecv(CMemSharePtr& event); bool _PostAccept(CMemSharePtr& event); bool _PostSend(CMemSharePtr& event); - bool _PostConnection(CMemSharePtr& event, const std::string& ip, short port); + bool _PostConnection(CMemSharePtr& event, const std::string& ip, short port, char* buf, int buf_len); bool _PostDisconnection(CMemSharePtr& event); void _DoTimeoutEvent(std::vector& timer_vec); void _DoEvent(EventOverlapped *socket_context, int bytes); private: HANDLE _iocp_handler; + bool _is_inited; }; #endif diff --git a/net/win/Socket.cpp b/net/win/Socket.cpp index a65f0d0..9c0ebbf 100644 --- a/net/win/Socket.cpp +++ b/net/win/Socket.cpp @@ -25,22 +25,24 @@ CSocket::~CSocket() { } } -void CSocket::SyncRead(const std::function&, int error)>& call_back) { +void CSocket::SyncRead() { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } if (!_read_event->_data) { _read_event->_data = _pool->PoolNew(); } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } if (_event_actions) { + _read_event->_event_flag_set = 0; _read_event->_event_flag_set |= EVENT_READ; if (_event_actions->AddRecvEvent(_read_event)) { _post_event_num++; @@ -48,17 +50,18 @@ void CSocket::SyncRead(const std::function&, in } } -void CSocket::SyncWrite(char* src, int len, const std::function&, int error)>& call_back) { +void CSocket::SyncWrite(char* src, int len) { + if (!_write_event->_call_back) { + LOG_WARN("call back function is null, src : %s, len : %d", src, len); + return; + } + if (!_write_event) { _write_event = MakeNewSharedPtr(_pool.get()); } if (!_write_event->_data) { _write_event->_data = _pool->PoolNew(); } - if (!_write_event->_call_back) { - _write_event->_call_back = call_back; - } - if (!_write_event->_buffer) { _write_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } @@ -68,6 +71,7 @@ void CSocket::SyncWrite(char* src, int len, const std::function_client_socket = _read_event->_client_socket; } if (_event_actions) { + _read_event->_event_flag_set = 0; _write_event->_event_flag_set |= EVENT_WRITE; if (_event_actions->AddSendEvent(_write_event)) { _post_event_num++; @@ -75,9 +79,14 @@ void CSocket::SyncWrite(char* src, int len, const std::function&, int err)>& call_back) { - if (ip.length() > 16) { - LOG_ERROR("a wrong ip!"); +void CSocket::SyncConnection(const std::string& ip, short port, char* buf, int buf_len) { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null, ip : %s, port : %d", ip.c_str(), port); + return; + } + + if (ip.length() > 16 || ip.empty()) { + LOG_ERROR("a wrong ip! ip : %s", ip.c_str()); return; } strcpy(_ip, ip.c_str()); @@ -87,10 +96,7 @@ void CSocket::SyncConnection(const std::string& ip, short port, const std::funct if (!_read_event->_data) { _read_event->_data = _pool->PoolNew(); } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - + if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } @@ -100,24 +106,27 @@ void CSocket::SyncConnection(const std::string& ip, short port, const std::funct } if (_event_actions) { + _read_event->_event_flag_set = 0; _read_event->_event_flag_set |= EVENT_CONNECT; - if (_event_actions->AddConnection(_read_event, ip, port)) { + if (_event_actions->AddConnection(_read_event, ip, port, buf, buf_len)) { _post_event_num++; } } } -void CSocket::SyncDisconnection(const std::function&, int err)>& call_back) { +void CSocket::SyncDisconnection() { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } if (!_read_event->_data) { _read_event->_data = _pool->PoolNew(); } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - + if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } @@ -127,14 +136,20 @@ void CSocket::SyncDisconnection(const std::function_event_flag_set |= EVENT_CONNECT; + _read_event->_event_flag_set = 0; + _read_event->_event_flag_set |= EVENT_DISCONNECT; if (_event_actions->AddDisconnection(_read_event)) { _post_event_num++; } } } -void CSocket::SyncRead(unsigned int interval, const std::function&, int error)>& call_back) { +void CSocket::SyncRead(unsigned int interval) { + if (!_read_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_read_event) { _read_event = MakeNewSharedPtr(_pool.get()); } @@ -142,21 +157,16 @@ void CSocket::SyncRead(unsigned int interval, const std::function_data = _pool->PoolNew(); } - if (!_read_event->_call_back) { - _read_event->_call_back = call_back; - } - if (!_read_event->_buffer) { _read_event->_buffer = MakeNewSharedPtr(_pool.get(), _pool); } - + _read_event->_event_flag_set = 0; if (_event_actions) { _read_event->_event_flag_set |= EVENT_READ; if (_event_actions->AddRecvEvent(_read_event)) { _post_event_num++; } } - if (_event_actions) { _read_event->_event_flag_set |= EVENT_TIMER; _event_actions->AddTimerEvent(interval, EVENT_READ, _read_event); @@ -164,16 +174,18 @@ void CSocket::SyncRead(unsigned int interval, const std::function&, int error)>& call_back) { +void CSocket::SyncWrite(unsigned int interval, char* src, int len) { + if (!_write_event->_call_back) { + LOG_WARN("call back function is null"); + return; + } + if (!_write_event) { _write_event = MakeNewSharedPtr(_pool.get()); } if (!_write_event->_data) { _write_event->_data = _pool->PoolNew(); } - if (!_write_event->_call_back) { - _write_event->_call_back = call_back; - } if (!_write_event->_client_socket) { _write_event->_client_socket = _read_event->_client_socket; @@ -183,6 +195,7 @@ void CSocket::SyncWrite(unsigned int interval, char* src, int len, const std::fu } _write_event->_buffer->Write(src, len); + _write_event->_event_flag_set = 0; if (_event_actions) { _write_event->_event_flag_set |= EVENT_WRITE; if (_event_actions->AddSendEvent(_write_event)) { @@ -202,7 +215,7 @@ void CSocket::SetReadCallBack(const std::function&, int error)>& call_back) { - _read_event->_call_back = call_back; + _write_event->_call_back = call_back; } bool operator>(const CSocketBase& s1, const CSocketBase& s2) { @@ -228,15 +241,15 @@ void CSocket::_Recv(CMemSharePtr& event) { _post_event_num--; int err = -1; if (event->_timer_out) { - err = EVENT_ERROR_TIMEOUT; + err = EVENT_ERROR_TIMEOUT | event->_event_flag_set; } else if (!event->_off_set) { if (_post_event_num == 0) { - err = EVENT_ERROR_CLOSED; + err = EVENT_ERROR_CLOSED | event->_event_flag_set; } } else { - err = EVENT_ERROR_NO; + err = EVENT_ERROR_NO | event->_event_flag_set; event->_buffer->Write(context->_wsa_buf.buf, event->_off_set); } if (event->_call_back && err > -1) { @@ -251,15 +264,15 @@ void CSocket::_Send(CMemSharePtr& event) { _post_event_num--; int err = -1; if (event->_timer_out) { - err = EVENT_ERROR_TIMEOUT; + err = EVENT_ERROR_TIMEOUT | event->_event_flag_set; } else if (!event->_off_set) { if (_post_event_num == 0) { - err = EVENT_ERROR_CLOSED; + err = EVENT_ERROR_CLOSED | event->_event_flag_set; } } else { - err = EVENT_ERROR_NO; + err = EVENT_ERROR_NO | event->_event_flag_set; } if (event->_call_back && err > -1) { diff --git a/net/win/SocketBase.cpp b/net/win/SocketBase.cpp index bee4bfd..047ab46 100644 --- a/net/win/SocketBase.cpp +++ b/net/win/SocketBase.cpp @@ -1,4 +1,5 @@ #ifndef __linux__ +#include #include "SocketBase.h" #include "WinExpendFunc.h" #include "Log.h" @@ -30,6 +31,11 @@ static bool _InitExFunctnion() { return true; } +void SetReusePort(unsigned int sock) { + int opt = 1; + int ret = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (const char*)&opt, sizeof(opt)); +} + bool InitScoket() { static WSADATA __wsa_data; static bool __has_init = false; @@ -58,17 +64,19 @@ void DeallocSocket() { WSACleanup(); } -CSocketBase::CSocketBase() : _add_event_actions(false), _invalid(false), _event_actions(nullptr), _pool(new CMemaryPool(1024, 20)) { +CSocketBase::CSocketBase() : _add_event_actions(false), _invalid(false), _event_actions(nullptr), _pool(new CMemoryPool(1024, 20)) { memset(_ip, 0, __addr_str_len); _sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + SetReusePort(_sock); if (_sock == INVALID_SOCKET) { LOG_FATAL("init a new socket failed!"); } } -CSocketBase::CSocketBase(std::shared_ptr& event_actions) : _add_event_actions(false), _invalid(false), _event_actions(event_actions), _pool(new CMemaryPool(1024, 20)) { +CSocketBase::CSocketBase(std::shared_ptr& event_actions) : _add_event_actions(false), _invalid(false), _event_actions(event_actions), _pool(new CMemoryPool(1024, 20)) { memset(_ip, 0, __addr_str_len); _sock = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); + SetReusePort(_sock); if (_sock == INVALID_SOCKET) { LOG_FATAL("init a new socket failed!"); } diff --git a/net/win/WinExpendFunc.h b/net/win/WinExpendFunc.h index 297339f..1f67116 100644 --- a/net/win/WinExpendFunc.h +++ b/net/win/WinExpendFunc.h @@ -10,6 +10,8 @@ extern LPFN_ACCEPTEX __AcceptEx; extern LPFN_CONNECTEX __ConnectEx; extern LPFN_GETACCEPTEXSOCKADDRS __AcceptExScokAddrs; extern LPFN_DISCONNECTEX __DisconnectionEx; +extern void SetReusePort(unsigned int sock); + #endif #endif // __linux__ diff --git a/test/CppNet.cpp b/test/CppNet.cpp index 94b39f9..6f4d194 100644 --- a/test/CppNet.cpp +++ b/test/CppNet.cpp @@ -11,93 +11,52 @@ #include "AcceptSocket.h" #include "EventHandler.h" - - -std::mutex __mutex; - -std::map> client_map; - -void ReadFunc(CMemSharePtr& event, int error); -std::function& event, int error)> read_back = ReadFunc; - -void WriteFunc(CMemSharePtr& event, int error) { +void WriteFunc(CMemSharePtr& sock, int error) { std::cout << "WriteFunc" << std::endl; std::cout << "Thread ID : " << std::this_thread::get_id() << std::endl; - std::cout << "write count: " << event->_off_set << std::endl << std::endl; + std::cout << "write count: " << sock->_write_event->_off_set << std::endl << std::endl; if (error != EVENT_ERROR_CLOSED) { - event->_client_socket.Lock()->SyncRead(read_back); - } else { - std::unique_lock lock(__mutex); - std::cout << "~~~~~~~erase socket" << error << std::endl; - client_map.erase(event->_client_socket.Lock()->GetSocket()); + sock->SyncRead(); } } -std::function& event, int error)> write_back = WriteFunc; -void ReadFunc(CMemSharePtr& event, int error) { +void ReadFunc(CMemSharePtr& sock, int error) { std::cout << "ReadFunc" << std::endl; - std::cout << *(event->_buffer) << std::endl; - event->_buffer->Clear(); + std::cout << *(sock->_read_event->_buffer) << std::endl; + sock->_read_event->_buffer->Clear(); std::cout << "Thread ID : " << std::this_thread::get_id() << std::endl; - std::cout << "Read size : " << event->_off_set << std::endl << std::endl; + std::cout << "Read size : " << sock->_read_event->_off_set << std::endl << std::endl; - event->_buffer->Clear(); if (error != EVENT_ERROR_CLOSED) { - event->_client_socket.Lock()->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back); - } else { - std::unique_lock lock(__mutex); - std::cout << "~~~~~~~erase socket" << error << std::endl; - client_map.erase(event->_client_socket.Lock()->GetSocket()); + sock->SyncWrite("aaaaa21231231", strlen("aaaaa21231231")); } } -void AcceptFunc(CMemSharePtr& event, int error) { - std::unique_lock lock(__mutex); - client_map[event->_client_socket->GetSocket()] = event->_client_socket; +void AcceptFunc(CMemSharePtr& sock, int error) { std::cout << "AcceptFunc" << std::endl; - std::cout << "client address :" << event->_client_socket->GetAddress() << std::endl << std::endl; - event->_client_socket->SyncRead(read_back); + std::cout << "client address :" << sock->GetAddress() << std::endl << std::endl; + sock->SyncRead(); } -#include "Log.h" -//#include "LinuxFunc.h" -int main() { -#ifndef __linux__ - InitScoket(); -#endif - - CLog::Instance().SetLogLevel(LOG_DEBUG_LEVEL); +#include "Log.h" +#include "NetObject.h" +int main() { + CLog::Instance().SetLogLevel(LOG_WARN_LEVEL); CLog::Instance().SetLogName("CppNet.txt"); CLog::Instance().Start(); - -#ifdef __linux__ - std::shared_ptr event_actions(new CEpoll); -#else - std::shared_ptr event_actions(new CIOCP); -#endif // __linux__ - - event_actions->Init(); - CAcceptSocket sock(event_actions); + CNetObject net; + net.Init(1); - CMemaryPool pool; - auto _accept_event = MakeNewSharedPtr(&pool); + net.SetAcceptCallback(AcceptFunc); + net.SetWriteCallback(WriteFunc); + net.SetReadCallback(ReadFunc); - std::vector thread_vec; + net.ListenAndAccept(8921, "127.0.0.1"); - for (int i = 0; i < 8; i++) { - thread_vec.push_back(std::thread(std::bind(&CEventActions::ProcessEvent, event_actions))); - } - - std::function& event, int error)> accept_func = AcceptFunc; - sock.Bind(8500, "0.0.0.0"); - sock.Listen(10); - sock.SyncAccept(accept_func, read_back); - - for (int i = 0; i < 8; i++) { - thread_vec[i].join(); - } -#ifndef __linux__ - DeallocSocket(); -#endif + //net.MainLoop(); + //net.Dealloc(); + net.Join(); + CLog::Instance().Stop(); + CLog::Instance().Join(); } \ No newline at end of file diff --git a/test/CppNet_client.cpp b/test/CppNet_client.cpp index e2a82e7..9236645 100644 --- a/test/CppNet_client.cpp +++ b/test/CppNet_client.cpp @@ -7,143 +7,56 @@ #include "EventHandler.h" #include "MemaryPool.h" -std::mutex __mutex; - -std::map> client_map; - -void ReadFunc(CMemSharePtr& event, int error); -std::function& event, int error)> read_back = ReadFunc; - -void WriteFunc(CMemSharePtr& event, int error) { +void WriteFunc(CMemSharePtr& sock, int error) { std::cout << "WriteFunc" << std::endl; std::cout << "Thread ID : " << std::this_thread::get_id() << std::endl; - std::cout << "write count: " << event->_off_set << std::endl << std::endl; - event->_client_socket.Lock()->SyncRead(read_back); + std::cout << "write count: " << sock->_write_event->_off_set << std::endl << std::endl; + sock->SyncRead(); } -std::function& event, int error)> write_back = WriteFunc; -void ConnectionFunc(CMemSharePtr& event, int error) { +void ConnectionFunc(CMemSharePtr& sock, int error) { std::cout << "ConnectionFunc" << std::endl; - std::unique_lock lock(__mutex); - event->_client_socket.Lock()->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back); + sock->SyncWrite("aaaaa21231231", strlen("aaaaa21231231")); } -void ReadFunc(CMemSharePtr& event, int error) { +void DisConnectionFunc(CMemSharePtr& sock, int error) { + std::cout << "DisConnectionFunc" << std::endl; +} + +void ReadFunc(CMemSharePtr& sock, int error) { std::cout << "ReadFunc" << std::endl; - std::cout << *(event->_buffer) << std::endl; - event->_buffer->Clear(); + std::cout << *(sock->_read_event->_buffer) << std::endl; + sock->_read_event->_buffer->Clear(); std::cout << "Thread ID : " << std::this_thread::get_id() << std::endl; - std::cout << "Read size : " << event->_off_set << std::endl << std::endl; - - event->_buffer->Clear(); - if (error != EVENT_ERROR_CLOSED || error == EVENT_CONNECT) { - //event->_client_socket.Lock()->SyncRead(read_back); - event->_client_socket.Lock()->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back); - //event->_client_socket.Lock()->SyncDisconnection(read_back); - } else { - if (client_map.size() < 10) { - int a = 0; - a++; - } - std::unique_lock lock(__mutex); - client_map.erase(event->_client_socket.Lock()->GetSocket()); + std::cout << "Read size : " << sock->_read_event->_off_set << std::endl << std::endl; + if (error != EVENT_ERROR_CLOSED) { + sock->SyncWrite("aaaaa21231231", strlen("aaaaa21231231")); } } -void AcceptFunc(CMemSharePtr& event, int error) { - client_map[event->_client_socket->GetSocket()] = event->_client_socket; - std::cout << "AcceptFunc" << std::endl; - std::cout << "client address :" << event->_client_socket->GetAddress() << std::endl << std::endl; - std::unique_lock lock(__mutex); - event->_client_socket->SyncRead(read_back); -} #include "Log.h" +#include "NetObject.h" + int main() { - - std::map test_map; - test_map[1] = 100; - test_map.erase(2); - - InitScoket(); - - CLog::Instance().SetLogLevel(LOG_DEBUG_LEVEL); + CLog::Instance().SetLogLevel(LOG_WARN_LEVEL); CLog::Instance().SetLogName("CppNet.txt"); CLog::Instance().Start(); - std::shared_ptr event_actions(new CIOCP); - event_actions->Init(); - //CAcceptSocket sock(event_actions); + CNetObject net; + net.Init(2); - CMemaryPool pool; - CMemSharePtr sock = MakeNewSharedPtr(&pool, event_actions); + net.SetConnectionCallback(ConnectionFunc); + net.SetWriteCallback(WriteFunc); + net.SetReadCallback(ReadFunc); + net.SetDisconnectionCallback(DisConnectionFunc); - /*void* data = &sock; - data = (void *)((uintptr_t)data | 1); - data = (void*)((uintptr_t)data & (uintptr_t)~1); - CMemSharePtr sock1 = *(CMemSharePtr*)data;*/ - - std::vector thread_vec; - - for (int i = 0; i < 1; i++) { - thread_vec.push_back(std::thread(std::bind(&CEventActions::ProcessEvent, event_actions))); - } - - std::function& event, int error)> accept_func = AcceptFunc; + auto sock = net.Connection(8921, "127.0.0.1"); + CRunnable::Sleep(2000); - sock->SyncConnection("172.16.81.132", 8500, ConnectionFunc); - //sock->SyncWrite("aaaaa21231231", strlen("aaaaa21231231"), write_back); - - for (int i = 0; i < 1; i++) { - thread_vec[i].join(); - } - DeallocSocket(); -} - -//#include -//#include -//#pragma comment(lib,"ws2_32.lib") -// -//int main() { -// static WSADATA __wsa_data; -// static bool __has_init = false; -// if (!__has_init && WSAStartup(MAKEWORD(2, 2), &__wsa_data) != 0) { -// return false; -// -// } -// else { -// __has_init = true; -// } -// -// SOCKADDR_IN addr; -// addr.sin_family = AF_INET; -// addr.sin_port = htons(8500); -// addr.sin_addr.S_un.S_addr = inet_addr("192.168.182.131"); -// -// auto func = [addr](int i) { -// SOCKET sock = socket(AF_INET, SOCK_STREAM, 0); -// -// connect(sock, (sockaddr*)&addr, sizeof(addr)); -// char buf[] = "Hello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello worldHello world"; -// for (;;) { -// send(sock, buf, strlen(buf), 0); -// -// char buf2[20] = { 0 }; -// recv(sock, buf2, 20, 0); -// -// Sleep(1000); -// } -// closesocket(sock); -// }; -// -// /*std::thread thread[1500]; -// for (int i = 0; i < 1500; i++) { -// Sleep(1); -// thread[i] = std::thread(func, i); -// } -// for (int i = 0; i < 1500; i++) { -// thread[i].join(); -// }*/ -// -// int a = 0; -// a++; -//} \ No newline at end of file + sock->SyncDisconnection(); + //net.MainLoop(); + //net.Dealloc(); + net.Join(); + CLog::Instance().Stop(); + CLog::Instance().Join(); +} \ No newline at end of file