skip tracing in WeightedRandomPolicy and update version 0.9.5

This commit is contained in:
holmes1412
2021-03-17 22:20:02 +08:00
parent d9db40fefd
commit 85344e2934
6 changed files with 166 additions and 65 deletions

View File

@@ -5,7 +5,7 @@ set(CMAKE_SKIP_RPATH TRUE)
project(
workflow
VERSION 0.9.4
VERSION 0.9.5
LANGUAGES C CXX
)

View File

@@ -36,8 +36,8 @@ public:
this->weight = 0;
}
const EndpointAddress *get_one();
const EndpointAddress *get_one_backup();
const EndpointAddress *get_one(WFNSTracing *tracing);
const EndpointAddress *get_one_backup(WFNSTracing *tracing);
public:
int id;
@@ -180,22 +180,19 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing,
}
// select_addr == NULL will only happened in consistent_hash
const EndpointAddress *select_addr = this->first_strategy(uri);
const EndpointAddress *select_addr = this->first_strategy(uri, tracing);
if (!select_addr || select_addr->fail_count >= select_addr->params->max_fails)
{
if (select_addr)
select_addr = this->check_and_get(select_addr, true);
select_addr = this->check_and_get(select_addr, true, tracing);
if (!select_addr && this->try_another)
{
select_addr = this->another_strategy(uri);
select_addr = this->check_and_get(select_addr, false);
}
select_addr = this->another_strategy(uri, tracing);
}
if (!select_addr)
select_addr = this->default_group->get_one_backup();
select_addr = this->default_group->get_one_backup(tracing);
pthread_rwlock_unlock(&this->rwlock);
@@ -208,10 +205,14 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing,
return false;
}
// flag true : guarantee addr != NULL, and please return an available one
// flag false : means addr maybe useful but want one any way. addr may be NULL
/*
* flag true : guarantee addr != NULL, and please return an available one.
* if no available addr, return NULL.
* false: addr may be NULL, means addr maybe useful but want one any way.
*/
inline const EndpointAddress *UPSGroupPolicy::check_and_get(const EndpointAddress *addr,
bool flag)
bool flag,
WFNSTracing *tracing)
{
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
if (flag == true) // && addr->fail_count >= addr->params->max_fails
@@ -219,13 +220,13 @@ inline const EndpointAddress *UPSGroupPolicy::check_and_get(const EndpointAddres
if (params->group_id == -1)
return NULL;
return params->group->get_one();
return params->group->get_one(tracing);
}
if (addr && addr->fail_count >= addr->params->max_fails &&
params->group_id >= 0)
{
const EndpointAddress *tmp = params->group->get_one();
const EndpointAddress *tmp = params->group->get_one(tracing);
if (tmp)
addr = tmp;
}
@@ -233,7 +234,7 @@ inline const EndpointAddress *UPSGroupPolicy::check_and_get(const EndpointAddres
return addr;
}
const EndpointAddress *EndpointGroup::get_one()
const EndpointAddress *EndpointGroup::get_one(WFNSTracing *tracing)
{
if (this->nalives == 0)
return NULL;
@@ -244,7 +245,8 @@ const EndpointAddress *EndpointGroup::get_one()
std::shuffle(this->mains.begin(), this->mains.end(), this->gen);
for (size_t i = 0; i < this->mains.size(); i++)
{
if (this->mains[i]->fail_count < this->mains[i]->params->max_fails)
if (this->mains[i]->fail_count < this->mains[i]->params->max_fails &&
WFServiceGovernance::in_tracing(tracing, this->mains[i]) == false)
{
addr = this->mains[i];
break;
@@ -256,7 +258,8 @@ const EndpointAddress *EndpointGroup::get_one()
std::shuffle(this->backups.begin(), this->backups.end(), this->gen);
for (size_t i = 0; i < this->backups.size(); i++)
{
if (this->backups[i]->fail_count < this->backups[i]->params->max_fails)
if (this->backups[i]->fail_count < this->backups[i]->params->max_fails &&
WFServiceGovernance::in_tracing(tracing, this->backups[i]) == false)
{
addr = this->backups[i];
break;
@@ -268,7 +271,7 @@ const EndpointAddress *EndpointGroup::get_one()
return addr;
}
const EndpointAddress *EndpointGroup::get_one_backup()
const EndpointAddress *EndpointGroup::get_one_backup(WFNSTracing *tracing)
{
if (this->nalives == 0)
return NULL;
@@ -279,7 +282,8 @@ const EndpointAddress *EndpointGroup::get_one_backup()
std::shuffle(this->backups.begin(), this->backups.end(), this->gen);
for (size_t i = 0; i < this->backups.size(); i++)
{
if (this->backups[i]->fail_count < this->backups[i]->params->max_fails)
if (this->backups[i]->fail_count < this->backups[i]->params->max_fails &&
WFServiceGovernance::in_tracing(tracing, this->backups[i]) == false)
{
addr = this->backups[i];
break;
@@ -430,7 +434,7 @@ const EndpointAddress *UPSGroupPolicy::consistent_hash_with_group(unsigned int h
}
}
return this->check_and_get(addr, false);
return this->check_and_get(addr, false, NULL);
}
void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr)
@@ -461,19 +465,50 @@ int UPSWeightedRandomPolicy::remove_server_locked(const std::string& address)
return UPSGroupPolicy::remove_server_locked(address);
}
const EndpointAddress *UPSWeightedRandomPolicy::first_strategy(const ParsedURI& uri)
int UPSWeightedRandomPolicy::tracing_weight(WFNSTracing *tracing)
{
if (!tracing || !tracing->data)
return 0;
UPSAddrParams *params;
if (!tracing->deleter)
{
auto *server = (EndpointAddress *)tracing->data;
params = (UPSAddrParams *)server->params;
return params->weight;
}
int ret = 0;
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
for (auto *server : (*v))
{
params = (UPSAddrParams *)server->params;
ret += params->weight;
}
return ret;
}
const EndpointAddress *UPSWeightedRandomPolicy::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
int x = 0;
int s = 0;
size_t idx;
UPSAddrParams *params;
int temp_weight = this->total_weight;
temp_weight -= UPSWeightedRandomPolicy::tracing_weight(tracing);
if (temp_weight > 0)
x = rand() % temp_weight;
for (idx = 0; idx < this->servers.size(); idx++)
{
if (WFServiceGovernance::in_tracing(tracing, this->servers[idx]))
continue;
params = static_cast<UPSAddrParams *>(this->servers[idx]->params);
s += params->weight;
if (s > x)
@@ -485,7 +520,8 @@ const EndpointAddress *UPSWeightedRandomPolicy::first_strategy(const ParsedURI&
return this->servers[idx];
}
const EndpointAddress *UPSWeightedRandomPolicy::another_strategy(const ParsedURI& uri)
const EndpointAddress *UPSWeightedRandomPolicy::another_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
UPSAddrParams *params;
int temp_weight = this->available_weight;
@@ -500,7 +536,7 @@ const EndpointAddress *UPSWeightedRandomPolicy::another_strategy(const ParsedURI
{
if (this->is_alive_or_group_alive(server))
{
addr = server; // TODO: please double check here
addr = server;
params = static_cast<UPSAddrParams *>(server->params);
s += params->weight;
if (s > x)
@@ -508,7 +544,7 @@ const EndpointAddress *UPSWeightedRandomPolicy::another_strategy(const ParsedURI
}
}
return this->check_and_get(addr, false);
return this->check_and_get(addr, false, tracing);
}
void UPSWeightedRandomPolicy::recover_one_server(const EndpointAddress *addr)
@@ -535,7 +571,8 @@ void UPSWeightedRandomPolicy::fuse_one_server(const EndpointAddress *addr)
this->available_weight -= params->weight;
}
const EndpointAddress *UPSConsistentHashPolicy::first_strategy(const ParsedURI& uri)
const EndpointAddress *UPSConsistentHashPolicy::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
unsigned int hash_value;
@@ -550,7 +587,8 @@ const EndpointAddress *UPSConsistentHashPolicy::first_strategy(const ParsedURI&
return this->consistent_hash_with_group(hash_value);
}
const EndpointAddress *UPSManualPolicy::first_strategy(const ParsedURI& uri)
const EndpointAddress *UPSManualPolicy::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
unsigned int idx = this->manual_select(uri.path ? uri.path : "",
uri.query ? uri.query : "",
@@ -562,7 +600,8 @@ const EndpointAddress *UPSManualPolicy::first_strategy(const ParsedURI& uri)
return this->servers[idx];
}
const EndpointAddress *UPSManualPolicy::another_strategy(const ParsedURI& uri)
const EndpointAddress *UPSManualPolicy::another_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
unsigned int hash_value;
@@ -576,3 +615,4 @@ const EndpointAddress *UPSManualPolicy::another_strategy(const ParsedURI& uri)
uri.fragment ? uri.fragment : "");
return this->consistent_hash_with_group(hash_value);
}

View File

@@ -76,7 +76,8 @@ protected:
virtual int remove_server_locked(const std::string& address);
const EndpointAddress *consistent_hash_with_group(unsigned int hash);
const EndpointAddress *check_and_get(const EndpointAddress *addr, bool flag);
const EndpointAddress *check_and_get(const EndpointAddress *addr,
bool flag, WFNSTracing *tracing);
bool is_alive_or_group_alive(const EndpointAddress *addr) const;
};
@@ -90,8 +91,10 @@ public:
this->available_weight = 0;
this->try_another = try_another;
}
const EndpointAddress *first_strategy(const ParsedURI& uri);
const EndpointAddress *another_strategy(const ParsedURI& uri);
const EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
const EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
protected:
int total_weight;
@@ -102,6 +105,7 @@ private:
virtual void fuse_one_server(const EndpointAddress *addr);
virtual void add_server_locked(EndpointAddress *addr);
virtual int remove_server_locked(const std::string& address);
static int tracing_weight(WFNSTracing *tracing);
};
class UPSConsistentHashPolicy : public UPSGroupPolicy
@@ -118,7 +122,8 @@ public:
}
protected:
const EndpointAddress *first_strategy(const ParsedURI& uri);
const EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
private:
upstream_route_t consistent_hash;
@@ -148,8 +153,10 @@ public:
this->try_another = try_another;
}
const EndpointAddress *first_strategy(const ParsedURI& uri);
const EndpointAddress *another_strategy(const ParsedURI& uri);
const EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
const EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
private:
upstream_route_t manual_select;

View File

@@ -166,6 +166,26 @@ void WFServiceGovernance::tracing_deleter(void *data)
delete (std::vector<EndpointAddress *> *)data;
}
bool WFServiceGovernance::in_tracing(WFNSTracing *tracing,
EndpointAddress *addr)
{
if (!tracing || !tracing->data)
return false;
if (!tracing->deleter)
return (EndpointAddress *)tracing->data == addr;
auto *v = (std::vector<EndpointAddress *> *)(tracing->data);
for (auto *server : (*v))
{
if (server == addr)
return true;
}
return false;
}
inline void WFServiceGovernance::recover_server_from_breaker(EndpointAddress *addr)
{
addr->fail_count = 0;
@@ -268,15 +288,17 @@ void WFServiceGovernance::check_breaker()
pthread_mutex_unlock(&this->breaker_lock);
}
const EndpointAddress *WFServiceGovernance::first_strategy(const ParsedURI& uri)
const EndpointAddress *WFServiceGovernance::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
unsigned int idx = rand() % this->servers.size();
return this->servers[idx];
}
const EndpointAddress *WFServiceGovernance::another_strategy(const ParsedURI& uri)
const EndpointAddress *WFServiceGovernance::another_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
return this->first_strategy(uri);
return this->first_strategy(uri, tracing);
}
bool WFServiceGovernance::select(const ParsedURI& uri, WFNSTracing *tracing,
@@ -299,13 +321,13 @@ bool WFServiceGovernance::select(const ParsedURI& uri, WFNSTracing *tracing,
}
// select_addr == NULL will only happened in consistent_hash
const EndpointAddress *select_addr = this->first_strategy(uri);
const EndpointAddress *select_addr = this->first_strategy(uri, tracing);
if (!select_addr ||
select_addr->fail_count >= select_addr->params->max_fails)
{
if (this->try_another)
select_addr = this->another_strategy(uri);
select_addr = this->another_strategy(uri, tracing);
}
pthread_rwlock_unlock(&this->rwlock);

View File

@@ -126,6 +126,7 @@ public:
virtual void server_list_change(const EndpointAddress *address, int state)
{}
void set_mttr_second(unsigned int second) { this->mttr_second = second; }
static bool in_tracing(WFNSTracing *tracing, EndpointAddress *addr);
public:
WFServiceGovernance() :
@@ -169,8 +170,10 @@ private:
unsigned int mttr_second;
protected:
virtual const EndpointAddress *first_strategy(const ParsedURI& uri);
virtual const EndpointAddress *another_strategy(const ParsedURI& uri);
virtual const EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual const EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
void check_breaker();
static void tracing_deleter(void *data);

View File

@@ -28,22 +28,22 @@
#define MTTR 2
#define MAX_FAILS 200
static void __http_process1(WFHttpTask *task)
static void __http_process(WFHttpTask *task, const char *name)
{
auto *resp = task->get_resp();
resp->add_header_pair("Content-Type", "text/plain");
resp->append_output_body_nocopy("server1", 7);
resp->append_output_body_nocopy(name, strlen(name));
}
static void __http_process2(WFHttpTask *task)
{
auto *resp = task->get_resp();
resp->add_header_pair("Content-Type", "text/plain");
resp->append_output_body_nocopy("server2", 7);
}
WFHttpServer http_server1(__http_process1);
WFHttpServer http_server2(__http_process2);
WFHttpServer http_server1(std::bind(&__http_process,
std::placeholders::_1,
"server1"));
WFHttpServer http_server2(std::bind(&__http_process,
std::placeholders::_1,
"server2"));
WFHttpServer http_server3(std::bind(&__http_process,
std::placeholders::_1,
"server3"));
void register_upstream_hosts()
{
@@ -86,6 +86,20 @@ void register_upstream_hosts()
false, nullptr);
UpstreamManager::upstream_add_server("try_another", "127.0.0.1:8001");
UpstreamManager::upstream_add_server("try_another", "127.0.0.1:8002");
UpstreamManager::upstream_create_weighted_random("test_tracing", true);
address_params.weight = 1000;
UpstreamManager::upstream_add_server("test_tracing",
"127.0.0.1:8001",
&address_params);
address_params.weight = 1;
UpstreamManager::upstream_add_server("test_tracing",
"127.0.0.1:8002",
&address_params);
address_params.weight = 1000;
UpstreamManager::upstream_add_server("test_tracing",
"127.0.0.1:8003",
&address_params);
}
void basic_callback(WFHttpTask *task, std::string& message)
@@ -181,8 +195,7 @@ TEST(upstream_unittest, FuseAndRecover)
for (int i = 0; i < batch; i++)
{
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
nullptr);
task = WFTaskFactory::create_http_task(url, 0, 0, nullptr);
req = task->get_req();
req->add_header_pair("Connection", "keep-alive");
series = Workflow::create_series_work(task, nullptr);
@@ -241,26 +254,38 @@ TEST(upstream_unittest, TryAnother)
UpstreamManager::upstream_enable_server("try_another", "127.0.0.1:8001");
}
TEST(upstream_unittest, Cookies)
TEST(upstream_unittest, Tracing)
{
WFFacilities::WaitGroup wait_group(1);
WFFacilities::WaitGroup wait_group(2);
http_server1.stop();
//change manual
std::string url = "http://weighted.random";
WFHttpTask *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
[&wait_group, &url](WFHttpTask *task){
int state = task->get_state();
EXPECT_EQ(state, WFT_STATE_SYS_ERROR);
wait_group.done();
});
// test first_strategy()
WFHttpTask *task = WFTaskFactory::create_http_task(
"http://weighted.random",
REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server2")));
task->user_data = &wait_group;
task->start();
// test another_strategy()
UpstreamManager::upstream_disable_server("test_tracing",
"127.0.0.1:8003");
WFHttpTask *task2 = WFTaskFactory::create_http_task(
"http://test_tracing",
REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server2")));
task2->user_data = &wait_group;
task2->start();
wait_group.wait();
EXPECT_TRUE(http_server1.start("127.0.0.1", 8001) == 0)
<< "http server start failed";
UpstreamManager::upstream_enable_server("test_tracing", "127.0.0.1:8003");
}
int main(int argc, char* argv[])
@@ -274,11 +299,15 @@ int main(int argc, char* argv[])
EXPECT_TRUE(http_server2.start("127.0.0.1", 8002) == 0)
<< "http server start failed";
EXPECT_TRUE(http_server3.start("127.0.0.1", 8003) == 0)
<< "http server start failed";
EXPECT_EQ(RUN_ALL_TESTS(), 0);
http_server1.stop();
http_server2.stop();
http_server3.stop();
return 0;
}