Merge pull request #920 from Barenboim/master

Add round-robin upstream policy
This commit is contained in:
xiehan
2022-05-26 20:40:06 +08:00
committed by GitHub
4 changed files with 89 additions and 28 deletions

View File

@@ -55,6 +55,22 @@ private:
std::vector<UPSGroupPolicy *> upstream_policies;
};
int UpstreamManager::upstream_create_round_robin(const std::string& name,
bool try_another)
{
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSRoundRobinPolicy(try_another);
if (ns->add_policy(name.c_str(), policy) >= 0)
{
__UpstreamManager::get_instance()->add_upstream_policy(policy);
return 0;
}
delete policy;
return -1;
}
static unsigned int __default_consistent_hash(const char *path,
const char *query,
const char *fragment)
@@ -70,7 +86,7 @@ static unsigned int __default_consistent_hash(const char *path,
int UpstreamManager::upstream_create_consistent_hash(const std::string& name,
upstream_route_t consistent_hash)
{
auto *ns = WFGlobal::get_name_service();
WFNameService *ns = WFGlobal::get_name_service();
UPSConsistentHashPolicy *policy;
policy = new UPSConsistentHashPolicy(
@@ -89,8 +105,8 @@ int UpstreamManager::upstream_create_consistent_hash(const std::string& name,
int UpstreamManager::upstream_create_weighted_random(const std::string& name,
bool try_another)
{
auto *ns = WFGlobal::get_name_service();
UPSWeightedRandomPolicy *policy = new UPSWeightedRandomPolicy(try_another);
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSWeightedRandomPolicy(try_another);
if (ns->add_policy(name.c_str(), policy) >= 0)
{
@@ -104,8 +120,8 @@ int UpstreamManager::upstream_create_weighted_random(const std::string& name,
int UpstreamManager::upstream_create_vnswrr(const std::string& name)
{
auto *ns = WFGlobal::get_name_service();
UPSWeightedRandomPolicy *policy = new UPSVNSWRRPolicy();
WFNameService *ns = WFGlobal::get_name_service();
auto *policy = new UPSVNSWRRPolicy();
if (ns->add_policy(name.c_str(), policy) >= 0)
{
@@ -122,7 +138,7 @@ int UpstreamManager::upstream_create_manual(const std::string& name,
bool try_another,
upstream_route_t consistent_hash)
{
auto *ns = WFGlobal::get_name_service();
WFNameService *ns = WFGlobal::get_name_service();
UPSManualPolicy *policy;
policy = new UPSManualPolicy(try_another, std::move(select),

View File

@@ -77,6 +77,21 @@
class UpstreamManager
{
public:
/**
* @brief MODE 0: round-robin select
* @param[in] name upstream name
* @param[in] try_another when first choice is failed, try another one or not
* @return success/fail
* @retval 0 success
* @retval -1 fail, more info see errno
* @note
* when first choose server is already down:
* - if try_another==false, request will be failed
* - if try_another==true, upstream will choose the next
*/
static int upstream_create_round_robin(const std::string& name,
bool try_another);
/**
* @brief MODE 1: consistent-hashing select
* @param[in] name upstream name

View File

@@ -165,7 +165,7 @@ bool UPSGroupPolicy::select(const ParsedURI& uri, WFNSTracing *tracing,
this->check_breaker();
// select_addr == NULL will only happened in consistent_hash
// select_addr == NULL will happen in consistent_hash
EndpointAddress *select_addr = this->first_strategy(uri, tracing);
if (!select_addr || select_addr->fail_count >= select_addr->params->max_fails)
@@ -331,8 +331,6 @@ void UPSGroupPolicy::add_server_locked(EndpointAddress *addr)
group->backups.push_back(addr);
pthread_mutex_unlock(&group->mutex);
this->server_list_change(addr, ADD_SERVER);
return;
}
int UPSGroupPolicy::remove_server_locked(const std::string& address)
@@ -448,6 +446,19 @@ void UPSGroupPolicy::hash_map_remove_addr(const std::string& address)
}
}
EndpointAddress *UPSRoundRobinPolicy::first_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
return this->servers[this->cur_idx++ % this->servers.size()];
}
EndpointAddress *UPSRoundRobinPolicy::another_strategy(const ParsedURI& uri,
WFNSTracing *tracing)
{
EndpointAddress *addr = this->servers[this->cur_idx++ % this->servers.size()];
return this->check_and_get(addr, false, tracing);
}
void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr)
{
UPSAddrParams *params = static_cast<UPSAddrParams *>(addr->params);
@@ -455,8 +466,6 @@ void UPSWeightedRandomPolicy::add_server_locked(EndpointAddress *addr)
UPSGroupPolicy::add_server_locked(addr);
if (params->server_type == 0)
this->total_weight += params->weight;
return;
}
int UPSWeightedRandomPolicy::remove_server_locked(const std::string& address)
@@ -638,7 +647,6 @@ void UPSVNSWRRPolicy::add_server_locked(EndpointAddress *addr)
{
UPSWeightedRandomPolicy::add_server_locked(addr);
init();
return;
}
int UPSVNSWRRPolicy::remove_server_locked(const std::string& address)
@@ -662,8 +670,6 @@ void UPSConsistentHashPolicy::add_server_locked(EndpointAddress *addr)
{
UPSGroupPolicy::add_server_locked(addr);
this->hash_map_add_addr(addr);
return;
}
int UPSConsistentHashPolicy::remove_server_locked(const std::string& address)
@@ -702,8 +708,6 @@ void UPSManualPolicy::add_server_locked(EndpointAddress *addr)
if (this->try_another)
this->hash_map_add_addr(addr);
return;
}
int UPSManualPolicy::remove_server_locked(const std::string& address)

View File

@@ -23,13 +23,16 @@
#include <utility>
#include <map>
#include <vector>
#include <atomic>
#include <functional>
#include "URIParser.h"
#include "EndpointParams.h"
#include "WFNameService.h"
#include "WFServiceGovernance.h"
using upstream_route_t = std::function<unsigned int (const char *, const char *, const char *)>;
using upstream_route_t = std::function<unsigned int (const char *path,
const char *query,
const char *fragment)>;
class EndpointGroup;
class UPSGroupPolicy;
@@ -50,8 +53,9 @@ class UPSGroupPolicy : public WFServiceGovernance
{
public:
UPSGroupPolicy();
~UPSGroupPolicy();
virtual ~UPSGroupPolicy();
public:
virtual bool select(const ParsedURI& uri, WFNSTracing *tracing,
EndpointAddress **addr);
virtual void add_server(const std::string& address,
@@ -72,8 +76,8 @@ protected:
virtual void add_server_locked(EndpointAddress *addr);
virtual int remove_server_locked(const std::string& address);
EndpointAddress *check_and_get(EndpointAddress *addr,
bool addr_failed, WFNSTracing *tracing);
EndpointAddress *check_and_get(EndpointAddress *addr, bool addr_failed,
WFNSTracing *tracing);
bool is_alive(const EndpointAddress *addr) const;
@@ -86,6 +90,24 @@ protected:
std::map<unsigned int, EndpointAddress *> addr_hash;
};
class UPSRoundRobinPolicy : public UPSGroupPolicy
{
public:
UPSRoundRobinPolicy(bool try_another)
{
this->try_another = try_another;
}
protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
protected:
std::atomic<size_t> cur_idx;
};
class UPSWeightedRandomPolicy : public UPSGroupPolicy
{
public:
@@ -95,10 +117,12 @@ public:
this->available_weight = 0;
this->try_another = try_another;
}
EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual EndpointAddress *another_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
protected:
virtual void add_server_locked(EndpointAddress *addr);
@@ -120,8 +144,10 @@ public:
this->cur_idx = 0;
this->try_another = false;
};
EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
protected:
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
private:
virtual void add_server_locked(EndpointAddress *addr);
@@ -142,8 +168,8 @@ public:
}
protected:
EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
virtual EndpointAddress *first_strategy(const ParsedURI& uri,
WFNSTracing *tracing);
private:
virtual void add_server_locked(EndpointAddress *addr);