#include #include #include #include #include #include #include #include "CppNet.h" #include "Runnable.h" #ifndef __linux__ #include void SetNoDelay(const uint64_t& sock) { int opt = 1; int ret = setsockopt(sock, SOL_SOCKET, TCP_NODELAY, (const char*)&opt, sizeof(opt)); } #else #include #include void SetNoDelay(const uint64_t& sock) { int optval = 1; setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &optval, static_cast(sizeof(optval))); } #endif using namespace cppnet; class Client; class Session { public: Session(Client* owner) : _owner(owner), _bytes_read(0), _messages_read(0) { } int64_t BytesRead() const { return _bytes_read; } int64_t MessagesRead() const { return _messages_read; } void OnConnection(const Handle& handle); void OnMessage(const Handle& handle, base::CBuffer* data, uint32_t) { char buff[65535]; ++_messages_read; int len_get = data->GetCanReadLength(); _bytes_read += len_get; while (data->GetCanReadLength()) { int ret = data->Read(buff, 65535); Write(handle, buff, ret); } } Client* _owner; int64_t _bytes_read; int64_t _messages_read; }; class Client { public: Client(int block_size, int session_count, int timeout, const std::string& ip, int port) : _ip(ip), _port(port), _session_count(session_count), _timeout(timeout), _block_size(block_size) { _num_connected = 0; for (int i = 0; i < _block_size; ++i) { _message.push_back(static_cast(i % 128)); } } void Start() { SetTimer(_timeout, std::bind(&Client::HandleTimeout, this, std::placeholders::_1), nullptr); for (int i = 0; i < _session_count; ++i) { Connection(_ip, _port); } } const std::string& Message() const { return _message; } void OnMessage(const Handle& handle, base::CBuffer* data, uint32_t len, uint32_t error) { if (error == CEC_SUCCESS) { auto iter = _sessions.find(handle); if (iter != _sessions.end()) { iter->second->OnMessage(handle, data, len); } } else { std::cout << " something error while reading. " << std::endl; } } void OnConnect(const Handle& handle, uint32_t error) { std::cout << "OnConnect :" << _num_connected.load() << std::endl; if (error == CEC_SUCCESS) { _num_connected++; if (_num_connected.load() == _session_count) { std::cout << "all connected" << std::endl; } auto session = std::unique_ptr(new Session(this)); session->OnConnection(handle); _sessions[handle] = std::move(session); } else { std::cout << " something error while connect. error : " << error << std::endl; } } void OnDisconnect(const Handle&, uint32_t) { std::cout << "disconnected :" << _num_connected.load() << std::endl; _num_connected--; if (_num_connected== 0) { std::cout << "all disconnected" << std::endl; int64_t totalBytesRead = 0; int64_t totalMessagesRead = 0; for (const auto& session : _sessions) { totalBytesRead += session.second->BytesRead(); totalMessagesRead += session.second->MessagesRead(); } std::cout << totalBytesRead << " total bytes read" << std::endl; std::cout << totalMessagesRead << " total messages read" << std::endl; std::cout << static_cast(totalBytesRead) / static_cast(totalMessagesRead) << " average message size" << std::endl; std::cout << static_cast(totalBytesRead) / ((_timeout / 1000) * 1024 * 1024) << " MiB/s throughput" << std::endl; Dealloc(); } } private: void HandleTimeout(void*) { std::cout << "stop" << std::endl;; for (auto& session : _sessions) { Close(session.first); } } std::string _ip; int _port; int _session_count; int _timeout; int _block_size; std::string _message; std::atomic_uint _num_connected; std::unordered_map> _sessions; }; void Session::OnConnection(const Handle& handle) { SetNoDelay(handle); auto msg = _owner->Message(); Write(handle, msg.c_str(), msg.length()); } int main(int argc, char* argv[]) { if (argc < 7) { std::cout << "please input [ip] [port] [thread count] [block size] [session count] [time out]" << std::endl; return -1; } std::string ip = std::string(argv[1]); int port = atoi(argv[2]); int threadCount = atoi(argv[3]); int block_size = atoi(argv[4]); int session_count = atoi(argv[5]); int timeout = atoi(argv[6]); cppnet::Init(threadCount); Client client(block_size, session_count, timeout, ip, port); cppnet::SetConnectionCallback(std::bind(&Client::OnConnect, &client, std::placeholders::_1, std::placeholders::_2)); cppnet::SetReadCallback(std::bind(&Client::OnMessage, &client, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3, std::placeholders::_4)); cppnet::SetDisconnectionCallback(std::bind(&Client::OnDisconnect, &client, std::placeholders::_1, std::placeholders::_2)); client.Start(); cppnet::Join(); return 0; }