fix WFServiceGovernance's server ref bug

This commit is contained in:
XieHan
2021-10-30 16:31:13 +08:00
parent f7c0cfc5d3
commit fa325465bf
4 changed files with 53 additions and 75 deletions

View File

@@ -465,26 +465,15 @@ int UPSWeightedRandomPolicy::remove_server_locked(const std::string& address)
int UPSWeightedRandomPolicy::select_history_weight(WFNSTracing *tracing)
{
if (!tracing || !tracing->data)
struct TracingData *tracing_data = (struct TracingData *)tracing->data;
if (!tracing_data)
return 0;
UPSAddrParams *params;
if (!tracing->deleter)
{
auto *server = (EndpointAddress *)tracing->data;
params = (UPSAddrParams *)server->params;
return params->weight;
}
int ret = 0;
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
for (auto *server : (*v))
{
params = (UPSAddrParams *)server->params;
ret += params->weight;
}
for (EndpointAddress *server : tracing_data->history)
ret += ((UPSAddrParams *)server->params)->weight;
return ret;
}

View File

@@ -16,8 +16,8 @@
Authors: Wu Jiaxu (wujiaxu@sogou-inc.com)
*/
#ifndef _UPSTREAM_POLICIES_H_
#define _UPSTREAM_POLICIES_H_
#ifndef _UPSTREAMPOLICIES_H_
#define _UPSTREAMPOLICIES_H_
#include <utility>
#include <vector>

View File

@@ -14,6 +14,7 @@
limitations under the License.
Authors: Li Yingxin (liyingxin@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
*/
#include <vector>
@@ -123,9 +124,9 @@ EndpointAddress::EndpointAddress(const std::string& address,
WFRouterTask *WFServiceGovernance::create_router_task(const struct WFNSParams *params,
router_callback_t callback)
{
WFNSTracing *tracing = params->tracing;
EndpointAddress *addr;
WFRouterTask *task;
WFNSTracing *tracing = params->tracing;
if (this->select(params->uri, tracing, &addr) &&
copy_host_port(params->uri, addr))
@@ -139,25 +140,16 @@ WFRouterTask *WFServiceGovernance::create_router_task(const struct WFNSParams *p
task = resolver->create(params, dns_cache_level, dns_ttl_default, dns_ttl_min,
endpoint_params, std::move(callback));
if (!tracing->data)
tracing->data = addr;
else
struct TracingData *tracing_data = (struct TracingData *)tracing->data;
if (!tracing_data)
{
std::vector<EndpointAddress *> *v;
if (!tracing->deleter)
{
EndpointAddress *last_addr = (EndpointAddress *)tracing->data;
v = new std::vector<EndpointAddress *>;
v->push_back(last_addr);
tracing->deleter = WFServiceGovernance::tracing_deleter;
tracing->data = v;
}
else
v = (std::vector<EndpointAddress *> *)tracing->data;
v->push_back(addr);
tracing_data = new TracingData;
tracing_data->sg = this;
tracing->data = tracing_data;
tracing->deleter = WFServiceGovernance::tracing_deleter;
}
tracing_data->history.push_back(addr);
}
else
task = new WFSelectorFailTask(std::move(callback));
@@ -167,21 +159,31 @@ WFRouterTask *WFServiceGovernance::create_router_task(const struct WFNSParams *p
void WFServiceGovernance::tracing_deleter(void *data)
{
delete (std::vector<EndpointAddress *> *)data;
struct TracingData *tracing_data = (struct TracingData *)data;
for (EndpointAddress *addr : tracing_data->history)
{
if (--addr->ref == 0)
{
pthread_rwlock_wrlock(&tracing_data->sg->rwlock);
tracing_data->sg->pre_delete_server(addr);
pthread_rwlock_unlock(&tracing_data->sg->rwlock);
delete addr;
}
}
delete tracing_data;
}
bool WFServiceGovernance::in_select_history(WFNSTracing *tracing,
EndpointAddress *addr)
{
if (!tracing || !tracing->data)
struct TracingData *tracing_data = (struct TracingData *)tracing->data;
if (!tracing_data)
return false;
if (!tracing->deleter)
return (EndpointAddress *)tracing->data == addr;
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
for (auto *server : (*v))
for (EndpointAddress *server : tracing_data->history)
{
if (server == addr)
return true;
@@ -234,24 +236,14 @@ void WFServiceGovernance::success(RouteManager::RouteResult *result,
WFNSTracing *tracing,
CommTarget *target)
{
EndpointAddress *server;
if (tracing->deleter)
{
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
server = (*v)[v->size() - 1];
}
else
server = (EndpointAddress *)tracing->data;
struct TracingData *tracing_data = (struct TracingData *)tracing->data;
auto *v = &tracing_data->history;
EndpointAddress *server = (*v)[v->size() - 1];
pthread_rwlock_wrlock(&this->rwlock);
this->recover_server_from_breaker(server);
if (--server->ref == 0)
{
this->pre_delete_server(server);
delete server;
}
pthread_rwlock_unlock(&this->rwlock);
this->WFNSPolicy::success(result, tracing, target);
}
@@ -259,26 +251,15 @@ void WFServiceGovernance::failed(RouteManager::RouteResult *result,
WFNSTracing *tracing,
CommTarget *target)
{
EndpointAddress *server;
if (tracing->deleter)
{
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
server = (*v)[v->size() - 1];
}
else
server = (EndpointAddress *)tracing->data;
struct TracingData *tracing_data = (struct TracingData *)tracing->data;
auto *v = &tracing_data->history;
EndpointAddress *server = (*v)[v->size() - 1];
pthread_rwlock_wrlock(&this->rwlock);
if (++server->fail_count == server->params->max_fails)
this->fuse_server_to_breaker(server);
if (--server->ref == 0)
{
this->pre_delete_server(server);
delete server;
}
pthread_rwlock_unlock(&this->rwlock);
this->WFNSPolicy::failed(result, tracing, target);
}

View File

@@ -14,10 +14,11 @@
limitations under the License.
Authors: Li Yingxin (liyingxin@sogou-inc.com)
Xie Han (xiehan@sogou-inc.com)
*/
#ifndef _SERVICE_GOVERNANCE_H_
#define _SERVICE_GOVERNANCE_H_
#ifndef _WFSERVICEGOVERNANCE_H_
#define _WFSERVICEGOVERNANCE_H_
#include <pthread.h>
#include <unordered_map>
@@ -174,6 +175,13 @@ protected:
WFNSTracing *tracing);
void check_breaker();
void pre_delete_server(EndpointAddress *addr);
struct TracingData
{
std::vector<EndpointAddress *> history;
WFServiceGovernance *sg;
};
static void tracing_deleter(void *data);
std::vector<EndpointAddress *> servers;