add upstream_unittest.cc and fix check_get_strong()

This commit is contained in:
holmes1412
2021-01-31 03:47:03 +08:00
parent 1879c58da3
commit 1798a7913f
4 changed files with 274 additions and 19 deletions

View File

@@ -76,6 +76,8 @@ EndpointAddress::EndpointAddress(const std::string& address,
std::vector<std::string> arr = StringUtil::split(address, ':');
this->params = *address_params;
this->address = address;
this->list.next = NULL;
this->fail_count = 0;
static std::hash<std::string> std_hash;
for (int i = 0; i < VIRTUAL_GROUP_SIZE; i++)
@@ -421,10 +423,13 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, EndpointAddress **addr)
if (!select_addr || select_addr->fail_count >= select_addr->params.max_fails)
{
if (select_addr)
select_addr = select_addr->group->get_one();
select_addr = this->check_and_get(select_addr, true);
if (!select_addr && this->try_another)
{
select_addr = this->another_stradegy(uri);
select_addr = this->check_and_get(select_addr, false);
}
}
if (!select_addr)
@@ -441,6 +446,30 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, EndpointAddress **addr)
return false;
}
// flag true : guarantee addr != NULL, and please return an available one
// flag false : means addr maybe useful but want one any way. addr may be NULL
inline const EndpointAddress *UPSGroupPolicy::check_and_get(const EndpointAddress *addr,
bool flag)
{
if (flag == true) // && addr->fail_count >= addr->params.max_fails
{
if (addr->params.group_id == -1)
return NULL;
return addr->group->get_one();
}
if (addr && addr->fail_count >= addr->params.max_fails &&
addr->params.group_id >= 0)
{
const EndpointAddress *tmp = addr->group->get_one();
if (tmp)
addr = tmp;
}
return addr;
}
const EndpointAddress *EndpointGroup::get_one()
{
if (this->nalives == 0)
@@ -608,7 +637,7 @@ int UPSGroupPolicy::remove_server_locked(const std::string& address)
return ret;
}
const EndpointAddress *UPSGroupPolicy::consistent_hash_with_group(unsigned int hash) const
const EndpointAddress *UPSGroupPolicy::consistent_hash_with_group(unsigned int hash)
{
const EndpointAddress *addr = NULL;
unsigned int min_dis = (unsigned int)-1;
@@ -632,7 +661,7 @@ const EndpointAddress *UPSGroupPolicy::consistent_hash_with_group(unsigned int h
}
}
return this->check_and_get(addr);
return this->check_and_get(addr, false);
}
void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr)
@@ -701,7 +730,8 @@ const EndpointAddress *UPSWeightedRandomPolicy::another_stradegy(const ParsedURI
break;
}
}
return this->check_and_get(addr);
return this->check_and_get(addr, false);
}
void UPSWeightedRandomPolicy::recover_one_server(const EndpointAddress *addr)

View File

@@ -183,21 +183,8 @@ protected:
virtual void add_server_locked(EndpointAddress *addr);
virtual int remove_server_locked(const std::string& address);
const EndpointAddress *consistent_hash_with_group(unsigned int hash) const;
// check_get_weak
inline const EndpointAddress *check_and_get(const EndpointAddress *addr) const
{
if (addr && addr->fail_count >= addr->params.max_fails &&
addr->params.group_id >= 0)
{
const auto *ret = addr->group->get_one();
if (ret)
addr = ret;
}
return addr;
}
const EndpointAddress *consistent_hash_with_group(unsigned int hash);
const EndpointAddress *check_and_get(const EndpointAddress *addr, bool flag);
inline bool is_alive_or_group_alive(const EndpointAddress *addr) const
{

View File

@@ -37,6 +37,7 @@ set(TEST_LIST
facilities_unittest
graph_unittest
memory_unittest
upstream_unittest
)
if (APPLE)

237
test/upstream_unittest.cc Normal file
View File

@@ -0,0 +1,237 @@
/*
Copyright (c) 2020 Sogou, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Author: Li Yingxin (liyingxin@sogou-inc.com)
*/
#include <gtest/gtest.h>
#include "workflow/UpstreamManager.h"
#include "workflow/WFHttpServer.h"
#include "workflow/WFTaskFactory.h"
#include "workflow/WFFacilities.h"
#define REDIRECT_MAX 3
#define RETRY_MAX 3
#define MTTR 30
#define MAX_FAILS 200
static void __http_process1(WFHttpTask *task)
{
auto *resp = task->get_resp();
resp->add_header_pair("Content-Type", "text/plain");
resp->append_output_body_nocopy("server1", 7);
}
static void __http_process2(WFHttpTask *task)
{
auto *resp = task->get_resp();
resp->add_header_pair("Content-Type", "text/plain");
resp->append_output_body_nocopy("server2", 7);
}
WFHttpServer http_server1(__http_process1);
WFHttpServer http_server2(__http_process2);
void register_upstream_hosts()
{
UpstreamManager::upstream_create_weighted_random("weighted.random", false);
AddressParams address_params = ADDRESS_PARAMS_DEFAULT;
address_params.weight = 1000;
UpstreamManager::upstream_add_server("weighted.random",
"127.0.0.1:8001",
&address_params);
address_params.weight = 1;
UpstreamManager::upstream_add_server("weighted.random",
"127.0.0.1:8002",
&address_params);
UpstreamManager::upstream_create_consistent_hash(
"hash",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
return 1;
});
UpstreamManager::upstream_add_server("hash", "127.0.0.1:8001");
UpstreamManager::upstream_add_server("hash", "127.0.0.1:8002");
UpstreamManager::upstream_create_manual(
"manual",
[](const char *path, const char *query, const char *fragment) -> unsigned int {
return 0;
},
false, nullptr);
UpstreamManager::upstream_add_server("manual", "127.0.0.1:8001");
UpstreamManager::upstream_add_server("manual", "127.0.0.1:8002");
UpstreamManager::upstream_create_weighted_random("try_another", true);
address_params.weight = 1000;
UpstreamManager::upstream_add_server("try_another",
"127.0.0.1:8001",
&address_params);
address_params.weight = 1;
UpstreamManager::upstream_add_server("try_another",
"127.0.0.1:8002",
&address_params);
}
void basic_callback(WFHttpTask *task, std::string& message)
{
auto state = task->get_state();
EXPECT_EQ(state, WFT_STATE_SUCCESS);
if (state == WFT_STATE_SUCCESS && message.compare(""))
{
const void *body;
size_t body_len;
task->get_resp()->get_parsed_body(&body, &body_len);
std::string buffer((char *)body, body_len);
EXPECT_EQ(buffer, message);
}
WFFacilities::WaitGroup *wait_group = (WFFacilities::WaitGroup *)task->user_data;
wait_group->done();
}
TEST(upstream_unittest, BasicPolicy)
{
WFFacilities::WaitGroup wait_group(3);
register_upstream_hosts();
char url[3][30] = {"http://weighted.random", "http://hash", "http://manual"};
http_callback_t cb = std::bind(basic_callback, std::placeholders::_1,
std::string("server1"));
for (int i = 0; i < 3; i++)
{
WFHttpTask *task = WFTaskFactory::create_http_task(url[i],
REDIRECT_MAX, RETRY_MAX, cb);
task->user_data = &wait_group;
task->start();
}
wait_group.wait();
}
TEST(upstream_unittest, EnableAndDisable)
{
WFFacilities::WaitGroup wait_group(1);
UpstreamManager::upstream_disable_server("weighted.random", "127.0.0.1:8001");
//fprintf(stderr, "disable server and try......................\n");
std::string url = "http://weighted.random";
WFHttpTask *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
[&wait_group, &url](WFHttpTask *task){
auto state = task->get_state();
EXPECT_EQ(state, WFT_STATE_TASK_ERROR);
EXPECT_EQ(task->get_error(), WFT_ERR_UPSTREAM_UNAVAILABLE);
UpstreamManager::upstream_enable_server("weighted.random", "127.0.0.1:8001");
//fprintf(stderr, "ensable server and try......................\n");
auto *task2 = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server1")));
task2->user_data = &wait_group;
series_of(task)->push_back(task2);
});
task->user_data = &wait_group;
task->start();
wait_group.wait();
}
TEST(upstream_unittest, FuseAndRecover)
{
WFFacilities::WaitGroup wait_group(1);
WFHttpTask *task;
SeriesWork *series;
protocol::HttpRequest *req;
std::string url = "http://weighted.random";
int batch = MAX_FAILS + 50;
int timeout = (MTTR + 3) * 1000000;
http_server1.stop();
fprintf(stderr, "server 1 stopped start %d tasks to fuse it\n", batch);
ParallelWork *pwork = Workflow::create_parallel_work(
[](const ParallelWork *pwork) {
fprintf(stderr, "parallel finished\n");
});
for (int i = 0; i < batch; i++)
{
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
nullptr);
req = task->get_req();
req->add_header_pair("Connection", "keep-alive");
series = Workflow::create_series_work(task, nullptr);
pwork->add_series(series);
}
series = Workflow::create_series_work(pwork, nullptr);
WFTimerTask *timer = WFTaskFactory::create_timer_task(timeout,
[](WFTimerTask *task) {
fprintf(stderr, "timer_finished and start server1\n");
EXPECT_TRUE(http_server1.start("127.0.0.1", 8001) == 0)
<< "http server start failed";
});
series->push_back(timer);
task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server1")));
task->user_data = &wait_group;
series->push_back(task);
series->start();
wait_group.wait();
}
TEST(upstream_unittest, TryAnother)
{
WFFacilities::WaitGroup wait_group(1);
UpstreamManager::upstream_disable_server("try_another", "127.0.0.1:8001");
std::string url = "http://try_another";
WFHttpTask *task = WFTaskFactory::create_http_task(url, REDIRECT_MAX, RETRY_MAX,
std::bind(basic_callback,
std::placeholders::_1,
std::string("server2")));
task->user_data = &wait_group;
task->start();
wait_group.wait();
UpstreamManager::upstream_enable_server("try_another", "127.0.0.1:8001");
}
int main(int argc, char* argv[])
{
::testing::InitGoogleTest(&argc, argv);
EXPECT_TRUE(http_server1.start("127.0.0.1", 8001) == 0)
<< "http server start failed";
EXPECT_TRUE(http_server2.start("127.0.0.1", 8002) == 0)
<< "http server start failed";
EXPECT_EQ(RUN_ALL_TESTS(), 0);
http_server1.stop();
http_server2.stop();
return 0;
}