Add ref for EndpointAddresses for release server memory in remove_server()

This commit is contained in:
holmes1412
2021-08-11 20:43:05 +08:00
parent 1f19783624
commit 002d28705e
4 changed files with 120 additions and 18 deletions

View File

@@ -188,14 +188,15 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing,
if (!select_addr)
select_addr = this->default_group->get_one_backup(tracing);
pthread_rwlock_unlock(&this->rwlock);
if (select_addr)
{
*addr = (EndpointAddress *)select_addr;
++(*addr)->ref;
pthread_rwlock_unlock(&this->rwlock);
return true;
}
pthread_rwlock_unlock(&this->rwlock);
return false;
}
@@ -303,7 +304,6 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr)
rb_node *parent = NULL;
EndpointGroup *group;
this->addresses.push_back(addr);
this->server_map[addr->address].push_back(addr);
if (params->server_type == 0)
@@ -340,6 +340,7 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr)
else
group->backups.push_back(addr);
pthread_mutex_unlock(&group->mutex);
this->server_list_change(addr, ADD_SERVER);
return;
}
@@ -347,6 +348,8 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr)
int UPSGroupPolicy::remove_server_locked(const std::string& address)
{
UPSAddrParams *params;
std::vector<EndpointAddress *> remove_list;
const auto map_it = this->server_map.find(address);
if (map_it != this->server_map.cend())
@@ -364,9 +367,6 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address)
//std::lock_guard<std::mutex> lock(group->mutex);
pthread_mutex_lock(&group->mutex);
if (addr->fail_count < params->max_fails)
this->fuse_one_server(addr);
if (params->server_type == 0)
group->weight -= params->weight;
@@ -378,6 +378,14 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address)
break;
}
}
if (addr->fail_count < params->max_fails)
this->fuse_one_server(addr);
else
this->remove_server_from_breaker(addr);
this->server_list_change(addr, REMOVE_SERVER);
remove_list.push_back(addr);
pthread_mutex_unlock(&group->mutex);
}
@@ -405,6 +413,12 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address)
ret = n - new_n;
}
for (EndpointAddress *server : remove_list)
{
if (--server->ref == 0)
delete server;
}
return ret;
}

View File

@@ -103,6 +103,7 @@ EndpointAddress::EndpointAddress(const std::string& address,
this->address = address;
this->fail_count = 0;
this->ref = 1;
this->entry.list.next = NULL;
this->entry.ptr = this;
@@ -214,6 +215,14 @@ inline void WFServiceGovernance::fuse_server_to_breaker(EndpointAddress *addr)
pthread_mutex_unlock(&this->breaker_lock);
}
void WFServiceGovernance::remove_server_from_breaker(EndpointAddress *addr)
{
pthread_mutex_lock(&this->breaker_lock);
if (addr->entry.list.next)
list_del(&addr->entry.list);
pthread_mutex_unlock(&this->breaker_lock);
}
void WFServiceGovernance::success(RouteManager::RouteResult *result,
WFNSTracing *tracing,
CommTarget *target)
@@ -229,6 +238,8 @@ void WFServiceGovernance::success(RouteManager::RouteResult *result,
pthread_rwlock_wrlock(&this->rwlock);
this->recover_server_from_breaker(server);
if (--server->ref == 0)
delete server;
pthread_rwlock_unlock(&this->rwlock);
this->WFNSPolicy::success(result, tracing, target);
@@ -249,7 +260,10 @@ void WFServiceGovernance::failed(RouteManager::RouteResult *result,
pthread_rwlock_wrlock(&this->rwlock);
size_t fail_count = ++server->fail_count;
if (fail_count == server->params->max_fails)
if (--server->ref == 0)
delete server;
else if (fail_count == server->params->max_fails)
this->fuse_server_to_breaker(server);
pthread_rwlock_unlock(&this->rwlock);
@@ -331,20 +345,20 @@ bool WFServiceGovernance::select(const ParsedURI& uri, WFNSTracing *tracing,
select_addr = this->another_strategy(uri, tracing);
}
pthread_rwlock_unlock(&this->rwlock);
if (select_addr)
{
*addr = (EndpointAddress *)select_addr;
++(*addr)->ref;
pthread_rwlock_unlock(&this->rwlock);
return true;
}
pthread_rwlock_unlock(&this->rwlock);
return false;
}
void WFServiceGovernance::add_server_locked(EndpointAddress *addr)
{
this->addresses.push_back(addr);
this->server_map[addr->address].push_back(addr);
this->servers.push_back(addr);
this->recover_one_server(addr);
@@ -353,6 +367,8 @@ void WFServiceGovernance::add_server_locked(EndpointAddress *addr)
int WFServiceGovernance::remove_server_locked(const std::string& address)
{
std::vector<EndpointAddress *> remove_list;
const auto map_it = this->server_map.find(address);
if (map_it != this->server_map.cend())
{
@@ -360,10 +376,12 @@ int WFServiceGovernance::remove_server_locked(const std::string& address)
{
// or not: it has already been -- in nalives
if (addr->fail_count < addr->params->max_fails)
{
this->fuse_one_server(addr);
this->server_list_change(addr, REMOVE_SERVER);
}
else
this->remove_server_from_breaker(addr);
this->server_list_change(addr, REMOVE_SERVER);
remove_list.push_back(addr);
}
this->server_map.erase(map_it);
@@ -390,6 +408,12 @@ int WFServiceGovernance::remove_server_locked(const std::string& address)
ret = n - new_n;
}
for (EndpointAddress *server : remove_list)
{
if (--server->ref == 0)
delete server;
}
return ret;
}

View File

@@ -87,7 +87,8 @@ public:
std::string address;
std::string host;
std::string port;
std::atomic<unsigned int> fail_count;
unsigned int fail_count;
unsigned int ref;
long long broken_timeout;
PolicyAddrParams *params;
@@ -141,7 +142,7 @@ public:
virtual ~WFServiceGovernance()
{
for (EndpointAddress *addr : this->addresses)
for (EndpointAddress *addr : this->servers)
delete addr;
}
@@ -175,10 +176,10 @@ protected:
virtual const EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
void check_breaker();
void remove_server_from_breaker(EndpointAddress *addr);
static void tracing_deleter(void *data);
std::vector<EndpointAddress *> servers; // current servers
std::vector<EndpointAddress *> addresses; // memory management
std::unordered_map<std::string,
std::vector<EndpointAddress *>> server_map;
pthread_rwlock_t rwlock;

View File

@@ -152,8 +152,8 @@ TEST(upstream_unittest, EnableAndDisable)
UpstreamManager::upstream_enable_server("weighted.random", "127.0.0.1:8001");
auto *task2 = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server1")));
std::placeholders::_1,
std::string("server1")));
task2->user_data = &wait_group;
series_of(task)->push_back(task2);
});
@@ -163,6 +163,69 @@ TEST(upstream_unittest, EnableAndDisable)
wait_group.wait();
}
TEST(upstream_unittest, AddAndRemove)
{
WFFacilities::WaitGroup wait_group(2);
WFHttpTask *task;
SeriesWork *series;
protocol::HttpRequest *req;
int batch = MAX_FAILS + 50;
std::string url = "http://add_and_remove";
std::string name = "add_and_remove";
UPSWeightedRandomPolicy test_policy(false);
AddressParams address_params = ADDRESS_PARAMS_DEFAULT;
address_params.weight = 1000;
test_policy.add_server("127.0.0.1:8001", &address_params);
address_params.weight = 1;
test_policy.add_server("127.0.0.1:8002", &address_params);
auto *ns = WFGlobal::get_name_service();
EXPECT_EQ(ns->add_policy(name.c_str(), &test_policy), 0);
UpstreamManager::upstream_remove_server(name, "127.0.0.1:8001");
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server2")));
task->user_data = &wait_group;
task->start();
//test remove fused server
address_params.weight = 1000;
test_policy.add_server("127.0.0.1:8001", &address_params);
http_server1.stop();
fprintf(stderr, "server 1 stopped start %d tasks to fuse it\n", batch);
ParallelWork *pwork = Workflow::create_parallel_work(
[&wait_group, &name, &url](const ParallelWork *pwork) {
fprintf(stderr, "parallel finished and remove server1\n");
UpstreamManager::upstream_remove_server(name, "127.0.0.1:8001");
auto *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server2")));
task->user_data = &wait_group;
series_of(pwork)->push_back(task);
});
for (int i = 0; i < batch; i++)
{
task = WFTaskFactory::create_http_task(url, 0, 0, nullptr);
req = task->get_req();
req->add_header_pair("Connection", "keep-alive");
series = Workflow::create_series_work(task, nullptr);
pwork->add_series(series);
}
pwork->start();
wait_group.wait();
EXPECT_TRUE(http_server1.start("127.0.0.1", 8001) == 0)
<< "http server start failed";
}
TEST(upstream_unittest, FuseAndRecover)
{
WFFacilities::WaitGroup wait_group(1);