#include "Log.h"
#include "RPCServer.h"
#include "InfoRouter.h"
#include "FuncThread.h"
#include "CommonStruct.h"
#include "ParsePackage.h"

#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <memory>
#include <mutex>
#include <string>

namespace {

// Size of the scratch buffer one incoming request is copied into.
const int kRecvBufSize = 4096;
// Size of the buffer the packaged function list is written into on accept.
const int kFuncListBufSize = 8192;
// Size of the buffer a packaged function response is written into.
const int kSendBufSize = 65535;
// Byte sequence passed to ReadUntil to find the end of one request.
const char kMsgDelimiter[] = "\r\n\r\n";

}  // namespace

// Creates the router and the package parser, and sets up the pool that
// FuncCallInfo objects are taken from. Locking in _DoRead starts disabled.
CRPCServer::CRPCServer() :
    _info_router(new CInfoRouter),
    _parse_package(new CParsePackage),
    _pool(1024, 5),
    _need_mutex(false) {
}

CRPCServer::~CRPCServer() {
}

// Creates `thread` CFuncThread workers and hands each one to the router.
void CRPCServer::Init(int thread) {
    for (int i = 0; i < thread; i++) {
        // make_shared names the concrete type explicitly; std::shared_ptr
        // cannot deduce its element type from a raw `new` expression.
        auto worker = std::make_shared<CFuncThread>(_info_router);
        _info_router->AddThread(worker);
    }
}

// Asks the router to stop every worker thread it was given.
void CRPCServer::Destroy() {
    _info_router->StopAllThread();
}

// Wires the accept/write/read callbacks into the network layer, starts
// listening on ip:port, then forwards every result the router produces
// back to the socket recorded in it.
//
// NOTE(review): the result loop below has no exit condition, so this call
// does not return and the trailing _net.Join() is never reached.
void CRPCServer::Start(short port, std::string ip) {
    _net.Init(2);

    _net.SetAcceptCallback(std::bind(&CRPCServer::_DoAccept, this,
        std::placeholders::_1, std::placeholders::_2));
    _net.SetWriteCallback(std::bind(&CRPCServer::_DoWrite, this,
        std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
    _net.SetReadCallback(std::bind(&CRPCServer::_DoRead, this,
        std::placeholders::_1, std::placeholders::_2, std::placeholders::_3,
        std::placeholders::_4));

    _net.ListenAndAccept(ip, port);

    for (;;) {
        auto info = _info_router->GetRet();
        if (info) {
            _PackageAndSend(info->_socket, info, NO_ERROR);
        }
    }

    _net.Join();
}

// Registers `func` under `name` together with its description string.
// Returns false, changing nothing, when `name` is already registered.
bool CRPCServer::RegisterFunc(std::string name, std::string func_str, const CommonFunc& func) {
    std::unique_lock<decltype(_mutex)> lock(_mutex);
    // emplace inserts only when the key is absent, so one lookup covers both
    // the "already registered" check and the insertion.
    if (!_func_map.emplace(name, func_str).second) {
        return false;
    }
    _info_router->RegisterFunc(name, func);
    return true;
}

// Removes the function registered under `name`.
// Returns false when no such function is registered.
bool CRPCServer::RemoveFunc(std::string name) {
    std::unique_lock<decltype(_mutex)> lock(_mutex);
    // erase reports how many entries it removed; zero means unknown name.
    if (_func_map.erase(name) == 0) {
        return false;
    }
    _info_router->RemoveFunc(name);
    return true;
}

// Read callback: pulls every complete request out of `data`, parses it into
// a FuncCallInfo and queues it on the router. Stops as soon as ReadUntil
// yields nothing more.
void CRPCServer::_DoRead(const cppnet::Handle& handle, base::CBuffer* data, uint32_t len, uint32_t err) {
    if (err != cppnet::CEC_SUCCESS) {
        base::LOG_ERROR("read data failed! err : %u", err);
        return;
    }

    int need_len = 0;
    for (;;) {
        char recv_buf[kRecvBufSize] = { 0 };
        // get a complete message
        int read_len = data->ReadUntil(recv_buf, kRecvBufSize, kMsgDelimiter,
                                       strlen(kMsgDelimiter), need_len);
        if (read_len <= 0) {
            break;
        }

        FuncCallInfo* info = _pool.PoolNew();

        // The lock is taken only when _need_mutex is set; either way the
        // same parsing path runs, and the lock (if held) is released at the
        // end of this iteration.
        std::unique_lock<decltype(_mutex)> lock(_mutex, std::defer_lock);
        if (_need_mutex) {
            lock.lock();
        }

        // The first two bytes of the message are skipped before parsing.
        if (_parse_package->ParseFuncCall(recv_buf + 2, read_len - 2, info->_func_name,
                                          _func_map, info->_func_param_ret)) {
            info->_socket = handle;
            _info_router->PushTask(info);

        } else {
            // NOTE(review): `info` is not handed back to _pool on this path;
            // confirm whether the pool needs an explicit release here.
            base::LOG_ERROR("parse function call request failed!");
        }
    }
}

// Write callback: only reports a failed send.
void CRPCServer::_DoWrite(const cppnet::Handle&, uint32_t, uint32_t err) {
    if (err != cppnet::CEC_SUCCESS) {
        base::LOG_ERROR("send response to client failed!");
    }
}

// Accept callback: packages the registered function list and writes it to
// the newly accepted connection. Aborts the process if packaging fails.
void CRPCServer::_DoAccept(const cppnet::Handle& handle, uint32_t) {
    char buf[kFuncListBufSize] = { 0 };
    int len = kFuncListBufSize;

    std::unique_lock<decltype(_mutex)> lock(_mutex);
    if (!_parse_package->PackageFuncList(buf, len, _func_map)) {
        base::LOG_ERROR("package functnion info failed!");
        abort();
    }

    // `handle` is pointer-like (it is used with -> below), so it is logged as
    // an address rather than through an integer conversion.
    base::LOG_DEBUG("send to %p, buf : %s", static_cast<const void*>(&*handle), buf);
    handle->Write(buf, len);
}

// Packages the result held in `info` with status `code` and writes it to
// `handle`. Nothing is sent when `info` is null or packaging fails.
//
// NOTE(review): `info` is not returned to _pool anywhere in this file once
// its response has been sent; confirm where it is released.
void CRPCServer::_PackageAndSend(const cppnet::Handle& handle, FuncCallInfo* info, int code) {
    if (!info) {
        base::LOG_ERROR("function info is null!");
        return;
    }

    char send_buf[kSendBufSize] = { 0 };
    // Starts as the buffer capacity; the same variable is then used as the
    // number of bytes to write.
    int need_len = kSendBufSize;
    if (!_parse_package->PackageFuncRet(send_buf, need_len, code, info->_func_name,
                                        _func_map, info->_func_param_ret)) {
        base::LOG_ERROR("package function response failed!");
        return;
    }

    handle->Write(send_buf, need_len);
}