From 002d28705e5e53f017e9ef8aceb4adbc5632b976 Mon Sep 17 00:00:00 2001 From: holmes1412 Date: Wed, 11 Aug 2021 20:43:05 +0800 Subject: [PATCH] Add ref for EndpointAddresses for release server memory in remove_server() --- src/nameservice/UpstreamPolicies.cc | 26 +++++++--- src/nameservice/WFServiceGovernance.cc | 38 ++++++++++++--- src/nameservice/WFServiceGovernance.h | 7 +-- test/upstream_unittest.cc | 67 +++++++++++++++++++++++++- 4 files changed, 120 insertions(+), 18 deletions(-) diff --git a/src/nameservice/UpstreamPolicies.cc b/src/nameservice/UpstreamPolicies.cc index b8f6be86..7dd57af7 100644 --- a/src/nameservice/UpstreamPolicies.cc +++ b/src/nameservice/UpstreamPolicies.cc @@ -188,14 +188,15 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing, if (!select_addr) select_addr = this->default_group->get_one_backup(tracing); - pthread_rwlock_unlock(&this->rwlock); - if (select_addr) { *addr = (EndpointAddress *)select_addr; + ++(*addr)->ref; + pthread_rwlock_unlock(&this->rwlock); return true; } + pthread_rwlock_unlock(&this->rwlock); return false; } @@ -303,7 +304,6 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr) rb_node *parent = NULL; EndpointGroup *group; - this->addresses.push_back(addr); this->server_map[addr->address].push_back(addr); if (params->server_type == 0) @@ -340,6 +340,7 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr) else group->backups.push_back(addr); pthread_mutex_unlock(&group->mutex); + this->server_list_change(addr, ADD_SERVER); return; } @@ -347,6 +348,8 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr) int UPSGroupPolicy::remove_server_locked(const std::string& address) { UPSAddrParams *params; + std::vector remove_list; + const auto map_it = this->server_map.find(address); if (map_it != this->server_map.cend()) @@ -364,9 +367,6 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address) //std::lock_guard lock(group->mutex); pthread_mutex_lock(&group->mutex); - if (addr->fail_count < params->max_fails) - this->fuse_one_server(addr); - if (params->server_type == 0) group->weight -= params->weight; @@ -378,6 +378,14 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address) break; } } + + if (addr->fail_count < params->max_fails) + this->fuse_one_server(addr); + else + this->remove_server_from_breaker(addr); + + this->server_list_change(addr, REMOVE_SERVER); + remove_list.push_back(addr); pthread_mutex_unlock(&group->mutex); } @@ -405,6 +413,12 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address) ret = n - new_n; } + for (EndpointAddress *server : remove_list) + { + if (--server->ref == 0) + delete server; + } + return ret; } diff --git a/src/nameservice/WFServiceGovernance.cc b/src/nameservice/WFServiceGovernance.cc index 6ae74974..381f7f57 100644 --- a/src/nameservice/WFServiceGovernance.cc +++ b/src/nameservice/WFServiceGovernance.cc @@ -103,6 +103,7 @@ EndpointAddress::EndpointAddress(const std::string& address, this->address = address; this->fail_count = 0; + this->ref = 1; this->entry.list.next = NULL; this->entry.ptr = this; @@ -214,6 +215,14 @@ inline void WFServiceGovernance::fuse_server_to_breaker(EndpointAddress *addr) pthread_mutex_unlock(&this->breaker_lock); } +void WFServiceGovernance::remove_server_from_breaker(EndpointAddress *addr) +{ + pthread_mutex_lock(&this->breaker_lock); + if (addr->entry.list.next) + list_del(&addr->entry.list); + pthread_mutex_unlock(&this->breaker_lock); +} + void WFServiceGovernance::success(RouteManager::RouteResult *result, WFNSTracing *tracing, CommTarget *target) @@ -229,6 +238,8 @@ void WFServiceGovernance::success(RouteManager::RouteResult *result, pthread_rwlock_wrlock(&this->rwlock); this->recover_server_from_breaker(server); + if (--server->ref == 0) + delete server; pthread_rwlock_unlock(&this->rwlock); this->WFNSPolicy::success(result, tracing, target); @@ -249,7 +260,10 @@ void WFServiceGovernance::failed(RouteManager::RouteResult *result, pthread_rwlock_wrlock(&this->rwlock); size_t fail_count = ++server->fail_count; - if (fail_count == server->params->max_fails) + + if (--server->ref == 0) + delete server; + else if (fail_count == server->params->max_fails) this->fuse_server_to_breaker(server); pthread_rwlock_unlock(&this->rwlock); @@ -331,20 +345,20 @@ bool WFServiceGovernance::select(const ParsedURI& uri, WFNSTracing *tracing, select_addr = this->another_strategy(uri, tracing); } - pthread_rwlock_unlock(&this->rwlock); - if (select_addr) { *addr = (EndpointAddress *)select_addr; + ++(*addr)->ref; + pthread_rwlock_unlock(&this->rwlock); return true; } + pthread_rwlock_unlock(&this->rwlock); return false; } void WFServiceGovernance::add_server_locked(EndpointAddress *addr) { - this->addresses.push_back(addr); this->server_map[addr->address].push_back(addr); this->servers.push_back(addr); this->recover_one_server(addr); @@ -353,6 +367,8 @@ void WFServiceGovernance::add_server_locked(EndpointAddress *addr) int WFServiceGovernance::remove_server_locked(const std::string& address) { + std::vector remove_list; + const auto map_it = this->server_map.find(address); if (map_it != this->server_map.cend()) { @@ -360,10 +376,12 @@ int WFServiceGovernance::remove_server_locked(const std::string& address) { // or not: it has already been -- in nalives if (addr->fail_count < addr->params->max_fails) - { this->fuse_one_server(addr); - this->server_list_change(addr, REMOVE_SERVER); - } + else + this->remove_server_from_breaker(addr); + + this->server_list_change(addr, REMOVE_SERVER); + remove_list.push_back(addr); } this->server_map.erase(map_it); @@ -390,6 +408,12 @@ int WFServiceGovernance::remove_server_locked(const std::string& address) ret = n - new_n; } + for (EndpointAddress *server : remove_list) + { + if (--server->ref == 0) + delete server; + } + return ret; } diff --git a/src/nameservice/WFServiceGovernance.h b/src/nameservice/WFServiceGovernance.h index e6e6aa92..ae499fb0 100644 --- a/src/nameservice/WFServiceGovernance.h +++ b/src/nameservice/WFServiceGovernance.h @@ -87,7 +87,8 @@ public: std::string address; std::string host; std::string port; - std::atomic fail_count; + unsigned int fail_count; + unsigned int ref; long long broken_timeout; PolicyAddrParams *params; @@ -141,7 +142,7 @@ public: virtual ~WFServiceGovernance() { - for (EndpointAddress *addr : this->addresses) + for (EndpointAddress *addr : this->servers) delete addr; } @@ -175,10 +176,10 @@ protected: virtual const EndpointAddress *another_strategy(const ParsedURI& uri, WFNSTracing *tracing); void check_breaker(); + void remove_server_from_breaker(EndpointAddress *addr); static void tracing_deleter(void *data); std::vector servers; // current servers - std::vector addresses; // memory management std::unordered_map> server_map; pthread_rwlock_t rwlock; diff --git a/test/upstream_unittest.cc b/test/upstream_unittest.cc index 951b0182..05835159 100644 --- a/test/upstream_unittest.cc +++ b/test/upstream_unittest.cc @@ -152,8 +152,8 @@ TEST(upstream_unittest, EnableAndDisable) UpstreamManager::upstream_enable_server("weighted.random", "127.0.0.1:8001"); auto *task2 = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, std::bind(basic_callback, - std::placeholders::_1, - std::string("server1"))); + std::placeholders::_1, + std::string("server1"))); task2->user_data = &wait_group; series_of(task)->push_back(task2); }); @@ -163,6 +163,69 @@ TEST(upstream_unittest, EnableAndDisable) wait_group.wait(); } +TEST(upstream_unittest, AddAndRemove) +{ + WFFacilities::WaitGroup wait_group(2); + WFHttpTask *task; + SeriesWork *series; + protocol::HttpRequest *req; + int batch = MAX_FAILS + 50; + std::string url = "http://add_and_remove"; + std::string name = "add_and_remove"; + UPSWeightedRandomPolicy test_policy(false); + + AddressParams address_params = ADDRESS_PARAMS_DEFAULT; + + address_params.weight = 1000; + test_policy.add_server("127.0.0.1:8001", &address_params); + + address_params.weight = 1; + test_policy.add_server("127.0.0.1:8002", &address_params); + + auto *ns = WFGlobal::get_name_service(); + EXPECT_EQ(ns->add_policy(name.c_str(), &test_policy), 0); + + UpstreamManager::upstream_remove_server(name, "127.0.0.1:8001"); + task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, + std::bind(basic_callback, + std::placeholders::_1, + std::string("server2"))); + task->user_data = &wait_group; + task->start(); + + //test remove fused server + address_params.weight = 1000; + test_policy.add_server("127.0.0.1:8001", &address_params); + http_server1.stop(); + + fprintf(stderr, "server 1 stopped start %d tasks to fuse it\n", batch); + ParallelWork *pwork = Workflow::create_parallel_work( + [&wait_group, &name, &url](const ParallelWork *pwork) { + fprintf(stderr, "parallel finished and remove server1\n"); + UpstreamManager::upstream_remove_server(name, "127.0.0.1:8001"); + auto *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX, + std::bind(basic_callback, + std::placeholders::_1, + std::string("server2"))); + task->user_data = &wait_group; + series_of(pwork)->push_back(task); + }); + + for (int i = 0; i < batch; i++) + { + task = WFTaskFactory::create_http_task(url, 0, 0, nullptr); + req = task->get_req(); + req->add_header_pair("Connection", "keep-alive"); + series = Workflow::create_series_work(task, nullptr); + pwork->add_series(series); + } + + pwork->start(); + wait_group.wait(); + EXPECT_TRUE(http_server1.start("127.0.0.1", 8001) == 0) + << "http server start failed"; +} + TEST(upstream_unittest, FuseAndRecover) { WFFacilities::WaitGroup wait_group(1);