diff --git a/common/thread/thread.h b/common/thread/thread.h index 7fbed4e..e347d71 100644 --- a/common/thread/thread.h +++ b/common/thread/thread.h @@ -26,7 +26,7 @@ public: } virtual void Join() { - if (_thread) { + if (_thread && _thread->joinable()) { _thread->join(); } } diff --git a/cppnet/cppnet.cpp b/cppnet/cppnet.cpp index d95abfc..e74fd90 100644 --- a/cppnet/cppnet.cpp +++ b/cppnet/cppnet.cpp @@ -30,6 +30,10 @@ void CppNet::Init(int32_t thread_num) { } } +void CppNet::Destory() { + _cppnet_base->Dealloc(); +} + void CppNet::Join() { _cppnet_base->Join(); } diff --git a/cppnet/cppnet_config.h b/cppnet/cppnet_config.h index 64b9000..274c366 100644 --- a/cppnet/cppnet_config.h +++ b/cppnet/cppnet_config.h @@ -10,18 +10,20 @@ static const uint16_t __mem_block_size = 1024; // how many block memory will be add to block memory pool. static const uint16_t __mem_block_add_step = 5; // max number of blocks in memory pool. If block memory more than this number, will reduce to half. -static const uint16_t __max_block_num = 10; +static const uint16_t __max_block_num = 10; +// open socket reuse flag +static const bool __reuse_port = true; +// max data to write when net is busy. +static const uint32_t __max_write_cache = 1024 * 1024 * 4; // address buffer length in socket. static const uint16_t __addr_str_len = 16; // log level. static const uint16_t __log_level = 0; // not print -// log file name . +// log file name. static const char* __log_file_name = "CppNetLog"; // open log print. -static const bool __open_log = true; - -static const bool __reuse_port = true; +static const bool __open_log = false; #if ((defined __linux__) || (defined __APPLE__)) diff --git a/cppnet/dispatcher.cpp b/cppnet/dispatcher.cpp index 4bdceda..c369340 100644 --- a/cppnet/dispatcher.cpp +++ b/cppnet/dispatcher.cpp @@ -37,12 +37,6 @@ Dispatcher::~Dispatcher() { if (std::this_thread::get_id() != _local_thread_id) { Stop(); Join(); - - } else { - Stop(); - - std::unique_lock lock(_wait_destroy_map_mutex); - _wait_destroy_thread_map[std::this_thread::get_id()] = _thread; } } @@ -57,6 +51,10 @@ void Dispatcher::Run() { _timer->TimerRun(cur_time - _cur_utc_time); _cur_utc_time = cur_time; + if (_stop) { + break; + } + wait_time = _timer->MinTime(); _event_actions->ProcessEvent(wait_time); @@ -76,6 +74,8 @@ void Dispatcher::Listen(uint64_t sock, const std::string& ip, uint16_t port) { connect_sock->SetEventActions(_event_actions); connect_sock->SetCppNetBase(_cppnet_base.lock()); connect_sock->SetSocket(sock); + connect_sock->SetDispatcher(shared_from_this()); + connect_sock->Bind(ip, port); connect_sock->Listen(); @@ -85,6 +85,8 @@ void Dispatcher::Listen(uint64_t sock, const std::string& ip, uint16_t port) { connect_sock->SetEventActions(_event_actions); connect_sock->SetCppNetBase(_cppnet_base.lock()); connect_sock->SetSocket(sock); + connect_sock->SetDispatcher(shared_from_this()); + connect_sock->Bind(ip, port); connect_sock->Listen(); }; diff --git a/cppnet/dispatcher.h b/cppnet/dispatcher.h index a70e3a7..5786d94 100644 --- a/cppnet/dispatcher.h +++ b/cppnet/dispatcher.h @@ -20,7 +20,8 @@ class CppNetBase; class EventActions; class Dispatcher: - public Thread { + public Thread, + public std::enable_shared_from_this { public: Dispatcher(std::shared_ptr base); ~Dispatcher(); @@ -39,6 +40,8 @@ public: uint32_t AddTimer(std::shared_ptr sock, uint32_t interval, bool always = false); void StopTimer(uint64_t timer_id); + std::thread::id GetThreadID() { return _local_thread_id; } + private: void DoTask(); uint64_t MakeTimerID(); diff --git a/cppnet/event/mac/kqueue_action.cpp b/cppnet/event/mac/kqueue_action.cpp index e8fd239..1d4366b 100644 --- a/cppnet/event/mac/kqueue_action.cpp +++ b/cppnet/event/mac/kqueue_action.cpp @@ -5,6 +5,7 @@ #include "common/log/log.h" #include "common/util/time.h" +#include "common/os/convert.h" #include "common/util/os_return.h" #include "common/network/socket.h" #include "common/network/address.h" @@ -23,6 +24,8 @@ namespace cppnet { KqueueEventActions::KqueueEventActions(): _kqueue_handler(-1) { _active_list.resize(1024); + _kqueue_timeout.tv_nsec = 0; + _kqueue_timeout.tv_sec = 0; } KqueueEventActions::~KqueueEventActions() { @@ -133,8 +136,6 @@ bool KqueueEventActions::AddConnection(std::shared_ptr& event, Address& a auto ret = OsHandle::Connect(sock->GetSocket(), address); - SocketNoblocking(sock->GetSocket()); - auto rw_sock = std::dynamic_pointer_cast(sock); if (ret._return_value == 0) { rw_sock->OnConnect(CEC_SUCCESS); @@ -147,6 +148,7 @@ bool KqueueEventActions::AddConnection(std::shared_ptr& event, Address& a } } rw_sock->OnConnect(CEC_CONNECT_REFUSE); + LOG_ERROR("connect to peer failed! errno:%d, info:%s", ret._errno, ErrnoInfo(ret._errno)); return false; } @@ -191,23 +193,22 @@ bool KqueueEventActions::DelEvent(std::shared_ptr& event) { } void KqueueEventActions::ProcessEvent(int32_t wait_ms) { - struct timespec timeout; - int16_t ret = 0; if (wait_ms > 0) { - timeout.tv_nsec = wait_ms * 1000000; + _kqueue_timeout.tv_sec = (uint64_t)wait_ms / 1000; + _kqueue_timeout.tv_nsec = ((uint64_t)wait_ms - (_kqueue_timeout.tv_sec * 1000)) * 1000000; - ret = kevent(_kqueue_handler, &*_change_list.begin(), (int)_change_list.size(), &*_active_list.begin(), (int)_active_list.size(), &timeout); + ret = kevent(_kqueue_handler, &*_change_list.begin(), (int)_change_list.size(), &*_active_list.begin(), (int)_active_list.size(), &_kqueue_timeout); } else { ret = kevent(_kqueue_handler, &*_change_list.begin(), (int)_change_list.size(), &*_active_list.begin(), (int)_active_list.size(), nullptr); } _change_list.clear(); if (ret < 0) { - LOG_ERROR("kevent faild! error :%d", errno); + LOG_ERROR("kevent faild! error:%d, info:%s", errno, ErrnoInfo(errno)); } else { - LOG_DEBUG("kevent get events! num :%d, TheadId : %lld", ret, std::this_thread::get_id()); + LOG_DEBUG("kevent get events! num:%d, TheadId:%lld", ret, std::this_thread::get_id()); OnEvent(_active_list, ret); } diff --git a/cppnet/event/mac/kqueue_action.h b/cppnet/event/mac/kqueue_action.h index 48ad7fa..ee6a959 100644 --- a/cppnet/event/mac/kqueue_action.h +++ b/cppnet/event/mac/kqueue_action.h @@ -39,7 +39,7 @@ protected: std::mutex _mutex; int32_t _kqueue_handler; uint32_t _pipe[2]; - + timespec _kqueue_timeout; std::vector _change_list; std::vector _active_list; }; diff --git a/cppnet/socket/connect_socket.cpp b/cppnet/socket/connect_socket.cpp index 83326c3..b46ea65 100644 --- a/cppnet/socket/connect_socket.cpp +++ b/cppnet/socket/connect_socket.cpp @@ -4,6 +4,7 @@ #include "rw_socket.h" #include "connect_socket.h" #include "common/log/log.h" +#include "common/os/convert.h" #include "common/network/socket.h" #include "common/network/address.h" #include "common/network/io_handle.h" @@ -85,7 +86,7 @@ void ConnectSocket::OnAccept() { if (errno == EAGAIN) { break; } - LOG_FATAL("accept socket filed! error code:%d", ret._errno); + LOG_ERROR("accept socket filed! errno:%d, info:%s", ret._errno, ErrnoInfo(ret._errno)); break; } @@ -103,6 +104,7 @@ void ConnectSocket::OnAccept() { sock->SetCppNetBase(cppnet_base); sock->SetEventActions(_event_actions); sock->SetAddress(address); + sock->SetDispatcher(GetDispatcher()); __all_socket_map[ret._return_value] = sock; diff --git a/cppnet/socket/rw_socket.cpp b/cppnet/socket/rw_socket.cpp index 587a069..f377710 100644 --- a/cppnet/socket/rw_socket.cpp +++ b/cppnet/socket/rw_socket.cpp @@ -1,17 +1,19 @@ #include #include "rw_socket.h" -#include "common/log/log.h" +#include "include/cppnet_type.h" + #include "cppnet/dispatcher.h" #include "cppnet/cppnet_base.h" -#include "include/cppnet_type.h" #include "cppnet/cppnet_config.h" +#include "cppnet/event/event_interface.h" +#include "cppnet/event/action_interface.h" + +#include "common/log/log.h" #include "common/network/address.h" #include "common/network/io_handle.h" #include "common/alloter/pool_block.h" #include "common/buffer/buffer_queue.h" -#include "cppnet/event/event_interface.h" -#include "cppnet/event/action_interface.h" #include "common/alloter/alloter_interface.h" namespace cppnet { @@ -71,6 +73,10 @@ bool RWSocket::Write(const char* src, uint32_t len) { //can't send now if (_write_buffer->GetCanReadLength() > 0) { + if (_write_buffer->GetCanReadLength() > __max_write_cache) { + return false; + } + _write_buffer->Write(src, len); auto actions = GetEventActions(); if (actions) { diff --git a/cppnet/socket/socket_interface.h b/cppnet/socket/socket_interface.h index d4dd137..7e7a7dd 100644 --- a/cppnet/socket/socket_interface.h +++ b/cppnet/socket/socket_interface.h @@ -16,7 +16,8 @@ class EventActions; class Socket { public: Socket(): _sock(0) {} - Socket(std::shared_ptr alloter): _alloter(alloter) {} + Socket(std::shared_ptr alloter): + _sock(0), _alloter(alloter) {} Socket(uint64_t sock, std::shared_ptr alloter): _sock(sock), _alloter(alloter) {} virtual ~Socket() {} diff --git a/include/cppnet.h b/include/cppnet.h index 3347299..c893f72 100644 --- a/include/cppnet.h +++ b/include/cppnet.h @@ -16,6 +16,7 @@ public: // init cppnet library. // thread_num : the number of running threads. void Init(int32_t thread_num); + void Destory(); // thread join void Join(); diff --git a/makefile b/makefile index 2f61e7e..07b5ea5 100644 --- a/makefile +++ b/makefile @@ -25,7 +25,7 @@ CC = g++ INCLUDES = -I. #debug -CCFLAGS = -lpthread -fPIC -m64 -g -pg -std=c++11 -lstdc++ -pipe +CCFLAGS = -lpthread -fPIC -m64 -g -std=c++11 -lstdc++ -pipe #CCFLAGS = -lpthread -fPIC -m64 -O2 -std=c++11 -lstdc++ -pipe diff --git a/test/echo/EchoClient.cpp b/test/echo/EchoClient.cpp index 2cbb3d2..46fee78 100644 --- a/test/echo/EchoClient.cpp +++ b/test/echo/EchoClient.cpp @@ -54,7 +54,7 @@ int main() { net.SetWriteCallback(WriteFunc); net.SetReadCallback(ReadFunc); net.SetDisconnectionCallback(DisConnectionFunc); - for (size_t i = 0; i < 1000; i++) { + for (size_t i = 0; i < 200; i++) { net.Connection("127.0.0.1", 8921); } diff --git a/test/http/HttpServerTest.cpp b/test/http/HttpServerTest.cpp index 7309b22..c4e03b5 100644 --- a/test/http/HttpServerTest.cpp +++ b/test/http/HttpServerTest.cpp @@ -68,7 +68,7 @@ void DisConnectionFunc(const cppnet::Handle& , uint32_t ) { int main() { cppnet::CppNet net; - net.Init(2); + net.Init(4); CHttpServer server; server.SetHttpCallback(OnRequest); diff --git a/test/pingpong/Client.cpp b/test/pingpong/Client.cpp index a057fe8..c984d1b 100644 --- a/test/pingpong/Client.cpp +++ b/test/pingpong/Client.cpp @@ -129,7 +129,7 @@ public: std::cout << static_cast(totalBytesRead) / ((_timeout / 1000) * 1024 * 1024) << " MiB/s throughput" << std::endl; - delete _net; + _net->Destory(); } } @@ -187,6 +187,8 @@ int main(int argc, char* argv[]) { net->Join(); + delete net; + return 0; } diff --git a/test/sendfile/SendFileClient.cpp b/test/sendfile/SendFileClient.cpp index 33a27b6..a8091c6 100644 --- a/test/sendfile/SendFileClient.cpp +++ b/test/sendfile/SendFileClient.cpp @@ -12,14 +12,24 @@ using namespace cppnet; -class CSendFile { +class SendFile { public: - CSendFile(const std::string& file, cppnet::CppNet* net) : _status(hello), - _file_name(file), _net(net) { + SendFile(const std::string& file, cppnet::CppNet* net): + _block(false), + _status(hello), + _file_name(file), + _net(net) { } - ~CSendFile() { + ~SendFile() { + _file.close(); + } + void OnWrite(Handle handle, uint32_t len) { + if (_block) { + _block = false; + Send(handle); + } } void OnRecv(Handle handle, std::shared_ptr data, uint32_t len) { @@ -28,8 +38,10 @@ public: data->Read(ret_char, 4); } else { + std::cout << "3" << std::endl; return; } + std::cout << "4" << std::endl; std::string ret(ret_char); std::cout << "recv from server : " << ret << std::endl; @@ -51,8 +63,7 @@ public: std::cout << "something error while sending!" << std::endl; } - handle->Close(); - delete _net; + _net->Destory(); } } @@ -76,7 +87,7 @@ private: if (!_file.good()) { return false; } - + sprintf(_header._name, "%s", _file_name.c_str()); _file.seekg(0, _file.end); _header._length = _file.tellg(); @@ -86,14 +97,16 @@ private: return true; } - void Send(Handle handle) { + void Send(const Handle& handle) { char buf[__read_len]; while (!_file.eof()) { _file.read(buf, __read_len); int len = _file.gcount(); - handle->Write(buf, len); + if (!handle->Write(buf, len)) { + _block = true; + return; + } } - _file.close(); } private: @@ -101,6 +114,7 @@ private: FileHeader _header; STATUS _status; std::string _file_name; + bool _block; cppnet::CppNet* _net; }; @@ -115,15 +129,17 @@ int main(int argc, char *argv[]) { cppnet::CppNet* net(new cppnet::CppNet()); - CSendFile file(file_name, net); + SendFile file(file_name, net); net->Init(1); - net->SetConnectionCallback(std::bind(&CSendFile::OnConnect, &file, std::placeholders::_1, std::placeholders::_2)); - net->SetReadCallback(std::bind(&CSendFile::OnRecv, &file, std::placeholders::_1, std::placeholders::_2, + net->SetConnectionCallback(std::bind(&SendFile::OnConnect, &file, std::placeholders::_1, std::placeholders::_2)); + net->SetReadCallback(std::bind(&SendFile::OnRecv, &file, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); + net->SetWriteCallback(std::bind(&SendFile::OnWrite, &file, std::placeholders::_1, std::placeholders::_2)); net->Connection("127.0.0.1", 8921); net->Join(); + return 0; } \ No newline at end of file diff --git a/test/sendfile/common.h b/test/sendfile/common.h index a5aa27d..b373ea3 100644 --- a/test/sendfile/common.h +++ b/test/sendfile/common.h @@ -16,5 +16,5 @@ struct FileHeader { } }; -const int __header_len = sizeof(FileHeader); -const int __read_len = 1024*1024; \ No newline at end of file +static const int __header_len = sizeof(FileHeader); +static const int __read_len = 4096; \ No newline at end of file diff --git a/test/sendfile/makefile b/test/sendfile/makefile index 5f0f394..fd19e62 100644 --- a/test/sendfile/makefile +++ b/test/sendfile/makefile @@ -6,9 +6,9 @@ CC = g++ INCLUDES = -I../../ #debug -pg -#CCFLAGS = -lpthread -fPIC -m64 -g -std=c++11 -lstdc++ -pipe +CCFLAGS = -lpthread -fPIC -m64 -g -std=c++11 -lstdc++ -pipe -CCFLAGS = -lpthread -fPIC -m64 -O2 -std=c++11 -lstdc++ -pipe -march=corei7 +#CCFLAGS = -lpthread -fPIC -m64 -O2 -std=c++11 -lstdc++ -pipe -march=corei7 TARGET = ../../libcppnet.a SERBIN = sendfileserver